关于数据库:网易云商七鱼智能客服自适应-ProtoStuff-数据库缓存实践

1次阅读

共计 14474 个字符,预计需要花费 37 分钟才能阅读完成。

需要背景

目前,网易云商·七鱼智能客服数据库缓存应用了 spring-data-redis 框架,并由自研的缓存组件进行治理。该组件应用 Jackson 框架对缓存数据进行序列化和反序列化,并将其以明文 JSON 的模式存储在 Redis 中。

这种形式存在两个问题:

  • 速度慢,CPU 占用高

在应用服务中,读写缓存数据时须要进行字符串的反序列化和序列化操作,行将对象转换为 JSON 格局再转换为字节数组,然而应用 Jackson 序列化形式的性能并不是最优的。此外,在线上服务剖析中发现,对于缓存命中率较高的利用,在并发略微高一点的状况下,Jackson 序列化会占用较多的 CPU 资源。

  • 存储空间大,资源节约

对于 Redis 集群来说,JSON 数据占用的存储空间较大,会节约 Redis 存储资源。

在对同类序列化框架进行调研后,咱们决定应用 ProtoStuff 代替 Jackson 框架。本文将简要介绍 ProtoStuff 的存储原理,并探讨在替换过程中遇到的一些问题。

对于 ProtoStuff

什么是 ProtoStuff?

ProtoStuff 是一种基于 Google Protocol Buffers(protobuf)协定的序列化和反序列化库,它能够将 Java 对象序列化为二进制数据并进行网络传输或存储,也能够将二进制数据反序列化为 Java 对象。与其余序列化库相比,ProtoStuff 具备更高的性能和更小的序列化大小,因为它应用了基于标记的二进制编码格局,同时防止了 Java 序列化的一些毛病,例如序列化后的数据过大和序列化性能较慢等问题。因而,ProtoStuff 被广泛应用于高性能的分布式系统和大规模数据存储系统中。

Protostuff 的序列化编码算法与 Protobuf 基本相同,都采纳基于 Varint 编码的变长序列化形式,以实现对编码后的字节数组的压缩。此外,Protostuff 还引入了 LinkedBuffer 这种数据结构,通过链表的形式将不间断内存组合起来,从而实现数据的动静扩张,进步存储效率。

Varint 编码是一种可变长度的整数编码方式,用于压缩数字数据,使其更加紧凑。它应用 1 个或多个字节来示意一个整数,其中每个字节的高位都用于批示下一个字节是否属于同一个数。较小的数字应用较少的字节编码,而较大的数字则须要更多的字节编码。这种编码方式被广泛应用于网络传输和存储畛域。

LinkedBuffer

简略看一下 LinkedBuffer 的源码:

public final class LinkedBuffer{
    /**
     * The minimum buffer size for a {@link LinkedBuffer}.
     */
    public static final int MIN_BUFFER_SIZE = 256;

    /**
     * The default buffer size for a {@link LinkedBuffer}.
     */
    public static final int DEFAULT_BUFFER_SIZE = 512;

    final byte[] buffer;

    final int start;

    int offset;

    LinkedBuffer next;       
}

byte[] buffer 是用来存储序列化过程中的字节数组的,默认的大小是 512,最低能够设置成 256。LinkedBuffer next 指向的是下一个节点。start 是开始地位,offset 是偏移量。

链表大略长这样,这样就能够把几块间断的内存块给链接到一起了。

Schema 接口

除了 LinkedBuffer 这个类,还有一个要害的接口:Schema,这是一个相似于数据库 DDL 构造的货色,它定义了序列化对象的类的构造信息,有哪些字段,字段的程序是怎么样的,怎么序列化,怎么反序列化。

在应用的时候个别用的是 RuntimeSchema 这个实现类。

public final class RuntimeSchema<T> implements Schema<T>, FieldMap<T>
{
    private final FieldMap<T> fieldMap;

