-
Notifications
You must be signed in to change notification settings - Fork 594
HDDS-10306. Speed up TestSnapshotBackgroundServices #9721
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
9a64c82
66c82f4
fe69dfc
2b49fd0
faccac1
593b07d
97607e9
7176929
49137b5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -36,6 +36,7 @@ | |||||
| import java.io.IOException; | ||||||
| import java.util.ArrayList; | ||||||
| import java.util.Collections; | ||||||
| import java.util.Iterator; | ||||||
| import java.util.List; | ||||||
| import java.util.Objects; | ||||||
| import java.util.concurrent.TimeUnit; | ||||||
|
|
@@ -62,7 +63,6 @@ | |||||
| import org.apache.hadoop.ozone.conf.OMClientConfig; | ||||||
| import org.apache.hadoop.ozone.om.OMConfigKeys; | ||||||
| import org.apache.hadoop.ozone.om.OmSnapshot; | ||||||
| import org.apache.hadoop.ozone.om.OmTestUtil; | ||||||
| import org.apache.hadoop.ozone.om.OzoneManager; | ||||||
| import org.apache.hadoop.ozone.om.SstFilteringService; | ||||||
| import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException; | ||||||
|
|
@@ -80,15 +80,21 @@ | |||||
| import org.apache.ozone.test.tag.Flaky; | ||||||
| import org.apache.ratis.server.protocol.TermIndex; | ||||||
| import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; | ||||||
| import org.junit.jupiter.api.AfterEach; | ||||||
| import org.junit.jupiter.api.AfterAll; | ||||||
| import org.junit.jupiter.api.BeforeAll; | ||||||
| import org.junit.jupiter.api.BeforeEach; | ||||||
| import org.junit.jupiter.api.DisplayName; | ||||||
| import org.junit.jupiter.api.MethodOrderer; | ||||||
| import org.junit.jupiter.api.Order; | ||||||
| import org.junit.jupiter.api.Test; | ||||||
| import org.junit.jupiter.api.TestInfo; | ||||||
| import org.junit.jupiter.api.TestInstance; | ||||||
| import org.junit.jupiter.api.TestMethodOrder; | ||||||
|
|
||||||
| /** | ||||||
| * Tests snapshot background services. | ||||||
| */ | ||||||
| @TestMethodOrder(MethodOrderer.OrderAnnotation.class) | ||||||
| @TestInstance(TestInstance.Lifecycle.PER_CLASS) | ||||||
| public class TestSnapshotBackgroundServices { | ||||||
| private MiniOzoneHAClusterImpl cluster; | ||||||
| private ObjectStore objectStore; | ||||||
|
|
@@ -105,8 +111,8 @@ public class TestSnapshotBackgroundServices { | |||||
| private OzoneClient client; | ||||||
| private final AtomicInteger counter = new AtomicInteger(); | ||||||
|
|
||||||
| @BeforeEach | ||||||
| public void init(TestInfo testInfo) throws Exception { | ||||||
| @BeforeAll | ||||||
| public void init() throws Exception { | ||||||
| OzoneConfiguration conf = new OzoneConfiguration(); | ||||||
| String omServiceId = "om-service-test1"; | ||||||
| OzoneManagerRatisServerConfig omRatisConf = conf.getObject(OzoneManagerRatisServerConfig.class); | ||||||
|
|
@@ -120,66 +126,48 @@ public void init(TestInfo testInfo) throws Exception { | |||||
| conf.setInt(OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP, LOG_PURGE_GAP); | ||||||
| conf.setStorageSize(OMConfigKeys.OZONE_OM_RATIS_SEGMENT_SIZE_KEY, 16, StorageUnit.KB); | ||||||
| conf.setStorageSize(OMConfigKeys.OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY, 16, StorageUnit.KB); | ||||||
| if ("testSSTFilteringBackgroundService".equals(testInfo.getDisplayName())) { | ||||||
| conf.setTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, 1, | ||||||
| TimeUnit.SECONDS); | ||||||
| } | ||||||
| if ("testCompactionLogBackgroundService" | ||||||
| .equals(testInfo.getDisplayName())) { | ||||||
| conf.setTimeDuration(OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED, 1, | ||||||
| TimeUnit.MILLISECONDS); | ||||||
| } | ||||||
| if ("testBackupCompactionFilesPruningBackgroundService" | ||||||
| .equals(testInfo.getDisplayName())) { | ||||||
| conf.setTimeDuration(OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED, 1, | ||||||
| TimeUnit.MILLISECONDS); | ||||||
| conf.setTimeDuration( | ||||||
| OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL, 1, | ||||||
| TimeUnit.SECONDS); | ||||||
| } | ||||||
| if ("testSnapshotAndKeyDeletionBackgroundServices" | ||||||
| .equals(testInfo.getDisplayName())) { | ||||||
| conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 1, | ||||||
| TimeUnit.SECONDS); | ||||||
| conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL, 1, | ||||||
| TimeUnit.SECONDS); | ||||||
| conf.setTimeDuration(OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED, 1, | ||||||
| TimeUnit.MILLISECONDS); | ||||||
| conf.setTimeDuration( | ||||||
| OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL, 3, | ||||||
| TimeUnit.SECONDS); | ||||||
| conf.setTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, 3, | ||||||
| TimeUnit.SECONDS); | ||||||
| } | ||||||
|
|
||||||
| // Used by: testSSTFilteringBackgroundService | ||||||
| conf.setTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, 1, TimeUnit.SECONDS); | ||||||
|
|
||||||
| // Used by: testCompactionLogBackgroundService, testBackupCompactionFilesPruningBackgroundService, | ||||||
| // testSnapshotAndKeyDeletionBackgroundServices | ||||||
| conf.setTimeDuration(OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED, 1, TimeUnit.MILLISECONDS); | ||||||
|
|
||||||
| // Used by: testCompactionLogBackgroundService, testBackupCompactionFilesPruningBackgroundService | ||||||
| conf.setTimeDuration(OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL, 3, TimeUnit.SECONDS); | ||||||
|
|
||||||
| // Used by: testSnapshotAndKeyDeletionBackgroundServices | ||||||
| conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 1, TimeUnit.SECONDS); | ||||||
| // Used by: testSnapshotAndKeyDeletionBackgroundServices | ||||||
| conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL, 1, TimeUnit.SECONDS); | ||||||
|
|
||||||
| conf.setLong( | ||||||
| OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY, | ||||||
| SNAPSHOT_THRESHOLD); | ||||||
| int numOfOMs = 3; | ||||||
| cluster = MiniOzoneCluster.newHABuilder(conf) | ||||||
| .setOMServiceId("om-service-test1") | ||||||
| .setOMServiceId(omServiceId) | ||||||
| .setNumOfOzoneManagers(numOfOMs) | ||||||
| .setNumOfActiveOMs(2) | ||||||
| .setNumOfActiveOMs(numOfOMs) | ||||||
| .build(); | ||||||
| if ("testBackupCompactionFilesPruningBackgroundService" | ||||||
| .equals(testInfo.getDisplayName())) { | ||||||
| cluster. | ||||||
| getOzoneManagersList() | ||||||
| .forEach( | ||||||
| TestSnapshotBackgroundServices | ||||||
| ::suspendBackupCompactionFilesPruning); | ||||||
| } | ||||||
|
|
||||||
| cluster.waitForClusterToBeReady(); | ||||||
| client = OzoneClientFactory.getRpcClient(omServiceId, conf); | ||||||
| objectStore = client.getObjectStore(); | ||||||
| } | ||||||
|
|
||||||
| @BeforeEach | ||||||
| public void setupTest() throws IOException, InterruptedException, TimeoutException { | ||||||
| recoverCluster(); | ||||||
| stopFollowerOM(cluster.getOMLeader()); | ||||||
|
|
||||||
| volumeName = "volume" + counter.incrementAndGet(); | ||||||
| bucketName = "bucket" + counter.incrementAndGet(); | ||||||
|
|
||||||
| VolumeArgs createVolumeArgs = VolumeArgs.newBuilder() | ||||||
| .setOwner("user" + counter.incrementAndGet()) | ||||||
| .setAdmin("admin" + counter.incrementAndGet()) | ||||||
| .build(); | ||||||
|
|
||||||
| objectStore.createVolume(volumeName, createVolumeArgs); | ||||||
| OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName); | ||||||
|
|
||||||
|
|
@@ -188,7 +176,32 @@ public void init(TestInfo testInfo) throws Exception { | |||||
| ozoneBucket = retVolumeinfo.getBucket(bucketName); | ||||||
| } | ||||||
|
|
||||||
| @AfterEach | ||||||
| private void recoverCluster() throws InterruptedException, TimeoutException, IOException { | ||||||
| for (OzoneManager ozoneManager : cluster.getOzoneManagersList()) { | ||||||
| if (!ozoneManager.isRunning()) { | ||||||
| cluster.restartOzoneManager(ozoneManager, false); | ||||||
| } | ||||||
|
|
||||||
| if (!ozoneManager.getMetadataManager().getStore().getRocksDBCheckpointDiffer().shouldRun()) { | ||||||
| resumeBackupCompactionFilesPruning(ozoneManager); | ||||||
| } | ||||||
| } | ||||||
| cluster.waitForClusterToBeReady(); | ||||||
| cluster.waitForLeaderOM(); | ||||||
| } | ||||||
|
|
||||||
| private void stopFollowerOM(OzoneManager leaderOM) throws TimeoutException, InterruptedException { | ||||||
| for (OzoneManager om : cluster.getOzoneManagersList()) { | ||||||
| if (om != leaderOM && om.isRunning()) { | ||||||
| String omNodeId = om.getOMNodeId(); | ||||||
| cluster.stopOzoneManager(omNodeId); | ||||||
| GenericTestUtils.waitFor(() -> !om.isRunning() && !om.isOmRpcServerRunning(), 100, 15000); | ||||||
| break; | ||||||
| } | ||||||
| } | ||||||
| } | ||||||
|
|
||||||
| @AfterAll | ||||||
| public void shutdown() { | ||||||
| IOUtils.closeQuietly(client); | ||||||
| if (cluster != null) { | ||||||
|
|
@@ -202,7 +215,7 @@ public void shutdown() { | |||||
| public void testSnapshotAndKeyDeletionBackgroundServices() | ||||||
| throws Exception { | ||||||
| OzoneManager leaderOM = getLeaderOM(); | ||||||
| OzoneManager followerOM = getInactiveFollowerOM(leaderOM); | ||||||
| OzoneManager followerOM = getInactiveFollowerOM(); | ||||||
|
|
||||||
| createSnapshotsEachWithNewKeys(leaderOM); | ||||||
|
|
||||||
|
|
@@ -332,7 +345,7 @@ private void startInactiveFollower(OzoneManager leaderOM, | |||||
| long leaderOMSnapshotIndex = leaderOMTermIndex.getIndex(); | ||||||
|
|
||||||
| // Start the inactive OM. Checkpoint installation will happen spontaneously. | ||||||
| cluster.startInactiveOM(followerOM.getOMNodeId()); | ||||||
| cluster.restartOzoneManager(followerOM, true); | ||||||
| actionAfterStarting.run(); | ||||||
|
|
||||||
| // The recently started OM should be lagging behind the leader OM. | ||||||
|
|
@@ -357,26 +370,41 @@ private void createSnapshotsEachWithNewKeys(OzoneManager ozoneManager) | |||||
| } | ||||||
| } | ||||||
|
|
||||||
| private OzoneManager getInactiveFollowerOM(OzoneManager leaderOM) { | ||||||
| String followerNodeId = leaderOM.getPeerNodes().get(0).getNodeId(); | ||||||
| if (cluster.isOMActive(followerNodeId)) { | ||||||
| followerNodeId = leaderOM.getPeerNodes().get(1).getNodeId(); | ||||||
| } | ||||||
| return cluster.getOzoneManager(followerNodeId); | ||||||
| private OzoneManager getInactiveFollowerOM() | ||||||
| throws TimeoutException, InterruptedException { | ||||||
| // Wait for an inactive OM to be available | ||||||
| AtomicReference<OzoneManager> inactiveOM = new AtomicReference<>(); | ||||||
| GenericTestUtils.waitFor(() -> { | ||||||
| Iterator<OzoneManager> iterator = cluster.getInactiveOM(); | ||||||
| if (iterator.hasNext()) { | ||||||
| inactiveOM.set(iterator.next()); | ||||||
| return true; | ||||||
hevinhsu marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
| } | ||||||
| return false; | ||||||
| }, 100, 10000); | ||||||
| OzoneManager result = inactiveOM.get(); | ||||||
| assertNotNull(result, "No inactive OM available"); | ||||||
| return result; | ||||||
| } | ||||||
|
|
||||||
| private OzoneManager getLeaderOM() { | ||||||
| final String leaderOMNodeId = OmTestUtil.getCurrentOmProxyNodeId(objectStore); | ||||||
| return cluster.getOzoneManager(leaderOMNodeId); | ||||||
| return cluster.getOMLeader(); | ||||||
| } | ||||||
|
|
||||||
| @Test | ||||||
| @DisplayName("testCompactionLogBackgroundService") | ||||||
| @Flaky("HDDS-11672") | ||||||
| @Order(Integer.MAX_VALUE) | ||||||
| public void testCompactionLogBackgroundService() | ||||||
| throws IOException, InterruptedException, TimeoutException { | ||||||
|
|
||||||
| // reset to the default value to avoid side effects | ||||||
| cluster.restartOzoneManagersWithConfigCustomizer(config -> { | ||||||
| config.setTimeDuration(OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL, 10, TimeUnit.MINUTES); | ||||||
| }); | ||||||
|
|
||||||
| OzoneManager leaderOM = getLeaderOM(); | ||||||
| OzoneManager followerOM = getInactiveFollowerOM(leaderOM); | ||||||
| OzoneManager followerOM = getInactiveFollowerOM(); | ||||||
|
|
||||||
| createSnapshotsEachWithNewKeys(leaderOM); | ||||||
|
|
||||||
|
|
@@ -407,6 +435,7 @@ public void testCompactionLogBackgroundService() | |||||
| newLeaderOM.getMetadataManager().getStore() | ||||||
| .getRocksDBCheckpointDiffer().getForwardCompactionDAG().nodes() | ||||||
| .stream().map(CompactionNode::getFileName).collect(toSet())); | ||||||
|
|
||||||
| assertEquals(leaderOM.getMetadataManager().getStore() | ||||||
| .getRocksDBCheckpointDiffer().getForwardCompactionDAG().edges() | ||||||
| .stream().map(edge -> | ||||||
|
|
@@ -440,8 +469,14 @@ private List<CompactionLogEntry> getCompactionLogEntries(OzoneManager om) | |||||
| @DisplayName("testBackupCompactionFilesPruningBackgroundService") | ||||||
| public void testBackupCompactionFilesPruningBackgroundService() | ||||||
| throws IOException, InterruptedException, TimeoutException { | ||||||
|
|
||||||
| cluster.getOzoneManagersList().stream() | ||||||
| .filter(OzoneManager::isRunning) | ||||||
| .forEach(TestSnapshotBackgroundServices::suspendBackupCompactionFilesPruning); | ||||||
|
|
||||||
| OzoneManager leaderOM = getLeaderOM(); | ||||||
| OzoneManager followerOM = getInactiveFollowerOM(leaderOM); | ||||||
| OzoneManager followerOM = getInactiveFollowerOM(); | ||||||
|
|
||||||
|
|
||||||
| startInactiveFollower(leaderOM, followerOM, | ||||||
| () -> suspendBackupCompactionFilesPruning(followerOM)); | ||||||
|
|
@@ -463,6 +498,7 @@ public void testBackupCompactionFilesPruningBackgroundService() | |||||
| assertNotNull(files); | ||||||
| int numberOfSstFiles = files.length; | ||||||
|
|
||||||
| assertEquals(cluster.getOMLeader(), newLeaderOM); | ||||||
|
||||||
| assertEquals(cluster.getOMLeader(), newLeaderOM); | |
| assertEquals(cluster.waitForLeaderOM(), newLeaderOM); |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -33,6 +33,7 @@ | |||||||||||||||||
| import java.util.Map; | ||||||||||||||||||
| import java.util.concurrent.TimeUnit; | ||||||||||||||||||
| import java.util.concurrent.TimeoutException; | ||||||||||||||||||
| import java.util.function.Consumer; | ||||||||||||||||||
| import java.util.function.Function; | ||||||||||||||||||
| import org.apache.commons.lang3.StringUtils; | ||||||||||||||||||
| import org.apache.hadoop.hdds.ExitManager; | ||||||||||||||||||
|
|
@@ -125,6 +126,10 @@ public Iterator<StorageContainerManager> getInactiveSCM() { | |||||||||||||||||
| return scmhaService.inactiveServices(); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| public Iterator<OzoneManager> getInactiveOM() { | ||||||||||||||||||
| return omhaService.inactiveServices(); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| public StorageContainerManager getSCM(String scmNodeId) { | ||||||||||||||||||
| return this.scmhaService.getServiceById(scmNodeId); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
@@ -141,6 +146,30 @@ public List<OzoneManager> getOzoneManagersList() { | |||||||||||||||||
| return omhaService.getServices(); | ||||||||||||||||||
| } | ||||||||||||||||||
|
|
||||||||||||||||||
| public void restartOzoneManagersWithConfigCustomizer(Consumer<OzoneConfiguration> configCustomizer) | ||||||||||||||||||
| throws IOException, TimeoutException, InterruptedException { | ||||||||||||||||||
| List<OzoneManager> toRestart = new ArrayList<>(); | ||||||||||||||||||
| for (OzoneManager om : getOzoneManagersList()) { | ||||||||||||||||||
| OzoneConfiguration configuration = new OzoneConfiguration(om.getConfiguration()); | ||||||||||||||||||
| if (configCustomizer != null) { | ||||||||||||||||||
| configCustomizer.accept(configuration); | ||||||||||||||||||
| } | ||||||||||||||||||
| om.setConfiguration(configuration); | ||||||||||||||||||
|
||||||||||||||||||
| if (om.isRunning()) { | ||||||||||||||||||
|
||||||||||||||||||
| toRestart.add(om); | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
| for (OzoneManager om : toRestart) { | ||||||||||||||||||
| if (!om.stop()) { | ||||||||||||||||||
| continue; | ||||||||||||||||||
| } | ||||||||||||||||||
| om.join(); | ||||||||||||||||||
| om.restart(); | ||||||||||||||||||
| GenericTestUtils.waitFor(om::isRunning, 1000, 30000); | ||||||||||||||||||
|
||||||||||||||||||
| GenericTestUtils.waitFor(om::isRunning, 1000, 30000); | |
| long startTime = System.currentTimeMillis(); | |
| while (!om.isRunning()) { | |
| if (System.currentTimeMillis() - startTime > 30000) { | |
| throw new TimeoutException("Timed out waiting for OzoneManager to start."); | |
| } | |
| Thread.sleep(1000); | |
| } |
Copilot
AI
Feb 11, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`restartOzoneManagersWithConfigCustomizer` silently skips restarting an OM when `om.stop()` returns false (even though the OM was in the `toRestart` list). This can mask a failed stop and leave the cluster partially restarted with a mix of old and new config. Consider failing fast (throw), or at least logging and still attempting a restart / waiting for a consistent state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`setupTest` passes `cluster.getOMLeader()` into `stopFollowerOM(...)`, but `getOMLeader()` can legally return null transiently (e.g. during leader transitions), which would cause the helper to stop an arbitrary OM. Prefer using the result of `cluster.waitForLeaderOM()` (or assert non-null) to avoid test flakiness.