乐趣区

聊聊storm的submitTopology


本文主要研究一下 storm 的 submitTopology
提交 topology 日志实例
2018-10-08 17:32:55.738 INFO 2870 --- [main] org.apache.storm.StormSubmitter : Generated ZooKeeper secret payload for MD5-digest: -8659577410336375158:-6351873438041855318
2018-10-08 17:32:55.893 INFO 2870 --- [main] org.apache.storm.utils.NimbusClient : Found leader nimbus : a391f7a04044:6627
2018-10-08 17:32:56.059 INFO 2870 --- [main] o.apache.storm.security.auth.AuthUtils : Got AutoCreds []
2018-10-08 17:32:56.073 INFO 2870 --- [main] org.apache.storm.utils.NimbusClient : Found leader nimbus : a391f7a04044:6627
2018-10-08 17:32:56.123 INFO 2870 --- [main] org.apache.storm.StormSubmitter : Uploading dependencies - jars...
2018-10-08 17:32:56.125 INFO 2870 --- [main] org.apache.storm.StormSubmitter : Uploading dependencies - artifacts...
2018-10-08 17:32:56.125 INFO 2870 --- [main] org.apache.storm.StormSubmitter : Dependency Blob keys - jars : [] / artifacts : []
2018-10-08 17:32:56.149 INFO 2870 --- [main] org.apache.storm.StormSubmitter : Uploading topology jar /tmp/storm-demo/target/storm-demo-0.0.1-SNAPSHOT.jar to assigned location: /data/nimbus/inbox/stormjar-4ead82bb-74a3-45a3-aca4-3af2f1d23998.jar
2018-10-08 17:32:57.105 INFO 2870 --- [main] org.apache.storm.StormSubmitter : Successfully uploaded topology jar to assigned location: /data/nimbus/inbox/stormjar-4ead82bb-74a3-45a3-aca4-3af2f1d23998.jar
2018-10-08 17:32:57.106 INFO 2870 --- [main] org.apache.storm.StormSubmitter : Submitting topology DemoTopology in distributed mode with conf {"nimbus.seeds":["192.168.99.100"],"storm.zookeeper.topology.auth.scheme":"digest","topology.workers":1,"storm.zookeeper.port":2181,"nimbus.thrift.port":6627,"storm.zookeeper.topology.auth.payload":"-8659577410336375158:-6351873438041855318","storm.zookeeper.servers":["192.168.99.100"]}
2018-10-08 17:32:58.008 INFO 2870 --- [main] org.apache.storm.StormSubmitter : Finished submitting topology: DemoTopology
从日志可以看到,topology 的 jar 包上传到 nimbus 的路径为 /data/nimbus/inbox/stormjar-4ead82bb-74a3-45a3-aca4-3af2f1d23998.jar
StormSubmitter
submitTopology
storm-core-1.1.0-sources.jar!/org/apache/storm/StormSubmitter.java
/**
 * Submits a topology without submit options or a progress listener.
 *
 * <p>Delegates to the five-argument overload, passing {@code null} for both
 * {@code opts} and {@code progressListener}.
 *
 * @param name      the topology name
 * @param stormConf the topology-specific configuration
 * @param topology  the topology to submit
 * @throws AlreadyAliveException    propagated from the delegated submission
 * @throws InvalidTopologyException propagated from the delegated submission
 * @throws AuthorizationException   propagated from the delegated submission
 */
public static void submitTopology(String name, Map stormConf, StormTopology topology)
throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
submitTopology(name, stormConf, topology, null, null);
}

/**
 * Submits a topology with optional submit options and an optional progress listener.
 *
 * <p>Delegates to {@code submitTopologyAs}, passing {@code null} as the {@code asUser} argument.
 *
 * @param name             the topology name
 * @param stormConf        the topology-specific configuration
 * @param topology         the topology to submit
 * @param opts             submit options; may be {@code null}
 * @param progressListener listener notified about the jar upload; may be {@code null}
 * @throws AlreadyAliveException    propagated from {@code submitTopologyAs}
 * @throws InvalidTopologyException propagated from {@code submitTopologyAs}
 * @throws AuthorizationException   propagated from {@code submitTopologyAs}
 */
