Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisInvalidSubscriptionException;
import org.springframework.data.redis.connection.Subscription;
import org.springframework.data.redis.connection.SubscriptionListener;
import org.springframework.data.redis.connection.util.ByteArrayWrapper;
Expand Down Expand Up @@ -716,8 +717,24 @@ private void addListener(MessageListener listener, Collection<? extends Topic> t

getRequiredSubscriber().addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(
patterns, channels, () -> future.complete(null)));
getRequiredSubscriber().subscribeChannel(channels.toArray(new byte[channels.size()][]));
getRequiredSubscriber().subscribePattern(patterns.toArray(new byte[patterns.size()][]));

try {
if (!channels.isEmpty()) {
getRequiredSubscriber().subscribeChannel(channels.toArray(new byte[channels.size()][]));
}
if (!patterns.isEmpty()) {
getRequiredSubscriber().subscribePattern(patterns.toArray(new byte[patterns.size()][]));
}
} catch (RedisInvalidSubscriptionException ex) {
// Race with removeMessageListener: concurrent unsubscribe made the connection-level
// subscription go dead (closeIfUnsubscribed -> alive=false) while we were about to
// subscribe new channels. stopListening() disposes the dead connection; lazyListen()
// opens a fresh connection and re-subscribes all topics in channelMapping/patternMapping,
// which already include the channels/patterns added above.
stopListening();
lazyListen();
return;
}

try {
future.join();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import static org.mockito.Mockito.*;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
Expand All @@ -35,11 +37,14 @@
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisInvalidSubscriptionException;
import org.springframework.data.redis.connection.Subscription;
import org.springframework.data.redis.connection.SubscriptionListener;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.listener.adapter.RedisListenerExecutionFailedException;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.util.backoff.FixedBackOff;

/**
Expand Down Expand Up @@ -273,4 +278,85 @@ void shouldApplyConfiguredAutoStartup() {
assertThat(container.isAutoStartup()).isEqualTo(false);
}

@Test // GH-3080
void shouldRecoverWhenSubscriptionDiesOnConcurrentRemove() {

// The race only manifests with async drivers (Lettuce) because BlockingSubscriber
// sets connection=null after doSubscribe, making incremental subscribeChannel a no-op.
// Use a LettuceConnectionFactory mock so the async Subscriber code path is exercised.
LettuceConnectionFactory asyncFactory = mock(LettuceConnectionFactory.class);

RedisMessageListenerContainer asyncContainer = new RedisMessageListenerContainer();
asyncContainer.setConnectionFactory(asyncFactory);
asyncContainer.setBeanName("async-container");
asyncContainer.setTaskExecutor(new SyncTaskExecutor());
asyncContainer.setMaxSubscriptionRegistrationWaitingTime(1000);
asyncContainer.afterPropertiesSet();

when(asyncFactory.getConnection()).thenReturn(connectionMock);

// Initial subscription: listener1 on "a". Fires onChannelSubscribed synchronously.
// Use any(byte[][].class) for the vararg parameter so the stub matches regardless of
// how many channels are passed (1 for initial, 2 for recovery full re-subscribe).
doAnswer(it -> {
SubscriptionListener listener = it.getArgument(0);
when(connectionMock.isSubscribed()).thenReturn(true);
listener.onChannelSubscribed("a".getBytes(StandardCharsets.UTF_8), 1);
return null;
}).when(connectionMock).subscribe(any(), any(byte[][].class));

asyncContainer.addMessageListener(adapter, new ChannelTopic("a"));
asyncContainer.start();
assertThat(asyncContainer.isListening()).isTrue();

// connection.getSubscription() now returns subscriptionMock for incremental subscribes.
when(connectionMock.getSubscription()).thenReturn(subscriptionMock);

// Simulate the race: removeMessageListener concurrently unsubscribed the last channel,
// which triggered AbstractSubscription.closeIfUnsubscribed() → alive=false.
// Any incremental subscribe("b") now throws RedisInvalidSubscriptionException.
doThrow(new RedisInvalidSubscriptionException("Subscription has been unsubscribed"))
.when(subscriptionMock).subscribe(any());

// Recovery subscription answer: fires "a" and "b" (full re-subscribe from channelMapping).
// Override the stub so the recovery connection.subscribe uses this answer.
doAnswer(it -> {
SubscriptionListener listener = it.getArgument(0);
when(connectionMock.isSubscribed()).thenReturn(true);
listener.onChannelSubscribed("a".getBytes(StandardCharsets.UTF_8), 1);
listener.onChannelSubscribed("b".getBytes(StandardCharsets.UTF_8), 1);
return null;
}).when(connectionMock).subscribe(any(), any(byte[][].class));

// Reset isSubscribed so Subscriber.initialize() does not bail out for the recovery call.
when(connectionMock.isSubscribed()).thenReturn(false);

MessageListener listener2 = (message, pattern) -> {};

// Should NOT throw: fix catches RedisInvalidSubscriptionException, calls stopListening()
// to dispose the dead connection, then lazyListen() to re-subscribe all channelMapping
// entries (which already contain "b" added above).
assertThatNoException()
.isThrownBy(() -> asyncContainer.addMessageListener(listener2, new ChannelTopic("b")));

// subscriptionMock.subscribe was called exactly once (the failed incremental attempt for "b").
verify(subscriptionMock, times(1)).subscribe(any(byte[][].class));

// connection.subscribe was called twice: initial ("a") + recovery ("a","b").
verify(connectionMock, times(2)).subscribe(any(), any(byte[][].class));

// Container recovered and is listening again.
assertThat(asyncContainer.isListening()).isTrue();

// listener2 is registered in listenerTopics – no memory leak.
@SuppressWarnings("unchecked")
Map<MessageListener, Set<Topic>> listenerTopics =
(Map<MessageListener, Set<Topic>>) ReflectionTestUtils.getField(asyncContainer, "listenerTopics");
assertThat(listenerTopics).containsKey(listener2);

try {
asyncContainer.destroy();
} catch (Exception ignored) {}
}

}