Kafka4.0初体验

发布于:2025-06-30 ⋅ 阅读:(19) ⋅ 点赞:(0)

一、What is Kafka?

官网地址:https://kafka.apache.org/
在这里插入图片描述

  • 超过80%的财富100强公司信任并使用Kafka;
  • Apache Kafka 是一个开源的分布式事件流处理平台,由 LinkedIn 开发并后来捐赠给 Apache 软件基金会。它被设计用于高效处理实时数据流,支持高吞吐量、低延迟的消息传递,广泛应用于大规模数据处理场景。

核心功能与特点

  • 分布式与高可用
    Kafka 采用分布式架构,支持横向扩展,通过分区(partition)和副本(replica)机制确保数据的高可靠性和容错能力。
  • 高吞吐量与低延迟
    即使在海量数据场景下,Kafka 也能保持毫秒级的消息传递速度,每秒可处理数百万条消息。
  • 事件流处理(Event Streaming)
    支持实时发布(produce)、订阅(consume)、存储(store)和处理(process)数据流,适用于日志聚合、用户活动跟踪、IoT 设备数据等场景。
  • 持久化存储
    消息可持久化到磁盘,并支持按时间或大小保留策略,避免数据丢失。
  • 多语言支持
    提供 Java、Python、Go 等多种客户端 API,易于集成到不同技术栈。

为什么 80% 的财富 100 强企业使用 Kafka?

  • 成熟稳定:经过 LinkedIn、Netflix、Uber 等超大规模验证。
  • 生态丰富:与 Confluent(Kafka 商业化公司)合作提供企业级支持,集成 Connector(如数据库、云服务)。
  • 云原生友好:支持 Kubernetes,兼容 AWS MSK、Confluent Cloud 等托管服务。
    简单来说,Kafka 是现代数据架构的“中枢神经系统”,帮助企业在实时数据时代高效流动和利用信息。

谁在使用Kafka?

在这里插入图片描述
结论:Kafka 在头部企业中的普及率极高,尤其是在制造业、保险、IT服务等领域已达到全覆盖(10/10)。金融(银行、保险)、电信、零售等数据密集型行业也广泛采用,但存在少数企业未使用(如银行业7/10)

Kafka的起源

  1. 起源背景(2000年代末的 LinkedIn)
  • 问题驱动
    LinkedIn 作为一个快速增长的职业社交平台,面临以下技术痛点:
    • 数据管道复杂:系统间依赖大量点对点(ad-hoc)的数据集成,导致维护成本高、可靠性差。
    • 实时性不足:传统消息队列(如 ActiveMQ)无法支撑高吞吐、低延迟的日志流和用户活动跟踪。
    • 扩展性瓶颈:现有系统难以水平扩展,无法应对每天数十亿条消息的规模。
  • 目标
    设计一个统一、高吞吐、低延迟的实时数据管道,将系统间的数据流标准化
  1. 核心设计者与早期开发
  • 创始团队
    • Jay Kreps(LinkedIn 首席工程师):主导 Kafka 的设计,命名灵感来自作家卡夫卡(Franz Kafka),认为它“适合分布式系统”。
    • Neha NarkhedeJun Rao:共同开发了 Kafka 的核心架构。
  • 初版发布
    • 2010 年,Kafka 在 LinkedIn 内部上线,用于日志聚合、用户行为跟踪等场景。
    • 2011 年,LinkedIn 将 Kafka 开源,并提交给 Apache 软件基金会 孵化。
      大师门取名字也是根据自己的喜好来取名,在我们看来有可能感觉很随意!
  1. 技术灵感和创新
    Kafka 的设计融合了多种分布式系统的思想:

    • 日志结构存储:借鉴了事务日志(如数据库 WAL)和日志合并(Log-Structured Merge Trees)的理念。

    • 发布-订阅模型:参考了传统消息队列(如 JMS),但优化了吞吐量和持久化。

    • 分布式共识:早期依赖 ZooKeeper 管理元数据,后来逐步自研(KIP-500 去 ZooKeeper 化)。

