LSTM预测股价的APP

根据之前的代码:LSTM预测股票收盘价做了一点修改。
首先收集数据:

import tushare as ts
import pandas as pd

def get_data(code, date):
    #数据准备/data preparation 
    #变量选取Open,High,Low,Close,Volume等,以浦发银行股票为例
    pro = ts.pro_api('f3bbc97d0ffbbed8666e6f7c82e712165950d048987f5d6cfbf1e0ce') #token可以在新版tushare的网站上找到
    stock_data = pro.query('daily', ts_code = code, start_date = '20000101', end_date = date)
    stock_data = stock_data[::-1] #倒序,使日期靠前的排在前面
    stock_data.reset_index(drop=True, inplace=True) #把每行的索引改为“0、1、2……”
    return stock_data

然后进行预处理:

import numpy as np
from sklearn.preprocessing import MinMaxScaler

def preprocess(stock_data, day, seq_length, data_dim, output_dim, visual_window):
    xy = stock_data[['open', 'close', 'high', 'low', 'vol', 'pct_chg', 'amount']] #选取需要的features
    xy = np.array(xy.values) #转为array
    
    dataXY = []
    for i in range(0, len(xy) - seq_length - day + 1):
        _xy = xy[i:i + seq_length + day] #包括用于计算的seq_length天的数据,以及day天后的价格
        dataXY.append(_xy)
    
    #调整数据的shape
    xy_real = np.vstack(dataXY).reshape(-1, seq_length + day, data_dim)
    dataXY = xy_real
    
    app_dataX = []
    for i in range(len(xy) - seq_length - day + 1, len(xy) - seq_length + 1):
        _x = xy[i:i + seq_length] #包括用于计算的seq_length天的数据
        app_dataX.append(_x)
    
    #调整数据的shape
    x_real = np.vstack(app_dataX).reshape(-1, seq_length, data_dim)
    app_dataX = x_real
    
    xy_visual = np.copy(dataXY[- visual_window:]) #取最近visual_window天的数据,用于最后的画图
    
    np.random.shuffle(dataXY) #打乱顺序
    
    #切分训练集合测试集/split to train and testing
    train_size = int(len(dataXY) * 0.7) #训练集长度
    test_size = len(dataXY) - train_size #测试集长度
    xy_train, xy_test = np.array(dataXY[0:train_size]), np.array(dataXY[train_size:len(dataXY)]) #划分训练集、测试集
    
    #先处理训练集的数据
    scaler = MinMaxScaler()
    xy_train = xy_train.reshape((-1, data_dim)) #先变成2维,才能transform
    xy_train_new = scaler.fit_transform(xy_train) #预处理,按列操作,每列最小值为0,最大值为1
    xy_train_new = xy_train_new.reshape((-1, seq_length + day, data_dim)) #变回3维
    
    x_new = xy_train_new[:,0:seq_length] #features
    y_new = xy_train_new[:,-1,1] * 10 #取最后一天的收盘价,用作label,适当放大,便于训练
    
    trainX, trainY = x_new, y_new
    
    #然后处理测试集的数据
    xy_test = xy_test.reshape((-1, data_dim))
    xy_test_new = scaler.transform(xy_test) #使用训练集的scaler预处理测试集的数据
    xy_test_new = xy_test_new.reshape((-1, seq_length + day, data_dim))
    
    x_new = xy_test_new[:, 0:seq_length]
    y_new = xy_test_new[:, -1, 1] * 10
    #以下3项用于计算收入
    close_price = xy_test_new[:, seq_length - 1, 1]
    buy_price = xy_test_new[:, seq_length, 0]
    sell_price = xy_test_new[:, -1, 1]
    
    testX, testY, test_close, test_buy, test_sell = x_new, y_new, close_price, buy_price, sell_price
    
    #再处理应用集
    x_app = app_dataX.reshape((-1, data_dim))
    appX = scaler.transform(x_app) #用训练集的scaler进行预处理
    appX = appX.reshape((-1, seq_length, data_dim))
    
    #最后处理用于画图的数据
    xy_visual = xy_visual.reshape((-1, data_dim))
    xy_visual_new = scaler.transform(xy_visual) #使用训练集的scaler预处理
    xy_visual_new = xy_visual_new.reshape((-1, seq_length + day, data_dim))
    
    x_new = xy_visual_new[:, 0:seq_length]
    y_new = xy_visual_new[:, -1, 1] * 10
    
    visualX, visualY = x_new, y_new
    
    return trainX, trainY, testX, testY, appX, scaler, test_close, test_buy, test_sell, visualX, visualY

如果之前没有建立模型,就训练一个:

from keras.layers import Input, Dense, LSTM, Reshape
from keras.models import Model
from keras import regularizers, callbacks

