@@ -162,7 +162,6 @@ const adminUrl = 'http://localhost:8080';
162162 describe ( 'Message Routing' , ( ) => {
163163 test ( 'Custom Message Router' , async ( ) => {
164164 // 1. Define a partitioned topic and a custom router
165- const targetPartition = 1 ;
166165 const partitionedTopicName = `test-custom-router-${ Date . now ( ) } ` ;
167166 const partitionedTopic = `persistent://public/default/${ partitionedTopicName } ` ;
168167 const numPartitions = 10 ;
@@ -182,10 +181,36 @@ const adminUrl = 'http://localhost:8080';
182181 // 204 No Content is success for PUT create
183182 expect ( createPartitionedTopicRes . statusCode ) . toBe ( 204 ) ;
184183
184+ const routingKey = 'user-id-12345' ;
185+ const simpleHash = ( str ) => {
186+ let hash = 0 ;
187+ /* eslint-disable no-bitwise */
188+ for ( let i = 0 ; i < str . length ; i += 1 ) {
189+ const char = str . charCodeAt ( i ) ;
190+ hash = ( ( hash << 5 ) - hash ) + char ;
191+ hash &= hash ;
192+ }
193+ /* eslint-disable no-bitwise */
194+ return Math . abs ( hash ) ;
195+ } ;
196+ const expectedPartition = simpleHash ( routingKey ) % numPartitions ;
197+ console . log ( `Routing key '${ routingKey } ' will be sent to partition: ${ expectedPartition } ` ) ;
198+
185199 // 2. Create a producer with the custom message router
186200 const producer = await client . createProducer ( {
187- topic : partitionedTopic , // Note: For producer, use the base topic name
188- messageRouter : ( message , topicMetadata ) => targetPartition ,
201+ topic : partitionedTopic ,
202+ messageRouter : ( message , topicMetadata ) => {
203+ // Get the routingKey from the message properties
204+ const key = message . properties . routingKey ;
205+ if ( key ) {
206+ // Use the metadata to get the number of partitions
207+ const numPartitionsAvailable = topicMetadata . numPartitions ;
208+ // Calculate the target partition
209+ return simpleHash ( key ) % numPartitionsAvailable ;
210+ }
211+ // Fallback to a default partition if no key is provided
212+ return 0 ;
213+ } ,
189214 messageRoutingMode : 'CustomPartition' ,
190215 } ) ;
191216
@@ -203,19 +228,22 @@ const adminUrl = 'http://localhost:8080';
203228 const msg = `message-${ i } ` ;
204229 producer . send ( {
205230 data : Buffer . from ( msg ) ,
231+ properties : {
232+ routingKey,
233+ } ,
206234 } ) ;
207235 }
208236 await producer . flush ( ) ;
209237 console . log ( `Sent ${ numMessages } messages.` ) ;
210238
211239 // 5. Receive messages and assert they all come from the target partition
212240 const receivedMessages = new Set ( ) ;
213- const expectedPartitionName = `${ partitionedTopic } -partition-${ targetPartition } ` ;
241+ const expectedPartitionName = `${ partitionedTopic } -partition-${ expectedPartition } ` ;
214242
215243 for ( let i = 0 ; i < numMessages ; i += 1 ) {
216244 const msg = await consumer . receive ( 10000 ) ;
217245 // eslint-disable-next-line no-underscore-dangle
218- expect ( msg . getProperties ( ) . __partition__ ) . toBe ( String ( targetPartition ) ) ;
246+ expect ( msg . getProperties ( ) . __partition__ ) . toBe ( String ( expectedPartition ) ) ;
219247 expect ( msg . getTopicName ( ) ) . toBe ( expectedPartitionName ) ;
220248 receivedMessages . add ( msg . getData ( ) . toString ( ) ) ;
221249 await consumer . acknowledge ( msg ) ;
@@ -235,7 +263,7 @@ const adminUrl = 'http://localhost:8080';
235263 // 2. Create a producer with the custom message router
236264 const producer = await client . createProducer ( {
237265 topic : partitionedTopic , // Note: For producer, use the base topic name
238- messageRouter : ( message , topicMetadata ) => {
266+ messageRouter : ( ) => {
239267 throw new Error ( 'Custom router error' ) ;
240268 } ,
241269 messageRoutingMode : 'CustomPartition' ,
0 commit comments