Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions pkg/sql/mvcc_backfiller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/upgrade/upgradebase"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/json"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand Down Expand Up @@ -996,3 +997,83 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v int);`
tdb.Exec(t, "CREATE INDEX ON t.test (v)")
require.True(t, called)
}

// TestPartialIndexBackfillWithConcurrentFamilyUpdate validates that concurrent updates
// with multiple column families still propagate to secondary indexes being constructed.
func TestPartialIndexBackfillWithConcurrentFamilyUpdate(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

var (
s serverutils.TestServerInterface
sqlDB *gosql.DB
)

var mergeReady chan struct{}
var waitForMerge chan struct{}

params := base.TestServerArgs{}
params.Knobs = base.TestingKnobs{
SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{
BackfillChunkSize: 100,
},
DistSQL: &execinfra.TestingKnobs{
IndexBackfillMergerTestingKnobs: &backfill.IndexBackfillMergerTestingKnobs{
RunBeforeScanChunk: func(startKey roachpb.Key) error {
mergeReady <- struct{}{}
<-waitForMerge
return nil
},
},
},
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
}

s, sqlDB, _ = serverutils.StartServer(t, params)
defer s.Stopper().Stop(context.Background())

mergeReady = make(chan struct{})
waitForMerge = make(chan struct{})
runner := sqlutils.MakeSQLRunner(sqlDB)
runner.Exec(t, `
CREATE DATABASE t;
CREATE TABLE t.test (
id INT PRIMARY KEY,
val STRING,
other_val STRING,
FAMILY f1(id, val),
FAMILY f2(other_val)
);
`)
// Insert two rows that match predicate for the index we are creating below.
runner.Exec(t, `
INSERT INTO t.test VALUES
(1, 'BLAH', 'MEH'),
(2, 'BLAH', 'BLEH'),
(3, 'MEH', 'GLEE'),
(4, 'BLEH', 'GLAH');`)

// Run index creation in the background.
grp := ctxgroup.WithContext(context.Background())
grp.GoCtx(func(ctx context.Context) error {
// Create a partial index on values that match some predicate.
_, err := sqlDB.Exec("CREATE INDEX idx_test on t.test(val) WHERE val LIKE '%BLAH%'")
return err
})
// Wait for the merge step to start.
<-mergeReady
// Updates on an unrelated column should still inject updates the new partial index,
// while its mutating.
runner.Exec(t, `UPDATE t.test SET other_val = 'B' WHERE id IN (1, 2);`)
// Also, validate deletions behave correctly.
runner.Exec(t, `DELETE FROM t.test WHERE other_val='GLAH';`)
// Allow the merge to continue
close(waitForMerge)
// Wait for schema change to finish
require.NoError(t, grp.Wait())
// Confirm with a scan of the partial index that everything looks sound.
runner.CheckQueryResults(t, `SELECT id, val FROM t.test@idx_test WHERE val LIKE '%BLAH%' ORDER BY id`, [][]string{
{"1", "BLAH"},
{"2", "BLAH"},
})
}
10 changes: 8 additions & 2 deletions pkg/sql/opt/cat/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,8 @@ type Index interface {
// definition, where i < PartitionCount.
Partition(i int) Partition

// IsTemporaryIndexForBackfill returns true iff the index is an index being
// used as the temporary index being used by an in-progress index backfill.
// IsTemporaryIndexForBackfill returns true if the index is a temporary index
// for an in-progress index backfill.
IsTemporaryIndexForBackfill() bool
}

Expand All @@ -232,6 +232,12 @@ func IsMutationIndex(table Table, ord IndexOrdinal) bool {
return ord >= table.IndexCount()
}

// IsTemporaryMutationIndex is a convenience function that returns true if the index at
// the given ordinal position is a mutation index and is temporary.
func IsTemporaryMutationIndex(table Table, ord IndexOrdinal) bool {
return IsMutationIndex(table, ord) && table.Index(ord).IsTemporaryIndexForBackfill()
}

// Partition is an interface to a PARTITION BY LIST partition of an index. The
// intended use is to support planning of scans or lookup joins that will use
// locality optimized search. Locality optimized search can be planned when the
Expand Down
7 changes: 6 additions & 1 deletion pkg/sql/opt/norm/mutation_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,19 @@ func (c *CustomFuncs) SimplifiablePartialIndexProjectCols(
// required. Therefore, the partial index PUT and DEL columns cannot be
// simplified.
//
// If a secondary index is being built, then there will be points where
// all values will be forced with a Put. As a result, runtime expects mutated
// temporary indexes to always have values available for writes during index construction,
// even if the mutation does not touch those columns.
//
// Note that we use the set of index columns where the virtual
// columns have been mapped to their source columns. Virtual columns
// are never part of the updated columns. Updates to source columns
// trigger index changes.
predFilters := *pred.(*memo.FiltersExpr)
indexAndPredCols := tabMeta.IndexColumnsMapInverted(i)
indexAndPredCols.UnionWith(predFilters.OuterCols())
if indexAndPredCols.Intersects(updateCols) {
if indexAndPredCols.Intersects(updateCols) || cat.IsTemporaryMutationIndex(tabMeta.Table, i) {
continue
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/sql/opt/norm/prune_cols_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,11 @@ func (c *CustomFuncs) NeededMutationFetchCols(
predFilters := *pred.(*memo.FiltersExpr)
indexAndPredCols.UnionWith(predFilters.OuterCols())
}
if !indexAndPredCols.Intersects(updateCols) {
// If a secondary index is being built, then there will be points where
// all values will be forced with a Put. As a result, runtime expects temporary
// mutated indexes to always have values available for writes during index
// construction, even if the mutation does not touch those columns.
if !indexAndPredCols.Intersects(updateCols) && !cat.IsTemporaryMutationIndex(tabMeta.Table, i) {
continue
}

Expand Down
Loading