Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,26 @@ class StagingInMemoryTableCatalog extends InMemoryTableCatalog with StagingTable
new TestStagedCreateTable(
ident,
new InMemoryTable(s"$name.${ident.quoted}",
tableInfo.schema(), tableInfo.partitions(), tableInfo.properties()))
tableInfo.columns(), tableInfo.partitions(), tableInfo.properties(),
tableInfo.constraints()))
}

/**
 * Stages a REPLACE TABLE for an atomic swap. Validates the requested partitions
 * and properties, then wraps a fresh [[InMemoryTable]] built from the full
 * [[TableInfo]] — columns, partitions, properties, AND constraints — so that
 * table constraints survive an atomic replace.
 */
override def stageReplace(ident: Identifier, tableInfo: TableInfo): StagedTable = {
  validateStagedTable(tableInfo.partitions, tableInfo.properties)
  new TestStagedReplaceTable(
    ident,
    new InMemoryTable(s"$name.${ident.quoted}",
      // columns() supersedes the deprecated schema(); constraints() must be
      // propagated or REPLACE TABLE silently drops declared constraints.
      tableInfo.columns(), tableInfo.partitions(), tableInfo.properties(),
      tableInfo.constraints()))
}

/**
 * Stages a CREATE OR REPLACE TABLE for an atomic swap. Mirrors `stageReplace`:
 * validates partitions/properties, then builds the staged [[InMemoryTable]]
 * from columns, partitions, properties, and constraints of [[TableInfo]].
 */
override def stageCreateOrReplace(ident: Identifier, tableInfo: TableInfo): StagedTable = {
  validateStagedTable(tableInfo.partitions, tableInfo.properties)
  new TestStagedCreateOrReplaceTable(
    ident,
    new InMemoryTable(s"$name.${ident.quoted}",
      // Pass constraints() through so CREATE OR REPLACE preserves declared
      // table constraints, consistent with stageCreate/stageReplace.
      tableInfo.columns(), tableInfo.partitions(), tableInfo.properties(),
      tableInfo.constraints()))
}

private def validateStagedTable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ case class AtomicReplaceTableExec(
.withColumns(columns)
.withPartitions(partitioning.toArray)
.withProperties(tableProperties.asJava)
.withConstraints(tableSpec.constraints.toArray)
.build()
catalog.stageCreateOrReplace(identifier, tableInfo)
} else if (catalog.tableExists(identifier)) {
Expand All @@ -91,6 +92,7 @@ case class AtomicReplaceTableExec(
.withColumns(columns)
.withPartitions(partitioning.toArray)
.withProperties(tableProperties.asJava)
.withConstraints(tableSpec.constraints.toArray)
.build()
catalog.stageReplace(identifier, tableInfo)
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,20 @@ class CheckConstraintSuite extends QueryTest with CommandSuiteBase with DDLComma
}
}

test("Replace table with check constraint using atomic catalog") {
  // For each supported constraint characteristic, REPLACE TABLE through the
  // staging (atomic) catalog must carry the CHECK constraint onto the new table.
  getConstraintCharacteristics().foreach { case (characteristic, expectedDDL) =>
    withNamespaceAndTable("ns", "tbl", atomicCatalog) { t =>
      sql(s"CREATE TABLE $t (id bigint) $defaultUsing")
      val constraintClause = s"CONSTRAINT c1 CHECK (id > 0) $characteristic"
      sql(s"REPLACE TABLE $t (id bigint, data string, $constraintClause) $defaultUsing")
      // Reload the replaced table and verify the constraint survived the swap.
      val replaced = loadTable(atomicCatalog, "ns", "tbl")
      val check = getCheckConstraint(replaced)
      assert(check.name() == "c1")
      assert(check.toDDL == s"CONSTRAINT c1 CHECK (id > 0) $expectedDDL")
    }
  }
}

