Skip to content

Commit a075ce1

Browse files
committed
Implement session and transaction tracing
1 parent 039aa24 commit a075ce1

File tree

6 files changed

+209
-8
lines changed

6 files changed

+209
-8
lines changed

src/MongoDB.Driver/ClientSessionHandle.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,12 @@ public IClientSessionHandle Fork()
158158
public void StartTransaction(TransactionOptions transactionOptions = null)
159159
{
160160
var effectiveTransactionOptions = GetEffectiveTransactionOptions(transactionOptions);
161-
_coreSession.StartTransaction(effectiveTransactionOptions);
161+
162+
// Check if tracing is enabled for this client
163+
var tracingOptions = _client.Settings.TracingOptions;
164+
var isTracingEnabled = tracingOptions == null || !tracingOptions.Disabled;
165+
166+
((ICoreSessionInternal)_coreSession).StartTransaction(effectiveTransactionOptions, isTracingEnabled);
162167
}
163168

164169
/// <inheritdoc />

src/MongoDB.Driver/Core/Bindings/CoreSession.cs

Lines changed: 150 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@
1414
*/
1515

1616
using System;
17+
using System.Diagnostics;
1718
using System.Linq;
1819
using System.Threading;
1920
using System.Threading.Tasks;
2021
using MongoDB.Bson;
22+
using MongoDB.Driver;
2123
using MongoDB.Driver.Core.Clusters;
2224
using MongoDB.Driver.Core.Misc;
2325
using MongoDB.Driver.Core.Operations;
@@ -190,6 +192,20 @@ void ICoreSessionInternal.AbortTransaction(AbortTransactionOptions options, Canc
190192
_currentTransaction.SetState(CoreTransactionState.Aborted);
191193
// The transaction is aborted.The session MUST be unpinned regardless
192194
// of whether the abortTransaction command succeeds or fails
195+
if (_currentTransaction.TransactionActivity != null)
196+
{
197+
var transactionActivity = _currentTransaction.TransactionActivity;
198+
_currentTransaction.TransactionActivity = null;
199+
200+
// Set status to Ok for successfully aborted transaction
201+
transactionActivity.SetStatus(ActivityStatusCode.Ok);
202+
203+
// Dispose the transaction activity. Note: Activity.Current was already restored to the
204+
// parent in StartTransaction() to prevent AsyncLocal flow issues, so the transaction
205+
// activity was never persisted in Activity.Current. Set it explicitly to be defensive.
206+
Activity.Current = _currentTransaction.ParentActivity;
207+
transactionActivity.Dispose();
208+
}
193209
_currentTransaction.UnpinAll();
194210
}
195211
}
@@ -244,6 +260,20 @@ async Task ICoreSessionInternal.AbortTransactionAsync(AbortTransactionOptions op
244260
_currentTransaction.SetState(CoreTransactionState.Aborted);
245261
// The transaction is aborted.The session MUST be unpinned regardless
246262
// of whether the abortTransaction command succeeds or fails
263+
if (_currentTransaction.TransactionActivity != null)
264+
{
265+
var transactionActivity = _currentTransaction.TransactionActivity;
266+
_currentTransaction.TransactionActivity = null;
267+
268+
// Set status to Ok for successfully aborted transaction
269+
transactionActivity.SetStatus(ActivityStatusCode.Ok);
270+
271+
// Dispose the transaction activity. Note: Activity.Current was already restored to the
272+
// parent in StartTransaction() to prevent AsyncLocal flow issues, so the transaction
273+
// activity was never persisted in Activity.Current. Set it explicitly to be defensive.
274+
Activity.Current = _currentTransaction.ParentActivity;
275+
transactionActivity.Dispose();
276+
}
247277
_currentTransaction.UnpinAll();
248278
}
249279
}
@@ -334,6 +364,21 @@ void ICoreSessionInternal.CommitTransaction(CommitTransactionOptions options, Ca
334364
{
335365
_isCommitTransactionInProgress = false;
336366
_currentTransaction.SetState(CoreTransactionState.Committed);
367+
// Stop the transaction span immediately so it's captured for testing
368+
if (_currentTransaction.TransactionActivity != null)
369+
{
370+
var transactionActivity = _currentTransaction.TransactionActivity;
371+
_currentTransaction.TransactionActivity = null;
372+
373+
// Set status to Ok for successfully committed transaction
374+
transactionActivity.SetStatus(ActivityStatusCode.Ok);
375+
376+
// Dispose the transaction activity. Note: Activity.Current was already restored to the
377+
// parent in StartTransaction() to prevent AsyncLocal flow issues, so the transaction
378+
// activity was never persisted in Activity.Current. Set it explicitly to be defensive.
379+
Activity.Current = _currentTransaction.ParentActivity;
380+
transactionActivity.Dispose();
381+
}
337382
}
338383
}
339384

