JDBC 流式读取

JDBC 流式读取

流式读取 API 的效果是按需加载,在读取超大数据量的表的时候非常好用
JDBC 的 ResultSet 是支持流式读取的,它会一次只加载十条或者 100 条,然后,当你消费完了的时候,它会自动去后台拉数据,但是 Mysql 的驱动默认不支持 fetsize 参数,会把所有数据都查出来,只有把 fetchsize 改成 0 才可以开启流式读取.
一般情况下,设置

PreparedStatement preparedStatement = connection.prepareStatement("select * from table", ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);

然后再设置 fetchSize 才能生效。
此外,小的数据量可能流式读取并不会生效,只有在数据量比较大的时候,比如一百万,fetchSize 为 1 万,这种,才会生效,是各个驱动自己的实现为主

Dameng

达梦的驱动做的很人性化,自动就开启了流式读取,不需要在创建 PreparedStatement 的时候配置 ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY
而且 fetchSize 也不用设置,而且设置了也没用,定死了 fetchSize 为 100
简单实践
DM 数据库驱动

<dependency>
    <groupId>com.dameng</groupId>
    <artifactId>DmJdbcDriver18</artifactId>
    <version>8.1.2.84</version>
</dependency>

建库语句,test_data

create table "test_data"
(
    "name"        VARCHAR(200),
    "age"         INT(10),
    "score"       INT(10),
    "description" VARCHAR(500)
);

数据插入代码
DataInsert

