
Commit 6d6dfb4

build: Add spark-4.1 profile and shims

1 parent f04a85e

32 files changed, +190 -25 lines changed

common/src/main/spark-4.0/org/apache/comet/shims/CometTypeShim.scala renamed to common/src/main/spark-4.x/org/apache/comet/shims/CometTypeShim.scala

File renamed without changes.

common/src/main/spark-4.0/org/apache/comet/shims/ShimBatchReader.scala renamed to common/src/main/spark-4.x/org/apache/comet/shims/ShimBatchReader.scala

File renamed without changes.

common/src/main/spark-4.0/org/apache/comet/shims/ShimCometConf.scala renamed to common/src/main/spark-4.x/org/apache/comet/shims/ShimCometConf.scala

File renamed without changes.

common/src/main/spark-4.0/org/apache/comet/shims/ShimFileFormat.scala renamed to common/src/main/spark-4.x/org/apache/comet/shims/ShimFileFormat.scala

File renamed without changes.

common/src/main/spark-4.0/org/apache/spark/sql/comet/shims/ShimTaskMetrics.scala renamed to common/src/main/spark-4.x/org/apache/spark/sql/comet/shims/ShimTaskMetrics.scala

File renamed without changes.

pom.xml

Lines changed: 4 additions & 10 deletions

@@ -642,8 +642,8 @@ under the License.
       <parquet.version>1.15.2</parquet.version>
       <semanticdb.version>4.13.6</semanticdb.version>
       <slf4j.version>2.0.16</slf4j.version>
-      <shims.majorVerSrc>spark-4.0</shims.majorVerSrc>
-      <shims.minorVerSrc>not-needed-yet</shims.minorVerSrc>
+      <shims.majorVerSrc>spark-4.x</shims.majorVerSrc>
+      <shims.minorVerSrc>spark-4.0</shims.minorVerSrc>
       <!-- Use jdk17 by default -->
       <java.version>17</java.version>
       <maven.compiler.source>${java.version}</maven.compiler.source>
@@ -663,19 +663,13 @@ under the License.
       <parquet.version>1.16.0</parquet.version>
       <semanticdb.version>4.13.9</semanticdb.version>
       <slf4j.version>2.0.17</slf4j.version>
-      <shims.majorVerSrc>spark-4.1</shims.majorVerSrc>
-      <shims.minorVerSrc>not-needed-yet</shims.minorVerSrc>
+      <shims.majorVerSrc>spark-4.x</shims.majorVerSrc>
+      <shims.minorVerSrc>spark-4.1</shims.minorVerSrc>
       <!-- Use jdk17 by default -->
       <java.version>17</java.version>
       <maven.compiler.source>${java.version}</maven.compiler.source>
       <maven.compiler.target>${java.version}</maven.compiler.target>
     </properties>
-    <repositories>
-      <repository>
-        <id>apache-staging</id>
-        <url>https://repository.apache.org/content/repositories/orgapachespark-1506/</url>
-      </repository>
-    </repositories>
   </profile>

   <profile>
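
Together with the renames above, these properties give the shim sources a two-level layout: shims.majorVerSrc points at code shared by every Spark 4.x build (spark-4.x), while shims.minorVerSrc selects a per-minor-version directory (spark-4.0 or spark-4.1). A minimal sketch of the pattern in Scala, using an invented trait name and member rather than Comet's actual shims:

// Invented illustration of the per-version shim pattern; not Comet's code.
// One copy of the trait lives under each minor-version source directory,
// and the active Maven profile adds exactly one of them to the compile path.

// e.g. common/src/main/spark-4.0/org/apache/comet/shims/ShimExample.scala
package org.apache.comet.shims

trait ShimExample {
  // Spark-4.0-specific behavior goes here; the spark-4.1 copy of this
  // file declares the same trait with the 4.1-specific body.
  def shimVersion: String = "4.0"
}

Callers in the shared spark-4.x tree mix in the trait and never mention a concrete minor version.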

spark/pom.xml

Lines changed: 13 additions & 0 deletions

@@ -267,6 +267,19 @@ under the License.
         <version>1.10.0</version>
         <scope>test</scope>
       </dependency>
+      <!-- Jetty 11.x for Spark 4.1 (jakarta.servlet) -->
+      <dependency>
+        <groupId>org.eclipse.jetty</groupId>
+        <artifactId>jetty-server</artifactId>
+        <version>11.0.24</version>
+        <scope>test</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.eclipse.jetty</groupId>
+        <artifactId>jetty-servlet</artifactId>
+        <version>11.0.24</version>
+        <scope>test</scope>
+      </dependency>
     </dependencies>
   </profile>
 </profiles>
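
Jetty 11 is the first Jetty line built on the jakarta.servlet namespace (Jetty 9.x, which older Spark versions pull in, still uses javax.servlet), which is presumably why Spark 4.1's test classpath needs these artifacts. A small smoke-test sketch of the jakarta-based API the two dependencies provide; the object and servlet here are illustrative, not taken from Comet's tests:

package org.apache.comet.example // hypothetical package

import jakarta.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}

// Starts an embedded Jetty 11 server on an ephemeral port; note the
// jakarta.servlet imports, which Jetty 9.x (javax.servlet) cannot satisfy.
object JettySmokeTest {
  def main(args: Array[String]): Unit = {
    val server = new Server(0)
    val context = new ServletContextHandler()
    context.addServlet(
      new ServletHolder(new HttpServlet {
        override def doGet(req: HttpServletRequest, resp: HttpServletResponse): Unit =
          resp.getWriter.print("ok")
      }),
      "/*")
    server.setHandler(context)
    server.start()
    server.stop()
  }
}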

spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometBypassMergeSortShuffleWriter.java

Lines changed: 2 additions & 3 deletions

@@ -41,7 +41,6 @@
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper;
 import org.apache.spark.scheduler.MapStatus;
-import org.apache.spark.scheduler.MapStatus$;
 import org.apache.spark.serializer.SerializerInstance;
 import org.apache.spark.shuffle.ShuffleWriteMetricsReporter;
 import org.apache.spark.shuffle.ShuffleWriter;
@@ -172,7 +171,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
               .commitAllPartitions(ShuffleChecksumHelper.EMPTY_CHECKSUM_VALUE)
               .getPartitionLengths();
       mapStatus =
-          MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId, 0);
+          new MapStatusBuilder(blockManager.shuffleServerId(), partitionLengths, mapId).build();
       return;
     }
     final long openStartTime = System.nanoTime();
@@ -262,7 +261,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
       // TODO: We probably can move checksum generation here when concatenating partition files
       partitionLengths = writePartitionedData(mapOutputWriter);
       mapStatus =
-          MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId, 0);
+          new MapStatusBuilder(blockManager.shuffleServerId(), partitionLengths, mapId).build();
     } catch (Exception e) {
       try {
         mapOutputWriter.abort(e);
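
The direct MapStatus$.MODULE$.apply calls are replaced with a builder at both call sites. The diff does not show MapStatusBuilder itself; below is a hedged sketch of what such a helper could look like, assuming it simply delegates to the three-argument MapStatus.apply available in Spark 3.x. The class name and constructor arguments are taken from the call sites above, while the body is a guess:

// Hypothetical reconstruction; Comet's real MapStatusBuilder may differ.
// MapStatus is private[spark], so the helper must live under org.apache.spark.
package org.apache.spark.sql.comet.shims

import org.apache.spark.scheduler.MapStatus
import org.apache.spark.storage.BlockManagerId

class MapStatusBuilder(
    loc: BlockManagerId,
    partitionLengths: Array[Long],
    mapTaskId: Long) {
  // Assumed body: delegate to MapStatus.apply(loc, sizes, mapTaskId),
  // hiding the extra trailing argument the old call sites passed as 0.
  def build(): MapStatus = MapStatus(loc, partitionLengths, mapTaskId)
}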

spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/CometUnsafeShuffleWriter.java

Lines changed: 1 addition & 2 deletions

@@ -50,7 +50,6 @@
 import org.apache.spark.network.shuffle.checksum.ShuffleChecksumHelper;
 import org.apache.spark.network.util.LimitedInputStream;
 import org.apache.spark.scheduler.MapStatus;
-import org.apache.spark.scheduler.MapStatus$;
 import org.apache.spark.serializer.SerializationStream;
 import org.apache.spark.serializer.SerializerInstance;
 import org.apache.spark.shuffle.BaseShuffleHandle;
@@ -289,7 +288,7 @@ void closeAndWriteOutput() throws IOException {
       }
     }
     mapStatus =
-        MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths, mapId, 0);
+        new MapStatusBuilder(blockManager.shuffleServerId(), partitionLengths, mapId).build();
   }

   @VisibleForTesting

spark/src/main/scala/org/apache/comet/serde/aggregates.scala

Lines changed: 6 additions & 5 deletions

@@ -22,15 +22,15 @@ package org.apache.comet.serde
 import scala.jdk.CollectionConverters._

 import org.apache.spark.sql.catalyst.expressions.{Attribute, EvalMode}
-import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, BitAndAgg, BitOrAgg, BitXorAgg, BloomFilterAggregate, CentralMomentAgg, Corr, Count, CovPopulation, CovSample, Covariance, First, Last, Max, Min, StddevPop, StddevSamp, Sum, VariancePop, VarianceSamp}
+import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average, BitAndAgg, BitOrAgg, BitXorAgg, BloomFilterAggregate, CentralMomentAgg, Corr, Count, Covariance, CovPopulation, CovSample, First, Last, Max, Min, StddevPop, StddevSamp, Sum, VariancePop, VarianceSamp}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{ByteType, DataTypes, DecimalType, IntegerType, LongType, ShortType, StringType}

 import org.apache.comet.CometConf
 import org.apache.comet.CometConf.COMET_EXEC_STRICT_FLOATING_POINT
 import org.apache.comet.CometSparkSessionExtensions.withInfo
-import org.apache.comet.serde.QueryPlanSerde.{exprToProto, serializeDataType}
-import org.apache.comet.shims.CometExprShim
+import org.apache.comet.serde.QueryPlanSerde.{evalModeToProto, exprToProto, serializeDataType}
+import org.apache.comet.shims.{CometEvalModeUtil, CometSumShim}

 object CometMin extends CometAggregateExpressionSerde[Min] {

@@ -211,7 +211,7 @@ object CometAverage extends CometAggregateExpressionSerde[Average] {
   }
 }

-object CometSum extends CometAggregateExpressionSerde[Sum] with CometExprShim {
+object CometSum extends CometAggregateExpressionSerde[Sum] with CometSumShim {

   override def getSupportLevel(sum: Sum): SupportLevel = {
     sparkEvalMode(sum) match {
@@ -243,7 +243,8 @@ object CometSum extends CometAggregateExpressionSerde[Sum] with CometExprShim {
       val builder = ExprOuterClass.Sum.newBuilder()
       builder.setChild(childExpr.get)
       builder.setDatatype(dataType.get)
-      builder.setFailOnError(sparkEvalMode(sum) == EvalMode.ANSI)
+      builder.setEvalMode(
+        evalModeToProto(CometEvalModeUtil.fromSparkEvalMode(sparkEvalMode(sum))))

       Some(
         ExprOuterClass.AggExpr