    public static <T> RuntimeSchema<T> createFrom(Class<T> typeClass, Set<String> exclusions, IdStrategy strategy) {
        // 省略局部代码
        final Map<String, java.lang.reflect.Field> fieldMap = findInstanceFields(typeClass);
        final ArrayList<Field<T>> fields = new ArrayList<Field<T>>(fieldMap.size());
        int i = 0;
        boolean annotated = false;
        for (java.lang.reflect.Field f : fieldMap.values()) {if (!exclusions.contains(f.getName())) {if (f.getAnnotation(Deprecated.class) != null) {
                    i++;
                    continue;
                }
                final Tag tag = f.getAnnotation(Tag.class);
                final int fieldMapping;
                final String name;
                if (tag == null) {
                    // 省略局部代码
                    fieldMapping = ++i;
                    name = f.getName();}
                else {
                    // 省略局部代码
                    annotated = true;
                    fieldMapping = tag.value();
                    // 省略局部代码
                    name = tag.alias().isEmpty() ? f.getName() : tag.alias();
                }

                final Field<T> field = RuntimeFieldFactory.getFieldFactory(f.getType(), strategy).create(fieldMapping, name, f,                        strategy);
                fields.add(field);
            }
        }
        return new RuntimeSchema<T>(typeClass, fields, RuntimeEnv.newInstantiator(typeClass));
    }

    static void fill(Map<String, java.lang.reflect.Field> fieldMap, Class<?> typeClass) {if (Object.class != typeClass.getSuperclass())
            fill(fieldMap, typeClass.getSuperclass());

        for (java.lang.reflect.Field f : typeClass.getDeclaredFields()) {int mod = f.getModifiers();
            if (!Modifier.isStatic(mod) && !Modifier.isTransient(mod) && f.getAnnotation(Exclude.class) == null)
                fieldMap.put(f.getName(), f);
        }
    }

    @Override
    public List<Field<T>> getFields()    {return fieldMap.getFields();
    }

    @Override
    public final void writeTo(Output output, T message) throws IOException {for (Field<T> f : getFields())
            f.writeTo(output, message);
    }

}

依据 fill 办法的实现,咱们能够得悉 fieldMap 是通过调用以后类及其父类的 getDeclaredFields 办法所获取的所有字段。接着,在 createFrom 办法中,咱们遍历所有字段,获取每个字段的序列化序号 fieldMapping。在序列化过程中,咱们调用 writeTo 办法,将每个字段依照 fieldMapping 的程序写入字节数组中。

家喻户晓,Java 的 getDeclaredFields 办法返回的字段数组不是依照特定的顺序排列的。字段的程序取决于具体的 JVM 实现以及编译器等因素。因而,在不应用 Tag 注解的时候,序列化的字段程序是不固定的。如果在原有的字段两头随便插入一个字段,或者在合并代码的时候调换了字段的程序,反序列化的数据不仅会错乱,很大概率还会报错。

在 ProtoStuff 的官网文档里,举荐应用 @Tag 注解来显式的申明字段序列化的程序。Tag 注解对于小我的项目或者固定不会变的类对象的确是挺好用的,然而对于老我的项目序列化框架迁徙来说,多个代码仓库超过 400 个对象须要加 Tag 注解,代码改变量和影响范畴将会十分宏大。而且一旦有字段加了 Tag 注解,那么后续增加的所有字段都须要增加注解,并且须要保障新增字段的程序是递增的,会有肯定的保护老本和危险。

自适应 ProtoStuff 的革新计划

为了缩小序列化框架迁徙过程的代码改变范畴和危险,升高前期编码保护老本,咱们须要一个能够在序列化与反序列化时主动适配字段的革新计划。

次要思路

序列化

  • 将 getDeclaredFields 办法获取到的以后类及其父类所有的字段,依据字段名称进行排序。
  • 遍历排序后的字段列表,将字段转换成 ProtoStuff 须要的 Field 列表,再调用 RuntimeSchema 的构造方法新建一个对象。通过 RuntimeSchema 对象实现序列化操作,生成字节数组。
  • 因为 ProtoStuff 的编码是 T-L-V 格局的,只存了对象字段的下标和具体的值,没有存残缺的类门路,而且 spring-data-redis 反序列化的时候不晓得指标对象的类型,因而还须要一个包装类来存储额定的信息。
  • 对对立包装对象进行序列化,返回生成的字节数组。
  • 将缓存对象的类构造信息缓存到 Redis 中,以便反序列化时应用。

