数据订阅

数据订阅

官网文档:数据订阅,,订阅SQL


TDengine 支持三种方式的订阅



从 3.0.4.0 开始,订阅之前数据库的 WAL_RETENTION_PERIOD 配置不能为 0,官方例子中默认给的 3600

ALTER DATABASE server_node_base WAL_RETENTION_PERIOD 3600;

所以这个 WAL_RETENTION_PERIOD 要加到 数据库建表语句中

CREATE DATABASE power KEEP 365 DURATION 10 BUFFER 16 WAL_LEVEL 1 WAL_RETENTION_PERIOD 3600;

参数说明
语句变更

此外可能会出现数据消费不到的情况,此时我们就得设置保留时长 WAL_RETENTION_PERIODexperimental.snapshot.enable,但是还不清楚他们具体的作用是什么

原文如下:
为了实现上述功能,TDengine 会为 WAL (Write-Ahead-Log) 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制:用户可以按需指定 WAL 文件保留的时间以及大小(详见 create database 语句)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎(但由于 TSDB 具有远比 WAL 更高的压缩率,我们不推荐保留太长时间,一般来说,不超过几天)。 对于以 topic 形式创建的查询,TDengine 将对接 WAL 而不是 TSDB 作为其存储引擎。在消费时,TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。
本文档不对消息队列本身的基础知识做介绍,如果需要了解,请自行搜索。
注意:默认是从 wal 消费数据,如果 wal 被删除,消费到的数据会不全,此时可以将参数 experimental.snapshot.enable 设置为 true,从 tsdb 获取全部数据,但是这样的话就不能保证数据的消费顺序。所以建议根据自己的消费情况合理的设置 wal 的保留策略,保证可以从 wal 里订阅到全部数据。

我猜测的逻辑是这样的,类似于(mysql 的 redolog),然后等系统不忙的时候再把 redolog 中的数据刷盘,更新的数据先写到 WAL,然后按照配置的时间 WAL_RETENTION_PERIOD 至少保留这么久,然后再落到 TSDB,落到 TSDB 之后,之前的 WAL 就删除了,创建新的 WAL,所以,WAL_RETENTION_PERIOD 就代表着这个数据从创建开始算的可消费时间,如果在这个事件之内没有被消费,那么以后就再也无法消费到了。那就只能直接去 TSDB 也就是数据库查了。

所有相关参数:


前面我们说到了,在消费时,TDengine 根据当前消费进度从 WAL 直接读取数据,这造成了一个问题:

tdengine 的订阅,本质上就是订阅 redo 日志(WAL 日志),而不是订阅最终的数据,也就是说如果你插入了 100 条数据 (产生了 100 条插入 redo 日志),然后删除了 10 条数据 (产生了删除日志),虽然最终只有 90 条数据,但是订阅这张表,会去扫描所有的插入 redo 日志,最终还是会订阅到 100 条数据。

也就是说订阅,订阅的是插入操作,这样做是为了订阅到被删掉的数据,虽然我觉得不合理

不过时序数据库应该是不会出现频繁的删除操作的。所以这个问题也不大。


TaosConsumer 订阅主题,轮询,取消订阅,应该在一个线程里面,不能在多个线程调用,因为其代码中有限制,限制了。

TaosConsumer#acquire 直接限制方法在多线程环境下使用。

TaosConsumer 的订阅操作(subscribe)和取消订阅(unsubscribe)都是重操作,会很耗时。


官网:

https://docs.taosdata.com/develop/tmq/

https://docs.taosdata.com/taos-sql/tmq/#

CREATE TOPIC [IF NOT EXISTS] topic_name AS subquery;
DROP TOPIC [IF EXISTS] topic_name;
SHOW TOPICS;
-- 查看所有的消费者的状态,如果是lost,则表示消费者失效
SHOW CONSUMERS;
-- 查看 consumer 与 vgroup 之间的分配关系
SHOW SUBSCRIPTIONS;

topic 个数有上限,通过参数 tmqMaxTopicNum 控制,默认 20 个,这个配置项,在 taos.cfg 中默认不存在,加上就好了。