@@ -374,6 +419,21 @@ async Task ICoreSessionInternal.CommitTransactionAsync(CommitTransactionOptions
374419
{
375420
_isCommitTransactionInProgress = false;
376421
_currentTransaction.SetState(CoreTransactionState.Committed);
422+
// Stop the transaction span immediately so it's captured for testing
423+
if (_currentTransaction.TransactionActivity != null)
424+
{
425+
var transactionActivity = _currentTransaction.TransactionActivity;
426+
_currentTransaction.TransactionActivity = null;
427+
428+
// Set status to Ok for successfully committed transaction
429+
transactionActivity.SetStatus(ActivityStatusCode.Ok);
430+
431+
// Dispose the transaction activity. Note: Activity.Current was already restored to the
432+
// parent in StartTransaction() to prevent AsyncLocal flow issues, so the transaction
433+
// activity was never persisted in Activity.Current. Set it explicitly to be defensive.
434+
Activity.Current = _currentTransaction.ParentActivity;
435+
transactionActivity.Dispose();
436+
}
377437
}
378438
}
379439

@@ -414,6 +474,9 @@ public void MarkDirty()
414474

415475
/// <inheritdoc />
416476
public void StartTransaction(TransactionOptions transactionOptions = null)
477+
=> ((ICoreSessionInternal)this).StartTransaction(transactionOptions, isTracingEnabled: false);
478+
479+
void ICoreSessionInternal.StartTransaction(TransactionOptions transactionOptions, bool isTracingEnabled)
417480
{
418481
EnsureStartTransactionCanBeCalled();
419482

@@ -425,7 +488,19 @@ public void StartTransaction(TransactionOptions transactionOptions = null)
425488
}
426489

427490
_currentTransaction?.UnpinAll(); // unpin data if any when a new transaction is started
428-
_currentTransaction = new CoreTransaction(transactionNumber, effectiveTransactionOptions);
491+
_currentTransaction = new CoreTransaction(transactionNumber, effectiveTransactionOptions, isTracingEnabled);
492+
493+
// Start transaction span for OpenTelemetry tracing (if enabled)
494+
if (isTracingEnabled)
495+
{
496+
// Store the parent activity to restore later
497+
_currentTransaction.ParentActivity = Activity.Current;
498+
_currentTransaction.TransactionActivity = MongoTelemetry.StartTransactionActivity();
499+
500+
// Immediately restore Activity.Current to the parent to prevent AsyncLocal flow issues.
501+
// The transaction activity will be explicitly set as parent for operations within the transaction.
502+
Activity.Current = _currentTransaction.ParentActivity;
503+
}
429504
}
430505

431506
/// <inheritdoc />
@@ -559,19 +634,87 @@ private void EnsureTransactionsAreSupported()
559634

