# Standard library:
from datetime import date, datetime

# Data handling:
import numpy as np
import pandas as pd

# Plotting.
# Notebook-only install step (IPython shell escape; not valid in a plain .py file):
# !pip install chart-studio
import plotly.graph_objects as go

# RNN autoencoder:
from tensorflow import keras
from tensorflow.keras import layers


# Import the Historic Crypto package.
# Notebook-only install step (IPython shell escape; not valid in a plain .py file):
# !pip install Historic-Crypto
from Historic_Crypto import HistoricalData

# Fetch the Bitcoin data, then derive returns and intraday volatility.
dataset = HistoricalData(start_date='2013-06-06', ticker='BTC').retrieve_data()
# Returns: relative change of the close price from one row to the next.
dataset['Returns'] = dataset['Close'].pct_change()
# Volatility: absolute difference between the close and open price of each row.
dataset['Volatility'] = np.abs(dataset['Close'] - dataset['Open'])
# Drop every row that contains a missing value (pct_change() has no value
# for the first row).
dataset.dropna(axis=0, how='any', inplace=True)
dataset.head()


def plot_dates_values(data_timestamps, data_plot):
    """
    Show an interactive line chart of one series against its timestamps.

    Arguments:
        data_timestamps: the timestamps associated with each data point.
        data_plot: the series to draw; its ``name`` attribute is used both as
            the trace name and as the figure title.

    Returns:
        The value returned by ``fig.show()``; the figure is displayed with a
        range slider and range-selector buttons on the x axis.
    """
    # Range-selector buttons: year-to-date, one/two/three years back, everything.
    range_buttons = [
        dict(count=1, label="YTD", step="year", stepmode="todate"),
        dict(count=1, label="1 Years", step="year", stepmode="backward"),
        dict(count=2, label="2 Years", step="year", stepmode="backward"),
        dict(count=3, label="3 Years", step="year", stepmode="backward"),
        dict(label="All", step="all"),
    ]
    text_font = dict(family="Arial", size=11, color="#7f7f7f")

    fig = go.Figure()
    series_trace = go.Scatter(
        x=data_timestamps,
        y=data_plot,
        mode='lines',
        name=data_plot.name,
        connectgaps=True,
    )
    fig.add_trace(series_trace)
    fig.update_xaxes(
        rangeslider_visible=True,
        rangeselector=dict(buttons=range_buttons),
    )
    fig.update_layout(
        title=data_plot.name,
        xaxis_title="Date",
        yaxis_title="",
        font=text_font,
    )
    return fig.show()


# Plot each column of interest against the dataset index, one figure per column.
for column_name in ('Volume', 'Close', 'Open', 'Volatility', 'Returns'):
    plot_dates_values(dataset.index, dataset[column_name])

def generate_train_test_split(data, train_end, test_start):
    """
    Split a dataset into training and testing parts using two date strings.

    The strings given as ``train_end`` and ``test_start`` must be consecutive
    days in ``'%Y-%m-%d'`` format.

    Arguments:
        data: the data to split; it is sliced with the two date strings, so it
            is presumably indexed by date (confirm against the caller).
        train_end: the date on which the training data ends (str).
        test_start: the date on which the testing data starts (str).

    Returns:
        training_data: ``data[:train_end]``, used for model training.
        testing_data: ``data[test_start:]``, used for model testing.

    Raises:
        TypeError: if ``train_end`` or ``test_start`` is not a string.
        ValueError: if a date string does not match ``'%Y-%m-%d'``, if
            ``train_end`` is not strictly before ``test_start``, or if the two
            dates are more than one day apart.
    """
    if not isinstance(train_end, str):
        raise TypeError("train_end argument should be a string.")
    if not isinstance(test_start, str):
        raise TypeError("test_start argument should be a string.")

    train_end_datetime = datetime.strptime(train_end, '%Y-%m-%d')
    test_start_datetime = datetime.strptime(test_start, '%Y-%m-%d')

    # The training period has to finish strictly before the testing period starts.
    if train_end_datetime >= test_start_datetime:
        raise ValueError("train_end argument must occur prior to the test_start argument.")
    # ...and the two dates have to be consecutive days.
    if (test_start_datetime - train_end_datetime).days > 1:
        raise ValueError("the train_end argument and test_start argument should be separated by 1 day.")

    training_data = data[:train_end]
    testing_data = data[test_start:]

    print('Train Dataset shape:', training_data.shape)
    print('Test Dataset Shape:', testing_data.shape)

    return training_data, testing_data


