关于python:WonderTrader高频交易初探及v06发布

35次阅读

共计 20791 个字符,预计需要花费 52 分钟才能阅读完成。

前言

自从 WonderTrader 实现了 HFT 策略引擎以来,始终都没有工夫彻底的将高频策略研发、回测、仿真、实盘整个流程彻底走通一遍。所以趁着最近公司要上高频的机会,笔者基于 WonderTrader 把高频策略的利用彻底梳理了一遍。

本文的次要目标就是帮忙用户初步理解 WonderTraderHFT引擎上如何开发策略的

平台筹备

之前实盘框架下的 HFT 引擎曾经根本实现,然而回测框架下的 HFT 策略的反对因为事件太多始终没有欠缺。这次彻底梳理 HFT 引擎,正好把回测局部也欠缺了一下。HFT回测引擎欠缺之后,WonderTrader也正好公布一个新版本v0.6.0

v0.6.0 更新要点

  • CTA引擎设置指标仓位时,同时订阅 tick 数据,次要针对标的不确定的策略,例如截面因子 CTA 策略
  • CTA回测引擎中,输入的平仓明细中新增“最大潜在收益”和“最大潜在亏损”两个字段
  • HFT引擎的回测进行了一次彻底的整顿实现,根本满足了 HFT 策略回测的需要(已测试)
  • 初步实现了 HFT 引擎对股票 Level2 数据(orderqueue,orderdetail,transaction)的拜访接口
  • WtPorterWtBtPorter 两个 C 接口粘合模块,初步实现了 C 接口对股票 Level2 数据的反对

高频模型介绍

本文采纳的高频模型,源自Darryl ShenLinacre College University of Oxford)于 2015 年 5 月 27 日发表的《Order Imbalance Based Strategy in High Frequency Trading》一文(网络上能够找到)。

该模型基于 Ltick数据中的委托量的不均衡因子、委比因子以及中间价回归因子三个因子,预测 t0 时刻之后的 ktick数据的中间价的均价变动量,并以此构建 线性模型。通过线性回归,失去各个因子的系数。线性方程如下:

方程中各个符号的具体含意,请感兴趣的读者自行检索。该文中应用 2014 年 IF 主力合约全年的 tick 进行回测,每次进出场以 1 手股指为单位,能够实现 92.6% 的胜率,最优参数下,年化夏普率能够达到7.243,日均P&L58600元。

模型实现

有了模型当前,咱们开始来编写代码实现。因为本文旨在介绍 HFT 策略开发的流程,为了升高读者了解难度,策略都采纳 Python 编写。

策略框架介绍

首先咱们来看一下一个高频策略的根本构造:

class BaseHftStrategy:
    '''
    HFT 策略根底类,所有的策略都从该类派生 \n
    蕴含了策略的根本开发框架
    '''
    def __init__(self, name):
        self.__name__ = name
        
    
    def name(self):
        return self.__name__


    def on_init(self, context:HftContext):
        '''
        策略初始化,启动的时候调用 \n
        用于加载自定义数据 \n
        @context    策略运行上下文
        '''
        return

    def on_tick(self, context:HftContext, stdCode:str, newTick:dict):
        '''
        Tick 数据进来时调用 \n
        @context    策略运行上下文 \n
        @stdCode    合约代码 \n
        @newTick    最新 Tick
        '''
        return

    def on_order_detail(self, context:HftContext, stdCode:str, newOrdQue:dict):
        '''
        逐笔委托数据进来时调用 \n
        @context    策略运行上下文 \n
        @stdCode    合约代码 \n
        @newOrdQue  最新逐笔委托
        '''
        return

    def on_order_queue(self, context:HftContext, stdCode:str, newOrdQue:dict):
        '''
        委托队列数据进来时调用 \n
        @context    策略运行上下文 \n
        @stdCode    合约代码 \n
        @newOrdQue  最新委托队列
        '''
        return

    def on_transaction(self, context:HftContext, stdCode:str, newTrans:dict):
        '''
        逐笔成交数据进来时调用 \n
        @context    策略运行上下文 \n
        @stdCode    合约代码 \n
        @newTrans   最新逐笔成交
        '''
        return

    def on_bar(self, context:HftContext, stdCode:str, period:str, newBar:dict):
        '''
        K 线闭合时回调
        @context    策略上下文 \n
        @stdCode    合约代码
        @period     K 线周期
        @newBar     最新闭合的 K 线
        '''
        return

    def on_channel_ready(self, context:HftContext):
        '''
        交易通道就绪告诉 \n
        @context    策略上下文 \n
        '''
        return

    def on_channel_lost(self, context:HftContext):
        '''
        交易通道失落告诉 \n
        @context    策略上下文 \n
        '''
        return

    def on_entrust(self, context:HftContext, localid:int, stdCode:str, bSucc:bool, msg:str, userTag:str):
        '''
        下单后果回报
        @context    策略上下文 \n
        @localid    本地订单 id\n
        @stdCode    合约代码 \n
        @bSucc      下单后果 \n
        @mes        下单后果形容
        '''
        return

    def on_order(self, context:HftContext, localid:int, stdCode:str, isBuy:bool, totalQty:float, leftQty:float, price:float, isCanceled:bool, userTag:str):
        '''
        订单回报
        @context    策略上下文 \n
        @localid    本地订单 id\n
        @stdCode    合约代码 \n
        @isBuy      是否买入 \n
        @totalQty   下单数量 \n
        @leftQty    残余数量 \n
        @price      下单价格 \n
        @isCanceled 是否已撤单
        '''
        return

    def on_trade(self, context:HftContext, localid:int, stdCode:str, isBuy:bool, qty:float, price:float, userTag:str):
        '''
        成交回报
        @context    策略上下文 \n
        @stdCode    合约代码 \n
        @isBuy      是否买入 \n
        @qty        成交数量 \n
        @price      成交价格
        '''
        return

