如何建立一个回测系统(五)

如何建立一个回测系统(五)

账户系统设计

设计需求

  1. 模拟交易
  2. 模拟费率
  3. 能够记录每次交易时候的状态
  4. 自动计算胜率
  5. 无缝对接到正常交易系统

字段设计

  1. taskUid(记录对应搜索任务)
  2. Datasource(价格记录源)
  3. SimulateMarket(交易场,传入正式则用于正式交易系统,传入模拟则用于回测系统)
  4. actualMoney(已经到账的钱)
  5. bookMoney(账本计算的钱)
  6. beginDate(开户日期)
  7. holdComponents(交易标的物持有状态及价格)
  8. book(每日交易的Order)

行为设计

  1. 平账
  2. 申购
  3. 赎回
  4. 查询持有均价

以上为账户的完备详细设计

如何建立一个回测系统(四)

如何建立一个回测系统(四)

搜索任务处理

一个好的量化交易,是对于一个函数的极值的搜索及规则的搜索

但因为函数的复杂性,无法得到函数的性质,有时候可以采用暴力搜索极值点的方式

类似于sum(f(g(x,y),z),t) = alpha其中z是外部响应,z与g(x,y)之间接近线性无关(我们的交易量远达不到影响结果的情景)t是时间序列下标

定义任务

g(x,y)中不同的系数对于f的的影响不同,又因为g(x,y)不是一个显函数,只能通过代入数据计算

我们定义每组(x,y,ts,te)为搜索任务,将数据及计算函数交由每个线程完成,此时不会涉及多线程问题,任务之间线性无关

支撑部件

第一版本我本想通过序列化为字符串形式来作为存储

但因为字符串是变长形式,难以在文件层级进行读写

故而想到如对象存储一般,因为实现难度高,工程量巨大(修改操作占主要),故转而投向SQLite作为持久化操作

最终

我以SQL作为存储端,将任务分发至多台机器形成集群,可以极大的加快搜索进度,且任意机器死亡可以在另一台机器上恢复,无需从头开始

如何建立回测系统(三)

如何建立回测系统(三)

优化版本


public class MarketPlace { private Map<String, WriteLockMap<Long, OrderQueen>> sellOrderChannel; private Map<String, WriteLockMap<Long, OrderQueen>> buyOrderChannel; private Map<String, Long> lastComplete; public MarketPlace() { this.sellOrderChannel = new ConcurrentHashMap<>(); this.buyOrderChannel = new ConcurrentHashMap<>(); this.lastComplete = new ConcurrentHashMap<>(); } public void order(Order order) { OrderQueen orderQueen; WriteLockMap<Long, OrderQueen> longOrderQueenMap; String fixCacheString = order.getTradeCode().intern(); if (order.sell()) { longOrderQueenMap = buyOrderChannel.get(order.getTradeCode()); if (longOrderQueenMap == null) { synchronized (fixCacheString) { longOrderQueenMap = new WriteLockMap<>(new ConcurrentHashMap<>()); buyOrderChannel.put(order.getTradeCode(), longOrderQueenMap); } } } else { longOrderQueenMap = sellOrderChannel.get(order.getTradeCode()); if (longOrderQueenMap == null) { synchronized (fixCacheString) { longOrderQueenMap = new WriteLockMap<>(new ConcurrentHashMap<>()); sellOrderChannel.put(order.getTradeCode(), longOrderQueenMap); } } } orderQueen = longOrderQueenMap.get(order.getPrice()); if (orderQueen == null) { longOrderQueenMap.lock(); orderQueen = new OrderQueen(order.getPrice()); longOrderQueenMap.put(order.getPrice(), orderQueen); longOrderQueenMap.unlock(); } Long price = orderQueen.try2ClinchOrder(order); if (price != null) { this.lastComplete.put(order.getTradeCode(), price); } if (!order.isComplete()) { if (order.sell()) { WriteLockMap<Long, OrderQueen> tm = this.sellOrderChannel .get(order.getTradeCode()); if (tm == null) { synchronized (fixCacheString) { WriteLockMap<Long, OrderQueen> writeLockMap = new WriteLockMap<>( new ConcurrentHashMap<>()); OrderQueen newOne = new OrderQueen(order.getPrice()); newOne.addOrder(order); writeLockMap.put(order.getPrice(), newOne); this.sellOrderChannel.put(order.getTradeCode(), writeLockMap); } } else { OrderQueen orderQueen1 = tm.get(order.getPrice()); if (orderQueen1 == null) { tm.lock(); orderQueen1 = new OrderQueen(order.getPrice()); orderQueen1.addOrder(order); tm.put(order.getPrice(), orderQueen1); tm.unlock(); } } } else { WriteLockMap<Long, OrderQueen> tm = this.buyOrderChannel .get(order.getTradeCode()); if (tm == null) { synchronized (fixCacheString) { WriteLockMap<Long, OrderQueen> writeLockMap = new WriteLockMap<>( new ConcurrentHashMap<>()); OrderQueen newOne = new OrderQueen(order.getPrice()); newOne.addOrder(order); writeLockMap.put(order.getPrice(), newOne); this.buyOrderChannel.put(order.getTradeCode(), writeLockMap); } } else { OrderQueen orderQueen1 = tm.get(order.getPrice()); if (orderQueen1 == null) { tm.lock(); orderQueen1 = new OrderQueen(order.getPrice()); orderQueen1.addOrder(order); tm.put(order.getPrice(), orderQueen1); tm.unlock(); } else { orderQueen1.addOrder(order); } } } } } public Long getLastPrice(String security) { return this.lastComplete.get(security); } }

