已经在你的应用程序中应用过异步解决吗?在解决不须要立刻执行的工作时,异步代码仿佛是不可避免的。Apache Kafka是最罕用和最强壮的开源事件流平台之一。许多公司和开发者利用它的弱小性能来创立高性能的异步操作,用于微服务的数据集成,以及用于应用程序衰弱指标的监控工具。这篇文章解释了在.NET应用程序中应用Kafka的细节,还展现了如何在Windows操作系统上装置及应用。
它是如何工作的
当今世界,数据正在以指数模式增长。为了包容一直增长的数据,Kafka这样的工具应运而生,提供了强壮而令人印象粗浅的架构。
然而Kafka是如何在幕后工作的呢?
Kafka在生产者和消费者之间替换信息。生产者和消费者是这一线性过程的两个次要角色。
Kafka也能够在一个或多个服务器的集群中工作。这些服务器被称为Kafka代理,通过代理你能够受害于多种个性,例如数据复制、容错和高可用。
这些代理由另一个叫做Zookeeper的工具治理。总之,它是一种旨在放弃分布式系统中同步和组织配置数据的服务。
Kafka Topics
Kafka只是一个代理,所有的行为都产生在这。生产者向世界发送音讯,而消费者读取特定的数据块。如何辨别数据的一个特定局部与其余局部?消费者如何晓得要应用哪些数据?要了解这一点,你须要一个新的内容:topic。
Kafka topics是传递音讯的载体。由生产者产生的Kafka记录被组织并存储到topic中。
假如你正在解决一个用于记录动物目录的API我的项目。你要确保公司中的每个人都可能拜访每一个新注册的动物。所以你选了Kafka。
在零碎中注册的每一个新动物都将通过Kafka进行播送。topic的名称是tree_catalog。
在这种状况下,topic像堆栈一样工作。它将信息保留在达到时的雷同地位,并保证数据不会失落。
达到的每个数据记录被存储在一个slot中,并用一个称为offset的惟一地位号注册。
例如,当一个消费者生产了存储在offset是0的音讯时,它提交音讯,申明一切正常,而后挪动到下一个offset,依此类推。这个过程通常是线性的。然而,因为许多消费者能够同时将记录“插入”到同一个topic中,所以确定哪些数据地位曾经被占用的责任留给了消费者。这意味着消费者能够决定应用音讯的程序,甚至决定是否从头开始从新开始解决(offset为0)。
分区
分布式系统的一个要害个性是数据复制。它容许一个更平安的体系结构,数据能够被复制到其余中央,以防不好的事件产生。Kafka通过分区解决复制。Kafka topics被配置为扩散在几个分区(可配置的)。每个分区通过惟一的offset保留数据记录。
为了实现冗余,Kafka在分区(一个或多个)创立正本,并在集群中流传数据。
这个过程遵循leader-follower模型,其中一个leader正本总是解决给定分区的申请,而follower复制该分区。每次制作人将音讯推送到某个主题时,它都会间接传递给该主题的领导者。
生产组
在Kafka中,生产来自topic的音讯最合适的形式是通过生产组。
顾名思义,这些组由一个或多个消费者组成,目标是获取来自特定主题的所有音讯。
为此,组必须始终具备惟一的id(由属性group.id设置)。无论何时消费者想要退出那个组,它都将通过组id来实现。
每次你增加或删除一个组的消费者,Kafka会从新均衡它们之间的负载,这样就不会过载。
设置
当初,你曾经理解了Kafka的通用工作原理,是时候开始环境设置了。为了简化,这个例子将应用Docker来保留Kafka和Zookeeper映像,而不是将它们装置到你的机器上。这样能够节俭一些空间和复杂性。
对于Windows用户,Docker提供了一种装置和治理Docker容器的简略形式:Docker桌面。进入它的下载页面并下载安装程序。运行它,并在不更改默认设置选项的状况下持续到最初。
确保在此过程实现后重新启动计算机。重启后,Docker可能会要求你装置其余依赖项,所以请确保承受每一个依赖项。在Docker上装置一个无效的Kafka本地环境最快的门路之一是通过Docker Compose。通过这种形式,能够通过一个YAML文件建设应用程序服务,并疾速地让它们运行。
创立一个名为docker-compose的新文件,并将以下的内容保留到其中:
version: '2'services: zookeeper: image: wurstmeister/zookeeper ports: - "2181:2181" kafka: image: wurstmeister/kafka ports: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1 KAFKA_CREATE_TOPICS: "simpletalk_topic:1:1" KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 volumes: - /var/run/docker.sock:/var/run/docker.sock
留神,代码从Docker Hub的wurstmeister帐户中导入了两个服务镜像(kafka和zookeeper)。这是在Docker上应用Kafka最稳固的镜像之一。端口也应用它们的推荐值进行设置,因而请留神不要更改它们。
其中最重要的设置之一属于KAFKA_CREATE_TOPICS。在这里,你必须定义要创立的topic名称。还有其余办法能够创立主题,当前你将看到。
通过命令行导航到docker-compose.yml所在的文件夹。而后执行如下命令启动镜像:
docker-compose up
这段代码将加载所有依赖项并启动镜像。在此过程中,可能会看到大量的日志。
如果没有谬误日志显示,阐明启动胜利。
为了查看Docker镜像是否启动,在另一个cmd窗口中运行以下命令:
docker ps
显示如下:
亲自动手
你的Kafka环境曾经能够应用了。下一步是在Visual Studio中进行我的项目创立。进入我的项目创立窗口。搜寻ASP.NET Core Web Application模板,单击Next。
解决方案新建一个名称消费者我的项目和生产者我的项目将在同一个解决方案中共存。
下一个窗口抉择API模板。勾销勾选“配置为HTTPS”选项。
创立我的项目后,右键单击解决方案,抉择增加新我的项目,而后,抉择ASP.NET Core Web Application我的项目类型。
持续并像后面一样抉择API模板。
当初,在ST-Kafka-NET解决方案中有两个我的项目。
NuGet包
为了让C#代码了解如何产生和生产音讯,你须要一个Kafka的客户端。当初最罕用的客户端是Confluent’s Kafka .NET Client。
抉择并单击Install。或者,你能够通过命令行增加它们:
PM> Install-Package Confluent.Kafka
设置消费者
当初来实现消费者我的项目。尽管它是一个相似rest的应用程序,但消费者不是必须的。任何类型的.net我的项目都能够监听topic音讯。
该我的项目曾经蕴含一个Controllers文件夹。你须要创立一个名为Handlers的新类,并向其增加一个KafkaConsumerHandler.cs的文件。内容如下:
using Confluent.Kafka;using Microsoft.Extensions.Hosting;using System;using System.Threading;using System.Threading.Tasks;namespace ST_KafkaConsumer.Handlers{ public class KafkaConsumerHandler : IHostedService { private readonly string topic = "simpletalk_topic"; public Task StartAsync(CancellationToken cancellationToken) { var conf = new ConsumerConfig { GroupId = "st_consumer_group", BootstrapServers = "localhost:9092", AutoOffsetReset = AutoOffsetReset.Earliest }; using (var builder = new ConsumerBuilder<Ignore, string>(conf).Build()) { builder.Subscribe(topic); var cancelToken = new CancellationTokenSource(); try { while (true) { var consumer = builder.Consume(cancelToken.Token); Console.WriteLine($"Message: {consumer.Message.Value} received from {consumer.TopicPartitionOffset}"); } } catch (Exception) { builder.Close(); } } return Task.CompletedTask; } public Task StopAsync(CancellationToken cancellationToken) { return Task.CompletedTask; } }}
这个处理程序必须在一个独自的线程中运行,因为它将永远在while循环中监督传入音讯。因而,须要在这个类中应用异步工作。
请留神topic名称和消费者配置。它们与docker-compose.yml中的设置齐全匹配。肯定要重复查看你的输出,否则可能会导致一些莫名其妙的谬误。
消费者组id能够是任何你想要的。通常,它们都有直观的名称,以帮忙进行保护和故障排除。
每当新音讯被公布到simpletalk_topic时,该消费者将应用它并将其记录到控制台。当然,在事实应用程序中,你会更好地利用这些数据。
你还须要将这个托管服务类增加到Startup中,因而,关上它,并在ConfigureServices办法中增加以下代码行:
services.AddSingleton<IHostedService, KafkaConsumerHandler>();
并确保引入了以下命名空间:
using ST_KafkaConsumer.Handlers;
设置生产者
至于生产者,这里的解决形式会有所不同。因为不须要有限循环来监听达到的音讯,生产者能够简略地从任何中央公布音讯,甚至是从控制器。在理论的应用程序中,最好将这类代码与MVC层离开,但本例保持应用控制器,以放弃简略。
在Controllers文件夹中创立一个名为KafkaProducerController.cs的文件,并向其增加一下内容:
using System;using Confluent.Kafka;using Microsoft.AspNetCore.Mvc;namespace Kafka.Producer.API.Controllers{ [Route("api/kafka")] [ApiController] public class KafkaProducerController : ControllerBase { private readonly ProducerConfig config = new ProducerConfig { BootstrapServers = "localhost:9092" }; private readonly string topic = "simpletalk_topic"; [HttpPost] public IActionResult Post([FromQuery] string message) { return Created(string.Empty, SendToKafka(topic, message)); } private Object SendToKafka(string topic, string message) { using (var producer = new ProducerBuilder<Null, string>(config).Build()) { try { return producer.ProduceAsync(topic, new Message<Null, string> { Value = message }) .GetAwaiter() .GetResult(); } catch (Exception e) { Console.WriteLine($"Oops, something went wrong: {e}"); } } return null; } }}
生产者代码比消费者代码简略得多。ProducerBuilder类负责依据提供的配置选项、Kafka服务器和topic名称创立一个功能齐全的Kafka生产者。
重要的是要记住整个过程是异步的。然而,你能够应用Confluent的API来检索awaiter对象,而后从API办法返回后果。
测试
要测试这个示例,你须要别离运行生产者和使用者应用程序。在工具栏中,找到Startup Projects组合框并抉择ST-KafkaConsumer选项:
点击按钮IIS Express来运行消费者应用程序。这将启动一个新的浏览器窗口,咱们将疏忽并最小化它,因为消费者API不是重点。
关上一个新的cmd窗口,跳转到producer文件夹。运行命令dotnet run来启动它。
请留神它所运行的URL和端口。
当初是时候通过producer API发送一些音讯了。为此,你能够应用任何API测试工具,例如Postman。
为了让上面的命令失常工作,必须确保Docker镜像失常工作。因而,请确保再次执行docker ps来查看。有时,重新启动计算机会进行这些过程。
如果命令没有任何日志信息,那么再运行一次docker-compose。
要测试公布-订阅音讯,关上另一个cmd窗口并收回以下命令:
curl -H "Content-Length: 0" -X POST "http://localhost:51249/api/kafka?message=Hello,kafka!"
这个申请发送到生产者API并向Kafka公布一个新音讯。
要查看消费者是否收到了它,你能够找到输入窗口并抉择ST-KafkaConsumer – ASP.NET Core Web Server,如图所示:
cmd窗口也能够显示JSON后果。然而,它没有格式化。要解决这个问题,如果你装置了Python,你能够运行以下命令:
curl -H "Content-Length: 0" -X POST "http://localhost:51249/api/kafka?message=Hello,kafka!" | python -m json.tool
输入如下:
这是目前能够取得的对于topic message对象的所有信息。第二个测试将显示当消费者我的项目敞开并公布音讯时产生了什么。
进行Visual Studio中的consumer我的项目,但这一次有一个不同的音讯:
curl -H "Content-Length: 0" -X POST "http://localhost:51249/api/kafka?message=Is%20anybody%20there?" | python -m json.tool
接着启动消费者我的项目,察看日志记录,内容如下:
总结
Kafka是一个灵便和强壮的工具,它容许在许多类型的我的项目中进行弱小的实现,这是它被宽泛采纳的第一个起因。
这篇文章只是对它的世界的一个简要介绍,然而还有更多的货色能够看到。在下一篇文章中,我将探讨Kafka的性能。