【大数据实践】KSQL流处理——如何将多个STREAM输出到一个TOPIC

发布于:2023-05-25 ⋅ 阅读:(72) ⋅ 点赞:(0)

【大数据实践】KSQL流处理——如何将数据处理结果推到指定Topic

需求场景描述

在生产环境中,各个业务服务产生的事件都会被push到Kafka消息中间件中。如:充值中心的 充值事件 会被push到kafka的recharge topic中,玩家 结算事件 会被push到kafka的game_score topic中。

平台希望通过处理,实时分析这些事件,筛选出满足条件的一些玩家,对其奖励相应道具。如,想做一个针对不同充值金额的玩家奖励不同道具的活动:

  • 0 < 充值金额 < 100 时, 奖励一个10万金币卡道具(道具ID:"10w")
  • 100 <= 充值金额 时,奖励一个100万金币卡道具(道具ID:"100w")

方案设计

  • 将充值的事件(原始日志数据,JSON格式),推送到Kafka的recharge topic中。充值事件数据格式:

    {"event_type" : "cash_order",
     "username" : "foo",
     "channel" : "wx_scan",
     "cash" : 100
     }
  • kafka中新建一个PROPREWARD的topic,专门接收道具奖励的事件,该主题事件消息格式为:

    {"user/name" : "foo"
     "prop/id" : "道具ID"
     "reward/reason" : "奖励的原因"}
  • 一个道具发放服务(Kafka消费者)订阅该主题,当道具奖励事件到达时,获取事件中的 用户名道具ID 为指定玩家发放道具。
  • 使用KSQL创建两个派生流(Stream),分别从recharge topic中过滤出0 < 充值金额 < 100100 <= 充值金额 的事件,过滤出符合条件的用户名,并组装成约定的道具奖励事件,将其推送到Kafka的PROPREWARD topic中。

具体实现

  • 新建一个kafka topic : PROPREWARD (大写),用于接收和存储道具奖励事件

    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 4 --topic PROPREWARD
  • 从kafka topic PROPREWARD创建一个流(PROPREWARD),用于输出道具奖励事件,注意:流的名字与kafka topic的名字相同。

    CREATE STREAM PROPREWARD (`user/name` varchar, `seed/id` varchar, `reward/reason` varchar) \
    WITH (kafka_topic='PROPREWARD', value_format='JSON');
  • 根据业务需要,新建一个查询规则为0 < 充值金额 < 100 的派生流并插入到PROPREWARD流中。

    INSERT INTO PROPREWARD \
       SELECT username AS `user/name` , '10w' AS `prop/id`, '充值100元以内奖励10万金币卡道具' AS `reward/reason` \
       FROM recharge \
       WHERE EVENT_TYPE = 'cash_order' 
              AND CASH > 0 
              AND CASH <= 100;
  • 根据业务需要,新建一个查询规则为 100 <= 充值金额 的派生流并插入到PROPREWARD流中。

    INSERT INTO PROPREWARD \
       SELECT username AS `user/name` , '100w' AS `prop/id`, '充值超过100元奖励100万金币卡道具' AS `reward/reason` \
       FROM recharge \
       WHERE EVENT_TYPE = 'cash_order' AND CASH >= 100;
  • 还可以根据业务需要,从其他kafka topic中派生出其他流,插入到PROPREWARD流中。

结果验证

  • 往kafka的recharge topic中写入数据:

    {"event_type" : "cash_order",
     "username" : "foo",
     "channel" : "wx_scan",
     "cash" : 9
     }

    可以在topic PROPREWARD中接收到事件:

    {"user/name" : "foo" ,
     "prop/id" : "10w"
     "reward/reason" : "充值100元以内奖励10万金币卡道具"}
  • 往kafka的 recharge topic中写入数据:

    {"event_type" : "cash_order",
     "username" : "foo",
     "channel" : "wx_scan",
     "cash" : 11
     }

    可以在topic PROPREWARD 中接收到事件:

    {"user/name" : "foo" ,
     "prop/id" : "100w",
     "reward/reason" : "充值100元以上奖励100万金币卡道具"}

注意

  • 要想将上述两个派生流插入(INSERT INTO)到输出结果的PROPREWARD流中,需要确保:

    • PROPREWARD 的名字与输出结果的Kafka topic名字相同,否则会抛出异常。这应该是KSQL 5.0.0 的一个BUG。
    • 派生流中输出的数据结构 (SELECT username AS user/name , '100w' AS prop/id, '充值超过100元奖励100万金币卡道具' AS reward/reason 与 流 PROPREWARD 定义的结构相同。

总结

  • 通过Kafka + KSQL 流式处理,可以配置出丰富的活动——根据各种不同的事件和规则,奖励不同的道具(或者其他类型的东西),而不需要额外的代码开发!!
  • KSQL官方参考文档
本文含有隐藏内容,请 开通VIP 后查看