数据订阅
数据订阅
TDengine 支持三种方式的订阅
-
指定一个查询语句,对结果进行订阅,灵活的最高
-
对整个超级表进行订阅,可以兼容表结构的变化。但是仅限于特定的子表,而且查出来的数据不包含标签,所以如果你希望结果中包含某个字段,而这个字段是标签,那你就不能使用子表订阅这种方式
TDengine 根据标签拆分子表 -
对整个数据库进行订阅,慎用
超级表订阅和库订阅属于高级订阅模式,容易出错,不建议使用
从 3.0.4.0 开始,订阅之前数据库的 WAL_RETENTION_PERIOD 配置不能为 0,官方例子中默认给的 3600
ALTER DATABASE server_node_base WAL_RETENTION_PERIOD 3600;
所以这个 WAL_RETENTION_PERIOD 要加到 数据库建表语句中
CREATE DATABASE power KEEP 365 DURATION 10 BUFFER 16 WAL_LEVEL 1 WAL_RETENTION_PERIOD 3600;
此外可能会出现数据消费不到的情况,此时我们就得设置保留时长 WAL_RETENTION_PERIOD
和 experimental.snapshot.enable
,但是还不清楚他们具体的作用是什么
原文如下:
为了实现上述功能,TDengine 会为 WAL (Write-Ahead-Log) 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制:用户可以按需指定 WAL 文件保留的时间以及大小(详见 create database 语句)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎(但由于 TSDB 具有远比 WAL 更高的压缩率,我们不推荐保留太长时间,一般来说,不超过几天)。 对于以 topic 形式创建的查询,TDengine 将对接 WAL 而不是 TSDB 作为其存储引擎。在消费时,TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。
本文档不对消息队列本身的基础知识做介绍,如果需要了解,请自行搜索。
注意:默认是从 wal 消费数据,如果 wal 被删除,消费到的数据会不全,此时可以将参数 experimental.snapshot.enable 设置为 true,从 tsdb 获取全部数据,但是这样的话就不能保证数据的消费顺序。所以建议根据自己的消费情况合理的设置 wal 的保留策略,保证可以从 wal 里订阅到全部数据。
我猜测的逻辑是这样的,类似于(mysql 的 redolog),然后等系统不忙的时候再把 redolog 中的数据刷盘,更新的数据先写到 WAL,然后按照配置的时间 WAL_RETENTION_PERIOD
至少保留这么久,然后再落到 TSDB
,落到 TSDB
之后,之前的 WAL 就删除了,创建新的 WAL,所以,WAL_RETENTION_PERIOD
就代表着这个数据从创建开始算的可消费时间,如果在这个事件之内没有被消费,那么以后就再也无法消费到了。那就只能直接去 TSDB 也就是数据库查了。
所有相关参数:
- WAL_RETENTION_PERIOD: 为了数据订阅消费,需要 WAL 日志文件额外保留的最大时长策略。WAL 日志清理,不受订阅客户端消费状态影响。单位为 s。默认为 0,表示无需为订阅保留。新建订阅,应先设置恰当的时长策略。
- WAL_RETENTION_SIZE:为了数据订阅消费,需要 WAL 日志文件额外保留的最大累计大小策略。单位为 KB。默认为 0,表示累计大小无上限。
- WAL_ROLL_PERIOD:wal 文件切换时长,单位为 s。当 WAL 文件创建并写入后,经过该时间,会自动创建一个新的 WAL 文件。默认为 0,即仅在 TSDB 落盘时创建新文件。
- WAL_SEGMENT_SIZE:wal 单个文件大小,单位为 KB。当前写入文件大小超过上限后会自动创建一个新的 WAL 文件。默认为 0,即仅在 TSDB 落盘时创建新文件。
前面我们说到了,在消费时,TDengine 根据当前消费进度从 WAL 直接读取数据
,这造成了一个问题:
tdengine 的订阅,本质上就是订阅 redo 日志(WAL 日志),而不是订阅最终的数据,也就是说如果你插入了 100 条数据 (产生了 100 条插入 redo 日志),然后删除了 10 条数据 (产生了删除日志),虽然最终只有 90 条数据,但是订阅这张表,会去扫描所有的插入 redo 日志,最终还是会订阅到 100 条数据。
也就是说订阅,订阅的是插入操作,这样做是为了订阅到被删掉的数据,虽然我觉得不合理
不过时序数据库应该是不会出现频繁的删除操作的。所以这个问题也不大。
TaosConsumer
订阅主题,轮询,取消订阅,应该在一个线程里面,不能在多个线程调用,因为其代码中有限制,限制了。
TaosConsumer#acquire
直接限制方法在多线程环境下使用。
TaosConsumer
的订阅操作(subscribe)和取消订阅(unsubscribe)都是重操作,会很耗时。
官网:
https://docs.taosdata.com/develop/tmq/
https://docs.taosdata.com/taos-sql/tmq/#
CREATE TOPIC [IF NOT EXISTS] topic_name AS subquery;
DROP TOPIC [IF EXISTS] topic_name;
SHOW TOPICS;
-- 查看所有的消费者的状态,如果是lost,则表示消费者失效
SHOW CONSUMERS;
-- 查看 consumer 与 vgroup 之间的分配关系
SHOW SUBSCRIPTIONS;
topic 个数有上限,通过参数 tmqMaxTopicNum 控制,默认 20 个,这个配置项,在 taos.cfg
中默认不存在,加上就好了。
注意点
当我们用 select * from table
来创建订阅查询语句的时候,*
会被解释称创建主题的这个时刻,这个表的所有列,后续即使加了字段 A,这个主题的订阅返回的一行依然是订阅时的那些字段,并不包含 A。
Java 数据订阅:
https://docs.taosdata.com/connector/java/#数据订阅
知识点
主题是跨库(模式的),全局的,在任何一个模式下都可以看到所有的主题
- 创建主题的时候,虽然说是不需要确定模式的,但是我们需要确定订阅哪个库的哪张表,
- 删除主题的时候,或者查询主题的时候,不需要指定模式(库)
消费者
SHOW CONSUMERS;
结果列:
# | 列名 | 数据类型 | 说明 |
---|---|---|---|
1 | consumer_id | BIGINT | 消费者的唯一 ID |
2 | consumer_group | BINARY(192) | 消费者组 |
3 | client_id | BINARY(192) | 用户自定义字符串,通过创建 consumer 时指定 client_id 来展示 |
4 | status | BINARY(20) | 消费者当前状态。消费者状态包括:ready(正常可用)、 lost(连接已丢失)、 rebalancing(消费者所属 vgroup 正在分配中)、unknown(未知状态) |
5 | topics | BINARY(204) | 被订阅的 topic。若订阅多个 topic,则展示为多行,consumer.subscribe 方法的参数的是一个列表,假如这个主题列表包含 3 个主题,那么这个消费者(同一个消费者 ID)在 SHOW CONSUMERS; 中将有三行数据,这三行数据除了 topics 字段不同,其他字段都相同,也就是说,虽然这个字段名称叫 topics,但是一行数据的 topics 列只包含一个主题字符串![]() |
6 | up_time | TIMESTAMP | 第一次连接 taosd 的时间 |
7 | subscribe_time | TIMESTAMP | 上一次发起订阅的时间 |
8 | rebalance_time | TIMESTAMP | 上一次触发 rebalance 的时间 |
消费者订阅 topic 后,可以实时获得最新的数据。多个消费者可以组成一个消费者组 (consumer group), 一个消费者组里的多个消费者共享消费进度,便于多线程、分布式地消费数据,提高消费速度。但不同消费者组中的消费者即使消费同一个 topic, 并不共享消费进度。一个消费者可以订阅多个 topic。如果订阅的是超级表,数据可能会分布在多个不同的 vnode 上,也就是多个 shard 上,这样一个消费组里有多个消费者可以提高消费效率。TDengine 的消息队列提供了消息的 ACK 机制,在宕机、重启等复杂环境下确保 at least once 消费。 | |||
创建消费者的时候,消费者组 ID 是必填项。 | |||
有一个节点 suscribe 订阅一个主题的时候(consumer.subscribe ),SHOW CONSUMERS; 中就会增加一行对应的记录,且状态为 ready,此时,这个主题是不能删除的,因为有消费者正在消费,取消订阅的时候(consumer.unsubscribe ),会删除对应的记录。 |
|||
如果一个主题,在 SHOW CONSUMERS; 结果中查出的所有的消费者的状态都是 lost,那这个主题也是可以删除的,因为所有的消费者都丢失了,自然也是可以删除的。 |
|||
所以,如果一个主题,在 SHOW CONSUMERS; 的结果中没有找到对应的消费者,或者对应的消费者状态都是 lost,那这个主题才可以删除。 |
|||
但是注意,紧接着取消订阅关闭消费者之后,立即进行主题删除操作 DROP TOPIC IF EXISTS Topic ,依然会报错,取消订阅关闭消费者之后 SHOW CONSUMERS; 的结果会立即改变,但是数据库底层好像需要一段时间才能反应过来,等数据库反应过来之后,才能删除主题,这个反应时间,大概是 500ms。这个真的是个坑。 |
TDengine Consumer is not safe for multi-threaded access
Kafka consumer多线程下not safe for multi-threaded access问题_rewerma的博客-CSDN博客_kafkaconsumer线程不安全
加锁就能解决,在在不同的线程中操作同一个 comsumer 对象的时候,先获取锁,执行完之后再释放锁。
但是,我们还是推荐在一个线程中把创建消费者,消费者轮询,取消定义,关闭消费者做完,而不是分多个线程去做这个事情。
bug
tdengine 作为时序数据库对时间是很敏感的,数据订阅不到的时候,优先考虑是不是数据插入时间是不是有问题,
数据库没有关闭突然断电
没有关闭数据库服务的情况下突然给主机断电,可能会造成 TDengine 数据库的损坏(可能是数据的损坏),如果出现这种情况,没办法,只能重装 TDengine
。在服务器断电之前,一定要关闭重要的服务
消费者丢失
订阅了主题的消费者无法获取主题中更新了的数据,具体表现为 consumer.poll(Duration.ofSeconds(1))
这个语句 CPU 占用非常高,同时,TDengine 的日志(/usr/local/taos/log/taosdlog.1
或者 /usr/local/taos/log/taosdlog.0
)在不停报错,具体报错信息为:
02/06 20:58:05.839551 00001305 MND queries updated in conn 1353495098, num:0
02/06 20:58:05.839574 00001305 MND db:1.meters, version and numOfTable not changed
02/06 20:58:05.839582 00001305 MND stb:1.server_node_base.dock, start to retrieve meta
02/06 20:58:05.839586 00001305 MND stb:1.server_node_base.airport, start to retrieve meta
02/06 20:58:05.839589 00001305 MND stb:1.server_node_base.road, start to retrieve meta
02/06 20:58:07.167516 00001304 MND ERROR consumer 864691128471973996 not exist
02/06 20:58:07.167963 00001304 MND ERROR msg:0x7f35a0009f10, failed to process since Consumer not exist, app:0x7f5a6c371ad0 type:consumer-hb, gtid:0x32f2c2ce1bbf0bd7:0x3422226ce1b00bda
这是因为本地驱动跟服务器的版本不一致。
首先,有一点需要注意,就是数据订阅无法通过 rest 连接实现,只能走本地连接,那本地连接就必须在本地加载本地驱动(Linux 系统下是 /usr/local/taos/driver/libtaos.so.3.0.1.6
,window 系统就是 /taos.dll
),春哥为了实现本地驱动,写了一个 dms-tdengine-client
模块来动态加载 taos 的本地驱动,这也是为什么我们在自定义任务中订阅主题的时候,直接使用 6030 就可以了,就是因为 dms-tdengine-client
已经动态加载了驱动,所以实际上,我们在元数据中,已经可以使用原生连接(通过6030接口
),来连接远程数据库了。
但是要注意,客户端的驱动的版本必须和服务端的版本对的上,不然就会无法连接。
正是因为版本没有对上才导致消费者无效,动态加载驱动的代码请查看此文章同目录下的 TaosLoader.java
。其实这个类的核心功能就是一个,就是设置系统属性
System.setProperty("java.library.path", path);
方便起见,本地驱动可以直接从 TDengine-client
包中获取,安装包页面,所有版本的 server 和 client 的 下载地址
不知道 rpm 包为什么不区分 CPU 平台。
libtaos.so
依赖 libjemalloc.so.2
,如果你的 Linux 环境没有这个 so 文件,Tdengine 依然无法创建本地连接,具体的错误就是在创建 TDengine 消费者的时候报错
TaosConsumer<Map<String, Object>> consumer = new TaosConsumer<>(tdengineConsumerconfig);
具体报错位置是 new TMQConnector();
的时候,报错信息是:
libjemalloc.so.2: cannot open shared object file: No such file or directory
我们需要上传 libjemalloc.so.2
到 /usr/lib/
文件夹,然后将其所在目录添加到加载路径中,做法很简单,将 other.conf
上传到 /etc/ld.so.conf.d
,然后 sudo ldconfig
让其生效即可,
参考博客 Linux下找不到so文件的解决办法_rznice的博客-CSDN博客,
相关文件看同目录下的 so 文件夹
find / -name ld-linux-aarch64.so.1
主题名称太长导致提示 Topic not exist
TDengine 消费者订阅主题的时候,主题名称不能太长,大概最长不能超过 50 个字符,不是很确定,40 个字符是可以的
// tdengineTopic 不能太长,最多好像不能超过50个字符
TaosConsumer.subscribe(Collections.singletonList(tdengineTopic));
全量和增量
在消费 TDengine 或者 Kafka 的数据的时候,我们可以指定从那里开始消费,比如我们可以指定,从上次消费完的地方开始消费 earliest,也可以指定从开始订阅之后填入的数据开始消费 latest,这跟我们常说的全量和增量是对应的,全量就用 earliest,增量就用 latest,但是将全量和增量的逻辑全部委托给消息中间件是有很大风险的,一个更稳健的做法是,确定流程启动的时间点,这个时间点以前的数据全都是历史数据,直接通过 sql 分页去查询然后消费,这个时间点以后的数据(Tdengine 支持在订阅的时候指定条件),采用 earliest 全量消费,或者 latest 消费(不支持指定条件的时候)
一些特殊情况下的订阅行为
-
如果我们订阅了一个超级表,此时我们删除了历史数据,订阅机制是不会触发的,更新历史数据,是会触发的。
-
插入一行数据,即使这行数据的时间戳已经存在了,数据会发生覆盖,依然会触发订阅。这种情况下等同于数据的更新。
rest 连接支持订阅
在前面我们了解过,rest 连接无法实现数据订阅,但是从 taos-jdbcdriver
的 3.1.0
版本开始,支持通过 rest 连接开启订阅,也就是说,我们不必再安装本地驱动,通过 rest 接口就可以完成所有的操作了。
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.2.7</version>
</dependency>
普通连接
REST 连接中增加 batchfetch
参数并设置为 true,将开启 WebSocket 连接。
Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
String jdbcUrl = "jdbc:TAOS-RS://taosdemo.com:6041/test?user=root&password=taosdata";
Connection conn = DriverManager.getConnection(jdbcUrl);
user:登录 TDengine 用户名,默认值 'root'。
password:用户登录密码,默认值 'taosdata'。
batchfetch: true:在执行查询时批量拉取结果集;false:逐行拉取结果集。默认值为:false。逐行拉取结果集使用 HTTP 方式进行数据传输。JDBC REST 连接支持批量拉取数据功能。taos-jdbcdriver 与 TDengine 之间通过 WebSocket 连接进行数据传输。相较于 HTTP,WebSocket 可以使 JDBC REST 连接支持大数据量查询,并提升查询性能。
charset: 当开启批量拉取数据时,指定解析字符串数据的字符集。
batchErrorIgnore:true:在执行 Statement 的 executeBatch 时,如果中间有一条 SQL 执行失败,继续执行下面的 SQL 了。false:不再执行失败 SQL 后的任何语句。默认值为:false。
httpConnectTimeout: 连接超时时间,单位 ms, 默认值为 60000。
httpSocketTimeout: socket 超时时间,单位 ms,默认值为 60000。仅在 batchfetch 设置为 false 时生效。
messageWaitTimeout: 消息超时时间, 单位 ms, 默认值为 60000。 仅在 batchfetch 设置为 true 时生效。
useSSL: 连接中是否使用 SSL。
httpPoolSize: REST 并发请求大小,默认 20。
注意
部分配置项(比如:locale、timezone)在 REST 连接中不生效。
与原生连接方式不同,REST 接口是无状态的。在使用 JDBC REST 连接时,需要在 SQL 中指定表、超级表的数据库名称。例如:
INSERT INTO test.t1 USING test.weather (ts, temperature) TAGS('California.SanFrancisco') VALUES(now, 24.6);
如果在 url 中指定了 dbname,那么,JDBC REST 连接会默认使用 /rest/sql/dbname
作为 restful 请求的 url,在 SQL 中不需要指定 dbname。例如:url 为 jdbc:TAOS-RS://127.0.0.1:6041/test
,那么,可以执行 sql:
insert into t1 using weather(ts, temperature) tags('California.SanFrancisco') values(now, 24.6);
数据订阅,
完整示例:完整示例
websocket 的方式跟原生方式相比,就是多了一个
config.setProperty("td.connect.type", "ws");
数据订阅的时候 websocket 断开了之后,没有重连方法,因为订阅的时候不支持 websocket 自动重连,详情请看 WSClient#reconnectBlocking
我感觉是还没实现在 tmq 场景下的自动重连,这个实现起来是没问题的。没准后续的更新会实现 tmq 场景下的 websocket 的自动重连。