PostgreSQL技术内幕25:时序数据库插件TimescaleDB

发布于:2025-02-15 ⋅ 阅读:(10) ⋅ 点赞:(0)

0.简介

现今时序数据库的应用场景十分广泛,其通过保留时间序列的性质,记录下事务随时间变化的事实。本文将介绍时序数据库产生背景,概念等基础知识,以及对于PG中时序数据库插件TimescaleDB做原理和代码的剖析。

1.基础知识

1.1 背景

随着物联网(IoT)和工业4.0的快速发展,各行各业都开始产出海量数据,这些数据以时间戳为主要特征,包括传感器数据、日志数据、设备监控数据等,这些数据都具有明显的实时性和时序相关性,而一系列的时序相关性数据可以反映出一个变化的趋势。比如一个温度36.0可能并没有太多的意义,但是通过保留变化过程,就可以以一个动态的视角去看过去,现在和未来。

传统的数据库在面对上述海量和快速产生以及对实时性有要求的场景下会有明显的问题,于是,时序数据库就应运而生了。

1.2 概念

时序数据是按时间维度记录系统、设备状态变化的数据类型,其基本结构特点是数据中自带数据产生的时间,即数据带有时间戳。时序数据库则是专门用于处理这类带有时间标签的数据的数据库系统。这些时间标签使得数据能够按照时间的顺序进行存储和查询,从而满足对时间序列数据的分析和处理需求。

1.3 特点

要理解时序数据库的特点首先要理解时序数据的特点:

1)多:体现在两方面,即数据格式多样和数据量大。

2)高:数据时效性和性能要求高。

3)稀疏:数据点间隔可能不均匀或存在缺失值。

4)少:更新操作少,多为读取和写入。

为应对上述时序数据的特点,时序数据库需要具备以下能力:

1)高效写入:时序数据体量庞大且可能存在高频上报,所以需要高效写入的能力。

2)压缩能力:数据量大为降低空间要求就需要压缩。

3)快速查询:针对时间格式进行优化,使得数据可以快速被检索和分析。

4)支持扩展:需要能够支持水平扩展和负载均衡部署,这使得时序数据库能够处理更大规模的数据和更高的查询负载,同时保持高性能和稳定性。

5)高效分析:因为是时序相关数据,所以一般都是多条进行分析,需要高效的聚合和分析能力。

2.TimescaleDB

2.1 安装使用

安装使用参考如下文档:

https://docs.timescaledb.cn/self-hosted/latest/install/installation-source/

主要内容如下,也可以参考源码中README.md

git clone git@github.com:timescale/timescaledb.git
cd timescaledb
# Find the latest release and checkout, e.g. for 2.5.0:
git checkout 2.5.0
# Bootstrap the build system
./bootstrap
# To build the extension
cd build && make
# To install
make install

2.1 文件结构

本节将介绍timescaledb的源码主要的目录结构:
在这里插入图片描述

2.2 原理

2.2.1 整体结构

在这里插入图片描述

从上图来看,其有几个核心的概念实现,一个是hyper table(超表),分区算法,子表(也被称为分片(chunk))以及其实际物理存储的file文件结构,下面将对其挨个详细分析并介绍其写入查询以及数据保留的机制。

2.2.2 超表

超表可以理解为一个逻辑视图,其有点像分区表,只不过其是完全透明的,用户使用起来和普通表完全没有差异,当然因为其会有自动分区,所以唯一索引包含必须分区列的限制还在。其结构如下

typedef struct FormData_hypertable
{
    //唯一标识
    int32 id;
    //模式名称
    NameData schema_name;
    //表名称
    NameData table_name;
    //存储管理内部chunk的模式
    NameData associated_schema_name;
    //超表和内部表格的前缀
    NameData associated_table_prefix;
    //分区维度的数量
    int16 num_dimensions;
    //chunk大小相关的func
    NameData chunk_sizing_func_schema;
    NameData chunk_sizing_func_name;
    int64 chunk_target_size;
    //压缩相关
    int16 compression_state;
    int32 compressed_hypertable_id;
    //状态码
    int32 status;
} FormData_hypertable;

typedef struct Hypertable
{
    FormData_hypertable fd;
    //主表标识符
    Oid main_table_relid;
    //chun_size fun的标识符
    Oid chunk_sizing_func;
    //存储相关,像分区,类型等
    Hyperspace *space;
    SubspaceStore *chunk_cache;
    /*
     * Allows restricting the data nodes to use for the hypertable. Default is to
     * use all available data nodes.
     */
} Hypertable;
2.2.3 自动分区

TimescaleDB自动分区的实现是通过创建超表(Hypertable)做的。超表是基于普通的PostgreSQL表创建的,但添加了自动分区、压缩和连续聚合的功能。当向超级表中插入数据时,TimescaleDB会根据分区键(通常是时间戳)自动将数据分散到不同的数据块(chunks)中。这些数据块在TimescaleDB中被称为分区。也就是上面图中的childtable。分区策略可以动态调整,可以参考代码如下,可以使用不同的分区策略:

