This post shares my personal collection of Python methods for financial econometrics. I will keep updating it since I use these notes in my own work; I am posting it here to share, collect corrections, and learn better coding practices from everyone.
These are my personal notes about Python methods in econometrics. I will constantly update them to ensure accuracy, clarity, and usefulness. Happy to receive any corrections or suggestions!
- Numpy and Matrix 矩阵
- Latex Output 论文编辑器
- Statistics 统计
- Random Number and Simulation 随机数和模拟
- Linear Regression 回归
- Plotting 画图
- Pandas 数据结构包
- Data 数据
- Optimization 优化
- Time Series 时间序列
- Formatting 格式
array operations
x=np.append(x,y)
np.repeat(x,5)
np.vstack(()) # same as np.row_stack(())
np.hstack(())
x_var = np.vstack([df_option.iloc[0]['vega'],df_option.iloc[0]['theta']])
x_var = np.hstack((x_var,np.array([[row['vega']],[row['theta']]])))
np.concatenate((t1,t2),axis=0) # axis=0 stacks rows, axis=1 stacks columns
creating array
a = np.empty((13,))
a = np.zeros((13,))
a = np.ones((13,))
a = np.full((13,),7)
a = np.random.random((13))
selection
ndArray[row_index][column_index] = ndArray[row_index,column_index]
ndArray[row_index]
ndArray[start_index: end_index , :]
ndArray[ : , column_index]
ndArray[ : , start_index: end_index]
#https://thispointer.com/python-numpy-select-rows-columns-by-index-from-a-2d-ndarray-multi-dimension/
# higher dimensions
price_paths = price_paths[:,-1,:]
vector multiplication
import numpy as np
# variance matrix
var = np.matmul(annual_v.T,annual_v)
# cov = var * correlation matrix
cov = np.matmul(var,corr)
A @ B
port_var = np.dot(weights.T, np.dot(df_invest.cov(), weights))
Scalar multiplication
list = map(lambda x: x * 2, list)
list = map(lambda x,y:x*y,lista,listb)
products = [a * b for a, b in zip(list1, list2)]
# or just use numpy array
# matrix addition:
list(map(lambda x:x+2,[2,3,4]))
np.array([2,3,4])+2
Diagonal matrix
x = np.diag(np.arange(4))
Triangular matrix
a = np.triu(np.ones((3, 3)), 1)
a.T # transpose
Solving for system of linear equations
matrix = np.vstack([beta,smb,hml])
z = np.array([[-1]*3,[0]*3,[0]*3])
x, residuals, rank, s = np.linalg.lstsq(matrix,z)
# Least squares is a standard approach to problems with more equations than unknowns, also known as overdetermined systems
# returns: solution x, residuals, rank, and the singular values of the matrix
# https://sodocumentation.net/numpy/topic/3753/linear-algebra-with-np-linalg
x0 + 2 * x1 + x2 = 4
x1 + x2 = 3
x0 + x2 = 5
A = np.array([[1, 2, 1],
[0, 1, 1],
[1, 0, 1]])
b = np.array([4, 3, 5])
x = np.linalg.solve(A, b)
# Out: x = array([ 1.5, -0.5, 3.5])
# https://stackoverflow.com/questions/31098228/solving-system-using-linalg-with-constraints
Latex Output
import array_to_latex as a2l
a2l.to_ltx(A) # A is a numpy array
print(df.describe().to_latex())
print(df.corr().to_latex())
print(model.summary().as_latex())
print(sm.stats.anova_lm(model, typ=2).to_latex())
from tabulate import tabulate
tabulate([['95%',abs(VaR_95*invest)]],headers=['Confidence level','Value at Risk'])
Statistics
Student t-test, used when the population standard deviation is unknown (sample normally distributed or n>30). This checks whether the sample mean differs from the hypothesized mean (here 0), i.e. whether the sample could come from the hypothesized distribution.
from scipy import stats
alpha = 0.05
df = 5
t_stat = 1.96
# calculate the one tail critical value, equivalent to Excel TINV(alpha,df)
cv = stats.t.ppf(1 - alpha, df)
# ppf means percent point function (inverse of cdf — percentiles).
# calculate the two tail critical value
cv = stats.t.ppf(1 - (alpha/2), df)
# calculate the p-value
p = (1 - stats.t.cdf(abs(t_stat), df)) * 2
print((cv,p))
Two tail T-test for equal mean
from scipy import stats
print(stats.ttest_ind(a=df1["PPY"],b=df2["PPY"],equal_var=False))
Output:
Ttest_indResult(statistic=0.2844374536719791, pvalue=0.7804443088685238)
Z test for known population standard deviation (sample n>30 or normally distributed)
# given area under normal to get the z-score
print(stats.norm.ppf(1900/2000,loc=1,scale=5/np.sqrt(2000)))
# given z-score to return area under the curve
print(stats.norm.cdf(1,loc=1,scale=5/np.sqrt(2000)))
Confidence Interval
# CI = mu +- t * (standard error for each sample statistic is different)
def sample_mean_confidence_interval(data, confidence=0.95):
a = 1.0 * np.array(data)
n = len(a)
m, se = np.mean(a),stats.sem(a)
#h = stats.t.ppf((1+confidence)/2.,n-1)*se # two tail
h = stats.t.ppf(confidence, n-1)*se # one tail
return "{0:10.5f},{1:10.5f},{2:10.5f},{3:10.5f}".format(m, m-h, m+h, h*2)
print('for 95% confidence interval one tail t test - mean, lower, upper, width')
print(sample_mean_confidence_interval(df.VBTLX))
Correlation
#https://machinelearningmastery.com/how-to-use-correlation-to-understand-the-relationship-between-variables/
#cov(X, Y) = (sum (x - mean(X)) * (y - mean(Y)) ) * 1/(n-1)
#covariance(X, Y) / (stdv(X) * stdv(Y))
from scipy import stats
np.corrcoef(option['spot'],option['future'])
stats.pearsonr(option['spot'],option['future'])
stats.spearmanr(option['spot'],option['future'])
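The covariance and correlation formulas in the comments above can be verified by hand; a minimal sketch with made-up arrays:
import numpy as np
x = np.array([1.0, 2.0, 3.0, 4.0])
y = np.array([2.0, 4.0, 5.0, 4.0])
n = len(x)
cov_xy = ((x - x.mean()) * (y - y.mean())).sum() / (n - 1)  # sample covariance
corr_xy = cov_xy / (x.std(ddof=1) * y.std(ddof=1))          # Pearson correlation
print(corr_xy, np.corrcoef(x, y)[0, 1])                     # the two numbers agree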
Chi Squared test
# print(stats.chi2.ppf(1-alpha,DOF))
# this is to calculate critical value, if test statistic>CV then reject Ho
print(stats.chi2.ppf(.95, 10))
print(stats.chi2.ppf(.95, 30))
print(stats.chi2.ppf(.95, 100))
print(stats.chi2.ppf(.95, 900))
18.30
43.77
124.34
970.90
# contingency table
table = np.array([[1,2,3],[4,5,6]]) # contingency table should be 2-dimensional
display(table)
chi_val,P,dof,exp_val=stats.chi2_contingency(table)
print("Chi square test\nChi squared = %f\nP= %f\nDOF= %d"%(chi_val,P,dof))
# chi square test
stats.chisquare(f_obs=[1,2,3,4,5,6],f_exp=[2,2,3,4,5,5],ddof=1) # f_exp should have the same length and total as f_obs
# excel
# =CHISQ.INV(0.95,2) # critical value at 95% confidence with 2 dof
# =CHISQ.DIST.RT(3.0599,2) # the p value
Lognormal
def lognorm_cdf(x, mu, sigma):
shape = sigma
loc = 0
scale = math.exp(mu)
return stats.lognorm.cdf(x, shape, loc, scale)
x = 5
mu = 0.1
sigma = 0.5
p = lognorm_cdf(x, mu, sigma)
print(p)
stats.lognorm.ppf(0.05,9.8)
1st-4th Moments
from scipy import stats
skew = df.resample('10Y').apply(lambda x:stats.skew(x))[:-1]
kurt = df.resample('10Y').apply(lambda x:stats.kurtosis(x))[:-1]
mean, var, skew, kurt = stats.norm.stats(moments='mvsk')
Jarque-Bera JB test
JB = df.resample('10Y').apply(lambda x:stats.jarque_bera(x).pvalue)[:-1]
Sample statistics
print(df['Simple Return'].mean())
print(df['Simple Return'].sem(axis=0))
print(df['Simple Return'].std(axis=0)/np.sqrt(len(df)))
print(df['Simple Return'].std(axis=0))
print(df['Simple Return'].autocorr(lag=1))
print(df['Simple Return'].autocorr(lag=2))
print(np.corrcoef(series1,series2)) # will return n by n matrix
# if need to get single number do np.corrcoef()[0,1]
df.corr()
df1.corrwith(df2, axis=0) # axis=0 correlates column with column, axis=1 row with row
series1.corr(series2)
df.cumsum()
df.pct_change()
# https://jakevdp.github.io/PythonDataScienceHandbook/03.08-aggregation-and-grouping.html
df.columns.str.contains(r'ha',na=True)
df.columns.isin(['haha'])
df_train.loc[:,df_train.columns.str.contains('feature')]
df[df['return'].isin(df2['return'].values.tolist())]
df.unstack()
df.index
df.info()
df.describe(include='all').loc['mean']
df.columns
df.shape
df.column.shift(-1)
df.empty
df_close.pivot_table(index=['date'], columns='stock', values='close') # long to wide
df[df < 0].values
df = df.between_time('09:35:00','14:55:00')
df = df.iloc[pd.DatetimeIndex(df['ticktime']).indexer_between_time(stime,etime)]
np.var
np.std
np.mean() # prefer pd.Series(lst).mean()
np.median() # prefer pd.Series(lst).median()
# the pandas versions skip NaN, which np.mean/np.median do not
np.log
np.percentile(arr,99)
np.sqrt
np.power(array1,3)
np.hstack
np.vstack
np.ones
np.zeros
np.shape(lst) # also works on plain (nested) lists to get their dimensions
np.nonzero()
np.exp
np.eye
np.diag
np.sin
np.max # single max
map(max, a, b) # item by item max
np.argmax() # index of maximum
np.any
np.all
np.cumsum
np.round
np.where(df.test>0,1,0)
np.where((df['cond1']>0)&(df['cond2']<2),1,0) # must use bitwise operators
np.where(test=='if',1,np.where(test=='ifelse',1,0))
np.array([]).tolist()
arr1 = np.append(arr1,arr2)
np.apply_along_axis
winsorization
from scipy.stats.mstats import winsorize
np.mean(winsorize(answer_initial,limits=[0.2,0.2]))
# note: if the dataset is too small, no outliers will be trimmed
cointegration tests
from statsmodels.tsa.stattools import coint
tscore,pvalue,significance = coint(df.CumsumG,df.CumsumS)
Random Number, Distribution and Simulation
fixed step
np.arange(start=1,stop=25,step=1) # define step size, infer number of steps, output np.array
np.linspace(start=0,stop=1,num=11) # define number of steps, infer step size
range() # Python built-in, returns a lazy range object; slower than np.arange when an array is needed
pd.date_range(start='1/1/2020',periods=T,freq='D')
normal distribution
>>> import scipy.stats
>>> scipy.stats.norm(0, 1)
<scipy.stats.distributions.rv_frozen object at 0x928352c>
>>> scipy.stats.norm(0, 1).pdf(0)
0.3989422804014327
>>> scipy.stats.norm(0, 1).cdf(0)
0.5
>>> scipy.stats.norm(100, 12)
<scipy.stats.distributions.rv_frozen object at 0x928352c>
>>> scipy.stats.norm(100, 12).pdf(98)
0.032786643008494994
>>> scipy.stats.norm(100, 12).cdf(98)
0.43381616738909634
>>> scipy.stats.norm(100, 12).cdf(100)
0.5
random normal
from random import seed
from random import random
# seed so reruns give the same results; random.seed does not affect numpy, use np.random.seed for that
np.random.seed(123)
np.random.normal(mu,sigma,size=(10,1))
random integer
np.random.randint(low=0,high=100,size=(13,1))
random from exponential distribution
np.random.exponential(scale=1,size=(128))
random from uniform distribution
np.random.uniform(low=0,high=1,size=(36))
random from dirichlet distribution
weights = np.random.dirichlet(np.ones(num_assets),size=1)
# regarding interval transformation see my answer at
# https://stackoverflow.com/questions/63910689/transformed-dirichlet-array-with-range-1-1-in-numpy
Linear Regression
OLS on multiple dataframe
import statsmodels.api as sm
X = df2[['Excess_Market','HML','SMB','UMD']]
y = df1['Excess Return']
X = sm.add_constant(X)
m = sm.OLS(y,X)
model = m.fit()
# ols based on pandas
# rolling_beta = pd.ols(x=x,y=y,window_type='rolling',window=30) # pd.ols was removed from pandas; use RollingOLS (see the Rolling OLS section below)
# spread = y - rolling_beta.beta['x'] * x
(Better) OLS on single dataframe with specified formula
m = sm.formula.ols(formula='Lottery~Literacy+Wealth+Region',data=df)
# default with adding a constant for intercept; use 'Value~Time-1' to remove intercept
model = m.fit() # result object
OLS Outputs
m.endog_names # dependent (endog) variable
m.exog_names # independent (exog) variables, i.e. the regressors
m.df_model
m.df_resid
model.params # regression coefficients
model.params.Intercept
model.bse # standard error
model.tvalues
model.pvalues
model.rsquared
model.resid
model.summary()
model.fvalue
model.f_pvalue
Rolling OLS regression
# https://www.statsmodels.org/stable/examples/notebooks/generated/rolling_ls.html
import statsmodels.api as sm
from statsmodels.regression.rolling import RollingOLS
m = RollingOLS.from_formula('CSAD~return_SPY+abs_return_SPY+square_return_SPY',data=df,window=60)
model = m.fit()
model.tvalues
Predictions
# in-sample prediction
--------------------------
# plot beta
print(model.predict(df.Time).tail(2))
plt.plot(df.Time,model.predict(exog=df.Time))
plt.plot(x,y,'o')
s,b=np.polyfit(x,y,1) # note the argument order (x, y) is the opposite of sm.OLS(y, X)
plt.plot(x,s*x+b)
sns.lmplot(x='CumsumS',y='CumsumG',data=df)
# out-sample prediction
-------------------------
X = pd.DataFrame([61,62],columns=["Time"],index=pd.date_range(start="20201001",periods=2,freq='m'))
X = sm.add_constant(X)
model.predict(exog=X)
# non-linear prediction
----------------------
x = df.Time
X = np.column_stack((x,x**2,x**3,x**4,x**5))
X = sm.add_constant(X)
m = sm.OLS(y,X)
model = m.fit()
Xnew = pd.DataFrame([[61,61**2,61**3,61**4,61**5],[62,62**2,62**3,62**4,62**5]],columns=["Time1","Time2","Time3","Time4","Time5"],index=pd.date_range(start="20201001",periods=2,freq='m'))
Xnew = sm.add_constant(Xnew)
model.predict(exog=Xnew)
t,v,w,x,y,z = np.polyfit(df.Time,df.Value,5)
plt.plot(df.Time,t*np.power(df.Time,5)+v*np.power(df.Time,4)+w*np.power(df.Time,3)+x*np.power(df.Time,2)+y*np.power(df.Time,1)+z)
# linear seems to have mean reverting nature, but nonlinear has a somewhat trend following nature
polyfit in numpy
# linear polynomial (mean reverting nature)
s,b=np.polyfit(df.Time,df.Value,1)
print(np.poly1d(np.polyfit(X,y,1)) )
plt.plot(df.Time,s*df.Time+b)
# 5th power polynomial (trend following nature)
t,v,w,x,y,z = np.polyfit(df.Time,df.Value,5)
print(np.poly1d(np.polyfit(X,y,5)))
plt.plot(df.Time,t*np.power(df.Time,5)+v*np.power(df.Time,4)+w*np.power(df.Time,3)+x*np.power(df.Time,2)+y*np.power(df.Time,1)+z)
polyfit in seaborn
sns.lmplot(x='Time',y='Value',data=df)
ANOVA
import statsmodels.api as sm
# lm = linear model
# typ2 = type 2 sum of squared - most common
m = sm.formula.ols('conformity~C(fcategory, Sum)*C(partner_status, Sum)',data=data)
model = m.fit()
print(sm.stats.anova_lm(model, typ=2)) # dataframe is returned
Plotting
matplotlib plot
## in ipython must set plt.ion() - interactive on rather than plt.ioff(), otherwise plt.show() will hang!
plt.close()
plt.ioff()
plt.figure()
plt.ion()
import matplotlib.pyplot as plt
import matplotlib as mpl
plt.figure(figsize = (10,6))
plt.title("test")
plt.xlabel('Portfolio Volatility')
plt.ylabel('Portfolio Return')
plt.colorbar(label='haha')
plt.ylim(0,0.6)
plt.legend(['1','2'])
plt.grid(b=True, which='major', color='b', linestyle='--')
plt.xticks(rotation=25)
#fig.autofmt_xdate(rotation=45)
#ax.tick_params(axis='x', labelrotation=45)
plt.axhline(y=-2)
plt.savefig('5.b.2.png')
plt.show()
plt.plot(x,y,color='purple',alpha=0.8,label='haha',linestyle='--',linewidth=0.1,marker='x',markevery=200,markeredgecolor='orange',markersize=10)
plt.plot(x,y,'r*') # shorthand format string: red star markers
%matplotlib inline
get_ipython().magic("matplotlib auto") # show interactive plot not inline
plt.rc('xtick', labelsize = 8) # if x-axis datetime label too big
# specify ticks
import matplotlib
matplotlib.pyplot.yticks(y_value)
all plotting is done wrt axes
# step 1 prepare data
x = []
y = []
# step 2 create plot
fig = plt.figure()
ax = fig.add_subplot(111)
# step 3 plot
ax.plot()
# step 4 customize
# step 5 save
plt.savefig('foo.png')
# step 6 show
plt.show()
customize xticks
import matplotlib.dates as mdates
#https://matplotlib.org/stable/gallery/text_labels_and_annotations/date.html
#plt.figure(figsize=(16,8))
fig, ax = plt.subplots(figsize=(10,6))
plt.plot(model.tvalues['square_return_SPY'])
plt.legend(['rolling 60 day t-stat for CSAD regression'])
plt.axhline(y=-2,color='red')
plt.grid()
ax.xaxis.set_major_locator(mdates.MonthLocator(interval=12))
# Minor ticks every month.
fmt_month = mdates.MonthLocator()
ax.xaxis.set_minor_locator(fmt_month)
seaborn plot
import seaborn as sns
plt.figure(figsize = (10,6))
sns.distplot(x, hist=False)
plt.savefig('5.b.2.png')
plt.show()
dataframe plot
fig = df.plot(kind='line',figsize=(10,6),title="Test",color='green',grid=True,label='test')
fig.get_figure().savefig('8.a.1.png')
plt.show()
df.plot(x=,y=)
statsmodel api plot
import statsmodels.api as sm
with mpl.rc_context():
mpl.rc("figure", figsize=(10,6))
sm.qqplot(x,line ='45')
plt.savefig('5.b.1.png')
plt.show()
# regression plot
fig = plt.figure(figsize =(15,8))
fig = sm.graphics.plot_regress_exog(model,'ReturnS',fig=fig)
plt.savefig('15.b.2.png')
plt.show()
histogram
plt.hist(y, bins=np.arange(start=y.min(),stop=y.max(),step=0.1),density=False)
np.histogram(a=y,bins=np.arange(start=0,stop=10,step=0.01),density=True)
scatter plot
plt.plot(df.Time,df.Value)
plt.scatter(a,b,c=a/b,marker='o') # c sets per-point colors; call plt.colorbar() to show the scale
# scatter plot
plt.figure(figsize=(16,8))
plt.scatter(option.spot[::10],option.future[::10],alpha=0.3,s=0.3)
plt.plot(plt.xlim(), plt.xlim(), linestyle='-', color='k', lw=3, scalex=False, scaley=False)
plt.ylim(3.7,3.8)
plt.xlim(3.7,3.8)
plt.grid()
plt.show()
whisker plot
# box and whisker plots
dataset.plot(kind='box', subplots=True, layout=(2,2), sharex=False, sharey=False)
plt.show()
normal pdf plot
mu = 0.00025
variance = 0.0000000585
sigma = math.sqrt(variance)
x = np.linspace(mu-3*sigma,mu+3*sigma,100)
plt.plot(x,stats.norm.pdf(x,mu,sigma))
normal over histogram
ax = plt.subplot(2,2,1)
ax.hist(df['30_rolling_residual'],bins=100,density=True,label='30 day rolling') # density scales the area of the histogram to 1
mu = np.mean(df['30_rolling_residual'][29:])
variance = np.var(df['30_rolling_residual'][29:])
sigma = math.sqrt(variance)
#binwidth = 6*sigma/20
scale_factor = 1 # len(df['400_rolling_residual'])*binwidth
x = np.linspace(mu-4*sigma,mu+4*sigma,1000)
ax.plot(x,stats.norm.pdf(x,mu,sigma)*scale_factor)
ax.legend()
ax.set_xlabel('residual')
ax.title.set_text('residual distribution')
3D plotting
fig = plt.figure()
ax = fig.gca(projection='3d')
X,Y,Z=[],[],[]
for i in range(len(TTM)-1):
Y.extend(list(M['strike']))
Z.extend(list(M[TTM[i]]))
X.extend(len(list(M['strike']))*[TTM[i+1]])
plt.title('BTC surface')
plt.xlabel('TTM')
plt.ylabel('Strike')
ax.scatter(Y, X, Z,s=2)
plt.show()
import matplotlib.pyplot as plt
import numpy as np
from mpl_toolkits.mplot3d import Axes3D
import implied_vol_surface_fit
from matplotlib import cm
Xaxis=np.linspace(0.5, 2.0, 20)
Yaxis=np.linspace(3000, 6000, 20)
US=implied_vol_surface_fit.LocalVol()
fig = plt.figure()
ax = fig.gca(projection='3d')
X,Y,Z=[],[],[]
for i in Xaxis:
for j in Yaxis:
X.append(i)
Y.append(j)
Z.append(US.volfit(j, i))
norm = plt.Normalize(np.min(Z), np.max(Z))
colors = cm.jet(norm(Z))
plt.title('Fitted Implied vol surface for SPX')
plt.xlabel('Strike')
plt.ylabel('Maturity')
ax.scatter(Y, X, Z, facecolors=colors, s=0.3).set_facecolor((0,0,0,0))
plt.show()
separate subplot
fig = plt.figure(figsize=(20,8))
plt.subplot(2,2,1)
df.hist(bins=30,ax=plt.gca())
# get current axes, if using plt to plot then no need
plt.subplot(2,2,2)
df.hist(cumulative=True,bins=300,ax=plt.gca())
plt.subplot(2,2,3)
df.boxplot(ax=plt.gca())
plt.subplot(2,2,4)
sm.qqplot((df['CER']-np.mean(df['CER']))/np.std(df['CER']),line ='45',ax=plt.gca())
fig.savefig('8.a.2.png')
separate subplot method 2
fig , axarr = plt.subplots(2, sharex=True,figsize=(16,8))
axarr[0].plot(df['return'])
axarr[0].plot(df['predict_return'])
axarr[0].legend(['Predicted','Real'])
axarr[0].title.set_text('return')
axarr[0].set_ylabel('return')
axarr[0].grid()
axarr[1].plot(df['predict_price'])
axarr[1].plot(df['Adj Close'])
axarr[1].legend(['Predicted','Real'])
axarr[1].title.set_text('stock price')
axarr[1].set_ylabel('price')
axarr[1].grid()
plt.tight_layout();
overlapping subplot
plt.figure(figsize=(16,5))
ax1 = df.G.plot(color='green', grid=True, label='Gold')
ax2 = df.S.plot(color='purple', grid=True, secondary_y=True, label='Silver')
ax1.grid(True)
ax1.axhline(y=0, color='black', linestyle='-') # horizontal line
# h=handle, l=label
h1, l1 = ax1.get_legend_handles_labels()
h2, l2 = ax2.get_legend_handles_labels()
plt.legend(l1+l2, loc=2)
----------------------------------------------------------------------------------------
fig = plt.figure(figsize=(18,10))
ax1 = fig.add_subplot(111)
ax2 = ax1.twinx()
ax1.plot(df.CumsumG - df.CumsumS, color='purple', label='Gold Cumsum Return minus Silver')
ax2.plot(df['30_rolling_residual'],label='30 day rolling residual',linewidth=0.4,color='blue')
ax2.plot(df['400_rolling_residual'],label='400 day rolling residual',linewidth=0.4,color='red')
ax1.legend(loc=1)
ax1.grid(True)
ax1.set_xlabel('Date')
ax2.legend(loc=2)
ax1.title.set_text('ds')
------------------------------------------
fig, axarr = plt.subplots(2, sharex=True,figsize=(20,8))
axarr[0].plot(x.index, state_means[:,0], label='slope')
axarr[0].legend()
Correlation matrix plot
from pandas.plotting import scatter_matrix
scatter_matrix(df,figsize=(10,10),alpha=0.5)
sns.heatmap(df.corr(method='pearson'), annot=True, fmt='.1g',cmap=plt.cm.Blues)
plt.show()
Pandas data wrangling
select
# print series column without index
df.to_string(index=False)
df[df.Letters=='C'].Letters.item()
np.array(df['column'])
df.iloc[:,df.shape[1]-1].values # output an array
# no need to try iloc after loc
df.loc[df.Letters=='C','Letters'].values[0] # this avoids python warning
df.loc[df.Letters=='C','Letters'].iloc[0]
df['net_return'].iloc[i] is identical to df['net_return'][i]
df['min'] = df.iloc[:,0:3].min(axis=1) # not including index 3
# select value from series
print (df['col1'].iat[-1])
print (df['col1'].values[-1])
print (df['col1'].iloc[-1])
list
# flatten list
def flatten(t):
return [item for sublist in t for item in sublist]
# find difference
list(set(list1).symmetric_difference(set(list2)))
list(set(list1)-set(list2))
# find intersection
list(set(ticker_list_old).intersection(set(ticker_list)))
# remove or delete from list
list.remove(value)
print(l.pop(3))
del l[6]
del l[-3:]
[s for s in l if s.endswith('e')]
# incremental list
df['days'] = 30
df['days'] = df['days']-np.arange(len(df))
Create dataframe
df=pd.DataFrame({'Month':[],'PRR':[]})
df=pd.DataFrame(columns=['a','b'],index=[0],data=np.transpose([[1],[2]]))
df = pd.DataFrame(data=list(d.items()))
df = pd.concat(dict,axis=0)
df = pd.concat(list_of_list,axis=0)
corrdf = pd.DataFrame.from_dict(dict,orient='index') # key as column
corrdf = pd.DataFrame.from_dict(dict) # key as row
pd.DataFrame.from_dict(result_vwap_dict, orient='index').reset_index()
result_vwap_df = pd.DataFrame(result_vwap.items(),columns=['delta','IC mean'])
# convert series to df
series.to_frame()
# add column index
result = pd.DataFrame([],columns=['df','asdf'])
# add row indesx
result = pd.DataFrame([],index=['df','asdf'])
# dataframe from lists
df = pd.DataFrame(list(zip(lst, lst2)),columns =['Name', 'val'])
# https://www.geeksforgeeks.org/create-a-pandas-dataframe-from-lists/
# https://www.geeksforgeeks.org/different-ways-to-create-pandas-dataframe/
# dataframe from dataframe
df[['test']]
Dictionary
# remove na in dict
res = {k: corrdict[k] for k in corrdict if not isnan(corrdict[k])}
# sort dict
sorted(res.items(), key=lambda item: item[1])
{k: v for k, v in sorted(coef_dict.items(), key=lambda item: abs(item[1]),reverse=False)}
# slice dictionary items
for k, v in list(industry_map.items())[:2]:
print(k)
# replace dataframe column with dictionary mapping
df.replace({'column':stock_mapping_dict})
# get key with max value
max(stats, key=stats.get)
# different variable name in for loop
# method 1 # exec-based; the generated names are hard to reference or assign to later, avoid inside functions
for k in range(5):
exec(f'cat_{k} = k*2')
# method 2 # this works
for x in range(0, 9):
globals()['string%s' % x] = 'Hello'
# method 3 # good
d = {}
for x in range(1, 10):
d["string{0}".format(x)] = "Hello"
# method 4 # the best
names = locals() # or names = globals()
for symbol in symbols:
names[str(symbol)]=df[df['id'] == symbol]
del names[str(symbol)]
# delete key
# This will return my_dict[key] if key exists in the dictionary, and None otherwise. If the second parameter is not specified (i.e. my_dict.pop('key')) and key does not exist, a KeyError is raised.
my_dict.pop('key', None)
# find key from value
dictionary = {'george': 16, 'amber': 19}
search_age = int(input("Provide age")) # input() returns a string, cast to int before comparing
for name, age in dictionary.items():
if age == search_age:
print(name)
# get key and return 'default' if not exist
my_dict.get(key, default) # returns default (None if omitted) when the key is missing, without raising KeyError
# update
dict1.update(dict2)
# (1) if a key exists in both dicts: the value from the newer dict b overwrites the one in a.
# (2) if a key only exists in b: that key/value pair is added to a.
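A quick demonstration of the two update rules above, with made-up dicts:
a = {'x': 1, 'y': 2}
b = {'y': 20, 'z': 3}
a.update(b)
print(a)  # {'x': 1, 'y': 20, 'z': 3} -- shared key overwritten by b, new key added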
lambda function
# take in two series (price and strike) and return one series (implied vol)
def IVs(price_series, isCall, S, K_series):
ttm = price_series.name
# rt = r(ttm)
# dt = d(ttm)
print(price_series)
result = pd.DataFrame({'price': price_series, 'strike':K_series}).apply\
(lambda xy: IV(xy['price'], isCall, S, xy['strike'], ttm), axis=1)
print(result)
return result
Indexing and rename
df.reset_index(drop=True,inplace=True)
df1.index.names = ['index']
df.index.name = 'indexname'
df = df.set_index('Date').sort_index() # do not chain after inplace=True, it returns None
df.index = pd.to_datetime(df.index,dayfirst=True)
df1.columns = df1.columns.astype(str)
df2.rename(columns={'Mkt-RF': 'Excess_Market'}, inplace=True)
df.rename(index={0: "x", 1: "y", 2: "z"})
regex
import re
# Target String one
str1 = "Emma's luck numbers are 251 761 231 451"
# pattern to find three consecutive digits
string_pattern = r"\d{3}"
# compile string pattern to re.Pattern object
regex_pattern = re.compile(string_pattern)
# find all the matches in string one
result = regex_pattern.findall(str1)
print(result)
# Output ['251', '761', '231', '451']
# https://www.w3schools.com/python/python_regex.asp
df.filter(regex=("d.*"))
df.loc[:, [col for col in df.columns if col.startswith('d')]] # df.select(...) was removed from newer pandas
import re
newlist = list(filter(re.compile(".*cat").match, mylist)) # Read Note below
## examples
'\.' -> match strings containing a period
'Length$' -> match strings ending with Length
'^Sepal' -> match strings beginning with Sepal
'^x[1-5]$' -> match strings beginning with x and ending with 1 to 5
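A short check of the anchor patterns listed above, using made-up column names:
import re
samples = ['Sepal.Length', 'Petal.Width', 'x3', 'x9']
print([s for s in samples if re.search(r'\.', s)])        # contain a period
print([s for s in samples if re.search(r'Length$', s)])   # end with Length
print([s for s in samples if re.search(r'^Sepal', s)])    # begin with Sepal
print([s for s in samples if re.search(r'^x[1-5]$', s)])  # x followed by a digit 1-5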
Data Clean
# remove string from column of float
delta_num = pd.to_numeric(delta.iloc[:,0], errors='coerce')
Add Drop column
df.drop('Adj Close',axis=1,inplace=True)
del df2['Net SharesChanged']
df['new col']=0
df.insert(loc, column, value, allow_duplicates = False)
data=pd.concat([a,b],axis=1) # both a and b are df
df.drop(['Q', 'R'], axis=1)
df.drop(df.columns[[1,2,4,5,12]],axis=1,inplace=True) # note the double brackets to select several positions
# do same but attach it to the dataframe
df['c'] = df.apply(lambda row: row.a + row.b, axis=1)
df.dropna().loc[:,'ret_f0f1'].apply(lambda row: 1 if row > 0 else 0)
# add column in diff order
first_column = df.pop('Name') # pop is like cutting the column and storing to another var
df.insert(0, 'Name', first_column)
# add column with unmatching rows (need reset index)
df['sig'] = pd.Series(predicted_return)
# join columns
df.set_index('key').join(other.set_index('key'))
df.join(other.set_index('key'), on='key')
df.join(other, lsuffix='_caller', rsuffix='_other')
## assign
df.assign(temp_f=lambda x: x.existing_col * 9 / 5 + 32,
temp_k=lambda x: (x['temp_f'] + 459.67) * 5 / 9,
temp_g=df['temp_c'] * 9 / 5 + 32)
Add Drop row
t = pd.concat([t,addrow],axis=0)
df1.append(df2, ignore_index = True) # another way of concat
df.loc[]
df = pd.DataFrame(columns=['lib'])
df.loc['row name',:]=column_value # this cannot add dups
# for duplicated index need to do below
df = pd.concat([df,pd.DataFrame(index=[],data=[],columns=[])],axis=0)
df.drop([0, 2])
df.drop(['Q', 'R'], axis=0)
# correct way to append row
df = pd.DataFrame([],columns=['a','b'])
b = [1,2]
df = df.append(pd.Series(b, index = df.columns), ignore_index=True)
df = df.append(pd.DataFrame(b, columns=df.columns), ignore_index=True) # both are correct
df = df.append(pd.DataFrame({'column': value, 'column2': value2},index=[0]),ignore_index=True)
Add total row or column
df.loc['Column_Total']= df.sum(numeric_only=True, axis=0)
df.loc[:,'Row_Total'] = df.sum(numeric_only=True, axis=1)
multi-index
low_factor = factor_year.iloc[factor_year.index.get_level_values('feps_quantile') == 0]
binning and rank
# https://pbpython.com/pandas-qcut-cut.html
df['size_quantile'] = pd.qcut(df['mv_eq'], q=[0, .2, .4, .6, .8, 1], labels=[1, 2, 3, 4, 5])
df['feps_quantile'] = pd.qcut(df['feps'], q=5, labels=[1, 2, 3, 4, 5])
# groupby then bin
df['size_quantile'] = df.groupby(by=['yearmonth'])['mv_eq'].transform( \
lambda x: pd.qcut(x,q=[0,.2,.4,.6,.8,1],labels=[0,1,2,3,4]))
df['feps_quantile'] = df.groupby(by=['yearmonth','size_quantile'])['feps'].transform( \
lambda x: pd.qcut(x, q=5, labels=False, duplicates='drop'))
# aggregate
plt.hist(df.groupby(by=['siccd'])['feps'].aggregate([np.mean]),bins=100)
# rank
df['industry_feps_quantile'] = df.groupby(by=['yearmonth','siccd'])['feps'].transform( \
lambda x: pd.qcut(x.rank(method='first'), q=5, labels=False, duplicates='drop'))
df groupby transform vs apply
# apply
grouping.apply(min)
grouping.min()
id
1 1
2 1
3 1
Name: price, dtype: int64
# transform
# it does broadcasting to the original shape
grouping.transform(min)
0 1
1 1
2 1
3 1
4 1
5 1
6 1
7 1
8 1
Name: price, dtype: int64
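The snippets above omit the underlying frame; a minimal reproduction with an assumed id/price frame:
import pandas as pd
df = pd.DataFrame({'id': [1, 1, 2, 2, 3], 'price': [1, 5, 2, 4, 1]})
grouping = df.groupby('id')['price']
print(grouping.min())             # aggregated: one row per group
print(grouping.transform('min'))  # broadcast back: one row per original row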
unique
len(pd.unique(df['column']))
len(df['column'].unique())
df['column'].value_counts()
df.index.value_counts()
unique_list = (list(set(list)))
# otherwise cannot concat
df = df.loc[~df.index.duplicated(),:]
df3 = df3[~df3.index.duplicated(keep='first')]
Sort
df2 = df2.sort_values(['v1','v2'],ascending=[False,True]).drop_duplicates() # do not chain after inplace=True
sorted(std_dict.items(),key=lambda x: x[1],reverse=True)[:5]
# sort row index
df2.sort_index(inplace=True,axis=0)
#sort column index
df2.sort_index(inplace=True,axis=1)
#df2 = df2.loc[:,sorted(df2.columns)] # this somehow doesn't work for column name sort
# sort by column index
df2 = df.sort_values(by=df.columns[1])
# sort by rows
# https://www.nbshare.io/notebook/731958269/Pandas-How-To-Sort-Columns-And-Rows/
dfc.sort_values('Afghanistan',axis=1)
NA NAN Null missing data
df = df.fillna(method='ffill')
df = df.fillna(method='bfill')
df = df.dropna(subset=['Sector','industry'],axis=0,how='all') # drop rows where both columns are NA; how='any'
df = df.isnull()
#check for na
result.isnull().sum(axis=0)
df.isna().sum()
np.any(np.isnan(df_fit['return'])) #should be false
np.all(np.isfinite(df_fit['return'])) #should be true
# https://pandas.pydata.org/pandas-docs/stable/user_guide/missing_data.html
When summing data, NA (missing) values will be treated as zero.
If the data are all NA, the result will be 0.
Cumulative methods like cumsum() and cumprod() ignore NA values by default, but preserve them in the resulting arrays. To override this behaviour and include NA values, use skipna=False.
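A short illustration of these rules with a made-up series:
import numpy as np
import pandas as pd
s = pd.Series([1.0, np.nan, 2.0])
print(s.sum())                    # 3.0 -- NA treated as zero
print(pd.Series([np.nan]).sum())  # 0.0 -- an all-NA sum is 0
print(s.cumsum())                 # NA position kept in the result, other values still accumulate
print(s.cumsum(skipna=False))     # NaN propagates from the NA position onward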
# other fill methods
df.fillna(method="ffill")
values = {"A": 0, "B": 1, "C": 2, "D": 3}
df.fillna(value=values)
df['corr_signal'] = df['corr_signal'].replace(to_replace=0, method='ffill', limit=rolling_period)
# drop inf and na (only works in df!)
df.replace(to_replace='None', value=np.nan, inplace=True, regex=False)
df.replace(to_replace=[-np.inf,np.inf], value=np.nan, inplace=True, regex=False)
df_fit.loc[(~np.isfinite(df_fit)) & df_fit.notnull()] = np.nan
df_fit = df_fit[np.isfinite(df_fit).all(1)]
df.dropna(inplace=True)
# drop inf and na in nd array
series[np.isinf(series) | np.isnan(series)] = 0 # comparing ==np.nan never matches, use np.isnan
zscore = zscore[~np.isnan(zscore)]
# drop inf and na in series
df['col'].fillna(0, inplace=True)
zscore = zscore.replace(np.nan,0)
# checking something_nan == np.nan does not work
# a quirk (gotcha) of missing values: NaN is not equal to anything, not even to itself. np.nan == np.nan >> False
# can use np.isnan(something_nan)
append vs extend
a = [1,2]
b = [3,4]
a.append(b): [1,2,[3,4]]
a.extend(b): [1,2,3,4,]
Filter
# filter by column
df.loc[:,df.loc['row_tag',:]=='filter_value']
# filter by percentile or quantile
final2 = final2.loc[final2['mcap']<np.percentile(final2['mcap'],50)]
df[df.a < df.a.quantile(.95)]
# count number of TRUE or FALSE according to some condition
np.count_nonzero(df["open long"]!=0)
# filter column name
res.filter(regex='alpha')
df = df.loc[df['tickaskvol'].map(lambda x:len(str(x))<5)]
new_df = df[~df["col"].str.contains(word)]
df_option_inst = df_option_inst[df_option_inst['name'].str.contains('50ETF.*3.2')]
myfilter = df_clean_atmvol0.filter(regex=('2alpha')).columns.tolist()
df_clean_atmvol0.loc[:,~df_clean_atmvol0.columns.isin(myfilter)]
# array filter
[e for e in y_demean if e < np.percentile(y_demean,80)]
# list mask
mask = np.isfinite(Y[:, 0])
Y_ = Y[mask, :]
X_ = X[mask, :]
## filtering join
df_close = df_close[df_close.index.isin(df_qfq.index)]
df_qfq = df_qfq[df_qfq.index.isin(df_close.index)]
df_a[~df_a.index.isin(df_b.index)]
itertools iteration of dataframe
from itertools import product
for SMA1, SMA2 in product(sma1, sma2):
# it means
arr1 = [1, 2, 3]
arr2 = [5, 6, 7]
[(1, 5), (1, 6), (1, 7), (2, 5), (2, 6), (2, 7), (3, 5), (3, 6), (3, 7)]
from itertools import combinations
letters ="GeEKS"
a = combinations(letters, 3) # object
y = [' '.join(i) for i in a]
['G e E', 'G e K', 'G e S', 'G E K', 'G E S', 'G K S', 'e E K', 'e E S', 'e K S', 'E K S']
sharpe_list = []
for comb in combinations([50,40,30,20,10,5,1],2):
start_ttm = comb[0]
end_ttm = comb[1]
for index, row in df.iterrows():
!!! looping this way cannot modify df itself, so use range() and then select instead (a short sketch follows)
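A minimal sketch of the range()-then-select pattern recommended above (column names are made up):
import pandas as pd
df = pd.DataFrame({'signal': [0.5, -0.2, 1.1], 'position': 0})
# iterrows() hands out copies, so writes inside that loop do not stick;
# loop over positions and write back with .loc instead
for i in range(len(df)):
    if df['signal'].iloc[i] > 0:
        df.loc[df.index[i], 'position'] = 1
print(df)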
train test split
from sklearn.model_selection import train_test_split
train, test = train_test_split(df, test_size=0.2)
groupby and aggregate
from itertools import groupby
for key, group in groupby([1,1,2,2,2,2,3,4]):
print(key, list(group))
df.groupby(by=['outerlevel','2ndlevel'])['column'].first()
df.groupby(by=['outerlevel','2ndlevel'])['column'].describe().unstack()
df.groupby('class').size()
df.groupby('group').agg({'a':['sum', 'max'],
'b':'mean',
'c':'sum',
'd': lambda x: x.max() - x.min()})
Merge
df = pd.merge(pd.merge(df_a,df_b,on='caldt',left_index=False),df_c_df,on='date',how='inner') #left_on,right_on
df.columns = ['df_a','df_b','df_c']
# multiple merge
from functools import reduce
countries = [df1,df2,df3]
df = reduce(lambda left,right: pd.merge(left,right,on=['Date'],how='outer'), countries)
df.columns = ['Date','CA','FR','GE','JP','UK','US','AU','HK']
ha = pd.merge(df_clean_all,df_clean_all.groupby(by=['ticktime'])['iv'].mean(),\
left_on='ticktime',right_on='ticktime',how='left',suffixes=(None,'group'))
Data manipulation and other functions
Memory
print(f'Training Set Memory Usage: {df_train.memory_usage().sum() / 1024 ** 2:.2f} MB')
logging
import logging
logging.basicConfig(filename="dispersion.log",format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', filemode='w')
logger=logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.debug("This is just a harmless debug message")
logger.info("This is just an information for you")
logger.warning("OOPS!!!Its a Warning")
logger.error("Have you try to divide a number by zero")
logger.critical("The Internet is not working....")
# https://www.machinelearningplus.com/python/python-logging-guide/
while logger.hasHandlers():
logger.removeHandler(logger.handlers[0])
os.remove(r'C:\Users\kl\Desktop\dispersion.log')
yahoo api
(deprecated)
https://pypi.org/project/yfinance/
import yfinance as yf
stock = 'AMZN SPY'
data = yf.download(stock, '2012-01-01', '2015-01-01',group_by='ticker')
data['AMZN']['Adj Close']
# only good for single stock
from pandas_datareader import data as pdr
yf.pdr_override()
# yahoo can get minute data for last 30 days, max 7 day period.
# good for multiple stocks
indexes = '^GSPTSE ^FCHI ^GDAXI ^N225 ^FTSE ^GSPC ^AORD ^HSI'
indexes = indexes.split() # this must be a list
data = pdr.get_data_yahoo(indexes, start="2000-09-01", end="2020-12-01", interval='1mo')["Adj Close"]
gold = pd.DataFrame(data=data)
from yahoofinancials import YahooFinancials as YF
# https://pypi.org/project/yahoofinancials/
stock = YF(['AAPL'])
stock.get_interest_expense()
IS = stock.get_financial_stmts('quarterly', 'income')
BS = stock.get_financial_stmts('quarterly', 'balance')
cash = stock.get_financial_stmts('quarterly', 'cash')
#daily_tech_stock_prices = stock.get_historical_price_data('2008-09-15', '2018-09-15', 'daily')
for i in range(0,len(IS['incomeStatementHistoryQuarterly']['AAPL'])):
print(IS['incomeStatementHistoryQuarterly']['AAPL'][i].keys())
print(IS['incomeStatementHistoryQuarterly']['AAPL'][0]['2020-09-26'].keys())
print(IS['incomeStatementHistoryQuarterly']['AAPL'][0]['2020-09-26'].values())
import yahoo_fin.stock_info as si
#http://theautomatic.net/yahoo_fin-documentation/
#http://theautomatic.net/2020/05/05/how-to-download-fundamentals-data-with-python/
#si.get_analysts_info('nflx')
#si.get_day_gainers()
#si.get_day_most_active()
#si.get_dividends("msft", start_date="01-01-2010",end_date='01-01-2020')
#si.get_earnings('nflx')
#si.get_financials('nflx', yearly = False, quarterly = True).items()
#si.get_holders('nflx')
#si.get_live_price('nflx')
###############################
#si.get_balance_sheet('nflx', yearly = False)
#si.get_cash_flow('nflx', yearly = False)
#si.get_income_statement('nflx', yearly = False)
quote = si.get_quote_table('aapl')
#si.get_stats('nflx') # RATIOS
val = si.get_stats_valuation('nflx') # ratio aross time
#si.tickers_sp500()
import yahoo_fin.options as op
#op.get_calls('nflx')
#op.get_options_chain('nflx')
# get tickers
import yahoo_fin.stock_info as si
dow_list = si.tickers_dow() #secs
#dow_list = si.tickers_nasdaq() #23mins
#dow_list = si.tickers_sp500()
#dow_list = si.tickers_other() #get other tickers not in NASDAQ (based off nasdaq.com)
prices = pd.DataFrame()
for ticker in dow_list[:]:
try:
prices[ticker]=si.get_data(ticker, start_date = "01/01/2013", end_date = "11/30/2020").adjclose
except:
pass
WRDS api (Wharton business school)
import wrds
db = wrds.Connection(wrds_user='')
db.create_pgpass_file()
db.list_libraries()
db.list_tables(library='crsp')
# get fund numbers first
fund_df = db.get_table(library='crsp', table='fund_names',obs=2)
fund_df = fund_df.loc[fund_df['ticker'].isin(['VBTLX','VEMAX','VSMAX']),:]
fund_df.drop_duplicates(subset=['crsp_fundno'],keep='last',inplace=True)
# find the monthly returns
month_df = db.get_table(library='crsp', table='monthly_returns')
VBTLX_df = month_df.loc[month_df['crsp_fundno']==31244]
FB = db.raw_sql("select date,ret from crsp.dsf where (cusip='30303M10') and date>='2018-01-01' and date<='2019-12-31'")
FMP api
from fmp_python.fmp import FMP
import FundamentalAnalysis as fa
ticker = "AAPL,TSLA"
api_key = ""
fa.profile(ticker, api_key)
Morningstar api
# morningstar
import morningstar.good_morning as gm
#print(help(gm))
a = gm.FinancialsDownloader()
b = a.download('AAPL')
display(b)
Fundamental Analysis api
https://pypi.org/project/FundamentalAnalysis/
Beautiful soup web scraping
#https://www.pluralsight.com/guides/extracting-data-html-beautifulsoup
import requests
from bs4 import BeautifulSoup
URL = 'https://t#'
page = requests.get(URL)
print(page.status_code) # This should print 200
soup = BeautifulSoup(page.content, 'html.parser')
results = soup.find(id='page-wrapper')
print(results.prettify())
table = soup.find('table', class_='table table-striped table-hover table-sm')
df = pd.read_html(str(table))[0]
URL and zip
import urllib.request
import zipfile
ff_url = "http://"
urllib.request.urlretrieve(ff_url,'fama_french.zip')
zip_file = zipfile.ZipFile('fama_french.zip', 'r')
zip_file.extractall()
zip_file.close()
VAR (Value at Risk)
def calculate_var(fund,confidence):
print(fund,confidence)
invest = 100000
# use z score instead of t to calculate VAR since the sample size is big
z = abs(stats.norm.ppf((1-confidence)/2,0,1))
se = df[fund].sem()
std = df[fund].std()
mean = df[fund].mean()
relative_var = invest*z*std
abs_var = invest*(z*std+mean)
return "relative var = {:.0f} absolute var = {:.0f}".format(relative_var,abs_var)
def simulated_var(fund,confidence):
invest = 100000
VaR_95 = df[fund].quantile(1-confidence)
return tabulate([['95%',abs(VaR_95*invest)]],headers=['Confidence level','Value at Risk'])
data input
xls = pd.ExcelFile(r'C:\Users\AQR.xlsx')
df1 = pd.read_excel(xls, 'Return',index_col=0, header=2)
df = pd.read_csv('F-F_Research_Data_Factors.csv', skiprows = 3, index_col = 0)
# delimiter=',', skiprows=2, error_bad_lines=False
pd.read_csv(filepath,usecols=np.arange(0,21),skiprows=0,names=headernames,index_col=0)
# load from pickle
with open('ticker_str.pickle','rb') as handle:
load = pickle.load(handle)
ticker_str_1 = load[0]
ticker_str_2 = load[1]
ticker_str_3 = load[2]
read_csv
import csv
rows = []
with open(r"C:\Users\xcy99\Desktop\OneDrive - The Chinese University of Hong Kong\聚润\模型\result_IC_15min.csv", 'r') as file:
csvreader = csv.reader(file)
header = next(csvreader)
for row in csvreader:
rows.append(row)
print(header)
print(rows)
data output
from openpyxl import load_workbook
out_path = "C:\\Users\\python.xlsx"
writer = pd.ExcelWriter(out_path, engine='openpyxl')
# try to open an existing workbook
writer.book = load_workbook(out_path)
# copy existing sheets
writer.sheets = dict((ws.title, ws) for ws in writer.book.worksheets)
# read existing file
reader = pd.read_excel(out_path)
# write out the new sheet
df2.to_excel(writer,index=False,header=False,startrow=len(reader)+1)
result.to_excel(r'C:\Users\xcy99\Desktop\haha.xlsx')
writer.close()
# out to csv
df.to_csv(index=False)
dt.to_csv('file_name.csv') # relative path
dt.to_csv('C:/Users/abc/Desktop/file_name.csv')
# output to pickle
with open('ticker_str.pickle','wb') as handle:
pickle.dump([ticker_str_1,ticker_str_2,ticker_str_3],handle,protocol=pickle.HIGHEST_PROTOCOL)
Asset Return
# procedure
1. compute log returns and align the rows
2. returns can only be summed across time when they are log returns
3. np.exp(sum of log returns) converts back to a gross compounding return if needed
4. no need to apply np.exp() just for a chart, since the chart is not compounding anyway
(a short numeric check of points 2-3 follows)
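A quick numeric check with made-up prices:
import numpy as np
prices = np.array([100.0, 110.0, 99.0])
log_ret = np.diff(np.log(prices))            # log returns per period
total_simple = prices[-1]/prices[0] - 1      # -0.01
print(total_simple, np.exp(log_ret.sum()) - 1)  # both -0.01: summed log returns compound correctly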
-----------------------------------------
# simple return
df['return'] = df['close'].pct_change().shift(-1)
(1+df['return']).cumprod()
---------------------------------------
# the whole point of using lognormal return is for the prob to go up/down be exactly the same (additive)
df['return'] = np.log(prices['Adj Close']).diff(1).shift(-1) # log(today/yesterday)=log(today)-log(yesterday)
df = df[:-1]
# above matched today's close price with tomorrow's return in the same df row for backtesting
# if some feature dependent on return, then do shift(-1) at very end before backtesting
-------------------------------------------
# also can use geometric mean on simple return
# it takes care of the compounding effect and order of profits
stats.gmean(1+simple_return)*100-100
-------------------------------------------
book_example = book_example[~book_example['log_return'].isnull()]
---------------------------------------
Simple vs compound return with net asset value (NAV)
# annualized return based on NAV
annual_return = (stats.gmean((df_scalp['pnl']+asset)/asset)-1)*252
annual_std = (stats.gstd((df_scalp['pnl']+asset)/asset)-1)*np.sqrt(252) # volatility scales with the square root of time
# Sharpe based on NAV
sharpe = (stats.gmean((df_scalp['pnl']+asset)/asset)-1)/(stats.gstd((df_scalp['pnl']+asset)/asset)-1) * np.sqrt(252)
plt.plot(df_scalp['pnl'].cumsum()/asset+1,label='zscore with cash ratio') # simple (non-compounding) NAV
plt.plot(((df_scalp['pnl']+asset)/asset).cumprod(),label='zscore with cash ratio') # compounded NAV
MDD, max drawdown
print(f"最大回撤%: {(pnl_2020/pnl_2020.rolling(cointwindow,min_periods=1).max()-1).rolling(cointwindow,min_periods=1).min()['price'].min()}")
#(trough-peak)/peak
#https://quant.stackexchange.com/questions/18094/how-can-i-calculate-the-maximum-drawdown-mdd-in-python/18101
Roll_Max = SPY_Dat['Adj Close'].cummax()
Daily_Drawdown = SPY_Dat['Adj Close']/Roll_Max - 1.0
Max_Daily_Drawdown = Daily_Drawdown.cummin()
return_total = (1 + df_pnl['daily_pnl']).cumprod()[-1] - 1 # for simple return
mdd = (df_pnl['total'] / df_pnl['total'].cummax()-1).cummin().min()
sharpe = df_pnl['daily_pnl'].mean()/df_pnl['daily_pnl'].std() * np.sqrt(252) # Sharpe divides by the standard deviation
calmar = df_pnl['daily_pnl'].mean()/abs(mdd) * np.sqrt(252)
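A toy check of the rolling-max drawdown logic above, using a made-up equity curve:
import pandas as pd
equity = pd.Series([100.0, 120.0, 90.0, 110.0, 80.0])
drawdown = equity/equity.cummax() - 1.0   # (trough - running peak) / running peak
print(drawdown.min())  # -0.333...: the worst fall, from the 120 peak down to 80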
volatility
ret.rolling(60).std()*np.sqrt(252)*0.01
Net asset value and cash utilization ratio
# cumulative_pnl/asset+1
asset=2
df_scalp['pnl'] = np.where(df_scalp['zsig'].abs()<=asset,df_scalp['pnl'], asset/df_scalp['zsig'].abs()*df_scalp['pnl'])
df_scalp_pos['pnl'] = np.where(df_scalp_pos['zsig'].abs()<=asset,df_scalp_pos['pnl'], asset/df_scalp_pos['zsig'].abs()*df_scalp_pos['pnl'])
plt.plot(df_scalp['pnl'].cumsum()/asset+1,label='zscore with cash ratio')
zscore
def zscore(series):
series_copy = pd.Series(series).copy()
series_copy.replace(to_replace=[-np.inf, np.inf], value=np.nan, inplace=True, regex=False)
up = pd.Series(list(filter(lambda x: x != 0, series_copy.abs()))).median() * 12
down = pd.Series(list(filter(lambda x: x != 0, series_copy.abs()))).median() * -12
series_copy[series_copy > up] = up
series_copy[series_copy < down] = down
zscore = (series_copy - series_copy.mean())/series_copy.std()
zscore.replace(to_replace=[-np.inf, np.inf, np.nan], value=0, inplace=True, regex=False) # must not be inf or nan to put in lasso
return np.array(zscore)
send email
import smtplib
gmail_user = '@gmail.com'
gmail_password = ''
sent_from = gmail_user
to = ['', 'test@hotmail.com']
subject = 'haha strategy'
body = 'buy ticker '+str(df2.iloc[0:5,1])
email_text = """\
From: %s
To: %s
Subject: %s
%s
""" % (sent_from, ", ".join(to), subject, body)
try:
server = smtplib.SMTP_SSL('smtp.gmail.com', 465)
server.ehlo()
server.login(gmail_user, gmail_password)
server.sendmail(sent_from, to, email_text)
server.close()
print('Email sent!')
except Exception as e:
print(str(e))
Portfolio performance ratios
rfr = 0.055
target = 0
df1['Excess Decile Return'] = df1['Decile Return']-rfr
sharpe_ratio = df1['Excess Decile Return'].mean()/df1['Excess Decile Return'].std()
# sharpe_ratio = (daily_return-daily_rf).mean()/daily_return.std()*np.sqrt(242) # generic daily version
treynor_ratio = df1['Excess Decile Return'].mean()/model.params.Excess_Market
# Note standard deviation is sqrt of (x-mu)^2/N, if mu is 0 then = return^2.mean().sqrt()
df1.loc[df1['Excess Decile Return']<target,'Excess Downside Return'] = df1['Excess Decile Return']**2
sortino_ratio = (df1['Excess Decile Return'].mean()-rfr)/np.sqrt(df1['Excess Downside Return'].mean())
print('sortino ratio is '+str(sortino_ratio))
print('sharpe ratio is '+str(sharpe_ratio))
print('treynor_ratio is '+str(treynor_ratio))
Rolling
# time series rolling sum
df_month = df.resample('10Y').sum()
# resampling frequency https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html
# rolling average
a=np.cumsum(df1['Decile Return'],axis=0)[1::10]
average_yearly_return_for_each_decade = (np.array(a[1:]) - np.array(a[:-1]))/10
# rolling in pandas
df['rolling_residual'] = df['residual'].rolling(30).mean()
df['future_pos'] = df[['future-1']].rolling(rolling_short).apply(lambda col: np.sum(abs(np.array([x for x in col if x > 0]))))
Expanding
df.expanding(3).sum()
Max drawdown
round(df["net_return"].cumsum().min()*100,2)}%
round to hundred
position = int(math.ceil(position / 100.0)) * 100
Annualized return and risk free rate
# annual_rf = 0.1
daily_rf = np.power(1+annual_rf,1/252)-1 # de-compound the annual rate into a daily rate
# window_return = 0.1 (cumulative simple return over the window, e.g. from cumsum())
annual_return = np.power(1+window_return,252/len(window))-1
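A sanity check of the de-compounding above, assuming 252 trading days per year:
annual_rf = 0.10
daily_rf = (1 + annual_rf) ** (1 / 252) - 1
print((1 + daily_rf) ** 252 - 1)  # ~0.10, compounding the daily rate recovers the annual rate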
Optimization
import scipy.optimize as optimize
res1 = optimize.minimize(objective_function, initial_guess, method='Nelder-Mead')
t_opt = res1.fun
b_opt = float(res1.x)
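A minimal concrete run of the pattern above; the quadratic objective is just an illustration:
import scipy.optimize as optimize
def toy_objective(b):
    return (b[0] - 3.0) ** 2 + 1.0        # minimum value 1.0 at b = 3
res1 = optimize.minimize(toy_objective, [0.0], method='Nelder-Mead')
t_opt = res1.fun       # ~1.0, the minimised objective
b_opt = float(res1.x)  # ~3.0, the minimiser
print(t_opt, b_opt)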
Portfolio construction
import scipy.optimize as optimize
def minimize_sharpe(weights):
return -portfolio_stats(weights)['sharpe']
# assets add up to 1 and return being target return
constraints = ({'type':'eq','fun': lambda x: portfolio_stats(x)['return']-target_return},{'type':'eq','fun': lambda x: np.sum(x)-1})
# one asset can take up 0 - 100% position
bounds = tuple((0,1) for x in range(len(countries)))
# start from equally weighted
initializer = len(countries) * [1./len(countries),]
optimal_sharpe=optimize.minimize(minimize_sharpe,
initializer,
method = 'SLSQP',
bounds = bounds,
constraints = constraints)
# get the output
optimal_sharpe_weights=optimal_sharpe['x'].round(4) # x is our output
optimal_sharpe['fun']
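portfolio_stats is not defined in the snippet above; a minimal version consistent with how it is used might look like this (the returns DataFrame and the 252-day annualisation are assumptions):
import numpy as np
def portfolio_stats(weights):
    w = np.array(weights)
    # returns: DataFrame of daily asset returns, assumed to be in scope
    port_return = np.sum(returns.mean() * w) * 252
    port_vol = np.sqrt(w.T @ returns.cov().values @ w * 252)
    return {'return': port_return, 'volatility': port_vol, 'sharpe': port_return / port_vol}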
optimize system of equations
def objective(x):
x1=x[0]
x2=x[1]
x3=x[2]
x4=x[3]
return -1.50440748*x1-1.35598889*x2-0.50999618*x3+0.77768992*x4
def constraint1(x):
return -x[0]-x[1]-x[2]-x[3]
initializer = np.ones([4,1])
bounds = tuple((-1,1) for i in range(0,4))
con1 = {'type':'eq','fun':constraint1}
cons = [con1]
sol = optimize.minimize(objective,initializer,method='SLSQP',bounds=bounds,constraints=cons)
# https://stackoverflow.com/questions/31098228/solving-system-using-linalg-with-constraints
Time Series
datetime
import datetime as dt
#https://www.programiz.com/python-programming/datetime/strftime
t = pd.date_range(start='1/1/2020',periods=T,freq='D')
df = pd.DataFrame(data=r,index=t,columns=['CER'])
df.index = pd.to_datetime(df.index,format= '%Y%m',dayfirst=True)
dt.datetime.now()-dt.timedelta(days=90)
signal['date'] = pd.date_range(dt.datetime(2019,11,28),dt.datetime(2019,11,28) + dt.timedelta(minutes=3), freq="1min")
# to datetime object
dt.datetime.strptime("1/1/2018", "%m/%d/%Y")
datetime_object = dt.datetime.strptime('09/19/18 13:55:26.000', '%m/%d/%y %H:%M:%S.%f')
time_object = dt.datetime.strptime('13::55::26', '%H::%M::%S').time()
# from datetime object
print("Formatted date : " , datetime.datetime.now().strftime("%y-%m-%d-%H-%M"))
print("Present year: ", datetime.date.today().strftime("%Y"))
print("Month of year: ", datetime.date.today().strftime("%B"))
print("Week number of the year: ", datetime.date.today().strftime("%W"))
print("Weekday of the week: ", datetime.date.today().strftime("%w"))
print("Day of year: ", datetime.date.today().strftime("%j"))
print("Day of the month : ", datetime.date.today().strftime("%d"))
print("Day of week: ", datetime.date.today().strftime("%A"))
# filter datetime index
import datetime as dt
final[final.loc[:,'date'].dt.year==2012]
df_clean_all_copy[df_clean_all_copy['ticktime'].dt.date==pd.to_datetime('2021-04-01')]
# pd.to_datetime('2018-10-26 12:00:00')
# time diff delta compare
df['timediff'] = (df['ticktime'] - df['ticktime'].shift(1)).map(lambda x: x.microseconds / 1000)\
+ (df['ticktime'] - df['ticktime'].shift(1)).map(lambda x: x.seconds * 1000)
df = df[df['timediff'] > 500]
# convert datetime64 to datetime
pd.to_datetime(df['ticktime'].values[0]).date()
# get date
df['ticktime'].astype(str).map(lambda x:x[:21])
df['ticktime'].apply(lambda x:pd.to_datetime(x).date())
acf, pacf
print(df['Simple Return'].autocorr(lag=1))
print(df['Simple Return'].autocorr(lag=2))
print(df['Simple Return'].autocorr(lag=3))
from statsmodels.graphics.tsaplots import plot_acf
with mpl.rc_context():
mpl.rc("figure", figsize=(10,6))
plot_acf(df['Simple Return'],lags=5,title='First 5 lags')
plt.savefig('9.a.3.png')
from statsmodels.graphics.tsaplots import plot_pacf
plot_pacf(df['Simple Return'],lags=5,title='First 5 lags')
######################
def plot_acf(data, x_name):
times = [0,1,3,5,7,10,15,20,25,30,40,50,60]
fig = plt.figure(figsize=(16,10))
result = []
for t in times:
tmp = copy.deepcopy(data[[x_name]])
tmp['x_shift'] = tmp[x_name].shift(t*2)
tmp = tmp.dropna()
corr = np.corrcoef(tmp[x_name], tmp['x_shift'])[0,1]
result.append(corr)
#print(result)
ax = fig.add_subplot(1,1,1)
ax.set_title('X{} ACF'.format(x_name), size = 20)
ax.set_xlim(1,60)
ax.set_xticks(np.append(np.arange(1,10,1),[20,40,60]))
ax.plot(times, result)
ax.grid()
plot_acf(df_linear, y_predict_name)
Formatting
display
pd.set_option('display.max_columns', 200)
pd.set_option('display.max_rows', 100)
pd.set_option('display.min_rows', 100)
pd.set_option('display.expand_frame_repr', True)
pd.set_option('expand_frame_repr',False)
#pd.set_option('display.max_rows', 10)
#pd.set_option('display.max_columns', None)
#pd.set_option('max_colwidth',100)
#pd.set_option('display.float_format', lambda x: '%10.3f' % x)
#pd.reset_option('display.float_format')
#np.set_printoptions(precision=4, suppress=True)
%precision %10.4f # non scientific
https://kiwidamien.github.io/stylish-pandas.html
# print without newline
print('haha',end=',')
# print variable with literal string
print(f'n_lags: {result[1]}')
# from IPython.display import display, Markdown, Latex, Image, SVG
display(aaa)
Markdown(stats.norm.__doc__)
String formatting
print('haah %s and %s is %.4f' % ('Gold','Silver',pvalue))
# create different variable names in a for loop
for x in range(0, 9):
globals()['string%s' % x] = 'Hello'
print("Geeks :{0:2d}, Portal :{1:8.2f}".format(12, 00.546))
print("Geeks: {a:5d}, Portal: {p:8.2f}".format(a = 453, p = 59.058))
# https://www.w3schools.com/python/ref_string_format.asp
Number formatting
https://mkaz.blog/code/python-string-format-cookbook/
pi = 3.14159
print(" pi = %1.2f " % pi) # old
print(" pi = {:.2f}".format( pi )) # new
Timing
import time
start_time = time.time()
print("time elapsed: {:.2f}m".format((time.time() - start_time)/60))
Import stuff
import importlib
importlib.reload(module)
module.function()
#import warnings
#warnings.filterwarnings('ignore') # not recommended to turn warnings off
%config InlineBackend.figure_format = 'retina'
print(os.getcwd())
print(pd.__version__)
print(sys.version)
import numpy as np
import pandas as pd
from scipy import stats
import array_to_latex as a2l
from random import seed
from random import random
from numpy.random import randn #random normal
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib as mpl
import matplotlib.dates as mdates
import statsmodels.api as sm
from statsmodels.graphics.tsaplots import plot_acf
import datetime
import wrds
import yfinance as yf
from pandas_datareader import data as pdr
from tabulate import tabulate
import statsmodels.formula.api as smf
from statsmodels.tsa.stattools import coint
import math
import sys
display(sys.executable)
Script invocation (calling one script from another)
------------------------a.py----------------------------
import os
for i in range(0, 100):
r = os.system("python {} {}".format('b.py', i))
------------------------b.py----------------------------
import sys
if __name__ == '__main__':
i = sys.argv[1]
print('output')
if __name__ == '__main__':
print('run directly')
else:
print('run when import')