Flink CDC | Mysql指定时间戳读取

createh52个月前 (02-01)技术教程14

Flink CDC在配置mysql时,可以指定几种方式来选择位点: INITIAL、EARLIEST_OFFSET、LATEST_OFFSET、SPECIFIC_OFFSETS、TIMESTAMP、SNAPSHOT。

INITIAL: 全量与增量

EARLIEST_OFFSET:最早位点

LATEST_OFFSET:最近的位点

SPECIFIC_OFFSETS:指定位点

TIMESTAMP:指定时间点

SNAPSHOT:全量

源码分析

设置该类型的cdc同步任务,机制会检查当前存在的binlog文件列表,因为每个文件是按顺序排列,同时对应的时间也是有顺序的,最终是通过二分法进行查找。

Bash
public static void main(String[] args) {
    MySqlSource.builder()
    .startupOptions(StartupOptions.timestamp(System.currentTimeMillis()))
    .build();
}

当设置了cdc任务的类型为TIMESTAMP时,会通过以下的方法来获取对应的binlogfile,具体查看类 BinlogOffsetUtils.java

Bash
public static BinlogOffset initializeEffectiveOffset(
    BinlogOffset offset, MySqlConnection connection) {
    BinlogOffsetKind offsetKind = offset.getOffsetKind();
    switch (offsetKind) {
        case EARLIEST:
            return BinlogOffset.ofBinlogFilePosition("", 0);
        case TIMESTAMP:
            // 遍历当前所有存在的binlogfile文件,取每个文件的文件头来判断时间
            // 所以一定是当前整个文件的数据,也是按binlogfile文件名来读取数据的
            return DebeziumUtils.findBinlogOffset(offset.getTimestampSec() * 1000, connection);
        case LATEST:
            return DebeziumUtils.currentBinlogOffset(connection);
        default:
            return offset;
    }
}
Bash
public static BinlogOffset findBinlogOffset(long targetMs, MySqlConnection connection) {
        MySqlConnection.MySqlConnectionConfiguration config = connection.connectionConfig();
        BinaryLogClient client =
                new BinaryLogClient(
                        config.hostname(), config.port(), config.username(), config.password());

        List binlogFiles = new ArrayList<>();
        JdbcConnection.ResultSetConsumer rsc =
                rs -> {
                    while (rs.next()) {
                        String fileName = rs.getString(1);
                        long fileSize = rs.getLong(2);
                        if (fileSize > 0) {
                            binlogFiles.add(fileName);
                        }
                    }
                };

        try {
            // 获取mysql系统内存在的binlog
            connection.query("SHOW BINARY LOGS", rsc);
            LOG.info("Total search binlog: {}", binlogFiles);

            if (binlogFiles.isEmpty()) {
                return BinlogOffset.ofBinlogFilePosition("", 0);
            }
            // 搜索最接近的binlog文件
            String binlogName = searchBinlogName(client, targetMs, binlogFiles);
            return BinlogOffset.ofBinlogFilePosition(binlogName, 0);
        } catch (Exception e) {
            throw new FlinkRuntimeException(e);
        }
    }
Bash
    private static String searchBinlogName(
            BinaryLogClient client, long targetMs, List binlogFiles)
            throws IOException, InterruptedException {
        int startIdx = 0;
        int endIdx = binlogFiles.size() - 1;
        // 因为binlog文件名是递增的,同时时间也是递增的
        // 以二分法进行查找
        while (startIdx <= endIdx) {
            int mid = startIdx + (endIdx - startIdx) / 2;
            long midTs = getBinlogTimestamp(client, binlogFiles.get(mid));
            if (midTs < targetMs) {
                startIdx = mid + 1;
            } else if (targetMs < midTs) {
                endIdx = mid - 1;
            } else {
                return binlogFiles.get(mid);
            }
        }

        return endIdx < 0 ? binlogFiles.get(0) : binlogFiles.get(endIdx);
    }

