Preface
This post looks at how Storm serializes tuples.
ExecutorTransfer.tryTransfer
storm-2.0.0/storm-client/src/jvm/org/apache/storm/executor/ExecutorTransfer.java
// Every executor has an instance of this class
public class ExecutorTransfer {
private static final Logger LOG = LoggerFactory.getLogger(ExecutorTransfer.class);
private final WorkerState workerData;
private final KryoTupleSerializer serializer;
private final boolean isDebug;
private int indexingBase = 0;
private ArrayList<JCQueue> localReceiveQueues; // [taskId-indexingBase] => queue : List of all recvQs local to this worker
private AtomicReferenceArray<JCQueue> queuesToFlush;
// [taskId-indexingBase] => queue, some entries can be null. : outbound Qs for this executor instance
public ExecutorTransfer(WorkerState workerData, Map<String, Object> topoConf) {
this.workerData = workerData;
this.serializer = new KryoTupleSerializer(topoConf, workerData.getWorkerTopologyContext());
this.isDebug = ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_DEBUG), false);
}
//……
// adds addressedTuple to destination Q if it is not full. else adds to pendingEmits (if its not null)
public boolean tryTransfer(AddressedTuple addressedTuple, Queue<AddressedTuple> pendingEmits) {
if (isDebug) {
LOG.info("TRANSFERRING tuple {}", addressedTuple);
}
JCQueue localQueue = getLocalQueue(addressedTuple);
if (localQueue != null) {
return tryTransferLocal(addressedTuple, localQueue, pendingEmits);
}
return workerData.tryTransferRemote(addressedTuple, pendingEmits, serializer);
}
//……
}
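ExecutorTransfer creates a KryoTupleSerializer in its constructor.
tryTransfer first checks whether the destination task has a local queue (getLocalQueue); if it does, a local transfer is performed, otherwise a remote transfer.
The remote transfer calls workerData.tryTransferRemote and passes the serializer along.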
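The local/remote decision can be illustrated with a small stand-alone sketch. It is not Storm code: the class TransferRoutingSketch, its fields and the use of String as a stand-in for a tuple are all hypothetical. The point it shows is that bytes are produced only on the remote path.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical stand-in for the local/remote decision made in tryTransfer.
class TransferRoutingSketch {
    // taskId -> receive queue of a task hosted in this worker (assumed layout)
    final Map<Integer, Queue<String>> localQueues = new HashMap<>();
    // serialized messages bound for other workers
    final Queue<byte[]> remoteQueue = new ArrayDeque<>();

    // Returns "local" or "remote" so the caller can see which path was taken.
    String transfer(int destTask, String tuple) {
        Queue<String> local = localQueues.get(destTask);
        if (local != null) {
            local.add(tuple); // same worker: hand over the object itself
            return "local";
        }
        // other worker: only now is the tuple turned into bytes
        remoteQueue.add(tuple.getBytes(StandardCharsets.UTF_8));
        return "remote";
    }
}
```

With a queue registered for task 1 only, transfer(1, ...) takes the local path and transfer(2, ...) serializes.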
WorkerState.tryTransferRemote
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java
/* Not a Blocking call. If cannot emit, will add 'tuple' to pendingEmits and return 'false'. 'pendingEmits' can be null */
public boolean tryTransferRemote(AddressedTuple tuple, Queue<AddressedTuple> pendingEmits, ITupleSerializer serializer) {
return workerTransfer.tryTransferRemote(tuple, pendingEmits, serializer);
}
WorkerState.tryTransferRemote simply delegates to workerTransfer.tryTransferRemote.
WorkerTransfer.tryTransferRemote
storm-2.0.0/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerTransfer.java
/* Not a Blocking call. If cannot emit, will add 'tuple' to 'pendingEmits' and return 'false'. 'pendingEmits' can be null */
public boolean tryTransferRemote(AddressedTuple addressedTuple, Queue<AddressedTuple> pendingEmits, ITupleSerializer serializer) {
if (pendingEmits != null && !pendingEmits.isEmpty()) {
pendingEmits.add(addressedTuple);
return false;
}
if (!remoteBackPressureStatus[addressedTuple.dest].get()) {
TaskMessage tm = new TaskMessage(addressedTuple.getDest(), serializer.serialize(addressedTuple.getTuple()));
if (transferQueue.tryPublish(tm)) {
return true;
}
} else {
LOG.debug("Noticed Back Pressure in remote task {}", addressedTuple.dest);
}
if (pendingEmits != null) {
pendingEmits.add(addressedTuple);
}
return false;
}
When the TaskMessage is created, serializer.serialize(addressedTuple.getTuple()) serializes the tuple. The serializer parameter is of type ITupleSerializer; the implementation passed in is KryoTupleSerializer.
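The control flow of tryTransferRemote (never block, queue behind pending tuples, respect back pressure) can be sketched with standard library types. This is an illustration only: TryTransferSketch is a hypothetical class, ArrayBlockingQueue.offer stands in for JCQueue.tryPublish, a single AtomicBoolean stands in for the per-task remoteBackPressureStatus array, and serialization is left out.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the non-blocking publish logic.
class TryTransferSketch {
    final ArrayBlockingQueue<String> transferQueue;
    final AtomicBoolean backPressure = new AtomicBoolean(false);

    TryTransferSketch(int capacity) {
        transferQueue = new ArrayBlockingQueue<>(capacity);
    }

    boolean tryTransfer(String tuple, Queue<String> pendingEmits) {
        // Preserve ordering: if older tuples are still waiting, queue behind them.
        if (pendingEmits != null && !pendingEmits.isEmpty()) {
            pendingEmits.add(tuple);
            return false;
        }
        // offer() never blocks; it returns false when the queue is full.
        if (!backPressure.get() && transferQueue.offer(tuple)) {
            return true;
        }
        // Could not publish: remember the tuple if the caller gave us somewhere to put it.
        if (pendingEmits != null) {
            pendingEmits.add(tuple);
        }
        return false;
    }
}
```

A false return therefore means "not published yet", not "lost", as long as the caller passes a pendingEmits queue and drains it later.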
KryoTupleSerializer
storm-2.0.0/storm-client/src/jvm/org/apache/storm/serialization/KryoTupleSerializer.java
public class KryoTupleSerializer implements ITupleSerializer {
KryoValuesSerializer _kryo;
SerializationFactory.IdDictionary _ids;
Output _kryoOut;
public KryoTupleSerializer(final Map<String, Object> conf, final GeneralTopologyContext context) {
_kryo = new KryoValuesSerializer(conf);
_kryoOut = new Output(2000, 2000000000);
_ids = new SerializationFactory.IdDictionary(context.getRawTopology());
}
public byte[] serialize(Tuple tuple) {
try {
_kryoOut.clear();
_kryoOut.writeInt(tuple.getSourceTask(), true);
_kryoOut.writeInt(_ids.getStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId()), true);
tuple.getMessageId().serialize(_kryoOut);
_kryo.serializeInto(tuple.getValues(), _kryoOut);
return _kryoOut.toBytes();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
// public long crc32(Tuple tuple) {
// try {
// CRC32OutputStream hasher = new CRC32OutputStream();
// _kryo.serializeInto(tuple.getValues(), hasher);
// return hasher.getValue();
// } catch (IOException e) {
// throw new RuntimeException(e);
// }
// }
}
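KryoTupleSerializer creates a KryoValuesSerializer. When serializing a tuple it first writes the source task id, the stream id and the message id, and then calls _kryo.serializeInto(tuple.getValues(), _kryoOut) for the values.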
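The two writeInt(value, true) calls at the start of serialize write the task id and stream id in a variable-length form, so small ids cost a single byte. The sketch below is a generic base-128 varint written with the standard library; it is assumed to be close in spirit to what Kryo does for non-negative values, but it is not Kryo's code, and VarIntSketch and header are hypothetical names.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical sketch of a variable-length int encoding for a tuple header.
class VarIntSketch {
    // Writes 7 bits per byte, low bits first; the high bit of a byte means "more bytes follow".
    static void writeVarInt(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // Assumed header layout for illustration: source task id, then stream id.
    static byte[] header(int sourceTask, int streamId) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarInt(out, sourceTask);
        writeVarInt(out, streamId);
        return out.toByteArray();
    }
}
```

For example, header(5, 1) is two bytes long, while a task id of 300 needs two bytes on its own (0xAC 0x02).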
KryoValuesSerializer
storm-2.0.0/storm-client/src/jvm/org/apache/storm/serialization/KryoValuesSerializer.java
public class KryoValuesSerializer {
Kryo _kryo;
ListDelegate _delegate;
Output _kryoOut;
public KryoValuesSerializer(Map<String, Object> conf) {
_kryo = SerializationFactory.getKryo(conf);
_delegate = new ListDelegate();
_kryoOut = new Output(2000, 2000000000);
}
public void serializeInto(List<Object> values, Output out) {
// this ensures that list of values is always written the same way, regardless
// of whether it's a java collection or one of clojure's persistent collections
// (which have different serializers)
// Doing this lets us deserialize as ArrayList and avoid writing the class here
_delegate.setDelegate(values);
_kryo.writeObject(out, _delegate);
}
public byte[] serialize(List<Object> values) {
_kryoOut.clear();
serializeInto(values, _kryoOut);
return _kryoOut.toBytes();
}
public byte[] serializeObject(Object obj) {
_kryoOut.clear();
_kryo.writeClassAndObject(_kryoOut, obj);
return _kryoOut.toBytes();
}
}
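KryoValuesSerializer calls SerializationFactory.getKryo(conf) in its constructor to create _kryo.
_delegate is a ListDelegate, which wraps the List<Object> values; _kryoOut is new Output(2000, 2000000000), that is, a buffer that starts at 2000 bytes and may grow up to 2000000000 bytes.
serialize calls serializeInto, which in turn calls Kryo's own _kryo.writeObject to do the serialization.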
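The reason for wrapping the values in a delegate is that the serializer is then always looked up for one fixed class, whatever list implementation the caller passed in. A minimal sketch of that idea, using a hypothetical ListDelegateSketch rather than Storm's ListDelegate:

```java
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.List;

// Hypothetical wrapper in the spirit of ListDelegate: one fixed class in front of any List.
class ListDelegateSketch extends AbstractList<Object> {
    private List<Object> delegate = new ArrayList<>();

    void setDelegate(List<Object> values) {
        this.delegate = values;
    }

    @Override
    public Object get(int index) {
        return delegate.get(index);
    }

    @Override
    public int size() {
        return delegate.size();
    }

    // A serializer chosen by runtime class always sees the wrapper type, not the wrapped one.
    static String serializerKey(ListDelegateSketch wrapper, List<Object> values) {
        wrapper.setDelegate(values);
        return wrapper.getClass().getSimpleName();
    }
}
```

Whether the values arrive as an ArrayList or a LinkedList, serializerKey returns the same name, which is why the receiving side can always deserialize into a plain list without a class tag.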
SerializationFactory.getKryo
storm-2.0.0/storm-client/src/jvm/org/apache/storm/serialization/SerializationFactory.java
public static Kryo getKryo(Map<String, Object> conf) {
IKryoFactory kryoFactory = (IKryoFactory) ReflectionUtils.newInstance((String) conf.get(Config.TOPOLOGY_KRYO_FACTORY));
Kryo k = kryoFactory.getKryo(conf);
k.register(byte[].class);
/* tuple payload serializer is specified via configuration */
String payloadSerializerName = (String) conf.get(Config.TOPOLOGY_TUPLE_SERIALIZER);
try {
Class serializerClass = Class.forName(payloadSerializerName);
Serializer serializer = resolveSerializerInstance(k, ListDelegate.class, serializerClass, conf);
k.register(ListDelegate.class, serializer);
} catch (ClassNotFoundException ex) {
throw new RuntimeException(ex);
}
k.register(ArrayList.class, new ArrayListSerializer());
k.register(HashMap.class, new HashMapSerializer());
k.register(HashSet.class, new HashSetSerializer());
k.register(BigInteger.class, new BigIntegerSerializer());
k.register(TransactionAttempt.class);
k.register(Values.class);
k.register(org.apache.storm.metric.api.IMetricsConsumer.DataPoint.class);
k.register(org.apache.storm.metric.api.IMetricsConsumer.TaskInfo.class);
k.register(ConsList.class);
k.register(BackPressureStatus.class);
synchronized (loader) {
for (SerializationRegister sr : loader) {
try {
sr.register(k);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
kryoFactory.preRegister(k, conf);
boolean skipMissing = (Boolean) conf.get(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS);
register(k, conf.get(Config.TOPOLOGY_KRYO_REGISTER), conf, skipMissing);
kryoFactory.postRegister(k, conf);
if (conf.get(Config.TOPOLOGY_KRYO_DECORATORS) != null) {
for (String klassName : (List<String>) conf.get(Config.TOPOLOGY_KRYO_DECORATORS)) {
try {
Class klass = Class.forName(klassName);
IKryoDecorator decorator = (IKryoDecorator) klass.newInstance();
decorator.decorate(k);
} catch (ClassNotFoundException e) {
if (skipMissing) {
LOG.info("Could not find kryo decorator named " + klassName + ". Skipping registration...");
} else {
throw new RuntimeException(e);
}
} catch (InstantiationException e) {
throw new RuntimeException(e);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
}
kryoFactory.postDecorate(k, conf);
return k;
}
public static void register(Kryo k, Object kryoRegistrations, Map<String, Object> conf, boolean skipMissing) {
Map<String, String> registrations = normalizeKryoRegister(kryoRegistrations);
for (Map.Entry<String, String> entry : registrations.entrySet()) {
String serializerClassName = entry.getValue();
try {
Class klass = Class.forName(entry.getKey());
Class serializerClass = null;
if (serializerClassName != null) {
serializerClass = Class.forName(serializerClassName);
}
if (serializerClass == null) {
k.register(klass);
} else {
k.register(klass, resolveSerializerInstance(k, klass, serializerClass, conf));
}
} catch (ClassNotFoundException e) {
if (skipMissing) {
LOG.info("Could not find serialization or class for " + serializerClassName + ". Skipping registration...");
} else {
throw new RuntimeException(e);
}
}
}
}
The static method SerializationFactory.getKryo first creates an IKryoFactory from Config.TOPOLOGY_KRYO_FACTORY; the default is org.apache.storm.serialization.DefaultKryoFactory.
It then creates the Kryo instance through IKryoFactory.getKryo and configures it, registering byte[].class, ListDelegate.class, ArrayList.class, HashMap.class, HashSet.class, BigInteger.class, TransactionAttempt.class, Values.class, org.apache.storm.metric.api.IMetricsConsumer.DataPoint.class, org.apache.storm.metric.api.IMetricsConsumer.TaskInfo.class, ConsList.class and BackPressureStatus.class.
ListDelegate.class is the container for the payload; it is serialized by the class configured in Config.TOPOLOGY_TUPLE_SERIALIZER (topology.tuple.serializer, default org.apache.storm.serialization.types.ListDelegateSerializer).
Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS (topology.skip.missing.kryo.registrations, default false) decides what happens when a class or serializer named in the configuration cannot be loaded (ClassNotFoundException): throw an exception or skip that registration.
Finally, the classes listed in Config.TOPOLOGY_KRYO_DECORATORS (topology.kryo.decorators) are instantiated and applied, which lets users plug in custom serialization.
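The skipMissing handling in register and in the decorator loop follows one pattern: try to load the class by name, and on ClassNotFoundException either continue or rethrow. A stand-alone sketch of that pattern, with a hypothetical class SkipMissingSketch and method resolve (no Kryo involved):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the "skip or fail on a missing class" pattern.
class SkipMissingSketch {
    static List<Class<?>> resolve(List<String> classNames, boolean skipMissing) {
        List<Class<?>> found = new ArrayList<>();
        for (String name : classNames) {
            try {
                found.add(Class.forName(name));
            } catch (ClassNotFoundException e) {
                if (!skipMissing) {
                    throw new RuntimeException(e); // strict mode: fail the whole setup
                }
                // lenient mode: ignore this entry and carry on with the next one
            }
        }
        return found;
    }
}
```

With the names java.util.ArrayList and com.example.Missing (a made-up class), lenient mode returns one class and strict mode throws.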
DefaultKryoFactory
storm-2.0.0/storm-client/src/jvm/org/apache/storm/serialization/DefaultKryoFactory.java
public class DefaultKryoFactory implements IKryoFactory {
@Override
public Kryo getKryo(Map<String, Object> conf) {
KryoSerializableDefault k = new KryoSerializableDefault();
k.setRegistrationRequired(!((Boolean) conf.get(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION)));
k.setReferences(false);
return k;
}
@Override
public void preRegister(Kryo k, Map<String, Object> conf) {
}
public void postRegister(Kryo k, Map<String, Object> conf) {
((KryoSerializableDefault) k).overrideDefault(true);
}
@Override
public void postDecorate(Kryo k, Map<String, Object> conf) {
}
public static class KryoSerializableDefault extends Kryo {
boolean _override = false;
public void overrideDefault(boolean value) {
_override = value;
}
@Override
public Serializer getDefaultSerializer(Class type) {
if (_override) {
return new SerializableSerializer();
} else {
return super.getDefaultSerializer(type);
}
}
}
}
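Here Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION (topology.fall.back.on.java.serialization) is read from the configuration. Its default is true, in which case registrationRequired is set to false, meaning that a class does not have to be in the list of registered classes in order to be serialized.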
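DefaultKryoFactory.postRegister also calls overrideDefault(true), after which getDefaultSerializer hands out a SerializableSerializer. Assuming that serializer wraps standard Java serialization (an assumption; its source is not quoted in this post), the cost of the fallback can be seen with the standard library alone. JavaFallbackSketch and Point are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical sketch of what a Java-serialization fallback does to an unregistered type.
class JavaFallbackSketch {
    // Made-up user type that was never registered with Kryo.
    static class Point implements Serializable {
        private static final long serialVersionUID = 1L;
        final int x;
        final int y;

        Point(int x, int y) {
            this.x = x;
            this.y = y;
        }
    }

    static byte[] write(Serializable obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj); // writes a class descriptor (name, fields) plus the data
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    static Object read(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The byte array for a Point holding two ints is far larger than eight bytes because the stream carries the class name and field descriptors, which is the usual argument for registering hot types explicitly.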
Kryo
kryo-4.0.2-sources.jar!/com/esotericsoftware/kryo/Kryo.java
/** If the class is not registered and {@link Kryo#setRegistrationRequired(boolean)} is false, it is automatically registered
* using the {@link Kryo#addDefaultSerializer(Class, Class) default serializer}.
* @throws IllegalArgumentException if the class is not registered and {@link Kryo#setRegistrationRequired(boolean)} is true.
* @see ClassResolver#getRegistration(Class) */
public Registration getRegistration (Class type) {
if (type == null) throw new IllegalArgumentException("type cannot be null.");
Registration registration = classResolver.getRegistration(type);
if (registration == null) {
if (Proxy.isProxyClass(type)) {
// If a Proxy class, treat it like an InvocationHandler because the concrete class for a proxy is generated.
registration = getRegistration(InvocationHandler.class);
} else if (!type.isEnum() && Enum.class.isAssignableFrom(type) && !Enum.class.equals(type)) {
// This handles an enum value that is an inner class. Eg: enum A {b{}};
registration = getRegistration(type.getEnclosingClass());
} else if (EnumSet.class.isAssignableFrom(type)) {
registration = classResolver.getRegistration(EnumSet.class);
} else if (isClosure(type)) {
registration = classResolver.getRegistration(ClosureSerializer.Closure.class);
}
if (registration == null) {
if (registrationRequired) {
throw new IllegalArgumentException(unregisteredClassMessage(type));
}
if (warnUnregisteredClasses) {
warn(unregisteredClassMessage(type));
}
registration = classResolver.registerImplicit(type);
}
}
return registration;
}
/** Registers the class using the lowest, next available integer ID and the {@link Kryo#getDefaultSerializer(Class) default
* serializer}. If the class is already registered, no change will be made and the existing registration will be returned.
* Registering a primitive also affects the corresponding primitive wrapper.
* <p>
* Because the ID assigned is affected by the IDs registered before it, the order classes are registered is important when
* using this method. The order must be the same at deserialization as it was for serialization. */
public Registration register (Class type) {
Registration registration = classResolver.getRegistration(type);
if (registration != null) return registration;
return register(type, getDefaultSerializer(type));
}
/** Returns the best matching serializer for a class. This method can be overridden to implement custom logic to choose a
* serializer. */
public Serializer getDefaultSerializer (Class type) {
if (type == null) throw new IllegalArgumentException("type cannot be null.");
final Serializer serializerForAnnotation = getDefaultSerializerForAnnotatedType(type);
if (serializerForAnnotation != null) return serializerForAnnotation;
for (int i = 0, n = defaultSerializers.size(); i < n; i++) {
DefaultSerializerEntry entry = defaultSerializers.get(i);
if (entry.type.isAssignableFrom(type)) {
Serializer defaultSerializer = entry.serializerFactory.makeSerializer(this, type);
return defaultSerializer;
}
}
return newDefaultSerializer(type);
}
/** Called by {@link #getDefaultSerializer(Class)} when no default serializers matched the type. Subclasses can override this
* method to customize behavior. The default implementation calls {@link SerializerFactory#makeSerializer(Kryo, Class)} using
* the {@link #setDefaultSerializer(Class) default serializer}. */
protected Serializer newDefaultSerializer (Class type) {
return defaultSerializer.makeSerializer(this, type);
}
/** Registers the class using the lowest, next available integer ID and the specified serializer. If the class is already
* registered, the existing entry is updated with the new serializer. Registering a primitive also affects the corresponding
* primitive wrapper.
* <p>
* Because the ID assigned is affected by the IDs registered before it, the order classes are registered is important when
* using this method. The order must be the same at deserialization as it was for serialization. */
public Registration register (Class type, Serializer serializer) {
Registration registration = classResolver.getRegistration(type);
if (registration != null) {
registration.setSerializer(serializer);
return registration;
}
return classResolver.register(new Registration(type, serializer, getNextRegistrationId()));
}
/** Returns the lowest, next available integer ID. */
public int getNextRegistrationId () {
while (nextRegisterID != -2) {
if (classResolver.getRegistration(nextRegisterID) == null) return nextRegisterID;
nextRegisterID++;
}
throw new KryoException("No registration IDs are available.");
}
When Kryo's getRegistration method meets a class that is not registered, it checks registrationRequired: if true, it throws IllegalArgumentException; if false, it calls classResolver.registerImplicit to register the class implicitly, logging a warning first if warnUnregisteredClasses is true.
When Kryo's register method is called without a Serializer, it uses getDefaultSerializer to find the best matching one; if none of the registered defaultSerializers matches, it calls newDefaultSerializer to create one. Creation can fail, in which case an IllegalArgumentException is thrown.
register(Class type, Serializer serializer) ends by calling ClassResolver.register(Registration registration); for a class that has no Registration yet it creates a new one and assigns it an id obtained from getNextRegistrationId.
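Because ids are handed out in registration order, the Javadoc's warning that the order must match on both sides is easy to demonstrate. The sketch below uses a hypothetical RegistrationOrderSketch with a plain counter; it ignores Kryo's pre-registered primitives and its search for the lowest free id.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: ids depend purely on the order of register() calls.
class RegistrationOrderSketch {
    private final Map<Class<?>, Integer> ids = new LinkedHashMap<>();
    private int nextId = 0;

    // Registers a class under the next free id; registering again returns the existing id.
    int register(Class<?> type) {
        Integer existing = ids.get(type);
        if (existing != null) {
            return existing;
        }
        ids.put(type, nextId);
        return nextId++;
    }
}
```

If the writer registers String and then Integer while the reader registers them the other way round, id 0 means String on one side and Integer on the other, so the reader would pick the wrong serializer.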
DefaultClassResolver.register
kryo-4.0.2-sources.jar!/com/esotericsoftware/kryo/util/DefaultClassResolver.java
static public final byte NAME = -1;
protected final IntMap<Registration> idToRegistration = new IntMap();
protected final ObjectMap<Class, Registration> classToRegistration = new ObjectMap();
protected IdentityObjectIntMap<Class> classToNameId;
public Registration registerImplicit (Class type) {
return register(new Registration(type, kryo.getDefaultSerializer(type), NAME));
}
public Registration register (Registration registration) {
if (registration == null) throw new IllegalArgumentException("registration cannot be null.");
if (registration.getId() != NAME) {
if (TRACE) {
trace("kryo", "Register class ID " + registration.getId() + ": " + className(registration.getType()) + " ("
+ registration.getSerializer().getClass().getName() + ")");
}
idToRegistration.put(registration.getId(), registration);
} else if (TRACE) {
trace("kryo", "Register class name: " + className(registration.getType()) + " ("
+ registration.getSerializer().getClass().getName() + ")");
}
classToRegistration.put(registration.getType(), registration);
if (registration.getType().isPrimitive()) classToRegistration.put(getWrapperClass(registration.getType()), registration);
return registration;
}
public Registration writeClass (Output output, Class type) {
if (type == null) {
if (TRACE || (DEBUG && kryo.getDepth() == 1)) log("Write", null);
output.writeVarInt(Kryo.NULL, true);
return null;
}
Registration registration = kryo.getRegistration(type);
if (registration.getId() == NAME)
writeName(output, type, registration);
else {
if (TRACE) trace("kryo", "Write class " + registration.getId() + ": " + className(type));
output.writeVarInt(registration.getId() + 2, true);
}
return registration;
}
protected void writeName (Output output, Class type, Registration registration) {
output.writeVarInt(NAME + 2, true);
if (classToNameId != null) {
int nameId = classToNameId.get(type, -1);
if (nameId != -1) {
if (TRACE) trace("kryo", "Write class name reference " + nameId + ": " + className(type));
output.writeVarInt(nameId, true);
return;
}
}
// Only write the class name the first time encountered in object graph.
if (TRACE) trace("kryo", "Write class name: " + className(type));
int nameId = nextNameId++;
if (classToNameId == null) classToNameId = new IdentityObjectIntMap();
classToNameId.put(type, nameId);
output.writeVarInt(nameId, true);
output.writeString(type.getName());
}
public void reset () {
if (!kryo.isRegistrationRequired()) {
if (classToNameId != null) classToNameId.clear(2048);
if (nameIdToClass != null) nameIdToClass.clear();
nextNameId = 0;
}
}
DefaultClassResolver.register(Registration registration) checks the registration's id: if it is not NAME (-1), the registration is put into IntMap<Registration> idToRegistration; in every case, including NAME, it is also put into ObjectMap<Class, Registration> classToRegistration.
As noted earlier, when registrationRequired is false, classResolver.registerImplicit performs an implicit registration; the code shows that a registration created this way gets the id NAME.
Whether the id is NAME matters in writeClass (called when a class tag has to be written, for example when the fields of the class being serialized include not only primitives but also unregistered classes). For NAME, writeName is used; otherwise output.writeVarInt(registration.getId() + 2, true) writes a single int. The first time writeName meets a NAME class it generates a nameId, stores it in IdentityObjectIntMap<Class> classToNameId, writes the nameId and then class.getName(); the second time, the nameId is already in classToNameId, so only the int is written. However, when registrationRequired is false, DefaultClassResolver.reset calls classToNameId.clear(2048), which empties (and possibly shrinks) the map; once that has happened, the next occurrence of the class can no longer use an id from classToNameId in place of the class name.
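The two branches of writeClass can be replayed without Kryo. The sketch below records the tokens that would be written instead of encoding bytes; ClassNameResolverSketch is a hypothetical class, a HashMap replaces IdentityObjectIntMap, and only the branching mirrors the listing above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of "registered id" versus "class name plus name id".
class ClassNameResolverSketch {
    static final int NAME = -1;
    private final Map<Class<?>, Integer> registeredIds = new HashMap<>(); // explicit registrations
    private final Map<Class<?>, Integer> classToNameId = new HashMap<>(); // per-graph name ids
    private int nextNameId = 0;

    void register(Class<?> type, int id) {
        registeredIds.put(type, id);
    }

    // Returns the tokens that would be written as the class tag of one object.
    List<Object> writeClass(Class<?> type) {
        List<Object> out = new ArrayList<>();
        Integer id = registeredIds.get(type);
        if (id != null) {
            out.add(id + 2); // registered: a single small int
            return out;
        }
        out.add(NAME + 2); // marker: a name id (and maybe a name) follows
        Integer nameId = classToNameId.get(type);
        if (nameId != null) {
            out.add(nameId); // seen before in this graph: id only
            return out;
        }
        nameId = nextNameId++;
        classToNameId.put(type, nameId);
        out.add(nameId);
        out.add(type.getName()); // first time: the full class name
        return out;
    }

    void reset() {
        classToNameId.clear();
        nextNameId = 0;
    }
}
```

For an unregistered java.util.Date the first call yields the marker, a name id and the full name; the second call yields only the marker and the id; after reset() the full name is written again.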
Kryo.writeObject
kryo-4.0.2-sources.jar!/com/esotericsoftware/kryo/Kryo.java
/** Writes an object using the registered serializer. */
public void writeObject (Output output, Object object) {
if (output == null) throw new IllegalArgumentException("output cannot be null.");
if (object == null) throw new IllegalArgumentException("object cannot be null.");
beginObject();
try {
if (references && writeReferenceOrNull(output, object, false)) {
getRegistration(object.getClass()).getSerializer().setGenerics(this, null);
return;
}
if (TRACE || (DEBUG && depth == 1)) log("Write", object);
getRegistration(object.getClass()).getSerializer().write(this, output, object);
} finally {
if (--depth == 0 && autoReset) reset();
}
}
/** Resets unregistered class names, references to previously serialized or deserialized objects, and the
* {@link #getGraphContext() graph context}. If {@link #setAutoReset(boolean) auto reset} is true, this method is called
* automatically when an object graph has been completely serialized or deserialized. If overridden, the super method must be
* called. */
public void reset () {
depth = 0;
if (graphContext != null) graphContext.clear();
classResolver.reset();
if (references) {
referenceResolver.reset();
readObject = null;
}
copyDepth = 0;
if (originalToCopy != null) originalToCopy.clear(2048);
if (TRACE) trace("kryo", "Object graph complete.");
}
Note that in its finally block writeObject calls reset when depth has dropped back to 0 and autoReset is true; reset in turn calls classResolver.reset(), which clears nameIdToClass and classToNameId (classToNameId.clear(2048)).
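The depth counter is what limits the reset to once per top-level call. A minimal sketch of that bookkeeping, with a hypothetical AutoResetSketch in which a counter stands in for the real reset():

```java
// Hypothetical sketch of the depth / autoReset bookkeeping around a write.
class AutoResetSketch {
    private int depth = 0;
    private final boolean autoReset;
    int resetCount = 0;

    AutoResetSketch(boolean autoReset) {
        this.autoReset = autoReset;
    }

    // Mimics a write whose serializer writes `nested` child objects.
    void writeObject(int nested) {
        depth++; // stands in for beginObject()
        try {
            for (int i = 0; i < nested; i++) {
                writeObject(0); // child writes run at depth > 1
            }
        } finally {
            if (--depth == 0 && autoReset) {
                resetCount++; // stands in for reset()
            }
        }
    }
}
```

One top-level write with three nested objects triggers a single reset, so name ids are shared inside one object graph but not between two separately written graphs.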
Summary
By default Storm uses Kryo to serialize tuples. On top of Kryo's defaults, Storm registers byte[].class, ListDelegate.class, ArrayList.class, HashMap.class, HashSet.class, BigInteger.class, TransactionAttempt.class, Values.class, org.apache.storm.metric.api.IMetricsConsumer.DataPoint.class, org.apache.storm.metric.api.IMetricsConsumer.TaskInfo.class, ConsList.class, BackPressureStatus.class and other types.
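If Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION (topology.fall.back.on.java.serialization) is true, DefaultKryoFactory calls kryo.setRegistrationRequired(false), so a class that was never registered with Kryo does not cause an exception. In other words, the flag decides whether to fall back when an unregistered class is encountered: without the fallback an exception is thrown, with it the class is registered implicitly. Note that DefaultKryoFactory.postRegister also calls overrideDefault(true), so from then on getDefaultSerializer returns SerializableSerializer, and that is the serializer an implicit registration receives. As long as classToNameId has not been reset, the first occurrence of such a class is written with its class name and assigned an id in classToNameId, and later occurrences write only that id, which for the class tag resembles the effect of manual registration.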
Config.TOPOLOGY_TUPLE_SERIALIZER (topology.tuple.serializer, default org.apache.storm.serialization.types.ListDelegateSerializer) configures the class that serializes the tuple payload.
Config.TOPOLOGY_KRYO_DECORATORS (topology.kryo.decorators) is used to load custom serialization: register an IKryoDecorator through Config.registerDecorator and, in its decorate method, register the classes to be serialized with Kryo.
Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS (topology.skip.missing.kryo.registrations, default false) is easy to confuse with Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION (topology.fall.back.on.java.serialization). The former is handled by Storm itself, while the latter is mapped by Storm onto a Kryo property (registrationRequired). Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS controls what happens when Storm cannot load a class named in Config.TOPOLOGY_KRYO_REGISTER or an IKryoDecorator class named in Config.TOPOLOGY_KRYO_DECORATORS: skip it or throw an exception.
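When Kryo's registrationRequired is false, unregistered classes are registered implicitly (with the id NAME). The class name is written only the first time the class is encountered, and afterwards a name id from classToNameId is written instead, which saves space. Note, however, that when Kryo's autoReset is true, classToNameId is reset after each top-level object graph, so later occurrences of an unregistered class cannot always be written as an id instead of the class name.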
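To avoid relying on implicit registration, classes can be listed under topology.kryo.register. The sketch below builds such a configuration as a plain map with the string keys named in this post, and flattens the register list into class name to serializer name pairs. It is an assumption-laden illustration: the com.example class names are made up, KryoConfSketch and normalize are hypothetical, and normalize only approximates what normalizeKryoRegister is assumed to do.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a topology conf that registers classes explicitly.
class KryoConfSketch {
    static Map<String, Object> conf() {
        Map<String, Object> conf = new HashMap<>();
        conf.put("topology.fall.back.on.java.serialization", false);
        conf.put("topology.skip.missing.kryo.registrations", false);
        List<Object> register = new ArrayList<>();
        register.add("com.example.MyEvent"); // made-up class, default serializer
        Map<String, String> custom = new HashMap<>();
        custom.put("com.example.MyKey", "com.example.MyKeySerializer"); // made-up pair
        register.add(custom);
        conf.put("topology.kryo.register", register);
        return conf;
    }

    // Flattens the mixed list into className -> serializerClassName (null means default).
    static Map<String, String> normalize(List<?> entries) {
        Map<String, String> out = new TreeMap<>(); // sorted, so the order is reproducible
        for (Object e : entries) {
            if (e instanceof Map) {
                for (Map.Entry<?, ?> me : ((Map<?, ?>) e).entrySet()) {
                    out.put((String) me.getKey(), (String) me.getValue());
                }
            } else {
                out.put((String) e, null);
            }
        }
        return out;
    }
}
```

Sorting the names matters for the reason given in the Kryo Javadoc quoted earlier: ids follow registration order, so every worker has to register the same classes in the same order.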