分享|开源流数据库 RisingWave 超详细学习使用笔记

发布于:2024-05-06 ⋅ 阅读:(35) ⋅ 点赞:(0)

该笔记来源于  社区用户投稿,若你对该项目感兴趣或者你要投稿,可评论或者私信留言。

本文希望帮助你快速搭建  学习环境,学习如何连接数据库,通过几个示例快速了解 Risingwave 的基本用法以及一些注意事项:

1. 学习环境部署

环境要求,一台 linux 服务器(可以是非联网环境)

注:本文案例使用的操作系统环境为Almalinux,其他使用yum包管理工具的发行版(如centos、redhat等)均可参考

一键部署命令

curl <https://risingwave.com/sh> | sh

如遇无法联网或下载缓慢的情况,请参考如下步骤进行安装

  1. 复制控制台输出... from URL into ...中的URL链接,粘贴到浏览器或迅雷等下载工具进行下载,然后上传到当前服务器。

  2. 下载shell安装脚本到本地

curl <https://risingwave.com/sh> -o deploy.sh
  1. 修改并运行安装脚本
#在上传安装包文件所在目录下执行
sed -i 's/curl -L "${URL}" | tar -zx || exit 1/tar -zxf "下载的安装包文件名"/' deploy.sh && sh deploy.sh
  1. 运行数据库
./risingwave

如因操作错误等原因导致数据库异常,可以执行 rm ~/.risingwave -rf 后在重新运行 ./risingwave 启动一个全新的数据库

2. 连接数据库

2.1 命令行连接

方式一:工具

psql -h localhost -p 4566 -U root dev

方式二:若本机安装了docker,也可使用容器,然后通过容器访问

docker exec -it postgres psql -h host.docker.internal -p 4566 -U root dev

2.2 客户端连接

以pycharm(professional版本)为例

pycharm数据源配置

Pycharm客户端中,由于兼容性问题,有时候一条 risingwave sql 语句会自动被从中间拆开提交执行,从而导致报错,此时可以尝试删除自动分段处的空格再重试,如果还是不行的话,再切换命令行下psql客户端执行。

2.3 python连接

import psycopg2
conn = psycopg2.connect(host='localhost', port=4566, user='root', database='dev')
cur = conn.cursor()
cur.execute("set timezone = 'Asia/Shanghai';")
cur.execute("select current_database(), current_timestamp, version();", )
res = cur.fetchall()[0]
for c in res:
    print(c)
cur.close()
conn.close()

3. 测试数据导入

3.1 前置条件

你需要有一个risingwave所在服务器能够访问的mysql数据库,学习环境可以考虑

Risingwave底层使用实现mysql-cdc, Mysql数据库需要进行一定的,首先需要确保mysql开启了binlog,然后创建有binlog读取权限的采集用户(如果不想使用root用户的话)

CREATE USER 'username'@'%' IDENTIFIED BY 'password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT  ON *.* TO 'username' IDENTIFIED BY 'password';
FLUSH PRIVILEGES;

3.2 Mysql建表

-- 包含了大部分mysql基本类型-- 注意,Risingwave 的 cdc 要求上游数据库源表必须指定主键drop table if exists test;
create table test (
    c01 int,
    c02 char,
    c03 varchar(10),
    c04 text,
    c05 bigint,
    c06 long,
    c07 numeric(10, 2),
    c08 decimal(20, 8),
    c09 date,
    c10 datetime,
    c11 datetime(3),
    c12 timestamp(6),
    primary key (c01, c02)
);
insert into test (c01, c02, c03, c04, c05, c06, c07, c08, c09, c10, c11, c12)
values (1, 'a', 'abc', 'abcdefg', 123456789000, 99999999999999, 123.456, 12345.6789,
        current_date,
        current_timestamp,
        '2024-01-01 18:18:18.888',
        '2024-01-01 06:06:06.666666');
select * from test;

3.3 Risingwave 通过 mysql-cdc 接入数据

