扩展事件源

了解事件,首先要从 RQAlpha 的事件驱动说起。

RQAlpha 大部分的组件是以 add_listener 的方式进行事件的注册。举例来说:

  • 当Bar数据生成,则会触发 EVENT.BAR 事件,那么用户的 handle_bar 相关的代码注册了该事件则会立即执行。
  • 当订单成交,则会触发 EVENT.TRADE 事件,那么系统的账户模块因为注册了该事件,就可以立即计算成交以后的收益和资金变化。
  • 当订单下单,则会触发 EVENT.ORDER_PENDING_NEW 事件,前端风控模块注册了该事件,则可以立即对该订单进行审核,如果不满足风控要求,则直接指定执行 order._cancel(some_reason) 来保证有问题的订单不会进入实际下单环节。程序化交易中很多需求,都可以通过注册事件的方式无缝插入到 RQAlpha 中进行扩展。

事件源分类

  • SystemEvent: 系统事件源
    • POST_SYSTEM_INIT: 系统初始化后触发
    • POST_USER_INIT: 策略的 init 函数执行后触发
    • POST_SYSTEM_RESTORED: 在实盘时,你可能需要在此事件后根据其他信息源对系统状态进行调整
  • MarketEvent: 市场及数据事件源
    • POST_UNIVERSE_CHANGED: 策略证券池发生变化后触发
    • PRE_BEFORE_TRADING: 执行 before_trading 函数前触发
    • BEFORE_TRADING: 该事件会触发策略的 before_trading 函数
    • POST_BEFORE_TRADING: 执行 before_trading 函数后触发
    • PRE_BAR: 执行 handle_bar 函数前触发
    • BAR: 该事件会触发策略的 handle_bar 函数
    • POST_BAR: 执行 handle_bar 函数后触发
    • PRE_TICK: 执行 handle_tick 前触发
    • TICK: 该事件会触发策略的 handle_tick 函数
    • POST_TICK: 执行 handle_tick 后触发
    • PRE_SCHEDULED: 在 scheduler 执行前触发
    • POST_SCHEDULED: 在 scheduler 执行后触发
    • PRE_AFTER_TRADING: 执行 after_trading 函数前触发
    • AFTER_TRADING: 该事件会触发策略的 after_trading 函数
    • POST_AFTER_TRADING: 执行 after_trading 函数后触发
    • PRE_SETTLEMENT: 结算前触发该事件
    • SETTLEMENT: 触发结算事件
    • POST_SETTLEMENT: 结算后触发该事件
  • OrderEvent: 交易事件源
    • ORDER_PENDING_NEW: 创建订单
    • ORDER_CREATION_PASS: 创建订单成功
    • ORDER_CREATION_REJECT: 创建订单失败
    • ORDER_PENDING_CANCEL: 创建撤单
    • ORDER_CANCELLATION_PASS: 撤销订单成功
    • ORDER_CANCELLATION_REJECT: 撤销订单失败
    • ORDER_UNSOLICITED_UPDATE: 订单状态更新
    • TRADE: 成交

事件源的订阅及使用

我们可以订阅需要的事件源,从而在该事件发生时实现指定需求。

下面以最简单的 Mod - ProgressMod 为例,介绍事件源的订阅和使用。

ProgressMod 需要实现的需求非常的简单:在命令行输出目前回测的进度条。https://raw.githubusercontent.com/ricequant/rq-resource/master/rqalpha/progress_bar.png首先定义一个 ProgressMod 类,继承与接口类 AbstractMod

  1. from rqalpha.interface import AbstractMod
  2.  
  3. class ProgressMod(AbstractMod):
  4.  
  5. def __init__(self):
  6. pass
  7.  
  8. def start_up(self, env, mod_config):
  9. """
  10. RQAlpha 在系统启动时会调用此接口;在此接口中,可以通过调用 ``env`` 的相应方法来覆盖系统默认组件。
  11.  
  12. :param env: 系统环境
  13. :type env: :class:`~Environment`
  14. :param mod_config: 模块配置参数
  15. """
  16. pass
  17.  
  18. def tear_down(self, success, exception=None):
  19. """
  20. RQAlpha 在系统退出前会调用此接口。
  21.  
  22. :param code: 退出代码
  23. :type code: rqalpha.const.EXIT_CODE
  24. :param exception: 如果在策略执行过程中出现错误,此对象为相应的异常对象
  25. """
  26. pass

