Flink CDC | Mysql指定时间戳读取

createh53个月前 (02-01)技术教程20

Flink CDC在配置mysql时,可以指定几种方式来选择位点: INITIAL、EARLIEST_OFFSET、LATEST_OFFSET、SPECIFIC_OFFSETS、TIMESTAMP、SNAPSHOT。

INITIAL: 全量与增量

EARLIEST_OFFSET:最早位点

LATEST_OFFSET:最近的位点

SPECIFIC_OFFSETS:指定位点

TIMESTAMP:指定时间点

SNAPSHOT:全量

源码分析

设置该类型的cdc同步任务,机制会检查当前存在的binlog文件列表,因为每个文件是按顺序排列,同时对应的时间也是有顺序的,最终是通过二分法进行查找。

public static void main(String[] args) {
    MySqlSource.builder()
    .startupOptions(StartupOptions.timestamp(System.currentTimeMillis()))
    .build();
}

当设置了cdc任务的类型为TIMESTAMP时,会通过以下的方法来获取对应的binlogfile,具体查看类 BinlogOffsetUtils.java

public static BinlogOffset initializeEffectiveOffset(
    BinlogOffset offset, MySqlConnection connection) {
    BinlogOffsetKind offsetKind = offset.getOffsetKind();
    switch (offsetKind) {
        case EARLIEST:
            return BinlogOffset.ofBinlogFilePosition("", 0);
        case TIMESTAMP:
            // 遍历当前所有存在的binlogfile文件,取每个文件的文件头来判断时间
            // 所以一定是当前整个文件的数据,也是按binlogfile文件名来读取数据的
            return DebeziumUtils.findBinlogOffset(offset.getTimestampSec() * 1000, connection);
        case LATEST:
            return DebeziumUtils.currentBinlogOffset(connection);
        default:
            return offset;
    }
}
public static BinlogOffset findBinlogOffset(long targetMs, MySqlConnection connection) {
        MySqlConnection.MySqlConnectionConfiguration config = connection.connectionConfig();
        BinaryLogClient client =
                new BinaryLogClient(
                        config.hostname(), config.port(), config.username(), config.password());

        List binlogFiles = new ArrayList<>();
        JdbcConnection.ResultSetConsumer rsc =
                rs -> {
                    while (rs.next()) {
                        String fileName = rs.getString(1);
                        long fileSize = rs.getLong(2);
                        if (fileSize > 0) {
                            binlogFiles.add(fileName);
                        }
                    }
                };

        try {
            // 获取mysql系统内存在的binlog
            connection.query("SHOW BINARY LOGS", rsc);
            LOG.info("Total search binlog: {}", binlogFiles);

            if (binlogFiles.isEmpty()) {
                return BinlogOffset.ofBinlogFilePosition("", 0);
            }
            // 搜索最接近的binlog文件
            String binlogName = searchBinlogName(client, targetMs, binlogFiles);
            return BinlogOffset.ofBinlogFilePosition(binlogName, 0);
        } catch (Exception e) {
            throw new FlinkRuntimeException(e);
        }
    }
    private static String searchBinlogName(
            BinaryLogClient client, long targetMs, List binlogFiles)
            throws IOException, InterruptedException {
        int startIdx = 0;
        int endIdx = binlogFiles.size() - 1;
        // 因为binlog文件名是递增的,同时时间也是递增的
        // 以二分法进行查找
        while (startIdx <= endIdx) {
            int mid = startIdx + (endIdx - startIdx) / 2;
            long midTs = getBinlogTimestamp(client, binlogFiles.get(mid));
            if (midTs < targetMs) {
                startIdx = mid + 1;
            } else if (targetMs < midTs) {
                endIdx = mid - 1;
            } else {
                return binlogFiles.get(mid);
            }
        }

        return endIdx < 0 ? binlogFiles.get(0) : binlogFiles.get(endIdx);
    }