整个策略的构造大抵能够分为四块:

  • 策略自身的回调
  • 行情数据的回调
  • 交易通道的回调
  • 交易回报的回调

其中行情数据的回调,次要包含 on_tickon_barlevel2数据回调,本文中只须要关注 on_tick 即可;交易通道的回调,次要是告诉策略交易通道的连贯和断开事件;交易回报的回调,次要是订单回报、成交回报以及下单回报。

参数设计

依据模型的逻辑,咱们设置回溯 tick 数为5,中间价变动的阈值为0.3,那么咱们便能够将策略参数设计如下:

'''交易参数'''
self.__code__ = code            #交易合约
self.__expsecs__ = expsecs      #订单超时秒数,用于管制超时撤单
self.__freq__ = freq            #交易频率管制,指定工夫内限度信号数,单位秒

self.__lots__ = lots            #单次交易手数

self.count = count              #回溯 tick 条数
self.beta_0 = beta_0            #常量系数 + 残差
self.beta_r = beta_r            #中间价回归因子系数
self.threshold = threshold      #中间价变动阈值
self.beta_oi = beta_oi          #成交量不均衡因子系数序列
self.beta_rou = beta_rou        #委比因子系数序列
self.active_secs = active_secs  #交易工夫区间
self.stoppl = stoppl            #止盈止损配置

外围逻辑

在大抵理解了 HFT 策略的构造当前,咱们就能够开始来编码了。整个策略的外围逻辑,集中在 on_tick 回调中,次要就是上述模型的计算,代码如下:

hisTicks = context.stra_get_ticks(self.__code__, self.count + 1)
if hisTicks.size != self.count+1:
    return

if (len(newTick["askprice"]) == 0) or (len(newTick["bidprice"]) == 0):
    return

spread = newTick["askprice"][0] - newTick["bidprice"][0]

total_OIR = 0.0
total_rou = 0.0
# 计算不均衡因子和委比因子的累加之和
for i in range(1, self.count + 1):
    prevTick = hisTicks.get_tick(i-1)
    curTick = hisTicks.get_tick(i)

    lastBidPx = self.get_price(prevTick, -1)
    lastAskPx = self.get_price(prevTick, 1)

    lastBidQty = prevTick["bidqty"][0] if len(prevTick["bidqty"]) > 0 else 0
    lastAskQty = prevTick["askqty"][0] if len(prevTick["askqty"]) > 0 else 0

    curBidPx = self.get_price(curTick, -1)
    curAskPx = self.get_price(curTick, 1)

    curBidQty = curTick["bidqty"][0] if len(curTick["bidqty"]) > 0 else 0
    curAskQty = curTick["askqty"][0] if len(curTick["askqty"]) > 0 else 0

    delta_vb = 0.0
    delta_va = 0.0
    if curBidPx < lastBidPx:
        delta_vb = 0.0
    elif curBidPx == lastBidPx:
        delta_vb = curBidQty - lastBidQty
    else:
        delta_vb = curBidQty

    if curAskPx < lastAskPx:
        delta_va = curAskQty
    elif curAskPx == lastAskPx:
        delta_va = curAskQty - lastAskQty
    else:
        delta_va = 0.0
    voi = delta_vb - delta_va
    total_OIR += self.beta_oi[i-1]*voi/spread

    #计算委比
    rou = (curBidQty - curAskQty)/(curBidQty + curAskQty)
    total_rou += self.beta_rou[i-1]*rou/spread