我们将需求进行分拆:

  • 在回测开始时初始化进度条
  • 在回测每日交易结束后更新进度条
  • 在回测结束后,终止进度条为了实现以上需求,我们需要注册两个事件:

  • EVENT.POST_SYSTEM_INIT 系统初始化后

  • EVENT.POST_AFTER_TRADING 交易结束后进度条相关 我们使用 click 库来实现,具体 API 这里不详细展开。

接下来,我们在 start_up 函数中进行事件注册,并定义 _init_tick 函数来响应事件。

  1. from rqalpha.interface import AbstractMod
  2.  
  3. class ProgressMod(AbstractMod):
  4.  
  5. def __init__(self):
  6. self._env = None
  7.  
  8. def start_up(self, env, mod_config):
  9. self._env = env
  10. env.event_bus.add_listener(EVENT.POST_AFTER_TRADING, self._tick)
  11. env.event_bus.add_listener(EVENT.POST_SYSTEM_INIT, self._init)
  12.  
  13. def tear_down(self, success, exception=None):
  14. pass
  15.  
  16. def _init(self, event):
  17. pass
  18.  
  19. def _tick(self, event):
  20. pass

_init 函数中,初始化 progressBar,进度条的长度为回测的总时长

  1. def _init(self):
  2. trading_length = len(self._env.config.base.trading_calendar)
  3. self.progress_bar = click.progressbar(length=trading_length, show_eta=False)

_tick 函数中,更新进度条

  1. def _tick(self, event):
  2. self.progress_bar.update(1)

tear_down 函数中,终止进度条

  1. def tear_down(self, success, exception=None):
  2. self.progress_bar.render_finish()

至此,我们就完成了整个 ProgressMod 的编写

  1. import click
  2.  
  3. from rqalpha.interface import AbstractMod
  4. from rqalpha.events import EVENT
  5.  
  6.  
  7. class ProgressMod(AbstractMod):
  8. def __init__(self):
  9. self._env = None
  10. self.progress_bar = None
  11.  
  12. def start_up(self, env, mod_config):
  13. self._env = env
  14. env.event_bus.add_listener(EVENT.POST_AFTER_TRADING, self._tick)
  15. env.event_bus.add_listener(EVENT.POST_SYSTEM_INIT, self._init)
  16.  
  17. def _init(self, event):
  18. trading_length = len(self._env.config.base.trading_calendar)
  19. self.progress_bar = click.progressbar(length=trading_length, show_eta=False)
  20.  
  21. def _tick(self, event):
  22. self.progress_bar.update(1)
  23.  
  24. def tear_down(self, success, exception=None):
  25. self.progress_bar.render_finish()

最后,我们添加默认的载入函数 load_mod,一个完整的进度条的Mod就完成了

  1. import click
  2.  
  3. from rqalpha.interface import AbstractMod
  4. from rqalpha.events import EVENT
  5.  
  6.  
  7. class ProgressMod(AbstractMod):
  8. def __init__(self):
  9. self._env = None
  10. self.progress_bar = None
  11.  
  12. def start_up(self, env, mod_config):
  13. self._env = env
  14. env.event_bus.add_listener(EVENT.POST_AFTER_TRADING, self._tick)
  15. env.event_bus.add_listener(EVENT.POST_SYSTEM_INIT, self._init)
  16.  
  17. def _init(self, event):
  18. trading_length = len(self._env.config.base.trading_calendar)
  19. self.progress_bar = click.progressbar(length=trading_length, show_eta=False)
  20.  
  21. def _tick(self, event):
  22. self.progress_bar.update(1)
  23.  
  24. def tear_down(self, success, exception=None):
  25. self.progress_bar.render_finish()
  26.  
  27.  
  28. def load_mod():
  29. return ProgressMod()