public class OrderQueen { //最近完成的订单位置 private int lastCompleteIndex; private int lastInsertIndex; private int lastInsertCacheIndex; //归属价格级别 private long priceLevel; //订单队列 private Order[] orders; private Order[][] finishCache; public OrderQueen(long priceLevel) { this.priceLevel = priceLevel; orders = new Order[5000]; finishCache = new Order[500][]; lastCompleteIndex = -1; lastInsertIndex = 0; } private boolean needExpendOrderList() { return orders.length <= lastInsertIndex; } private void expand() { Order[] orders = new Order[5000]; this.finishCache[lastInsertCacheIndex] = this.orders; lastInsertCacheIndex++; this.orders = orders; lastInsertIndex = 0; lastCompleteIndex = -1; } public long getPriceLevel() { return priceLevel; } public synchronized void addOrder(Order order) { if (needExpendOrderList()) { expand(); } this.orders[lastInsertIndex] = order; lastInsertIndex++; } public synchronized Long try2ClinchOrder(Order order2Clinch) { Long clinchPrice = null; for (int index = lastCompleteIndex + 1; index < lastInsertIndex; index++) { Order order = this.orders[index]; long count; try { count = order2Clinch.getRemainCount() - order.getRemainCount(); } catch (NullPointerException e) { throw new NullPointerException(); } if (count < 0) { long completeCount = order.getRemainCount() - Math.abs(count); order2Clinch.addCompleteCount(completeCount); order2Clinch.complete(); order.addCompleteCount(completeCount); clinchPrice = order.getPrice(); break; } else if (count == 0) { order2Clinch.addCompleteCount(order.getRemainCount()); order2Clinch.complete(); order.addCompleteCount(order.getRemainCount()); order.complete(); clinchPrice = order.getPrice(); lastCompleteIndex++; break; } else { order2Clinch.addCompleteCount(order.getRemainCount()); order.addCompleteCount(order.getRemainCount()); order.complete(); clinchPrice = order.getPrice(); lastCompleteIndex++; } } return clinchPrice; } }

性能分析

  1. 减少每次对全部订单进行筛选排序,分出买卖单
  2. 针对国内涨跌幅限制,对价格进行分级,每个价格中订单按照时间排序

测试发现,50W的数据从原来的80000ms降低到100ms,效果显著,多线程测试亦无问题

如何建立回测系统(二)

如何建立回测系统(二)

一个简单的回放市场实现

public class MarketPlace {