public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts,
ProgressListener progressListener) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
submitTopologyAs(name, stormConf, topology, opts, progressListener, null);
}

public static void submitTopologyAs(String name, Map stormConf, StormTopology topology, SubmitOptions opts, ProgressListener progressListener, String asUser)
throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, IllegalArgumentException {
if(!Utils.isValidConf(stormConf)) {
throw new IllegalArgumentException(“Storm conf is not valid. Must be json-serializable”);
}
stormConf = new HashMap(stormConf);
stormConf.putAll(Utils.readCommandLineOpts());
Map conf = Utils.readStormConfig();
conf.putAll(stormConf);
stormConf.putAll(prepareZookeeperAuthentication(conf));

validateConfs(conf, topology);

Map<String,String> passedCreds = new HashMap<>();
if (opts != null) {
Credentials tmpCreds = opts.get_creds();
if (tmpCreds != null) {
passedCreds = tmpCreds.get_creds();
}
}
Map<String,String> fullCreds = populateCredentials(conf, passedCreds);
if (!fullCreds.isEmpty()) {
if (opts == null) {
opts = new SubmitOptions(TopologyInitialStatus.ACTIVE);
}
opts.set_creds(new Credentials(fullCreds));
}
try {
if (localNimbus!=null) {
LOG.info(“Submitting topology ” + name + ” in local mode”);
if (opts!=null) {
localNimbus.submitTopologyWithOpts(name, stormConf, topology, opts);
} else {
// this is for backwards compatibility
localNimbus.submitTopology(name, stormConf, topology);
}
LOG.info(“Finished submitting topology: ” + name);
} else {
String serConf = JSONValue.toJSONString(stormConf);
try (NimbusClient client = NimbusClient.getConfiguredClientAs(conf, asUser)) {
if (topologyNameExists(name, client)) {
throw new RuntimeException(“Topology with name `” + name + “` already exists on cluster”);
}

// Dependency uploading only makes sense for distributed mode
List<String> jarsBlobKeys = Collections.emptyList();
List<String> artifactsBlobKeys;

DependencyUploader uploader = new DependencyUploader();
try {
uploader.init();

jarsBlobKeys = uploadDependencyJarsToBlobStore(uploader);

artifactsBlobKeys = uploadDependencyArtifactsToBlobStore(uploader);
} catch (Throwable e) {
// remove uploaded jars blobs, not artifacts since they’re shared across the cluster
uploader.deleteBlobs(jarsBlobKeys);
uploader.shutdown();
throw e;
}

try {
setDependencyBlobsToTopology(topology, jarsBlobKeys, artifactsBlobKeys);
submitTopologyInDistributeMode(name, topology, opts, progressListener, asUser, conf, serConf, client);
} catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
// remove uploaded jars blobs, not artifacts since they’re shared across the cluster
// Note that we don’t handle TException to delete jars blobs
// because it’s safer to leave some blobs instead of topology not running
uploader.deleteBlobs(jarsBlobKeys);
throw e;
} finally {
uploader.shutdown();
}
}
}
} catch(TException e) {
throw new RuntimeException(e);
}
invokeSubmitterHook(name, asUser, conf, topology);

}