prevTick = hisTicks.get_tick(-2)
# t- 1 时刻的中间价
prevMP = (self.get_price(prevTick, -1) + self.get_price(prevTick, 1))/2
# 最新的中间价
curMP = (newTick["askprice"][0] + newTick["bidprice"][0])/2
# 两个快照之间的成交均价
if newTick["volumn"] != 0:
    avgTrdPx = newTick["turn_over"]/newTick["volumn"]/self.__comm_info__.volscale
elif self._last_atp__!= 0:
    avgTrdPx = self._last_atp__
else:
    avgTrdPx = curMP

self._last_atp__ = avgTrdPx

# 计算中间价回归因子
curR = avgTrdPx - (prevMP + curMP) / 2

# 计算预期中间价变动量
efpc = self.beta_0 + total_OIR + total_rou + self.beta_r * curR / spread

if efpc >= self.threshold:
    targetPos = self.__lots__
    diffPos = targetPos - curPos
    if diffPos != 0.0:
        targetPx = newTick["askprice"][0]
        ids = context.stra_buy(self.__code__, targetPx, abs(diffPos), "enterlong")

        #将订单号退出到治理中
        for localid in ids:
            self.__orders__[localid] = localid
        self.__last_entry_time__ = now
        self._max_dyn_prof = 0
        self._max_dyn_loss = 0
elif efpc <= -self.threshold:
    targetPos = -self.__lots__
    diffPos = targetPos - curPos
    if diffPos != 0:
        targetPx = newTick["bidprice"][0]
        ids = context.stra_sell(self.__code__, targetPx, abs(diffPos), "entershort")

        #将订单号退出到治理中
        for localid in ids:
            self.__orders__[localid] = localid
        self.__last_entry_time__ = now
        self._max_dyn_prof = 0.0
        self._max_dyn_loss = 0.0

止盈止损逻辑

然而对于高频策略,除了外围的进出场逻辑之外,止盈止损逻辑 也是十分重要的一部分。本文中的策略采纳 固定点位止损 + 跟踪止盈 来作为止盈止损逻辑,代码如下:

# 止盈止损逻辑
if curPos != 0 and self.stoppl["active"]:
    isLong = (curPos > 0)

    # 首先获取最新的价格,calc_price 为 0 的话,应用对手价计算浮盈,calc_price 为 1 的话,应用最新价计算浮盈
    price = 0
    if self.stoppl["calc_price"] == 0:
        price = self.get_price(newTick, -1) if isLong else self.get_price(newTick, 1)
    else:
        price = newTick["price"]
    #而后计算浮动盈亏的跳数
    diffTicks = (price - self._last_entry_price)*(1 if isLong else -1) / self.__comm_info__.pricetick
    if diffTicks > 0:
        self._max_dyn_prof = max(self._max_dyn_prof, diffTicks)
    else:
        self._max_dyn_loss = min(self._max_dyn_loss, diffTicks)

    bNeedExit = False
    usertag = ''stop_ticks = self.stoppl["stop_ticks"]
    track_threshold = self.stoppl["track_threshold"]
    fallback_boundary = self.stoppl["fallback_boundary"]
    if diffTicks <= stop_ticks:
        context.stra_log_text("浮亏 %.0f 超过 %d 跳,止损离场" % (diffTicks, stop_ticks))
        bNeedExit = True
        usertag = "stoploss"
    elif self._max_dyn_prof >= track_threshold and diffTicks <= fallback_boundary:
        context.stra_log_text("浮赢回撤 %.0f->%.0f[阈值 %.0f->%.0f],止盈离场" % (self._max_dyn_prof, diffTicks, track_threshold, fallback_boundary))
        bNeedExit = True
        usertag = "stopprof"
    
    if bNeedExit:
        targetprice = self.get_price(newTick, -1) if isLong else self.get_price(newTick, 1)
        ids = context.stra_sell(self.__code__, targetprice, abs(curPos), usertag) if isLong else context.stra_buy(self.__code__, price, abs(curPos), usertag)
        for localid in ids:
            self.__orders__[localid] = localid

        # 出场逻辑执行当前完结逻辑
        return

开盘前出场的逻辑

有了止盈止损逻辑,咱们还须要增加一段 开盘前出场的逻辑,代码如下:

curMin = context.stra_get_time()
curPos = context.stra_get_position(stdCode)

# 不在交易工夫,则查看是否有持仓
# 如果有持仓,则须要清理
if not self.is_active(curMin):
    self._last_atp__ = 0.0
    if curPos == 0:
        return
    self.__to_clear__ = True
