Kafka 4.0入门到熟练

发布于:2025-04-02 ⋅ 阅读:(14) ⋅ 点赞:(0)

1、集群部署

1.1、JDK

# Write the playbook. Use '>' (not '>>') so re-running does not append a duplicate
# document, and quote the delimiter ('EOF') so $JAVA_HOME/$PATH are written
# literally instead of being expanded by the shell at write time.
cat > /etc/ansible/playbook/install-jdk.yml << 'EOF'
- hosts: cluster
  remote_user: root
  tasks:
  - name: 分发JDK
    copy:
      src: /opt/software/jdk-21.0.5_linux-x64_bin.tar.gz
      dest: /opt/software
  - name: 创建安装目录
    # tar -C fails if the target directory does not exist yet
    file:
      path: /usr/local/java
      state: directory
  - name: 解压JDK
    shell: tar -xvzf /opt/software/jdk-21.0.5_linux-x64_bin.tar.gz -C /usr/local/java
    args:
      # skip extraction when already unpacked (idempotent re-runs)
      creates: /usr/local/java/jdk-21.0.5
  - name: 配置环境变量
    blockinfile:
      path: /etc/profile
      block: |
        export JAVA_HOME=/usr/local/java/jdk-21.0.5
        export PATH=$JAVA_HOME/bin:$PATH
      marker: "# {mark} JDK"
EOF
ansible-playbook /etc/ansible/playbook/install-jdk.yml

1.2、基础环境配置

# Write the playbook. '>' avoids appending duplicates on re-run; the quoted
# delimiter ('EOF') keeps Jinja expressions and '$' untouched by the shell.
cat > /etc/ansible/playbook/modify_env.yml << 'EOF'
- hosts: cluster
  remote_user: root
  tasks:
    # 设置主机名
    - name: set hostname
      shell: hostnamectl set-hostname "{{ hostname }}"

    - name: distribute hosts to nodes
      copy:
        src: /etc/hosts
        dest: /etc

    # 关闭防火墙
    - name: stop firewalld
      service:
        name: firewalld
        state: stopped
        enabled: false

    # 关闭selinux
    - name: setenforce 0
      shell: "setenforce 0"
      # already-disabled SELinux makes setenforce exit non-zero; best effort
      failed_when: false

    - name: set selinux disabled
      replace:
        path: /etc/selinux/config
        regexp: '^SELINUX=enforcing'
        replace: 'SELINUX=disabled'

    # NOTE: the limits.conf keyword is "nproc" — "noproc" is not a valid item
    # and would be silently ignored by pam_limits
    - name: 设置最大文件句柄数
      lineinfile:
        path: /etc/security/limits.conf
        insertafter: '### AFTER THIS LINE'
        line: "{{ item }}"
        state: present
      with_items:
        - '*       soft   nproc   65536'
        - '*       hard   nproc   65536'
        - '*       soft   nofile  131072'
        - '*       hard   nofile  131072'
        - '*       hard memlock unlimited'
        - '*       soft memlock unlimited'

    # 关闭 THP — lineinfile manages a single line per call, so loop over the
    # two commands instead of passing a multi-line block
    - name: 关闭 THP
      lineinfile:
        path: /etc/rc.local
        line: "{{ item }}"
        state: present
      with_items:
        - 'echo never > /sys/kernel/mm/transparent_hugepage/enabled'
        - 'echo never > /sys/kernel/mm/transparent_hugepage/defrag'

    # /etc/rc.local is normally a symlink to /etc/rc.d/rc.local; it must be
    # executable for the THP lines to run at boot
    - name: Change permissions
      shell: chmod +x /etc/rc.d/rc.local

    # 关闭swap: comment the swap entry in fstab (persistent) ...
    - name: 关闭swap
      replace:
        path: /etc/fstab
        regexp: '^(\s*)([^#\n]+\s+)(\w+\s+)swap(\s+.*)$'
        replace: '#\1\2\3swap\4'
        backup: true
    # ... and turn it off for the running system
    - name: Disable SWAP
      shell: |
        swapoff -a
EOF
ansible-playbook /etc/ansible/playbook/modify_env.yml

1.3、免密

