kafka简单命令实现监听消费状态
kafka简单命令实现监听消费状态
安装部署
先下载kafka安装包,我下载的是3.7.0版本
解压后进入目录
tar -xzf kafka_2.13-3.7.0.tgz
cd kafka_2.13-3.7.0
由于我们做测试,不需要使用ZooKeeper,所以使用KRaft方式启动,
初始化存储,KRaft存储初始化只需要执行一次,后续启动直接就读取本地初始化库了
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
./bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
启动kafka
./bin/kafka-server-start.sh config/kraft/server.properties
topic相关命令
向topic发送数据
新建一个测试topic
./bin/kafka-topics.sh -bootstrap-server 127.0.0.1:9092 --create --topic test1 --partitions 1 --replication-factor 1
- replication-factor: 指定副本数量
- partitions:指定分区
查看Topic列表
./bin/kafka-topics.sh -bootstrap-server 127.0.0.1:9092 --list
删除topic
./bin/kafka-topics.sh -bootstrap-server 127.0.0.1:9092 --delete --topic test1
查看指定topic详情
./bin/kafka-topics.sh -bootstrap-server 127.0.0.1:9092 --topic test --describe
消费组相关命令
查看指定消费组列表
./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
查看指定组消费情况
./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group test --describe
查看消费组示例
(base) [root@node2 kafka_2.13-3.7.0]# ./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group view --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
view view 0 30006 30006 0 consumer-view-1-3ba2e3cb-5059-47ae-abcb-70db2fd9b489 /10.2.15.90 consumer-view-1
view test 0 10 10 0 - - -
消费组列名解读
- TOPIC: 消费者的topic名称
- PARTITION: 分区数的名称
- CURRENT-OFFSET: consumer group最后一次提交的offset
- LOG-END-OFFSET: 最后提交的生产消息offset
- LAG: 消费offset与生产offset之间的差值
- CONSUMER-ID: 消费者的ID编号,消费者组里面最少要有一个消费者,当然也可以有多个消费者
- HOST: 消费者的主机IP地址
- CLIENT-ID: 链接的ID编号
简单监听脚本
每五秒执行一次查看指定消费组的详情,vim monitor.sh添加下面内容
#!/bin/bash
# 循环间隔时间(秒)
interval=5
echo "消费应用 主题 PARTITION 当前消费数 主题总数量 LAG 消费编号 消费主机 连接编号"
# 循环条件:无限循环
while true; do
# 执行消费组监控脚本
# 这里假设你有一个名为check_consumer_group.sh的脚本,它接受消费组ID和主题名称作为参数
./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group $1 --describe |awk 'NR>2{print $N}'
# 等待指定的间隔时间
sleep "$interval"
done
使用命令如下:
sh monitor.sh view
测试发送数据
./bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
测试接收数据
./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning
TOPIC支持参数
Command must include exactly one action: --list, --describe, --create, --alter or --delete
Option Description
------ -----------
--alter Alter the number of partitions and
replica assignment. Update the
configuration of an existing topic
via --alter is no longer supported
here (the kafka-configs CLI supports
altering topic configs with a --
bootstrap-server option).
--at-min-isr-partitions if set when describing topics, only
show partitions whose isr count is
equal to the configured minimum.
--bootstrap-server <String: server to REQUIRED: The Kafka server to connect
connect to> to.
--command-config <String: command Property file containing configs to be
config property file> passed to Admin Client. This is used
only with --bootstrap-server option
for describing and altering broker
configs.
--config <String: name=value> A topic configuration override for the
topic being created. The following
is a list of valid configurations:
cleanup.policy
compression.type
delete.retention.ms
file.delete.delay.ms
flush.messages
flush.ms
follower.replication.throttled.
replicas
index.interval.bytes
leader.replication.throttled.replicas
local.retention.bytes
local.retention.ms
max.compaction.lag.ms
max.message.bytes
message.downconversion.enable
message.format.version
message.timestamp.after.max.ms
message.timestamp.before.max.ms
message.timestamp.difference.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas
preallocate
remote.storage.enable
retention.bytes
retention.ms
segment.bytes
segment.index.bytes
segment.jitter.ms
segment.ms
unclean.leader.election.enable
See the Kafka documentation for full
details on the topic configs. It is
supported only in combination with --
create if --bootstrap-server option
is used (the kafka-configs CLI
supports altering topic configs with
a --bootstrap-server option).
--create Create a new topic.
--delete Delete a topic
--delete-config <String: name> A topic configuration override to be
removed for an existing topic (see
the list of configurations under the
--config option). Not supported with
the --bootstrap-server option.
--describe List details for the given topics.
--exclude-internal exclude internal topics when running
list or describe command. The
internal topics will be listed by
default
--help Print usage information.
--if-exists if set when altering or deleting or
describing topics, the action will
only execute if the topic exists.
--if-not-exists if set when creating topics, the
action will only execute if the
topic does not already exist.
--list List all available topics.
--partitions <Integer: # of partitions> The number of partitions for the topic
being created or altered (WARNING:
If partitions are increased for a
topic that has a key, the partition
logic or ordering of the messages
will be affected). If not supplied
for create, defaults to the cluster
default.
--replica-assignment <String: A list of manual partition-to-broker
broker_id_for_part1_replica1 : assignments for the topic being
broker_id_for_part1_replica2 , created or altered.
broker_id_for_part2_replica1 :
broker_id_for_part2_replica2 , ...>
--replication-factor <Integer: The replication factor for each
replication factor> partition in the topic being
created. If not supplied, defaults
to the cluster default.
--topic <String: topic> The topic to create, alter, describe
or delete. It also accepts a regular
expression, except for --create
option. Put topic name in double
quotes and use the '\' prefix to
escape regular expression symbols; e.
g. "test\.topic".
--topic-id <String: topic-id> The topic-id to describe.This is used
only with --bootstrap-server option
for describing topics.
--topics-with-overrides if set when describing topics, only
show topics that have overridden
configs
--unavailable-partitions if set when describing topics, only
show partitions whose leader is not
available
--under-min-isr-partitions if set when describing topics, only
show partitions whose isr count is
less than the configured minimum.
--under-replicated-partitions if set when describing topics, only
show under replicated partitions
--version Display Kafka version.
消费组支持参数
Command must include exactly one action: --list, --describe, --delete, --reset-offsets, --delete-offsets
Option Description
------ -----------
--all-groups Apply to all consumer groups.
--all-topics Consider all topics assigned to a
group in the `reset-offsets` process.
--bootstrap-server <String: server to REQUIRED: The server(s) to connect to.
connect to>
--by-duration <String: duration> Reset offsets to offset by duration
from current timestamp. Format:
'PnDTnHnMnS'
--command-config <String: command Property file containing configs to be
config property file> passed to Admin Client and Consumer.
--delete Pass in groups to delete topic
partition offsets and ownership
information over the entire consumer
group. For instance --group g1 --
group g2
--delete-offsets Delete offsets of consumer group.
Supports one consumer group at the
time, and multiple topics.
--describe Describe consumer group and list
offset lag (number of messages not
yet processed) related to given
group.
--dry-run Only show results without executing
changes on Consumer Groups.
Supported operations: reset-offsets.
--execute Execute operation. Supported
operations: reset-offsets.
--export Export operation execution to a CSV
file. Supported operations: reset-
offsets.
--from-file <String: path to CSV file> Reset offsets to values defined in CSV
file.
--group <String: consumer group> The consumer group we wish to act on.
--help Print usage information.
--list List all consumer groups.
--members Describe members of the group. This
option may be used with '--describe'
and '--bootstrap-server' options
only.
Example: --bootstrap-server localhost:
9092 --describe --group group1 --
members
--offsets Describe the group and list all topic
partitions in the group along with
their offset lag. This is the
default sub-action of and may be
used with '--describe' and '--
bootstrap-server' options only.
Example: --bootstrap-server localhost:
9092 --describe --group group1 --
offsets
--reset-offsets Reset offsets of consumer group.
Supports one consumer group at the
time, and instances should be
inactive
Has 2 execution options: --dry-run
(the default) to plan which offsets
to reset, and --execute to update
the offsets. Additionally, the --
export option is used to export the
results to a CSV format.
You must choose one of the following
reset specifications: --to-datetime,
--by-duration, --to-earliest, --to-
latest, --shift-by, --from-file, --
to-current, --to-offset.
To define the scope use --all-topics
or --topic. One scope must be
specified unless you use '--from-
file'.
--shift-by <Long: number-of-offsets> Reset offsets shifting current offset
by 'n', where 'n' can be positive or
negative.
--state [String] When specified with '--describe',
includes the state of the group.
Example: --bootstrap-server localhost:
9092 --describe --group group1 --
state
When specified with '--list', it
displays the state of all groups. It
can also be used to list groups with
specific states.
Example: --bootstrap-server localhost:
9092 --list --state stable,empty
This option may be used with '--
describe', '--list' and '--bootstrap-
server' options only.
--timeout <Long: timeout (ms)> The timeout that can be set for some
use cases. For example, it can be
used when describing the group to
specify the maximum amount of time
in milliseconds to wait before the
group stabilizes (when the group is
just created, or is going through
some changes). (default: 5000)
--to-current Reset offsets to current offset.
--to-datetime <String: datetime> Reset offsets to offset from datetime.
Format: 'YYYY-MM-DDTHH:mm:SS.sss'
--to-earliest Reset offsets to earliest offset.
--to-latest Reset offsets to latest offset.
--to-offset <Long: offset> Reset offsets to a specific offset.
--topic <String: topic> The topic whose consumer group
information should be deleted or
topic whose should be included in
the reset offset process. In `reset-
offsets` case, partitions can be
specified using this format: `topic1:
0,1,2`, where 0,1,2 are the
partition to be included in the
process. Reset-offsets also supports
multiple topic inputs.
--verbose Provide additional information, if
any, when describing the group. This
option may be used with '--
offsets'/'--members'/'--state' and
'--bootstrap-server' options only.
Example: --bootstrap-server localhost:
9092 --describe --group group1 --
members --verbose
--version Display Kafka version.
标题:kafka简单命令实现监听消费状态
作者:haizhilingyu
地址:https://xiweihai.site/articles/2024/03/25/1711366421121.html