Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
Expand All @@ -62,7 +63,6 @@
import org.apache.hadoop.ozone.conf.OMClientConfig;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OmSnapshot;
import org.apache.hadoop.ozone.om.OmTestUtil;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.SstFilteringService;
import org.apache.hadoop.ozone.om.exceptions.OMLeaderNotReadyException;
Expand All @@ -80,15 +80,21 @@
import org.apache.ozone.test.tag.Flaky;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestMethodOrder;

/**
* Tests snapshot background services.
*/
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class TestSnapshotBackgroundServices {
private MiniOzoneHAClusterImpl cluster;
private ObjectStore objectStore;
Expand All @@ -105,8 +111,8 @@ public class TestSnapshotBackgroundServices {
private OzoneClient client;
private final AtomicInteger counter = new AtomicInteger();

@BeforeEach
public void init(TestInfo testInfo) throws Exception {
@BeforeAll
public void init() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
String omServiceId = "om-service-test1";
OzoneManagerRatisServerConfig omRatisConf = conf.getObject(OzoneManagerRatisServerConfig.class);
Expand All @@ -120,66 +126,48 @@ public void init(TestInfo testInfo) throws Exception {
conf.setInt(OMConfigKeys.OZONE_OM_RATIS_LOG_PURGE_GAP, LOG_PURGE_GAP);
conf.setStorageSize(OMConfigKeys.OZONE_OM_RATIS_SEGMENT_SIZE_KEY, 16, StorageUnit.KB);
conf.setStorageSize(OMConfigKeys.OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY, 16, StorageUnit.KB);
if ("testSSTFilteringBackgroundService".equals(testInfo.getDisplayName())) {
conf.setTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, 1,
TimeUnit.SECONDS);
}
if ("testCompactionLogBackgroundService"
.equals(testInfo.getDisplayName())) {
conf.setTimeDuration(OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED, 1,
TimeUnit.MILLISECONDS);
}
if ("testBackupCompactionFilesPruningBackgroundService"
.equals(testInfo.getDisplayName())) {
conf.setTimeDuration(OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED, 1,
TimeUnit.MILLISECONDS);
conf.setTimeDuration(
OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL, 1,
TimeUnit.SECONDS);
}
if ("testSnapshotAndKeyDeletionBackgroundServices"
.equals(testInfo.getDisplayName())) {
conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 1,
TimeUnit.SECONDS);
conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL, 1,
TimeUnit.SECONDS);
conf.setTimeDuration(OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED, 1,
TimeUnit.MILLISECONDS);
conf.setTimeDuration(
OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL, 3,
TimeUnit.SECONDS);
conf.setTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, 3,
TimeUnit.SECONDS);
}

// Used by: testSSTFilteringBackgroundService, testSnapshotAndKeyDeletionBackgroundServices
conf.setTimeDuration(OZONE_SNAPSHOT_SST_FILTERING_SERVICE_INTERVAL, 1, TimeUnit.SECONDS);

// Used by: testCompactionLogBackgroundService, testBackupCompactionFilesPruningBackgroundService,
// testSnapshotAndKeyDeletionBackgroundServices
conf.setTimeDuration(OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED, 1, TimeUnit.MILLISECONDS);

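// Used by: testBackupCompactionFilesPruningBackgroundService, testSnapshotAndKeyDeletionBackgroundServices
// (testCompactionLogBackgroundService overrides this interval to 10 minutes before it runs)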
conf.setTimeDuration(OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL, 3, TimeUnit.SECONDS);

// Used by: testSnapshotAndKeyDeletionBackgroundServices
conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL, 1, TimeUnit.SECONDS);
// Used by: testSnapshotAndKeyDeletionBackgroundServices
conf.setTimeDuration(OZONE_SNAPSHOT_DELETING_SERVICE_INTERVAL, 1, TimeUnit.SECONDS);

conf.setLong(
OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
SNAPSHOT_THRESHOLD);
int numOfOMs = 3;
cluster = MiniOzoneCluster.newHABuilder(conf)
.setOMServiceId("om-service-test1")
.setOMServiceId(omServiceId)
.setNumOfOzoneManagers(numOfOMs)
.setNumOfActiveOMs(2)
.setNumOfActiveOMs(numOfOMs)
.build();
if ("testBackupCompactionFilesPruningBackgroundService"
.equals(testInfo.getDisplayName())) {
cluster.
getOzoneManagersList()
.forEach(
TestSnapshotBackgroundServices
::suspendBackupCompactionFilesPruning);
}

cluster.waitForClusterToBeReady();
client = OzoneClientFactory.getRpcClient(omServiceId, conf);
objectStore = client.getObjectStore();
}

@BeforeEach
public void setupTest() throws IOException, InterruptedException, TimeoutException {
recoverCluster();
stopFollowerOM(cluster.getOMLeader());
Copy link

Copilot AI Feb 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setupTest passes cluster.getOMLeader() into stopFollowerOM(...), but getOMLeader() can legally return null transiently (e.g., during leader transitions). When that happens, the helper's "not the leader" check matches every OM, so it stops an arbitrary OM, possibly the actual leader. Prefer using the result of cluster.waitForLeaderOM() (or assert non-null) to avoid test flakiness.

Suggested change
stopFollowerOM(cluster.getOMLeader());
OzoneManager leader = cluster.waitForLeaderOM();
stopFollowerOM(leader);

Copilot uses AI. Check for mistakes.

volumeName = "volume" + counter.incrementAndGet();
bucketName = "bucket" + counter.incrementAndGet();

VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
.setOwner("user" + counter.incrementAndGet())
.setAdmin("admin" + counter.incrementAndGet())
.build();

objectStore.createVolume(volumeName, createVolumeArgs);
OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);

Expand All @@ -188,7 +176,32 @@ public void init(TestInfo testInfo) throws Exception {
ozoneBucket = retVolumeinfo.getBucket(bucketName);
}

@AfterEach
/**
 * Brings the shared cluster back to a usable state before a test runs.
 *
 * <p>For every OM known to the cluster: restarts it if it is not running,
 * and resumes backup compaction file pruning if its RocksDB checkpoint
 * differ reports {@code shouldRun() == false}. Afterwards waits for the
 * cluster to be ready and for an OM leader.
 */
private void recoverCluster() throws InterruptedException, TimeoutException, IOException {
  for (OzoneManager om : cluster.getOzoneManagersList()) {
    final boolean stopped = !om.isRunning();
    if (stopped) {
      // waitForOM = false: readiness is awaited once, after the loop.
      cluster.restartOzoneManager(om, false);
    }

    final boolean differNotRunning =
        !om.getMetadataManager().getStore().getRocksDBCheckpointDiffer().shouldRun();
    if (differNotRunning) {
      resumeBackupCompactionFilesPruning(om);
    }
  }

  cluster.waitForClusterToBeReady();
  cluster.waitForLeaderOM();
}

/**
 * Stops one running OM other than the given leader and waits until both the
 * OM and its RPC server report that they are no longer running.
 *
 * <p>Only the first matching OM in the cluster's OM list is stopped; if no
 * running non-leader OM exists, the method returns without doing anything.
 *
 * @param leaderOM the OM that must be left running; must not be {@code null}
 * @throws NullPointerException if {@code leaderOM} is {@code null}
 * @throws TimeoutException if the stopped OM has not shut down within the
 *     wait window (100 ms poll, 15000 ms limit)
 * @throws InterruptedException if interrupted while waiting
 */
private void stopFollowerOM(OzoneManager leaderOM) throws TimeoutException, InterruptedException {
  // With a null leader, the identity check below is true for every OM, so the
  // loop would stop whichever running OM comes first, possibly the real leader.
  // Fail fast so the caller resolves a leader first.
  Objects.requireNonNull(leaderOM,
      "leaderOM must not be null; wait for an OM leader before stopping a follower");

  for (OzoneManager om : cluster.getOzoneManagersList()) {
    // Identity comparison: skip exactly the leader instance that was passed in.
    if (om != leaderOM && om.isRunning()) {
      String omNodeId = om.getOMNodeId();
      cluster.stopOzoneManager(omNodeId);
      GenericTestUtils.waitFor(() -> !om.isRunning() && !om.isOmRpcServerRunning(), 100, 15000);
      // Only a single follower is taken down.
      break;
    }
  }
}

@AfterAll
public void shutdown() {
IOUtils.closeQuietly(client);
if (cluster != null) {
Expand All @@ -202,7 +215,7 @@ public void shutdown() {
public void testSnapshotAndKeyDeletionBackgroundServices()
throws Exception {
OzoneManager leaderOM = getLeaderOM();
OzoneManager followerOM = getInactiveFollowerOM(leaderOM);
OzoneManager followerOM = getInactiveFollowerOM();

createSnapshotsEachWithNewKeys(leaderOM);

Expand Down Expand Up @@ -332,7 +345,7 @@ private void startInactiveFollower(OzoneManager leaderOM,
long leaderOMSnapshotIndex = leaderOMTermIndex.getIndex();

// Start the inactive OM. Checkpoint installation will happen spontaneously.
cluster.startInactiveOM(followerOM.getOMNodeId());
cluster.restartOzoneManager(followerOM, true);
actionAfterStarting.run();

// The recently started OM should be lagging behind the leader OM.
Expand All @@ -357,26 +370,41 @@ private void createSnapshotsEachWithNewKeys(OzoneManager ozoneManager)
}
}

private OzoneManager getInactiveFollowerOM(OzoneManager leaderOM) {
String followerNodeId = leaderOM.getPeerNodes().get(0).getNodeId();
if (cluster.isOMActive(followerNodeId)) {
followerNodeId = leaderOM.getPeerNodes().get(1).getNodeId();
}
return cluster.getOzoneManager(followerNodeId);
/**
 * Polls the cluster until it reports at least one inactive OM and returns
 * the first one yielded by the cluster's inactive-OM iterator.
 *
 * @return the inactive OM that was found; asserted to be non-null
 * @throws TimeoutException if no inactive OM shows up within the wait
 *     window (100 ms poll, 10000 ms limit)
 * @throws InterruptedException if interrupted while waiting
 */
private OzoneManager getInactiveFollowerOM()
    throws TimeoutException, InterruptedException {
  final AtomicReference<OzoneManager> found = new AtomicReference<>();

  // Each poll asks the cluster for a fresh iterator over its inactive OMs.
  GenericTestUtils.waitFor(() -> {
    Iterator<OzoneManager> candidates = cluster.getInactiveOM();
    if (!candidates.hasNext()) {
      return false;
    }
    found.set(candidates.next());
    return true;
  }, 100, 10000);

  OzoneManager inactiveFollower = found.get();
  assertNotNull(inactiveFollower, "No inactive OM available");
  return inactiveFollower;
}

private OzoneManager getLeaderOM() {
final String leaderOMNodeId = OmTestUtil.getCurrentOmProxyNodeId(objectStore);
return cluster.getOzoneManager(leaderOMNodeId);
return cluster.getOMLeader();
}

@Test
@DisplayName("testCompactionLogBackgroundService")
@Flaky("HDDS-11672")
@Order(Integer.MAX_VALUE)
public void testCompactionLogBackgroundService()
throws IOException, InterruptedException, TimeoutException {

// reset to the default value to avoid side effects
cluster.restartOzoneManagersWithConfigCustomizer(config -> {
config.setTimeDuration(OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL, 10, TimeUnit.MINUTES);
});

OzoneManager leaderOM = getLeaderOM();
OzoneManager followerOM = getInactiveFollowerOM(leaderOM);
OzoneManager followerOM = getInactiveFollowerOM();

createSnapshotsEachWithNewKeys(leaderOM);

Expand Down Expand Up @@ -407,6 +435,7 @@ public void testCompactionLogBackgroundService()
newLeaderOM.getMetadataManager().getStore()
.getRocksDBCheckpointDiffer().getForwardCompactionDAG().nodes()
.stream().map(CompactionNode::getFileName).collect(toSet()));

assertEquals(leaderOM.getMetadataManager().getStore()
.getRocksDBCheckpointDiffer().getForwardCompactionDAG().edges()
.stream().map(edge ->
Expand Down Expand Up @@ -440,8 +469,14 @@ private List<CompactionLogEntry> getCompactionLogEntries(OzoneManager om)
@DisplayName("testBackupCompactionFilesPruningBackgroundService")
public void testBackupCompactionFilesPruningBackgroundService()
throws IOException, InterruptedException, TimeoutException {

cluster.getOzoneManagersList().stream()
.filter(OzoneManager::isRunning)
.forEach(TestSnapshotBackgroundServices::suspendBackupCompactionFilesPruning);

OzoneManager leaderOM = getLeaderOM();
OzoneManager followerOM = getInactiveFollowerOM(leaderOM);
OzoneManager followerOM = getInactiveFollowerOM();


startInactiveFollower(leaderOM, followerOM,
() -> suspendBackupCompactionFilesPruning(followerOM));
Expand All @@ -463,6 +498,7 @@ public void testBackupCompactionFilesPruningBackgroundService()
assertNotNull(files);
int numberOfSstFiles = files.length;

assertEquals(cluster.getOMLeader(), newLeaderOM);
Copy link

Copilot AI Feb 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assertEquals(cluster.getOMLeader(), newLeaderOM) can be flaky because getOMLeader() explicitly returns null when no leader is ready or when multiple leaders appear ready. If the intent is to verify leadership, use cluster.waitForLeaderOM() (and compare), or assert newLeaderOM.isLeaderReady().

Suggested change
assertEquals(cluster.getOMLeader(), newLeaderOM);
assertEquals(cluster.waitForLeaderOM(), newLeaderOM);

Copilot uses AI. Check for mistakes.
resumeBackupCompactionFilesPruning(newLeaderOM);

checkIfCompactionBackupFilesWerePruned(sstBackupDir,
Expand Down Expand Up @@ -494,7 +530,7 @@ private static void suspendBackupCompactionFilesPruning(
public void testSSTFilteringBackgroundService()
throws IOException, InterruptedException, TimeoutException {
OzoneManager leaderOM = getLeaderOM();
OzoneManager followerOM = getInactiveFollowerOM(leaderOM);
OzoneManager followerOM = getInactiveFollowerOM();

createSnapshotsEachWithNewKeys(leaderOM);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.ExitManager;
Expand Down Expand Up @@ -125,6 +126,10 @@ public Iterator<StorageContainerManager> getInactiveSCM() {
return scmhaService.inactiveServices();
}

/**
 * Returns an iterator over the OMs that the OM HA service currently lists
 * as inactive.
 *
 * @return the iterator supplied by the OM HA service's inactive services
 */
public Iterator<OzoneManager> getInactiveOM() {
  return this.omhaService.inactiveServices();
}

/**
 * Looks up a Storage Container Manager by node id through the SCM HA service.
 *
 * @param scmNodeId node id of the SCM to look up
 * @return whatever the SCM HA service resolves for that id
 */
public StorageContainerManager getSCM(String scmNodeId) {
  return scmhaService.getServiceById(scmNodeId);
}
Expand All @@ -141,6 +146,30 @@ public List<OzoneManager> getOzoneManagersList() {
return omhaService.getServices();
}

public void restartOzoneManagersWithConfigCustomizer(Consumer<OzoneConfiguration> configCustomizer)
throws IOException, TimeoutException, InterruptedException {
List<OzoneManager> toRestart = new ArrayList<>();
for (OzoneManager om : getOzoneManagersList()) {
OzoneConfiguration configuration = new OzoneConfiguration(om.getConfiguration());
if (configCustomizer != null) {
configCustomizer.accept(configuration);
}
om.setConfiguration(configuration);
Copy link

Copilot AI Feb 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Access of element annotated with VisibleForTesting found in production code.

Copilot uses AI. Check for mistakes.
if (om.isRunning()) {
Copy link

Copilot AI Feb 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Access of element annotated with VisibleForTesting found in production code.

Copilot uses AI. Check for mistakes.
toRestart.add(om);
}
}
for (OzoneManager om : toRestart) {
if (!om.stop()) {
continue;
}
om.join();
om.restart();
GenericTestUtils.waitFor(om::isRunning, 1000, 30000);
Copy link

Copilot AI Feb 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Access of element annotated with VisibleForTesting found in production code.

Suggested change
GenericTestUtils.waitFor(om::isRunning, 1000, 30000);
long startTime = System.currentTimeMillis();
while (!om.isRunning()) {
if (System.currentTimeMillis() - startTime > 30000) {
throw new TimeoutException("Timed out waiting for OzoneManager to start.");
}
Thread.sleep(1000);
}

Copilot uses AI. Check for mistakes.
}
Comment on lines +162 to +169
Copy link

Copilot AI Feb 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

restartOzoneManagersWithConfigCustomizer silently skips restarting an OM when om.stop() returns false (even though the OM was in the toRestart list). This can mask a failed stop and leave the cluster partially restarted with a mix of old/new config. Consider failing fast (throw) or at least logging and still attempting a restart / waiting for a consistent state.

Copilot uses AI. Check for mistakes.
waitForLeaderOM();
}

/**
 * Returns the Storage Container Managers tracked by the SCM HA service.
 *
 * @return the service list exactly as provided by the SCM HA service
 */
public List<StorageContainerManager> getStorageContainerManagersList() {
  return this.scmhaService.getServices();
}
Expand Down Expand Up @@ -244,6 +273,12 @@ public void restartOzoneManager(OzoneManager ozoneManager, boolean waitForOM)
GenericTestUtils.waitFor(ozoneManager::isRunning,
1000, waitForClusterToBeReadyTimeout);
}

omhaService.inactiveServices().forEachRemaining(om -> {
if (om.equals(ozoneManager)) {
this.omhaService.activate(om);
}
});
}

public void shutdownStorageContainerManager(StorageContainerManager scm) {
Expand Down