流数据实时关联在金融业务场景中非常常见,对流数据实时融合,可以使因子指标计算更高效,也能满足更丰富的信息输出需求。DolphinDB 提供了多种流数据连接引擎,用户直接调用即可实现流数据实时关联。
这里我们罗列了一下用户经常会用到的一些场景:
– 根据行情快照和逐笔成交数据实时计算复杂高频因子;
– 在逐笔成交数据的基础上丰富委托信息并实时输出;
– 根据逐笔成交数据实时匹配最近一次报价;
– 对多个数据源降频采样,计算分钟指标并将结果关联到同一张表中;
– 根据快照数据实时匹配股票历史日频指标;
– 实时计算股票与某指数的分钟收益率相关性。
上一期中,我们为大家介绍了如何使用 window join 将行情快照和逐笔成交数据进行关联分析。本期会有什么样的场景应用呢?快来看看吧~
Left Semi Join
有成交数据了,想关联原始委托数据怎么办?
我们知道,逐笔成交数据中包含了买卖双方的原始委托订单号,理论上来说,通过股票代码和订单号,就可以关联到逐笔委托数据,从而在成交数据的基础上丰富原始委托信息。
具体来说,对于每条逐笔成交数据,我们都要找到对应的委托单,然后将结果与逐笔成交记录一一对应输出。在找到对应的委托单前,该条逐笔成交记录暂时不输出。
在 DolphinDB 中,可以用两个 Left Semi Join 引擎级联的方式,对成交表 trades 中的卖方委托单、买方委托单依次进行关联。
大家可以前往知乎搜索DolphinDB,查看引擎级联更详细的介绍。
// create table
share streamTable(1:0, `Sym`BuyNo`SellNo`TradePrice`TradeQty`TradeTime, [SYMBOL, LONG, LONG, DOUBLE, LONG, TIME]) as trades
share streamTable(1:0, `Sym`OrderNo`Side`OrderQty`OrderPrice`OrderTime, [SYMBOL, LONG, INT, LONG, DOUBLE, TIME]) as orders
share streamTable(1:0, `Sym`SellNo`BuyNo`TradePrice`TradeQty`TradeTime`BuyOrderQty`BuyOrderPrice`BuyOrderTime, [SYMBOL, LONG, LONG, DOUBLE, LONG, TIME, LONG, DOUBLE, TIME]) as outputTemp
share streamTable(1:0, `Sym`BuyNo`SellNo`TradePrice`TradeQty`TradeTime`BuyOrderQty`BuyOrderPrice`BuyOrderTime`SellOrderQty`SellOrderPrice`SellOrderTime, [SYMBOL, LONG, LONG, DOUBLE, LONG, TIME, LONG, DOUBLE, TIME, LONG, DOUBLE, TIME]) as output
// create engine: left join buy order
ljEngineBuy=createLeftSemiJoinEngine(name="leftJoinBuy", leftTable=outputTemp, rightTable=orders, outputTable=output, metrics=<[SellNo, TradePrice, TradeQty, TradeTime, BuyOrderQty, BuyOrderPrice, BuyOrderTime, OrderQty, OrderPrice, OrderTime]>, matchingColumn=[`Sym`BuyNo, `Sym`OrderNo])
// create engine: left join sell order
ljEngineSell=createLeftSemiJoinEngine(name="leftJoinSell", leftTable=trades, rightTable=orders, outputTable=getLeftStream(ljEngineBuy), metrics=<[BuyNo, TradePrice, TradeQty, TradeTime, OrderQty, OrderPrice, OrderTime]>, matchingColumn=[`Sym`SellNo, `Sym`OrderNo])
// subscribe topic
subscribeTable(tableName="trades", actionName="appendLeftStream", handler=getLeftStream(ljEngineSell), msgAsTable=true, offset=-1)
subscribeTable(tableName="orders", actionName="appendRightStreamForSell", handler=getRightStream(ljEngineSell), msgAsTable=true, offset=-1)
subscribeTable(tableName="orders", actionName="appendRightStreamForBuy", handler=getRightStream(ljEngineBuy), msgAsTable=true, offset=-1)
构造数据写入作为原始输入的 2 个流数据表:
// generate data: trade
t1 = table(`A`B`B`A as Sym, [2, 5, 5, 6] as BuyNo, [4, 1, 3, 4] as SellNo, [7.6, 3.5, 3.5, 7.6]as TradePrice, [10, 100, 20, 50]as TradeQty, 10:00:00.000+(400 500 500 600) as TradeTime)
// generate data: order
t2 = table(`B`A`B`A`B`A as Sym, 1..6 as OrderNo, [2, 1, 2, 2, 1, 1] as Side, [100, 10, 20, 100, 350, 50] as OrderQty, [7.6, 3.5, 7.6, 3.5, 7.6, 3.5] as OrderPrice, 10:00:00.000+(1..6)*100 as OrderTime)
// input data
orders.append!(t2)
trades.append!(t1)
输入数据与关联关系如下:

通过两个 Left Semi Join 引擎,上图中 trades 数据流中的每一条记录将分别和 orders 数据流中的两条记录关联,进而取得 orders 中的委托量、价、时间等字段,关联得到的结果表 output 如下:

Asof Join
有成交数据了,怎么实时匹配最近一次报价?
因为逐笔成交数据和报价数据的发生时间不可能完全一致,所以这两张数据表不能使用常用的等值连接。在计算个股交易成本时,往往需要以成交时间为基准,找到交易发生前的最近一次报价数据,因此需要以邻近匹配的方式关联两个数据流。
这个场景的特征是,每条成交记录匹配一条时刻早于自己的报价记录,输出与原始的每一条成交记录一一对应。
在 DolphinDB 中,我们可以用 Asof Join 引擎来实现此场景:
// create table
share streamTable(1:0, `Sym`TradeTime`TradePrice, [SYMBOL, TIME, DOUBLE]) as trades
share streamTable(1:0, `Sym`Time`Bid1Price`Ask1Price, [SYMBOL, TIME, DOUBLE, DOUBLE]) as snapshot
share streamTable(1:0, `TradeTime`Sym`TradePrice`TradeCost`SnapshotTime, [TIME, SYMBOL, DOUBLE, DOUBLE, TIME]) as output
// create engine
ajEngine = createAsofJoinEngine(name="asofJoin", leftTable=trades, rightTable=snapshot, outputTable=output, metrics=<[TradePrice, abs(TradePrice-(Bid1Price+Ask1Price)/2), snapshot.Time]>, matchingColumn=`Sym, timeColumn=`TradeTime`Time, useSystemTime=false, delayedTime=1000)
// subscribe topic
subscribeTable(tableName="trades", actionName="appendLeftStream", handler=getLeftStream(ajEngine), msgAsTable=true, offset=-1, hash=0)
subscribeTable(tableName="snapshot", actionName="appendRightStream", handler=getRightStream(ajEngine), msgAsTable=true, offset=-1, hash=1)
构造数据写入作为原始输入的 2 个流数据表,先写入右表,再写入左表:
// generate data: trade
t1 = table(`A`A`B`A`B`B as Sym, 10:00:02.000+(1..6)*700 as TradeTime, (3.4 3.5 7.7 3.5 7.5 7.6) as TradePrice)
// generate data: snapshot
t2 = table(`A`B`A`B as Sym, 10:00:00.000+(3 3 6 6)*1000 as Time, (3.5 7.6 3.5 7.6) as Bid1Price, (3.5 7.6 3.6 7.6) as Ask1Price)
// input data
snapshot.append!(t2)
trades.append!(t1)
输入数据与关联关系如下:

关联得到的结果表 output 如下,左表中全部 7 条数据都有对应的输出。本例中,在创建引擎时指定了 delayTime 参数,因此对于分组 B ,即使右表 snapshot 中没有比 10:00:06.200 更大的时间戳, 右表 trades 中最后一条数据(B,10:00:06.200, 7.6) 仍然能够在注入引擎 2s 后强制输出。

发布者:股市刺客,转载请注明出处:https://www.95sca.cn/archives/495935
站内所有文章皆来自网络转载或读者投稿,请勿用于商业用途。如有侵权、不妥之处,请联系站长并出示版权证明以便删除。敬请谅解!