为了提供序列化的效率,还能够将 RuntimeSchema 对象缓存到本地。

反序列化

将字节数组反序列化成通用的包装类。

从包装类中获取到源数据的类门路,版本号,字段哈希值。先判断源数据类是否是汇合或者根本数据类型,如果是根本数据类型,间接返回 source 字段的内容。如果是汇合类,判断本地版本号是否与包装类获取到的版本号统一,统一的时候返回 source 字段的内容。

源数据类型既不是汇合也不是根本数据类型,获取本地对象的版本号,如果本地对象版本号大于缓存版本号,则将缓存数据淘汰掉。

如果本地对象的版本号和缓存中的版本号统一,就间接应用本地类进行转换,获取到 RuntimeSchema 进行反序列化。

如果本地对象的版本号小于缓存中的版本号,则须要依据类门路 + 缓存版本号 从 Redis 中获取到对应的类构造信息,将本地的字段进行从新排序,获取到和缓存数据对应的字段程序值,再生成相应的 RuntimeSchema 进行反序列化。

代码实现

ProtoStuff 的入门应用是很简略的,只须要引入 ProtoStuff 的依赖,而后在须要应用序列化的类字段上加上 Tag 注解即可应用。也能够不应用注解,ProtoStuff 会依据字段程序来确定缓存中的程序。

减少 Maven 依赖

<!--        protostuff        -->
        <dependency>
            <groupId>io.protostuff</groupId>
            <artifactId>protostuff-core</artifactId>
            <version>1.7.4</version>
        </dependency>

        <dependency>
            <groupId>io.protostuff</groupId>
            <artifactId>protostuff-runtime</artifactId>
            <version>1.7.4</version>
        </dependency>

对立包装类

public class ProtoStuffWrapper implements Serializable {
    private static final long serialVersionUID = 6310017353904821602L;
    // 版本号
    @Tag(1)
    private int version;
    // 包装类型的残缺路径名
    @Tag(2)
    private String className;
    // 包装对象序列化后的字节数组
    @Tag(3)
    private byte[] data;
    // 是否是没有包装的类型
    @Tag(4)
    private boolean noWrapperObject;
    // 用于存储汇合对象
    @Tag(5)
    private Object source;
    // 类字段 hash
    @Tag(6)
    private int classHash;
    // 省略 get set 和 构造方法
}

对于根本数据类型和一些 Java 的根底对象,以及汇合,Map 类对象,会间接将数据放在 source 中。

重写序列化办法

实现 org.springframework.data.redis.serializer.RedisSerializer 接口,重写序列化办法。

流程图

代码

public class ProtostuffRedisSerializer implements RedisSerializer<Object> {private static final Map<String, ProtoSchema> SCHEMA_CACHE = new ConcurrentHashMap<>(200);
    private static final Map<String, Schema> REMOTE_CLASS_SCHEMA_CACHE = new ConcurrentHashMap<>(200);
    private static final Delegate<Timestamp> TIMESTAMP_DELEGATE = new TimestampDelegate();
    private static final DefaultIdStrategy ID_STRATEGY = (DefaultIdStrategy) RuntimeEnv.ID_STRATEGY;
    private static final ThreadLocal<LinkedBuffer> BUFFER = ThreadLocal.withInitial(() -> LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE));
    private static final Schema WRAPPER_SCHEMA = RuntimeSchema.getSchema(ProtoStuffWrapper.class);
    private static final int SECONDS_OF_THIRTY_DAYS = 30 * 60 * 60 * 24;
    private static final long MILLISECOND_OF_THIRTY_DAYS = SECONDS_OF_THIRTY_DAYS * 1000L;
    private final StringRedisTemplate redisTemplate;

    static {ID_STRATEGY.registerDelegate(TIMESTAMP_DELEGATE);
    }

    public ProtostuffRedisSerializer(StringRedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;}

    @Override
    public byte[] serialize(Object o) throws SerializationException {if (source == null) {return EMPTY_ARRAY;}
        LinkedBuffer buffer = BUFFER.get();
        byte[] data = new byte[0];
        try {String className = getClassName(source);
            Class<?> typeClass = source.getClass();
            Object serializeObj;

            if (isBasicType(source, className) || isArrayType(typeClass)) {
                int classVersion = 0;
                if (isArrayType(typeClass)) {classVersion = readVersion(source);
                }
                serializeObj = new ProtoStuffWrapper(className, classVersion, source);
            } else {ProtoSchema protoSchema = getCachedProtoSchema(className, source);
                try {data = ProtostuffIOUtil.toByteArray(source, protoSchema.getSchema(), buffer);
                } finally {buffer.clear();
                }
                serializeObj = new ProtoStuffWrapper(className, data, protoSchema);
            }
            data = ProtostuffIOUtil.toByteArray(serializeObj, WRAPPER_SCHEMA, buffer);
        } catch (Exception e) {logger.error("protostuff serialize fail", e);
        } finally {buffer.clear();
        }
        return data;
    }

    @Override
    public Object deserialize(byte[] bytes) throws SerializationException {return deserialize(source, Object.class);
    }
}

