1. Environment Versions
Component | Version |
---|---|
Flink | 1.17.0 |
Kafka | 2.12 |
MySQL | 5.7.33 |
[Note] Flink 1.13 introduced the CUMULATE window. Earlier Flink SQL versions had no trigger mechanism, so a long window could not fire mid-way and emit intermediate results, e.g. updating the running PV/UV every 10 seconds. That was only possible in the DataStream API with a custom Trigger plus state; see the following article for such an implementation:
Flink DataStream: counting data volumes per minute or per day
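For reference, a minimal sketch of that DataStream-based approach (assumptions: userLogStream is an existing DataStream<UserLog> with event-time timestamps and watermarks already assigned, and the window function here is illustrative, not the exact implementation from the linked article):

// Day-long event-time window that fires every 10 seconds and re-emits the running pv/uv
userLogStream
        .keyBy(UserLog::getFormatDate)
        .window(TumblingEventTimeWindows.of(Time.days(1)))
        .trigger(ContinuousEventTimeTrigger.of(Time.seconds(10)))
        .process(new ProcessWindowFunction<UserLog, Tuple3<String, Long, Long>, String, TimeWindow>() {
            @Override
            public void process(String day, Context ctx, Iterable<UserLog> logs,
                                Collector<Tuple3<String, Long, Long>> out) {
                long pv = 0L;
                Set<Integer> uids = new HashSet<>();
                for (UserLog log : logs) {
                    pv++;
                    uids.add(log.getUid());
                }
                // (day, pv, uv) as of the current firing
                out.collect(Tuple3.of(day, pv, (long) uids.size()));
            }
        });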
2. MySQL Table DDL
create table user_log
(
    id      int auto_increment comment 'primary key'
        primary key,
    uid     int    not null comment 'user id',
    event   int    not null comment 'user event',
    logtime bigint null comment 'log time (epoch milliseconds)'
)
    comment 'user log table, used as the verification data source';
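Once UserLogGenerator (section 4) has been running for a while, a quick spot check confirms that rows are arriving; logtime is stored as epoch milliseconds, so it is converted back to a readable datetime here (a sketch only):

select uid, event, from_unixtime(logtime / 1000) as log_ts
from user_log
order by id desc
limit 10;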
3. User Log Class
Create a new Maven project and define the class that describes the record schema shared by Kafka and MySQL. The class relies on Lombok's @Data and hutool's DateUtil.
/**
 * User log POJO.
 */
@Data
public class UserLog {
    // user id
    private int uid;
    // user event
    private int event;
    // log time
    private Date logtime;

    // date string, used when aggregating by day
    public String getFormatDate() {
        return DateUtil.format(logtime, "yyyyMMdd");
    }

    // time string truncated to the minute
    public String getFormatTime() {
        return DateUtil.format(logtime, "yyyy-MM-dd HH:mm") + ":00";
    }
}
4. User Data Generator
/**
 * User data generator: writes random UserLog records to Kafka and MySQL.
 */
public class UserLogGenerator {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Build a DataGeneratorSource
        DataGeneratorSource<UserLog> dataGeneratorSource = new DataGeneratorSource<>(
                // GeneratorFunction implementation that produces UserLog records
                new GeneratorFunction<Long, UserLog>() {
                    // random data generator
                    public RandomDataGenerator generator;

                    @Override
                    public void open(SourceReaderContext readerContext) throws Exception {
                        generator = new RandomDataGenerator();
                    }

                    @Override
                    public UserLog map(Long aLong) throws Exception {
                        UserLog userLog = new UserLog();
                        // random user id
                        userLog.setUid(generator.nextInt(1, 50));
                        // random user event
                        userLog.setEvent(generator.nextInt(1, 2));
                        // random log time within +/- 2 seconds of now
                        userLog.setLogtime(DateUtil.offset(new DateTime(), DateField.MILLISECOND, generator.nextInt(-2000, 2000)));
                        return userLog;
                    }
                },
                // total number of records to emit
                // 60 * 60 * 10,
                1200,
                // records emitted per second
                RateLimiterStrategy.perSecond(10),
                // declare the produced type so Flink can serialize UserLog
                TypeInformation.of(UserLog.class)
        );
        DataStreamSource<UserLog> dataGeneratorSourceStream = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "dataGeneratorSource");
        // print generated records (uncomment for debugging)
        // dataGeneratorSourceStream.print();

        // write to Kafka
        KafkaSink<UserLog> kafkaSink = KafkaSink.<UserLog>builder()
                .setBootstrapServers("hadoop01:9092")
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<UserLog>builder()
                                .setTopic("userLog")
                                .setValueSerializationSchema((SerializationSchema<UserLog>) userLog -> JSONUtil.toJsonStr(userLog).getBytes())
                                .build()
                ).build();
        dataGeneratorSourceStream.sinkTo(kafkaSink);

        // write to MySQL for later verification
        SinkFunction<UserLog> jdbcSink = JdbcSink.sink(
                "insert into user_log (uid, event, logtime) values (?, ?, ?)",
                new JdbcStatementBuilder<UserLog>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, UserLog userLog) throws SQLException {
                        preparedStatement.setInt(1, userLog.getUid());
                        preparedStatement.setInt(2, userLog.getEvent());
                        preparedStatement.setLong(3, userLog.getLogtime().getTime());
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://192.168.31.116:3306/demo")
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("root")
                        .build()
        );
        dataGeneratorSourceStream.addSink(jdbcSink);

        env.execute();
    }
}
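Before running the SQL job it can help to confirm that records are actually landing in the topic, e.g. with the console consumer shipped with Kafka (script path and options depend on your installation):

kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic userLog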
5. SQL: PV and UV per Minute or per Day
public class UserLogSql {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);

        // create the Kafka source table
        String sourceDDL = "create table user_log\n" +
                "(\n" +
                "    uid INT\n" +
                "    , event INT\n" +
                "    , logtime BIGINT\n" +
                "    , rowtime AS TO_TIMESTAMP_LTZ(logtime, 3)\n" +
                "    , WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND\n" +
                ") with (\n" +
                "    'connector' = 'kafka'\n" +
                "    ,'topic' = 'userLog'\n" +
                "    ,'properties.bootstrap.servers' = 'hadoop01:9092'\n" +
                "    ,'scan.startup.mode' = 'latest-offset'\n" +
                "    ,'format' = 'json'\n" +
                ")";
        tableEnv.executeSql(sourceDDL);

        // compute PV and UV per cumulate window
        String result = "select\n" +
                "    date_format(window_start, 'yyyy-MM-dd') cal_day\n" +
                "    , date_format(window_start, 'HH:mm:ss') start_time\n" +
                "    , date_format(window_end, 'HH:mm:ss') end_time\n" +
                "    , count(uid) pv\n" +
                "    , count(distinct uid) uv\n" +
                "FROM TABLE(\n" +
                // fire every 10 seconds, window size = 1 day
                // "    CUMULATE(TABLE user_log, DESCRIPTOR(rowtime), INTERVAL '10' SECOND, INTERVAL '1' DAY))\n" +
                // fire every 10 seconds, window size = 10 seconds
                "    CUMULATE(TABLE user_log, DESCRIPTOR(rowtime), INTERVAL '10' SECOND, INTERVAL '10' SECOND))\n" +
                "GROUP BY window_start, window_end";
        // print the query result
        tableEnv.executeSql(result).print();
    }
}
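Note that when the step equals the maximum window size, as in the 10-second/10-second variant above, CUMULATE behaves like a plain tumbling window; the same per-10-second result could also be written with the TUMBLE windowing TVF (a sketch against the same table):

select
    date_format(window_start, 'yyyy-MM-dd') cal_day,
    date_format(window_start, 'HH:mm:ss') start_time,
    date_format(window_end, 'HH:mm:ss') end_time,
    count(uid) pv,
    count(distinct uid) uv
FROM TABLE(
    TUMBLE(TABLE user_log, DESCRIPTOR(rowtime), INTERVAL '10' SECOND))
GROUP BY window_start, window_end;

CUMULATE only pays off with the commented 1-day variant, where each 10-second firing re-emits the cumulative PV/UV for the whole day so far.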
6. Running the SQL in sql-client
-- Table DDL
create table user_log
(
    uid INT,
    event INT,
    logtime BIGINT,
    rowtime AS TO_TIMESTAMP_LTZ(logtime, 3),
    WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND
) with (
    'connector' = 'kafka',
    'topic' = 'userLog',
    'properties.bootstrap.servers' = 'hadoop01:9092',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);
-- PV/UV query: fires every 10 seconds, window size = 1 day
select
date_format(window_start, 'yyyy-MM-dd') cal_day,
date_format(window_start, 'HH:mm:ss') start_time,
date_format(window_end, 'HH:mm:ss') end_time,
count(uid) pv,
count(distinct uid) uv
FROM TABLE(
CUMULATE(TABLE user_log, DESCRIPTOR(rowtime), INTERVAL '10' SECOND, INTERVAL '1' DAY))
GROUP BY window_start, window_end;
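TO_TIMESTAMP_LTZ and the day boundary of the 1-day CUMULATE window both depend on the session time zone, so if the day-level numbers look shifted it is worth pinning the zone explicitly in sql-client (Asia/Shanghai is just an example; use your own zone):

SET 'table.local-time-zone' = 'Asia/Shanghai';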
7. Data Verification
- Start UserLogGenerator
- Start UserLogSql, or run the SQL in sql-client
- Verify the results with a query against MySQL
Converting the window boundaries to epoch timestamps:
Timestamp | Datetime | Epoch (ms) |
---|---|---|
w_start | 2025-08-16 14:45:40 | 1755326740000 |
w_end | 2025-08-16 14:45:50 | 1755326750000 |
select count(distinct uid) from user_log where logtime >= 1755326740000 and logtime < 1755326750000;
# The MySQL result matches the Flink SQL output below
SQL Query Result (Table)
Refresh: 1 s Page: Last of 1 Updated: 23:50:09.972
cal_day start_time end_time pv uv
2025-08-15 23:45:30 23:45:40 15 15
2025-08-15 23:45:40 23:45:50 101 45
2025-08-15 23:45:50 23:46:00 104 42
2025-08-15 23:46:00 23:46:10 100 42
2025-08-15 23:46:10 23:46:20 97 45
2025-08-15 23:46:20 23:46:30 104 40
2025-08-15 23:46:30 23:46:40 97 42
2025-08-15 23:46:40 23:46:50 99 44
2025-08-15 23:46:50 23:47:00 103 44
2025-08-15 23:47:00 23:47:10 97 44
2025-08-15 23:47:10 23:47:20 100 43
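Instead of converting one pair of window boundaries at a time, the raw logs can also be bucketed into 10-second windows directly in MySQL and compared side by side with the Flink output (a sketch; it assumes the MySQL session time zone matches Flink's table.local-time-zone):

select
    from_unixtime(floor(logtime / 10000) * 10, '%Y-%m-%d %H:%i:%s') as window_start,
    count(*) as pv,
    count(distinct uid) as uv
from user_log
group by window_start
order by window_start;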
8. Common Issues
- Running a query in sql-client fails because the Kafka connector jar is missing
# Run a SQL query
Flink SQL> select * from user_log;
# Error message
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Solution
# Download the Kafka SQL connector jar matching your Flink version and put it in Flink's lib directory
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.0/flink-sql-connector-kafka-1.17.0.jar -P ${FLINK_HOME}/lib/