create schema rwtest; -- 可以在自定义 schema 中操作-- 虽然 create table ... with () 语法也支持创建mysql-cdc表,但每创建一个表都会产生一个mysql的binlog读取连接(可以在mysql端通过show processlist查看),对业务库极不友好,不建议使用。-- CDC SOURCE 可以复用-- CDC SOURCE 不支持级联删除,如有依赖表需要手动删除drop source if exists rwtest.mysql_cdc;
create source rwtest.mysql_cdc
  with (
       connector = 'mysql-cdc',
       hostname = 'localhost',
       port = '3306',
       username = 'root',
       password = '123456',
       database.name = 'mysql',
       server.id = 16888,       -- 一般指定1000以上的随机数字,请确保server.id在数据库中的唯一性       -- 用以下方法指定debezium附加参数配置,一般不建议使用,当且仅当遇到问题时(如数据库版本兼容性问题等)针对性调整       debezium.schema.history.internal.store.only.captured.tables.ddl = 'true',
       debezium.schema.history.internal.skip.unparseable.ddl = 'true',
       );
show sources from rwtest;
-- 接入CDC表只能用 create table 语法创建实体表,不能用 create source (只支持 append only 数据源)-- CDC 表必须定义主键,且主键字段列表需同源表一致-- 类型映射参考官方文档:<https://docs.risingwave.com/docs/current/ingest-from-mysql-cdc/#data-type-mappingdrop> table if exists rwtest.test_mysql_cdc;
create table rwtest.test_mysql_cdc (
    c01 int,
    c02 string,          -- 字符串类型不需要定义精度    c03 string,
    c04 string,
    c05 bigint,
    c06 bigint,
    c07 numeric,         -- 数字不需要定义精度    c08 numeric,
    c09 date,
    c10 timestamp,       -- 时间戳类型不需要定义精度    -- c11 timestamptz,  -- 可以丢弃主键外的任意源表字段    c12 timestamptz,
    primary key (c01, c02)
) from rwtest.mysql_cdc table 'mysql.test';
select * from rwtest.test_mysql_cdc;
-- CDC表建好后,上游mysql中的表的增删改都会即使反映到risingwave中,注意:上游执行 truncate 操作除外-- 关于如何应对上游表结构变更的主题,参考官网相关文档

在实际生产环境中,业务数据库经常不愿随意开放binlog读取权限,此时也可以通过专业的 CDC 数据采集工具先将数据采集到kafka(数据格式一般是json),Risingwave 再通过 kafka 连接器进行消费,也能实现通过 CDC 连接器直连数据库数据进行采集的相同效果。

3.4 apache kafka

下例展示了从 kafka 中消费数据格式为具有固定模式的 json 字符串

kafka-json

drop table if exists user_page_view;
create table user_page_view (
    user_id               bigint,
    page_id               bigint,
    viewtime              timestamp,
    user_region           varchar,
    proc_time timestamptz AS proctime() -- proc_time 生成列目前仅支持appendoly表    -- 这里也可以定义primary key,那么后消费的数据会根据pk对之前已消费主键相同的记录进行覆盖) append only -- 如果定义了primary key,则此处不能同时定义 append only  include timestamp as kafka_ts -- 可以附加一些 kafka 元数据列,参考:<https://docs.risingwave.com/docs/current/include-clause/#kafka>  with (
      connector = 'kafka',
      properties.bootstrap.server = 'kafka1:9093',
      topic = 'pageviews',
      -- kafka集群如果配置了kerberos认证,可参考此处注释内容。所在机器还需要正确配置 krb5.conf、hosts 文件      -- properties.security.protocol='SASL_PLAINTEXT',      -- properties.sasl.kerberos.keytab='/path/to/kafka.keytab'      scan.startup.mode='latest',  -- 默认为 earliest   ) format PLAIN ENCODE json;
select * from user_page_view;

3.5 insert 导入

下面这种数据导入方式,在生产环境一般考虑在 python 脚本内通过 psycopg2 api 使用,用于导入一些数据量不太大、变化不频繁的维表数据(截至1.8版本,risingwave 还未实现类似flink中通过 jdbc 连接器batch 导入的功能)

