# -*- coding: utf-8 -*-
'''
TopQuant (TQ), the 极宽 intelligent quantitative backtesting and analysis system;
companion teaching Python program for the training courseware.
Top极宽量化 (formerly zw量化), the No. 1 brand in Python quant.
by the Top极宽 quant open-source team, first released 2017.10.1
File: ztools_ai.py
Default abbreviation: import ztools_ai as zai
Summary: Top极宽量化 collection of common AI utility functions
'''
#
import sys,os,re,pickle
import arrow,bs4,random,copy
import numexpr as ne
import numpy as np
import pandas as pd
import tushare as ts
import pandas_datareader.data as pdat  # web-based financial data interface library
#import talib as ta
import pypinyin
#
import matplotlib as mpl
from matplotlib import pyplot as plt
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
#import multiprocessing
#
import sklearn
from sklearn import datasets, linear_model
from sklearn import metrics
from sklearn import tree
#from sklearn.cross_validation import train_test_split
from sklearn.model_selection import train_test_split
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.ensemble import RandomForestClassifier
#from sklearn.grid_search import GridSearchCV
from sklearn.model_selection import GridSearchCV
from sklearn.linear_model import LinearRegression
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_predict
from sklearn.naive_bayes import MultinomialNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.neural_network import BernoulliRBM
from sklearn.neural_network import MLPClassifier
from sklearn.neural_network import MLPRegressor
import joblib  # sklearn.externals.joblib was removed in scikit-learn 0.23; use the standalone joblib package
#
import keras as ks
from keras.models import Sequential,load_model
from keras.utils import plot_model
#
#import tflearn
#import tensorflow as tf
#
import zsys
import ztools as zt
import ztools_str as zstr
import ztools_data as zdat
#-------------------
#
import zpd_talib as zta
#
#-------------------
#---------------------------ai.xxx
def ai_varRd(fmx0):
    # Reload a saved strategy-variable set: read the pickled qx object,
    # then reload each Keras model listed in qx.aiMKeys from its .mx file.
    fvar=fmx0+'tqvar.pkl'
    qx=zt.f_varRd(fvar)
    for xkey in qx.aiMKeys:
        fss=fmx0+xkey+'.mx'
        mx=load_model(fss)
        qx.aiModel[xkey]=mx
    #
    return qx
def ai_varWr(qx,fmx0):
    # Save a strategy-variable set: Keras models cannot be pickled directly,
    # so detach qx.aiModel, pickle the rest of qx, save each model to its own
    # file, then restore qx.aiModel.
    fvar=fmx0+'tqvar.pkl'
    mx9=qx.aiModel
    qx.aiMKeys=list(mx9.keys())
    qx.aiModel={}
    zt.f_varWr(fvar,qx)
    print('fvar,',fvar)
    #
    for xkey in mx9:
        fss=fmx0+xkey+'.mx'
        mx9[xkey].save(fss)
        print('fmx,',fss)
    #
    qx.aiModel=mx9
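# Usage sketch for the save/load round trip above (hedged: assumes qx is a
# strategy-context object carrying an aiModel dict of compiled Keras models,
# and that fmx0 is a writable path prefix such as 'tmp/mx_'):
#   ai_varWr(qx,'tmp/mx_')    # writes tmp/mx_tqvar.pkl plus one .mx file per model
#   qx2=ai_varRd('tmp/mx_')   # reloads the pickle, then every saved model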
#---------------------------ai.xxx
#---------------------------ai.dacc
def ai_acc_xed2x(y_true,y_pred,ky0=5,fgDebug=False):
    '''
    Evaluation helper for measuring the accuracy of a machine-learning model.
    Input:
        y_true, y_pred: pandas Series.
        ky0: error tolerance k, in percent; default 5 means 5%.
        fgDebug: debug flag, default False (unused in this variant).
    Return:
        dacc: accuracy percentage, float.
        df: result data, pandas DataFrame.
    '''
    #1
    df,dacc=pd.DataFrame(),-1
    #print('n,',len(y_true),len(y_pred))
    if (len(y_true)==0) or (len(y_pred)==0):
        #print('n,',len(y_true),len(y_pred))
        return dacc,df
    #
    y_num=len(y_true)
    #df['y_true'],df['y_pred']=zdat.ds4x(y_true,df.index),zdat.ds4x(y_pred,df.index)
    df['y_true'],df['y_pred']=pd.Series(y_true),pd.Series(y_pred)
    df['y_diff']=np.abs(df.y_true-df.y_pred)
    #2, guard against division by zero where y_true is 0
    df['y_true2']=df['y_true']
    df.loc[df['y_true'] == 0, 'y_true2'] =0.00001
    df['y_kdif']=df.y_diff/df.y_true2*100
    #3, count predictions whose relative error is below ky0 percent
    dfk=df[df.y_kdif<ky0]
    knum=len(dfk['y_pred'])
    dacc=knum/y_num*100
    #
    #5
    dacc=round(dacc,3)
    return dacc,df
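# A minimal worked example of the ky0-percent metric above (hypothetical
# numbers): dacc is the share of predictions with |y_true-y_pred|/y_true*100<ky0.
#   y_true=pd.Series([10.0,20.0,30.0,40.0])
#   y_pred=pd.Series([10.2,25.0,30.3,41.0])
#   dacc,df=ai_acc_xed2x(y_true,y_pred,ky0=5)
#   # relative errors: 2%, 25%, 1%, 2.5% -> 3 of 4 within 5% -> dacc==75.0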
def ai_acc_xed2ext(y_true,y_pred,ky0=5,fgDebug=False):
    '''
    Evaluation helper for measuring the accuracy of a machine-learning model,
    with extended regression metrics.
    Input:
        y_true, y_pred: pandas Series.
        ky0: error tolerance k, in percent; default 5 means 5%.
        fgDebug: debug flag, default False.
    Return:
        dacc: accuracy percentage, float.
        df: result data, pandas DataFrame.
        [dmae,dmse,drmse,dr2sc]: extended metrics (MAE, MSE, RMSE, R2 score).
    '''
    #1
    df,dacc=pd.DataFrame(),-1
    if (len(y_true)==0) or (len(y_pred)==0):
        #print('n,',len(y_true),len(y_pred))
        return dacc,df,[]  # keep the 3-tuple shape promised by the docstring
    #2
    y_num=len(y_true)
    #df['y_true'],df['y_pred']=zdat.ds4x(y_true,df.index),zdat.ds4x(y_pred,df.index)
    df['y_true'],df['y_pred']=y_true,y_pred
    df['y_diff']=np.abs(df.y_true-df.y_pred)
    #3, guard against division by zero where y_true is 0
    df['y_true2']=df['y_true']
    df.loc[df['y_true'] == 0, 'y_true2'] =0.00001
    df['y_kdif']=df.y_diff/df.y_true2*100
    #4, count predictions whose relative error is below ky0 percent
    dfk=df[df.y_kdif<ky0]
    knum=len(dfk['y_pred'])
    dacc=knum/y_num*100
    #
    #5, standard regression metrics
    dmae=metrics.mean_absolute_error(y_true, y_pred)
    dmse=metrics.mean_squared_error(y_true, y_pred)
    drmse=np.sqrt(metrics.mean_squared_error(y_true, y_pred))
    dr2sc=metrics.r2_score(y_true,y_pred)
    #
    #6
    if fgDebug:
        #print('\nai_acc_xed')
        #print(df.head())
        #y_test,y_pred=df['y_test'],df['y_pred']
        print('ky0={0}; n_df9,{1},n_dfk,{2}'.format(ky0,y_num,knum))
        print('acc: {0:.2f}%; MSE:{1:.2f}, MAE:{2:.2f}, RMSE:{3:.2f}, r2score:{4:.2f}, @ky0:{5:.2f}'.format(dacc,dmse,dmae,drmse,dr2sc,ky0))
    #
    #7
    dacc=round(dacc,3)
    xlst=[dmae,dmse,drmse,dr2sc]
    return dacc,df,xlst
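# Same call pattern as ai_acc_xed2x, plus the extended metric list (sketch):
#   dacc,df,xlst=ai_acc_xed2ext(y_true,y_pred,ky0=5,fgDebug=True)
#   dmae,dmse,drmse,dr2sc=xlst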
#---------------------------ai.model.xxx
def ai_mul_var_tst(mx,df_train,df_test,nepochs=200,nsize=128,ky0=5):
    # Train the Keras model mx on the 'x'/'y' columns, predict on the test
    # set, and score with the ky0-percent tolerance metric.
    x_train,y_train=df_train['x'].values,df_train['y'].values
    x_test, y_test = df_test['x'].values,df_test['y'].values
    #
    mx.fit(x_train, y_train, epochs=nepochs, batch_size=nsize)
    #
    y_pred = mx.predict(x_test)
    df_test['y_pred']=zdat.ds4x(y_pred,df_test.index,True)
    dacc,_=ai_acc_xed2x(df_test.y,df_test['y_pred'],ky0,False)
    #
    return dacc
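# Usage sketch (hedged: assumes df_train/df_test follow the library's
# convention of a feature column 'x' and a target column 'y', and that mx is
# a compiled Keras model whose input shape matches the 'x' data):
#   dacc=ai_mul_var_tst(mx,df_train,df_test,nepochs=200,nsize=128,ky0=5)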
def ai_mx_tst_epochs(f_mx,f_tg,df_train,df_test,kepochs=100,nsize=128,ky0=5):
    # Sweep the number of training epochs from 1*kepochs to 10*kepochs,
    # reloading the saved base model each round so every run starts equal.
    ds,df={},pd.DataFrame()
    for xc in range(1,11):
        print('\n#',xc)
        dnum=xc*kepochs
        mx=ks.models.load_model(f_mx)
        t0=arrow.now()
        dacc=ai_mul_var_tst(mx,df_train,df_test,dnum,nsize,ky0=ky0)
        tn=zt.timNSec('',t0)
        ds['nepoch'],ds['epoch_acc'],ds['ntim']=dnum,dacc,tn
        df=pd.concat([df,pd.DataFrame([ds])],ignore_index=True)  # DataFrame.append was removed in pandas 2.0
    #
    df=df.dropna()
    df['nepoch']=df['nepoch'].astype(int)
    print('\ndf')
    print(df)
    print('\nf,',f_tg)
    df.to_csv(f_tg,index=False)
    #
    df.plot(kind='bar',x='nepoch',y='epoch_acc',rot=0)
    df.plot(kind='bar',x='nepoch',y='ntim',rot=0)
    #
    return df
def ai_mx_tst_bsize(f_mx,f_tg,df_train,df_test,nepochs=500,ksize=32,ky0=5):
    # Sweep the batch size from 1*ksize to 10*ksize at a fixed epoch count,
    # reloading the saved base model each round.
    ds,df={},pd.DataFrame()
    for xc in range(1,11):
        print('\n#',xc)
        dnum=xc*ksize
        mx=ks.models.load_model(f_mx)
        t0=arrow.now()
        dacc=ai_mul_var_tst(mx,df_train,df_test,nepochs,dnum,ky0=ky0)
        tn=zt.timNSec('',t0)
        ds['bsize'],ds['size_acc'],ds['ntim']=dnum,dacc,tn
        df=pd.concat([df,pd.DataFrame([ds])],ignore_index=True)  # DataFrame.append was removed in pandas 2.0
    #
    df=df.dropna()
    df['bsize']=df['bsize'].astype(int)
    print('\ndf')
    print(df)
    print('\nf,',f_tg)
    df.to_csv(f_tg,index=False)
    #
    df.plot(kind='bar',x='bsize',y='size_acc',rot=0)
    df.plot(kind='bar',x='bsize',y='ntim',rot=0)
    return df
def ai_mx_tst_kacc(f_mx,f_tg,df_train,df_test,nepochs=500,nsize=128):
    # Sweep the accuracy tolerance ky0 from 1% to 10% at fixed epochs and
    # batch size, reloading the saved base model each round.
    ds,df={},pd.DataFrame()
    for xc in range(1,11):
        print('\n#',xc)
        dnum=xc*1
        mx=ks.models.load_model(f_mx)
        dacc=ai_mul_var_tst(mx,df_train,df_test,nepochs,nsize,ky0=dnum)
        ds['kacc'],ds['dacc']=dnum,dacc
        df=pd.concat([df,pd.DataFrame([ds])],ignore_index=True)  # DataFrame.append was removed in pandas 2.0
    #
    df=df.dropna()
    df['kacc']=df['kacc'].astype(int)
    print('\ndf')
    print(df)
    print('\nf,',f_tg)
    df.to_csv(f_tg,index=False)
    #
    df.plot(kind='bar',x='kacc',y='dacc',rot=0)
    #
    return df
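# Usage sketch for the three sweep helpers above (hypothetical file names):
# each reloads the saved base model per round, writes a CSV, and bar-plots
# the results; only the swept dimension differs (epochs, batch size, ky0).
#   ai_mx_tst_epochs('tmp/mx_base.h5','tmp/acc_epochs.csv',df_train,df_test,kepochs=100)
#   ai_mx_tst_bsize('tmp/mx_base.h5','tmp/acc_bsize.csv',df_train,df_test,ksize=32)
#   ai_mx_tst_kacc('tmp/mx_base.h5','tmp/acc_ky0.csv',df_train,df_test)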
#------edit @2020.0202,by @zw
def mxtst_DTree010(mfun, df_train,df_test, ntree=100,ndepth=3, ftg='tmp/pic001.png',xsgnlst=['open','high','low','close'],ysgnlst=['close_next','close_pred']):
    #1
    print('#1, set up the model')
    mx =mfun(n_estimators=ntree,max_depth=ndepth)
    #2
    print('#2, prepare the AI data')
    x_train=df_train[xsgnlst].values
    ysgn=ysgnlst[0]
    y_train=df_train[ysgn].values
    #
    x_tst=df_test[xsgnlst].values
    ysgn2=ysgnlst[1]
    #y_tst=df_test[ysgn2].values
    #3
    print('#3, fit: train the model')
    mx.fit(x_train,y_train)
    #4
    print('#4, predict: generate model predictions')
    df_test[ysgn2]=mx.predict(x_tst) #cross_val_predict(mx,x, y, cv=20)
    #5
    print('#5, validate the model at 1% tolerance')
    dacc,df,xlst=ai_acc_xed2ext(df_test[ysgn],df_test[ysgn2], 1,True)
    #zt.prDF('df',df_test)
    #6
    print('#6, plot the comparison curves')
    if ftg!='':
        css='ntree={0},max_depth={1}'.format(ntree,ndepth)
        pic=df_test[ysgnlst].plot(linewidth=3,title=css)
        fig =pic.get_figure()
        fig.savefig(ftg)
        plt.show()
    #
    return dacc
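# Usage sketch (hedged: any sklearn ensemble constructor that accepts the
# n_estimators/max_depth keywords fits the mfun slot, e.g. RandomForestRegressor
# or GradientBoostingRegressor; column names below are illustrative):
#   from sklearn.ensemble import RandomForestRegressor
#   dacc=mxtst_DTree010(RandomForestRegressor,df_train,df_test,ntree=100,ndepth=3,
#       ftg='tmp/pic001.png',
#       xsgnlst=['open','high','low','close'],ysgnlst=['close_next','close_pred'])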
#----------------
#----------fb.model.x.xxx
# Linear regression (ordinary least squares); sklearn class: LinearRegression
def mx_line(train_x, train_y):
    mx = LinearRegression()
    mx.fit(train_x, train_y)
    #print('\nlinreg.intercept_')
    #print (mx.intercept_);print (mx.coef_)
    #linreg::model
    #
    return mx
# Logistic regression; sklearn class: LogisticRegression
def mx_log(train_x, train_y):
    mx = LogisticRegression(penalty='l2')
    mx.fit(train_x, train_y)
    return mx
# Multinomial Naive Bayes; sklearn class: MultinomialNB
def mx_bayes(train_x, train_y):
    mx = MultinomialNB(alpha=0.01)
    mx.fit(train_x, train_y)
    return mx
# K-nearest neighbors; sklearn class: KNeighborsClassifier
def mx_knn(train_x, train_y):
    mx = KNeighborsClassifier()
    mx.fit(train_x, train_y)
    return mx
# Random forest; sklearn class: RandomForestClassifier
def mx_forest(train_x, train_y):
    mx = RandomForestClassifier(n_estimators=8)
    mx.fit(train_x, train_y)
    return mx
# Decision tree; sklearn class: tree.DecisionTreeClassifier
def mx_dtree(train_x, train_y):
    mx = tree.DecisionTreeClassifier()
    mx.fit(train_x, train_y)
    return mx
# Gradient Boosting Decision Tree (GBDT), also known as
# MART (Multiple Additive Regression Tree); sklearn class: GradientBoostingClassifier
def mx_GBDT(train_x, train_y):
    mx = GradientBoostingClassifier(n_estimators=200)
    mx.fit(train_x, train_y)
    return mx
# Support vector machine (SVM); sklearn class: SVC
def mx_svm(train_x, train_y):
    mx = SVC(kernel='rbf', probability=True)
    mx.fit(train_x, train_y)
    return mx
# SVM with cross-validated grid search over C and gamma; sklearn class: SVC
def mx_svm_cross(train_x, train_y):
    mx = SVC(kernel='rbf', probability=True)
    param_grid = {'C': [1e-3, 1e-2, 1e-1, 1, 10, 100, 1000], 'gamma': [0.001, 0.0001]}
    grid_search = GridSearchCV(mx, param_grid, n_jobs = 1, verbose=1)
    grid_search.fit(train_x, train_y)
    best_parameters = grid_search.best_estimator_.get_params()
    #for para, val in best_parameters.items():
    #    print( para, val)
    # refit a fresh SVC with the best C/gamma found by the grid search
    mx = SVC(kernel='rbf', C=best_parameters['C'], gamma=best_parameters['gamma'], probability=True)
    mx.fit(train_x, train_y)
    return mx
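# Note: with the grid above, GridSearchCV fits 7*2=14 candidate models times
# the default number of CV folds before the final refit, so mx_svm_cross can
# be slow on large training sets.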
#---- neural-network models
# Multi-layer perceptron (MLP) classifier
def mx_MLP(train_x, train_y):
    #mx = MLPClassifier(solver='lbfgs', alpha=1e-5,hidden_layer_sizes=(5, 2), random_state=1)
    mx = MLPClassifier()
    mx.fit(train_x, train_y)
    return mx
# Multi-layer perceptron (MLP) regressor
def mx_MLP_reg(train_x, train_y):
    #mx = MLPClassifier(solver='lbfgs', alpha=1e-5,hidden_layer_sizes=(5, 2), random_state=1)
    mx = MLPRegressor()
    mx.fit(train_x, train_y)
    return mx
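#
# A minimal end-to-end sketch for the classifier factories above, using a toy
# sklearn dataset (illustrative only; the library normally feeds these from
# its own OHLC feature frames, and mx_demo_iris is not part of the original API):
def mx_demo_iris():
    x,y=datasets.load_iris(return_X_y=True)
    x_train,x_test,y_train,y_test=train_test_split(x,y,test_size=0.3,random_state=1)
    for mfun in [mx_knn,mx_forest,mx_dtree,mx_svm]:
        mx=mfun(x_train,y_train)          # each factory fits and returns a model
        y_pred=mx.predict(x_test)
        print(mfun.__name__,'accuracy:',round(metrics.accuracy_score(y_test,y_pred),3))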