Skip to content

Commit 25fade7

Browse files
committed
[spark] Support partition statistics in SHOW TABLE EXTENDED PARTITION command
1 parent f6cb6c8 commit 25fade7

4 files changed

Lines changed: 92 additions & 5 deletions

File tree

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
package org.apache.paimon.spark
2020

2121
import org.apache.paimon.CoreOptions
22+
import org.apache.paimon.partition.PartitionStatistics
2223
import org.apache.paimon.table.{FileStoreTable, Table}
24+
import org.apache.paimon.table.source.ScanMode
2325
import org.apache.paimon.types.RowType
2426
import org.apache.paimon.utils.{InternalRowPartitionComputer, TypeUtils}
2527

@@ -136,7 +138,31 @@ trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement with L
136138
}
137139

138140
override def loadPartitionMetadata(ident: InternalRow): JMap[String, String] = {
139-
Map.empty[String, String].asJava
141+
table match {
142+
case fileStoreTable: FileStoreTable =>
143+
val partitionSpec = toPaimonPartitions(Array(ident)).head
144+
val partitionEntries = fileStoreTable
145+
.newSnapshotReader()
146+
.withMode(ScanMode.ALL)
147+
.withPartitionFilter(partitionSpec)
148+
.partitionEntries()
149+
150+
if (!partitionEntries.isEmpty) {
151+
val entry = partitionEntries.get(0)
152+
Map(
153+
PartitionStatistics.FIELD_RECORD_COUNT -> entry.recordCount().toString,
154+
PartitionStatistics.FIELD_FILE_SIZE_IN_BYTES -> entry.fileSizeInBytes().toString,
155+
PartitionStatistics.FIELD_FILE_COUNT -> entry.fileCount().toString,
156+
PartitionStatistics.FIELD_LAST_FILE_CREATION_TIME -> entry
157+
.lastFileCreationTime()
158+
.toString
159+
).asJava
160+
} else {
161+
Map.empty[String, String].asJava
162+
}
163+
case _ =>
164+
Map.empty[String, String].asJava
165+
}
140166
}
141167