    private Map<String, List<Order>> orderChannel;
    private Map<String, Long> lastComplete;

    public MarketPlace() {
        orderChannel = new ConcurrentHashMap<>();
        lastComplete = new ConcurrentHashMap<>();
    }

    public void order(Order order) {
        List<Order> list = orderChannel.get(order.getTradeCode());
        if (Objects.isNull(list)) {
            list = new ArrayList<>();
            orderChannel.put(order.getTradeCode(), list);
        }
        synchronized (list) {
            List<Order> targetPriceOrder;
            List<Order> sellOrder = list.stream()
                .filter(order1 -> !order1.isComplete())
                .filter(Order::sell)
                .sorted(Comparator.comparing(Order::getPrice)
                    .thenComparing(Order::getTimestamp))
                .collect(Collectors.toList());
            List<Order> buyOrder = list.stream()
                .filter(order1 -> !order1.isComplete())
                .filter(order1 -> !order1.sell())
                .sorted(Comparator.comparing(Order::getPrice, Comparator.reverseOrder())
                    .thenComparing(Order::getTimestamp))
                .collect(Collectors.toList());
            //进来的是买单
            if (!order.sell()) {
                if (!buyOrder.isEmpty() && buyOrder.get(0).getPrice() > order.getPrice()) {
                    list.add(order);
                    return;
                }
                //处理好后,是按照价格小,时间先排列
                targetPriceOrder = sellOrder.stream()
                    //需要找到挂单中比当前订单要价低或者相等的
                    .filter(order1 -> order.getPrice() <= order1.getPrice())
                    .collect(Collectors.toList());
            } else {
                if (!sellOrder.isEmpty() && sellOrder.get(0).getPrice() < order.getPrice()) {
                    list.add(order);
                    return;
                }
                //进来的是卖单,找买单 , 卖单进来,先开始撮合最高价的买单
                targetPriceOrder = buyOrder.stream()
                    //需要找到买单中比当前订单要价高的
                    .filter(order1 -> order.getPrice() <= order1.getPrice())
                    .collect(Collectors.toList());
                //新入订单需要撮合的数量
            }
            //新入订单需要撮合的数量
            long count = order.getRemainCount();
            //连续竞价,买单进来,开始先撮合最低价的卖单
            for (Order order1 : targetPriceOrder) {
                count = count - order1.getRemainCount();
                if (count < 0) {
                    //说明当前订单已经满足,order不需要进入队列排队,order1更新成交数,中断循环
                    order1.addCompleteCount(order1.getCount() - Math.abs(count));
                    order.addCompleteCount(order.getCount());
                    order.complete();
                    this.lastComplete.put(order.getTradeCode(), order.getPrice());
                    break;
                } else if (count == 0) {
                    //说明订单恰好满足,中断循环,将order1添加移除队列
                    order.addCompleteCount(order.getCount());
                    order1.addCompleteCount(order1.getCount());
                    order1.complete();
                    order.complete();
                    this.lastComplete.put(order.getTradeCode(), order.getPrice());
                } else {
                    //说明订单未满足,将order1添加移除队列,并继续循环
                    order1.complete();
                    order.addCompleteCount(order1.getRemainCount());
                }
                count = order.getRemainCount();
            }
            if (!order.isComplete()) {
                list.add(order);
            }
        }
    }

    public Long getLastPrice(String security) {
        return this.lastComplete.get(security);
    }

性能分析

最高对单个标的物单线程,且订单越多,处理速度越慢

T = f(m*n)

可能的优化点

