JDBC 流式读取
JDBC 流式读取
流式读取 API 的效果是按需加载,在读取超大数据量的表的时候非常好用
JDBC 的 ResultSet 是支持流式读取的,它会一次只加载十条或者 100 条,然后,当你消费完了的时候,它会自动去后台拉数据,但是 Mysql 的驱动默认不支持 fetsize 参数,会把所有数据都查出来,只有把 fetchsize 改成 0 才可以开启流式读取.
一般情况下,设置
PreparedStatement preparedStatement = connection.prepareStatement("select * from table", ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
然后再设置 fetchSize 才能生效。
此外,小的数据量可能流式读取并不会生效,只有在数据量比较大的时候,比如一百万,fetchSize 为 1 万,这种,才会生效,是各个驱动自己的实现为主
Dameng
达梦的驱动做的很人性化,自动就开启了流式读取,不需要在创建 PreparedStatement
的时候配置 ResultSet.TYPE_FORWARD_ONLY
, ResultSet.CONCUR_READ_ONLY
而且 fetchSize 也不用设置,而且设置了也没用,定死了 fetchSize 为 100
简单实践
DM 数据库驱动
<dependency>
<groupId>com.dameng</groupId>
<artifactId>DmJdbcDriver18</artifactId>
<version>8.1.2.84</version>
</dependency>
建库语句,test_data
表
create table "test_data"
(
"name" VARCHAR(200),
"age" INT(10),
"score" INT(10),
"description" VARCHAR(500)
);
数据插入代码
DataInsert
:
public class DataInsert {
public static void main(String[] args) {
try {
Connection dameng = DriverManager.getConnection("jdbc:dm://192.168.101.158:5236", "DMS", "888888888");
String sql = "insert into test_data values (?,?,?,? );";
PreparedStatement statement = dameng.prepareStatement(sql);
for (int i = 0; i < 200; i++) {
statement.clearParameters();
statement.setString(1, "xiashuo" + i);
statement.setInt(2, i);
statement.setInt(3, 100 + i);
statement.setString(4, "测试" + i);
statement.execute();
System.out.println("插入:xiashuo" + i);
// try {
// Thread.sleep(100);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
流式读取代码
public class DataRead {
private static int nowTag = 0;
public static void main(String[] args) {
try {
Connection dameng = DriverManager.getConnection("jdbc:dm://192.168.101.158:5236", "DMS", "888888888");
String sql = "select * from test_data where age > ? ";
// 达梦的驱动做的很人性化,自动就开启了流式读取,不需要在创建 PreparedStatement 的时候配置 ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY
// 而且 fetchSize 也不用设置,而且设置了也没用,定死了 fetchSize 为 100
// PreparedStatement statement = dameng.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
// statement.setFetchSize(10000);
PreparedStatement statement = dameng.prepareStatement(sql);
while (true) {
statement.setInt(1, nowTag);
ResultSet resultSet = statement.executeQuery();
// 按需加载的代码在这里,
while (resultSet.next()) {
extracted(resultSet);
}
resultSet.close();
System.out.println("一个 resultSet 结束");
Thread.sleep(1000);
}
// dameng.close();
} catch (SQLException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private static void extracted(ResultSet resultSet) throws SQLException, InterruptedException {
ResultSetMetaData metaData = resultSet.getMetaData();
// 注意,从 1 开始
for (int i = 1; i <= metaData.getColumnCount(); i++) {
String columnName = metaData.getColumnName(i);
Object object = resultSet.getObject(columnName);
System.out.println(columnName + "---" + object.toString());
if (columnName.equals("age")) {
nowTag = (int) object;
}
}
System.out.println("----------------------------------");
Thread.sleep(100);
}
}
Mysql
不需要设置 ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_READ_ONLY
需要将 FetchSize 设置为 Integer.MIN_VALUE,这跟其他的库都不一样。
https://zhuanlan.zhihu.com/p/421952313
setFetchSize 必须设置成 Integer.MIN_VALUE
源码在 StatementImpl 的
public void enableStreamingResults() throws SQLException {
try {
synchronized(this.checkClosed().getConnectionMutex()) {
this.originalResultSetType = this.query.getResultType();
this.originalFetchSize = this.query.getResultFetchSize();
this.setFetchSize(Integer.MIN_VALUE);
this.setResultSetType(Type.FORWARD_ONLY);
}
} catch (CJException var5) {
throw SQLExceptionsMapping.translateException(var5, this.getExceptionInterceptor());
}
}
简单实践
数据库驱动
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.30</version>
</dependency>
建表语句
create table test_data
(
name varchar(200) null,
age int null,
score int null,
description varchar(500) null
);
插入代码
public class DataInsert {
public static void main(String[] args) {
try {
Connection mysqlXS = DriverManager.getConnection("jdbc:mysql://xiashuo.xyz:3306/?allowPublicKeyRetrieval=true", "mysqlXS", "mySql@327541");
String sql = "insert into core_java.test_data values (?,?,?,? );";
PreparedStatement statement = mysqlXS.prepareStatement(sql);
for (int i = 0; i < 100; i++) {
statement.clearParameters();
statement.setString(1, "xiashuo" + i);
statement.setInt(2, i);
statement.setInt(3, 100 + i);
statement.setString(4, "测试" + i);
statement.execute();
System.out.println("插入:xiashuo" + i);
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
流式读取代码
public class DataRead {
private static int nowTag = 0;
public static void main(String[] args) {
try {
Connection mysqlXS = DriverManager.getConnection("jdbc:mysql://192.168.101.158:3306/?allowPublicKeyRetrieval=true", "mysql", "mySql@mySql");
// Statement statement = mysqlXS.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_READ_ONLY);
String sql = "select * from core_java.test_data where age > ? ";
// 不需要设置 ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_READ_ONLY
// PreparedStatement statement = mysqlXS.prepareStatement(sql, ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_READ_ONLY);
PreparedStatement statement = mysqlXS.prepareStatement(sql);
// Mysql 开启流式读取很特殊,需要将 FetchSize 设置为 Integer.MIN_VALUE
statement.setFetchSize(Integer.MIN_VALUE);
while (true) {
statement.setInt(1, nowTag);
ResultSet resultSet = statement.executeQuery();
while (resultSet.next()) {
extracted(resultSet);
}
resultSet.close();
System.out.println("一个 resultSet 结束");
Thread.sleep(1000);
}
// mysqlXS.close();
} catch (SQLException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private static void extracted(ResultSet resultSet) throws SQLException, InterruptedException {
ResultSetMetaData metaData = resultSet.getMetaData();
// 注意,从 1 开始
for (int i = 1; i <= metaData.getColumnCount(); i++) {
String columnName = metaData.getColumnName(i);
Object object = resultSet.getObject(columnName);
System.out.println(columnName + "---" + object.toString());
if (columnName.equals("age")) {
nowTag = (int) object;
}
}
System.out.println("----------------------------------");
Thread.sleep(100);
}
}
TDengine
TDengine 3.2.7 版本的驱动不支持按需加载
那就手动实现,比如实现 Iteratior 接口,内置一个分页查询,查询一次就将分页查询的结果放到内存里,流式查询的时候返回内存中的结果,如果这一页查完了,就查下一页的。麻烦一点的地方就是需要自己来维护这个流式读取的进度。而且会有跨事务的问题,因此我们需要手动开启事务,手动提交事务。关于事务的问题,请看 流式读取时的事务管理
。
流式读取时的事务管理
注意:数据库都是有事务管理的,在执行查询语句之后,后续的流式读取操作读取的都是执行查询操作时的快照的数据。
ResultSet resultSet = statement.executeQuery();
举个例子,执行查询语句的时候的时间为 A,此时有 100 条数据,流式查询并处理完这一百条数据需要 10s,那么在 A 到 A+10s 这段时间内,又插入了 50 条数据,那么流式读取会读取到新增加的这 50 条吗?答案是不会,流式读取只会读取执行查询语句时的那个快照下的数据,也就时 100 条。
如果想要查询到后续增加的这 50 条,很简单,再执行一次查询操作即可。