注意点

当我们用 select * from table 来创建订阅查询语句的时候,* 会被解释称创建主题的这个时刻,这个表的所有列,后续即使加了字段 A,这个主题的订阅返回的一行依然是订阅时的那些字段,并不包含 A。


Java 数据订阅:

https://docs.taosdata.com/connector/java/#数据订阅

知识点

主题是跨库(模式的),全局的,在任何一个模式下都可以看到所有的主题

消费者

SHOW CONSUMERS;

结果列:

https://docs.taosdata.com/taos-sql/perf/#perf_consumers

# 列名 数据类型 说明
1 consumer_id BIGINT 消费者的唯一 ID
2 consumer_group BINARY(192) 消费者组
3 client_id BINARY(192) 用户自定义字符串,通过创建 consumer 时指定 client_id 来展示
4 status BINARY(20) 消费者当前状态。消费者状态包括:ready(正常可用)、 lost(连接已丢失)、 rebalancing(消费者所属 vgroup 正在分配中)、unknown(未知状态)
5 topics BINARY(204) 被订阅的 topic。若订阅多个 topic,则展示为多行,
consumer.subscribe 方法的参数的是一个列表,假如这个主题列表包含 3 个主题,那么这个消费者(同一个消费者 ID)在 SHOW CONSUMERS; 中将有三行数据,这三行数据除了 topics 字段不同,其他字段都相同,也就是说,虽然这个字段名称叫 topics,但是一行数据的 topics 列只包含一个主题字符串
6 up_time TIMESTAMP 第一次连接 taosd 的时间
7 subscribe_time TIMESTAMP 上一次发起订阅的时间
8 rebalance_time TIMESTAMP 上一次触发 rebalance 的时间
消费者订阅 topic 后,可以实时获得最新的数据。多个消费者可以组成一个消费者组 (consumer group), 一个消费者组里的多个消费者共享消费进度,便于多线程、分布式地消费数据,提高消费速度。但不同消费者组中的消费者即使消费同一个 topic, 并不共享消费进度。一个消费者可以订阅多个 topic。如果订阅的是超级表,数据可能会分布在多个不同的 vnode 上,也就是多个 shard 上,这样一个消费组里有多个消费者可以提高消费效率。TDengine 的消息队列提供了消息的 ACK 机制,在宕机、重启等复杂环境下确保 at least once 消费。
创建消费者的时候,消费者组 ID 是必填项
有一个节点 suscribe 订阅一个主题的时候(consumer.subscribe),SHOW CONSUMERS; 中就会增加一行对应的记录,且状态为 ready,此时,这个主题是不能删除的,因为有消费者正在消费,取消订阅的时候(consumer.unsubscribe),会删除对应的记录。
如果一个主题,在 SHOW CONSUMERS; 结果中查出的所有的消费者的状态都是 lost,那这个主题也是可以删除的,因为所有的消费者都丢失了,自然也是可以删除的。
所以,如果一个主题,在 SHOW CONSUMERS; 的结果中没有找到对应的消费者,或者对应的消费者状态都是 lost,那这个主题才可以删除
但是注意,紧接着取消订阅关闭消费者之后,立即进行主题删除操作 DROP TOPIC IF EXISTS Topic,依然会报错,取消订阅关闭消费者之后 SHOW CONSUMERS; 的结果会立即改变,但是数据库底层好像需要一段时间才能反应过来,等数据库反应过来之后,才能删除主题,这个反应时间,大概是 500ms。这个真的是个坑。

TDengine Consumer is not safe for multi-threaded access

Kafka consumer多线程下not safe for multi-threaded access问题_rewerma的博客-CSDN博客_kafkaconsumer线程不安全
加锁就能解决,在在不同的线程中操作同一个 comsumer 对象的时候,先获取锁,执行完之后再释放锁。
但是,我们还是推荐在一个线程中把创建消费者,消费者轮询,取消定义,关闭消费者做完,而不是分多个线程去做这个事情

bug

tdengine 作为时序数据库对时间是很敏感的,数据订阅不到的时候,优先考虑是不是数据插入时间是不是有问题,