  1. 减少每次对全部订单进行筛选排序,分出买卖单
  2. 针对国内涨跌幅限制,对价格进行分级,每个价格中订单按照实践排序
  3. 尽可能用基础类型

如何建立回测系统

如何建立回测系统

什么是回测

就是对市场的变化进行回放,加入我们的策略得出,如果我们参与其中会有怎么样的结果

如何回放

首先要了解平时我们说的股票当日价格是如何形成的

股票一旦出售后本身是没有价格的,在每个交易日开始前,交易所会进行集中竞价,最后取买卖集合中的笔数最多的价格作为当日的开盘价

紧接着开始连续竞价

连续竞价中,股票价格为股票的成交价,成交价如下定义:

1. 最高买进申报与最低卖出申报相同,则该价格即为成交价格;

2. 买入申报价格高于即时揭示的最低卖出申报价格时,以即时揭示的最低卖出申报价格为成交价

3. 卖出申报价格低于即时揭示的最高买入申报价格时,以即时揭示的最高买入申报价格为成交价

总结

若需要对股市状况进行回测,首先应该保存当日所有的订单情况,然后对价格进行撮合,若撮合成功,则此时可以算出当前价格

平时我们接受的数据成为股市交易的快照

其中记录着一段时间内的成交量,成交均价,成交总价格等的信息

记一次性能优化问题

记一次性能优化问题

参与架构

代码设计

我们总假设,温度是一次次上报的,且点与点之间没有先后的顺序

很容易写出以下代码

For(EachTimeUploadData)

Pipe.collect(in,out,now)

遇到的问题

我们的不仅需要记录温度,还要记录压力和流量,而这些属性是归属于管道

这个时候会遇到一个巨大的问题,数据库的DeadLock

我们默认聚合根是Pipe,那么为了防止聚合根在两个线程中出现,故而数据库中是要对这一行开启事务的,意味着根所在一行是要锁行的,而测试中如果遇上同一根管道的首尾同时上报数据,那么就会发生DeadLock,更不要提某些情况下是要沿着管道将沿途所有的管道锁定的问题。

解决死锁

  1. 我们不能允许数据丢失
  2. 不能够对架构做大改动

要解决死锁的问题,最简单的办法时,我们只用一个线程来操作,在不考虑集群的情况下,将原有方案变更为,数据上报时候顺序写入文件,然后开启一个线程从文件头开始反序列化数据然后逐一写入数据库。

集群环境

在集群环境下,依旧有锁冲突的问题,那么冲突时候策略就可以考虑以下了,我选择了,冲突之后将数据放入队列尾部,这个时候代码逻辑上是没什么问题了

性能

可是上线之后发现,其写入性能仅在15s,注意是s,是秒,才完成50条入队消息的消费

在各种断点之后,依旧发现瓶颈在数据库与应用中

  1. 每次查询流量系统地址,30~50ms
  2. 每次查询流量,30~50ms
  3. 每次查询管道链条300~500ms

在一些支管上,单次插入可以达到最大5000ms

解决