else:
    self.__to_clear__ = False

# 如果须要清理持仓,且不在撤单过程中
if self.__to_clear__ :
    if self.__cancel_cnt__ == 0:
        if curPos > 0:
            # 以对手价挂单
            targetPx = self.get_price(newTick, -1)
            ids = context.stra_sell(self.__code__, targetPx, abs(curPos), "deadline")

            #将订单号退出到治理中
            for localid in ids:
                self.__orders__[localid] = localid
        elif curPos < 0:
            # 以对手价挂单
            targetPx = self.get_price(newTick, 1)
            ids = context.stra_buy(self.__code__, targetPx, abs(curPos), "deadline")

            #将订单号退出到治理中
            for localid in ids:
                self.__orders__[localid] = localid
    
    return

订单治理逻辑

而后,咱们还须要增加一段 订单治理 的逻辑,代码如下:

def check_orders(self, ctx:HftContext):
    #如果未实现订单不为空
    ord_cnt = len(self.__orders__.keys())
    if ord_cnt > 0 and self.__last_entry_time__ is not None:
        #以后工夫,肯定要从 api 获取,不然回测会有问题
        now = makeTime(ctx.stra_get_date(), ctx.stra_get_time(), ctx.stra_get_secs())
        span = now - self.__last_entry_time__
        total_secs = span.total_seconds()
        if total_secs >= self.__expsecs__: #如果订单超时,则须要撤单
            ctx.stra_log_text("%d 条订单超时撤单" % (ord_cnt))
            for localid in self.__orders__:
                ctx.stra_cancel(localid)
                self.__cancel_cnt__ += 1
                ctx.stra_log_text("在途撤复数 -> %d" % (self.__cancel_cnt__))

其余逻辑

除了上述的逻辑之外,咱们还须要解决一些 细节问题,如:

  • 解决订单回报,用于更新本地订单的状态;
  • 解决成交回报,用于更新入场价格,计算浮动盈亏
  • 解决交易通道就绪的回报,用于查看是否有不在治理内的未实现单

残缺源码

整个策略的 残缺代码 如下:

from wtpy import BaseHftStrategy
from wtpy import HftContext

from datetime import datetime

def makeTime(date:int, time:int, secs:int):
    '''
    将零碎工夫转成 datetime\n
    @date   日期,格局如 20200723\n
    @time   工夫,准确到分,格局如 0935\n
    @secs   秒数,准确到毫秒,格局如 37500
    '''
    return datetime(year=int(date/10000), month=int(date%10000/100), day=date%100, 
        hour=int(time/100), minute=time%100, second=int(secs/1000), microsecond=secs%1000*1000)