-- 该表用于后续sql语法演示drop table if exists dim_sensor_group cascade;
create table dim_sensor_group (
    sensor_id         int,
    warning_threshold numeric);
comment on table dim_sensor_group is '传感器报警阈值配置表';
comment on column dim_sensor_group.sensor_id is '传感器ID';
comment on column dim_sensor_group.warning_threshold is '报警阈值';
insert into dim_sensor_group
values (0, '95.0'),
       (1, '95.0'),
       (2, '95.0'),
       (3, '95.0'),
       (4, '95.0'),
       (5, '98.0'),
       (6, '98.0'),
       (7, '98.0'),
       (8, '98.0'),
       (9, '98.0');
select * from dim_sensor_group;

4. Risingwave 基本语法和功能点测试

4.1 测试数据准备

为了方便起见,语法测试使用的为 负载生成器(模拟实时接入数据) 和 上面的 dim_sensor_group。

负载生成器生成的数据包含两张表,模拟两组传感器数据:

drop table if exists sensor_grp_1 ;
create table sensor_grp_1 (
    id    int,
    value double,     -- 生成小数类型目前只支持double,指定其他类型会报语法错误    ts    timestamptz -- 测试时建议用 timestamptz , 若用 timestamp 查询时需要额外的转换) with (
      connector = 'datagen',
      fields.id.kind = 'random', -- 如果指定为sequence,序列用完后就会停止生成      fields.id.min = '0',
      fields.id.max = '4',
      fields.id.seed = '1',
      fields.value.kind = 'random',
      fields.value.min = '40',
      fields.value.max = '99',
      fields.value.seed = '1',
      fields.ts.max_past = '0h', -- 指定为0的话从当前时间开始生成      fields.ts.max_past_mode = 'relative',
      fields.ts.seed = '1',
      datagen.rows.per.second = '2'      ) format plain ENCODE json;
select * from sensor_grp_1;
drop table if exists sensor_grp_2 cascade;
create table sensor_grp_2 (
    id    int,
    value double,
    ts    timestamptz
) with (
      connector = 'datagen',
      fields.id.kind = 'random', -- 如果指定为sequence,序列用完后就会停止生成      fields.id.min = '5',
      fields.id.max = '9',
      fields.id.seed = '1',
      fields.value.kind = 'random',
      fields.value.min = '40',
      fields.value.max = '99',
      fields.value.seed = '1',
      fields.ts.max_past = '0h', -- 指定为0的话从当前时间开始生成      fields.ts.max_past_mode = 'relative',
      fields.ts.seed = '1',
      datagen.rows.per.second = '2'      ) format plain ENCODE json;
select * from sensor_grp_2 order by ts desc;

4.2 普通join、union、动态时间过滤(temporal filter)、级联删除

业务逻辑:合并两组传感器数据

set timezone = 'Asia/Shanghai'; -- 设置客户端时区,timestamptz才会显示预期值-- 删除一个物化视图或表时,如果有其他物化视图依赖要删除的对象,则会报错,这时可以用 casecade 语法级联删除drop materialized view if exists mv1 casecade;
create materialized view mv1
asselect a.id, b.group_name, b.warning_threshold
     , a.value -- round(x_double, y_int) 暂不支持,具体函数清单参考:<https://docs.risingwave.com/docs/current/sql-functions/>     , a.ts
     , a.value > b.warning_threshold as warning
  from sensor_grp_1          a
       join dim_sensor_group b on a.id = b.sensor_id
 where date_trunc('day', ts) + interval '1' day > now() --  只保留当天的数据-- 注意:temporal filter 的条件表达式右侧需满足 now() +/- interval 的格式, nowcurrent_timestamp 等价-- where ts >= datetrunt('day',now()) 错误的 temporal filter 表达式示例 union allselect a.id, b.group_name, b.warning_threshold, a.value, a.ts, a.value > b.warning_threshold as warning
  from sensor_grp_2          a
       join dim_sensor_group b on a.id = b.sensor_id
 where date_trunc('day', ts) + interval '1' day > now();

4.3 嵌套物化视图、开窗函数、累积时间窗口

Risingwave 中没有特别提出累积时间窗口这个概念,参考

