自建es 通过Flink同步mysql数据 Docker Compose

发布于:2025-09-03 ⋅ 阅读:(19) ⋅ 点赞:(0)

资源

es:7.18  kibana:7.18 flink:1.17.2

目录

mkdir -p /usr/project/flink/{conf,job,logs}
chmod -R 777 /usr/project/flink
#资源情况
mysql8.0 Elasticsearch7.18 自建

# 目录结构
/usr/project/flink/
/usr/project/flink/
├── conf/
│   ├── flink-conf.yaml
│   └── log4j2.xml
├── job/
│   ├── flink-connector-elasticsearch7-3.0.1-1.17.jar
│   ├── flink-connector-elasticsearch-base-3.0.1-1.17.jar
│   ├── flink-sql-connector-mysql-cdc-3.1.1.jar
│   └── win_user.sql
├── logs/
└── docker-compose.yml

本地创建es kibana

version: '3.8'

services:
  jobmanager:
    image: flink:1.17.2
    container_name: flink-jobmanager
    restart: always
    ports:
      - "8081:8081"  # Flink Web UI
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    volumes:
      - ./conf:/opt/flink/conf
      - ./job:/opt/flink/job
      - /usr/project/flink/logs:/opt/flink/log
    networks:
      - flink-network

  taskmanager:
    image: flink:1.17.2
    container_name: flink-taskmanager
    restart: always
    depends_on:
      - jobmanager
    command: taskmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    volumes:
      - ./conf:/opt/flink/conf
      - ./job:/opt/flink/job
      - /usr/project/flink/logs:/opt/flink/log
    networks:
      - flink-network

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.18
    container_name: elasticsearch
    restart: always
    environment:
      - discovery.type=single-node
      - ELASTIC_PASSWORD=123456
      - xpack.security.enabled=true
      - network.host=0.0.0.0
    ports:
      - "9200:9200"
      - "9300:9300"
    volumes:
      - es_data:/usr/share/elasticsearch/data
    networks:
      - flink-network

  kibana:
    image: docker.elastic.co/kibana/kibana:7.17.18
    container_name: kibana
    restart: always
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
      - ELASTICSEARCH_USERNAME=elastic
      - ELASTICSEARCH_PASSWORD=123456
    ports:
      - "5601:5601"
    networks:
      - flink-network

volumes:
  es_data:

networks:
  flink-network:
    driver: bridge

启动

#目录
cd /usr/project/flink

#保存文件后,重新启动容器
docker-compose up -d

#关闭 停止并移除容器(不删除数据卷)
docker-compose down

#检查 restart 策略是否生效
docker inspect -f '{{.Name}} {{.HostConfig.RestartPolicy.Name}}' $(docker ps -q)

验证

http://127.0.0.1:9200

curl -u elastic:123456 http://127.0.0.1:9200

#账户密码
elastic
123456

Flink SQL Job 示例

文件 /usr/project/flink/job/win_user.sql

存量增量模式

scan.startup.mode 设置为 'initial',以从表的初始状态开始读取数据,然后再进行增量同步

将其设置为 'latest-offset',以从最新的偏移量开始读取数据,实现增量同步

验证表是否成功创建

/opt/flink/bin/sql-client.sh embedded

SHOW TABLES;
SELECT * FROM v99_source_win_user LIMIT 10;
#验证表是否成功创建 进入flink sql
/opt/flink/bin/sql-client.sh embedded

SHOW TABLES;
SELECT * FROM v99_source_win_user LIMIT 10;

#验证表是否成功创建 进入flink sql
/opt/flink/bin/sql-client.sh embedded

SHOW TABLES;
SELECT * FROM v99_source_win_user LIMIT 10;
配置模块

vim /usr/project/flink/job/win_user.sql

CREATE TABLE v99_source_win_user (
  id INT,
  username STRING,
  merchant_id INT,
  avatar STRING,
  fcoin DECIMAL(15,4),
  coin_commission DECIMAL(15,4),
  level_id TINYINT,
  role TINYINT,
  is_promoter TINYINT,
  flag INT,
  real_name STRING,
  signature STRING,
  birthday STRING,
  area_code STRING,
  mobile STRING,
  email STRING,
  sex TINYINT,
  bind_bank TINYINT,
  address STRING,
  score INT,
  promo_code STRING,
  id_path STRING,
  sup_uid_1 INT,
  sup_username_1 STRING,
  sup_uid_2 INT,
  sup_uid_3 INT,
  sup_uid_4 INT,
  sup_uid_5 INT,
  sup_uid_6 INT,
  sup_uid_top INT,
  sup_username_top STRING,
  sup_level_top INT,
  password_hash STRING,
  password_coin STRING,
  ip STRING,
  third_login_type STRING,
  ip_region STRING,
  status TINYINT,
  last_login_ip STRING,
  last_login_ip_region STRING,
  last_login_time INT,
  last_login_device_id STRING,
  created_at INT,
  updated_at INT,
  freeze_cause STRING,
  freeze_at INT,
  operator_name STRING,
  fb_pid STRING,
  fb_cid STRING,
  created_name STRING,
  memberType TINYINT,
  google_sub_id STRING,
  facebook_sub_id STRING,
  secret STRING,
  code_url STRING,
  code_status TINYINT,
  user_type TINYINT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '127.0.0.1',
  'port' = '3306',
  'username' = 'root',
  'password' = '123456',
  'database-name' = 'db_name',
  'table-name' = 'win_user',
  'scan.startup.mode' = 'initial',          -- 读取存量数据
  'debezium.snapshot.mode' = 'never',       -- 使用快照模式initial  增量模式never 增量模式
  'scan.incremental.snapshot.enabled' = 'true'  -- 启用增量同步
);