560635
private TResult ExecuteEndTransactionOnPrimary<TResult>(OperationContext operationContext, IReadOperation<TResult> operation)
561636
{
562-
using (var sessionHandle = new NonDisposingCoreSessionHandle(this))
563-
using (var binding = ChannelPinningHelper.CreateReadWriteBinding(_cluster, sessionHandle))
637+
// Determine operation name and create operation-level span if tracing is enabled
638+
string operationName = operation switch
639+
{
640+
CommitTransactionOperation => "commitTransaction",
641+
AbortTransactionOperation => "abortTransaction",
642+
_ => null
643+
};
644+
645+
// Temporarily set Activity.Current to transaction activity so the operation nests under it
646+
var transactionActivity = _currentTransaction?.TransactionActivity;
647+
var previousActivity = Activity.Current;
648+
if (transactionActivity != null)
564649
{
565-
return operation.Execute(operationContext, binding);
650+
Activity.Current = transactionActivity;
651+
}
652+
653+
using var activity = _currentTransaction?.IsTracingEnabled == true && operationName != null
654+
? MongoTelemetry.StartOperationActivity(operationName, "admin", collectionName: null)
655+
: null;
656+
657+
// Don't restore Activity.Current yet - let it stay as the operation activity
658+
// so command activities nest under it. We'll restore after the operation completes.
659+
660+
try
661+
{
662+
using (var sessionHandle = new NonDisposingCoreSessionHandle(this))
663+
using (var binding = ChannelPinningHelper.CreateReadWriteBinding(_cluster, sessionHandle))
664+
{
665+
return operation.Execute(operationContext, binding);
666+
}
667+
}
668+
finally
669+
{
670+
// Restore Activity.Current after operation completes
671+
if (transactionActivity != null)
672+
{
673+
Activity.Current = previousActivity;
674+
}
566675
}
567676
}
568677

569678
private async Task<TResult> ExecuteEndTransactionOnPrimaryAsync<TResult>(OperationContext operationContext, IReadOperation<TResult> operation)
570679
{
571-
using (var sessionHandle = new NonDisposingCoreSessionHandle(this))
572-
using (var binding = ChannelPinningHelper.CreateReadWriteBinding(_cluster, sessionHandle))
680+
// Determine operation name and create operation-level span if tracing is enabled
681+
string operationName = operation switch
682+
{
683+
CommitTransactionOperation => "commitTransaction",
684+
AbortTransactionOperation => "abortTransaction",
685+
_ => null
686+
};
687+
688+
// Temporarily set Activity.Current to transaction activity so the operation nests under it
689+
var transactionActivity = _currentTransaction?.TransactionActivity;
690+
var previousActivity = Activity.Current;
691+
if (transactionActivity != null)
692+
{
693+
Activity.Current = transactionActivity;
694+
}
695+
696+
using var activity = _currentTransaction?.IsTracingEnabled == true && operationName != null
697+
? MongoTelemetry.StartOperationActivity(operationName, "admin", collectionName: null)
698+
: null;
699+
700+
// Don't restore Activity.Current yet - let it stay as the operation activity
701+
// so command activities nest under it. We'll restore after the operation completes.
702+
703+
try
704+
{
705+
using (var sessionHandle = new NonDisposingCoreSessionHandle(this))
706+
using (var binding = ChannelPinningHelper.CreateReadWriteBinding(_cluster, sessionHandle))
707+
{
708+
return await operation.ExecuteAsync(operationContext, binding).ConfigureAwait(false);
709+
}
710+
}
711+
finally
573712
{
574-
return await operation.ExecuteAsync(operationContext, binding).ConfigureAwait(false);
713+
// Restore Activity.Current after operation completes
714+
if (transactionActivity != null)
715+
{
716+
Activity.Current = previousActivity;
717+
}
575718
}
576719
}
577720

src/MongoDB.Driver/Core/Bindings/CoreTransaction.cs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
* limitations under the License.
1414
*/
1515

16+
using System.Diagnostics;
1617
using MongoDB.Bson;
1718
using MongoDB.Driver.Core.Servers;
1819

@@ -32,6 +33,9 @@ public class CoreTransaction
3233
private readonly long _transactionNumber;
3334
private readonly TransactionOptions _transactionOptions;
3435
private readonly object _lock = new object();
36+
private Activity _transactionActivity;
37+
private Activity _parentActivity;
38+
private readonly bool _isTracingEnabled;
3539