业务逻辑:每组传感器中历史以来数值最大的三条记录

drop materialized view if exists mv2;
create materialized view mv2
asselect group_name, id, value, ts, rk
  from (
       select group_name, id, value, ts, dense_rank() over (partition by group_name order by value desc) as rk
         from mv1 -- 物化视图可以被多层嵌套引用       ) s
 where s.rk <= 3;
select * from mv2 order by group_name, value desc;

4.4 hop时间窗口,分组聚合

risingwave 中包含两种时间窗口的概念:hop(最近多久)和 tumble(每隔多久)

业务逻辑:查询最近5分钟内,每组传感器中,值排前三的纪录

drop materialized view if exists mv3;
create materialized view mv3
asselect *  from (
       select group_name
            , id            , value            , ts
            , warning_threshold
            , window_start
            , window_end
            , rank() over (partition by group_name,window_start order by value desc) as grp_rk
         from hop(mv1, ts, interval '1' minute, interval '5' minute) --  窗口长度参数必须能够被滑动步长整除       ) s
 where grp_rk <= 3   and window_end < now() -- hop时间窗口的是由数据驱动推进的,最后一个可能的时间窗口为(离当前时间最近的切分点, start+窗口长度], 如果不加此条件,最后一个窗口实际已发生时间可能小于步长时间 ;
-- mv3中包含了所有窗口的计算结果,如果只想看最后一个窗口,一般还要再做一层子查询过滤或通过limit实现;select * from mv3 order by window_start desc, group_name, grp_rk order by window_start desc, group_name, grp_rk limit 9;
-- 此处共3个组,每个窗口3条记录,limit 9 就是只取各组最后一个窗口的的记录;

4.5 tumble时间窗口、时间窗口和维表连接、时间窗口连接(Windows join)、cte语法,即席查询

业务逻辑:每个小时内,要对比A组和C组中最高纪录值情况

with cte1 as (
               select group_name, id, value, ts, window_start, window_end, row_number() over (partition by group_name,window_start order by value desc) as rn
                 from tumble(mv2, ts, interval '1' hour) a -- tumble 可以作用于 Table/Source/MV,在flink中被称为窗口表值函数(TVF)                where group_name = 'A'               ),
       cte2 as (
               select group_name, id, value, ts, window_start, window_end, row_number() over (partition by group_name,window_start order by value desc) as rn
                 from tumble(mv2, ts, interval '1' hour) a
                where group_name = 'C'               )
select a.window_start
     , a.window_end
     , a.group_name || '-' || a.id || '-' || a.value as grp_a_max -- 方便的字符串连接操作     , b.group_name || '-' || b.id || '-' || b.value as grp_c_max
     , a.ts                                          as a_ts
     , b.ts                                          as c_ts
     , now()::varchar                                             -- 方便的类型转换操作符  from cte1      a
       join cte2 b
       on a.window_end = b.window_end -- 时间窗口关联时,窗口类型和大小必须一致 where a.rn = 1 and b.rn = 1 order by a.window_start desc, a.rn
;

4.6 间隔连接(Interval join)

业务逻辑:找到A组中最近1个小时内峰值记录,查询在此之前的1天中,B、C组中是否有更大值记录出现

with cte1 as (
               select group_name, id, value, ts
                 from mv2
                where group_name = 'A' and ts > now() - interval '1' hour -- 取A组中最近1个小时内的峰值记录                order by value desc                limit 1               )
select a.ts, a.value, b.*  from cte1     a
       join mv1 b
         on b.ts between a.ts - interval '1' day and a.ts
         -- interval join 就是对两个表中时间字段进行条件关联,此处为:限定B/C组中的记录时间为A组峰值记录时间的前一天内        and b.value > a.value where b.group_name in ('B', 'C')
;

4.7 处理时间时态连接(Process-time temporal join)、只读事务

这是Risingwave中最为特殊的一种时间连接,它主要用于实现如下目标,当维表发生变化是,物化视图中的已经计算过的数据,不希望再受影响。

比如,在以下demo中,客户和销售员的绑定关系发生变化时,历史考核明细表中不希望受到影响。

