基于Flink SQL的实时指标多维分析模型

发布于:2025-03-11 ⋅ 阅读:(17) ⋅ 点赞:(0)

数据流程介绍

1.创建源表kafkaTable接入Kafka消息队列数据,定义字段映射规则;
2.创建目标表es_sink配置Elasticsearch输出;
3.通过多级视图(tmp→tmp_dedup→tmp1/tmp2→tmp3→tmp_groupby)实现数据清洗、去重、状态计算;
4.使用ROLLUP进行多维聚合统计;
5.最终计算结果写入ES,包含成功率等衍生指标。
(图:数据流程示意图,原图缺失)

Flink SQL 逻辑

SET table.exec.state.ttl=2592000s; -- 30 days; default: 0 ms
-- MiniBatch aggregation
SET table.exec.mini-batch.enabled = true;
SET table.exec.mini-batch.allow-latency = 1s;
SET table.exec.mini-batch.size = 10000;
-- Local-Global aggregation
SET table.optimizer.agg-phase-strategy = TWO_PHASE;
-- Alternative TTL kept for reference. Unit: ms, 10 days
--SET table.exec.state.ttl = 864000000

-- Source table: change records read from a Kafka topic in 'binlog' format.
-- Physical columns carry the record envelope; the business fields live in the
-- src / cur string maps (presumably the row image before / after the change;
-- confirm against the binlog format definition).
CREATE TABLE kafkaTable (
    mid BIGINT,
    db  STRING,
    sch STRING,
    tab STRING,
    opt STRING,
    ts  BIGINT,
    ddl STRING,
    err STRING,
    src MAP<STRING, STRING>,
    cur MAP<STRING, STRING>,
    cus MAP<STRING, STRING>,
    -- Computed columns: take the value from cur when it is present,
    -- otherwise fall back to the value in src.
    id           AS COALESCE(cur['id'], src['id']),
    task_id      AS COALESCE(cur['task_id'], src['task_id']),
    account_id   AS COALESCE(cur['account_id'], src['account_id']),
    publish_time AS COALESCE(cur['publish_time'], src['publish_time']),
    msg_status   AS COALESCE(cur['msg_status'], src['msg_status']),
    send_type    AS COALESCE(cur['send_type'], src['send_type']),
    retry_status AS COALESCE(cur['retry_status'], src['retry_status']),
    update_time  AS COALESCE(cur['update_time'], src['update_time']),
    -- update_time parsed as a timestamp; TIMESTAMP(3) or TIMESTAMP_LTZ(3) are the candidate types
    event_time   AS CAST(COALESCE(cur['update_time'], src['update_time']) AS TIMESTAMP(3)),
    -- Processing-time attribute, used for the deduplication ordering downstream
    proctime     AS PROCTIME()
    -- Event-time watermark is disabled (delay could be MINUTE or SECOND):
    -- WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE
) WITH (
    'connector' = 'kafka',
    'topic' = 'xxx',
    'jdq.client.id' = 'xxx',
    'jdq.password' = 'xxx',
    'jdq.domain' = 'xxx',
    'scan.startup.mode' = 'group-offsets', -- default: group-offsets; others: latest-offset, earliest-offset
    -- 'properties.enable.auto.commit' = 'true' -- default: false; when false, offsets are committed when a checkpoint occurs
    'format' = 'binlog'
);


