Flink CDC | Mysql指定时间戳读取
Flink CDC在配置mysql时,可以指定几种方式来选择位点: INITIAL、EARLIEST_OFFSET、LATEST_OFFSET、SPECIFIC_OFFSETS、TIMESTAMP、SNAPSHOT。
INITIAL: 全量与增量
EARLIEST_OFFSET:最早位点
LATEST_OFFSET:最近的位点
SPECIFIC_OFFSETS:指定位点
TIMESTAMP:指定时间点
SNAPSHOT:全量
源码分析
设置该类型的cdc同步任务,机制会检查当前存在的binlog文件列表,因为每个文件是按顺序排列,同时对应的时间也是有顺序的,最终是通过二分法进行查找。
public static void main(String[] args) {
MySqlSource.builder()
.startupOptions(StartupOptions.timestamp(System.currentTimeMillis()))
.build();
}
当设置了cdc任务的类型为TIMESTAMP时,会通过以下的方法来获取对应的binlogfile,具体查看类 BinlogOffsetUtils.java
public static BinlogOffset initializeEffectiveOffset(
BinlogOffset offset, MySqlConnection connection) {
BinlogOffsetKind offsetKind = offset.getOffsetKind();
switch (offsetKind) {
case EARLIEST:
return BinlogOffset.ofBinlogFilePosition("", 0);
case TIMESTAMP:
// 遍历当前所有存在的binlogfile文件,取每个文件的文件头来判断时间
// 所以一定是当前整个文件的数据,也是按binlogfile文件名来读取数据的
return DebeziumUtils.findBinlogOffset(offset.getTimestampSec() * 1000, connection);
case LATEST:
return DebeziumUtils.currentBinlogOffset(connection);
default:
return offset;
}
}
public static BinlogOffset findBinlogOffset(long targetMs, MySqlConnection connection) {
MySqlConnection.MySqlConnectionConfiguration config = connection.connectionConfig();
BinaryLogClient client =
new BinaryLogClient(
config.hostname(), config.port(), config.username(), config.password());
List binlogFiles = new ArrayList<>();
JdbcConnection.ResultSetConsumer rsc =
rs -> {
while (rs.next()) {
String fileName = rs.getString(1);
long fileSize = rs.getLong(2);
if (fileSize > 0) {
binlogFiles.add(fileName);
}
}
};
try {
// 获取mysql系统内存在的binlog
connection.query("SHOW BINARY LOGS", rsc);
LOG.info("Total search binlog: {}", binlogFiles);
if (binlogFiles.isEmpty()) {
return BinlogOffset.ofBinlogFilePosition("", 0);
}
// 搜索最接近的binlog文件
String binlogName = searchBinlogName(client, targetMs, binlogFiles);
return BinlogOffset.ofBinlogFilePosition(binlogName, 0);
} catch (Exception e) {
throw new FlinkRuntimeException(e);
}
}
private static String searchBinlogName(
BinaryLogClient client, long targetMs, List binlogFiles)
throws IOException, InterruptedException {
int startIdx = 0;
int endIdx = binlogFiles.size() - 1;
// 因为binlog文件名是递增的,同时时间也是递增的
// 以二分法进行查找
while (startIdx <= endIdx) {
int mid = startIdx + (endIdx - startIdx) / 2;
long midTs = getBinlogTimestamp(client, binlogFiles.get(mid));
if (midTs < targetMs) {
startIdx = mid + 1;
} else if (targetMs < midTs) {
endIdx = mid - 1;
} else {
return binlogFiles.get(mid);
}
}
return endIdx < 0 ? binlogFiles.get(0) : binlogFiles.get(endIdx);
}
从以上的逻辑可以看到,当指定了timestamp时,会从最接近的那个binlog文件开始从头开始读取数据,那会不会多读很多数据呢?答案是否定的,当从找到的binlog文件中读取数据后,真正在处理的时候,会再判断一次当前的事件是否在指定的时间范围内,代码在
MySqlBinlogSplitReadTask.java
protected void handleEvent(
MySqlPartition partition, MySqlOffsetContext offsetContext, Event event) {
// 当从binlog读取数据后,进行一次过滤
if (!eventFilter.test(event)) {
return;
}
super.handleEvent(partition, offsetContext, event);
// check do we need to stop for read binlog for snapshot split.
if (isBoundedRead()) {
final BinlogOffset currentBinlogOffset =
RecordUtils.getBinlogPosition(offsetContext.getOffset());
// reach the high watermark, the binlog reader should finished
if (currentBinlogOffset.isAtOrAfter(binlogSplit.getEndingOffset())) {
// send binlog end event
try {
signalEventDispatcher.dispatchWatermarkEvent(
binlogSplit,
currentBinlogOffset,
SignalEventDispatcher.WatermarkKind.BINLOG_END);
} catch (InterruptedException e) {
LOG.error("Send signal event error.", e);
errorHandler.setProducerThrowable(
new DebeziumException("Error processing binlog signal event", e));
}
// tell reader the binlog task finished
((StoppableChangeEventSourceContext) context).stopChangeEventSource();
}
}
}
eventFilter由BinlogSplitReader在创建MySqlBinlogSplitReadTask时处理。
private Predicate createEventFilter(BinlogOffset startingOffset) {
// 当是TIMESTAMP类型时,需要将小于指定时间的事件进行移除
if (BinlogOffsetKind.TIMESTAMP.equals(startingOffset.getOffsetKind())) {
long startTimestampSec = startingOffset.getTimestampSec();
return event ->
EventType.HEARTBEAT.equals(event.getHeader().getEventType())
|| event.getHeader().getTimestamp() >= startTimestampSec * 1000;
}
return event -> true;
}