一个异步架构设计:批量消费RabbitMQ,批量写入Elasticsearch(golang实现)

发布于:2025-04-16 ⋅ 阅读:(14) ⋅ 点赞:(0)

       在数仓团队,异步任务设计非常常见,主要原因就是数据量太大,不适合做成同步,在自动驾驶这个业务上,数据大到什么程度呢,单模块每天标签的上报数量就能达到5000W,如果算上车端挖掘、云端挖掘、标注、数据生产、仿真等,每天标签上亿,那是太简单的事了,因此,为了高效完成数据的入库,一个稳定、高吞吐量的异步架构设计显得非常之重要。

配图:北魏家宴饭店  摄影 by 棉花糖

在之前的一篇文章中介绍过异步设计的思想:一个优秀的rabbitmq消费者(consumer)设计,可直接上线使用。这篇文章就是上一篇文章思想的具体落地:

这次就是把上面架构图中消费者部分的批量处理具体为批量写入elasticsearch,这是一种常见的行为,具体场景就是为自动驾驶入库数据创建索引,以支持检索,方便各业务低成本获取所需的数据。

1、RabbitMQ消费者实现

rabbitmq消费者实现的一个核心设计思想:数据消费与业务逻辑处理解耦,即接收数据只从mq中获取数据,不做任何其他事宜,业务逻辑处理放到其他线程中执行。

业务逻辑的处理放到主线程中进行,而且要批量化处理,这样才能实现高吞吐量。

上面数据接收,是把mq的消息放到了一个buffer中,业务获取数据就可以直接从该buffer中读取:

批量读取到数据后,就可以批量处理,比如建索引,写es。

2、Elasticsearch的批量写入

实现es批量写入的核心代码如下:

func (b *EsBatchInsert) BatchInSert(docs []Document) error {
	var json = jsoniter.ConfigCompatibleWithStandardLibrary
	buf := bytes.Buffer{}
	for _, doc := range docs {
		meta := map[string]any{
			"index": map[string]any{
				"_index": doc.Index,
				"_id":    doc.ID,
			},
		}
		if err := json.NewEncoder(&buf).Encode(meta); err != nil {
			return err
		}
		if err := json.NewEncoder(&buf).Encode(doc.Source); err != nil {
			return err
		}
	}
	// resp, err := client.Bulk(&buf, client.Bulk.WithContext(context.Background()))
	req := esapi.BulkRequest{
		Body:   &buf,
		Pretty: true, // 格式化响应
	}
	resp, err := req.Do(context.Background(), b.client)
	if err != nil {
		fmt.Println(err.Error())
		return err
	}
	if resp.StatusCode != 200 {
		fmt.Println("error status code: ", resp.StatusCode)
		return fmt.Errorf("error status code: %d", resp.StatusCode)
	}
	return nil
}

需要重点说明的是代码的L11-L16,批量数据写入的最小单元是一条meta+一条业务数据,meta用于标识这条数据写入的索引以及id,类似下图说明:

这样有一个好处,文档之间解耦,不会相互影响,每条文档可以单独设置写入的索引和id,非常灵活、自由。

3、主程序业务逻辑实现

主程序的业务逻辑主要就是批量拿数据、组装数据、写入elastic:

编写一个生产者测试程序,一秒写入一条,内容即为当前时间,消费者消费到后,将时间写入到es中,生产者源源不断写入数据到mq中:

启动消费者,获取数据,并写入到es中:

至此,该异步架构就完成了,高效稳定,很完美。

全部的实现代码相对来说比较多,就不贴在文章中了,我会上传到github上,不知道怎么回事,github、gitee代码都传不上去,后面我再试试,如果有急需这部分代码的小伙伴,可以公众号里加微信联系,我单独发给你。

期待小伙伴们点个关注,聊聊技术,聊聊跑步,聊聊生活~~~~~~。

往期推荐:

历经沧桑的应县木塔,在风雨中已等你969年。

从北京到大同,走过600里,跨越1000年。

一个优秀的rabbitmq消费者(consumer)设计,可直接上线使用。

不告诉你Sanic Blueprints、Middleware是如此的优雅。

Python web框架sanic+tortoise服务框架搭建(MVP版本)

命令行参数的艺术:Python、Golang、C++技术实现

supervisor,你理应知道。

"谢广军女儿开盒"事件引关注,百度发声

跑步的第六年,才真正了解运动的意义

借助tritonserver完成gpt2模型的本地私有化部署

GRPC开发全攻略:从环境搭建到代码实现

武汉抗疫英雄汪勇:平凡人的非凡之举。

微信小程序文章列表焕新颜:从丑小鸭到白天鹅的华丽蜕变

趴菜就是趴菜,捯饬3天,才搞出小程序头部banner,还是个半成品

小程序实现文章列表点击跳转公众号详情页

前端小趴菜终于把公众号和开发的不能再磕碜的小程序首页关联上了

Elasticsearch高级检索对决:search_after+pit和scroll,谁才是最佳选择?

李白:为何两次选择做了上门女婿?

【续】开发triton客户端,访问clip-vit-large-patch14模型抽取图片特征。

NVIDIA tritonserver实现CLIP-ViT模型工程化:轻松获取图片特征(by grpc or http)

Elasticsearch的pit(point in time)到底是个啥玩意?

一文揭秘:Golang+Elasticsearch轻松搭建AI时代的图片搜索服务

2025年,我要做个自我介绍


网站公告

今日签到

点亮在社区的每一天
去签到