数据库没有关闭突然断电

没有关闭数据库服务的情况下突然给主机断电,可能会造成 TDengine 数据库的损坏(可能是数据的损坏),如果出现这种情况,没办法,只能重装 TDengine。在服务器断电之前,一定要关闭重要的服务

消费者丢失

订阅了主题的消费者无法获取主题中更新了的数据,具体表现为 consumer.poll(Duration.ofSeconds(1)) 这个语句 CPU 占用非常高,同时,TDengine 的日志(/usr/local/taos/log/taosdlog.1 或者 /usr/local/taos/log/taosdlog.0)在不停报错,具体报错信息为:

02/06 20:58:05.839551 00001305 MND queries updated in conn 1353495098, num:0
02/06 20:58:05.839574 00001305 MND db:1.meters, version and numOfTable not changed
02/06 20:58:05.839582 00001305 MND stb:1.server_node_base.dock, start to retrieve meta
02/06 20:58:05.839586 00001305 MND stb:1.server_node_base.airport, start to retrieve meta
02/06 20:58:05.839589 00001305 MND stb:1.server_node_base.road, start to retrieve meta
02/06 20:58:07.167516 00001304 MND ERROR consumer 864691128471973996 not exist
02/06 20:58:07.167963 00001304 MND ERROR msg:0x7f35a0009f10, failed to process since Consumer not exist, app:0x7f5a6c371ad0 type:consumer-hb, gtid:0x32f2c2ce1bbf0bd7:0x3422226ce1b00bda

这是因为本地驱动跟服务器的版本不一致。
首先,有一点需要注意,就是数据订阅无法通过 rest 连接实现,只能走本地连接,那本地连接就必须在本地加载本地驱动(Linux 系统下是 /usr/local/taos/driver/libtaos.so.3.0.1.6,window 系统就是 /taos.dll),春哥为了实现本地驱动,写了一个 dms-tdengine-client 模块来动态加载 taos 的本地驱动,这也是为什么我们在自定义任务中订阅主题的时候,直接使用 6030 就可以了,就是因为 dms-tdengine-client 已经动态加载了驱动,所以实际上,我们在元数据中,已经可以使用原生连接(通过6030接口),来连接远程数据库了。

但是要注意,客户端的驱动的版本必须和服务端的版本对的上,不然就会无法连接。

正是因为版本没有对上才导致消费者无效,动态加载驱动的代码请查看此文章同目录下的 TaosLoader.java。其实这个类的核心功能就是一个,就是设置系统属性

System.setProperty("java.library.path", path);

方便起见,本地驱动可以直接从 TDengine-client 包中获取,安装包页面,所有版本的 server 和 client 的 下载地址


不知道 rpm 包为什么不区分 CPU 平台。
libtaos.so 依赖 libjemalloc.so.2,如果你的 Linux 环境没有这个 so 文件,Tdengine 依然无法创建本地连接,具体的错误就是在创建 TDengine 消费者的时候报错

TaosConsumer<Map<String, Object>> consumer = new TaosConsumer<>(tdengineConsumerconfig);

具体报错位置是 new TMQConnector(); 的时候,报错信息是:

libjemalloc.so.2: cannot open shared object file: No such file or directory

我们需要上传 libjemalloc.so.2/usr/lib/ 文件夹,然后将其所在目录添加到加载路径中,做法很简单,将 other.conf 上传到 /etc/ld.so.conf.d,然后 sudo ldconfig 让其生效即可,

参考博客 Linux下找不到so文件的解决办法_rznice的博客-CSDN博客

相关文件看同目录下的 so 文件夹


find / -name ld-linux-aarch64.so.1

主题名称太长导致提示 Topic not exist

TDengine 消费者订阅主题的时候,主题名称不能太长,大概最长不能超过 50 个字符,不是很确定,40 个字符是可以的

// tdengineTopic 不能太长,最多好像不能超过50个字符
TaosConsumer.subscribe(Collections.singletonList(tdengineTopic));

全量和增量