private static void submitTopologyInDistributeMode(String name, StormTopology topology, SubmitOptions opts,
ProgressListener progressListener, String asUser, Map conf,
String serConf, NimbusClient client) throws TException {
try {
String jar = submitJarAs(conf, System.getProperty(“storm.jar”), progressListener, client);
LOG.info(“Submitting topology {} in distributed mode with conf {}”, name, serConf);

if (opts != null) {
client.getClient().submitTopologyWithOpts(name, jar, serConf, topology, opts);
} else {
// this is for backwards compatibility
client.getClient().submitTopology(name, jar, serConf, topology);
}
LOG.info(“Finished submitting topology: {}”, name);
} catch (InvalidTopologyException e) {
LOG.warn(“Topology submission exception: {}”, e.get_msg());
throw e;
} catch (AlreadyAliveException e) {
LOG.warn(“Topology already alive exception”, e);
throw e;
}
}

public static String submitJarAs(Map conf, String localJar, ProgressListener listener, NimbusClient client) {
if (localJar == null) {
throw new RuntimeException(“Must submit topologies using the ‘storm’ client script so that StormSubmitter knows which jar to upload.”);
}

try {
String uploadLocation = client.getClient().beginFileUpload();
LOG.info(“Uploading topology jar ” + localJar + ” to assigned location: ” + uploadLocation);
BufferFileInputStream is = new BufferFileInputStream(localJar, THRIFT_CHUNK_SIZE_BYTES);

long totalSize = new File(localJar).length();
if (listener != null) {
listener.onStart(localJar, uploadLocation, totalSize);
}

long bytesUploaded = 0;
while(true) {
byte[] toSubmit = is.read();
bytesUploaded += toSubmit.length;
if (listener != null) {
listener.onProgress(localJar, uploadLocation, bytesUploaded, totalSize);
}

if(toSubmit.length==0) break;
client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit));
}
client.getClient().finishFileUpload(uploadLocation);

if (listener != null) {
listener.onCompleted(localJar, uploadLocation, totalSize);
}

LOG.info(“Successfully uploaded topology jar to assigned location: ” + uploadLocation);
return uploadLocation;
} catch(Exception e) {
throw new RuntimeException(e);
}
}

主要通过 submitTopologyAs 方法来提交 topology
submitTopologyAs 先通过 DependencyUploader 上传依赖(jars 及 artifacts),然后调用 submitTopologyInDistributeMode,后者再通过 submitJarAs 方法上传 topology 的 jar 包
从前面的日志可以看到,上传到 nimbus 的路径为 /data/nimbus/inbox/stormjar-4ead82bb-74a3-45a3-aca4-3af2f1d23998.jar
client.getClient().submitTopology 主要是提交 topology 信息

