
[case43] A look at Storm's LinearDRPCTopologyBuilder


This article takes a look at Storm's LinearDRPCTopologyBuilder.
Examples
manual drpc
@Test
public void testManualDRPC() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
    TopologyBuilder builder = new TopologyBuilder();
    DRPCSpout spout = new DRPCSpout("exclamation"); // Fields("args", "return-info")
    // the spout is a DRPCSpout, registered under component id "drpc"
    builder.setSpout("drpc", spout);
    builder.setBolt("exclaim", new ManualExclaimBolt(), 3).shuffleGrouping("drpc"); // Fields("result", "return-info")
    builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");
    SubmitHelper.submitRemote("manualDrpc", builder.createTopology());
}

This shows the most primitive way to build a DRPC topology: it starts with a DRPCSpout and ends with a ReturnResults bolt.
DRPCSpout's outputFields are Fields("args", "return-info"), while ReturnResults expects Fields("result", "return-info") as its input fields.
The custom ManualExclaimBolt is therefore required to declare Fields("result", "return-info") as its outputFields; return-info can be copied from the input tuple, while result carries the processing result.
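The transform ManualExclaimBolt must perform can be sketched without the Storm plumbing; the class and method names below are hypothetical, only the field contract comes from the source:

```java
// Models ManualExclaimBolt's field contract:
// input Fields("args", "return-info") -> output Fields("result", "return-info").
public class ManualExclaimSketch {
    // return-info is copied from the input tuple; result carries the processed args
    static String[] execute(String args, String returnInfo) {
        return new String[]{args + "!", returnInfo};
    }

    public static void main(String[] argv) {
        String[] out = execute("hello", "{\"id\":\"1\"}");
        System.out.println(out[0] + " | " + out[1]); // hello! | {"id":"1"}
    }
}
```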

Using LinearDRPCTopologyBuilder
@Test
public void testBasicDRPCTopology() throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
    builder.addBolt(new ExclaimBolt(), 3);
    SubmitHelper.submitRemote("basicDrpc", builder.createRemoteTopology());
}

LinearDRPCTopologyBuilder automatically wires up DRPCSpout, PrepareRequest, CoordinatedBolt, JoinResult and ReturnResults for you, which makes it extremely concise to use.
Because the builder inserts its own upstream and downstream components, a user-defined bolt must take Fields("request", "args") as input and declare new Fields("id", "result") as output; the request field is the requestId (a long) and must be echoed as the id field, args is the input argument, and result is the output.
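Note how this contract differs from the manual case: a bolt under LinearDRPCTopologyBuilder sees the generated request id rather than the return-info. A minimal model (hypothetical names; the contract itself is from the source):

```java
// Models the bolt contract under LinearDRPCTopologyBuilder:
// input Fields("request", "args") -> output Fields("id", "result"),
// where the long request id must be echoed unchanged as the first output field.
public class LinearExclaimSketch {
    static Object[] execute(long requestId, String args) {
        return new Object[]{requestId, args + "!"};
    }

    public static void main(String[] argv) {
        Object[] out = execute(42L, "hello");
        System.out.println(out[0] + " -> " + out[1]); // 42 -> hello!
    }
}
```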

LinearDRPCTopologyBuilder
storm-2.0.0/storm-client/src/jvm/org/apache/storm/drpc/LinearDRPCTopologyBuilder.java
public class LinearDRPCTopologyBuilder {
    String function;
    List<Component> components = new ArrayList<>();

    public LinearDRPCTopologyBuilder(String function) {
        this.function = function;
    }

    private static String boltId(int index) {
        return "bolt" + index;
    }

    public LinearDRPCInputDeclarer addBolt(IBatchBolt bolt, Number parallelism) {
        return addBolt(new BatchBoltExecutor(bolt), parallelism);
    }

    public LinearDRPCInputDeclarer addBolt(IBatchBolt bolt) {
        return addBolt(bolt, 1);
    }

    @Deprecated
    public LinearDRPCInputDeclarer addBolt(IRichBolt bolt, Number parallelism) {
        if (parallelism == null) {
            parallelism = 1;
        }
        Component component = new Component(bolt, parallelism.intValue());
        components.add(component);
        return new InputDeclarerImpl(component);
    }

    @Deprecated
    public LinearDRPCInputDeclarer addBolt(IRichBolt bolt) {
        return addBolt(bolt, null);
    }

    public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt, Number parallelism) {
        return addBolt(new BasicBoltExecutor(bolt), parallelism);
    }

    public LinearDRPCInputDeclarer addBolt(IBasicBolt bolt) {
        return addBolt(bolt, null);
    }

    public StormTopology createLocalTopology(ILocalDRPC drpc) {
        return createTopology(new DRPCSpout(function, drpc));
    }

    public StormTopology createRemoteTopology() {
        return createTopology(new DRPCSpout(function));
    }

    private StormTopology createTopology(DRPCSpout spout) {
        final String SPOUT_ID = "spout";
        final String PREPARE_ID = "prepare-request";

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SPOUT_ID, spout);
        builder.setBolt(PREPARE_ID, new PrepareRequest())
               .noneGrouping(SPOUT_ID);
        int i = 0;
        for (; i < components.size(); i++) {
            Component component = components.get(i);

            Map<String, SourceArgs> source = new HashMap<String, SourceArgs>();
            if (i == 1) {
                source.put(boltId(i - 1), SourceArgs.single());
            } else if (i >= 2) {
                source.put(boltId(i - 1), SourceArgs.all());
            }
            IdStreamSpec idSpec = null;
            if (i == components.size() - 1 && component.bolt instanceof FinishedCallback) {
                idSpec = IdStreamSpec.makeDetectSpec(PREPARE_ID, PrepareRequest.ID_STREAM);
            }
            BoltDeclarer declarer = builder.setBolt(
                boltId(i),
                new CoordinatedBolt(component.bolt, source, idSpec),
                component.parallelism);

            for (SharedMemory request : component.sharedMemory) {
                declarer.addSharedMemory(request);
            }

            if (!component.componentConf.isEmpty()) {
                declarer.addConfigurations(component.componentConf);
            }

            if (idSpec != null) {
                declarer.fieldsGrouping(idSpec.getGlobalStreamId().get_componentId(), PrepareRequest.ID_STREAM, new Fields("request"));
            }
            if (i == 0 && component.declarations.isEmpty()) {
                declarer.noneGrouping(PREPARE_ID, PrepareRequest.ARGS_STREAM);
            } else {
                String prevId;
                if (i == 0) {
                    prevId = PREPARE_ID;
                } else {
                    prevId = boltId(i - 1);
                }
                for (InputDeclaration declaration : component.declarations) {
                    declaration.declare(prevId, declarer);
                }
            }
            if (i > 0) {
                declarer.directGrouping(boltId(i - 1), Constants.COORDINATED_STREAM_ID);
            }
        }

        IRichBolt lastBolt = components.get(components.size() - 1).bolt;
        OutputFieldsGetter getter = new OutputFieldsGetter();
        lastBolt.declareOutputFields(getter);
        Map<String, StreamInfo> streams = getter.getFieldsDeclaration();
        if (streams.size() != 1) {
            throw new RuntimeException("Must declare exactly one stream from last bolt in LinearDRPCTopology");
        }
        String outputStream = streams.keySet().iterator().next();
        List<String> fields = streams.get(outputStream).get_output_fields();
        if (fields.size() != 2) {
            throw new RuntimeException(
                "Output stream of last component in LinearDRPCTopology must contain exactly two fields. "
                + "The first should be the request id, and the second should be the result.");
        }

        builder.setBolt(boltId(i), new JoinResult(PREPARE_ID))
               .fieldsGrouping(boltId(i - 1), outputStream, new Fields(fields.get(0)))
               .fieldsGrouping(PREPARE_ID, PrepareRequest.RETURN_STREAM, new Fields("request"));
        i++;
        builder.setBolt(boltId(i), new ReturnResults())
               .noneGrouping(boltId(i - 1));
        return builder.createTopology();
    }

    //......
}

From createTopology we can see that the topology starts with a DRPCSpout (component id "spout"), followed by a PrepareRequest bolt (component id "prepare-request").
Each user-supplied bolt is then wrapped in a CoordinatedBolt; when there is more than one bolt, the second and subsequent ones are wired with directGrouping(boltId(i - 1), Constants.COORDINATED_STREAM_ID), over which the upstream bolt uses emitDirect to send Fields("id", "count").
After the user bolts, the builder appends a JoinResult bolt and finally a ReturnResults bolt.
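The wiring decisions in createTopology can be condensed into a small self-contained model; the component ids follow the source's boltId scheme, while the "single"/"all" strings are just labels for the SourceArgs choice:

```java
import java.util.*;

public class LinearWiringSketch {
    static String boltId(int index) {
        return "bolt" + index;
    }

    // Mirrors createTopology's choice of coordination source per bolt index:
    // bolt0 has none, bolt1 takes a single count from bolt0, and later bolts
    // aggregate counts from all tasks of the previous bolt.
    static String coordinationSource(int i) {
        if (i == 0) return "none";
        if (i == 1) return boltId(0) + ":single";
        return boltId(i - 1) + ":all";
    }

    // The full linear chain around n user bolts: spout, prepare-request,
    // bolt0..bolt(n-1); JoinResult and ReturnResults reuse the boltId sequence.
    static List<String> chain(int n) {
        List<String> ids = new ArrayList<>(Arrays.asList("spout", "prepare-request"));
        for (int i = 0; i < n + 2; i++) {
            ids.add(boltId(i)); // the last two ids are JoinResult and ReturnResults
        }
        return ids;
    }

    public static void main(String[] argv) {
        System.out.println(chain(2)); // [spout, prepare-request, bolt0, bolt1, bolt2, bolt3]
    }
}
```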

DRPCSpout
storm-2.0.0/storm-client/src/jvm/org/apache/storm/drpc/DRPCSpout.java
public class DRPCSpout extends BaseRichSpout {
    public static final Logger LOG = LoggerFactory.getLogger(DRPCSpout.class);
    //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS
    static final long serialVersionUID = 2387848310969237877L;
    final String _function;
    final String _local_drpc_id;
    SpoutOutputCollector _collector;
    List<DRPCInvocationsClient> _clients = new ArrayList<>();
    transient LinkedList<Future<Void>> _futures = null;
    transient ExecutorService _backround = null;

    public DRPCSpout(String function) {
        _function = function;
        if (DRPCClient.isLocalOverride()) {
            _local_drpc_id = DRPCClient.getOverrideServiceId();
        } else {
            _local_drpc_id = null;
        }
    }

    //......

    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        if (_local_drpc_id == null) {
            _backround = new ExtendedThreadPoolExecutor(0, Integer.MAX_VALUE,
                                                        60L, TimeUnit.SECONDS,
                                                        new SynchronousQueue<Runnable>());
            _futures = new LinkedList<>();

            int numTasks = context.getComponentTasks(context.getThisComponentId()).size();
            int index = context.getThisTaskIndex();

            int port = ObjectReader.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT));
            List<String> servers = (List<String>) conf.get(Config.DRPC_SERVERS);
            if (servers == null || servers.isEmpty()) {
                throw new RuntimeException("No DRPC servers configured for topology");
            }

            if (numTasks < servers.size()) {
                for (String s : servers) {
                    _futures.add(_backround.submit(new Adder(s, port, conf)));
                }
            } else {
                int i = index % servers.size();
                _futures.add(_backround.submit(new Adder(servers.get(i), port, conf)));
            }
        }
    }

    @Override
    public void close() {
        for (DRPCInvocationsClient client : _clients) {
            client.close();
        }
    }

    @Override
    public void nextTuple() {
        if (_local_drpc_id == null) {
            int size = 0;
            synchronized (_clients) {
                size = _clients.size(); //This will only ever grow, so no need to worry about falling off the end
            }
            for (int i = 0; i < size; i++) {
                DRPCInvocationsClient client;
                synchronized (_clients) {
                    client = _clients.get(i);
                }
                if (!client.isConnected()) {
                    LOG.warn("DRPCInvocationsClient [{}:{}] is not connected.", client.getHost(), client.getPort());
                    reconnectAsync(client);
                    continue;
                }
                try {
                    DRPCRequest req = client.fetchRequest(_function);
                    if (req.get_request_id().length() > 0) {
                        Map<String, Object> returnInfo = new HashMap<>();
                        returnInfo.put("id", req.get_request_id());
                        returnInfo.put("host", client.getHost());
                        returnInfo.put("port", client.getPort());
                        _collector.emit(new Values(req.get_func_args(), JSONValue.toJSONString(returnInfo)),
                                        new DRPCMessageId(req.get_request_id(), i));
                        break;
                    }
                } catch (AuthorizationException aze) {
                    reconnectAsync(client);
                    LOG.error("Not authorized to fetch DRPC request from DRPC server", aze);
                } catch (TException e) {
                    reconnectAsync(client);
                    LOG.error("Failed to fetch DRPC request from DRPC server", e);
                } catch (Exception e) {
                    LOG.error("Failed to fetch DRPC request from DRPC server", e);
                }
            }
            checkFutures();
        } else {
            //......
        }
    }

    @Override
    public void ack(Object msgId) {
    }

    @Override
    public void fail(Object msgId) {
        DRPCMessageId did = (DRPCMessageId) msgId;
        DistributedRPCInvocations.Iface client;

        if (_local_drpc_id == null) {
            client = _clients.get(did.index);
        } else {
            client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(_local_drpc_id);
        }

        int retryCnt = 0;
        int maxRetries = 3;

        while (retryCnt < maxRetries) {
            retryCnt++;
            try {
                client.failRequest(did.id);
                break;
            } catch (AuthorizationException aze) {
                LOG.error("Not authorized to failRequest from DRPC server", aze);
                throw new RuntimeException(aze);
            } catch (TException tex) {
                if (retryCnt >= maxRetries) {
                    LOG.error("Failed to fail request", tex);
                    break;
                }
                reconnectSync((DRPCInvocationsClient) client);
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("args", "return-info"));
    }
    //......
}

open prepares the DRPCInvocationsClient instances.
nextTuple fetches a DRPCRequest via DRPCInvocationsClient.fetchRequest(_function).
It then builds the returnInfo and emits, with the msgId set to a DRPCMessageId and the tuple being Values(req.get_func_args(), JSONValue.toJSONString(returnInfo)).
fail is overridden to report the failed request back to the DRPC server via failRequest, retrying the call up to 3 times by default.
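The returnInfo that DRPCSpout serializes can be reproduced without a JSON library; the field set (id, host, port) is from the source, though JSONValue's key order may differ from this hand-rolled string:

```java
public class ReturnInfoSketch {
    // Hand-rolls the {"id", "host", "port"} JSON that DRPCSpout puts into the
    // "return-info" field so ReturnResults can later locate the right DRPC server.
    static String returnInfo(String requestId, String host, int port) {
        return "{\"id\":\"" + requestId + "\",\"host\":\"" + host + "\",\"port\":" + port + "}";
    }

    public static void main(String[] argv) {
        System.out.println(returnInfo("42", "drpc-server-1", 3773));
    }
}
```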

PrepareRequest
storm-2.0.0/storm-client/src/jvm/org/apache/storm/drpc/PrepareRequest.java
public class PrepareRequest extends BaseBasicBolt {
    public static final String ARGS_STREAM = Utils.DEFAULT_STREAM_ID;
    public static final String RETURN_STREAM = "ret";
    public static final String ID_STREAM = "id";

    Random rand;

    @Override
    public void prepare(Map<String, Object> map, TopologyContext context) {
        rand = new Random();
    }

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String args = tuple.getString(0);
        String returnInfo = tuple.getString(1);
        long requestId = rand.nextLong();
        collector.emit(ARGS_STREAM, new Values(requestId, args));
        collector.emit(RETURN_STREAM, new Values(requestId, returnInfo));
        collector.emit(ID_STREAM, new Values(requestId));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream(ARGS_STREAM, new Fields("request", "args"));
        declarer.declareStream(RETURN_STREAM, new Fields("request", "return"));
        declarer.declareStream(ID_STREAM, new Fields("request"));
    }
}

PrepareRequest extracts args and returnInfo, generates a random requestId, and emits to the three streams ARGS_STREAM, RETURN_STREAM and ID_STREAM.
JoinResult subscribes to PrepareRequest's RETURN_STREAM, and the first CoordinatedBolt subscribes to the ARGS_STREAM.
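PrepareRequest's fan-out can be modeled as one input producing a tuple on each of three streams; the stream names are as declared in the source (Utils.DEFAULT_STREAM_ID is "default"):

```java
import java.util.*;

public class PrepareRequestSketch {
    // One (args, returnInfo) input fans out to three streams keyed by requestId.
    static Map<String, List<Object>> split(long requestId, String args, String returnInfo) {
        Map<String, List<Object>> streams = new LinkedHashMap<>();
        streams.put("default", Arrays.asList((Object) requestId, args));   // ARGS_STREAM
        streams.put("ret", Arrays.asList((Object) requestId, returnInfo)); // RETURN_STREAM
        streams.put("id", Arrays.asList((Object) requestId));              // ID_STREAM
        return streams;
    }

    public static void main(String[] argv) {
        System.out.println(split(7L, "hello", "{\"id\":\"1\"}"));
    }
}
```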

CoordinatedBolt
storm-2.0.0/storm-client/src/jvm/org/apache/storm/coordination/CoordinatedBolt.java
/**
 * Coordination requires the request ids to be globally unique for awhile. This is so it doesn't get confused in the case of retries.
 */
public class CoordinatedBolt implements IRichBolt {

    private TimeCacheMap<Object, TrackingInfo> _tracked;

    //......

    public void execute(Tuple tuple) {
        Object id = tuple.getValue(0);
        TrackingInfo track;
        TupleType type = getTupleType(tuple);
        synchronized (_tracked) {
            track = _tracked.get(id);
            if (track == null) {
                track = new TrackingInfo();
                if (_idStreamSpec == null) {
                    track.receivedId = true;
                }
                _tracked.put(id, track);
            }
        }

        if (type == TupleType.ID) {
            synchronized (_tracked) {
                track.receivedId = true;
            }
            checkFinishId(tuple, type);
        } else if (type == TupleType.COORD) {
            int count = (Integer) tuple.getValue(1);
            synchronized (_tracked) {
                track.reportCount++;
                track.expectedTupleCount += count;
            }
            checkFinishId(tuple, type);
        } else {
            synchronized (_tracked) {
                _delegate.execute(tuple);
            }
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        _delegate.declareOutputFields(declarer);
        declarer.declareStream(Constants.COORDINATED_STREAM_ID, true, new Fields("id", "count"));
    }

    //......

    public static class TrackingInfo {
        int reportCount = 0;
        int expectedTupleCount = 0;
        int receivedTuples = 0;
        boolean failed = false;
        Map<Integer, Integer> taskEmittedTuples = new HashMap<>();
        boolean receivedId = false;
        boolean finished = false;
        List<Tuple> ackTuples = new ArrayList<>();

        @Override
        public String toString() {
            return "reportCount: " + reportCount + "\n" +
                   "expectedTupleCount: " + expectedTupleCount + "\n" +
                   "receivedTuples: " + receivedTuples + "\n" +
                   "failed: " + failed + "\n" +
                   taskEmittedTuples.toString();
        }
    }
}

In declareOutputFields, CoordinatedBolt delegates to the wrapped bolt's declareOutputFields and additionally declares Constants.COORDINATED_STREAM_ID as a direct stream carrying Fields("id", "count").
execute first ensures each requestId has a TrackingInfo, which tracks the expectedTupleCount and receivedTuples counters as well as taskEmittedTuples (a slightly misleading name: it records, per downstream task, how many tuples this bolt has emitted to it; in checkFinishId, once the request is finished, these counts are sent via emitDirect so each downstream task learns how many tuples it should expect and updates its expectedTupleCount).
execute handles several tuple types: TupleType.ID (when _idStreamSpec is non-null), TupleType.COORD (receives Fields("id", "count") and runs checkFinishId to decide whether the request is complete), and TupleType.REGULAR (simply delegates to the wrapped bolt's execute).
checkFinishId checks track.reportCount == _numSourceReports and track.expectedTupleCount == track.receivedTuples; when both hold it marks track.finished = true and, if there are further downstream bolts, tells them how many tuples they should expect.
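The finish condition that checkFinishId evaluates can be isolated as a pure predicate; the field names mirror TrackingInfo, with _numSourceReports passed in explicitly:

```java
public class CoordinationSketch {
    // A request is finished once the id tuple arrived (when an id stream is
    // configured), every upstream task reported its emit count, and the summed
    // expected count matches the tuples actually received.
    static boolean finished(boolean receivedId, int reportCount, int numSourceReports,
                            int expectedTupleCount, int receivedTuples) {
        return receivedId
            && reportCount == numSourceReports
            && expectedTupleCount == receivedTuples;
    }

    public static void main(String[] argv) {
        System.out.println(finished(true, 2, 2, 10, 10)); // true
        System.out.println(finished(true, 1, 2, 10, 10)); // false: a source has not reported yet
    }
}
```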

JoinResult
storm-2.0.0/storm-client/src/jvm/org/apache/storm/drpc/JoinResult.java
public class JoinResult extends BaseRichBolt {
    public static final Logger LOG = LoggerFactory.getLogger(JoinResult.class);

    String returnComponent;
    Map<Object, Tuple> returns = new HashMap<>();
    Map<Object, Tuple> results = new HashMap<>();
    OutputCollector _collector;

    public JoinResult(String returnComponent) {
        this.returnComponent = returnComponent;
    }

    public void prepare(Map<String, Object> map, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void execute(Tuple tuple) {
        Object requestId = tuple.getValue(0);
        if (tuple.getSourceComponent().equals(returnComponent)) {
            returns.put(requestId, tuple);
        } else {
            results.put(requestId, tuple);
        }

        if (returns.containsKey(requestId) && results.containsKey(requestId)) {
            Tuple result = results.remove(requestId);
            Tuple returner = returns.remove(requestId);
            LOG.debug(result.getValue(1).toString());
            List<Tuple> anchors = new ArrayList<>();
            anchors.add(result);
            anchors.add(returner);
            _collector.emit(anchors, new Values("" + result.getValue(1), returner.getValue(1)));
            _collector.ack(result);
            _collector.ack(returner);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("result", "return-info"));
    }
}

If the tuple came from PrepareRequest, it is put into the returns map; otherwise it goes into results.
JoinResult then checks whether both maps contain the requestId; if so, a request has been matched with its result, and the pair is emitted downstream.
The first field emitted is the result, the second is the returnInfo.
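JoinResult's pairing logic reduces to two buffers keyed by requestId; this sketch (hypothetical class name) drops the Storm anchoring and acking and keeps only the join:

```java
import java.util.*;

public class JoinResultSketch {
    final Map<Object, String> returns = new HashMap<>(); // from PrepareRequest's RETURN_STREAM
    final Map<Object, String> results = new HashMap<>(); // from the last user bolt

    // Buffers each half; emits (result, returnInfo) once both are present, else null.
    String[] accept(Object requestId, String value, boolean fromReturnStream) {
        if (fromReturnStream) {
            returns.put(requestId, value);
        } else {
            results.put(requestId, value);
        }
        if (returns.containsKey(requestId) && results.containsKey(requestId)) {
            return new String[]{results.remove(requestId), returns.remove(requestId)};
        }
        return null;
    }

    public static void main(String[] argv) {
        JoinResultSketch join = new JoinResultSketch();
        System.out.println(join.accept(1L, "{\"id\":\"1\"}", true)); // null: result not seen yet
        String[] out = join.accept(1L, "hello!", false);
        System.out.println(out[0] + " | " + out[1]); // hello! | {"id":"1"}
    }
}
```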

ReturnResults
storm-2.0.0/storm-client/src/jvm/org/apache/storm/drpc/ReturnResults.java
public class ReturnResults extends BaseRichBolt {
    public static final Logger LOG = LoggerFactory.getLogger(ReturnResults.class);
    //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS
    static final long serialVersionUID = -774882142710631591L;
    OutputCollector _collector;
    boolean local;
    Map<String, Object> _conf;
    Map<List, DRPCInvocationsClient> _clients = new HashMap<List, DRPCInvocationsClient>();

    @Override
    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
        _conf = topoConf;
        _collector = collector;
        local = topoConf.get(Config.STORM_CLUSTER_MODE).equals("local");
    }

    @Override
    public void execute(Tuple input) {
        String result = (String) input.getValue(0);
        String returnInfo = (String) input.getValue(1);
        if (returnInfo != null) {
            Map<String, Object> retMap;
            try {
                retMap = (Map<String, Object>) JSONValue.parseWithException(returnInfo);
            } catch (ParseException e) {
                LOG.error("Parseing returnInfo failed", e);
                _collector.fail(input);
                return;
            }
            final String host = (String) retMap.get("host");
            final int port = ObjectReader.getInt(retMap.get("port"));
            String id = (String) retMap.get("id");
            DistributedRPCInvocations.Iface client;
            if (local) {
                client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(host);
            } else {
                List server = new ArrayList() {{
                    add(host);
                    add(port);
                }};

                if (!_clients.containsKey(server)) {
                    try {
                        _clients.put(server, new DRPCInvocationsClient(_conf, host, port));
                    } catch (TTransportException ex) {
                        throw new RuntimeException(ex);
                    }
                }
                client = _clients.get(server);
            }

            int retryCnt = 0;
            int maxRetries = 3;
            while (retryCnt < maxRetries) {
                retryCnt++;
                try {
                    client.result(id, result);
                    _collector.ack(input);
                    break;
                } catch (AuthorizationException aze) {
                    LOG.error("Not authorized to return results to DRPC server", aze);
                    _collector.fail(input);
                    throw new RuntimeException(aze);
                } catch (TException tex) {
                    if (retryCnt >= maxRetries) {
                        LOG.error("Failed to return results to DRPC server", tex);
                        _collector.fail(input);
                    }
                    reconnectClient((DRPCInvocationsClient) client);
                }
            }
        }
    }

    private void reconnectClient(DRPCInvocationsClient client) {
        if (client instanceof DRPCInvocationsClient) {
            try {
                LOG.info("reconnecting... ");
                client.reconnectClient(); //Blocking call
            } catch (TException e2) {
                LOG.error("Failed to connect to DRPC server", e2);
            }
        }
    }

    @Override
    public void cleanup() {
        for (DRPCInvocationsClient c : _clients.values()) {
            c.close();
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}

ReturnResults is responsible for sending the result back to the requesting DRPC server via a DRPCInvocationsClient.
returnInfo contains the target host and port, from which a DRPCInvocationsClient is constructed (and cached).
It then calls DRPCInvocationsClient.result(id, result) to return the result, retrying up to 3 times by default; on AuthorizationException the tuple is failed immediately, and on success it is acked.
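The delivery-and-retry loop can be reproduced against a stub client; the Client interface here is a stand-in for DistributedRPCInvocations.Iface, and the comments mark where the real bolt acks, fails, or reconnects:

```java
public class ResultRetrySketch {
    interface Client {
        void result(String id, String result) throws Exception;
    }

    // Mirrors ReturnResults' loop: up to maxRetries attempts; a transport-style
    // failure triggers a reconnect-and-retry, giving up after the last attempt.
    static boolean deliver(Client client, String id, String result, int maxRetries) {
        int retryCnt = 0;
        while (retryCnt < maxRetries) {
            retryCnt++;
            try {
                client.result(id, result);
                return true; // success: the real bolt acks the input tuple here
            } catch (Exception tex) {
                if (retryCnt >= maxRetries) {
                    return false; // exhausted: the real bolt fails the input tuple
                }
                // the real bolt reconnects the DRPCInvocationsClient here
            }
        }
        return false;
    }

    public static void main(String[] argv) {
        int[] calls = {0};
        Client flaky = (id, r) -> {
            if (++calls[0] < 3) throw new Exception("transport error");
        };
        System.out.println(deliver(flaky, "1", "ok", 3)); // true: third attempt succeeds
    }
}
```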

Summary

LinearDRPCTopologyBuilder was marked @Deprecated in v0.9.1-incubating (2012), when Trident's newDRPCStream was considered its replacement; but that would force anyone wanting DRPC to adopt Trident, so the flag was later removed (April 2018), and in versions 2.0.0, 1.1.3, 1.0.7 and 1.2.2 the class is no longer deprecated.

LinearDRPCTopologyBuilder composes DRPCSpout, PrepareRequest, CoordinatedBolt, JoinResult and ReturnResults behind a simple api, so users do not have to build these components themselves.

DRPCSpout mainly constructs the args and returnInfo;
PrepareRequest splits the data into the ARGS_STREAM, RETURN_STREAM and ID_STREAM streams;
CoordinatedBolt ensures tuples are fully delivered and acked between the bolts;
JoinResult matches requestIds with results, pairing each request with its response before emitting downstream;
ReturnResults sends the data back to the client according to returnInfo.

With LinearDRPCTopologyBuilder, the first bolt receives Fields("request", "args"); the last bolt must output new Fields("id", "result"); and every bolt other than the last must emit the request id as its first output field, so that CoordinatedBolt can track counts and confirm that each bolt received every tuple sent by its upstream bolt.

doc

Distributed RPC
LinearDRPCTopologyBuilder Deprecated
LinearDRPCTopologyBuilder is deprecated – what to use instead
LinearDRPCTopologyBuilder shouldn’t be deprecated
Twitter Storm source code analysis: CoordinatedBolt