class HftStraOrderImbalance(BaseHftStrategy):

    def __init__(self, name:str, code:str, count:int, lots:int, beta_0:float, beta_r:float, threshold:float, 
            beta_oi:list, beta_rou:list, expsecs:int, offset:int, freq:int, active_secs:list, stoppl:dict, reserve:int=0):
        BaseHftStrategy.__init__(self, name)

        '''交易参数'''
        self.__code__ = code            #交易合约
        self.__expsecs__ = expsecs      #订单超时秒数,用于管制超时撤单
        self.__freq__ = freq            #交易频率管制,指定工夫内限度信号数,单位秒

        self.__lots__ = lots            #单次交易手数

        self.count = count              #回溯 tick 条数
        self.beta_0 = beta_0            #常量系数 + 残差
        self.beta_r = beta_r            #中间价回归因子系数
        self.threshold = threshold      #中间价变动阈值
        self.beta_oi = beta_oi          #成交量不均衡因子系数序列
        self.beta_rou = beta_rou        #委比因子系数序列
        self.active_secs = active_secs  #交易工夫区间
        self.stoppl = stoppl            #止盈止损配置

        '''外部数据'''
        self.__last_tick__ = None       #上一笔行情
        self.__orders__ = dict()        #策略相干的订单
        self.__last_entry_time__ = None #上次入场工夫
        self.__cancel_cnt__ = 0         #正在撤销的订单数
        self.__channel_ready__ = False  #通道是否就绪

        self.__comm_info__ = None
        self.__to_clear__ = False

        self._last_entry_price = 0.0
        self._max_dyn_prof = 0.0
        self._max_dyn_loss = 0.0

        self._last_atp__ = 0.0

    def is_active(self, curMin:int) -> bool:
        for sec in self.active_secs:
            if sec["start"] <= curMin and curMin <= sec["end"]:
                return True
        return False
        

    def on_init(self, context:HftContext):
        '''
        策略初始化,启动的时候调用 \n
        用于加载自定义数据 \n
        @context    策略运行上下文
        '''
        self.__comm_info__ = context.stra_get_comminfo(self.__code__)
        #先订阅实时数据
        context.stra_sub_ticks(self.__code__)

        self.__ctx__ = context

    def check_orders(self, ctx:HftContext):
        #如果未实现订单不为空
        ord_cnt = len(self.__orders__.keys())
        if ord_cnt > 0 and self.__last_entry_time__ is not None:
            #以后工夫,肯定要从 api 获取,不然回测会有问题
            now = makeTime(ctx.stra_get_date(), ctx.stra_get_time(), ctx.stra_get_secs())
            span = now - self.__last_entry_time__
            total_secs = span.total_seconds()
            if total_secs >= self.__expsecs__: #如果订单超时,则须要撤单
                ctx.stra_log_text("%d 条订单超时撤单" % (ord_cnt))
                for localid in self.__orders__:
                    ctx.stra_cancel(localid)
                    self.__cancel_cnt__ += 1
                    ctx.stra_log_text("在途撤复数 -> %d" % (self.__cancel_cnt__))

    def get_price(self, newTick, pricemode=0):
        if pricemode == 0:
            return newTick["price"]
        elif pricemode == 1:
            return newTick["askprice"][0] if len(newTick["askprice"])>0 else newTick["price"]
        elif pricemode == -1:
            return newTick["bidprice"][0] if len(newTick["bidprice"])>0 else newTick["price"]


    def on_tick(self, context:HftContext, stdCode:str, newTick:dict):
        if self.__code__ != stdCode:
            return

        #如果有未实现订单,则进入订单治理逻辑
        if len(self.__orders__.keys()) != 0:
            self.check_orders(context)
            return

        if not self.__channel_ready__:
            return

        curMin = context.stra_get_time()
        curPos = context.stra_get_position(stdCode)

        # 不在交易工夫,则查看是否有持仓
        # 如果有持仓,则须要清理
        if not self.is_active(curMin):
            self._last_atp__ = 0.0
            if curPos == 0:
                return
            self.__to_clear__ = True
        else:
            self.__to_clear__ = False

        # 如果须要清理持仓,且不在撤单过程中
        if self.__to_clear__ :
            if self.__cancel_cnt__ == 0:
                if curPos > 0:
                    targetPx = self.get_price(newTick, -1)
                    ids = context.stra_sell(self.__code__, targetPx, abs(curPos), "deadline")

                    #将订单号退出到治理中
                    for localid in ids:
                        self.__orders__[localid] = localid
                elif curPos < 0:
                    targetPx = self.get_price(newTick, 1)
                    ids = context.stra_buy(self.__code__, targetPx, abs(curPos), "deadline")

                    #将订单号退出到治理中
                    for localid in ids:
                        self.__orders__[localid] = localid
            
            return

        # 止盈止损逻辑
        if curPos != 0 and self.stoppl["active"]:
            isLong = (curPos > 0)
            price = 0
            if self.stoppl["calc_price"] == 0:
                price = self.get_price(newTick, -1) if isLong else self.get_price(newTick, 1)
            else:
                price = newTick["price"]
            diffTicks = (price - self._last_entry_price)*(1 if isLong else -1) / self.__comm_info__.pricetick
            if diffTicks > 0:
                self._max_dyn_prof = max(self._max_dyn_prof, diffTicks)
            else:
                self._max_dyn_loss = min(self._max_dyn_loss, diffTicks)

            bNeedExit = False
            usertag = ''stop_ticks = self.stoppl["stop_ticks"]
            track_threshold = self.stoppl["track_threshold"]
            fallback_boundary = self.stoppl["fallback_boundary"]
            if diffTicks <= stop_ticks:
                context.stra_log_text("浮亏 %.0f 超过 %d 跳,止损离场" % (diffTicks, stop_ticks))
                bNeedExit = True
                usertag = "stoploss"
            elif self._max_dyn_prof >= track_threshold and diffTicks <= fallback_boundary:
                context.stra_log_text("浮赢回撤 %.0f->%.0f[阈值 %.0f->%.0f],止盈离场" % (self._max_dyn_prof, diffTicks, track_threshold, fallback_boundary))
                bNeedExit = True
                usertag = "stopprof"
            
            if bNeedExit:
                targetprice = self.get_price(newTick, -1) if isLong else self.get_price(newTick, 1)
                ids = context.stra_sell(self.__code__, targetprice, abs(curPos), usertag) if isLong else context.stra_buy(self.__code__, price, abs(curPos), usertag)
                for localid in ids:
                    self.__orders__[localid] = localid

                # 出场逻辑执行当前完结逻辑
                return

        now = makeTime(self.__ctx__.stra_get_date(), self.__ctx__.stra_get_time(), self.__ctx__.stra_get_secs())
        
        # 成交量为 0 且上一个成交均价为 0,则须要退出
        if newTick["volumn"] == 0 and self._last_atp__ == 0.0:
            return

        #如果曾经入场,且有频率限度,则做频率查看
        if self.__last_entry_time__ is not None and self.__freq__ != 0:
            #以后工夫,肯定要从 api 获取,不然回测会有问题
            span = now - self.__last_entry_time__
            if span.total_seconds() <= self.__freq__:
                return

        hisTicks = context.stra_get_ticks(self.__code__, self.count + 1)
        if hisTicks.size != self.count+1:
            return

        if (len(newTick["askprice"]) == 0) or (len(newTick["bidprice"]) == 0):
            return

        spread = newTick["askprice"][0] - newTick["bidprice"][0]

        total_OIR = 0.0
        total_rou = 0.0
        for i in range(1, self.count + 1):
            prevTick = hisTicks.get_tick(i-1)
            curTick = hisTicks.get_tick(i)

            lastBidPx = self.get_price(prevTick, -1)
            lastAskPx = self.get_price(prevTick, 1)

            lastBidQty = prevTick["bidqty"][0] if len(prevTick["bidqty"]) > 0 else 0
            lastAskQty = prevTick["askqty"][0] if len(prevTick["askqty"]) > 0 else 0

            curBidPx = self.get_price(curTick, -1)
            curAskPx = self.get_price(curTick, 1)

            curBidQty = curTick["bidqty"][0] if len(curTick["bidqty"]) > 0 else 0
            curAskQty = curTick["askqty"][0] if len(curTick["askqty"]) > 0 else 0

            delta_vb = 0.0
            delta_va = 0.0
            if curBidPx < lastBidPx:
                delta_vb = 0.0
            elif curBidPx == lastBidPx:
                delta_vb = curBidQty - lastBidQty
            else:
                delta_vb = curBidQty

            if curAskPx < lastAskPx:
                delta_va = curAskQty
            elif curAskPx == lastAskPx:
                delta_va = curAskQty - lastAskQty
            else:
                delta_va = 0.0
            voi = delta_vb - delta_va
            total_OIR += self.beta_oi[i-1]*voi/spread

            #计算委比
            rou = (curBidQty - curAskQty)/(curBidQty + curAskQty)
            total_rou += self.beta_rou[i-1]*rou/spread


        prevTick = hisTicks.get_tick(-2)
        # t- 1 时刻的中间价
        prevMP = (self.get_price(prevTick, -1) + self.get_price(prevTick, 1))/2
        # 最新的中间价
        curMP = (newTick["askprice"][0] + newTick["bidprice"][0])/2
        # 两个快照之间的成交均价
        if newTick["volumn"] != 0:
            avgTrdPx = newTick["turn_over"]/newTick["volumn"]/self.__comm_info__.volscale
        elif self._last_atp__!= 0:
            avgTrdPx = self._last_atp__
        else:
            avgTrdPx = curMP

        self._last_atp__ = avgTrdPx

        # 计算中间价回归因子
        curR = avgTrdPx - (prevMP + curMP) / 2

        # 计算预期中间价变动量
        efpc = self.beta_0 + total_OIR + total_rou + self.beta_r * curR / spread

        if efpc >= self.threshold:
            targetPos = self.__lots__
            diffPos = targetPos - curPos
            if diffPos != 0.0:
                targetPx = newTick["askprice"][0]
                ids = context.stra_buy(self.__code__, targetPx, abs(diffPos), "enterlong")

                #将订单号退出到治理中
                for localid in ids:
                    self.__orders__[localid] = localid
                self.__last_entry_time__ = now
                self._max_dyn_prof = 0
                self._max_dyn_loss = 0
        elif efpc <= -self.threshold:
            targetPos = -self.__lots__
            diffPos = targetPos - curPos
            if diffPos != 0:
                targetPx = newTick["bidprice"][0]
                ids = context.stra_sell(self.__code__, targetPx, abs(diffPos), "entershort")

                #将订单号退出到治理中
                for localid in ids:
                    self.__orders__[localid] = localid
                self.__last_entry_time__ = now
                self._max_dyn_prof = 0.0
                self._max_dyn_loss = 0.0


    def on_bar(self, context:HftContext, stdCode:str, period:str, newBar:dict):
        return

    def on_channel_ready(self, context:HftContext):
        undone = context.stra_get_undone(self.__code__)
        if undone != 0 and len(self.__orders__.keys()) == 0:
            context.stra_log_text("%s 存在不在治理中的未实现单 %f 手,全副撤销" % (self.__code__, undone))
            isBuy = (undone > 0)
            ids = context.stra_cancel_all(self.__code__, isBuy)
            for localid in ids:
                self.__orders__[localid] = localid
            self.__cancel_cnt__ += len(ids)
            context.stra_log_text("在途撤复数 -> %d" % (self.__cancel_cnt__))
        self.__channel_ready__ = True

    def on_channel_lost(self, context:HftContext):
        context.stra_log_text("交易通道连贯失落")
        self.__channel_ready__ = False

    def on_entrust(self, context:HftContext, localid:int, stdCode:str, bSucc:bool, msg:str, userTag:str):
        return

    def on_order(self, context:HftContext, localid:int, stdCode:str, isBuy:bool, totalQty:float, leftQty:float, price:float, isCanceled:bool, userTag:str):
        if localid not in self.__orders__:
            return

        if isCanceled or leftQty == 0:
            self.__orders__.pop(localid)
            if self.__cancel_cnt__ > 0:
                self.__cancel_cnt__ -= 1
                self.__ctx__.stra_log_text("在途撤复数 -> %d" % (self.__cancel_cnt__))
        return

    def on_trade(self, context:HftContext, localid:int, stdCode:str, isBuy:bool, qty:float, price:float, userTag:str):
        self._last_entry_price = price