# Write the playbook. '>' prevents duplicate documents on re-run; the quoted
# delimiter keeps the Jinja lookup from any shell interference.
cat > /etc/ansible/playbook/ssh-pubkey.yml << 'EOF'
- hosts: cluster
  gather_facts: false
  remote_user: root
  tasks:
  - name: 免密登录
    # push the control node's public key into every node's authorized_keys
    authorized_key:
      user: root
      key: "{{ lookup('file', '/root/.ssh/id_rsa.pub') }}"
      state: present
EOF
ansible-playbook /etc/ansible/playbook/ssh-pubkey.yml

1.4、部署Kafka

# Write the playbook. '>' avoids appending duplicates on re-run; the quoted
# delimiter ('EOF') is essential here — otherwise the shell expands
# $KAFKA_HOME and $PATH while writing the file and corrupts the .bashrc block.
cat > /etc/ansible/playbook/install-kafka.yml << 'EOF'
- hosts: cluster
  remote_user: root
  tasks:
  - name: 分发 kafka
    copy:
      src: /opt/software/kafka_2.13-4.0.0.tgz
      dest: /opt/software
  - name: 创建安装目录
    # tar -C fails if /opt/module does not exist yet
    file:
      path: /opt/module
      state: directory
  - name: 解压 kafka
    shell: tar -xvzf /opt/software/kafka_2.13-4.0.0.tgz -C /opt/module/
    args:
      creates: /opt/module/kafka_2.13-4.0.0
  - name: rename to kafka
    shell: mv /opt/module/kafka_2.13-4.0.0 /opt/module/kafka
    args:
      # mv would fail or nest on a second run; skip once the target exists
      creates: /opt/module/kafka
  - name: 赋权starrocks
    shell: chown -R starrocks:starrocks /opt/module/kafka
  - name: 配置环境变量
    blockinfile:
      path: /home/starrocks/.bashrc
      block: |
        # kafka
        export KAFKA_HOME=/opt/module/kafka
        export PATH=$KAFKA_HOME/bin:$PATH
      marker: "# {mark} KAFKA"
EOF
ansible-playbook /etc/ansible/playbook/install-kafka.yml

1.5、修改Kafka配置

vi /opt/module/kafka/config/server.properties

############################# Server Basics #############################

# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller

# The node id associated with this instance's roles
## Give each Kafka server a distinct id; the other nodes' ids increase along with their ip
node.id=1

# List of controller endpoints used to connect to the controller cluster
controller.quorum.bootstrap.servers=kylin-01:9093

## ip and id of every Kafka server joining the cluster; uses the controller port 9093
controller.quorum.voters=1@kylin-01:9093,2@kylin-02:9093,3@kylin-03:9093

# NOTE: replace kylin-01 with the local hostname on each node (see section 1.6)
listeners=PLAINTEXT://kylin-01:9092,CONTROLLER://kylin-01:9093

# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT

# Listener name, hostname and port the broker or the controller will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://kylin-01:9092,CONTROLLER://kylin-01:9093

# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER

# listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/data/kafka/data

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# NOTE(review): replication factor 1 / min.isr 1 leaves the internal topics
# with no redundancy on this 3-node cluster — consider 3/2 for production.
offsets.topic.replication.factor=1
share.coordinator.state.topic.replication.factor=1
share.coordinator.state.topic.min.isr=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

# Maximum message size: 512 MB (replica.fetch.max.bytes must be >= message.max.bytes)
message.max.bytes=536870912
replica.fetch.max.bytes=536870912
# Maximum request size: 1 GB
# NOTE(review): max.request.size is a producer client setting, not a broker
# setting — it likely has no effect in server.properties; verify and move it
# to the producer configuration if intended.
max.request.size=1073741824

1.6、修改其它节点配置

# ---- kylin-02: values that differ from node 1's server.properties ----
node.id=2
controller.quorum.bootstrap.servers=kylin-02:9093
controller.quorum.voters=1@kylin-01:9093,2@kylin-02:9093,3@kylin-03:9093
listeners=PLAINTEXT://kylin-02:9092,CONTROLLER://kylin-02:9093
advertised.listeners=PLAINTEXT://kylin-02:9092,CONTROLLER://kylin-02:9093
# ---- kylin-03: values that differ from node 1's server.properties ----
node.id=3
controller.quorum.bootstrap.servers=kylin-03:9093
controller.quorum.voters=1@kylin-01:9093,2@kylin-02:9093,3@kylin-03:9093
listeners=PLAINTEXT://kylin-03:9092,CONTROLLER://kylin-03:9093
advertised.listeners=PLAINTEXT://kylin-03:9092,CONTROLLER://kylin-03:9093