# Call the function above to generate the training and testing data.
training_data, testing_data = generate_train_test_split(dataset, '2018-12-31', '2019-01-01')



def normalise_training_values(data):
    """
    Standardise the input values with their own mean and standard deviation.

    Arguments:
        data: the DataFrame column to standardise (Pandas Series).

    Returns:
        values: the standardised data used for model training (numpy array).
        mean: the training-set mean, reused to standardise the test set.
        std: the training-set standard deviation, reused to standardise the
            test set.

    Raises:
        TypeError: if ``data`` is not a Pandas Series.
    """
    if not isinstance(data, pd.Series):
        raise TypeError("data argument should be a Pandas Series.")

    # Work on a float ndarray explicitly instead of relying on list arithmetic
    # being coerced to an array by NumPy scalar operators.
    raw_values = data.to_numpy(dtype=float)
    mean = np.mean(raw_values)
    centred = raw_values - mean
    # The standard deviation is taken on the centred values.
    std = np.std(centred)
    values = centred / std

    print("*"*80)
    print("The length of the training data is: {}".format(len(values)))
    print("The mean of the training data is: {}".format(mean.round(2)))
    print("The standard deviation of the training data is {}".format(std.round(2)))
    print("*"*80)

    return values, mean, std


# Call the function above.
training_values, training_mean, training_std = normalise_training_values(training_data['Volume'])



# Number of time steps in each sequence.
TIME_STEPS = 30


def generate_sequences(values, time_steps=TIME_STEPS):
    """
    Build the sliding windows of length ``time_steps`` that are fed to the model.

    Window ``i`` is ``values[i : i + time_steps]`` for
    ``i in range(len(values) - time_steps)``.

    Arguments:
        values: the standardised values to cut into sequences (numpy array).
        time_steps: the length of each sequence (int).

    Returns:
        train_data: the stacked windows with an extra axis inserted at
            position 2 (numpy array).

    Raises:
        TypeError: if ``values`` is not a numpy array or ``time_steps`` is not
            an int.
        ValueError: if ``values`` is too short to produce a single window.
    """
    if not isinstance(values, np.ndarray):
        raise TypeError("values argument must be a numpy array.")
    if not isinstance(time_steps, int):
        raise TypeError("time_steps must be an integer object.")
    # Without this guard an empty window list reaches np.expand_dims and fails
    # with an opaque axis error.
    if len(values) <= time_steps:
        raise ValueError("values must contain more than time_steps elements.")

    # NOTE: the range stops at len(values) - time_steps, so the window that
    # ends on the very last value is not generated.
    output = [values[i:i + time_steps] for i in range(len(values) - time_steps)]
    train_data = np.expand_dims(output, axis=2)

    print("Training input data shape: {}".format(train_data.shape))

    return train_data


# Call the function above to generate x_train.
x_train = generate_sequences(training_values)


def define_model(x_train):
    """
    Build and compile the convolutional/LSTM autoencoder sized from ``x_train``.

    The number of steps and features are read from ``x_train.shape[1]`` and
    ``x_train.shape[2]``. The Keras backend session is cleared before the
    layers are created.

    Arguments:
        x_train: the 3D data used for model training (numpy array).

    Returns:
        model: the compiled Sequential model (Adam, learning rate 0.001,
            MSE loss).
        model_summary: the value returned by ``model.summary()``.

    Raises:
        TypeError: if ``x_train`` is not a numpy array.
    """
    if not isinstance(x_train, np.ndarray):
        raise TypeError("The x_train argument should be a 3 dimensional numpy array.")

    num_steps = x_train.shape[1]
    num_features = x_train.shape[2]

    keras.backend.clear_session()

    # Encoder: convolution followed by an LSTM that keeps only its last output.
    input_layer = layers.Input(shape=(num_steps, num_features))
    encoder_conv = layers.Conv1D(
        filters=32,
        kernel_size=15,
        padding='same',
        data_format='channels_last',
        dilation_rate=1,
        activation='linear',
    )
    encoder_lstm = layers.LSTM(
        units=25,
        activation='tanh',
        name='LSTM_layer_1',
        return_sequences=False,
    )
    # Repeat the encoded vector once per time step so the decoder can unroll it.
    repeat_encoding = layers.RepeatVector(num_steps)
    # Decoder: LSTM returning the full sequence, convolution, per-step dense output.
    decoder_lstm = layers.LSTM(
        units=25,
        activation='tanh',
        name='LSTM_layer_2',
        return_sequences=True,
    )
    decoder_conv = layers.Conv1D(
        filters=32,
        kernel_size=15,
        padding='same',
        data_format='channels_last',
        dilation_rate=1,
        activation='linear',
    )
    output_layer = layers.TimeDistributed(layers.Dense(1, activation='linear'))

    model = keras.Sequential(
        [
            input_layer,
            encoder_conv,
            encoder_lstm,
            repeat_encoding,
            decoder_lstm,
            decoder_conv,
            output_layer,
        ]
    )

    optimizer = keras.optimizers.Adam(learning_rate=0.001)
    model.compile(optimizer=optimizer, loss="mse")

    model_summary = model.summary()
    return model, model_summary