  1. 增加管道标记父节点字段,而非靠查找所有管道再通过计算得出父节点
  2. 增加本地缓存和Redis缓存切换,以其在没有redis环境下可以降低查询流量系统地址的时延
  3. 优化流量和压力的Opc客户端,减少数据传输量
  4. 将于数据库交互中的插入和更新变为Batch形式

测试结果

消费50条数据仅需要300ms左右,

网络路径是如何被发现的

网络路径是如何被发现的

最近突然好奇到,我们熟悉的IP网络是如何运作的,查了半天资料都是说TCP/IP那几层协议,根本没有说网络是如何运行的。那我们试试头脑风暴,再发明一次网络吧

一个理想的模型

网络是由无数个可以相互连接的点组成

一些限制

每个点是没有先验知识的,也就是说,点与点之间如果不是直接连接在一开始是不知道能否由通路的,但点能知道直接连通的道路

发现通信路径

在没有点与点之间没有先验知识的前提下,我们就只能通过BFS/DFS算法来发现路径
所以当第一次通信的时候,点与点之间的时间最优情况T(n^n) (假设每个点直接连接n个点,在第二跳就能找到通路)

问题

这个情况下,我们会发现性能是和我们现在的网络不同,应该是会很慢的

优化

如果需要提升性能,就需要增加先验知识

增加假设

点的直接通路中由特定的点具有网络通路的解释权,我们称之为高优先级点,高优先级点的位置应该预先内置在访问点处,高优先级点数量m << n

优化结果

性能会提升至T(m^m)

再增加假设

假设我们可以广播,亦即是发出一个信号,如果点能到达,则返回路径表,此时的实际访问次数是T(h) h是网络中的最先达成通路的深度

问题

在未达成通路之前,整个网络相当于开启了m^m^h个半连接,会产生信息风暴

优化

划分网络,减少同时建立的半连接数量

对应到我们真实的IP网络

这个时候是不是就有点像我们的现有的IP网络了

硬件层就是能直接连接的路径

需要中间转发的,就在IP协议层完成

比如ICMP就是我们的广播信号

在一些核心交换机上还存在手工配置的路由表,就是高优先级点的先验信息

网关就是我们的高优先级点

划分就是我们的掩码

当然这都是我瞎几把想的东西,和实际出入应该大了去了,IP网发展了这么多年,基本模型都有可能变了

效率优先

效率优先

开场白真难,好,开场结束

什么是低效实践

我是以java语言为主力开发语言的代码编写人员,同时还负责我们所有的服务器的部署及维护, 实践中遇到最痛苦的事情莫过于, 精细化管理一切代码行为

为什么MyBatis在我的项目中很糟糕

我们团队一共就五个人,两个做后端,一个前端,一个原型设计,外加一个对外沟通的部门负责人.在一开始的选型中, 选中的框架是使用mybatis+springboot+vue来完成功能,我们主要是采集工厂生产过程中关键点温度的变化,以及保温性能的数据。

对于工业化设备监控来说,开发过程中最大的问题是,我们的模型的字段会在开发过程中不断的更新,于是我们实践中就会将mybatis当成一个Orm框架来使用,基本来说,就是只是用CRUD,每个对象是一张表,操作顺序都是查出对象-》修改对象-》回写数据库,这种半自动的工具在这样的实践中就会带来需要管理的代码实在太多,而且基本为样板代码,更严重的是,对象直接的关系维护必须手工写代码(这种代码基本就是样板代码,其实Hibernate这种框架都已经解决好了),对于一个小团队来说这是一个不可取的行为。没那么多精力,也不应该花那么多精力来维护这些样板代码。

举个栗子

我们对于采集点管理进行建模,首先有区域,然后有管道,接着有测温点信息,最后测温点有测温点数据

然后我们需要维护区域与管道之间的关系,管道与测温点之间的关系,区域与区域之间的关系,测温点数据与测温点之间的关系

一顿操作,需要有四个Dao,对应四个Entity,然后关系(就是Repository的职责)有四个Repository。

以上全是样板代码

提升效率

问题的关键在于如何将我们的开发人员从无趣且繁琐的样板代码实施中解放出来呢,答案在EJB中已经被实践,就是全自动的ORM框架

全自动的ORM带来的是开发人员不用再去做大量重复性工作以维护数据之间的关系和正确性

对于一个我们进行中的项目,采用MyBatis plus方案需要实现四个Dao,四个Repository,还有实现四个Entity,而改用Jpa/Hibernate之后,代码数量减少到四个Repository,无需实现Entity,Dao,而且SpringDataJpa已经可以通过动态代理方式生成代码,Repository是无需实现,只需要声明接口

原先使用了接近5(人/日)的工作量才完成的方案,改用Jpa后只需要0.5(人/日)

需要回避的点

全自动ORM是否有缺陷?

有,一是对象与数据库之间的关系需要靠一套不可变的规则维持,但是代码总是有人维护的,而且Hibernate的实现人并不是我们,若规则变动,我们的数据全部报废

二是全自动的ORM会有解析对象的性能开销,具体到我们项目是,启动时间增加两秒到五秒

三是需要实现一层控制事务的Service

解决方法也很简单,版本别动,启动时间其实无所谓,然后多采用懒加载,事务无非就是多一个Transactional的注解

结语