1.7、修改日志目录

vi bin/kafka-run-class.sh

# Log directory to use
# Change the log directory: hard-code LOG_DIR so server logs land on the data disk
LOG_DIR=/data/kafka/log

# Stock fallback from the shipped script; since LOG_DIR is now always set
# above, this branch never fires — kept unchanged on purpose.
if [ "x$LOG_DIR" = "x" ]; then
  LOG_DIR="$base_dir/logs"
fi

1.8、初始化集群

生成存储目录唯一ID

bin/kafka-storage.sh random-uuid

格式化 kafka 存储目录(每个节点都需要执行;-t 参数必须使用上一步生成的同一个集群 ID,全部节点保持一致)

bin/kafka-storage.sh format -t m6BZb8yRSzmdNvW9kzOoQg -c config/server.properties

1.9、启动集群

每个节点都执行启动服务命令

bin/kafka-server-start.sh -daemon config/server.properties

查看服务日志

 tail -f /data/kafka/log/server.log

查看 kafka 节点状态

bin/kafka-broker-api-versions.sh --bootstrap-server kylin-01:9092

查看 Kafka 的端口监听状态

netstat -tuln | grep 9092

使用 ps 命令查看 Kafka 进程

ps aux | grep kafka

使用 top 或 htop 查看 Kafka 进程

top -p <PID>

2、操作命令

## 查看主题
kafka-topics.sh --bootstrap-server kylin-01:9092 --list

## 查看主题明细 (--describe requires the --topic flag)
kafka-topics.sh --bootstrap-server kylin-01:9092 --describe --topic <topic-id>

## 创建主题,分区 partition 为5,副本 replication-factor 为2,broker 数应 大于等于 副本数。(broker 不必在创建 topic 时显式指定)
kafka-topics.sh --bootstrap-server kylin-01:9092 --create --topic <topic-id> --partitions 5 --replication-factor 2

## 删除主题
kafka-topics.sh --bootstrap-server kylin-01:9092 --delete --topic <topic-id>

## 查看消费者列表--list
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --list

## 查看指定消费组详情
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --describe --group <group-id>

## 删除特定group
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --delete --group <group-id>

## 打开一个生产者
kafka-console-producer.sh --bootstrap-server kylin-01:9092 --topic <topic-id>

## 打开一个消费者
kafka-console-consumer.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --consumer-property group.id=<group-id> --from-beginning

## 查看所有消费组详情--all-groups
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --describe --all-groups

## 查询消费者成员信息--members

## 所有消费组成员信息
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --describe --all-groups --members

## 指定消费组成员信息
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --describe --members --group <group-id>

## 修改到最新offset
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --group <group-id> --reset-offsets --topic <topic-id> --to-latest --execute

## 预演-重设位移位置
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --group <group-id> --reset-offsets --topic <topic-id> --to-offset 100 --dry-run

## 重设位移位置
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --group <group-id> --reset-offsets --topic <topic-id> --to-offset 100 --execute

## 获取指定时间戳的offset
## (kafka.tools.GetOffsetShell no longer exists in Kafka 4.0; use kafka-get-offsets.sh)
kafka-get-offsets.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --time 1740499200000

## topic扩容 (partitions can only be increased, never decreased)
kafka-topics.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --alter --partitions 16

## 指定时间范围获取数据
## (reading messages is done with kafka-console-consumer.sh, not kafka-consumer-groups.sh;
##  with -F 'CreateTime:|\t' the timestamp is awk field $2)
kafka-console-consumer.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --property print.timestamp=true --from-beginning --max-messages 3 | awk -F 'CreateTime:|\t' '$2>=1742450425000 && $2<=1742464825000 {print $0}'
## 从指定分区/位点读取并按时间戳过滤
kafka-console-consumer.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --property print.timestamp=true --partition 0 --offset 100 | grep '1027729757'

3、API