在消费 TDengine 或者 Kafka 的数据的时候,我们可以指定从那里开始消费,比如我们可以指定,从上次消费完的地方开始消费 earliest,也可以指定从开始订阅之后填入的数据开始消费 latest,这跟我们常说的全量和增量是对应的,全量就用 earliest,增量就用 latest,但是将全量和增量的逻辑全部委托给消息中间件是有很大风险的,一个更稳健的做法是,确定流程启动的时间点,这个时间点以前的数据全都是历史数据,直接通过 sql 分页去查询然后消费,这个时间点以后的数据(Tdengine 支持在订阅的时候指定条件),采用 earliest 全量消费,或者 latest 消费(不支持指定条件的时候)

一些特殊情况下的订阅行为

rest 连接支持订阅

在前面我们了解过,rest 连接无法实现数据订阅,但是从 taos-jdbcdriver3.1.0 版本开始,支持通过 rest 连接开启订阅,也就是说,我们不必再安装本地驱动,通过 rest 接口就可以完成所有的操作了。

官方文档

<dependency>
  <groupId>com.taosdata.jdbc</groupId>
  <artifactId>taos-jdbcdriver</artifactId>
  <version>3.2.7</version>
</dependency>

普通连接

REST 连接中增加 batchfetch 参数并设置为 true,将开启 WebSocket 连接。

Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
String jdbcUrl = "jdbc:TAOS-RS://taosdemo.com:6041/test?user=root&password=taosdata";
Connection conn = DriverManager.getConnection(jdbcUrl);

user:登录 TDengine 用户名,默认值 'root'。
password:用户登录密码,默认值 'taosdata'。
batchfetch: true:在执行查询时批量拉取结果集;false:逐行拉取结果集。默认值为:false。逐行拉取结果集使用 HTTP 方式进行数据传输。JDBC REST 连接支持批量拉取数据功能。taos-jdbcdriver 与 TDengine 之间通过 WebSocket 连接进行数据传输。相较于 HTTP,WebSocket 可以使 JDBC REST 连接支持大数据量查询,并提升查询性能。
charset: 当开启批量拉取数据时,指定解析字符串数据的字符集。
batchErrorIgnore:true:在执行 Statement 的 executeBatch 时,如果中间有一条 SQL 执行失败,继续执行下面的 SQL 了。false:不再执行失败 SQL 后的任何语句。默认值为:false。
httpConnectTimeout: 连接超时时间,单位 ms, 默认值为 60000。
httpSocketTimeout: socket 超时时间,单位 ms,默认值为 60000。仅在 batchfetch 设置为 false 时生效。
messageWaitTimeout: 消息超时时间, 单位 ms, 默认值为 60000。 仅在 batchfetch 设置为 true 时生效。
useSSL: 连接中是否使用 SSL。
httpPoolSize: REST 并发请求大小,默认 20。

注意

部分配置项(比如:locale、timezone)在 REST 连接中不生效。

原生连接方式不同,REST 接口是无状态的。在使用 JDBC REST 连接时,需要在 SQL 中指定表、超级表的数据库名称。例如:

INSERT INTO test.t1 USING test.weather (ts, temperature) TAGS('California.SanFrancisco') VALUES(now, 24.6);

如果在 url 中指定了 dbname,那么,JDBC REST 连接会默认使用 /rest/sql/dbname 作为 restful 请求的 url,在 SQL 中不需要指定 dbname。例如:url 为 jdbc:TAOS-RS://127.0.0.1:6041/test,那么,可以执行 sql:

insert into t1 using weather(ts, temperature) tags('California.SanFrancisco') values(now, 24.6);

数据订阅,

完整示例:完整示例

websocket 的方式跟原生方式相比,就是多了一个

config.setProperty("td.connect.type", "ws");

数据订阅的时候 websocket 断开了之后,没有重连方法,因为订阅的时候不支持 websocket 自动重连,详情请看 WSClient#reconnectBlocking

我感觉是还没实现在 tmq 场景下的自动重连,这个实现起来是没问题的。没准后续的更新会实现 tmq 场景下的 websocket 的自动重连。