Personal Notes: Python Methods for Financial Econometrics

This post shares the Python methods for financial econometrics that I have collected and organized myself. I will keep updating it because I use these notes in my day-to-day work. I'm sharing it on Zhihu to get corrections and to learn better ways of coding from everyone.

These are my personal notes about Python methods in econometrics. I will constantly update them to improve accuracy, clarity, and usefulness. Happy to receive any corrections or suggestions!

  • NumPy and Matrices
  • LaTeX Output
  • Statistics
  • Random Numbers and Simulation
  • Linear Regression
  • Plotting
  • Pandas
  • Data
  • Optimization
  • Time Series
  • Formatting

array operations

x=np.append(x,y)
np.repeat(x,5)
np.vstack(()) # same as np.row_stack(())
np.hstack(())
x_var = np.vstack([df_option.iloc[0]['vega'],df_option.iloc[0]['theta']])
x_var = np.hstack((x_var,np.array([[row['vega']],[row['theta']]])))
np.concatenate((t1,t2),axis=0) # axis=0 joins along rows, axis=1 along columns
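A minimal sketch (toy arrays assumed) of how these stacking calls differ:

import numpy as np
a = np.array([1, 2])
b = np.array([3, 4])
np.vstack((a, b))       # shape (2, 2): stack as rows
np.hstack((a, b))       # shape (4,): join end to end
np.concatenate((a.reshape(1, -1), b.reshape(1, -1)), axis=0)  # same result as vstack here
np.append(a, b)         # array([1, 2, 3, 4]); returns a new flattened copy
np.repeat(a, 2)         # array([1, 1, 2, 2])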

creating array

a = np.empty((13,))
a = np.zeros((13,))
a = np.ones((13,))
a = np.full((13,),7)
a = np.random.random((13))

selection

ndArray[row_index][column_index] # same as ndArray[row_index, column_index]
ndArray[row_index]
ndArray[start_index: end_index ,  :]
ndArray[ : , column_index]
ndArray[ : , start_index: end_index]
#https://thispointer.com/python-numpy-select-rows-columns-by-index-from-a-2d-ndarray-multi-dimension/

# higher dimensions
price_paths = price_paths[:,-1,:]

vector multiplication

import numpy as np 
# variance matrix
var = np.matmul(annual_v.T,annual_v)
# cov = var * correlation matrix
cov = np.matmul(var,corr)

A @ B

port_var = np.dot(weights.T, np.dot(df_invest.cov(), weights)) 

Scalar multiplication

lst = list(map(lambda x: x * 2, lst)) # in Python 3 map returns an iterator, so wrap it in list()
lst = list(map(lambda x, y: x * y, lista, listb))
products = [a * b for a, b in zip(list1, list2)]
# or just use numpy array

# matrix addition:
list(map(lambda x:x+2,[2,3,4]))
np.array([2,3,4])+2

Diagonal matrix

x = np.diag(np.arange(4)) 

Triangular matrix

a = np.triu(np.ones((3, 3)), 1)    
a.T  # transpose

Solving for system of linear equations

matrix = np.vstack([beta,smb,hml])
z = np.array([[-1]*3,[0]*3,[0]*3])
x, residuals, rank, s = np.linalg.lstsq(matrix, z, rcond=None)
# Least squares is a standard approach to problems with more equations than unknowns, also known as overdetermined systems
# returns: solution x, sum of squared residuals, rank of the matrix, and its singular values
# https://sodocumentation.net/numpy/topic/3753/linear-algebra-with-np-linalg


x0 + 2 * x1 + x2 = 4
         x1 + x2 = 3
x0 +          x2 = 5
A = np.array([[1, 2, 1],
              [0, 1, 1],
              [1, 0, 1]])
b = np.array([4, 3, 5])
x = np.linalg.solve(A, b)
# Out: x = array([ 1.5, -0.5,  3.5])
# https://stackoverflow.com/questions/31098228/solving-system-using-linalg-with-constraints

Latex Output

import array_to_latex as a2l 
a2l.to_ltx ()

print(df.describe().to_latex())
print(df.corr().to_latex()) 

print(model.summary().as_latex()) 
print(sm.stats.anova_lm(model, typ=2).to_latex()) 

from tabulate import tabulate 
tabulate([['95%',abs(VaR_95*invest)]],headers=['Confidence level','Value at Risk']) 

Statistics

Student's t-test, for when the population standard deviation is unknown (sample n>30 or normally distributed data). It checks whether the sample is consistent with a hypothesized mean of 0, i.e. whether the sample and the 'mean = 0' population could share the same distribution; a direct one-sample example follows the snippet below.

from scipy import stats 

alpha = 0.05
df = 5
t_stat = 1.96

# calculate the one-tail critical value, equivalent to Excel T.INV(1-alpha, df)
cv = stats.t.ppf(1 - alpha, df)

# ppf means percent point function (inverse of cdf — percentiles).

# calculate the two tail critical value
cv = stats.t.ppf(1 - (alpha/2), df)

# calculate the p-value
p = (1 - stats.t.cdf(abs(t_stat), df)) * 2

print((cv,p)) 
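As a minimal sketch of the one-sample test described above (the sample values are made up), scipy can also return the statistic and p-value directly:

from scipy import stats
sample = [0.2, -0.1, 0.4, 0.3, 0.0, 0.1]   # hypothetical sample
t_stat, p = stats.ttest_1samp(sample, popmean=0)  # H0: the population mean is 0
print(t_stat, p)  # reject H0 at level alpha if p < alpha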

Two tail T-test for equal mean

from scipy import stats

print(stats.ttest_ind(a=df1["PPY"],b=df2["PPY"],equal_var=False)) 

Output:

Ttest_indResult(statistic=0.2844374536719791, pvalue=0.7804443088685238) 

Z test for known population standard deviation (sample n>30 or normally distributed)

# given area under normal to get the z-score
print(stats.norm.ppf(1900/2000,loc=1,scale=5/np.sqrt(2000)))

# given z-score to return area under the curve
print(stats.norm.cdf(1,loc=1,scale=5/np.sqrt(2000))) 

Confidence Interval

# CI = mu +- t * (standard error for each sample statistic is different) 

def sample_mean_confidence_interval(data, confidence=0.95):
    a = 1.0 * np.array(data)
    n = len(a)
    m, se = np.mean(a),stats.sem(a)
    #h = stats.t.ppf((1+confidence)/2.,n-1)*se # two tail
    h = stats.t.ppf(confidence, n-1)*se # one tail
    return "{0:10.5f},{1:10.5f},{2:10.5f},{3:10.5f}".format(m, m-h, m+h, h*2)
print('for 95% confidence interval one tail t test - mean, lower, upper, width')
print(sample_mean_confidence_interval(df.VBTLX)) 

Correlation

#https://machinelearningmastery.com/how-to-use-correlation-to-understand-the-relationship-between-variables/
#cov(X, Y) = (sum (x - mean(X)) * (y - mean(Y)) ) * 1/(n-1)
#covariance(X, Y) / (stdv(X) * stdv(Y))

from scipy import stats
np.corrcoef(option['spot'],option['future'])
stats.pearsonr(option['spot'],option['future'])
stats.spearmanr(option['spot'],option['future'])

Chi Squared test

# print(stats.chi2.ppf(1-alpha,DOF))
# this is to calculate critical value, if test statistic>CV then reject Ho
print(stats.chi2.ppf(.95, 10))
print(stats.chi2.ppf(.95, 30))
print(stats.chi2.ppf(.95, 100))
print(stats.chi2.ppf(.95, 900))
18.30
43.77
124.34
970.90

# contingency table (must be 2-D: observed counts by row and column category)
table = np.array([[1,2,3],[4,5,6]])
display(table)
chi_val,P,dof,exp_val = stats.chi2_contingency(table)
print("Chi square test\nChi squared = %f\nP= %f\nDOF= %d"%(chi_val,P,dof))

# chi square goodness-of-fit test (f_exp must sum to the same total as f_obs)
stats.chisquare(f_obs=[1,2,3,4,5,6], f_exp=[3.5]*6, ddof=1)

# excel 
# =CHISQ.INV(0.95,2) # 2 dof 95% confidence critical value
# =CHISQ.DIST.RT(3.0599,2) # the p value

Lognormal

def lognorm_cdf(x, mu, sigma):
    shape  = sigma
    loc    = 0
    scale  = math.exp(mu)
    return stats.lognorm.cdf(x, shape, loc, scale)

x      = 5
mu     = 0.1
sigma  = 0.5
p      = lognorm_cdf(x, mu, sigma)
print(p)

stats.lognorm.ppf(0.05,9.8)

1st-4th Moments

from scipy import stats 

skew = df.resample('10Y').apply(lambda x:stats.skew(x))[:-1] 
kurt = df.resample('10Y').apply(lambda x:stats.kurtosis(x))[:-1] 

from scipy.stats import norm
mean, var, skew, kurt = norm.stats(moments='mvsk')

Jarque-Bera JB test

JB = df.resample('10Y').apply(lambda x:stats.jarque_bera(x).pvalue)[:-1] 

Sample statistics

print(df['Simple Return'].mean())
print(df['Simple Return'].sem(axis=0))
print(df['Simple Return'].std(axis=0)/np.sqrt(len(df)))
print(df['Simple Return'].std(axis=0))
print(df['Simple Return'].autocorr(lag=1))
print(df['Simple Return'].autocorr(lag=2))
print(np.corrcoef(series1,series2)) # will return n by n matrix 
# if need to get single number do np.corrcoef()[0,1]
df.corr()
df1.corrwith(df2, axis=0) # or axis=1
series1.corr(series2)
df.cumsum()
df.pct_change()
# https://jakevdp.github.io/PythonDataScienceHandbook/03.08-aggregation-and-grouping.html
df.columns.str.contains(r'ha',na=True)
df.columns.isin(['haha'])
df_train.loc[:,df_train.columns.str.contains('feature')]
df[df['return'].isin(df2['return'].values.tolist())]
df.unstack()
df.index
df.info()
df.describe(include='all').loc['mean']
df.columns
df.shape
df.column.shift(-1)
df.empty
df_close.pivot_table(index=['date'], columns='stock', values='close') # long to wide
df[df < 0].values
df = df.between_time('09:35:00', '14:55:00')
df = df.iloc[pd.DatetimeIndex(df['ticktime']).indexer_between_time(stime, etime)]

np.var
np.std
np.mean() # don't use this
np.median() # don't use this
# use pd.Series(list()).median() instead
np.log
np.percentile(arr,99)
np.sqrt
np.power(array1,3) 
np.hstack
np.vstack
np.ones
np.zeros
np.shape() # also works on (nested) lists to get their dimensions
np.nonzero()
np.exp
np.eye
np.diag
np.sin
np.max # single max
map(max, a, b) # item by item max
np.argmax() # index of maximum
np.any
np.all
np.cumsum
np.round
np.where(df.test>0,1,0)
np.where((df['cond1']>0)&(df['cond2']<2),1,0) # must use bitwise operators
np.where(test=='if',1,np.where(test=='ifelse',1,0))
np.array([]).tolist()
arr1 = np.append(arr1,arr2)
np.apply_along_axis

winsorization

from scipy.stats.mstats import winsorize
np.mean(winsorize(answer_initial,limits=[0.2,0.2]))
# note: if the dataset is too small, no outliers will be clipped
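A tiny sketch of what winsorize actually does (toy data assumed):

import numpy as np
from scipy.stats.mstats import winsorize
arr = np.arange(1, 11)                # 1..10
winsorize(arr, limits=[0.2, 0.2])     # lowest and highest 20% clipped: [3 3 3 4 5 6 7 8 8 8]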

cointegration tests

from statsmodels.tsa.stattools import coint 

tscore,pvalue,significance = coint(df.CumsumG,df.CumsumS) 

Random Number, Distribution and Simulation

fixed step

np.arange(start=1,stop=25,step=1) # define step size, infer number of steps, output np.array
np.linspace(start=0,stop=1,num=11) # define number of steps, infer step size
range() # built-in (there is no np.range); returns a lazy range object, slower than np.arange for numeric arrays

pd.date_range(start='1/1/2020',periods=T,freq='D') 

normal distribution

>>> import scipy.stats
>>> scipy.stats.norm(0, 1)
<scipy.stats.distributions.rv_frozen object at 0x928352c>
>>> scipy.stats.norm(0, 1).pdf(0)
0.3989422804014327
>>> scipy.stats.norm(0, 1).cdf(0)
0.5
>>> scipy.stats.norm(100, 12)
<scipy.stats.distributions.rv_frozen object at 0x928352c>
>>> scipy.stats.norm(100, 12).pdf(98)
0.032786643008494994
>>> scipy.stats.norm(100, 12).cdf(98)
0.43381616738909634
>>> scipy.stats.norm(100, 12).cdf(100)
0.5

random normal

import numpy as np

# seed numpy's generator so reruns give the same results (random.seed does not affect np.random)
np.random.seed(123)
np.random.normal(mu,sigma,size=(10,1))

random integer

np.random.randint(low=0,high=100,size=(13,1))

random from exponential distribution

np.random.exponential(scale=1,size=(128)) 

random from uniform distribution

np.random.uniform(low=0,high=1,size=(36)) 

random from dirichlet distribution

weights = np.random.dirichlet(np.ones(num_assets),size=1) 

# regarding interval transformation see my answer at
# https://stackoverflow.com/questions/63910689/transformed-dirichlet-array-with-range-1-1-in-numpy 

Linear Regression

OLS on multiple dataframe

import statsmodels.api as sm

X = df2[['Excess_Market','HML','SMB','UMD']] 
y = df1['Excess Return']
X =  sm.add_constant(X)
m = sm.OLS(y,X) 
model = m.fit()

# ols based on pandas (pd.ols was removed in pandas 0.20+; use statsmodels RollingOLS instead)
rolling_beta = pd.ols(x=x,y=y,window_type='rolling',window=30)
spread = y - rolling_beta.beta['x'] * x

(Better) OLS on single dataframe with specified formula

m = sm.formula.ols(formula='Lottery~Literacy+Wealth+Region',data=df)
# default with adding a constant for intercept; use 'Value~Time-1' to remove intercept
model = m.fit() # result object 

OLS Outputs

m.endog_names # the endog (dependent / left-hand-side) variable
m.exog_names # the exog (independent / right-hand-side) variables
m.df_model
m.df_resid
model.params # regression coefficients
model.params.Intercept
model.bse # standard error
model.tvalues
model.pvalues
model.rsquared
model.resid
model.summary()
model.fvalue
model.f_pvalue

Rolling OLS regression

# https://www.statsmodels.org/stable/examples/notebooks/generated/rolling_ls.html
import statsmodels.api as sm
from statsmodels.regression.rolling import RollingOLS
m = RollingOLS.from_formula('CSAD~return_SPY+abs_return_SPY+square_return_SPY',data=df,window=60)
model = m.fit()
model.tvalues

Predictions

# in-sample prediction
--------------------------
# plot beta
print(model.predict(df.Time).tail(2))
plt.plot(df.Time,model.predict(exog=df.Time))

plt.plot(x,y,'o')
s,b=np.polyfit(x,y,1) # note opposite x y order than OLS
plt.plot(x,s*x+b)

sns.lmplot(x='CumsumS',y='CumsumG',data=df)

# out-sample prediction
-------------------------
X = pd.DataFrame([61,62],columns=["Time"],index=pd.date_range(start="20201001",periods=2,freq='M'))
X = sm.add_constant(X) 
model.predict(exog=X)

# non-linear prediction
----------------------
x = df.Time
X = np.column_stack((x,x**2,x**3,x**4,x**5))
X = sm.add_constant(X)
m = sm.OLS(y,X)
model = m.fit()
Xnew = pd.DataFrame([[61,61**2,61**3,61**4,61**5],[62,62**2,62**3,62**4,62**5]],columns=["Time1","Time2","Time3","Time4","Time5"],index=pd.date_range(start="20201001",periods=2,freq='M'))
Xnew = sm.add_constant(Xnew)
model.predict(exog=Xnew)

t,v,w,x,y,z = np.polyfit(df.Time,df.Value,5)
plt.plot(df.Time,t*np.power(df.Time,5)+v*np.power(df.Time,4)+w*np.power(df.Time,3)+x*np.power(df.Time,2)+y*np.power(df.Time,1)+z)

# linear seems to have mean reverting nature, but nonlinear has a somewhat trend following nature

polyfit in numpy

# linear polynomial (mean reverting nature)
s,b=np.polyfit(df.Time,df.Value,1) 
print(np.poly1d(np.polyfit(X,y,1)) )
plt.plot(df.Time,s*df.Time+b) 

# 5th power polynomial (trend following nature)
t,v,w,x,y,z = np.polyfit(df.Time,df.Value,5) 
print(np.poly1d(np.polyfit(X,y,5)))
plt.plot(df.Time,t*np.power(df.Time,5)+v*np.power(df.Time,4)+w*np.power(df.Time,3)+x*np.power(df.Time,2)+y*np.power(df.Time,1)+z) 

polyfit in seaborn

sns.lmplot(x='Time',y='Value',data=df) 

ANOVA

import statsmodels.api as sm 

# lm = linear model
# typ=2 means Type II sum of squares - the most common choice
m = sm.formula.ols('conformity~C(fcategory, Sum)*C(partner_status, Sum)',data=data) 
model = m.fit()
print(sm.stats.anova_lm(model, typ=2)) # dataframe is returned

Plotting

matplotlib plot

## in ipython must set plt.ion() - interactive on rather than plt.ioff(), otherwise plt.show() will hang!
plt.close()
plt.ioff()
plt.figure()
plt.ion()

import matplotlib.pyplot as plt
import matplotlib as mpl

plt.figure(figsize = (10,6))
plt.title("test") 
plt.xlabel('Portfolio Volatility')
plt.ylabel('Portfolio Return')
plt.colorbar(label='haha')
plt.ylim(0,0.6)
plt.legend(['1','2'])
plt.grid(b=True, which='major', color='b', linestyle='--')
plt.xticks(rotation=25)
#fig.autofmt_xdate(rotation=45)
#ax.tick_params(axis='x', labelrotation=45)
plt.axhline(y=-2)
plt.savefig('5.b.2.png') 
plt.show()
plt.plot(x,y,linestyle='--',color='purple',alpha=0.8,label='haha',linewidth=0.1,markevery=200,markeredgecolor='orange',markersize=10,marker='x') # or pass a format string such as 'r*'

%matplotlib inline
get_ipython().magic("matplotlib auto") # show interactive plot not inline

plt.rc('xtick', labelsize = 8) # if x-axis datetime label too big

# specify ticks
import matplotlib
matplotlib.pyplot.yticks(y_value)

all plotting is done wrt axes

# step 1 prepare data
x = []
y = []
# step 2 create plot
fig = plt.figure()
ax = fig.add_subplot(111)
# step 3 plot
ax.plot()
# step 4 customize
# step 5 save
plt.savefig('foo.png')
# step 6 show
plt.show()

customize xticks

import matplotlib.dates as mdates
#https://matplotlib.org/stable/gallery/text_labels_and_annotations/date.html

#plt.figure(figsize=(16,8))
fig, ax = plt.subplots(figsize=(10,6))
plt.plot(model.tvalues['square_return_SPY'])
plt.legend(['rolling 60 day t-stat for CSAD regression'])
plt.axhline(y=-2,color='red')
plt.grid()
ax.xaxis.set_major_locator(mdates.MonthLocator(interval=12))
# Minor ticks every month.
fmt_month = mdates.MonthLocator()
ax.xaxis.set_minor_locator(fmt_month)

seaborn plot

import seaborn as sns

plt.figure(figsize = (10,6))
sns.distplot(x, hist=False) # distplot is deprecated in newer seaborn; use sns.kdeplot or sns.histplot
plt.savefig('5.b.2.png')
plt.show()

dataframe plot

ax = df.plot(kind='line',figsize=(10,6),title="Test",color='green',grid=True,label='test') # returns an Axes
ax.get_figure().savefig('8.a.1.png')
plt.show()

df.plot(x=,y=)

statsmodel api plot

import statsmodels.api as sm 

with mpl.rc_context():
    mpl.rc("figure", figsize=(10,6))
    sm.qqplot(x,line ='45')
    plt.savefig('5.b.1.png') 
plt.show()

# regression plot
fig = plt.figure(figsize =(15,8))
fig = sm.graphics.plot_regress_exog(model,'ReturnS',fig=fig) 
plt.savefig('15.b.2.png')
plt.show()

histogram

plt.hist(y, bins=np.arange(start=y.min(),stop=y.max(),step=0.1),density=False) 
np.histogram(a=y,bins=np.arange(start=0,stop=10,step=0.01),density=True) 

scatter plot

plt.plot(df.Time,df.Value) 
plt.scatter(a,b,c=a/b,marker='o') # c= sets the point colors; add plt.colorbar() to show the scale

# scatter plot
plt.figure(figsize=(16,8))
plt.scatter(option.spot[::10],option.future[::10],alpha=0.3,s=0.3)
plt.plot(plt.xlim(), plt.xlim(), linestyle='-', color='k', lw=3, scalex=False, scaley=False)
plt.ylim(3.7,3.8)
plt.xlim(3.7,3.8)
plt.grid()
plt.show()

whisker plot

# box and whisker plots
dataset.plot(kind='box', subplots=True, layout=(2,2), sharex=False, sharey=False)
plt.show()

normal pdf plot

mu = 0.00025
variance = 0.0000000585
sigma = math.sqrt(variance)
x = np.linspace(mu-3*sigma,mu+3*sigma,100)
plt.plot(x,stats.norm.pdf(x,mu,sigma))

normal over histogram

ax = plt.subplot(2,2,1)
ax.hist(df['30_rolling_residual'],bins=100,density=True,label='30 day rolling') # density scales the histogram area to 1
mu = np.mean(df['30_rolling_residual'][29:])
variance = np.var(df['30_rolling_residual'][29:])
sigma = math.sqrt(variance)
#binwidth = 6*sigma/20
scale_factor = 1 # len(df['400_rolling_residual'])*binwidth
x = np.linspace(mu-4*sigma,mu+4*sigma,1000)
ax.plot(x,stats.norm.pdf(x,mu,sigma)*scale_factor)
ax.legend()
ax.set_xlabel('residual')
ax.title.set_text('residual distribution')

3D plotting

fig = plt.figure()
ax = fig.gca(projection='3d') # in newer matplotlib use fig.add_subplot(projection='3d')
X,Y,Z=[],[],[]
for i in range(len(TTM)-1):
    Y.extend(list(M['strike']))
    Z.extend(list(M[TTM[i]]))
    X.extend(len(list(M['strike']))*[TTM[i+1]])
plt.title('BTC surface')
plt.xlabel('TTM')
plt.ylabel('Strike')
ax.scatter(Y, X, Z,s=2)
plt.show()

import matplotlib.pyplot as plt
import numpy as np
from mpl_toolkits.mplot3d import Axes3D
import implied_vol_surface_fit
from matplotlib import cm
Xaxis=np.linspace(0.5, 2.0, 20)
Yaxis=np.linspace(3000, 6000, 20)
US=implied_vol_surface_fit.LocalVol()
fig = plt.figure()
ax = fig.gca(projection='3d')
X,Y,Z=[],[],[]
for i in Xaxis:
    for j in Yaxis:
        X.append(i)
        Y.append(j)
        Z.append(US.volfit(j, i))
norm = plt.Normalize(np.min(Z), np.max(Z))
colors = cm.jet(norm(Z))
plt.title('Fitted Implied vol surface for SPX')
plt.xlabel('Strike')
plt.ylabel('Maturity')
ax.scatter(Y, X, Z, facecolors=colors, s=0.3).set_facecolor((0,0,0,0))
plt.show()

separate subplot

fig = plt.figure(figsize=(20,8)) 

plt.subplot(2,2,1)
df.hist(bins=30,ax=plt.gca()) 
# get current axes, if using plt to plot then no need

plt.subplot(2,2,2)
df.hist(cumulative=True,bins=300,ax=plt.gca())

plt.subplot(2,2,3)
df.boxplot(ax=plt.gca())

plt.subplot(2,2,4)
sm.qqplot((df['CER']-np.mean(df['CER']))/np.std(df['CER']),line ='45',ax=plt.gca())

fig.savefig('8.a.2.png') 

separate subplot method 2

fig , axarr = plt.subplots(2, sharex=True,figsize=(16,8))
axarr[0].plot(df['return'])
axarr[0].plot(df['predict_return'])
axarr[0].legend(['Real','Predicted'])
axarr[0].title.set_text('return')
axarr[0].set_ylabel('return')
axarr[0].grid()
axarr[1].plot(df['predict_price'])
axarr[1].plot(df['Adj Close'])
axarr[1].legend(['Predicted','Real'])
axarr[1].title.set_text('stock price')
axarr[1].set_ylabel('price')
axarr[1].grid()
plt.tight_layout();

overlapping subplot

plt.figure(figsize=(16,5))
ax1 = df.G.plot(color='green', grid=True, label='Gold')
ax2 = df.S.plot(color='purple', grid=True, secondary_y=True, label='Silver')
ax1.grid(True)
ax1.axhline(y=0, color='black', linestyle='-')  # horizontal line
# h=handle, l=label
h1, l1 = ax1.get_legend_handles_labels()
h2, l2 = ax2.get_legend_handles_labels()
plt.legend(l1+l2, loc=2)

----------------------------------------------------------------------------------------

fig = plt.figure(figsize=(18,10))
ax1 = fig.add_subplot(111)
ax2 = ax1.twinx()
ax1.plot(df.CumsumG - df.CumsumS, color='purple', label='Gold Cumsum Return minus Silver')
ax2.plot(df['30_rolling_residual'],label='30 day rolling residual',linewidth=0.4,color='blue')
ax2.plot(df['400_rolling_residual'],label='400 day rolling residual',linewidth=0.4,color='red')
ax1.legend(loc=1)
ax1.grid(True)
ax1.set_xlabel('Date')
ax2.legend(loc=2)
ax1.title.set_text('ds')
------------------------------------------
fig, axarr = plt.subplots(2, sharex=True,figsize=(20,8))
axarr[0].plot(x.index, state_means[:,0], label='slope')
axarr[0].legend()

Correlation matrix plot

from pandas.plotting import scatter_matrix
scatter_matrix(df,figsize=(10,10),alpha=0.5) 

sns.heatmap(df.corr(method='pearson'), annot=True, fmt='.1g',cmap=plt.cm.Blues)
plt.show() 

Pandas data wrangling

select

# print series column without index
df.to_string(index=False)
df[df.Letters=='C'].Letters.item()
np.array(df['column'])
df.iloc[:,df.shape[1]-1].values # output an array

# no need to try iloc after loc
df.loc[df.Letters=='C','Letters'].values[0] # this avoids python warning
df.loc[df.Letters=='C','Letters'].iloc[0]
df['net_return'].iloc[i] # identical to df['net_return'][i]

df['min'] = df.iloc[:,0:3].min(axis=1) # not including index 3

# select value from series
print (df['col1'].iat[-1])
print (df['col1'].values[-1])
print (df['col1'].iloc[-1])

list

# flatten list
def flatten(t):
    return [item for sublist in t for item in sublist]

# find difference
list(set(list1).symmetric_difference(set(list2)))
list(set(list1)-set(list2))

# find intersection
list(set(ticker_list_old).intersection(set(ticker_list)))

# remove or delete from list
list.remove(value)
print(l.pop(3))
del l[6]
del l[-3:]
[s for s in l if s.endswith('e')]

# incremental list
df['days'] = 30
df['days'] = df['days']-np.arange(len(df))

Create dataframe

df=pd.DataFrame({'Month':[],'PRR':[]})
df=pd.DataFrame(columns=['a','b'],data=np.transpose([[1],[2]]))
df = pd.DataFrame(data=list(d.items()))

df = pd.concat(dict_of_dfs,axis=0)
df = pd.concat(list_of_dfs,axis=0)

corrdf = pd.DataFrame.from_dict(dict,orient='index') # keys become the row index
corrdf = pd.DataFrame.from_dict(dict) # keys become columns (default orient='columns')
pd.DataFrame.from_dict(result_vwap_dict, orient='index').reset_index()
result_vwap_df = pd.DataFrame(result_vwap.items(),columns=['delta','IC mean'])

# convert series to df
series.to_frame()

# add column index
result = pd.DataFrame([],columns=['df','asdf'])

# add row indesx
result = pd.DataFrame([],index=['df','asdf'])

# dataframe from lists
df = pd.DataFrame(list(zip(lst, lst2)),columns =['Name', 'val'])

# https://www.geeksforgeeks.org/create-a-pandas-dataframe-from-lists/
# https://www.geeksforgeeks.org/different-ways-to-create-pandas-dataframe/

# dataframe from dataframe
df[['test']]

Dictionary

# remove na in dict
res = {k: corrdict[k] for k in corrdict if not isnan(corrdict[k])}

# sort dict
sorted(res.items(), key=lambda item: item[1])
{k: v for k, v in sorted(coef_dict.items(), key=lambda item: abs(item[1]),reverse=False)}

# slice dictionary items
for k, v in list(industry_map.items())[:2]:
    print(k)

# replace dataframe column with dictionary mapping
df.replace({'column':stock_mapping_dict})

# get key with max value
max(stats, key=stats.get)

# different variable name in for loop 
# method 1 # cannot assign rvalue to lvalue
for k in range(5):
    exec(f'cat_{k} = k*2')
# method 2 # this works
for x in range(0, 9):
    globals()['string%s' % x] = 'Hello'
# method 3 # good
d = {}
for x in range(1, 10):
    d["string{0}".format(x)] = "Hello"
# method 4 # the best
    names = locals() # or names = globals()
    for symbol in symbols:
        names[str(symbol)]=df[df['id'] == symbol]
        del names[str(symbol)]

# delete key
# This will return my_dict[key] if key exists in the dictionary, and None otherwise. If the second parameter is not specified (i.e. my_dict.pop('key')) and key does not exist, a KeyError is raised.
my_dict.pop('key', None)

# find key from value
dictionary = {'george': 16, 'amber': 19}
search_age = int(input("Provide age"))
for name, age in dictionary.items():
    if age == search_age:
        print(name)

# get key and return 'default' if not exist
dict.get(key, None) # the second argument is the default; it is positional, not a keyword

# update
dict1.update(dict2)
# (1) If a key exists in both, the value from the newer dict b overwrites the one in a.
# (2) Keys that only exist in b are added to a together with their values.
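A quick sketch of that update behaviour:

a = {'x': 1, 'y': 2}
b = {'y': 20, 'z': 3}
a.update(b)
print(a)  # {'x': 1, 'y': 20, 'z': 3} -- overlapping key 'y' takes b's value, new key 'z' is added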

lambda function

# takes two series (prices and strikes) and returns one series of implied vols

def IVs(price_series, isCall, S, K_series):
    ttm = price_series.name
    # rt = r(ttm)
    # dt = d(ttm)
    print(price_series)
    result = pd.DataFrame({'price': price_series, 'strike':K_series}).apply\
        (lambda xy: IV(xy['price'], isCall, S, xy['strike'], ttm), axis=1)
    print(result)
    return result

Indexing and rename

df.reset_index(drop=True,inplace=True)
df1.index.names = ['index']
df.index.name = 'indexname'
df = df.set_index('Date').sort_index() # inplace=True returns None, so don't chain after it
df.index = pd.to_datetime(df.index,dayfirst=True) 
df1.columns = df1.columns.astype(str) 
df2.rename(columns={'Mkt-RF': 'Excess_Market'}, inplace=True)
df.rename(index={0: "x", 1: "y", 2: "z"})

regex

import re

# Target String one
str1 = "Emma's luck numbers are 251 761 231 451"
# pattern to find three consecutive digits
string_pattern = r"\d{3}"
# compile string pattern to re.Pattern object
regex_pattern = re.compile(string_pattern)
# find all the matches in string one
result = regex_pattern.findall(str1)
print(result)
# Output ['251', '761', '231', '451']
# https://www.w3schools.com/python/python_regex.asp

df.filter(regex=("d.*"))
df.loc[:, [col for col in df.columns if col.startswith('d')]] # df.select() was removed from pandas

import re
newlist = list(filter(re.compile(".*cat").match, mylist)) # Read Note below

## examples
'\.' -> match strings containing a period
'Length$' -> match strings ending with Length
'^Sepal' -> match strings beginning with Sepal
'^x[1-5]$' -> match strings beginning with x and ending with 1 to 5

Data Clean

# remove string from column of float
delta_num = pd.to_numeric(delta.iloc[:,0], errors='coerce')

Add Drop column

df.drop('Adj Close',axis=1,inplace=True) 
del df2['Net SharesChanged'] 
df['new col']=0
df.insert(loc, column, value, allow_duplicates = False)
data=pd.concat([a,b],axis=1) # both a and b are df
df.drop(['Q', 'R'], axis=1)
df.drop(df.columns[[1,2,4,5,12]],axis=1,inplace=True)

# do same but attach it to the dataframe
df['c'] = df.apply(lambda row: row.a + row.b, axis=1)
df.dropna().loc[:,'ret_f0f1'].apply(lambda row: 1 if row > 0 else 0)

# add column in diff order
first_column = df.pop('Name') # pop is like cutting the column and storing to another var
df.insert(0, 'Name', first_column)

# add column with unmatching rows (need reset index)
df['sig'] = pd.Series(predicted_return)

# join columns
df.set_index('key').join(other.set_index('key'))
df.join(other.set_index('key'), on='key')
df.join(other, lsuffix='_caller', rsuffix='_other')

## assign
df.assign(temp_f=lambda x: x.existing_col * 9 / 5 + 32,
          temp_k=lambda x: (x['temp_f'] +  459.67) * 5 / 9,
          temp_g=df['temp_c'] * 9 / 5 + 32)

Add Drop row

t = pd.concat([t,addrow],axis=0)
df1.append(df2, ignore_index = True) # another way of concat
df.loc[new_label] = values # add a single row by label

df = pd.DataFrame(columns=['lib'])
df.loc['row name',:]=column_value # this cannot add dups

# for duplicated index need to do below
df = pd.concat([df,pd.DataFrame(index=[],data=[],columns=[])],axis=0)

df.drop([0, 2])
df.drop(['Q', 'R'], axis=0)

# correct way to append row
df = pd.DataFrame([],columns=['a','b'])
b = [1,2]
df = df.append(pd.Series(b, index = df.columns), ignore_index=True)
df = df.append(pd.DataFrame([b], columns=df.columns), ignore_index=True) # both are correct
df = df.append(pd.DataFrame({'column': value, 'column2': value2},index=[0]),ignore_index=True)
# note: DataFrame.append was removed in pandas 2.0; use pd.concat instead

Add total row or column

df.loc['Column_Total']= df.sum(numeric_only=True, axis=0)
df.loc[:,'Row_Total'] = df.sum(numeric_only=True, axis=1)

multi-index

low_factor = factor_year.iloc[factor_year.index.get_level_values('feps_quantile') == 0]

binning and rank

# https://pbpython.com/pandas-qcut-cut.html

df['size_quantile'] = pd.qcut(df['mv_eq'], q=[0, .2, .4, .6, .8, 1], labels=[1, 2, 3, 4, 5])
df['feps_quantile'] = pd.qcut(df['feps'], q=5, labels=[1, 2, 3, 4, 5])

# groupby then bin
df['size_quantile'] = df.groupby(by=['yearmonth'])['mv_eq'].transform( \
    lambda x: pd.qcut(x,q=[0,.2,.4,.6,.8,1],labels=[0,1,2,3,4]))
df['feps_quantile'] = df.groupby(by=['yearmonth','size_quantile'])['feps'].transform( \
    lambda x: pd.qcut(x, q=5, labels=False, duplicates='drop'))

# aggregate
plt.hist(df.groupby(by=['siccd'])['feps'].aggregate([np.mean]),bins=100)

# rank
df['industry_feps_quantile'] = df.groupby(by=['yearmonth','siccd'])['feps'].transform( \
    lambda x: pd.qcut(x.rank(method='first'), q=5, labels=False, duplicates='drop'))

df groupby transform vs apply

# apply
grouping.apply(min)
grouping.min()
id
1    1
2    1
3    1
Name: price, dtype: int64

# transform
# it does broadcasting to the original shape
grouping.transform(min)
0    1
1    1
2    1
3    1
4    1
5    1
6    1
7    1
8    1
Name: price, dtype: int64
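A minimal reproducible sketch of the difference (column names 'id' and 'price' are assumed):

import pandas as pd
df = pd.DataFrame({'id': [1, 1, 2, 2, 3], 'price': [1, 5, 2, 8, 3]})
grouping = df.groupby('id')['price']
grouping.min()           # one value per group: 1, 2, 3
grouping.transform(min)  # broadcast back to the original rows: 1, 1, 2, 2, 3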

unique

len(pd.unique(df['column']))
len(df['column'].unique())
df['column'].value_counts()
df.index.value_counts()

unique_list = (list(set(list))) 

# otherwise cannot concat
df = df.loc[~df.index.duplicated(),:]
df3 = df3[~df3.index.duplicated(keep='first')]

Sort

df2 = df2.sort_values(['v1','v2'],ascending=[False,True]).drop_duplicates() # inplace=True returns None, so don't chain

sorted(std_dict.items(),key=lambda x: x[1],reverse=True)[:5] 

# sort row index
df2.sort_index(inplace=True,axis=0) 

#sort column index
df2.sort_index(inplace=True,axis=1) 
#df2 = df2.loc[:,sorted(df2.columns)] # this somehow doesn't work for column name sort

# sort by column index
df2 = df.sort_values(by=df.columns[1])

# sort by rows
# https://www.nbshare.io/notebook/731958269/Pandas-How-To-Sort-Columns-And-Rows/
dfc.sort_values('Afghanistan',axis=1)

NA NAN Null missing data

df = df.fillna(method='ffill')
df = df.fillna(method='bfill')
df = df.dropna(subset=['Sector','industry'],axis=0,how='all') # how='any'
df.isnull() # boolean mask of missing values


#check for na
result.isnull().sum(axis=0)
df.isna().sum()
np.any(np.isnan(df_fit['return'])) #should be false
np.all(np.isfinite(df_fit['return'])) #should be true

# https://pandas.pydata.org/pandas-docs/stable/user_guide/missing_data.html
When summing data, NA (missing) values will be treated as zero.
If the data are all NA, the result will be 0.
Cumulative methods like cumsum() and cumprod() ignore NA values by default, but preserve them in the resulting arrays. To override this behaviour and include NA values, use skipna=False.
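A small sketch of those rules (toy series assumed):

import numpy as np
import pandas as pd
s = pd.Series([1.0, np.nan, 2.0])
s.sum()                    # 3.0 -- NA treated as zero
pd.Series([np.nan]).sum()  # 0.0 -- an all-NA sum is 0
s.cumsum()                 # 1.0, NaN, 3.0 -- NA skipped but kept in place
s.cumsum(skipna=False)     # 1.0, NaN, NaN -- NA propagates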

# other fill methods
df.fillna(method="ffill")
values = {"A": 0, "B": 1, "C": 2, "D": 3}
df.fillna(value=values)
df['corr_signal'] = df['corr_signal'].replace(to_replace=0, method='ffill', limit=rolling_period)

# drop inf and na (only works in df!)

df.replace(to_replace='None', value=np.nan, inplace=True, regex=False)
df.replace(to_replace=[-np.inf,np.inf], value=np.nan, inplace=True, regex=False)

df_fit.loc[(~np.isfinite(df_fit)) & df_fit.notnull()] = np.nan
df_fit = df_fit[np.isfinite(df_fit).all(1)]
df.dropna(inplace=True)

# drop inf and na in nd array
series[~np.isfinite(series)] = 0 # catches inf and nan; comparing with == np.nan never matches
zscore = zscore[~np.isnan(zscore)]

# drop inf and na in series
df['col'].fillna(0, inplace=True)
zscore = zscore.replace(np.nan,0)

# if something_nan == np.nan not work
# NaN has a quirk (gotcha): it is not equal to any value, not even itself. np.nan == np.nan >> False
# can use np.isnan(something_nan)
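For example:

np.nan == np.nan   # False
np.isnan(np.nan)   # True
pd.isna(np.nan)    # True (also handles None)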

append vs extend

a = [1,2]
b = [3,4]
a.append(b): [1,2,[3,4]]
a.extend(b): [1,2,3,4,]

Filter

# filter by column
df.loc[:,df.loc['row_tag',:]=='filter_value']

# filter by percentile or quantile
final2 = final2.loc[final2['mcap']<np.percentile(final2['mcap'],50)]
df[df.a < df.a.quantile(.95)]

# count number of TRUE or FALSE according to some condition
np.count_nonzero(df["open long"]!=0)

# filter column name
res.filter(regex='alpha')

df = df.loc[df['tickaskvol'].map(lambda x:len(str(x))<5)]

new_df = df[~df["col"].str.contains(word)]
df_option_inst = df_option_inst[df_option_inst['name'].str.contains('50ETF.*3.2')]

myfilter = df_clean_atmvol0.filter(regex=('2alpha')).columns.tolist()
df_clean_atmvol0.loc[:,~df_clean_atmvol0.columns.isin(myfilter)]

# array filter
[e for e in y_demean if e < np.percentile(y_demean,80)]

# list mask
  mask = np.isfinite(Y[:, 0])
  Y_ = Y[mask, :]
  X_ = X[mask, :]

## filtering join
df_close = df_close[df_close.index.isin(df_qfq.index)]
df_qfq = df_qfq[df_qfq.index.isin(df_close.index)]
df_a[~df_a.index.isin(df_b.index)]

itertools iteration of dataframe

from itertools import product
for SMA1, SMA2 in product(sma1, sma2):
# it means the cartesian product of the two iterables
arr1 = [1, 2, 3]
arr2 = [5, 6, 7]
list(product(arr1, arr2))
# [(1, 5), (1, 6), (1, 7), (2, 5), (2, 6), (2, 7), (3, 5), (3, 6), (3, 7)]

from itertools import combinations
letters ="GeEKS"
a = combinations(letters, 3) # object
y = [' '.join(i) for i in a]
['G e E', 'G e K', 'G e S', 'G E K', 'G E S', 'G K S', 'e E K', 'e E S', 'e K S', 'E K S']

 sharpe_list = []
 for comb in combinations([50,40,30,20,10,5,1],2):
     start_ttm = comb[0]
     end_ttm = comb[1]

for index, row in df.iterrows():
# !!! iterating like this cannot modify df itself; use range() and then select/assign by index instead, as sketched below
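A minimal sketch of the range()-then-select pattern that does write back to the dataframe (column names are assumed):

df = pd.DataFrame({'signal': [0.5, -0.2, 1.1]})
df['position'] = 0
for i in range(len(df)):
    if df['signal'].iloc[i] > 0:
        df.loc[df.index[i], 'position'] = 1  # assigning via .loc with the real label modifies df, unlike mutating iterrows' row copy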

train test split

from sklearn.model_selection import train_test_split

train, test = train_test_split(df, test_size=0.2)

groupby and aggregate

from itertools import groupby
for key, group in groupby([1,1,2,2,2,2,3,4]):
    print(key, list(group)) 

df.groupby(by=['outerlevel','2ndlevel'])['column'].first()
df.groupby(by=['outerlevel','2ndlevel'])['column'].describe().unstack()
df.groupby('class').size()

df.groupby('group').agg({'a':['sum', 'max'], 
                         'b':'mean', 
                         'c':'sum', 
                         'd': lambda x: x.max() - x.min()})

Merge

df = pd.merge(pd.merge(df_a,df_b,on='caldt',left_index=False),df_c_df,on='date',how='inner') #left_on,right_on
df.columns = ['df_a','df_b','df_c'] 

# multiple merge
countries = [df1,df2,df3]
from functools import reduce
df = reduce(lambda left,right: pd.merge(left,right,on=['Date'],how='outer'), countries)
df.columns = ['Date','CA','FR','GE','JP','UK','US','AU','HK']

ha = pd.merge(df_clean_all,df_clean_all.groupby(by=['ticktime'])['iv'].mean(),\
left_on='ticktime',right_on='ticktime',how='left',suffixes=(None,'group'))

Data manipulation and other functions

Memory

print(f'Training Set Memory Usage: {df_train.memory_usage().sum() / 1024 ** 2:.2f} MB')

logging

import logging  
logging.basicConfig(filename="dispersion.log",format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', filemode='w')
logger=logging.getLogger() 
logger.setLevel(logging.DEBUG) 
logger.debug("This is just a harmless debug message") 
logger.info("This is just an information for you") 
logger.warning("OOPS!!!Its a Warning") 
logger.error("Have you try to divide a number by zero") 
logger.critical("The Internet is not working....") 
 # https://www.machinelearningplus.com/python/python-logging-guide/

while logger.hasHandlers():
    logger.removeHandler(logger.handlers[0])
os.remove(r'C:\Users\kl\Desktop\dispersion.log')

yahoo api

(deprecated)
https://pypi.org/project/yfinance/
import yfinance as yf
stock = 'AMZN SPY'
data = yf.download(stock, '2012-01-01', '2015-01-01',group_by='ticker')
data['AMZN']['Adj Close']
# only good for single stock

from pandas_datareader import data as pdr
yf.pdr_override() 
# yahoo can get minute data for last 30 days, max 7 day period.
# good for multiple stocks
indexes = '^GSPTSE ^FCHI ^GDAXI ^N225 ^FTSE ^GSPC ^AORD ^HSI'
indexes = indexes.split() # this must be a list
data = pdr.get_data_yahoo(indexes, start="2000-09-01", end="2020-12-01", interval='1mo')["Adj Close"]
gold = pd.DataFrame(data=data)


from yahoofinancials import YahooFinancials as YF
# https://pypi.org/project/yahoofinancials/
stock = YF(['AAPL'])
stock.get_interest_expense()
IS = stock.get_financial_stmts('quarterly', 'income')
BS = stock.get_financial_stmts('quarterly', 'balance')
cash = stock.get_financial_stmts('quarterly', 'cash')
#daily_tech_stock_prices = stock.get_historical_price_data('2008-09-15', '2018-09-15', 'daily')
for i in range(0,len(IS['incomeStatementHistoryQuarterly']['AAPL'])):
    print(IS['incomeStatementHistoryQuarterly']['AAPL'][i].keys())
print(IS['incomeStatementHistoryQuarterly']['AAPL'][0]['2020-09-26'].keys())
print(IS['incomeStatementHistoryQuarterly']['AAPL'][0]['2020-09-26'].values())


import yahoo_fin.stock_info as si
#http://theautomatic.net/yahoo_fin-documentation/
#http://theautomatic.net/2020/05/05/how-to-download-fundamentals-data-with-python/
#si.get_analysts_info('nflx')
#si.get_day_gainers()
#si.get_day_most_active()
#si.get_dividends("msft", start_date="01-01-2010",end_date='01-01-2020')
#si.get_earnings('nflx')
#si.get_financials('nflx', yearly = False, quarterly = True).items()
#si.get_holders('nflx')
#si.get_live_price('nflx')
###############################
#si.get_balance_sheet('nflx', yearly = False)
#si.get_cash_flow('nflx', yearly = False)
#si.get_income_statement('nflx', yearly = False)
quote = si.get_quote_table('aapl')
#si.get_stats('nflx') # RATIOS
val = si.get_stats_valuation('nflx') # ratio aross time
#si.tickers_sp500()
import yahoo_fin.options as op
#op.get_calls('nflx')
#op.get_options_chain('nflx')

# get tickers
import yahoo_fin.stock_info as si
dow_list = si.tickers_dow() #secs
#dow_list = si.tickers_nasdaq() #23mins
#dow_list = si.tickers_sp500()
#dow_list = si.tickers_other() #get other tickers not in NASDAQ (based off nasdaq.com)
prices = pd.DataFrame()
for ticker in dow_list[:]:
    try:
        prices[ticker]=si.get_data(ticker, start_date = "01/01/2013", end_date = "11/30/2020").adjclose
    except:
        pass

WRDS api (Wharton business school)

import wrds
db = wrds.Connection(wrds_user='')
db.create_pgpass_file()
db.list_libraries()
db.list_tables(library='crsp')

# get fund numbers first
fund_df = db.get_table(library='crsp', table='fund_names',obs=2)
fund_df = fund_df.loc[fund_df['ticker'].isin(['VBTLX','VEMAX','VSMAX']),:]
fund_df.drop_duplicates(subset=['crsp_fundno'],keep='last',inplace=True)

# find the monthly returns
month_df = db.get_table(library='crsp', table='monthly_returns')
VBTLX_df = month_df.loc[month_df['crsp_fundno']==31244]

FB = db.raw_sql("select date,ret from crsp.dsf where (cusip='30303M10') and date>='2018-01-01' and date<='2019-12-31'")

FMP api

from fmp_python.fmp import FMP
import FundamentalAnalysis as fa

ticker = "AAPL,TSLA"
api_key = ""
fa.profile(ticker, api_key)

Morningstar api

# morningstar
import morningstar.good_morning as gm
#print(help(gm))
a = gm.FinancialsDownloader()
b = a.download('AAPL')
display(b)

Fundamental Analysis api

https://pypi.org/project/FundamentalAnalysis/

Beautiful soup web scraping

#https://www.pluralsight.com/guides/extracting-data-html-beautifulsoup
import requests
from bs4 import BeautifulSoup

URL = 'https://t#'
page = requests.get(URL)
print(page.status_code)   # This should print 200
soup = BeautifulSoup(page.content, 'html.parser')
results = soup.find(id='page-wrapper')
print(results.prettify())
table = soup.find('table', class_='table table-striped table-hover table-sm')
df = pd.read_html(str(table))[0]

URL and zip

import urllib.request
import zipfile

ff_url = "http://"
urllib.request.urlretrieve(ff_url,'fama_french.zip')

zip_file = zipfile.ZipFile('fama_french.zip', 'r')
zip_file.extractall()
zip_file.close()

VAR (Value at Risk)

def calculate_var(fund,confidence):
    print(fund,confidence)
    invest = 100000
    # use z score instead of t to calculate VAR since the sample size is big
    z = abs(stats.norm.ppf((1-confidence)/2,0,1))
    se = df[fund].sem()
    std = df[fund].std()
    mean = df[fund].mean()
    relative_var = invest*z*std
    abs_var = invest*(z*std+mean)
    return "relative var = {:.0f}  absolute var = {:.0f}".format(relative_var,abs_var)

def simulated_var(fund,confidence):
    invest = 100000
    VaR_95 = df[fund].quantile(1-confidence)
    return tabulate([['95%',abs(VaR_95*invest)]],headers=['Confidence level','Value at Risk'])

data input

xls = pd.ExcelFile(r'C:\Users\AQR.xlsx')
df1 = pd.read_excel(xls, 'Return',index_col=0, header=2)

df = pd.read_csv('F-F_Research_Data_Factors.csv', skiprows = 3, index_col = 0)
# delimiter=',', skiprows=2, error_bad_lines=False

pd.read_csv(filepath,usecols=np.arange(0,21),skiprows=0,names=headernames,index_col=0)

# load from pickle
with open('ticker_str.pickle','rb') as handle:
    load = pickle.load(handle)
    ticker_str_1 = load[0]
    ticker_str_2 = load[1]
    ticker_str_3 = load[2]

read_csv

import csv
rows = []
with open(r"C:\Users\xcy99\Desktop\OneDrive - The Chinese University of Hong Kong\聚润\模型\result_IC_15min.csv", 'r') as file:
    csvreader = csv.reader(file)
    header = next(csvreader)
    for row in csvreader:
        rows.append(row)
print(header)
print(rows)

data output

from openpyxl import load_workbook

out_path = "C:\\Users\\python.xlsx"
writer = pd.ExcelWriter(out_path, engine='openpyxl')

# try to open an existing workbook
writer.book = load_workbook(out_path)

# copy existing sheets
writer.sheets = dict((ws.title, ws) for ws in writer.book.worksheets)

# read existing file
reader = pd.read_excel(out_path)

# write out the new sheet
df2.to_excel(writer,index=False,header=False,startrow=len(reader)+1)
result.to_excel(r'C:\Users\xcy99\Desktop\haha.xlsx')
writer.close()

# out to csv
df.to_csv(index=False)
dt.to_csv('file_name.csv') # relative path
dt.to_csv('C:/Users/abc/Desktop/file_name.csv')

# output to pickle
with open('ticker_str.pickle','wb') as handle:
    pickle.dump([ticker_str_1,ticker_str_2,ticker_str_3],handle,protocol=pickle.HIGHEST_PROTOCOL)

Asset Return

# procedure
1. get log returns and align the rows
2. returns can only be summed across time when they are log returns (see the sketch at the end of this section)
3. np.exp(log_return) # convert back to a compounded (simple) return if needed
4. no need to apply np.exp() for charting, as the chart does not require compounding
-----------------------------------------
# simple return 
df['return'] = df['close'].pct_change().shift(-1)
(1+df['return']).cumprod()
---------------------------------------
# the point of using log returns is that they are additive across time and symmetric for up/down moves
df['return'] = np.log(prices['Adj Close']).diff(1).shift(-1) # log(today/yesterday)=log(today)-log(yesterday)
df = df[:-1]
# above matched today's close price with tomorrow's return in the same df row for backtesting
# if some feature dependent on return, then do shift(-1) at very end before backtesting
-------------------------------------------
# also can use geometric mean on simple return
# it takes care of the compounding effect and order of profits
stats.gmean(1+simple_return)*100-100
-------------------------------------------
book_example = book_example[~book_example['log_return'].isnull()]
---------------------------------------
# simple interest, compounding, and NAV (net asset value)
# annualized return computed from NAV
annual_return = (stats.gmean((df_scalp['pnl']+asset)/asset)-1)*252
annual_std = (stats.gstd((df_scalp['pnl']+asset)/asset)-1)*252
# Sharpe computed from NAV
sharpe = (stats.gmean((df_scalp['pnl']+asset)/asset)-1)/(stats.gstd((df_scalp['pnl']+asset)/asset)-1) * np.sqrt(252)
plt.plot(df_scalp['pnl'].cumsum()/asset+1,label='zscore with cash ratio') # simple (non-compounded)
plt.plot(((df_scalp['pnl']+asset)/asset).cumprod(),label='zscore with cash ratio') # compounded
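A short sketch (toy prices assumed) showing that summed log returns and compounded simple returns agree:

prices = pd.Series([100, 102, 101, 105], dtype=float)
simple = prices.pct_change().dropna()
log_ret = np.log(prices).diff().dropna()
log_ret.sum()               # additive across time
np.exp(log_ret.sum()) - 1   # 0.05, same as (1 + simple).prod() - 1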

MDD, max drawdown

print(f"最大回撤%: {(pnl_2020/pnl_2020.rolling(cointwindow,min_periods=1).max()-1).rolling(cointwindow,min_periods=1).min()['price'].min()}")

#(trough-peak)/peak
#https://quant.stackexchange.com/questions/18094/how-can-i-calculate-the-maximum-drawdown-mdd-in-python/18101
Roll_Max = SPY_Dat['Adj Close'].cummax()
Daily_Drawdown = SPY_Dat['Adj Close']/Roll_Max - 1.0
Max_Daily_Drawdown = Daily_Drawdown.cummin()

return_total = (1 + df_pnl['daily_pnl']).cumprod()[-1] - 1 # for simple return
mdd = (df_pnl['total'] / df_pnl['total'].cummax()-1).cummin().min()
sharpe = df_pnl['daily_pnl'].mean()/df_pnl['daily_pnl'].std() * np.sqrt(252) # Sharpe divides by the standard deviation
calmar = df_pnl['daily_pnl'].mean()/abs(mdd) * np.sqrt(252)
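A toy sketch of the cummax-based drawdown calculation above (the NAV series is made up):

nav = pd.Series([1.00, 1.10, 1.05, 1.20, 0.90, 1.00])
drawdown = nav / nav.cummax() - 1
mdd = drawdown.min()   # -0.25 here: trough 0.90 against the running peak 1.20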

volatility

ret.rolling(60).std()*np.sqrt(252)*0.01

Net asset value and cash utilization ratio

# cumulative_pnl/asset+1
asset=2
df_scalp['pnl'] = np.where(df_scalp['zsig'].abs()<=asset,df_scalp['pnl'], asset/df_scalp['zsig'].abs()*df_scalp['pnl'])
df_scalp_pos['pnl'] = np.where(df_scalp_pos['zsig'].abs()<=asset,df_scalp_pos['pnl'], asset/df_scalp_pos['zsig'].abs()*df_scalp_pos['pnl'])
plt.plot(df_scalp['pnl'].cumsum()/asset+1,label='zscore with cash ratio')

zscore

def zscore(series):
    series_copy = pd.Series(series).copy()
    series_copy.replace(to_replace=[-np.inf, np.inf], value=np.nan, inplace=True, regex=False)
    up = pd.Series(list(filter(lambda x: x != 0, series_copy.abs()))).median() * 12
    down = pd.Series(list(filter(lambda x: x != 0, series_copy.abs()))).median() * -12
    series_copy[series_copy > up] = up
    series_copy[series_copy < down] = down
    zscore = (series_copy - series_copy.mean())/series_copy.std()
    zscore.replace(to_replace=[-np.inf, np.inf, np.nan], value=0, inplace=True, regex=False) # must not be inf or nan to put in lasso
    return np.array(zscore)

Email

import smtplib

gmail_user = '@gmail.com'
gmail_password = ''
sent_from = gmail_user
to = ['', 'test@hotmail.com']
subject = 'haha strategy'
body = 'buy ticker '+str(df2.iloc[0:5,1])
email_text = """\
From: %s
To: %s
Subject: %s
%s
""" % (sent_from, ", ".join(to), subject, body)
try:
    server = smtplib.SMTP_SSL('smtp.gmail.com', 465)
    server.ehlo()
    server.login(gmail_user, gmail_password)
    server.sendmail(sent_from, to, email_text)
    server.close()
    print('Email sent!')
except Exception as e:
    print(str(e))

Portfolio performance ratios

rfr = 0.055
target = 0

df1['Excess Decile Return'] = df1['Decile Return']-rfr

sharpe_ratio = df1['Excess Decile Return'].mean()/df1['Excess Decile Return'].std()
sharpe_ratio = (daily_return-daily_rf).mean()/daily_return.std()*np.sqrt(242) # daily version, annualized with 242 trading days
treynor_ratio = df1['Excess Decile Return'].mean()/model.params.Excess_Market

# Note: standard deviation is sqrt(mean((x-mu)^2)); if mu is 0 it reduces to sqrt(mean(return^2))
df1.loc[df1['Excess Decile Return']<target,'Excess Downside Return'] = df1['Excess Decile Return']**2
sortino_ratio = (df1['Excess Decile Return'].mean()-rfr)/np.sqrt(df1['Excess Downside Return'].mean())

print('sortino ratio is '+str(sortino_ratio))
print('sharpe ratio is '+str(sharpe_ratio))
print('treynor_ratio is '+str(treynor_ratio))

Rolling

# time series rolling sum
df_month = df.resample('10Y').sum()
# resampling frequency https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html

# rolling average
a=np.cumsum(df1['Decile Return'],axis=0)[1::10]
average_yearly_return_for_each_decade = (np.array(a[1:]) - np.array(a[:-1]))/10

# rolling in pandas
df['rolling_residual'] = df['residual'].rolling(30).mean()

df['future_pos'] = df[['future-1']].rolling(rolling_short).apply(lambda col: np.sum(abs(np.array([x for x in col if x > 0]))))

Expanding

df.expanding(3).sum()

Max drawdown

round(df["net_return"].cumsum().min()*100,2)}%

round to hundred

position = int(math.ceil(position / 100.0)) * 100

Annualized return and risk free rate

# annual_rf = 0.1
daily_rf = np.power(1+annual_rf,1/252)-1

# window_return = 0.1 (cumulative return over the window)
annual_return = np.power(1+window_return,252/len(window))-1
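A quick sanity check of the conversion (annual_rf assumed to be 10%):

annual_rf = 0.10
daily_rf = (1 + annual_rf) ** (1 / 252) - 1
(1 + daily_rf) ** 252 - 1   # recovers ~0.10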

Optimization

import scipy.optimize as optimize

res1 = optimize.minimize(objective_function, initial_guess, method='Nelder-Mead')
t_opt = res1.fun
b_opt = float(res1.x)

Portfolio construction

import scipy.optimize as optimize

def minimize_sharpe(weights):  
    return -portfolio_stats(weights)['sharpe'] 

# assets add up to 1 and return being target return
constraints = ({'type':'eq','fun': lambda x: portfolio_stats(x)['return']-target_return},{'type':'eq','fun': lambda x: np.sum(x)-1})
# one asset can take up 0 - 100% position
bounds = tuple((0,1) for x in range(len(countries)))
# start from equally weighted
initializer = len(countries) * [1./len(countries),]

optimal_sharpe=optimize.minimize(minimize_sharpe,
                                 initializer,
                                 method = 'SLSQP',
                                 bounds = bounds,
                                 constraints = constraints)

# get the output
optimal_sharpe_weights=optimal_sharpe['x'].round(4) # x is our output
optimal_sharpe['fun']
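The snippet above assumes a portfolio_stats helper; a minimal sketch, assuming `returns` is a DataFrame of daily asset returns (one column per asset), might look like:

def portfolio_stats(weights):
    weights = np.array(weights)
    port_return = np.sum(returns.mean() * weights) * 252             # annualized return
    port_vol = np.sqrt(weights.T @ (returns.cov() * 252) @ weights)  # annualized volatility
    return {'return': port_return, 'volatility': port_vol, 'sharpe': port_return / port_vol}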

optimize system of equations

def objective(x):
    x1=x[0]
    x2=x[1]
    x3=x[2]
    x4=x[3]
    return -1.50440748*x1-1.35598889*x2-0.50999618*x3+0.77768992*x4

def constraint1(x):
    return -x[0]-x[1]-x[2]-x[3]

initializer = np.ones([4,1])
bounds = tuple((-1,1) for i in range(0,4))
con1 = {'type':'eq','fun':constraint1}
cons = [con1]

sol = optimize.minimize(objective,initializer,method='SLSQP',bounds=bounds,constraints=cons) # pass constraints=[] to solve unconstrained

# https://stackoverflow.com/questions/31098228/solving-system-using-linalg-with-constraints

Time Series

datetime

import datetime as dt
#https://www.programiz.com/python-programming/datetime/strftime
t = pd.date_range(start='1/1/2020',periods=T,freq='D')
df = pd.DataFrame(data=r,index=t,columns=['CER'])
df.index = pd.to_datetime(df.index,format= '%Y%m',dayfirst=True) 
dt.datetime.now()-dt.timedelta(days=90)
signal['date'] = pd.date_range(dt.datetime(2019,11,28),dt.datetime(2019,11,28) + dt.timedelta(minutes=3), freq="1min")

# to datetime object
dt.datetime.strptime("1/1/2018", "%m/%d/%Y")
datetime_object = dt.datetime.strptime('09/19/18 13:55:26.000', '%m/%d/%y %H:%M:%S.%f')
time_object = dt.datetime.strptime('13::55::26', '%H::%M::%S').time()

# from datetime object
print("Formatted date : " , datetime.datetime.now().strftime("%y-%m-%d-%H-%M"))
print("Present year: ", datetime.date.today().strftime("%Y"))
print("Month of year: ", datetime.date.today().strftime("%B"))
print("Week number of the year: ", datetime.date.today().strftime("%W"))
print("Weekday of the week: ", datetime.date.today().strftime("%w"))
print("Day of year: ", datetime.date.today().strftime("%j"))
print("Day of the month : ", datetime.date.today().strftime("%d"))
print("Day of week: ", datetime.date.today().strftime("%A"))

# filter datetime index
import datetime as dt
final[final.loc[:,'date'].dt.year==2012]
df_clean_all_copy[df_clean_all_copy['ticktime'].dt.date==pd.to_datetime('2021-04-01')]
# pd.to_datetime('2018-10-26 12:00:00')

# time diff delta compare
    df['timediff'] = (df['ticktime'] - df['ticktime'].shift(1)).map(lambda x: x.microseconds / 1000)\
                      + (df['ticktime'] - df['ticktime'].shift(1)).map(lambda x: x.seconds * 1000)
    df = df[df['timediff'] > 500]

# convert datetime64 to datetime
pd.to_datetime(df['ticktime'].values[0]).date()

# get date
df['ticktime'].astype(str).map(lambda x:x[:21])
df['ticktime'].apply(lambda x:pd.to_datetime(x).date())

acf, pacf

print(df['Simple Return'].autocorr(lag=1))
print(df['Simple Return'].autocorr(lag=2))
print(df['Simple Return'].autocorr(lag=3))

from statsmodels.graphics.tsaplots import plot_acf
with mpl.rc_context():
    mpl.rc("figure", figsize=(10,6))
    plot_acf(df['Simple Return'],lags=5,title='First 5 lags')
    plt.savefig('9.a.3.png')

from statsmodels.graphics.tsaplots import plot_pacf
plot_pacf(df['Simple Return'],lags=5,title='First 5 lags')

######################
def plot_acf(data, x_name):
    times = [0,1,3,5,7,10,15,20,25,30,40,50,60]
    fig = plt.figure(figsize=(16,10))
    result = []
    for t in times:
        tmp = copy.deepcopy(data[[x_name]])
        tmp['x_shift'] = tmp[x_name].shift(t*2)
        tmp = tmp.dropna()
        corr = np.corrcoef(tmp[x_name], tmp['x_shift'])[0,1]
        result.append(corr)
    #print(result)
    ax = fig.add_subplot(1,1,1)
    ax.set_title('X{} ACF'.format(x_name), size = 20)
    ax.set_xlim(1,60)
    ax.set_xticks(np.append(np.arange(1,10,1),[20,40,60]))
    ax.plot(times, result)
    ax.grid()
plot_acf(df_linear, y_predict_name)

Formatting

display

pd.set_option('display.max_columns', 200)
pd.set_option('display.max_rows', 100)
pd.set_option('display.min_rows', 100)
pd.set_option('display.expand_frame_repr', True)
pd.set_option('expand_frame_repr',False)

#pd.set_option('display.max_rows', 10)
#pd.set_option('display.max_columns', None)
#pd.set_option('max_colwidth',100)
#pd.set_option('display.float_format', lambda x: '%10.3f' % x)
#pd.reset_option('display.float_format')
#np.set_printoptions(precision=4, suppress=True)
%precision %10.4f # non scientific
https://kiwidamien.github.io/stylish-pandas.html

# print without newline
print('haha',end=',')

# print variable with literal string
print(f'n_lags: {result[1]}')

# from IPython.display import display, Markdown, Latex, Image, SVG
display(aaa)
Markdown(stats.norm.__doc__)

String formatting

print('haah %s and %s is %.4f' % ('Gold','Silver',pvalue))

# create different variable names in a for loop
for x in range(0, 9):
    globals()['string%s' % x] = 'Hello'

print("Geeks :{0:2d}, Portal :{1:8.2f}".format(12, 00.546))
print("Geeks: {a:5d},  Portal: {p:8.2f}".format(a = 453, p = 59.058))

# https://www.w3schools.com/python/ref_string_format.asp

Number formatting

https://mkaz.blog/code/python-string-format-cookbook/

pi = 3.14159
print(" pi = %1.2f " % pi)         # old
print(" pi = {:.2f}".format( pi )) # new

Timing

import time
start_time = time.time()

print("time elapsed: {:.2f}m".format((time.time() - start_time)/60))

Import stuff

import importlib
importlib.reload(module)
module.function()

#import warnings
#warnings.filterwarnings('ignore') # not recommended to turn off warnings
%config InlineBackend.figure_format = 'retina' 
print(os.getcwd())
print(pd.__version__)
print(sys.version)
import numpy as np
import pandas as pd
from scipy import stats
import array_to_latex as a2l
from random import seed
from random import random
from numpy.random import randn #random normal
import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib as mpl
import matplotlib.dates as mdates
import statsmodels.api as sm
from statsmodels.graphics.tsaplots import plot_acf
import datetime
import wrds
import yfinance as yf
from pandas_datareader import data as pdr
from tabulate import tabulate
import statsmodels.formula.api as smf
from statsmodels.tsa.stattools import coint
import math
import sys
display(sys.executable)
Calling one script from another
------------------------a.py----------------------------
import os
for i in range(0, 100):
    r = os.system("python {} {}".format('b.py', i))
------------------------b.py----------------------------
import sys
if __name__ == '__main__':
    i = sys.argv[1]
print('output')
    
if __name__ == '__main__':
    print('run directly')
else:
    print('run when import')