def model_fit():
    """
    Build the model with ``define_model()`` and train it on the module-level ``x_train``.

    The model is fitted to reconstruct its own input (``x_train`` is both the
    input and the target), holding out 10% of the data for validation and
    stopping early on the validation loss.

    Arguments:
        N/A.

    Returns:
        model: the trained model.
        history: the object returned by ``model.fit`` (training and validation
            loss per epoch).
    """
    # Build the model from the dimensions of x_train; the summary is not needed here.
    autoencoder, _ = define_model(x_train)

    early_stopping = keras.callbacks.EarlyStopping(
        monitor="val_loss",
        patience=25,
        mode="min",
        restore_best_weights=True,
    )
    training_history = autoencoder.fit(
        x_train,
        x_train,
        epochs=400,
        batch_size=128,
        validation_split=0.1,
        callbacks=[early_stopping],
    )

    return autoencoder, training_history


# Call the function above to obtain the model and its training history.
model, history = model_fit()


def plot_training_validation_loss():
    """
    Plot the training and validation loss curves of the trained model.

    The curves are read from the module-level ``history`` object and allow a
    visual check for under- or over-fitting.

    Arguments:
        N/A.

    Returns:
        The value returned by ``fig.show()``; the figure shows both loss curves.
    """
    loss_frame = pd.DataFrame.from_dict(history.history, orient='columns')
    epochs = loss_frame.index

    fig = go.Figure()
    # One line per loss column, training first and validation second.
    for loss_column, trace_label in (("loss", 'Training Loss'), ("val_loss", 'Validation Loss')):
        fig.add_trace(
            go.Scatter(
                x=epochs,
                y=loss_frame[loss_column].round(6),
                mode='lines',
                name=trace_label,
                connectgaps=True,
            )
        )

    text_font = dict(family="Arial", size=11, color="#7f7f7f")
    fig.update_layout(
        title='Training and Validation Loss',
        xaxis_title="Epoch",
        yaxis_title="Loss",
        font=text_font,
    )
    return fig.show()


# Call the function above.
plot_training_validation_loss()

def reconstruction_error(x_train):
    """
    Compute the reconstruction error on the training data and show its histogram.

    The per-sequence mean absolute error between the model prediction and
    ``x_train`` is stored in the global ``train_mae_loss``; its maximum is
    printed as the reconstruction error threshold.

    Arguments:
        x_train: the 3D data used for model training (numpy array).

    Returns:
        The value returned by ``fig.show()``; the figure is a histogram of the
        training MAE.

    Raises:
        TypeError: if ``x_train`` is not a numpy array.
    """
    global train_mae_loss

    if not isinstance(x_train, np.ndarray):
        raise TypeError("x_train argument should be a numpy array.")

    reconstructed = model.predict(x_train)
    absolute_error = np.abs(reconstructed - x_train)
    # Average over axis 1 (the time steps of each sequence).
    train_mae_loss = np.mean(absolute_error, axis=1)

    mae_values = train_mae_loss.flatten()
    mae_histogram = go.Histogram(x=mae_values, histnorm='probability', name='MAE Loss')
    fig = go.Figure(data=[mae_histogram])
    fig.update_layout(
        title='Mean Absolute Error Loss',
        xaxis_title="Training MAE Loss (%)",
        yaxis_title="Number of Samples",
        font=dict(family="Arial", size=11, color="#7f7f7f"),
    )

    separator = "*"*80
    print(separator)
    print("Reconstruction error threshold: {} ".format(np.max(train_mae_loss).round(4)))
    print(separator)

    return fig.show()


# Call the function above.
reconstruction_error(x_train)

def normalise_testing_values(data, training_mean, training_std):
    """
    Standardise the test data with the training mean and standard deviation.

    Arguments:
        data: the data to standardise (Pandas Series).
        training_mean: the training-set mean (float or numpy float).
        training_std: the training-set standard deviation (float or numpy float).

    Returns:
        values: the standardised test values (numpy array).

    Raises:
        TypeError: if ``data`` is not a Pandas Series.
    """
    if not isinstance(data, pd.Series):
        raise TypeError("data argument should be a Pandas Series.")

    # Convert to a float ndarray first: arithmetic on a plain Python list only
    # works when the mean/std happen to be NumPy scalars, and raises TypeError
    # for ordinary floats.
    values = (data.to_numpy(dtype=float) - training_mean) / training_std

    # The statistics printed below describe the raw (un-normalised) test data.
    print("*"*80)
    print("The length of the testing data is: {}".format(data.shape[0]))
    print("The mean of the testing data is: {}".format(data.mean()))
    print("The standard deviation of the testing data is {}".format(data.std()))
    print("*"*80)

    return values


# Standardise the test-set 'Volume' column with the statistics of the training set.
test_value = normalise_testing_values(
    data=testing_data['Volume'],
    training_mean=training_mean,
    training_std=training_std,
)


def generate_testing_loss(test_value):
    """
    Score the test set with the model and flag the anomalous sequences.

    Sequences are generated from ``test_value``, reconstructed by the model,
    and a sequence is flagged when its mean absolute error is at least the
    maximum of the global ``train_mae_loss``. The flags are stored in the
    global ``anomalies`` list.

    Arguments:
        test_value: the standardised test values (numpy array).

    Returns:
        The value returned by ``fig.show()``; the figure is a histogram of the
        testing MAE.
    """
    global anomalies

    x_test = generate_sequences(test_value)
    separator = "*"*80
    print(separator)
    print("Test input shape: {}".format(x_test.shape))

    reconstructed = model.predict(x_test)
    # Mean absolute error over axis 1, flattened to one value per sequence.
    test_mae_loss = np.mean(np.abs(reconstructed - x_test), axis=1).reshape((-1))

    # The threshold is the largest reconstruction error seen on the training data.
    threshold = np.max(train_mae_loss)
    anomalies = (test_mae_loss >= threshold).tolist()
    print("Number of anomaly samples: ", np.sum(anomalies))
    print("Indices of anomaly samples: ", np.where(anomalies))
    print(separator)

    mae_values = test_mae_loss.flatten()
    mae_histogram = go.Histogram(x=mae_values, histnorm='probability', name='MAE Loss')
    fig = go.Figure(data=[mae_histogram])
    fig.update_layout(
        title='Mean Absolute Error Loss',
        xaxis_title="Testing MAE Loss (%)",
        yaxis_title="Number of Samples",
        font=dict(family="Arial", size=11, color="#7f7f7f"),
    )
    return fig.show()


# Call the function above.
generate_testing_loss(test_value)


def plot_outliers(data):
    """
    Locate the outlying points of the test period and plot them against the rest.

    A test position ``data_idx`` is treated as an outlier when every sequence
    starting in ``range(data_idx - TIME_STEPS + 1, data_idx)`` is flagged in
    the global ``anomalies`` list. Positions are shifted by
    ``len(training_data)`` before indexing ``data``, so ``data`` is presumably
    the full dataset with the training rows first (confirm against the caller).

    Arguments:
        data: the initial dataset (Pandas DataFrame) with a ``"Volume"`` column.

    Returns:
        The value returned by ``fig.show()``; the figure shows the ordinary
        points and the detected outliers as two marker traces.
    """
    outliers = []
    for data_idx in range(TIME_STEPS - 1, len(test_value) - TIME_STEPS + 1):
        time_series = range(data_idx - TIME_STEPS + 1, data_idx)
        if all(anomalies[j] for j in time_series):
            # Convert the test-set position into a position in the full dataset.
            outliers.append(data_idx + len(training_data))

    outlying_data = data.iloc[outliers, :]
    # Everything whose index label is not among the outliers.
    is_outlier = data.index.isin(outlying_data.index)
    no_outliers = data[~is_outlier]

    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=no_outliers.index,
        y=no_outliers["Volume"],
        mode='markers',
        name=no_outliers["Volume"].name,
        connectgaps=False,
    ))
    fig.add_trace(go.Scatter(
        x=outlying_data.index,
        y=outlying_data["Volume"],
        mode='markers',
        name=outlying_data["Volume"].name + ' Outliers',
        connectgaps=False,
    ))
    fig.update_xaxes(rangeslider_visible=True)
    fig.update_layout(
        title='Detected Outliers',
        xaxis_title=data.index.name,
        yaxis_title=no_outliers["Volume"].name,
        font=dict(family="Arial", size=11, color="#7f7f7f"),
    )
    return fig.show()


# Call the function above.
plot_outliers(dataset)