142168
override def listPartitionIdentifiers(

paimon-spark/paimon-spark3-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala renamed to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonShowTablePartitionCommand.scala

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@
1818

1919
package org.apache.paimon.spark.commands
2020

21+
import org.apache.paimon.partition.PartitionStatistics
2122
import org.apache.paimon.spark.catalyst.Compatibility
2223
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
2324

2425
import org.apache.spark.sql.{Row, SparkSession}
2526
import org.apache.spark.sql.catalyst.analysis.ResolvedPartitionSpec
2627
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
27-
import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, ToPrettyString}
28+
import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal}
2829
import org.apache.spark.sql.connector.catalog.{Identifier, SupportsPartitionManagement, TableCatalog}
2930
import org.apache.spark.sql.connector.catalog.PaimonCatalogImplicits._
3031
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
@@ -87,7 +88,20 @@ case class PaimonShowTablePartitionCommand(
8788
val partitionValues = partitions.mkString("[", ", ", "]")
8889
results.put("Partition Values", s"$partitionValues")
8990

90-
// TODO "Partition Parameters", "Created Time", "Last Access", "Partition Statistics"
91+
// Partition Parameters and Partition Statistics
92+
val metadata = partitionTable.loadPartitionMetadata(row)
93+
if (!metadata.isEmpty) {
94+
val metadataMap = metadata.asScala
95+
results.put(
96+
"Partition Parameters",
97+
s"{${metadataMap.map { case (k, v) => s"$k=$v" }.mkString(", ")}}")
98+
99+
val fileSizeInBytes =
100+
metadataMap.getOrElse(PartitionStatistics.FIELD_FILE_SIZE_IN_BYTES, "0").toLong
101+
val recordCount =
102+
metadataMap.getOrElse(PartitionStatistics.FIELD_RECORD_COUNT, "0").toLong
103+
results.put("Partition Statistics", s"$recordCount rows, $fileSizeInBytes bytes")
104+
}
91105

92106
results
93107
.map {

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DescribeTableTestBase.scala

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,44 @@ abstract class DescribeTableTestBase extends PaimonSparkTestBase {
9898
)
9999
Assertions.assertTrue(
100100
res2.select("information").collect().head.getString(0).contains("Partition Values"))
101+
102+
val info2 = res2.select("information").collect().head.getString(0)
103+
Assertions.assertTrue(
104+
info2.contains("Partition Parameters"),
105+
s"SHOW TABLE EXTENDED should contain Partition Parameters, but got: $info2")
106+
Assertions.assertTrue(
107+
info2.contains(PartitionStatistics.FIELD_RECORD_COUNT),
108+
s"SHOW TABLE EXTENDED should contain recordCount, but got: $info2")
109+
Assertions.assertTrue(
110+
info2.contains(PartitionStatistics.FIELD_FILE_SIZE_IN_BYTES),
111+
s"SHOW TABLE EXTENDED should contain fileSizeInBytes, but got: $info2")
112+
Assertions.assertTrue(
113+
info2.contains(PartitionStatistics.FIELD_FILE_COUNT),
114+
s"SHOW TABLE EXTENDED should contain fileCount, but got: $info2")
115+
Assertions.assertTrue(
116+
info2.contains(PartitionStatistics.FIELD_LAST_FILE_CREATION_TIME),
117+
s"SHOW TABLE EXTENDED should contain lastFileCreationTime, but got: $info2")
118+
// Verify recordCount value: partition (pt1='2024', pt2='11') has 1 row
119+
// Spark 3 uses "key=value" format, Spark 4 uses "key: value" format
120+
Assertions.assertTrue(
121+
info2.contains("Partition Statistics"),
122+
s"SHOW TABLE EXTENDED should contain Partition Statistics, but got: $info2")
123+
Assertions.assertTrue(
124+
info2.contains("recordCount=1") || info2.contains("recordCount: 1"),
125+
s"Partition (pt1='2024', pt2='11') should have recordCount=1, but got: $info2")
126+
Assertions.assertTrue(
127+
info2.contains("1 rows"),
128+
s"Partition Statistics should contain '1 rows', but got: $info2")
129+
130+
val res3 =
131+
spark.sql(s"SHOW TABLE EXTENDED IN $testDB LIKE 's2' PARTITION(pt = '2024')")
132+
val info3 = res3.select("information").collect().head.getString(0)
133+
Assertions.assertTrue(
134+
info3.contains("recordCount=2") || info3.contains("recordCount: 2"),
135+
s"Partition pt='2024' should have recordCount=2, but got: $info3")
136+
Assertions.assertTrue(
137+
info3.contains("2 rows"),
138+
s"Partition Statistics should contain '2 rows', but got: $info3")
101139
}
102140
}
103141
}

paimon-spark/paimon-spark4-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/Spark4ResolutionRules.scala

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,19 @@
1818

1919
package org.apache.paimon.spark.catalyst.analysis
2020

21+
import org.apache.paimon.spark.commands.PaimonShowTablePartitionCommand
22+
2123
import org.apache.spark.sql.SparkSession
22-
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
24+
import org.apache.spark.sql.catalyst.analysis.{ResolvedTable, UnresolvedPartitionSpec}
25+
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ShowTablePartition}
2326
import org.apache.spark.sql.catalyst.rules.Rule
27+
import org.apache.spark.sql.connector.catalog.PaimonCatalogImplicits._
2428

2529
case class Spark4ResolutionRules(session: SparkSession) extends Rule[LogicalPlan] {
26-
override def apply(plan: LogicalPlan): LogicalPlan = plan
30+
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown {
31+
case s @ ShowTablePartition(rt: ResolvedTable, _, _) =>
32+
val resolvedSpec =
33+
PaimonResolvePartitionSpec.resolve(rt.catalog, rt.identifier, s.partitionSpec)
34+
PaimonShowTablePartitionCommand(s.output, rt.catalog, rt.identifier, resolvedSpec)
35+
}
2736
}

0 commit comments

Comments (0)