模型回测

模型编码实现当前,咱们就能够思考模型回测了。

数据筹备

笔者共享了股指期货主力合约 2020 年 12 月到 2021 年 1 月份的 tick 数据到百度网盘中,地址如下:
https://pan.baidu.com/s/1Bdxh… 提取码:d6bh

文件名为CFFEX.IF.HOT_ticks_20201201_20210118.7z,读者能够自行获取。

数据格式为 WonderTrader 外部压缩寄存的数据格式 .dsb,如果要做回归的话,那么还须要将.dsb 文件导出为 csv 文件。wtpy中的 WtDtHelper 模块中就提供了数据转换的办法,调用代码如下:

from wtpy.wrapper import WtDataHelper
import os

dtHelper = WtDataHelper()
dtHelper.dump_ticks('dsb 文件所在的目录', '要输入的 csv 目录')

csv 数据导出当前,就能够利用 python 读取数据进行模型线性回归了。

回测入口

线性回归做好当前,失去一组系数。而后编写回测入口脚本,代码如下:

from wtpy import WtBtEngine, EngineType
from strategies.HftStraOrdImbal import HftStraOrderImbalance

def read_params_from_csv(filename) -> dict:
    params = {
        "beta_0":0.0,
        "beta_r":0.0,
        "beta_oi":[],
        "beta_rou":[]}
    f = open(filename, "r")
    lines = f.readlines()
    f.close()
    for row in range(1, len(lines)):
        curLine = lines[row]
        ay = curLine.split(",")
        if row == 1:
            params["beta_0"] = float(ay[1])
        elif row == 14:
            params["beta_r"] = float(ay[1])
        elif row > 1 and row <=7:
            params["beta_oi"].append(float(ay[1]))
        elif row > 7 and row <=13:
            params["beta_rou"].append(float(ay[1]))

    return params