extern PartitioningInfo *ts_partitioning_info_create(const char *schema, const char *partfunc,
                                                     const char *partcol, DimensionType dimtype,
                                                     Oid relid);
extern TSDLLEXPORT Datum ts_partitioning_func_apply(PartitioningInfo *pinfo, Oid collation,
                                                    Datum value);
/* NOTE: assume the tuple belongs to the root table, use ts_partitioning_func_apply for chunk tuples
 */
extern TSDLLEXPORT Datum ts_partitioning_func_apply_slot(PartitioningInfo *pinfo,
                                                         TupleTableSlot *slot, bool *isnull);
2.2.4 数据写入与查询优化

写入数据优化方式(其和分区表类似,可以考虑如下几个方面):

1)使用批量写入。

2)调整分区策略,减少数据碎片。

  查询优化方式(其和分区表类似,可以考虑如下几个方面):

1)创建合适的索引(其也是使用B+树)。

2)可以使用分区裁剪(通过条件限制过滤分区)。

3)数据压缩,根据需要启用。

4)并行查询,尽可能多个chunk并行查询。

2.2.5 数据保留策略

为了管理时序数据,TimescaleDB还提供了数据保留策略的功能。数据保留策略用于定义数据的生命周期,即何时删除旧的数据。例如,我们可以创建一个策略,只保留最近一小时的数据:

SELECT add_retention_policy('test', INTERVAL '1 hour');

添加代码如下,其也可以删除和修改:

