Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
7efeab0
Initial Spark 4 skeleton
FabianMeiswinkel Jan 15, 2026
34744fc
Small fixes
FabianMeiswinkel Jan 15, 2026
030f42c
Update cosmos-emulator-matrix.json
FabianMeiswinkel Jan 15, 2026
2e2fc17
Reacted to code review comments
FabianMeiswinkel Jan 15, 2026
3dfd452
Update spark.yml
FabianMeiswinkel Jan 15, 2026
080aee2
Fixing build issues
FabianMeiswinkel Jan 15, 2026
2e2cd4d
Fixing build issues
FabianMeiswinkel Jan 15, 2026
000c114
Fixing dependencies
FabianMeiswinkel Jan 15, 2026
350919c
Fixing dependency issues
FabianMeiswinkel Jan 15, 2026
5966610
Merge branch 'main' into users/fabianm/spark4
FabianMeiswinkel Jan 15, 2026
da17e72
Update databricks-jar-install.sh
FabianMeiswinkel Jan 15, 2026
04d21a7
Merge branch 'users/fabianm/spark4' of https://github.com/FabianMeisw…
FabianMeiswinkel Jan 15, 2026
425380c
Update spark.yml
FabianMeiswinkel Jan 15, 2026
a397e61
Merge branch 'main' into users/fabianm/spark4
FabianMeiswinkel Jan 15, 2026
5222ace
Update spark.databricks.yml
FabianMeiswinkel Jan 15, 2026
3caa816
Merge branch 'main' into users/fabianm/spark4
FabianMeiswinkel Jan 15, 2026
61a963c
Update aggregate-reports.yml
FabianMeiswinkel Jan 15, 2026
bb68add
Merge branch 'users/fabianm/spark4' of https://github.com/FabianMeisw…
FabianMeiswinkel Jan 15, 2026
73f24d3
Create azure-cosmos-spark.properties
FabianMeiswinkel Jan 15, 2026
f43acac
Fixed handling of SAS Uris
FabianMeiswinkel Jan 15, 2026
0f68ce2
Merge branch 'main' into users/fabianm/spark4
FabianMeiswinkel Jan 15, 2026
92651fc
Update spark.databricks.yml
FabianMeiswinkel Jan 15, 2026
74fd301
Fixing test failures
FabianMeiswinkel Jan 15, 2026
8fb735d
Fixing build issues
FabianMeiswinkel Jan 15, 2026
5d6c0c9
Fixing test failure
FabianMeiswinkel Jan 16, 2026
0c1f7d9
Update spark.databricks.yml
FabianMeiswinkel Jan 16, 2026
8d4fda6
Merge branch 'main' into users/fabianm/spark4
FabianMeiswinkel Jan 16, 2026
0018f46
Debugging failing scala spark 4 unit tests when running with java 11
FabianMeiswinkel Jan 16, 2026
9c229d3
Fixing unit tests
FabianMeiswinkel Jan 19, 2026
ec65ea0
Merge branch 'main' into users/fabianm/spark4
FabianMeiswinkel Jan 19, 2026
2c14d12
Fixing unit tests
FabianMeiswinkel Jan 19, 2026
3afb6f4
Fix unit test failures
FabianMeiswinkel Jan 19, 2026
daa3798
Update pom.xml
FabianMeiswinkel Jan 19, 2026
f8353d1
Update pom.xml
FabianMeiswinkel Jan 19, 2026
bdc9d51
Try to get better scalatest failures
FabianMeiswinkel Jan 20, 2026
81e162d
Merge branch 'main' into users/fabianm/spark4
FabianMeiswinkel Jan 20, 2026
e6c8139
changing scalatest-maven-plugin config
FabianMeiswinkel Jan 20, 2026
bac9710
Update cosmos-emulator-matrix.json
FabianMeiswinkel Jan 20, 2026
4feef36
Update cosmos-emulator-matrix.json
FabianMeiswinkel Jan 20, 2026
6823c9e
Trying to enable -X for mvn cmd when running cosmos integration tests
FabianMeiswinkel Jan 20, 2026
8cd2213
Update cosmos-sdk-client.yml
FabianMeiswinkel Jan 20, 2026
ac94180
Update cosmos-sdk-client.yml
FabianMeiswinkel Jan 20, 2026
105764e
Merge branch 'main' into users/fabianm/spark4
FabianMeiswinkel Jan 20, 2026
77aac60
Debugging tests
FabianMeiswinkel Jan 20, 2026
c4c3a30
Merge branch 'main' into users/fabianm/spark4
FabianMeiswinkel Jan 20, 2026
32eb38d
Adding missing hive test dependency
FabianMeiswinkel Jan 20, 2026
b3b1e65
Adding derbytools dependency
FabianMeiswinkel Jan 20, 2026
5a36d48
Merge branch 'main' into users/fabianm/spark4
FabianMeiswinkel Jan 20, 2026
dc323b9
Fixing unit tests
FabianMeiswinkel Jan 21, 2026
3dca44c
Update cosmos-emulator-matrix.json
FabianMeiswinkel Jan 21, 2026
ad55f00
Update cosmos-emulator-matrix.json
FabianMeiswinkel Jan 21, 2026
ecce390
Update cosmos-sdk-client.yml
FabianMeiswinkel Jan 21, 2026
796c30b
Update cosmos-sdk-client.yml
FabianMeiswinkel Jan 21, 2026
afaa102
Update external_dependencies.txt
FabianMeiswinkel Jan 21, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .vscode/cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
"sdk/cosmos/azure-cosmos-spark_3-5/**",
"sdk/cosmos/azure-cosmos-spark_3-5_2-12/**",
"sdk/cosmos/azure-cosmos-spark_3-5_2-13/**",
"sdk/cosmos/azure-cosmos-spark_4-0_2-13/**",
"sdk/cosmos/azure-cosmos-spark-account-data-resolver-sample/**",
"sdk/cosmos/fabric-cosmos-spark-auth_3/**",
"sdk/cosmos/azure-cosmos-encryption/**",
Expand Down
1 change: 1 addition & 0 deletions eng/.docsettings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ known_content_issues:
- ['sdk/cosmos/azure-cosmos-spark_3-4_2-12/README.md', '#3113']
- ['sdk/cosmos/azure-cosmos-spark_3-5_2-12/README.md', '#3113']
- ['sdk/cosmos/azure-cosmos-spark_3-5_2-13/README.md', '#3113']
- ['sdk/cosmos/azure-cosmos-spark_4-0_2-13/README.md', '#3113']
- ['sdk/cosmos/azure-cosmos-spark-account-data-resolver-sample/README.md', '#3113']
- ['sdk/cosmos/fabric-cosmos-spark-auth_3/README.md', '#3113']
- ['sdk/cosmos/azure-cosmos-spark_3_2-12/dev/README.md', '#3113']
Expand Down
2 changes: 1 addition & 1 deletion eng/pipelines/aggregate-reports.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ extends:
displayName: 'Build all libraries that support Java $(JavaBuildVersion)'
inputs:
mavenPomFile: pom.xml
options: '$(DefaultOptions) -T 2C -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dcheckstyle.skip=true -Dspotbugs.skip=true -Djacoco.skip=true -Drevapi.skip=true -Dshade.skip=true -Dspotless.skip=true -pl !com.azure.cosmos.spark:azure-cosmos-spark_3-3_2-12,!com.azure.cosmos.spark:azure-cosmos-spark_3-4_2-12,!com.azure.cosmos.spark:azure-cosmos-spark_3-5_2-12,!com.azure.cosmos.spark:azure-cosmos-spark-account-data-resolver-sample,!com.azure.cosmos.kafka:azure-cosmos-kafka-connect,!com.microsoft.azure:azure-batch'
options: '$(DefaultOptions) -T 2C -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dcodesnippet.skip=true -Dcheckstyle.skip=true -Dspotbugs.skip=true -Djacoco.skip=true -Drevapi.skip=true -Dshade.skip=true -Dspotless.skip=true -pl !com.azure.cosmos.spark:azure-cosmos-spark_3-3_2-12,!com.azure.cosmos.spark:azure-cosmos-spark_3-4_2-12,!com.azure.cosmos.spark:azure-cosmos-spark_3-5_2-12,!com.azure.cosmos.spark:azure-cosmos-spark_3-5_2-13,!com.azure.cosmos.spark:azure-cosmos-spark_4-0_2-13,!com.azure.cosmos.spark:azure-cosmos-spark-account-data-resolver-sample,!com.azure.cosmos.kafka:azure-cosmos-kafka-connect,!com.microsoft.azure:azure-batch'
mavenOptions: '$(MemoryOptions) $(LoggingOptions)'
javaHomeOption: 'JDKVersion'
jdkVersionOption: $(JavaBuildVersion)
Expand Down
14 changes: 14 additions & 0 deletions eng/pipelines/templates/stages/cosmos-emulator-matrix.json
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,20 @@
"JavaTestVersion": "1.17",
"AdditionalArgs": "-DACCOUNT_HOST=https://localhost:8081/ -Dhadoop.home.dir=D:/Hadoop -DCOSMOS.AZURE_COSMOS_DISABLE_NON_STREAMING_ORDER_BY=true"
},
"Spark 4.0, Scala 2.13 Integration Tests targeting Cosmos Emulator - Java 17'": {
"ProfileFlag": "-Dspark-e2e_4-0_2-13",
"PROTOCOLS": "[\"Tcp\"]",
"DESIRED_CONSISTENCIES": "[\"Session\"]",
"JavaTestVersion": "1.17",
"AdditionalArgs": "-DACCOUNT_HOST=https://localhost:8081/ -Dhadoop.home.dir=D:/Hadoop -DCOSMOS.AZURE_COSMOS_DISABLE_NON_STREAMING_ORDER_BY=true"
},
"Spark 4.0, Scala 2.13 Integration Tests targeting Cosmos Emulator - Java 21'": {
"ProfileFlag": "-Dspark-e2e_4-0_2-13",
"PROTOCOLS": "[\"Tcp\"]",
"DESIRED_CONSISTENCIES": "[\"Session\"]",
"JavaTestVersion": "1.21",
"AdditionalArgs": "-DACCOUNT_HOST=https://localhost:8081/ -Dhadoop.home.dir=D:/Hadoop -DCOSMOS.AZURE_COSMOS_DISABLE_NON_STREAMING_ORDER_BY=true"
},
"Kafka Integration Tests targeting Cosmos Emulator - Java 11": {
"ProfileFlag": "-Pkafka-emulator",
"PROTOCOLS": "[\"Tcp\"]",
Expand Down
29 changes: 28 additions & 1 deletion eng/pipelines/templates/stages/cosmos-sdk-client.yml
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,34 @@ extends:
keytool -keystore cacerts -delete -noprompt -alias CosmosDbEmulatorCert -storepass changeit
keytool -keystore cacerts -importcert -noprompt -trustcacerts -alias CosmosDbEmulatorCert -file CosmosDbEmulatorCert.cer -storepass changeit
displayName: 'Create Java TrustStore'

PostSteps:
- powershell: |
Write-Host "===== SCALATEST REPORTS (TXT) ====="
Get-ChildItem -Path "$(System.DefaultWorkingDirectory)" -Recurse -Filter "*.txt" | Where-Object { $_.FullName -like "*surefire-reports*" } | ForEach-Object {
Write-Host "File: $($_.FullName)"
Get-Content $_.FullName
}

Write-Host "===== SCALATEST REPORTS (OUT) ====="
Get-ChildItem -Path "$(System.DefaultWorkingDirectory)" -Recurse -Filter "*.out" | Where-Object { $_.FullName -like "*surefire-reports*" } | ForEach-Object {
Write-Host "File: $($_.FullName)"
Get-Content $_.FullName
}

Write-Host "===== SCALATEST REPORTS (ERR) ====="
Get-ChildItem -Path "$(System.DefaultWorkingDirectory)" -Recurse -Filter "*.err" | Where-Object { $_.FullName -like "*surefire-reports*" } | ForEach-Object {
Write-Host "File: $($_.FullName)"
Get-Content $_.FullName
}

Write-Host "===== SCALATEST XML (RAW) ====="
Get-ChildItem -Path "$(System.DefaultWorkingDirectory)" -Recurse -Filter "*.xml" | Where-Object { $_.FullName -like "*surefire-reports*" } | ForEach-Object {
Write-Host "File: $($_.FullName)"
Get-Content $_.FullName
}
displayName: "Print ScalaTest report files"
errorActionPreference: continue
condition: always()
- ${{ if eq(variables['System.TeamProject'], 'internal') }}:
- ${{ each mode in parameters.VnextEmulatorModes }}:
- stage:
Expand Down
4 changes: 2 additions & 2 deletions eng/versioning/external_dependencies.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ com.fasterxml.jackson.datatype:jackson-datatype-jsr310;2.18.4
com.fasterxml.jackson.module:jackson-module-afterburner;2.18.4
com.fasterxml.jackson.module:jackson-module-blackbird;2.18.4
com.fasterxml.jackson.module:jackson-module-scala_2.12;2.18.4
com.fasterxml.jackson.module:jackson-module-scala_2.13;2.18.4
com.github.spotbugs:spotbugs;4.8.3
com.github.spotbugs:spotbugs-annotations;4.8.3
com.github.spotbugs:spotbugs-maven-plugin;4.8.3.1
Expand Down Expand Up @@ -239,15 +240,14 @@ cosmos_io.dropwizard.metrics:metrics-graphite;4.1.0
cosmos_io.dropwizard.metrics:metrics-jvm;4.1.0
cosmos_org.mpierce.metrics.reservoir:hdrhistogram-metrics-reservoir;1.1.0
cosmos_org.hdrhistogram:HdrHistogram;2.1.12
cosmos_com.fasterxml.jackson.core:jackson-databind;2.15.2
cosmos_com.fasterxml.jackson.module:jackson-module-scala_2.12;2.15.2
cosmos_com.microsoft.azure.synapse:synapseutils_2.12;1.5.4

## Cosmos Spark connector under sdk\cosmos\azure-cosmos-spark_3-<version>_2-12\pom.xml
# Cosmos Spark connector runtime dependencies - provided by Spark runtime/host
cosmos-spark_3-3_org.apache.spark:spark-sql_2.12;3.3.0
cosmos-spark_3-4_org.apache.spark:spark-sql_2.12;3.4.0
cosmos-spark_3-5_org.apache.spark:spark-sql_2.12;3.5.0
cosmos-spark_4-0_org.apache.spark:spark-sql_2.13;4.0.0
cosmos-spark_3-3_org.apache.spark:spark-hive_2.12;3.3.0
cosmos-spark_3-4_org.apache.spark:spark-hive_2.12;3.4.0
cosmos-spark_3-5_org.apache.spark:spark-hive_2.12;3.5.0
Expand Down
3 changes: 2 additions & 1 deletion eng/versioning/version_client.txt
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ com.azure:azure-cosmos-test;1.0.0-beta.16;1.0.0-beta.17
com.azure.cosmos.spark:azure-cosmos-spark_3-3_2-12;4.42.0;4.43.0-beta.1
com.azure.cosmos.spark:azure-cosmos-spark_3-4_2-12;4.42.0;4.43.0-beta.1
com.azure.cosmos.spark:azure-cosmos-spark_3-5_2-12;4.42.0;4.43.0-beta.1
com.azure.cosmos.spark:azure-cosmos-spark_3-5_2-13;4.42.0;4.43.0-beta.1
com.azure.cosmos.spark:azure-cosmos-spark_3-5_2-13;4.43.0-beta.1;4.43.0-beta.1
com.azure.cosmos.spark:azure-cosmos-spark_4-0_2-13;4.43.0-beta.1;4.43.0-beta.1
com.azure.cosmos.spark:fabric-cosmos-spark-auth_3;1.1.0;1.2.0-beta.1
com.azure:azure-cosmos-tests;1.0.0-beta.1;1.0.0-beta.1
com.azure:azure-data-appconfiguration;1.9.0;1.10.0-beta.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version> <!-- {x-version-update;cosmos_com.fasterxml.jackson.core:jackson-databind;external_dependency} -->
<version>2.18.4</version> <!-- {x-version-update;com.fasterxml.jackson.core:jackson-databind;external_dependency} -->
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_2.12</artifactId>
<version>2.15.2</version> <!-- {x-version-update;cosmos_com.fasterxml.jackson.module:jackson-module-scala_2.12;external_dependency} -->
<version>2.18.4</version> <!-- {x-version-update;com.fasterxml.jackson.module:jackson-module-scala_2.12;external_dependency} -->
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
Expand Down Expand Up @@ -289,8 +289,6 @@
<include>com.fasterxml.jackson.datatype:jackson-datatype-jsr310:[2.18.4]</include> <!-- {x-include-update;com.fasterxml.jackson.datatype:jackson-datatype-jsr310;external_dependency} -->
<include>com.fasterxml.jackson.core:jackson-databind:[2.18.4]</include> <!-- {x-include-update;com.fasterxml.jackson.core:jackson-databind;external_dependency} -->
<include>com.fasterxml.jackson.module:jackson-module-scala_2.12:[2.18.4]</include> <!-- {x-include-update;com.fasterxml.jackson.module:jackson-module-scala_2.12;external_dependency} -->
<include>com.fasterxml.jackson.core:jackson-databind:[2.15.2]</include> <!-- {x-include-update;cosmos_com.fasterxml.jackson.core:jackson-databind;external_dependency} -->
<include>com.fasterxml.jackson.module:jackson-module-scala_2.12:[2.15.2]</include> <!-- {x-include-update;cosmos_com.fasterxml.jackson.module:jackson-module-scala_2.12;external_dependency} -->
<include>com.globalmentor:hadoop-bare-naked-local-fs:[0.1.0]</include> <!-- {x-include-update;cosmos_com.globalmentor:hadoop-bare-naked-local-fs;external_dependency} -->
<include>com.azure.cosmos.spark:azure-cosmos-spark_3-5_2-12:[4.43.0-beta.1]</include> <!-- {x-include-update;com.azure.cosmos.spark:azure-cosmos-spark_3-5_2-12;current} -->
</includes>
Expand Down
6 changes: 6 additions & 0 deletions sdk/cosmos/azure-cosmos-spark_3-3_2-12/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,12 @@
<artifactId>scalatest-maven-plugin</artifactId>
<version>2.1.0</version> <!-- {x-version-update;cosmos_org.scalatest:scalatest-maven-plugin;external_dependency} -->
<configuration>
<config>stdOut=true,verbose=true,stdErr=true</config>
<noScalaTestIgnore>true</noScalaTestIgnore>
<stdout>FDEF</stdout>
<stderr>FDEF</stderr>
<forkMode>once</forkMode>
<logForkedProcessCommand>true</logForkedProcessCommand>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
<filereports>SparkTestSuite.txt</filereports>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
// scalastyle:off magic.number
// scalastyle:off multiple.string.literals

package com.azure.cosmos.spark

Expand Down
8 changes: 8 additions & 0 deletions sdk/cosmos/azure-cosmos-spark_3-4_2-12/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,14 @@
<artifactId>scalatest-maven-plugin</artifactId>
<version>2.1.0</version> <!-- {x-version-update;cosmos_org.scalatest:scalatest-maven-plugin;external_dependency} -->
<configuration>
<config>stdOut=true,verbose=true,stdErr=true</config>
<noScalaTestIgnore>true</noScalaTestIgnore>
<stdout>FDEF</stdout>
<stderr>FDEF</stderr>
<forkMode>once</forkMode>
<logForkedProcessCommand>true</logForkedProcessCommand>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
<filereports>SparkTestSuite.txt</filereports>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
// scalastyle:off magic.number
// scalastyle:off multiple.string.literals

package com.azure.cosmos.spark

import com.azure.cosmos.changeFeedMetrics.{ChangeFeedMetricsListener, ChangeFeedMetricsTracker}
import com.azure.cosmos.implementation.guava25.collect.{HashBiMap, Maps}
import org.apache.spark.Success
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.scheduler.{SparkListenerTaskEnd, TaskInfo}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.mockito.ArgumentMatchers
import org.mockito.Mockito.{mock, when}

import java.lang.reflect.Field
import java.util.concurrent.ConcurrentHashMap

class ChangeFeedMetricsListenerSpec extends UnitSpec {

  // Builds a SparkListenerTaskEnd event with mocked task info/metrics, mirroring what
  // Spark emits for a successfully completed result task. The listener under test only
  // reads metrics through SparkInternalsBridge, so the mocks carry no real data.
  private[this] def createTaskEnd(): SparkListenerTaskEnd = {
    SparkListenerTaskEnd(
      stageId = 1,
      stageAttemptId = 0,
      taskType = "ResultTask",
      reason = Success,
      taskInfo = mock(classOf[TaskInfo]),
      taskExecutorMetrics = mock(classOf[ExecutorMetrics]),
      taskMetrics = mock(classOf[TaskMetrics])
    )
  }

  // Replaces the listener's private 'sparkInternalsBridge' field via reflection with a mock
  // that returns the supplied SQL metrics regardless of the requested metric names.
  // Reflection is required because the bridge is an internal collaborator without an
  // injection point on the public constructor.
  private[this] def injectBridgeReturning(
    listener: ChangeFeedMetricsListener,
    metrics: Map[String, SQLMetric]): Unit = {

    val bridge = mock(classOf[SparkInternalsBridge])
    when(bridge.getInternalCustomTaskMetricsAsSQLMetric(
      ArgumentMatchers.any[Set[String]],
      ArgumentMatchers.any[TaskMetrics]
    )).thenReturn(metrics)

    val bridgeField: Field = classOf[ChangeFeedMetricsListener].getDeclaredField("sparkInternalsBridge")
    bridgeField.setAccessible(true)
    bridgeField.set(listener, bridge)
  }

  "ChangeFeedMetricsListener" should "be able to capture changeFeed performance metrics" in {
    // Partition index 1 maps to the "0"-"FF" range, and the task reports metrics for index 1,
    // so the listener should create/update a tracker for that range.
    val metrics = Map[String, SQLMetric](
      CosmosConstants.MetricNames.ChangeFeedPartitionIndex -> new SQLMetric("index", 1),
      CosmosConstants.MetricNames.ChangeFeedLsnRange -> new SQLMetric("lsn", 100),
      CosmosConstants.MetricNames.ChangeFeedItemsCnt -> new SQLMetric("items", 100)
    )

    val partitionIndexMap = Maps.synchronizedBiMap(HashBiMap.create[NormalizedRange, Long]())
    partitionIndexMap.put(NormalizedRange("0", "FF"), 1)

    val partitionMetricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]()
    val changeFeedMetricsListener = new ChangeFeedMetricsListener(partitionIndexMap, partitionMetricsMap)
    injectBridgeReturning(changeFeedMetricsListener, metrics)

    // verify that metrics will be properly tracked
    changeFeedMetricsListener.onTaskEnd(createTaskEnd())
    partitionMetricsMap.size() shouldBe 1
    partitionMetricsMap.containsKey(NormalizedRange("0", "FF")) shouldBe true
    // 100 items over an LSN range of 100 -> weighted items-per-LSN of 1
    partitionMetricsMap.get(NormalizedRange("0", "FF")).getWeightedChangeFeedItemsPerLsn.get shouldBe 1
  }

  it should "ignore metrics for unknown partition index" in {
    // The task reports metrics for partition index 10, which has no entry in the
    // partitionIndexMap (only index 1 is registered) — the listener must drop the event.
    val metrics = Map[String, SQLMetric](
      CosmosConstants.MetricNames.ChangeFeedPartitionIndex -> new SQLMetric("index", 10),
      CosmosConstants.MetricNames.ChangeFeedLsnRange -> new SQLMetric("lsn", 100),
      CosmosConstants.MetricNames.ChangeFeedItemsCnt -> new SQLMetric("items", 100)
    )

    val partitionIndexMap = Maps.synchronizedBiMap(HashBiMap.create[NormalizedRange, Long]())
    partitionIndexMap.put(NormalizedRange("0", "FF"), 1)

    val partitionMetricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]()
    val changeFeedMetricsListener = new ChangeFeedMetricsListener(partitionIndexMap, partitionMetricsMap)
    injectBridgeReturning(changeFeedMetricsListener, metrics)

    // because partition index 10 does not exist in the partitionIndexMap, it will be ignored
    changeFeedMetricsListener.onTaskEnd(createTaskEnd())
    partitionMetricsMap shouldBe empty
  }

  it should "ignore unrelated metrics" in {
    // The metrics map contains no change-feed metric names at all, so there is nothing
    // for the listener to track.
    val metrics = Map[String, SQLMetric](
      "unknownMetrics" -> new SQLMetric("index", 10)
    )

    val partitionIndexMap = Maps.synchronizedBiMap(HashBiMap.create[NormalizedRange, Long]())
    partitionIndexMap.put(NormalizedRange("0", "FF"), 1)

    val partitionMetricsMap = new ConcurrentHashMap[NormalizedRange, ChangeFeedMetricsTracker]()
    val changeFeedMetricsListener = new ChangeFeedMetricsListener(partitionIndexMap, partitionMetricsMap)
    injectBridgeReturning(changeFeedMetricsListener, metrics)

    // because none of the reported metrics are change-feed related, no tracker is created
    changeFeedMetricsListener.onTaskEnd(createTaskEnd())
    partitionMetricsMap shouldBe empty
  }
}
Loading
Loading