uploadDependencyJarsToBlobStore
storm-core-1.1.0-sources.jar!/org/apache/storm/StormSubmitter.java
private static List<String> uploadDependencyJarsToBlobStore(DependencyUploader uploader) {
LOG.info(“Uploading dependencies – jars…”);

DependencyPropertiesParser propertiesParser = new DependencyPropertiesParser();

String depJarsProp = System.getProperty(“storm.dependency.jars”, “”);
List<File> depJars = propertiesParser.parseJarsProperties(depJarsProp);

try {
return uploader.uploadFiles(depJars, true);
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
uploadDependencyArtifactsToBlobStore
storm-core-1.1.0-sources.jar!/org/apache/storm/StormSubmitter.java
private static List<String> uploadDependencyArtifactsToBlobStore(DependencyUploader uploader) {
LOG.info(“Uploading dependencies – artifacts…”);

DependencyPropertiesParser propertiesParser = new DependencyPropertiesParser();

String depArtifactsProp = System.getProperty(“storm.dependency.artifacts”, “{}”);
Map<String, File> depArtifacts = propertiesParser.parseArtifactsProperties(depArtifactsProp);

try {
return uploader.uploadArtifacts(depArtifacts);
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
DependencyUploader
storm-core-1.1.0-sources.jar!/org/apache/storm/dependency/DependencyUploader.java
/**
 * Uploads every given file to the blob store and returns the blob keys, in input order.
 *
 * <p>Each key is derived from the file name after it has been passed through
 * {@code BlobStoreUtils.applyUUIDToFileName}. If anything fails part-way, the keys uploaded
 * so far are deleted when {@code cleanupIfFails} is set and a blob store is available, and
 * the failure is rethrown wrapped in a {@code RuntimeException}.
 *
 * @param dependencies   the files to upload; checked with {@code checkFilesExist} first
 * @param cleanupIfFails whether already-uploaded blobs are deleted on failure
 * @return the blob keys of the uploaded files
 */
public List<String> uploadFiles(List<File> dependencies, boolean cleanupIfFails) throws IOException, AuthorizationException {
    checkFilesExist(dependencies);

    final List<String> uploadedKeys = new ArrayList<>(dependencies.size());
    try {
        for (File jarFile : dependencies) {
            String uniqueName = BlobStoreUtils.applyUUIDToFileName(jarFile.getName());
            String blobKey = BlobStoreUtils.generateDependencyBlobKey(uniqueName);

            try {
                uploadDependencyToBlobStore(blobKey, jarFile);
            } catch (KeyAlreadyExistsException collision) {
                // not expected to happen, because a UUID was applied to the file name
                throw new RuntimeException(collision);
            }

            uploadedKeys.add(blobKey);
        }
    } catch (Throwable failure) {
        boolean blobStoreAvailable = getBlobStore() != null;
        if (blobStoreAvailable && cleanupIfFails) {
            deleteBlobs(uploadedKeys);
        }
        throw new RuntimeException(failure);
    }

    return uploadedKeys;
}

/**
 * Uploads every artifact file to the blob store and returns the blob keys, in the map's
 * iteration order.
 *
 * <p>Each key is derived from the artifact name via {@code convertArtifactToJarFileName}.
 * A key that already exists is not treated as an error; the key is still returned.
 *
 * @param artifacts map from artifact name to the file to upload; the files are checked with
 *                  {@code checkFilesExist} first
 * @return the blob keys of the artifacts
 * @throws RuntimeException wrapping any other failure during the upload loop
 */
public List<String> uploadArtifacts(Map<String, File> artifacts) {
    checkFilesExist(artifacts.values());

    final List<String> blobKeys = new ArrayList<>(artifacts.size());
    try {
        for (Map.Entry<String, File> entry : artifacts.entrySet()) {
            String jarFileName = convertArtifactToJarFileName(entry.getKey());
            String blobKey = BlobStoreUtils.generateDependencyBlobKey(jarFileName);

            try {
                uploadDependencyToBlobStore(blobKey, entry.getValue());
            } catch (KeyAlreadyExistsException ignored) {
                // we lost the race to create this key, which does not matter
            }

            blobKeys.add(blobKey);
        }
    } catch (Throwable failure) {
        throw new RuntimeException(failure);
    }

    return blobKeys;
}

/**
 * Uploads one file to the blob store under {@code key} unless a blob with that key already exists.
 *
 * <p>Existence is probed with {@code getBlobMeta}; a {@code KeyNotFoundException} triggers the
 * upload. The blob is committed with {@code close()}. If copying or committing fails, the
 * upload is cancelled so that it is not left open, and the original failure is rethrown.
 *
 * @param key        the blob key
 * @param dependency the local file whose content is uploaded
 * @return {@code true} if a new blob was created, {@code false} if the key already existed
 * @throws KeyAlreadyExistsException propagated from {@code createBlob}
 * @throws AuthorizationException    propagated from the blob store
 * @throws IOException               if copying the file or committing the blob fails
 */
private boolean uploadDependencyToBlobStore(String key, File dependency)
        throws KeyAlreadyExistsException, AuthorizationException, IOException {

    boolean uploadNew = false;
    try {
        // FIXME: we can filter by listKeys() with local blobstore when STORM-1986 is going to be resolved
        // as a workaround, we call getBlobMeta() for all keys
        getBlobStore().getBlobMeta(key);
    } catch (KeyNotFoundException e) {
        // TODO: do we want to add ACL here?
        AtomicOutputStream blob = getBlobStore()
                .createBlob(key, new SettableBlobMeta(new ArrayList<AccessControl>()));
        try {
            Files.copy(dependency.toPath(), blob);
            blob.close();
        } catch (IOException | RuntimeException uploadFailure) {
            // Cancel rather than close: the upload must not be committed after a failure.
            try {
                blob.cancel();
            } catch (IOException | RuntimeException cancelFailure) {
                uploadFailure.addSuppressed(cancelFailure);
            }
            throw uploadFailure;
        }

        uploadNew = true;
    }

    return uploadNew;
}

uploadFiles 以及 uploadArtifacts 方法最后都调用 uploadDependencyToBlobStore
uploadDependencyToBlobStore 方法将数据写入 AtomicOutputStream

NimbusUploadAtomicOutputStream
storm-core-1.1.0-sources.jar!/org/apache/storm/blobstore/NimbusBlobStore.java
public class NimbusUploadAtomicOutputStream extends AtomicOutputStream {
private String session;
private int maxChunkSize = 4096;
private String key;

public NimbusUploadAtomicOutputStream(String session, int bufferSize, String key) {
this.session = session;
this.maxChunkSize = bufferSize;
this.key = key;
}

@Override
public void cancel() throws IOException {
try {
synchronized(client) {
client.getClient().cancelBlobUpload(session);
}
} catch (TException e) {
throw new RuntimeException(e);
}
}

@Override
public void write(int b) throws IOException {
try {
synchronized(client) {
client.getClient().uploadBlobChunk(session, ByteBuffer.wrap(new byte[] {(byte)b}));
}
} catch (TException e) {
throw new RuntimeException(e);
}
}

@Override
public void write(byte []b) throws IOException {
write(b, 0, b.length);
}

@Override
public void write(byte []b, int offset, int len) throws IOException {
try {
int end = offset + len;
for (int realOffset = offset; realOffset < end; realOffset += maxChunkSize) {
int realLen = Math.min(end – realOffset, maxChunkSize);
LOG.debug(“Writing {} bytes of {} remaining”,realLen,(end-realOffset));
synchronized(client) {
client.getClient().uploadBlobChunk(session, ByteBuffer.wrap(b, realOffset, realLen));
}
}
} catch (TException e) {
throw new RuntimeException(e);
}
}

@Override
public void close() throws IOException {
try {
synchronized(client) {
client.getClient().finishBlobUpload(session);
client.getClient().createStateInZookeeper(key);
}
} catch (TException e) {
throw new RuntimeException(e);
}
}
}
NimbusUploadAtomicOutputStream 的 write 方法通过 client.getClient().uploadBlobChunk 完成数据上传
send&recv
storm-core-1.1.0-sources.jar!/org/apache/storm/generated/Nimbus.java
/**
 * Client call for {@code beginFileUpload}: sends the request, then reads the reply.
 *
 * @return the string carried in the reply's success field
 * @throws AuthorizationException       if the reply carries one
 * @throws org.apache.thrift.TException propagated from sending or receiving
 */
public String beginFileUpload() throws AuthorizationException, org.apache.thrift.TException
{
send_beginFileUpload();
return recv_beginFileUpload();
}

public void send_beginFileUpload() throws org.apache.thrift.TException
{
beginFileUpload_args args = new beginFileUpload_args();
sendBase(“beginFileUpload”, args);
}

public String recv_beginFileUpload() throws AuthorizationException, org.apache.thrift.TException
{
beginFileUpload_result result = new beginFileUpload_result();
receiveBase(result, “beginFileUpload”);
if (result.is_set_success()) {
return result.success;
}
if (result.aze != null) {
throw result.aze;
}
throw new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT, “beginFileUpload failed: unknown result”);
}

public void send_finishFileUpload(String location) throws org.apache.thrift.TException
{
finishFileUpload_args args = new finishFileUpload_args();
args.set_location(location);
sendBase(“finishFileUpload”, args);
}

/**
 * Client call for {@code uploadChunk}: sends one chunk for the given location, then reads the reply.
 *
 * @param location the upload location the chunk belongs to
 * @param chunk    the chunk data
 * @throws AuthorizationException       if the reply carries one
 * @throws org.apache.thrift.TException propagated from sending or receiving
 */
public void uploadChunk(String location, ByteBuffer chunk) throws AuthorizationException, org.apache.thrift.TException
{
send_uploadChunk(location, chunk);
recv_uploadChunk();
}

public void send_uploadChunk(String location, ByteBuffer chunk) throws org.apache.thrift.TException
{
uploadChunk_args args = new uploadChunk_args();
args.set_location(location);
args.set_chunk(chunk);
sendBase(“uploadChunk”, args);
}

public void recv_uploadChunk() throws AuthorizationException, org.apache.thrift.TException
{
uploadChunk_result result = new uploadChunk_result();
receiveBase(result, “uploadChunk”);
if (result.aze != null) {
throw result.aze;
}
return;
}

/**
 * Client call for {@code submitTopology}: sends the request, then reads the reply.
 *
 * @param name                the topology name
 * @param uploadedJarLocation the location of the previously uploaded topology jar
 * @param jsonConf            the topology configuration as a JSON string
 * @param topology            the topology to submit
 * @throws AlreadyAliveException        if the reply carries one
 * @throws InvalidTopologyException     if the reply carries one
 * @throws AuthorizationException       if the reply carries one
 * @throws org.apache.thrift.TException propagated from sending or receiving
 */
public void submitTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, org.apache.thrift.TException
{
send_submitTopology(name, uploadedJarLocation, jsonConf, topology);
recv_submitTopology();
}

public void send_submitTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws org.apache.thrift.TException
{
submitTopology_args args = new submitTopology_args();
args.set_name(name);
args.set_uploadedJarLocation(uploadedJarLocation);
args.set_jsonConf(jsonConf);
args.set_topology(topology);
sendBase(“submitTopology”, args);
}

public void recv_submitTopology() throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, org.apache.thrift.TException
{
submitTopology_result result = new submitTopology_result();
receiveBase(result, “submitTopology”);
if (result.e != null) {
throw result.e;
}
if (result.ite != null) {
throw result.ite;
}
if (result.aze != null) {
throw result.aze;
}
return;
}

/**
 * Client call for {@code uploadBlobChunk}: sends one chunk for the given session, then reads the reply.
 *
 * @param session the blob upload session id
 * @param chunk   the chunk data
 * @throws AuthorizationException       if the reply carries one
 * @throws org.apache.thrift.TException propagated from sending or receiving
 */
public void uploadBlobChunk(String session, ByteBuffer chunk) throws AuthorizationException, org.apache.thrift.TException
{
send_uploadBlobChunk(session, chunk);
recv_uploadBlobChunk();
}

public void send_uploadBlobChunk(String session, ByteBuffer chunk) throws org.apache.thrift.TException
{
uploadBlobChunk_args args = new uploadBlobChunk_args();
args.set_session(session);
args.set_chunk(chunk);
sendBase(“uploadBlobChunk”, args);
}

public void recv_uploadBlobChunk() throws AuthorizationException, org.apache.thrift.TException
{
uploadBlobChunk_result result = new uploadBlobChunk_result();
receiveBase(result, “uploadBlobChunk”);
if (result.aze != null) {
throw result.aze;
}
return;
}
通过 sendBase 发送数据,通过 receiveBase 接收数据
小结
storm 的 submitTopology 会先上传 storm.dependency.jars 指定的依赖 jar,再上传 storm.dependency.artifacts 指定的依赖,最后再上传 storm.jar 指定的 topology jar 包;它们都是通过 thrift 客户端的 sendBase 发送数据、receiveBase 接收数据来完成远程调用的。
doc
Storm 1.1.0 released

退出移动版