回测系统3一键获运行获取股票alpha158个因子

一.效果如下

回测系统3一键获运行获取股票alpha158个因子

回测系统3一键获运行获取股票alpha158个因子

二.具体实现

  1. 核心思想还是昨天eval 来实现,之后加载这个获取因子表达式类
# Optimized code
from datafeed.factor.alpha import AlphaBase




class Alpha158(AlphaBase):
    def get_fields_names(self):
        fields = []
        names = []


        # kbar
        fields += [
            "(close-open)/open",
            "(high-low)/open",
            "(close-open)/(high-low+1e-12)",
            "(high-greater(open, close))/open",
            "(high-greater(open, close))/(high-low+1e-12)",
            "(less(open, close)-low)/open",
            "(less(open, close)-low)/(high-low+1e-12)",
            "(2*close-high-low)/open",
            "(2*close-high-low)/(high-low+1e-12)",
        ]
        names += [
            "KMID",
            "KLEN",
            "KMID2",
            "KUP",
            "KUP2",
            "KLOW",
            "KLOW2",
            "KSFT",
            "KSFT2",
        ]


        # =========== price ==========
        feature = ["open", "high", "low", "close"]
        windows = range(5)
        for field in feature:
            fields += [f"shift({field}, {d})/close" if d != 0 else f"{field}/close" for d in windows]
            names += [f"{field.upper()}{d}" for d in windows]


        # ================ volume ===========
        fields += [f"shift(volume, {d})/(volume+1e-12)" if d != 0 else "volume/(volume+1e-12)" for d in windows]
        names += [f"VOLUME{d}" for d in windows]


        # ================= rolling ====================
        windows = [5, 10, 20, 30, 60]
        for d in windows:
            fields += [f"shift(close, {d})/close"]
            names += [f"ROC{d}"]


            fields += [f"mean(close, {d})/close"]
            names += [f"MA{d}"]


            fields += [f"std(close, {d})/close"]
            names += [f"STD{d}"]


            fields += [f"max(high, {d})/close"]
            names += [f"MAX{d}"]


            fields += [f"min(low, {d})/close"]
            names += [f"MIN{d}"]


            fields += [f"quantile(close, {d}, 0.8)/close"]
            names += [f"QTLU{d}"]


            fields += [f"quantile(close, {d}, 0.2)/close"]
            names += [f"QTLD{d}"]


            fields += [f"(close-min(low, {d}))/(max(high, {d})-min(low, {d})+1e-12)"]
            names += [f"RSV{d}"]


            fields += [f"idxmax(high, {d})/{d}"]
            names += [f"IMAX{d}"]


            fields += [f"idxmin(low, {d})/{d}"]
            names += [f"IMIN{d}"]


            fields += [f"(idxmax(high, {d})-idxmin(low, {d}))/ {d}"]
            names += [f"IMXD{d}"]


            fields += [f"corr(close, log(volume+1), {d})"]
            names += [f"CORR{d}"]


            fields += [f"corr(close/shift(close,1), log(volume/shift(volume, 1)+1), {d})"]
            names += [f"CORD{d}"]


            fields += [f"mean(close>shift(close, 1), {d})"]
            names += [f"CNTP{d}"]


            fields += [f"mean(close<shift(close, 1), {d})"]
            names += [f"CNTN{d}"]


            fields += [f"mean(close>shift(close, 1), {d})-mean(close<shift(close, 1), {d})"]
            names += [f"CNTD{d}"]


            fields += [f"sum(greater(close-shift(close, 1), 0), {d})/(sum(Abs(close-shift(close, 1)), {d})+1e-12)"]
            names += [f"SUMP{d}"]


            fields += [f"(sum(greater(close-shift(close, 1), 0), {d})-sum(greater(shift(close, 1)-close, 0), {d}))"
                       f"/(sum(Abs(close-shift(close, 1)), {d})+1e-12)"]
            names += [f"SUMD{d}"]


            fields += [f"mean(volume, {d})/(volume+1e-12)"]
            names += [f"VMA{d}"]


            fields += [f"std(volume, {d})/(volume+1e-12)"]
            names += [f"VSTD{d}"]


            fields += [
                f"std(Abs(close/shift(close, 1)-1)*volume, {d})/(mean(Abs(close/shift(close, 1)-1)*volume, {d})+1e-12)"]
            names += [f"WVMA{d}"]


            fields += [f"sum(greater(volume-shift(volume, 1), 0), {d})/(sum(Abs(volume-shift(volume, 1)), {d})+1e-12)"]
            names += [f"VSUMP{d}"]


            fields += [f"sum(greater(shift(volume, 1)-volume, 0), {d})/(sum(Abs(volume-shift(volume, 1)), {d})+1e-12)"]
            names += [f"VSUMN{d}"]


            fields += [f"(sum(greater(volume-shift(volume, 1), 0), {d})-sum(greater(shift(volume, 1)-volume, 0), {d}))"
                       f"/(sum(Abs(volume-shift(volume, 1)), {d})+1e-12)"]
            names += [f"VSUMD{d}"]


        return fields, names

2.添加了两个分组 函数调用包

# 滚动的一定要加 calc_by_symbol  还有shfit之类的,其他不加问题不大,加了也没事


# df.set_index([df.index, 'symbol'], inplace=True)
# 按股票代码分组  一般都是按这个分组 比如shift 子类的
def calc_by_symbol(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        # 分离非 pd.Series 类型的参数和 pd.Series 类型的参数
        se_args, other_args = [], []
        se_names = []
        for arg in args:
            if isinstance(arg, pd.Series):
                se_args.append(arg)
                se_names.append(arg.name)
            else:
                other_args.append(arg)


        # 根据 pd.Series 参数的数量进行处理
        if len(se_args) == 1:
            # 如果只有一个 pd.Series 参数,则对其进行分组并应用函数
            # groupby(level=1, group_keys=False):groupby 是 pandas 中的一个方法,用于对数据进行分组。level=1 表示根据索引的第二个级别(通常是股票代码)进行分组。group_keys=False 表示在分组后不保留分组键,这意味着分组键不会作为结果的一部分返回。
            ret = se_args[0].groupby(level=1, group_keys=False).apply(lambda x: func(x, *other_args, **kwargs))
        elif se_args:
            # 如果有多个 pd.Series 参数,则将它们合并成 DataFrame 并应用函数
            df = pd.concat(se_args, axis=1)
            df.index = se_args[0].index
            # 使用列名从 DataFrame 中提取对应的 Series,并应用函数
            ret = df.groupby(level=1, group_keys=False).apply(
                lambda sub_df: func(*[sub_df[name] for name in se_names], *other_args))
            ret.index = df.index
        else:
            # 如果没有 pd.Series 参数,则直接应用函数
            ret = func(*other_args, **kwargs)


        return ret


    return wrapper




# 按日期分组  一般rank一类的按这个分组,同一天轮转进行排名
def calc_by_date(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        # 分离非 pd.Series 类型的参数和 pd.Series 类型的参数
        se_args, other_args = [], []
        se_names = []
        for arg in args:
            if isinstance(arg, pd.Series):
                se_args.append(arg)
                se_names.append(arg.name)
            else:
                other_args.append(arg)


        # 根据 pd.Series 参数的数量进行处理
        if len(se_args) == 1:
            # 如果只有一个 pd.Series 参数,则对其进行分组并应用函数
            # groupby(level=0, group_keys=False):groupby 是 pandas 中的一个方法,用于对数据进行分组。level=0 表示根据索引的第一级(通常是日期)进行分组。group_keys=False 表示在分组后不保留分组键,这意味着分组键不会作为结果的一部分返回。
            ret = se_args[0].groupby(level=0, group_keys=False).apply(lambda x: func(x, *other_args, **kwargs))
        elif se_args:
            # 如果有多个 pd.Series 参数,则将它们合并成 DataFrame 并应用函数
            df = pd.concat(se_args, axis=1)
            df.index = se_args[0].index
            # 使用列名从 DataFrame 中提取对应的 Series,并应用函数
            ret = df.groupby(level=0, group_keys=False).apply(
                lambda sub_df: func(*[sub_df[name] for name in se_names], *other_args))
            ret.index = df.index
        else:
            # 如果没有 pd.Series 参数,则直接应用函数
            ret = func(*other_args, **kwargs)


        return ret


    return wrapper

如果是mean这种需要滚动,或者shift这种自己偏移的 计算函数 需要@calc_by_symbol

@calc_by_symbol
def shift(se: pd.Series, N: int) -> pd.Series:
    """
    将给定的 pandas Series 对象 `se` 向下(未来)移动 `N` 个周期。
    参数:
    se: 要移动的 pandas Series 对象。
    N: 移动的周期数。正数表示向下移动,负数表示向上移动。
    返回:
    移动后的 pandas Series 对象。
    """
    # 使用 pandas 的 shift 方法将 Series 向下移动 N 个周期
    return se.shift(N)

如果是日内排序的 加@calc_by_date

后面轮转回测系统会用到

发布者:股市刺客,转载请注明出处:https://www.95sca.cn/archives/268146
站内所有文章皆来自网络转载或读者投稿,请勿用于商业用途。如有侵权、不妥之处,请联系站长并出示版权证明以便删除。敬请谅解!

(0)
股市刺客的头像股市刺客
上一篇 3小时前
下一篇 3小时前

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注