CREATE TABLE es_sink_table_win_user (
  id INT,
  username STRING,
  merchant_id INT,
  avatar STRING,
  fcoin DECIMAL(15,4),
  coin_commission DECIMAL(15,4),
  level_id TINYINT,
  role TINYINT,
  is_promoter TINYINT,
  flag INT,
  real_name STRING,
  signature STRING,
  birthday STRING,
  area_code STRING,
  mobile STRING,
  email STRING,
  sex TINYINT,
  bind_bank TINYINT,
  address STRING,
  score INT,
  promo_code STRING,
  id_path STRING,
  sup_uid_1 INT,
  sup_username_1 STRING,
  sup_uid_2 INT,
  sup_uid_3 INT,
  sup_uid_4 INT,
  sup_uid_5 INT,
  sup_uid_6 INT,
  sup_uid_top INT,
  sup_username_top STRING,
  sup_level_top INT,
  password_hash STRING,
  password_coin STRING,
  ip STRING,
  third_login_type STRING,
  ip_region STRING,
  status TINYINT,
  last_login_ip STRING,
  last_login_ip_region STRING,
  last_login_time INT,
  last_login_device_id STRING,
  created_at INT,
  updated_at INT,
  freeze_cause STRING,
  freeze_at INT,
  operator_name STRING,
  fb_pid STRING,
  fb_cid STRING,
  created_name STRING,
  memberType TINYINT,
  google_sub_id STRING,
  facebook_sub_id STRING,
  secret STRING,
  code_url STRING,
  code_status TINYINT,
  user_type TINYINT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'elasticsearch-7',
  'hosts' = 'http://elasticsearch:9200',
  'username' = 'elastic',
  'password' = '123456',
  'index' = 'win_user',  -- 确保索引名称与 Elasticsearch 中的匹配
  'sink.bulk-flush.interval' = '1s',
  'sink.bulk-flush.backoff.max-retries' = '3',   -- 设置最大重试次数
  'sink.bulk-flush.max-actions' = '100', -- 一条数据也会同步不等待
  'sink.bulk-flush.max-size' = '1mb', -- 达到 1MB 或 200 条数据时批量 flush
  'sink.bulk-flush.backoff.delay' = '100ms',       -- 设置重试的延迟
  'sink.bulk-flush.backoff.strategy' = 'constant'  -- 重试策略
);

-- 3. 执行数据插入任务
INSERT INTO es_sink_table_win_user
SELECT * FROM v99_source_win_user;

验证

/opt/flink/bin/sql-client.sh embedded
#验证
SHOW TABLES;
desc es_sink_table_win_user;
DROP TABLE IF EXISTS es_sink_table_win_user;
DROP TABLE IF EXISTS v99_source_win_user;

# Flink 1.17 中,您可以使用以下命令查看已注册的连接器
SHOW TABLES;
#作业状态
SHOW JOBS;
#详情
EXPLAIN SELECT * FROM v99_source_win_user;

SELECT * FROM v99_source_win_user LIMIT 10;

优化配置 必须要配置

/opt/flink/bin/sql-client.sh embedded
#增加的 Session 全局配置(SET)
SET execution.checkpointing.interval = '1s';
SET restart-strategy = 'fixed-delay';
SET restart-strategy.fixed-delay.attempts = '3';
SET restart-strategy.fixed-delay.delay = '5s';
SET parallelism.default = 4;
SET state.backend = 'rocksdb';
SET state.backend.rocksdb.memory.managed = 'true';
SET execution.parallelism = 8;

#-- 提交作业时设置  Sink 的并行度提升
SET parallelism.default = 2; 
#最高作业任务
SET execution.parallelism = 8;
#查看验证配置
SET;

连接器下载配置

flink-connector-elasticsearch包官方下载地址 https://repo1.maven.org/maven2/org/apache/flink/ 要选对版本 es7.17

flink-1.17.2

cd /usr/project/flink/job

#删除当前目录除win_user.sql其他的文件
find . -maxdepth 1 ! -name 'win_user.sql' ! -name '.' -type f -exec rm -f {} +

# MySQL CDC
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.1.1/flink-sql-connector-mysql-cdc-3.1.1.jar
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.1/flink-sql-connector-mysql-cdc-2.4.1.jar
# Elasticsearch
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/3.0.1-1.17/flink-sql-connector-elasticsearch7-3.0.1-1.17.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-elasticsearch-base/3.0.1-1.17/flink-connector-elasticsearch-base-3.0.1-1.17.jar

# 补充依赖
wget https://repo1.maven.org/maven2/org/apache/httpcomponents/httpclient/4.5.13/httpclient-4.5.13.jar
wget https://repo1.maven.org/maven2/org/apache/httpcomponents/httpcore/4.4.13/httpcore-4.4.13.jar
wget https://repo1.maven.org/maven2/commons-logging/commons-logging/1.2/commons-logging-1.2.jar
wget https://repo1.maven.org/maven2/commons-codec/commons-codec/1.11/commons-codec-1.11.jar


网站公告

今日签到

点亮在社区的每一天
去签到