[WIP]#5149
Conversation
|
Skipping CI for Draft Pull Request. |
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
|
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request introduces the ability to ignore specific DDL event types in the schema store via the ignore-ddl-types configuration. It updates the binlog-filter package to support classifying and mapping additional DDL types (such as foreign keys and full-text indexes), adds configuration validation, and includes integration tests. The code review feedback suggests critical performance optimizations, specifically constructing the BinlogEvent filter once rather than recreating it for every single DDL event, updating the associated tests to match this optimized signature, and passing configuration slices directly to allow in-place normalization.
| func filterIgnoredDDLEvents(events []commonEvent.DDLEvent) []commonEvent.DDLEvent { | ||
| serverConfig := config.GetGlobalServerConfig() | ||
| ignoreDDLTypes := serverConfig.Debug.SchemaStore.IgnoreDDLTypes | ||
| if len(ignoreDDLTypes) == 0 || len(events) == 0 { | ||
| return events | ||
| } | ||
|
|
||
| filteredEvents := events[:0] | ||
| for _, event := range events { | ||
| if shouldIgnoreDDLEventByType(event, ignoreDDLTypes) { | ||
| log.Info("ignore ddl event by type", | ||
| zap.Any("type", event.GetDDLType()), | ||
| zap.Uint64("finishedTs", event.FinishedTs), | ||
| zap.String("query", event.Query)) | ||
| continue | ||
| } | ||
| filteredEvents = append(filteredEvents, event) | ||
| } | ||
| return filteredEvents | ||
| } | ||
|
|
||
| func shouldIgnoreDDLEventByType(ddlEvent commonEvent.DDLEvent, ignoreDDLTypes []bf.EventType) bool { | ||
| if len(ignoreDDLTypes) == 0 { | ||
| return false | ||
| } | ||
|
|
||
| actionType := ddlEvent.GetDDLType() | ||
| ddlType := filter.DDLToEventType(actionType) | ||
| if ddlType == bf.NullEvent { | ||
| log.Warn("schema store ignore ddl type found unsupported ddl", | ||
| zap.String("type", actionType.String()), | ||
| zap.String("query", ddlEvent.Query)) | ||
| return false | ||
| } | ||
|
|
||
| eventFilter, err := bf.NewBinlogEvent(false, []*bf.BinlogEventRule{ | ||
| { | ||
| SchemaPattern: "schema-store", | ||
| TablePattern: "ddl", | ||
| Events: append([]bf.EventType(nil), ignoreDDLTypes...), | ||
| Action: bf.Ignore, | ||
| }, | ||
| }) | ||
| if err != nil { | ||
| log.Warn("schema store ignore ddl type config is invalid", | ||
| zap.Any("ignoreDDLTypes", ignoreDDLTypes), | ||
| zap.Error(err)) | ||
| return false | ||
| } | ||
|
|
||
| ignored, err := matchIgnoreDDLType(eventFilter, ddlType, ddlEvent.Query) | ||
| if err != nil { | ||
| log.Warn("schema store ignore ddl type failed", | ||
| zap.String("type", actionType.String()), | ||
| zap.String("query", ddlEvent.Query), | ||
| zap.Error(err)) | ||
| return false | ||
| } | ||
| if ignored { | ||
| return true | ||
| } | ||
|
|
||
| if !filter.IsAlterTableDDL(actionType) { | ||
| return false | ||
| } | ||
|
|
||
| ignored, err = matchIgnoreDDLType(eventFilter, bf.AlterTable, ddlEvent.Query) | ||
| if err != nil { | ||
| log.Warn("schema store ignore alter table ddl type failed", | ||
| zap.String("type", actionType.String()), | ||
| zap.String("query", ddlEvent.Query), | ||
| zap.Error(err)) | ||
| return false | ||
| } | ||
| return ignored | ||
| } |
There was a problem hiding this comment.
Recreating the BinlogEvent filter for every single DDL event in shouldIgnoreDDLEventByType is highly inefficient and causes unnecessary memory allocations and CPU overhead.
Since the ignoreDDLTypes list is static for a given configuration, we should construct the BinlogEvent filter once in filterIgnoredDDLEvents and pass it to shouldIgnoreDDLEventByType.
func filterIgnoredDDLEvents(events []commonEvent.DDLEvent) []commonEvent.DDLEvent {
serverConfig := config.GetGlobalServerConfig()
ignoreDDLTypes := serverConfig.Debug.SchemaStore.IgnoreDDLTypes
if len(ignoreDDLTypes) == 0 || len(events) == 0 {
return events
}
eventFilter, err := bf.NewBinlogEvent(false, []*bf.BinlogEventRule{
{
SchemaPattern: "schema-store",
TablePattern: "ddl",
Events: append([]bf.EventType(nil), ignoreDDLTypes...),
Action: bf.Ignore,
},
})
if err != nil {
log.Warn("schema store ignore ddl type config is invalid",
zap.Any("ignoreDDLTypes", ignoreDDLTypes),
zap.Error(err))
return events
}
filteredEvents := events[:0]
for _, event := range events {
if shouldIgnoreDDLEventByType(event, eventFilter) {
log.Info("ignore ddl event by type",
zap.Any("type", event.GetDDLType()),
zap.Uint64("finishedTs", event.FinishedTs),
zap.String("query", event.Query))
continue
}
filteredEvents = append(filteredEvents, event)
}
return filteredEvents
}
func shouldIgnoreDDLEventByType(ddlEvent commonEvent.DDLEvent, eventFilter *bf.BinlogEvent) bool {
actionType := ddlEvent.GetDDLType()
ddlType := filter.DDLToEventType(actionType)
if ddlType == bf.NullEvent {
log.Warn("schema store ignore ddl type found unsupported ddl",
zap.String("type", actionType.String()),
zap.String("query", ddlEvent.Query))
return false
}
ignored, err := matchIgnoreDDLType(eventFilter, ddlType, ddlEvent.Query)
if err != nil {
log.Warn("schema store ignore ddl type failed",
zap.String("type", actionType.String()),
zap.String("query", ddlEvent.Query),
zap.Error(err))
return false
}
if ignored {
return true
}
if !filter.IsAlterTableDDL(actionType) {
return false
}
ignored, err = matchIgnoreDDLType(eventFilter, bf.AlterTable, ddlEvent.Query)
if err != nil {
log.Warn("schema store ignore alter table ddl type failed",
zap.String("type", actionType.String()),
zap.String("query", ddlEvent.Query),
zap.Error(err))
return false
}
return ignored
}| func TestIgnoreDDLEventByTypeCoversSupportedDDLTypes(t *testing.T) { | ||
| for _, actionType := range cdcfilter.SupportedDDLActionTypes() { | ||
| ddlEvent := commonEvent.DDLEvent{ | ||
| Type: byte(actionType), | ||
| Query: "ddl", | ||
| } | ||
| eventType := cdcfilter.DDLToEventType(actionType) | ||
| require.NotEqual(t, bf.NullEvent, eventType, actionType.String()) | ||
|
|
||
| require.True(t, | ||
| shouldIgnoreDDLEventByType(ddlEvent, []bf.EventType{eventType}), | ||
| actionType.String()) | ||
| require.True(t, | ||
| shouldIgnoreDDLEventByType(ddlEvent, []bf.EventType{bf.AllDDL}), | ||
| actionType.String()) | ||
| require.False(t, | ||
| shouldIgnoreDDLEventByType(ddlEvent, []bf.EventType{bf.NoneDDL}), | ||
| actionType.String()) | ||
|
|
||
| if cdcfilter.IsAlterTableDDL(actionType) { | ||
| require.True(t, | ||
| shouldIgnoreDDLEventByType(ddlEvent, []bf.EventType{bf.AlterTable}), | ||
| actionType.String()) | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Update the test to match the updated signature of shouldIgnoreDDLEventByType which now accepts *bf.BinlogEvent instead of []bf.EventType.
func TestIgnoreDDLEventByTypeCoversSupportedDDLTypes(t *testing.T) {
for _, actionType := range cdcfilter.SupportedDDLActionTypes() {
ddlEvent := commonEvent.DDLEvent{
Type: byte(actionType),
Query: "ddl",
}
eventType := cdcfilter.DDLToEventType(actionType)
require.NotEqual(t, bf.NullEvent, eventType, actionType.String())
helper := func(types []bf.EventType) bool {
eventFilter, err := bf.NewBinlogEvent(false, []*bf.BinlogEventRule{
{
SchemaPattern: "schema-store",
TablePattern: "ddl",
Events: append([]bf.EventType(nil), types...),
Action: bf.Ignore,
},
})
require.NoError(t, err)
return shouldIgnoreDDLEventByType(ddlEvent, eventFilter)
}
require.True(t,
helper([]bf.EventType{eventType}),
actionType.String())
require.True(t,
helper([]bf.EventType{bf.AllDDL}),
actionType.String())
require.False(t,
helper([]bf.EventType{bf.NoneDDL}),
actionType.String())
if cdcfilter.IsAlterTableDDL(actionType) {
require.True(t,
helper([]bf.EventType{bf.AlterTable}),
actionType.String())
}
}
}| _, err := bf.NewBinlogEvent(false, []*bf.BinlogEventRule{ | ||
| { | ||
| SchemaPattern: "schema-store", | ||
| TablePattern: "ddl", | ||
| Events: append([]bf.EventType(nil), c.IgnoreDDLTypes...), | ||
| Action: bf.Ignore, | ||
| }, | ||
| }) | ||
| return errors.Trace(err) |
There was a problem hiding this comment.
Since ValidateAndAdjust is intended to normalize and adjust configuration values, we should pass c.IgnoreDDLTypes directly to bf.NewBinlogEvent instead of copying it. This allows bf.NewBinlogEvent to normalize the event types (e.g., converting them to lowercase) in-place within the configuration struct.
| _, err := bf.NewBinlogEvent(false, []*bf.BinlogEventRule{ | |
| { | |
| SchemaPattern: "schema-store", | |
| TablePattern: "ddl", | |
| Events: append([]bf.EventType(nil), c.IgnoreDDLTypes...), | |
| Action: bf.Ignore, | |
| }, | |
| }) | |
| return errors.Trace(err) | |
| _, err := bf.NewBinlogEvent(false, []*bf.BinlogEventRule{ | |
| { | |
| SchemaPattern: "schema-store", | |
| TablePattern: "ddl", | |
| Events: c.IgnoreDDLTypes, | |
| Action: bf.Ignore, | |
| }, | |
| }) | |
| return errors.Trace(err) |
|
[FORMAT CHECKER NOTIFICATION] Notice: To remove the 📖 For more info, you can check the "Contribute Code" section in the development guide. |
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note