Decoupling Database Operations from Pipeline Flows in SmartETL: Design and Application

Published: 2025-07-04

As the name ETL itself suggests, reading from and writing to databases are the most common, and often the central, operations in ETL. Modern information systems are designed and run largely around databases, and the core functionality of many applications amounts to CRUD (create, retrieve, update, delete) operations against a database.

The SmartETL framework took this into account from the start. Early on, in line with the team's technology stack, it implemented Extract operations (i.e., Loader components) and Load operations (i.e., Processor components) for databases such as MongoDB, MySQL, ElasticSearch, and ClickHouse. Concretely, a submodule named database was created under each of the wikidata_filter.loader and wikidata_filter.iterator modules to hold the corresponding database components.

Take ElasticSearch, the full-text search database, as an example. To write data into ES, that is, to build an ES full-text index, the framework provides the wikidata_filter.iterator.database.elasticsearch.ESWriter component, which writes groups of JSON objects into ES in batch mode. The code is as follows:

import json

import requests

# BufferedWriter is provided by the framework's iterator module (exact import path per project layout)
id_keys = ["_id", "id", "mongo_id"]  # candidate fields to use as the document _id


class ESWriter(BufferedWriter):
    """
    Write data into an ES index.
    """
    def __init__(self, host="localhost",
                 port=9200,
                 username=None,
                 password=None,
                 index=None,
                 buffer_size=1000, **kwargs):
        super().__init__(buffer_size=buffer_size)
        self.url = f"http://{host}:{port}"
        if password:
            self.auth = (username, password)
        else:
            self.auth = None
        self.index_name = index

    def write_batch(self, rows: list):
        header = {
            "Content-Type": "application/json"
        }
        lines = []
        for row in rows:
            action_row = {}
            # use the first matching id field as the ES document _id
            for key in id_keys:
                if key in row:
                    action_row["_id"] = row.pop(key)
                    break
            row_meta = json.dumps({"index": action_row})
            try:
                row_data = json.dumps(row)
                lines.append(row_meta)
                lines.append(row_data)
            except (TypeError, ValueError):
                # skip rows that cannot be serialized as JSON
                pass
        body = '\n'.join(lines)
        body += '\n'
        print(f"{self.url}/{self.index_name} bulk")
        res = requests.post(f'{self.url}/{self.index_name}/_bulk', data=body, headers=header, auth=self.auth)
        if res.status_code != 200:
            print("Warning, ES bulk load failed:", res.text)
            return False
        return True

Note that, to make writing to ES more efficient, ESWriter does not implement JsonIterator directly. Instead it extends BufferedWriter and overrides the write_batch method, using ES's bulk API to write records in batches.
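
For context, the buffering contract can be sketched as follows. This is an illustrative approximation of the idea, not the framework's actual BufferedWriter code, and the method name on_data is an assumption:

class BufferedWriter:
    """Illustrative sketch only: collect rows, flush in batches of buffer_size."""
    def __init__(self, buffer_size: int = 1000):
        self.buffer_size = buffer_size
        self.buffer = []

    def on_data(self, row):
        # accumulate rows; flush once the buffer is full (method name assumed)
        self.buffer.append(row)
        if len(self.buffer) >= self.buffer_size:
            self.flush()
        return row

    def flush(self):
        if self.buffer:
            self.write_batch(self.buffer)
            self.buffer = []

    def write_batch(self, rows: list):
        # subclasses such as ESWriter override this with an actual bulk write
        raise NotImplementedError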

Similarly, to read data from ES, a search Loader component or a scroll-mode Loader component can be built on the ES search API (POST /{index}/_search) or scroll API (POST /{index}/_search/scroll); to delete ES data, a delete Processor component can be built on the delete API (DELETE /{index}/_doc/{id}); to check whether a record exists, an existence-check Processor component can be built on the document API (GET /{index}/_doc/{id}); and so on.
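
To make this concrete, one such per-operation component might look like the following sketch; ESExists is a hypothetical class name used for illustration, not actual framework code:

import requests

class ESExists:
    """Hypothetical Processor: annotate each row with whether its document exists in ES."""
    def __init__(self, host="localhost", port=9200, index=None):
        self.url = f"http://{host}:{port}"
        self.index = index

    def on_data(self, row: dict):
        _id = row.get("_id") or row.get("id")
        res = requests.get(f"{self.url}/{self.index}/_doc/{_id}?_source=_id")
        row["exists"] = res.status_code == 200 and res.json().get("found") is True
        return row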

This approach is simple, intuitive, and easy to implement. But as an application needs to integrate more kinds of databases and more varied data operations, every class of operation requires its own Loader or Processor component. Database-related code ends up scattered across many modules and functions, making the components inconvenient to use, maintain, and extend.

Could we design a dedicated mechanism that separates database operations from pipeline nodes, yet still binds them together on demand? Of course we can!

To that end, this article designs an independent hierarchy of database interfaces and uses the functional-component mechanism to bind these interfaces to pipelines in a loosely coupled way. The core process has three steps.

First, define a database operation interface, Database. It includes database-level operations such as listing tables (list_tables) and describing table metadata (desc_table), and table-level (collection-level, index-level) operations such as upsert, scroll, search, get, exists, and delete. These operations can be extended as business needs require. The class diagram is shown below:
[Figure: database interface class diagram]
Note that the Database interface functions take named (keyword) arguments, which can be configured in SmartETL's YAML pipeline definitions, making it convenient to pass database-specific settings.
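
In code form, the interface can be sketched roughly as follows; the signatures are illustrative assumptions, and the project source defines the exact ones:

from typing import Union

class Database:
    """Sketch of the unified database interface (signatures illustrative)."""
    def list_tables(self, database: str = None) -> list:
        raise NotImplementedError

    def desc_table(self, table: str, database: str = None) -> list:
        raise NotImplementedError

    def upsert(self, items: Union[dict, list], **kwargs) -> bool:
        raise NotImplementedError

    def scroll(self, **kwargs):
        raise NotImplementedError

    def search(self, **kwargs):
        raise NotImplementedError

    def get(self, _id, **kwargs):
        raise NotImplementedError

    def exists(self, _id, **kwargs) -> bool:
        raise NotImplementedError

    def delete(self, _id, **kwargs) -> bool:
        raise NotImplementedError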

Second, implement a Database class for each database type that is actually needed. This is in fact an application of the Bridge pattern: the interfaces offered by each database SDK are adapted to this project's Database interface. Below is sample code for an ElasticSearch implementation class (based on the ES HTTP API):

import json
from typing import Union

import requests
from requests.auth import HTTPBasicAuth

from .base import Database

# candidate fields to use as the document _id
id_keys = ["_id", "id", "mongo_id"]

headers = {
    'Content-Type': 'application/json',
    'Accept': 'application/json'
}


class ES(Database):
    """
    ElasticSearch implementation of the Database interface, based on the ES HTTP API.
    Supports reading all data from a given index, with optional query conditions.
    """
    def __init__(self, host: str = "localhost",
                 port: int = 9200,
                 username: str = None,
                 password: str = None,
                 index: str = None,
                 secure: bool = False,
                 **kwargs):
        self.url = f"{'https' if secure else 'http'}://{host}:{port}"
        if password:
            self.auth = HTTPBasicAuth(username, password)
        else:
            self.auth = None
        self.index = index

    def search(self, query: dict = None,
               query_body: dict = None,
               fetch_size: int = 10,
               index: str = None,
               **kwargs):
        index = index or self.index
        query_body = query_body or {}
        if query:
            query_body['query'] = query
        elif 'query' not in query_body:
            query_body['query'] = {"match_all": {}}
        if 'size' not in query_body:
            query_body['size'] = fetch_size
        print("ES search query_body:", query_body)
        res = requests.post(f'{self.url}/{index}/_search', auth=self.auth, json=query_body, **kwargs)
        if res.status_code != 200:
            print("Error:", res.text)
            return

        res = res.json()

        if 'hits' not in res or 'hits' not in res['hits']:
            print('ERROR', res)
            return

        hits = res['hits']['hits']
        for hit in hits:
            doc = hit.get('_source') or {}
            doc['_id'] = hit['_id']
            doc['_score'] = hit['_score']
            if 'fields' in hit:
                doc.update(hit['fields'])
            yield doc

    def scroll(self, query: dict = None,
               query_body: dict = None,
               batch_size: int = 10,
               fetch_size: int = 10000,
               index: str = None,
               _scroll: str = "1m",
               **kwargs):
        index = index or self.index
        query_body = query_body or {}
        if query:
            query_body['query'] = query
        elif 'query' not in query_body:
            query_body['query'] = {"match_all": {}}
        if 'size' not in query_body:
            query_body['size'] = batch_size
        print("ES scroll query_body:", query_body)
        scroll_id = None
        total = 0
        while True:
            if scroll_id:
                # subsequent requests: continue the scroll
                url = f'{self.url}/_search/scroll'
                res = requests.post(url, auth=self.auth, json={'scroll': _scroll, 'scroll_id': scroll_id}, **kwargs)
            else:
                # first request: start the scroll
                url = f'{self.url}/{index}/_search?scroll={_scroll}'
                res = requests.post(url, auth=self.auth, json=query_body, **kwargs)

            if res.status_code != 200:
                print("Error:", res.text)
                break

            res = res.json()

            if 'hits' not in res or 'hits' not in res['hits']:
                print('ERROR', res)
                break  # a malformed response would otherwise retry forever

            if '_scroll_id' in res:
                scroll_id = res['_scroll_id']

            hits = res['hits']['hits']
            for hit in hits:
                doc = hit.get('_source') or {}
                doc['_id'] = hit['_id']
                yield doc

            total += len(hits)

            if len(hits) < batch_size or 0 < fetch_size <= total:
                break

        if scroll_id:
            # release the scroll context on the server
            url = f'{self.url}/_search/scroll'
            requests.delete(url, auth=self.auth, json={'scroll_id': scroll_id})

    def exists(self, _id, index: str = None, **kwargs):
        index = index or self.index
        if isinstance(_id, dict):
            _id = _id.get("_id") or _id.get("id")
        url = f'{self.url}/{index}/_doc/{_id}?_source=_id'
        res = requests.get(url, auth=self.auth)
        if res.status_code == 200:
            return res.json().get("found") is True
        return False

    def delete(self, _id, index: str = None, **kwargs):
        index = index or self.index
        if isinstance(_id, dict):
            _id = _id.get("_id") or _id.get("id")
        url = f'{self.url}/{index}/_doc/{_id}'
        res = requests.delete(url, auth=self.auth)
        return res.status_code == 200

    def upsert(self, items: Union[dict, list], index: str = None, **kwargs):
        index = index or self.index
        if not isinstance(items, list):
            items = [items]
        lines = []
        for row in items:
            action_row = {}
            for key in id_keys:
                if key in row:
                    action_row["_id"] = row.pop(key)
                    break
            row_meta = json.dumps({"index": action_row})
            row_data = json.dumps(row)
            lines.append(row_meta)
            lines.append(row_data)
        body = '\n'.join(lines)
        body += '\n'
        print(f"{self.url}/{index} bulk")
        res = requests.post(f'{self.url}/{index}/_bulk', data=body, headers=headers, auth=self.auth)
        if res.status_code != 200:
            print("Warning, ES bulk load failed:", res.text)
            return False
        return True
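
As a quick usage sketch of the class above (the host, index name, and document id below are made-up values):

# illustrative usage; connection details and ids are made up
es = ES(host="localhost", port=9200, index="arxiv-meta-2505")

# full scan of the index, yielding documents one by one
for doc in es.scroll(fetch_size=100):
    print(doc["_id"])

# targeted search with a query condition
for doc in es.search(query={"match": {"title": "transformer"}}, fetch_size=5):
    print(doc["_score"], doc.get("title"))

# single-document operations
if es.exists("2505.00001"):
    es.delete("2505.00001")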

Based on its current pipeline requirements, the SmartETL project has so far implemented eight database types: ElasticSearch, ClickHouse, MongoDB, MySQL, PostgreSQL, Qdrant, SQLite, and MinIO. The class structure is shown below:
[Figure: database implementation class diagram]
Third, define a set of proxy functions for database operations that forward a pipeline's database calls to the database components. They currently live in the gestata.dbops module, and the code is quite simple:

from typing import Union

from wikidata_filter.util.database.base import Database

def tables(db: Database, *database_list, columns: bool = False):
    if database_list:
        for database in database_list:
            for table in db.list_tables(database):
                yield {
                    "name": table,
                    "columns": db.desc_table(table, database) if columns else [],
                    "database": database
                }
    else:
        for table in db.list_tables():
            yield {
                "name": table,
                "columns": db.desc_table(table) if columns else []
            }

def search(db: Database, **kwargs):
    return db.search(**kwargs)

def scroll(db: Database, **kwargs):
    return db.scroll(**kwargs)

def upsert(row: Union[dict, list], db: Database, **kwargs):
    db.upsert(row, **kwargs)
    return row

def delete(_id, db: Database, **kwargs):
    return db.delete(_id, **kwargs)

def exists(_id, db: Database, **kwargs) -> bool:
    return db.exists(_id, **kwargs)

def get(_id, db: Database, **kwargs):
    return db.get(_id, **kwargs)

Pay attention to each function's return value. Most of them directly return the result of the corresponding database method, but upsert returns its input argument. Why? This lets a database write serve as an intermediate node in a pipeline: data passes through the write step without terminating and continues to flow into downstream nodes, as shown below:
[Figure: database-write step as an intermediate pipeline node]
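
A tiny sketch with an in-memory stand-in for Database makes the pass-through behavior visible; FakeDB is purely illustrative:

class FakeDB(Database):
    """Illustrative in-memory stand-in for a real Database implementation."""
    def __init__(self):
        self.store = {}

    def upsert(self, items, **kwargs):
        for item in items if isinstance(items, list) else [items]:
            self.store[item["id"]] = item
        return True

db = FakeDB()
row = {"id": 1, "title": "hello"}
out = upsert(row, db)  # the proxy writes the row AND returns it
assert out is row      # downstream nodes still receive the data
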
With that, the design decoupling database components from pipeline node components is complete. Let's look at an application case: reading the arXiv metadata dataset (from Kaggle) and writing it into ElasticSearch (building a full-text index). The pipeline is defined as follows:

from: local/db_envs.yaml

name: load arXiv meta flow
description: read the arXiv-meta dataset and write it into ES
arguments: 1

consts:
  type_mapping:
    all: .jsonl

nodes:
  es: util.database.elasticsearch.ES(**es1, index='arxiv-meta-2505')

  select: SelectVal('data')
  rename: RenameFields(id='_id')
  change_id: "Map(lambda s: s.replace('/', ':'), key='_id')"
  # build the ES full-text index
  write_es: Map('gestata.dbops.upsert', es)

loader: ar.Zip(arg1, 'all', type_mapping=type_mapping)
processor: Chain(select, rename, change_id, Buffer(1000), write_es, Count(label='total-papers'))

The source code and pipeline definitions can be found in the SmartETL project. Note that, as the project keeps evolving, all database components except Kafka have now been refactored to this new approach.

Summary: this article has described SmartETL's design for decoupling databases from pipelines, covering the motivation, goals, design approach, and an application case. High cohesion and loose coupling is a fundamental principle of software design and a goal we pursue continuously; only with good design can our code remain easy to maintain and extend, letting us respond quickly to business needs and reduce development costs.