如何建立回测系统(三)

如何建立回测系统(三)

优化版本


/**
 * In-memory order book used by the back-testing engine.
 *
 * <p>Resting orders are kept per side (buy / sell), per trade code and per price level.
 * An incoming order is matched only against the opposite side's queue at exactly the
 * order's own price; whatever remains unfilled is then parked on the order's own side
 * at that same price.
 */
public class MarketPlace {

    /** Resting sell orders: trade code -> price level -> queue of orders at that price. */
    private final Map<String, WriteLockMap<Long, OrderQueen>> sellOrderChannel;

    /** Resting buy orders: trade code -> price level -> queue of orders at that price. */
    private final Map<String, WriteLockMap<Long, OrderQueen>> buyOrderChannel;

    /** Price of the most recent fill, per trade code. */
    private final Map<String, Long> lastComplete;

    public MarketPlace() {
        this.sellOrderChannel = new ConcurrentHashMap<>();
        this.buyOrderChannel = new ConcurrentHashMap<>();
        this.lastComplete = new ConcurrentHashMap<>();
    }

    /**
     * Submits an order: tries to fill it against the opposite side at the same price and,
     * if it is not completely filled, leaves the remainder resting on its own side.
     *
     * @param order the order to place; its trade code is used as a map key and must not be null
     */
    public void order(Order order) {
        final String tradeCode = order.getTradeCode();
        final long priceLevel = order.getPrice();
        final boolean selling = order.sell();

        // A sell order is matched against resting buys, a buy order against resting sells.
        OrderQueen counterQueue =
                queueFor(selling ? buyOrderChannel : sellOrderChannel, tradeCode, priceLevel);
        Long dealPrice = counterQueue.try2ClinchOrder(order);
        if (dealPrice != null) {
            this.lastComplete.put(tradeCode, dealPrice);
        }

        if (!order.isComplete()) {
            // Park the remainder on the order's own side. Both sides now go through the same
            // path, so a sell order is no longer dropped when its price queue already exists.
            OrderQueen restingQueue =
                    queueFor(selling ? sellOrderChannel : buyOrderChannel, tradeCode, priceLevel);
            restingQueue.addOrder(order);
        }
    }

    /**
     * Returns the price of the most recent fill for a security.
     *
     * @param security trade code to look up
     * @return the last fill price, or {@code null} if nothing has been filled for that code
     */
    public Long getLastPrice(String security) {
        return this.lastComplete.get(security);
    }

    /**
     * Returns the queue for {@code tradeCode} at {@code priceLevel} in the given channel,
     * creating the per-code map and the per-price queue if they do not exist yet.
     */
    private static OrderQueen queueFor(Map<String, WriteLockMap<Long, OrderQueen>> channel,
                                       String tradeCode,
                                       long priceLevel) {
        // computeIfAbsent on the ConcurrentHashMap-backed channel creates the per-code map
        // at most once, so concurrent callers cannot replace each other's map.
        WriteLockMap<Long, OrderQueen> levels = channel.computeIfAbsent(
                tradeCode,
                code -> new WriteLockMap<Long, OrderQueen>(new ConcurrentHashMap<Long, OrderQueen>()));

        OrderQueen queue = levels.get(priceLevel);
        if (queue == null) {
            levels.lock();
            try {
                // Re-check under the lock: another thread may have created the queue between
                // the unlocked read above and lock acquisition.
                // NOTE(review): assumes WriteLockMap.get() may be called while holding its
                // write lock (put() already is) - confirm against WriteLockMap.
                queue = levels.get(priceLevel);
                if (queue == null) {
                    queue = new OrderQueen(priceLevel);
                    levels.put(priceLevel, queue);
                }
            } finally {
                // Always release, even if queue creation or put throws.
                levels.unlock();
            }
        }
        return queue;
    }
}

public class OrderQueen { //最近完成的订单位置 private int lastCompleteIndex; private int lastInsertIndex; private int lastInsertCacheIndex; //归属价格级别 private long priceLevel; //订单队列 private Order[] orders; private Order[][] finishCache; public OrderQueen(long priceLevel) { this.priceLevel = priceLevel; orders = new Order[5000]; finishCache = new Order[500][]; lastCompleteIndex = -1; lastInsertIndex = 0; } private boolean needExpendOrderList() { return orders.length <= lastInsertIndex; } private void expand() { Order[] orders = new Order[5000]; this.finishCache[lastInsertCacheIndex] = this.orders; lastInsertCacheIndex++; this.orders = orders; lastInsertIndex = 0; lastCompleteIndex = -1; } public long getPriceLevel() { return priceLevel; } public synchronized void addOrder(Order order) { if (needExpendOrderList()) { expand(); } this.orders[lastInsertIndex] = order; lastInsertIndex++; } public synchronized Long try2ClinchOrder(Order order2Clinch) { Long clinchPrice = null; for (int index = lastCompleteIndex + 1; index < lastInsertIndex; index++) { Order order = this.orders[index]; long count; try { count = order2Clinch.getRemainCount() - order.getRemainCount(); } catch (NullPointerException e) { throw new NullPointerException(); } if (count < 0) { long completeCount = order.getRemainCount() - Math.abs(count); order2Clinch.addCompleteCount(completeCount); order2Clinch.complete(); order.addCompleteCount(completeCount); clinchPrice = order.getPrice(); break; } else if (count == 0) { order2Clinch.addCompleteCount(order.getRemainCount()); order2Clinch.complete(); order.addCompleteCount(order.getRemainCount()); order.complete(); clinchPrice = order.getPrice(); lastCompleteIndex++; break; } else { order2Clinch.addCompleteCount(order.getRemainCount()); order.addCompleteCount(order.getRemainCount()); order.complete(); clinchPrice = order.getPrice(); lastCompleteIndex++; } } return clinchPrice; } }

性能分析

  1. 买单和卖单分开存放,不再每次撮合时对全部订单重新筛选、排序
  2. 针对国内涨跌幅限制,按价格分级;同一价格下的订单按到达时间先后排队

测试发现,50 万条数据的处理耗时从原来的 80000 ms 降低到 100 ms,效果显著;多线程测试亦无问题。