Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,17 @@ Flink SQL> SELECT * FROM orders;
<li>false(默认):所有类型的消息都保持原样下发。</li>
</td>
</tr>
<tr>
<td>scan.binlog.skip-unsubscribed-tables.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>
在增量解析阶段,是否跳过未订阅表的增量数据(binlog)反序列化。<br>
建议在订阅部分表的场景下选择开启,能过滤非订阅表的增量数据解析,提升解析性能。<br>
这是一项实验性功能,默认关闭。
</td>
</tr>
<tr>
<td>scan.incremental.snapshot.backfill.skip</td>
<td>optional</td>
Expand Down
11 changes: 11 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,17 @@ pipeline:
scan.binlog.newly-added-table.enabled: 只在 binlog 读取阶段读取新增表的增量数据。
</td>
</tr>
<tr>
<td>scan.binlog.skip-unsubscribed-tables.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>
在增量解析阶段,是否跳过未订阅表的增量数据(binlog)反序列化。<br>
建议在订阅部分表的场景下选择开启,能过滤非订阅表的增量数据解析,提升解析性能。<br>
这是一项实验性功能,默认关闭。
</td>
</tr>
<tr>
<td>scan.parse.online.schema.changes.enabled</td>
<td>optional</td>
Expand Down
11 changes: 11 additions & 0 deletions docs/content/docs/connectors/flink-sources/mysql-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,17 @@ Only valid for cdc 1.x version. During a snapshot operation, the connector will
<li>false (default): All types of messages are sent as is.</li>
</td>
</tr>
<tr>
<td>scan.binlog.skip-unsubscribed-tables.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>
During incremental reading, whether to skip deserialization of incremental data (binlog) for unsubscribed tables.<br>
It is recommended to enable this option when only a subset of the captured tables is subscribed, as it avoids parsing incremental events of unsubscribed tables and improves parsing performance.<br>
This is an experimental feature and is disabled by default.
</td>
</tr>
<tr>
<td>scan.incremental.snapshot.backfill.skip</td>
<td>optional</td>
Expand Down
11 changes: 11 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,17 @@ pipeline:
scan.binlog.newly-added-table.enabled: only do binlog-reading for newly added table during binlog reading phase.
</td>
</tr>
<tr>
<td>scan.binlog.skip-unsubscribed-tables.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>
During incremental reading, whether to skip deserialization of incremental data (binlog) for unsubscribed tables.<br>
It is recommended to enable this option when only a subset of the captured tables is subscribed, as it avoids parsing incremental events of unsubscribed tables and improves parsing performance.<br>
This is an experimental feature and is disabled by default.
</td>
</tr>
<tr>
<td>scan.parse.online.schema.changes.enabled</td>
<td>optional</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
Expand Down Expand Up @@ -167,6 +168,8 @@ public DataSource createDataSource(Context context) {
boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT);
boolean isAssignUnboundedChunkFirst =
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
boolean skipBinlogDeserializationOfUnsubscribedTables =
config.get(SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED);

validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
Expand Down Expand Up @@ -220,6 +223,8 @@ public DataSource createDataSource(Context context) {
.treatTinyInt1AsBoolean(treatTinyInt1AsBoolean)
.useLegacyJsonFormat(useLegacyJsonFormat)
.assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
.skipBinlogDeserializationOfUnsubscribedTables(
skipBinlogDeserializationOfUnsubscribedTables)
.skipSnapshotBackfill(skipSnapshotBackfill);

List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);
Expand Down Expand Up @@ -351,6 +356,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
options.add(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
options.add(SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED);
options.add(METADATA_LIST);
options.add(INCLUDE_COMMENTS_ENABLED);
options.add(USE_LEGACY_JSON_FORMAT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,4 +330,12 @@ public class MySqlDataSourceOptions {
.defaultValue(false)
.withDescription(
"Whether to skip backfill in snapshot reading phase. If backfill is skipped, changes on captured tables during snapshot phase will be consumed later in change log reading phase instead of being merged into the snapshot.WARNING: Skipping backfill might lead to data inconsistency because some change log events happened within the snapshot phase might be replayed (only at-least-once semantic is promised). For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.");

@Experimental
public static final ConfigOption<Boolean> SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED =
ConfigOptions.key("scan.binlog.skip-unsubscribed-tables.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to skip deserialization of binlog row events for unsubscribed tables.");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.shyiko.mysql.binlog.event.deserialization;

import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;

import java.io.IOException;
import java.io.Serializable;
import java.util.BitSet;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
 * Copied from mysql-binlog-connector 0.27.2 with two modifications:
 *
 * <p>1. Added an overloaded constructor that accepts a {@link TableIdFilter}.
 *
 * <p>2. In {@link #deserialize}, the filter is consulted immediately after the table id is read,
 * so that row deserialization is skipped entirely for unsubscribed tables.
 */
public class DeleteRowsEventDataDeserializer
        extends AbstractRowsEventDataDeserializer<DeleteRowsEventData> {

    // When true, an extra-information block precedes the column bitmap and is skipped
    // during deserialization (see deserialize below).
    private boolean mayContainExtraInformation;

    /** the table id filter to skip further deserialization of unsubscribed table ids. */
    private final TableIdFilter tableIdFilter;

    /** Creates a deserializer that fully deserializes rows of every table (no filtering). */
    public DeleteRowsEventDataDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
        this(tableMapEventByTableId, TableIdFilter.all());
    }

    /**
     * Creates a deserializer that only fully deserializes rows of tables accepted by the given
     * {@code tableIdFilter}.
     */
    public DeleteRowsEventDataDeserializer(
            Map<Long, TableMapEventData> tableMapEventByTableId, TableIdFilter tableIdFilter) {
        super(tableMapEventByTableId);
        this.tableIdFilter = tableIdFilter;
    }

    public DeleteRowsEventDataDeserializer setMayContainExtraInformation(
            boolean mayContainExtraInformation) {
        this.mayContainExtraInformation = mayContainExtraInformation;
        return this;
    }

    @Override
    public DeleteRowsEventData deserialize(ByteArrayInputStream inputStream) throws IOException {
        DeleteRowsEventData eventData = new DeleteRowsEventData();
        // The first 6 bytes of the event body are the table id.
        eventData.setTableId(inputStream.readLong(6));

        // skip further deserialization if the table id is unsubscribed
        // NOTE(review): the remaining bytes of the event body are left unread here;
        // presumably the enclosing event deserializer discards them — confirm against
        // EventDeserializer's handling of leftover bytes.
        if (!tableIdFilter.test(eventData.getTableId())) {
            eventData.setIncludedColumns(null);
            eventData.setRows(Collections.emptyList());
            return eventData;
        }

        inputStream.readInteger(2); // reserved
        if (mayContainExtraInformation) {
            // Extra-info block: a 2-byte length that includes the length field itself,
            // followed by a payload of (extraInfoLength - 2) bytes which is skipped.
            int extraInfoLength = inputStream.readInteger(2);
            inputStream.skip(extraInfoLength - 2);
        }
        int numberOfColumns = inputStream.readPackedInteger();
        eventData.setIncludedColumns(inputStream.readBitSet(numberOfColumns, true));
        eventData.setRows(
                deserializeRows(
                        eventData.getTableId(), eventData.getIncludedColumns(), inputStream));
        return eventData;
    }

    /** Reads deleted rows one by one until the event body is exhausted. */
    private List<Serializable[]> deserializeRows(
            long tableId, BitSet includedColumns, ByteArrayInputStream inputStream)
            throws IOException {
        List<Serializable[]> result = new LinkedList<Serializable[]>();
        while (inputStream.available() > 0) {
            result.add(deserializeRow(tableId, includedColumns, inputStream));
        }
        return result;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.shyiko.mysql.binlog.event.deserialization;

import java.util.function.Predicate;

/**
 * A filter on binlog table ids, used to skip the binlog deserialization of unsubscribed tables.
 */
@FunctionalInterface
public interface TableIdFilter extends Predicate<Long> {

    /**
     * Returns {@code true} if row events of the given table id should be fully deserialized,
     * {@code false} if their deserialization may be skipped.
     */
    @Override
    boolean test(Long tableId);

    /** Returns a filter that accepts every table id, i.e. performs no filtering. */
    static TableIdFilter all() {
        return tableId -> true;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.shyiko.mysql.binlog.event.deserialization;

import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;

import java.io.IOException;
import java.io.Serializable;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
 * Copied from mysql-binlog-connector 0.27.2 with two modifications:
 *
 * <p>1. Added an overloaded constructor that accepts a {@link TableIdFilter}.
 *
 * <p>2. In {@link #deserialize}, the filter is consulted immediately after the table id is read,
 * so that row deserialization is skipped entirely for unsubscribed tables.
 */
public class UpdateRowsEventDataDeserializer
        extends AbstractRowsEventDataDeserializer<UpdateRowsEventData> {

    // When true, an extra-information block precedes the column bitmaps and is skipped
    // during deserialization (see deserialize below).
    private boolean mayContainExtraInformation;

    /** the table id filter to skip further deserialization of unsubscribed table ids. */
    private final TableIdFilter tableIdFilter;

    /** Creates a deserializer that fully deserializes rows of every table (no filtering). */
    public UpdateRowsEventDataDeserializer(Map<Long, TableMapEventData> tableMapEventByTableId) {
        this(tableMapEventByTableId, TableIdFilter.all());
    }

    /**
     * Creates a deserializer that only fully deserializes rows of tables accepted by the given
     * {@code tableIdFilter}.
     */
    public UpdateRowsEventDataDeserializer(
            Map<Long, TableMapEventData> tableMapEventByTableId, TableIdFilter tableIdFilter) {
        super(tableMapEventByTableId);
        this.tableIdFilter = tableIdFilter;
    }

    public UpdateRowsEventDataDeserializer setMayContainExtraInformation(
            boolean mayContainExtraInformation) {
        this.mayContainExtraInformation = mayContainExtraInformation;
        return this;
    }

    @Override
    public UpdateRowsEventData deserialize(ByteArrayInputStream inputStream) throws IOException {
        UpdateRowsEventData eventData = new UpdateRowsEventData();
        // The first 6 bytes of the event body are the table id.
        eventData.setTableId(inputStream.readLong(6));

        // skip further deserialization if the table id is unsubscribed
        // NOTE(review): includedColumnsBeforeUpdate is left at its default (null) on this path,
        // mirroring the explicit setIncludedColumns(null); consumers must not rely on either
        // bitmap for skipped events. The remaining bytes of the event body are left unread —
        // presumably the enclosing event deserializer discards them; confirm.
        if (!tableIdFilter.test(eventData.getTableId())) {
            eventData.setIncludedColumns(null);
            eventData.setRows(Collections.emptyList());
            return eventData;
        }

        inputStream.skip(2); // reserved
        if (mayContainExtraInformation) {
            // Extra-info block: a 2-byte length that includes the length field itself,
            // followed by a payload of (extraInfoLength - 2) bytes which is skipped.
            int extraInfoLength = inputStream.readInteger(2);
            inputStream.skip(extraInfoLength - 2);
        }
        int numberOfColumns = inputStream.readPackedInteger();
        // Two bitmaps follow: columns present in the before-image, then in the after-image.
        eventData.setIncludedColumnsBeforeUpdate(inputStream.readBitSet(numberOfColumns, true));
        eventData.setIncludedColumns(inputStream.readBitSet(numberOfColumns, true));
        eventData.setRows(deserializeRows(eventData, inputStream));
        return eventData;
    }

    /**
     * Reads (before, after) row pairs until the event body is exhausted; each pair becomes a
     * map entry of (row before update, row after update).
     */
    private List<Map.Entry<Serializable[], Serializable[]>> deserializeRows(
            UpdateRowsEventData eventData, ByteArrayInputStream inputStream) throws IOException {
        long tableId = eventData.getTableId();
        BitSet includedColumnsBeforeUpdate = eventData.getIncludedColumnsBeforeUpdate(),
                includedColumns = eventData.getIncludedColumns();
        List<Map.Entry<Serializable[], Serializable[]>> rows =
                new ArrayList<Map.Entry<Serializable[], Serializable[]>>();
        while (inputStream.available() > 0) {
            rows.add(
                    new AbstractMap.SimpleEntry<Serializable[], Serializable[]>(
                            deserializeRow(tableId, includedColumnsBeforeUpdate, inputStream),
                            deserializeRow(tableId, includedColumns, inputStream)));
        }
        return rows;
    }
}
Loading
Loading