diff --git a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java index 71e4a3f14c4..d02a6ce8b78 100644 --- a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java +++ b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java @@ -113,6 +113,7 @@ import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; import org.apache.ignite.internal.cluster.management.NodeAttributesCollector; import org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration; +import org.apache.ignite.internal.cluster.management.raft.PhysicalTopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage; import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl; import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl; @@ -1350,6 +1351,12 @@ private class Node { RaftGroupOptionsConfigurer msRaftConfigurer = RaftGroupOptionsConfigHelper.configureProperties(msLogStorageManager, metastorageWorkDir.metaPath()); + var msRaftServiceFactory = new PhysicalTopologyAwareRaftGroupServiceFactory( + clusterService, + raftGroupEventsClientListener, + failureManager + ); + metaStorageManager = new MetaStorageManagerImpl( clusterService.staticLocalNode(), cmgManager, @@ -1357,7 +1364,7 @@ private class Node { raftManager, keyValueStorage, hybridClock, - topologyAwareRaftGroupServiceFactory, + msRaftServiceFactory, metricManager, systemDistributedConfiguration, msRaftConfigurer, diff --git a/modules/metastorage/build.gradle b/modules/metastorage/build.gradle index bf0c0a4fe85..90fcaf5f462 100644 --- 
a/modules/metastorage/build.gradle +++ b/modules/metastorage/build.gradle @@ -85,6 +85,7 @@ dependencies { testFixturesImplementation project(':ignite-core') testFixturesImplementation project(':ignite-configuration-system') testFixturesImplementation project(':ignite-raft-api') + testFixturesImplementation project(':ignite-raft') testFixturesImplementation project(':ignite-replicator') testFixturesImplementation project(':ignite-rocksdb-common') testFixturesImplementation project(':ignite-vault') diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java index 087af1fa54f..d6573930d1a 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java @@ -63,6 +63,7 @@ import org.apache.ignite.internal.close.ManuallyCloseable; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; import org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory; +import org.apache.ignite.internal.cluster.management.raft.PhysicalTopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; import org.apache.ignite.internal.configuration.ComponentWorkingDir; import org.apache.ignite.internal.configuration.RaftGroupOptionsConfigHelper; @@ -105,7 +106,6 @@ import org.apache.ignite.internal.raft.PeersAndLearners; import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer; import org.apache.ignite.internal.raft.TestLozaFactory; -import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; import 
org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.raft.service.LeaderWithTerm; import org.apache.ignite.internal.raft.service.RaftGroupService; @@ -228,11 +228,10 @@ private static class Node implements ManuallyCloseable { when(logicalTopologyService.validatedNodesOnLeader()).thenReturn(emptySetCompletedFuture()); - var topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory( + var topologyAwareRaftGroupServiceFactory = new PhysicalTopologyAwareRaftGroupServiceFactory( clusterService, - logicalTopologyService, - Loza.FACTORY, - raftGroupEventsClientListener + raftGroupEventsClientListener, + new NoOpFailureManager() ); cmgManager = mock(ClusterManagementGroupManager.class); diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java index e38ed662047..6699e5413eb 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java @@ -58,6 +58,7 @@ import java.util.stream.Stream; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; import org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory; +import org.apache.ignite.internal.cluster.management.raft.PhysicalTopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; import org.apache.ignite.internal.configuration.ComponentWorkingDir; import org.apache.ignite.internal.configuration.RaftGroupOptionsConfigHelper; @@ -91,8 +92,9 @@ import org.apache.ignite.internal.raft.Loza; import 
org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer; import org.apache.ignite.internal.raft.TestLozaFactory; -import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; +import org.apache.ignite.internal.raft.TimeAwareRaftGroupServiceFactory; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; +import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService; import org.apache.ignite.internal.raft.storage.LogStorageManager; import org.apache.ignite.internal.raft.util.SharedLogStorageManagerUtils; import org.apache.ignite.internal.testframework.ExecutorServiceExtension; @@ -182,11 +184,10 @@ void setUp( when(logicalTopologyService.validatedNodesOnLeader()).thenReturn(emptySetCompletedFuture()); - var topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory( + var topologyAwareRaftGroupServiceFactory = new PhysicalTopologyAwareRaftGroupServiceFactory( clusterService, - logicalTopologyService, - Loza.FACTORY, - raftGroupEventsClientListener + raftGroupEventsClientListener, + new NoOpFailureManager() ); ClusterManagementGroupManager cmgManager = mock(ClusterManagementGroupManager.class); @@ -284,7 +285,7 @@ void testMetaStorageStopClosesRaftService() { assertThat(metaStorageManager.stopAsync(new ComponentContext()), willCompleteSuccessfully()); - CompletableFuture fut = svc.get(FOO_KEY); + CompletableFuture fut = svc.get(FOO_KEY, TimeAwareRaftGroupService.NO_TIMEOUT); assertThat(fut, willThrowFast(NodeStoppingException.class)); } @@ -308,7 +309,7 @@ void testMetaStorageStopBeforeRaftServiceStarted() { raftManager, storage, clock, - mock(TopologyAwareRaftGroupServiceFactory.class), + mock(TimeAwareRaftGroupServiceFactory.class), new NoOpMetricManager(), mock(MetastorageRepairStorage.class), mock(MetastorageRepair.class), diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java index 730e0ee1722..9591f5d3430 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesAbstractTest.java @@ -37,6 +37,7 @@ import org.apache.ignite.internal.cluster.management.NodeAttributesCollector; import org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration; import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage; +import org.apache.ignite.internal.cluster.management.raft.PhysicalTopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage; import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl; import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl; @@ -65,8 +66,8 @@ import org.apache.ignite.internal.raft.Peer; import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer; import org.apache.ignite.internal.raft.TestLozaFactory; -import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; +import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService; import org.apache.ignite.internal.raft.storage.LogStorageManager; import org.apache.ignite.internal.raft.util.SharedLogStorageManagerUtils; import org.apache.ignite.internal.storage.configurations.StorageConfiguration; @@ -203,11 +204,10 @@ class Node { var logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager); - var topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory( + var topologyAwareRaftGroupServiceFactory = new 
PhysicalTopologyAwareRaftGroupServiceFactory( clusterService, - logicalTopologyService, - Loza.FACTORY, - raftGroupEventsClientListener + raftGroupEventsClientListener, + failureManager ); ComponentWorkingDir metastorageWorkDir = new ComponentWorkingDir(basePath.resolve("metastorage")); @@ -287,7 +287,8 @@ CompletableFuture> getMetaStorageLearners() { return metaStorageManager .metaStorageService() .thenApply(MetaStorageServiceImpl::raftGroupService) - .thenCompose(service -> service.refreshMembers(false).thenApply(v -> service.learners())) + .thenCompose(service -> + service.refreshMembers(false, TimeAwareRaftGroupService.NO_TIMEOUT).thenApply(v -> service.learners())) .thenApply(learners -> learners.stream().map(Peer::consistentId).collect(toSet())); } } diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesVsStorageTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesVsStorageTest.java index ec4bfc0a85c..3853b6477a2 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesVsStorageTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesVsStorageTest.java @@ -55,7 +55,7 @@ import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl; import org.apache.ignite.internal.network.InternalClusterNode; import org.apache.ignite.internal.raft.Peer; -import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; @@ -394,9 +394,9 @@ void testIdleSafeTimePropagationLeaderTransferred() throws Exception { } private Node transferLeadership(Node firstNode, Node 
secondNode) { - RaftGroupService svc = getMetastorageService(firstNode); + TimeAwareRaftGroupService svc = getMetastorageService(firstNode); - CompletableFuture future = svc.refreshLeader() + CompletableFuture future = svc.refreshLeader(TimeAwareRaftGroupService.NO_TIMEOUT) .thenCompose(v -> { Peer leader = svc.leader(); @@ -409,7 +409,7 @@ private Node transferLeadership(Node firstNode, Node secondNode) { Node newLeaderNode = newLeader.consistentId().equals(firstNode.name()) ? firstNode : secondNode; - return svc.transferLeadership(newLeader).thenApply(unused -> newLeaderNode); + return svc.transferLeadership(newLeader, TimeAwareRaftGroupService.NO_TIMEOUT).thenApply(unused -> newLeaderNode); }); assertThat(future, willCompleteSuccessfully()); @@ -417,8 +417,8 @@ private Node transferLeadership(Node firstNode, Node secondNode) { return future.join(); } - private RaftGroupService getMetastorageService(Node node) { - CompletableFuture future = node.metaStorageManager.metaStorageService() + private TimeAwareRaftGroupService getMetastorageService(Node node) { + CompletableFuture future = node.metaStorageManager.metaStorageService() .thenApply(MetaStorageServiceImpl::raftGroupService); assertThat(future, willCompleteSuccessfully()); diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java index 6478ceb2eda..b5f3cfa6bf3 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServicePersistenceTest.java @@ -19,6 +19,10 @@ import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static 
org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.nio.charset.StandardCharsets; import java.nio.file.Path; @@ -45,6 +49,7 @@ import org.apache.ignite.internal.raft.service.ItAbstractListenerSnapshotTest; import org.apache.ignite.internal.raft.service.RaftGroupListener; import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService; import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller; import org.apache.ignite.internal.replicator.TestReplicationGroupId; import org.apache.ignite.internal.testframework.ExecutorServiceExtension; @@ -89,13 +94,13 @@ public void beforeFollowerStop(RaftGroupService service, RaftServer server) { metaStorage = new MetaStorageServiceImpl( followerNode, - service, + wrapAsTimeAware(service), new IgniteSpinBusyLock(), server.options().getClock() ); // Put some data in the metastorage - assertThat(metaStorage.put(FIRST_KEY, FIRST_VALUE), willCompleteSuccessfully()); + assertThat(metaStorage.put(FIRST_KEY, FIRST_VALUE, TimeAwareRaftGroupService.NO_TIMEOUT), willCompleteSuccessfully()); // Check that data has been written successfully checkEntry(FIRST_KEY.bytes(), FIRST_VALUE, 1); @@ -112,13 +117,13 @@ public void afterFollowerStop(RaftGroupService service, RaftServer server, int s } // Remove the first key from the metastorage - assertThat(metaStorage.remove(FIRST_KEY), willCompleteSuccessfully()); + assertThat(metaStorage.remove(FIRST_KEY, TimeAwareRaftGroupService.NO_TIMEOUT), willCompleteSuccessfully()); // Check that data has been removed checkEntry(FIRST_KEY.bytes(), null, 2); // Put same data again - assertThat(metaStorage.put(FIRST_KEY, FIRST_VALUE), willCompleteSuccessfully()); + assertThat(metaStorage.put(FIRST_KEY, FIRST_VALUE, 
TimeAwareRaftGroupService.NO_TIMEOUT), willCompleteSuccessfully()); // Check that it has been written checkEntry(FIRST_KEY.bytes(), FIRST_VALUE, 3); @@ -126,7 +131,7 @@ public void afterFollowerStop(RaftGroupService service, RaftServer server, int s @Override public void afterSnapshot(RaftGroupService service) { - assertThat(metaStorage.put(SECOND_KEY, SECOND_VALUE), willCompleteSuccessfully()); + assertThat(metaStorage.put(SECOND_KEY, SECOND_VALUE, TimeAwareRaftGroupService.NO_TIMEOUT), willCompleteSuccessfully()); } @Override @@ -178,13 +183,21 @@ protected Marshaller commandsMarshaller(ClusterService clusterService) { } private void checkEntry(byte[] expKey, byte @Nullable [] expValue, long expRevision) { - CompletableFuture future = metaStorage.get(new ByteArray(expKey)); + CompletableFuture future = metaStorage.get(new ByteArray(expKey), TimeAwareRaftGroupService.NO_TIMEOUT); assertThat(future, willCompleteSuccessfully()); TestMetasStorageUtils.checkEntry(future.join(), expKey, expValue, expRevision); } + private static TimeAwareRaftGroupService wrapAsTimeAware(RaftGroupService service) { + TimeAwareRaftGroupService timeAwareService = mock(TimeAwareRaftGroupService.class); + when(timeAwareService.run(any(), anyLong())).thenAnswer( + invocation -> service.run(invocation.getArgument(0), invocation.getArgument(1))); + when(timeAwareService.clusterService()).thenReturn(service.clusterService()); + return timeAwareService; + } + private static InternalClusterNode getNode(RaftServer server) { return server.clusterService().staticLocalNode(); } diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java index dceba053fde..cb2949c1752 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java +++ 
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageServiceTest.java @@ -76,6 +76,7 @@ import org.apache.ignite.internal.configuration.SystemLocalConfiguration; import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; +import org.apache.ignite.internal.failure.NoOpFailureManager; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.lang.ByteArray; @@ -113,8 +114,9 @@ import org.apache.ignite.internal.raft.RaftManager; import org.apache.ignite.internal.raft.RaftNodeId; import org.apache.ignite.internal.raft.TestLozaFactory; +import org.apache.ignite.internal.raft.client.PhysicalTopologyAwareRaftGroupService; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; -import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService; import org.apache.ignite.internal.raft.storage.LogStorageManager; import org.apache.ignite.internal.raft.util.SharedLogStorageManagerUtils; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; @@ -124,6 +126,7 @@ import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.network.NetworkAddress; +import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener; import org.hamcrest.Description; import org.hamcrest.TypeSafeMatcher; import org.junit.jupiter.api.AfterEach; @@ -196,7 +199,7 @@ private static class Node { private final ClusterTimeImpl clusterTime; - private RaftGroupService metaStorageRaftService; + private TimeAwareRaftGroupService metaStorageRaftService; private MetaStorageService metaStorageService; @@ -204,6 +207,8 @@ private static class Node { private final RaftGroupOptionsConfigurer 
partitionsRaftConfigurer; + private final RaftGroupEventsClientListener raftGroupEventsClientListener; + Node( ClusterService clusterService, RaftConfiguration raftConfiguration, @@ -223,11 +228,14 @@ private static class Node { partitionsRaftConfigurer = RaftGroupOptionsConfigHelper.configureProperties(partitionsLogStorageManager, workingDir.metaPath()); + this.raftGroupEventsClientListener = new RaftGroupEventsClientListener(); + this.raftManager = TestLozaFactory.create( clusterService, raftConfiguration, systemLocalConfiguration, - clock + clock, + raftGroupEventsClientListener ); this.clusterTime = new ClusterTimeImpl(nodeName, new IgniteSpinBusyLock(), clock); @@ -259,7 +267,7 @@ String name() { return clusterService.staticLocalNode().name(); } - private RaftGroupService startRaftService(PeersAndLearners configuration) { + private TimeAwareRaftGroupService startRaftService(PeersAndLearners configuration) { String name = name(); boolean isLearner = configuration.peer(name) == null; @@ -273,12 +281,24 @@ private RaftGroupService startRaftService(PeersAndLearners configuration) { var raftNodeId = new RaftNodeId(MetastorageGroupId.INSTANCE, peer); try { - return raftManager.startSystemRaftGroupNodeAndWaitNodeReady( + return raftManager.startSystemRaftGroupNodeAndWaitNodeReadyTimeAware( raftNodeId, configuration, listener, RaftGroupEventsListener.noopLsnr, - null, + (groupId, peersAndLearners, raftConfiguration, raftClientExecutor, commandsMarshaller, + stoppingExceptionFactory) -> + PhysicalTopologyAwareRaftGroupService.start( + groupId, + clusterService, + raftConfiguration, + peersAndLearners, + raftClientExecutor, + raftGroupEventsClientListener, + commandsMarshaller, + stoppingExceptionFactory, + new NoOpFailureManager() + ), partitionsRaftConfigurer ); } catch (NodeStoppingException e) { @@ -371,7 +391,10 @@ public void testGet() { startNodes(); - assertThat(node.metaStorageService.get(new ByteArray(EXPECTED_RESULT_ENTRY.key())), 
willBe(EXPECTED_RESULT_ENTRY)); + assertThat( + node.metaStorageService.get(new ByteArray(EXPECTED_RESULT_ENTRY.key()), TimeAwareRaftGroupService.NO_TIMEOUT), + willBe(EXPECTED_RESULT_ENTRY) + ); } /** @@ -387,7 +410,9 @@ public void testGetWithUpperBoundRevision() { startNodes(); assertThat( - node.metaStorageService.get(new ByteArray(EXPECTED_RESULT_ENTRY.key()), EXPECTED_RESULT_ENTRY.revision()), + node.metaStorageService.get( + new ByteArray(EXPECTED_RESULT_ENTRY.key()), EXPECTED_RESULT_ENTRY.revision(), TimeAwareRaftGroupService.NO_TIMEOUT + ), willBe(EXPECTED_RESULT_ENTRY) ); } @@ -404,7 +429,10 @@ public void testGetAll() { startNodes(); - assertThat(node.metaStorageService.getAll(EXPECTED_RESULT_MAP.keySet()), willBe(EXPECTED_RESULT_MAP)); + assertThat( + node.metaStorageService.getAll(EXPECTED_RESULT_MAP.keySet(), TimeAwareRaftGroupService.NO_TIMEOUT), + willBe(EXPECTED_RESULT_MAP) + ); } /** @@ -419,7 +447,10 @@ public void testGetAllWithUpperBoundRevision() { startNodes(); - assertThat(node.metaStorageService.getAll(EXPECTED_RESULT_MAP.keySet(), 10), willBe(EXPECTED_RESULT_MAP)); + assertThat( + node.metaStorageService.getAll(EXPECTED_RESULT_MAP.keySet(), 10, TimeAwareRaftGroupService.NO_TIMEOUT), + willBe(EXPECTED_RESULT_MAP) + ); } /** @@ -436,7 +467,7 @@ public void testPut() { startNodes(); - assertThat(node.metaStorageService.put(expKey, expVal), willCompleteSuccessfully()); + assertThat(node.metaStorageService.put(expKey, expVal, TimeAwareRaftGroupService.NO_TIMEOUT), willCompleteSuccessfully()); verify(node.mockStorage).put(eq(expKey.bytes()), eq(expVal), any()); } @@ -457,7 +488,7 @@ public void testPutAll() { e -> e.getValue().value() )); - assertThat(node.metaStorageService.putAll(values), willCompleteSuccessfully()); + assertThat(node.metaStorageService.putAll(values, TimeAwareRaftGroupService.NO_TIMEOUT), willCompleteSuccessfully()); ArgumentCaptor> keysCaptor = ArgumentCaptor.forClass(List.class); ArgumentCaptor> valuesCaptor = 
ArgumentCaptor.forClass(List.class); @@ -497,7 +528,7 @@ public void testRemove() { startNodes(); - assertThat(node.metaStorageService.remove(expKey), willCompleteSuccessfully()); + assertThat(node.metaStorageService.remove(expKey, TimeAwareRaftGroupService.NO_TIMEOUT), willCompleteSuccessfully()); verify(node.mockStorage).remove(eq(expKey.bytes()), any()); } @@ -512,7 +543,10 @@ public void testRemoveAll() { startNodes(); - assertThat(node.metaStorageService.removeAll(EXPECTED_RESULT_MAP.keySet()), willCompleteSuccessfully()); + assertThat( + node.metaStorageService.removeAll(EXPECTED_RESULT_MAP.keySet(), TimeAwareRaftGroupService.NO_TIMEOUT), + willCompleteSuccessfully() + ); List expKeys = EXPECTED_RESULT_MAP.keySet().stream() .map(ByteArray::bytes).collect(toList()); @@ -540,7 +574,7 @@ public void testRemoveByPrefix() { ByteArray prefix = new ByteArray(new byte[]{1}); - assertThat(node.metaStorageService.removeByPrefix(prefix), willCompleteSuccessfully()); + assertThat(node.metaStorageService.removeByPrefix(prefix, TimeAwareRaftGroupService.NO_TIMEOUT), willCompleteSuccessfully()); verify(node.mockStorage).removeByPrefix(eq(prefix.bytes()), any()); } @@ -562,7 +596,8 @@ public void testRangeWithKeyToAndUpperBound() { startNodes(); - node.metaStorageService.range(expKeyFrom, expKeyTo, expRevUpperBound).subscribe(singleElementSubscriber()); + node.metaStorageService.range(expKeyFrom, expKeyTo, expRevUpperBound, TimeAwareRaftGroupService.NO_TIMEOUT) + .subscribe(singleElementSubscriber()); verify(node.mockStorage, timeout(10_000)).range(expKeyFrom.bytes(), expKeyTo.bytes(), expRevUpperBound); } @@ -583,7 +618,8 @@ public void testRangeWithKeyTo() { startNodes(); - node.metaStorageService.range(expKeyFrom, expKeyTo, false).subscribe(singleElementSubscriber()); + node.metaStorageService.range(expKeyFrom, expKeyTo, false, TimeAwareRaftGroupService.NO_TIMEOUT) + .subscribe(singleElementSubscriber()); verify(node.mockStorage, timeout(10_000)).range(expKeyFrom.bytes(), 
expKeyTo.bytes()); } @@ -602,7 +638,8 @@ public void testRangeWithNullAsKeyTo() { startNodes(); - node.metaStorageService.range(expKeyFrom, null, false).subscribe(singleElementSubscriber()); + node.metaStorageService.range(expKeyFrom, null, false, TimeAwareRaftGroupService.NO_TIMEOUT) + .subscribe(singleElementSubscriber()); verify(node.mockStorage, timeout(10_000)).range(expKeyFrom.bytes(), null); } @@ -620,7 +657,8 @@ public void testRangeNext() { startNodes(); CompletableFuture expectedEntriesFuture = - subscribeToValue(node.metaStorageService.range(new ByteArray(EXPECTED_RESULT_ENTRY.key()), null)); + subscribeToValue(node.metaStorageService.range( + new ByteArray(EXPECTED_RESULT_ENTRY.key()), null, TimeAwareRaftGroupService.NO_TIMEOUT)); assertThat(expectedEntriesFuture, willBe(EXPECTED_RESULT_ENTRY)); } @@ -644,7 +682,8 @@ public void testRangeNextNoSuchElementException() { startNodes(); CompletableFuture> future = - subscribeToList(node.metaStorageService.range(new ByteArray(EXPECTED_RESULT_ENTRY.key()), null)); + subscribeToList(node.metaStorageService.range( + new ByteArray(EXPECTED_RESULT_ENTRY.key()), null, TimeAwareRaftGroupService.NO_TIMEOUT)); assertThat(future, willThrowFast(NoSuchElementException.class)); } @@ -706,7 +745,8 @@ public Object answer(InvocationOnMock invocation) throws Throwable { startNodes(); - assertThat(node.metaStorageService.invoke(iif).thenApply(StatementResult::getAsBoolean), willBe(true)); + assertThat(node.metaStorageService.invoke(iif, TimeAwareRaftGroupService.NO_TIMEOUT) + .thenApply(StatementResult::getAsBoolean), willBe(true)); verify(node.mockStorage).invoke(ifCaptor.capture(), any(), any()); @@ -761,7 +801,7 @@ public void testInvoke() { Operation failure = Operations.noop(); - assertThat(node.metaStorageService.invoke(condition, success, failure), willBe(true)); + assertThat(node.metaStorageService.invoke(condition, success, failure, TimeAwareRaftGroupService.NO_TIMEOUT), willBe(true)); var conditionCaptor = 
ArgumentCaptor.forClass(AbstractSimpleCondition.class); @@ -793,7 +833,10 @@ public void testGetThatThrowsCompactedException() { startNodes(); - assertThat(node.metaStorageService.get(new ByteArray(EXPECTED_RESULT_ENTRY.key())), willThrow(CompactedException.class)); + assertThat( + node.metaStorageService.get(new ByteArray(EXPECTED_RESULT_ENTRY.key()), TimeAwareRaftGroupService.NO_TIMEOUT), + willThrow(CompactedException.class) + ); } /** @@ -808,7 +851,10 @@ public void testGetThatThrowsOperationTimeoutException() { startNodes(); - assertThat(node.metaStorageService.get(new ByteArray(EXPECTED_RESULT_ENTRY.key())), willThrow(OperationTimeoutException.class)); + assertThat( + node.metaStorageService.get(new ByteArray(EXPECTED_RESULT_ENTRY.key()), TimeAwareRaftGroupService.NO_TIMEOUT), + willThrow(OperationTimeoutException.class) + ); } private static Subscriber singleElementSubscriber() { diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java index 4eaeb52591a..3b2b2cd435b 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java @@ -54,6 +54,7 @@ import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; import org.apache.ignite.internal.cluster.management.NodeAttributesCollector; import org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration; +import org.apache.ignite.internal.cluster.management.raft.PhysicalTopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage; import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl; import 
org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl; @@ -85,10 +86,8 @@ import org.apache.ignite.internal.network.ClusterService; import org.apache.ignite.internal.network.StaticNodeFinder; import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils; -import org.apache.ignite.internal.raft.Loza; import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer; import org.apache.ignite.internal.raft.TestLozaFactory; -import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.raft.storage.LogStorageManager; import org.apache.ignite.internal.raft.util.SharedLogStorageManagerUtils; @@ -223,11 +222,10 @@ private class Node { var logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager); - var topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory( + var topologyAwareRaftGroupServiceFactory = new PhysicalTopologyAwareRaftGroupServiceFactory( clusterService, - logicalTopologyService, - Loza.FACTORY, - raftGroupEventsClientListener + raftGroupEventsClientListener, + failureManager ); ComponentWorkingDir metastorageWorkDir = new ComponentWorkingDir(basePath.resolve("storage")); diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java index 86fd908fbb7..6d28f99c3ef 100644 --- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java +++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/server/raft/ItMetaStorageRaftGroupTest.java @@ -22,8 +22,8 @@ import static org.apache.ignite.internal.metastorage.TestMetasStorageUtils.ANY_TIMESTAMP; 
import static org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.findLocalAddresses; import static org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.waitForTopology; -import static org.apache.ignite.internal.raft.TestThrottlingContextHolder.throttlingContextHolder; import static org.apache.ignite.internal.raft.server.RaftGroupOptions.defaults; +import static org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService.NO_TIMEOUT; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.apache.ignite.internal.util.IgniteUtils.startAsync; @@ -50,6 +50,7 @@ import org.apache.ignite.internal.configuration.SystemLocalConfiguration; import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; +import org.apache.ignite.internal.failure.NoOpFailureManager; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridClockImpl; import org.apache.ignite.internal.lang.ByteArray; @@ -67,13 +68,14 @@ import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils; import org.apache.ignite.internal.raft.PeersAndLearners; import org.apache.ignite.internal.raft.RaftNodeId; -import org.apache.ignite.internal.raft.client.RaftGroupServiceImpl; +import org.apache.ignite.internal.raft.StoppingExceptionFactories; +import org.apache.ignite.internal.raft.client.PhysicalTopologyAwareRaftGroupService; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.raft.server.RaftGroupOptions; import org.apache.ignite.internal.raft.server.RaftServer; import org.apache.ignite.internal.raft.server.TestJraftServerFactory; import org.apache.ignite.internal.raft.service.LeaderWithTerm; -import 
org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService; import org.apache.ignite.internal.raft.storage.LogStorageManager; import org.apache.ignite.internal.raft.util.SharedLogStorageManagerUtils; import org.apache.ignite.internal.raft.util.ThreadLocalOptimizedMarshaller; @@ -83,7 +85,6 @@ import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.network.NetworkAddress; -import org.apache.ignite.raft.jraft.RaftMessagesFactory; import org.apache.ignite.raft.jraft.option.NodeOptions; import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener; import org.junit.jupiter.api.AfterEach; @@ -109,9 +110,6 @@ public class ItMetaStorageRaftGroupTest extends IgniteAbstractTest { /** Nodes. */ private static final int NODES = 3; - /** Factory. */ - private static final RaftMessagesFactory FACTORY = new RaftMessagesFactory(); - /** Expected server result entry. */ private static final EntryImpl EXPECTED_RESULT_ENTRY1 = new EntryImpl( @@ -149,13 +147,13 @@ public class ItMetaStorageRaftGroupTest extends IgniteAbstractTest { private RaftServer metaStorageRaftSrv3; /** First meta storage raft group service. */ - private RaftGroupService metaStorageRaftGrpSvc1; + private TimeAwareRaftGroupService metaStorageRaftGrpSvc1; /** Second meta storage raft group service. */ - private RaftGroupService metaStorageRaftGrpSvc2; + private TimeAwareRaftGroupService metaStorageRaftGrpSvc2; /** Third meta storage raft group service. */ - private RaftGroupService metaStorageRaftGrpSvc3; + private TimeAwareRaftGroupService metaStorageRaftGrpSvc3; /** Mock Metastorage storage. 
*/ @Mock @@ -242,11 +240,11 @@ public void testRangeNextWorksCorrectlyAfterLeaderChange() throws Exception { return Cursor.fromBareIterator(entries.iterator()); }); - List> raftServersRaftGroups = prepareJraftMetaStorages(); + List> raftServersRaftGroups = prepareJraftMetaStorages(); List raftServers = raftServersRaftGroups.stream().map(p -> p.key).collect(Collectors.toList()); - CompletableFuture oldLeaderFut = raftServersRaftGroups.get(0).value.refreshAndGetLeaderWithTerm(); + CompletableFuture oldLeaderFut = raftServersRaftGroups.get(0).value.refreshAndGetLeaderWithTerm(NO_TIMEOUT); assertThat(oldLeaderFut, willCompleteSuccessfully()); @@ -271,7 +269,7 @@ public void testRangeNextWorksCorrectlyAfterLeaderChange() throws Exception { log.info("Test: liveServer: " + liveServer.clusterService().staticLocalNode().name()); - RaftGroupService raftGroupServiceOfLiveServer = raftServersRaftGroups.stream() + TimeAwareRaftGroupService raftGroupServiceOfLiveServer = raftServersRaftGroups.stream() .filter(p -> p.key.equals(liveServer)) .findFirst() .orElseThrow() @@ -285,7 +283,7 @@ public void testRangeNextWorksCorrectlyAfterLeaderChange() throws Exception { var resultFuture = new CompletableFuture(); - metaStorageSvc.range(new ByteArray(EXPECTED_RESULT_ENTRY1.key()), new ByteArray(new byte[]{4})) + metaStorageSvc.range(new ByteArray(EXPECTED_RESULT_ENTRY1.key()), new ByteArray(new byte[]{4}), NO_TIMEOUT) .subscribe(new Subscriber<>() { private Subscription subscription; @@ -313,7 +311,7 @@ public void onNext(Entry item) { assertThat(stopAsync(componentContext, oldLeaderServer, oldLeaderClusterService), willCompleteSuccessfully()); CompletableFuture newLeaderWithTermFut = raftGroupServiceOfLiveServer - .refreshAndGetLeaderWithTerm(); + .refreshAndGetLeaderWithTerm(NO_TIMEOUT); assertThat(newLeaderWithTermFut, willCompleteSuccessfully()); LeaderWithTerm newLeaderWithTerm = newLeaderWithTermFut.join(); @@ -354,7 +352,7 @@ public void onComplete() { assertThat(resultFuture, 
willCompleteSuccessfully()); } - private List> prepareJraftMetaStorages() throws InterruptedException { + private List> prepareJraftMetaStorages() throws InterruptedException { PeersAndLearners membersConfiguration = cluster.stream() .map(ItMetaStorageRaftGroupTest::localMemberName) .collect(collectingAndThen(toSet(), PeersAndLearners::fromConsistentIds)); @@ -379,10 +377,12 @@ private List> prepareJraftMetaStorages() thro workingDir1.raftLogPath() ); + var eventsClientListener1 = new RaftGroupEventsClientListener(); + metaStorageRaftSrv1 = TestJraftServerFactory.create( cluster.get(0), opt1, - new RaftGroupEventsClientListener() + eventsClientListener1 ); ComponentWorkingDir workingDir2 = new ComponentWorkingDir(workDir.resolve("node2")); @@ -392,10 +392,12 @@ private List> prepareJraftMetaStorages() thro workingDir2.raftLogPath() ); + var eventsClientListener2 = new RaftGroupEventsClientListener(); + metaStorageRaftSrv2 = TestJraftServerFactory.create( cluster.get(1), opt2, - new RaftGroupEventsClientListener() + eventsClientListener2 ); ComponentWorkingDir workingDir3 = new ComponentWorkingDir(workDir.resolve("node3")); @@ -405,10 +407,12 @@ private List> prepareJraftMetaStorages() thro workingDir3.raftLogPath() ); + var eventsClientListener3 = new RaftGroupEventsClientListener(); + metaStorageRaftSrv3 = TestJraftServerFactory.create( cluster.get(2), opt3, - new RaftGroupEventsClientListener() + eventsClientListener3 ); assertThat( @@ -465,37 +469,40 @@ private List> prepareJraftMetaStorages() thro groupOptions3 ); - metaStorageRaftGrpSvc1 = RaftGroupServiceImpl.start( + metaStorageRaftGrpSvc1 = PhysicalTopologyAwareRaftGroupService.start( MetastorageGroupId.INSTANCE, cluster.get(0), - FACTORY, raftConfiguration, membersConfiguration, executor, + eventsClientListener1, commandsMarshaller, - throttlingContextHolder() + StoppingExceptionFactories.indicateComponentStop(), + new NoOpFailureManager() ); - metaStorageRaftGrpSvc2 = RaftGroupServiceImpl.start( + 
metaStorageRaftGrpSvc2 = PhysicalTopologyAwareRaftGroupService.start( MetastorageGroupId.INSTANCE, cluster.get(1), - FACTORY, raftConfiguration, membersConfiguration, executor, + eventsClientListener2, commandsMarshaller, - throttlingContextHolder() + StoppingExceptionFactories.indicateComponentStop(), + new NoOpFailureManager() ); - metaStorageRaftGrpSvc3 = RaftGroupServiceImpl.start( + metaStorageRaftGrpSvc3 = PhysicalTopologyAwareRaftGroupService.start( MetastorageGroupId.INSTANCE, cluster.get(2), - FACTORY, raftConfiguration, membersConfiguration, executor, + eventsClientListener3, commandsMarshaller, - throttlingContextHolder() + StoppingExceptionFactories.indicateComponentStop(), + new NoOpFailureManager() ); assertTrue(waitForCondition( @@ -503,7 +510,7 @@ private List> prepareJraftMetaStorages() thro "Leaders: " + metaStorageRaftGrpSvc1.leader() + " " + metaStorageRaftGrpSvc2.leader() + " " + metaStorageRaftGrpSvc3 .leader()); - List> raftServersRaftGroups = new ArrayList<>(); + List> raftServersRaftGroups = new ArrayList<>(); raftServersRaftGroups.add(new Pair<>(metaStorageRaftSrv1, metaStorageRaftGrpSvc1)); raftServersRaftGroups.add(new Pair<>(metaStorageRaftSrv2, metaStorageRaftGrpSvc2)); @@ -520,10 +527,11 @@ private List> prepareJraftMetaStorages() thro * @param group3 Raft group 3 * @return {@code true} if all raft groups have the same leader. 
*/ - private boolean sameLeaders(RaftGroupService group1, RaftGroupService group2, RaftGroupService group3) { - group1.refreshLeader(); - group2.refreshLeader(); - group3.refreshLeader(); + private boolean sameLeaders(TimeAwareRaftGroupService group1, TimeAwareRaftGroupService group2, + TimeAwareRaftGroupService group3) { + group1.refreshLeader(NO_TIMEOUT); + group2.refreshLeader(NO_TIMEOUT); + group3.refreshLeader(NO_TIMEOUT); return Objects.equals(group1.leader(), group2.leader()) && Objects.equals(group2.leader(), group3.leader()); } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/CursorPublisher.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/CursorPublisher.java index 7ac9fc280bc..69429ed71f5 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/CursorPublisher.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/CursorPublisher.java @@ -35,6 +35,8 @@ class CursorPublisher implements Publisher { private final Function nextBatchCommandSupplier; + private final long timeoutMillis; + private final AtomicBoolean subscriptionGuard = new AtomicBoolean(); /** @@ -43,10 +45,12 @@ class CursorPublisher implements Publisher { * @param context Context. * @param nextBatchCommandSupplier Factory that creates a command for retrieving the next batch of values provided with the last * processed key for pagination purposes. + * @param timeoutMillis Timeout in milliseconds for each batch request. 
*/ - CursorPublisher(MetaStorageServiceContext context, Function nextBatchCommandSupplier) { + CursorPublisher(MetaStorageServiceContext context, Function nextBatchCommandSupplier, long timeoutMillis) { this.context = context; this.nextBatchCommandSupplier = nextBatchCommandSupplier; + this.timeoutMillis = timeoutMillis; } @Override @@ -62,7 +66,7 @@ public void subscribe(Subscriber subscriber) { } try { - var subscription = new CursorSubscription(context, nextBatchCommandSupplier, subscriber); + var subscription = new CursorSubscription(context, nextBatchCommandSupplier, subscriber, timeoutMillis); subscriber.onSubscribe(subscription); } finally { diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/CursorSubscription.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/CursorSubscription.java index 44a6530208e..0d67ac9e015 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/CursorSubscription.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/CursorSubscription.java @@ -41,6 +41,8 @@ class CursorSubscription implements Subscription { private final Function nextBatchCommandSupplier; + private final long timeoutMillis; + /** Flag indicating that either the whole data range has been exhausted or the subscription has been cancelled. 
*/ private boolean isDone = false; @@ -57,11 +59,13 @@ class CursorSubscription implements Subscription { CursorSubscription( MetaStorageServiceContext context, Function nextBatchCommandSupplier, - Subscriber subscriber + Subscriber subscriber, + long timeoutMillis ) { this.context = context; this.nextBatchCommandSupplier = nextBatchCommandSupplier; this.subscriber = subscriber; + this.timeoutMillis = timeoutMillis; } @Override @@ -150,7 +154,7 @@ private void processRequest() { private void requestNextBatch(byte @Nullable [] lastProcessedKey) { ReadCommand nextBatchCommand = nextBatchCommandSupplier.apply(lastProcessedKey); - context.raftService().run(nextBatchCommand) + context.raftService().run(nextBatchCommand, timeoutMillis) .whenCompleteAsync((resp, e) -> { if (e == null) { cachedResponse = resp; diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java index ef6b4c415e5..a8d978ae199 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLeaderElectionListener.java @@ -38,7 +38,7 @@ import org.apache.ignite.internal.metastorage.server.time.ClusterTimeImpl; import org.apache.ignite.internal.network.InternalClusterNode; import org.apache.ignite.internal.raft.LeaderElectionListener; -import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.jetbrains.annotations.Nullable; @@ -205,11 +205,11 @@ private CompletableFuture syncTimeIfSecondaryDutiesAreNotPaused(HybridTime Long term = thisNodeTerm; if (term == null) { - // We seized to be a leader, do nothing. 
+ // We ceased to be a leader, do nothing. return nullCompletedFuture(); } - return service.syncTime(safeTime, term); + return service.syncTime(safeTime, term, TimeAwareRaftGroupService.NO_TIMEOUT); } private class MetaStorageLogicalTopologyEventListener implements LogicalTopologyEventListener { @@ -237,7 +237,7 @@ public void onTopologyLeap(LogicalTopologySnapshot newTopology) { @FunctionalInterface private interface Action { - CompletableFuture apply(RaftGroupService raftService, long term); + CompletableFuture apply(TimeAwareRaftGroupService raftService, long term); } /** diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLearnerManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLearnerManager.java index 2c8b575d38f..44d91a4e432 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLearnerManager.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageLearnerManager.java @@ -34,7 +34,7 @@ import org.apache.ignite.internal.network.InternalClusterNode; import org.apache.ignite.internal.raft.Peer; import org.apache.ignite.internal.raft.PeersAndLearners; -import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.jetbrains.annotations.TestOnly; @@ -71,21 +71,21 @@ CompletableFuture updateLearners(long term) { return metaStorageSvcFut.thenCompose(service -> resetLearners(service.raftGroupService(), term, 0)); } - CompletableFuture addLearner(RaftGroupService raftService, InternalClusterNode learner) { + CompletableFuture addLearner(TimeAwareRaftGroupService raftService, InternalClusterNode learner) { if (!learnersAdditionEnabled) { return nullCompletedFuture(); } return updateConfigUnderLock(() -> isPeer(raftService, learner) 
? nullCompletedFuture() // TODO: https://issues.apache.org/jira/browse/IGNITE-26854. - : raftService.addLearners(List.of(new Peer(learner.name())), 0)); + : raftService.addLearners(List.of(new Peer(learner.name())), 0, TimeAwareRaftGroupService.NO_TIMEOUT)); } - private static boolean isPeer(RaftGroupService raftService, InternalClusterNode node) { + private static boolean isPeer(TimeAwareRaftGroupService raftService, InternalClusterNode node) { return raftService.peers().stream().anyMatch(peer -> peer.consistentId().equals(node.name())); } - CompletableFuture removeLearner(RaftGroupService raftService, InternalClusterNode learner) { + CompletableFuture removeLearner(TimeAwareRaftGroupService raftService, InternalClusterNode learner) { return updateConfigUnderLock(() -> logicalTopologyService.validatedNodesOnLeader() .thenCompose(validatedNodes -> updateConfigUnderLock(() -> { if (isPeer(raftService, learner)) { @@ -99,11 +99,11 @@ CompletableFuture removeLearner(RaftGroupService raftService, InternalClus } // TODO: https://issues.apache.org/jira/browse/IGNITE-26854. - return raftService.removeLearners(List.of(new Peer(learner.name())), 0); + return raftService.removeLearners(List.of(new Peer(learner.name())), 0, TimeAwareRaftGroupService.NO_TIMEOUT); }))); } - CompletableFuture resetLearners(RaftGroupService raftService, long term, long sequenceToken) { + CompletableFuture resetLearners(TimeAwareRaftGroupService raftService, long term, long sequenceToken) { return updateConfigUnderLock(() -> logicalTopologyService.validatedNodesOnLeader() .thenCompose(validatedNodes -> updateConfigUnderLock(() -> { Set peers = raftService.peers().stream().map(Peer::consistentId).collect(toSet()); @@ -116,7 +116,8 @@ CompletableFuture resetLearners(RaftGroupService raftService, long term, l PeersAndLearners newPeerConfiguration = PeersAndLearners.fromConsistentIds(peers, learners); // We can't use 'resetLearners' call here because it does not support empty lists of learners. 
- return raftService.changePeersAndLearnersAsync(newPeerConfiguration, term, sequenceToken); + return raftService.changePeersAndLearnersAsync( + newPeerConfiguration, term, sequenceToken, TimeAwareRaftGroupService.NO_TIMEOUT); }))); } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java index 3d4820925c5..c97b960b2e1 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java @@ -108,10 +108,11 @@ import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer; import org.apache.ignite.internal.raft.RaftManager; import org.apache.ignite.internal.raft.RaftNodeId; -import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService; -import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; +import org.apache.ignite.internal.raft.StoppingExceptionFactories; +import org.apache.ignite.internal.raft.TimeAwareRaftGroupServiceFactory; +import org.apache.ignite.internal.raft.client.PhysicalTopologyAwareRaftGroupService; import org.apache.ignite.internal.raft.server.RaftGroupOptions; -import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService; import org.apache.ignite.internal.thread.IgniteThreadFactory; import org.apache.ignite.internal.tostring.S; import org.apache.ignite.internal.util.Cursor; @@ -172,7 +173,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager, MetastorageGr private final ClusterTimeImpl clusterTime; - private final TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory; + private final TimeAwareRaftGroupServiceFactory raftServiceFactory; private final MetricManager 
metricManager; @@ -199,7 +200,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager, MetastorageGr private final CompletableFuture raftNodeStarted = new CompletableFuture<>(); /** Gets completed when a Raft service (that is, the Raft client for talking with the group) is started for Metastorage. */ - private final OrderingFuture raftServiceFuture = new OrderingFuture<>(); + private final OrderingFuture raftServiceFuture = new OrderingFuture<>(); /** * State of changing Raft group peers (aka voting set members). Currently only used for forceful members reset during repair. @@ -263,7 +264,7 @@ public MetaStorageManagerImpl( RaftManager raftMgr, KeyValueStorage storage, HybridClock clock, - TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory, + TimeAwareRaftGroupServiceFactory raftServiceFactory, MetricManager metricManager, MetastorageRepairStorage metastorageRepairStorage, MetastorageRepair metastorageRepair, @@ -280,7 +281,7 @@ public MetaStorageManagerImpl( this.clock = clock; this.clusterTime = new ClusterTimeImpl(localNode.name(), busyLock, clock, failureProcessor); this.metaStorageMetricSource = new MetaStorageMetricSource(clusterTime, this::computeAvailablePeers, () -> msAvailable ? 
1 : 0); - this.topologyAwareRaftGroupServiceFactory = topologyAwareRaftGroupServiceFactory; + this.raftServiceFactory = raftServiceFactory; this.metricManager = metricManager; this.metastorageRepairStorage = metastorageRepairStorage; this.metastorageRepair = metastorageRepair; @@ -306,7 +307,7 @@ public MetaStorageManagerImpl( RaftManager raftMgr, KeyValueStorage storage, HybridClock clock, - TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory, + TimeAwareRaftGroupServiceFactory raftServiceFactory, MetricManager metricManager, SystemDistributedConfiguration systemConfiguration, RaftGroupOptionsConfigurer raftGroupOptionsConfigurer @@ -318,7 +319,7 @@ public MetaStorageManagerImpl( raftMgr, storage, clock, - topologyAwareRaftGroupServiceFactory, + raftServiceFactory, metricManager, systemConfiguration, raftGroupOptionsConfigurer, @@ -337,7 +338,7 @@ public MetaStorageManagerImpl( RaftManager raftMgr, KeyValueStorage storage, HybridClock clock, - TopologyAwareRaftGroupServiceFactory topologyAwareRaftGroupServiceFactory, + TimeAwareRaftGroupServiceFactory raftServiceFactory, MetricManager metricManager, SystemDistributedConfiguration systemConfiguration, RaftGroupOptionsConfigurer raftGroupOptionsConfigurer, @@ -350,7 +351,7 @@ public MetaStorageManagerImpl( raftMgr, storage, clock, - topologyAwareRaftGroupServiceFactory, + raftServiceFactory, metricManager, new NoOpMetastorageRepairStorage(), (nodes, mgReplicationFactor) -> nullCompletedFuture(), @@ -375,7 +376,7 @@ public void registerNotificationEnqueuedListener(NotificationEnqueuedListener li private CompletableFuture recover(MetaStorageService service) { return inBusyLockAsync(busyLock, () -> { - service.currentRevisions() + service.currentRevisions(TimeAwareRaftGroupService.NO_TIMEOUT) .thenAccept(targetRevisions -> { assert targetRevisions != null; @@ -459,7 +460,7 @@ private CompletableFuture validateMetastorageForDivergence(Set met long localChecksum = storage.checksum(localRevision); 
return doWithOneOffRaftGroupService(PeersAndLearners.fromConsistentIds(metastorageNodes), raftClient -> { - return createMetaStorageService(raftClient).checksum(localRevision) + return createMetaStorageService(raftClient).checksum(localRevision, TimeAwareRaftGroupService.NO_TIMEOUT) .thenAccept(leaderChecksumInfo -> { LOG.info( "Validating Metastorage for divergence [localRevision={}, localChecksum={}, leaderChecksumInfo={}", @@ -498,7 +499,7 @@ private void saveWitnessedMetastorageRepairClusterIdLocally(UUID currentClusterI } private CompletableFuture initializeMetastorage(MetaStorageInfo metaStorageInfo) { - CompletableFuture localRaftServiceFuture; + CompletableFuture localRaftServiceFuture; try { localRaftServiceFuture = metaStorageInfo.metaStorageNodes().contains(localNode.name()) ? startVotingNode(metaStorageInfo) @@ -515,11 +516,11 @@ private CompletableFuture initializeMetastorage(MetaStor }); } - private MetaStorageServiceImpl createMetaStorageService(RaftGroupService raftService) { + private MetaStorageServiceImpl createMetaStorageService(TimeAwareRaftGroupService raftService) { return new MetaStorageServiceImpl(localNode, raftService, busyLock, clock); } - private CompletableFuture startVotingNode( + private CompletableFuture startVotingNode( MetaStorageInfo metaStorageInfo ) throws NodeStoppingException { PeersAndLearners configuration = PeersAndLearners.fromConsistentIds(metaStorageInfo.metaStorageNodes()); @@ -529,7 +530,7 @@ private CompletableFuture startVotingNode( return startRaftNode(configuration, localPeer, metaStorageInfo); } - private CompletableFuture startLearnerNode( + private CompletableFuture startLearnerNode( MetaStorageInfo metaStorageInfo ) throws NodeStoppingException { String thisNodeName = localNode.name(); @@ -540,7 +541,7 @@ private CompletableFuture startLearnerNode( return startRaftNode(configuration, localPeer, metaStorageInfo); } - private CompletableFuture startRaftNode( + private CompletableFuture startRaftNode( 
PeersAndLearners configuration, Peer localPeer, MetaStorageInfo metaStorageInfo @@ -548,8 +549,8 @@ private CompletableFuture startRaftNode( SystemDistributedConfiguration currentSystemConfiguration = systemConfiguration; assert currentSystemConfiguration != null : "System configuration has not been set"; - CompletableFuture serviceFuture = CompletableFuture.supplyAsync(() -> { - TopologyAwareRaftGroupService service = startRaftNodeItself(configuration, localPeer, metaStorageInfo); + CompletableFuture serviceFuture = CompletableFuture.supplyAsync(() -> { + PhysicalTopologyAwareRaftGroupService service = startRaftNodeItself(configuration, localPeer, metaStorageInfo); raftNodeStarted.complete(null); @@ -562,7 +563,7 @@ private CompletableFuture startRaftNode( }); } - private TopologyAwareRaftGroupService startRaftNodeItself( + private PhysicalTopologyAwareRaftGroupService startRaftNodeItself( PeersAndLearners configuration, Peer localPeer, MetaStorageInfo metaStorageInfo @@ -576,12 +577,14 @@ private TopologyAwareRaftGroupService startRaftNodeItself( ); try { - return raftMgr.startSystemRaftGroupNodeAndWaitNodeReady( + // Safe cast: raftServiceFactory is always PhysicalTopologyAwareRaftGroupServiceFactory, which produces + // PhysicalTopologyAwareRaftGroupService. We need the concrete type for subscribeLeader(). 
+ return (PhysicalTopologyAwareRaftGroupService) raftMgr.startSystemRaftGroupNodeAndWaitNodeReadyTimeAware( raftNodeId(localPeer), configuration, raftListener, RaftGroupEventsListener.noopLsnr, - topologyAwareRaftGroupServiceFactory, + raftServiceFactory, options -> { raftGroupOptionsConfigurer.configure(options); @@ -655,7 +658,7 @@ private void onConfigurationCommitted(RaftGroupConfiguration configuration) { }); } - private void updateRaftClientConfigIfEventIsNotStale(RaftGroupConfiguration configuration, RaftGroupService raftService) { + private void updateRaftClientConfigIfEventIsNotStale(RaftGroupConfiguration configuration, TimeAwareRaftGroupService raftService) { IndexWithTerm newIndexWithTerm = new IndexWithTerm(configuration.index(), configuration.term()); lastHandledIndexWithTerm.updateAndGet(existingIndexWithTerm -> { @@ -676,7 +679,7 @@ private void updateRaftClientConfigIfEventIsNotStale(RaftGroupConfiguration conf }); } - private void handlePeersChange(RaftGroupConfiguration configuration, RaftGroupService raftService) { + private void handlePeersChange(RaftGroupConfiguration configuration, TimeAwareRaftGroupService raftService) { synchronized (peersChangeMutex) { if (peersChangeState == null || configuration.term() <= peersChangeState.termBeforeChange) { return; @@ -689,7 +692,7 @@ private void handlePeersChange(RaftGroupConfiguration configuration, RaftGroupSe PeersAndLearners newConfig = PeersAndLearners.fromConsistentIds(currentState.targetPeers); // TODO: https://issues.apache.org/jira/browse/IGNITE-26854. 
- raftService.changePeersAndLearners(newConfig, configuration.term(), 0) + raftService.changePeersAndLearners(newConfig, configuration.term(), 0, TimeAwareRaftGroupService.NO_TIMEOUT) .whenComplete((res, ex) -> { if (ex != null) { Throwable unwrapped = ExceptionUtils.unwrapCause(ex); @@ -857,7 +860,7 @@ public long appliedRevision() { @Override public CompletableFuture currentRevision() { - return metaStorageSvcFut.thenCompose(MetaStorageService::currentRevisions) + return metaStorageSvcFut.thenCompose(svc -> svc.currentRevisions(TimeAwareRaftGroupService.NO_TIMEOUT)) .thenApply(RevisionsInfo::revision); } @@ -920,7 +923,7 @@ public CompletableFuture get(ByteArray key) { busyLock, () -> withTrackReadOperationFromLeaderFuture( storage.revision(), - () -> metaStorageSvcFut.thenCompose(svc -> svc.get(key)) + () -> metaStorageSvcFut.thenCompose(svc -> svc.get(key, TimeAwareRaftGroupService.NO_TIMEOUT)) ) ); } @@ -931,7 +934,7 @@ public CompletableFuture get(ByteArray key, long revUpperBound) { busyLock, () -> withTrackReadOperationFromLeaderFuture( revUpperBound, - () -> metaStorageSvcFut.thenCompose(svc -> svc.get(key, revUpperBound)) + () -> metaStorageSvcFut.thenCompose(svc -> svc.get(key, revUpperBound, TimeAwareRaftGroupService.NO_TIMEOUT)) ) ); } @@ -983,7 +986,7 @@ public CompletableFuture> getAll(Set keys) { busyLock, () -> withTrackReadOperationFromLeaderFuture( storage.revision(), - () -> metaStorageSvcFut.thenCompose(svc -> svc.getAll(keys)) + () -> metaStorageSvcFut.thenCompose(svc -> svc.getAll(keys, TimeAwareRaftGroupService.NO_TIMEOUT)) ) ); } @@ -995,7 +998,7 @@ public CompletableFuture put(ByteArray key, byte[] val) { } try { - return metaStorageSvcFut.thenCompose(svc -> svc.put(key, val)); + return metaStorageSvcFut.thenCompose(svc -> svc.put(key, val, TimeAwareRaftGroupService.NO_TIMEOUT)); } finally { busyLock.leaveBusy(); } @@ -1008,7 +1011,7 @@ public CompletableFuture putAll(Map vals) { } try { - return metaStorageSvcFut.thenCompose(svc -> 
svc.putAll(vals)); + return metaStorageSvcFut.thenCompose(svc -> svc.putAll(vals, TimeAwareRaftGroupService.NO_TIMEOUT)); } finally { busyLock.leaveBusy(); } @@ -1021,7 +1024,7 @@ public CompletableFuture remove(ByteArray key) { } try { - return metaStorageSvcFut.thenCompose(svc -> svc.remove(key)); + return metaStorageSvcFut.thenCompose(svc -> svc.remove(key, TimeAwareRaftGroupService.NO_TIMEOUT)); } finally { busyLock.leaveBusy(); } @@ -1034,7 +1037,7 @@ public CompletableFuture removeAll(Set keys) { } try { - return metaStorageSvcFut.thenCompose(svc -> svc.removeAll(keys)); + return metaStorageSvcFut.thenCompose(svc -> svc.removeAll(keys, TimeAwareRaftGroupService.NO_TIMEOUT)); } finally { busyLock.leaveBusy(); } @@ -1047,7 +1050,7 @@ public CompletableFuture removeByPrefix(ByteArray prefix) { } try { - return metaStorageSvcFut.thenCompose(svc -> svc.removeByPrefix(prefix)); + return metaStorageSvcFut.thenCompose(svc -> svc.removeByPrefix(prefix, TimeAwareRaftGroupService.NO_TIMEOUT)); } finally { busyLock.leaveBusy(); } @@ -1060,7 +1063,7 @@ public CompletableFuture invoke(Condition cond, Operation success, Oper } try { - return metaStorageSvcFut.thenCompose(svc -> svc.invoke(cond, success, failure)); + return metaStorageSvcFut.thenCompose(svc -> svc.invoke(cond, success, failure, TimeAwareRaftGroupService.NO_TIMEOUT)); } finally { busyLock.leaveBusy(); } @@ -1073,7 +1076,7 @@ public CompletableFuture invoke(Condition cond, List success } try { - return metaStorageSvcFut.thenCompose(svc -> svc.invoke(cond, success, failure)); + return metaStorageSvcFut.thenCompose(svc -> svc.invoke(cond, success, failure, TimeAwareRaftGroupService.NO_TIMEOUT)); } finally { busyLock.leaveBusy(); } @@ -1086,7 +1089,7 @@ public CompletableFuture invoke(Iif iif) { } try { - return metaStorageSvcFut.thenCompose(svc -> svc.invoke(iif)); + return metaStorageSvcFut.thenCompose(svc -> svc.invoke(iif, TimeAwareRaftGroupService.NO_TIMEOUT)); } finally { busyLock.leaveBusy(); } @@ -1101,7 
+1104,8 @@ public Publisher range(ByteArray keyFrom, @Nullable ByteArray keyTo) { try { return withTrackReadOperationFromLeaderPublisher( storage.revision(), - () -> new CompletableFuturePublisher<>(metaStorageSvcFut.thenApply(svc -> svc.range(keyFrom, keyTo, false))) + () -> new CompletableFuturePublisher<>(metaStorageSvcFut.thenApply( + svc -> svc.range(keyFrom, keyTo, false, TimeAwareRaftGroupService.NO_TIMEOUT))) ); } finally { busyLock.leaveBusy(); @@ -1122,7 +1126,8 @@ public Publisher prefix(ByteArray keyPrefix, long revUpperBound) { try { return withTrackReadOperationFromLeaderPublisher( revUpperBound, - () -> new CompletableFuturePublisher<>(metaStorageSvcFut.thenApply(svc -> svc.prefix(keyPrefix, revUpperBound))) + () -> new CompletableFuturePublisher<>(metaStorageSvcFut.thenApply( + svc -> svc.prefix(keyPrefix, revUpperBound, TimeAwareRaftGroupService.NO_TIMEOUT))) ); } finally { busyLock.leaveBusy(); @@ -1161,7 +1166,9 @@ private void checkMgAvailability() { try { metaStorageSvcFut - .thenCompose(MetaStorageServiceImpl::currentRevisions) + .thenCompose(metaStorageService -> + metaStorageService.currentRevisions(TimeAwareRaftGroupService.NO_TIMEOUT) + ) .orTimeout(AVAILABILITY_CHECK_TIMEOUT_MS, TimeUnit.MILLISECONDS) .whenComplete((rev, ex) -> msAvailable = ex == null); } finally { @@ -1246,10 +1253,16 @@ public void initiateForcefulVotersChange(long termBeforeChange, Set targ private CompletableFuture doWithOneOffRaftGroupService( PeersAndLearners raftClientConfiguration, - Function> action + Function> action ) { try { - RaftGroupService raftGroupService = raftMgr.startRaftGroupService(MetastorageGroupId.INSTANCE, raftClientConfiguration, true); + TimeAwareRaftGroupService raftGroupService = raftMgr.startTimeAwareRaftGroupService( + MetastorageGroupId.INSTANCE, + raftClientConfiguration, + raftServiceFactory, + StoppingExceptionFactories.indicateNodeStop(), + true + ); return action.apply(raftGroupService) // This callback should be executed 
asynchronously due to @@ -1336,7 +1349,9 @@ public CompletableFuture evictIdempotentCommandsCache(HybridTimestamp evic } try { - return metaStorageSvcFut.thenCompose(svc -> svc.evictIdempotentCommandsCache(evictionTimestamp)); + return metaStorageSvcFut.thenCompose(svc -> + svc.evictIdempotentCommandsCache(evictionTimestamp, TimeAwareRaftGroupService.NO_TIMEOUT) + ); } finally { busyLock.leaveBusy(); } @@ -1467,7 +1482,8 @@ private MetaStorageInfoAndClusterState(MetaStorageInfo metaStorageInfo, ClusterS * @return Pending operation future. */ CompletableFuture sendCompactionCommand(long compactionRevision) { - return inBusyLockAsync(busyLock, () -> metaStorageSvcFut.thenCompose(svc -> svc.sendCompactionCommand(compactionRevision))); + return inBusyLockAsync(busyLock, () -> metaStorageSvcFut.thenCompose( + svc -> svc.sendCompactionCommand(compactionRevision, TimeAwareRaftGroupService.NO_TIMEOUT))); } // TODO: https://issues.apache.org/jira/browse/IGNITE-26085 Remove, tmp hack diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java index 50c5509820c..54876d33055 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageService.java @@ -37,24 +37,34 @@ /** * Defines interface for access to a meta storage service. + * + *

<p>All methods accept a {@code timeoutMillis} parameter with the following semantics: + * <ul> + * <li>{@code 0} - single attempt without retries.</li> + * <li>{@code Long.MAX_VALUE} - infinite wait.</li> + * <li>negative values - treated as infinite for compatibility.</li> + * <li>positive values - bounded wait up to the specified timeout.</li> + * </ul>
*/ public interface MetaStorageService extends ManuallyCloseable { /** * Retrieves an entry for the given key. * * @param key Key. Couldn't be {@code null}. + * @param timeoutMillis Timeout in milliseconds. * @return An entry for the given key. Couldn't be {@code null}. * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. * @see ByteArray * @see Entry */ - CompletableFuture get(ByteArray key); + CompletableFuture get(ByteArray key, long timeoutMillis); /** * Retrieves an entry for the given key and the revision upper bound. * * @param key The key. Couldn't be {@code null}. * @param revUpperBound The upper bound for entry revisions. Must be positive. + * @param timeoutMillis Timeout in milliseconds. * @return An entry for the given key and maximum revision limited by {@code revUpperBound}. Couldn't be {@code null}. * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. * @throws CompactedException If the desired revisions are removed from the storage due to a compaction. Will be thrown on getting @@ -62,24 +72,26 @@ public interface MetaStorageService extends ManuallyCloseable { * @see ByteArray * @see Entry */ - CompletableFuture get(ByteArray key, long revUpperBound); + CompletableFuture get(ByteArray key, long revUpperBound, long timeoutMillis); /** * Retrieves entries for given keys. * * @param keys The set of keys. Couldn't be {@code null} or empty. Set elements couldn't be {@code null}. + * @param timeoutMillis Timeout in milliseconds. * @return A map of entries for given keys. Couldn't be {@code null}. * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. * @see ByteArray * @see Entry */ - CompletableFuture> getAll(Set keys); + CompletableFuture> getAll(Set keys, long timeoutMillis); /** * Retrieves entries for given keys and the revision upper bound. * * @param keys The set of keys. 
Couldn't be {@code null} or empty. Set elements couldn't be {@code null}. * @param revUpperBound The upper bound for entry revisions. Must be positive. + * @param timeoutMillis Timeout in milliseconds. * @return A map of entries for given keys and maximum revision limited by {@code revUpperBound}. Couldn't be {@code null}. * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. * @throws CompactedException If the desired revisions are removed from the storage due to a compaction. Will be thrown on getting @@ -87,62 +99,67 @@ public interface MetaStorageService extends ManuallyCloseable { * @see ByteArray * @see Entry */ - CompletableFuture> getAll(Set keys, long revUpperBound); + CompletableFuture> getAll(Set keys, long revUpperBound, long timeoutMillis); /** * Inserts or updates an entry with the given key and the given value. * * @param key The key. Couldn't be {@code null}. * @param value The value. Couldn't be {@code null}. + * @param timeoutMillis Timeout in milliseconds. * @return Completed future. * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. * @see ByteArray * @see Entry */ - CompletableFuture put(ByteArray key, byte[] value); + CompletableFuture put(ByteArray key, byte[] value, long timeoutMillis); /** * Inserts or updates entries with given keys and given values. * * @param vals The map of keys and corresponding values. Couldn't be {@code null} or empty. + * @param timeoutMillis Timeout in milliseconds. * @return Completed future. * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. * @see ByteArray * @see Entry */ - CompletableFuture putAll(Map vals); + CompletableFuture putAll(Map vals, long timeoutMillis); /** * Removes an entry for the given key. * * @param key The key. Couldn't be {@code null}. + * @param timeoutMillis Timeout in milliseconds. * @return Completed future. 
* @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. * @see ByteArray * @see Entry */ - CompletableFuture remove(ByteArray key); + CompletableFuture remove(ByteArray key, long timeoutMillis); /** * Removes entries for given keys. * * @param keys The keys set. Couldn't be {@code null} or empty. + * @param timeoutMillis Timeout in milliseconds. * @return Completed future. * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. * @see ByteArray * @see Entry */ - CompletableFuture removeAll(Set keys); + CompletableFuture removeAll(Set keys, long timeoutMillis); /** * Removes entries by given prefix. * * @param prefix Prefix to remove keys by. Couldn't be {@code null}. + * @param timeoutMillis Timeout in milliseconds. * @return Future that completes successfully when keys with given prefix are deleted or with {@link OperationTimeoutException}. * @see ByteArray * @see Entry */ - CompletableFuture removeByPrefix(ByteArray prefix); + CompletableFuture removeByPrefix(ByteArray prefix, long timeoutMillis); /** * Updates an entry for the given key conditionally. @@ -152,6 +169,7 @@ public interface MetaStorageService extends ManuallyCloseable { * @param condition The condition. * @param success The update which will be applied in case of condition evaluation yields {@code true}. * @param failure The update which will be applied in case of condition evaluation yields {@code false}. + * @param timeoutMillis Timeout in milliseconds. * @return Future result {@code true} if {@code success} update was applied, otherwise {@code false}. * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. 
* @see ByteArray @@ -159,7 +177,7 @@ public interface MetaStorageService extends ManuallyCloseable { * @see Condition * @see Operation */ - CompletableFuture invoke(Condition condition, Operation success, Operation failure); + CompletableFuture invoke(Condition condition, Operation success, Operation failure, long timeoutMillis); /** * Updates an entry for the given key conditionally. @@ -169,6 +187,7 @@ public interface MetaStorageService extends ManuallyCloseable { * @param condition The condition. * @param success The updates which will be applied in case of condition evaluation yields {@code true}. * @param failure The updates which will be applied in case of condition evaluation yields {@code false}. + * @param timeoutMillis Timeout in milliseconds. * @return Future result {@code true} if {@code success} update was applied, otherwise {@code false}. * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. * @see ByteArray @@ -176,18 +195,19 @@ public interface MetaStorageService extends ManuallyCloseable { * @see Condition * @see Operation */ - CompletableFuture invoke(Condition condition, List success, List failure); + CompletableFuture invoke(Condition condition, List success, List failure, long timeoutMillis); /** * Invoke, which supports nested conditional statements. For detailed docs about construction of new if statement, look at {@link Iif} * javadocs. * * @param iif {@link Iif} statement to invoke + * @param timeoutMillis Timeout in milliseconds. * @return execution result * @see Iif * @see StatementResult */ - CompletableFuture invoke(Iif iif); + CompletableFuture invoke(Iif iif, long timeoutMillis); /** * Retrieves entries for the given key range in lexicographic order. Entries will be filtered out by upper bound of given revision @@ -196,13 +216,14 @@ public interface MetaStorageService extends ManuallyCloseable { * @param keyFrom Start key of range (inclusive). Couldn't be {@code null}. 
* @param keyTo End key of range (exclusive). {@code null} represents an unbound range. * @param revUpperBound The upper bound for entry revision. {@code -1} means latest revision. + * @param timeoutMillis Timeout in milliseconds. * @return Publisher that will provide entries corresponding to the given range and revision. * @throws OperationTimeoutException If the operation is timed out. * @throws CompactedException If the desired revisions are removed from the storage due to a compaction. * @see ByteArray * @see Entry */ - Publisher range(ByteArray keyFrom, @Nullable ByteArray keyTo, long revUpperBound); + Publisher range(ByteArray keyFrom, @Nullable ByteArray keyTo, long revUpperBound, long timeoutMillis); /** * Retrieves entries for the given key range in lexicographic order. Entries will be filtered out by upper bound of given revision @@ -212,61 +233,71 @@ public interface MetaStorageService extends ManuallyCloseable { * @param keyTo End key of range (exclusive). {@code null} represents an unbound range. * @param revUpperBound The upper bound for entry revision. {@code -1} means latest revision. * @param includeTombstones Whether to include tombstone entries. + * @param timeoutMillis Timeout in milliseconds. * @return Publisher that will provide entries corresponding to the given range and revision. * @throws OperationTimeoutException If the operation is timed out. * @throws CompactedException If the desired revisions are removed from the storage due to a compaction. * @see ByteArray * @see Entry */ - Publisher range(ByteArray keyFrom, @Nullable ByteArray keyTo, long revUpperBound, boolean includeTombstones); + Publisher range(ByteArray keyFrom, @Nullable ByteArray keyTo, long revUpperBound, boolean includeTombstones, + long timeoutMillis); /** - * Retrieves entries for the given key range in lexicographic order. Short cut for {@link #range(ByteArray, ByteArray, long)} where - * {@code revUpperBound == -1}. 
+ * Retrieves entries for the given key range in lexicographic order. Short cut for + * {@link #range(ByteArray, ByteArray, long, long)} where {@code revUpperBound == -1}. * * @param keyFrom Start key of range (inclusive). Couldn't be {@code null}. * @param keyTo End key of range (exclusive). {@code null} represents an unbound range. + * @param timeoutMillis Timeout in milliseconds. * @return Publisher that will provide entries corresponding to the given range. * @throws OperationTimeoutException If the operation is timed out. * @throws CompactedException If the desired revisions are removed from the storage due to a compaction. * @see ByteArray * @see Entry */ - Publisher range(ByteArray keyFrom, @Nullable ByteArray keyTo); + Publisher range(ByteArray keyFrom, @Nullable ByteArray keyTo, long timeoutMillis); /** - * Retrieves entries for the given key range in lexicographic order. Short cut for {@link #range(ByteArray, ByteArray, long, boolean)} - * where {@code revUpperBound == -1}. + * Retrieves entries for the given key range in lexicographic order. Short cut for + * {@link #range(ByteArray, ByteArray, long, boolean, long)} where {@code revUpperBound == -1}. * * @param keyFrom Start key of range (inclusive). Couldn't be {@code null}. * @param keyTo End key of range (exclusive). {@code null} represents an unbound range. * @param includeTombstones Whether to include tombstone entries. + * @param timeoutMillis Timeout in milliseconds. * @return Publisher that will provide entries corresponding to the given range. * @throws OperationTimeoutException If the operation is timed out. * @throws CompactedException If the desired revisions are removed from the storage due to a compaction. 
* @see ByteArray * @see Entry */ - Publisher range(ByteArray keyFrom, @Nullable ByteArray keyTo, boolean includeTombstones); + Publisher range(ByteArray keyFrom, @Nullable ByteArray keyTo, boolean includeTombstones, long timeoutMillis); /** * Retrieves entries for keys starting with the given prefix in lexicographic order. * * @param prefix Key prefix. * @param revUpperBound The upper bound for entry revision. {@code -1} means latest revision. + * @param timeoutMillis Timeout in milliseconds. * @return Publisher that will provide entries corresponding to the given prefix and revision. * @throws OperationTimeoutException If the operation is timed out. * @throws CompactedException If the desired revisions are removed from the storage due to a compaction. */ - Publisher prefix(ByteArray prefix, long revUpperBound); + Publisher prefix(ByteArray prefix, long revUpperBound, long timeoutMillis); - /** Returns a future which will hold {@link RevisionsInfo current revisions} of the metastorage leader. */ - CompletableFuture currentRevisions(); + /** + * Returns a future which will hold {@link RevisionsInfo current revisions} of the metastorage leader. + * + * @param timeoutMillis Timeout in milliseconds. + */ + CompletableFuture currentRevisions(long timeoutMillis); /** * Returns information about a revision checksum on the leader. * * @param revision Revision of interest. + * @param timeoutMillis Timeout in milliseconds. 
*/ - CompletableFuture checksum(long revision); + CompletableFuture checksum(long revision, long timeoutMillis); } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceContext.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceContext.java index feeb968006e..ce866b40097 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceContext.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceContext.java @@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit; import org.apache.ignite.internal.close.ManuallyCloseable; import org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory; -import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.internal.util.IgniteUtils; @@ -30,7 +30,7 @@ */ class MetaStorageServiceContext implements ManuallyCloseable { /** Meta storage raft group service. */ - private final RaftGroupService raftService; + private final TimeAwareRaftGroupService raftService; /** Commands factory. 
*/ private final MetaStorageCommandsFactory commandsFactory; @@ -41,7 +41,7 @@ class MetaStorageServiceContext implements ManuallyCloseable { private final IgniteSpinBusyLock busyLock; MetaStorageServiceContext( - RaftGroupService raftService, + TimeAwareRaftGroupService raftService, MetaStorageCommandsFactory commandsFactory, ExecutorService executorService, IgniteSpinBusyLock busyLock @@ -52,7 +52,7 @@ class MetaStorageServiceContext implements ManuallyCloseable { this.busyLock = busyLock; } - RaftGroupService raftService() { + TimeAwareRaftGroupService raftService() { return raftService; } diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java index 8d4196bffcf..97a2dcd7af5 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageServiceImpl.java @@ -58,8 +58,7 @@ import org.apache.ignite.internal.metastorage.dsl.StatementResult; import org.apache.ignite.internal.network.InternalClusterNode; import org.apache.ignite.internal.raft.ReadCommand; -import org.apache.ignite.internal.raft.service.RaftCommandRunner; -import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService; import org.apache.ignite.internal.thread.IgniteThreadFactory; import org.apache.ignite.internal.util.IgniteSpinBusyLock; import org.apache.ignite.internal.util.IgniteUtils; @@ -74,10 +73,6 @@ public class MetaStorageServiceImpl implements MetaStorageService { /** Default batch size that is requested from the remote server. */ public static final int BATCH_SIZE = 1000; - // TODO: https://issues.apache.org/jira/browse/IGNITE-26085 Use proper timeout or reactive approach. 
- /** Timeout for meta storage raft commands processing. */ - private static final int TIMEOUT_MILLIS = 30_000; - private final MetaStorageServiceContext context; private final HybridClock clock; @@ -91,7 +86,7 @@ public class MetaStorageServiceImpl implements MetaStorageService { */ public MetaStorageServiceImpl( InternalClusterNode localNode, - RaftGroupService metaStorageRaftGrpSvc, + TimeAwareRaftGroupService metaStorageRaftGrpSvc, IgniteSpinBusyLock busyLock, HybridClock clock ) { @@ -107,88 +102,89 @@ public MetaStorageServiceImpl( this.commandIdGenerator = new CommandIdGenerator(localNode.id()); } - public RaftGroupService raftGroupService() { + public TimeAwareRaftGroupService raftGroupService() { return context.raftService(); } @Override - public CompletableFuture get(ByteArray key) { - return get(key, MetaStorageManager.LATEST_REVISION); + public CompletableFuture get(ByteArray key, long timeoutMillis) { + return get(key, MetaStorageManager.LATEST_REVISION, timeoutMillis); } @Override - public CompletableFuture get(ByteArray key, long revUpperBound) { + public CompletableFuture get(ByteArray key, long revUpperBound, long timeoutMillis) { GetCommand getCommand = context.commandsFactory().getCommand().key(ByteBuffer.wrap(key.bytes())).revision(revUpperBound).build(); - return context.raftService().run(getCommand, TIMEOUT_MILLIS); + return context.raftService().run(getCommand, timeoutMillis); } @Override - public CompletableFuture> getAll(Set keys) { - return getAll(keys, MetaStorageManager.LATEST_REVISION); + public CompletableFuture> getAll(Set keys, long timeoutMillis) { + return getAll(keys, MetaStorageManager.LATEST_REVISION, timeoutMillis); } @Override - public CompletableFuture> getAll(Set keys, long revUpperBound) { + public CompletableFuture> getAll(Set keys, long revUpperBound, long timeoutMillis) { GetAllCommand getAllCommand = getAllCommand(context.commandsFactory(), keys, revUpperBound); - return context.raftService().>run(getAllCommand) + 
return context.raftService().>run(getAllCommand, timeoutMillis) .thenApply(MetaStorageServiceImpl::multipleEntryResult); } @Override - public CompletableFuture put(ByteArray key, byte[] value) { + public CompletableFuture put(ByteArray key, byte[] value, long timeoutMillis) { PutCommand putCommand = context.commandsFactory().putCommand() .key(ByteBuffer.wrap(key.bytes())) .value(ByteBuffer.wrap(value)) .initiatorTime(clock.now()) .build(); - return context.raftService().run(putCommand, TIMEOUT_MILLIS); + return context.raftService().run(putCommand, timeoutMillis); } @Override - public CompletableFuture putAll(Map vals) { + public CompletableFuture putAll(Map vals, long timeoutMillis) { PutAllCommand putAllCommand = putAllCommand(context.commandsFactory(), vals, clock.now()); - return context.raftService().run(putAllCommand, TIMEOUT_MILLIS); + return context.raftService().run(putAllCommand, timeoutMillis); } @Override - public CompletableFuture remove(ByteArray key) { + public CompletableFuture remove(ByteArray key, long timeoutMillis) { RemoveCommand removeCommand = context.commandsFactory().removeCommand().key(ByteBuffer.wrap(key.bytes())) .initiatorTime(clock.now()).build(); - return context.raftService().run(removeCommand, TIMEOUT_MILLIS); + return context.raftService().run(removeCommand, timeoutMillis); } @Override - public CompletableFuture removeAll(Set keys) { + public CompletableFuture removeAll(Set keys, long timeoutMillis) { RemoveAllCommand removeAllCommand = removeAllCommand(context.commandsFactory(), keys, clock.now()); - return context.raftService().run(removeAllCommand, TIMEOUT_MILLIS); + return context.raftService().run(removeAllCommand, timeoutMillis); } @Override - public CompletableFuture removeByPrefix(ByteArray prefix) { + public CompletableFuture removeByPrefix(ByteArray prefix, long timeoutMillis) { RemoveByPrefixCommand removeByPrefix = context.commandsFactory().removeByPrefixCommand() .prefix(ByteBuffer.wrap(prefix.bytes())) 
.initiatorTime(clock.now()) .build(); - return context.raftService().run(removeByPrefix, TIMEOUT_MILLIS); + return context.raftService().run(removeByPrefix, timeoutMillis); } @Override - public CompletableFuture invoke(Condition condition, Operation success, Operation failure) { - return invoke(condition, List.of(success), List.of(failure)); + public CompletableFuture invoke(Condition condition, Operation success, Operation failure, long timeoutMillis) { + return invoke(condition, List.of(success), List.of(failure), timeoutMillis); } @Override public CompletableFuture invoke( Condition condition, List success, - List failure + List failure, + long timeoutMillis ) { InvokeCommand invokeCommand = context.commandsFactory().invokeCommand() .condition(condition) @@ -198,33 +194,33 @@ public CompletableFuture invoke( .id(commandIdGenerator.newId()) .build(); - return context.raftService().run(invokeCommand, TIMEOUT_MILLIS); + return context.raftService().run(invokeCommand, timeoutMillis); } @Override - public CompletableFuture invoke(Iif iif) { + public CompletableFuture invoke(Iif iif, long timeoutMillis) { MultiInvokeCommand multiInvokeCommand = context.commandsFactory().multiInvokeCommand() .iif(iif) .initiatorTime(clock.now()) .id(commandIdGenerator.newId()) .build(); - return context.raftService().run(multiInvokeCommand, TIMEOUT_MILLIS); + return context.raftService().run(multiInvokeCommand, timeoutMillis); } @Override - public Publisher range(ByteArray keyFrom, @Nullable ByteArray keyTo, long revUpperBound) { - return range(keyFrom, keyTo, revUpperBound, false); + public Publisher range(ByteArray keyFrom, @Nullable ByteArray keyTo, long revUpperBound, long timeoutMillis) { + return range(keyFrom, keyTo, revUpperBound, false, timeoutMillis); } @Override - public Publisher range(ByteArray keyFrom, @Nullable ByteArray keyTo) { - return range(keyFrom, keyTo, false); + public Publisher range(ByteArray keyFrom, @Nullable ByteArray keyTo, long timeoutMillis) { + return 
range(keyFrom, keyTo, false, timeoutMillis); } @Override - public Publisher range(ByteArray keyFrom, @Nullable ByteArray keyTo, boolean includeTombstones) { - return range(keyFrom, keyTo, MetaStorageManager.LATEST_REVISION, includeTombstones); + public Publisher range(ByteArray keyFrom, @Nullable ByteArray keyTo, boolean includeTombstones, long timeoutMillis) { + return range(keyFrom, keyTo, MetaStorageManager.LATEST_REVISION, includeTombstones, timeoutMillis); } @Override @@ -232,7 +228,8 @@ public Publisher range( ByteArray keyFrom, @Nullable ByteArray keyTo, long revUpperBound, - boolean includeTombstones + boolean includeTombstones, + long timeoutMillis ) { Function getRangeCommand = prevKey -> context.commandsFactory().getRangeCommand() .keyFrom(ByteBuffer.wrap(keyFrom.bytes())) @@ -243,11 +240,11 @@ public Publisher range( .batchSize(BATCH_SIZE) .build(); - return new CursorPublisher(context, getRangeCommand); + return new CursorPublisher(context, getRangeCommand, timeoutMillis); } @Override - public Publisher prefix(ByteArray prefix, long revUpperBound) { + public Publisher prefix(ByteArray prefix, long revUpperBound, long timeoutMillis) { Function getPrefixCommand = prevKey -> context.commandsFactory().getPrefixCommand() .prefix(ByteBuffer.wrap(prefix.bytes())) .revUpperBound(revUpperBound) @@ -256,54 +253,56 @@ public Publisher prefix(ByteArray prefix, long revUpperBound) { .batchSize(BATCH_SIZE) .build(); - return new CursorPublisher(context, getPrefixCommand); + return new CursorPublisher(context, getPrefixCommand, timeoutMillis); } /** * Sends idle safe time sync message. Should be called only on the leader node. * * @param safeTime New safe time. + * @param timeoutMillis Timeout in milliseconds. * @return Future that will be completed when message is sent. 
*/ - CompletableFuture syncTime(HybridTimestamp safeTime, long term) { + CompletableFuture syncTime(HybridTimestamp safeTime, long term, long timeoutMillis) { SyncTimeCommand syncTimeCommand = context.commandsFactory().syncTimeCommand() .initiatorTime(safeTime) .initiatorTerm(term) .build(); - return context.raftService().run(syncTimeCommand, TIMEOUT_MILLIS); + return context.raftService().run(syncTimeCommand, timeoutMillis); } @Override - public CompletableFuture currentRevisions() { + public CompletableFuture currentRevisions(long timeoutMillis) { GetCurrentRevisionsCommand cmd = context.commandsFactory().getCurrentRevisionsCommand().build(); - return context.raftService().run(cmd, RaftCommandRunner.NO_TIMEOUT); + return context.raftService().run(cmd, timeoutMillis); } @Override - public CompletableFuture checksum(long revision) { + public CompletableFuture checksum(long revision, long timeoutMillis) { GetChecksumCommand cmd = context.commandsFactory().getChecksumCommand() .revision(revision) .build(); - return context.raftService().run(cmd); + return context.raftService().run(cmd, timeoutMillis); } /** * Removes obsolete entries from both volatile and persistent idempotent command cache. * * @param evictionTimestamp Cached entries older than given timestamp will be evicted. + * @param timeoutMillis Timeout in milliseconds. * @return Pending operation future. 
*/ - CompletableFuture evictIdempotentCommandsCache(HybridTimestamp evictionTimestamp) { + CompletableFuture evictIdempotentCommandsCache(HybridTimestamp evictionTimestamp, long timeoutMillis) { EvictIdempotentCommandsCacheCommand evictIdempotentCommandsCacheCommand = evictIdempotentCommandsCacheCommand( context.commandsFactory(), evictionTimestamp, clock.now() ); - return context.raftService().run(evictIdempotentCommandsCacheCommand, TIMEOUT_MILLIS); + return context.raftService().run(evictIdempotentCommandsCacheCommand, timeoutMillis); } @Override @@ -390,15 +389,16 @@ private EvictIdempotentCommandsCacheCommand evictIdempotentCommandsCacheCommand( * Sends command {@link CompactionCommand} to the leader. * * @param compactionRevision New metastorage compaction revision. + * @param timeoutMillis Timeout in milliseconds. * @return Operation future. */ - CompletableFuture sendCompactionCommand(long compactionRevision) { + CompletableFuture sendCompactionCommand(long compactionRevision, long timeoutMillis) { CompactionCommand command = context.commandsFactory().compactionCommand() .compactionRevision(compactionRevision) .initiatorTime(clock.now()) .build(); - return context.raftService().run(command, TIMEOUT_MILLIS); + return context.raftService().run(command, timeoutMillis); } // TODO: https://issues.apache.org/jira/browse/IGNITE-26085 Remove, tmp hack diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/CursorPublisherTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/CursorPublisherTest.java index 0c62d5618a7..1f820715f30 100644 --- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/CursorPublisherTest.java +++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/CursorPublisherTest.java @@ -28,6 +28,7 @@ import static org.hamcrest.Matchers.hasSize; import static org.junit.jupiter.api.Assertions.assertThrows; import static 
org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; @@ -50,7 +51,7 @@ import org.apache.ignite.internal.metastorage.command.GetRangeCommand; import org.apache.ignite.internal.metastorage.command.MetaStorageCommandsFactory; import org.apache.ignite.internal.metastorage.command.response.BatchResponse; -import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.internal.testframework.flow.TestFlowUtils; import org.apache.ignite.internal.util.IgniteSpinBusyLock; @@ -66,7 +67,7 @@ @ExtendWith(MockitoExtension.class) public class CursorPublisherTest extends BaseIgniteAbstractTest { @Mock - private RaftGroupService raftService; + private TimeAwareRaftGroupService raftService; private MetaStorageServiceContext context; @@ -81,14 +82,14 @@ void setUp() { new IgniteSpinBusyLock() ); - publisher = new CursorPublisher(context, uuid -> mock(GetRangeCommand.class)); + publisher = new CursorPublisher(context, uuid -> mock(GetRangeCommand.class), TimeAwareRaftGroupService.NO_TIMEOUT); } @Test void testPagination() { Entry mockEntry = mock(Entry.class); - when(raftService.run(any(GetRangeCommand.class))) + when(raftService.run(any(GetRangeCommand.class), anyLong())) .thenReturn(completedFuture(new BatchResponse(nCopies(6, mockEntry), true))) .thenReturn(completedFuture(new BatchResponse(nCopies(5, mockEntry), false))); @@ -110,7 +111,7 @@ void testPagination() { assertThat(awaitFuture, willCompleteSuccessfully()); - verify(raftService, times(2)).run(any(GetRangeCommand.class)); + verify(raftService, times(2)).run(any(GetRangeCommand.class), anyLong()); verify(subscriber, times(11)).onNext(any()); verify(subscriber).onComplete(); } @@ -119,7 +120,7 
@@ void testPagination() { void testRequestDuringOnNext() { Entry mockEntry = mock(Entry.class); - when(raftService.run(any(GetRangeCommand.class))) + when(raftService.run(any(GetRangeCommand.class), anyLong())) .thenReturn(completedFuture(new BatchResponse(nCopies(6, mockEntry), true))) .thenReturn(completedFuture(new BatchResponse(nCopies(5, mockEntry), false))); @@ -127,12 +128,12 @@ void testRequestDuringOnNext() { assertThat(future, will(hasSize(11))); - verify(raftService, times(2)).run(any(GetRangeCommand.class)); + verify(raftService, times(2)).run(any(GetRangeCommand.class), anyLong()); } @Test void testErrorOnPagination() { - when(raftService.run(any(GetRangeCommand.class))) + when(raftService.run(any(GetRangeCommand.class), anyLong())) .thenReturn(completedFuture(new BatchResponse(nCopies(5, mock(Entry.class)), true))) .thenReturn(failedFuture(new IllegalStateException())); @@ -160,7 +161,7 @@ void testErrorOnPagination() { assertThat(awaitFuture, willThrow(IllegalStateException.class, 10, TimeUnit.SECONDS)); - verify(raftService, times(2)).run(any(GetRangeCommand.class)); + verify(raftService, times(2)).run(any(GetRangeCommand.class), anyLong()); verify(subscriber, times(5)).onNext(any()); } @@ -195,7 +196,7 @@ void testComponentStopOnNext() throws Exception { Entry mockEntry = mock(Entry.class); - when(raftService.run(any(GetRangeCommand.class))) + when(raftService.run(any(GetRangeCommand.class), anyLong())) .thenReturn(completedFuture(new BatchResponse(List.of(mockEntry), true))) .thenAnswer(invocation -> supplyAsync(() -> { try { diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java index d97779d3f16..7bad78e3eb9 100644 --- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java 
+++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageDeployWatchesCorrectnessTest.java @@ -48,8 +48,8 @@ import org.apache.ignite.internal.network.ClusterNodeImpl; import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer; import org.apache.ignite.internal.raft.RaftManager; -import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService; -import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; +import org.apache.ignite.internal.raft.TimeAwareRaftGroupServiceFactory; +import org.apache.ignite.internal.raft.client.PhysicalTopologyAwareRaftGroupService; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.network.NetworkAddress; import org.junit.jupiter.api.extension.ExtendWith; @@ -76,7 +76,7 @@ private static Stream metaStorageProvider() throws Exception ClusterManagementGroupManager cmgManager = mock(ClusterManagementGroupManager.class); RaftManager raftManager = mock(RaftManager.class); - TopologyAwareRaftGroupService raftGroupService = mock(TopologyAwareRaftGroupService.class); + PhysicalTopologyAwareRaftGroupService raftGroupService = mock(PhysicalTopologyAwareRaftGroupService.class); when(cmgManager.metaStorageInfo()).thenReturn(completedFuture( new CmgMessagesFactory().metaStorageInfo().metaStorageNodes(Set.of(mcNodeName)).build() @@ -85,7 +85,7 @@ private static Stream metaStorageProvider() throws Exception var localNode = new ClusterNodeImpl(UUID.randomUUID(), mcNodeName, new NetworkAddress("foo", 123)); - when(raftManager.startSystemRaftGroupNodeAndWaitNodeReady(any(), any(), any(), any(), any(), any())) + when(raftManager.startSystemRaftGroupNodeAndWaitNodeReadyTimeAware(any(), any(), any(), any(), any(), any())) .thenReturn(raftGroupService); when(raftGroupService.run(any(GetCurrentRevisionsCommand.class), anyLong())) .thenAnswer(invocation -> completedFuture(new RevisionsInfo(0, -1))); @@ -100,7 +100,7 @@ private static 
Stream metaStorageProvider() throws Exception raftManager, new SimpleInMemoryKeyValueStorage(mcNodeName, readOperationForCompactionTracker), clock, - mock(TopologyAwareRaftGroupServiceFactory.class), + mock(TimeAwareRaftGroupServiceFactory.class), new NoOpMetricManager(), systemConfiguration, RaftGroupOptionsConfigurer.EMPTY, diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java index 09b3f61d295..a23e7a65c77 100644 --- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java +++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerRecoveryTest.java @@ -52,9 +52,8 @@ import org.apache.ignite.internal.network.ClusterNodeImpl; import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer; import org.apache.ignite.internal.raft.RaftManager; -import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService; -import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; -import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.raft.TimeAwareRaftGroupServiceFactory; +import org.apache.ignite.internal.raft.client.PhysicalTopologyAwareRaftGroupService; import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest; import org.apache.ignite.network.NetworkAddress; import org.junit.jupiter.api.Test; @@ -95,7 +94,7 @@ private void createMetaStorage(long remoteRevision) throws Exception { raftManager, kvs, clock, - mock(TopologyAwareRaftGroupServiceFactory.class), + mock(TimeAwareRaftGroupServiceFactory.class), new NoOpMetricManager(), systemConfiguration, RaftGroupOptionsConfigurer.EMPTY, @@ -106,12 +105,12 @@ private void createMetaStorage(long remoteRevision) throws Exception { private static 
RaftManager raftManager(long remoteRevision) throws Exception { RaftManager raft = mock(RaftManager.class); - RaftGroupService service = mock(TopologyAwareRaftGroupService.class); + PhysicalTopologyAwareRaftGroupService service = mock(PhysicalTopologyAwareRaftGroupService.class); when(service.run(any(GetCurrentRevisionsCommand.class), anyLong())) .thenAnswer(invocation -> completedFuture(new RevisionsInfo(remoteRevision, -1))); - when(raft.startSystemRaftGroupNodeAndWaitNodeReady(any(), any(), any(), any(), any(), any())) + when(raft.startSystemRaftGroupNodeAndWaitNodeReadyTimeAware(any(), any(), any(), any(), any(), any())) .thenAnswer(invocation -> service); return raft; diff --git a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java index 1e4a47cb84f..6a7f6316d3c 100644 --- a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java +++ b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java @@ -20,9 +20,9 @@ import static java.util.Collections.singleton; import static java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.ignite.internal.util.CompletableFutures.emptySetCompletedFuture; -import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Mockito.withSettings; @@ -56,9 +56,9 @@ import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer; import org.apache.ignite.internal.raft.RaftManager; import org.apache.ignite.internal.raft.ReadCommand; +import 
org.apache.ignite.internal.raft.TimeAwareRaftGroupServiceFactory; import org.apache.ignite.internal.raft.WriteCommand; -import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService; -import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; +import org.apache.ignite.internal.raft.client.PhysicalTopologyAwareRaftGroupService; import org.apache.ignite.internal.raft.service.BeforeApplyHandler; import org.apache.ignite.internal.raft.service.CommandClosure; import org.apache.ignite.internal.raft.service.RaftGroupListener; @@ -167,7 +167,7 @@ public static StandaloneMetaStorageManager create( logicalTopologyService, mockRaftManager(), keyValueStorage, - mock(TopologyAwareRaftGroupServiceFactory.class), + mock(TimeAwareRaftGroupServiceFactory.class), mockSystemConfiguration(), clock, RaftGroupOptionsConfigurer.EMPTY, @@ -180,7 +180,7 @@ private StandaloneMetaStorageManager( LogicalTopologyService logicalTopologyService, RaftManager raftMgr, KeyValueStorage storage, - TopologyAwareRaftGroupServiceFactory raftServiceFactory, + TimeAwareRaftGroupServiceFactory raftServiceFactory, SystemDistributedConfiguration systemConfiguration, HybridClock clock, RaftGroupOptionsConfigurer raftGroupOptionsConfigurer, @@ -270,10 +270,10 @@ public CompletableFuture invoke(Condition cond, List success private static RaftManager mockRaftManager() { ArgumentCaptor listenerCaptor = ArgumentCaptor.forClass(RaftGroupListener.class); RaftManager raftManager = mock(RaftManager.class, LENIENT_SETTINGS); - TopologyAwareRaftGroupService raftGroupService = mock(TopologyAwareRaftGroupService.class, LENIENT_SETTINGS); + PhysicalTopologyAwareRaftGroupService raftGroupService = mock(PhysicalTopologyAwareRaftGroupService.class, LENIENT_SETTINGS); try { - when(raftManager.startSystemRaftGroupNodeAndWaitNodeReady( + when(raftManager.startSystemRaftGroupNodeAndWaitNodeReadyTimeAware( any(), any(), listenerCaptor.capture(), @@ -301,16 +301,15 @@ private static 
RaftManager mockRaftManager() { return runCommand(command, listener); } }; - when(raftGroupService.run(any())).thenAnswer(answer); when(raftGroupService.run(any(), anyLong())).thenAnswer(answer); - when(raftGroupService.subscribeLeader(any())).thenAnswer(invocation -> { + doAnswer(invocation -> { LeaderElectionListener callback = invocation.getArgument(0); callback.onLeaderElected(TEST_NODE, 0); - return nullCompletedFuture(); - }); + return null; + }).when(raftGroupService).subscribeLeader(any()); return raftManager; } diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java index c1cc100885f..1f83da96741 100644 --- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java +++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java @@ -65,6 +65,7 @@ import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; import org.apache.ignite.internal.cluster.management.NodeAttributesCollector; import org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration; +import org.apache.ignite.internal.cluster.management.raft.PhysicalTopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage; import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl; import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl; @@ -473,6 +474,12 @@ public Node( RaftGroupOptionsConfigurer msRaftConfigurer = RaftGroupOptionsConfigHelper.configureProperties(msLogStorageManager, metastorageWorkDir.metaPath()); + var msRaftServiceFactory = new PhysicalTopologyAwareRaftGroupServiceFactory( + clusterService, + 
raftGroupEventsClientListener, + failureManager + ); + metaStorageManager = new MetaStorageManagerImpl( clusterService.staticLocalNode(), cmgManager, @@ -480,7 +487,7 @@ public Node( raftManager, keyValueStorage, hybridClock, - topologyAwareRaftGroupServiceFactory, + msRaftServiceFactory, new NoOpMetricManager(), systemConfiguration, msRaftConfigurer, diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java index 6464aa89a78..92f3a24b8aa 100644 --- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java +++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/MultiActorPlacementDriverTest.java @@ -43,6 +43,7 @@ import java.util.stream.IntStream; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; import org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory; +import org.apache.ignite.internal.cluster.management.raft.PhysicalTopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.configuration.ComponentWorkingDir; import org.apache.ignite.internal.configuration.RaftGroupOptionsConfigHelper; import org.apache.ignite.internal.configuration.SystemDistributedConfiguration; @@ -82,7 +83,7 @@ import org.apache.ignite.internal.raft.TestLozaFactory; import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; -import org.apache.ignite.internal.raft.service.RaftGroupService; +import org.apache.ignite.internal.raft.service.TimeAwareRaftGroupService; import org.apache.ignite.internal.raft.storage.LogStorageManager; import 
org.apache.ignite.internal.raft.util.SharedLogStorageManagerUtils; import org.apache.ignite.internal.replicator.PartitionGroupId; @@ -300,6 +301,12 @@ private List startPlacementDriver( RaftGroupOptionsConfigurer msRaftConfigurer = RaftGroupOptionsConfigHelper.configureProperties(msLogStorageManager, metastorageWorkDir.metaPath()); + var msRaftServiceFactory = new PhysicalTopologyAwareRaftGroupServiceFactory( + clusterService, + eventsClientListener, + mock(FailureProcessor.class) + ); + var metaStorageManager = new MetaStorageManagerImpl( clusterService.staticLocalNode(), cmgManager, @@ -307,7 +314,7 @@ private List startPlacementDriver( raftManager, storage, nodeClock, - topologyAwareRaftGroupServiceFactory, + msRaftServiceFactory, new NoOpMetricManager(), systemDistributedConfiguration, msRaftConfigurer, @@ -394,14 +401,14 @@ public void prolongAfterActiveActorChanged() throws Exception { Lease lease = checkLeaseCreated(grpPart0, true); - CompletableFuture msRaftClientFuture = metaStorageManager.metaStorageService() + CompletableFuture msRaftClientFuture = metaStorageManager.metaStorageService() .thenApply(MetaStorageServiceImpl::raftGroupService); assertThat(msRaftClientFuture, willCompleteSuccessfully()); - RaftGroupService msRaftClient = msRaftClientFuture.join(); + TimeAwareRaftGroupService msRaftClient = msRaftClientFuture.join(); - assertThat(msRaftClient.refreshLeader(), willCompleteSuccessfully()); + assertThat(msRaftClient.refreshLeader(TimeAwareRaftGroupService.NO_TIMEOUT), willCompleteSuccessfully()); Peer previousLeader = msRaftClient.leader(); @@ -409,10 +416,12 @@ public void prolongAfterActiveActorChanged() throws Exception { log.info("The placement driver group active actor is transferring [from={}, to={}]", previousLeader, newLeader); - assertThat(msRaftClient.transferLeadership(newLeader), willCompleteSuccessfully()); + assertThat(msRaftClient.transferLeadership(newLeader, TimeAwareRaftGroupService.NO_TIMEOUT), willCompleteSuccessfully()); 
waitForProlong(grpPart0, lease); + assertThat(msRaftClient.refreshLeader(TimeAwareRaftGroupService.NO_TIMEOUT), willCompleteSuccessfully()); + assertEquals(newLeader, msRaftClient.leader()); } diff --git a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java index 00c75ff45f7..a9e067d7c14 100644 --- a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java +++ b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/PlacementDriverManagerTest.java @@ -58,6 +58,7 @@ import java.util.stream.Stream; import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; import org.apache.ignite.internal.cluster.management.network.messages.CmgMessagesFactory; +import org.apache.ignite.internal.cluster.management.raft.PhysicalTopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener; import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService; @@ -243,6 +244,12 @@ private void startPlacementDriverManager() { RaftGroupOptionsConfigurer msRaftConfigurer = RaftGroupOptionsConfigHelper.configureProperties(msLogStorageManager, metastorageWorkDir.metaPath()); + var msRaftServiceFactory = new PhysicalTopologyAwareRaftGroupServiceFactory( + clusterService, + eventsClientListener, + mock(FailureProcessor.class) + ); + metaStorageManager = new MetaStorageManagerImpl( clusterService.staticLocalNode(), cmgManager, @@ -250,7 +257,7 @@ private void startPlacementDriverManager() { raftManager, storage, nodeClock, - topologyAwareRaftGroupServiceFactory, + msRaftServiceFactory, new 
NoOpMetricManager(), systemDistributedConfiguration, msRaftConfigurer, diff --git a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftCommandRunner.java b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftCommandRunner.java index 4284a2c53a3..ae894d6aa92 100644 --- a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftCommandRunner.java +++ b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/service/RaftCommandRunner.java @@ -26,9 +26,6 @@ * @see RaftGroupService */ public interface RaftCommandRunner { - /** Constant meaning that an operation will never timeout. */ - long NO_TIMEOUT = -1; - /** * Runs a command on a replication group leader. * diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java index 6efa0bb7929..e472d672095 100644 --- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java +++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupService.java @@ -327,6 +327,14 @@ public void unsubscribeLeader(LeaderElectionListener callback) { @Override public CompletableFuture run(Command cmd, long timeoutMillis) { + if (commandExecutor.leader() == null && timeoutMillis != 0) { + // Discover the leader first, similar to RaftGroupServiceImpl.run() behavior. + // Without this, the command is sent to a random peer causing cascading EPERM errors + // and potential hangs due to race conditions with leader election notifications. 
+ return refreshAndGetLeaderWithTerm(timeoutMillis) + .thenCompose(ignored -> commandExecutor.run(cmd, timeoutMillis)); + } + return commandExecutor.run(cmd, timeoutMillis); } diff --git a/modules/raft/src/test/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupServiceRunTest.java b/modules/raft/src/test/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupServiceRunTest.java index d66a4fb44fa..e84ebe43110 100644 --- a/modules/raft/src/test/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupServiceRunTest.java +++ b/modules/raft/src/test/java/org/apache/ignite/internal/raft/client/PhysicalTopologyAwareRaftGroupServiceRunTest.java @@ -47,6 +47,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; @@ -168,6 +169,14 @@ void before() { // Return empty list for allMembers() - service uses this during initialization. lenient().when(topologyService.allMembers()).thenReturn(List.of()); + // Default mock for GetLeaderRequest used by refreshAndGetLeaderWithTerm() when leader is unknown. + // Returns EPERM (no leader) by default. Tests that simulate leader election should update this + // via mockGetLeaderResponse() to return proper leader info. + lenient().when(messagingService.invoke(any(InternalClusterNode.class), any(GetLeaderRequest.class), anyLong())) + .thenReturn(completedFuture(FACTORY.errorResponse() + .errorCode(RaftError.EPERM.getNumber()) + .build())); + // Mock RaftConfiguration. 
ConfigurationValue retryTimeoutValue = mock(ConfigurationValue.class); lenient().when(retryTimeoutValue.value()).thenReturn(TIMEOUT); @@ -231,6 +240,19 @@ private void simulateLeaderElection(Peer leaderPeer, long term) { eventsClientListener.onLeaderElected(TEST_GRP, leaderNode, term); } + /** + * Updates the GetLeaderRequest mock to return the specified leader. + * This is needed because PhysicalTopologyAwareRaftGroupService.run() calls + * refreshAndGetLeaderWithTerm() when leader is unknown, which sends GetLeaderRequest. + */ + private void mockGetLeaderResponse(Peer leaderPeer, long term) { + Mockito.doReturn(completedFuture(FACTORY.getLeaderResponse() + .leaderId(new PeerId(leaderPeer.consistentId(), 0, leaderPeer.idx()).toString()) + .currentTerm(term) + .build())) + .when(messagingService).invoke(any(InternalClusterNode.class), any(GetLeaderRequest.class), anyLong()); + } + /** * Simulates leader election and waits for the notification to be processed. */ @@ -265,12 +287,30 @@ private void mockLeaderRequest() { .build())); } - private boolean isTestWriteCommand(WriteActionRequest arg) { - if (arg == null) { + /** + * Mocks GetLeaderRequest: returns leader when {@code leaderAvailable} is set, otherwise returns the given error. 
+ */ + private void mockGetLeaderWithFlag(AtomicBoolean leaderAvailable, RaftError errorWhenUnavailable) { + when(messagingService.invoke(any(InternalClusterNode.class), any(GetLeaderRequest.class), anyLong())) + .thenAnswer(invocation -> { + if (leaderAvailable.get()) { + return completedFuture(FACTORY.getLeaderResponse() + .leaderId(PeerId.fromPeer(NODES.get(0)).toString()) + .currentTerm(CURRENT_TERM) + .build()); + } + return completedFuture(FACTORY.errorResponse() + .errorCode(errorWhenUnavailable.getNumber()) + .build()); + }); + } + + private boolean isTestWriteCommand(Object arg) { + if (!(arg instanceof WriteActionRequest)) { return false; } Object command = new OptimizedMarshaller(cluster.serializationRegistry(), - OptimizedMarshaller.NO_POOL).unmarshall(arg.command()); + OptimizedMarshaller.NO_POOL).unmarshall(((WriteActionRequest) arg).command()); return command instanceof TestWriteCommand; } @@ -334,43 +374,35 @@ void testZeroTimeoutTriesAllPeersBeforeFailing() { } /** - * Tests that with Long.MAX_VALUE timeout, all peers are tried first, then waits for leader, and succeeds when leader appears. + * Tests that with Long.MAX_VALUE timeout, run() discovers the leader via GetLeaderRequest + * and then succeeds with the actual command. + * + *

With the new behavior, run() calls refreshAndGetLeaderWithTerm() first when leader is unknown. + * The GetLeaderRequest retries until leader is discovered, then the WriteActionRequest is sent. */ @Test - void testInfiniteTimeoutWaitsForLeaderAndSucceeds() throws Exception { - // First 3 WriteActionRequest calls return EPERM (no leader), then success after leader appears. - AtomicInteger callCount = new AtomicInteger(0); - CountDownLatch allPeersTried = new CountDownLatch(3); + void testInfiniteTimeoutWaitsForLeaderAndSucceeds() { + // WriteActionRequest always succeeds (sent after leader is discovered). + mockUserInputSuccess(); - when(messagingService.invoke( - any(InternalClusterNode.class), - argThat(this::isTestWriteCommand), - anyLong() - )).thenAnswer(invocation -> { - if (callCount.incrementAndGet() <= 3) { - allPeersTried.countDown(); - return completedFuture(FACTORY.errorResponse() - .errorCode(RaftError.EPERM.getNumber()) - .build()); - } - return completedFuture(FACTORY.actionResponse().result(new TestResponse()).build()); - }); + // EBUSY keeps retry loop on same peer; EPERM marks unavailable, exhausting all peers. + AtomicBoolean leaderAvailable = new AtomicBoolean(false); + mockGetLeaderWithFlag(leaderAvailable, RaftError.EBUSY); PhysicalTopologyAwareRaftGroupService svc = startService(); - // Start the command - it should try all peers first, then wait for leader. + // Start the command - refreshAndGetLeaderWithTerm() will keep retrying GetLeaderRequest. + // The result should not complete yet. CompletableFuture result = svc.run(testWriteCommand(), Long.MAX_VALUE); - // Wait for all peer attempts to complete. - assertTrue(allPeersTried.await(5, TimeUnit.SECONDS), "All peers should be tried"); - - verifyExact3PeersCalled(); - - // Initially not complete (waiting for leader). + // Initially not complete (GetLeaderRequest is retrying). assertThat(result.isDone(), is(false)); - // Simulate leader election. 
- executor.schedule(() -> simulateLeaderElection(NODES.get(0), CURRENT_TERM), 100, TimeUnit.MILLISECONDS); + // Signal leader availability and simulate leader election. + executor.schedule(() -> { + leaderAvailable.set(true); + simulateLeaderElection(NODES.get(0), CURRENT_TERM); + }, 100, TimeUnit.MILLISECONDS); // Should eventually complete successfully. assertThat(result, willCompleteSuccessfully()); @@ -517,34 +549,34 @@ void testZeroTimeoutShutdownDuringPeerTrying() throws Exception { } /** - * Tests that with infinite timeout, if the client is shutting down during leader waiting, + * Tests that with infinite timeout, if the client is shutting down during leader discovery, * the future completes with NodeStoppingException. + * + *

With the new behavior, run() calls refreshAndGetLeaderWithTerm() first when leader is unknown. + * The GetLeaderRequest retries (default mock returns EPERM). Shutdown cancels the leader discovery future. */ @Test void testInfiniteTimeoutShutdownDuringLeaderWaiting() throws Exception { - CountDownLatch allPeersTried = new CountDownLatch(3); - - // All peers return EPERM (no leader) to trigger leader waiting phase. - when(messagingService.invoke( - any(InternalClusterNode.class), - argThat(this::isTestWriteCommand), - anyLong() - )).thenAnswer(invocation -> { - allPeersTried.countDown(); - return completedFuture(FACTORY.errorResponse() - .errorCode(RaftError.EPERM.getNumber()) - .build()); - }); + // Override default GetLeaderRequest mock with one that signals when it's been called. + var getLeaderCalled = new CountDownLatch(1); + lenient().when(messagingService.invoke(any(InternalClusterNode.class), any(GetLeaderRequest.class), anyLong())) + .thenAnswer(invocation -> { + getLeaderCalled.countDown(); + return completedFuture(FACTORY.errorResponse() + .errorCode(RaftError.EPERM.getNumber()) + .build()); + }); PhysicalTopologyAwareRaftGroupService svc = startService(); // Start the command with infinite timeout. + // refreshAndGetLeaderWithTerm() will keep retrying GetLeaderRequest. CompletableFuture result = svc.run(testWriteCommand(), Long.MAX_VALUE); - // Wait for all peers to be tried (3 peers). - assertTrue(allPeersTried.await(5, TimeUnit.SECONDS), "All peers should be tried"); + // Wait for the GetLeaderRequest retry loop to actually start. + assertTrue(getLeaderCalled.await(5, TimeUnit.SECONDS), "GetLeaderRequest retry should start"); - // The result should not be complete yet (waiting for leader). + // The result should not be complete yet (retrying GetLeaderRequest). assertThat(result.isDone(), is(false)); // Shutdown the service. 
@@ -608,11 +640,17 @@ void testTransientErrorRetriesOnSamePeer() { * Tests that with bounded timeout, a non-leader-related retriable error (like EBUSY) causes * retries until timeout without waiting for leader. This differs from EPERM with no leader, * which tries all peers once then waits for leader. + * + *

Leader must be known before calling run() so that refreshAndGetLeaderWithTerm() is skipped + * and the WriteActionRequest retry loop is reached directly. */ @Test void testBoundedTimeoutRetriesNonLeaderErrorUntilTimeout() { AtomicInteger callCount = new AtomicInteger(0); + // GetLeaderRequest returns leader so run() proceeds to WriteActionRequest phase. + mockGetLeaderResponse(NODES.get(0), CURRENT_TERM); + // All peers return EBUSY (a retriable error that is NOT related to missing leader). when(messagingService.invoke( any(InternalClusterNode.class), @@ -627,6 +665,9 @@ void testBoundedTimeoutRetriesNonLeaderErrorUntilTimeout() { PhysicalTopologyAwareRaftGroupService svc = startService(); + // Simulate leader election so leader is known (skips refreshAndGetLeaderWithTerm in run()). + simulateLeaderElectionAndWait(NODES.get(0), CURRENT_TERM); + // With 300ms timeout and EBUSY errors, should retry until timeout. // Unlike EPERM (no leader), EBUSY should cause continuous retries, not wait for leader. CompletableFuture result = svc.run(testWriteCommand(), 300); @@ -642,12 +683,18 @@ void testBoundedTimeoutRetriesNonLeaderErrorUntilTimeout() { /** * Tests that with infinite timeout, a non-leader-related retriable error (like EBUSY) causes * retries indefinitely until success. This differs from EPERM with no leader. + * + *

Leader must be known before calling run() so that refreshAndGetLeaderWithTerm() is skipped + * and the WriteActionRequest retry loop is reached directly. */ @Test void testInfiniteTimeoutRetriesNonLeaderErrorUntilSuccess() throws Exception { AtomicInteger callCount = new AtomicInteger(0); CountDownLatch multipleRetriesDone = new CountDownLatch(5); + // GetLeaderRequest returns leader so run() proceeds to WriteActionRequest phase. + mockGetLeaderResponse(NODES.get(0), CURRENT_TERM); + // First 5 calls return EBUSY, then success. when(messagingService.invoke( any(InternalClusterNode.class), @@ -666,6 +713,9 @@ void testInfiniteTimeoutRetriesNonLeaderErrorUntilSuccess() throws Exception { PhysicalTopologyAwareRaftGroupService svc = startService(); + // Simulate leader election so leader is known (skips refreshAndGetLeaderWithTerm in run()). + simulateLeaderElectionAndWait(NODES.get(0), CURRENT_TERM); + // With infinite timeout and EBUSY errors, should retry until success. CompletableFuture result = svc.run(testWriteCommand(), Long.MAX_VALUE); @@ -682,13 +732,19 @@ void testInfiniteTimeoutRetriesNonLeaderErrorUntilSuccess() throws Exception { /** * Tests that with bounded timeout, if the client is shutting down during retry phase, * the future completes with NodeStoppingException immediately without new retry round. + * + *

With the new behavior, run() calls refreshAndGetLeaderWithTerm() first when leader is unknown. + * We set up GetLeaderRequest to return the leader immediately so the WriteActionRequest phase is reached, + * then test shutdown during WriteActionRequest retry. */ @Test void testBoundedTimeoutShutdownDuringRetryPhase() throws Exception { var pendingRetryInvoke = new CompletableFuture(); var retryPhaseStarted = new CountDownLatch(1); - var allPeersTried = new CountDownLatch(3); - var fifthCallAttempted = new CountDownLatch(1); + var secondCallAttempted = new CountDownLatch(1); + + // GetLeaderRequest returns leader immediately so run() proceeds to WriteActionRequest phase. + mockGetLeaderResponse(NODES.get(0), CURRENT_TERM); AtomicInteger callCount = new AtomicInteger(0); when(messagingService.invoke( @@ -697,37 +753,27 @@ void testBoundedTimeoutShutdownDuringRetryPhase() throws Exception { anyLong() )).thenAnswer(invocation -> { int count = callCount.incrementAndGet(); - if (count <= 3) { - // First 3 calls: return EPERM to trigger retry logic. - allPeersTried.countDown(); - return completedFuture(FACTORY.errorResponse() - .errorCode(RaftError.EPERM.getNumber()) - .build()); - } else if (count == 4) { - // Fourth call: we're in retry phase after leader was notified. - // Signal that retry phase started and return a pending future. + if (count == 1) { + // First call: return a pending future to simulate in-flight request. retryPhaseStarted.countDown(); return pendingRetryInvoke.thenApply(v -> FACTORY.errorResponse() .errorCode(RaftError.EBUSY.getNumber()) .build()); } - // Fifth call should not happen after shutdown. - fifthCallAttempted.countDown(); + // Second call should not happen after shutdown. + secondCallAttempted.countDown(); return completedFuture(FACTORY.actionResponse().result(new TestResponse()).build()); }); PhysicalTopologyAwareRaftGroupService svc = startService(); + // Simulate leader election so leader is known. 
+ simulateLeaderElectionAndWait(NODES.get(0), CURRENT_TERM); + // Start the command with bounded timeout (long enough). CompletableFuture result = svc.run(testWriteCommand(), 30_000); - // Wait for all peers to be tried. - assertTrue(allPeersTried.await(5, TimeUnit.SECONDS), "All peers should be tried"); - - // Simulate leader election to trigger retry phase. - simulateLeaderElection(NODES.get(0), CURRENT_TERM); - - // Wait for retry phase to start. + // Wait for retry phase to start (first WriteActionRequest sent). assertTrue(retryPhaseStarted.await(5, TimeUnit.SECONDS), "Retry phase should start"); // Shutdown the service. @@ -740,10 +786,9 @@ void testBoundedTimeoutShutdownDuringRetryPhase() throws Exception { assertThat(result, willThrow(NodeStoppingException.class, 5, TimeUnit.SECONDS)); // Verify no additional retry attempts were made after shutdown. - // The fifth call latch should NOT be counted down (wait briefly and check). - assertThat("No 5th call should be attempted after shutdown", - fifthCallAttempted.await(100, TimeUnit.MILLISECONDS), is(false)); - assertThat(callCount.get(), is(4)); + assertThat("No 2nd call should be attempted after shutdown", + secondCallAttempted.await(100, TimeUnit.MILLISECONDS), is(false)); + assertThat(callCount.get(), is(1)); } /** @@ -839,9 +884,13 @@ private void verifyExact3PeersCalled() { */ @ParameterizedTest @EnumSource(names = {"UNKNOWN", "EINTERNAL", "ENOENT"}) - void testGetLeaderRequestTriesDifferentPeerOnTransientError(RaftError error) throws Exception { + void testGetLeaderRequestTriesDifferentPeerOnTransientError(RaftError error) { + // Track which peers received GetLeaderRequest calls. Set calledPeers = ConcurrentHashMap.newKeySet(); - AtomicInteger callCount = new AtomicInteger(0); + // The first peer to receive a request will always get a transient error. + // Any different peer will get a success response. 
This ensures the test verifies + // that the executor switches to a different peer after a transient error. + AtomicReference errorPeer = new AtomicReference<>(); when(messagingService.invoke( any(InternalClusterNode.class), @@ -849,17 +898,17 @@ void testGetLeaderRequestTriesDifferentPeerOnTransientError(RaftError error) thr anyLong()) ).thenAnswer(invocation -> { InternalClusterNode target = invocation.getArgument(0); - calledPeers.add(target.name()); - int count = callCount.incrementAndGet(); + String peerName = target.name(); + calledPeers.add(peerName); - if (count == 1) { - // First call returns transient error. + // The first peer to call gets "sticky" transient errors. + // Any other peer succeeds immediately. + if (errorPeer.compareAndSet(null, peerName) || errorPeer.get().equals(peerName)) { return completedFuture(FACTORY.errorResponse() .errorCode(error.getNumber()) .build()); } - // Second call succeeds. return completedFuture(FACTORY.getLeaderResponse() .leaderId(PeerId.fromPeer(NODES.get(0)).toString()) .currentTerm(CURRENT_TERM) @@ -1092,61 +1141,35 @@ void testSingleAttemptModeWithMixedErrors() { } /** - * Tests leader-wait mode: 3 nodes return "no leader", then those same nodes succeed after leader election. + * Tests leader-wait mode: run() discovers the leader via GetLeaderRequest after leader election, + * then sends the WriteActionRequest which succeeds. * - *

Key verification: "no leader" peers are NOT in unavailablePeers, so they CAN be retried - * after leader notification arrives. + *

With the new behavior, run() calls refreshAndGetLeaderWithTerm() first when leader is unknown. + * GetLeaderRequest retries until leader is discovered, then WriteActionRequest is sent and succeeds. */ @Test void testLeaderWaitModeRetriesNoLeaderPeersAfterLeaderElection() throws Exception { - int peerCount = NODES.size(); - - List calledPeers = new CopyOnWriteArrayList<>(); - AtomicInteger noLeaderResponseCount = new AtomicInteger(0); - CountDownLatch allPeersTriedOnce = new CountDownLatch(peerCount); - - when(messagingService.invoke( - any(InternalClusterNode.class), - argThat(this::isTestWriteCommand), - anyLong() - )).thenAnswer(invocation -> { - InternalClusterNode target = invocation.getArgument(0); - calledPeers.add(target.name()); - - // First round: all 3 peers return "no leader". - if (noLeaderResponseCount.get() < peerCount) { - noLeaderResponseCount.incrementAndGet(); - allPeersTriedOnce.countDown(); - return completedFuture(FACTORY.errorResponse() - .errorCode(RaftError.EPERM.getNumber()) - .leaderId(null) - .build()); - } + // WriteActionRequest always succeeds (sent after leader is discovered). + mockUserInputSuccess(); - // After leader election: succeed. - return completedFuture(FACTORY.actionResponse().result(new TestResponse()).build()); - }); + AtomicBoolean leaderAvailable = new AtomicBoolean(false); + mockGetLeaderWithFlag(leaderAvailable, RaftError.EPERM); PhysicalTopologyAwareRaftGroupService svc = startService(); // Start the command with infinite timeout. + // refreshAndGetLeaderWithTerm() will keep retrying GetLeaderRequest (returns EPERM). CompletableFuture result = svc.run(testWriteCommand(), Long.MAX_VALUE); - // Wait for all 3 peers to be tried once. - assertTrue(allPeersTriedOnce.await(5, TimeUnit.SECONDS), "All peers should be tried once"); - - // The result should NOT be complete yet - waiting for leader. + // The result should NOT be complete yet - GetLeaderRequest is retrying. 
assertThat(result.isDone(), is(false)); - // Simulate leader election. + // Signal leader availability and simulate leader election. + leaderAvailable.set(true); simulateLeaderElection(NODES.get(0), CURRENT_TERM); - // Should eventually succeed (by retrying one of the "no leader" peers). + // Should eventually succeed (GetLeaderRequest succeeds, then WriteActionRequest succeeds). assertThat(result, willCompleteSuccessfully()); - - // Verify that at least one peer was called twice (first with "no leader", then success). - long totalCalls = calledPeers.size(); - assertTrue(totalCalls > peerCount, "Should have more than 3 calls (some peers retried after leader election), got " + totalCalls); } /** diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java index 66503790e4a..3793d4f3ae1 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/ItDistributedConfigurationPropertiesTest.java @@ -45,6 +45,7 @@ import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; import org.apache.ignite.internal.cluster.management.NodeAttributesCollector; import org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration; +import org.apache.ignite.internal.cluster.management.raft.PhysicalTopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage; import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl; import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl; @@ -72,7 +73,6 @@ import org.apache.ignite.internal.raft.Loza; import 
org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer; import org.apache.ignite.internal.raft.TestLozaFactory; -import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.raft.storage.LogStorageManager; import org.apache.ignite.internal.raft.util.SharedLogStorageManagerUtils; @@ -226,11 +226,10 @@ private static class Node { var logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager); - var topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory( + var topologyAwareRaftGroupServiceFactory = new PhysicalTopologyAwareRaftGroupServiceFactory( clusterService, - logicalTopologyService, - Loza.FACTORY, - raftGroupEventsClientListener + raftGroupEventsClientListener, + failureManager ); ComponentWorkingDir metastorageWorkDir = new ComponentWorkingDir(workDir.resolve("metastorage")); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java index b31ca3b24eb..70c534ef163 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java @@ -37,6 +37,7 @@ import org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager; import org.apache.ignite.internal.cluster.management.NodeAttributesCollector; import org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration; +import org.apache.ignite.internal.cluster.management.raft.PhysicalTopologyAwareRaftGroupServiceFactory; import 
org.apache.ignite.internal.cluster.management.raft.TestClusterStateStorage; import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl; import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl; @@ -66,7 +67,6 @@ import org.apache.ignite.internal.raft.Loza; import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer; import org.apache.ignite.internal.raft.TestLozaFactory; -import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.raft.configuration.RaftConfiguration; import org.apache.ignite.internal.raft.storage.LogStorageManager; import org.apache.ignite.internal.raft.util.SharedLogStorageManagerUtils; @@ -200,11 +200,10 @@ private static class Node { var logicalTopologyService = new LogicalTopologyServiceImpl(logicalTopology, cmgManager); - var topologyAwareRaftGroupServiceFactory = new TopologyAwareRaftGroupServiceFactory( + var topologyAwareRaftGroupServiceFactory = new PhysicalTopologyAwareRaftGroupServiceFactory( clusterService, - logicalTopologyService, - Loza.FACTORY, - raftGroupEventsClientListener + raftGroupEventsClientListener, + failureManager ); ComponentWorkingDir metastorageWorkDir = new ComponentWorkingDir(workDir.resolve("metastorage")); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index eb8d0697136..55e1035c88d 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -104,6 +104,7 @@ import org.apache.ignite.internal.cluster.management.NodeAttributesCollector; import org.apache.ignite.internal.cluster.management.configuration.NodeAttributesConfiguration; import 
org.apache.ignite.internal.cluster.management.configuration.NodeAttributesExtensionConfiguration; +import org.apache.ignite.internal.cluster.management.raft.PhysicalTopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.cluster.management.raft.RocksDbClusterStateStorage; import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl; import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyServiceImpl; @@ -515,6 +516,12 @@ private PartialNode startPartialNode( RaftGroupOptionsConfigurer msRaftConfigurer = RaftGroupOptionsConfigHelper.configureProperties(msLogStorageManager, metastorageWorkDir.metaPath()); + var metaStorageRaftServiceFactory = new PhysicalTopologyAwareRaftGroupServiceFactory( + clusterSvc, + raftGroupEventsClientListener, + failureProcessor + ); + var metaStorageMgr = new MetaStorageManagerImpl( clusterSvc.staticLocalNode(), cmgManager, @@ -522,7 +529,7 @@ private PartialNode startPartialNode( raftMgr, metaStorage, hybridClock, - topologyAwareRaftGroupServiceFactory, + metaStorageRaftServiceFactory, metricManager, systemDistributedConfiguration, msRaftConfigurer, diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 47e1cdd5c80..84e446324fb 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -85,6 +85,7 @@ import org.apache.ignite.internal.cluster.management.configuration.NodeAttributesExtensionConfiguration; import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorage; import org.apache.ignite.internal.cluster.management.raft.ClusterStateStorageManager; +import org.apache.ignite.internal.cluster.management.raft.PhysicalTopologyAwareRaftGroupServiceFactory; import org.apache.ignite.internal.cluster.management.raft.RocksDbClusterStateStorage; 
import org.apache.ignite.internal.cluster.management.raft.ValidationManager; import org.apache.ignite.internal.cluster.management.topology.LogicalTopologyImpl; @@ -789,6 +790,12 @@ public class IgniteImpl implements Ignite { raftGroupEventsClientListener ); + var msRaftGroupServiceFactory = new PhysicalTopologyAwareRaftGroupServiceFactory( + clusterSvc, + raftGroupEventsClientListener, + failureManager + ); + var readOperationForCompactionTracker = new ReadOperationForCompactionTracker(); var storage = new RocksDbKeyValueStorage( @@ -806,7 +813,7 @@ public class IgniteImpl implements Ignite { raftMgr, storage, clock, - topologyAwareRaftGroupServiceFactory, + msRaftGroupServiceFactory, metricManager, systemDisasterRecoveryStorage, new MetastorageRepairImpl(clusterSvc.messagingService(), logicalTopology, cmgMgr),