Skip to content

Commit d763e6a

Browse files
committed
Implement command-level tracing
1 parent a075ce1 commit d763e6a

File tree

8 files changed

+167
-42
lines changed

8 files changed

+167
-42
lines changed

src/MongoDB.Driver/Core/Connections/BinaryConnection.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ public BinaryConnection(
6868
IConnectionInitializer connectionInitializer,
6969
IEventSubscriber eventSubscriber,
7070
ILoggerFactory loggerFactory,
71+
TracingOptions tracingOptions,
7172
TimeSpan socketReadTimeout,
7273
TimeSpan socketWriteTimeout)
7374
{
@@ -83,7 +84,7 @@ public BinaryConnection(
8384

8485
_compressorSource = new CompressorSource(settings.Compressors);
8586
_eventLogger = loggerFactory.CreateEventLogger<LogCategories.Connection>(eventSubscriber);
86-
_commandEventHelper = new CommandEventHelper(loggerFactory.CreateEventLogger<LogCategories.Command>(eventSubscriber));
87+
_commandEventHelper = new CommandEventHelper(loggerFactory.CreateEventLogger<LogCategories.Command>(eventSubscriber), tracingOptions);
8788
_socketReadTimeout = socketReadTimeout;
8889
_socketWriteTimeout = socketWriteTimeout;
8990
}

src/MongoDB.Driver/Core/Connections/BinaryConnectionFactory.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ internal sealed class BinaryConnectionFactory : IConnectionFactory
3232
private readonly ILoggerFactory _loggerFactory;
3333
private readonly ConnectionSettings _settings;
3434
private readonly IStreamFactory _streamFactory;
35+
private readonly TracingOptions _tracingOptions;
3536
// TODO: CSOT: temporary here, remove on the next major release, together with socketTimeout
3637
private readonly TimeSpan _socketReadTimeout;
3738
private readonly TimeSpan _socketWriteTimeout;
@@ -43,6 +44,7 @@ public BinaryConnectionFactory(
4344
IEventSubscriber eventSubscriber,
4445
ServerApi serverApi,
4546
ILoggerFactory loggerFactory,
47+
TracingOptions tracingOptions,
4648
TimeSpan? socketReadTimeout,
4749
TimeSpan? socketWriteTimeout)
4850
{
@@ -51,6 +53,7 @@ public BinaryConnectionFactory(
5153
_eventSubscriber = Ensure.IsNotNull(eventSubscriber, nameof(eventSubscriber));
5254
_connectionInitializer = new ConnectionInitializer(settings.ApplicationName, settings.Compressors, serverApi, settings.LibraryInfo);
5355
_loggerFactory = loggerFactory;
56+
_tracingOptions = tracingOptions;
5457
_socketReadTimeout = socketReadTimeout.HasValue && socketReadTimeout > TimeSpan.Zero ? socketReadTimeout.Value : Timeout.InfiniteTimeSpan;
5558
_socketWriteTimeout = socketWriteTimeout.HasValue && socketWriteTimeout > TimeSpan.Zero ? socketWriteTimeout.Value : Timeout.InfiniteTimeSpan;
5659
}
@@ -70,6 +73,7 @@ public IConnection CreateConnection(ServerId serverId, EndPoint endPoint)
7073
_connectionInitializer,
7174
_eventSubscriber,
7275
_loggerFactory,
76+
_tracingOptions,
7377
_socketReadTimeout,
7478
_socketWriteTimeout);
7579
}

src/MongoDB.Driver/Core/Connections/ClientDocumentHelper.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ private static bool TryGetType(string typeName, out Type type)
449449
}
450450
}
451451

