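How to Build a Backtesting System (Part 9)

A Practical Hot-Reload ClassLoader

Part 8 outlined how the overall framework is put together and which components it needs. Here is my reference implementation.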

@Slf4j
public class DBStrategyClassLoader extends ClassLoader {

    private final Map<String, Class<?>> loadClassMap;
    private static DynamicStrategyRepository repository;
    // volatile is required for the double-checked locking in getInstance() to be safe
    private static volatile DBStrategyClassLoader instance;
    private static ClassLoader springbootClassloader;

    private DBStrategyClassLoader() {
        this.loadClassMap = new ConcurrentHashMap<>();
    }

    public static void init(DynamicStrategyRepository repository,
                            ClassLoader springbootClassloader) {
        DBStrategyClassLoader.repository = repository;
        DBStrategyClassLoader.springbootClassloader = springbootClassloader;
        log.info("DBStrategyClassLoader initialized");
    }

    public void reload() {
        log.info("class reload start");
        synchronized (DBStrategyClassLoader.class) {
            var newInstance = new DBStrategyClassLoader();
            var loadClassName = loadClassMap.keySet();
            for (var clazzName : loadClassName) {
                try {
                    newInstance.loadClassMap.put(clazzName, newInstance.loadClass(clazzName));
                } catch (ClassNotFoundException e) {
                    log.error("reload clazz {}: failed,", clazzName, e);
                }
            }
            instance = newInstance;
        }
        log.info("class reloaded ");
    }

    public static DBStrategyClassLoader getInstance() {
        if (Objects.isNull(instance)) {
            synchronized (DBStrategyClassLoader.class) {
                if (Objects.isNull(instance)) {
                    instance = new DBStrategyClassLoader();
                }
            }
        }
        return instance;
    }

    @Override
    public Class<?> loadClass(String name) throws ClassNotFoundException {
        try {
            return springbootClassloader.loadClass(name);
        } catch (ClassNotFoundException e) {
            return super.loadClass(name);
        }
    }

    @Override
    protected Class<?> findClass(String name) throws ClassNotFoundException {
        var clazz = this.loadClassMap.get(name);
        if (Objects.isNull(clazz)) {

            var dbData = repository.findByDbClazz_name(name);
            if (!dbData.isEmpty()) {
                var data = dbData.stream().max(Comparator.comparing(DynamicStrategy::getVersion))
                        .get().getDbClazz().compile();
                clazz = defineClass(name, data, 0, data.length);
                loadClassMap.put(name, clazz);
            } else {
                throw new ClassNotFoundException(name);
            }
        }
        return clazz;
    }
}

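Who manages the lifecycle of this ClassLoader

A precondition first: most of my components are managed by Spring so that they are easy to use from Spring MVC. The ClassLoader is the exception. To be sure the reference held at the GC root can be set to null, so that the old loader and its classes can be collected, it manages its own lifecycle.

Hence the double-checked locking singleton used here. Note that under the Java memory model this idiom is only safe when the instance field is declared volatile.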

public static DBStrategyClassLoader getInstance() {
    if (Objects.isNull(instance)) {
        synchronized (DBStrategyClassLoader.class) {
            if (Objects.isNull(instance)) {
                instance = new DBStrategyClassLoader();
            }
        }
    }
    return instance;
}

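reload in detail

For the code to run again after a reload, every class that was loaded before has to be loaded again through a fresh loader. Only when all of that is ready is instance replaced.

Components that hold classes from the old loader must also load them again for the new code to take effect.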

public void reload() {
    log.info("class reload start");
    synchronized (DBStrategyClassLoader.class) {
        var newInstance = new DBStrategyClassLoader();
        var loadClassName = loadClassMap.keySet();
        for (var clazzName : loadClassName) {
            try {
                newInstance.loadClassMap.put(clazzName, newInstance.loadClass(clazzName));
            } catch (ClassNotFoundException e) {
                log.error("reload clazz {}: failed,", clazzName, e);
            }
        }
        instance = newInstance;
    }
    log.info("class reloaded ");
}

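Miscellaneous

The project is a Spring Boot application, and Spring Boot is well known to ship its own ClassLoader. All of our code that does not come from the database is loaded through it, so our loader also needs a reference to the Spring Boot ClassLoader. You can get it from any Spring-managed object via getClass().getClassLoader().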

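How to Build a Backtesting System (Part 8)

The code upgrade problem

A backtesting system inevitably involves testing strategies over and over and redeploying them.

Overriding the ClassLoader

To keep code somewhere other than the classpath, we need a custom ClassLoader that loads the binary code. We use a database to store the code because it is easy to query.

Override findClass in the ClassLoader

The key point is that a given loader can call defineClass for a class name only once (a JVM restriction; a second call throws LinkageError), so the Class is cached in a Map once it has been defined.

To update a class under the same name, discard the ClassLoader and create a new ClassLoader instance. The JVM treats two classes as the same only if both the defining ClassLoader and the fully qualified name are the same.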
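To make the two rules above concrete, here is a minimal, self-contained sketch. It is not the loader from this series: ReloadDemo, BytesLoader and the class name HotStrategy are made up for illustration, sources are compiled in memory (so it needs a JDK, not a bare JRE), and it only handles a single top-level class without nested classes.

```java
import javax.tools.FileObject;
import javax.tools.ForwardingJavaFileManager;
import javax.tools.JavaCompiler;
import javax.tools.JavaFileManager;
import javax.tools.JavaFileObject;
import javax.tools.SimpleJavaFileObject;
import javax.tools.ToolProvider;
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.Collections;

final class ReloadDemo {

    /** Hypothetical loader that defines exactly one class from the bytes it was given. */
    static final class BytesLoader extends ClassLoader {
        private final String className;
        private final byte[] bytes;

        BytesLoader(String className, byte[] bytes) {
            this.className = className;
            this.bytes = bytes;
        }

        @Override
        protected Class<?> findClass(String name) throws ClassNotFoundException {
            if (!name.equals(className)) {
                throw new ClassNotFoundException(name);
            }
            // Calling defineClass twice for this name in the same loader would throw LinkageError.
            return defineClass(name, bytes, 0, bytes.length);
        }
    }

    /** Compiles one top-level class in memory and returns its bytecode. */
    static byte[] compile(String className, String source) {
        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); // null when no compiler is available
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        JavaFileObject input = new SimpleJavaFileObject(
                URI.create("string:///" + className + ".java"), JavaFileObject.Kind.SOURCE) {
            @Override
            public CharSequence getCharContent(boolean ignoreEncodingErrors) {
                return source;
            }
        };
        JavaFileManager manager = new ForwardingJavaFileManager<JavaFileManager>(
                compiler.getStandardFileManager(null, null, null)) {
            @Override
            public JavaFileObject getJavaFileForOutput(JavaFileManager.Location location, String name,
                                                       JavaFileObject.Kind kind, FileObject sibling) {
                // Redirect the generated .class bytes into the in-memory buffer.
                return new SimpleJavaFileObject(URI.create("mem:///" + name + kind.extension), kind) {
                    @Override
                    public OutputStream openOutputStream() {
                        return out;
                    }
                };
            }
        };
        boolean ok = compiler.getTask(null, manager, null, null, null,
                Collections.singletonList(input)).call();
        if (!ok) {
            throw new IllegalStateException("compilation failed for " + className);
        }
        return out.toByteArray();
    }

    /** Loads className through a brand-new loader, which is what a reload amounts to. */
    static Class<?> load(String className, byte[] bytes) {
        try {
            return new BytesLoader(className, bytes).loadClass(className);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Instantiates the class and returns its toString(), to show which version is running. */
    static String describe(Class<?> type) {
        try {
            return type.getDeclaredConstructor().newInstance().toString();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Calling ReloadDemo.load twice with the same name and bytes returns two distinct Class objects, because each call uses a new loader. Compiling a changed source and loading it through yet another loader is how the new version becomes visible; objects created from the old Class keep running the old code.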

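Implementing JavaCompilerInMemory

So that the front-end page can modify classes dynamically, the contents of the .java source have to be compiled from Java code.
Reference code: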

    public byte[] compile() {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
        var diagnosticCollector = new DiagnosticCollector<JavaFileObject>();
        var fileManager = new MyJavaFileObjectManager(compiler.getStandardFileManager(diagnosticCollector, null, StandardCharsets.UTF_8));
        var javaFileObjects = Collections.singletonList(new MyJavaFileObject(getName(), codes, byteArrayOutputStream));
        fileManager.outputStream = byteArrayOutputStream;

        var compilerTask = compiler.getTask(null, fileManager, diagnosticCollector, null, null, javaFileObjects);
        if (compilerTask.call()) {
            return byteArrayOutputStream.toByteArray();
        } else {
            StringBuilder stringBuilder = new StringBuilder();
            diagnosticCollector.getDiagnostics().forEach(diagnostic -> stringBuilder.append(diagnostic.getMessage(Locale.CHINESE)));
            throw new RunException(stringBuilder.toString());
        }
    }

    static class MyJavaFileObjectManager extends ForwardingJavaFileManager<JavaFileManager> {

        Map<String, JavaFileObject> fileObjects = new HashMap<>();
        OutputStream outputStream;

        public MyJavaFileObjectManager(JavaFileManager fileManager) {
            super(fileManager);
        }

        @Override
        public JavaFileObject getJavaFileForInput(Location location, String className, JavaFileObject.Kind kind) throws IOException {
            JavaFileObject javaFileObject = fileObjects.get(className);
            if (javaFileObject == null) {
                return super.getJavaFileForInput(location, className, kind);
            }
            return javaFileObject;
        }

        @Override
        public JavaFileObject getJavaFileForOutput(Location location, String qualifiedClassName, JavaFileObject.Kind kind, FileObject sibling) throws IOException {
            JavaFileObject javaFileObject = new MyJavaFileObject(qualifiedClassName, kind, outputStream);
            fileObjects.put(qualifiedClassName, javaFileObject);
            return javaFileObject;
        }
    }

    static class MyJavaFileObject extends SimpleJavaFileObject {
        String source;
        OutputStream outputStream;
        String name;

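        public MyJavaFileObject(String name, Kind kind, OutputStream stream) {
            // Use '/' separators so that a class inside a package still maps to a valid file path.
            super(URI.create("String:///" + name.replace('.', '/') + kind.extension), kind);
            this.outputStream = stream;
        }

        public MyJavaFileObject(String name, String source) {
            // javac compares this path with the class name, e.g. ".../pkg/Foo.java" for pkg.Foo.
            super(URI.create("String:///" + name.replace('.', '/') + Kind.SOURCE.extension), Kind.SOURCE);
            this.source = source;
            this.name = name;
        }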

        public MyJavaFileObject(String name, String source, OutputStream outputStream) {
            this(name, source);
            this.outputStream = outputStream;
        }

        public MyJavaFileObject(URI uri, Kind kind) {
            super(uri, kind);
        }

        @Override
        public CharSequence getCharContent(boolean ignoreEncodingErrors) throws IOException {
            return source;
        }

        @Override
        public OutputStream openOutputStream() throws IOException {
            return outputStream;
        }
    }

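In the end

Wire the ClassLoader and JavaCompilerInMemory above into your own business logic through CRUD endpoints, and you can write Java code in a web page, then modify and run it dynamically.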

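Socket Programming Notes

Custom message framing protocol

Note that a single TCP segment carries at most the MSS, that is, the MTU minus the IP and TCP header lengths. Anything longer is split and arrives in several pieces.

TCP guarantees the order and integrity of the byte stream, but it does not preserve message boundaries.

With UDP you have to implement ordering and error handling yourself.

Without a framing protocol, the receiver may read past the end of a message, or start processing before the whole message has arrived.

Two ways to frame messages

Put metadata in a header

  • Magic value
  • Data length
  • Data field
Receiver, in pseudocode:
header{
    len
}
while(1){
    buf[64]
    socket.recv(buf)                        // first chunk: header plus the start of the data
    read(&header,buf)
    data = new data[header.len]
    readLen = readRemainBuf(buf,data)       // copy the data bytes that arrived with the header
    socket.recv(data,(header.len-readLen))  // keep receiving until header.len bytes are in
}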
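The header approach can be sketched in Java as follows. This is a minimal illustration under my own assumptions, not code from a real project: the class name LengthPrefixedFrames, the magic value 0xCAFE, the 4-byte length field and the 1 MiB limit are all made up. DataInputStream.readFully does the "keep receiving until the payload is complete" loop from the pseudocode.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

final class LengthPrefixedFrames {

    static final int MAGIC = 0xCAFE;      // hypothetical 2-byte magic value
    static final int MAX_LEN = 1 << 20;   // hypothetical upper bound on payload size

    /** Builds one frame: magic (2 bytes), payload length (4 bytes), payload. */
    static byte[] encode(byte[] payload) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeShort(MAGIC);
            out.writeInt(payload.length);
            out.write(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads exactly one frame from the stream and returns its payload. */
    static byte[] readFrame(InputStream in) {
        try {
            DataInputStream data = new DataInputStream(in);
            if (data.readUnsignedShort() != MAGIC) {
                throw new IllegalStateException("bad magic value, stream is out of sync");
            }
            int len = data.readInt();
            if (len < 0 || len > MAX_LEN) {
                throw new IllegalStateException("implausible payload length: " + len);
            }
            byte[] payload = new byte[len];
            // readFully blocks until all len bytes have arrived, however TCP split them.
            data.readFully(payload);
            return payload;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With a real connection, the InputStream would come from Socket.getInputStream(). Two frames written back to back are read back as two separate payloads, since each read consumes exactly the bytes announced in its header.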

Put a delimiter at the end of the message

  • Data field
  • Delimiter
while(1){
    buf
    socket.recvAndExtend(buf)
    while not found delimiter in buf:
        socket.recvAndExtend(buf)
    set delimiterIndex
    return buf[0:delimiterIndex]
}
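A Java version of the delimiter approach might look like the sketch below. The class name DelimitedFrames is hypothetical, and the sketch assumes the delimiter byte never occurs inside the data (otherwise the data would need escaping). It reads one byte at a time for clarity, so in practice the stream should be wrapped in a BufferedInputStream.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

final class DelimitedFrames {

    /**
     * Reads bytes up to, but not including, the next delimiter.
     * Returns null when the stream ends cleanly between two frames.
     */
    static byte[] readFrame(InputStream in, byte delimiter) {
        ByteArrayOutputStream frame = new ByteArrayOutputStream();
        try {
            int b;
            while ((b = in.read()) != -1) {
                if ((byte) b == delimiter) {
                    return frame.toByteArray();
                }
                frame.write(b);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        if (frame.size() == 0) {
            return null; // nothing left to read
        }
        throw new IllegalStateException("stream ended before the delimiter was found");
    }
}
```

Compared with the header approach, this needs no length field, at the cost of scanning every byte and reserving one value as the delimiter.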

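Adding a watermark to an image

import numpy as np
from numpy import fft
from PIL import Image, ImageDraw, ImageFont

strength = 40
i1 = Image.open("d:/abc.jpg").convert("RGB")
width, height = i1.size

# Draw the mark on a white canvas, rotate it by 180 degrees and draw it again,
# so the mark appears in two opposite corners.
i2 = Image.new('RGB', (width, height), (255, 255, 255))
font = ImageFont.truetype("arial.ttf", 36)
draw = ImageDraw.Draw(i2)
draw.text((0, 0), "Mark By Michaelssss", font=font, fill=(0, 0, 0))
i2 = i2.transpose(Image.ROTATE_180)
draw = ImageDraw.Draw(i2)
draw.text((0, 0), "Mark By Michaelssss", font=font, fill=(0, 0, 0))

arr1 = np.array(i1, dtype=np.float64)
arr2 = np.array(i2, dtype=np.float64)

# Embed: add the scaled mark to the spectrum and transform back.
# axes=(0, 1) runs the FFT over height and width, separately per color channel.
spectrum = fft.fft2(arr1, axes=(0, 1))
marked = fft.ifft2(spectrum + arr2 * strength, axes=(0, 1)).real
final = np.uint8(np.clip(marked, 0, 255))
Image.fromarray(final).show()

# Extract: subtract the original spectrum from the spectrum of the marked image.
diff = fft.fft2(final.astype(np.float64), axes=(0, 1)) - spectrum
Image.fromarray(np.uint8(np.clip(diff.real / strength, 0, 255))).show()

This adds a blind watermark. Thanks to open source, I did not have to learn how to write an FFT by hand, hahaha.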

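How to Build a Backtesting System (Part 7)

Cleaning the data

Out-of-memory problem

  1. Rough size of a single object (excluding all per-object overhead): 438 bytes
  2. About 5*60*60*3000 records
  3. Changes are needed so that it also runs smoothly on a machine with about 8 GB of memory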
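Multiplying the two figures above shows why this cannot fit. The sketch below only does that arithmetic; the class name MemoryEstimate is made up, and the result ignores object headers, references and collections, so the real footprint would be larger.

```java
final class MemoryEstimate {

    static final long BYTES_PER_RECORD = 438L;        // rough payload size of one object
    static final long RECORDS = 5L * 60 * 60 * 3000;  // 54,000,000 records

    /** Total payload bytes if every record is kept in memory at once. */
    static long totalBytes() {
        return BYTES_PER_RECORD * RECORDS;
    }

    /** The same figure expressed in GiB (2^30 bytes). */
    static double totalGiB() {
        return totalBytes() / (1024.0 * 1024 * 1024);
    }
}
```

That is 23,652,000,000 bytes, about 22 GiB, or roughly three times the memory of an 8 GB machine before any overhead is counted.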

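Discontinuous data problem

  1. Some ETF constituent stocks have no trades, so their prices cannot be reconstructed from tick-by-tick data

Improvements

  1. Split the import by time; in practice each segment covers thirty minutes
  2. Any data needed from earlier segments can be kept in memory
  3. Each segment reads the database once and writes it once
  4. Pull the import out into its own task and save progress after every segment, so an interrupted run can resume
  5. Search backwards until the stock's first opening price
  6. Daily data for each stock has to be stored
  7. Use cash substitution for constituents with no trade data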
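Items 1 and 4 of the list above, time slicing and resumable progress, can be sketched roughly as follows. This is an illustration only: ImportWindows, Window and the idea of storing the end time of the last finished segment as the checkpoint are my assumptions, and the actual database read and write for each segment are left out.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

final class ImportWindows {

    /** Half-open time range [start, end) handled by one read and one write. */
    static final class Window {
        final LocalDateTime start;
        final LocalDateTime end;

        Window(LocalDateTime start, LocalDateTime end) {
            this.start = start;
            this.end = end;
        }
    }

    /** Cuts [from, to) into consecutive windows of at most the given length. */
    static List<Window> split(LocalDateTime from, LocalDateTime to, Duration step) {
        if (step.isZero() || step.isNegative()) {
            throw new IllegalArgumentException("step must be positive");
        }
        List<Window> windows = new ArrayList<>();
        LocalDateTime cursor = from;
        while (cursor.isBefore(to)) {
            LocalDateTime next = cursor.plus(step);
            if (next.isAfter(to)) {
                next = to; // last window may be shorter
            }
            windows.add(new Window(cursor, next));
            cursor = next;
        }
        return windows;
    }

    /**
     * Returns the windows that still need importing. The checkpoint is the end
     * of the last window that was fully written, or null if nothing was imported.
     */
    static List<Window> remaining(List<Window> all, LocalDateTime checkpoint) {
        List<Window> todo = new ArrayList<>();
        for (Window w : all) {
            if (checkpoint == null || w.end.isAfter(checkpoint)) {
                todo.add(w);
            }
        }
        return todo;
    }
}
```

After each window is written, the importer would persist that window's end time. On restart it calls remaining with the saved value and continues from the first unfinished window.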

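Results

  1. On an 8 GB machine, a 4 GB data import completed successfully
  2. Every IOPV has data, but it cannot be verified yet; the next step is to find a way to verify it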

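How to Build a Backtesting System (Part 6)

Data volume problem

  1. By a rough estimate, building high-precision trading data entirely from tick-by-tick records from 2000 to the present needs at least 31 TB of storage
  2. Memory cannot possibly hold that
  3. Plain file storage is inconvenient to search, and the time complexity is too high

Direction of the solution

  1. Give up the original idea of keeping all data in memory
    1. Introduce a database
    2. Store and read with table sharding
    3. Aggregate in the application layer
  2. Instead of having the backtesting system rebuild high-precision data every time, let an ETL system compute and store the data while the backtesting system only reads it, separating the two functions

Difficulties

  1. JPA/Hibernate was chosen for development speed, but JPA sits at too high a level to make sharding across tables and databases, and aggregating the results, convenient. The database access layer for the sharded objects has to be rebuilt on its lower-level interfaces
  2. Sorting out the dependencies and interfaces after the project was split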

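How to Build a Backtesting System (Part 5)

Account system design

Requirements

  1. Simulated trading
  2. Simulated fees
  3. Record the state at the time of every trade
  4. Compute the win rate automatically
  5. Connect seamlessly to the live trading system

Fields

  1. taskUid (the search task this account belongs to)
  2. Datasource (source of price records)
  3. SimulateMarket (the trading venue: pass in the live one for the live trading system, the simulated one for the backtesting system)
  4. actualMoney (money that has actually settled)
  5. bookMoney (money according to the ledger)
  6. beginDate (account opening date)
  7. holdComponents (held instruments with their positions and prices)
  8. book (the Orders traded each day)

Behaviors

  1. Settle the books
  2. Subscribe
  3. Redeem
  4. Query the average holding price

That is the complete detailed design of the account.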
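As a rough illustration of how the money fields and the four behaviors fit together, here is a simplified sketch. It is not the account class from this series: SimAccount, Holding, the fee expressed in basis points and the rule that settling copies bookMoney into actualMoney are all assumptions of mine, and taskUid, Datasource, SimulateMarket, beginDate and book are left out to keep it short. Amounts are whole numbers in the smallest currency unit.

```java
import java.util.HashMap;
import java.util.Map;

final class SimAccount {

    /** Position in one instrument: quantity held and total purchase cost (fees excluded). */
    static final class Holding {
        long quantity;
        long cost;
    }

    private final long feeBps;   // hypothetical fee rate in basis points (1/10000)
    private long actualMoney;    // money that has actually settled
    private long bookMoney;      // money according to the ledger
    private final Map<String, Holding> holdComponents = new HashMap<>();

    SimAccount(long initialMoney, long feeBps) {
        this.actualMoney = initialMoney;
        this.bookMoney = initialMoney;
        this.feeBps = feeBps;
    }

    /** Subscribe: buy quantity units at price and charge the fee on the ledger. */
    void subscribe(String code, long price, long quantity) {
        long amount = price * quantity;
        long fee = amount * feeBps / 10_000;
        if (amount + fee > bookMoney) {
            throw new IllegalStateException("insufficient funds");
        }
        bookMoney -= amount + fee;
        Holding h = holdComponents.computeIfAbsent(code, k -> new Holding());
        h.quantity += quantity;
        h.cost += amount;
    }

    /** Redeem: sell quantity units at price; the average holding price stays unchanged. */
    void redeem(String code, long price, long quantity) {
        Holding h = holdComponents.get(code);
        if (h == null || h.quantity < quantity) {
            throw new IllegalStateException("not enough holdings of " + code);
        }
        long amount = price * quantity;
        long fee = amount * feeBps / 10_000;
        h.cost -= h.cost * quantity / h.quantity; // release cost in proportion to what is sold
        h.quantity -= quantity;
        if (h.quantity == 0) {
            holdComponents.remove(code);
        }
        bookMoney += amount - fee;
    }

    /** Average holding price, or 0 when nothing is held. */
    long averagePrice(String code) {
        Holding h = holdComponents.get(code);
        return h == null ? 0 : h.cost / h.quantity;
    }

    /** Settle the books: the ledger balance becomes the settled balance. */
    void settle() {
        actualMoney = bookMoney;
    }

    long getActualMoney() {
        return actualMoney;
    }

    long getBookMoney() {
        return bookMoney;
    }
}
```

Keeping bookMoney and actualMoney apart means trades can be recorded immediately while settled cash only changes when settle() runs, which is one way to read the split between the two fields in the design.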

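How to Build a Backtesting System (Part 4)

Handling search tasks

Good quantitative trading comes down to searching for the extrema of a function and searching for rules.

Because the function is so complex, its properties cannot be derived, and sometimes a brute-force search for the extrema is the practical option.

It looks something like sum(f(g(x,y),z),t) = alpha, where z is the external response, z is close to independent of g(x,y) (our trading volume is nowhere near large enough to affect the outcome), and t is the time-series index.

Defining a task

Different coefficients in g(x,y) affect f differently, and since g(x,y) is not an explicit function, it can only be evaluated by plugging in data.

We define each tuple (x,y,ts,te) as one search task and hand the data and the computation to a thread. No shared state is involved, because the tasks are independent of one another.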
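The task definition above can be sketched as a grid of independent jobs on a thread pool. Everything named here is hypothetical: GridSearch, Task, reading ts and te as the start and end of the time range, and the scoring function passed in by the caller, which stands in for running the backtest and returning alpha.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.ToDoubleFunction;

final class GridSearch {

    /** One search task: coefficients x and y evaluated over the time range [ts, te]. */
    static final class Task {
        final double x;
        final double y;
        final long ts;
        final long te;

        Task(double x, double y, long ts, long te) {
            this.x = x;
            this.y = y;
            this.ts = ts;
            this.te = te;
        }
    }

    /** Builds one task per (x, y) combination for a fixed time range. */
    static List<Task> grid(double[] xs, double[] ys, long ts, long te) {
        List<Task> tasks = new ArrayList<>();
        for (double x : xs) {
            for (double y : ys) {
                tasks.add(new Task(x, y, ts, te));
            }
        }
        return tasks;
    }

    /** Scores every task in parallel and returns the one with the highest score. */
    static Task best(List<Task> tasks, ToDoubleFunction<Task> score, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Double>> futures = new ArrayList<>();
            for (Task task : tasks) {
                // Each job touches only its own task, so no locking is needed.
                Callable<Double> job = () -> score.applyAsDouble(task);
                futures.add(pool.submit(job));
            }
            Task best = null;
            double bestScore = Double.NEGATIVE_INFINITY;
            for (int i = 0; i < tasks.size(); i++) {
                double s = futures.get(i).get();
                if (s > bestScore) {
                    bestScore = s;
                    best = tasks.get(i);
                }
            }
            return best;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

Because a task carries all of its own parameters, the same Task objects could just as well be stored in a table and handed out to other machines, which is where the next section is heading.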

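Supporting components

In the first version I wanted to store tasks serialized as strings.

But strings are variable-length, which makes them hard to read and write in place at the file level.

I then considered something like an object store, but it is hard to implement and a huge amount of work (mostly because of update operations), so I turned to SQLite for persistence instead.

In the end

With SQL as the storage back end, tasks are distributed to several machines that form a cluster. This speeds up the search enormously, and if any machine dies its tasks can be resumed on another machine without starting over.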

How to Build a Backtesting System (Part 3)

Optimized version


public class MarketPlace {

    private Map<String, WriteLockMap<Long, OrderQueen>> sellOrderChannel;
    private Map<String, WriteLockMap<Long, OrderQueen>> buyOrderChannel;
    private Map<String, Long> lastComplete;

    public MarketPlace() {
        this.sellOrderChannel = new ConcurrentHashMap<>();
        this.buyOrderChannel = new ConcurrentHashMap<>();
        this.lastComplete = new ConcurrentHashMap<>();
    }

    public void order(Order order) {
        OrderQueen orderQueen;
        WriteLockMap<Long, OrderQueen> longOrderQueenMap;
        String fixCacheString = order.getTradeCode().intern();
        // A sell order is matched against the buy side and vice versa.
        if (order.sell()) {
            longOrderQueenMap = buyOrderChannel.get(order.getTradeCode());
            if (longOrderQueenMap == null) {
                synchronized (fixCacheString) {
                    longOrderQueenMap = new WriteLockMap<>(new ConcurrentHashMap<>());
                    buyOrderChannel.put(order.getTradeCode(), longOrderQueenMap);
                }
            }
        } else {
            longOrderQueenMap = sellOrderChannel.get(order.getTradeCode());
            if (longOrderQueenMap == null) {
                synchronized (fixCacheString) {
                    longOrderQueenMap = new WriteLockMap<>(new ConcurrentHashMap<>());
                    sellOrderChannel.put(order.getTradeCode(), longOrderQueenMap);
                }
            }
        }
        // Look up the queue at this order's price level, creating it if needed.
        orderQueen = longOrderQueenMap.get(order.getPrice());
        if (orderQueen == null) {
            longOrderQueenMap.lock();
            orderQueen = new OrderQueen(order.getPrice());
            longOrderQueenMap.put(order.getPrice(), orderQueen);
            longOrderQueenMap.unlock();
        }
        Long price = orderQueen.try2ClinchOrder(order);
        if (price != null) {
            this.lastComplete.put(order.getTradeCode(), price);
        }
        // Whatever is left unfilled rests on the order's own side.
        if (!order.isComplete()) {
            if (order.sell()) {
                WriteLockMap<Long, OrderQueen> tm = this.sellOrderChannel.get(order.getTradeCode());
                if (tm == null) {
                    synchronized (fixCacheString) {
                        WriteLockMap<Long, OrderQueen> writeLockMap = new WriteLockMap<>(new ConcurrentHashMap<>());
                        OrderQueen newOne = new OrderQueen(order.getPrice());
                        newOne.addOrder(order);
                        writeLockMap.put(order.getPrice(), newOne);
                        this.sellOrderChannel.put(order.getTradeCode(), writeLockMap);
                    }
                } else {
                    OrderQueen orderQueen1 = tm.get(order.getPrice());
                    if (orderQueen1 == null) {
                        tm.lock();
                        orderQueen1 = new OrderQueen(order.getPrice());
                        orderQueen1.addOrder(order);
                        tm.put(order.getPrice(), orderQueen1);
                        tm.unlock();
                    } else {
                        // Missing in the original sell branch, which dropped the order here;
                        // added to mirror the buy branch below.
                        orderQueen1.addOrder(order);
                    }
                }
            } else {
                WriteLockMap<Long, OrderQueen> tm = this.buyOrderChannel.get(order.getTradeCode());
                if (tm == null) {
                    synchronized (fixCacheString) {
                        WriteLockMap<Long, OrderQueen> writeLockMap = new WriteLockMap<>(new ConcurrentHashMap<>());
                        OrderQueen newOne = new OrderQueen(order.getPrice());
                        newOne.addOrder(order);
                        writeLockMap.put(order.getPrice(), newOne);
                        this.buyOrderChannel.put(order.getTradeCode(), writeLockMap);
                    }
                } else {
                    OrderQueen orderQueen1 = tm.get(order.getPrice());
                    if (orderQueen1 == null) {
                        tm.lock();
                        orderQueen1 = new OrderQueen(order.getPrice());
                        orderQueen1.addOrder(order);
                        tm.put(order.getPrice(), orderQueen1);
                        tm.unlock();
                    } else {
                        orderQueen1.addOrder(order);
                    }
                }
            }
        }
    }

    public Long getLastPrice(String security) {
        return this.lastComplete.get(security);
    }
}

public class OrderQueen {

    // Index of the most recently completed order
    private int lastCompleteIndex;
    private int lastInsertIndex;
    private int lastInsertCacheIndex;
    // Price level this queue belongs to
    private long priceLevel;
    // Order queue
    private Order[] orders;
    private Order[][] finishCache;

    public OrderQueen(long priceLevel) {
        this.priceLevel = priceLevel;
        orders = new Order[5000];
        finishCache = new Order[500][];
        lastCompleteIndex = -1;
        lastInsertIndex = 0;
    }

    private boolean needExpendOrderList() {
        return orders.length <= lastInsertIndex;
    }

    private void expand() {
        // Park the full array in finishCache and start a fresh one.
        Order[] orders = new Order[5000];
        this.finishCache[lastInsertCacheIndex] = this.orders;
        lastInsertCacheIndex++;
        this.orders = orders;
        lastInsertIndex = 0;
        lastCompleteIndex = -1;
    }

    public long getPriceLevel() {
        return priceLevel;
    }

    public synchronized void addOrder(Order order) {
        if (needExpendOrderList()) {
            expand();
        }
        this.orders[lastInsertIndex] = order;
        lastInsertIndex++;
    }

    public synchronized Long try2ClinchOrder(Order order2Clinch) {
        Long clinchPrice = null;
        for (int index = lastCompleteIndex + 1; index < lastInsertIndex; index++) {
            Order order = this.orders[index];
            long count = order2Clinch.getRemainCount() - order.getRemainCount();
            if (count < 0) {
                // Incoming order is fully filled; the resting order is only partly filled.
                long completeCount = order.getRemainCount() - Math.abs(count);
                order2Clinch.addCompleteCount(completeCount);
                order2Clinch.complete();
                order.addCompleteCount(completeCount);
                clinchPrice = order.getPrice();
                break;
            } else if (count == 0) {
                // Both orders are filled exactly.
                order2Clinch.addCompleteCount(order.getRemainCount());
                order2Clinch.complete();
                order.addCompleteCount(order.getRemainCount());
                order.complete();
                clinchPrice = order.getPrice();
                lastCompleteIndex++;
                break;
            } else {
                // Resting order is used up; keep matching against the next one.
                order2Clinch.addCompleteCount(order.getRemainCount());
                order.addCompleteCount(order.getRemainCount());
                order.complete();
                clinchPrice = order.getPrice();
                lastCompleteIndex++;
            }
        }
        return clinchPrice;
    }
}

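Performance analysis

  1. No more filtering and sorting all orders on every call; buy and sell orders are kept apart
  2. Given the daily price limits in the domestic market, prices are split into levels, and orders within each level are ordered by time

In testing, processing 500,000 records dropped from 80,000 ms to 100 ms, a significant improvement, and multithreaded tests showed no problems either.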

How to Build a Backtesting System (Part 2)

A simple replay market implementation

public class MarketPlace {

    private Map<String, List<Order>> orderChannel;
    private Map<String, Long> lastComplete;

    public MarketPlace() {
        orderChannel = new ConcurrentHashMap<>();
        lastComplete = new ConcurrentHashMap<>();
    }

    public void order(Order order) {
        // computeIfAbsent makes the lookup-or-create step atomic across threads
        List<Order> list = orderChannel.computeIfAbsent(order.getTradeCode(), code -> new ArrayList<>());
        synchronized (list) {
            List<Order> targetPriceOrder;
            List<Order> sellOrder = list.stream()
                .filter(order1 -> !order1.isComplete())
                .filter(Order::sell)
                .sorted(Comparator.comparing(Order::getPrice)
                    .thenComparing(Order::getTimestamp))
                .collect(Collectors.toList());
            List<Order> buyOrder = list.stream()
                .filter(order1 -> !order1.isComplete())
                .filter(order1 -> !order1.sell())
                .sorted(Comparator.comparing(Order::getPrice, Comparator.reverseOrder())
                    .thenComparing(Order::getTimestamp))
                .collect(Collectors.toList());
            // Incoming buy order
            if (!order.sell()) {
                if (!buyOrder.isEmpty() && buyOrder.get(0).getPrice() > order.getPrice()) {
                    list.add(order);
                    return;
                }
                // sellOrder is already sorted by lowest price first, then earliest time
                targetPriceOrder = sellOrder.stream()
                    // Resting sell orders that ask no more than the incoming bid
                    .filter(order1 -> order1.getPrice() <= order.getPrice())
                    .collect(Collectors.toList());
            } else {
                if (!sellOrder.isEmpty() && sellOrder.get(0).getPrice() < order.getPrice()) {
                    list.add(order);
                    return;
                }
                // Incoming sell order: match against buy orders, highest bid first
                targetPriceOrder = buyOrder.stream()
                    // Resting buy orders that bid at least the incoming ask
                    .filter(order1 -> order.getPrice() <= order1.getPrice())
                    .collect(Collectors.toList());
            }
            // Quantity of the incoming order that still has to be matched
            long count = order.getRemainCount();
            // Continuous auction: walk the candidates in priority order.
            // Assumes addCompleteCount(n) reduces getRemainCount() by n.
            for (Order order1 : targetPriceOrder) {
                count = count - order1.getRemainCount();
                if (count < 0) {
                    // Incoming order is fully filled and need not queue; order1 is partly filled
                    long filled = order.getRemainCount();
                    order1.addCompleteCount(filled);
                    order.addCompleteCount(filled);
                    order.complete();
                    this.lastComplete.put(order.getTradeCode(), order.getPrice());
                    break;
                } else if (count == 0) {
                    // Both orders are filled exactly; order1 leaves the queue
                    long filled = order.getRemainCount();
                    order.addCompleteCount(filled);
                    order1.addCompleteCount(filled);
                    order1.complete();
                    order.complete();
                    this.lastComplete.put(order.getTradeCode(), order.getPrice());
                    break;
                } else {
                    // order1 is used up and leaves the queue; the incoming order continues
                    long filled = order1.getRemainCount();
                    order.addCompleteCount(filled);
                    order1.addCompleteCount(filled);
                    order1.complete();
                }
                count = order.getRemainCount();
            }
            if (!order.isComplete()) {
                list.add(order);
            }
        }
    }

    public Long getLastPrice(String security) {
        return this.lastComplete.get(security);
    }
}

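Performance analysis

At best a single thread handles each instrument, and the more orders there are, the slower processing gets.

T = f(m*n)

Possible optimizations

  1. Avoid filtering and sorting all orders on every call; keep buy and sell orders apart
  2. Given the daily price limits in the domestic market, split prices into levels, with orders within each level ordered by time
  3. Use primitive types wherever possible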