  1. mybatis不适用于人少的项目,开发人员少的情况下,要优先考虑开发效率
  2. 绝大部分开发人员真的不需要考虑从持久化方案中榨取更多性能,而榨取的通道多数会变成你的维护成本
  3. 性能有绝对要求下应该考虑其他方案而不采用数据库
  4. 真心建议国内的风气变一下,我在调研中发现mybatis最后演化出了mybatisplus的方案,一个四不像的方案,既没有减轻开发人员维护数据库的负担,也没有减少样板代码(mybatisplus依旧要手工维护关系)

玩一玩Rawsocket

玩一玩Rawsocket

ip包与icmp包之间的关系图

什么是rawsocket

我们一般做应用编程的时候其实关注的socket是udpsocket或者是tcpsocket,也就是协议层的东西是确定的

但是rawsocket不是,他可以让你自定义协议层的内容,比如自己顶一个YCPSocket,只要不被路由器丢弃就可以被对方机器接受

操作rawsocket的好玩的事情

因为能够操作到协议层,就能做一些有意思的事情,比如可以探测出从本机到(任意)IP间经过路由器的ip地址,或者是做ICMP攻击用

如何做

比如我可以指定ip包的ttl,使得经过特定次数路由器后包被丢弃,同时丢弃的路由器返回ICMPv4TypeTimeOut的响应,此时就可以获取到RemoteAddr,逐次记录Ip直到返回ICMPv4TypeEchoReply,此时就可以获取到指定IP的完整链路

如果是攻击行为只要将IP协议中的src更改成一个特定地址,则可以是的特定地址的服务器被DDoS攻击

Golang相关

因为要操作的层级很低,这个时候需要有些过程式的说明,

首先要创建RawSocket需要理解,我们其实是要直接向硬件写自己构造的数据,那么在描述的时候这个过程如下

获取操作硬件的指针/句柄/文件地址

将其指针赋予RawSocket,并指定每次操作socket时候写包的参数

创建好socket后就可以直接写我们创建的二进制数据了

思维训练

思维训练

题目

电动车好

反对意见

  1. 续航太短
  2. 不能跑长途
  3. 三五年后残值比汽油车少

分析

续航太短

命题:电动车是续航短
定义电动车:以电能作为最终使用的能量形态的汽车
定义续航短:续航少于300公里

不能跑长途

命题:电动车是不能跑长途的汽车
定义不能跑长途的汽车:在路途中允许补充燃料并能够将人从出发地顺利的移动到目的地的车辆

三五年后残值比汽油车少

命题:汽油车三五年后残值是高于电动车
定义残值:经过了三五年的使用后的物件市场再次出手的平均价格

现实

续航太短(现实)

电动车在续航超过了300公里以后,可以单次从一座城市到达另一座城市。从这个场景续航是满足的
这个问题的延续就是不能跑长途

不能跑长途(现实)

无论是什么能源的车最终其燃料都是有限的,需要从外部补充,那么从补充上的便利性,燃油车确实好于电动车在目前(2018)的时间节点上
但是该问题的核心在于能够便捷的补充燃料,对比电的运输与汽油的运输,在其他维护和设备价格相同的情况下,则电的运输是优于汽油的运输

三五年后残值比汽油车少(现实)

汽车的残值是与使用者保养有关,不能直接挂钩,即使汽油车,使劲用甚至其价格会低于均价(不要认为均价就是你能成功脱手的价格)。另,目前电动车残值不高的原因在于,电池的发展过快,无可获利的电池回收业务,电池占车成本的比重较高
待新的回收技术或者其他技术出现后该问题自然解决

结论

以上提及的问题在三年内都有望得到解决,这些均是在车辆正常寿命未到前能够达到

在当前时间节点上,因为有国家的补贴情况下,购买电动车,并不是一个更坏的选择,尤其在大型城市在较难获取路权的情况下,政策倾斜更应该是优选