diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexMultiColumnWriter.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexMultiColumnWriter.java new file mode 100644 index 000000000000..a6ded78d33fd --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexMultiColumnWriter.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.globalindex; + +import org.apache.paimon.data.InternalRow; + +import javax.annotation.Nullable; + +/** Index writer for global index that accepts multiple column values per row. */ +public interface GlobalIndexMultiColumnWriter extends GlobalIndexWriter { + + /** + * Write a projected row containing all indexed columns for one record. The row layout matches + * the fields order passed to {@link GlobalIndexerFactory#create(java.util.List, + * org.apache.paimon.options.Options)}. 
+ */ + void write(@Nullable InternalRow row); +} diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexer.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexer.java index 33ea405a8f9f..6f857a2186ff 100644 --- a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexer.java +++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexer.java @@ -38,4 +38,9 @@ static GlobalIndexer create(String type, DataField dataField, Options options) { GlobalIndexerFactory globalIndexerFactory = GlobalIndexerFactoryUtils.load(type); return globalIndexerFactory.create(dataField, options); } + + static GlobalIndexer create(String type, List<DataField> fields, Options options) { + GlobalIndexerFactory globalIndexerFactory = GlobalIndexerFactoryUtils.load(type); + return globalIndexerFactory.create(fields, options); + } } diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexerFactory.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexerFactory.java index 6eabb6d25360..cef643fa463f 100644 --- a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexerFactory.java +++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexerFactory.java @@ -22,10 +22,32 @@ import org.apache.paimon.options.Options; import org.apache.paimon.types.DataField; +import java.util.List; + -/** File index factory to construct {@link FileIndexer}. */ +/** Global index factory to construct {@link GlobalIndexer}. */
public interface GlobalIndexerFactory { String identifier(); GlobalIndexer create(DataField dataField, Options options); + + /** + * Creates a {@link GlobalIndexer} over one or more index columns. This default implementation + * only supports a single column and delegates to {@link #create(DataField, Options)}. + * + * @throws IllegalArgumentException if {@code fields} is null or empty + * @throws UnsupportedOperationException if more than one column is given + */ + default GlobalIndexer create(List<DataField> fields, Options options) { + if (fields == null || fields.isEmpty()) { + throw new IllegalArgumentException("Index columns must not be null or empty."); + } + if (fields.size() > 1) { + throw new UnsupportedOperationException( + String.format( + "Index type '%s' does not support multi-column index, got columns: %s", + identifier(), fields)); + } + return create(fields.get(0), options); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java index 085423efa851..497d50ece6e9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java @@ -24,18 +24,31 @@ import org.apache.paimon.index.GlobalIndexMeta; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.index.IndexPathFactory; +import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.options.Options; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.types.DataField; import org.apache.paimon.utils.Range; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** Utils for global index build. 
*/ public class GlobalIndexBuilderUtils { + private static final Logger LOG = LoggerFactory.getLogger(GlobalIndexBuilderUtils.class); + + public static final int MULTI_COLUMN_INDEX_FIELD_ID = -1; + public static List toIndexFileMetas( FileIO fileIO, IndexPathFactory indexPathFactory, @@ -45,12 +58,66 @@ public static List toIndexFileMetas( String indexType, List entries) throws IOException { + return toIndexFileMetas( + fileIO, indexPathFactory, options, range, indexFieldId, null, indexType, entries); + } + + /** + * Builds index file metas for an index over one or more columns. A single column is stored + * as the index field id; multiple columns are stored with + * {@code MULTI_COLUMN_INDEX_FIELD_ID} and every column id in the extra field ids. + * + * @throws IllegalArgumentException if {@code fields} is null or empty + */ + public static List<IndexFileMeta> toIndexFileMetas( + FileIO fileIO, + IndexPathFactory indexPathFactory, + CoreOptions options, + Range range, + List<DataField> fields, + String indexType, + List<ResultEntry> entries) + throws IOException { + if (fields == null || fields.isEmpty()) { + throw new IllegalArgumentException("Index fields must not be null or empty."); + } + int indexFieldId; + int[] extraFieldIds; + if (fields.size() > 1) { + indexFieldId = MULTI_COLUMN_INDEX_FIELD_ID; + extraFieldIds = fields.stream().mapToInt(DataField::id).toArray(); + } else { + indexFieldId = fields.get(0).id(); + extraFieldIds = null; + } + return toIndexFileMetas( + fileIO, + indexPathFactory, + options, + range, + indexFieldId, + extraFieldIds, + indexType, + entries); + } + + private static List toIndexFileMetas( + FileIO fileIO, + IndexPathFactory indexPathFactory, + CoreOptions options, + Range range, + int indexFieldId, + @Nullable int[] extraFieldIds, + String indexType, + List entries) + throws IOException { List results = new ArrayList<>(); for (ResultEntry entry : entries) { String fileName = entry.fileName(); long fileSize = fileIO.getFileSize(indexPathFactory.toPath(fileName)); GlobalIndexMeta globalIndexMeta = - new GlobalIndexMeta(range.from, range.to, indexFieldId, null, entry.meta()); + new GlobalIndexMeta( + range.from, range.to, indexFieldId, extraFieldIds, entry.meta()); Path externalPathDir = options.globalIndexExternalPath(); String externalPathString = null; @@ -78,6 +145,77 @@ public static GlobalIndexWriter createIndexWriter( return globalIndexer.createWriter(createGlobalIndexFileReadWrite(table)); } + public static 
GlobalIndexWriter createIndexWriter( + FileStoreTable table, String indexType, List fields, Options options) + throws IOException { + GlobalIndexer globalIndexer = GlobalIndexer.create(indexType, fields, options); + return globalIndexer.createWriter(createGlobalIndexFileReadWrite(table)); + } + + /** + * Find the minimum firstRowId among files whose schema does not contain all index columns. + * Files at or beyond this rowId cannot be indexed because the column was added later via ALTER + * TABLE. + * + * @return the boundary rowId, or {@link Long#MAX_VALUE} if all files contain the columns + */ + public static long findMinNonIndexableRowId( + SchemaManager schemaManager, List entries, List indexColumns) { + Map schemaContainsColumns = new HashMap<>(); + long minRowId = Long.MAX_VALUE; + long minSchemaId = -1; + for (ManifestEntry entry : entries) { + long sid = entry.file().schemaId(); + boolean contains = + schemaContainsColumns.computeIfAbsent( + sid, + id -> schemaManager.schema(id).fieldNames().containsAll(indexColumns)); + if (!contains && entry.file().firstRowId() != null) { + long rowId = entry.file().nonNullFirstRowId(); + if (rowId < minRowId) { + minRowId = rowId; + minSchemaId = sid; + } + } + } + if (minRowId != Long.MAX_VALUE) { + List schemaFields = schemaManager.schema(minSchemaId).fieldNames(); + List missingColumns = new ArrayList<>(); + for (String col : indexColumns) { + if (!schemaFields.contains(col)) { + missingColumns.add(col); + } + } + LOG.info( + "Found non-indexable files: schemaId={} missing columns {}, boundaryRowId={}.", + minSchemaId, + missingColumns, + minRowId); + } + return minRowId; + } + + /** Keep only entries whose firstRowId is strictly less than the given boundary. 
*/ + public static List filterEntriesBefore( + List entries, long boundaryRowId) { + if (boundaryRowId == Long.MAX_VALUE) { + return entries; + } + List result = new ArrayList<>(); + for (ManifestEntry entry : entries) { + if (entry.file().firstRowId() != null + && entry.file().nonNullFirstRowId() < boundaryRowId) { + result.add(entry); + } + } + LOG.info( + "Filtered {} files to {} indexable files (boundaryRowId={}).", + entries.size(), + result.size(), + boundaryRowId); + return result; + } + private static GlobalIndexFileReadWrite createGlobalIndexFileReadWrite(FileStoreTable table) { IndexPathFactory indexPathFactory = table.store().pathFactory().globalIndexFileFactory(); return new GlobalIndexFileReadWrite(table.fileIO(), indexPathFactory); diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java index 3e591380191a..a0e9b5dfd73a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java @@ -50,8 +50,10 @@ import java.util.stream.Collectors; import static org.apache.paimon.CoreOptions.GLOBAL_INDEX_THREAD_NUM; +import static org.apache.paimon.globalindex.GlobalIndexBuilderUtils.MULTI_COLUMN_INDEX_FIELD_ID; import static org.apache.paimon.predicate.PredicateVisitor.collectFieldNames; import static org.apache.paimon.table.source.snapshot.TimeTravelUtil.tryTravelOrLatest; +import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkNotNull; import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool; @@ -81,21 +83,28 @@ public GlobalIndexScanner( GlobalIndexMeta meta = checkNotNull(indexFile.globalIndexMeta()); int fieldId = meta.indexFieldId(); String indexType = indexFile.indexType(); - indexMetas - .computeIfAbsent(fieldId, k -> new HashMap<>()) - 
.computeIfAbsent(indexType, k -> new HashMap<>()) - .computeIfAbsent( - new Range(meta.rowRangeStart(), meta.rowRangeEnd()), - k -> new ArrayList<>()) - .add(indexFile); + Range range = new Range(meta.rowRangeStart(), meta.rowRangeEnd()); + if (fieldId != MULTI_COLUMN_INDEX_FIELD_ID) { + indexMetas + .computeIfAbsent(fieldId, k -> new HashMap<>()) + .computeIfAbsent(indexType, k -> new HashMap<>()) + .computeIfAbsent(range, k -> new ArrayList<>()) + .add(indexFile); + } + + if (meta.extraFieldIds() != null) { + for (int extraId : meta.extraFieldIds()) { + indexMetas + .computeIfAbsent(extraId, k -> new HashMap<>()) + .computeIfAbsent(indexType, k -> new HashMap<>()) + .computeIfAbsent(range, k -> new ArrayList<>()) + .add(indexFile); + } + } } IntFunction> readersFunction = - fieldId -> - createReaders( - indexFileReader, - indexMetas.get(fieldId), - rowType.getField(fieldId)); + fieldId -> createReaders(indexFileReader, indexMetas.get(fieldId), rowType); this.globalIndexEvaluator = new GlobalIndexEvaluator(rowType, readersFunction, executor); } @@ -129,7 +138,17 @@ public static Optional create( if (globalIndex == null) { return false; } - return filterFieldIds.contains(globalIndex.indexFieldId()); + if (filterFieldIds.contains(globalIndex.indexFieldId())) { + return true; + } + if (globalIndex.extraFieldIds() != null) { + for (int id : globalIndex.extraFieldIds()) { + if (filterFieldIds.contains(id)) { + return true; + } + } + } + return false; }; List indexFiles = @@ -147,7 +166,7 @@ public Optional scan(Predicate predicate) { private Collection createReaders( GlobalIndexFileReader indexFileReadWrite, Map>> indexMetas, - DataField dataField) { + RowType rowType) { if (indexMetas == null) { return Collections.emptyList(); } @@ -159,7 +178,8 @@ private Collection createReaders( Map> metas = entry.getValue(); GlobalIndexerFactory globalIndexerFactory = GlobalIndexerFactoryUtils.load(indexType); - GlobalIndexer globalIndexer = globalIndexerFactory.create(dataField, 
options); + List fields = resolveFields(metas, rowType); + GlobalIndexer globalIndexer = globalIndexerFactory.create(fields, options); List unionReader = new ArrayList<>(); for (Map.Entry> rangeMetas : metas.entrySet()) { @@ -187,6 +207,46 @@ private Collection createReaders( return readers; } + private List resolveFields(Map> metas, RowType rowType) { + GlobalIndexMeta firstMeta = + checkNotNull(metas.values().iterator().next().get(0).globalIndexMeta()); + int indexFieldId = firstMeta.indexFieldId(); + + if (indexFieldId == MULTI_COLUMN_INDEX_FIELD_ID) { + int[] expectedExtraIds = + checkNotNull( + firstMeta.extraFieldIds(), + "Multi-column index must have extraFieldIds."); + for (List rangeFiles : metas.values()) { + for (IndexFileMeta fileMeta : rangeFiles) { + GlobalIndexMeta meta = checkNotNull(fileMeta.globalIndexMeta()); + checkArgument( + meta.indexFieldId() == MULTI_COLUMN_INDEX_FIELD_ID, + "Inconsistent indexFieldId across range groups: expected %s but found %s.", + MULTI_COLUMN_INDEX_FIELD_ID, + meta.indexFieldId()); + checkArgument( + java.util.Arrays.equals(meta.extraFieldIds(), expectedExtraIds), + "Inconsistent extraFieldIds across range groups."); + } + } + List fields = new ArrayList<>(); + for (int id : expectedExtraIds) { + fields.add(rowType.getField(id)); + } + return fields; + } + + List fields = new ArrayList<>(); + fields.add(rowType.getField(indexFieldId)); + if (firstMeta.extraFieldIds() != null) { + for (int id : firstMeta.extraFieldIds()) { + fields.add(rowType.getField(id)); + } + } + return fields; + } + private GlobalIndexIOMeta toGlobalMeta(IndexFileMeta meta) { GlobalIndexMeta globalIndex = meta.globalIndexMeta(); checkNotNull(globalIndex); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextReadImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextReadImpl.java index f58f8f26ead4..7e2542bddd30 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextReadImpl.java 
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextReadImpl.java @@ -19,6 +19,7 @@ package org.apache.paimon.table.source; import org.apache.paimon.fs.FileIO; +import org.apache.paimon.globalindex.GlobalIndexBuilderUtils; import org.apache.paimon.globalindex.GlobalIndexIOMeta; import org.apache.paimon.globalindex.GlobalIndexReader; import org.apache.paimon.globalindex.GlobalIndexResult; @@ -33,6 +34,7 @@ import org.apache.paimon.predicate.FullTextSearch; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.types.DataField; +import org.apache.paimon.types.RowType; import java.io.IOException; import java.util.ArrayList; @@ -68,10 +70,24 @@ public GlobalIndexResult read(List splits) { Integer threadNum = table.coreOptions().globalIndexThreadNum(); - String indexType = splits.get(0).fullTextIndexFiles().get(0).indexType(); - GlobalIndexer globalIndexer = - GlobalIndexerFactoryUtils.load(indexType) - .create(textColumn, table.coreOptions().toConfiguration()); + IndexFileMeta firstFile = splits.get(0).fullTextIndexFiles().get(0); + String indexType = firstFile.indexType(); + GlobalIndexMeta firstMeta = checkNotNull(firstFile.globalIndexMeta()); + GlobalIndexer globalIndexer; + if (firstMeta.indexFieldId() == GlobalIndexBuilderUtils.MULTI_COLUMN_INDEX_FIELD_ID) { + RowType rowType = table.rowType(); + List fields = new ArrayList<>(); + for (int id : firstMeta.extraFieldIds()) { + fields.add(rowType.getField(id)); + } + globalIndexer = + GlobalIndexerFactoryUtils.load(indexType) + .create(fields, table.coreOptions().toConfiguration()); + } else { + globalIndexer = + GlobalIndexerFactoryUtils.load(indexType) + .create(textColumn, table.coreOptions().toConfiguration()); + } IndexPathFactory indexPathFactory = table.store().pathFactory().globalIndexFileFactory(); Iterator> resultIterators = randomlyExecuteSequentialReturn( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextScanImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextScanImpl.java index cc77d9121ad5..6230b31336a3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextScanImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/FullTextScanImpl.java @@ -61,7 +61,17 @@ public Plan scan() { if (globalIndex == null) { return false; } - return textColumn.id() == globalIndex.indexFieldId(); + if (textColumn.id() == globalIndex.indexFieldId()) { + return true; + } + if (globalIndex.extraFieldIds() != null) { + for (int id : globalIndex.extraFieldIds()) { + if (textColumn.id() == id) { + return true; + } + } + } + return false; }; List allIndexFiles = diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorReadImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorReadImpl.java index 6971bb908409..488bf6688612 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorReadImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorReadImpl.java @@ -19,6 +19,7 @@ package org.apache.paimon.table.source; import org.apache.paimon.fs.FileIO; +import org.apache.paimon.globalindex.GlobalIndexBuilderUtils; import org.apache.paimon.globalindex.GlobalIndexIOMeta; import org.apache.paimon.globalindex.GlobalIndexReader; import org.apache.paimon.globalindex.GlobalIndexResult; @@ -35,6 +36,7 @@ import org.apache.paimon.predicate.VectorSearch; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.types.DataField; +import org.apache.paimon.types.RowType; import org.apache.paimon.utils.RoaringNavigableMap64; import javax.annotation.Nullable; @@ -83,10 +85,24 @@ public GlobalIndexResult read(List splits) { RoaringNavigableMap64 preFilter = preFilter(splits).orElse(null); Integer threadNum = table.coreOptions().globalIndexThreadNum(); - String indexType = splits.get(0).vectorIndexFiles().get(0).indexType(); - GlobalIndexer globalIndexer = - 
GlobalIndexerFactoryUtils.load(indexType) - .create(vectorColumn, table.coreOptions().toConfiguration()); + IndexFileMeta firstFile = splits.get(0).vectorIndexFiles().get(0); + String indexType = firstFile.indexType(); + GlobalIndexMeta firstMeta = checkNotNull(firstFile.globalIndexMeta()); + GlobalIndexer globalIndexer; + if (firstMeta.indexFieldId() == GlobalIndexBuilderUtils.MULTI_COLUMN_INDEX_FIELD_ID) { + RowType rowType = table.rowType(); + List fields = new ArrayList<>(); + for (int id : firstMeta.extraFieldIds()) { + fields.add(rowType.getField(id)); + } + globalIndexer = + GlobalIndexerFactoryUtils.load(indexType) + .create(fields, table.coreOptions().toConfiguration()); + } else { + globalIndexer = + GlobalIndexerFactoryUtils.load(indexType) + .create(vectorColumn, table.coreOptions().toConfiguration()); + } IndexPathFactory indexPathFactory = table.store().pathFactory().globalIndexFileFactory(); Iterator> resultIterators = randomlyExecuteSequentialReturn( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorScanImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorScanImpl.java index d3db6dd13d37..1ff3f82852f6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/VectorScanImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/VectorScanImpl.java @@ -82,7 +82,17 @@ public Plan scan() { return false; } int fieldId = globalIndex.indexFieldId(); - return vectorColumn.id() == fieldId || filterFieldIds.contains(fieldId); + if (vectorColumn.id() == fieldId || filterFieldIds.contains(fieldId)) { + return true; + } + if (globalIndex.extraFieldIds() != null) { + for (int id : globalIndex.extraFieldIds()) { + if (vectorColumn.id() == id || filterFieldIds.contains(id)) { + return true; + } + } + } + return false; }; List allIndexFiles = @@ -94,7 +104,7 @@ public Plan scan() { Map> vectorByRange = new HashMap<>(); for (IndexFileMeta indexFile : allIndexFiles) { GlobalIndexMeta meta = 
checkNotNull(indexFile.globalIndexMeta()); - if (meta.indexFieldId() == vectorColumn.id()) { + if (containsField(meta, vectorColumn.id())) { Range range = new Range(meta.rowRangeStart(), meta.rowRangeEnd()); vectorByRange.computeIfAbsent(range, k -> new ArrayList<>()).add(indexFile); } @@ -111,7 +121,7 @@ public Plan scan() { f -> { GlobalIndexMeta globalIndex = checkNotNull(f.globalIndexMeta()); - if (globalIndex.indexFieldId() == vectorColumn.id()) { + if (containsField(globalIndex, vectorColumn.id())) { return false; } return range.hasIntersection(globalIndex.rowRange()); @@ -122,4 +132,18 @@ public Plan scan() { return () -> splits; } + + private static boolean containsField(GlobalIndexMeta meta, int fieldId) { + if (meta.indexFieldId() == fieldId) { + return true; + } + if (meta.extraFieldIds() != null) { + for (int id : meta.extraFieldIds()) { + if (id == fieldId) { + return true; + } + } + } + return false; + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtilsTest.java b/paimon-core/src/test/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtilsTest.java new file mode 100644 index 000000000000..703c01c69633 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtilsTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.globalindex; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.index.IndexPathFactory; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.VarCharType; +import org.apache.paimon.utils.Range; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link GlobalIndexBuilderUtils}. 
*/ +class GlobalIndexBuilderUtilsTest { + + @TempDir java.nio.file.Path tempDir; + + private FileIO fileIO; + private IndexPathFactory indexPathFactory; + private CoreOptions coreOptions; + + @BeforeEach + void setUp() { + fileIO = new LocalFileIO(); + Path dir = new Path(tempDir.toString()); + indexPathFactory = + new IndexPathFactory() { + @Override + public Path toPath(String fileName) { + return new Path(dir, fileName); + } + + @Override + public Path newPath() { + return new Path(dir, UUID.randomUUID().toString()); + } + + @Override + public boolean isExternalPath() { + return false; + } + }; + coreOptions = new CoreOptions(new Options().toMap()); + } + + // Test: 2 columns (title + vec), indexFieldId=-1, all field ids stored in extraFieldIds + @Test + void testToIndexFileMetasMultiColumn() throws IOException { + DataField titleField = new DataField(1, "title", new VarCharType(Integer.MAX_VALUE)); + DataField vecField = new DataField(2, "vec", new ArrayType(new FloatType())); + List fields = Arrays.asList(titleField, vecField); + + List entries = createDummyResultEntries(); + Range range = new Range(0, 99); + + List metas = + GlobalIndexBuilderUtils.toIndexFileMetas( + fileIO, indexPathFactory, coreOptions, range, fields, "test-type", entries); + + assertThat(metas).hasSize(1); + assertThat(metas.get(0).globalIndexMeta().indexFieldId()).isEqualTo(-1); + assertThat(metas.get(0).globalIndexMeta().extraFieldIds()).isEqualTo(new int[] {1, 2}); + assertThat(metas.get(0).globalIndexMeta().rowRangeStart()).isEqualTo(0); + assertThat(metas.get(0).globalIndexMeta().rowRangeEnd()).isEqualTo(99); + } + + // Test: single column, extraFieldIds should be null (backward compatible with single-column + // path) + @Test + void testToIndexFileMetasSingleColumn() throws IOException { + DataField titleField = new DataField(1, "title", new VarCharType(Integer.MAX_VALUE)); + List fields = Collections.singletonList(titleField); + + List entries = createDummyResultEntries(); + Range 
range = new Range(0, 49); + + List metas = + GlobalIndexBuilderUtils.toIndexFileMetas( + fileIO, indexPathFactory, coreOptions, range, fields, "test-type", entries); + + assertThat(metas).hasSize(1); + assertThat(metas.get(0).globalIndexMeta().indexFieldId()).isEqualTo(1); + assertThat(metas.get(0).globalIndexMeta().extraFieldIds()).isNull(); + } + + // Test: 3 columns (title + vec + id), indexFieldId=-1, all field ids in extraFieldIds + @Test + void testToIndexFileMetasThreeColumns() throws IOException { + DataField titleField = new DataField(1, "title", new VarCharType(Integer.MAX_VALUE)); + DataField vecField = new DataField(2, "vec", new ArrayType(new FloatType())); + DataField idField = new DataField(3, "id", new IntType()); + List fields = Arrays.asList(titleField, vecField, idField); + + List entries = createDummyResultEntries(); + Range range = new Range(0, 199); + + List metas = + GlobalIndexBuilderUtils.toIndexFileMetas( + fileIO, indexPathFactory, coreOptions, range, fields, "test-type", entries); + + assertThat(metas).hasSize(1); + assertThat(metas.get(0).globalIndexMeta().indexFieldId()).isEqualTo(-1); + assertThat(metas.get(0).globalIndexMeta().extraFieldIds()).isEqualTo(new int[] {1, 2, 3}); + } + + private List createDummyResultEntries() throws IOException { + String fileName = "test-index-" + UUID.randomUUID(); + Path filePath = indexPathFactory.toPath(fileName); + fileIO.newOutputStream(filePath, false).close(); + return Collections.singletonList(new ResultEntry(fileName, 100, null)); + } +} diff --git a/paimon-eslib/pom.xml b/paimon-eslib/pom.xml new file mode 100644 index 000000000000..98d92c2ee5b8 --- /dev/null +++ b/paimon-eslib/pom.xml @@ -0,0 +1,106 @@ + + + + 4.0.0 + + + paimon-parent + org.apache.paimon + 1.5-SNAPSHOT + + + paimon-eslib + Paimon : ESLib Index + + + + org.apache.paimon + paimon-common + ${project.version} + provided + + + + + io.github.crownchu + eslib-api + 1.0.0 + + + io.github.crownchu + eslib-core + 1.0.0 + lucene912 + 
+ + + + + org.apache.lucene + lucene-core + 9.12.0 + + + org.apache.lucene + lucene-analysis-common + 9.12.0 + + + + org.slf4j + slf4j-api + 1.7.36 + provided + + + + + org.apache.paimon + paimon-common + ${project.version} + test-jar + test + + + org.junit.jupiter + junit-jupiter + test + + + + org.apache.paimon + paimon-test-utils + ${project.version} + test + + + + + + eslib-releases + https://raw.githubusercontent.com/CrownChu/es-paimon-lib-releases/main/repository + + + diff --git a/paimon-eslib/src/main/java/org/apache/paimon/eslib/index/ESIndexBuilderFactory.java b/paimon-eslib/src/main/java/org/apache/paimon/eslib/index/ESIndexBuilderFactory.java new file mode 100644 index 000000000000..6104da5df9b0 --- /dev/null +++ b/paimon-eslib/src/main/java/org/apache/paimon/eslib/index/ESIndexBuilderFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.eslib.index; + +import org.elasticsearch.eslib.api.ESIndexBuilder; +import org.elasticsearch.eslib.api.ESIndexSearcher; +import org.elasticsearch.eslib.api.model.FieldIndexConfig; +import org.elasticsearch.eslib.builder.DefaultESIndexBuilder; +import org.elasticsearch.eslib.searcher.DefaultESIndexSearcher; + +import java.io.IOException; +import java.util.Map; + +/** Factory to create ESIndexBuilder/ESIndexSearcher from eslib-core. */ +public class ESIndexBuilderFactory { + + public static ESIndexBuilder create(Map fieldConfigs) + throws IOException { + return new DefaultESIndexBuilder(fieldConfigs); + } + + public static ESIndexSearcher createSearcher() { + return new DefaultESIndexSearcher(); + } +} diff --git a/paimon-eslib/src/main/java/org/apache/paimon/eslib/index/ESIndexGlobalIndexReader.java b/paimon-eslib/src/main/java/org/apache/paimon/eslib/index/ESIndexGlobalIndexReader.java new file mode 100644 index 000000000000..78179ae171f5 --- /dev/null +++ b/paimon-eslib/src/main/java/org/apache/paimon/eslib/index/ESIndexGlobalIndexReader.java @@ -0,0 +1,381 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.eslib.index; + +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.globalindex.GlobalIndexIOMeta; +import org.apache.paimon.globalindex.GlobalIndexReader; +import org.apache.paimon.globalindex.GlobalIndexResult; +import org.apache.paimon.globalindex.ScoredGlobalIndexResult; +import org.apache.paimon.globalindex.io.GlobalIndexFileReader; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.predicate.FullTextSearch; +import org.apache.paimon.predicate.VectorSearch; +import org.apache.paimon.types.DataField; +import org.apache.paimon.utils.RoaringNavigableMap64; + +import org.elasticsearch.eslib.api.ArchiveDataProvider; +import org.elasticsearch.eslib.api.ESIndexSearcher; +import org.elasticsearch.eslib.api.model.FieldIndexConfig; +import org.elasticsearch.eslib.api.model.IndexFilter; +import org.elasticsearch.eslib.api.model.ScalarPredicate; +import org.elasticsearch.eslib.api.model.SearchResult; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ExecutorService; + +/** + * ES multi-index reader using ESLib. Supports vector search, full-text search, and scalar filtering + * via Lucene-based indexes stored in archive format. 
+ */ +public class ESIndexGlobalIndexReader implements GlobalIndexReader { + + private final GlobalIndexFileReader fileReader; + private final List files; + private final List fields; + private final ESIndexOptions indexOptions; + private final ExecutorService searchExecutor; + + private final List allStreams = new ArrayList<>(); + private volatile ESIndexSearcher searcher; + private volatile boolean closed; + private volatile boolean loaded; + + public ESIndexGlobalIndexReader( + GlobalIndexFileReader fileReader, + List files, + List fields, + ESIndexOptions indexOptions) { + this(fileReader, files, fields, indexOptions, null); + } + + public ESIndexGlobalIndexReader( + GlobalIndexFileReader fileReader, + List files, + List fields, + ESIndexOptions indexOptions, + ExecutorService searchExecutor) { + this.fileReader = fileReader; + this.files = files; + this.fields = fields; + this.indexOptions = indexOptions; + this.searchExecutor = searchExecutor; + this.loaded = false; + this.closed = false; + } + + @Override + public Optional visitVectorSearch(VectorSearch vectorSearch) { + try { + ensureLoaded(); + float[] queryVector = vectorSearch.vector(); + int topK = vectorSearch.limit(); + String fieldName = vectorSearch.fieldName(); + + long[] candidateIds = null; + RoaringNavigableMap64 includeRowIds = vectorSearch.includeRowIds(); + if (includeRowIds != null) { + candidateIds = toArray(includeRowIds); + } + + SearchResult result = searcher.vectorSearch(fieldName, queryVector, topK, candidateIds); + return toScoredResult(result); + } catch (IOException e) { + throw new RuntimeException("Vector search failed", e); + } + } + + @Override + public Optional visitFullTextSearch(FullTextSearch fullTextSearch) { + try { + ensureLoaded(); + String fieldName = fullTextSearch.fieldName(); + String queryText = fullTextSearch.queryText(); + int topK = fullTextSearch.limit(); + + SearchResult result = searcher.fullTextSearch(fieldName, queryText, topK); + return 
toScoredResult(result); + } catch (IOException e) { + throw new RuntimeException("Full-text search failed", e); + } + } + + private Optional toScoredResult(SearchResult result) { + if (result == null || result.count == 0) { + return Optional.empty(); + } + + RoaringNavigableMap64 bitmap = new RoaringNavigableMap64(); + Map scoreMap = new HashMap<>(result.count); + for (int i = 0; i < result.count; i++) { + long id = result.ids[i]; + bitmap.add(id); + scoreMap.put(id, result.scores[i]); + } + + return Optional.of(ScoredGlobalIndexResult.create(() -> bitmap, scoreMap::get)); + } + + private void checkNotClosed() throws IOException { + if (closed) { + throw new IOException("Reader already closed"); + } + } + + private void ensureLoaded() throws IOException { + checkNotClosed(); + if (loaded) { + return; + } + + if (files.isEmpty()) { + throw new IOException("No index files to load"); + } + + GlobalIndexIOMeta meta = files.get(0); + byte[] metaBytes = meta.metadata(); + Map fileOffsets = parseFileOffsets(metaBytes); + + ArchiveDataProvider dataProvider = createProvider(meta); + + searcher = ESIndexBuilderFactory.createSearcher(); + searcher.load(dataProvider, fileOffsets, indexOptions.getFieldConfigs(), searchExecutor); + loaded = true; + } + + private ArchiveDataProvider createProvider(GlobalIndexIOMeta meta) throws IOException { + SeekableInputStream inputStream = fileReader.getInputStream(meta); + synchronized (allStreams) { + allStreams.add(inputStream); + } + return new ArchiveDataProvider() { + @Override + public byte[] readRange(long offset, int length) throws IOException { + byte[] buf = new byte[length]; + inputStream.seek(offset); + int read = 0; + while (read < length) { + int n = inputStream.read(buf, read, length - read); + if (n < 0) { + throw new IOException("Unexpected EOF at offset " + (offset + read)); + } + read += n; + } + return buf; + } + + @Override + public ArchiveDataProvider fork() throws IOException { + return createProvider(meta); + } + + 
@Override + public void close() throws IOException { + inputStream.close(); + } + }; + } + + /** + * Parse file offset metadata (big-endian). Format: [4-byte count] then per file: [4-byte name + * length][name bytes][8-byte offset][8-byte length] + */ + private Map parseFileOffsets(byte[] metaBytes) throws IOException { + Map offsets = new LinkedHashMap<>(); + if (metaBytes == null || metaBytes.length == 0) { + return offsets; + } + + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(metaBytes)); + int fileCount = dis.readInt(); + for (int i = 0; i < fileCount; i++) { + int nameLen = dis.readInt(); + byte[] nameBytes = new byte[nameLen]; + dis.readFully(nameBytes); + String fileName = new String(nameBytes, StandardCharsets.UTF_8); + long offset = dis.readLong(); + long length = dis.readLong(); + offsets.put(fileName, new long[] {offset, length}); + } + return offsets; + } + + private static long[] toArray(RoaringNavigableMap64 bitmap) { + long[] arr = new long[(int) bitmap.getIntCardinality()]; + int i = 0; + for (long id : bitmap) { + arr[i++] = id; + } + return arr; + } + + @Override + public void close() throws IOException { + closed = true; + if (searcher != null) { + searcher.close(); + searcher = null; + } + synchronized (allStreams) { + for (SeekableInputStream stream : allStreams) { + try { + stream.close(); + } catch (IOException ignored) { + } + } + allStreams.clear(); + } + loaded = false; + } + + // =================== unified filter dispatch ===================== + + private Optional executeFilter(String fieldName, IndexFilter filter) { + try { + ensureLoaded(); + long[] ids = searcher.filter(fieldName, filter); + if (ids == null || ids.length == 0) { + return Optional.empty(); + } + RoaringNavigableMap64 bitmap = new RoaringNavigableMap64(); + for (long id : ids) { + bitmap.add(id); + } + return Optional.of(GlobalIndexResult.create(() -> bitmap)); + } catch (IOException e) { + throw new RuntimeException("Filter failed on field: " + 
fieldName, e); + } + } + + private Optional dispatchFilter(FieldRef fieldRef, IndexFilter filter) { + FieldIndexConfig config = indexOptions.getConfig(fieldRef.name()); + if (config == null) { + return Optional.empty(); + } + return executeFilter(fieldRef.name(), filter); + } + + // =================== scalar / keyword comparison visitors ===================== + + @Override + public Optional visitEqual(FieldRef fieldRef, Object literal) { + FieldIndexConfig config = indexOptions.getConfig(fieldRef.name()); + if (config == null) { + return Optional.empty(); + } + FieldIndexConfig.IndexType type = config.indexType(); + if (type == FieldIndexConfig.IndexType.KEYWORD + || type == FieldIndexConfig.IndexType.FULLTEXT) { + return dispatchFilter( + fieldRef, IndexFilter.text(IndexFilter.TextFilter.TextOp.TERM, str(literal))); + } + return dispatchFilter(fieldRef, IndexFilter.scalar(ScalarPredicate.eq(literal))); + } + + @Override + public Optional visitNotEqual(FieldRef fieldRef, Object literal) { + return dispatchFilter(fieldRef, IndexFilter.scalar(ScalarPredicate.neq(literal))); + } + + @Override + public Optional visitLessThan(FieldRef fieldRef, Object literal) { + return dispatchFilter(fieldRef, IndexFilter.scalar(ScalarPredicate.lt(literal))); + } + + @Override + public Optional visitLessOrEqual(FieldRef fieldRef, Object literal) { + return dispatchFilter(fieldRef, IndexFilter.scalar(ScalarPredicate.lte(literal))); + } + + @Override + public Optional visitGreaterThan(FieldRef fieldRef, Object literal) { + return dispatchFilter(fieldRef, IndexFilter.scalar(ScalarPredicate.gt(literal))); + } + + @Override + public Optional visitGreaterOrEqual(FieldRef fieldRef, Object literal) { + return dispatchFilter(fieldRef, IndexFilter.scalar(ScalarPredicate.gte(literal))); + } + + @Override + public Optional visitIn(FieldRef fieldRef, List literals) { + return dispatchFilter(fieldRef, IndexFilter.scalar(ScalarPredicate.in(literals))); + } + + @Override + public Optional 
visitNotIn(FieldRef fieldRef, List literals) { + return dispatchFilter(fieldRef, IndexFilter.scalar(ScalarPredicate.notIn(literals))); + } + + // =================== text pattern visitors (keyword / fulltext) ===================== + + @Override + public Optional visitStartsWith(FieldRef fieldRef, Object literal) { + return dispatchFilter( + fieldRef, IndexFilter.text(IndexFilter.TextFilter.TextOp.PREFIX, str(literal))); + } + + @Override + public Optional visitEndsWith(FieldRef fieldRef, Object literal) { + return dispatchFilter( + fieldRef, + IndexFilter.text(IndexFilter.TextFilter.TextOp.WILDCARD, "*" + str(literal))); + } + + @Override + public Optional visitContains(FieldRef fieldRef, Object literal) { + return dispatchFilter( + fieldRef, + IndexFilter.text(IndexFilter.TextFilter.TextOp.WILDCARD, "*" + str(literal) + "*")); + } + + @Override + public Optional visitLike(FieldRef fieldRef, Object literal) { + String pattern = str(literal).replace('%', '*').replace('_', '?'); + return dispatchFilter( + fieldRef, IndexFilter.text(IndexFilter.TextFilter.TextOp.WILDCARD, pattern)); + } + + // =================== null checks ===================== + + @Override + public Optional visitIsNotNull(FieldRef fieldRef) { + return dispatchFilter(fieldRef, IndexFilter.exists()); + } + + @Override + public Optional visitIsNull(FieldRef fieldRef) { + return dispatchFilter(fieldRef, IndexFilter.notExists()); + } + + // =================== helpers ===================== + + private static String str(Object literal) { + return literal == null ? 
"" : literal.toString(); + } +} diff --git a/paimon-eslib/src/main/java/org/apache/paimon/eslib/index/ESIndexGlobalIndexWriter.java b/paimon-eslib/src/main/java/org/apache/paimon/eslib/index/ESIndexGlobalIndexWriter.java new file mode 100644 index 000000000000..b1ffda274669 --- /dev/null +++ b/paimon-eslib/src/main/java/org/apache/paimon/eslib/index/ESIndexGlobalIndexWriter.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.eslib.index; + +import org.apache.paimon.data.InternalArray; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; +import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.globalindex.GlobalIndexMultiColumnWriter; +import org.apache.paimon.globalindex.ResultEntry; +import org.apache.paimon.globalindex.io.GlobalIndexFileWriter; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.VectorType; + +import org.elasticsearch.eslib.api.ESIndexBuilder; +import org.elasticsearch.eslib.api.model.FieldIndexConfig; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; +import java.util.List; + +/** + * Multi-column writer that builds ESLib (Lucene-based) indexes. Accepts InternalRow with all + * indexed fields and produces a single archive file. 
+ */ +public class ESIndexGlobalIndexWriter implements GlobalIndexMultiColumnWriter { + + private static final String FILE_NAME_PREFIX = "es-index"; + + private final GlobalIndexFileWriter fileWriter; + private final List fields; + private final ESIndexOptions indexOptions; + private final ESIndexBuilder builder; + private long docCount; + + public ESIndexGlobalIndexWriter( + GlobalIndexFileWriter fileWriter, List fields, ESIndexOptions indexOptions) + throws IOException { + this.fileWriter = fileWriter; + this.fields = fields; + this.indexOptions = indexOptions; + this.builder = ESIndexBuilderFactory.create(indexOptions.getFieldConfigs()); + this.docCount = 0; + } + + @Override + public void write(InternalRow row) { + if (row == null) { + return; + } + try { + long docId = docCount++; + for (int i = 0; i < fields.size(); i++) { + if (row.isNullAt(i)) { + continue; + } + DataField field = fields.get(i); + FieldIndexConfig config = indexOptions.getConfig(field.name()); + if (config == null) { + continue; + } + writeField(row, i, field, config, docId); + } + } catch (IOException e) { + throw new RuntimeException("Failed to write document to ES index", e); + } + } + + private void writeField( + InternalRow row, int pos, DataField field, FieldIndexConfig config, long docId) + throws IOException { + switch (config.indexType()) { + case VECTOR: + float[] vector = extractVector(row, pos, field.type()); + if (vector != null) { + builder.addVector(field.name(), docId, vector); + } + break; + case FULLTEXT: + String text = row.getString(pos).toString(); + builder.addTextField(field.name(), docId, text); + break; + case KEYWORD: + String keyword = row.getString(pos).toString(); + builder.addScalarField( + field.name(), + docId, + keyword, + org.elasticsearch.eslib.api.model.ScalarFieldType.KEYWORD); + break; + case SCALAR: + case DATE: + Object value = extractScalar(row, pos, field.type()); + if (value != null) { + builder.addScalarField(field.name(), docId, value, 
config.scalarType()); + } + break; + default: + break; + } + } + + private float[] extractVector(InternalRow row, int pos, DataType type) { + if (type instanceof VectorType) { + InternalVector vec = row.getVector(pos); + return vec.toFloatArray(); + } + InternalArray array = row.getArray(pos); + return array.toFloatArray(); + } + + private Object extractScalar(InternalRow row, int pos, DataType type) { + switch (type.getTypeRoot()) { + case INTEGER: + case SMALLINT: + case TINYINT: + return row.getInt(pos); + case BIGINT: + return row.getLong(pos); + case FLOAT: + return row.getFloat(pos); + case DOUBLE: + return row.getDouble(pos); + case CHAR: + case VARCHAR: + return row.getString(pos).toString(); + default: + return row.getString(pos).toString(); + } + } + + @Override + public List finish() { + if (docCount == 0) { + return Collections.emptyList(); + } + try { + builder.build(); + Path outputDir = builder.getOutputDir(); + + // Snapshot the file list ONCE so the archive layout and the offset table in meta agree + // exactly (listFiles() order must not differ between packing and meta building). + java.io.File[] segFiles = outputDir.toFile().listFiles(); + if (segFiles == null) { + segFiles = new java.io.File[0]; + } + + byte[] meta = buildMeta(segFiles); + + String fileName = fileWriter.newFileName(FILE_NAME_PREFIX); + byte[] archiveBytes = packDirectory(segFiles); + + try (PositionOutputStream out = fileWriter.newOutputStream(fileName)) { + out.write(archiveBytes); + out.flush(); + } + + builder.close(); + deleteDirectory(outputDir); + + return Collections.singletonList(new ResultEntry(fileName, docCount, meta)); + } catch (IOException e) { + throw new RuntimeException("Failed to finish ES index build", e); + } + } + + /** + * Pack the given files into a single archive (big-endian). Layout is interleaved per file: + * [4-byte file count] then for each file: [4-byte name len][name bytes][8-byte data len][data + * bytes]. 
{@link #buildMeta} computes offsets against this exact layout. + */ + private byte[] packDirectory(java.io.File[] segFiles) throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + + dos.writeInt(segFiles.length); + for (java.io.File file : segFiles) { + byte[] nameBytes = file.getName().getBytes(StandardCharsets.UTF_8); + byte[] data = Files.readAllBytes(file.toPath()); + dos.writeInt(nameBytes.length); + dos.write(nameBytes); + dos.writeLong(data.length); + dos.write(data); + } + dos.flush(); + return baos.toByteArray(); + } + + /** + * Build metadata stored in ResultEntry. Encodes the file offset table (big-endian) so the + * reader can provide an ArchiveDataProvider without re-parsing the archive. Format: [4-byte + * file count] then for each file: [4-byte name len][name bytes][8-byte offset][8-byte length]. + * + *

The offsets MUST match {@link #packDirectory}'s interleaved layout: each file's data + * begins right after its own [nameLen][name][dataLen] header. + */ + private byte[] buildMeta(java.io.File[] segFiles) throws IOException { + if (segFiles.length == 0) { + return null; + } + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + dos.writeInt(segFiles.length); + + long offset = 4; // past the [4-byte file count] + for (java.io.File file : segFiles) { + byte[] nameBytes = file.getName().getBytes(StandardCharsets.UTF_8); + long fileLen = file.length(); + long dataOffset = offset + 4 + nameBytes.length + 8; // after this file's header + dos.writeInt(nameBytes.length); + dos.write(nameBytes); + dos.writeLong(dataOffset); + dos.writeLong(fileLen); + offset = dataOffset + fileLen; // next file's header starts here + } + dos.flush(); + return baos.toByteArray(); + } + + private static void deleteDirectory(Path dir) { + java.io.File[] segFiles = dir.toFile().listFiles(); + if (segFiles != null) { + for (java.io.File file : segFiles) { + file.delete(); + } + } + dir.toFile().delete(); + } +} diff --git a/paimon-eslib/src/main/java/org/apache/paimon/eslib/index/ESIndexGlobalIndexer.java b/paimon-eslib/src/main/java/org/apache/paimon/eslib/index/ESIndexGlobalIndexer.java new file mode 100644 index 000000000000..e9d443e476ee --- /dev/null +++ b/paimon-eslib/src/main/java/org/apache/paimon/eslib/index/ESIndexGlobalIndexer.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.eslib.index; + +import org.apache.paimon.globalindex.GlobalIndexIOMeta; +import org.apache.paimon.globalindex.GlobalIndexReader; +import org.apache.paimon.globalindex.GlobalIndexWriter; +import org.apache.paimon.globalindex.GlobalIndexer; +import org.apache.paimon.globalindex.io.GlobalIndexFileReader; +import org.apache.paimon.globalindex.io.GlobalIndexFileWriter; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.DataField; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ExecutorService; + +/** + * ES multi-index global indexer using ESLib. Builds Lucene-based indexes supporting vector, + * fulltext, and scalar fields. 
+ */ +public class ESIndexGlobalIndexer implements GlobalIndexer { + + private final List fields; + private final ESIndexOptions indexOptions; + private final ExecutorService searchExecutor; + + public ESIndexGlobalIndexer(List fields, Options options) { + this(fields, options, null); + } + + public ESIndexGlobalIndexer( + List fields, Options options, ExecutorService searchExecutor) { + this.fields = fields; + this.indexOptions = new ESIndexOptions(fields, options); + this.searchExecutor = searchExecutor; + } + + @Override + public GlobalIndexWriter createWriter(GlobalIndexFileWriter fileWriter) throws IOException { + return new ESIndexGlobalIndexWriter(fileWriter, fields, indexOptions); + } + + @Override + public GlobalIndexReader createReader( + GlobalIndexFileReader fileReader, List files) throws IOException { + return new ESIndexGlobalIndexReader( + fileReader, files, fields, indexOptions, searchExecutor); + } +} diff --git a/paimon-eslib/src/main/java/org/apache/paimon/eslib/index/ESIndexGlobalIndexerFactory.java b/paimon-eslib/src/main/java/org/apache/paimon/eslib/index/ESIndexGlobalIndexerFactory.java new file mode 100644 index 000000000000..2e8e1e80e5cd --- /dev/null +++ b/paimon-eslib/src/main/java/org/apache/paimon/eslib/index/ESIndexGlobalIndexerFactory.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.eslib.index; + +import org.apache.paimon.globalindex.GlobalIndexer; +import org.apache.paimon.globalindex.GlobalIndexerFactory; +import org.apache.paimon.options.Options; +import org.apache.paimon.types.DataField; + +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * Factory for creating ES multi-index global indexers. Supports vector (DiskBBQ/HNSW/Native), + * fulltext (BM25), and scalar fields. + */ +public class ESIndexGlobalIndexerFactory implements GlobalIndexerFactory { + + public static final String IDENTIFIER = "es-index"; + + private static final String READ_SEARCH_THREADS_KEY = + "global-index.es-index.read-search-threads"; + private static final int DEFAULT_READ_SEARCH_THREADS = -1; + + private static volatile ExecutorService readSearchExecutor; + private static final Object LOCK = new Object(); + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public GlobalIndexer create(DataField field, Options options) { + return new ESIndexGlobalIndexer( + java.util.Collections.singletonList(field), + options, + getOrCreateReadSearchExecutor(options)); + } + + @Override + public GlobalIndexer create(List fields, Options options) { + return new ESIndexGlobalIndexer(fields, options, getOrCreateReadSearchExecutor(options)); + } + + /** + * Returns the shared read/search thread pool. 
Default (unset or -1) creates a pool sized to + * CPU/2. Set to 0 to disable parallel search (returns null → serial only). + */ + private static ExecutorService getOrCreateReadSearchExecutor(Options options) { + int threads = options.getInteger(READ_SEARCH_THREADS_KEY, DEFAULT_READ_SEARCH_THREADS); + if (threads == 0) { + return null; + } + if (threads < 0) { + threads = Math.max(2, Runtime.getRuntime().availableProcessors() / 2); + } + if (readSearchExecutor == null) { + int finalThreads = threads; + synchronized (LOCK) { + if (readSearchExecutor == null) { + readSearchExecutor = createExecutor(finalThreads); + } + } + } + return readSearchExecutor; + } + + private static ExecutorService createExecutor(int threads) { + ThreadPoolExecutor executor = + new ThreadPoolExecutor( + threads, + threads, + 60L, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(256), + r -> { + Thread t = new Thread(r, "paimon-es-search"); + t.setDaemon(true); + return t; + }, + new ThreadPoolExecutor.CallerRunsPolicy()); + executor.allowCoreThreadTimeOut(true); + return executor; + } +} diff --git a/paimon-eslib/src/main/java/org/apache/paimon/eslib/index/ESIndexOptions.java b/paimon-eslib/src/main/java/org/apache/paimon/eslib/index/ESIndexOptions.java new file mode 100644 index 000000000000..96963df57f9d --- /dev/null +++ b/paimon-eslib/src/main/java/org/apache/paimon/eslib/index/ESIndexOptions.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.eslib.index; + +import org.apache.paimon.options.Options; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypeRoot; +import org.apache.paimon.types.VectorType; + +import org.elasticsearch.eslib.api.model.BuiltinAnalyzer; +import org.elasticsearch.eslib.api.model.FieldIndexConfig; +import org.elasticsearch.eslib.api.model.ScalarFieldType; +import org.elasticsearch.eslib.api.model.VectorAlgorithm; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * Parses Paimon Options into ESLib FieldIndexConfig for each field. + * + *

Options format (in table properties): + * + *

+ *   global-index.es-index.fields.vector_field.algorithm = diskbbq
+ *   global-index.es-index.fields.vector_field.dimension = 128
+ *   global-index.es-index.fields.text_field.analyzer = standard
+ *   global-index.es-index.fields.id_field.type = keyword
+ * 
+ */ +public class ESIndexOptions { + + private static final String FIELDS_PREFIX = "fields."; + + private final Map fieldConfigs; + + public ESIndexOptions(List fields, Options options) { + this.fieldConfigs = new LinkedHashMap<>(); + for (DataField field : fields) { + fieldConfigs.put(field.name(), parseFieldConfig(field, options)); + } + } + + public Map getFieldConfigs() { + return fieldConfigs; + } + + public FieldIndexConfig getConfig(String fieldName) { + return fieldConfigs.get(fieldName); + } + + private FieldIndexConfig parseFieldConfig(DataField field, Options options) { + String prefix = FIELDS_PREFIX + field.name() + "."; + DataType dataType = field.type(); + + // Explicit type override takes priority + String explicitType = options.getString(prefix + "type", null); + if (explicitType != null) { + return parseExplicitType(field.name(), explicitType, dataType, options, prefix); + } + + if (isVectorType(dataType)) { + return parseVectorConfig(field.name(), dataType, options, prefix); + } else if (isTextType(dataType)) { + // String fields: check if analyzer is set → FULLTEXT; otherwise → KEYWORD + String analyzer = options.getString(prefix + "analyzer", null); + if (analyzer != null) { + return FieldIndexConfig.builder(field.name(), FieldIndexConfig.IndexType.FULLTEXT) + .analyzer(BuiltinAnalyzer.fromName(analyzer)) + .build(); + } + return FieldIndexConfig.builder(field.name(), FieldIndexConfig.IndexType.KEYWORD) + .scalarType(ScalarFieldType.KEYWORD) + .build(); + } else if (isTimestampType(dataType)) { + return FieldIndexConfig.builder(field.name(), FieldIndexConfig.IndexType.DATE) + .scalarType(ScalarFieldType.DATE) + .build(); + } else { + return FieldIndexConfig.builder(field.name(), FieldIndexConfig.IndexType.SCALAR) + .scalarType(mapScalarType(dataType)) + .build(); + } + } + + private FieldIndexConfig parseExplicitType( + String fieldName, String typeName, DataType dataType, Options options, String prefix) { + switch (typeName.toLowerCase()) { + 
case "fulltext": + String analyzer = options.getString(prefix + "analyzer", "standard"); + return FieldIndexConfig.builder(fieldName, FieldIndexConfig.IndexType.FULLTEXT) + .analyzer(BuiltinAnalyzer.fromName(analyzer)) + .build(); + case "keyword": + return FieldIndexConfig.builder(fieldName, FieldIndexConfig.IndexType.KEYWORD) + .scalarType(ScalarFieldType.KEYWORD) + .build(); + case "geo_point": + return FieldIndexConfig.builder(fieldName, FieldIndexConfig.IndexType.GEO_POINT) + .scalarType(ScalarFieldType.GEO_POINT) + .build(); + case "date": + return FieldIndexConfig.builder(fieldName, FieldIndexConfig.IndexType.DATE) + .scalarType(ScalarFieldType.DATE) + .build(); + case "vector": + return parseVectorConfig(fieldName, dataType, options, prefix); + default: + return FieldIndexConfig.builder(fieldName, FieldIndexConfig.IndexType.SCALAR) + .scalarType(mapScalarType(dataType)) + .build(); + } + } + + private FieldIndexConfig parseVectorConfig( + String fieldName, DataType dataType, Options options, String prefix) { + String algorithm = options.getString(prefix + "algorithm", "hnsw"); + int dimension = options.getInteger(prefix + "dimension", inferDimension(dataType)); + String metric = options.getString(prefix + "metric", "cosine"); + + Map params = new LinkedHashMap<>(); + String mStr = options.getString(prefix + "m", null); + if (mStr != null) { + params.put("m", mStr); + } + String efStr = options.getString(prefix + "ef_construction", null); + if (efStr != null) { + params.put("ef_construction", efStr); + } + String vpcStr = options.getString(prefix + "vectors_per_cluster", null); + if (vpcStr != null) { + params.put("vectors_per_cluster", vpcStr); + } + + return FieldIndexConfig.builder(fieldName, FieldIndexConfig.IndexType.VECTOR) + .algorithm(VectorAlgorithm.fromName(algorithm)) + .dimension(dimension) + .metric(metric) + .algorithmParams(params) + .build(); + } + + private static ScalarFieldType mapScalarType(DataType type) { + switch (type.getTypeRoot()) { 
+ case INTEGER: + case SMALLINT: + case TINYINT: + return ScalarFieldType.INT; + case BIGINT: + return ScalarFieldType.LONG; + case FLOAT: + return ScalarFieldType.FLOAT; + case DOUBLE: + return ScalarFieldType.DOUBLE; + default: + return ScalarFieldType.KEYWORD; + } + } + + private static boolean isVectorType(DataType type) { + if (type instanceof VectorType) { + return true; + } + if (type instanceof ArrayType) { + DataType elementType = ((ArrayType) type).getElementType(); + return elementType.getTypeRoot() == DataTypeRoot.FLOAT; + } + return false; + } + + private static boolean isTextType(DataType type) { + return type.getTypeRoot() == DataTypeRoot.VARCHAR + || type.getTypeRoot() == DataTypeRoot.CHAR; + } + + private static boolean isTimestampType(DataType type) { + DataTypeRoot root = type.getTypeRoot(); + return root == DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE + || root == DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE + || root == DataTypeRoot.DATE; + } + + private static int inferDimension(DataType type) { + if (type instanceof VectorType) { + return ((VectorType) type).getLength(); + } + return 0; + } +} diff --git a/paimon-eslib/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory b/paimon-eslib/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory new file mode 100644 index 000000000000..453f40f25bca --- /dev/null +++ b/paimon-eslib/src/main/resources/META-INF/services/org.apache.paimon.globalindex.GlobalIndexerFactory @@ -0,0 +1 @@ +org.apache.paimon.eslib.index.ESIndexGlobalIndexerFactory diff --git a/paimon-eslib/src/test/java/org/apache/paimon/eslib/index/ESIndexGlobalIndexE2ETest.java b/paimon-eslib/src/test/java/org/apache/paimon/eslib/index/ESIndexGlobalIndexE2ETest.java new file mode 100644 index 000000000000..4787873d28c8 --- /dev/null +++ b/paimon-eslib/src/test/java/org/apache/paimon/eslib/index/ESIndexGlobalIndexE2ETest.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.eslib.index; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericArray; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.fs.SeekableInputStream; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.globalindex.GlobalIndexIOMeta; +import org.apache.paimon.globalindex.GlobalIndexResult; +import org.apache.paimon.globalindex.ResultEntry; +import org.apache.paimon.globalindex.ScoredGlobalIndexResult; +import org.apache.paimon.globalindex.io.GlobalIndexFileReader; +import org.apache.paimon.globalindex.io.GlobalIndexFileWriter; +import org.apache.paimon.options.Options; +import org.apache.paimon.predicate.FieldRef; +import org.apache.paimon.predicate.FullTextSearch; +import org.apache.paimon.predicate.VectorSearch; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.utils.RoaringNavigableMap64; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.HashMap; +import 
java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * End-to-end test of the P5 ↔ P4 integration: {@link ESIndexGlobalIndexWriter} feeds Paimon rows to + * {@code DefaultESIndexBuilder} (loaded reflectively), packs the Lucene segment files into an + * archive, and {@link ESIndexGlobalIndexReader} reads that archive back through {@code + * DefaultESIndexSearcher} (also reflective) to serve vector and full-text queries. + * + *

This exercises the real archive write/offset-table/read roundtrip and the eslib-core + * reflection loading, without Flink or OSS. + */ +class ESIndexGlobalIndexE2ETest { + + /** {@link GlobalIndexFileWriter} backed by a local directory. */ + private static final class LocalDirWriter implements GlobalIndexFileWriter { + private final java.nio.file.Path dir; + private final LocalFileIO fio = LocalFileIO.create(); + + LocalDirWriter(java.nio.file.Path dir) { + this.dir = dir; + } + + @Override + public String newFileName(String prefix) { + return prefix + "-" + UUID.randomUUID() + ".index"; + } + + @Override + public PositionOutputStream newOutputStream(String fileName) throws IOException { + return fio.newOutputStream( + new org.apache.paimon.fs.Path(dir.resolve(fileName).toString()), true); + } + } + + /** {@link GlobalIndexFileReader} backed by local files. */ + private static final class LocalFileReader implements GlobalIndexFileReader { + private final LocalFileIO fio = LocalFileIO.create(); + + @Override + public SeekableInputStream getInputStream(GlobalIndexIOMeta meta) throws IOException { + return fio.newInputStream(meta.filePath()); + } + } + + @Test + void writeArchiveThenReadAndSearch(@TempDir java.nio.file.Path tmp) throws IOException { + List fields = + Arrays.asList( + new DataField(0, "embedding", DataTypes.ARRAY(DataTypes.FLOAT())), + new DataField(1, "title", DataTypes.STRING()), + new DataField(2, "category", DataTypes.STRING()), + new DataField(3, "price", DataTypes.INT())); + + Map opt = new HashMap<>(); + opt.put("fields.embedding.algorithm", "hnsw"); + opt.put("fields.embedding.dimension", "4"); + opt.put("fields.embedding.metric", "l2"); + opt.put("fields.title.analyzer", "standard"); + ESIndexOptions options = new ESIndexOptions(fields, Options.fromMap(opt)); + + java.nio.file.Path archiveDir = tmp.resolve("archive"); + Files.createDirectories(archiveDir); + + // --- write 20 rows → archive --- + LocalDirWriter fileWriter = new 
LocalDirWriter(archiveDir); + ESIndexGlobalIndexWriter writer = new ESIndexGlobalIndexWriter(fileWriter, fields, options); + for (int i = 0; i < 20; i++) { + GenericRow row = + GenericRow.of( + new GenericArray(new float[] {i, i, i, i}), + BinaryString.fromString( + "document " + i + (i % 2 == 0 ? " even" : " odd")), + BinaryString.fromString(i % 2 == 0 ? "even" : "odd"), + i * 10); + writer.write(row); + } + List entries = writer.finish(); + assertEquals(1, entries.size(), "one archive produced"); + ResultEntry entry = entries.get(0); + assertEquals(20L, entry.rowCount(), "row count"); + assertNotNull(entry.meta(), "offset-table meta present"); + + // --- reconstruct reader over the archive --- + org.apache.paimon.fs.Path filePath = + new org.apache.paimon.fs.Path(archiveDir.resolve(entry.fileName()).toString()); + long fileSize = Files.size(archiveDir.resolve(entry.fileName())); + GlobalIndexIOMeta ioMeta = new GlobalIndexIOMeta(filePath, fileSize, entry.meta()); + + ESIndexGlobalIndexReader reader = + new ESIndexGlobalIndexReader( + new LocalFileReader(), List.of(ioMeta), fields, options); + + // --- vector search: nearest to origin is row 0 (vector [0,0,0,0]) --- + Optional vr = + reader.visitVectorSearch( + new VectorSearch(new float[] {0, 0, 0, 0}, 5, "embedding")); + assertTrue(vr.isPresent(), "vector search returns a result"); + RoaringNavigableMap64 vrows = vr.get().results(); + assertEquals(5, vrows.getIntCardinality(), "k=5 results"); + assertTrue(contains(vrows, 0L), "row 0 (origin) recalled by HNSW"); + + // --- full-text search: 10 rows contain "even" --- + Optional ft = + reader.visitFullTextSearch(new FullTextSearch("even", 50, "title")); + assertTrue(ft.isPresent(), "full-text search returns a result"); + assertEquals(10, ft.get().results().getIntCardinality(), "10 even docs"); + + // --- scalar filter: price >= 100 means rows 10..19 (price = i*10) --- + FieldRef priceRef = new FieldRef(3, "price", DataTypes.INT()); + Optional sf = 
reader.visitGreaterOrEqual(priceRef, 100); + assertTrue(sf.isPresent(), "scalar filter returns a result"); + assertEquals(10, sf.get().results().getIntCardinality(), "10 rows with price >= 100"); + + // --- scalar filter: price == 50 → row 5 only --- + Optional eq = reader.visitEqual(priceRef, 50); + assertTrue(eq.isPresent(), "scalar eq filter returns a result"); + assertEquals(1, eq.get().results().getIntCardinality(), "1 row with price == 50"); + assertTrue(contains(eq.get().results(), 5L), "row 5 has price=50"); + + // --- keyword filter: category == "even" → 10 rows --- + FieldRef categoryRef = new FieldRef(2, "category", DataTypes.STRING()); + Optional kw = reader.visitEqual(categoryRef, "even"); + assertTrue(kw.isPresent(), "keyword eq filter returns a result"); + assertEquals(10, kw.get().results().getIntCardinality(), "10 even category rows"); + + // --- keyword prefix: category startsWith "ev" → 10 rows with "even" --- + Optional sw = reader.visitStartsWith(categoryRef, "ev"); + assertTrue(sw.isPresent(), "startsWith filter returns a result"); + assertEquals(10, sw.get().results().getIntCardinality(), "10 rows start with 'ev'"); + + // --- keyword contains: category contains "dd" → 10 rows with "odd" --- + Optional ct = reader.visitContains(categoryRef, "dd"); + assertTrue(ct.isPresent(), "contains filter returns a result"); + assertEquals(10, ct.get().results().getIntCardinality(), "10 rows contain 'dd'"); + + try { + reader.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private static boolean contains(RoaringNavigableMap64 bitmap, long id) { + for (long v : bitmap) { + if (v == id) { + return true; + } + } + return false; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/MergeIntoUpdateChecker.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/MergeIntoUpdateChecker.java index 8b1122382aae..aed46f0078e8 100644 --- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/MergeIntoUpdateChecker.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/dataevolution/MergeIntoUpdateChecker.java @@ -39,6 +39,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -46,6 +48,8 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.apache.paimon.globalindex.GlobalIndexBuilderUtils.MULTI_COLUMN_INDEX_FIELD_ID; + /** * The checker for merge into update result. It will check each committable to see if some * global-indexed columns are updated. It will take some actions according to {@link @@ -100,10 +104,12 @@ private void checkUpdatedColumns() { GlobalIndexMeta globalIndexMeta = entry.indexFile().globalIndexMeta(); if (globalIndexMeta != null) { - String fieldName = - rowType.getField(globalIndexMeta.indexFieldId()) - .name(); - return updatedColumns.contains(fieldName) + Collection indexedNames = + getIndexedFieldNames(globalIndexMeta, rowType); + boolean overlaps = + indexedNames.stream() + .anyMatch(updatedColumns::contains); + return overlaps && affectedPartitions.contains(entry.partition()); } return false; @@ -116,8 +122,8 @@ private void checkUpdatedColumns() { case THROW_ERROR: Set conflictedColumns = affectedEntries.stream() - .map(file -> file.indexFile().globalIndexMeta().indexFieldId()) - .map(id -> rowType.getField(id).name()) + .map(file -> file.indexFile().globalIndexMeta()) + .flatMap(meta -> getIndexedFieldNames(meta, rowType).stream()) .collect(Collectors.toSet()); throw new RuntimeException( @@ -159,4 +165,23 @@ private void checkUpdatedColumns() { } } } + + private static Collection getIndexedFieldNames(GlobalIndexMeta meta, RowType rowType) { + int fieldId = meta.indexFieldId(); + if (fieldId == MULTI_COLUMN_INDEX_FIELD_ID) { + List names = new 
ArrayList<>(); + for (int id : meta.extraFieldIds()) { + names.add(rowType.getField(id).name()); + } + return names; + } + List names = new ArrayList<>(); + names.add(rowType.getField(fieldId).name()); + if (meta.extraFieldIds() != null) { + for (int id : meta.extraFieldIds()) { + names.add(rowType.getField(id).name()); + } + } + return names; + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java index 5896503ce09d..99a551a9e4d9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilder.java @@ -29,7 +29,9 @@ import org.apache.paimon.flink.utils.BoundedOneInputOperator; import org.apache.paimon.flink.utils.JavaTypeInfo; import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils; +import org.apache.paimon.globalindex.GlobalIndexMultiColumnWriter; import org.apache.paimon.globalindex.GlobalIndexSingletonWriter; +import org.apache.paimon.globalindex.GlobalIndexWriter; import org.apache.paimon.globalindex.ResultEntry; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.io.DataFileMeta; @@ -38,7 +40,6 @@ import org.apache.paimon.options.Options; import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.reader.RecordReader; -import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.SpecialFields; import org.apache.paimon.table.sink.BatchWriteBuilder; @@ -50,6 +51,7 @@ import org.apache.paimon.types.DataField; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.CloseableIterator; +import org.apache.paimon.utils.ProjectedRow; import org.apache.paimon.utils.Range; import 
org.apache.flink.streaming.api.datastream.DataStream; @@ -65,7 +67,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -74,6 +75,8 @@ import java.util.stream.Collectors; import static org.apache.paimon.globalindex.GlobalIndexBuilderUtils.createIndexWriter; +import static org.apache.paimon.globalindex.GlobalIndexBuilderUtils.filterEntriesBefore; +import static org.apache.paimon.globalindex.GlobalIndexBuilderUtils.findMinNonIndexableRowId; import static org.apache.paimon.globalindex.GlobalIndexBuilderUtils.toIndexFileMetas; import static org.apache.paimon.io.CompactIncrement.emptyIncrement; import static org.apache.paimon.io.DataIncrement.deleteIndexIncrement; @@ -103,7 +106,7 @@ public static void buildIndexAndExecute( buildIndexAndExecute( env, table, - indexColumn, + Collections.singletonList(indexColumn), indexType, partitionPredicate, userOptions, @@ -119,12 +122,49 @@ public static void buildIndexAndExecute( Options userOptions, long maxIndexedRowId) throws Exception { + buildIndexAndExecute( + env, + table, + Collections.singletonList(indexColumn), + indexType, + partitionPredicate, + userOptions, + maxIndexedRowId); + } + + public static void buildIndexAndExecute( + StreamExecutionEnvironment env, + FileStoreTable table, + List indexColumns, + String indexType, + PartitionPredicate partitionPredicate, + Options userOptions) + throws Exception { + buildIndexAndExecute( + env, + table, + indexColumns, + indexType, + partitionPredicate, + userOptions, + NO_MAX_INDEXED_ROW_ID); + } + + public static void buildIndexAndExecute( + StreamExecutionEnvironment env, + FileStoreTable table, + List indexColumns, + String indexType, + PartitionPredicate partitionPredicate, + Options userOptions, + long maxIndexedRowId) + throws Exception { boolean hasIndexToBuild = buildIndex( env, () -> new GenericGlobalIndexBuilder(table), 
table, - indexColumn, + indexColumns, indexType, partitionPredicate, userOptions, @@ -149,13 +189,34 @@ public static boolean buildIndex( env, indexBuilderSupplier, table, - indexColumn, + Collections.singletonList(indexColumn), indexType, partitionPredicate, userOptions, NO_MAX_INDEXED_ROW_ID); } + public static boolean buildIndex( + StreamExecutionEnvironment env, + Supplier indexBuilderSupplier, + FileStoreTable table, + String indexColumn, + String indexType, + PartitionPredicate partitionPredicate, + Options userOptions, + long maxIndexedRowId) + throws Exception { + return buildIndex( + env, + indexBuilderSupplier, + table, + Collections.singletonList(indexColumn), + indexType, + partitionPredicate, + userOptions, + maxIndexedRowId); + } + /** * Builds a generic global index topology using a {@link GenericGlobalIndexBuilder} supplier. * @@ -166,7 +227,7 @@ public static boolean buildIndex( StreamExecutionEnvironment env, Supplier indexBuilderSupplier, FileStoreTable table, - String indexColumn, + List indexColumns, String indexType, PartitionPredicate partitionPredicate, Options userOptions, @@ -183,7 +244,7 @@ public static boolean buildIndex( return buildTopology( env, table, - indexColumn, + indexColumns, indexType, userOptions, entries, @@ -203,7 +264,7 @@ public static boolean buildIndex( private static boolean buildTopology( StreamExecutionEnvironment env, FileStoreTable table, - String indexColumn, + List indexColumns, String indexType, Options userOptions, List entries, @@ -212,24 +273,24 @@ private static boolean buildTopology( throws Exception { long totalRowCount = entries.stream().mapToLong(e -> e.file().rowCount()).sum(); LOG.info( - "Scanned {} files ({} rows) across {} partitions for {} index on column '{}'" + "Scanned {} files ({} rows) across {} partitions for {} index on columns '{}'" + (maxIndexedRowId >= 0 ? ", maxIndexedRowId={}." 
: "."), entries.size(), totalRowCount, entries.stream().map(ManifestEntry::partition).distinct().count(), indexType, - indexColumn, + indexColumns, maxIndexedRowId); long minNonIndexableRowId = - findMinNonIndexableRowId(table.schemaManager(), entries, indexColumn); + findMinNonIndexableRowId(table.schemaManager(), entries, indexColumns); entries = filterEntriesBefore(entries, minNonIndexableRowId); RowType rowType = table.rowType(); - DataField indexField = rowType.getField(indexColumn); - // Project indexColumn + _ROW_ID so we can read the actual row ID from data - List readColumns = new ArrayList<>(); - readColumns.add(indexColumn); + List indexFields = + indexColumns.stream().map(rowType::getField).collect(Collectors.toList()); + // Project indexColumns + _ROW_ID so we can read the actual row ID from data + List readColumns = new ArrayList<>(indexColumns); readColumns.add(SpecialFields.ROW_ID.name()); RowType projectedRowType = SpecialFields.rowTypeWithRowId(rowType).project(readColumns); @@ -277,7 +338,7 @@ private static boolean buildTopology( readBuilder, table, indexType, - indexField, + indexFields, projectedRowType, mergedOptions)) .setParallelism(parallelism); @@ -298,49 +359,6 @@ private static boolean buildTopology( return true; } - /** - * Find the minimum firstRowId among files whose schema does not contain the index column. Files - * at or beyond this rowId cannot be indexed because the column was added later via ALTER TABLE. 
- * - * @return the boundary rowId, or {@link Long#MAX_VALUE} if all files contain the column - */ - static long findMinNonIndexableRowId( - SchemaManager schemaManager, List entries, String indexColumn) { - Map schemaContainsColumn = new HashMap<>(); - long minRowId = Long.MAX_VALUE; - for (ManifestEntry entry : entries) { - long sid = entry.file().schemaId(); - boolean contains = - schemaContainsColumn.computeIfAbsent( - sid, id -> schemaManager.schema(id).fieldNames().contains(indexColumn)); - if (!contains && entry.file().firstRowId() != null) { - minRowId = Math.min(minRowId, entry.file().nonNullFirstRowId()); - } - } - return minRowId; - } - - /** Keep only entries whose firstRowId is strictly less than the given boundary. */ - static List filterEntriesBefore( - List entries, long boundaryRowId) { - if (boundaryRowId == Long.MAX_VALUE) { - return entries; - } - List result = new ArrayList<>(); - for (ManifestEntry entry : entries) { - if (entry.file().firstRowId() != null - && entry.file().nonNullFirstRowId() < boundaryRowId) { - result.add(entry); - } - } - LOG.info( - "Filtered {} files at or beyond rowId {}, {} files remain.", - entries.size() - result.size(), - boundaryRowId, - result.size()); - return result; - } - /** * Compute shard tasks for a full build (no rows to skip). 
* @@ -548,25 +566,27 @@ private static class BuildIndexOperator private final ReadBuilder readBuilder; private final FileStoreTable table; private final String indexType; - private final DataField indexField; + private final List indexFields; private final RowType projectedRowType; private final Options mergedOptions; private transient TableRead tableRead; - private transient InternalRow.FieldGetter indexFieldGetter; + private transient InternalRow.FieldGetter[] indexFieldGetters; private transient int rowIdFieldIndex; + private transient boolean multiColumn; + private transient ProjectedRow writerProjection; BuildIndexOperator( ReadBuilder readBuilder, FileStoreTable table, String indexType, - DataField indexField, + List indexFields, RowType projectedRowType, Options mergedOptions) { this.readBuilder = readBuilder; this.table = table; this.indexType = indexType; - this.indexField = indexField; + this.indexFields = indexFields; this.projectedRowType = projectedRowType; this.mergedOptions = mergedOptions; } @@ -575,10 +595,22 @@ private static class BuildIndexOperator public void open() throws Exception { super.open(); this.tableRead = readBuilder.newRead(); - this.indexFieldGetter = - InternalRow.createFieldGetter( - indexField.type(), projectedRowType.getFieldIndex(indexField.name())); + this.indexFieldGetters = new InternalRow.FieldGetter[indexFields.size()]; + for (int i = 0; i < indexFields.size(); i++) { + DataField field = indexFields.get(i); + indexFieldGetters[i] = + InternalRow.createFieldGetter( + field.type(), projectedRowType.getFieldIndex(field.name())); + } this.rowIdFieldIndex = projectedRowType.getFieldIndex(SpecialFields.ROW_ID.name()); + this.multiColumn = indexFields.size() > 1; + if (multiColumn) { + int[] projection = new int[indexFields.size()]; + for (int i = 0; i < indexFields.size(); i++) { + projection[i] = projectedRowType.getFieldIndex(indexFields.get(i).name()); + } + this.writerProjection = ProjectedRow.from(projection); + } } 
@Override @@ -595,9 +627,8 @@ public void processElement(StreamRecord element) throws Exception { task.split.dataFiles().size()); long startTime = System.currentTimeMillis(); - GlobalIndexSingletonWriter indexWriter = - (GlobalIndexSingletonWriter) - createIndexWriter(table, indexType, indexField, mergedOptions); + GlobalIndexWriter indexWriter = + createIndexWriter(table, indexType, indexFields, mergedOptions); try { long rowsSeen = 0; @@ -626,8 +657,36 @@ public void processElement(StreamRecord element) throws Exception { } // Only write rows within this shard's range if (currentRowId >= task.shardRange.from) { - Object fieldData = indexFieldGetter.getFieldOrNull(row); - indexWriter.write(fieldData); + if (multiColumn) { + boolean hasNull = false; + for (InternalRow.FieldGetter getter : indexFieldGetters) { + if (getter.getFieldOrNull(row) == null) { + hasNull = true; + break; + } + } + if (hasNull) { + LOG.info( + "Null value in indexed columns at rowId={}, stopping shard [{}, {}].", + currentRowId, + task.shardRange.from, + task.shardRange.to); + break; + } + ((GlobalIndexMultiColumnWriter) indexWriter) + .write(writerProjection.replaceRow(row)); + } else { + Object fieldData = indexFieldGetters[0].getFieldOrNull(row); + if (fieldData == null) { + LOG.info( + "Null value at rowId={}, stopping shard [{}, {}].", + currentRowId, + task.shardRange.from, + task.shardRange.to); + break; + } + ((GlobalIndexSingletonWriter) indexWriter).write(fieldData); + } rowsSeen++; } } @@ -664,7 +723,7 @@ public void processElement(StreamRecord element) throws Exception { table, partition, task.shardRange, - indexField, + indexFields, indexType, resultEntries); output.collect( @@ -688,7 +747,7 @@ private static CommitMessage flushIndex( FileStoreTable table, BinaryRow partition, Range rowRange, - DataField indexField, + List indexFields, String indexType, List resultEntries) throws IOException { @@ -698,14 +757,14 @@ private static CommitMessage flushIndex( 
table.store().pathFactory().globalIndexFileFactory(), table.coreOptions(), rowRange, - indexField.id(), + indexFields, indexType, resultEntries); return new CommitMessageImpl( partition, 0, null, indexIncrement(indexFileMetas), emptyIncrement()); } - private static void closeWriterQuietly(GlobalIndexSingletonWriter writer) { + private static void closeWriterQuietly(GlobalIndexWriter writer) { if (writer instanceof Closeable) { try { ((Closeable) writer).close(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateGlobalIndexProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateGlobalIndexProcedure.java index 5f4855567047..f9f54918ea4a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateGlobalIndexProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateGlobalIndexProcedure.java @@ -32,8 +32,11 @@ import org.apache.flink.table.annotation.ProcedureHint; import org.apache.flink.table.procedure.ProcedureContext; +import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.apache.paimon.utils.ParameterUtils.getPartitions; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -77,11 +80,28 @@ public String[] call( tableId); RowType rowType = table.rowType(); + List indexColumns = + Arrays.stream(indexColumn.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toList()); + checkArgument(!indexColumns.isEmpty(), "At least one column required."); checkArgument( - rowType.containsField(indexColumn), - "Column '%s' does not exist in table '%s'.", - indexColumn, - tableId); + indexColumns.size() == new HashSet<>(indexColumns).size(), + "Duplicate index columns are not allowed: %s", + indexColumns); + // No hard cap on the number of index columns: 
unlike row-store B-tree indexes + // (e.g. MySQL 16, PostgreSQL 32) whose limit comes from composing columns into a + // single key, the global index is built on per-type index frameworks. Whether + // multiple columns are supported, and any practical limit, is decided by each + // index type (single-column types reject multi-column via UnsupportedOperationException). + for (String col : indexColumns) { + checkArgument( + rowType.containsField(col), + "Column '%s' does not exist in table '%s'.", + col, + tableId); + } // Parse partition predicate PartitionPredicate partitionPredicate = parsePartitionPredicate(table, partitions); @@ -92,12 +112,18 @@ public String[] call( // Build global index based on index type indexType = indexType.toLowerCase().trim(); + if ("btree".equals(indexType)) { + checkArgument( + indexColumns.size() == 1, + "BTree index only supports single column, got: %s", + indexColumns); + } try { if ("btree".equals(indexType)) { BTreeIndexTopoBuilder.buildIndexAndExecute( procedureContext.getExecutionEnvironment(), table, - indexColumn, + indexColumns.get(0), partitionPredicate, userOptions); return new String[] { @@ -107,7 +133,7 @@ public String[] call( GenericIndexTopoBuilder.buildIndexAndExecute( procedureContext.getExecutionEnvironment(), table, - indexColumn, + indexColumns, indexType, partitionPredicate, userOptions); @@ -115,8 +141,8 @@ public String[] call( } catch (Exception e) { throw new RuntimeException( String.format( - "Failed to create %s index for column '%s' on table '%s'.", - indexType, indexColumn, table.name()), + "Failed to create %s index for columns '%s' on table '%s'.", + indexType, indexColumns, table.name()), e); } return new String[] { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilderTest.java index 0de57077b295..c69b59ad6e3c 100644 --- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilderTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/globalindex/GenericIndexTopoBuilderTest.java @@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRowWriter; import org.apache.paimon.data.BinaryString; import org.apache.paimon.fs.Path; +import org.apache.paimon.globalindex.GlobalIndexBuilderUtils; import org.apache.paimon.io.PojoDataFileMeta; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.ManifestEntry; @@ -472,10 +473,10 @@ void testAppendFilterOldFilesBeforeNewFiles() { entries.add(createEntryWithSchemaId(BinaryRow.EMPTY_ROW, 200L, 100, 0L)); List result = - GenericIndexTopoBuilder.filterEntriesBefore( + GlobalIndexBuilderUtils.filterEntriesBefore( entries, - GenericIndexTopoBuilder.findMinNonIndexableRowId( - schemaManager, entries, "vec")); + GlobalIndexBuilderUtils.findMinNonIndexableRowId( + schemaManager, entries, Collections.singletonList("vec"))); assertThat(result).hasSize(2); assertThat(result.get(0).file().nonNullFirstRowId()).isEqualTo(0L); diff --git a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala index 492d64bbf5bf..9c90a1e8445c 100644 --- a/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala +++ b/paimon-spark/paimon-spark-4.0/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala @@ -21,6 +21,8 @@ package org.apache.paimon.spark.commands import org.apache.paimon.CoreOptions.GlobalIndexColumnUpdateAction import org.apache.paimon.data.BinaryRow import org.apache.paimon.format.blob.BlobFileFormat.isBlobFile +import 
org.apache.paimon.globalindex.GlobalIndexBuilderUtils.MULTI_COLUMN_INDEX_FIELD_ID +import org.apache.paimon.index.GlobalIndexMeta import org.apache.paimon.io.{CompactIncrement, DataIncrement} import org.apache.paimon.manifest.IndexManifestEntry import org.apache.paimon.spark.SparkTable @@ -502,15 +504,29 @@ case class MergeIntoPaimonDataEvolutionTable( return updateCommit } + def getIndexedFieldNames( + meta: GlobalIndexMeta, + rt: org.apache.paimon.types.RowType): Seq[String] = { + if (meta.indexFieldId() == MULTI_COLUMN_INDEX_FIELD_ID) { + meta.extraFieldIds().map(id => rt.getField(id).name()).toSeq + } else { + val names = ArrayBuffer(rt.getField(meta.indexFieldId()).name()) + if (meta.extraFieldIds() != null) { + meta.extraFieldIds().foreach(id => names += rt.getField(id).name()) + } + names.toSeq + } + } + val filter: org.apache.paimon.utils.Filter[IndexManifestEntry] = (entry: IndexManifestEntry) => { val globalIndexMeta = entry.indexFile().globalIndexMeta() if (globalIndexMeta == null) { false } else { - val fieldName = rowType.getField(globalIndexMeta.indexFieldId()).name() + val indexedNames = getIndexedFieldNames(globalIndexMeta, rowType) affectedParts.contains(entry.partition()) && updateColumns.exists( - _.name.equals(fieldName)) + col => indexedNames.contains(col.name)) } } @@ -527,8 +543,7 @@ case class MergeIntoPaimonDataEvolutionTable( case GlobalIndexColumnUpdateAction.THROW_ERROR => val updatedColNames = updateColumns.map(_.name) val conflicted = affectedIndexEntries - .map(_.indexFile().globalIndexMeta().indexFieldId()) - .map(id => rowType.getField(id).name()) + .flatMap(e => getIndexedFieldNames(e.indexFile().globalIndexMeta(), rowType)) .toSet throw new RuntimeException( s"""MergeInto: update columns contain globally indexed columns, not supported now. 
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java index 1485d14fac1c..a64045633c6b 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java @@ -20,7 +20,9 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.globalindex.GlobalIndexMultiColumnWriter; import org.apache.paimon.globalindex.GlobalIndexSingletonWriter; +import org.apache.paimon.globalindex.GlobalIndexWriter; import org.apache.paimon.globalindex.ResultEntry; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.io.CompactIncrement; @@ -33,10 +35,15 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.CloseableIterator; import org.apache.paimon.utils.LongCounter; +import org.apache.paimon.utils.ProjectedRow; import org.apache.paimon.utils.Range; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.io.Serializable; +import java.util.Collections; import java.util.List; import static org.apache.paimon.globalindex.GlobalIndexBuilderUtils.createIndexWriter; @@ -45,12 +52,13 @@ /** Default global index builder. 
*/ public class DefaultGlobalIndexBuilder implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(DefaultGlobalIndexBuilder.class); private static final long serialVersionUID = 1L; private final FileStoreTable table; private final BinaryRow partition; private final RowType readType; - private final DataField indexField; + private final List indexFields; private final String indexType; private final Range rowRange; private final Options options; @@ -63,10 +71,28 @@ public DefaultGlobalIndexBuilder( String indexType, Range rowRange, Options options) { + this( + table, + partition, + readType, + Collections.singletonList(indexField), + indexType, + rowRange, + options); + } + + public DefaultGlobalIndexBuilder( + FileStoreTable table, + BinaryRow partition, + RowType readType, + List indexFields, + String indexType, + Range rowRange, + Options options) { this.table = table; this.partition = partition; this.readType = readType; - this.indexField = indexField; + this.indexFields = indexFields; this.indexType = indexType; this.rowRange = rowRange; this.options = options; @@ -89,7 +115,7 @@ public CommitMessage build(CloseableIterator data) throws IOExcepti table.store().pathFactory().globalIndexFileFactory(), table.coreOptions(), rowRange, - indexField.id(), + indexFields, indexType, resultEntries); DataIncrement dataIncrement = DataIncrement.indexIncrement(indexFileMetas); @@ -99,27 +125,62 @@ public CommitMessage build(CloseableIterator data) throws IOExcepti private List writePaimonRows( CloseableIterator rows, LongCounter rowCounter) throws IOException { - GlobalIndexSingletonWriter indexWriter = - (GlobalIndexSingletonWriter) - createIndexWriter(table, indexType, indexField, options); + GlobalIndexWriter indexWriter = createIndexWriter(table, indexType, indexFields, options); + boolean multiColumn = indexFields.size() > 1; try { - InternalRow.FieldGetter getter = - InternalRow.createFieldGetter( - indexField.type(), 
readType.getFieldIndex(indexField.name())); - rows.forEachRemaining( - row -> { - Object indexO = getter.getFieldOrNull(row); - indexWriter.write(indexO); - rowCounter.add(1); - }); + if (multiColumn) { + GlobalIndexMultiColumnWriter multiWriter = + (GlobalIndexMultiColumnWriter) indexWriter; + int[] projection = new int[indexFields.size()]; + InternalRow.FieldGetter[] getters = new InternalRow.FieldGetter[indexFields.size()]; + for (int i = 0; i < indexFields.size(); i++) { + DataField field = indexFields.get(i); + projection[i] = readType.getFieldIndex(field.name()); + getters[i] = + InternalRow.createFieldGetter( + field.type(), readType.getFieldIndex(field.name())); + } + ProjectedRow projectedRow = ProjectedRow.from(projection); + while (rows.hasNext()) { + InternalRow row = rows.next(); + boolean hasNull = false; + for (InternalRow.FieldGetter getter : getters) { + if (getter.getFieldOrNull(row) == null) { + hasNull = true; + break; + } + } + if (hasNull) { + LOG.info( + "Null value in indexed columns, stopping shard [{}, {}].", + rowRange.from, + rowRange.to); + break; + } + multiWriter.write(projectedRow.replaceRow(row)); + rowCounter.add(1); + } + } else { + DataField indexField = indexFields.get(0); + GlobalIndexSingletonWriter singleWriter = (GlobalIndexSingletonWriter) indexWriter; + InternalRow.FieldGetter getter = + InternalRow.createFieldGetter( + indexField.type(), readType.getFieldIndex(indexField.name())); + rows.forEachRemaining( + row -> { + Object indexO = getter.getFieldOrNull(row); + singleWriter.write(indexO); + rowCounter.add(1); + }); + } return indexWriter.finish(); } finally { closeWriterQuietly(indexWriter); } } - private static void closeWriterQuietly(GlobalIndexSingletonWriter writer) { + private static void closeWriterQuietly(GlobalIndexWriter writer) { if (writer instanceof java.io.Closeable) { try { ((java.io.Closeable) writer).close(); diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexTopoBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexTopoBuilder.java index afd954c39a5d..ea2cda4a8b85 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexTopoBuilder.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexTopoBuilder.java @@ -21,12 +21,14 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.fs.Path; +import org.apache.paimon.globalindex.GlobalIndexBuilderUtils; import org.apache.paimon.globalindex.IndexedSplit; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.options.Options; import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageSerializer; @@ -77,6 +79,28 @@ public List buildIndex( DataField indexField, Options options) throws IOException { + return buildIndex( + spark, + relation, + partitionPredicate, + table, + indexType, + readType, + Collections.singletonList(indexField), + options); + } + + @Override + public List buildIndex( + SparkSession spark, + DataSourceV2Relation relation, + PartitionPredicate partitionPredicate, + FileStoreTable table, + String indexType, + RowType readType, + List indexFields, + Options options) + throws IOException { Options tableOptions = table.coreOptions().toConfiguration(); long rowsPerShard = tableOptions @@ -88,6 +112,13 @@ public List buildIndex( List entries = table.store().newScan().withPartitionFilter(partitionPredicate).plan().files(); + List indexColumns = + 
indexFields.stream().map(DataField::name).collect(Collectors.toList()); + SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location()); + long boundaryRowId = + GlobalIndexBuilderUtils.findMinNonIndexableRowId( + schemaManager, entries, indexColumns); + entries = GlobalIndexBuilderUtils.filterEntriesBefore(entries, boundaryRowId); // generate splits for each partition && shard Map> splits = split(table, entries, rowsPerShard); @@ -106,7 +137,7 @@ public List buildIndex( table, partition, readType, - indexField, + indexFields, indexType, indexedSplit.rowRanges().get(0), options); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexTopologyBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexTopologyBuilder.java index 50c6ab34e153..3d751f4585ac 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexTopologyBuilder.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexTopologyBuilder.java @@ -46,4 +46,31 @@ List buildIndex( DataField indexField, Options options) throws IOException; + + default List buildIndex( + SparkSession spark, + DataSourceV2Relation relation, + PartitionPredicate partitionPredicate, + FileStoreTable table, + String indexType, + RowType readType, + List indexFields, + Options options) + throws IOException { + if (indexFields.size() > 1) { + throw new UnsupportedOperationException( + String.format( + "Topology builder '%s' does not support multi-column index, got columns: %s", + identifier(), indexFields)); + } + return buildIndex( + spark, + relation, + partitionPredicate, + table, + indexType, + readType, + indexFields.get(0), + options); + } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java index e25464b173d7..18b6c807b231 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java @@ -43,11 +43,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collections; +import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.UUID; +import java.util.stream.Collectors; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.spark.sql.types.DataTypes.StringType; @@ -121,11 +123,35 @@ public InternalRow[] call(InternalRow args) { tableIdent); RowType rowType = table.rowType(); + List indexColumns = + Arrays.stream(column.split(",")) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toList()); + checkArgument(!indexColumns.isEmpty(), "At least one column required."); checkArgument( - rowType.containsField(column), - "Column '%s' does not exist in table '%s'.", - column, - tableIdent); + indexColumns.size() == new HashSet<>(indexColumns).size(), + "Duplicate index columns are not allowed: %s", + indexColumns); + // No hard cap on the number of index columns: unlike row-store B-tree + // indexes (e.g. MySQL 16, PostgreSQL 32) whose limit comes from composing + // columns into a single key, the global index is built on per-type index + // frameworks. Whether multiple columns are supported, and any practical + // limit, is decided by each index type (single-column types reject + // multi-column via UnsupportedOperationException). 
+ for (String col : indexColumns) { + checkArgument( + rowType.containsField(col), + "Column '%s' does not exist in table '%s'.", + col, + tableIdent); + } + if ("btree".equalsIgnoreCase(indexType)) { + checkArgument( + indexColumns.size() == 1, + "BTree index only supports single column, got: %s", + indexColumns); + } DataSourceV2Relation relation = createRelation(tableIdent, sparkTable); PartitionPredicate partitionPredicate = SparkProcedureUtils.convertToPartitionPredicate( @@ -134,9 +160,11 @@ public InternalRow[] call(InternalRow args) { spark(), relation); - DataField indexField = rowType.getField(column); - RowType projectedRowType = - rowType.project(Collections.singletonList(column)); + List indexFields = + indexColumns.stream() + .map(rowType::getField) + .collect(Collectors.toList()); + RowType projectedRowType = rowType.project(indexColumns); RowType readRowType = SpecialFields.rowTypeWithRowId(projectedRowType); HashMap parsedOptions = new HashMap<>(); @@ -154,7 +182,7 @@ public InternalRow[] call(InternalRow args) { table, indexType, readRowType, - indexField, + indexFields, userOptions); try (TableCommitImpl commit = @@ -170,7 +198,7 @@ public InternalRow[] call(InternalRow args) { } catch (Exception e) { throw new RuntimeException( String.format( - "Failed to create %s index for column '%s' on table '%s'.", + "Failed to create %s index for columns '%s' on table '%s'.", indexType, column, tableIdent), e); } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala index 96f8c0c5cc9f..15e03a74dbc0 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala @@ 
-21,6 +21,8 @@ package org.apache.paimon.spark.commands import org.apache.paimon.CoreOptions.GlobalIndexColumnUpdateAction import org.apache.paimon.data.BinaryRow import org.apache.paimon.format.blob.BlobFileFormat.isBlobFile +import org.apache.paimon.globalindex.GlobalIndexBuilderUtils.MULTI_COLUMN_INDEX_FIELD_ID +import org.apache.paimon.index.GlobalIndexMeta import org.apache.paimon.io.{CompactIncrement, DataIncrement} import org.apache.paimon.manifest.IndexManifestEntry import org.apache.paimon.spark.SparkTable @@ -511,9 +513,9 @@ case class MergeIntoPaimonDataEvolutionTable( if (globalIndexMeta == null) { false } else { - val fieldName = rowType.getField(globalIndexMeta.indexFieldId()).name() + val indexedNames = getIndexedFieldNames(globalIndexMeta, rowType) affectedParts.contains(entry.partition()) && updateColumns.exists( - _.name.equals(fieldName)) + col => indexedNames.contains(col.name)) } } @@ -530,8 +532,7 @@ case class MergeIntoPaimonDataEvolutionTable( case GlobalIndexColumnUpdateAction.THROW_ERROR => val updatedColNames = updateColumns.map(_.name) val conflicted = affectedIndexEntries - .map(_.indexFile().globalIndexMeta().indexFieldId()) - .map(id => rowType.getField(id).name()) + .flatMap(e => getIndexedFieldNames(e.indexFile().globalIndexMeta(), rowType)) .toSet throw new RuntimeException( s"""MergeInto: update columns contain globally indexed columns, not supported now. 
@@ -555,6 +556,20 @@ case class MergeIntoPaimonDataEvolutionTable( } } + private def getIndexedFieldNames( + meta: GlobalIndexMeta, + rowType: org.apache.paimon.types.RowType): Seq[String] = { + if (meta.indexFieldId() == MULTI_COLUMN_INDEX_FIELD_ID) { + meta.extraFieldIds().map(id => rowType.getField(id).name()).toSeq + } else { + val names = ArrayBuffer(rowType.getField(meta.indexFieldId()).name()) + if (meta.extraFieldIds() != null) { + meta.extraFieldIds().foreach(id => names += rowType.getField(id).name()) + } + names.toSeq + } + } + private def findRelatedFirstRowIds( dataset: Dataset[Row], sparkSession: SparkSession, diff --git a/pom.xml b/pom.xml index 5a336fb76c2d..c9cec1d53578 100644 --- a/pom.xml +++ b/pom.xml @@ -75,6 +75,7 @@ under the License. paimon-lumina paimon-vortex paimon-tantivy + paimon-eslib