3640
// public constructors
3741
/// <summary>
@@ -40,9 +44,21 @@ public class CoreTransaction
4044
/// <param name="transactionNumber">The transaction number.</param>
4145
/// <param name="transactionOptions">The transaction options.</param>
4246
public CoreTransaction(long transactionNumber, TransactionOptions transactionOptions)
47+
: this(transactionNumber, transactionOptions, isTracingEnabled: false)
48+
{
49+
}
50+
51+
/// <summary>
52+
/// Initializes a new instance of the <see cref="CoreTransaction" /> class.
53+
/// </summary>
54+
/// <param name="transactionNumber">The transaction number.</param>
55+
/// <param name="transactionOptions">The transaction options.</param>
56+
/// <param name="isTracingEnabled">Whether OpenTelemetry tracing is enabled for this transaction.</param>
57+
internal CoreTransaction(long transactionNumber, TransactionOptions transactionOptions, bool isTracingEnabled)
4358
{
4459
_transactionNumber = transactionNumber;
4560
_transactionOptions = transactionOptions;
61+
_isTracingEnabled = isTracingEnabled;
4662
_state = CoreTransactionState.Starting;
4763
_isEmpty = true;
4864
}
@@ -58,6 +74,29 @@ public CoreTransaction(long transactionNumber, TransactionOptions transactionOpt
5874

5975
internal OperationContext OperationContext { get; set; }
6076

77+
/// <summary>
78+
/// Gets or sets the transaction activity (for OpenTelemetry tracing).
79+
/// </summary>
80+
internal Activity TransactionActivity
81+
{
82+
get => _transactionActivity;
83+
set => _transactionActivity = value;
84+
}
85+
86+
/// <summary>
87+
/// Gets or sets the parent activity to restore after the transaction completes.
88+
/// </summary>
89+
internal Activity ParentActivity
90+
{
91+
get => _parentActivity;
92+
set => _parentActivity = value;
93+
}
94+
95+
/// <summary>
96+
/// Gets whether OpenTelemetry tracing is enabled for this transaction.
97+
/// </summary>
98+
internal bool IsTracingEnabled => _isTracingEnabled;
99+
61100
/// <summary>
62101
/// Gets the transaction state.
63102
/// </summary>
@@ -138,6 +177,8 @@ internal void UnpinAll()
138177
_pinnedChannel?.Dispose();
139178
_pinnedChannel = null;
140179
_pinnedServer = null;
180+
_transactionActivity?.Dispose();
181+
_transactionActivity = null;
141182
}
142183
}
143184
}

src/MongoDB.Driver/Core/Bindings/ICoreSessionInternal.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,5 @@ internal interface ICoreSessionInternal
2525
Task AbortTransactionAsync(AbortTransactionOptions options, CancellationToken cancellationToken = default);
2626
void CommitTransaction(CommitTransactionOptions options, CancellationToken cancellationToken = default);
2727
Task CommitTransactionAsync(CommitTransactionOptions options, CancellationToken cancellationToken = default);
28+
void StartTransaction(TransactionOptions transactionOptions, bool isTracingEnabled);
2829
}

src/MongoDB.Driver/Core/Bindings/NoCoreSession.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,11 @@ public void StartTransaction(TransactionOptions transactionOptions = null)
173173
throw new NotSupportedException("NoCoreSession does not support StartTransaction.");
174174
}
175175

176+
void ICoreSessionInternal.StartTransaction(TransactionOptions transactionOptions, bool isTracingEnabled)
177+
{
178+
throw new NotSupportedException("NoCoreSession does not support StartTransaction.");
179+
}
180+
176181
/// <inheritdoc />
177182
public void SetSnapshotTimeIfNeeded(BsonTimestamp snapshotTime)
178183
{

src/MongoDB.Driver/Core/Bindings/WrappingCoreSession.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,12 @@ public virtual void StartTransaction(TransactionOptions transactionOptions = nul
286286
_wrapped.StartTransaction(transactionOptions);
287287
}
288288

289+
void ICoreSessionInternal.StartTransaction(TransactionOptions transactionOptions, bool isTracingEnabled)
290+
{
291+
ThrowIfDisposed();
292+
((ICoreSessionInternal)_wrapped).StartTransaction(transactionOptions, isTracingEnabled);
293+
}
294+
289295
/// <inheritdoc />
290296
public void SetSnapshotTimeIfNeeded(BsonTimestamp snapshotTime)
291297
{

0 commit comments

Comments
 (0)