事件源的扩展

上一节讲的是如何订阅事件源,那么如何发布事件呢?其实也很简单,只需要通过 publish_event 就可以进行事件的发布。

RQAlpha 整个回测模块是通过 rqalpha_mod_sys_simulation 实现的,其中定义了基于Bar回测的 event_sourcesimulation_broker, 其中包含了 MarketEvent 和 OrderEvent 大部分事件源的定义和发布。

我们简单来分析一下日线回测 simulation_event_source 中 MaketEvent 相关事件的触发流程。

  1. class SimulationEventSource(AbstractEventSource):
  2.  
  3. ...
  4.  
  5. def events(self, start_date, end_date, frequency):
  6. # 根据起始日期和结束日期,获取所有的交易日,然后再循环获取每一个交易日
  7. for day in self._env.data_proxy.get_trading_dates(start_date, end_date):
  8. date = day.to_pydatetime()
  9. dt_before_trading = date.replace(hour=0, minute=0)
  10. dt_bar = date.replace(hour=15, minute=0)
  11. dt_after_trading = date.replace(hour=15, minute=30)
  12. dt_settlement = date.replace(hour=17, minute=0)
  13.  
  14. yield Event(EVENT.BEFORE_TRADING, calendar_dt=dt_before_trading, trading_dt=dt_before_trading)
  15. yield Event(EVENT.BAR, calendar_dt=dt_bar, trading_dt=dt_bar)
  16.  
  17. yield Event(EVENT.AFTER_TRADING, calendar_dt=dt_after_trading, trading_dt=dt_after_trading)
  18. yield Event(EVENT.SETTLEMENT, calendar_dt=dt_settlement, trading_dt=dt_settlement)

event 函数是一个generator, 在 rqalphamod_sys_simulation 中主要返回 BEFORE_TRADING, BAR, AFTER_TRADINGSETTLEMENT 事件。RQAlpha 在接受到对应的事件后,会自动的进行相应的 _publish_event 操作,并且会自动 publish 相关的 PRE_ 和 _POST 事件。

而在 simulationbroker 中可以看到,当被调用 _cancel_order 时,会模拟撤单的执行流程,分别触发 ORDER_PENDING_CANCEL && ORDER_CANCELLATION_PASS 事件,并将 accountorder 传递给回调函数,使其可以获取其可能需要到的数据。

  1. class SimulationBroker(AbstractBroker, Persistable):
  2.  
  3. def cancel_order(self, order):
  4. account = self._get_account_for(order.order_book_id)
  5.  
  6. self._env.event_bus.publish_event(Event(EVENT.ORDER_PENDING_CANCEL, account=account, order=order))
  7.  
  8. order._mark_cancelled(_("{order_id} order has been cancelled by user.").format(order_id=order.order_id))
  9.  
  10. self._env.event_bus.publish_event(Event(EVENT.ORDER_CANCELLATION_PASS, account=account, order=order))
  11.  
  12. # account.on_order_cancellation_pass(order)
  13. try:
  14. self._open_orders.remove((account, order))
  15. except ValueError:
  16. try:
  17. self._delayed_orders.remove((account, order))
  18. except ValueError:
  19. pass

如果想查看详细的事件源相关的内容,建议直接阅读 rqalpha_mod_sys_simulation 源码,您会发现,扩展事件源比想象中要简单。

您也可以基于 rqalpha_mod_sys_simulation 扩展一个自定义的回测引擎,实现您特定的回测需求。