452-
private static string GetAssemblyVersion(Assembly assembly)
452+
internal static string GetAssemblyVersion(Assembly assembly)
453453
{
454454
var versionAttribute = assembly.GetCustomAttribute<AssemblyInformationalVersionAttribute>();
455455
var hashIndex = versionAttribute.InformationalVersion.IndexOf('+');

src/MongoDB.Driver/Core/Connections/CommandEventHelper.cs

Lines changed: 151 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
using MongoDB.Bson;
2222
using MongoDB.Bson.IO;
2323
using MongoDB.Bson.Serialization.Serializers;
24+
using MongoDB.Driver.Core.Configuration;
2425
using MongoDB.Driver.Core.Events;
2526
using MongoDB.Driver.Core.Logging;
2627
using MongoDB.Driver.Core.Misc;
@@ -34,24 +35,32 @@ internal class CommandEventHelper
3435
{
3536
private readonly EventLogger<LogCategories.Command> _eventLogger;
3637
private readonly ConcurrentDictionary<int, CommandState> _state;
38+
private readonly TracingOptions _tracingOptions;
3739

3840
private readonly bool _shouldProcessRequestMessages;
3941
private readonly bool _shouldTrackState;
4042
private readonly bool _shouldTrackFailed;
4143
private readonly bool _shouldTrackSucceeded;
44+
private readonly bool _shouldTrace;
4245

43-
public CommandEventHelper(EventLogger<LogCategories.Command> eventLogger)
46+
public CommandEventHelper(EventLogger<LogCategories.Command> eventLogger, TracingOptions tracingOptions = null)
4447
{
4548
_eventLogger = eventLogger;
49+
_tracingOptions = tracingOptions;
50+
4651
_shouldTrackSucceeded = _eventLogger.IsEventTracked<CommandSucceededEvent>();
4752
_shouldTrackFailed = _eventLogger.IsEventTracked<CommandFailedEvent>();
48-
_shouldTrackState = _shouldTrackSucceeded || _shouldTrackFailed;
53+
54+
// Check if tracing is disabled for this client
55+
_shouldTrace = _tracingOptions?.Disabled != true;
56+
57+
_shouldTrackState = _shouldTrackSucceeded || _shouldTrackFailed || _shouldTrace;
4958
_shouldProcessRequestMessages = _eventLogger.IsEventTracked<CommandStartedEvent>() || _shouldTrackState;
5059

5160
if (_shouldTrackState)
5261
{
5362
// we only need to track state if we have to raise
54-
// a succeeded or failed event
63+
// a succeeded or failed event or for tracing
5564
_state = new ConcurrentDictionary<int, CommandState>();
5665
}
5766
}
@@ -104,6 +113,12 @@ public void AfterSending(RequestMessage message, ConnectionId connectionId, Obje
104113
{
105114
state.Stopwatch.Stop();
106115

116+
if (state.CommandActivity != null)
117+
{
118+
state.CommandActivity.SetStatus(ActivityStatusCode.Ok);
119+
state.CommandActivity.Stop();
120+
}
121+
107122
if (_shouldTrackSucceeded)
108123
{
109124
_eventLogger.LogAndPublish(new CommandSucceededEvent(
@@ -128,6 +143,13 @@ public void ErrorSending(RequestMessage message, ConnectionId connectionId, Obje
128143
if (_state.TryRemove(message.RequestId, out state))
129144
{
130145
state.Stopwatch.Stop();
146+
147+
if (state.CommandActivity != null)
148+
{
149+
MongoTelemetry.RecordException(state.CommandActivity, exception);
150+
state.CommandActivity.Stop();
151+
}
152+
131153
_eventLogger.LogAndPublish(new CommandFailedEvent(
132154
state.CommandName,
133155
state.QueryNamespace.DatabaseNamespace,
@@ -171,6 +193,12 @@ public void ErrorReceiving(int responseTo, ConnectionId connectionId, ObjectId?
171193

172194
state.Stopwatch.Stop();
173195

196+
if (state.CommandActivity != null)
197+
{
198+
MongoTelemetry.RecordException(state.CommandActivity, exception);
199+
state.CommandActivity.Stop();
200+
}
201+
174202
_eventLogger.LogAndPublish(new CommandFailedEvent(
175203
state.CommandName,
176204
state.QueryNamespace.DatabaseNamespace,
@@ -268,15 +296,27 @@ private void ProcessCommandRequestMessage(CommandRequestMessage message, Connect
268296

269297
if (_shouldTrackState)
270298
{
271-
_state.TryAdd(requestId, new CommandState
299+
var commandState = new CommandState
272300
{
273301
CommandName = commandName,
274302
OperationId = operationId,
275303
Stopwatch = stopwatch,
276304
QueryNamespace = new CollectionNamespace(databaseNamespace, "$cmd"),
277305
ExpectedResponseType = decodedMessage.MoreToCome ? ExpectedResponseType.None : ExpectedResponseType.Command,
278306
ShouldRedactReply = shouldRedactCommand
279-
});
307+
};
308+
309+
if (_shouldTrace && !ShouldRedactCommand(command))
310+
{
311+
commandState.CommandActivity = MongoTelemetry.StartCommandActivity(
312+
commandName,
313+
command,
314+
databaseNamespace,
315+
connectionId,
316+
_tracingOptions?.QueryTextMaxLength ?? 0);
317+
}
318+
319+
_state.TryAdd(requestId, commandState);
280320
}
281321
}
282322
}
@@ -302,6 +342,8 @@ private void ProcessCommandResponseMessage(CommandState state, CommandResponseMe
302342

303343
if (ok.ToBoolean())
304344
{
345+
CompleteCommandActivityWithSuccess(state.CommandActivity, reply);
346+
305347
_eventLogger.LogAndPublish(new CommandSucceededEvent(
306348
state.CommandName,
307349
reply,
@@ -315,23 +357,7 @@ private void ProcessCommandResponseMessage(CommandState state, CommandResponseMe
315357
}
316358
else
317359
{
318-
if (_shouldTrackFailed)
319-
{
320-
_eventLogger.LogAndPublish(new CommandFailedEvent(
321-
state.CommandName,
322-
state.QueryNamespace.DatabaseNamespace,
323-
new MongoCommandException(
324-
connectionId,
325-
string.Format("{0} command failed", state.CommandName),
326-
null,
327-
reply),
328-
state.OperationId,
329-
message.ResponseTo,
330-
connectionId,
331-
serviceId,
332-
state.Stopwatch.Elapsed),
333-
skipLogging);
334-
}
360+
HandleCommandFailure(state, reply, connectionId, serviceId, message.ResponseTo, skipLogging);
335361
}
336362
}
337363

@@ -380,15 +406,27 @@ private void ProcessQueryMessage(QueryMessage originalMessage, ConnectionId conn
380406

381407
if (_shouldTrackState)
382408
{
383-
_state.TryAdd(requestId, new CommandState
409+
var commandState = new CommandState
384410
{
385411
CommandName = commandName,
386412
OperationId = operationId,
387413
Stopwatch = stopwatch,
388414
QueryNamespace = decodedMessage.CollectionNamespace,
389415
ExpectedResponseType = isCommand ? ExpectedResponseType.Command : ExpectedResponseType.Query,
390416
ShouldRedactReply = shouldRedactCommand
391-
});
417+
};
418+
419+
if (_shouldTrace && !ShouldRedactCommand(command))
420+
{
421+
commandState.CommandActivity = MongoTelemetry.StartCommandActivity(
422+
commandName,
423+
command,
424+
decodedMessage.CollectionNamespace.DatabaseNamespace,
425+
connectionId,
426+
_tracingOptions?.QueryTextMaxLength ?? 0);
427+
}
428+
429+
_state.TryAdd(requestId, commandState);
392430
}
393431
}
394432
finally
@@ -493,25 +531,12 @@ private void ProcessCommandReplyMessage(CommandState state, ReplyMessage<RawBson
493531

494532
if (!ok.ToBoolean())
495533
{
496-
if (_shouldTrackFailed)
497-
{
498-
_eventLogger.LogAndPublish(new CommandFailedEvent(
499-
state.CommandName,
500-
state.QueryNamespace.DatabaseNamespace,
501-
new MongoCommandException(
502-
connectionId,
503-
string.Format("{0} command failed", state.CommandName),
504-
null,
505-
reply),
506-
state.OperationId,
507-
replyMessage.ResponseTo,
508-
connectionId,
509-
state.Stopwatch.Elapsed),
510-
skipLogging);
511-
}
534+
HandleCommandFailure(state, reply, connectionId, serviceId: null, replyMessage.ResponseTo, skipLogging);
512535
}
513536
else
514537
{
538+
CompleteCommandActivityWithSuccess(state.CommandActivity, reply);
539+
515540
_eventLogger.LogAndPublish(new CommandSucceededEvent(
516541
state.CommandName,
517542
reply,
@@ -549,6 +574,8 @@ private void ProcessQueryReplyMessage(CommandState state, ReplyMessage<RawBsonDo
549574
};
550575
}
551576

577+
CompleteCommandActivityWithSuccess(state.CommandActivity, reply);
578+
552579
_eventLogger.LogAndPublish(new CommandSucceededEvent(
553580
state.CommandName,
554581
reply,
@@ -655,6 +682,89 @@ private static bool ShouldRedactCommand(BsonDocument command)
655682
}
656683
}
657684

685+
private void CompleteCommandActivityWithSuccess(Activity activity, BsonDocument reply)
686+
{
687+
if (activity == null)
688+
{
689+
return;
690+
}
691+
692+
if (TryGetCursorId(reply, out var cursorId))
693+
{
694+
activity.SetTag("db.mongodb.cursor_id", cursorId);
695+
}
696+
activity.SetStatus(ActivityStatusCode.Ok);
697+
activity.Stop();
698+
}
699+
700+
private void HandleCommandFailure(
701+
CommandState state,
702+
BsonDocument reply,
703+
ConnectionId connectionId,
704+
ObjectId? serviceId,
705+
int responseTo,
706+
bool skipLogging)
707+
{
708+
MongoCommandException exception = null;
709+
710+
// Only create exception if needed for span or event
711+
if (state.CommandActivity != null || _shouldTrackFailed)
712+
{
713+
exception = new MongoCommandException(
714+
connectionId,
715+
$"{state.CommandName} command failed",
716+
null,
717+
reply);
718+
}
719+
720+
if (state.CommandActivity != null)
721+
{
722+
MongoTelemetry.RecordException(state.CommandActivity, exception);
723+
724+
// Add error code if present in reply (spec requires string type)
725+
if (reply.Contains("code"))
726+
{
727+
state.CommandActivity.SetTag("db.response.status_code", reply["code"].ToString());
728+
}
729+
730+
state.CommandActivity.SetTag("exception.stacktrace", "");
731+
732+
state.CommandActivity.Stop();
733+
}
734+
735+
if (_shouldTrackFailed)
736+
{
737+
_eventLogger.LogAndPublish(new CommandFailedEvent(
738+
state.CommandName,
739+
state.QueryNamespace.DatabaseNamespace,
740+
exception,
741+
state.OperationId,
742+
responseTo,
743+
connectionId,
744+
serviceId,
745+
state.Stopwatch.Elapsed),
746+
skipLogging);
747+
}
748+
}
749+
750+
private bool TryGetCursorId(BsonDocument reply, out long cursorId)
751+
{
752+
cursorId = 0;
753+
if (reply == null) return false;
754+
755+
if (reply.TryGetValue("cursor", out var cursorValue) && cursorValue.IsBsonDocument)
756+
{
757+
var cursorDoc = cursorValue.AsBsonDocument;
758+
if (cursorDoc.TryGetValue("id", out var idValue) && idValue.IsInt64)
759+
{
760+
cursorId = idValue.AsInt64;
761+
return cursorId != 0;
762+
}
763+
}
764+
765+
return false;
766+
}
767+
658768
private enum ExpectedResponseType
659769
{
660770
None,
@@ -670,6 +780,7 @@ private class CommandState
670780
public CollectionNamespace QueryNamespace;
671781
public ExpectedResponseType ExpectedResponseType;
672782
public bool ShouldRedactReply;
783+
public Activity CommandActivity;
673784
}
674785
}
675786
}

0 commit comments

Comments
 (0)