

II. Implementation
- The core idea is still the eval-based evaluation from yesterday's post; on top of that we load the class below, which only produces the factor expression strings (a sketch of the eval step follows the class).
# Optimized code
from datafeed.factor.alpha import AlphaBase


class Alpha158(AlphaBase):
    def get_fields_names(self):
        fields = []
        names = []

        # kbar
        fields += [
            "(close-open)/open",
            "(high-low)/open",
            "(close-open)/(high-low+1e-12)",
            "(high-greater(open, close))/open",
            "(high-greater(open, close))/(high-low+1e-12)",
            "(less(open, close)-low)/open",
            "(less(open, close)-low)/(high-low+1e-12)",
            "(2*close-high-low)/open",
            "(2*close-high-low)/(high-low+1e-12)",
        ]
        names += [
            "KMID",
            "KLEN",
            "KMID2",
            "KUP",
            "KUP2",
            "KLOW",
            "KLOW2",
            "KSFT",
            "KSFT2",
        ]

        # =========== price ==========
        feature = ["open", "high", "low", "close"]
        windows = range(5)
        for field in feature:
            fields += [f"shift({field}, {d})/close" if d != 0 else f"{field}/close" for d in windows]
            names += [f"{field.upper()}{d}" for d in windows]

        # ================ volume ===========
        fields += [f"shift(volume, {d})/(volume+1e-12)" if d != 0 else "volume/(volume+1e-12)" for d in windows]
        names += [f"VOLUME{d}" for d in windows]

        # ================= rolling ====================
        windows = [5, 10, 20, 30, 60]
        for d in windows:
            fields += [f"shift(close, {d})/close"]
            names += [f"ROC{d}"]
            fields += [f"mean(close, {d})/close"]
            names += [f"MA{d}"]
            fields += [f"std(close, {d})/close"]
            names += [f"STD{d}"]
            fields += [f"max(high, {d})/close"]
            names += [f"MAX{d}"]
            fields += [f"min(low, {d})/close"]
            names += [f"MIN{d}"]
            fields += [f"quantile(close, {d}, 0.8)/close"]
            names += [f"QTLU{d}"]
            fields += [f"quantile(close, {d}, 0.2)/close"]
            names += [f"QTLD{d}"]
            fields += [f"(close-min(low, {d}))/(max(high, {d})-min(low, {d})+1e-12)"]
            names += [f"RSV{d}"]
            fields += [f"idxmax(high, {d})/{d}"]
            names += [f"IMAX{d}"]
            fields += [f"idxmin(low, {d})/{d}"]
            names += [f"IMIN{d}"]
            fields += [f"(idxmax(high, {d})-idxmin(low, {d}))/{d}"]
            names += [f"IMXD{d}"]
            fields += [f"corr(close, log(volume+1), {d})"]
            names += [f"CORR{d}"]
            fields += [f"corr(close/shift(close,1), log(volume/shift(volume, 1)+1), {d})"]
            names += [f"CORD{d}"]
            fields += [f"mean(close>shift(close, 1), {d})"]
            names += [f"CNTP{d}"]
            fields += [f"mean(close<shift(close, 1), {d})"]
            names += [f"CNTN{d}"]
            fields += [f"mean(close>shift(close, 1), {d})-mean(close<shift(close, 1), {d})"]
            names += [f"CNTD{d}"]
            fields += [f"sum(greater(close-shift(close, 1), 0), {d})/(sum(Abs(close-shift(close, 1)), {d})+1e-12)"]
            names += [f"SUMP{d}"]
            fields += [f"(sum(greater(close-shift(close, 1), 0), {d})-sum(greater(shift(close, 1)-close, 0), {d}))"
                       f"/(sum(Abs(close-shift(close, 1)), {d})+1e-12)"]
            names += [f"SUMD{d}"]
            fields += [f"mean(volume, {d})/(volume+1e-12)"]
            names += [f"VMA{d}"]
            fields += [f"std(volume, {d})/(volume+1e-12)"]
            names += [f"VSTD{d}"]
            fields += [
                f"std(Abs(close/shift(close, 1)-1)*volume, {d})/(mean(Abs(close/shift(close, 1)-1)*volume, {d})+1e-12)"]
            names += [f"WVMA{d}"]
            fields += [f"sum(greater(volume-shift(volume, 1), 0), {d})/(sum(Abs(volume-shift(volume, 1)), {d})+1e-12)"]
            names += [f"VSUMP{d}"]
            fields += [f"sum(greater(shift(volume, 1)-volume, 0), {d})/(sum(Abs(volume-shift(volume, 1)), {d})+1e-12)"]
            names += [f"VSUMN{d}"]
            fields += [f"(sum(greater(volume-shift(volume, 1), 0), {d})-sum(greater(shift(volume, 1)-volume, 0), {d}))"
                       f"/(sum(Abs(volume-shift(volume, 1)), {d})+1e-12)"]
            names += [f"VSUMD{d}"]

        return fields, names
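Only the expression strings live in this class; turning them into actual factor columns is still the eval trick from the previous post. Below is a minimal sketch of that step, assuming the market data is a DataFrame with columns open/high/low/close/volume and a (date, symbol) MultiIndex; calc_expr and the operators dict are illustrative names here, not the actual loader's API.

import numpy as np
import pandas as pd

def calc_expr(df: pd.DataFrame, expr: str, operators: dict) -> pd.Series:
    # Evaluate one factor expression string against the columns of df.
    # operators supplies shift, mean, greater, ... (the decorated helpers shown below).
    env = dict(operators)
    env.update({col: df[col] for col in df.columns})   # open/high/low/close/volume as Series
    env.update({"log": np.log, "Abs": np.abs})
    return eval(expr, {"__builtins__": {}}, env)

# Usage: build the whole Alpha158 feature matrix expression by expression
# fields, names = Alpha158().get_fields_names()
# features = pd.DataFrame({name: calc_expr(df, expr, operators)
#                          for name, expr in zip(names, fields)})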
2. Two grouping decorators were added

from functools import wraps

import pandas as pd

# Rolling operators, and shift-like ones, must be wrapped with calc_by_symbol;
# for the rest it matters little either way (wrapping them does no harm).
# df.set_index([df.index, 'symbol'], inplace=True)
# Group by symbol: most operators (shift and the like) are grouped this way.
def calc_by_symbol(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Split the arguments into pd.Series arguments and everything else
        se_args, other_args = [], []
        se_names = []
        for arg in args:
            if isinstance(arg, pd.Series):
                se_args.append(arg)
                se_names.append(arg.name)
            else:
                other_args.append(arg)
        # Dispatch on how many pd.Series arguments there are
        if len(se_args) == 1:
            # A single Series: group it by symbol and apply the function per group.
            # level=1 is the second index level (the symbol);
            # group_keys=False keeps the group key out of the result index.
            ret = se_args[0].groupby(level=1, group_keys=False).apply(lambda x: func(x, *other_args, **kwargs))
        elif se_args:
            # Several Series: concatenate them into a DataFrame, group by symbol,
            # and pull each Series back out of the sub-DataFrame by its column name.
            df = pd.concat(se_args, axis=1)
            df.index = se_args[0].index
            ret = df.groupby(level=1, group_keys=False).apply(
                lambda sub_df: func(*[sub_df[name] for name in se_names], *other_args))
            ret.index = df.index
        else:
            # No Series arguments: call the function directly.
            ret = func(*other_args, **kwargs)
        return ret
    return wrapper
# Group by date: rank-style operators use this grouping, ranking within each trading day's cross-section.
def calc_by_date(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Split the arguments into pd.Series arguments and everything else
        se_args, other_args = [], []
        se_names = []
        for arg in args:
            if isinstance(arg, pd.Series):
                se_args.append(arg)
                se_names.append(arg.name)
            else:
                other_args.append(arg)
        # Dispatch on how many pd.Series arguments there are
        if len(se_args) == 1:
            # A single Series: group it by date and apply the function per group.
            # level=0 is the first index level (the date);
            # group_keys=False keeps the group key out of the result index.
            ret = se_args[0].groupby(level=0, group_keys=False).apply(lambda x: func(x, *other_args, **kwargs))
        elif se_args:
            # Several Series: concatenate them into a DataFrame, group by date,
            # and pull each Series back out of the sub-DataFrame by its column name.
            df = pd.concat(se_args, axis=1)
            df.index = se_args[0].index
            ret = df.groupby(level=0, group_keys=False).apply(
                lambda sub_df: func(*[sub_df[name] for name in se_names], *other_args))
            ret.index = df.index
        else:
            # No Series arguments: call the function directly.
            ret = func(*other_args, **kwargs)
        return ret
    return wrapper
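To make the two grouping directions concrete, here is a small, made-up example of the index layout both decorators assume (level 0 = date, level 1 = symbol, matching the commented-out set_index call above); the tickers and prices are purely illustrative.

import pandas as pd

idx = pd.MultiIndex.from_product(
    [pd.date_range("2024-01-02", periods=3), ["000001.SZ", "600000.SH"]],
    names=["date", "symbol"],
)
close = pd.Series([10.0, 20.0, 10.5, 19.0, 11.0, 21.0], index=idx, name="close")

# level=1 groups each symbol's own time series (what shift/mean/std need)
print(close.groupby(level=1, group_keys=False).apply(lambda s: s.shift(1)))
# level=0 groups one trading day's cross-section (what rank-style operators need)
print(close.groupby(level=0).rank(pct=True))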
Computation functions that roll over a window (like mean) or offset the series itself (like shift) need @calc_by_symbol; a couple of further examples in the same style follow the shift function below.
@calc_by_symbol
def shift(se: pd.Series, N: int) -> pd.Series:
    """
    Shift the given pandas Series `se` by `N` periods within each symbol.
    Parameters:
        se: the pandas Series to shift.
        N: number of periods to shift. A positive N shifts values downward
           (row t gets the value from t-N, i.e. a lookback); a negative N shifts upward.
    Returns:
        The shifted pandas Series.
    """
    # Use pandas' shift to move the Series by N periods
    return se.shift(N)
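The rolling operators referenced by the expression strings (mean, std, corr, ...) follow the same pattern. A sketch of three of them, assuming the calc_by_symbol decorator defined above; the real module's versions may differ in detail.

@calc_by_symbol
def mean(se: pd.Series, N: int) -> pd.Series:
    # Rolling mean over the last N bars, computed separately per symbol.
    return se.rolling(N).mean()

@calc_by_symbol
def std(se: pd.Series, N: int) -> pd.Series:
    # Rolling standard deviation over the last N bars, per symbol.
    return se.rolling(N).std()

@calc_by_symbol
def corr(left: pd.Series, right: pd.Series, N: int) -> pd.Series:
    # Rolling correlation between two per-symbol series, e.g. close vs. log volume.
    return left.rolling(N).corr(right)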
For intraday, cross-sectional operations such as ranking, use @calc_by_date instead (a sketch follows below). The rotation backtesting system later in the series relies on this.
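For completeness, here is what such a cross-sectional operator could look like; this rank helper is an assumed example mirroring the symbol-grouped functions, not something the class above uses directly.

@calc_by_date
def rank(se: pd.Series) -> pd.Series:
    # Percentile rank within a single trading day's cross-section of stocks.
    return se.rank(pct=True)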