From ea64eb7808f77bd98376180ee72bfa57fb6702e1 Mon Sep 17 00:00:00 2001 From: Stefanietry Date: Thu, 16 Apr 2026 21:37:44 +0800 Subject: [PATCH] [core] support vector on spark --- .../org/apache/paimon/hive/HiveTypeUtils.java | 6 + .../PaimonObjectInspectorFactory.java | 4 + .../spark/AbstractSparkInternalRow.java | 9 +- .../apache/paimon/spark/DataConverter.java | 14 ++ .../org/apache/paimon/spark/SparkCatalog.java | 15 ++ .../paimon/spark/SparkInternalRowWrapper.java | 24 +++- .../org/apache/paimon/spark/SparkRow.java | 96 ++++++++++++- .../apache/paimon/spark/SparkTypeUtils.java | 18 ++- .../paimon/spark/SparkMultimodalITCase.java | 132 ++++++++++++++++++ 9 files changed, 312 insertions(+), 6 deletions(-) create mode 100644 paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkMultimodalITCase.java diff --git a/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/HiveTypeUtils.java b/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/HiveTypeUtils.java index e4799341d1dc..23c01d1144ac 100644 --- a/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/HiveTypeUtils.java +++ b/paimon-hive/paimon-hive-common/src/main/java/org/apache/paimon/hive/HiveTypeUtils.java @@ -45,6 +45,7 @@ import org.apache.paimon.types.VarBinaryType; import org.apache.paimon.types.VarCharType; import org.apache.paimon.types.VariantType; +import org.apache.paimon.types.VectorType; import org.apache.hadoop.hive.common.type.HiveChar; import org.apache.hadoop.hive.common.type.HiveVarchar; @@ -235,6 +236,11 @@ public TypeInfo visit(BlobType blobType) { return TypeInfoFactory.binaryTypeInfo; } + @Override + public TypeInfo visit(VectorType vectorType) { + return TypeInfoFactory.getListTypeInfo(vectorType.getElementType().accept(this)); + } + @Override protected TypeInfo defaultMethod(org.apache.paimon.types.DataType dataType) { throw new UnsupportedOperationException("Unsupported type: " + dataType); diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonObjectInspectorFactory.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonObjectInspectorFactory.java index 9e9eb0f31f7a..9ad6a1a99b3a 100644 --- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonObjectInspectorFactory.java +++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/objectinspector/PaimonObjectInspectorFactory.java @@ -28,6 +28,7 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.types.TimeType; import org.apache.paimon.types.VarCharType; +import org.apache.paimon.types.VectorType; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; @@ -81,6 +82,9 @@ public static ObjectInspector create(DataType logicalType) { case ARRAY: ArrayType arrayType = (ArrayType) logicalType; return new PaimonListObjectInspector(arrayType.getElementType()); + case VECTOR: + VectorType vectorType = (VectorType) logicalType; + return new PaimonListObjectInspector(vectorType.getElementType()); case MAP: MapType mapType = (MapType) logicalType; return new PaimonMapObjectInspector(mapType.getKeyType(), mapType.getValueType()); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/AbstractSparkInternalRow.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/AbstractSparkInternalRow.java index 283077430e19..5910c6074450 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/AbstractSparkInternalRow.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/AbstractSparkInternalRow.java @@ -25,6 +25,7 @@ import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypeChecks; import 
org.apache.paimon.types.RowType; +import org.apache.paimon.types.VectorType; import org.apache.paimon.utils.InternalRowUtils; import org.apache.spark.sql.catalyst.util.ArrayData; @@ -172,7 +173,13 @@ public org.apache.spark.sql.catalyst.InternalRow getStruct(int ordinal, int numF @Override public ArrayData getArray(int ordinal) { - return fromPaimon(row.getArray(ordinal), (ArrayType) rowType.getTypeAt(ordinal)); + DataType type = rowType.getTypeAt(ordinal); + if (type instanceof ArrayType) { + return fromPaimon(row.getArray(ordinal), (ArrayType) type); + } else if (type instanceof VectorType) { + return DataConverter.fromPaimon(row.getVector(ordinal), (VectorType) type); + } + throw new UnsupportedOperationException("Not an array type: " + type); } @Override diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/DataConverter.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/DataConverter.java index 0b5ea899476e..5c8026f461df 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/DataConverter.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/DataConverter.java @@ -22,6 +22,7 @@ import org.apache.paimon.data.InternalArray; import org.apache.paimon.data.InternalMap; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.InternalVector; import org.apache.paimon.data.Timestamp; import org.apache.paimon.spark.data.SparkArrayData; import org.apache.paimon.spark.data.SparkInternalRow; @@ -32,6 +33,7 @@ import org.apache.paimon.types.MapType; import org.apache.paimon.types.MultisetType; import org.apache.paimon.types.RowType; +import org.apache.paimon.types.VectorType; import org.apache.spark.sql.catalyst.util.ArrayBasedMapData; import org.apache.spark.sql.catalyst.util.ArrayData; @@ -58,6 +60,8 @@ public static Object fromPaimon(Object o, DataType type) { return fromPaimon((org.apache.paimon.data.Decimal) o); case ARRAY: return 
fromPaimon((InternalArray) o, (ArrayType) type); + case VECTOR: + return fromPaimon((InternalVector) o, (VectorType) type); case MAP: case MULTISET: return fromPaimon((InternalMap) o, type); @@ -93,6 +97,16 @@ public static ArrayData fromPaimon(InternalArray array, ArrayType arrayType) { return fromPaimonArrayElementType(array, arrayType.getElementType()); } + public static ArrayData fromPaimon(InternalVector vector, VectorType vectorType) { + if (vector.size() != vectorType.getLength()) { + throw new IllegalArgumentException( + String.format( + "Vector length mismatch. Expected %d but was %d.", + vectorType.getLength(), vector.size())); + } + return fromPaimonArrayElementType(vector, vectorType.getElementType()); + } + private static ArrayData fromPaimonArrayElementType(InternalArray array, DataType elementType) { return SparkArrayData.create(elementType).replace(array); } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java index 913d4f582af5..d8e99f78d310 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java @@ -44,7 +44,9 @@ import org.apache.paimon.types.BlobType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; +import org.apache.paimon.types.VectorType; import org.apache.paimon.utils.ExceptionUtils; +import org.apache.paimon.utils.Preconditions; import org.apache.spark.sql.PaimonSparkSession$; import org.apache.spark.sql.SparkSession; @@ -73,6 +75,7 @@ import org.apache.spark.sql.execution.datasources.DataSource; import org.apache.spark.sql.execution.datasources.FileFormat; import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2; +import org.apache.spark.sql.types.ArrayType; import org.apache.spark.sql.types.StructField; import 
org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; @@ -563,6 +566,7 @@ private Schema toInitialSchema( List blobFields = CoreOptions.blobField(properties); Set blobDescriptorFields = new CoreOptions(properties).blobDescriptorField(); List blobViewFields = CoreOptions.blobViewField(properties); + Set<String> vectorFields = CoreOptions.fromMap(properties).vectorField(); String provider = properties.get(TableCatalog.PROP_PROVIDER); if (!usePaimon(provider)) { if (isFormatTable(provider)) { @@ -603,6 +607,17 @@ private Schema toInitialSchema( field.dataType() instanceof org.apache.spark.sql.types.BinaryType, "The type of blob field must be binary"); type = new BlobType(); + } else if (vectorFields.contains(field.name())) { + Preconditions.checkArgument( + field.dataType() instanceof ArrayType, + "The type of vector field must be array"); + ArrayType arrayType = (ArrayType) field.dataType(); + String dimKey = String.format("field.%s.vector-dim", field.name()); + type = + new VectorType( + arrayType.containsNull(), + Integer.parseInt(properties.get(dimKey)), + toPaimonType(arrayType.elementType())); } else { type = toPaimonType(field.dataType()).copy(field.nullable()); } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java index 45a7c0af41ee..97b5771594d4 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkInternalRowWrapper.java @@ -259,7 +259,20 @@ public InternalArray getArray(int pos) { @Override public InternalVector getVector(int pos) { - throw new UnsupportedOperationException("Not support VectorType yet."); + int actualPos = getActualFieldPosition(pos); + if (actualPos == -1 || internalRow.isNullAt(actualPos)) { + return null;
+ } + DataType dataType = tableSchema.fields()[pos].dataType(); + return toSparkInternalVector(dataType, internalRow.getArray(actualPos)); + } + + private static InternalVector toSparkInternalVector(DataType dataType, ArrayData arrayData) { + if (!(dataType instanceof ArrayType)) { + throw new UnsupportedOperationException("Not a vector type: " + dataType); + } + ArrayType arrayType = (ArrayType) dataType; + return new SparkInternalVector(arrayData, arrayType.elementType()); } @Override @@ -435,7 +448,7 @@ public InternalArray getArray(int pos) { @Override public InternalVector getVector(int pos) { - throw new UnsupportedOperationException("Not support VectorType yet."); + return toSparkInternalVector(elementType, arrayData.getArray(pos)); } @Override @@ -452,6 +465,13 @@ public InternalRow getRow(int pos, int numFields) { } } + /** adapt to spark internal vector. */ + public static class SparkInternalVector extends SparkInternalArray implements InternalVector { + public SparkInternalVector(ArrayData arrayData, DataType elementType) { + super(arrayData, elementType); + } + } + /** adapt to spark internal map. 
*/ public static class SparkInternalMap implements InternalMap { diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java index 84767db9abac..6dea5a5d2981 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkRow.java @@ -20,6 +20,7 @@ import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.BinaryVector; import org.apache.paimon.data.Blob; import org.apache.paimon.data.Decimal; import org.apache.paimon.data.InternalArray; @@ -35,6 +36,7 @@ import org.apache.paimon.types.MapType; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; +import org.apache.paimon.types.VectorType; import org.apache.paimon.utils.DateTimeUtils; import org.apache.paimon.utils.UriReaderFactory; @@ -48,6 +50,7 @@ import java.time.LocalDateTime; import java.time.ZoneId; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; @@ -168,7 +171,10 @@ public InternalArray getArray(int i) { @Override public InternalVector getVector(int pos) { - throw new UnsupportedOperationException("Not support VectorType yet."); + if (row.isNullAt(pos)) { + return null; + } + return toPaimonVector((VectorType) type.getTypeAt(pos), row.get(pos)); } @Override @@ -426,4 +432,92 @@ public double[] toDoubleArray() { return res; } } + + private static InternalVector toPaimonVector(VectorType vectorType, Object vector) { + if (vector == null) { + return null; + } + if (vector instanceof boolean[]) { + return BinaryVector.fromPrimitiveArray((boolean[]) vector); + } else if (vector instanceof byte[]) { + return BinaryVector.fromPrimitiveArray((byte[]) vector); + } else if (vector instanceof short[]) { + return 
BinaryVector.fromPrimitiveArray((short[]) vector); + } else if (vector instanceof int[]) { + return BinaryVector.fromPrimitiveArray((int[]) vector); + } else if (vector instanceof long[]) { + return BinaryVector.fromPrimitiveArray((long[]) vector); + } else if (vector instanceof float[]) { + return BinaryVector.fromPrimitiveArray((float[]) vector); + } else if (vector instanceof double[]) { + return BinaryVector.fromPrimitiveArray((double[]) vector); + } + if (vector instanceof scala.collection.Seq) { + vector = JavaConverters.seqAsJavaList((scala.collection.Seq) vector); + } else if (vector.getClass().isArray()) { + vector = Arrays.asList((Object[]) vector); + } + if (!(vector instanceof List)) { + throw new UnsupportedOperationException( + "Unsupported vector object: " + vector.getClass().getName()); + } + return toPaimonVector(vectorType, (List) vector); + } + + private static InternalVector toPaimonVector(VectorType vectorType, List list) { + int expectedLength = vectorType.getLength(); + if (list.size() != expectedLength) { + throw new IllegalArgumentException( + String.format( + "Vector length mismatch. 
Expected %d but was %d.", + expectedLength, list.size())); + } + switch (vectorType.getElementType().getTypeRoot()) { + case BOOLEAN: + boolean[] booleanValues = new boolean[expectedLength]; + for (int i = 0; i < expectedLength; i++) { + booleanValues[i] = (Boolean) list.get(i); + } + return BinaryVector.fromPrimitiveArray(booleanValues); + case TINYINT: + byte[] byteValues = new byte[expectedLength]; + for (int i = 0; i < expectedLength; i++) { + byteValues[i] = ((Number) list.get(i)).byteValue(); + } + return BinaryVector.fromPrimitiveArray(byteValues); + case SMALLINT: + short[] shortValues = new short[expectedLength]; + for (int i = 0; i < expectedLength; i++) { + shortValues[i] = ((Number) list.get(i)).shortValue(); + } + return BinaryVector.fromPrimitiveArray(shortValues); + case INTEGER: + int[] intValues = new int[expectedLength]; + for (int i = 0; i < expectedLength; i++) { + intValues[i] = ((Number) list.get(i)).intValue(); + } + return BinaryVector.fromPrimitiveArray(intValues); + case BIGINT: + long[] longValues = new long[expectedLength]; + for (int i = 0; i < expectedLength; i++) { + longValues[i] = ((Number) list.get(i)).longValue(); + } + return BinaryVector.fromPrimitiveArray(longValues); + case FLOAT: + float[] floatValues = new float[expectedLength]; + for (int i = 0; i < expectedLength; i++) { + floatValues[i] = ((Number) list.get(i)).floatValue(); + } + return BinaryVector.fromPrimitiveArray(floatValues); + case DOUBLE: + double[] doubleValues = new double[expectedLength]; + for (int i = 0; i < expectedLength; i++) { + doubleValues[i] = ((Number) list.get(i)).doubleValue(); + } + return BinaryVector.fromPrimitiveArray(doubleValues); + default: + throw new UnsupportedOperationException( + "Unsupported element type for vector " + vectorType.getElementType()); + } + } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java index dc2f8b30acab..9846b431b46d 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java @@ -44,6 +44,7 @@ import org.apache.paimon.types.VarBinaryType; import org.apache.paimon.types.VarCharType; import org.apache.paimon.types.VariantType; +import org.apache.paimon.types.VectorType; import org.apache.spark.sql.paimon.shims.SparkShimLoader; import org.apache.spark.sql.types.DataType; @@ -127,8 +128,16 @@ private static org.apache.paimon.types.DataType prunePaimonType( } else if (sparkDataType instanceof org.apache.spark.sql.types.ArrayType) { org.apache.spark.sql.types.ArrayType s = (org.apache.spark.sql.types.ArrayType) sparkDataType; - ArrayType r = (ArrayType) paimonDataType; - return r.newElementType(prunePaimonType(s.elementType(), r.getElementType())); + if (paimonDataType instanceof VectorType) { + VectorType v = (VectorType) paimonDataType; + return new VectorType( + v.isNullable(), + v.getLength(), + prunePaimonType(s.elementType(), v.getElementType())); + } else { + ArrayType r = (ArrayType) paimonDataType; + return r.newElementType(prunePaimonType(s.elementType(), r.getElementType())); + } } else { return paimonDataType; } @@ -242,6 +251,11 @@ public DataType visit(ArrayType arrayType) { return DataTypes.createArrayType(elementType.accept(this), elementType.isNullable()); } + @Override + public DataType visit(VectorType vectorType) { + return DataTypes.createArrayType(vectorType.getElementType().accept(this), vectorType.isNullable()); + } + @Override public DataType visit(MultisetType multisetType) { return DataTypes.createMapType( diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkMultimodalITCase.java 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkMultimodalITCase.java new file mode 100644 index 000000000000..496a0f159f65 --- /dev/null +++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkMultimodalITCase.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark; + +import org.apache.paimon.fs.Path; +import org.apache.paimon.hive.TestHiveMetastore; + +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for Paimon Multimodality type support on Spark. 
*/ +public class SparkMultimodalITCase { + + private static TestHiveMetastore testHiveMetastore; + private static final int PORT = 9092; + + @BeforeAll + public static void startMetastore() { + testHiveMetastore = new TestHiveMetastore(); + testHiveMetastore.start(PORT); + } + + @AfterAll + public static void closeMetastore() throws Exception { + testHiveMetastore.stop(); + } + + private SparkSession.Builder createSparkSessionBuilder(Path warehousePath) { + return SparkSession.builder() + .config("spark.sql.warehouse.dir", warehousePath.toString()) + // with hive metastore + .config("spark.sql.catalogImplementation", "hive") + .config("hive.metastore.uris", "thrift://localhost:" + PORT) + .config("spark.sql.catalog.spark_catalog", SparkCatalog.class.getName()) + .config("spark.sql.catalog.spark_catalog.metastore", "hive") + .config( + "spark.sql.catalog.spark_catalog.hive.metastore.uris", + "thrift://localhost:" + PORT) + .config("spark.sql.catalog.spark_catalog.format-table.enabled", "true") + .config("spark.sql.catalog.spark_catalog.warehouse", warehousePath.toString()) + .config( + "spark.sql.extensions", + "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions") + .master("local[2]"); + } + + @Test + public void testVector(@TempDir java.nio.file.Path tempDir) throws IOException { + Path warehousePath = new Path("file:" + tempDir.toString()); + SparkSession.Builder builder = createSparkSessionBuilder(warehousePath); + SparkSession spark = builder.getOrCreate(); + spark.sql("CREATE DATABASE IF NOT EXISTS my_db1"); + spark.sql("USE spark_catalog.my_db1"); + + // Create table + spark.sql( + "\n" + + "CREATE TABLE my_db1.vector_test (gid BIGINT, sid STRING, embs ARRAY<FLOAT>, date STRING COMMENT 'date')\n" + + "USING paimon\n" + + "TBLPROPERTIES (\n" + + " 'vector.file.format'='lance',\n" + + " 'vector-field'='embs',\n" + + " 'field.embs.vector-dim'='4',\n" + + " 'row-tracking.enabled'='true',\n" + + " 'data-evolution.enabled'='true',\n" + + "
'global-index.enabled' = 'true'\n" + + ");"); + spark.close(); + + spark = builder.getOrCreate(); + spark.sql( + "insert overwrite table my_db1.vector_test\n" + + "VALUES (1, '1', array(cast(1.0 as float), cast(2.0 as float), cast(3.0 as float), cast(4.0 as float)), '20260420'),\n" + + "(2, '2', array(cast(2.0 as float), cast(3.0 as float), cast(4.0 as float), cast(5.0 as float)), '20260420'),\n" + + "(3, '3', array(cast(3.0 as float), cast(4.0 as float), cast(5.0 as float), cast(6.0 as float)), '20260420'),\n" + + "(4, '4', array(cast(4.0 as float), cast(5.0 as float), cast(6.0 as float), cast(7.0 as float)), '20260420'),\n" + + "(5, '5', array(cast(5.0 as float), cast(6.0 as float), cast(7.0 as float), cast(8.0 as float)), '20260420'),\n" + + "(6, '6', array(cast(6.0 as float), cast(7.0 as float), cast(8.0 as float), cast(9.0 as float)), '20260420'),\n" + + "(7, '7', array(cast(7.0 as float), cast(8.0 as float), cast(9.0 as float), cast(10.0 as float)), '20260420'),\n" + + "(8, '8', array(cast(8.0 as float), cast(9.0 as float), cast(10.0 as float), cast(11.0 as float)), '20260420');"); + spark.close(); + + spark = builder.getOrCreate(); + spark.sql( + "\n" + + "CALL paimon.sys.create_global_index(\n" + + " `table` => 'my_db1.vector_test',\n" + + " `partitions` => \"date='20260420'\",\n" + + " `index_column` => 'embs',\n" + + " `index_type` => 'lumina-vector-ann',\n" + + " `options` => 'lumina.index.dimension=4'\n" + + ");"); + spark.close(); + + spark = builder.getOrCreate(); + List<Row> rows = + spark.sql("select gid, sid, embs from my_db1.vector_test where date = '20260420';") + .collectAsList(); + assertThat(rows).hasSize(8); + rows = + spark.sql( + "select gid, sid, embs from vector_search('my_db1.vector_test', 'embs', array(1.0f, 2.0f, 3.0f, 4.0f), 5) where date = '20260420'") + .collectAsList(); + assertThat(rows).hasSize(5); + spark.close(); + } +}