-- 客户员工关系表drop table if exists cust_emp_rel cascade;
create table cust_emp_rel (
    cust_id int primary key, -- 必须定义主键    emp_id  varchar);
insert into cust_emp_rel values (1, 'emp_1'), (2, 'emp_2'), (3, 'emp_3');
-- 客户交易表drop table if exists cust_trade_detail cascade;
-- 下面这一句在jetbrains客户端中就无法执行(会从with那里自动分成两个statement提交),需要通过python或命令行提交-- 将 only 后面的空格删掉则可以正常执行create table cust_trade_detail (
    cust_id    int,
    trd_amt    double precision,
    trd_time timestamptz
) append onlywith ( connector = 'datagen',
       fields.cust_id.kind = 'random',
       fields.cust_id.min = '1',
       fields.cust_id.max = '6',
       fields.trd_amt.kind = 'random',
       fields.trd_amt.min = '1000',
       fields.trd_amt.max = '2000',
       fields.trd_time.max_past = '0h',
       fields.trd_time.max_past_mode = 'relative',
       fields.trd_time.seed = '3',
       datagen.rows.per.second = '1',
       );
select * from cust_trade_detail;
-- 考核明细视图-- 实验组,使用 Process-time temporal join,预期行为是:维表变化时,历史数据不变create materialized view emp_kpi_detail1
asselect b.emp_id, a.cust_id, a.trd_amt, a.trd_time
  from cust_trade_detail a
       left join cust_emp_rel for SYSTEM_TIME as of PROCTIME() b  -- 这一固定语法要求右表必须有主键,否则会报错;注意别名的位置         on a.cust_id = b.cust_id;
-- 对照组,使用普通join,预期行为是:维表变化时,历史数据同步刷新create materialized view emp_kpi_detail2
asselect b.emp_id, a.cust_id, a.trd_amt, a.trd_time
  from cust_trade_detail a
       left join cust_emp_rel b
         on a.cust_id = b.cust_id;
-- 维表无变化时,上面两个视图数据应该完全一致-- 只读事务特性:事务期间的数据库变化对事务不可见begin read only; -- 开启只读事务-- 虽然 cust_trade_detail 表中数据一直在源源不断产生,但在结束只读事务之前:多次查询 emp_kpi_detail1 结果都相同的、emp_kpi_detail1 和 emp_kpi_detail2 的结果是完全一致的select * from emp_kpi_detail1;
select * from emp_kpi_detail2;
select * from emp_kpi_detail1 except select * from emp_kpi_detail2
union allselect * from emp_kpi_detail2 except select * from emp_kpi_detail1; -- 验证两个视图数据无差异commit; -- 结束只读事务-- 修改维表数据update cust_emp_rel set emp_id ='emp_111' where cust_id = 1;
insert into cust_emp_rel values (4, 'emp_4'), (5, 'emp_5'), (6, 'emp_6'), (7, 'emp_7'), (8, 'emp_8');
-- 维表数据变化后,emp_kpi_detail1 中的历史数据不会发生变化,而 emp_kpi_detail2 中则会select * from emp_kpi_detail1 where cust_id in (1, 4, 5, 6) order by trd_time ; -- cust_id 等于1的记录中,emp_id 得值仍然是 emp_1select * from emp_kpi_detail2 where cust_id in (1, 4, 5, 6) order by trd_time ; -- cust_id 等于1的记录中,emp_id 得值全变成 emp_111

5. 关于 RisingWave

是一款分布式 SQL 流处理数据库,旨在帮助用户降低实时应用的的开发成本。作为专为云上分布式流处理而设计的系统,RisingWave 为用户提供了与 PostgreSQL 类似的使用体验,并且具备比 Flink 高出 10 倍的性能以及更低的成本。

🧑‍💻想要了解和探索 RisingWave,欢迎浏览官网:

🔧快速上手 RisingWave,欢迎体验入门教程:

💻深入理解使用 RisingWave,欢迎阅读用户文档:

🔍关于更多常见问题及答案,欢迎大家来这里搜索留言: