haizhilingyu's personal knowledge notes

Open Source, Open Mind,
Open Sight, Open Future!

Monitoring Kafka consumption status with simple commands

Kafka official documentation

Installation and deployment

First download the Kafka release package; I used version 3.7.0.

Extract it and enter the directory:

tar -xzf kafka_2.13-3.7.0.tgz
cd kafka_2.13-3.7.0

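Since this is only a test setup, ZooKeeper is not needed, so Kafka is started in KRaft mode.

Initialize the storage first. KRaft storage only needs to be formatted once; later startups read the already formatted local storage directly.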

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
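
Because formatting is a one-time step, a start script can guard it. The sketch below is only an illustration: it assumes that a formatted log directory contains a meta.properties file and that log.dirs is /tmp/kraft-combined-logs (the value I would expect in the stock config/kraft/server.properties, so check your own config). needs_format is a hypothetical helper name.

```shell
# needs_format: exit 0 when the given log directory has no meta.properties yet,
# i.e. when kafka-storage.sh format has presumably not been run for it.
needs_format() {
  [ ! -f "$1/meta.properties" ]
}

# Assumption: matches log.dirs in config/kraft/server.properties.
LOG_DIR="/tmp/kraft-combined-logs"

if needs_format "$LOG_DIR"; then
  echo "storage not formatted yet: run kafka-storage.sh format"
else
  echo "storage already formatted: skip the format step"
fi
```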

Start Kafka

./bin/kafka-server-start.sh config/kraft/server.properties

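Topic commands

Sending data to a topic

First, create a test topic:

./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic test1 --partitions 1 --replication-factor 1
  • replication-factor: the number of replicas for each partition
  • partitions: the number of partitions for the topic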
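
When the create command is run from a script, repeating it fails once the topic exists. A minimal sketch of an idempotent wrapper follows; it relies on the --if-not-exists option listed in the kafka-topics.sh help further down. ensure_topic, KAFKA_TOPICS and BOOTSTRAP are hypothetical names introduced for illustration.

```shell
# Path of the topics tool and broker address; both can be overridden from the environment.
KAFKA_TOPICS="${KAFKA_TOPICS:-./bin/kafka-topics.sh}"
BOOTSTRAP="${BOOTSTRAP:-127.0.0.1:9092}"

# ensure_topic NAME [PARTITIONS]
# Creates the topic only if it does not already exist (single replica, test setup).
ensure_topic() {
  "$KAFKA_TOPICS" --bootstrap-server "$BOOTSTRAP" --create --if-not-exists \
    --topic "$1" --partitions "${2:-1}" --replication-factor 1
}

# Usage (needs a running broker and the Kafka directory as working directory):
#   ensure_topic test1 1
```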

List topics

./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list

Delete a topic

./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --delete --topic test1

Describe a specific topic

./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic test --describe

Consumer group commands

List consumer groups

./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list

Describe the consumption status of a specific group

./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group test --describe

Example output of describing a consumer group

(base) [root@node2 kafka_2.13-3.7.0]# ./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group view --describe

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                          HOST            CLIENT-ID
view            view            0          30006           30006           0               consumer-view-1-3ba2e3cb-5059-47ae-abcb-70db2fd9b489 /10.2.15.90     consumer-view-1
view            test            0          10              10              0               -                                                    -               -

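Consumer group column meanings

  • TOPIC: the name of the topic being consumed
  • PARTITION: the partition number
  • CURRENT-OFFSET: the offset most recently committed by the consumer group
  • LOG-END-OFFSET: the log end offset of the partition, i.e. the offset the next produced message will receive
  • LAG: the difference between the produced offset and the consumed offset (LOG-END-OFFSET minus CURRENT-OFFSET)
  • CONSUMER-ID: the ID of the consumer; a group can contain one or more consumers, and "-" is shown when no active consumer is assigned to the partition
  • HOST: the IP address of the consumer's host
  • CLIENT-ID: the client ID of the connection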
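
To make the LAG definition concrete, the sketch below recomputes it from the two offset columns with awk. The sample rows are copied from the example output above (whitespace collapsed); recompute_lag is a hypothetical helper name. Rows whose offsets are not numeric (for example "-" when nothing has been committed) are skipped.

```shell
# Sample rows taken from the describe output shown earlier.
sample='GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
view view 0 30006 30006 0 consumer-view-1-3ba2e3cb-5059-47ae-abcb-70db2fd9b489 /10.2.15.90 consumer-view-1
view test 0 10 10 0 - - -'

# recompute_lag: read describe-style rows on stdin and print
# "topic partition lag", where lag = LOG-END-OFFSET ($5) - CURRENT-OFFSET ($4).
recompute_lag() {
  awk '$4 ~ /^[0-9]+$/ && $5 ~ /^[0-9]+$/ { print $2, $3, $5 - $4 }'
}

printf '%s\n' "$sample" | recompute_lag
```

For the sample rows both partitions are fully consumed, so the recomputed lag matches the LAG column (0 in both cases).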

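Simple monitoring script

The script runs the describe command for a given consumer group every five seconds. Create it with vim monitor.sh and add the following:

#!/bin/bash

# Interval between checks (seconds)
interval=5
echo "GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                          HOST            CLIENT-ID"
# Loop forever
while true; do
  # Describe the consumer group passed as the first argument.
  # NR>2 skips the leading blank line and the header row of the output.
  ./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group "$1" --describe | awk 'NR>2{print $0}'
  # Wait for the configured interval
  sleep "$interval"
done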

Run it from the Kafka directory as follows:

sh monitor.sh view
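
The monitoring loop only prints rows. If a single number is more useful, the LAG column can be summed and compared against a limit. The following is a rough sketch under that idea; total_lag, lag_status and the threshold value are hypothetical and not part of Kafka.

```shell
# total_lag: sum the LAG column (6th field) of describe-style rows read from stdin.
# Header rows, blank lines and rows with "-" as lag are ignored.
total_lag() {
  awk '$6 ~ /^[0-9]+$/ { sum += $6 } END { print sum + 0 }'
}

# lag_status LAG THRESHOLD: print ALERT when the lag exceeds the threshold, else OK.
lag_status() {
  if [ "$1" -gt "$2" ]; then
    echo "ALERT lag=$1"
  else
    echo "OK lag=$1"
  fi
}

# Usage (needs a running broker; run from the Kafka directory):
#   lag=$(./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group view --describe | total_lag)
#   lag_status "$lag" 100
```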

Send test data

./bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test

Receive test data

./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning
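
The console producer reads messages from standard input, one per line, so test data can also be piped in instead of typed. A small sketch, with gen_messages as a hypothetical helper that just prints numbered lines:

```shell
# gen_messages N: print N numbered test messages, one per line.
gen_messages() {
  i=1
  while [ "$i" -le "$1" ]; do
    echo "test-message-$i"
    i=$((i + 1))
  done
}

gen_messages 3

# Usage (needs a running broker; run from the Kafka directory):
#   gen_messages 10 | ./bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test
```

Consuming these messages with a consumer that belongs to a group is what makes that group show up in the describe output used by the monitoring script.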

Options supported by kafka-topics.sh

Command must include exactly one action: --list, --describe, --create, --alter or --delete
Option                                   Description
------                                   -----------
--alter                                  Alter the number of partitions and
                                           replica assignment. Update the
                                           configuration of an existing topic
                                           via --alter is no longer supported
                                           here (the kafka-configs CLI supports
                                           altering topic configs with a --
                                           bootstrap-server option).
--at-min-isr-partitions                  if set when describing topics, only
                                           show partitions whose isr count is
                                           equal to the configured minimum.
--bootstrap-server <String: server to    REQUIRED: The Kafka server to connect
  connect to>                              to.
--command-config <String: command        Property file containing configs to be
  config property file>                    passed to Admin Client. This is used
                                           only with --bootstrap-server option
                                           for describing and altering broker
                                           configs.
--config <String: name=value>            A topic configuration override for the
                                           topic being created. The following
                                           is a list of valid configurations:
                                                cleanup.policy
                                                compression.type
                                                delete.retention.ms
                                                file.delete.delay.ms
                                                flush.messages
                                                flush.ms
                                                follower.replication.throttled.
                                           replicas
                                                index.interval.bytes
                                                leader.replication.throttled.replicas
                                                local.retention.bytes
                                                local.retention.ms
                                                max.compaction.lag.ms
                                                max.message.bytes
                                                message.downconversion.enable
                                                message.format.version
                                                message.timestamp.after.max.ms
                                                message.timestamp.before.max.ms
                                                message.timestamp.difference.max.ms
                                                message.timestamp.type
                                                min.cleanable.dirty.ratio
                                                min.compaction.lag.ms
                                                min.insync.replicas
                                                preallocate
                                                remote.storage.enable
                                                retention.bytes
                                                retention.ms
                                                segment.bytes
                                                segment.index.bytes
                                                segment.jitter.ms
                                                segment.ms
                                                unclean.leader.election.enable
                                         See the Kafka documentation for full
                                           details on the topic configs. It is
                                           supported only in combination with --
                                           create if --bootstrap-server option
                                           is used (the kafka-configs CLI
                                           supports altering topic configs with
                                           a --bootstrap-server option).
--create                                 Create a new topic.
--delete                                 Delete a topic
--delete-config <String: name>           A topic configuration override to be
                                           removed for an existing topic (see
                                           the list of configurations under the
                                           --config option). Not supported with
                                           the --bootstrap-server option.
--describe                               List details for the given topics.
--exclude-internal                       exclude internal topics when running
                                           list or describe command. The
                                           internal topics will be listed by
                                           default
--help                                   Print usage information.
--if-exists                              if set when altering or deleting or
                                           describing topics, the action will
                                           only execute if the topic exists.
--if-not-exists                          if set when creating topics, the
                                           action will only execute if the
                                           topic does not already exist.
--list                                   List all available topics.
--partitions <Integer: # of partitions>  The number of partitions for the topic
                                           being created or altered (WARNING:
                                           If partitions are increased for a
                                           topic that has a key, the partition
                                           logic or ordering of the messages
                                           will be affected). If not supplied
                                           for create, defaults to the cluster
                                           default.
--replica-assignment <String:            A list of manual partition-to-broker
  broker_id_for_part1_replica1 :           assignments for the topic being
  broker_id_for_part1_replica2 ,           created or altered.
  broker_id_for_part2_replica1 :
  broker_id_for_part2_replica2 , ...>
--replication-factor <Integer:           The replication factor for each
  replication factor>                      partition in the topic being
                                           created. If not supplied, defaults
                                           to the cluster default.
--topic <String: topic>                  The topic to create, alter, describe
                                           or delete. It also accepts a regular
                                           expression, except for --create
                                           option. Put topic name in double
                                           quotes and use the '\' prefix to
                                           escape regular expression symbols; e.
                                           g. "test\.topic".
--topic-id <String: topic-id>            The topic-id to describe.This is used
                                           only with --bootstrap-server option
                                           for describing topics.
--topics-with-overrides                  if set when describing topics, only
                                           show topics that have overridden
                                           configs
--unavailable-partitions                 if set when describing topics, only
                                           show partitions whose leader is not
                                           available
--under-min-isr-partitions               if set when describing topics, only
                                           show partitions whose isr count is
                                           less than the configured minimum.
--under-replicated-partitions            if set when describing topics, only
                                           show under replicated partitions
--version                                Display Kafka version.

Options supported by kafka-consumer-groups.sh

Command must include exactly one action: --list, --describe, --delete, --reset-offsets, --delete-offsets
Option                                  Description
------                                  -----------
--all-groups                            Apply to all consumer groups.
--all-topics                            Consider all topics assigned to a
                                          group in the `reset-offsets` process.
--bootstrap-server <String: server to   REQUIRED: The server(s) to connect to.
  connect to>
--by-duration <String: duration>        Reset offsets to offset by duration
                                          from current timestamp. Format:
                                          'PnDTnHnMnS'
--command-config <String: command       Property file containing configs to be
  config property file>                   passed to Admin Client and Consumer.
--delete                                Pass in groups to delete topic
                                          partition offsets and ownership
                                          information over the entire consumer
                                          group. For instance --group g1 --
                                          group g2
--delete-offsets                        Delete offsets of consumer group.
                                          Supports one consumer group at the
                                          time, and multiple topics.
--describe                              Describe consumer group and list
                                          offset lag (number of messages not
                                          yet processed) related to given
                                          group.
--dry-run                               Only show results without executing
                                          changes on Consumer Groups.
                                          Supported operations: reset-offsets.
--execute                               Execute operation. Supported
                                          operations: reset-offsets.
--export                                Export operation execution to a CSV
                                          file. Supported operations: reset-
                                          offsets.
--from-file <String: path to CSV file>  Reset offsets to values defined in CSV
                                          file.
--group <String: consumer group>        The consumer group we wish to act on.
--help                                  Print usage information.
--list                                  List all consumer groups.
--members                               Describe members of the group. This
                                          option may be used with '--describe'
                                          and '--bootstrap-server' options
                                          only.
                                        Example: --bootstrap-server localhost:
                                          9092 --describe --group group1 --
                                          members
--offsets                               Describe the group and list all topic
                                          partitions in the group along with
                                          their offset lag. This is the
                                          default sub-action of and may be
                                          used with '--describe' and '--
                                          bootstrap-server' options only.
                                        Example: --bootstrap-server localhost:
                                          9092 --describe --group group1 --
                                          offsets
--reset-offsets                         Reset offsets of consumer group.
                                          Supports one consumer group at the
                                          time, and instances should be
                                          inactive
                                        Has 2 execution options: --dry-run
                                          (the default) to plan which offsets
                                          to reset, and --execute to update
                                          the offsets. Additionally, the --
                                          export option is used to export the
                                          results to a CSV format.
                                        You must choose one of the following
                                          reset specifications: --to-datetime,
                                          --by-duration, --to-earliest, --to-
                                          latest, --shift-by, --from-file, --
                                          to-current, --to-offset.
                                        To define the scope use --all-topics
                                          or --topic. One scope must be
                                          specified unless you use '--from-
                                          file'.
--shift-by <Long: number-of-offsets>    Reset offsets shifting current offset
                                          by 'n', where 'n' can be positive or
                                          negative.
--state [String]                        When specified with '--describe',
                                          includes the state of the group.
                                        Example: --bootstrap-server localhost:
                                          9092 --describe --group group1 --
                                          state
                                        When specified with '--list', it
                                          displays the state of all groups. It
                                          can also be used to list groups with
                                          specific states.
                                        Example: --bootstrap-server localhost:
                                          9092 --list --state stable,empty
                                        This option may be used with '--
                                          describe', '--list' and '--bootstrap-
                                          server' options only.
--timeout <Long: timeout (ms)>          The timeout that can be set for some
                                          use cases. For example, it can be
                                          used when describing the group to
                                          specify the maximum amount of time
                                          in milliseconds to wait before the
                                          group stabilizes (when the group is
                                          just created, or is going through
                                          some changes). (default: 5000)
--to-current                            Reset offsets to current offset.
--to-datetime <String: datetime>        Reset offsets to offset from datetime.
                                          Format: 'YYYY-MM-DDTHH:mm:SS.sss'
--to-earliest                           Reset offsets to earliest offset.
--to-latest                             Reset offsets to latest offset.
--to-offset <Long: offset>              Reset offsets to a specific offset.
--topic <String: topic>                 The topic whose consumer group
                                          information should be deleted or
                                          topic whose should be included in
                                          the reset offset process. In `reset-
                                          offsets` case, partitions can be
                                          specified using this format: `topic1:
                                          0,1,2`, where 0,1,2 are the
                                          partition to be included in the
                                          process. Reset-offsets also supports
                                          multiple topic inputs.
--verbose                               Provide additional information, if
                                          any, when describing the group. This
                                          option may be used with '--
                                          offsets'/'--members'/'--state' and
                                          '--bootstrap-server' options only.
                                        Example: --bootstrap-server localhost:
                                          9092 --describe --group group1 --
                                          members --verbose
--version                               Display Kafka version.
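
As an illustration of how the reset options above combine, here is a hedged sketch that rewinds a group to the earliest offset of one topic. It defaults to --dry-run, which only shows the plan, and switches to --execute only when asked. reset_to_earliest and KAFKA_GROUPS are hypothetical names; per the help text, the group's consumers should be inactive when the reset is executed.

```shell
# Path of the consumer groups tool; can be overridden from the environment.
KAFKA_GROUPS="${KAFKA_GROUPS:-./bin/kafka-consumer-groups.sh}"

# reset_to_earliest GROUP TOPIC [execute]
# Without the third argument only a dry run is performed.
reset_to_earliest() {
  mode="--dry-run"
  if [ "$3" = "execute" ]; then
    mode="--execute"
  fi
  "$KAFKA_GROUPS" --bootstrap-server 127.0.0.1:9092 --group "$1" --topic "$2" \
    --reset-offsets --to-earliest "$mode"
}

# Usage (needs a running broker; run from the Kafka directory):
#   reset_to_earliest view test            # show the planned offsets
#   reset_to_earliest view test execute    # apply them
```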


Title: Monitoring Kafka consumption status with simple commands
Author: haizhilingyu
URL: https://xiweihai.site/articles/2024/03/25/1711366421121.html