关键创新

  • 分区(Partition)和副本(Replica)机制:实现水平扩展和高可用性。

  • 顺序磁盘 I/O + 零拷贝:突破性能瓶颈,支持海量数据实时处理。

  1. 开源与生态爆发
  • 2012 年:成为 Apache 顶级项目。
  • 2014 年:核心团队创立 Confluent 公司,提供 Kafka 的商业化支持(如托管服务、企业版工具)。
  • 2017 年后
    • Kafka 被广泛用于流处理(结合 Kafka Streams、Flink 等)。
    • 云厂商推出托管服务(如 AWS MSK、Confluent Cloud)。
  1. 为什么 Kafka 能成功?
  • 精准定位:填补了传统消息队列与大数据批处理之间的空白(实时流数据)。
  • 简单而强大的设计:以“日志”为核心抽象,兼顾可靠性和性能。
  • 社区驱动:开源生态迅速扩展,支持多语言客户端和丰富连接器(Connectors)。

Kafka运行环境前置要求

Kafka是由Scala语言(占80%,20%Java)编写而成,Scala运行在Java虚拟机上,并兼容现有的Java程序,因此部署Kakfa的时候,需要先安装JDK环境;

注意:kafka_2.13-4.0.0 最新版需要JDK17+的环境
Kafka源码: https://github.com/apache/kafka
Scala官网:https://www.scala-lang.org/
本地环境必须安装了Java 17+;(Java17、Java21、Java22都可以);
JDK长期支持版:https://www.oracle.com/java/technologies/java-se-support-roadmap.html

Kafka版本迭代演进

  • Kafka前期项目版本似乎有点凌乱,Kafka在1.x之前的版本,是采用4位版本号;

  • 比如:0.8.2.2、0.9.0.1、0.10.0.0…等等;

  • 在1.x之后,kafka 采用 Major.Minor.Patch 三位版本号;

    • Major表示大版本,通常是一些重大改变,因此彼此之间功能可能会不兼容;
    • Minor表示小版本,通常是一些新功能的增加;
    • Patch表示修订版,主要为修复一些重点Bug而发布的版本;
  • 比如:Kafka 2.1.3,大版本就是2,小版本是1,Patch版本为3,是为修复Bug发布的第3个版本;

  • Kafka总共发布了8个大版本,分别是0.7.x、0.8.x、0.9.x、0.10.x、0.11.x、1.x、2.x 及 3.x 版本,截止目前,最新版本是Kafka 4.0.0,也是最新稳定版本;

Kafka运行环境JDK安装

1、下载JDK:https://www.oracle.com/java/technologies/downloads/#java17
2、解压缩:tar -zxvf jdk-17_linux-x64_bin.tar.gz -C /usr/local
3、配置JDK环境变量:

export JAVA_HOME=/usr/local/jdk-17.0.7
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/

开始体验

第一步、下载最新的kafka版本并且解压

Linux

tar -xzf kafka_2.13-4.0.0.tgz
cd kafka_2.13-4.0.0

Windows

# 解压 .tgz 文件
tar -xzf kafka_2.13-4.0.0.tgz

# 进入解压后的目录
cd kafka_2.13-4.0.0

第二步、启动Kafka的运行环境

1、首先生成一个Cluster UUID

  • Linux
$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
  • Windows
    使用 PowerShell 命令替换
$KAFKA_CLUSTER_ID=$(.\bin\windows\kafka-storage.bat random-uuid)

2、格式化日志目录

  • Linux
$ bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties
  • Windows
    在powershell中
# 格式化存储目录(使用 .bat 文件)
.\bin\windows\kafka-storage.bat format --standalone --cluster-id $KAFKA_CLUSTER_ID --config .\config\server.properties

注:$KAFKA_CLUSTER_ID需要用上面生成的值替换
如果不进行日志格式化启动会提示

