使用dagster重构可转债与股票数据更新入库

“专注做好一件事情,砍掉多余的野心”。——这篇文章收藏了好久,每天看到都有触动。

有时间我们想做的事情太多,或者说担心自己做不好,所以多做几件事情。但这里颇有些“自证预言”的逻辑在。有时候,不是因为这件事本身一定行,是你相信,付出巨大的努力,它才做成的。

——又回来老生长谈,选择还是努力。

很多成功者,大家都说他们运气好,因为他处在对的势上,无需选择,只要努力就好了。

对于普通人,就是持续去试,做当下认为最优的事情。

01 dagster的sensor

“感应器”与“定时器”的区别在于,定时器的逻辑是“主动”轮询,而感应器是事件触发式的。

dagster官方文档及检索结果并未呈现可以支持kakfa。

sensor本质是一个定时器,默认30s触发一次。可以通过minimum_interval_seconds设置最短区间,我试了一下,配置的的值+2s就是触发区间,而这个区间最小可以设置到5秒。


@sensor(job=job_build_index_factor, minimum_interval_seconds=3)
def mysensor():
get_dagster_logger().info('kafka感应器!')

在对于时效要求没有特别高的场景是可以接受的。

但是对于需要“抢”时效的场景,这个5秒就有点耽误事了。

02 股票数据更新

图片

@op(description='更新股票close因子')
def update_stock_close(code, inc):
    utils.ts_build_daily_factor(tb_name='stock_factors', code=code, factor='close', inc=inc,
                                func=stk_daily_basic_func)


@op(description='更新股票pe_ttm因子')
def update_stock_pe(code, inc):
    utils.ts_build_daily_factor(tb_name='stock_factors', code=code, factor='pe_ttm', inc=inc,
                                func=stk_daily_basic_func)


@op(description='更新股票pb因子')
def update_stock_pb(code, inc):
    utils.ts_build_daily_factor(tb_name='stock_factors', code=code, factor='pb', inc=inc,
                                func=stk_daily_basic_func)


@op(description='更新股票财务指标_净利润年增长率')
def update_stock_netprofit_yoy(code, inc):
    utils.ts_build_daily_factor(tb_name='stock_factors', code=code, factor='dt_netprofit_yoy', inc=inc,
                                func=stk_fin_func)


@graph(description='更新单支股票因子')
def stock_graph(code, inc):
    update_stock_netprofit_yoy(code, inc)
    update_stock_close(code, inc)
    update_stock_pb(code, inc)
    update_stock_pe(code, inc)


stock_job = stock_graph.to_job(description='更新单支股票任务')

if __name__ == '__main__':
    config = {
        'inputs': {
            'code': {
                'value': '688002.SH'  # 118030.SZ
            },
            'inc': {
                'value': False
            }
        }
    }
    stock_job.execute_in_process(config)

图片

03 更新可转债数据

@op(description='更新转股价')
def update_cb_chg(code, inc):
    utils.ts_build_daily_factor(tb_name='cb_factors', code=code, factor='chg_price', inc=False,
                                func=cb_chg_func)


@op(name='update_cb_close', description='更新转债收盘价')
def update_cb_close(code, inc):
    build_cb_factor(code, 'close', inc)
    return None


@graph
def cb_graph(code, inc):
    update_cb_close(code, inc)
    results = update_cb_chg(code, inc)
    # results = update_factor_chg(item)
    return results

使用dagster的定时任务,可以把数据更新,数据计算等准备好。排序之后,可以算出综合分数高的可转债,然后再写到mongo特定的表里,供django使用。

使用bootrap的table表格,ajax排序等,显示可转债列表,后端使用django-ningo提供的restapi。

发布者:股市刺客,转载请注明出处:https://www.95sca.cn/archives/104166
站内所有文章皆来自网络转载或读者投稿,请勿用于商业用途。如有侵权、不妥之处,请联系站长并出示版权证明以便删除。敬请谅解!

(0)
股市刺客的头像股市刺客
上一篇 2024 年 7 月 29 日
下一篇 2024 年 7 月 29 日

相关推荐

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注