从下面的 deserialize 办法的定义中能够看到,入参只有一个字节数组,出参是一个 Object,没有 Class 类的参数,因而必须要有一个对立的包装类来保留指标类的定义信息。

Timestamp 序列化代理

对于 Timestamp 类型的字段须要本人写一个序列化代理去解决,不然会有解析失败的问题。

public class TimestampDelegate implements Delegate<Timestamp> {
    @Override
    public WireFormat.FieldType getFieldType() {return WireFormat.FieldType.FIXED64;}

    @Override
    public Timestamp readFrom(Input input) throws IOException {return new Timestamp(input.readFixed64());
    }

    @Override
    public void writeTo(Output output, int number, Timestamp timestamp, boolean repeated) throws IOException {output.writeFixed64(number, timestamp.getTime(), repeated);
    }

    @Override
    public void transfer(Pipe pipe, Input input, Output output, int number, boolean repeated) throws IOException {output.writeFixed64(number, input.readFixed64(), repeated);
    }

    @Override
    public Class<?> typeClass() {return Timestamp.class;}
}

ProtoSchema

本地缓存对象,用来缓存序列化对象的 RuntimeSchema 和类的相干信息。

public class ProtoSchema {
    // 版本号
    private int version;
    // 类字段 hash
    private int hash;
    // 序列化对象的 RuntimeSchema
    private Schema schema;
    // 本地缓存失效开始工夫
    private long createTime;
    // 省略 get set 和 构造方法
}

getCachedProtoSchema

获取序列化对象的 RuntimeSchema 和类的相干信息。本地缓存中存在则间接应用缓存中的数据,不存在时,解析类对象,依据排序后的字段构建 RuntimeSchema 来进行序列化。

