关于java:kafka-集群测试与socket-api

3次阅读

共计 11761 个字符,预计需要花费 30 分钟才能阅读完成。

本文不是讲怎么用 kafka,是讲它运行时调用哪些外围规范 c 函数, 属于底层基础知识。


程序源码
java/py/node/go/rust/php/c++ 这些语言编写的 ascii 文本文件 咱们称为源码

程序
大部分曾经通过语言处理器【如编译器】解决好的文件,大部分在 linux 下肯定的格局存储。此类程序大部分含有代码 + 数据的二进制格局。

过程
在 linux 终端下大部分通过 bash 管制过程启动它,并且启动的子过程会调用零碎函数加载程序到内存中运行。

过程调用 API
过程能调用的 api 大部分是 linux 提供的规范库 api 函数,但最终是调用零碎 syscall.

演示示例
1 kafka 下载及 quickstart
http://kafka.apache.org/docum…

2 启动各个服务过程


3 创立一个主题

.sh 文件是 shell 脚本文件,对 shell 编程不太理解的自行百度学习。
4 启动 2 个消费者过程和 1 个数据生产者过程

5 消费者过程运行时调用的相干零碎函数 syscall
sendmsg(3, {msg_name(12)={sa_family=AF_NETLINK, pid=0, groups=00000000}, msg_iov(2)=[{“\200\0\0\0d\4\1\0\0\0\0\0\0\0\0\0”, 16}, {“bin/kafka-console-consumer.sh –topic quickstart-events –from-beginning –bootstrap-server 192.168.79.140:9092\0”, 112}], msg_controllen=0, msg_flags=0}, 0) = 128 <0.000018>

bash 管制过程失去终端输出的数据【管制过程的输入输出由伪终端驱动程序提供,数据来源于 TCP,对管制过程不理解的记得百度,这是最简略的基础知识了】
clone(child_stack=0, flags=CLONE_CHILD_CLEARTID|CLONE_CHILD_SETTID|SIGCHLD, child_tidptr=0x7ffaa55b4a10) = 17999 <0.000085>
克隆创立新工作【linux 中将过程,线程称为工作】

setpgid(17999, 17999) = 0 <0.000011>
将本人设置为组长过程
execve(“bin/kafka-console-consumer.sh”, [“bin/kafka-console-consumer.sh”, “–topic”, “quickstart-events”, “–from-beginning”, “–bootstrap-server”, “192.168.79.140:9092”], [/ 29 vars /]) = 0 <0.000795>
调用 execve 加载 script shell 脚本文件,前面的 <0.000795> 是指该函数零碎调用破费的工夫.

open(“/lib64/libdl.so.2”, O_RDONLY|O_CLOEXEC) = 3 <0.000015>
加载 dl 库, 它的性能是能够显示的调用其它的动静库

open(“/lib64/libc.so.6”, O_RDONLY|O_CLOEXEC) = 3 <0.000152>
加载 libc.so 动静库,该库提供了大量的函数。

open(“bin/kafka-console-consumer.sh”, O_RDONLY) = 3 <0.000014>
关上脚本文件

dup2(3, 255) = 255 <0.000010>