[2025-06-29 17:17:48,236] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2025-06-29 17:17:48,766] ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
java.lang.RuntimeException: No readable meta.properties files found.
	at org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.verify(MetaPropertiesEnsemble.java:480) ~[kafka-metadata-4.0.0.jar:?]
	at kafka.server.KafkaRaftServer$.initializeLogDirs(KafkaRaftServer.scala:141) ~[kafka_2.13-4.0.0.jar:?]
	at kafka.server.KafkaRaftServer.<init>(KafkaRaftServer.scala:56) ~[kafka_2.13-4.0.0.jar:?]
	at kafka.Kafka$.buildServer(Kafka.scala:68) ~[kafka_2.13-4.0.0.jar:?]
	at kafka.Kafka$.main(Kafka.scala:75) [kafka_2.13-4.0.0.jar:?]
	at kafka.Kafka.main(Kafka.scala) [kafka_2.13-4.0.0.jar:?]

3、启动 Kafka Server

  • Linux
$ bin/kafka-server-start.sh config/server.properties
  • Windows (PowserShell工具)
.\bin\windows\kafka-server-start.bat .\config\server.properties

4、创建一个topic来存储你的事件
Kafka 是一个分布式事件流处理平台,它允许您在多台机器上读取、写入、存储和处理事件(在文档中也被称为记录或消息)

  • Linux
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
  • Windows(PowserShell工具)
# 使用 .bat 文件(注意路径是 \windows\)
.\bin\windows\kafka-topics.bat --create --topic quickstart-events --bootstrap-server localhost:9092

显示新主题的详细信息(比如分区数量)

  • Linux
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events        TopicId: NPmZHyhbR9y00wMglMH2sg PartitionCount: 1       ReplicationFactor: 1	Configs:
Topic: quickstart-events Partition: 0    Leader: 0   Replicas: 0 Isr: 0
  • Windows(PowserShell工具)
 .\bin\windows\kafka-topics.bat --describe --topic quickstart-events --bootstrap-server localhost:9092                                                                                                                 
Topic: quickstart-events        TopicId: RZC2UrhoSPKnq7twM5cimg PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: quickstart-events        Partition: 0    Leader: 1       Replicas: 1     Isr: 1  Elr:    LastKnownElr:
PS D:\kafka_2.13-4.0.0>   

5、写入消息到主题

  • Linux
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
>This is my first event
>This is my second event
  • Windows(PowserShell工具)
 D:\kafka_2.13-4.0.0> .\bin\windows\kafka-console-producer.bat --topic quickstart-events --bootstrap-server localhost:9092
>This my first event
>this is my second event

6、读取事件消息

  • Linux
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event
  • Windows(PowserShell工具)
PS D:\kafka_2.13-4.0.0> .\bin\windows\kafka-console-consumer.bat --topic quickstart-events --from-beginning --bootstrap-server localhost:9092                                                                                                   
This my first event
this is my second event

7、使用 Kafka Connect 将你的数据作为事件流导入/导出

补充说明

  • Kafka Connect
    Kafka 官方提供的工具,专门用于在 Kafka 和其他系统(如数据库、云服务等)之间高效传输数据。
  • 事件流(streams of events)
    指数据以连续的、实时的事件流形式处理,而非传统的批量传输。
  • 典型场景
    • 从数据库导出变更到 Kafka(如 Debezium 实现 CDC)
    • 将 Kafka 数据导入到 Elasticsearch/S3/HDFS 等

7.1 修改config/connect-standalone.properties 修改或者添加plugin.path配置

  • Linux
$ echo "plugin.path=libs/connect-file-4.0.0.jar" >> config/connect-standalone.properties

在这里插入图片描述
7.2 准备数据

  • Linux
$ echo -e "foo\nbar" > test.txt
  • Windows(PowserShell工具)
$ echo foo > test.txt
$ echo bar >> test.txt

7.3 执行如下命令

  • Linux
$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
  • Windows(PowserShell工具)
D:\kafka_2.13-4.0.0> .\bin\windows\connect-standalone.bat .\config\connect-standalone.properties .\config\connect-file-source.properties .\config\connect-file-sink.properties 

在客户端查看

  • Linux
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
  • Windows
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic connect-test --from-beginning  --property consumer.encoding=UTF-8





使用Docker启动运行Kafka

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

使用Docker镜像启动
1、拉取Kafka镜像:docker pull apache/kafka:4.0.0
2、启动Kafka容器:docker run -p 9092:9092 apache/kafka:4.0.0
查看已安装的镜像:docker images
删除镜像:docker rmi apache/kafka:4.0.0

总结

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述