if __name__ == "__main__":
    # 创立一个运行环境,并退出策略
    engine = WtBtEngine(EngineType.ET_HFT)
    engine.init('.\\Common\\', "configbt.json")
    engine.configBacktest(202101040900,202101181500)
    engine.configBTStorage(mode="csv", path="./storage/")
    engine.commitBTConfig()

    active_sections = [
        {
            "start": 931,
            "end": 1457
        }
    ]

    stop_params = {
        "active":True,          # 是否启用止盈止损
        "stop_ticks": -25,      # 止损跳数,如果浮亏达到该跳数,则间接止损
        "track_threshold": 15,  # 追踪止盈阈值跳数,超过该阈值则触发追踪止盈
        "fallback_boundary": 2, # 追踪止盈回撤边界跳数,即浮盈跳数回撤到该边界值以下,立刻止盈
        "calc_price":0
    }

    params = read_params_from_csv('IF_10ticks_20201201_20201231.csv')
    straInfo = HftStraOrderImbalance(name='hft_IF',
                                     code="CFFEX.IF.HOT",
                                     count=6,
                                     lots=1,
                                     threshold=0.3,
                                     expsecs=5,
                                     offset=0,
                                     freq=0,
                                     active_secs=active_sections,
                                     stoppl=stop_params,
                                     **params)

    engine.set_hft_strategy(straInfo)

    engine.run_backtest()

    kw = input('press any key to exit\n')
    engine.release_backtest()

