如何建立回测系统(三)
优化版本
public class MarketPlace {
private Map<String, WriteLockMap<Long, OrderQueen>> sellOrderChannel;
private Map<String, WriteLockMap<Long, OrderQueen>> buyOrderChannel;
private Map<String, Long> lastComplete;
public MarketPlace() {
this.sellOrderChannel = new ConcurrentHashMap<>();
this.buyOrderChannel = new ConcurrentHashMap<>();
this.lastComplete = new ConcurrentHashMap<>();
}
public void order(Order order) {
OrderQueen orderQueen;
WriteLockMap<Long, OrderQueen> longOrderQueenMap;
String fixCacheString = order.getTradeCode().intern();
if (order.sell()) {
longOrderQueenMap = buyOrderChannel.get(order.getTradeCode());
if (longOrderQueenMap == null) {
synchronized (fixCacheString) {
longOrderQueenMap = new WriteLockMap<>(new ConcurrentHashMap<>());
buyOrderChannel.put(order.getTradeCode(), longOrderQueenMap);
}
}
} else {
longOrderQueenMap = sellOrderChannel.get(order.getTradeCode());
if (longOrderQueenMap == null) {
synchronized (fixCacheString) {
longOrderQueenMap = new WriteLockMap<>(new ConcurrentHashMap<>());
sellOrderChannel.put(order.getTradeCode(), longOrderQueenMap);
}
}
}
orderQueen = longOrderQueenMap.get(order.getPrice());
if (orderQueen == null) {
longOrderQueenMap.lock();
orderQueen = new OrderQueen(order.getPrice());
longOrderQueenMap.put(order.getPrice(), orderQueen);
longOrderQueenMap.unlock();
}
Long price = orderQueen.try2ClinchOrder(order);
if (price != null) {
this.lastComplete.put(order.getTradeCode(), price);
}
if (!order.isComplete()) {
if (order.sell()) {
WriteLockMap<Long, OrderQueen> tm = this.sellOrderChannel
.get(order.getTradeCode());
if (tm == null) {
synchronized (fixCacheString) {
WriteLockMap<Long, OrderQueen> writeLockMap = new WriteLockMap<>(
new ConcurrentHashMap<>());
OrderQueen newOne = new OrderQueen(order.getPrice());
newOne.addOrder(order);
writeLockMap.put(order.getPrice(), newOne);
this.sellOrderChannel.put(order.getTradeCode(), writeLockMap);
}
} else {
OrderQueen orderQueen1 = tm.get(order.getPrice());
if (orderQueen1 == null) {
tm.lock();
orderQueen1 = new OrderQueen(order.getPrice());
orderQueen1.addOrder(order);
tm.put(order.getPrice(), orderQueen1);
tm.unlock();
}
}
} else {
WriteLockMap<Long, OrderQueen> tm = this.buyOrderChannel
.get(order.getTradeCode());
if (tm == null) {
synchronized (fixCacheString) {
WriteLockMap<Long, OrderQueen> writeLockMap = new WriteLockMap<>(
new ConcurrentHashMap<>());
OrderQueen newOne = new OrderQueen(order.getPrice());
newOne.addOrder(order);
writeLockMap.put(order.getPrice(), newOne);
this.buyOrderChannel.put(order.getTradeCode(), writeLockMap);
}
} else {
OrderQueen orderQueen1 = tm.get(order.getPrice());
if (orderQueen1 == null) {
tm.lock();
orderQueen1 = new OrderQueen(order.getPrice());
orderQueen1.addOrder(order);
tm.put(order.getPrice(), orderQueen1);
tm.unlock();
} else {
orderQueen1.addOrder(order);
}
}
}
}
}
public Long getLastPrice(String security) {
return this.lastComplete.get(security);
}
}
public class OrderQueen {
//最近完成的订单位置
private int lastCompleteIndex;
private int lastInsertIndex;
private int lastInsertCacheIndex;
//归属价格级别
private long priceLevel;
//订单队列
private Order[] orders;
private Order[][] finishCache;
public OrderQueen(long priceLevel) {
this.priceLevel = priceLevel;
orders = new Order[5000];
finishCache = new Order[500][];
lastCompleteIndex = -1;
lastInsertIndex = 0;
}
private boolean needExpendOrderList() {
return orders.length <= lastInsertIndex;
}
private void expand() {
Order[] orders = new Order[5000];
this.finishCache[lastInsertCacheIndex] = this.orders;
lastInsertCacheIndex++;
this.orders = orders;
lastInsertIndex = 0;
lastCompleteIndex = -1;
}
public long getPriceLevel() {
return priceLevel;
}
public synchronized void addOrder(Order order) {
if (needExpendOrderList()) {
expand();
}
this.orders[lastInsertIndex] = order;
lastInsertIndex++;
}
public synchronized Long try2ClinchOrder(Order order2Clinch) {
Long clinchPrice = null;
for (int index = lastCompleteIndex + 1; index < lastInsertIndex; index++) {
Order order = this.orders[index];
long count;
try {
count = order2Clinch.getRemainCount() - order.getRemainCount();
} catch (NullPointerException e) {
throw new NullPointerException();
}
if (count < 0) {
long completeCount = order.getRemainCount() - Math.abs(count);
order2Clinch.addCompleteCount(completeCount);
order2Clinch.complete();
order.addCompleteCount(completeCount);
clinchPrice = order.getPrice();
break;
} else if (count == 0) {
order2Clinch.addCompleteCount(order.getRemainCount());
order2Clinch.complete();
order.addCompleteCount(order.getRemainCount());
order.complete();
clinchPrice = order.getPrice();
lastCompleteIndex++;
break;
} else {
order2Clinch.addCompleteCount(order.getRemainCount());
order.addCompleteCount(order.getRemainCount());
order.complete();
clinchPrice = order.getPrice();
lastCompleteIndex++;
}
}
return clinchPrice;
}
}
性能分析
- 减少每次对全部订单进行筛选排序,分出买卖单
- 针对国内涨跌幅限制,对价格进行分级,每个价格中订单按照时间排序
测试发现,50W的数据从原来的80000ms降低到100ms,效果显著,多线程测试亦无问题