-- Sink table: one Elasticsearch 6 document per aggregation group.
-- The primary key (grouping level + the four dimensions) identifies the
-- document, so updated results for a group are written under the same key.
CREATE TABLE es_sink (
    send_type   STRING,
    task_id     STRING,
    month_dim   STRING,
    day_dim     STRING,
    grouping_id INT,
    init        INT,
    cancel      INT,
    succ        INT,
    fail        INT,
    cancel_rate FLOAT,
    succ_rate   FLOAT,
    fail_rate   FLOAT,
    update_date STRING,
    PRIMARY KEY (grouping_id, send_type, month_dim, day_dim, task_id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-6',
    'index' = 'index01',
    'document-type' = 'type01',
    'hosts' = 'xx',
    'format' = 'json',
    'filter.null-value'='true',
    'sink.bulk-flush.max-actions' = '1000',
    'sink.bulk-flush.max-size' = '10mb'
);
-- Dimensions:
--   - send_type: send type
--   - month_dim: month dimension
--   - day_dim:   day dimension
--   - task_id:   task ID

-- tmp: filters the change records and turns each one into 0/1 status flags
-- (init / cancel / succ / fail) plus the month and day dimensions cut from
-- the publish_time string.
CREATE VIEW tmp AS
SELECT
    send_type,
    task_id,
    publish_time,
    msg_status,
    -- INSERT with status '0' marks a newly created message
    CASE
        WHEN UPPER(opt) = 'INSERT' AND msg_status = '0' THEN 1
        ELSE 0
    END AS init,
    -- UPDATE events: status '4' -> cancel, '1' -> succ, '2' -> fail
    CASE
        WHEN UPPER(opt) = 'UPDATE' AND msg_status = '4' THEN 1
        ELSE 0
    END AS cancel,
    CASE
        WHEN UPPER(opt) = 'UPDATE' AND msg_status = '1' THEN 1
        ELSE 0
    END AS succ,
    CASE
        WHEN UPPER(opt) = 'UPDATE' AND msg_status = '2' THEN 1
        ELSE 0
    END AS fail,
    update_time,
    opt,
    ts,
    id,
    proctime,
    SUBSTRING(publish_time, 1, 7)  AS month_dim,  -- first 7 characters, e.g. 'yyyy-MM'
    SUBSTRING(publish_time, 1, 10) AS day_dim     -- first 10 characters, e.g. 'yyyy-MM-dd'
FROM kafkaTable
WHERE TRIM(retry_status) = '0'
  -- string comparison; relies on publish_time being 'yyyy-MM-dd HH:mm:ss'-style text
  AND publish_time >= '2025-01-01 00:00:00'
  AND (
          -- task ids without '_R': the INSERT at status '0', or an UPDATE to status 1-4
          -- (status '3' passes the filter but sets none of the four flags)
          (
              POSITION('_R' IN task_id) = 0
              AND (
                      (UPPER(opt) = 'INSERT' AND msg_status = '0')
                   OR (UPPER(opt) = 'UPDATE' AND msg_status IN ('1', '2', '3', '4'))
                  )
          )
          -- task ids containing '_R' (presumably retry tasks; confirm): only UPDATEs to status '1'
       OR (
              POSITION('_R' IN task_id) > 0
              AND UPPER(opt) = 'UPDATE'
              AND msg_status = '1'
          )
      );

-- Deduplication: removes rows that are duplicated over a set of columns and
-- keeps only the first or the last one. Here the most recent row (by
-- processing time) is kept for each (id, msg_status) pair. When the result is
-- aggregated with SUM or COUNT downstream, Flink's retract stream retracts
-- the previously emitted row.
CREATE VIEW tmp_dedup AS
SELECT
    ranked.send_type,
    ranked.task_id,
    ranked.publish_time,
    ranked.msg_status,
    ranked.init,
    ranked.cancel,
    ranked.succ,
    ranked.fail,
    ranked.update_time,
    ranked.opt,
    ranked.ts,
    ranked.id,
    ranked.proctime,
    ranked.month_dim,
    ranked.day_dim,
    ranked.rn
FROM (
    SELECT
        tmp.*,
        ROW_NUMBER() OVER (
            PARTITION BY id, msg_status
            ORDER BY proctime DESC
        ) AS rn
    FROM tmp
) AS ranked
WHERE ranked.rn = 1;

-- tmp1: rows whose task_id does not contain '_R'.
-- A row flagged as cancel is split by time: it counts as cancel when
-- update_time <= publish_time, and as fail when update_time > publish_time.
-- Both comparisons are on the string values.
CREATE VIEW tmp1 AS
SELECT
    send_type,
    task_id,
    month_dim,
    day_dim,
    init,
    CASE
        WHEN cancel = 1 AND update_time <= publish_time THEN 1
        ELSE 0
    END AS cancel,
    succ,
    -- "cancel" here is still the input column from tmp_dedup, not the alias defined above
    CASE
        WHEN cancel = 1 AND update_time > publish_time THEN 1
        ELSE fail
    END AS fail,
    update_time
FROM tmp_dedup
WHERE POSITION('_R' IN task_id) = 0;

-- tmp2: successful rows whose task_id contains '_R'.
-- The task_id is cut back to the part before '_R', so the row is counted
-- under that id. Each such row carries fail = -1, which lowers the fail total
-- of that task by one when the rows are summed.
CREATE VIEW tmp2 AS
SELECT
    send_type,
    SPLIT_INDEX(task_id, '_R', 0) AS task_id,
    month_dim,
    day_dim,
    init,
    cancel,
    succ,
    -1 AS fail,
    update_time
FROM tmp_dedup
WHERE succ = 1
  AND POSITION('_R' IN task_id) > 0;

-- tmp3: stacks the tmp1 rows and the tmp2 rows into one input for the
-- aggregation. UNION ALL keeps every row; no deduplication happens here.
CREATE VIEW tmp3 AS
SELECT
    send_type,
    task_id,
    month_dim,
    day_dim,
    init,
    cancel,
    succ,
    fail
FROM tmp1

UNION ALL

SELECT
    send_type,
    task_id,
    month_dim,
    day_dim,
    init,
    cancel,
    succ,
    fail
FROM tmp2;


-- tmp_groupby: sums the four counters at every ROLLUP level of
-- (send_type, month_dim, day_dim, task_id).
--
-- grouping_id identifies the level:
--   1 = grand total
--   2 = per send_type
--   3 = per send_type + month
--   4 = per send_type + month + day
--   5 = per send_type + month + day + task
--
-- The level is derived with GROUPING(), which returns 1 when the column is
-- rolled up in the current grouping set. Testing the columns with IS NULL
-- cannot tell a rolled-up column from a real NULL value: a NULL send_type in
-- the data would give the detail row a NULL grouping_id and give its
-- per-send_type subtotal grouping_id 1, the same key as the grand total.
-- For non-NULL dimension values the result is the same as the IS NULL form.
CREATE VIEW tmp_groupby AS
SELECT
-- /*+ STATE_TTL('tmp' = '10d') */
    -- Rolled-up (or NULL) dimensions are written as 'N'
    COALESCE(send_type, 'N') AS send_type,
    COALESCE(month_dim, 'N') AS month_dim,
    COALESCE(day_dim, 'N')   AS day_dim,
    COALESCE(task_id, 'N')   AS task_id,
    -- ROLLUP removes columns from right to left, so the first rolled-up
    -- column found from the left determines the level.
    CASE
        WHEN GROUPING(send_type) = 1 THEN 1
        WHEN GROUPING(month_dim) = 1 THEN 2
        WHEN GROUPING(day_dim)   = 1 THEN 3
        WHEN GROUPING(task_id)   = 1 THEN 4
        ELSE 5
    END AS grouping_id,
    SUM(init)   AS init,
    SUM(cancel) AS cancel,
    SUM(succ)   AS succ,
    SUM(fail)   AS fail
FROM tmp3
-- Equivalent to:
-- GROUP BY GROUPING SETS (
--     (send_type, month_dim, day_dim, task_id),
--     (send_type, month_dim, day_dim),
--     (send_type, month_dim),
--     (send_type),
--     ()
-- )
GROUP BY ROLLUP (send_type, month_dim, day_dim, task_id);

-- Final write: maps send_type codes to display labels, derives the rate
-- columns (percentages rounded to 2 decimals) and stamps the row with the
-- current local time.
-- Columns are matched to es_sink by position, so the select order must follow
-- the sink's column order.
INSERT INTO es_sink
SELECT
    CASE TRIM(send_type)
        WHEN '1' THEN '发送类型1'
        WHEN '2' THEN '发送类型2'
        ELSE send_type
    END AS send_type,
    task_id,
    month_dim,
    day_dim,
    grouping_id,
    init,
    cancel,
    succ,
    fail,
    -- cancel rate is relative to all created messages
    ROUND(cancel*100.0/init,2) AS cancel_rate,
    -- success / failure rates are relative to the messages that were not cancelled
    ROUND(succ*100.0/(init - cancel),2) AS succ_rate,
    ROUND(fail*100.0/(init - cancel),2) AS fail_rate,
    CAST(LOCALTIMESTAMP AS STRING) AS update_date
FROM tmp_groupby
-- Both predicates keep the divisors above strictly positive.
-- NOTE(review): groups where every created message was cancelled
-- (init = cancel) are therefore not written at all; confirm this is intended.
WHERE init > 0
  AND (init - cancel) > 0;

es mapping

#POST index01/type01/_mapping
{
    "type01": {
        "properties": {
            "grouping_id": {
                "type": "byte"
            },
            "send_type": {
                "type": "keyword",
                "ignore_above": 256
            },
           "month_dim": {
           	"type": "keyword",
           	"fields": {
           		"text": {
           			"type": "keyword"
           		},
           		"date": {
           			"type": "date",
           			"format": "yyyy-MM",
           			"ignore_malformed":"true" --忽略错误的格式
           		}
           	}
           },
            "day_dim": {
           	"type": "keyword",
           	"fields": {
           		"text": {
           			"type": "keyword"
           		},
           		"date": {
           			"type": "date",
           			"format": "yyyy-MM-dd",
           			"ignore_malformed":"true"
           		}
           	}
           },
            "task_id": {
                "type": "keyword"
            },
            "init": {
                "type": "integer"
            },
            "cancel": {
                "type": "integer"
            },
            "succ": {
                "type": "integer"
            },
            "fail": {
                "type": "integer"
            },
            "cancel_rate": {
                "type": "float"
            },
            "succ_rate": {
                "type": "float"
            },
            "fail_rate": {
                "type": "float"
            },
            "update_date": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
            }
        }
    }
}