Datum
policy_retention_add_internal(Oid ht_oid, Oid window_type, Datum window_datum,
                              Interval *created_before, Interval default_schedule_interval,
                              bool if_not_exists, bool fixed_schedule, TimestampTz initial_start,
                              const char *timezone)
{
    NameData application_name;
    int32 job_id;
    Hypertable *hypertable;
    Cache *hcache;
    Oid owner_id = ts_hypertable_permissions_check(ht_oid, GetUserId());
    Oid partitioning_type;
    const Dimension *dim;
    /* Default scheduled interval for drop_chunks jobs is currently 1 day (24 hours) */
    /* Default max runtime should not be very long. Right now set to 5 minutes */
    Interval default_max_runtime = { .time = 5 * USECS_PER_MINUTE };
    /* Default retry period is currently 5 minutes */
    Interval default_retry_period = { .time = 5 * USECS_PER_MINUTE };
    /* Right now, there is an infinite number of retries for drop_chunks jobs */
    int default_max_retries = -1;
    /* Verify that the hypertable owner can create a background worker */
    ts_bgw_job_validate_job_owner(owner_id);
    /* Make sure that an existing policy doesn't exist on this hypertable */
    hcache = ts_hypertable_cache_pin();
    hypertable = validate_drop_chunks_hypertable(hcache, ht_oid);
    dim = hyperspace_get_open_dimension(hypertable->space, 0);
    partitioning_type = ts_dimension_get_partition_type(dim);
    List *jobs = ts_bgw_job_find_by_proc_and_hypertable_id(POLICY_RETENTION_PROC_NAME,
                                                           FUNCTIONS_SCHEMA_NAME,
                                                           hypertable->fd.id);
    if (jobs != NIL)
    {
        bool is_equal = false;
        if (!if_not_exists)
            ereport(ERROR,
                    (errcode(ERRCODE_DUPLICATE_OBJECT),
                     errmsg("retention policy already exists for hypertable \"%s\"",
                            get_rel_name(ht_oid))));
        Assert(list_length(jobs) == 1);
        BgwJob *existing = linitial(jobs);
        if (OidIsValid(window_type))
            is_equal =
                policy_config_check_hypertable_lag_equality(existing->fd.config,
                                                            POL_RETENTION_CONF_KEY_DROP_AFTER,
                                                            partitioning_type,
                                                            window_type,
                                                            window_datum,
                                                            false /* isnull */);
        else
        {
            Assert(created_before != NULL);
            is_equal = policy_config_check_hypertable_lag_equality(
                existing->fd.config,
                POL_RETENTION_CONF_KEY_DROP_CREATED_BEFORE,
                partitioning_type,
                INTERVALOID,
                IntervalPGetDatum(created_before),
                false /* isnull */);
        }
        if (is_equal)
        {
            /* If all arguments are the same, do nothing */
            ts_cache_release(hcache);
            ereport(NOTICE,
                    (errmsg("retention policy already exists for hypertable \"%s\", skipping",
                            get_rel_name(ht_oid))));
            PG_RETURN_INT32(-1);
        }
        else
        {
            ts_cache_release(hcache);
            ereport(WARNING,
                    (errmsg("retention policy already exists for hypertable \"%s\"",
                            get_rel_name(ht_oid)),
                     errdetail("A policy already exists with different arguments."),
                     errhint("Remove the existing policy before adding a new one.")));
            PG_RETURN_INT32(-1);
        }
    }
    if (created_before)
    {
        Assert(!OidIsValid(window_type));
        window_type = INTERVALOID;
    }
    if (IS_INTEGER_TYPE(partitioning_type))
    {
        ContinuousAgg *cagg = ts_continuous_agg_find_by_relid(ht_oid);
        if ((IS_INTEGER_TYPE(window_type) && cagg == NULL &&
             !OidIsValid(ts_get_integer_now_func(dim, false))) ||
            (!IS_INTEGER_TYPE(window_type) && created_before == NULL))
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                     errmsg("invalid value for parameter %s", POL_RETENTION_CONF_KEY_DROP_AFTER),
                     errhint(
                         "Integer duration in \"drop_after\" with valid \"integer_now\" function"
                         " or interval time duration"
                         " in \"drop_created_before\" is required for hypertables with integer "
                         "time dimension.")));
    }
    if (IS_TIMESTAMP_TYPE(partitioning_type) && window_type != INTERVALOID)
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("invalid value for parameter %s", POL_RETENTION_CONF_KEY_DROP_AFTER),
                 errhint("Interval time duration is required for hypertable"
                         " with timestamp-based time dimension.")));
    JsonbParseState *parse_state = NULL;
    pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL);
    ts_jsonb_add_int32(parse_state, POL_RETENTION_CONF_KEY_HYPERTABLE_ID, hypertable->fd.id);
    switch (window_type)
    {
        case INTERVALOID:
            if (created_before)
                ts_jsonb_add_interval(parse_state,
                                      POL_RETENTION_CONF_KEY_DROP_CREATED_BEFORE,
                                      created_before);
            else
                ts_jsonb_add_interval(parse_state,
                                      POL_RETENTION_CONF_KEY_DROP_AFTER,
                                      DatumGetIntervalP(window_datum));
            break;
        case INT2OID:
            ts_jsonb_add_int64(parse_state,
                               POL_RETENTION_CONF_KEY_DROP_AFTER,
                               DatumGetInt16(window_datum));
            break;
        case INT4OID:
            ts_jsonb_add_int64(parse_state,
                               POL_RETENTION_CONF_KEY_DROP_AFTER,
                               DatumGetInt32(window_datum));
            break;
        case INT8OID:
            ts_jsonb_add_int64(parse_state,
                               POL_RETENTION_CONF_KEY_DROP_AFTER,
                               DatumGetInt64(window_datum));
            break;
        default:
            ereport(ERROR,
                    (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                     errmsg("unsupported datatype for %s: %s",
                            POL_RETENTION_CONF_KEY_DROP_AFTER,
                            format_type_be(window_type))));
    }
    JsonbValue *result = pushJsonbValue(&parse_state, WJB_END_OBJECT, NULL);
    Jsonb *config = JsonbValueToJsonb(result);
    /* Next, insert a new job into jobs table */
    namestrcpy(&application_name, "Retention Policy");
    NameData proc_name, proc_schema, check_schema, check_name;
    namestrcpy(&proc_name, POLICY_RETENTION_PROC_NAME);
    namestrcpy(&proc_schema, FUNCTIONS_SCHEMA_NAME);
    namestrcpy(&check_name, POLICY_RETENTION_CHECK_NAME);
    namestrcpy(&check_schema, FUNCTIONS_SCHEMA_NAME);
    job_id = ts_bgw_job_insert_relation(&application_name,
                                        &default_schedule_interval,
                                        &default_max_runtime,
                                        default_max_retries,
                                        &default_retry_period,
                                        &proc_schema,
                                        &proc_name,
                                        &check_schema,
                                        &check_name,
                                        owner_id,
                                        true,
                                        fixed_schedule,
                                        hypertable->fd.id,
                                        config,
                                        initial_start,
                                        timezone);
    ts_cache_release(hcache);
    PG_RETURN_INT32(job_id);
}
2.2.6 更多特性

1)Open source (Apache 2.0, Timescale License)
2)Self-hosted/ Cloud (AWS, Azure, GCP) / Fully-managed (SaaS)
3)PostgreSQL ecosystem + TimescaleDB tools (Promscale, Tune, etc)
4)Hypertables and distributed hypertables
5)Full SQL + TimescaleDB hyperfunctions,除了完善的 SQL 能力,还提供了很多面向时序的分析函数,比如 time_bucket,支持任意时间间隔,比如要统计过去 5 分钟的平均车速;last(temperture,time) 根据聚合的时间返回最新的温度值;更多信息可以参考 https://docs.timescale.com/api/latest/hyperfunctions
6)Real-time continuous aggregates,持续聚集、实时聚集,当添加新数据或修改旧数据时,会在后台自动刷新
7)Data retention,搭配 drop_chunks 和作业实现
8)Downsampling
9)User-defined actions,User-defined actions let you schedule custom-defined procedures to run within Timescale.
10)Native compression,压缩需要制定 segment_by 和 order by,一般选择有查询需求,有重复值的字段设置为 segment_by;其次支持多样的压缩算法,Delta-delta encoding, Simple-8b, XOR-based compression 等等
11)Massive scaling
12)Function pipelines*