主要框架:Flume+Kafka+Spark
详细:
- VM Ware虚拟机、CentOS7、jdk-8u311、MySQL
- Hadoop-2.9.2、Flume-1.9.0、Zookeeper-3.4.6、Kafka_2.11-2.8.1、Spark-2.4.8
3、IDEA、Scala-2.11.X
一-技术选型与实现流程
本项目的流程,如下图所示,首先使用 Flume 采集本地文件中的单词内容,并把更新的数据实时推送到 Kafka 消息队列中间件中,然后使用 Spark 从 Kafka 中提取消息进行实时数据计算,并把计算结果存储到 MySQL 数据库中,后续使用 Davinci 等架构访问 MySQL 数据库获取实时计算的结果,并把结果动态地、可视化地呈现在网页上。
1.Flume的配置
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#
# # Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/hadoop/log.txt
a1.sources.r1.channels = c1
#
# # Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.channel = c1
a1.sinks.k1.brokerList = master:9092,slave1:9092,slave2:9092
a1.sinks.k1.topic = word_topic
a1.sinks.k1.producer.acks = 1
#
# # Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100
a1.channels.c1.transactionCapacity = 100
2. MySQL的前期准备
CREATE TABLE word_count (
word VARCHAR(255) PRIMARY KEY,
count INT
);
3. Spark Streaming
连接kafka,写入mysql,
scala代码,ai提示词:
写一个scala代码,功能是从 Kafka 实时消费数据、统计词频并同步到 MySQL。
首先通过 Log4j 配置屏蔽 Spark 和 Kafka 的 INFO 日志,确保控制台仅显示关键信息;接着创建 Spark Streaming 上下文,配置 Kafka 连接参数,从word_topic主题消费数据;对消息内容进行清洗(分割、过滤、小写处理)后,使用滑动窗口(30 秒窗口,5 秒滑动间隔)实时统计词频;然后将统计结果按分区批量写入 MySQL,每个分区复用一个数据库连接,通过事务管理和ON DUPLICATE KEY UPDATE语句确保数据一致性和高效更新;过程中通过println输出关键更新信息,便于监控
4. Davinci 可视化
可设置自动刷新,写入文本文件的变化可对应图表的变化。
对比效果: