关于大数据:Kafka-Series-3-CLI-Command-Line-Interface

47次阅读

共计 4386 个字符,预计需要花费 11 分钟才能阅读完成。

Using MacOS (Kafka installed by Brew) as example

1. Start Zookeeper and Kafka

Make sure zookeeper and Kafka are running before proceeding to next steps, otherwise you will get ‘Broker may not be available’ error.
Open a terminal, run

$ /usr/local/bin/zookeeper-server-start /usr/local/etc/zookeeper/zoo.cfg

Open another terminal, run

$ /usr/local/bin/kafka-server-start /usr/local/etc/kafka/server.properties

2. Topics

1) Create new topic
Default is one partition. If you want to modify, edit config/server.properties or config/kraft/server.properties to num.partitions=3.

$ kafka-topics --bootstrap-server localhost:9092 --topic first_topic --create

2) Create new topic specifying number of partitions

$ kafka-topics --bootstrap-server localhost:9092 --topic second_topic --create --partitions 3

3) Create new topic specifying number of partitions and replication factor

$ kafka-topics --bootstrap-server localhost:9092 --topic third_topic --create --partitions 3 --replication-factor 2

Note: I am getting an error because I only have one broker, so can’t set the replication factor to >1. If we have a cluster (multiple brokers), we can set the replication factor to >1.

4) List current topics

$ kafka-topics --bootstrap-server localhost:9092 --list

5) Describe topics

$ kafka-topics --bootstrap-server localhost:9092 --topic first_topic --describe

6) Delete topics

$ kafka-topics --bootstrap-server localhost:9092 --topic first_topic --delete

3. Producers

1) Producing to existing topic
Enter producer console

$ kafka-console-producer --bootstrap-server localhost:9092 --topic first_topic

Send any message.
Press ctrl+c to exit (for Mac terminal press ctrl+.)

2) Producing to non existing topic
Kafka will create the topic automatically for you. Not recommended.

3) Producing with keys

$ kafka-console-producer --bootstrap-server localhost:9092 --topic first_topic --property parse.key=true --property key.separator=:

If your message doesn’t have a key, you will get error.

4. Consumer

1) Consume what is being producing
Start a consumer

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic first_topic

Nothing is returning for now

Now open a new terminal, start a producer, and send message

$ kafka-console-producer --bootstrap-server localhost:9092 --topic first_topic

You will see the message appear in the consumer terminal

2) Consume history

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic first_topic --from-beginning

If you have the producer running, the new message will also show.

Note: Looks like the messages are not in the order of they being sent. It’s because we set 3 partitions in the topic, messages are randomly sent to each partition, and they are only in order within each topic.

To display detailed message information

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic first_topic --formatter kafka.tools.DefaultMessageFormatter --property print.timestamp=true --property print.key=true --property print.value=true --from-beginning

3) Consumer Group
Start one consumer, specifying group to my-first-application

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic first_topic --group my-first-application

Start another consumer in a new terminal, specifying to the same group:

$ kafka-console-consumer --bootstrap-server localhost:9092 --topic first_topic --group my-first-application

Start a producer and send some messages

$ kafka-console-producer --bootstrap-server localhost:9092 --topic first_topic

You can see the consumers are splitting the messages
Producer:

Consumer1:

Consumer2:

If you create a new consumer group my-second-application, then both consumer groups will receive the same messages.

list consumer groups

$ kafka-consumer-groups --bootstrap-server localhost:9092 --list

describe one specific group

$ kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group my-second-application

If you have a producer producing message, but no consumers consuming the message, there will be difference between CURRENT-OFFSET and LOG-END-OFFSET, which is the ‘LAG’. After the consumer being created and starting to consume, the LAG will be removed to 0.

If you retrive history without specifying consumer group, it will create a temporary group for you


override the group.id for kafka-console-consumer using
–group mygroup

5. Offset

Reset the offsets to the beginning of each partition

$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-first-application --reset-offsets --to-earliest --execute --topic first_topic

shift offsets by 2 (forward) as another strategy (read 2 more messages from before)

$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-first-application --reset-offsets --shift-by 2 --execute --topic first_topic

shift offsets by 2 (backward) as another strategy

$ kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group my-first-application --reset-offsets --shift-by -2 --execute --topic first_topic

6.Conduktor UI demo


正文完
 0