前言
单词统计【word count】是flink的最最最基础的入门案例,就如同学习java的第一堂课是运行一个hello world程序同样重要。
这里通过使用netcat发送数据来模拟无界数据流。
代码案例
在代码案例之前,我们需要先介绍一下netcat这个工具。
netcat介绍
netcat(简称nc)是一个强大的网络工具,被称之为网络瑞士军刀。他能通过TCP/UDP协议进行数据传输,支持一下核心功能
- 端口扫描:测试端口是否开放
- 数据传输:作为客户端、服务端发送或接收数据
- 网络调试:模拟socket服务端或客户端
这个案例中,我们就使用netcat来模拟socket服务端来发送数据。
大多数的linux发行版本默认预装了netcat,直接使用nc命令就行了
# 安装(如未预装)
sudo apt install netcat # Debian/Ubuntu
sudo yum install nc # CentOS/RHEL
# 启动服务端(监听端口 9999)
nc -lk 9999
笔者使用的是腾讯云轻量级服务器ubuntu,已经预装了netcat,记得要在开放对应的端口权限,比如
创建maven工程
创建一个简单的maven工程,对应的pom文件如下
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.tml</groupId>
<artifactId>flink-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>flink-demo</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.18.0</flink.version> <!-- 根据你的 Flink 版本进行调整 -->
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Streaming API -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Table API and SQL -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
</project>
代码案例
这里的需求就是,每来了一条消息,我就按照空字符进行切分,进行统计,比如hello world这样一条消息,我会根据空格来进行切分,对hello、world分别计数为1,最后进行累加,从而实时统计单词的数量
package com.tml;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* 从socket流读取数据
*/
public class WordCountFromSocket {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> socketTextStream = env.socketTextStream("xxx", 9999);
//数据流处理
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketTextStream.flatMap(new Tokenizer()).keyBy(t -> t.f0).sum(1);
sum.print();
env.execute("Socket Stream WordCount~");
}
}
真正的逻辑处理是在Tokenizer类中,对应的代码如下
package com.tml;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
* 自定义 FlatMapFunction 实现单词拆分
*/
public class Tokenizer implements FlatMapFunction<String, Tuple2<String,Integer>> {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
//按照空格或者制表符分割单词
String[] words = s.split("\\s+");
for (String word : words) {
collector.collect(new Tuple2<>(word, 1));
}
}
}
运行案例
先启动服务端
再启动客户端
运行main主类,发现程序再阻塞中,等待数据的流入,从这里也可以看出,flink是基于事件驱动的。
模拟socket输入
运行结果查看
从控制台的结果输出可以看到,flink的实时统计的结果是没有问题的!
总结
flink入门级别的案例,从运行过程到运行结果,可以感受到flink实时计算的强大!完整的代码已上传至github【flink-demo】,欢迎围观!