kafka安装与参数配置

发布于:2025-08-09 ⋅ 阅读:(16) ⋅ 点赞:(0)

安装

先安装zookeeper

apache-zookeeper-3.8.4

暂时无法在飞书文档外展示此内容

安装kafka

kafka_2.13-3.8.0

暂时无法在飞书文档外展示此内容

  1. 上传解压
  2. 修改配置

./config/server.properties

broker.id=0
listeners=PLAINTEXT://your.server.ip.address:9092
zookeeper.connect=localhost:2181
log.dirs=/opt/kafka/data

启动脚本

nohup /home/kafka/kafka_2.13-3.8.0/bin/kafka-server-start.sh /home/kafka/kafka_2.13-3.8.0/config/server.properties > /home/kafka/kafka_2.13-3.8.0/logs/kafka-server.log 2>&1 &
key value
安装目录 /home/kafka/kafka_2.13-3.8.0
配置文件 /home/kafka/kafka_2.13-3.8.0/config/server.properties
启动 sh /home/kafka/start.sh

测试创建一个测试主题(Topic)

./kafka-topics.sh --create \
  --topic test-topic \
  --bootstrap-server 192.168.1.106:9092 \
  --partitions 1 \
  --replication-factor 1

列出所有主题,验证创建成功

./kafka-topics.sh --list --bootstrap-server 192.168.1.106:9092

测试步骤 4:启动消费者(Consumer)接收消息

打开一个新的终端窗口或 tab,启动一个控制台消费者,用于接收消息。

./kafka-console-consumer.sh --bootstrap-server 192.168.1.106:9092 \
  --topic test-topic \
  --from-beginning
  • --from-beginning:表示从该主题的最开始读取消息(即使之前已有消息)。
  • 此时消费者会等待,不会立即输出内容。

启动生产者(Producer)发送消息

回到原来的终端,启动一个控制台生产者。

./kafka-console-producer.sh --bootstrap-server 192.168.1.106:9092 \
  --topic test-topic

输入一些测试消息,每输入一行按回车发送:

Hello Kafka!
This is a test message.
Kafka is working!
^C  # 按 Ctrl+C 退出

删除测试主题(可选)

测试完成后,可以删除主题(需确保 delete.topic.enable=trueserver.properties 中已启用,默认在新版本中为 true):

./kafka-topics.sh --delete \
  --topic test-topic \
  --bootstrap-server 192.168.1.106:9092

配置详解

生产者配置文件

ProducerConfig values: 
        acks = -1
        batch.size = 16384
        bootstrap.servers = [192.168.1.106:9092]
        buffer.memory = 33554432
        client.dns.lookup = use_all_dns_ips
        client.id = producer-1
        compression.type = none
        connections.max.idle.ms = 540000
        delivery.timeout.ms = 120000
        enable.idempotence = true
        interceptor.classes = []
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metadata.max.idle.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 3
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.connect.timeout.ms = null
        sasl.login.read.timeout.ms = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.login.retry.backoff.max.ms = 10000
        sasl.login.retry.backoff.ms = 100
        sasl.mechanism = GSSAPI
        sasl.oauthbearer.clock.skew.seconds = 30
        sasl.oauthbearer.expected.audience = null
        sasl.oauthbearer.expected.issuer = null
        sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
        sasl.oauthbearer.jwks.endpoint.url = null
        sasl.oauthbearer.scope.claim.name = scope
        sasl.oauthbearer.sub.claim.name = sub
        sasl.oauthbearer.token.endpoint.url = null
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        socket.connection.setup.timeout.max.ms = 30000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.2
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer

提供的是一份 Kafka 生产者(Producer)的完整配置信息,这些配置决定了生产者如何将消息发送到 Kafka 集群。下面将逐项解释这些配置的含义、作用以及推荐使用场景。

🔹 核心配置(Core Configuration)

配置项 解释
bootstrap.servers [192.168.1.106:9092] 指定 Kafka 集群的入口地址,生产者通过它来连接和获取集群元数据。即使只写一个 broker 地址,Kafka 也会自动发现整个集群。
key.serializer StringSerializer 将消息的 key 序列化为字节数组,此处使用字符串序列化器。
value.serializer StringSerializer 将消息的 value 序列化为字节数组。
client.id producer-1 标识该生产者的逻辑名称,用于服务器端日志、监控和配额管理。

🔹 可靠性与重试(Reliability & Retry)

配置项 解释
acks -1(等同于 all) 控制消息写入副本的确认机制:
• acks=1:leader 写入即确认
• acks=0:不等待确认(可能丢消息)
• acks=-1 或 all:等待 ISR 中所有副本都写入才确认,最强一致性。
retries 3 发送失败后重试次数(如网络问题、leader 切换等)。
retry.backoff.ms 100 两次重试之间的等待时间(毫秒),避免频繁重试造成雪崩。
enable.idempotence TRUE 开启幂等性生产者,确保单分区内的消息不重复、不丢失、不乱序(需配合 acks=all 和 max.in.flight.requests.per.connection <= 5)。
这是实现“恰好一次语义”(exactly-once)的基础。

