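About Apache Pulsar
Apache Pulsar is a top-level project of the Apache Software Foundation and a next-generation, cloud-native, distributed messaging and streaming platform. It combines messaging, storage, and lightweight functional computing in one system, uses an architecture that separates compute from storage, and supports multi-tenancy, persistent storage, and multi-datacenter, cross-region data replication. It offers the properties expected of streaming data storage: strong consistency, high throughput, low latency, and high scalability.
GitHub: http://github.com/apache/pulsar/
This article is translated from "Using Apache Pulsar With Kotlin" by Gilles Barbier.
Original link: https://gillesbarbier.medium....
About the translator
Song Bo is a senior development engineer at Beijing Baiguan Technology Co., Ltd., focusing on microservices, cloud computing, and big data.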

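Apache Pulsar, often described as the next-generation Kafka, is a rising star in the developer's toolset. Pulsar is a multi-tenant, high-performance solution for server-to-server messaging, and is often used as the core of scalable applications.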

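Pulsar can be used with Kotlin because it is written in Java. However, its API does not take advantage of the powerful features Kotlin brings, such as data classes, coroutines, or reflection-free serialization.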

In this article, I will discuss how to use Pulsar from Kotlin.

Using native serialization for message bodies

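In Kotlin, a natural way to define messages is with data classes, whose main purpose is to hold data. For such data classes, Kotlin automatically provides methods such as equals(), toString(), and copy(), which shortens code and reduces the risk of bugs.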
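As a quick illustration of what the compiler generates, here is a minimal, self-contained sketch. Greeting is a hypothetical type used only for this example and is unrelated to Pulsar.

```kotlin
// Hypothetical message type: the compiler generates equals(), hashCode(),
// toString(), copy() and componentN() from the primary-constructor properties.
data class Greeting(val recipient: String, val text: String)

fun main() {
    val original = Greeting("alice", "hello")

    // Structural equality: instances with the same property values are equal.
    val same = Greeting("alice", "hello")
    println(original == same)   // true

    // copy() builds a new instance, changing only the named properties.
    val forBob = original.copy(recipient = "bob")
    println(forBob)             // Greeting(recipient=bob, text=hello)

    // Destructuring relies on the generated component1()/component2().
    val (recipient, text) = forBob
    println("$recipient: $text") // bob: hello
}
```

None of this boilerplate has to be written or maintained by hand, which is why data classes are a good fit for message definitions.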

Creating a Pulsar producer in Java looks like this:

```java
Producer<MyAvro> avroProducer = client
    .newProducer(Schema.AVRO(MyAvro.class))
    .topic("some-avro-topic")
    .create();
```

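The Schema.AVRO(MyAvro.class) instruction introspects the MyAvro Java class and infers a schema from it. This is required to check that a new producer will actually produce messages compatible with existing consumers. Unfortunately, the Java implementation of Kotlin data classes does not work well with the default serializer used by Pulsar. Fortunately, since version 2.7.0, Pulsar lets you use custom serializers for producers and consumers.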

First, you need to install the official Kotlin serialization plugin. With it, you can create a message class like this:

```kotlin
import kotlinx.serialization.Serializable

@Serializable
data class RunTask(
    val taskName: TaskName,
    val taskId: TaskId,
    val taskInput: TaskInput,
    val taskOptions: TaskOptions,
    val taskMeta: TaskMeta
)
```
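Note the @Serializable annotation. With it, you can call RunTask.serializer() to get a serializer that works without introspection, which makes serialization much more efficient!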
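The plugin generates the serializer at compile time, so nothing has to be discovered by reflection at runtime. The stdlib-only sketch below is a hand-written analogue of that idea, not the code the plugin actually generates and not the kotlinx.serialization API: BinarySerializer, SimpleTask and SimpleTaskSerializer are hypothetical names, and the encoding (DataOutputStream strings in a fixed field order) is an assumption chosen for brevity.

```kotlin
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.DataInputStream
import java.io.DataOutputStream

// Hypothetical interface: a serializer that knows the field layout statically.
interface BinarySerializer<T> {
    fun write(value: T): ByteArray
    fun read(bytes: ByteArray): T
}

// Hypothetical, simplified stand-in for RunTask.
data class SimpleTask(val taskName: String, val taskId: String) {
    companion object {
        // Mirrors the RunTask.serializer() call style.
        fun serializer(): BinarySerializer<SimpleTask> = SimpleTaskSerializer
    }
}

// Fields are written and read in a fixed order known at compile time,
// so no reflection is involved.
object SimpleTaskSerializer : BinarySerializer<SimpleTask> {
    override fun write(value: SimpleTask): ByteArray {
        val bytes = ByteArrayOutputStream()
        DataOutputStream(bytes).use { out ->
            out.writeUTF(value.taskName)
            out.writeUTF(value.taskId)
        }
        return bytes.toByteArray()
    }

    override fun read(bytes: ByteArray): SimpleTask =
        DataInputStream(ByteArrayInputStream(bytes)).use { input ->
            // Named arguments are evaluated left to right, matching the write order.
            SimpleTask(taskName = input.readUTF(), taskId = input.readUTF())
        }
}

fun main() {
    val task = SimpleTask(taskName = "resize-image", taskId = "42")
    val bytes = SimpleTask.serializer().write(task)
    println(SimpleTask.serializer().read(bytes) == task) // true
}
```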

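Currently, the serialization plugin only supports JSON (plus a few other formats in beta, such as protobuf). So we also need the avro4k library to extend it with support for the Avro format.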

With these tools, we can create a Producer<RunTask> like this:

```kotlin
import com.github.avrokotlin.avro4k.Avro
import com.github.avrokotlin.avro4k.io.AvroEncodeFormat
import io.infinitic.common.tasks.executors.messages.RunTask
import kotlinx.serialization.KSerializer
import org.apache.avro.file.SeekableByteArrayInput
import org.apache.avro.generic.GenericDatumReader
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.DecoderFactory
import org.apache.pulsar.client.api.Consumer
import org.apache.pulsar.client.api.Producer
import org.apache.pulsar.client.api.PulsarClient
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.client.api.schema.SchemaDefinition
import org.apache.pulsar.client.api.schema.SchemaReader
import org.apache.pulsar.client.api.schema.SchemaWriter
import java.io.ByteArrayOutputStream
import java.io.InputStream

// Convert a T instance to Avro schemaless binary format
fun <T : Any> writeBinary(t: T, serializer: KSerializer<T>): ByteArray {
    val out = ByteArrayOutputStream()
    Avro.default.openOutputStream(serializer) {
        encodeFormat = AvroEncodeFormat.Binary
        schema = Avro.default.schema(serializer)
    }.to(out).write(t).close()
    return out.toByteArray()
}

// Convert an Avro schemaless byte array to a T instance
fun <T> readBinary(bytes: ByteArray, serializer: KSerializer<T>): T {
    val datumReader = GenericDatumReader<GenericRecord>(Avro.default.schema(serializer))
    val decoder = DecoderFactory.get().binaryDecoder(SeekableByteArrayInput(bytes), null)
    return Avro.default.fromRecord(serializer, datumReader.read(null, decoder))
}

// Custom Pulsar SchemaReader
class RunTaskSchemaReader : SchemaReader<RunTask> {
    override fun read(bytes: ByteArray, offset: Int, length: Int): RunTask =
        read(bytes.inputStream(offset, length))

    override fun read(inputStream: InputStream): RunTask =
        readBinary(inputStream.readBytes(), RunTask.serializer())
}

// Custom Pulsar SchemaWriter
class RunTaskSchemaWriter : SchemaWriter<RunTask> {
    override fun write(message: RunTask): ByteArray =
        writeBinary(message, RunTask.serializer())
}

// Custom Pulsar SchemaDefinition<RunTask>
fun runTaskSchemaDefinition(): SchemaDefinition<RunTask> =
    SchemaDefinition.builder<RunTask>()
        .withJsonDef(Avro.default.schema(RunTask.serializer()).toString())
        .withSchemaReader(RunTaskSchemaReader())
        .withSchemaWriter(RunTaskSchemaWriter())
        .withSupportSchemaVersioning(true)
        .build()

// Create an instance of Producer<RunTask>
fun runTaskProducer(client: PulsarClient): Producer<RunTask> = client
    .newProducer(Schema.AVRO(runTaskSchemaDefinition()))
    .topic("some-avro-topic")
    .create()

// Create an instance of Consumer<RunTask>
fun runTaskConsumer(client: PulsarClient): Consumer<RunTask> = client
    .newConsumer(Schema.AVRO(runTaskSchemaDefinition()))
    .topic("some-avro-topic")
    .subscribe()
```

Sealed-class messages and one envelope per topic

Pulsar only allows a single message type per topic. In some special cases, this does not cover every need, but the limitation can be worked around with an envelope pattern. First, use a sealed class to define all the message types of a topic:

```kotlin
@Serializable
sealed class TaskEngineMessage() {
    abstract val taskId: TaskId
}

@Serializable
data class DispatchTask(
    override val taskId: TaskId,
    val taskName: TaskName,
    val methodName: MethodName,
    val methodParameterTypes: MethodParameterTypes?,
    val methodInput: MethodInput,
    val workflowId: WorkflowId?,
    val methodRunId: MethodRunId?,
    val taskMeta: TaskMeta,
    val taskOptions: TaskOptions = TaskOptions()
) : TaskEngineMessage()

@Serializable
data class CancelTask(
    override val taskId: TaskId,
    val taskOutput: MethodOutput
) : TaskEngineMessage()

@Serializable
data class TaskCanceled(
    override val taskId: TaskId,
    val taskOutput: MethodOutput,
    val taskMeta: TaskMeta
) : TaskEngineMessage()

@Serializable
data class TaskCompleted(
    override val taskId: TaskId,
    val taskName: TaskName,
    val taskOutput: MethodOutput,
    val taskMeta: TaskMeta
) : TaskEngineMessage()
```
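A practical benefit of the sealed hierarchy is that the compiler checks a `when` over it for exhaustiveness, so every handler must deal with every message type of the topic. The sketch below assumes simplified, hypothetical types (EngineMessage, Dispatch, Cancel, Completed) with String IDs in place of TaskId and TaskName, and leaves serialization out.

```kotlin
// Hypothetical, simplified version of the TaskEngineMessage hierarchy.
sealed class EngineMessage {
    abstract val taskId: String
}

data class Dispatch(override val taskId: String, val taskName: String) : EngineMessage()
data class Cancel(override val taskId: String) : EngineMessage()
data class Completed(override val taskId: String, val output: String) : EngineMessage()

// A `when` expression over a sealed class must be exhaustive: adding a new
// subclass without a branch here is a compile-time error, and no `else` is needed.
fun describe(msg: EngineMessage): String = when (msg) {
    is Dispatch -> "dispatch ${msg.taskName} (${msg.taskId})"
    is Cancel -> "cancel ${msg.taskId}"
    is Completed -> "completed ${msg.taskId}: ${msg.output}"
}

fun main() {
    val messages: List<EngineMessage> = listOf(
        Dispatch("t-1", "resize-image"),
        Cancel("t-2"),
        Completed("t-1", "ok")
    )
    // Prints: "dispatch resize-image (t-1)", "cancel t-2", "completed t-1: ok"
    messages.forEach { println(describe(it)) }
}
```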

Then, create an envelope for these messages:

```kotlin
@Serializable
data class TaskEngineEnvelope(
    val taskId: TaskId,
    val type: TaskEngineMessageType,
    val dispatchTask: DispatchTask? = null,
    val cancelTask: CancelTask? = null,
    val taskCanceled: TaskCanceled? = null,
    val taskCompleted: TaskCompleted? = null,
) {
    init {
        val noNull = listOfNotNull(
            dispatchTask,
            cancelTask,
            taskCanceled,
            taskCompleted
        )

        require(noNull.size == 1)
        require(noNull.first() == message())
        require(noNull.first().taskId == taskId)
    }

    companion object {
        fun from(msg: TaskEngineMessage) = when (msg) {
            is DispatchTask -> TaskEngineEnvelope(
                msg.taskId,
                TaskEngineMessageType.DISPATCH_TASK,
                dispatchTask = msg
            )
            is CancelTask -> TaskEngineEnvelope(
                msg.taskId,
                TaskEngineMessageType.CANCEL_TASK,
                cancelTask = msg
            )
            is TaskCanceled -> TaskEngineEnvelope(
                msg.taskId,
                TaskEngineMessageType.TASK_CANCELED,
                taskCanceled = msg
            )
            is TaskCompleted -> TaskEngineEnvelope(
                msg.taskId,
                TaskEngineMessageType.TASK_COMPLETED,
                taskCompleted = msg
            )
        }
    }

    fun message(): TaskEngineMessage = when (type) {
        TaskEngineMessageType.DISPATCH_TASK -> dispatchTask!!
        TaskEngineMessageType.CANCEL_TASK -> cancelTask!!
        TaskEngineMessageType.TASK_CANCELED -> taskCanceled!!
        TaskEngineMessageType.TASK_COMPLETED -> taskCompleted!!
    }
}

enum class TaskEngineMessageType {
    CANCEL_TASK,
    DISPATCH_TASK,
    TASK_CANCELED,
    TASK_COMPLETED
}
```

Note how elegantly Kotlin expresses the checks in init! An envelope is easily created with TaskEngineEnvelope.from(msg), and the original message is returned by envelope.message().
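The envelope round trip and its init checks can be tried without Pulsar. This is a minimal sketch assuming a cut-down, hypothetical hierarchy with only two message types and String IDs. Unlike the code above, its init block checks the payload against `type` without calling message(), so a mismatched envelope fails with an IllegalArgumentException rather than a NullPointerException from `!!`.

```kotlin
// Hypothetical, simplified message hierarchy (String IDs instead of TaskId).
sealed class EngineMessage {
    abstract val taskId: String
}

data class Dispatch(override val taskId: String, val taskName: String) : EngineMessage()
data class Cancel(override val taskId: String) : EngineMessage()

enum class EngineMessageType { DISPATCH, CANCEL }

// Flat envelope: explicit taskId and type, plus one nullable field per message type.
data class EngineEnvelope(
    val taskId: String,
    val type: EngineMessageType,
    val dispatch: Dispatch? = null,
    val cancel: Cancel? = null
) {
    init {
        val payloads = listOfNotNull(dispatch, cancel)
        require(payloads.size == 1) { "expected exactly one payload, got ${payloads.size}" }
        // The field selected by `type` must be the one that is set.
        val selected: EngineMessage? = when (type) {
            EngineMessageType.DISPATCH -> dispatch
            EngineMessageType.CANCEL -> cancel
        }
        require(selected != null) { "payload does not match type $type" }
        require(selected.taskId == taskId) { "taskId mismatch" }
    }

    // Safe after init: the field matching `type` is guaranteed to be non-null.
    fun message(): EngineMessage = when (type) {
        EngineMessageType.DISPATCH -> dispatch!!
        EngineMessageType.CANCEL -> cancel!!
    }

    companion object {
        fun from(msg: EngineMessage): EngineEnvelope = when (msg) {
            is Dispatch -> EngineEnvelope(msg.taskId, EngineMessageType.DISPATCH, dispatch = msg)
            is Cancel -> EngineEnvelope(msg.taskId, EngineMessageType.CANCEL, cancel = msg)
        }
    }
}

fun main() {
    val original = Dispatch("t-1", "resize-image")
    val envelope = EngineEnvelope.from(original)
    println(envelope.message() == original) // true

    // An inconsistent envelope is rejected at construction time.
    val result = runCatching { EngineEnvelope("t-1", EngineMessageType.CANCEL, dispatch = original) }
    println(result.isFailure) // true
}
```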

Why add an explicit taskId field, and why one field per message type instead of a single generic message: TaskEngineMessage field? Because this way I can use Pulsar SQL to query this topic by taskId, by type, or by both.

Building workers with coroutines

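Using threads in plain Java is complicated and error-prone. Fortunately, Kotlin provides coroutines, a simpler abstraction for asynchronous processing, and channels, a convenient way to pass data between coroutines.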

I can build a worker with:

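  • a single coroutine ("task-engine-message-puller") dedicated to pulling messages from Pulsar
  • N coroutines ("task-engine-$i") processing messages in parallel
  • a single coroutine ("task-engine-message-acknowledger") acknowledging Pulsar messages once they are processed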

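Since I have many processes like this one, I added a logChannel to collect logs. Note that to be able to acknowledge a Pulsar message in a coroutine other than the one that received it, I need to wrap the TaskEngineMessage in a MessageToProcess<TaskEngineMessage> that also carries the Pulsar messageId: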

```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.future.await
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import org.apache.pulsar.client.api.Consumer
import org.apache.pulsar.client.api.Message
import org.apache.pulsar.client.api.MessageId

// TaskEngine, TaskEngineEnvelope and TaskEngineMessage are project types;
// readBinary is the helper defined earlier in this article.
typealias TaskEngineMessageToProcess = MessageToProcess<TaskEngineMessage>

fun CoroutineScope.startPulsarTaskEngineWorker(
    taskEngineConsumer: Consumer<TaskEngineEnvelope>,
    taskEngine: TaskEngine,
    logChannel: SendChannel<TaskEngineMessageToProcess>?,
    enginesNumber: Int
) = launch(Dispatchers.IO) {

    val taskInputChannel = Channel<TaskEngineMessageToProcess>()
    val taskResultsChannel = Channel<TaskEngineMessageToProcess>()

    // coroutine dedicated to pulsar message pulling
    launch(CoroutineName("task-engine-message-puller")) {
        while (isActive) {
            val message: Message<TaskEngineEnvelope> = taskEngineConsumer.receiveAsync().await()

            try {
                val envelope = readBinary(message.data, TaskEngineEnvelope.serializer())
                taskInputChannel.send(MessageToProcess(envelope.message(), message.messageId))
            } catch (e: Exception) {
                taskEngineConsumer.negativeAcknowledge(message.messageId)
                throw e
            }
        }
    }

    // coroutines dedicated to Task Engine
    repeat(enginesNumber) {
        launch(CoroutineName("task-engine-$it")) {
            for (messageToProcess in taskInputChannel) {
                try {
                    messageToProcess.output = taskEngine.handle(messageToProcess.message)
                } catch (e: Exception) {
                    messageToProcess.exception = e
                }
                taskResultsChannel.send(messageToProcess)
            }
        }
    }

    // coroutine dedicated to pulsar message acknowledging
    launch(CoroutineName("task-engine-message-acknowledger")) {
        for (messageToProcess in taskResultsChannel) {
            if (messageToProcess.exception == null) {
                taskEngineConsumer.acknowledgeAsync(messageToProcess.messageId).await()
            } else {
                taskEngineConsumer.negativeAcknowledge(messageToProcess.messageId)
            }
            logChannel?.send(messageToProcess)
        }
    }
}

data class MessageToProcess<T>(
    val message: T,
    val messageId: MessageId,
    var exception: Exception? = null,
    var output: Any? = null
)
```
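The same three-stage shape (one puller, N workers, one acknowledger) can be exercised without Pulsar. To keep this sketch dependency-free, it swaps in plain JVM threads and LinkedBlockingQueue for coroutines and channels, so it illustrates the pipeline rather than the coroutine API. InMessage, Work and runPipeline are hypothetical names, and acknowledgement is only simulated by recording message ids. Like MessageToProcess, Work carries the message id from the stage that received it to the stage that acknowledges it.

```kotlin
import java.util.concurrent.LinkedBlockingQueue
import kotlin.concurrent.thread

// Hypothetical stand-in for a Pulsar message: an id plus a payload.
data class InMessage(val id: Long, val payload: String)

// Plays the role of MessageToProcess: the message (with its id) plus the outcome.
class Work(val message: InMessage) {
    var output: String? = null
    var error: Exception? = null
}

// One puller, `workerCount` workers, one acknowledger. Returns the sorted ids
// that were acknowledged and negatively acknowledged.
fun runPipeline(
    input: List<InMessage>,
    workerCount: Int,
    handle: (String) -> String
): Pair<List<Long>, List<Long>> {
    val toProcess = LinkedBlockingQueue<Work>()
    val results = LinkedBlockingQueue<Work>()
    val stop = Work(InMessage(-1, "")) // sentinel telling a worker to exit

    // Stage 1: a single puller feeds the work queue (stands in for consumer.receive()).
    val puller = thread(name = "message-puller") {
        input.forEach { toProcess.put(Work(it)) }
        repeat(workerCount) { toProcess.put(stop) }
    }

    // Stage 2: N workers process messages in parallel, recording output or error.
    val workers = List(workerCount) { i ->
        thread(name = "engine-$i") {
            while (true) {
                val work = toProcess.take()
                if (work === stop) break
                try {
                    work.output = handle(work.message.payload)
                } catch (e: Exception) {
                    work.error = e
                }
                results.put(work)
            }
        }
    }

    // Stage 3: a single acknowledger (here, the calling thread) acks or nacks each result.
    val acked = mutableListOf<Long>()
    val nacked = mutableListOf<Long>()
    repeat(input.size) {
        val work = results.take()
        if (work.error == null) acked.add(work.message.id) else nacked.add(work.message.id)
    }

    puller.join()
    workers.forEach { it.join() }
    return acked.sorted() to nacked.sorted()
}

fun main() {
    // Every third message is "bad" and makes the handler throw.
    val messages = (1L..6L).map { InMessage(it, if (it % 3 == 0L) "bad" else "msg-$it") }
    val (acked, nacked) = runPipeline(messages, workerCount = 3) { payload ->
        require(payload != "bad") { "cannot process $payload" }
        "processed:$payload"
    }
    println("acked=$acked nacked=$nacked") // acked=[1, 2, 4, 5] nacked=[3, 6]
}
```

Because only the acknowledger touches the ack/nack decision, the workers never need a handle on the consumer; the coroutine version gets the same separation with channels, without managing threads by hand.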

Conclusion

In this article, we have shown how to use Pulsar from Kotlin:

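  • defining messages (including an envelope for Pulsar topics that carry several message types);
  • creating Pulsar producers and consumers;
  • building a simple worker that can process many messages in parallel.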
