Designing the Quotation System


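The quotation (market data) system generates historical data for the public market, chiefly candlestick (K-line) charts.

The data behind a candlestick chart comes from the individual ticks that the trading engine produces on each trade. Each candlestick carries four prices, OHLC. Within a given time period, the price of the first tick is Open, the price of the last tick is Close, the highest price is High, and the lowest price is Low: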

      High ──▶│
            ┌─┴─┐◀── Close
    Open ──▶└─┬─┘
       Low ──▶│

Any set of ticks can therefore be aggregated into a single candlestick, which maps to a Bar structure:

    import java.math.BigDecimal;

    public class AbstractBarEntity {
        public long startTime; // start time of the bar
        public BigDecimal openPrice; // opening price
        public BigDecimal highPrice; // highest price
        public BigDecimal lowPrice; // lowest price
        public BigDecimal closePrice; // closing price
        public BigDecimal quantity; // traded quantity
    }
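
As a rough illustration of how a set of ticks folds into one bar, here is a minimal sketch. The names SimpleTick, SimpleBar and BarAggregator are hypothetical and do not come from the project; the sketch also assumes the ticks are already ordered by time and all fall into the same period.

```java
import java.math.BigDecimal;
import java.util.List;

// Hypothetical minimal tick holding only what aggregation needs.
final class SimpleTick {
    final BigDecimal price;
    final BigDecimal quantity;

    SimpleTick(String price, String quantity) {
        this.price = new BigDecimal(price);
        this.quantity = new BigDecimal(quantity);
    }
}

// Hypothetical result type mirroring the price fields of AbstractBarEntity.
final class SimpleBar {
    final BigDecimal open;
    final BigDecimal high;
    final BigDecimal low;
    final BigDecimal close;
    final BigDecimal quantity;

    SimpleBar(BigDecimal open, BigDecimal high, BigDecimal low, BigDecimal close, BigDecimal quantity) {
        this.open = open;
        this.high = high;
        this.low = low;
        this.close = close;
        this.quantity = quantity;
    }
}

final class BarAggregator {
    // Folds a non-empty, time-ordered list of ticks into one OHLC bar.
    static SimpleBar aggregate(List<SimpleTick> ticks) {
        if (ticks.isEmpty()) {
            throw new IllegalArgumentException("at least one tick is required");
        }
        BigDecimal open = ticks.get(0).price; // first tick sets Open
        BigDecimal high = open;
        BigDecimal low = open;
        BigDecimal close = open;
        BigDecimal quantity = BigDecimal.ZERO;
        for (SimpleTick tick : ticks) {
            high = high.max(tick.price);
            low = low.min(tick.price);
            close = tick.price; // last tick seen sets Close
            quantity = quantity.add(tick.quantity);
        }
        return new SimpleBar(open, high, low, close, quantity);
    }
}
```

Taking Open from the first tick explicitly avoids relying on a sentinel value such as zero to mean "not set yet".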

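We usually need to generate candlesticks at 1-second, 1-minute, 1-hour and 1-day intervals, so the job of the quotation system is to keep reading ticks from the messaging system, merge them, and output each type of candlestick.

In addition, the API system must let clients query public market information. Recent trades and recent candlesticks can be cached in Redis, while older candlesticks can be queried from the database. The quotation system therefore has to save the candlesticks it generates to the database and also keep the Redis cache up to date.

For the most recent trades, we use a Redis List in which each element is a serialized JSON string: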
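
To make the four intervals concrete, the sketch below shows one way to map a tick timestamp (milliseconds since the epoch) to the start time of the bar it belongs to. The class name BarStartTimes is hypothetical, and the time zone used for the daily bar is an assumption that a real deployment would take from configuration.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;

// Hypothetical helper: rounds a timestamp down to the start of its bar.
final class BarStartTimes {
    static long secStart(long epochMillis) {
        return Math.floorDiv(epochMillis, 1_000L) * 1_000L;
    }

    static long minStart(long epochMillis) {
        return Math.floorDiv(epochMillis, 60_000L) * 60_000L;
    }

    static long hourStart(long epochMillis) {
        return Math.floorDiv(epochMillis, 3_600_000L) * 3_600_000L;
    }

    // A day boundary depends on the time zone, so it is computed on the local date.
    static long dayStart(long epochMillis, ZoneId zone) {
        return Instant.ofEpochMilli(epochMillis)
                .atZone(zone)
                .truncatedTo(ChronoUnit.DAYS)
                .toInstant()
                .toEpochMilli();
    }
}
```

All ticks whose timestamps round down to the same start time belong to the same bar of that type.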

    ["{...}", "{...}", "{...}", ...]

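When new ticks arrive, they are appended to the tail of the list and the oldest ticks are removed, so the list always holds the most recent trades.

Reading the list from Redis, modifying it, and writing it back would work, but it is cumbersome. Instead we update the recent-tick list with a Lua script. Redis can load a Lua script and then execute it directly inside the server, and the whole script runs atomically: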
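
The append-and-trim behavior described above can be pictured with a small in-memory sketch. The class RecentTicks is hypothetical and only models the semantics; it is not how the project talks to Redis.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical in-memory model of a bounded "recent ticks" list.
final class RecentTicks {
    private final int capacity;
    private final Deque<String> items = new ArrayDeque<>();

    RecentTicks(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // Appends new ticks at the tail, then drops the oldest entries beyond capacity.
    void append(List<String> newTicks) {
        for (String tick : newTicks) {
            items.addLast(tick);
        }
        while (items.size() > capacity) {
            items.removeFirst();
        }
    }

    // Returns the current content, oldest first.
    List<String> snapshot() {
        return new ArrayList<>(items);
    }
}
```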

    local KEY_LAST_SEQ = '_TickSeq_' -- key holding the last applied sequence ID
    local LIST_RECENT_TICKS = KEYS[1] -- key of the recent-ticks list
    local seqId = ARGV[1] -- incoming sequence ID
    local jsonData = ARGV[2] -- ticks as a JSON array: '[{...},{...},...]'
    local strData = ARGV[3] -- ticks as a JSON array of strings: '["{...}","{...}",...]'

    -- read the last applied sequence ID:
    local lastSeqId = redis.call('GET', KEY_LAST_SEQ)
    local ticks, len;

    if not lastSeqId or tonumber(seqId) > tonumber(lastSeqId) then
        -- broadcast the new ticks:
        redis.call('PUBLISH', 'notification', '{"type":"tick","sequenceId":' .. seqId .. ',"data":' .. jsonData .. '}')
        -- remember the current sequence ID:
        redis.call('SET', KEY_LAST_SEQ, seqId)
        -- append to the recent-ticks list:
        ticks = cjson.decode(strData)
        len = redis.call('RPUSH', LIST_RECENT_TICKS, unpack(ticks))
        if len > 100 then
            -- trim the list so that only the latest 100 ticks remain:
            redis.call('LTRIM', LIST_RECENT_TICKS, len-100, len-1)
        end
        return true
    end
    -- nothing was updated:
    return false
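
The sequence check at the top of the script is what makes a redelivered message harmless. A minimal Java sketch of the same guard follows; the class SequenceGuard is hypothetical and keeps its state in memory rather than in Redis.

```java
// Hypothetical in-memory version of the "apply only if newer" check used by the script.
final class SequenceGuard {
    private boolean initialized = false; // mirrors "key not set yet" in Redis
    private long lastSeq;

    // Returns true and records seq if it is newer than the last applied one.
    synchronized boolean tryAdvance(long seq) {
        if (!initialized || seq > lastSeq) {
            lastSeq = seq;
            initialized = true;
            return true;
        }
        return false;
    }
}
```

A message whose sequence ID is equal to or lower than the stored one is skipped, so processing the same batch twice does not duplicate ticks in the list.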

In the API, to fetch the most recent trades we read the list straight from the Redis cache and join the elements into one JSON string:

    @ResponseBody
    @GetMapping(value = "/ticks", produces = "application/json")
    public String getRecentTicks() {
        List<String> data = redisService.lrange(RedisCache.Key.RECENT_TICKS, 0, -1);
        if (data == null || data.isEmpty()) {
            return "[]";
        }
        StringJoiner sj = new StringJoiner(",", "[", "]");
        for (String t : data) {
            sj.add(t);
        }
        return sj.toString();
    }

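Updating the Redis cache with a Lua script has another benefit: while the script runs it can not only update the List but also broadcast an event with the PUBLISH command. When we later write the WebSocket-based push server, it only has to subscribe to the Redis channel to push tick updates to browsers proactively.

Similarly, each type of candlestick is stored in Redis as a Sorted Set (ZSet), with the bar's start timestamp as the score. To update a candlestick, we take the bar with the highest score from each Sorted Set, which is the last bar, and try to update it. If the incoming bar has the same start time, it is merged into the last bar and the Sorted Set is refreshed. If the incoming bar starts later, the last bar is complete, so it is returned for persistence and the new bar is added. The Lua script is as follows: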

    -- A bar is the array { startTime, open, high, low, close, quantity }.
    local function merge(existBar, newBar)
        existBar[3] = math.max(existBar[3], newBar[3]) -- update high price
        existBar[4] = math.min(existBar[4], newBar[4]) -- update low price
        existBar[5] = newBar[5] -- update close price
        existBar[6] = existBar[6] + newBar[6] -- accumulate quantity
    end

    local function tryMergeLast(barType, seqId, zsetBars, timestamp, newBar)
        local topic = 'notification'
        local popedScore, popedBar
        -- take the last bar (highest score):
        local poped = redis.call('ZPOPMAX', zsetBars)
        if #poped == 0 then
            -- the Sorted Set is empty, so just add the new bar:
            redis.call('ZADD', zsetBars, timestamp, cjson.encode(newBar))
            redis.call('PUBLISH', topic, '{"type":"bar","resolution":"' .. barType .. '","sequenceId":' .. seqId .. ',"data":' .. cjson.encode(newBar) .. '}')
        else
            popedBar = cjson.decode(poped[1])
            popedScore = tonumber(poped[2])
            if popedScore == timestamp then
                -- same period: merge into the last bar and send a notification:
                merge(popedBar, newBar)
                redis.call('ZADD', zsetBars, popedScore, cjson.encode(popedBar))
                redis.call('PUBLISH', topic, '{"type":"bar","resolution":"' .. barType .. '","sequenceId":' .. seqId .. ',"data":' .. cjson.encode(popedBar) .. '}')
            else
                -- new period: the last bar is complete and can be persisted; add the new bar:
                if popedScore < timestamp then
                    redis.call('ZADD', zsetBars, popedScore, cjson.encode(popedBar), timestamp, cjson.encode(newBar))
                    redis.call('PUBLISH', topic, '{"type":"bar","resolution":"' .. barType .. '","sequenceId":' .. seqId .. ',"data":' .. cjson.encode(newBar) .. '}')
                    return popedBar
                end
            end
        end
        return nil
    end

    local seqId = ARGV[1]
    local KEY_BAR_SEQ = '_BarSeq_'

    local zsetBars, topics, barTypeStartTimes
    local openPrice, highPrice, lowPrice, closePrice, quantity
    local persistBars = {}

    -- check the sequence ID:
    local seq = redis.call('GET', KEY_BAR_SEQ)
    if not seq or tonumber(seqId) > tonumber(seq) then
        zsetBars = { KEYS[1], KEYS[2], KEYS[3], KEYS[4] }
        barTypeStartTimes = { tonumber(ARGV[2]), tonumber(ARGV[3]), tonumber(ARGV[4]), tonumber(ARGV[5]) }
        openPrice = tonumber(ARGV[6])
        highPrice = tonumber(ARGV[7])
        lowPrice = tonumber(ARGV[8])
        closePrice = tonumber(ARGV[9])
        quantity = tonumber(ARGV[10])

        local i, bar
        local names = { 'SEC', 'MIN', 'HOUR', 'DAY' }
        -- try to merge into the last bar of each type:
        for i = 1, 4 do
            bar = tryMergeLast(names[i], seqId, zsetBars[i], barTypeStartTimes[i], { barTypeStartTimes[i], openPrice, highPrice, lowPrice, closePrice, quantity })
            if bar then
                persistBars[names[i]] = bar
            end
        end
        redis.call('SET', KEY_BAR_SEQ, seqId)
        return cjson.encode(persistBars)
    end

    redis.log(redis.LOG_WARNING, 'sequence ignored: exist seq => ' .. seq .. ' >= ' .. seqId .. ' <= new seq')
    return '{}'
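
For readers less familiar with Lua, the merge-or-roll-over decision can be sketched in plain Java. This is an in-memory model under stated assumptions: CachedBar and BarCache are hypothetical names, a TreeMap keyed by start time stands in for the Sorted Set, and notifications are left out.

```java
import java.math.BigDecimal;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical bar held in the cache.
final class CachedBar {
    final long startTime;
    final BigDecimal open;
    BigDecimal high;
    BigDecimal low;
    BigDecimal close;
    BigDecimal quantity;

    private CachedBar(long startTime, BigDecimal open, BigDecimal high,
                      BigDecimal low, BigDecimal close, BigDecimal quantity) {
        this.startTime = startTime;
        this.open = open;
        this.high = high;
        this.low = low;
        this.close = close;
        this.quantity = quantity;
    }

    static CachedBar of(long startTime, String open, String high, String low, String close, String quantity) {
        return new CachedBar(startTime, new BigDecimal(open), new BigDecimal(high),
                new BigDecimal(low), new BigDecimal(close), new BigDecimal(quantity));
    }

    // Same rules as merge() in the Lua script: Open is kept, the rest is updated.
    void merge(CachedBar other) {
        high = high.max(other.high);
        low = low.min(other.low);
        close = other.close;
        quantity = quantity.add(other.quantity);
    }
}

// Hypothetical stand-in for one Sorted Set of bars, ordered by start time.
final class BarCache {
    private final TreeMap<Long, CachedBar> bars = new TreeMap<>();

    // Returns the previous bar when it is complete and ready to persist, otherwise null.
    CachedBar tryMergeLast(CachedBar newBar) {
        Map.Entry<Long, CachedBar> last = bars.lastEntry();
        if (last == null) {
            bars.put(newBar.startTime, newBar); // empty cache: just add
            return null;
        }
        long lastStart = last.getKey().longValue();
        if (lastStart == newBar.startTime) {
            last.getValue().merge(newBar); // same period: merge in place
            return null;
        }
        if (lastStart < newBar.startTime) {
            bars.put(newBar.startTime, newBar); // new period: previous bar is final
            return last.getValue();
        }
        return null; // older than the last bar: ignored in this sketch
    }

    CachedBar latest() {
        return bars.isEmpty() ? null : bars.lastEntry().getValue();
    }

    int size() {
        return bars.size();
    }
}
```

A bar is handed back for persistence only once a bar for a later period arrives, so each completed bar is returned once.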

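Next we write QuotationService. It loads the Redis scripts during initialization. When a tick message arrives, it calls the scripts to update the ticks and bars, then persists the ticks and bars. The code is as follows: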

    @Component
    public class QuotationService {

        @Autowired
        RedisService redisService;

        @Autowired
        MessagingFactory messagingFactory;

        MessageConsumer tickConsumer;

        private String shaUpdateRecentTicksLua = null;
        private String shaUpdateBarLua = null;

        // Other members used below (sequenceId, zoneId, logger, TYPE_BARS,
        // createBar, saveTicks, saveBars) are omitted from this excerpt.

        @PostConstruct
        public void init() throws Exception {
            // load the Redis scripts:
            this.shaUpdateRecentTicksLua = this.redisService.loadScriptFromClassPath("/redis/update-recent-ticks.lua");
            this.shaUpdateBarLua = this.redisService.loadScriptFromClassPath("/redis/update-bar.lua");
            // subscribe to tick messages:
            String groupId = Messaging.Topic.TICK.name() + "_" + IpUtil.getHostId();
            this.tickConsumer = messagingFactory.createBatchMessageListener(Messaging.Topic.TICK, groupId,
                    this::processMessages);
        }

        // handle a batch of received messages:
        public void processMessages(List<AbstractMessage> messages) {
            for (AbstractMessage message : messages) {
                processMessage((TickMessage) message);
            }
        }

        // handle a single tick message:
        void processMessage(TickMessage message) {
            // first merge the ticks contained in this message into one bar:
            final long createdAt = message.createdAt;
            // JSON array of quoted strings, used as list elements in Redis:
            StringJoiner ticksStrJoiner = new StringJoiner(",", "[", "]");
            // plain JSON array, used in the broadcast notification:
            StringJoiner ticksJoiner = new StringJoiner(",", "[", "]");
            BigDecimal openPrice = BigDecimal.ZERO;
            BigDecimal closePrice = BigDecimal.ZERO;
            BigDecimal highPrice = BigDecimal.ZERO;
            BigDecimal lowPrice = BigDecimal.ZERO;
            BigDecimal quantity = BigDecimal.ZERO;
            for (TickEntity tick : message.ticks) {
                String json = tick.toJson();
                ticksStrJoiner.add("\"" + json + "\"");
                ticksJoiner.add(json);
                if (openPrice.signum() == 0) {
                    // open price is not set yet, so this is the first tick:
                    openPrice = tick.price;
                    closePrice = tick.price;
                    highPrice = tick.price;
                    lowPrice = tick.price;
                } else {
                    // open price is set:
                    closePrice = tick.price;
                    highPrice = highPrice.max(tick.price);
                    lowPrice = lowPrice.min(tick.price);
                }
                quantity = quantity.add(tick.quantity);
            }
            // compute the start time of the bar of each type to merge into:
            long sec = createdAt / 1000;
            long min = sec / 60;
            long hour = min / 60;
            long secStartTime = sec * 1000;
            long minStartTime = min * 60 * 1000;
            long hourStartTime = hour * 3600 * 1000;
            long dayStartTime = Instant.ofEpochMilli(hourStartTime).atZone(zoneId).withHour(0).toEpochSecond() * 1000;

            // update the tick cache:
            String ticksData = ticksJoiner.toString();
            Boolean tickOk = redisService.executeScriptReturnBoolean(this.shaUpdateRecentTicksLua,
                    new String[] { RedisCache.Key.RECENT_TICKS },
                    new String[] { String.valueOf(this.sequenceId), ticksData, ticksStrJoiner.toString() });
            if (!tickOk.booleanValue()) {
                logger.warn("ticks are ignored by Redis.");
                return;
            }
            // save the ticks to the database:
            saveTicks(message.ticks);

            // update the bars of each type cached in Redis:
            String strCreatedBars = redisService.executeScriptReturnString(this.shaUpdateBarLua,
                    new String[] { RedisCache.Key.SEC_BARS, RedisCache.Key.MIN_BARS, RedisCache.Key.HOUR_BARS,
                            RedisCache.Key.DAY_BARS },
                    new String[] { // ARGV
                            String.valueOf(this.sequenceId), // sequence id
                            String.valueOf(secStartTime), // sec-start-time
                            String.valueOf(minStartTime), // min-start-time
                            String.valueOf(hourStartTime), // hour-start-time
                            String.valueOf(dayStartTime), // day-start-time
                            String.valueOf(openPrice), // open
                            String.valueOf(highPrice), // high
                            String.valueOf(lowPrice), // low
                            String.valueOf(closePrice), // close
                            String.valueOf(quantity) // quantity
                    });
            Map<BarType, BigDecimal[]> barMap = JsonUtil.readJson(strCreatedBars, TYPE_BARS);
            if (!barMap.isEmpty()) {
                // save the completed bars:
                SecBarEntity secBar = createBar(SecBarEntity::new, barMap.get(BarType.SEC));
                MinBarEntity minBar = createBar(MinBarEntity::new, barMap.get(BarType.MIN));
                HourBarEntity hourBar = createBar(HourBarEntity::new, barMap.get(BarType.HOUR));
                DayBarEntity dayBar = createBar(DayBarEntity::new, barMap.get(BarType.DAY));
                saveBars(secBar, minBar, hourBar, dayBar);
            }
        }
    }

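A candlestick series is a set of bars cached in a Redis ZSet, where the score is the bar's start time. Each bar update also broadcasts a notification for the push service to use later. To query a given type of candlestick chart, the API takes a start and an end timestamp and returns the sorted list with a score-range query (ZRANGEBYSCORE in the code below):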

    String getBars(String key, long start, long end) {
        List<String> data = redisService.zrangebyscore(key, start, end);
        if (data == null || data.isEmpty()) {
            return "[]";
        }
        StringJoiner sj = new StringJoiner(",", "[", "]");
        for (String t : data) {
            sj.add(t);
        }
        return sj.toString();
    }
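
The effect of a score-range query with inclusive bounds can be illustrated with an in-memory sketch. BarRangeQuery is a hypothetical class in which a TreeMap keyed by start time plays the role of the ZSet; it is not part of the project.

```java
import java.util.StringJoiner;
import java.util.TreeMap;

// Hypothetical in-memory model of querying bars by start-time range.
final class BarRangeQuery {
    private final TreeMap<Long, String> barsByStartTime = new TreeMap<>();

    void put(long startTime, String barJson) {
        barsByStartTime.put(startTime, barJson);
    }

    // Returns a JSON array of all bars with start <= startTime <= end, in ascending order.
    String getBars(long start, long end) {
        if (start > end) {
            return "[]"; // subMap would reject an inverted range
        }
        StringJoiner sj = new StringJoiner(",", "[", "]");
        for (String json : barsByStartTime.subMap(start, true, end, true).values()) {
            sj.add(json);
        }
        return sj.toString();
    }
}
```

Because the bars are kept sorted by start time, the result needs no extra sorting before it is returned to the client.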

Source code

The source code can be downloaded from GitHub or Gitee.

GitHub: michaelliao/warpexchange

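Summary

The quotation system follows a typical write-little, read-heavy pattern, which makes it well suited to caching. Writing Lua scripts makes updating Redis simpler.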
