diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/StagingInMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/StagingInMemoryTableCatalog.scala
index ee2400cab35c8..7ded99c709a39 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/StagingInMemoryTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/StagingInMemoryTableCatalog.scala
@@ -36,7 +36,8 @@ class StagingInMemoryTableCatalog extends InMemoryTableCatalog with StagingTable
     new TestStagedCreateTable(
       ident,
       new InMemoryTable(s"$name.${ident.quoted}",
-        tableInfo.schema(), tableInfo.partitions(), tableInfo.properties()))
+        tableInfo.columns(), tableInfo.partitions(), tableInfo.properties(),
+        tableInfo.constraints()))
   }
 
   override def stageReplace(ident: Identifier, tableInfo: TableInfo): StagedTable = {
@@ -44,7 +45,8 @@ class StagingInMemoryTableCatalog extends InMemoryTableCatalog with StagingTable
     new TestStagedReplaceTable(
       ident,
       new InMemoryTable(s"$name.${ident.quoted}",
-        tableInfo.schema(), tableInfo.partitions(), tableInfo.properties()))
+        tableInfo.columns(), tableInfo.partitions(), tableInfo.properties(),
+        tableInfo.constraints()))
   }
 
   override def stageCreateOrReplace(ident: Identifier, tableInfo: TableInfo) : StagedTable = {
@@ -52,7 +54,8 @@ class StagingInMemoryTableCatalog extends InMemoryTableCatalog with StagingTable
     new TestStagedCreateOrReplaceTable(
       ident,
       new InMemoryTable(s"$name.${ident.quoted}",
-        tableInfo.schema(), tableInfo.partitions(), tableInfo.properties))
+        tableInfo.columns(), tableInfo.partitions(), tableInfo.properties(),
+        tableInfo.constraints()))
   }
 
   private def validateStagedTable(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala
index 7ce95ced0d242..454a4041d36e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala
@@ -83,6 +83,7 @@ case class AtomicReplaceTableExec(
         .withColumns(columns)
         .withPartitions(partitioning.toArray)
         .withProperties(tableProperties.asJava)
+        .withConstraints(tableSpec.constraints.toArray)
         .build()
       catalog.stageCreateOrReplace(identifier, tableInfo)
     } else if (catalog.tableExists(identifier)) {
@@ -91,6 +92,7 @@
         .withColumns(columns)
         .withPartitions(partitioning.toArray)
         .withProperties(tableProperties.asJava)
+        .withConstraints(tableSpec.constraints.toArray)
         .build()
       catalog.stageReplace(identifier, tableInfo)
     } catch {
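The two `withConstraints` hunks above are the whole behavioral change; the staging-catalog hunks exist so the in-memory test catalog forwards what the exec node now sends. A minimal sketch of that contract from a catalog implementer's point of view, using only the `TableInfo` accessors this diff exercises (the `materializeStagedTable` helper is hypothetical, not Spark API):

    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
    import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTable, TableInfo}

    // Hypothetical helper (not Spark API): any StagingTableCatalog that builds its
    // staged table from a TableInfo must now forward constraints() as well, or
    // constraints declared in an atomic REPLACE TABLE are silently dropped.
    def materializeStagedTable(name: String, ident: Identifier, info: TableInfo): InMemoryTable =
      new InMemoryTable(
        s"$name.${ident.quoted}",  // quoted comes from CatalogV2Implicits
        info.columns(),            // columns() supersedes the older schema() overload
        info.partitions(),
        info.properties(),
        info.constraints())        // newly propagated by AtomicReplaceTableExec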
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala
index ee2dd476958e1..8e295ecc3d5de 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala
@@ -205,6 +205,20 @@ class CheckConstraintSuite extends QueryTest with CommandSuiteBase with DDLComma
     }
   }
 
+  test("Replace table with check constraint using atomic catalog") {
+    getConstraintCharacteristics().foreach { case (characteristic, expectedDDL) =>
+      withNamespaceAndTable("ns", "tbl", atomicCatalog) { t =>
+        val constraintStr = s"CONSTRAINT c1 CHECK (id > 0) $characteristic"
+        sql(s"CREATE TABLE $t (id bigint) $defaultUsing")
+        sql(s"REPLACE TABLE $t (id bigint, data string, $constraintStr) $defaultUsing")
+        val table = loadTable(atomicCatalog, "ns", "tbl")
+        val constraint = getCheckConstraint(table)
+        assert(constraint.name() == "c1")
+        assert(constraint.toDDL == s"CONSTRAINT c1 CHECK (id > 0) $expectedDDL")
+      }
+    }
+  }
+
   test("Alter table add check constraint") {
     getConstraintCharacteristics().foreach { case (characteristic, expectedDDL) =>
       withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CommandSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CommandSuiteBase.scala
index 24bc4483d31c7..548e15f9e783b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CommandSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CommandSuiteBase.scala
@@ -35,6 +35,7 @@ trait CommandSuiteBase extends SharedSparkSession {
   def commandVersion: String = "V2" // The command version is added to test names
   def catalog: String = "test_catalog" // The default V2 catalog for testing
   def nonPartitionCatalog: String = "non_part_test_catalog" // Catalog for non-partitioned tables
+  def atomicCatalog: String = "atomic_test_catalog" // Catalog with StagingTableCatalog support
   def rowLevelOPCatalog: String = "row_level_op_catalog"
   def defaultUsing: String = "USING _" // The clause is used in creating v2 tables under testing
 
@@ -42,6 +43,7 @@ trait CommandSuiteBase extends SharedSparkSession {
   override def sparkConf: SparkConf = super.sparkConf
     .set(s"spark.sql.catalog.$catalog", classOf[InMemoryPartitionTableCatalog].getName)
     .set(s"spark.sql.catalog.$nonPartitionCatalog", classOf[InMemoryTableCatalog].getName)
+    .set(s"spark.sql.catalog.$atomicCatalog", classOf[StagingInMemoryTableCatalog].getName)
     .set(s"spark.sql.catalog.fun_$catalog", classOf[InMemoryCatalog].getName)
     .set(s"spark.sql.catalog.$rowLevelOPCatalog",
       classOf[InMemoryRowLevelOperationTableCatalog].getName)
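With CommandSuiteBase wired this way, every suite mixing it in can address the staging catalog by the `atomicCatalog` name. To reproduce the scenario outside these suites, the registration is a single conf entry; a minimal sketch, assuming the catalyst test artifact that ships `StagingInMemoryTableCatalog` is on the classpath:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.connector.catalog.StagingInMemoryTableCatalog

    // Sketch: register the staging in-memory catalog so REPLACE TABLE against it
    // takes the atomic stageReplace / stageCreateOrReplace code path.
    val spark = SparkSession.builder()
      .master("local[1]")
      .config(
        "spark.sql.catalog.atomic_test_catalog",
        classOf[StagingInMemoryTableCatalog].getName)
      .getOrCreate()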
foreign key constraint") { withNamespaceAndTable("ns", "tbl", catalog) { t => sql(s"CREATE TABLE $t (id bigint, fk bigint, data string) $defaultUsing") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/PrimaryKeyConstraintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/PrimaryKeyConstraintSuite.scala index f692f9588161e..f7d551d4e34d1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/PrimaryKeyConstraintSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/PrimaryKeyConstraintSuite.scala @@ -76,6 +76,21 @@ class PrimaryKeyConstraintSuite extends QueryTest with CommandSuiteBase with DDL } } + test("Replace table with primary key constraint using atomic catalog") { + validConstraintCharacteristics.foreach { case (characteristic, expectedDDL) => + withNamespaceAndTable("ns", "tbl", atomicCatalog) { t => + val constraintStr = s"CONSTRAINT pk1 PRIMARY KEY (id) $characteristic" + sql(s"CREATE TABLE $t (id bigint) $defaultUsing") + sql(s"REPLACE TABLE $t (id bigint, data string, $constraintStr) $defaultUsing") + val table = loadTable(atomicCatalog, "ns", "tbl") + assert(table.constraints.length == 1) + val constraint = table.constraints.head + assert(constraint.name() == "pk1") + assert(constraint.toDDL == s"CONSTRAINT pk1 PRIMARY KEY (id) $expectedDDL") + } + } + } + test("Add duplicated primary key constraint") { withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t => sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/UniqueConstraintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/UniqueConstraintSuite.scala index 6efc3912af9d2..96a9945b10f9f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/UniqueConstraintSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/UniqueConstraintSuite.scala @@ -76,6 +76,21 @@ class UniqueConstraintSuite extends QueryTest with CommandSuiteBase with DDLComm } } + test("Replace table with unique constraint using atomic catalog") { + validConstraintCharacteristics.foreach { case (characteristic, expectedDDL) => + withNamespaceAndTable("ns", "tbl", atomicCatalog) { t => + val constraintStr = s"CONSTRAINT uk1 UNIQUE (id) $characteristic" + sql(s"CREATE TABLE $t (id bigint) $defaultUsing") + sql(s"REPLACE TABLE $t (id bigint, data string, $constraintStr) $defaultUsing") + val table = loadTable(atomicCatalog, "ns", "tbl") + assert(table.constraints.length == 1) + val constraint = table.constraints.head + assert(constraint.name() == "uk1") + assert(constraint.toDDL == s"CONSTRAINT uk1 UNIQUE (id) $expectedDDL") + } + } + } + test("Add duplicated unique constraint") { withNamespaceAndTable("ns", "tbl", catalog) { t => sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")