Resources
es: 7.17.18  kibana: 7.17.18  flink: 1.17.2
Directories
mkdir -p /usr/project/flink/{conf,job,logs}
chmod -R 777 /usr/project/flink
# Environment
MySQL 8.0 and Elasticsearch 7.17.18, self-hosted
# Directory structure
/usr/project/flink/
├── conf/
│   ├── flink-conf.yaml
│   └── log4j2.xml
├── job/
│   ├── flink-sql-connector-elasticsearch7-3.0.1-1.17.jar
│   ├── flink-connector-elasticsearch-base-3.0.1-1.17.jar
│   ├── flink-sql-connector-mysql-cdc-3.1.1.jar
│   └── win_user.sql
├── logs/
└── docker-compose.yml
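The docker-compose.yml below mounts ./conf over the image's /opt/flink/conf, so the config files must exist before the first start. One way to seed them, assuming nothing custom is needed beyond the image defaults, is to copy them out of the image:
# seed ./conf with the image's default config files
docker create --name flink-conf-tmp flink:1.17.2
docker cp flink-conf-tmp:/opt/flink/conf/. conf/
docker rm flink-conf-tmp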
Create ES and Kibana locally (docker-compose.yml)
version: '3.8'
services:
  jobmanager:
    image: flink:1.17.2
    container_name: flink-jobmanager
    restart: always
    ports:
      - "8081:8081"   # Flink Web UI
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    volumes:
      - ./conf:/opt/flink/conf
      - ./job:/opt/flink/job
      - /usr/project/flink/logs:/opt/flink/log
    networks:
      - flink-network

  taskmanager:
    image: flink:1.17.2
    container_name: flink-taskmanager
    restart: always
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    volumes:
      - ./conf:/opt/flink/conf
      - ./job:/opt/flink/job
      - /usr/project/flink/logs:/opt/flink/log
    networks:
      - flink-network

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.18
    container_name: elasticsearch
    restart: always
    environment:
      - discovery.type=single-node
      - ELASTIC_PASSWORD=123456
      - xpack.security.enabled=true
      - network.host=0.0.0.0
    ports:
      - "9200:9200"
      - "9300:9300"
    volumes:
      - es_data:/usr/share/elasticsearch/data
    networks:
      - flink-network

  kibana:
    image: docker.elastic.co/kibana/kibana:7.17.18
    container_name: kibana
    restart: always
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
      - ELASTICSEARCH_USERNAME=elastic
      - ELASTICSEARCH_PASSWORD=123456
    ports:
      - "5601:5601"
    networks:
      - flink-network

volumes:
  es_data:

networks:
  flink-network:
    driver: bridge
Start
# Working directory
cd /usr/project/flink
# Start (or restart) the containers after saving the file
docker-compose up -d
# Shut down: stop and remove the containers (data volumes are kept)
docker-compose down
# Check that the restart policy took effect
docker inspect -f '{{.Name}} {{.HostConfig.RestartPolicy.Name}}' $(docker ps -q)
Verify
http://127.0.0.1:9200
curl -u elastic:123456 http://127.0.0.1:9200
# Account / password
elastic
123456
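Optionally confirm the single-node cluster is healthy ("yellow" status is normal for a single node, since replica shards cannot be allocated):
curl -u elastic:123456 'http://127.0.0.1:9200/_cluster/health?pretty'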
Flink SQL job example
File: /usr/project/flink/job/win_user.sql
Snapshot vs. incremental mode (scan.startup.mode):
set it to 'initial' to read the table's existing data as a snapshot first and then continue with incremental sync;
set it to 'latest-offset' to start from the latest binlog offset, i.e. incremental sync only.
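For incremental-only sync, the source DDL below stays the same except for this single option (a sketch of the one changed line):
'scan.startup.mode' = 'latest-offset' -- incremental only, skip the existing data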
Verify that the tables were created: enter the Flink SQL client
/opt/flink/bin/sql-client.sh embedded
SHOW TABLES;
SELECT * FROM v99_source_win_user LIMIT 10;
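These commands run inside the JobManager container. One way to open the client from the host, and to run the whole script non-interactively with the connector jars on the classpath (paths assume the mounts above; -j and -f are standard sql-client options):
docker exec -it flink-jobmanager /opt/flink/bin/sql-client.sh embedded \
  -j /opt/flink/job/flink-sql-connector-mysql-cdc-3.1.1.jar \
  -j /opt/flink/job/flink-sql-connector-elasticsearch7-3.0.1-1.17.jar \
  -f /opt/flink/job/win_user.sql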
Job configuration
vim /usr/project/flink/job/win_user.sql
CREATE TABLE v99_source_win_user (
id INT,
username STRING,
merchant_id INT,
avatar STRING,
fcoin DECIMAL(15,4),
coin_commission DECIMAL(15,4),
level_id TINYINT,
role TINYINT,
is_promoter TINYINT,
flag INT,
real_name STRING,
signature STRING,
birthday STRING,
area_code STRING,
mobile STRING,
email STRING,
sex TINYINT,
bind_bank TINYINT,
address STRING,
score INT,
promo_code STRING,
id_path STRING,
sup_uid_1 INT,
sup_username_1 STRING,
sup_uid_2 INT,
sup_uid_3 INT,
sup_uid_4 INT,
sup_uid_5 INT,
sup_uid_6 INT,
sup_uid_top INT,
sup_username_top STRING,
sup_level_top INT,
password_hash STRING,
password_coin STRING,
ip STRING,
third_login_type STRING,
ip_region STRING,
status TINYINT,
last_login_ip STRING,
last_login_ip_region STRING,
last_login_time INT,
last_login_device_id STRING,
created_at INT,
updated_at INT,
freeze_cause STRING,
freeze_at INT,
operator_name STRING,
fb_pid STRING,
fb_cid STRING,
created_name STRING,
memberType TINYINT,
google_sub_id STRING,
facebook_sub_id STRING,
secret STRING,
code_url STRING,
code_status TINYINT,
user_type TINYINT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '127.0.0.1', -- note: inside the Flink containers, 127.0.0.1 is the container itself; use an address they can actually reach
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'db_name',
'table-name' = 'win_user',
'scan.startup.mode' = 'initial', -- 'initial' = read existing data first, then incremental; 'latest-offset' = incremental only
'debezium.snapshot.mode' = 'never', -- Debezium-level snapshot mode ('initial' = with snapshot, 'never' = skip it); not effective while the incremental snapshot below is enabled
'scan.incremental.snapshot.enabled' = 'true' -- use the incremental snapshot algorithm
);
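The mysql-cdc source assumes the MySQL side is binlog-ready; a quick check (the 'flink_cdc' account in the GRANT is a hypothetical dedicated user):
-- run on MySQL: the binlog must be enabled and row-based
SHOW VARIABLES LIKE 'log_bin';        -- expect ON
SHOW VARIABLES LIKE 'binlog_format';  -- expect ROW
-- the connector account needs replication privileges:
GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink_cdc'@'%';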
CREATE TABLE es_sink_table_win_user (
id INT,
username STRING,
merchant_id INT,
avatar STRING,
fcoin DECIMAL(15,4),
coin_commission DECIMAL(15,4),
level_id TINYINT,
role TINYINT,
is_promoter TINYINT,
flag INT,
real_name STRING,
signature STRING,
birthday STRING,
area_code STRING,
mobile STRING,
email STRING,
sex TINYINT,
bind_bank TINYINT,
address STRING,
score INT,
promo_code STRING,
id_path STRING,
sup_uid_1 INT,
sup_username_1 STRING,
sup_uid_2 INT,
sup_uid_3 INT,
sup_uid_4 INT,
sup_uid_5 INT,
sup_uid_6 INT,
sup_uid_top INT,
sup_username_top STRING,
sup_level_top INT,
password_hash STRING,
password_coin STRING,
ip STRING,
third_login_type STRING,
ip_region STRING,
status TINYINT,
last_login_ip STRING,
last_login_ip_region STRING,
last_login_time INT,
last_login_device_id STRING,
created_at INT,
updated_at INT,
freeze_cause STRING,
freeze_at INT,
operator_name STRING,
fb_pid STRING,
fb_cid STRING,
created_name STRING,
memberType TINYINT,
google_sub_id STRING,
facebook_sub_id STRING,
secret STRING,
code_url STRING,
code_status TINYINT,
user_type TINYINT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://elasticsearch:9200',
'username' = 'elastic',
'password' = '123456',
'index' = 'win_user', -- make sure this matches the index name used in Elasticsearch
'sink.bulk-flush.interval' = '1s', -- flush at least once per second, so even a single record is not held back
'sink.bulk-flush.backoff.max-retries' = '3', -- maximum number of retries
'sink.bulk-flush.max-actions' = '100', -- flush after at most 100 buffered actions
'sink.bulk-flush.max-size' = '1mb', -- or once the batch reaches 1 MB
'sink.bulk-flush.backoff.delay' = '100ms', -- delay between retries
'sink.bulk-flush.backoff.strategy' = 'constant' -- retry back-off strategy
);
-- 3. Submit the sync job
INSERT INTO es_sink_table_win_user
SELECT * FROM v99_source_win_user;
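Once the INSERT job is running, you can confirm from the host that documents are arriving in Elasticsearch:
curl -u elastic:123456 'http://127.0.0.1:9200/win_user/_count?pretty'
curl -u elastic:123456 'http://127.0.0.1:9200/win_user/_search?size=1&pretty'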
Verify
/opt/flink/bin/sql-client.sh embedded
# Verify the tables
SHOW TABLES;
DESC es_sink_table_win_user;
# Drop and recreate the tables if the definitions need to change
DROP TABLE IF EXISTS es_sink_table_win_user;
DROP TABLE IF EXISTS v99_source_win_user;
# In Flink 1.17, the registered tables can be listed with:
SHOW TABLES;
# Job status
SHOW JOBS;
# Details
EXPLAIN SELECT * FROM v99_source_win_user;
SELECT * FROM v99_source_win_user LIMIT 10;
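The same information is available in the Flink Web UI at http://127.0.0.1:8081; the REST API behind it can also be queried from the host:
curl http://127.0.0.1:8081/jobs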
Session tuning (must be configured)
/opt/flink/bin/sql-client.sh embedded
# Session-wide settings (SET). In the 1.17 SQL client, keys and values should be quoted.
# Set the default parallelism once; a later SET simply overrides an earlier one.
SET 'execution.checkpointing.interval' = '1s';
SET 'restart-strategy' = 'fixed-delay';
SET 'restart-strategy.fixed-delay.attempts' = '3';
SET 'restart-strategy.fixed-delay.delay' = '5s';
SET 'parallelism.default' = '4';
SET 'state.backend' = 'rocksdb';
SET 'state.backend.rocksdb.memory.managed' = 'true';
# Inspect the effective configuration
SET;
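With RocksDB alone, checkpoints are still kept in JobManager memory; for durable checkpoints, also point them at a path mounted in both containers. The directory below is an assumption (the shared log mount from docker-compose.yml); any path writable by both containers works:
SET 'state.checkpoints.dir' = 'file:///opt/flink/log/checkpoints'; -- assumed path on the shared mount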
Connector downloads
Official Maven repository for the flink-connector-elasticsearch jars: https://repo1.maven.org/maven2/org/apache/flink/ (pick the version matching ES 7.17 and Flink 1.17.2).
cd /usr/project/flink/job
# Remove everything in this directory except win_user.sql
find . -maxdepth 1 ! -name 'win_user.sql' ! -name '.' -type f -exec rm -f {} +
# MySQL CDC (use exactly one of the two artifacts; loading both causes class conflicts)
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.1.1/flink-sql-connector-mysql-cdc-3.1.1.jar
# older Ververica artifact, only as an alternative:
# wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.1/flink-sql-connector-mysql-cdc-2.4.1.jar
# Elasticsearch
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/3.0.1-1.17/flink-sql-connector-elasticsearch7-3.0.1-1.17.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-elasticsearch-base/3.0.1-1.17/flink-connector-elasticsearch-base-3.0.1-1.17.jar
# Additional dependencies (HTTP client stack used by the Elasticsearch connector)
wget https://repo1.maven.org/maven2/org/apache/httpcomponents/httpclient/4.5.13/httpclient-4.5.13.jar
wget https://repo1.maven.org/maven2/org/apache/httpcomponents/httpcore/4.4.13/httpcore-4.4.13.jar
wget https://repo1.maven.org/maven2/commons-logging/commons-logging/1.2/commons-logging-1.2.jar
wget https://repo1.maven.org/maven2/commons-codec/commons-codec/1.11/commons-codec-1.11.jar
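Jars under /opt/flink/job are not picked up automatically; Flink only loads /opt/flink/lib at startup. Either pass them to the SQL client with -j (as sketched earlier) or copy them into lib and restart:
docker exec flink-jobmanager bash -c 'cp /opt/flink/job/*.jar /opt/flink/lib/'
docker exec flink-taskmanager bash -c 'cp /opt/flink/job/*.jar /opt/flink/lib/'
docker-compose restart jobmanager taskmanager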