From 2c699e72ebe3a6c20941744562da0af0d205c7e8 Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Mon, 29 Sep 2025 14:50:27 -0700 Subject: [PATCH 1/2] initial commit --- src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs index 371b00818..08cdfc818 100644 --- a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs @@ -311,6 +311,10 @@ public async Task RenewMessageAsync(MessageData message, SessionBase session) TaskMessage taskMessage = message.TaskMessage; OrchestrationInstance instance = taskMessage.OrchestrationInstance; + // Use zero visibility timeout to prevent race conditions with DeleteMessageAsync. + // This allows the message to remain visible and be deleted immediately after renewal if needed. + TimeSpan zeroVisibilityTimeout = TimeSpan.Zero; + this.settings.Logger.RenewingMessage( this.storageAccountName, this.settings.TaskHubName, @@ -321,13 +325,13 @@ public async Task RenewMessageAsync(MessageData message, SessionBase session) Utils.GetTaskEventId(message.TaskMessage.Event), queueMessage.MessageId, queueMessage.PopReceipt, - (int)this.MessageVisibilityTimeout.TotalSeconds); + (int)zeroVisibilityTimeout.TotalSeconds); try { await this.storageQueue.UpdateMessageAsync( message, - this.MessageVisibilityTimeout, + zeroVisibilityTimeout, session?.TraceActivityId); } catch (Exception e) From 6871e5d10322dcfda3570ebc46ecbc87ad62d9af Mon Sep 17 00:00:00 2001 From: "naiyuantian@microsoft.com" Date: Mon, 29 Sep 2025 18:21:37 -0700 Subject: [PATCH 2/2] udpate --- .../Messaging/TaskHubQueue.cs | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs index 08cdfc818..221789566 100644 --- a/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs +++ b/src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs @@ -311,10 +311,6 @@ public async Task RenewMessageAsync(MessageData message, SessionBase session) TaskMessage taskMessage = message.TaskMessage; OrchestrationInstance instance = taskMessage.OrchestrationInstance; - // Use zero visibility timeout to prevent race conditions with DeleteMessageAsync. - // This allows the message to remain visible and be deleted immediately after renewal if needed. - TimeSpan zeroVisibilityTimeout = TimeSpan.Zero; - this.settings.Logger.RenewingMessage( this.storageAccountName, this.settings.TaskHubName, @@ -325,13 +321,13 @@ public async Task RenewMessageAsync(MessageData message, SessionBase session) Utils.GetTaskEventId(message.TaskMessage.Event), queueMessage.MessageId, queueMessage.PopReceipt, - (int)zeroVisibilityTimeout.TotalSeconds); + (int)this.MessageVisibilityTimeout.TotalSeconds); try { await this.storageQueue.UpdateMessageAsync( message, - zeroVisibilityTimeout, + this.MessageVisibilityTimeout, session?.TraceActivityId); } catch (Exception e) @@ -349,7 +345,6 @@ await this.storageQueue.UpdateMessageAsync( public virtual async Task DeleteMessageAsync(MessageData message, SessionBase? session = null) { - QueueMessage queueMessage = message.OriginalQueueMessage; TaskMessage taskMessage = message.TaskMessage; bool haveRetried = false; @@ -360,16 +355,16 @@ public virtual async Task DeleteMessageAsync(MessageData message, SessionBase? s this.settings.TaskHubName, taskMessage.Event.EventType.ToString(), Utils.GetTaskEventId(taskMessage.Event), - queueMessage.MessageId, + message.OriginalQueueMessage.MessageId, taskMessage.OrchestrationInstance.InstanceId, taskMessage.OrchestrationInstance.ExecutionId, this.storageQueue.Name, message.SequenceNumber, - queueMessage.PopReceipt); + message.OriginalQueueMessage.PopReceipt); try { - await this.storageQueue.DeleteMessageAsync(queueMessage, session?.TraceActivityId); + await this.storageQueue.DeleteMessageAsync(message.OriginalQueueMessage, session?.TraceActivityId); } catch (Exception e) {