Notes on storm tuple serialization


This article looks at how storm serializes tuples.
// Every executor has an instance of this class
public class ExecutorTransfer {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutorTransfer.class);

    private final WorkerState workerData;
    private final KryoTupleSerializer serializer;
    private final boolean isDebug;
    private int indexingBase = 0;
    private ArrayList<JCQueue> localReceiveQueues; // [taskId-indexingBase] => queue : List of all recvQs local to this worker
    private AtomicReferenceArray<JCQueue> queuesToFlush;
    // [taskId-indexingBase] => queue, some entries can be null. : outbound Qs for this executor instance

    public ExecutorTransfer(WorkerState workerData, Map<String, Object> topoConf) {
        this.workerData = workerData;
        this.serializer = new KryoTupleSerializer(topoConf, workerData.getWorkerTopologyContext());
        this.isDebug = ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_DEBUG), false);
    }

    // adds addressedTuple to destination Q if it is not full. else adds to pendingEmits (if its not null)
    public boolean tryTransfer(AddressedTuple addressedTuple, Queue<AddressedTuple> pendingEmits) {
        if (isDebug) {
            LOG.info("TRANSFERRING tuple {}", addressedTuple);
        }

        JCQueue localQueue = getLocalQueue(addressedTuple);
        if (localQueue != null) {
            return tryTransferLocal(addressedTuple, localQueue, pendingEmits);
        }
        return workerData.tryTransferRemote(addressedTuple, pendingEmits, serializer);
    }

    // ... (other members not shown)
}

ExecutorTransfer creates a KryoTupleSerializer in its constructor.
tryTransfer first checks whether the destination has a local queue; if so it does a local transfer, otherwise a remote transfer.
The remote transfer calls workerData.tryTransferRemote and passes the serializer along.
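The local-versus-remote decision can be sketched with plain collections. This is a simplified model rather than Storm code: `TransferSketch`, its string "tuples", and the UTF-8 `serialize` stand-in are all hypothetical. What it shows is that only the remote path pays for serialization.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical model of the routing decision; not part of Storm.
final class TransferSketch {
    final Map<Integer, Queue<String>> localQueues = new HashMap<>();
    final Queue<byte[]> remoteQueue = new ArrayDeque<>();
    int serializeCalls = 0;

    void addLocalTask(int taskId) {
        localQueues.put(taskId, new ArrayDeque<>());
    }

    // Stand-in for the real serializer: counts calls and encodes as UTF-8.
    byte[] serialize(String tuple) {
        serializeCalls++;
        return tuple.getBytes(StandardCharsets.UTF_8);
    }

    // Returns true when the tuple was handed to a queue in the same worker.
    boolean transfer(int destTask, String tuple) {
        Queue<String> local = localQueues.get(destTask);
        if (local != null) {
            local.add(tuple);               // local path: the object is passed as-is
            return true;
        }
        remoteQueue.add(serialize(tuple));  // remote path: bytes go on the wire
        return false;
    }
}
```

In this model a topology whose tasks all live in one worker never calls `serialize` at all.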

/* Not a Blocking call. If cannot emit, will add 'tuple' to pendingEmits and return 'false'. 'pendingEmits' can be null */
public boolean tryTransferRemote(AddressedTuple tuple, Queue<AddressedTuple> pendingEmits, ITupleSerializer serializer) {
    return workerTransfer.tryTransferRemote(tuple, pendingEmits, serializer);
}

WorkerState.tryTransferRemote simply delegates to workerTransfer.tryTransferRemote.

/* Not a Blocking call. If cannot emit, will add 'tuple' to 'pendingEmits' and return 'false'. 'pendingEmits' can be null */
public boolean tryTransferRemote(AddressedTuple addressedTuple, Queue<AddressedTuple> pendingEmits, ITupleSerializer serializer) {
    if (pendingEmits != null && !pendingEmits.isEmpty()) {
        pendingEmits.add(addressedTuple);
        return false;
    }

    if (!remoteBackPressureStatus[addressedTuple.dest].get()) {
        TaskMessage tm = new TaskMessage(addressedTuple.getDest(), serializer.serialize(addressedTuple.getTuple()));
        if (transferQueue.tryPublish(tm)) {
            return true;
        }
    } else {
        LOG.debug("Noticed Back Pressure in remote task {}", addressedTuple.dest);
    }
    if (pendingEmits != null) {
        pendingEmits.add(addressedTuple);
    }
    return false;
}

The tuple is serialized here, while the TaskMessage is being built, through serializer.serialize(addressedTuple.getTuple()). The serializer is declared as ITupleSerializer; its implementation is KryoTupleSerializer.
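The non-blocking contract described in the method comment (publish if possible, otherwise park the item in pendingEmits and return false) can be sketched with a bounded `ArrayBlockingQueue`. `TryTransferSketch` is a hypothetical stand-in; the real code works with JCQueue and also consults a back-pressure flag, which this sketch leaves out.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical model of a non-blocking publish with an overflow list.
final class TryTransferSketch {
    private final ArrayBlockingQueue<byte[]> transferQueue;

    TryTransferSketch(int capacity) {
        transferQueue = new ArrayBlockingQueue<>(capacity);
    }

    boolean tryTransfer(byte[] msg, Queue<byte[]> pendingEmits) {
        // Older messages are still waiting: queue behind them to keep ordering.
        if (pendingEmits != null && !pendingEmits.isEmpty()) {
            pendingEmits.add(msg);
            return false;
        }
        // offer() never blocks; it returns false when the queue is full.
        if (transferQueue.offer(msg)) {
            return true;
        }
        if (pendingEmits != null) {
            pendingEmits.add(msg);
        }
        return false;
    }

    int queued() {
        return transferQueue.size();
    }
}
```

The caller is expected to retry the contents of pendingEmits later; passing null for pendingEmits means a rejected message is simply reported as not sent.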
public class KryoTupleSerializer implements ITupleSerializer {
    KryoValuesSerializer _kryo;
    SerializationFactory.IdDictionary _ids;
    Output _kryoOut;

    public KryoTupleSerializer(final Map<String, Object> conf, final GeneralTopologyContext context) {
        _kryo = new KryoValuesSerializer(conf);
        _kryoOut = new Output(2000, 2000000000);
        _ids = new SerializationFactory.IdDictionary(context.getRawTopology());
    }

    public byte[] serialize(Tuple tuple) {
        try {
            // the Output buffer is reused across calls, so it is cleared first
            _kryoOut.clear();
            _kryoOut.writeInt(tuple.getSourceTask(), true);
            _kryoOut.writeInt(_ids.getStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId()), true);
            _kryo.serializeInto(tuple.getValues(), _kryoOut);
            return _kryoOut.toBytes();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // public long crc32(Tuple tuple) {
    //     try {
    //         CRC32OutputStream hasher = new CRC32OutputStream();
    //         _kryo.serializeInto(tuple.getValues(), hasher);
    //         return hasher.getValue();
    //     } catch (IOException e) {
    //         throw new RuntimeException(e);
    //     }
    // }
}

KryoTupleSerializer creates a KryoValuesSerializer, and when serializing a tuple it calls _kryo.serializeInto(tuple.getValues(), _kryoOut).
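Before the values, serialize writes the source task id and the stream id with writeInt(value, true), which in Kryo is a variable-length encoding of 1 to 5 bytes. The sketch below illustrates the idea with a common 7-bits-per-byte scheme; `VarIntSketch` is hypothetical and the exact byte layout Kryo produces is an assumption here, not something taken from the article.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical illustration of a variable-length int header.
final class VarIntSketch {
    // 7 data bits per byte; a set high bit means "more bytes follow".
    static void writeVarInt(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // pos is a one-element cursor so the caller can read several ints in a row.
    static int readVarInt(byte[] in, int[] pos) {
        int result = 0;
        int shift = 0;
        while (true) {
            int b = in[pos[0]++] & 0xFF;
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
            shift += 7;
        }
    }

    // Models the two leading ints of a serialized tuple.
    static byte[] header(int sourceTask, int streamId) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarInt(out, sourceTask);
        writeVarInt(out, streamId);
        return out.toByteArray();
    }
}
```

Small task ids and stream ids therefore cost one byte each instead of four, which is why the stream name is mapped to an int through the IdDictionary rather than written as a string.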

public class KryoValuesSerializer {
    Kryo _kryo;
    ListDelegate _delegate;
    Output _kryoOut;

    public KryoValuesSerializer(Map<String, Object> conf) {
        _kryo = SerializationFactory.getKryo(conf);
        _delegate = new ListDelegate();
        _kryoOut = new Output(2000, 2000000000);
    }

    public void serializeInto(List<Object> values, Output out) {
        // this ensures that list of values is always written the same way, regardless
        // of whether it's a java collection or one of clojure's persistent collections
        // (which have different serializers)
        // Doing this lets us deserialize as ArrayList and avoid writing the class here
        _delegate.setDelegate(values);
        _kryo.writeObject(out, _delegate);
    }

    public byte[] serialize(List<Object> values) {
        _kryoOut.clear();
        serializeInto(values, _kryoOut);
        return _kryoOut.toBytes();
    }

    public byte[] serializeObject(Object obj) {
        _kryoOut.clear();
        _kryo.writeClassAndObject(_kryoOut, obj);
        return _kryoOut.toBytes();
    }
}

KryoValuesSerializer calls SerializationFactory.getKryo(conf) in its constructor to create _kryo.
_delegate is a ListDelegate, used to wrap the List<Object> values; _kryoOut is new Output(2000, 2000000000).
serialize calls serializeInto, which ends up calling Kryo's own _kryo.writeObject to do the serialization.
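The point of wrapping the values in a delegate is that the serializer lookup then always sees the same class, whatever List implementation the caller emitted. A minimal sketch of that effect, using a hypothetical `Delegate` class in place of Storm's ListDelegate:

```java
import java.util.List;

// Hypothetical stand-in for the wrapping trick; not Storm's ListDelegate.
final class ListDelegateSketch {
    static final class Delegate {
        private List<Object> delegate;

        void setDelegate(List<Object> values) {
            this.delegate = values;
        }

        List<Object> getDelegate() {
            return delegate;
        }
    }

    // A class-keyed serializer registry dispatches on the runtime class.
    static Class<?> dispatchClass(Object o) {
        return o.getClass();
    }

    static Delegate wrap(List<Object> values) {
        Delegate d = new Delegate();
        d.setDelegate(values);
        return d;
    }
}
```

A `java.util.ArrayList` and the list returned by `Arrays.asList` have different runtime classes, so they would be dispatched to different serializers; once wrapped, both are dispatched as `Delegate`, and only one serializer registration is needed.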

public static Kryo getKryo(Map<String, Object> conf) {
    IKryoFactory kryoFactory = (IKryoFactory) ReflectionUtils.newInstance((String) conf.get(Config.TOPOLOGY_KRYO_FACTORY));
    Kryo k = kryoFactory.getKryo(conf);
    k.register(byte[].class);

    /* tuple payload serializer is specified via configuration */
    String payloadSerializerName = (String) conf.get(Config.TOPOLOGY_TUPLE_SERIALIZER);
    try {
        Class serializerClass = Class.forName(payloadSerializerName);
        Serializer serializer = resolveSerializerInstance(k, ListDelegate.class, serializerClass, conf);
        k.register(ListDelegate.class, serializer);
    } catch (ClassNotFoundException ex) {
        throw new RuntimeException(ex);
    }

    k.register(ArrayList.class, new ArrayListSerializer());
    k.register(HashMap.class, new HashMapSerializer());
    k.register(HashSet.class, new HashSetSerializer());
    k.register(BigInteger.class, new BigIntegerSerializer());
    k.register(TransactionAttempt.class);
    k.register(Values.class);
    k.register(org.apache.storm.metric.api.IMetricsConsumer.DataPoint.class);
    k.register(org.apache.storm.metric.api.IMetricsConsumer.TaskInfo.class);
    k.register(ConsList.class);
    k.register(BackPressureStatus.class);

    synchronized (loader) {
        for (SerializationRegister sr : loader) {
            try {
                sr.register(k);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    kryoFactory.preRegister(k, conf);

    boolean skipMissing = (Boolean) conf.get(Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS);

    register(k, conf.get(Config.TOPOLOGY_KRYO_REGISTER), conf, skipMissing);

    kryoFactory.postRegister(k, conf);

    if (conf.get(Config.TOPOLOGY_KRYO_DECORATORS) != null) {
        for (String klassName : (List<String>) conf.get(Config.TOPOLOGY_KRYO_DECORATORS)) {
            try {
                Class klass = Class.forName(klassName);
                IKryoDecorator decorator = (IKryoDecorator) klass.newInstance();
                decorator.decorate(k);
            } catch (ClassNotFoundException e) {
                if (skipMissing) {
                    LOG.info("Could not find kryo decorator named " + klassName + ". Skipping registration...");
                } else {
                    throw new RuntimeException(e);
                }
            } catch (InstantiationException e) {
                throw new RuntimeException(e);
            } catch (IllegalAccessException e) {
                throw new RuntimeException(e);
            }
        }
    }

    kryoFactory.postDecorate(k, conf);

    return k;
}

public static void register(Kryo k, Object kryoRegistrations, Map<String, Object> conf, boolean skipMissing) {
    Map<String, String> registrations = normalizeKryoRegister(kryoRegistrations);
    for (Map.Entry<String, String> entry : registrations.entrySet()) {
        String serializerClassName = entry.getValue();
        try {
            Class klass = Class.forName(entry.getKey());
            Class serializerClass = null;
            if (serializerClassName != null) {
                serializerClass = Class.forName(serializerClassName);
            }
            if (serializerClass == null) {
                k.register(klass);
            } else {
                k.register(klass, resolveSerializerInstance(k, klass, serializerClass, conf));
            }
        } catch (ClassNotFoundException e) {
            if (skipMissing) {
                LOG.info("Could not find serialization or class for " + serializerClassName + ". Skipping registration...");
            } else {
                throw new RuntimeException(e);
            }
        }
    }
}
The static method SerializationFactory.getKryo first creates an IKryoFactory from Config.TOPOLOGY_KRYO_FACTORY; the default is org.apache.storm.serialization.DefaultKryoFactory.
It then obtains a Kryo through IKryoFactory.getKryo and configures it, registering byte[].class, ListDelegate.class, ArrayList.class, HashMap.class, HashSet.class, BigInteger.class, TransactionAttempt.class, Values.class, org.apache.storm.metric.api.IMetricsConsumer.DataPoint.class, org.apache.storm.metric.api.IMetricsConsumer.TaskInfo.class, ConsList.class and BackPressureStatus.class.
ListDelegate.class is the container for the payload; it is serialized by the class configured in Config.TOPOLOGY_TUPLE_SERIALIZER (topology.tuple.serializer, default org.apache.storm.serialization.types.ListDelegateSerializer).
Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS (topology.skip.missing.kryo.registrations, default false) decides what happens when a class named in the configuration, or its serializer, cannot be found: throw an exception, or skip that registration.
Finally, the classes listed in Config.TOPOLOGY_KRYO_DECORATORS (topology.kryo.decorators) are loaded to apply custom serialization registrations.
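The register method above relies on normalizeKryoRegister, which is not shown. A rough sketch of what such a normalization step could look like, assuming the topology.kryo.register value is a list whose items are either a class name or a single-entry map from class name to serializer class name. `KryoRegisterSketch` and the `com.example` names are hypothetical, and this is not Storm's implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical re-implementation for illustration only.
final class KryoRegisterSketch {
    // Flattens the list into className -> serializerClassName (null = default serializer).
    static Map<String, String> normalize(Object registrations) {
        // A sorted map gives every worker the same registration order.
        Map<String, String> result = new TreeMap<>();
        if (registrations == null) {
            return result;
        }
        for (Object item : (List<?>) registrations) {
            if (item instanceof Map) {
                for (Map.Entry<?, ?> e : ((Map<?, ?>) item).entrySet()) {
                    result.put((String) e.getKey(), (String) e.getValue());
                }
            } else {
                result.put((String) item, null);
            }
        }
        return result;
    }

    // A topology configuration fragment using the keys discussed above.
    static Map<String, Object> exampleConf() {
        Map<String, Object> conf = new HashMap<>();
        List<Object> register = new ArrayList<>();
        register.add("com.example.Foo");
        register.add(Collections.singletonMap("com.example.Bar", "com.example.BarSerializer"));
        conf.put("topology.kryo.register", register);
        conf.put("topology.skip.missing.kryo.registrations", false);
        return conf;
    }
}
```

A stable order matters because, as the Kryo Javadoc quoted further down explains, registration ids are handed out in registration order and must match on both ends.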

public class DefaultKryoFactory implements IKryoFactory {

    public Kryo getKryo(Map<String, Object> conf) {
        KryoSerializableDefault k = new KryoSerializableDefault();
        k.setRegistrationRequired(!((Boolean) conf.get(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION)));
        return k;
    }

    public void preRegister(Kryo k, Map<String, Object> conf) {
    }

    public void postRegister(Kryo k, Map<String, Object> conf) {
        ((KryoSerializableDefault) k).overrideDefault(true);
    }

    public void postDecorate(Kryo k, Map<String, Object> conf) {
    }

    public static class KryoSerializableDefault extends Kryo {
        boolean _override = false;

        public void overrideDefault(boolean value) {
            _override = value;
        }

        public Serializer getDefaultSerializer(Class type) {
            if (_override) {
                return new SerializableSerializer();
            } else {
                return super.getDefaultSerializer(type);
            }
        }
    }
}

getKryo reads Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION (topology.fall.back.on.java.serialization) from the configuration. Its default is true, so registrationRequired is set to false: a class does not have to be in the registered list in order to be serialized.
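Once postRegister has called overrideDefault(true), getDefaultSerializer returns a SerializableSerializer for any class that still needs a default. That class is not shown in this article; assuming it delegates to Java's built-in object streams, the fallback path for an unregistered Serializable class amounts to something like the hypothetical `JavaFallbackSketch` below.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical illustration of a Java-serialization fallback; not Storm code.
final class JavaFallbackSketch {
    static byte[] toBytes(Serializable obj) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Java's stream format carries a class descriptor with every payload, so it is noticeably larger than a registered Kryo serializer's output; registering frequently emitted classes explicitly avoids that cost.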

/** If the class is not registered and {@link Kryo#setRegistrationRequired(boolean)} is false, it is automatically registered
 * using the {@link Kryo#addDefaultSerializer(Class, Class) default serializer}.
 * @throws IllegalArgumentException if the class is not registered and {@link Kryo#setRegistrationRequired(boolean)} is true.
 * @see ClassResolver#getRegistration(Class) */
public Registration getRegistration (Class type) {
    if (type == null) throw new IllegalArgumentException("type cannot be null.");

    Registration registration = classResolver.getRegistration(type);
    if (registration == null) {
        if (Proxy.isProxyClass(type)) {
            // If a Proxy class, treat it like an InvocationHandler because the concrete class for a proxy is generated.
            registration = getRegistration(InvocationHandler.class);
        } else if (!type.isEnum() && Enum.class.isAssignableFrom(type) && !Enum.class.equals(type)) {
            // This handles an enum value that is an inner class. Eg: enum A {b{}};
            registration = getRegistration(type.getEnclosingClass());
        } else if (EnumSet.class.isAssignableFrom(type)) {
            registration = classResolver.getRegistration(EnumSet.class);
        } else if (isClosure(type)) {
            registration = classResolver.getRegistration(ClosureSerializer.Closure.class);
        }
        if (registration == null) {
            if (registrationRequired) {
                throw new IllegalArgumentException(unregisteredClassMessage(type));
            }
            if (warnUnregisteredClasses) {
                warn(unregisteredClassMessage(type));
            }
            registration = classResolver.registerImplicit(type);
        }
    }
    return registration;
}

/** Registers the class using the lowest, next available integer ID and the {@link Kryo#getDefaultSerializer(Class) default
 * serializer}. If the class is already registered, no change will be made and the existing registration will be returned.
 * Registering a primitive also affects the corresponding primitive wrapper.
 * <p>
 * Because the ID assigned is affected by the IDs registered before it, the order classes are registered is important when
 * using this method. The order must be the same at deserialization as it was for serialization. */
public Registration register (Class type) {
    Registration registration = classResolver.getRegistration(type);
    if (registration != null) return registration;
    return register(type, getDefaultSerializer(type));
}

/** Returns the best matching serializer for a class. This method can be overridden to implement custom logic to choose a
 * serializer. */
public Serializer getDefaultSerializer (Class type) {
    if (type == null) throw new IllegalArgumentException("type cannot be null.");

    final Serializer serializerForAnnotation = getDefaultSerializerForAnnotatedType(type);
    if (serializerForAnnotation != null) return serializerForAnnotation;

    for (int i = 0, n = defaultSerializers.size(); i < n; i++) {
        DefaultSerializerEntry entry = defaultSerializers.get(i);
        if (entry.type.isAssignableFrom(type)) {
            Serializer defaultSerializer = entry.serializerFactory.makeSerializer(this, type);
            return defaultSerializer;
        }
    }

    return newDefaultSerializer(type);
}

/** Called by {@link #getDefaultSerializer(Class)} when no default serializers matched the type. Subclasses can override this
 * method to customize behavior. The default implementation calls {@link SerializerFactory#makeSerializer(Kryo, Class)} using
 * the {@link #setDefaultSerializer(Class) default serializer}. */
protected Serializer newDefaultSerializer (Class type) {
    return defaultSerializer.makeSerializer(this, type);
}

/** Registers the class using the lowest, next available integer ID and the specified serializer. If the class is already
 * registered, the existing entry is updated with the new serializer. Registering a primitive also affects the corresponding
 * primitive wrapper.
 * <p>
 * Because the ID assigned is affected by the IDs registered before it, the order classes are registered is important when
 * using this method. The order must be the same at deserialization as it was for serialization. */
public Registration register (Class type, Serializer serializer) {
    Registration registration = classResolver.getRegistration(type);
    if (registration != null) {
        registration.setSerializer(serializer);
        return registration;
    }
    return classResolver.register(new Registration(type, serializer, getNextRegistrationId()));
}

/** Returns the lowest, next available integer ID. */
public int getNextRegistrationId () {
    while (nextRegisterID != -2) {
        if (classResolver.getRegistration(nextRegisterID) == null) return nextRegisterID;
        nextRegisterID++;
    }
    throw new KryoException("No registration IDs are available.");
}

Kryo's getRegistration checks registrationRequired when it meets a class that is not registered. If it is true, an IllegalArgumentException is thrown; if it is false, classResolver.registerImplicit is called to register the class implicitly, and a warning is logged first when warnUnregisteredClasses is true.
When no Serializer is given, Kryo's register uses getDefaultSerializer to find the best match. If none of the registered defaultSerializers matches, newDefaultSerializer is called to create one; that step may fail to create a serializer, in which case an IllegalArgumentException is thrown.
register(Class type, Serializer serializer) ends by calling ClassResolver.register(Registration registration). For a class with no Registration yet it creates a new one and assigns it an id obtained from getNextRegistrationId.
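The Javadoc's warning about registration order follows directly from "next available id". A toy registry makes the consequence visible; `RegistrationOrderSketch` is hypothetical and keeps only the id-assignment rule, nothing else from Kryo.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical toy registry: ids are handed out in registration order.
final class RegistrationOrderSketch {
    private final Map<String, Integer> ids = new LinkedHashMap<>();
    private int nextId = 0;

    // Returns the existing id, or assigns the next free one.
    int register(String className) {
        Integer existing = ids.get(className);
        if (existing != null) {
            return existing;
        }
        ids.put(className, nextId);
        return nextId++;
    }

    Integer idOf(String className) {
        return ids.get(className);
    }
}
```

If the writing side registers A then B while the reading side registers B then A, the id that means A on one side means B on the other, and deserialization would pick the wrong serializer. Every worker therefore has to run the same registrations in the same order.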

static public final byte NAME = -1;

protected final IntMap<Registration> idToRegistration = new IntMap();
protected final ObjectMap<Class, Registration> classToRegistration = new ObjectMap();
protected IdentityObjectIntMap<Class> classToNameId;

public Registration registerImplicit (Class type) {
    return register(new Registration(type, kryo.getDefaultSerializer(type), NAME));
}

public Registration register (Registration registration) {
    if (registration == null) throw new IllegalArgumentException("registration cannot be null.");
    if (registration.getId() != NAME) {
        if (TRACE) {
            trace("kryo", "Register class ID " + registration.getId() + ": " + className(registration.getType()) + " ("
                + registration.getSerializer().getClass().getName() + ")");
        }
        idToRegistration.put(registration.getId(), registration);
    } else if (TRACE) {
        trace("kryo", "Register class name: " + className(registration.getType()) + " ("
            + registration.getSerializer().getClass().getName() + ")");
    }
    classToRegistration.put(registration.getType(), registration);
    if (registration.getType().isPrimitive()) classToRegistration.put(getWrapperClass(registration.getType()), registration);
    return registration;
}

public Registration writeClass (Output output, Class type) {
    if (type == null) {
        if (TRACE || (DEBUG && kryo.getDepth() == 1)) log("Write", null);
        output.writeVarInt(Kryo.NULL, true);
        return null;
    }
    Registration registration = kryo.getRegistration(type);
    if (registration.getId() == NAME)
        writeName(output, type, registration);
    else {
        if (TRACE) trace("kryo", "Write class " + registration.getId() + ": " + className(type));
        output.writeVarInt(registration.getId() + 2, true);
    }
    return registration;
}

protected void writeName (Output output, Class type, Registration registration) {
    output.writeVarInt(NAME + 2, true);
    if (classToNameId != null) {
        int nameId = classToNameId.get(type, -1);
        if (nameId != -1) {
            if (TRACE) trace("kryo", "Write class name reference " + nameId + ": " + className(type));
            output.writeVarInt(nameId, true);
            return;
        }
    }
    // Only write the class name the first time encountered in object graph.
    if (TRACE) trace("kryo", "Write class name: " + className(type));
    int nameId = nextNameId++;
    if (classToNameId == null) classToNameId = new IdentityObjectIntMap();
    classToNameId.put(type, nameId);
    output.writeVarInt(nameId, true);
    output.writeString(type.getName());
}

public void reset () {
    if (!kryo.isRegistrationRequired()) {
        if (classToNameId != null) classToNameId.clear(2048);
        if (nameIdToClass != null) nameIdToClass.clear();
        nextNameId = 0;
    }
}

DefaultClassResolver.register(Registration registration) checks the registration's id. Every registration is stored in ObjectMap<Class, Registration> classToRegistration; one whose id is not NAME (represented by -1) is additionally stored in IntMap<Registration> idToRegistration.
As noted earlier, when registrationRequired is false, classResolver.registerImplicit is called for implicit registration; the code shows that a registration created by registerImplicit has the id NAME.
Whether the id is NAME matters in writeClass, which is reached, for example, when the object being serialized has fields that hold instances of unregistered classes rather than only primitive types. If the id is NAME, writeName is used; otherwise output.writeVarInt(registration.getId() + 2, true) writes a single int. The first time writeName meets a NAME class it generates a nameId, stores it in IdentityObjectIntMap<Class> classToNameId, writes that int, and then writes class.getName. The second time it meets the class, classToNameId already holds the nameId, so only the int is written. However, when registrationRequired is false, DefaultClassResolver.reset calls classToNameId.clear(2048), which empties (and possibly resizes) the map. Once that has happened, the next write of that class can no longer use the id from classToNameId in place of the class name.
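The cost difference between the two paths, and the effect of reset, can be sketched as a small byte-count model. `ClassNameWriterSketch` is hypothetical, and the byte counts are approximations that assume small ids and ASCII class names; they are not measurements of Kryo output.

```java
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

// Hypothetical model of the class-tag cost; it writes nothing, it only counts.
final class ClassNameWriterSketch {
    private final Map<Class<?>, Integer> registeredIds = new HashMap<>();
    private final Map<Class<?>, Integer> classToNameId = new IdentityHashMap<>();
    private int nextNameId = 0;

    void register(Class<?> type, int id) {
        registeredIds.put(type, id);
    }

    // Returns the approximate number of bytes used to identify the class.
    int writeClass(Class<?> type) {
        if (registeredIds.containsKey(type)) {
            return 1;                            // one small varint: the registration id
        }
        if (classToNameId.containsKey(type)) {
            return 2;                            // NAME marker + cached nameId
        }
        classToNameId.put(type, nextNameId++);
        return 2 + type.getName().length();      // marker + nameId + the full class name
    }

    // Models the resolver's reset: the name cache is forgotten.
    void reset() {
        classToNameId.clear();
        nextNameId = 0;
    }
}
```

In this model an unregistered `java.lang.StringBuilder` costs 25 bytes the first time and 2 bytes on later writes, but goes back to 25 after every reset, whereas a registered class always costs 1.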

/** Writes an object using the registered serializer. */
public void writeObject (Output output, Object object) {
    if (output == null) throw new IllegalArgumentException("output cannot be null.");
    if (object == null) throw new IllegalArgumentException("object cannot be null.");
    beginObject();
    try {
        if (references && writeReferenceOrNull(output, object, false)) {
            getRegistration(object.getClass()).getSerializer().setGenerics(this, null);
            return;
        }
        if (TRACE || (DEBUG && depth == 1)) log("Write", object);
        getRegistration(object.getClass()).getSerializer().write(this, output, object);
    } finally {
        if (--depth == 0 && autoReset) reset();
    }
}

/** Resets unregistered class names, references to previously serialized or deserialized objects, and the
 * {@link #getGraphContext() graph context}. If {@link #setAutoReset(boolean) auto reset} is true, this method is called
 * automatically when an object graph has been completely serialized or deserialized. If overridden, the super method must be
 * called. */
public void reset () {
    depth = 0;
    if (graphContext != null) graphContext.clear();
    classResolver.reset();
    if (references) {
        referenceResolver.reset();
        readObject = null;
    }

    copyDepth = 0;
    if (originalToCopy != null) originalToCopy.clear(2048);

    if (TRACE) trace("kryo", "Object graph complete.");
}

One thing to watch here: in its finally block, writeObject calls reset when depth has dropped to 0 and autoReset is true. reset in turn calls classResolver.reset(), which clears nameIdToClass and classToNameId (classToNameId.clear(2048)).
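The depth counter is what limits reset to the end of a whole object graph rather than every nested write. A minimal sketch of that pattern, with a hypothetical `AutoResetSketch` class that only counts how often reset would fire:

```java
// Hypothetical model of the depth / autoReset interplay; not Kryo code.
final class AutoResetSketch {
    int depth = 0;
    int resets = 0;
    boolean autoReset = true;

    // body stands in for a serializer that may call writeObject recursively.
    void writeObject(Runnable body) {
        depth++;
        try {
            body.run();
        } finally {
            // Only the outermost call brings depth back to 0.
            if (--depth == 0 && autoReset) {
                resets++;
            }
        }
    }
}
```

Since KryoValuesSerializer.serializeInto issues one top-level writeObject per tuple, this pattern implies one reset per tuple when autoReset is on, so the class-name cache does not carry over from one tuple to the next.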

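By default storm uses kryo to serialize tuples. On top of kryo's own defaults, storm registers byte[].class, ListDelegate.class, ArrayList.class, HashMap.class, HashSet.class, BigInteger.class, TransactionAttempt.class, Values.class, org.apache.storm.metric.api.IMetricsConsumer.DataPoint.class, org.apache.storm.metric.api.IMetricsConsumer.TaskInfo.class, ConsList.class, BackPressureStatus.class and other types.
When Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION (topology.fall.back.on.java.serialization) is true, kryo.setRegistrationRequired(false) is called, so a class that was never registered with kryo does not cause an exception. The name is easy to misread: what the flag itself decides is whether an unregistered class is tolerated at all. If it is not, kryo throws. If it is, the class is registered implicitly, and with DefaultKryoFactory it receives whatever getDefaultSerializer returns at that point (SerializableSerializer once overrideDefault(true) has been set in postRegister). As long as classToNameId has not been reset, the class name is written the first time and an id is recorded in classToNameId; from the second time on, the id from classToNameId is written instead, which is close to the effect of a manual registration.
Config.TOPOLOGY_TUPLE_SERIALIZER (topology.tuple.serializer, default org.apache.storm.serialization.types.ListDelegateSerializer) configures the class that serializes the tuple payload.
Config.TOPOLOGY_KRYO_DECORATORS (topology.kryo.decorators) loads custom serialization. An IKryoDecorator can be registered directly through Config.registerDecorator, and its decorate method registers the classes to be serialized with Kryo.
Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS (topology.skip.missing.kryo.registrations, default false) is easily confused with Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION (topology.fall.back.on.java.serialization). The former is a storm-level property, while the latter is storm's wrapper around a kryo property (registrationRequired). Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS decides whether storm skips or throws when it cannot load a class named in Config.TOPOLOGY_KRYO_REGISTER or a user-defined IKryoDecorator listed in Config.TOPOLOGY_KRYO_DECORATORS.
When Kryo's registrationRequired is false, unregistered classes are registered implicitly and tracked in classToNameId: the class name is written only the first time and an id replaces it afterwards, which saves space. Note, however, that when Kryo's autoReset is true, classToNameId is reset each time a top-level object graph has been written, so later encounters of an unregistered class cannot always use the id in place of the class name.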