从以上的逻辑可以看到,当指定了timestamp时,会从最接近的那个binlog文件开始从头开始读取数据,那会不会多读很多数据呢?答案是否定的,当从找到的binlog文件中读取数据后,真正在处理的时候,会再判断一次当前的事件是否在指定的时间范围内,代码在
MySqlBinlogSplitReadTask.java

protected void handleEvent(
            MySqlPartition partition, MySqlOffsetContext offsetContext, Event event) {
    // 当从binlog读取数据后,进行一次过滤  
    if (!eventFilter.test(event)) {
            return;
        }
        super.handleEvent(partition, offsetContext, event);
        // check do we need to stop for read binlog for snapshot split.
        if (isBoundedRead()) {
            final BinlogOffset currentBinlogOffset =
                    RecordUtils.getBinlogPosition(offsetContext.getOffset());
            // reach the high watermark, the binlog reader should finished
            if (currentBinlogOffset.isAtOrAfter(binlogSplit.getEndingOffset())) {
                // send binlog end event
                try {
                    signalEventDispatcher.dispatchWatermarkEvent(
                            binlogSplit,
                            currentBinlogOffset,
                            SignalEventDispatcher.WatermarkKind.BINLOG_END);
                } catch (InterruptedException e) {
                    LOG.error("Send signal event error.", e);
                    errorHandler.setProducerThrowable(
                            new DebeziumException("Error processing binlog signal event", e));
                }
                // tell reader the binlog task finished
                ((StoppableChangeEventSourceContext) context).stopChangeEventSource();
            }
        }
    }

eventFilter由BinlogSplitReader在创建MySqlBinlogSplitReadTask时处理。

    private Predicate createEventFilter(BinlogOffset startingOffset) {
        // 当是TIMESTAMP类型时,需要将小于指定时间的事件进行移除
        if (BinlogOffsetKind.TIMESTAMP.equals(startingOffset.getOffsetKind())) {
            long startTimestampSec = startingOffset.getTimestampSec();
            return event ->
                    EventType.HEARTBEAT.equals(event.getHeader().getEventType())
                            || event.getHeader().getTimestamp() >= startTimestampSec * 1000;
        }
        return event -> true;
    }

相关文章

「怒赞」Java8全新日期、时间API在这全明白了

满怀忧思,不如先干再说!通过学习,重新定义自己!时间对生活来说非常重要,Java也为我们提供了时间的API,多数程序员都在吐槽Java8之前的日期和时间,在Java8中引入全新的日期和时间API,目前...

java项目过程中常用的日期计算工具

在项目开发过程中,日期计算往往是一个非常常见且关键的需求,尤其是在涉及报表生成和数据分析的场景中。许多业务需求需要对日期进行各种复杂的逻辑处理,例如计算某个时间段内的数据、动态生成时间范围、处理跨月或...

一文秒懂:多级时间轮,最顶尖的Java调度算法

缓存之王 Caffeine 中,涉及到100w级、1000W级、甚至亿级元素的过期问题,如何进行高性能的定时调度,是一个难题。海量定时任务管理的问题下面的问题,来自互联网:一个大型内容审核平时,在运营...

时间戳用法详解,时间与时间戳怎么转换

在程序开发者用到的必不可少的功能就是时间戳与时间的转换了,经常数据库存的是时间戳,但是给用户需要显示具体时间,今天这篇文章就来介绍下怎么使用python,java,JavaScript,php几种语言...

java小知识-纳秒(纳秒等于多少)

作者:京东物流 崔冬冬一、System.nanoTime()java中,有这么一个方法System.nanoTime(),你用过吗?二、与System.currentTimeMillis()对比Sys...

打通 JAVA 与内核系列之 一 ReentrantLock 锁的实现原理

写JAVA代码的同学都知道,JAVA里的锁有两大类,一类是synchronized锁,一类是concurrent包里的锁(JUC锁)。其中synchronized锁是JAVA语言层面提供的能力,在此不...