如何构建回测系统(九)

如何构建回测系统(九)

关于HotReload Classloader的一个实践

在(八)中,我们主要考虑到了一个大的框架是怎么实现,大概有哪些部件,现在来看一下我的参考实现

@Slf4j
public class DBStrategyClassLoader extends ClassLoader {

    private final Map<String, Class<?>> loadClassMap;
    private static DynamicStrategyRepository repository;
    private static DBStrategyClassLoader instance;
    private static ClassLoader springbootClassloader;

    private DBStrategyClassLoader() {
        this.loadClassMap = new ConcurrentHashMap<>();
    }

    public static void init(DynamicStrategyRepository repository,
                            ClassLoader springbootClassloader) {
        DBStrategyClassLoader.repository = repository;
        DBStrategyClassLoader.springbootClassloader = springbootClassloader;
        log.info("DBStrategyClassLoader initialized");
    }

    public void reload() {
        log.info("class reload start");
        synchronized (DBStrategyClassLoader.class) {
            var newInstance = new DBStrategyClassLoader();
            var loadClassName = loadClassMap.keySet();
            for (var clazzName : loadClassName) {
                try {
                    newInstance.loadClassMap.put(clazzName, newInstance.loadClass(clazzName));
                } catch (ClassNotFoundException e) {
                    log.error("reload clazz {}: failed,", clazzName, e);
                }
            }
            instance = newInstance;
        }
        log.info("class reloaded ");
    }

    public static DBStrategyClassLoader getInstance() {
        if (Objects.isNull(instance)) {
            synchronized (DBStrategyClassLoader.class) {
                if (Objects.isNull(instance)) {
                    instance = new DBStrategyClassLoader();
                }
            }
        }
        return instance;
    }

    @Override
    public Class<?> loadClass(String name) throws ClassNotFoundException {
        try {
            return springbootClassloader.loadClass(name);
        } catch (ClassNotFoundException e) {
            return super.loadClass(name);
        }
    }

    @Override
    protected Class<?> findClass(String name) throws ClassNotFoundException {
        var clazz = this.loadClassMap.get(name);
        if (Objects.isNull(clazz)) {

            var dbData = repository.findByDbClazz_name(name);
            if (!dbData.isEmpty()) {
                var data = dbData.stream().max(Comparator.comparing(DynamicStrategy::getVersion))
                        .get().getDbClazz().compile();
                clazz = defineClass(name, data, 0, data.length);
                loadClassMap.put(name, clazz);
            } else {
                throw new ClassNotFoundException();
            }
        }
        return clazz;
    }
}

由谁管理这个ClassLoader的生命周期

这里有个前置条件,我大部分的组件,为了方便再springmvc中使用,都采用了Spring来做生命周期管理,但是对于classloader,为了完整将classloader在root object上置为null,应交由自己管理

故而在这里使用了双锁单例模式

public static DBStrategyClassLoader getInstance() {
    if (Objects.isNull(instance)) {
        synchronized (DBStrategyClassLoader.class) {
            if (Objects.isNull(instance)) {
                instance = new DBStrategyClassLoader();
            }
        }
    }
    return instance;
}

reload详解

为了使得reload之后,代码能重新运行,还要重新loadclass,当所有准备就绪后,替换instance

同时使用了该classloader的部件应该重新加载Class使得新的代码生效

public void reload() {
    log.info("class reload start");
    synchronized (DBStrategyClassLoader.class) {
        var newInstance = new DBStrategyClassLoader();
        var loadClassName = loadClassMap.keySet();
        for (var clazzName : loadClassName) {
            try {
                newInstance.loadClassMap.put(clazzName, newInstance.loadClass(clazzName));
            } catch (ClassNotFoundException e) {
                log.error("reload clazz {}: failed,", clazzName, e);
            }
        }
        instance = newInstance;
    }
    log.info("class reloaded ");
}

其他

由于项目是SpringBoot的项目,众所周知是Springboot自己也实现了Classloader,所有我们非数据库代码都是通过该Classloader加载的,故而,我们还需要在对象中设置Springboot的Classloader,从任意Spring管理对象然后getClass().getClassloader()即可获得

如何构建回测系统(八)

如何构建回测系统(八)

代码升级问题

回测系统必然涉及策略的反复测试及重新部署

重写ClassLoader

为了将代码存放在不同的位置,我们需要重写ClassLoader用以加载二进制代码,我们选用数据库作为代码的存放处,方便查询

重写Classloader中findClass函数