def train(code, day, trainX, trainY, seq_length, data_dim, output_dim):
    # 构建神经网络层 1层Dense层+1层LSTM层+4层Dense层
    rnn_units = 32
    Dense_input = Input(shape=(seq_length, data_dim), name='dense_input') #输入层
    #shape: 形状元组(整型)不包括batch size。表示了预期的输入将是一批(seq_len,data_dim)的向量。
    Dense_output_1 = Dense(rnn_units, activation='relu', kernel_regularizer=regularizers.l2(0.0), name='dense1')(Dense_input) #全连接网络

    lstm_input = Reshape(target_shape=(seq_length, rnn_units), name='reshape2')(Dense_output_1) 
    #改变Tensor形状,改变后是(None,seq_length, rnn_units)
    lstm_output = LSTM(rnn_units, activation='tanh', dropout=1.0, name='lstm')(lstm_input) #LSTM网络
    #units: Positive integer,dimensionality of the output space.
    #dropout: Float between 0 and 1. Fraction of the units to drop for the linear transformation of the inputs.
    
    Dense_input_2 = Reshape(target_shape=(rnn_units,), name='reshape3')(lstm_output) 
    #改变Tensor形状,改变后是(None,rnn_units)
    Dense_output_2 = Dense(64, activation='relu', kernel_regularizer=regularizers.l2(0.0), name='dense2')(Dense_input_2) #全连接网络
    Dense_output_3 = Dense(32, activation='relu', kernel_regularizer=regularizers.l2(0.0), name='dense3')(Dense_output_2) #全连接网络
    Dense_output_4 = Dense(16, activation='relu', kernel_regularizer=regularizers.l2(0.0), name='dense4')(Dense_output_3) #全连接网络
    predictions = Dense(output_dim, activation=None, kernel_regularizer=regularizers.l2(0.0), name='dense5')(Dense_output_4) #全连接网络
    
    model = Model(inputs=Dense_input, outputs=predictions)
    #This model will include all layers required in the computation of output given input.
    
    model.compile(optimizer='adam', loss='mse', metrics=['mae'])
    #Configures the model for training.
    #optimizer: String (name of optimizer) or optimizer instance. See optimizers.
    #loss: String (name of objective function) or objective function.The loss value will be minimized by the model.
    #metrics: List of metrics to be evaluated by the model during training and testing. Typically you will use  metrics=['accuracy'].
    
    ES = callbacks.EarlyStopping(monitor='val_loss', min_delta=0, patience=10, verbose=0, mode='auto', baseline=None)
    
    model.fit(trainX, trainY, batch_size=256, epochs=400, verbose=0, callbacks=[ES], validation_split=0.1)
    #Trains the model for a given number of epochs (iterations on a dataset).
    #verbose: Integer. 0, 1, or 2. Verbosity mode. 0 = silent, 1 = progress bar, 2 = one line per epoch.
    
    # 保存模型
    model.save(code + '(1)' + str(day) + '.h5')   # HDF5文件,pip install h5py
    
    return model

测试一下模型的效果:

def test(model, testX, testY, scaler, day, close_price, buy_price, sell_price, visualX, visualY):
    testPredict = model.predict(testX) #查看测试结果
    testPredict2 = testPredict / 10 * scaler.data_range_[1] + scaler.data_min_[1] #放大和scale的逆运算
    testY2 = testY / 10 * scaler.data_range_[1] + scaler.data_min_[1] #放大和scale的逆运算
    
    #以下3项用于计算收入
    #今天的收盘价,用于判断买不买
    close_price2 = close_price * scaler.data_range_[1] + scaler.data_min_[1]
    #明天的开盘价,如果买需要付多少钱
    buy_price2 = buy_price * scaler.data_range_[0] + scaler.data_min_[0]
    #持有day天之后的收盘价,这时卖能卖多少钱
    sell_price2 = sell_price * scaler.data_range_[1] + scaler.data_min_[1]
    
    #平均误差(%)
    mean_error = np.mean(abs(testPredict2 - testY2) / testY2 * 100)
    mean_error = round(mean_error, 2)
    print('平均误差(%):', mean_error)
    
    #最大误差(%)
    max_error = np.max(abs(testPredict2 - testY2) / testY2 * 100)
    max_error = round(max_error, 2)
    print('最大误差(%):', max_error)
    
    count = 0 #绝对误差小于1%的比例
    correct = np.zeros(len(testPredict2)) #预测涨或跌的正确率
    model_income = 0 #模型能挣多少钱
    trade = 0 #计算交易频率
    max_income = 0 #最理想的状况下,能挣多少钱
    random_income = 0 #随机购买,能挣多少钱
    
    tolerance = 1
    for i in range(len(testY2)):
        #计算绝对误差小于 tolerance% 的比例
        if abs(testPredict2[i] - testY2[i]) / testY2[i] * 100 <= tolerance:
            count += 1
        #计算对转折点的预测正确率
        if np.sign(testPredict2[i] - close_price2[i]) == np.sign(testY2[i] - close_price2[i]):
            #如果对涨或跌的判断准确,这里用正负符号判断
            correct[i] = 1 #就加1
        #如果对“day”天后的预测价格高于今天的收盘价,就买进并持有“day”天,计算能挣多少钱
        if testPredict2[i] > close_price2[i]:
            model_income = model_income + sell_price2[i] - buy_price2[i]
            trade += 1
        #最理想的状况下,能挣多少钱
        if testY2[i] > close_price2[i]:
            max_income = max_income + sell_price2[i] - buy_price2[i]
        #随机购买,能挣多少钱
        buy = np.random.randint(0, 2) #随机产生0或1
        if buy: #如果是1就买
            random_income = random_income + sell_price2[i] - buy_price2[i]
    
    count = count / len(testY2) * 100
    count = round(count, 2)
    print('误差小于' + str(tolerance) + '%的比例:', count)
    
    accuracy = np.sum(correct) / len(correct) * 100
    accuracy = round(accuracy, 2)
    print('预测涨或跌的正确率:', accuracy)
    
    print('模型的购买策略是,如果对%d天之后的预测值大于今天的收盘价,就在明天开市时买进1股,并且持有%d天,再卖出'%(day, day))
    frequency = trade / len(testPredict2) * 100
    model_income = round(float(model_income), 2)
    frequency = round(frequency, 2)
    print('在%d天中,模型交易了%d次,交易频率为%g'%(len(testPredict2), trade, frequency) + '%')
    print('按照模型进行操作所得的收入:', model_income)
    
    max_income = round(float(max_income), 2)
    print('最理想状况下的收入:', max_income)
    
    random_income = round(float(random_income), 2)
    print('随机购买的收入:', random_income)
    
    visualPredict = model.predict(visualX)
    visualPredict2 = visualPredict / 10 * scaler.data_range_[1] + scaler.data_min_[1] #放大和scale的逆运算
    visualY2 = visualY / 10 * scaler.data_range_[1] + scaler.data_min_[1] #放大和scale的逆运算
    
    return visualY2, visualPredict2

