白癜风患者救助计划 http://pf.39.net/bdfyy/bdfhl/170423/5327361.html第17章TqSdk部分函数解读17.1DIFF协议
DIFF(DifferentialInformationFlowforFinance)是一个基于websocket和json的应用层协议。websocket是全双工通信,当客户端和服务器端建立连接后,就可以相互发数据,建立连接又称为“握手”,“握手”成功就可以建立通信了,不用在每次需要传输信息时重新建立连接,即不会“掉线”。json是数据存储格式,json数据可以方便的反序列化为Python数据。
DIFF协议可以简单的理解为服务端和客户端的通信方式,协议规定了数据格式,以便于服务端和客户端可以解读对方发来的数据。
DIFF协议分为两部分:数据访问和数据传输。
17.1.1数据传输DIFF协议要求服务端将业务数据以JSONMergePatch的格式推送给客户端,JSONMergePatch的格式形如Python字典,可以在客户端反序列化为Python字典(其实是映射类型Entity)。例如:
{"aid":"rtn_data",#业务信息截面更新"data":[#数据更新数组{"balance":.1,#账户资金},{"float_profit":.,#浮动盈亏},{"quotes":{"SHFE.cu":{"datetime":"-12-:1:02.","last_price":.0,#最新价"volume":,#成交量"pre_close":.0,#昨收}}}]}
laid字段值即为数据包类型,"aid":"rtn_data"表示该包的类型为业务信息截面更新包。
l整个data数组相当于一个事务,其中的每一个元素都是一个JSONMergePatch,处理完整个数组后业务截面即完成了从上一个时间截面推进到下一个时间截面。
DIFF协议要求客户端发送peek_message数据包以获得业务信息截面更新,例如:
{"aid":"peek_message"}
l服务端在收到peek_message数据包后应检查是否有数据更新,如果有则应将更新内容立即发送给客户端,如果没有则应等到有更新发生时再回应客户端。
l服务端发送rtn_data数据包后可以等收到下一个peek_message后再发送下一个rtn_data数据包。
l一个简单的客户端实现可以在连接成功后及每收到一个rtn_data数据包后发送一个peek_message数据包,这样当客户端带宽不足时会自动降低业务信息截面的更新频率以适应低带宽。
当数据包中的aid字段不是rtn_data或peek_message则表示该包为一个指令包,具体指令由各业务模块定义,例如subscribe_quote表示订阅行情,insert_order表示下单。
l由于客户端和服务端存在网络通讯延迟,客户端的指令需要过一段时间才会影响到业务信息截面中的业务数据,为了使客户端能分辨出服务端是否处理了该指令,通常服务端会将客户端的请求以某种方式体现在截面中(具体方式由各业务模块定义)。例如subscribe_quote订阅行情时服务端会将业务截面中的ins_list字段更新为客户端订阅的合约列表,这样当客户端检查服务端发来的业务截面时如果ins_list包含了客户端订阅的某个合约说明服务端处理了订阅指令,但若quotes没有该合约则说明该合约不存在订阅失败。
服务端发送包含"aid":"rtn_data"字段的业务数据截面更新,客户端发送包含"aid":"peek_message"字段的数据包请求业务数据截面,或发送包含"aid":"subscribe_quote"、"aid":"insert_order"等字段的指令包,如此,服务端和客户端相互发信息,服务端和客户端根据字段识别数据及处理数据。
17.1.2数据访问DIFF协议要求服务端维护一个业务信息截面,例如:
{"account_id":"",#账号"static_balance":.,#静态权益"balance":.55,#账户资金"available":.152,#可用资金"float_profit":.21,#浮动盈亏"risk_ratio":0.,#风险度"using":.2,#占用资金"position_volume":12,#持仓总手数"ins_list":"SHFE.cu,...."#行情订阅的合约列表"quotes":{#所有订阅的实时行情"SHFE.cu":{"instrument_id":"SHFE.cu","datetime":"-12-01:21:2.","ask_priceN":.0,#卖N价"ask_volumeN":,#卖N量"bid_priceN":.0,#买N价"bid_volumeN":,#买N量"last_price":.0,#最新价"highest":.0,#最高价"lowest":.0,#最低价"amount":.5,#成交额"volume":,#成交量"open_interest":,#持仓量"pre_open_interest":,#昨持"pre_close":.0,#昨收"open":.0,#今开"close":"-",#收盘"lower_limit":.0,#跌停"upper_limit":.0,#涨停"average":.1#均价"pre_settlement":.0,#昨结"settlement":"-",#结算价},...}}
对应的客户端也维护了一个该截面的镜像,因此业务层可以简单同步的访问到全部业务数据。
TqSdk即是客户端,TqSdk把收到的业务数据截面以上面的格式合并到_data属性里,_data为多层嵌套的映射类型Entity,业务数据例如“quotes”,也是Entity,其“键”是合约代码,例如“SHFE.cu”,其“值”是最终的业务数据——Quote对象,业务函数get_quote()便是把_data里的Quote对象的一个引用返回给调用方,调用方获得的是Quote对象的动态引用。
_data是可变映射类型,会接收服务端发来的更新,因此业务函数返回的对象引用也会指向随时更新的业务数据。
17.2业务函数以get_quote()为例,上节已经介绍了get_quote()与_data的关系,现在我们结合函数的代码再看下其执行过程,我们只取代码的主要部分:
defget_quote(self,symbol:str)-Quote:#从_data属性中提取Quotequote=_get_obj(self._data,["quotes",symbol],self._prototype["quotes"]["#"])#若合约symbol是新添加的合约,则向服务端发送订阅该合约的指令包ifsymbolnotinself._requests["quotes"]:self._requests["quotes"].add(symbol)self._send_pack({"aid":"subscribe_quote","ins_list":",".join(self._requests["quotes"]),})#返回quote,其指向的是_data中的Quotereturnquote
其他的业务函数工作逻辑类似。业务对象Quote、Trade、Order、Position、Account等都是Entity的子类,可以像类一样获取其属性,也可以像字典一样使用。业务对象在模块objs中定义。
既然业务对象可以像字典一样使用,那应该可以向业务对象中添加新的{键:值}对,的确是可以的,例如下单之后给Order添加一个策略名称,这样就可以判断委托单属于哪个策略,例如:
fromtqsdkimportTqApi,TqAccount,TqAuthapi=TqApi(TqAccount("H期货公司","账户","密码"),auth=TqAuth("信易账号","密码"))quote=api.get_quote("SHFE.rb")#多1手rb,买入价order1=api.insert_order("SHFE.rb",direction="BUY",offset="OPEN",volume=1,limit_price=)order1[cta]=aaa#添加字段#以对手价多2手rb,下单模式为"FAK"order2=api.insert_order("SHFE.rb",direction="BUY",offset="OPEN",volume=2,limit_price=quote.ask_price1,advanced="FAK")order2[cta]=bbb#以对手价平今1手rb的多单order=api.insert_order("SHFE.rb",direction="SELL",offset="CLOSETODAY",volume=1,limit_price=quote.bid_price1)order[cta]=cccorder=api.get_order()#获取全部委托单whileTrue:foroinorder.values():#遍历委托单print(o.instrument_id,o.direction,o.status)#打印品种代码、下单方向、委托单状态print(o.cta)#打印新字段print(----------------)api.wait_update()输出结果为:rbBUYALIVEaaa----------------rbBUYALIVEbbb----------------rbSELLALIVEccc-----------------02-:2:24-INFO-模拟交易下单:时间:-02-:2:24.,合约:SHFE.rb,开平:OPEN,方向:BUY,手数:1,价格:.0-02-:2:24-INFO-模拟交易下单:时间:-02-:2:24.,合约:SHFE.rb,开平:OPEN,方向:BUY,手数:2,价格:.0-02-:2:24-INFO-模拟交易委托单:全部成交-02-:2:24-INFO-模拟交易下单:时间:-02-:2:24.,合约:SHFE.rb,开平:CLOSETODAY,方向:SELL,手数:1,价格:.0-02-:2:24-INFO-模拟交易委托单:全部成交rbBUYALIVEaaarbBUYFINISHEDbbb----------------rbSELLFINISHEDccc----------------rbBUYALIVEaaa----------------rbBUYFINISHEDbbb----------------rbSELLFINISHEDccc
我们下了三单,并分别添加了三个字段,用get_order()获取全部委托单,然后遍历委托单,就可以根据新添加的字段判断该委托单属于哪个策略了。
17.insert_order()insert_order用来下单,我们只截取主要代码看一下执行过程:
definsert_order(...)-Order:"""发送下单指令.**注意:指令将在下次调用**:py:meth:`wait_update`**时发出**"""ifself._loop.is_running():#事件循环正在运行#把下单请求函数打包成task排入事件循环self.create_task(self._insert_order_async(...))#下单后获取委托单orderorder=self.get_order(order_id,account=account)#更新委托单字段order.update({"order_id":order_id,"exchange_id":exchange_id,...})returnorder#返回委托单else:#事件循环还未运行#打包一个指令包pack=self._get_insert_order_pack(...)#发送指令包self._send_pack(pack)##下单后获取委托单orderorder=self.get_order(order_id,account=account)#更新委托单字段order.update({"order_id":order_id,"exchange_id":exchange_id,...})returnorder#返回委托单#发送指令包函数def_send_pack(self,pack):#立即向队列发送指令包ifnotself._is_slave:self._send_chan.send_nowait(pack)else:self._master._slave_send_pack(pack)#下单请求函数asyncdef_insert_order_async(...):#打包一个指令包pack=self._get_insert_order_pack(...)#发送指令包self._send_pack(pack)
下单的主要流程为:用协程任务打包一个指令包再发出去。create_task是无阻塞的,创建完task立即返回,get_order获取委托单也是无阻塞的,因此insert_order执行后会立即返回一个Order对象引用——order,不会等待委托单成交与否。
create_task把下单函数打包成task排入事件循环,需要在调用wait_update启动事件循环时才能执行该task并从队列取出指令包并发送向服务端。
17.4create_task()create_task用来把协程打包成Task对象,以便于在事件循环中并发执行,我们看下函数的代码:
部分内容省略...
TqSdk中大量用到了create_task创建Task,而Task执行结束后会调用回调函数_on_task_done()停止事件循环,而且主线程在执行时(取得了控制权)事件循环可能已经是停止状态,因此需要循环调用wait_update()再次开启事件循环以执行未完成的Task。
17.5TqChanTqChan定义在模块channel中,TqChan是异步队列asyncio.Queue的子类,TqSdk中大量用到了TqChan,TqSdk各组件间通过TqChan传递数据,一个组件向TqChan放入数据,另一个组件从TqChan里取出数据。
TqChan里定义了发送数据和接收数据的函数,因此用TqChan可以连接收、发组件,使组件间建立通信。
数据在组件间单向传递,由TqChan连接的组件构成了生产者、消费者模型,我们在第1.7节已经引入过TqChan。
我们看下TqChan的主要代码,代码各部分的含义注释的很清楚了:
代码省略...
TqSdk中大量用到了TqChan在组件间收发数据,当事件循环被stop停止时,收数据一端执行item=awaitasyncio.Queue.get(self)时会挂起自身并交出控制权给事件循环的调用方,调用方再次启动事件循环时,事件循环继续轮询执行task。
17.6register_update_notify()register_update_notify()函数用于把业务数据注册到TqChan,实际上是把TqChan添加到业务对象的_listener属性里,当业务对象更新时会向TqChan添加一个True值,当TqChan为空时则等待业务对象更新。
我们先看一个以TqChan实例在协程中接收数据更新的例子:
fromtqsdkimportTqApi,TqAuth
api=TqApi(auth=TqAuth("信易账号","密码"))
quote=api.get_quote("CFFEX.T")#订阅盘口行情
#定义一个协程
asyncdeffunc():
fromtqsdk.channelimportTqChan#导入TqChan
chan=TqChan(api,last_only=True)#实例化TqChan,接收数据更新
quote["_listener"].add(chan)#把chan添加进quote的_listener属性
asyncforpinchan:#若quote有更新会执行循环体,如无更新则阻塞等待
print(p)
print(quote.datetime,quote.last_price)#打印盘口时间和最新价
break
awaitchan.close()#chan使用完关闭
returnquote.instrument_name,quote.instrument_name#返回值task=api.create_task(func())#把协程打包成Task
whileTrue:
api.wait_update()
iftask.done():#Task结束后获取协程返回值
print(task.result())
输出结果为:
True
-02-:11:02..
(债十,.0)
(债十,.0)
(债十,.0)
register_update_notify()函数是对上述代码的简化,再用with语句管理上下文,例如:
fromtqsdkimportTqApi,TqAuth
api=TqApi(auth=TqAuth("信易账号","密码"))
quote=api.get_quote("CFFEX.T")#订阅盘口行情
#定义一个协程
asyncdeffunc():
asyncwithapi.register_update_notify(quote)aschan:#把quote注册到chan
asyncforpinchan:#若quote有更新会执行循环体,如无更新则阻塞等待
print(p)
print(quote.datetime,quote.last_price)#打印盘口时间和最新价
break
returnquote.instrument_name,quote.instrument_name#返回值task=api.create_task(func())#把协程打包成Task
whileTrue:
api.wait_update()
iftask.done():#Task结束后获取协程返回值
print(task.result())
输出结果为:
True
-02-:48:5.8.26
(债十,债十)
(债十,债十)
(债十,债十)
若asyncforpinchan循环不用break跳出,则会随quote更新循环执行,若quote无更新,比如停盘,异步迭代函数__anext__()里将阻塞,循环也跟着阻塞,等待再次收到quote更新。
17.7wait_update()wait_update用于等待业务更新,我们结合其代码分析下其执行机制:
代码省略...
从wait_update的代码可知,wait_update的工作可分成四大块:
1.先执行事件循环中存在的task
2.向服务端请求新数据
.事件循环轮询执行未完成的task,若无task也未设置超时也未收到新数据,将阻塞
4.收到了新数据,停止事件循环,用新数据更新_data,等待下次调用wait_update
wait_update其实是事件循环的调用方(执行self._loop.run_forever()),因此,wait_update的核心工作是开启事件循环。
部分内容省略...
由于TqChan继承于先进先出队列asyncio.Queue,当线程中被time.sleep()阻塞时,服务端的数据可能已经更新了很多,当阻塞释放后TqSdk客户端先接收(取出)的数据是最先发送到队列里的,即先接收的数据是最早的数据,因此比实时行情滞后,滞后的数据又不应丢弃,如果只用最新数据,指标计算、K线等将不再连续可能产生随机错误的交易信号,因此应避免使用time.sleep()阻塞线程。
同理,如果线程中有一个耗时运算,同样会导致接收的行情滞后,因此应把耗时运算放在协程中,并在必要处用asyncio.sleep(0)让出协程的控制权,以使得wait_update()有机会更新最新数据,若耗时运算确实需要等待完成,则宜用多进程执行耗时运算。
需要循环处理业务数据(例如循环获取持仓)的任务若不与wait_update在同一个循环中,也建议都用协程实现,并用create_task创建为Task,因为Task执行结束后会调用_on_task_done()停止事件循环,且协程中每次循环也用asyncio.sleep(0)让出控制权使得业务数据获得更新的机会,否则循环使用的业务数据可能仍是之前的值而出现逻辑错误。监听业务更新更好的方法是用register_update_notify把业务数据注册到TqChan,在asyncfor中对最新数据处理。
总之,应避免wait_update()被阻塞,保持wait_update()可被随时调用,否则将导致行情滞后。
17.8目标持仓工具TargetPosTaskTargetPosTask用来创建目标持仓task,我们截取主要代码看一下:
classTargetPosTaskSingleton(type):#检查下单方向及品种目标持仓task是否重复
_instances={}
def__call__(cls,api,symbol,price="ACTIVE",offset_priority="今昨,开",trade_chan=None,*args,**kwargs):
ifsymbolnotinTargetPosTaskSingleton._instances:
TargetPosTaskSingleton._instances[symbol]=\super(TargetPosTaskSingleton,cls).__call__(api,symbol,price,**kwargs)
else:
instance=TargetPosTaskSingleton._instances[symbol]
ifinstance._offset_priority!=offset_priority:
raiseException("您试图用不同的offset_priority参数创建两个%s调仓任务,offset_priority参数原为%s,现为%s"%(
symbol,instance._offset_priority,offset_priority))
ifinstance._price!=price:
raiseException("您试图用不同的price参数创建两个%s调仓任务,price参数原为%s,现为%s"%(symbol,instance._price,price))
returnTargetPosTaskSingleton._instances[symbol]
classTargetPosTask(object,metaclass=TargetPosTaskSingleton):
"""目标持仓task,该task可以将指定合约调整到目标头寸"""
def__init__()-None:
super(TargetPosTask,self).__init__()
self._pos_chan=TqChan(self._api,last_only=True)#目标持仓队列
self._task=self._api.create_task(self._target_pos_task())#创建目标持仓task
defset_target_volume(self,volume:int)-None:
"""设置目标持仓手数"""
self._pos_chan.send_nowait(int(volume))#该函数就这一个作用,把目标持仓放入目标持仓队列
def_get_order(self,offset,vol,pending_frozen):
"""
根据指定的offset和预期下单手数vol,返回符合要求的委托单最大报单手数
"""
returnorder_offset,order_dir,order_volume#返回开平方向、买卖方向、下单手数
asyncdef_target_pos_task(self):
"""负责调整目标持仓的task"""
try:
asyncfortarget_posinself._pos_chan:
target_pos=self._pos_chan.recv_latest(target_pos)#获取最后一个target_pos目标仓位
#确定调仓增减方向
delta_volume=target_pos-self._pos.pos#计算目标持仓和净持仓的差值
foreach_priorityinself._offset_priority+",":#按不同模式的优先级顺序报出不同的offset单,股指(“昨开”)平昨优先从不平今就先报平昨,原油平今优先("今昨开")就报平今
#返回开平方向、买卖方向、下单手数
order_offset,order_dir,order_volume=self._get_order()
order_task=InsertOrderUntilAllTradedTask()
classInsertOrderUntilAllTradedTask(object):
"""追价下单task,该task会在行情变化后自动撤单重下,直到全部成交
(注:此类主要在tqsdk内部使用,并非简单用法,不建议用户使用)"""
def__init__():
"""创建追价下单task实例"""
self._task=self._api.create_task(self._run())#创建追价下单task
asyncdef_run(self):
"""负责追价下单的task"""
asyncwithself._api.register_update_notify()asupdate_chan:
whileself._volume!=0:
insert_order_task=InsertOrderTask()
order=awaitinsert_order_task._order_chan.recv()
check_chan=TqChan(self._api,last_only=True)
check_task=self._api.create_task(self._check_price())#检查价格是否变化
try:
awaitasyncio.shield(insert_order_task._task)
order=insert_order_task._order_chan.recv_latest(order)
self._volume=order.volume_left
ifself._api.get_order().status=="ALIVE":
#当task被cancel时,主动撤掉未成交的挂单
self._api.cancel_order(order.order_id,account=self._account)
#在每次退出时,都等到insert_order_task执行完,此时order状态一定是FINISHED;self._trade_chan也一定会收到全部的成交手数
awaitinsert_order_task._task
def_get_price(self,direction,price_mode):
"""根据最新行情和下单方式计算出最优的下单价格"""
#主动买的价格序列(优先判断卖价,如果没有则用买价)
returnlimit_price
asyncdef_check_price(self,update_chan,order_price,order):
"""判断价格是否变化的task"""
classInsertOrderTask(object):
"""下单task(注:此类主要在tqsdk内部使用,并非简单用法,不建议用户使用)"""
def__init__():
"""创建下单task实例"""
self._task=self._api.create_task(self._run())
asyncdef_run(self):
"""负责下单的task"""
order_id=_generate_uuid("PYSDK_target")
order=self._api.insert_order(self._symbol,self._direction,self._offset,self._volume,self._limit_price,order_id=order_id,account=self._account)
上述代码存在多层调用使整体略显复杂。
从上述代码来看,TargetPosTask(api,symbol)实例化时便在初始化函数中创建了多层调用的task,主要作用就是检查队列里有没有需要下单的手数、价格有没有变化、要不要撤单重下、把委托单和成交单放入队列或取出等,调用set_target_volume(volume:int)只是把目标持仓手数放入队列,真正执行下单的是初始化函数中创建的task,而最终执行下单的则是task中调用的insert_order()函数。
所以TargetPosTask(其实是lib模块中的四个多层调用的类)的作用就是把调用insert_order下单来实现调仓的过程封装在了一起,使得两句代码即可完成调仓,但感觉其实现过程还是太复杂了,目前其臃肿的代码使得原本的insert_order失去了灵活性,例如不能使用高级委托指令、调仓一旦创建不能根据行情自由调整、不能在多个策略中对同一品种设置不同调仓目标,未来应该会优化。
目前个人更倾向于推荐使用第18.5节里的开平仓函数OpenClose实现调仓。
所向何方感谢支持