需要注意关键点是,defineClass对同一个class只能解析一次(JVM限制),所以我们需要解析后将Class用Map缓存起来

如果需要更新同名class,需要将Classloader丢弃掉,并新建一个ClassLoader实例(JVM在认为一个相同的Class是指相同的Classloader+全限名则是相同)

实现JavaCompilerInMemory

为了让前端页面能动态的修改class类,需要将.java的内容通过Java代码进行编译
参考代码:

    public byte[] compile() {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
        var diagnosticCollector = new DiagnosticCollector<JavaFileObject>();
        var fileManager = new MyJavaFileObjectManager(compiler.getStandardFileManager(diagnosticCollector, null, StandardCharsets.UTF_8));
        var javaFileObjects = Collections.singletonList(new MyJavaFileObject(getName(), codes, byteArrayOutputStream));
        fileManager.outputStream = byteArrayOutputStream;

        var compilerTask = compiler.getTask(null, fileManager, diagnosticCollector, null, null, javaFileObjects);
        if (compilerTask.call()) {
            return byteArrayOutputStream.toByteArray();
        } else {
            StringBuilder stringBuilder = new StringBuilder();
            diagnosticCollector.getDiagnostics().forEach(diagnostic -> stringBuilder.append(diagnostic.getMessage(Locale.CHINESE)));
            throw new RunException(stringBuilder.toString());
        }
    }

    static class MyJavaFileObjectManager extends ForwardingJavaFileManager<JavaFileManager> {

        Map<String, JavaFileObject> fileObjects = new HashMap<>();
        OutputStream outputStream;

        public MyJavaFileObjectManager(JavaFileManager fileManager) {
            super(fileManager);
        }

        @Override
        public JavaFileObject getJavaFileForInput(Location location, String className, JavaFileObject.Kind kind) throws IOException {
            JavaFileObject javaFileObject = fileObjects.get(className);
            if (javaFileObject == null) {
                return super.getJavaFileForInput(location, className, kind);
            }
            return javaFileObject;
        }

        @Override
        public JavaFileObject getJavaFileForOutput(Location location, String qualifiedClassName, JavaFileObject.Kind kind, FileObject sibling) throws IOException {
            JavaFileObject javaFileObject = new MyJavaFileObject(qualifiedClassName, kind, outputStream);
            fileObjects.put(qualifiedClassName, javaFileObject);
            return javaFileObject;
        }
    }

    static class MyJavaFileObject extends SimpleJavaFileObject {
        String source;
        OutputStream outputStream;
        String name;

        public MyJavaFileObject(String name, Kind kind, OutputStream stream) {
            super(URI.create("String:///" + name + kind.extension), kind);
            this.outputStream = stream;
        }

        public MyJavaFileObject(String name, String source) {
            super(URI.create("String:///" + name + Kind.SOURCE.extension), Kind.SOURCE);
            this.source = source;
            this.name = name;
        }

        public MyJavaFileObject(String name, String source, OutputStream outputStream) {
            this(name, source);
            this.outputStream = outputStream;
        }

        public MyJavaFileObject(URI uri, Kind kind) {
            super(uri, kind);
        }

        @Override
        public CharSequence getCharContent(boolean ignoreEncodingErrors) throws IOException {
            return source;
        }

        @Override
        public OutputStream openOutputStream() throws IOException {
            return outputStream;
        }
    }

最终

通过CURD将上述的ClassLoader和JavaCompilerInMemory通过自己的业务穿起来,就可以实现在网页编写java代码,并动态修改和运行

socket编程日记

socket编程日记

自定义分割包协议

注意到,socket发送出去的包最大只能为tcp的mtu长度减去包头长度,超过长度的包会分批到达

TCP保证了包到达的顺序及正确性

UDP需要自己实现包顺序及异常校验

对端如果没有分包协议会导致一次性读过头或者没读完就进行数据操作

分包协议的两种方式

包头放置数据

  • 特征值
  • 数据长度
  • 数组字段
接收端伪代码形式
header{
    len
}
while(1){
    buf[64]
    socket.recv(buf)
    read(&header,buf)
    data = new data[header.len]
    readLen = readRemainBuf(buf,data)
    socket.recv(data,(header.len-readLen))
}

包尾放置分隔符

  • 数据字段
  • 分隔符
while(1){
    buf
    socket.recvAndExpend(buf)
    while not found delimiter:
        socket.recvAndExpend(buf)
        if found delimiter:
            set delimiterIndex
    return buf[0:delimiterIndex]
}