从以上的逻辑可以看到,当指定了timestamp时,会从最接近的那个binlog文件开始从头开始读取数据,那会不会多读很多数据呢?答案是否定的,当从找到的binlog文件中读取数据后,真正在处理的时候,会再判断一次当前的事件是否在指定的时间范围内,代码在
MySqlBinlogSplitReadTask.java

Bash
protected void handleEvent(
            MySqlPartition partition, MySqlOffsetContext offsetContext, Event event) {
    // 当从binlog读取数据后,进行一次过滤  
    if (!eventFilter.test(event)) {
            return;
        }
        super.handleEvent(partition, offsetContext, event);
        // check do we need to stop for read binlog for snapshot split.
        if (isBoundedRead()) {
            final BinlogOffset currentBinlogOffset =
                    RecordUtils.getBinlogPosition(offsetContext.getOffset());
            // reach the high watermark, the binlog reader should finished
            if (currentBinlogOffset.isAtOrAfter(binlogSplit.getEndingOffset())) {
                // send binlog end event
                try {
                    signalEventDispatcher.dispatchWatermarkEvent(
                            binlogSplit,
                            currentBinlogOffset,
                            SignalEventDispatcher.WatermarkKind.BINLOG_END);
                } catch (InterruptedException e) {
                    LOG.error("Send signal event error.", e);
                    errorHandler.setProducerThrowable(
                            new DebeziumException("Error processing binlog signal event", e));
                }
                // tell reader the binlog task finished
                ((StoppableChangeEventSourceContext) context).stopChangeEventSource();
            }
        }
    }

eventFilter由BinlogSplitReader在创建MySqlBinlogSplitReadTask时处理。

Bash
    private Predicate createEventFilter(BinlogOffset startingOffset) {
        // 当是TIMESTAMP类型时,需要将小于指定时间的事件进行移除
        if (BinlogOffsetKind.TIMESTAMP.equals(startingOffset.getOffsetKind())) {
            long startTimestampSec = startingOffset.getTimestampSec();
            return event ->
                    EventType.HEARTBEAT.equals(event.getHeader().getEventType())
                            || event.getHeader().getTimestamp() >= startTimestampSec * 1000;
        }
        return event -> true;
    }

相关文章

「怒赞」Java8全新日期、时间API在这全明白了

满怀忧思,不如先干再说!通过学习,重新定义自己!时间对生活来说非常重要,Java也为我们提供了时间的API,多数程序员都在吐槽Java8之前的日期和时间,在Java8中引入全新的日期和时间API,目前...

「Java工具类」java8时间日期工具类,整个项目有它就足够了

介绍语本头条号主要是Java常用关键技术点,通用工具类的分享;以及springboot+springcloud+Mybatisplus+druid+mysql+redis+swagger+maven+...

时间戳用法详解,时间与时间戳怎么转换

在程序开发者用到的必不可少的功能就是时间戳与时间的转换了,经常数据库存的是时间戳,但是给用户需要显示具体时间,今天这篇文章就来介绍下怎么使用python,java,JavaScript,php几种语言...

时间管理大师:Java DateTimeFormatter.ofPattern 的幽默指南

前言在这个快节奏的世界里,时间就像一张消费券,谁都想把它花得更值!想象一下,能够像一个时间管理大师一样,随心所欲地掌控每一秒。Java 的 DateTimeFormatter.ofPattern 就是...

打通 JAVA 与内核系列之 一 ReentrantLock 锁的实现原理

写JAVA代码的同学都知道,JAVA里的锁有两大类,一类是synchronized锁,一类是concurrent包里的锁(JUC锁)。其中synchronized锁是JAVA语言层面提供的能力,在此不...

荒废了3年大学时间,Java自学6个多月,找到13k的工作

莫等闲,白了少年头,空悲切。发这个帖子就是劝诫各位学弟们不要像我一样,临近毕业时才意识到学技术学知识的重要性,能趁早尽量趁早,过去应该做的事情没有去做,后面都需要你加倍补回来,如果你不去弥补前面的空缺...