public class DataInsert {
    public static void main(String[] args) {
        try {
            Connection dameng = DriverManager.getConnection("jdbc:dm://192.168.101.158:5236", "DMS", "888888888");
            String sql = "insert into test_data values (?,?,?,? );";
            PreparedStatement statement = dameng.prepareStatement(sql);
            for (int i = 0; i < 200; i++) {
                statement.clearParameters();
                statement.setString(1, "xiashuo" + i);
                statement.setInt(2, i);
                statement.setInt(3, 100 + i);
                statement.setString(4, "测试" + i);
                statement.execute();
                System.out.println("插入:xiashuo" + i);
                // try {
                //     Thread.sleep(100);
                // } catch (InterruptedException e) {
                //     throw new RuntimeException(e);
                // }
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
}

流式读取代码

public class DataRead {
    private static int nowTag = 0;
    public static void main(String[] args) {
        try {
            Connection dameng = DriverManager.getConnection("jdbc:dm://192.168.101.158:5236", "DMS", "888888888");
            String sql = "select * from test_data where age > ? ";
            // 达梦的驱动做的很人性化,自动就开启了流式读取,不需要在创建 PreparedStatement  的时候配置 ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY
            // 而且 fetchSize 也不用设置,而且设置了也没用,定死了 fetchSize 为 100
            // PreparedStatement statement = dameng.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
            // statement.setFetchSize(10000);
            PreparedStatement statement = dameng.prepareStatement(sql);
            while (true) {
                statement.setInt(1, nowTag);
                ResultSet resultSet = statement.executeQuery();
                // 按需加载的代码在这里,
                while (resultSet.next()) {
                    extracted(resultSet);
                }
                resultSet.close();
                System.out.println("一个 resultSet 结束");
                Thread.sleep(1000);
            }
            // dameng.close();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
    private static void extracted(ResultSet resultSet) throws SQLException, InterruptedException {
        ResultSetMetaData metaData = resultSet.getMetaData();
        // 注意,从 1 开始
        for (int i = 1; i <= metaData.getColumnCount(); i++) {
            String columnName = metaData.getColumnName(i);
            Object object = resultSet.getObject(columnName);
            System.out.println(columnName + "---" + object.toString());
            if (columnName.equals("age")) {
                nowTag = (int) object;
            }
        }
        System.out.println("----------------------------------");
        Thread.sleep(100);
    }
}

Mysql

不需要设置 ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_READ_ONLY
需要将 FetchSize 设置为 Integer.MIN_VALUE,这跟其他的库都不一样。
https://zhuanlan.zhihu.com/p/421952313
setFetchSize 必须设置成 Integer.MIN_VALUE
源码在 StatementImpl 的

    public void enableStreamingResults() throws SQLException {
        try {
            synchronized(this.checkClosed().getConnectionMutex()) {
                this.originalResultSetType = this.query.getResultType();
                this.originalFetchSize = this.query.getResultFetchSize();
                this.setFetchSize(Integer.MIN_VALUE);
                this.setResultSetType(Type.FORWARD_ONLY);
            }
        } catch (CJException var5) {
            throw SQLExceptionsMapping.translateException(var5, this.getExceptionInterceptor());
        }
    }

简单实践
数据库驱动

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.30</version>
</dependency>

建表语句

create table test_data
(
    name        varchar(200) null,
    age         int          null,
    score       int          null,
    description varchar(500) null
);

插入代码

public class DataInsert {
    public static void main(String[] args) {
        try {
            Connection mysqlXS = DriverManager.getConnection("jdbc:mysql://xiashuo.xyz:3306/?allowPublicKeyRetrieval=true", "mysqlXS", "mySql@327541");
            String sql = "insert into core_java.test_data values (?,?,?,? );";
            PreparedStatement statement = mysqlXS.prepareStatement(sql);
            for (int i = 0; i < 100; i++) {
                statement.clearParameters();
                statement.setString(1, "xiashuo" + i);
                statement.setInt(2, i);
                statement.setInt(3, 100 + i);
                statement.setString(4, "测试" + i);
                statement.execute();
                System.out.println("插入:xiashuo" + i);
                // try {
                //     Thread.sleep(1000);
                // } catch (InterruptedException e) {
                //     throw new RuntimeException(e);
                // }
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
}

流式读取代码

public class DataRead {
    private static int nowTag = 0;
    public static void main(String[] args) {
        try {
            Connection mysqlXS = DriverManager.getConnection("jdbc:mysql://192.168.101.158:3306/?allowPublicKeyRetrieval=true", "mysql", "mySql@mySql");
            // Statement statement = mysqlXS.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_READ_ONLY);
            String sql = "select * from core_java.test_data where age > ? ";
            // 不需要设置 ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_READ_ONLY
            // PreparedStatement statement = mysqlXS.prepareStatement(sql, ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_READ_ONLY);
            PreparedStatement statement = mysqlXS.prepareStatement(sql);
            // Mysql 开启流式读取很特殊,需要将 FetchSize 设置为 Integer.MIN_VALUE
            statement.setFetchSize(Integer.MIN_VALUE);
            while (true) {
                statement.setInt(1, nowTag);
                ResultSet resultSet = statement.executeQuery();
                while (resultSet.next()) {
                    extracted(resultSet);
                }
                resultSet.close();
                System.out.println("一个 resultSet 结束");
                Thread.sleep(1000);
            }
            // mysqlXS.close();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
    private static void extracted(ResultSet resultSet) throws SQLException, InterruptedException {
        ResultSetMetaData metaData = resultSet.getMetaData();
        // 注意,从 1 开始
        for (int i = 1; i <= metaData.getColumnCount(); i++) {
            String columnName = metaData.getColumnName(i);
            Object object = resultSet.getObject(columnName);
            System.out.println(columnName + "---" + object.toString());
            if (columnName.equals("age")) {
                nowTag = (int) object;
            }
        }
        System.out.println("----------------------------------");
        Thread.sleep(100);
    }
}

TDengine

TDengine 3.2.7 版本的驱动不支持按需加载
那就手动实现,比如实现 Iteratior 接口,内置一个分页查询,查询一次就将分页查询的结果放到内存里,流式查询的时候返回内存中的结果,如果这一页查完了,就查下一页的。麻烦一点的地方就是需要自己来维护这个流式读取的进度。而且会有跨事务的问题,因此我们需要手动开启事务,手动提交事务。关于事务的问题,请看 流式读取时的事务管理

流式读取时的事务管理

注意:数据库都是有事务管理的,在执行查询语句之后,后续的流式读取操作读取的都是执行查询操作时的快照的数据。

ResultSet resultSet = statement.executeQuery();

举个例子,执行查询语句的时候的时间为 A,此时有 100 条数据,流式查询并处理完这一百条数据需要 10s,那么在 A 到 A+10s 这段时间内,又插入了 50 条数据,那么流式读取会读取到新增加的这 50 条吗?答案是不会,流式读取只会读取执行查询语句时的那个快照下的数据,也就时 100 条。
如果想要查询到后续增加的这 50 条,很简单,再执行一次查询操作即可。