Preface
This article demonstrates a Storm DRPC example.
Configuration
version: '2'
services:
  supervisor:
    image: storm
    container_name: supervisor
    command: storm supervisor -c storm.local.hostname="192.168.99.100" -c drpc.servers='["192.168.99.100"]' -c drpc.port=3772 -c drpc.invocations.port=3773 -c drpc.http.port=3774
    depends_on:
      - nimbus
      - zookeeper
    links:
      - nimbus
      - zookeeper
    restart: always
    ports:
      - "6700:6700"
      - "6701:6701"
      - "6702:6702"
      - "6703:6703"
      - "8000:8000"
  drpc:
    image: storm
    container_name: drpc
    command: storm drpc -c storm.local.hostname="192.168.99.100" -c drpc.port=3772 -c drpc.invocations.port=3773 -c drpc.http.port=3774
    depends_on:
      - nimbus
      - supervisor
      - zookeeper
    links:
      - nimbus
      - supervisor
      - zookeeper
    restart: always
    ports:
      - "3772:3772"
      - "3773:3773"
      - "3774:3774"
Here the supervisor is configured with drpc.servers, drpc.port, and drpc.invocations.port so that workers can reach the DRPC nodes through drpc.invocations.port.
The drpc service exposes drpc.port (so that external DRPCClient callers can reach it) and drpc.invocations.port (so that workers can reach it).
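The compose file above references nimbus and zookeeper services that are not shown. A minimal sketch of what they might look like, under the same docker-compose v2 conventions — the image tags and storm.local.hostname value here are assumptions based on the official storm and zookeeper images, not part of the original post:

```yaml
  zookeeper:
    image: zookeeper
    container_name: zookeeper
    restart: always
    ports:
      - "2181:2181"   # default zookeeper client port
  nimbus:
    image: storm
    container_name: nimbus
    command: storm nimbus -c storm.local.hostname="192.168.99.100"
    depends_on:
      - zookeeper
    links:
      - zookeeper
    restart: always
    ports:
      - "6627:6627"   # default nimbus thrift port
```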
TridentTopology
@Test
public void testDeployDRPCStateQuery() throws InterruptedException, TException {
    TridentTopology topology = new TridentTopology();
    FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
            new Values("the cow jumped over the moon"),
            new Values("the man went to the store and bought some candy"),
            new Values("four score and seven years ago"),
            new Values("how many apples can you eat"));
    spout.setCycle(true);
    TridentState wordCounts =
            topology.newStream("spout1", spout)
                    .each(new Fields("sentence"), new Split(), new Fields("word"))
                    .groupBy(new Fields("word"))
                    //NOTE transforms a Stream into a TridentState object
                    .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
                    .parallelismHint(6);
    topology.newDRPCStream("words")
            .each(new Fields("args"), new Split(), new Fields("word"))
            .groupBy(new Fields("word"))
            .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
            .each(new Fields("count"), new FilterNull())
            .aggregate(new Fields("count"), new Sum(), new Fields("sum"));
    StormTopology stormTopology = topology.build();
    // remote submit: mvn clean package -Dmaven.test.skip=true
    // Storm reads the jar path from System.getProperty("storm.jar"); without it, submission fails
    System.setProperty("storm.jar", TOPOLOGY_JAR);
    Config conf = new Config();
    conf.put(Config.NIMBUS_SEEDS, Arrays.asList("192.168.99.100")); // nimbus host address, e.g. 192.168.10.1
    conf.put(Config.NIMBUS_THRIFT_PORT, 6627); // nimbus thrift port, defaults to 6627
    conf.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList("192.168.99.100")); // zookeeper host addresses; the list may hold several
    conf.put(Config.STORM_ZOOKEEPER_PORT, 2181); // zookeeper port, defaults to 2181
    StormSubmitter.submitTopology("DRPCStateQuery", conf, stormTopology);
}
Here newStream builds a TridentState, and newDRPCStream creates a DRPC stream whose stateQuery points at that TridentState.
Since the TridentState stores its results in a MemoryMapState, the DRPC stream performs its stateQuery against that state via DRPC.
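The query side of the topology can be illustrated in plain Java with no Storm dependency: split the DRPC args into words, look each word up in the state (as MapGet does), drop misses (FilterNull), and add up the counts (Sum). This is a sketch of the semantics only; the word counts below are made up for illustration, not values produced by the topology.

```java
import java.util.Arrays;
import java.util.Map;

public class DrpcQuerySketch {
    // Mirrors the DRPC stream: Split -> stateQuery(MapGet) -> FilterNull -> Sum
    static long queryWordCountSum(Map<String, Long> state, String args) {
        return Arrays.stream(args.split(" "))
                .map(state::get)        // stateQuery with MapGet yields null for absent keys
                .filter(c -> c != null) // FilterNull drops the misses
                .mapToLong(Long::longValue)
                .sum();                 // Sum aggregates the remaining counts
    }

    public static void main(String[] args) {
        // Made-up counts standing in for the MemoryMapState built by persistentAggregate
        Map<String, Long> state = Map.of("cat", 2L, "dog", 3L, "the", 10L);
        System.out.println(queryWordCountSum(state, "cat dog the man")); // "man" is filtered out -> 15
    }
}
```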
DRPCClient
@Test
public void testLaunchDrpcClient() throws TException {
    Config conf = new Config();
    //NOTE Config.DRPC_THRIFT_TRANSPORT_PLUGIN must be set, otherwise the client throws a NullPointerException
    conf.put(Config.DRPC_THRIFT_TRANSPORT_PLUGIN, SimpleTransportPlugin.class.getName());
    conf.put(Config.STORM_NIMBUS_RETRY_TIMES, 3);
    conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL, 10000);
    conf.put(Config.STORM_NIMBUS_RETRY_INTERVAL_CEILING, 10000);
    conf.put(Config.DRPC_MAX_BUFFER_SIZE, 104857600); // 100M
    DRPCClient client = new DRPCClient(conf, "192.168.99.100", 3772);
    System.out.println(client.execute("words", "cat dog the man"));
}
Note that none of these config entries may be omitted, or a NullPointerException is thrown.
Config.DRPC_THRIFT_TRANSPORT_PLUGIN is set to SimpleTransportPlugin.class.getName(); although that class is deprecated, it still works.
Because SimpleTransportPlugin is used, Config.DRPC_MAX_BUFFER_SIZE must also be set.
DRPCClient is constructed with the DRPC server's address and port.
client.execute takes the function name that was registered via newDRPCStream.
Summary
To use DRPC, start a DRPC server node with storm drpc and expose two ports: drpc.port for external DRPCClient calls and drpc.invocations.port for worker access; drpc.http.port additionally serves HTTP callers (DRPCClient itself talks thrift).
The supervisor must be configured with drpc.servers and drpc.invocations.port so that workers can reach the DRPC server.
DRPCClient connects on the port given by drpc.port, and client.execute takes the function name registered via newDRPCStream.
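Since drpc.http.port accepts plain HTTP, the words function could also be queried over HTTP. A sketch of what such a call might look like, assuming the common DRPC HTTP URL layout of /drpc/<function>/<args> — host and port are taken from the compose file above; verify the exact URL pattern against your Storm version:

```shell
# Hypothetical HTTP call to the DRPC server; host/port from the compose file above
DRPC_HOST=192.168.99.100
DRPC_HTTP_PORT=3774
URL="http://${DRPC_HOST}:${DRPC_HTTP_PORT}/drpc/words/cat%20dog%20the%20man"
echo "$URL"
# curl "$URL"   # run against a live cluster; should return the same sum as DRPCClient.execute
```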
doc
Trident Tutorial
Distributed RPC
Running Apache Storm Securely