11/*
2- * Copyright 2021 Sonu Kumar
2+ * Copyright 2023 Sonu Kumar
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
2828import com .github .sonus21 .rqueue .core .RqueueMessageTemplate ;
2929import com .github .sonus21 .rqueue .core .RqueueRedisListenerContainerFactory ;
3030import com .github .sonus21 .rqueue .core .ScheduledQueueMessageScheduler ;
31+ import com .github .sonus21 .rqueue .core .eventbus .EventBusErrorHandler ;
32+ import com .github .sonus21 .rqueue .core .eventbus .RqueueEventBus ;
3133import com .github .sonus21 .rqueue .core .impl .RqueueMessageTemplateImpl ;
3234import com .github .sonus21 .rqueue .dao .RqueueStringDao ;
3335import com .github .sonus21 .rqueue .dao .impl .RqueueStringDaoImpl ;
3739import com .github .sonus21 .rqueue .utils .condition .ReactiveEnabled ;
3840import com .github .sonus21 .rqueue .utils .pebble .ResourceLoader ;
3941import com .github .sonus21 .rqueue .utils .pebble .RqueuePebbleExtension ;
42+ import com .google .common .eventbus .AsyncEventBus ;
43+ import com .google .common .eventbus .EventBus ;
4044import com .mitchellbosecke .pebble .PebbleEngine ;
4145import com .mitchellbosecke .pebble .spring .extension .SpringExtension ;
4246import com .mitchellbosecke .pebble .spring .reactive .PebbleReactiveViewResolver ;
4549import org .springframework .beans .factory .annotation .Qualifier ;
4650import org .springframework .beans .factory .annotation .Value ;
4751import org .springframework .beans .factory .config .ConfigurableBeanFactory ;
52+ import org .springframework .context .ApplicationEventPublisher ;
4853import org .springframework .context .annotation .Bean ;
4954import org .springframework .context .annotation .Conditional ;
5055import org .springframework .data .redis .connection .ReactiveRedisConnectionFactory ;
5156import org .springframework .data .redis .connection .RedisConnectionFactory ;
5257import org .springframework .data .redis .core .RedisTemplate ;
58+ import org .springframework .scheduling .concurrent .ThreadPoolTaskExecutor ;
5359
5460/**
5561 * This is a base configuration class for Rqueue, that is used in Spring and Spring boot Rqueue libs
@@ -105,8 +111,8 @@ protected MessageConverterProvider getMessageConverterProvider() {
105111 * Database for different ops.
106112 *
107113 * @param beanFactory configurable bean factory
108- * @param versionKey Rqueue db version key
109- * @param dbVersion database version
114+ * @param versionKey Rqueue db version key
115+ * @param dbVersion database version
110116 * @return {@link RedisConnectionFactory} object.
111117 */
112118 @ Bean
@@ -150,6 +156,11 @@ public RqueueWebConfig rqueueWebConfig() {
150156 return new RqueueWebConfig ();
151157 }
152158
159+ @ Bean
160+ public RqueueEventBusConfig rqueueEventBusConfig () {
161+ return new RqueueEventBusConfig ();
162+ }
163+
153164 @ Bean
154165 public RqueueSchedulerConfig rqueueSchedulerConfig () {
155166 return new RqueueSchedulerConfig ();
@@ -190,8 +201,14 @@ public RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory()
190201 * @return {@link ScheduledQueueMessageScheduler} object
191202 */
192203 @ Bean
193- public ScheduledQueueMessageScheduler scheduledMessageScheduler () {
194- return new ScheduledQueueMessageScheduler ();
204+ public ScheduledQueueMessageScheduler scheduledMessageScheduler (
205+ RqueueSchedulerConfig rqueueSchedulerConfig ,
206+ RqueueConfig rqueueConfig ,
207+ RqueueEventBus eventBus ,
208+ RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory ,
209+ @ Qualifier ("rqueueRedisLongTemplate" )
210+ RedisTemplate <String , Long > redisTemplate ) {
211+ return new ScheduledQueueMessageScheduler (rqueueSchedulerConfig , rqueueConfig , eventBus , rqueueRedisListenerContainerFactory , redisTemplate );
195212 }
196213
197214 /**
@@ -201,8 +218,14 @@ public ScheduledQueueMessageScheduler scheduledMessageScheduler() {
201218 * @return {@link ProcessingQueueMessageScheduler} object
202219 */
203220 @ Bean
204- public ProcessingQueueMessageScheduler processingMessageScheduler () {
205- return new ProcessingQueueMessageScheduler ();
221+ public ProcessingQueueMessageScheduler processingMessageScheduler (
222+ RqueueSchedulerConfig rqueueSchedulerConfig ,
223+ RqueueConfig rqueueConfig ,
224+ RqueueEventBus eventBus ,
225+ RqueueRedisListenerContainerFactory rqueueRedisListenerContainerFactory ,
226+ @ Qualifier ("rqueueRedisLongTemplate" )
227+ RedisTemplate <String , Long > redisTemplate ) {
228+ return new ProcessingQueueMessageScheduler (rqueueSchedulerConfig , rqueueConfig , eventBus , rqueueRedisListenerContainerFactory , redisTemplate );
206229 }
207230
208231 @ Bean
@@ -265,12 +288,27 @@ public RqueueInternalPubSubChannel rqueueInternalPubSubChannel(
265288 RqueueConfig rqueueConfig ,
266289 RqueueBeanProvider rqueueBeanProvider ,
267290 @ Qualifier ("stringRqueueRedisTemplate" )
268- RqueueRedisTemplate <String > stringRqueueRedisTemplate ) {
291+ RqueueRedisTemplate <String > stringRqueueRedisTemplate ) {
269292 return new RqueueInternalPubSubChannel (
270293 rqueueRedisListenerContainerFactory ,
271294 rqueueMessageListenerContainer ,
272295 rqueueConfig ,
273296 stringRqueueRedisTemplate ,
274297 rqueueBeanProvider );
275298 }
299+
300+ @ Bean
301+ public RqueueEventBus rqueueEventBus (ApplicationEventPublisher applicationEventPublisher ,
302+ RqueueEventBusConfig rqueueEventBusConfig ) {
303+ ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor ();
304+ threadPoolTaskExecutor .setCorePoolSize (rqueueEventBusConfig .getCorePoolSize ());
305+ threadPoolTaskExecutor .setMaxPoolSize (rqueueEventBusConfig .getMaxPoolSize ());
306+ threadPoolTaskExecutor .setKeepAliveSeconds (rqueueEventBusConfig .getKeepAliveTime ());
307+ threadPoolTaskExecutor .setQueueCapacity (rqueueEventBusConfig .getQueueCapacity ());
308+ threadPoolTaskExecutor .setThreadNamePrefix ("RqueueEventBusAsyncExecutor-" );
309+ threadPoolTaskExecutor .initialize ();
310+ EventBus eventBus = new AsyncEventBus (threadPoolTaskExecutor );
311+ eventBus .register (new EventBusErrorHandler ());
312+ return new RqueueEventBus (eventBus , applicationEventPublisher );
313+ }
276314}
0 commit comments