共计 7358 个字符,预计需要花费 19 分钟才能阅读完成。
已经在你的应用程序中应用过异步解决吗? 在解决不须要立刻执行的工作时,异步代码仿佛是不可避免的。Apache Kafka 是最罕用和最强壮的开源事件流平台之一。许多公司和开发者利用它的弱小性能来创立高性能的异步操作,用于微服务的数据集成,以及用于应用程序衰弱指标的监控工具。这篇文章解释了在.NET 应用程序中应用 Kafka 的细节,还展现了如何在 Windows 操作系统上装置及应用。
它是如何工作的
当今世界,数据正在以指数模式增长。为了包容一直增长的数据,Kafka 这样的工具应运而生,提供了强壮而令人印象粗浅的架构。
然而 Kafka 是如何在幕后工作的呢?
Kafka 在生产者和消费者之间替换信息。生产者和消费者是这一线性过程的两个次要角色。
Kafka 也能够在一个或多个服务器的集群中工作。这些服务器被称为 Kafka 代理,通过代理你能够受害于多种个性,例如数据复制、容错和高可用。
这些代理由另一个叫做 Zookeeper 的工具治理。总之,它是一种旨在放弃分布式系统中同步和组织配置数据的服务。
Kafka Topics
Kafka 只是一个代理,所有的行为都产生在这。生产者向世界发送音讯,而消费者读取特定的数据块。如何辨别数据的一个特定局部与其余局部? 消费者如何晓得要应用哪些数据? 要了解这一点,你须要一个新的内容:topic。
Kafka topics 是传递音讯的载体。由生产者产生的 Kafka 记录被组织并存储到 topic 中。
假如你正在解决一个用于记录动物目录的 API 我的项目。你要确保公司中的每个人都可能拜访每一个新注册的动物。所以你选了 Kafka。
在零碎中注册的每一个新动物都将通过 Kafka 进行播送。topic 的名称是 tree_catalog。
在这种状况下,topic 像堆栈一样工作。它将信息保留在达到时的雷同地位,并保证数据不会失落。
达到的每个数据记录被存储在一个 slot 中,并用一个称为 offset 的惟一地位号注册。
例如,当一个消费者生产了存储在 offset 是 0 的音讯时,它提交音讯,申明一切正常,而后挪动到下一个 offset,依此类推。这个过程通常是线性的。然而,因为许多消费者能够同时将记录“插入”到同一个 topic 中,所以确定哪些数据地位曾经被占用的责任留给了消费者。这意味着消费者能够决定应用音讯的程序,甚至决定是否从头开始从新开始解决(offset 为 0)。
分区
分布式系统的一个要害个性是数据复制。它容许一个更平安的体系结构,数据能够被复制到其余中央,以防不好的事件产生。Kafka 通过分区解决复制。Kafka topics 被配置为扩散在几个分区(可配置的)。每个分区通过惟一的 offset 保留数据记录。
为了实现冗余,Kafka 在分区 (一个或多个) 创立正本,并在集群中流传数据。
这个过程遵循 leader-follower 模型,其中一个 leader 正本总是解决给定分区的申请,而 follower 复制该分区。每次制作人将音讯推送到某个主题时,它都会间接传递给该主题的领导者。
生产组
在 Kafka 中,生产来自 topic 的音讯最合适的形式是通过生产组。
顾名思义,这些组由一个或多个消费者组成,目标是获取来自特定主题的所有音讯。
为此,组必须始终具备惟一的 id(由属性 group.id 设置)。无论何时消费者想要退出那个组,它都将通过组 id 来实现。
每次你增加或删除一个组的消费者,Kafka 会从新均衡它们之间的负载,这样就不会过载。
设置
当初,你曾经理解了 Kafka 的通用工作原理,是时候开始环境设置了。为了简化,这个例子将应用 Docker 来保留 Kafka 和 Zookeeper 映像,而不是将它们装置到你的机器上。这样能够节俭一些空间和复杂性。
对于 Windows 用户,Docker 提供了一种装置和治理 Docker 容器的简略形式:Docker 桌面。进入它的下载页面并下载安装程序。运行它,并在不更改默认设置选项的状况下持续到最初。
确保在此过程实现后重新启动计算机。重启后,Docker 可能会要求你装置其余依赖项,所以请确保承受每一个依赖项。在 Docker 上装置一个无效的 Kafka 本地环境最快的门路之一是通过 Docker Compose。通过这种形式,能够通过一个 YAML 文件建设应用程序服务,并疾速地让它们运行。
创立一个名为 docker-compose 的新文件,并将以下的内容保留到其中:
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_CREATE_TOPICS: "simpletalk_topic:1:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
留神,代码从 Docker Hub 的 wurstmeister 帐户中导入了两个服务镜像(kafka 和 zookeeper)。这是在 Docker 上应用 Kafka 最稳固的镜像之一。端口也应用它们的推荐值进行设置,因而请留神不要更改它们。
其中最重要的设置之一属于 KAFKA_CREATE_TOPICS。在这里,你必须定义要创立的 topic 名称。还有其余办法能够创立主题,当前你将看到。
通过命令行导航到 docker-compose.yml 所在的文件夹。而后执行如下命令启动镜像:
docker-compose up
这段代码将加载所有依赖项并启动镜像。在此过程中,可能会看到大量的日志。
如果没有谬误日志显示,阐明启动胜利。
为了查看 Docker 镜像是否启动,在另一个 cmd 窗口中运行以下命令:
docker ps
显示如下:
亲自动手
你的 Kafka 环境曾经能够应用了。下一步是在 Visual Studio 中进行我的项目创立。进入我的项目创立窗口。搜寻 ASP.NET Core Web Application 模板, 单击 Next。
解决方案新建一个名称消费者我的项目和生产者我的项目将在同一个解决方案中共存。
下一个窗口抉择 API 模板。勾销勾选“配置为 HTTPS”选项。
创立我的项目后,右键单击解决方案,抉择增加新我的项目,而后,抉择 ASP.NET Core Web Application 我的项目类型。
持续并像后面一样抉择 API 模板。
当初,在 ST-Kafka-NET 解决方案中有两个我的项目。
NuGet 包
为了让 C# 代码了解如何产生和生产音讯,你须要一个 Kafka 的客户端。当初最罕用的客户端是 Confluent’s Kafka .NET Client。
抉择并单击 Install。或者,你能够通过命令行增加它们:
PM> Install-Package Confluent.Kafka
设置消费者
当初来实现消费者我的项目。尽管它是一个相似 rest 的应用程序,但消费者不是必须的。任何类型的.net 我的项目都能够监听 topic 音讯。
该我的项目曾经蕴含一个 Controllers 文件夹。你须要创立一个名为 Handlers 的新类,并向其增加一个 KafkaConsumerHandler.cs 的文件。内容如下:
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace ST_KafkaConsumer.Handlers
{
public class KafkaConsumerHandler : IHostedService
{
private readonly string topic = "simpletalk_topic";
public Task StartAsync(CancellationToken cancellationToken)
{
var conf = new ConsumerConfig
{
GroupId = "st_consumer_group",
BootstrapServers = "localhost:9092",
AutoOffsetReset = AutoOffsetReset.Earliest
};
using (var builder = new ConsumerBuilder<Ignore,
string>(conf).Build())
{builder.Subscribe(topic);
var cancelToken = new CancellationTokenSource();
try
{while (true)
{var consumer = builder.Consume(cancelToken.Token);
Console.WriteLine($"Message: {consumer.Message.Value} received from {consumer.TopicPartitionOffset}");
}
}
catch (Exception)
{builder.Close();
}
}
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{return Task.CompletedTask;}
}
}
这个处理程序必须在一个独自的线程中运行,因为它将永远在 while 循环中监督传入音讯。因而,须要在这个类中应用异步工作。
请留神 topic 名称和消费者配置。它们与 docker-compose.yml 中的设置齐全匹配。肯定要重复查看你的输出,否则可能会导致一些莫名其妙的谬误。
消费者组 id 能够是任何你想要的。通常,它们都有直观的名称,以帮忙进行保护和故障排除。
每当新音讯被公布到 simpletalk_topic 时,该消费者将应用它并将其记录到控制台。当然,在事实应用程序中,你会更好地利用这些数据。
你还须要将这个托管服务类增加到 Startup 中,因而,关上它,并在 ConfigureServices 办法中增加以下代码行:
services.AddSingleton<IHostedService, KafkaConsumerHandler>();
并确保引入了以下命名空间:
using ST_KafkaConsumer.Handlers;
设置生产者
至于生产者,这里的解决形式会有所不同。因为不须要有限循环来监听达到的音讯,生产者能够简略地从任何中央公布音讯,甚至是从控制器。在理论的应用程序中,最好将这类代码与 MVC 层离开,但本例保持应用控制器,以放弃简略。
在 Controllers 文件夹中创立一个名为 KafkaProducerController.cs 的文件,并向其增加一下内容:
using System;
using Confluent.Kafka;
using Microsoft.AspNetCore.Mvc;
namespace Kafka.Producer.API.Controllers
{[Route("api/kafka")]
[ApiController]
public class KafkaProducerController : ControllerBase
{
private readonly ProducerConfig config = new ProducerConfig
{BootstrapServers = "localhost:9092"};
private readonly string topic = "simpletalk_topic";
[HttpPost]
public IActionResult Post([FromQuery] string message)
{return Created(string.Empty, SendToKafka(topic, message));
}
private Object SendToKafka(string topic, string message)
{
using (var producer =
new ProducerBuilder<Null, string>(config).Build())
{
try
{return producer.ProduceAsync(topic, new Message<Null, string> { Value = message})
.GetAwaiter()
.GetResult();}
catch (Exception e)
{Console.WriteLine($"Oops, something went wrong: {e}");
}
}
return null;
}
}
}
生产者代码比消费者代码简略得多。ProducerBuilder 类负责依据提供的配置选项、Kafka 服务器和 topic 名称创立一个功能齐全的 Kafka 生产者。
重要的是要记住整个过程是异步的。然而,你能够应用 Confluent 的 API 来检索 awaiter 对象,而后从 API 办法返回后果。
测试
要测试这个示例,你须要别离运行生产者和使用者应用程序。在工具栏中,找到 Startup Projects 组合框并抉择 ST-KafkaConsumer 选项:
点击按钮 IIS Express 来运行消费者应用程序。这将启动一个新的浏览器窗口,咱们将疏忽并最小化它,因为消费者 API 不是重点。
关上一个新的 cmd 窗口,跳转到 producer 文件夹。运行命令 dotnet run 来启动它。
请留神它所运行的 URL 和端口。
当初是时候通过 producer API 发送一些音讯了。为此,你能够应用任何 API 测试工具,例如 Postman。
为了让上面的命令失常工作,必须确保 Docker 镜像失常工作。因而,请确保再次执行 docker ps 来查看。有时,重新启动计算机会进行这些过程。
如果命令没有任何日志信息,那么再运行一次 docker-compose。
要测试公布 - 订阅音讯,关上另一个 cmd 窗口并收回以下命令:
curl -H "Content-Length: 0" -X POST "http://localhost:51249/api/kafka?message=Hello,kafka!"
这个申请发送到生产者 API 并向 Kafka 公布一个新音讯。
要查看消费者是否收到了它,你能够找到输入窗口并抉择 ST-KafkaConsumer – ASP.NET Core Web Server,如图所示:
cmd 窗口也能够显示 JSON 后果。然而,它没有格式化。要解决这个问题,如果你装置了 Python,你能够运行以下命令:
curl -H "Content-Length: 0" -X POST "http://localhost:51249/api/kafka?message=Hello,kafka!" | python -m json.tool
输入如下:
这是目前能够取得的对于 topic message 对象的所有信息。第二个测试将显示当消费者我的项目敞开并公布音讯时产生了什么。
进行 Visual Studio 中的 consumer 我的项目,但这一次有一个不同的音讯:
curl -H "Content-Length: 0" -X POST "http://localhost:51249/api/kafka?message=Is%20anybody%20there?" | python -m json.tool
接着启动消费者我的项目,察看日志记录,内容如下:
总结
Kafka 是一个灵便和强壮的工具,它容许在许多类型的我的项目中进行弱小的实现,这是它被宽泛采纳的第一个起因。
这篇文章只是对它的世界的一个简要介绍,然而还有更多的货色能够看到。在下一篇文章中,我将探讨 Kafka 的性能。