回测后果

咱们应用 2020 年 12 月的全副 tick 进行线性回归,失去的参数用于 2021 年 1 月回测失去的绩效如下:

date,closeprofit,positionprofit,dynbalance,fee
20210104,-11160.00,0.00,-20941.01,9781.01
20210105,-20100.00,0.00,-40712.85,20612.85
20210106,-60.00,0.00,-31828.36,31768.36
20210107,4140.00,0.00,-40344.73,44484.73
20210108,-11760.00,0.00,-66329.60,54569.60
20210111,-41280.00,0.00,-107444.80,66164.80
20210112,-66000.00,0.00,-142723.28,76723.28
20210113,-87240.00,0.00,-175926.18,88686.18
20210114,-106680.00,0.00,-202219.21,95539.21
20210115,-96840.00,0.00,-197721.78,100881.78
20210118,-110760.00,0.00,-219671.31,108911.31


从下面的绩效能够看出,该模型的体现倒是比较稳定,惋惜是稳固的亏钱 [手动狗头],切实是 难堪大用

绩效剖析

策略体现尽管难以入目,然而咱们还是要进行绩效剖析,看看有没有能够改良的点。WonderTrader针对 HFT 回测生成了 回合明细 closes.csv,能够看到每个回合的进场点和出场点,以及每个回合 潜在最大收益 潜在最大亏损。用户能够利用回合明细依据需要自行剖析每个回合进出场的点位是否正当,以及如何优化等问题。

code,direct,opentime,openprice,closetime,closeprice,qty,profit,maxprofit,maxloss,totalprofit,entertag,exittag
CFFEX.IF.HOT,SHORT,20210104093156400,5221,20210104093218400,5221,1,-0,480,-540,0,entershort,enterlong
CFFEX.IF.HOT,LONG,20210104093218400,5221,20210104093219900,5222,1,300,300,0,300,enterlong,entershort
CFFEX.IF.HOT,SHORT,20210104093219900,5222,20210104093226900,5223,1,-300,120,-480,0,entershort,enterlong
CFFEX.IF.HOT,LONG,20210104093226900,5223,20210104093301400,5216.8,1,-1860,240,-2040,-1860,enterlong,stoploss
CFFEX.IF.HOT,SHORT,20210104093317400,5210.8,20210104093319900,5211.2,1,-120,0,-480,-1980,entershort,enterlong
CFFEX.IF.HOT,LONG,20210104093320400,5210.6,20210104093347900,5211.4,1,240,540,-1080,-1740,enterlong,entershort
CFFEX.IF.HOT,SHORT,20210104093347900,5211.4,20210104093410900,5211,1,120,660,-480,-1620,entershort,enterlong
CFFEX.IF.HOT,LONG,20210104093410900,5211,20210104093424400,5203.4,1,-2280,0,-2460,-3900,enterlong,stoploss
CFFEX.IF.HOT,SHORT,20210104093432900,5201.2,20210104093446900,5207.2,1,-1800,120,-2040,-5700,entershort,stoploss

结束语

到此为止,一个残缺的 HFT 策略开发流程就走完了。尽管该模型仿佛曾经生效,然而笔者并没有深入分析以后 IF 的市场和原模型回测的工夫区间的 IF 的市场之间的差异,另外笔者也没有拓展到别的种类进行剖析。再者,笔者的次要目标是演示 HFT 策略的研发流程,所以对于模型方面不免有所疏漏。模型方面的做法,请各位读者稍作参考即可。

值得一提的是,从下面的源码中能够看到,WonderTrader针对 HFT 策略的交易接口简化成了 两个交易接口,目标就是为了简化策略开发的逻辑,让策略人研发人员将更多的精力集中在策略逻辑自身。而交易对应的开平逻辑,会在 C++ 外围通过配置文件 actionpolicy.json 进行管制,主动解决开平。另外,该策略应用 Python 开发,而 C++ 版本的雷同策略,回测工夫约为 Python 版本的十分之一左右,如果有读者想要利用 WonderTrader 上高频,在开发语言方面,还请各位读者认真斟酌。

笔者也会一直地欠缺 WonderTraderHFT策略方面的性能。也心愿各位读者能多多斧正 WonderTrader 的疏漏,帮忙 WonderTrader 欠缺起来,也能为更多的用户提供更好的基础设施服务。

最初再来一波广告

WonderTradergithub 地址:https://github.com/wondertrad…

WonderTrader官网地址:https://wondertrader.github.io

wtpygithub 地址:https://github.com/wondertrad…

正文完
 0