test("Alter table add check constraint") {
getConstraintCharacteristics().foreach { case (characteristic, expectedDDL) =>
withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@ trait CommandSuiteBase extends SharedSparkSession {
def commandVersion: String = "V2" // The command version is added to test names
def catalog: String = "test_catalog" // The default V2 catalog for testing
def nonPartitionCatalog: String = "non_part_test_catalog" // Catalog for non-partitioned tables
def atomicCatalog: String = "atomic_test_catalog" // Catalog with StagingTableCatalog support
def rowLevelOPCatalog: String = "row_level_op_catalog" // Catalog registered below with InMemoryRowLevelOperationTableCatalog
def defaultUsing: String = "USING _" // The clause is used in creating v2 tables under testing

// V2 catalogs created and used especially for testing
override def sparkConf: SparkConf = super.sparkConf
.set(s"spark.sql.catalog.$catalog", classOf[InMemoryPartitionTableCatalog].getName)
.set(s"spark.sql.catalog.$nonPartitionCatalog", classOf[InMemoryTableCatalog].getName)
.set(s"spark.sql.catalog.$atomicCatalog", classOf[StagingInMemoryTableCatalog].getName)
.set(s"spark.sql.catalog.fun_$catalog", classOf[InMemoryCatalog].getName)
.set(s"spark.sql.catalog.$rowLevelOPCatalog",
classOf[InMemoryRowLevelOperationTableCatalog].getName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,24 @@ class ForeignKeyConstraintSuite extends QueryTest with CommandSuiteBase with DDL
}
}

// Renamed from "REPLACE table ..." to "Replace table ..." for naming consistency
// with the matching tests in the check/primary-key/unique constraint suites.
test("Replace table with foreign key constraint using atomic catalog") {
  validConstraintCharacteristics.foreach { case (characteristic, expectedDDL) =>
    withNamespaceAndTable("ns", "tbl", atomicCatalog) { t =>
      sql(s"CREATE TABLE $t (id bigint) $defaultUsing")
      // Referenced table must exist before the FK-bearing REPLACE TABLE runs.
      sql(s"CREATE TABLE ${t}_ref (id bigint, data string) $defaultUsing")
      val constraintStr = s"CONSTRAINT fk1 FOREIGN KEY (fk) " +
        s"REFERENCES ${t}_ref(id) $characteristic"
      sql(s"REPLACE TABLE $t (id bigint, fk bigint, data string, $constraintStr) $defaultUsing")
      // The atomic catalog must carry the FK constraint onto the replaced table.
      val table = loadTable(atomicCatalog, "ns", "tbl")
      assert(table.constraints.length == 1)
      val constraint = table.constraints.head
      assert(constraint.name() == "fk1")
      assert(constraint.toDDL == s"CONSTRAINT fk1 FOREIGN KEY (fk) " +
        s"REFERENCES $atomicCatalog.ns.tbl_ref (id) $expectedDDL")
    }
  }
}

test("Add duplicated foreign key constraint") {
withNamespaceAndTable("ns", "tbl", catalog) { t =>
sql(s"CREATE TABLE $t (id bigint, fk bigint, data string) $defaultUsing")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,21 @@ class PrimaryKeyConstraintSuite extends QueryTest with CommandSuiteBase with DDL
}
}

test("Replace table with primary key constraint using atomic catalog") {
  // REPLACE TABLE via the staging catalog must retain a declared PRIMARY KEY
  // for every supported constraint characteristic.
  validConstraintCharacteristics.foreach { case (characteristic, expectedDDL) =>
    withNamespaceAndTable("ns", "tbl", atomicCatalog) { t =>
      sql(s"CREATE TABLE $t (id bigint) $defaultUsing")
      val pkClause = s"CONSTRAINT pk1 PRIMARY KEY (id) $characteristic"
      sql(s"REPLACE TABLE $t (id bigint, data string, $pkClause) $defaultUsing")
      // Reload and verify exactly one constraint with the expected DDL survives.
      val replaced = loadTable(atomicCatalog, "ns", "tbl")
      assert(replaced.constraints.length == 1)
      val pk = replaced.constraints.head
      assert(pk.name() == "pk1")
      assert(pk.toDDL == s"CONSTRAINT pk1 PRIMARY KEY (id) $expectedDDL")
    }
  }
}

test("Add duplicated primary key constraint") {
withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,21 @@ class UniqueConstraintSuite extends QueryTest with CommandSuiteBase with DDLComm
}
}

test("Replace table with unique constraint using atomic catalog") {
  // REPLACE TABLE via the staging catalog must retain a declared UNIQUE
  // constraint for every supported constraint characteristic.
  validConstraintCharacteristics.foreach { case (characteristic, expectedDDL) =>
    withNamespaceAndTable("ns", "tbl", atomicCatalog) { t =>
      sql(s"CREATE TABLE $t (id bigint) $defaultUsing")
      val uniqueClause = s"CONSTRAINT uk1 UNIQUE (id) $characteristic"
      sql(s"REPLACE TABLE $t (id bigint, data string, $uniqueClause) $defaultUsing")
      // Reload and verify exactly one constraint with the expected DDL survives.
      val replaced = loadTable(atomicCatalog, "ns", "tbl")
      assert(replaced.constraints.length == 1)
      val unique = replaced.constraints.head
      assert(unique.name() == "uk1")
      assert(unique.toDDL == s"CONSTRAINT uk1 UNIQUE (id) $expectedDDL")
    }
  }
}

test("Add duplicated unique constraint") {
withNamespaceAndTable("ns", "tbl", catalog) { t =>
sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
Expand Down