用模型预测未来的股价:

def apply(model, appX, scaler):
     #查看应用结果
    appPredict = model.predict(appX)
    appPredict2 = appPredict / 10 * scaler.data_range_[1] + scaler.data_min_[1] #放大和scale的逆运算
    appPredict2 = appPredict2.reshape(-1)
    return appPredict2

把测试和预测的结果可视化:

import matplotlib.pyplot as plt
def visualize(visualY2, visualPredict2, appPredict2, visual_window, day):
    plt.figure(figsize=(16,8)) #画布大小
    plt.plot(list(range(len(visualY2))), visualY2, color='blue') #只显示最近“period”天的测试记录
    plt.plot(list(range(len(visualPredict2))), visualPredict2, color='orange')
    plt.scatter(
        list(range(len(visualPredict2), len(visualPredict2) + len(appPredict2))), appPredict2, color='red')    
    plt.xlim((0, visual_window + day)) #x坐标范围
    plt.legend(['True price', 'Model result', 'Prediction'], loc='upper left')
    plt.ylabel('price')
    plt.xlabel('time')
    plt.show()

主函数:

def main():
    code = input("请输入6位代码:") #输入股票代码
    code = code + '.SH'
    
    day = input("请输入预测天数:") #输入预测多少天后的价格
    day = int(day)
    
    import time
    date = time.strftime('%Y%m%d',time.localtime(time.time())) #获取当天日期
    
    #参数设置/parameter setting
    timesteps = seq_length = 20 #时间窗/window length
    data_dim = 7 #输入数据维度/dimension of input data
    output_dim = 1 #输出数据维度/dimension of output data
    visual_window = 200
    
    try:
        stock_data = get_data(code, date)
    except:
        print('代码不正确或无法获得该股票的数据')
        return
    
    if len(stock_data) == 0:
        print('代码不正确或无法获得该股票的数据')
        return
    
    trainX, trainY, testX, testY, appX, scaler, test_close, test_buy, test_sell, visualX, visualY = preprocess(
        stock_data, day, seq_length, data_dim, output_dim, visual_window)
    
    try:
        # 载入模型
        from keras.models import load_model
        model = load_model(code + '(1)' + str(day) + '.h5')
    except:
        print('第一次预测%d天内'%(day) + code + '的估价,需要一点时间建模')
        model = train(
            code, day, trainX, trainY, seq_length, data_dim, output_dim)
    
    visualY2, visualPredict2 = test(model, testX, testY, scaler, day, test_close, test_buy, test_sell, visualX, visualY)
    appPredict2 = apply(model, appX, scaler)
    visualize(visualY2, visualPredict2, appPredict2, visual_window, day)

运行一下,看看效果:

main()

请输入6位代码:600004
请输入预测天数:20
第一次预测20天内600004.SH的估价,需要一点时间建模
平均误差(%): 38.58
最大误差(%): 234.63
误差小于1%的比例: 12.3
预测涨或跌的正确率: 56.55
模型的购买策略是,如果对20天之后的预测值大于今天的收盘价,就在明天开市时买进1股,并且持有20天,再卖出
在1114天中,模型交易了541次,交易频率为48.56%
按照模型进行操作所得的收入: 165.19
最理想状况下的收入: 459.25
随机购买的收入: -8.36
结果可视化
代码在GitHub里:GitHub