// Regression test for SPARK-38101: a worker fetching map statuses (getStatuses via RPC)
// must not fail while the master concurrently calls updateMapOutput, which invalidates
// and deletes the cached serialized broadcast holding those statuses (tracked below via
// getNumCachedSerializedBroadcast).
test("SPARK-38101: concurrent updateMapOutput not interfering with getStatuses") {
val newConf = new SparkConf
newConf.set(RPC_MESSAGE_MAX_SIZE, 1)
newConf.set(RPC_ASK_TIMEOUT, "1") // Fail fast
newConf.set(SHUFFLE_MAPOUTPUT_MIN_SIZE_FOR_BROADCAST, 0L) // Always send broadcast variables
// needs TorrentBroadcast so need a SparkContext
withSpark(new SparkContext("local", "MapOutputTrackerSuite", newConf)) {sc =>
val hostname = "localhost"
// Master-side tracker endpoint that serves map statuses over RPC.
val rpcEnv = createRpcEnv("spark", hostname, 0, new SecurityManager(sc.conf))
val masterTracker = newTrackerMaster(sc.conf)
masterTracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
new MapOutputTrackerMasterEndpoint(rpcEnv, masterTracker, sc.conf))
// Worker-side tracker in a separate RpcEnv, pointed at the master endpoint,
// so fetches go through the real serialize/broadcast path.
val workerRpcEnv = createRpcEnv("spark-slave", hostname, 0, new SecurityManager(sc.conf))
val workerTracker = new MapOutputTrackerWorker(sc.conf)
workerTracker.trackerEndpoint =
workerRpcEnv.setupEndpointRef(rpcEnv.address, MapOutputTracker.ENDPOINT_NAME)
// One shuffle (id 1) with 4 map outputs and 2 reduce partitions,
// split across two block managers.
masterTracker.registerShuffle(1, 4, 2)
val bmIdOne = BlockManagerId(s"exec-1", "host", 1000)
val bmIdTwo = BlockManagerId(s"exec-2", "host", 2000)
masterTracker.registerMapOutput(1, 0, MapStatus(bmIdOne, Array(1000L, 10000L), mapTaskId = 0))
masterTracker.registerMapOutput(1, 1, MapStatus(bmIdOne, Array(1000L, 10000L), mapTaskId = 1))
masterTracker.registerMapOutput(1, 2, MapStatus(bmIdTwo, Array(1000L, 10000L), mapTaskId = 2))
masterTracker.registerMapOutput(1, 3, MapStatus(bmIdTwo, Array(1000L, 10000L), mapTaskId = 3))
// Worker fetches cause the master to serialize the statuses into exactly one
// cached broadcast (count goes 0 -> 1 across the two fetches).
assert(0 == masterTracker.getNumCachedSerializedBroadcast)
assert(workerTracker.getMapSizesByExecutorId(1, 0).toSeq.length === 2)
assert(workerTracker.getMapSizesByExecutorId(1, 1).toSeq.length === 2)
assert(1 == masterTracker.getNumCachedSerializedBroadcast)
// Even an updateMapOutput that leaves the location unchanged drops the cached
// broadcast back to 0 — this is the invalidation that races with getStatuses.
masterTracker.updateMapOutput(1, 0, bmIdOne)
assert(0 == masterTracker.getNumCachedSerializedBroadcast)
// some thread keeps updating the map outputs
@volatile
var updaterShutdown = false
val updater = new Thread(new Runnable {
override def run(): Unit = {
while (!updaterShutdown) {
// map output does not change, but updateMapOutput invalidates cached broadcasts
masterTracker.updateMapOutput(1, 0, bmIdOne)
masterTracker.updateMapOutput(1, 1, bmIdOne)
masterTracker.updateMapOutput(1, 2, bmIdTwo)
masterTracker.updateMapOutput(1, 3, bmIdTwo)
}
}
})
updater.start()
// Main thread repeatedly fetches while the updater thread keeps invalidating
// the broadcast cache; before the fix this race could make getStatuses fail.
1 to 100 foreach { i =>
assert(workerTracker.getMapSizesByExecutorId(1, 0).toSeq.length === 2)
assert(workerTracker.getMapSizesByExecutorId(1, 1).toSeq.length === 2)
// updating epoch invalidates map status cache in worker, so this always sends requests
workerTracker.updateEpoch(masterTracker.getEpoch + i)
}
// shutdown updater thread
updaterShutdown = true
updater.join(1000)
assert(!updater.isAlive)
}
}
Executors may fail when fetching map statuses while the map statuses are concurrently being modified.
The issue occurs when:
getStatuses (used by an executor to learn where to read shuffle data from) runs concurrently with updateMapOutput, which invalidates the cached broadcast variable and deletes the broadcast variable's value. The read/write lock in MapOutputTracker is not at fault here — this issue occurs outside the scope of those locks.
This can be reproduced:
full exception
full unit test
This occurs in multiple situations:
Ways to mitigate:
Increase spark.shuffle.mapOutput.minSizeForBroadcast, which reduces the chance that broadcast variables are used (not ideal). This issue has been known since 3.1.2, and it exists in the latest releases and on the master branch.
https://issues.apache.org/jira/browse/SPARK-38101