Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ class ExecutorPodsAllocator(
kubernetesClient.pods().inNamespace(namespace).resource(podWithAttachedContainer).create()
try {
addOwnerReference(createdExecutorPod, resources)
kubernetesClient.resourceList(resources: _*).forceConflicts().serverSideApply()
resources
.filter(_.getKind == "PersistentVolumeClaim")
.foreach { resource =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import scala.jdk.CollectionConverters._
import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException}
import io.fabric8.kubernetes.client.dsl.PodResource
import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
import org.mockito.ArgumentMatchers.{any, anyString, eq => meq}
import org.mockito.Mockito.{never, times, verify, when}
import org.mockito.invocation.InvocationOnMock
Expand All @@ -46,6 +46,8 @@ import org.apache.spark.util.ManualClock

class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {

private def doReturn(value: Any) = org.mockito.Mockito.doReturn(value, Seq.empty: _*)

private val driverPodName = "driver"

private val driverPod = new PodBuilder()
Expand Down Expand Up @@ -112,6 +114,11 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
@Mock
private var schedulerBackend: KubernetesClusterSchedulerBackend = _

@Mock
private var resourceList: RESOURCE_LIST = _

private var createdResourcesArgumentCaptor: ArgumentCaptor[Array[HasMetadata]] = _

private var snapshotsStore: DeterministicExecutorPodsSnapshotsStore = _

private var podsAllocatorUnderTest: ExecutorPodsAllocator = _
Expand All @@ -121,6 +128,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
val appId = "testapp"

before {
createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[Array[HasMetadata]])
MockitoAnnotations.openMocks(this).close()
when(kubernetesClient.pods()).thenReturn(podOperations)
when(podOperations.inNamespace("default")).thenReturn(podsWithNamespace)
Expand Down Expand Up @@ -150,6 +158,10 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
when(pvcWithNamespace.resource(any())).thenReturn(pvcResource)
when(labeledPersistentVolumeClaims.list()).thenReturn(persistentVolumeClaimList)
when(persistentVolumeClaimList.getItems).thenReturn(Seq.empty[PersistentVolumeClaim].asJava)
when(resourceList.forceConflicts()).thenReturn(resourceList)
doReturn(resourceList)
.when(kubernetesClient)
.resourceList(createdResourcesArgumentCaptor.capture(): _*)
}

test("SPARK-49447: Prevent small values less than 100 for batch delay") {
Expand Down Expand Up @@ -775,6 +787,38 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
assert(!podsAllocatorUnderTest.isDeleted("7"))
}

test("SPARK-55585: executor feature steps can create resources") {
val service = new ServiceBuilder()
.withNewMetadata()
.withName("servicename")
.endMetadata()
.build()

when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr),
// have the feature step define a kubernetes service (resource)
meq(kubernetesClient), any(classOf[ResourceProfile])))
.thenAnswer((invocation: InvocationOnMock) => {
val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)
KubernetesExecutorSpec(
executorPodWithId(k8sConf.executorId.toInt, k8sConf.resourceProfileId),
Seq(service))
})

val startTime = Instant.now.toEpochMilli
waitForExecutorPodsClock.setTime(startTime)

// Scale up to one executor
podsAllocatorUnderTest.setTotalExpectedExecutors(
Map(defaultProfile -> 1))
assert(podsAllocatorUnderTest.invokePrivate(numOutstandingPods).get() == 1)
verify(podsWithNamespace).resource(podWithAttachedContainerForId(1))

// service is considered for creation
// resources should have been created
verify(kubernetesClient, times(1)).resourceList(meq(service))
verify(resourceList).serverSideApply()
}

test("SPARK-33262: pod allocator does not stall with pending pods") {
when(podsWithNamespace
.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
Expand Down