1. Cluster Deployment
1.1 JDK
cat >> /etc/ansible/playbook/install-jdk.yml << 'EOF'
- hosts: cluster
  remote_user: root
  tasks:
    - name: Distribute the JDK archive
      copy: src=/opt/software/jdk-21.0.5_linux-x64_bin.tar.gz dest=/opt/software
    - name: Create the install directory
      file: path=/usr/local/java state=directory
    - name: Unpack the JDK
      shell: tar -xvzf /opt/software/jdk-21.0.5_linux-x64_bin.tar.gz -C /usr/local/java
    - name: Configure environment variables
      blockinfile:
        path: /etc/profile
        block: |
          export JAVA_HOME=/usr/local/java/jdk-21.0.5
          export PATH=$JAVA_HOME/bin:$PATH
        marker: "# {mark} JDK"
EOF
ansible-playbook /etc/ansible/playbook/install-jdk.yml
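To verify the JDK landed on every node (a quick sketch; it relies on the cluster group from the Ansible inventory above):
# /etc/profile is not sourced in non-interactive shells, so source it explicitly
ansible cluster -m shell -a 'source /etc/profile && java -version'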
1.2 Base Environment Configuration
cat >> /etc/ansible/playbook/modify_env.yml << 'EOF'
- hosts: cluster
  remote_user: root
  tasks:
    # Set the hostname (expects a per-host `hostname` variable in the inventory)
    - name: set hostname
      shell: hostnamectl set-hostname {{ hostname }}
    - name: distribute hosts to nodes
      copy:
        src: /etc/hosts
        dest: /etc
    # Stop the firewall
    - name: stop firewalld
      service:
        name: firewalld
        state: stopped
        enabled: no
    # Disable SELinux
    - name: setenforce 0
      shell: "setenforce 0"
      failed_when: false
    - name: set selinux disabled
      replace:
        path: /etc/selinux/config
        regexp: '^SELINUX=enforcing'
        replace: 'SELINUX=disabled'
    - name: Raise the file-handle and process limits
      lineinfile:
        path: /etc/security/limits.conf
        insertafter: '### AFTER THIS LINE'
        line: "{{ item }}"
        state: present
      with_items:
        - '* soft nproc 65536'
        - '* hard nproc 65536'
        - '* soft nofile 131072'
        - '* hard nofile 131072'
        - '* hard memlock unlimited'
        - '* soft memlock unlimited'
    - name: Disable transparent huge pages at boot
      blockinfile:
        path: /etc/rc.local
        block: |
          echo never > /sys/kernel/mm/transparent_hugepage/enabled
          echo never > /sys/kernel/mm/transparent_hugepage/defrag
        marker: "# {mark} THP"
    - name: Make rc.local executable
      shell: chmod +x /etc/rc.d/rc.local
    - name: Comment out swap in fstab
      replace:
        path: /etc/fstab
        regexp: '^(\s*)([^#\n]+\s+)(\w+\s+)swap(\s+.*)$'
        replace: '#\1\2\3swap\4'
        backup: yes
    - name: Disable swap
      shell: |
        swapoff -a
EOF
ansible-playbook /etc/ansible/playbook/modify_env.yml
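A quick spot check after the run (a sketch; the limits only apply to new login sessions, and the THP switch in rc.local takes effect after a reboot):
ansible cluster -m shell -a 'getenforce; swapon --show; ulimit -n'
ansible cluster -m shell -a 'cat /sys/kernel/mm/transparent_hugepage/enabled'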
1.3 Passwordless SSH
cat >> /etc/ansible/playbook/ssh-pubkey.yml << 'EOF'
- hosts: cluster
  gather_facts: no
  remote_user: root
  tasks:
    - name: Authorize the control node's public key
      authorized_key:
        user: root
        key: "{{ lookup('file', '/root/.ssh/id_rsa.pub') }}"
        state: present
EOF
ansible-playbook /etc/ansible/playbook/ssh-pubkey.yml
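The lookup above assumes /root/.ssh/id_rsa.pub already exists on the control node; if it does not, generate a key pair first:
# RSA key pair with no passphrase, written to the default location
ssh-keygen -t rsa -b 4096 -N '' -f /root/.ssh/id_rsa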
1.4 Deploy Kafka
cat >> /etc/ansible/playbook/install-kafka.yml << 'EOF'
- hosts: cluster
  remote_user: root
  tasks:
    - name: Distribute the Kafka archive
      copy: src=/opt/software/kafka_2.13-4.0.0.tgz dest=/opt/software
    - name: Create the module directory
      file: path=/opt/module state=directory
    - name: Unpack Kafka
      shell: tar -xvzf /opt/software/kafka_2.13-4.0.0.tgz -C /opt/module/
    - name: Rename to kafka
      shell: mv /opt/module/kafka_2.13-4.0.0 /opt/module/kafka
    - name: Grant ownership to the starrocks user
      shell: chown -R starrocks:starrocks /opt/module/kafka
    - name: Configure environment variables
      blockinfile:
        path: /home/starrocks/.bashrc
        block: |
          # kafka
          export KAFKA_HOME=/opt/module/kafka
          export PATH=$KAFKA_HOME/bin:$PATH
        marker: "# {mark} KAFKA"
EOF
ansible-playbook /etc/ansible/playbook/install-kafka.yml
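To confirm the layout and ownership on every node (a sketch):
ansible cluster -m shell -a 'ls -ld /opt/module/kafka && /opt/module/kafka/bin/kafka-topics.sh --version'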
1.5 Edit the Kafka Configuration
vi /opt/module/kafka/config/server.properties
############################# Server Basics #############################
# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller
# The node id associated with this instance's roles
## Assign a different node.id to each Kafka node; the other nodes' ids increase along with their IPs
node.id=1
# List of controller endpoints used to connect to the controller cluster
controller.quorum.bootstrap.servers=kylin-01:9093
## id@host:port entries for every Kafka node in the cluster; the port is the controller/management port 9093
controller.quorum.voters=1@kylin-01:9093,2@kylin-02:9093,3@kylin-03:9093
listeners=PLAINTEXT://kylin-01:9092,CONTROLLER://kylin-01:9093
# Name of listener used for communication between brokers.
inter.broker.listener.name=PLAINTEXT
# Listener name, hostname and port the broker or the controller will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://kylin-01:9092,CONTROLLER://kylin-01:9093
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER
# listener.security.protocol.map=BROKER:SASL_SSL,CONTROLLER:SASL_SSL
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/data/kafka/data
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
offsets.topic.replication.factor=1
share.coordinator.state.topic.replication.factor=1
share.coordinator.state.topic.min.isr=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
# Maximum message size: 512 MB
message.max.bytes=536870912
replica.fetch.max.bytes=536870912
# Maximum request size: 1 GB
max.request.size=1073741824
1.6 Adjust the Configuration on the Other Nodes
On kylin-02:
node.id=2
controller.quorum.bootstrap.servers=kylin-02:9093
controller.quorum.voters=1@kylin-01:9093,2@kylin-02:9093,3@kylin-03:9093
listeners=PLAINTEXT://kylin-02:9092,CONTROLLER://kylin-02:9093
advertised.listeners=PLAINTEXT://kylin-02:9092,CONTROLLER://kylin-02:9093
On kylin-03:
node.id=3
controller.quorum.bootstrap.servers=kylin-03:9093
controller.quorum.voters=1@kylin-01:9093,2@kylin-02:9093,3@kylin-03:9093
listeners=PLAINTEXT://kylin-03:9092,CONTROLLER://kylin-03:9093
advertised.listeners=PLAINTEXT://kylin-03:9092,CONTROLLER://kylin-03:9093
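Instead of hand-editing each node, the per-node differences can be applied with sed (a sketch; it assumes kylin-01's server.properties was already copied to every node and that hostnames follow the kylin-NN pattern, from which the node id is derived):
# Run on each node; derive the node id from the numeric hostname suffix
HOST=$(hostname)
ID=$((10#${HOST##*-}))
sed -i -e "s/^node.id=.*/node.id=$ID/" \
    -e "s/^controller.quorum.bootstrap.servers=.*/controller.quorum.bootstrap.servers=$HOST:9093/" \
    -e "/^listeners=/s/kylin-01/$HOST/g" \
    -e "/^advertised.listeners=/s/kylin-01/$HOST/g" \
    /opt/module/kafka/config/server.properties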
1.7 Change the Log Directory
vi bin/kafka-run-class.sh
# Log directory to use
# Point LOG_DIR at the dedicated log path
LOG_DIR=/data/kafka/log
if [ "x$LOG_DIR" = "x" ]; then
  LOG_DIR="$base_dir/logs"
fi
1.8 Initialize the Cluster
Generate a unique ID for the storage directories:
bin/kafka-storage.sh random-uuid
Format the Kafka storage directory (run on every node, all with the same ID):
bin/kafka-storage.sh format -t m6BZb8yRSzmdNvW9kzOoQg -c config/server.properties
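To format all three nodes with one shared ID in a single pass (a sketch; it relies on the passwordless SSH set up in 1.3 and the paths used above):
CLUSTER_ID=$(/opt/module/kafka/bin/kafka-storage.sh random-uuid)
for host in kylin-01 kylin-02 kylin-03; do
  ssh "$host" "/opt/module/kafka/bin/kafka-storage.sh format -t $CLUSTER_ID -c /opt/module/kafka/config/server.properties"
done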
1.9 Start the Cluster
Run the start command on every node:
bin/kafka-server-start.sh -daemon config/server.properties
Tail the server log:
tail -f /data/kafka/log/server.log
Check the Kafka node status:
bin/kafka-broker-api-versions.sh --bootstrap-server kylin-01:9092
Check the Kafka port listeners:
netstat -tuln | grep 9092
Check the Kafka process with ps:
ps aux | grep kafka
Watch the Kafka process with top or htop:
top -p <PID>
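For a clean shutdown later, Kafka ships a stop script (run on each node):
bin/kafka-server-stop.sh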
2. Operating Commands
## List topics
kafka-topics.sh --bootstrap-server kylin-01:9092 --list
## Describe a topic
kafka-topics.sh --bootstrap-server kylin-01:9092 --describe --topic <topic-id>
## Create a topic with 5 partitions and a replication factor of 2; the broker count must be >= the replication factor (brokers need not be specified explicitly at creation time)
kafka-topics.sh --bootstrap-server kylin-01:9092 --create --topic <topic-id> --partitions 5 --replication-factor 2
## Delete a topic
kafka-topics.sh --bootstrap-server kylin-01:9092 --delete --topic <topic-id>
## List consumer groups with --list
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --list
## Describe a specific consumer group
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --describe --group <group-id>
## Delete a specific group
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --delete --group <group-id>
## Open a console producer
kafka-console-producer.sh --bootstrap-server kylin-01:9092 --topic <topic-id>
## Open a console consumer
kafka-console-consumer.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --consumer-property group.id=<group-id> --from-beginning
## Describe all consumer groups with --all-groups
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --describe --all-groups
Query consumer member info with --members
## Member info for all consumer groups
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --describe --all-groups --members
## Member info for a specific consumer group
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --describe --members --group <group-id>
## Reset the group's offsets to the latest
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --group <group-id> --reset-offsets --topic <topic-id> --to-latest --execute
## Dry run of an offset reset
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --group <group-id> --reset-offsets --topic <topic-id> --to-offset 100 --dry-run
## Reset offsets to a given position
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --group <group-id> --reset-offsets --topic <topic-id> --to-offset 100 --execute
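Offsets can also be reset to a point in time with --to-datetime (the timestamp format is yyyy-MM-ddTHH:mm:ss.SSS; the date below is only an example):
kafka-consumer-groups.sh --bootstrap-server kylin-01:9092 --group <group-id> --reset-offsets --topic <topic-id> --to-datetime 2025-03-20T10:00:00.000 --execute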
## Get the offsets for a given timestamp
kafka-get-offsets.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --time 1740499200000
## Expand a topic's partitions (the count can only be increased)
kafka-topics.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --alter --partitions 16
## Fetch messages within a time range (filter on the printed CreateTime timestamp)
kafka-console-consumer.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --property print.timestamp=true --max-messages 3 | awk -F 'CreateTime:|\t' '$2>=1742450425000 && $2<=1742464825000 {print $0}'
## Fetch from a specific partition and offset, then filter by content
kafka-console-consumer.sh --bootstrap-server kylin-01:9092 --topic <topic-id> --property print.timestamp=true --partition 0 --offset 100 | grep '1027729757'