LSTM预测股价的APP
根据之前的代码:LSTM预测股票收盘价做了一点修改。
首先收集数据:
import tushare as ts
import pandas as pd
def get_data(code, date):
#数据准备/data preparation
#变量选取Open,High,Low,Close,Volume等,以浦发银行股票为例
pro = ts.pro_api('f3bbc97d0ffbbed8666e6f7c82e712165950d048987f5d6cfbf1e0ce') #token可以在新版tushare的网站上找到
stock_data = pro.query('daily', ts_code = code, start_date = '20000101', end_date = date)
stock_data = stock_data[::-1] #倒序,使日期靠前的排在前面
stock_data.reset_index(drop=True, inplace=True) #把每行的索引改为“0、1、2……”
return stock_data
然后进行预处理:
import numpy as np
from sklearn.preprocessing import MinMaxScaler
def preprocess(stock_data, day, seq_length, data_dim, output_dim, visual_window):
xy = stock_data[['open', 'close', 'high', 'low', 'vol', 'pct_chg', 'amount']] #选取需要的features
xy = np.array(xy.values) #转为array
dataXY = []
for i in range(0, len(xy) - seq_length - day + 1):
_xy = xy[i:i + seq_length + day] #包括用于计算的seq_length天的数据,以及day天后的价格
dataXY.append(_xy)
#调整数据的shape
xy_real = np.vstack(dataXY).reshape(-1, seq_length + day, data_dim)
dataXY = xy_real
app_dataX = []
for i in range(len(xy) - seq_length - day + 1, len(xy) - seq_length + 1):
_x = xy[i:i + seq_length] #包括用于计算的seq_length天的数据
app_dataX.append(_x)
#调整数据的shape
x_real = np.vstack(app_dataX).reshape(-1, seq_length, data_dim)
app_dataX = x_real
xy_visual = np.copy(dataXY[- visual_window:]) #取最近visual_window天的数据,用于最后的画图
np.random.shuffle(dataXY) #打乱顺序
#切分训练集合测试集/split to train and testing
train_size = int(len(dataXY) * 0.7) #训练集长度
test_size = len(dataXY) - train_size #测试集长度
xy_train, xy_test = np.array(dataXY[0:train_size]), np.array(dataXY[train_size:len(dataXY)]) #划分训练集、测试集
#先处理训练集的数据
scaler = MinMaxScaler()
xy_train = xy_train.reshape((-1, data_dim)) #先变成2维,才能transform
xy_train_new = scaler.fit_transform(xy_train) #预处理,按列操作,每列最小值为0,最大值为1
xy_train_new = xy_train_new.reshape((-1, seq_length + day, data_dim)) #变回3维
x_new = xy_train_new[:,0:seq_length] #features
y_new = xy_train_new[:,-1,1] * 10 #取最后一天的收盘价,用作label,适当放大,便于训练
trainX, trainY = x_new, y_new
#然后处理测试集的数据
xy_test = xy_test.reshape((-1, data_dim))
xy_test_new = scaler.transform(xy_test) #使用训练集的scaler预处理测试集的数据
xy_test_new = xy_test_new.reshape((-1, seq_length + day, data_dim))
x_new = xy_test_new[:, 0:seq_length]
y_new = xy_test_new[:, -1, 1] * 10
#以下3项用于计算收入
close_price = xy_test_new[:, seq_length - 1, 1]
buy_price = xy_test_new[:, seq_length, 0]
sell_price = xy_test_new[:, -1, 1]
testX, testY, test_close, test_buy, test_sell = x_new, y_new, close_price, buy_price, sell_price
#再处理应用集
x_app = app_dataX.reshape((-1, data_dim))
appX = scaler.transform(x_app) #用训练集的scaler进行预处理
appX = appX.reshape((-1, seq_length, data_dim))
#最后处理用于画图的数据
xy_visual = xy_visual.reshape((-1, data_dim))
xy_visual_new = scaler.transform(xy_visual) #使用训练集的scaler预处理
xy_visual_new = xy_visual_new.reshape((-1, seq_length + day, data_dim))
x_new = xy_visual_new[:, 0:seq_length]
y_new = xy_visual_new[:, -1, 1] * 10
visualX, visualY = x_new, y_new
return trainX, trainY, testX, testY, appX, scaler, test_close, test_buy, test_sell, visualX, visualY
如果之前没有建立模型,就训练一个:
from keras.layers import Input, Dense, LSTM, Reshape
from keras.models import Model
from keras import regularizers, callbacks
def train(code, day, trainX, trainY, seq_length, data_dim, output_dim):
# 构建神经网络层 1层Dense层+1层LSTM层+4层Dense层
rnn_units = 32
Dense_input = Input(shape=(seq_length, data_dim), name='dense_input') #输入层
#shape: 形状元组(整型)不包括batch size。表示了预期的输入将是一批(seq_len,data_dim)的向量。
Dense_output_1 = Dense(rnn_units, activation='relu', kernel_regularizer=regularizers.l2(0.0), name='dense1')(Dense_input) #全连接网络
lstm_input = Reshape(target_shape=(seq_length, rnn_units), name='reshape2')(Dense_output_1)
#改变Tensor形状,改变后是(None,seq_length, rnn_units)
lstm_output = LSTM(rnn_units, activation='tanh', dropout=1.0, name='lstm')(lstm_input) #LSTM网络
#units: Positive integer,dimensionality of the output space.
#dropout: Float between 0 and 1. Fraction of the units to drop for the linear transformation of the inputs.
Dense_input_2 = Reshape(target_shape=(rnn_units,), name='reshape3')(lstm_output)
#改变Tensor形状,改变后是(None,rnn_units)
Dense_output_2 = Dense(64, activation='relu', kernel_regularizer=regularizers.l2(0.0), name='dense2')(Dense_input_2) #全连接网络
Dense_output_3 = Dense(32, activation='relu', kernel_regularizer=regularizers.l2(0.0), name='dense3')(Dense_output_2) #全连接网络
Dense_output_4 = Dense(16, activation='relu', kernel_regularizer=regularizers.l2(0.0), name='dense4')(Dense_output_3) #全连接网络
predictions = Dense(output_dim, activation=None, kernel_regularizer=regularizers.l2(0.0), name='dense5')(Dense_output_4) #全连接网络
model = Model(inputs=Dense_input, outputs=predictions)
#This model will include all layers required in the computation of output given input.
model.compile(optimizer='adam', loss='mse', metrics=['mae'])
#Configures the model for training.
#optimizer: String (name of optimizer) or optimizer instance. See optimizers.
#loss: String (name of objective function) or objective function.The loss value will be minimized by the model.
#metrics: List of metrics to be evaluated by the model during training and testing. Typically you will use metrics=['accuracy'].
ES = callbacks.EarlyStopping(monitor='val_loss', min_delta=0, patience=10, verbose=0, mode='auto', baseline=None)
model.fit(trainX, trainY, batch_size=256, epochs=400, verbose=0, callbacks=[ES], validation_split=0.1)
#Trains the model for a given number of epochs (iterations on a dataset).
#verbose: Integer. 0, 1, or 2. Verbosity mode. 0 = silent, 1 = progress bar, 2 = one line per epoch.
# 保存模型
model.save(code + '(1)' + str(day) + '.h5') # HDF5文件,pip install h5py
return model
测试一下模型的效果:
def test(model, testX, testY, scaler, day, close_price, buy_price, sell_price, visualX, visualY):
testPredict = model.predict(testX) #查看测试结果
testPredict2 = testPredict / 10 * scaler.data_range_[1] + scaler.data_min_[1] #放大和scale的逆运算
testY2 = testY / 10 * scaler.data_range_[1] + scaler.data_min_[1] #放大和scale的逆运算
#以下3项用于计算收入
#今天的收盘价,用于判断买不买
close_price2 = close_price * scaler.data_range_[1] + scaler.data_min_[1]
#明天的开盘价,如果买需要付多少钱
buy_price2 = buy_price * scaler.data_range_[0] + scaler.data_min_[0]
#持有day天之后的收盘价,这时卖能卖多少钱
sell_price2 = sell_price * scaler.data_range_[1] + scaler.data_min_[1]
#平均误差(%)
mean_error = np.mean(abs(testPredict2 - testY2) / testY2 * 100)
mean_error = round(mean_error, 2)
print('平均误差(%):', mean_error)
#最大误差(%)
max_error = np.max(abs(testPredict2 - testY2) / testY2 * 100)
max_error = round(max_error, 2)
print('最大误差(%):', max_error)
count = 0 #绝对误差小于1%的比例
correct = np.zeros(len(testPredict2)) #预测涨或跌的正确率
model_income = 0 #模型能挣多少钱
trade = 0 #计算交易频率
max_income = 0 #最理想的状况下,能挣多少钱
random_income = 0 #随机购买,能挣多少钱
tolerance = 1
for i in range(len(testY2)):
#计算绝对误差小于 tolerance% 的比例
if abs(testPredict2[i] - testY2[i]) / testY2[i] * 100 <= tolerance:
count += 1
#计算对转折点的预测正确率
if np.sign(testPredict2[i] - close_price2[i]) == np.sign(testY2[i] - close_price2[i]):
#如果对涨或跌的判断准确,这里用正负符号判断
correct[i] = 1 #就加1
#如果对“day”天后的预测价格高于今天的收盘价,就买进并持有“day”天,计算能挣多少钱
if testPredict2[i] > close_price2[i]:
model_income = model_income + sell_price2[i] - buy_price2[i]
trade += 1
#最理想的状况下,能挣多少钱
if testY2[i] > close_price2[i]:
max_income = max_income + sell_price2[i] - buy_price2[i]
#随机购买,能挣多少钱
buy = np.random.randint(0, 2) #随机产生0或1
if buy: #如果是1就买
random_income = random_income + sell_price2[i] - buy_price2[i]
count = count / len(testY2) * 100
count = round(count, 2)
print('误差小于' + str(tolerance) + '%的比例:', count)
accuracy = np.sum(correct) / len(correct) * 100
accuracy = round(accuracy, 2)
print('预测涨或跌的正确率:', accuracy)
print('模型的购买策略是,如果对%d天之后的预测值大于今天的收盘价,就在明天开市时买进1股,并且持有%d天,再卖出'%(day, day))
frequency = trade / len(testPredict2) * 100
model_income = round(float(model_income), 2)
frequency = round(frequency, 2)
print('在%d天中,模型交易了%d次,交易频率为%g'%(len(testPredict2), trade, frequency) + '%')
print('按照模型进行操作所得的收入:', model_income)
max_income = round(float(max_income), 2)
print('最理想状况下的收入:', max_income)
random_income = round(float(random_income), 2)
print('随机购买的收入:', random_income)
visualPredict = model.predict(visualX)
visualPredict2 = visualPredict / 10 * scaler.data_range_[1] + scaler.data_min_[1] #放大和scale的逆运算
visualY2 = visualY / 10 * scaler.data_range_[1] + scaler.data_min_[1] #放大和scale的逆运算
return visualY2, visualPredict2
用模型预测未来的股价:
def apply(model, appX, scaler):
#查看应用结果
appPredict = model.predict(appX)
appPredict2 = appPredict / 10 * scaler.data_range_[1] + scaler.data_min_[1] #放大和scale的逆运算
appPredict2 = appPredict2.reshape(-1)
return appPredict2
把测试和预测的结果可视化:
import matplotlib.pyplot as plt
def visualize(visualY2, visualPredict2, appPredict2, visual_window, day):
plt.figure(figsize=(16,8)) #画布大小
plt.plot(list(range(len(visualY2))), visualY2, color='blue') #只显示最近“period”天的测试记录
plt.plot(list(range(len(visualPredict2))), visualPredict2, color='orange')
plt.scatter(
list(range(len(visualPredict2), len(visualPredict2) + len(appPredict2))), appPredict2, color='red')
plt.xlim((0, visual_window + day)) #x坐标范围
plt.legend(['True price', 'Model result', 'Prediction'], loc='upper left')
plt.ylabel('price')
plt.xlabel('time')
plt.show()
主函数:
def main():
code = input("请输入6位代码:") #输入股票代码
code = code + '.SH'
day = input("请输入预测天数:") #输入预测多少天后的价格
day = int(day)
import time
date = time.strftime('%Y%m%d',time.localtime(time.time())) #获取当天日期
#参数设置/parameter setting
timesteps = seq_length = 20 #时间窗/window length
data_dim = 7 #输入数据维度/dimension of input data
output_dim = 1 #输出数据维度/dimension of output data
visual_window = 200
try:
stock_data = get_data(code, date)
except:
print('代码不正确或无法获得该股票的数据')
return
if len(stock_data) == 0:
print('代码不正确或无法获得该股票的数据')
return
trainX, trainY, testX, testY, appX, scaler, test_close, test_buy, test_sell, visualX, visualY = preprocess(
stock_data, day, seq_length, data_dim, output_dim, visual_window)
try:
# 载入模型
from keras.models import load_model
model = load_model(code + '(1)' + str(day) + '.h5')
except:
print('第一次预测%d天内'%(day) + code + '的估价,需要一点时间建模')
model = train(
code, day, trainX, trainY, seq_length, data_dim, output_dim)
visualY2, visualPredict2 = test(model, testX, testY, scaler, day, test_close, test_buy, test_sell, visualX, visualY)
appPredict2 = apply(model, appX, scaler)
visualize(visualY2, visualPredict2, appPredict2, visual_window, day)
运行一下,看看效果:
main()
请输入6位代码:600004
请输入预测天数:20
第一次预测20天内600004.SH的估价,需要一点时间建模
平均误差(%): 38.58
最大误差(%): 234.63
误差小于1%的比例: 12.3
预测涨或跌的正确率: 56.55
模型的购买策略是,如果对20天之后的预测值大于今天的收盘价,就在明天开市时买进1股,并且持有20天,再卖出
在1114天中,模型交易了541次,交易频率为48.56%
按照模型进行操作所得的收入: 165.19
最理想状况下的收入: 459.25
随机购买的收入: -8.36

代码在GitHub里:GitHub