乐趣区

如何使用-OpenTracing-和-Jaeger-追踪-Pulsar-消息

OpenTracing 是针对应用程序和 OSS(Open-Source Software)软件包的凋谢分布式追踪规范。许多追踪后端服务都反对 OpenTracing API,例如 Jaeger、Zipkin 和 SkyWalking。

本文具体介绍如何应用 Jaeger 通过 OpenTracing API 追踪 Pulsar 音讯。

筹备工作

在开始前,须要装置好 JDK 8、Maven 3 和 Pulsar(集群模式或单机模式)。如果还没有装置 Pulsar,点击这里依照提醒进行装置。

第 1 步:启动 Jaeger 后端

  1. 在 Docker 中启动 Jaeger 后端。

    docker run -d -p 6831:6831/udp -p 16686:16686 jaegertracing/all-in-one:latest

    胜利启动 Jaeger 后,就能够关上 Jaeger UI 网站。

    提醒

    如何你没有 Jaeger Docker 环境,能够下载二进制文件或通过源代码构建。

  2. 拜访 http://localhost:16686,无需填写用户名或明码就能够关上 Jeager UI 网站。

第 2 步:增加 maven dependencies

本示例应用 Open Tracing Pulsar Client,它是 Pulsar Client 与 OpenTracing API(基于 Pulsar Client Interceptors)的集成,用于追踪 Pulsar 音讯。OpenTracing Pulsar Client 由 StreamNative 研发,是 StreamNatvie Hub 中的监控工具。

增加 Jaeger client dependency 以连贯到 Jaeger 后端。

<dependency>
 <groupId>org.apache.pulsar</groupId>
 <artifactId>pulsar-client</artifactId>
 <version>2.5.1</version>
</dependency>

<dependency>
 <groupId>io.streamnative</groupId>
 <artifactId>opentracing-pulsar-client</artifactId>
 <version>0.1.0</version>
</dependency>

<dependency>
  <groupId>io.jaegertracing</groupId>
  <artifactId>jaeger-client</artifactId>
  <version>1.2.0</version>
</dependency>

第 3 步:应用 OpenTracing Pulsar Client

为便于了解,本示例假如有 2 个 Job 和 2 个 topic。Job-1 向 topic-A 发送音讯,Job-2 从 topc-A 生产音讯。当 Job 2 收到 topic-A 的音讯后,Job 2 会向 topic-B 发送音讯,而后 Job-3 从 topic-B 生产音讯。因而,在这种状况下有 2 个 topic、2 个 producer 和 2 个 consumer。

要实现上述工作场景中的工作,须要启动三个应用程序。

  • Job-1:公布音讯到 topic-A
  • Job-2:生产 topic-A 中的音讯,并公布音讯到 topic-B
  • Job-3:生产 topic-B 中的音讯

Job-1

以下示例为公布音讯至 topic-A。

Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv().withType("const").withParam(1);
Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv().withLogSpans(true);
Configuration configuration = new Configuration("Job-1").withSampler(samplerConfig).withReporter(reporterConfig);

Tracer tracer = configuration.getTracer();
GlobalTracer.registerIfAbsent(tracer);

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();

Producer<String> producerA = client.newProducer(Schema.STRING)
        .topic("topic-A")
        .intercept(new TracingProducerInterceptor())
        .create();

for (int i = 0; i < 10; i++) {producerA.newMessage().value(String.format("[%d] Hello", i)).send();}

Job-2

以下示例为从 topic-A 生产音讯,并将音讯公布到 topic-B。

Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv().withType("const").withParam(1);
Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv().withLogSpans(true);
Configuration configuration = new Configuration("Job-2").withSampler(samplerConfig).withReporter(reporterConfig);

Tracer tracer = configuration.getTracer();
GlobalTracer.registerIfAbsent(tracer);

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();

Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .topic("topic-A")
        .subscriptionName("open-tracing")
        .subscriptionType(SubscriptionType.Shared)
        .intercept(new TracingConsumerInterceptor<>())
        .subscribe();

Producer<String> producerB = client.newProducer(Schema.STRING)
        .topic("topic-B")
        .intercept(new TracingProducerInterceptor())
        .create();

while (true) {Message<String> received = consumer.receive();
    SpanContext context = TracingPulsarUtils.extractSpanContext(received, tracer);
    TypedMessageBuilder<String> messageBuilder = producerB.newMessage();
    messageBuilder.value(received.getValue() + "Pulsar and OpenTracing!");
    // Inject parent span context
    tracer.inject(context, Format.Builtin.TEXT_MAP, new TypeMessageBuilderInjectAdapter(messageBuilder));
    messageBuilder.send();
    consumer.acknowledge(received);
}

Job-3

以下示例为从 topic-B 生产音讯。

Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv().withType("const").withParam(1);
Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv().withLogSpans(true);
Configuration configuration = new Configuration("Job-3").withSampler(samplerConfig).withReporter(reporterConfig);

Tracer tracer = configuration.getTracer();
GlobalTracer.registerIfAbsent(tracer);

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650")
        .build();

Consumer<String> consumer = client.newConsumer(Schema.STRING)
        .topic("topic-B")
        .subscriptionName("open-tracing")
        .subscriptionType(SubscriptionType.Shared)
        .intercept(new TracingConsumerInterceptor<>())
        .subscribe();

while (true) {Message<String> received = consumer.receive();
    System.out.println(received.getValue());
    consumer.acknowledge(received);
}

当初,能够别离运行 Job-3、Job-2 和 Job-1。控制台中会呈现 Job-3 接管的日志,如下:

[0] Hello Pulsar and OpenTracing!
[1] Hello Pulsar and OpenTracing!
...
[9] Hello Pulsar and OpenTracing!

当初,你能够再次关上 Jaeger UI,页面中会呈现十条音讯追踪链路。

点击工作名称即可查看音讯追踪链路的详细信息。

能够从 span 名称轻松分别是 producer 还是 consumer 公布了此条音讯,span 名称格局为 To__<topic-name>From__<topic-name>__<subscription_name>

总结

OpenTracing Pulsar Client 集成了 Pulsar 客户端和 OpenTracing,能够实现轻松地追踪音讯。如果你在应用程序中应用了 Pulsar 和 OpenTracing,赶快试一试吧!

退出移动版