diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotBackgroundServices.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotBackgroundServices.java index e5da202421a..5e3f49e4f39 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotBackgroundServices.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotBackgroundServices.java @@ -36,6 +36,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.concurrent.TimeUnit; @@ -62,7 +63,6 @@ import org.apache.hadoop.ozone.conf.OMClientConfig; import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OmSnapshot; -import org.apache.hadoop.ozone.om.OmTestUtil; import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.SstFilteringService; import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException; @@ -80,15 +80,21 @@ import org.apache.ozone.test.tag.Flaky; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier; -import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.TestInfo; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestMethodOrder; /** * Tests snapshot background services. */ +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) public class TestSnapshotBackgroundServices { private MiniOzoneHAClusterImpl cluster; private ObjectStore objectStore; @@ -105,8 +111,8 @@ public class TestSnapshotBackgroundServices { private OzoneClient client; private final AtomicInteger counter = new AtomicInteger(); - @BeforeEach - public void init(TestInfo testInfo) throws Exception { + @BeforeAll + public void init() throws Exception { OzoneConfiguration conf = new OzoneConfiguration(); String omServiceId = "om-service-test1"; OzoneManagerRatisServerConfig omRatisConf = conf.getObject(OzoneManagerRatisServerConfig.class); @@ -120,66 +126,48 @@ public void init(TestInfo testInfo) throws Exception { conf.setInt(OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP, LOG_PURGE_GAP); conf.setStorageSize(OMConfigKeys.OZONE_OM_RATIS_SEGMENT_SIZE_KEY, 16, StorageUnit.KB); conf.setStorageSize(OMConfigKeys.OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY, 16, StorageUnit.KB); - if ("testSSTFilteringBackgroundService".equals(testInfo.getDisplayName())) { - conf.setTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, 1, - TimeUnit.SECONDS); - } - if ("testCompactionLogBackgroundService" - .equals(testInfo.getDisplayName())) { - conf.setTimeDuration(OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED, 1, - TimeUnit.MILLISECONDS); - } - if ("testBackupCompactionFilesPruningBackgroundService" - .equals(testInfo.getDisplayName())) { - conf.setTimeDuration(OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED, 1, - TimeUnit.MILLISECONDS); - conf.setTimeDuration( - OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL, 1, - TimeUnit.SECONDS); - } - if ("testSnapshotAndKeyDeletionBackgroundServices" - .equals(testInfo.getDisplayName())) { - conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 1, - TimeUnit.SECONDS); - conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL, 1, - TimeUnit.SECONDS); - conf.setTimeDuration(OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED, 1, - TimeUnit.MILLISECONDS); - conf.setTimeDuration( - OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL, 3, - TimeUnit.SECONDS); - conf.setTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, 3, - TimeUnit.SECONDS); - } + + // Used by: testSSTFilteringBackgroundService + conf.setTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, 1, TimeUnit.SECONDS); + + // Used by: testCompactionLogBackgroundService, testBackupCompactionFilesPruningBackgroundService, + // testSnapshotAndKeyDeletionBackgroundServices + conf.setTimeDuration(OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED, 1, TimeUnit.MILLISECONDS); + + // Used by: testCompactionLogBackgroundService, testBackupCompactionFilesPruningBackgroundService + conf.setTimeDuration(OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL, 3, TimeUnit.SECONDS); + + // Used by: testSnapshotAndKeyDeletionBackgroundServices + conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 1, TimeUnit.SECONDS); + // Used by: testSnapshotAndKeyDeletionBackgroundServices + conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL, 1, TimeUnit.SECONDS); + conf.setLong( OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY, SNAPSHOT_THRESHOLD); int numOfOMs = 3; cluster = MiniOzoneCluster.newHABuilder(conf) - .setOMServiceId("om-service-test1") + .setOMServiceId(omServiceId) .setNumOfOzoneManagers(numOfOMs) - .setNumOfActiveOMs(2) + .setNumOfActiveOMs(numOfOMs) .build(); - if ("testBackupCompactionFilesPruningBackgroundService" - .equals(testInfo.getDisplayName())) { - cluster. - getOzoneManagersList() - .forEach( - TestSnapshotBackgroundServices - ::suspendBackupCompactionFilesPruning); - } + cluster.waitForClusterToBeReady(); client = OzoneClientFactory.getRpcClient(omServiceId, conf); objectStore = client.getObjectStore(); + } + + @BeforeEach + public void setupTest() throws IOException, InterruptedException, TimeoutException { + recoverCluster(); + stopFollowerOM(cluster.getOMLeader()); volumeName = "volume" + counter.incrementAndGet(); bucketName = "bucket" + counter.incrementAndGet(); - VolumeArgs createVolumeArgs = VolumeArgs.newBuilder() .setOwner("user" + counter.incrementAndGet()) .setAdmin("admin" + counter.incrementAndGet()) .build(); - objectStore.createVolume(volumeName, createVolumeArgs); OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName); @@ -188,7 +176,32 @@ public void init(TestInfo testInfo) throws Exception { ozoneBucket = retVolumeinfo.getBucket(bucketName); } - @AfterEach + private void recoverCluster() throws InterruptedException, TimeoutException, IOException { + for (OzoneManager ozoneManager : cluster.getOzoneManagersList()) { + if (!ozoneManager.isRunning()) { + cluster.restartOzoneManager(ozoneManager, false); + } + + if (!ozoneManager.getMetadataManager().getStore().getRocksDBCheckpointDiffer().shouldRun()) { + resumeBackupCompactionFilesPruning(ozoneManager); + } + } + cluster.waitForClusterToBeReady(); + cluster.waitForLeaderOM(); + } + + private void stopFollowerOM(OzoneManager leaderOM) throws TimeoutException, InterruptedException { + for (OzoneManager om : cluster.getOzoneManagersList()) { + if (om != leaderOM && om.isRunning()) { + String omNodeId = om.getOMNodeId(); + cluster.stopOzoneManager(omNodeId); + GenericTestUtils.waitFor(() -> !om.isRunning() && !om.isOmRpcServerRunning(), 100, 15000); + break; + } + } + } + + @AfterAll public void shutdown() { IOUtils.closeQuietly(client); if (cluster != null) { @@ -202,7 +215,7 @@ public void shutdown() { public void testSnapshotAndKeyDeletionBackgroundServices() throws Exception { OzoneManager leaderOM = getLeaderOM(); - OzoneManager followerOM = getInactiveFollowerOM(leaderOM); + OzoneManager followerOM = getInactiveFollowerOM(); createSnapshotsEachWithNewKeys(leaderOM); @@ -332,7 +345,7 @@ private void startInactiveFollower(OzoneManager leaderOM, long leaderOMSnapshotIndex = leaderOMTermIndex.getIndex(); // Start the inactive OM. Checkpoint installation will happen spontaneously. - cluster.startInactiveOM(followerOM.getOMNodeId()); + cluster.restartOzoneManager(followerOM, true); actionAfterStarting.run(); // The recently started OM should be lagging behind the leader OM. @@ -357,26 +370,41 @@ private void createSnapshotsEachWithNewKeys(OzoneManager ozoneManager) } } - private OzoneManager getInactiveFollowerOM(OzoneManager leaderOM) { - String followerNodeId = leaderOM.getPeerNodes().get(0).getNodeId(); - if (cluster.isOMActive(followerNodeId)) { - followerNodeId = leaderOM.getPeerNodes().get(1).getNodeId(); - } - return cluster.getOzoneManager(followerNodeId); + private OzoneManager getInactiveFollowerOM() + throws TimeoutException, InterruptedException { + // Wait for an inactive OM to be available + AtomicReference inactiveOM = new AtomicReference<>(); + GenericTestUtils.waitFor(() -> { + Iterator iterator = cluster.getInactiveOM(); + if (iterator.hasNext()) { + inactiveOM.set(iterator.next()); + return true; + } + return false; + }, 100, 10000); + OzoneManager result = inactiveOM.get(); + assertNotNull(result, "No inactive OM available"); + return result; } private OzoneManager getLeaderOM() { - final String leaderOMNodeId = OmTestUtil.getCurrentOmProxyNodeId(objectStore); - return cluster.getOzoneManager(leaderOMNodeId); + return cluster.getOMLeader(); } @Test @DisplayName("testCompactionLogBackgroundService") @Flaky("HDDS-11672") + @Order(Integer.MAX_VALUE) public void testCompactionLogBackgroundService() throws IOException, InterruptedException, TimeoutException { + + // reset to the default value to avoid side effects + cluster.restartOzoneManagersWithConfigCustomizer(config -> { + config.setTimeDuration(OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL, 10, TimeUnit.MINUTES); + }); + OzoneManager leaderOM = getLeaderOM(); - OzoneManager followerOM = getInactiveFollowerOM(leaderOM); + OzoneManager followerOM = getInactiveFollowerOM(); createSnapshotsEachWithNewKeys(leaderOM); @@ -407,6 +435,7 @@ public void testCompactionLogBackgroundService() newLeaderOM.getMetadataManager().getStore() .getRocksDBCheckpointDiffer().getForwardCompactionDAG().nodes() .stream().map(CompactionNode::getFileName).collect(toSet())); + assertEquals(leaderOM.getMetadataManager().getStore() .getRocksDBCheckpointDiffer().getForwardCompactionDAG().edges() .stream().map(edge -> @@ -440,8 +469,14 @@ private List getCompactionLogEntries(OzoneManager om) @DisplayName("testBackupCompactionFilesPruningBackgroundService") public void testBackupCompactionFilesPruningBackgroundService() throws IOException, InterruptedException, TimeoutException { + + cluster.getOzoneManagersList().stream() + .filter(OzoneManager::isRunning) + .forEach(TestSnapshotBackgroundServices::suspendBackupCompactionFilesPruning); + OzoneManager leaderOM = getLeaderOM(); - OzoneManager followerOM = getInactiveFollowerOM(leaderOM); + OzoneManager followerOM = getInactiveFollowerOM(); + startInactiveFollower(leaderOM, followerOM, () -> suspendBackupCompactionFilesPruning(followerOM)); @@ -463,6 +498,7 @@ public void testBackupCompactionFilesPruningBackgroundService() assertNotNull(files); int numberOfSstFiles = files.length; + assertEquals(cluster.getOMLeader(), newLeaderOM); resumeBackupCompactionFilesPruning(newLeaderOM); checkIfCompactionBackupFilesWerePruned(sstBackupDir, @@ -494,7 +530,7 @@ private static void suspendBackupCompactionFilesPruning( public void testSSTFilteringBackgroundService() throws IOException, InterruptedException, TimeoutException { OzoneManager leaderOM = getLeaderOM(); - OzoneManager followerOM = getInactiveFollowerOM(leaderOM); + OzoneManager followerOM = getInactiveFollowerOM(); createSnapshotsEachWithNewKeys(leaderOM); diff --git a/hadoop-ozone/mini-cluster/src/main/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java b/hadoop-ozone/mini-cluster/src/main/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java index d3369534617..06a9c9f4ee2 100644 --- a/hadoop-ozone/mini-cluster/src/main/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java +++ b/hadoop-ozone/mini-cluster/src/main/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; import java.util.function.Function; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hdds.ExitManager; @@ -125,6 +126,10 @@ public Iterator getInactiveSCM() { return scmhaService.inactiveServices(); } + public Iterator getInactiveOM() { + return omhaService.inactiveServices(); + } + public StorageContainerManager getSCM(String scmNodeId) { return this.scmhaService.getServiceById(scmNodeId); } @@ -141,6 +146,30 @@ public List getOzoneManagersList() { return omhaService.getServices(); } + public void restartOzoneManagersWithConfigCustomizer(Consumer configCustomizer) + throws IOException, TimeoutException, InterruptedException { + List toRestart = new ArrayList<>(); + for (OzoneManager om : getOzoneManagersList()) { + OzoneConfiguration configuration = new OzoneConfiguration(om.getConfiguration()); + if (configCustomizer != null) { + configCustomizer.accept(configuration); + } + om.setConfiguration(configuration); + if (om.isRunning()) { + toRestart.add(om); + } + } + for (OzoneManager om : toRestart) { + if (!om.stop()) { + continue; + } + om.join(); + om.restart(); + GenericTestUtils.waitFor(om::isRunning, 1000, 30000); + } + waitForLeaderOM(); + } + public List getStorageContainerManagersList() { return scmhaService.getServices(); } @@ -244,6 +273,12 @@ public void restartOzoneManager(OzoneManager ozoneManager, boolean waitForOM) GenericTestUtils.waitFor(ozoneManager::isRunning, 1000, waitForClusterToBeReadyTimeout); } + + omhaService.inactiveServices().forEachRemaining(om -> { + if (om.equals(ozoneManager)) { + this.omhaService.activate(om); + } + }); } public void shutdownStorageContainerManager(StorageContainerManager scm) {