🔹 批量与性能优化(Batching & Performance)

配置项 解释
batch.size 16384(16KB) 每个分区的批量发送缓冲区大小。当一个分区的消息积累到这个大小时,会触发发送。注意:不是必须填满才发。
linger.ms 0 消息在 batch 中等待更多消息加入的延迟时间。设为 0 表示立即发送;设为 5~100 可提升吞吐,但增加延迟。
buffer.memory 33554432(32MB) 生产者本地用于缓冲待发送消息的总内存大小。超过此值,send() 会阻塞或抛出异常(取决于 max.block.ms)。
max.request.size 1048576(1MB) 单个请求最大大小(包括所有消息总和),不能超过 broker 的 message.max.bytes。

🔹 连接与网络(Connection & Networking)

配置项 解释
connections.max.idle.ms 540000(9分钟) 连接空闲多久后关闭。
request.timeout.ms 30000(30秒) 等待 broker 响应请求的最大时间,超时则重试。
delivery.timeout.ms 120000(120秒) 从 send() 调用开始到确认发送成功或失败的总超时时间,涵盖重试、缓存、请求等全过程。
max.block.ms 60000(60秒) 当缓冲区满或元数据不可用时,send() 方法最多阻塞的时间。超过则抛出 TimeoutException。
metadata.max.age.ms 300000(5分钟) 强制刷新元数据(如 topic 分区变化)的周期。
reconnect.backoff.ms / .max.ms 50 / 1000 连接失败后重试的初始和最大退避时间,避免频繁重连。
send.buffer.bytes / receive.buffer.bytes 131072 / 32768 TCP 发送和接收缓冲区大小,设为 -1 表示使用系统默认值。

🔹 分区与路由(Partitioning)

配置项 解释
partitioner.class DefaultPartitioner 分区策略类:
• 有 key:按 key 的 hash 值分配分区(保证相同 key 到同一分区)
• 无 key:轮询或粘性分区(Sticky Partitioning)
可自定义实现。

🔹 幂等性与事务(Idempotence & Transactions)

配置项 解释
enable.idempotence TRUE 已解释如上,开启后 Kafka 会为每条消息分配序列号,防止重复。
max.in.flight.requests.per.connection 5 每个连接最多允许多少个未确认的请求。若开启幂等性,必须 ≤5,否则可能破坏顺序性。
transactional.id null 若启用事务(跨分区原子写入),需设置此 ID。当前未启用。
transaction.timeout.ms 60000(60秒) 事务最大超时时间,由 broker 控制。

🔹 安全配置(Security)

配置项 解释
security.protocol PLAINTEXT 使用明文传输,无加密或认证。
生产环境应使用 SSL 或 SASL_SSL。
sasl.mechanism GSSAPI SASL 认证机制,GSSAPI 通常用于 Kerberos 认证。
但当前 security.protocol=PLAINTEXT,所以 SASL 不生效。
SSL 相关配置 多项 当前未启用 SSL 加密(因 security.protocol=PLAINTEXT),这些配置仅在使用 SSL 或 SASL_SSL 时生效。

🔹 监控与指标(Metrics & Monitoring)

配置项 解释
metric.reporters [] 自定义指标上报器(如 Prometheus),当前无。
metrics.sample.window.ms 30000(30秒) 统计窗口大小。
metrics.num.samples 2 保留的样本数。
metrics.recording.level INFO 记录的指标级别。

🔹 其他(Miscellaneous)

配置项 解释
interceptor.classes [] 生产者拦截器,可在消息发送前后插入逻辑(如日志、监控)。
client.dns.lookup use_all_dns_ips DNS 解析策略,支持多 IP 地址负载均衡。
socket.connection.setup.timeout.ms 10000 建立 TCP 连接的超时时间。

✅ 总结:该配置的特点

  • 高可靠性:acks=all + retries=3 + enable.idempotence=true → 保证消息不丢、不重复。
  • 低延迟:linger.ms=0 → 消息立即发送,不等待批量。
  • 中等吞吐:batch.size=16KB 较小,适合低延迟场景;若追求高吞吐,可适当调大 batch.sizelinger.ms
  • 无安全加密:security.protocol=PLAINTEXT → 仅适用于内网或测试环境。
  • 非事务性:transactional.id=null → 不支持跨分区原子操作。

📌 建议(根据场景)

场景 建议调整
高吞吐 增大 batch.size(如 64KB~1MB),设置 linger.ms=5~20
更强安全 改用 security.protocol=SASL_SSL,配置 SSL 证书或 Kerberos
事务支持 设置 transactional.id=your-txn-id,并启用事务 API
更低延迟 保持当前配置即可

消费者配置详解核心配置项

2025-08-07 10:10:11.537  INFO 20840 --- [pool-3-thread-1] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
        acks = -1
        batch.size = 16384
        bootstrap.servers = [192.168.1.106:9092]
        buffer.memory = 33554432
        client.dns.lookup = use_all_dns_ips
        client.id = producer-2
        compression.type = none
        connections.max.idle.ms = 540000
        delivery.timeout.ms = 120000
        enable.idempotence = true
        interceptor.classes = []
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metadata.max.idle.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 3
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.connect.timeout.ms = null
        sasl.login.read.timeout.ms = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.login.retry.backoff.max.ms = 10000
        sasl.login.retry.backoff.ms = 100
        sasl.mechanism = GSSAPI
        sasl.oauthbearer.clock.skew.seconds = 30
        sasl.oauthbearer.expected.audience = null
        sasl.oauthbearer.expected.issuer = null
        sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
        sasl.oauthbearer.jwks.endpoint.url = null
        sasl.oauthbearer.scope.claim.name = scope
        sasl.oauthbearer.sub.claim.name = sub
        sasl.oauthbearer.token.endpoint.url = null
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        socket.connection.setup.timeout.max.ms = 30000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.2
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer
  1. 基本连接配置

bootstrap.servers = [192.168.1.106:9092]:Kafka 集群地址,消费者将连接到这个地址

client.id = consumer-user-service-group-2:消费者客户端标识符,用于标识不同的消费者实例

consumer: group-id:

user-service-group

client.id = consumer-user-service-group-2:消费者客户端标识符,用于标识不同的消费者实例

为什么实际id多了个 -2?

Kafka Client ID 自动扩展机制,当您在配置中设置 client.id 时,Kafka 客户端库通常会在运行时自动扩展这个 ID,以确保在同一个 JVM 或同一台机器上运行的多个消费者实例具有唯一的标识符

基于实例计数的扩展:

Kafka 客户端可能会在基础 client.id 后面添加数字后缀

例如:如果配置了 consumer-user-service-group,实际可能变成 consumer-user-service-group-1、consumer-user-service-group-2 等

group相同,client.id不同会重复消费吗?

这是一个关于 Kafka 消费者组和消息消费机制的重要问题。让我详细解释一下:

相同 group.id,不同 client.id 是否会重复消费

简短答案

不会重复消费同一份消息。在同一个消费者组(group.id 相同)中,即使 client.id 不同,Kafka 也会确保每条消息只被组内的一个消费者实例消费。

  1. 主题相关配置

allow.auto.create.topics = true:允许自动创建主题,当消费者订阅不存在的主题时会自动创建

auto.offset.reset = latest:当没有初始偏移量或服务器上不再存在偏移量时的重置策略,设置为 latest 表示从最新的消息开始消费

相关配置

  1. earliest:与latest相反,设置为earliest表示当没有初始偏移量或当前偏移量不再可用时,消费者将从最早的消息开始消费。这对于希望重新处理所有历史消息的场景非常有用。
  2. none:如果设置为none,则消费者不会自动重置偏移量。这意味着如果没有找到先前提交的偏移量,消费者会报错。这要求用户必须明确知道如何处理这种情况。
  1. 消费组配置

group.id = user-service-group:消费者组 ID,标识消费者属于哪个消费组

同一个组,同一个主题下的消息,组内consumer共同消费,不会重复消费,提供消费者吞吐量

partition.assignment.strategy = [RangeAssignor, CooperativeStickyAssignor]:分区分配策略,决定如何将分区分配给消费者

  1. 提交配置

enable.auto.commit = false:禁用自动提交偏移量,需要手动提交以确保消息处理的可靠性

auto.commit.interval.ms = 5000:自动提交偏移量的时间间隔(毫秒),但因为启用了手动提交,此配置不生效

  1. 拉取配置

fetch.min.bytes = 1:消费者从服务器获取记录的最小字节数

fetch.max.bytes = 52428800:服务器为每个分区返回的最大数据量(50MB)

fetch.max.wait.ms = 500:如果没有足够的数据满足 fetch.min.bytes,服务器在响应前等待的最长时间

max.partition.fetch.bytes = 1048576:服务器从每个分区返回的最大数据量(1MB)

  1. 序列化配置

key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer:键的反序列化器

value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer:值的反序列化器

  1. 心跳和会话配置

heartbeat.interval.ms = 3000:消费者协调者心跳发送频率(3秒)

session.timeout.ms = 45000:消费者会话超时时间(45秒),超过此时间未收到心跳将触发重新平衡

max.poll.interval.ms = 300000:两次 poll 调用之间的最大时间间隔(5分钟)

  1. 网络和安全配置

security.protocol = PLAINTEXT:安全协议,使用明文传输

receive.buffer.bytes = 65536:TCP 接收缓冲区大小

send.buffer.bytes = 131072:TCP 发送缓冲区大小


网站公告

今日签到

点亮在社区的每一天
去签到