read(255, “#!/bin/bash\n# Licensed to the Apache Software Foundation (ASF) under one or more\n# contributor license agreements. See the NOTICE file distributed with\n# this work for additional information regarding copyright ownership.\n# The ASF licenses this file to You under the Apache License, Version 2.0\n# (the \”License\”); you may not use this file except in compliance with\n# the License. You may obtain a copy of the License at\n#\n# http://www.apache.org/license…\n#\n# Unless required by applicable law or agreed to in writing, software\n# distributed under the License is distributed on an \”AS IS\” BASIS,\n# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n# See the License for the specific language governing permissions and\n# limitations under the License.\n\nif [\”x$KAFKA_HEAP_OPTS\” = \”x\”]; then\n export KAFKA_HEAP_OPTS=\”-Xmx512M\”\nfi\n\nexec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer \”$@\”\n”, 945) = 945 <0.000011>
读取脚本内容

通过了重重调用,最终调用 bin/java elf 文件【花哩胡哨的】
execve(“/home/jdk1.8.0_261/bin/java”, [“/home/jdk1.8.0_261/bin/java”, “-Xmx512M”, “-server”, “-XX:+UseG1GC”, “-XX:MaxGCPauseMillis=20”, “-XX:InitiatingHeapOccupancyPercent=35”, “-XX:+ExplicitGCInvokesConcur rent”, “-XX:MaxInlineLevel=15”, “-Djava.awt.headless=true”, “-Dcom.sun.management.jmxremote”, “-Dcom.sun.management.jmxremote.authenticate=false”, “-Dcom.sun.management.jmxremote.ssl=false”, “-Dkafka.logs.dir=/home/java/kafka_2.13- 2.7.0/bin/../logs”, “-Dlog4j.configuration=file:/home/java/kafka_2.13-2.7.0/bin/../config/tools-log4j.properties”, “-cp”, “.:/home/jdk1.8.0_261/jre/lib/ext:/home/jdk1.8.0_261/lib/dt.jar:/home/jdk1.8.0_261/lib/tools.jar:/home/java/k afka_2.13-2.7.0/bin/../libs/activation-1.1.1.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/aopalliance-repackaged-2.6.1.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/argparse4j-0.7.0.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/audience -annotations-0.5.0.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/commons-cli-1.4.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/commons-lang3-3.8.1.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/connect-api-2.7.0.jar:/home/java/kafka_2.13- 2.7.0/bin/../libs/connect-basic-auth-extension-2.7.0.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/connect-file-2.7.0.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/connect-json-2.7.0.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/connect- mirror-2.7.0.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/connect-mirror-client-2.7.0.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/connect-runtime-2.7.0.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/connect-transforms-2.7.0.jar:/home/j ava/kafka_2.13-2.7.0/bin/../libs/hk2-api-2.6.1.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/hk2-locator-2.6.1.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/hk2-utils-2.6.1.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/jackson-annotation s-2.10.5.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/jackson-core-2.10.5.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/jackson-databind-2.10.5.1.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/jackson-dataformat-csv-2.10.5.jar:/home/java /kafka_2.13-2.7.0/bin/../libs/jackson-datatype-jdk8-2.10.5.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/jackson-jaxrs-base-2.10.5.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/jackson-jaxrs-json-provider-2.10.5.jar:/home/java/kafka_2. 13-2.7.0/bin/../libs/jackson-module-jaxb-annotations-2.10.5.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/jackson-module-paranamer-2.10.5.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/jackson-module-scala_2.13-2.10.5.jar:/home/java/kaf ka_2.13-2.7.0/bin/../libs/jakarta.activation-api-1.2.1.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/jakarta.annotation-api-1.3.5.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/jakarta.inject-2.6.1.jar:/home/java/kafka_2.13-2.7.0/bin/.. /libs/jakarta.validation-api-2.0.2.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/jakarta.ws.rs-api-2.1.6.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/jakarta.xml.bind-api-2.3.2.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/javassist-3.2 5.0-GA.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/javassist-3.26.0-GA.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/javax.servlet-api-3.1.0.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/javax.ws.rs-api-2.1.1.jar:/home/java/kafka_2.13- 2.7.0/bin/../libs/jaxb-api-2.3.0.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/jersey-client-2.31.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/jersey-common-2.31.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/jersey-container-servlet-2.3 1.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/jersey-container-servlet-core-2.31.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/jersey-hk2-2.31.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/jersey-media-jaxb-2.31.jar:/home/java/kafka_2. 13-2.7.0/bin/../libs/jersey-server-2.31.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/jetty-client-9.4.33.v20201020.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/jetty-continuation-9.4.33.v20201020.jar:/home/java/kafka_2.13-2.7.0/bin/. ./libs/jetty-http-9.4.33.v20201020.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/jetty-io-9.4.33.v20201020.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/jetty-security-9.4.33.v20201020.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/jetty- server-9.4.33.v20201020.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/jetty-servlet-9.4.33.v20201020.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/jetty-servlets-9.4.33.v20201020.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/jetty-util-9 .4.33.v20201020.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/jopt-simple-5.0.4.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/kafka_2.13-2.7.0.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/kafka_2.13-2.7.0-sources.jar:/home/java/kafka_2. 13-2.7.0/bin/../libs/kafka-clients-2.7.0.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/kafka-log4j-appender-2.7.0.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/kafka-raft-2.7.0.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/kafka-streams- 2.7.0.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/kafka-streams-examples-2.7.0.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/kafka-streams-scala_2.13-2.7.0.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/kafka-streams-test-utils-2.7.0.ja r:/home/java/kafka_2.13-2.7.0/bin/../libs/kafka-tools-2.7.0.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/log4j-1.2.17.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/lz4-java-1.7.1.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/maven-artif act-3.6.3.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/metrics-core-2.2.0.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/netty-buffer-4.1.51.Final.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/netty-codec-4.1.51.Final.jar:/home/java/kafk a_2.13-2.7.0/bin/../libs/netty-common-4.1.51.Final.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/netty-handler-4.1.51.Final.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/netty-resolver-4.1.51.Final.jar:/home/java/kafka_2.13-2.7.0/bin/. ./libs/netty-transport-4.1.51.Final.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/netty-transport-native-epoll-4.1.51.Final.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/netty-transport-native-unix-common-4.1.51.Final.jar:/home/java/ka fka_2.13-2.7.0/bin/../libs/osgi-resource-locator-1.0.3.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/paranamer-2.8.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/plexus-utils-3.2.1.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/reflections -0.9.12.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/rocksdbjni-5.18.4.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/scala-collection-compat_2.13-2.2.0.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/scala-java8-compat_2.13-0.9.1.jar:/hom e/java/kafka_2.13-2.7.0/bin/../libs/scala-library-2.13.3.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/scala-logging_2.13-3.9.2.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/scala-reflect-2.13.3.jar:/home/java/kafka_2.13-2.7.0/bin/../l ibs/slf4j-api-1.7.30.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/slf4j-log4j12-1.7.30.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/snappy-java-1.1.7.7.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/zookeeper-3.5.8.jar:/home/java/kafka_ 2.13-2.7.0/bin/../libs/zookeeper-jute-3.5.8.jar:/home/java/kafka_2.13-2.7.0/bin/../libs/zstd-jni-1.4.5-6.jar”, “kafka.tools.ConsoleConsumer”, “–topic”, “quickstart-events”, “–from-beginning”, “–bootstrap-server”, “192.168.79.140 :9092”], [/ 28 vars /]) = 0 <0.000362>

epoll_ctl(113, EPOLL_CTL_MOD, 116, {EPOLLIN, {u32=116, u64=116}}) = 0 <0.000013>
epoll_wait(113, [{EPOLLIN, {u32=116, u64=116}}], 4096, 2884) = 1 <0.000013>

116 是 socket 文件描述符,read 是 libc.so 规范库里提供的函数。
前面的数据是 kafka 自行封装的数据格式,能够看一些字符串
read(116, “\0\0\0\n\0\0\0\0\0\0\0]^e\v\2\22quickstart-events\2\0\0\0\0\0\0\0\0\0\0\0\0\0\10\0\0\0\0\0\0\0\10\0\0\0\0\0\0\0\0\0\377\377\377\377\351\4\0\0\0\0\0\0\0\0\0\0\0D\0\0\0\0\2\241jK\357\0\0\0\0\0\0\0\ 0\1x\271\350\f)\0\0\1x\271\350\f)\377\377\377\377\377\377\377\377\377\377\377\377\377\377\0\0\0\1$\0\0\0\1\30i like money\0\0\0\0\0\0\0\0\1\0\0\0D\0\0\0\0\2\263\272f<\0\0\0\0\0\0\0\0\1x\271\350&4\0\0\1x\271\350&4\377\377\377\377\37 7\377\377\377\377\377\377\377\377\377\0\0\0\1$\0\0\0\1\30i love money\0\0\0\0\0\0\0\0\2\0\0\0>\0\0\0\0\2\342hn\266\0\0\0\0\0\0\0\0\1x\271\360\213f\0\0\1x\271\360\213f\377\377\377\377\377\377\377\377\377\377\377\377\377\377\0\0\0\1\ 30\0\0\0\1\ftest b\0\0\0\0\0\0\0\0\3\0\0\0>\0\0\0\0\2\265\362\347\377\0\0\0\0\0\0\0\0\1x\271\361\310\3\0\0\1x\271\361\310\3\377\377\377\377\377\377\377\377\377\377\377\377\377\377\0\0\0\1\30\0\0\0\1\ftest c\0\0\0\0\0\0\0\0\4\0\0\0? \0\0\0\0\2*\301{K\0\0\0\0\0\0\0\0\1x\271\362\6;\0\0\1x\271\362\6;\377\377\377\377\377\377\377\377\377\377\377\377\377\377\0\0\0\1\32\0\0\0\1\16test bb\0\0\0\0\0\0\0\0\5\0\0\0A\0\0\0\0\2<\257z\333\0\0\0\0\0\0\0\0\1x\271\362o\313\0\0 \1x\271\362o\313\377\377\377\377\377\377\377\377\377\377\377\377\377\377\0\0\0\1\36\0\0\0\1\22test nice\0\0\0\0\0\0\0\0\6\0\0\0B\0\0\0\0\2}0\223\342\0\0\0\0\0\0\0\0\1x\271\362\376\343\0\0\1x\271\362\376\343\377\377\377\377\377\377\ 377\377\377\377\377\377\377\377\0\0\0\1 \0\0\0\1\24test china\0\0\0\0\0\0\0\0\7\0\0\0B\0\0\0\0\2\312 u\254\0\0\0\0\0\0\0\0\1x\271\363f[\0\0\1x\271\363f[\377\377\377\377\377\377\377\377\377\377\377\377\377\377\0\0\0\1 \0\0\0\1\24tes t china\0\0\0\0″, 691) = 691 <0.000015>

向终端输入接管的数据
write(1, “test china”, 10) = 10 <0.000015>

总结:
kafka 整个程序运行时,调用的零碎函数过多,每次调用都会消耗肯定的调用工夫,整个程序的性能并没有什么晋升,如果用在我的项目中,硬件配置倡议整大点,毕竟这玩意吃资源。跟个航母一样。耗油。

整个过程最终调用的零碎函数跟其它编程语言是一样的,没有什么区别,用到的常识仍然是 TCP/IP, 而 TCP/IP 的实现就是 SOCKET API , 方才咱们曾经看到 epoll,write,read,sendmsg 等这些函数。
大家通过调试,应用后,能够本人写一个简略的示例都能做出一个公布订阅性能的程序了。毕竟基础知识就这些。【当然了 kafka 实现时所应用的相干数据结构,算法不谈,你本人实现简略的程序时应用队列数据结构也就能实现】

最初,如果你对其它 rust/python/go/c/php 调用时是否应用了雷同的规范函数,能够点击 https://www.bilibili.com/vide…

欢送加群探讨

正文完
 0