private ProtoSchema getCachedProtoSchema(String className, Object source) {ProtoSchema protoSchema = SCHEMA_CACHE.get(className);
        if (protoSchema != null) {if (protoSchema.getVersion() == 0) {
                // 根本类型包装类间接返回
                return protoSchema;
            }
            if (System.currentTimeMillis() - protoSchema.getCreateTime() < MILLISECOND_OF_THIRTY_DAYS) {
                // 本地缓存在有效期内间接返回,不在有效期的从新加载类构造信息
                return protoSchema;
            }
        }
        Class<?> typeClass = source.getClass();
        List<Field<?>> fields = new ArrayList<>();

        LinkedHashMap<String, java.lang.reflect.Field> fieldMap = new LinkedHashMap<>();
        fill(fieldMap, typeClass);
        java.lang.reflect.Field[] declaredFields = fieldMap.values().toArray(new java.lang.reflect.Field[0]);
        // 按字段名进行排序
        Arrays.sort(declaredFields, Comparator.comparing(java.lang.reflect.Field::getName));
        int length = declaredFields.length;
        List<ProtoFieldDescription> fieldDescriptionList = new ArrayList<>(length);
        java.lang.reflect.Field f;
        Class<?> type;
        io.protostuff.runtime.Field<?> field;
        ProtoFieldDescription d;
        int index = 0;
        for (java.lang.reflect.Field declaredField : declaredFields) {
            f = declaredField;
            type = f.getType();
            d = new ProtoFieldDescription(f.getName(), ++index, type.getCanonicalName());
            fieldDescriptionList.add(d);

            field = RuntimeFieldFactory.getFieldFactory(type, ID_STRATEGY).create(d.getIndex(), d.getFieldName(), f, ID_STRATEGY);
            fields.add(field);
        }
        RuntimeSchema schema = new RuntimeSchema(typeClass, fields, RuntimeEnv.newInstantiator(typeClass));

        String[] fieldNames = fieldDescriptionList.stream().map(ProtoFieldDescription::getFieldName).toArray(String[]::new);
        protoSchema = new ProtoSchema(readVersion(source), Arrays.hashCode(fieldNames), schema);

        // 本地缓存 ProtoStuffSchema
        SCHEMA_CACHE.putIfAbsent(className, protoSchema);
        // 缓存类构造信息到 Redis
        cacheFieldDescription(getCacheKey(className, protoSchema.getVersion()), JSON.toJSONString(fieldDescriptionList));

        return protoSchema;
    }

    static void fill(Map<String, java.lang.reflect.Field> fieldMap, Class<?> typeClass) {if (Object.class != typeClass.getSuperclass()) {fill(fieldMap, typeClass.getSuperclass());
        }
        for (java.lang.reflect.Field f : typeClass.getDeclaredFields()) {int mod = f.getModifiers();
            if (!Modifier.isStatic(mod) && !Modifier.isTransient(mod) && f.getAnnotation(Exclude.class) == null) {fieldMap.put(f.getName(), f);
            }

将 ProtoStuffSchema 缓存在本地,能够防止每次都反复解析类的构造,优化性能。本地缓存减少了有效期,能够保留 Redis 中的类构造信息和本地缓存中的统一,从而避免出现 Redis 中的数据过期导致老版本利用没法读取到对应版本类构造信息的状况。

RuntimeSchema(java.lang.Class, java.util.Collection<io.protostuff.runtime.field>, io.protostuff.runtime.RuntimeEnv.Instantiator) 这个构造方法是自适应的要害,正是因为有了这个构造方法,咱们能力本人构建字段的程序。

重写反序列化办法

流程图

首先,须要对字节数组进行解析,以失去相应的对立包装类。随后,须要依据缓存版本号和本地类版本号进行比拟,以确定是否须要应用缓存中的数据。

生成版本号的逻辑是:根底版本号加上类的字段数量。如果版本号雷同,咱们还须要查看类的字段哈希值,而后依据字段哈希值获取排序后的字段名的哈希值。

代码

public <T> T deserialize(byte[] source, Class<T> type) throws SerializationException {if (isEmpty(source)) {return null;}
        try {ProtoStuffWrapper wrapper = new ProtoStuffWrapper();
            ProtostuffIOUtil.mergeFrom(source, wrapper, WRAPPER_SCHEMA);

            int cacheVersion = wrapper.getVersion();
            if (wrapper.isNoWrapperObject()) {
                // 汇合数组, 根本类型包装类 缓存对象,缓存与本地版本不统一,间接淘汰掉
                if (cacheVersion == 0 || cacheVersion == inferVersion(wrapper.getSource())) {return (T) wrapper.getSource();}
                return null;
            }
            String className = wrapper.getClassName();
            if (StringUtils.isNotEmpty(className)) {Class<?> typeClass = Class.forName(className);
                ProtoSchema protoSchema = getProtoSchema(className, typeClass);

                int localVersion = protoSchema.getVersion();
                if (cacheVersion >= localVersion) {Schema cachedSchema = getCachedSchema(wrapper, typeClass, protoSchema);

                    if (cachedSchema != null) {Object newMessage = cachedSchema.newMessage();
                        ProtostuffIOUtil.mergeFrom(wrapper.getData(), newMessage, cachedSchema);
                        return (T) newMessage;
                    }
                }
            }
        } catch (Exception e) {// 缓存,本地构造不统一, 打印一个谬误日志}
        return null;
    }

    private ProtoSchema getProtoSchema(String className, Class<?> typeClass) throws InstantiationException, IllegalAccessException {ProtoSchema protoSchema = SCHEMA_CACHE.get(className);
        if (protoSchema != null) {return protoSchema;}
        return getCachedProtoSchema(className, typeClass.newInstance());
    }

    private Schema getCachedSchema(ProtoStuffWrapper wrapper, Class<?> typeClass, ProtoSchema protoSchema) {if (wrapper.getVersion() == protoSchema.getVersion()) {if (protoSchema.getHash() == wrapper.getClassHash()) {return protoSchema.getSchema();
            } else {
                // 缓存,本地构造不统一, 打印一个谬误日志
                // logger.error("正告,本地与缓存中的版本号统一,然而字段程序不统一,利用存在异样。请重新部署, className:{}", wrapper.getClassName());
            }
        }
        // 缓存中为新版本,本地为老版本
        return getSchemaFromCache(typeClass, wrapper);
    }

getCachedSchema

本地版本为老版本,缓存版本为新版本时,反序列化的时候须要先获取到 Redis 中新版本的类形容信息。为了防止反复申请 Redis,类形容信息也会在本地缓存一份数据。

 private <T> Schema<T> getSchemaFromCache(Class<?> typeClass, ProtoStuffWrapper wrapper) {String cacheKey = getCacheKey(wrapper.getClassName(), wrapper.getVersion());
        Schema schema = REMOTE_CLASS_SCHEMA_CACHE.get(cacheKey);
        if (schema != null) {return schema;}
        Map<String, ProtoFieldDescription> fieldDescriptionMap = getProtoFieldDescriptionMap(cacheKey);
        if (MapUtils.isEmpty(fieldDescriptionMap)) {return null;}
        java.lang.reflect.Field[] declaredFields = typeClass.getDeclaredFields();
        final ArrayList<io.protostuff.runtime.Field<T>> fields = new ArrayList<>(declaredFields.length);
        ProtoFieldDescription d;
        for (java.lang.reflect.Field field : declaredFields) {d = fieldDescriptionMap.get(field.getName());
            if (d != null) {Class<?> type = field.getType();
                if (Objects.equals(d.getType(), type.getCanonicalName())) {
                    // 字段类型统一
                    io.protostuff.runtime.Field<T> pField = RuntimeFieldFactory.getFieldFactory(type, ID_STRATEGY).create(d.getIndex(), d.getFieldName(), field, ID_STRATEGY);
                    fields.add(pField);
                }
            }
        }
        schema = new RuntimeSchema(typeClass, fields, RuntimeEnv.newInstantiator(typeClass));
        REMOTE_CLASS_SCHEMA_CACHE.putIfAbsent(cacheKey, schema);
        return schema;
    }

    private Map<String, ProtoFieldDescription> getProtoFieldDescriptionMap(String key) {String cache = getStringFromRedis(key);
        if (StringUtils.isEmpty(cache)) {return new ConcurrentHashMap<>();
        }
        List<ProtoFieldDescription> fieldDescriptionList = JSON.parseArray(cache, ProtoFieldDescription.class);
        if (fieldDescriptionList == null) {return new ConcurrentHashMap<>();
        }
        return fieldDescriptionList.stream().collect(Collectors.toMap(ProtoFieldDescription::getFieldName, Function.identity(), (a, b) -> b));
    }

总结

ProtoStuff 是一个十分优良的 Java 序列化框架,具备高效性、空间占用小、易用性和可扩展性等长处。

本计划在设计之初,思考到数据库缓存序列化框架作为缓存组件的一部分,须要更多地为应用的业务方思考。因而,革新计划破费了大量精力将框架做成自适应的。此举的目标在于,让接入方在应用过程中无需放心新增字段可能会引发的反序列化程序问题,也无需额定保护 Tag 标签的程序,更不须要对历史代码进行兼容革新。只有简略的降级一下依赖的二方包,就能够实现组件的降级。

附上官网文档地址:

https://protostuff.github.io/docs/protostuff-runtime/

正文完
 0