Upgrading
core 25.x.x -> 26.0.0
sqs xx.x.x -> xx.0.0
sns xx.x.x -> xx.0.0
amqp xx.x.x -> xx.0.0
gcp-pubsub 2.x.x -> 3.0.0
-
messageTypeFieldoption removed: The deprecatedmessageTypeFieldoption has been removed from all queue services. UsemessageTypeResolverinstead. -
HandlerSpy.addProcessedMessagesignature changed: TheaddProcessedMessagemethod now requires amessageTypeparameter. This is an internal API change - if you're usingHandlerSpydirectly (outside of the built-in queue services), you'll need to update your calls. The library's queue services handle this automatically.
Replace messageTypeField: 'fieldName' with messageTypeResolver: { messageTypePath: 'fieldName' }:
// Before
super(dependencies, {
messageTypeField: 'type',
handlers: new MessageHandlerConfigBuilder()
.addConfig(schema, handler)
.build(),
})
// After
super(dependencies, {
messageTypeResolver: { messageTypePath: 'type' },
handlers: new MessageHandlerConfigBuilder()
.addConfig(schema, handler)
.build(),
})-
NO_MESSAGE_TYPE_FIELDconstant removed: TheNO_MESSAGE_TYPE_FIELDconstant has been removed from@message-queue-toolkit/core. UsemessageTypeResolverwith literal mode instead. -
New
messageTypeResolverconfiguration: A flexible configuration for message type resolution. Supports three modes:{ messageTypePath: 'type' }- extract type from a field in the message (supports dot notation for nested paths like'metadata.type'){ literal: 'my.message.type' }- use a constant type for all messages{ resolver: ({ messageData, messageAttributes }) => 'resolved.type' }- custom resolver function
-
MessageSchemaContainerAPI changed: The constructor now acceptsSchemaEntryandDefinitionEntryobjects instead of bare schemas. Each entry can include an optionalmessageTypefor explicit type mapping (required when using custom resolvers). -
Explicit
messageTypein handler configuration: When using a custom resolver function, you must provide an explicitmessageTypein handler options since the type cannot be extracted from schemas at registration time. -
Custom resolver validation: Custom resolver functions (
{ resolver: fn }) cannot be used with multiple schemas inMessageSchemaContainer. This is because the resolver works at runtime, but at registration time the container needs to map schemas to types. UsemessageTypePathfor multiple schemas, or register only a single schema with a custom resolver. -
Improved error handling in
MessageSchemaContainer.resolveSchema: The method now properly catches resolver errors and returns them asEither<Error, Schema>instead of throwing. This ensures consistent error handling regardless of resolver configuration.
Replace messageTypeField: NO_MESSAGE_TYPE_FIELD with messageTypeResolver: { literal: 'your.message.type' }:
// Before
import { NO_MESSAGE_TYPE_FIELD } from '@message-queue-toolkit/core'
super(dependencies, {
messageTypeField: NO_MESSAGE_TYPE_FIELD,
handlers: new MessageHandlerConfigBuilder()
.addConfig(schema, handler)
.build(),
})
// After
super(dependencies, {
messageTypeResolver: { literal: 'your.message.type' },
handlers: new MessageHandlerConfigBuilder()
.addConfig(schema, handler, { messageType: 'your.message.type' })
.build(),
})When using messageTypeResolver: { resolver: fn }, provide explicit messageType in handler options:
super(dependencies, {
messageTypeResolver: {
resolver: ({ messageAttributes }) => {
// Map external event types to internal types
const eventType = messageAttributes?.eventType as string
if (eventType === 'OBJECT_FINALIZE') return 'storage.object.created'
throw new Error(`Unknown event type: ${eventType}`)
},
},
handlers: new MessageHandlerConfigBuilder()
.addConfig(ObjectSchema, handler, { messageType: 'storage.object.created' })
.build(),
})The AbstractPubSubDlqConsumer now uses DLQ_MESSAGE_TYPE constant internally. If you import this constant, update your import:
import { DLQ_MESSAGE_TYPE } from '@message-queue-toolkit/gcp-pubsub'
// DLQ_MESSAGE_TYPE = 'dlq.message'Upgrading
core 19.0.0 -> 20.0.0
sqs 19.0.0 -> 20.0.0
sns 20.0.0 -> 21.0.0
amqp 18.0.0 -> 19.0.0
metrics 2.0.0 -> 3.0.0
- The
processingResultfrom the core has been updated to an object format. It is utilized within spies and metrics and now contains the following properties:status: This property can have one of the following values:retryLater,consumed,published, orerror.skippedAsDuplicate: For messages with astatusofconsumed, this property indicates whether the message was skipped due to being a duplicate.errorReason: Whenstatusiserror, this property provides specific reasons and can be one of the following:invalidMessage,handlerError, orretryLaterExceeded.
- Additionally, the
metricslibrary has undergone class renaming to support future tool integrations and simplify the process of adding new Prometheus custom metrics by extendingPrometheusMessageMetric.
- If your project utilizes
processingResultwithin spies or metrics, you must update it to align with the new structure. - Follow these renaming changes in the
metricslibrary:MessageProcessingMultiMetricsis nowMessageMultiMetricManagerMessageLifetimeMetricis nowPrometheusMessageLifetimeMetricMessageProcessingTimeMetricis nowPrometheusMessageTimeMetric
-
In
AbstractQueueService:handleMessageProcessedmethod signature has changed. Parameters are now passed as an object, and 2 additional properties were added:messageProcessingStartTimestamp- timestamp in milliseconds used to calculate message processing timequeueName- name of the queue or topic on which message is consumed or published
resolveProcessedMessageMetadatawas made private
-
ProcessedMessageMetadatatype used inMessageMetricsManagerhas changed:messageProcessingMillisecondsproperty was removed- the following properties were added:
messageTimestampmessageProcessingStartTimestampmessageProcessingEndTimestamp
-
Because of the change above,
@message-queue-toolkit/metrics@1.0.0is not compatible with the new version.
-
If you are using custom implementation of
MessageMetricsManager, you may need to adjust it to the new properties inProcessedMessageMetadata -
If you are extending
AbstractQueueServiceand callinghandleMessageProcessedmanually, you need to adjust it to the new signature, e.g.:- from:
this.handleMessageProcessed(message, 'consumed', messageProcessingStartTimestamp, queueName)
- to:
this.handleMessageProcessed({ message, processingResult: 'consumed', messageProcessingStartTimestamp, queueName })
- from:
-
If you are using features from
@message-queue-toolkit/metrics@1.0.0, upgrade to@message-queue-toolkit/metrics@2.0.0and follow the migration guide below.
MessageProcessingTimePrometheusMetricclass was replaced by 2 classes:MessageProcessingTimeMetric- registers elapsed time from start to the end of processingMessageLifetimeMetric- registers elapsed time from message creation to the end of processing
- If you are using
MessageProcessingTimePrometheusMetric, replace it with one of the metrics mentioned above, depending on what do you want to measure. Note: if you want to keep current measurements, useMessageLifetimeMetric. If you want to use both metrics, useMessageProcessingMultiMetrics(see: README.md)
loggerinjectable dependency interface was replaced byCommonLoggerfrom@lokalise/node-corepackage. It is compatible withpinologger (https://github.com/pinojs/pino)
- If you are using
pinologger, or any other logger compatible withCommonLoggerinterface, you don't need to do anything - Otherwise, there are serveral properties that need to be provided compared to the old interface:
level- string defining configured minimal logging levelisLevelEnabled- utility method for determining if a given log level will write to the destination.child- method for generating child logger instance with predefined propertiessilent- method used for logging whensilentlevel is selected
- The property
prehandlershas been updated topreHandlerfor consumers. - In the SQS and SNS consumer,
consumerOverridesno longer permits the alteration of critical properties such assqs,queueUrl,handler, andhandleMessageBatchto prevent confusion and maintain proper functionality of the consumer. - The
consumerOverridesin the SQS and SNS consumer no longer includes the option to define thevisibilityTimeoutproperty, as it is now automatically determined by the consumer from thecreationConfigor queue configuration in the case oflocatorConfig.
If you currently utilize the prehandlers property in your consumer, it will be necessary to update it to preHandler.
If you are implementing the consumerOverrides property in your consumer, it is essential to eliminate the properties
sqs, queueUrl, handler, handleMessageBatch, and visibilityTimeout.
sqsshould be included in the constructor dependenciesqueueUrlis managed automatically by the library and does not require manual specificationhandleMessageBatchis not supported by the libraryvisibilityTimeoutis automatically managed by the library and does not need explicit declaration- For the
handler, use thehandlerproperty in the constructor as demonstrated below
export class MyConsumer extends AbstractAmqpConsumer<MyType, undefined> {
public static QUEUE_NAME = 'my-queue-name'
constructor(dependencies: AMQPConsumerDependencies) {
super(
dependencies,
{
creationConfig: {
queueName: AmqpPermissionConsumer.QUEUE_NAME,
queueOptions: { durable: true, autoDelete: false },
},
messageTypeField: 'messageType',
handlers: new MessageHandlerConfigBuilder<SupportedEvents, ExecutionContext>()
.addConfig(
MY_MESSAGE_SCHEMA,
async (message) => {
// Your handling code
return {
result: 'success',
}
},
)
.build(),
},
undefined
)
}
}Multi consumers and publishers can accomplish the same tasks as mono ones, but they add extra layer of complexity by requiring features to be implemented in both. As a result, we have decided to remove the mono ones to enhance maintainability.
If you are using the multi consumer or consumer, you will only need to rename the class you are extending, and it should work as before.
AbstractSqsMultiConsumer->AbstractSqsConsumerAbstractSqsMultiPublisher->AbstractSqsPublisher
If you are using the mono consumer or publisher, they no longer exist, so you will need to adjust your code to use the old named multi consumer or publisher (now called just consumer or publisher). Please check the guide below.
- Rename the class you are extending from
AbstractSqsPublisherMonoSchematoAbstractSqsPublisherSchema. - replace the
messageSchemaproperty withmessageSchemas, it is an array ofzodschemas.
// Old code
export class MyPublisher extends AbstractSqsPublisherMonoSchema<MyType> {
public static QUEUE_NAME = 'my-queue-name'
constructor(dependencies: SQSDependencies) {
super(dependencies, {
creationConfig: {
queue: {
QueueName: SqsPermissionPublisherMonoSchema.QUEUE_NAME,
},
},
handlerSpy: true,
deletionConfig: {
deleteIfExists: false,
},
logMessages: true,
messageSchema: MY_MESSAGE_SCHEMA,
messageTypeField: 'messageType',
})
}
}
// Updated code
export class MyPublisher extends AbstractSqsPublisher<MyType> {
public static QUEUE_NAME = 'my-queue-name'
constructor(dependencies: SQSDependencies) {
super(dependencies, {
creationConfig: {
queue: {
QueueName: SqsPermissionPublisherMonoSchema.QUEUE_NAME,
},
},
handlerSpy: true,
deletionConfig: {
deleteIfExists: false,
},
logMessages: true,
messageSchemas: [MY_MESSAGE_SCHEMA],
messageTypeField: 'messageType',
})
}
}- Rename the class you are extending from
AbstractSqsConsumerMonoSchematoAbstractSqsConsumer. - Remove the
messageSchemaproperty. - Define a handler (
handlersproperty) for your message, specifying thezodschema (oldmessageSchema) and the method to handle the message (oldprocessMessagemethod)
// Old code
export class MyConsumer extends AbstractAmqpConsumerMonoSchema<MyType> {
public static QUEUE_NAME = 'my-queue-name'
constructor(dependencies: AMQPConsumerDependencies) {
super(dependencies, {
creationConfig: {
queueName: AmqpPermissionConsumer.QUEUE_NAME,
queueOptions: {
durable: true,
autoDelete: false,
},
},
deletionConfig: {
deleteIfExists: true,
},
messageSchema: MY_MESSAGE_SCHEMA,
messageTypeField: 'messageType',
})
}
override async processMessage(
message: MyType,
): Promise<Either<'retryLater', 'success'>> {
// Your handling code
return { result: 'success' }
}
}
// Updated code
export class MyConsumer extends AbstractAmqpConsumer<MyType, undefined> {
public static QUEUE_NAME = 'my-queue-name'
constructor(dependencies: AMQPConsumerDependencies) {
super(
dependencies,
{
creationConfig: {
queueName: AmqpPermissionConsumer.QUEUE_NAME,
queueOptions: {
durable: true,
autoDelete: false,
},
},
deletionConfig: {
deleteIfExists: true,
},
messageTypeField: 'messageType',
handlers: new MessageHandlerConfigBuilder<SupportedEvents, ExecutionContext>()
.addConfig(
MY_MESSAGE_SCHEMA,
async (message) => {
// Your handling code
return {
result: 'success',
}
},
)
.build(),
},
undefined
)
}
}NOTE: on this example code we are omitting the barrier pattern (
preHandlerBarrier) and pre handlers (preHandlers) to simplify the example. If you are using them, please check SqsPermissionConsumer.ts to see how to update your code.