Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 63 additions & 37 deletions src/BenchmarkDotNet/Loggers/Broker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
using System.IO;
using System.IO.Pipes;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using BenchmarkDotNet.Diagnosers;
using BenchmarkDotNet.Engines;
using BenchmarkDotNet.Running;
Expand All @@ -14,13 +12,19 @@

namespace BenchmarkDotNet.Loggers
{
internal class Broker
internal class Broker : IDisposable
{
private readonly ILogger logger;
private readonly Process process;
private readonly CompositeInProcessDiagnoser compositeInProcessDiagnoser;
private readonly AnonymousPipeServerStream inputFromBenchmark, acknowledgments;
private readonly ManualResetEvent finished;

private enum Result
{
Success,
EndOfStream,
InvalidData,
}

public Broker(ILogger logger, Process process, IDiagnoser? diagnoser, CompositeInProcessDiagnoser compositeInProcessDiagnoser,
BenchmarkCase benchmarkCase, BenchmarkId benchmarkId, AnonymousPipeServerStream inputFromBenchmark, AnonymousPipeServerStream acknowledgments)
Expand All @@ -32,7 +36,6 @@ public Broker(ILogger logger, Process process, IDiagnoser? diagnoser, CompositeI
this.inputFromBenchmark = inputFromBenchmark;
this.acknowledgments = acknowledgments;
DiagnoserActionParameters = new DiagnoserActionParameters(process, benchmarkCase, benchmarkId);
finished = new ManualResetEvent(false);

process.EnableRaisingEvents = true;
process.Exited += OnProcessExited;
Expand All @@ -46,54 +49,69 @@ public Broker(ILogger logger, Process process, IDiagnoser? diagnoser, CompositeI

internal List<string> PrefixedOutput { get; } = [];

internal void ProcessData()
public void Dispose()
{
// When the process fails to start, there is no pipe to read from.
// If we try to read from such pipe, the read blocks and BDN hangs.
// We can't use async methods with cancellation tokens because Anonymous Pipes don't support async IO.

// Usually, this property is not set yet.
if (process.HasExited)
{
return;
}

Task.Run(ProcessDataBlocking);
process.Exited -= OnProcessExited;

finished.WaitOne();
// Dispose all the pipes to let reading from pipe finish with EOF and avoid a resource leak.
DisposeLocalCopyOfClientHandles();
inputFromBenchmark.Dispose();
acknowledgments.Dispose();
}

private void OnProcessExited(object? sender, EventArgs e)
{
process.Exited -= OnProcessExited;
DisposeLocalCopyOfClientHandles();
}

// Dispose all the pipes to let reading from pipe finish with EOF and avoid a reasource leak.
private void DisposeLocalCopyOfClientHandles()
{
inputFromBenchmark.DisposeLocalCopyOfClientHandle();
inputFromBenchmark.Dispose();
acknowledgments.DisposeLocalCopyOfClientHandle();
acknowledgments.Dispose();
}

internal void ProcessData()
{
// When the process fails to start, there is no pipe to read from.
// If we try to read from such pipe, the read blocks and BDN hangs.
// We can't use async methods with cancellation tokens because Anonymous Pipes don't support async IO.

// Usually, this property is not set yet.
if (process.HasExited)
return;

finished.Set();
var result = ProcessDataBlocking();
if (result != Result.Success)
logger.WriteLineError($"ProcessData operation is interrupted by {result}.");
}

private void ProcessDataBlocking()
private Result ProcessDataBlocking()
{
using StreamReader reader = new(inputFromBenchmark, AnonymousPipesHost.UTF8NoBOM, detectEncodingFromByteOrderMarks: false);
using StreamWriter writer = new(acknowledgments, AnonymousPipesHost.UTF8NoBOM, bufferSize: 1);
// Flush the data to the Stream after each write, otherwise the client will wait for input endlessly!
writer.AutoFlush = true;

while (reader.ReadLine() is { } line)
while (true)
{
var line = reader.ReadLine();
if (line == null)
return Result.EndOfStream;

// TODO: implement Silent mode here
logger.WriteLine(LogKind.Default, line);

// Handle normal log.
if (!line.StartsWith("//"))
{
Results.Add(line);
continue;
}

// Keep in sync with WasmExecutor and InProcessHost.
else if (line.StartsWith(CompositeInProcessDiagnoser.HeaderKey))

// Handle line prefixed with "// InProcessDiagnoser "
if (line.StartsWith(CompositeInProcessDiagnoser.HeaderKey))
{
// Something like "// InProcessDiagnoser 0 1"
string[] lineItems = line.Split(' ');
Expand All @@ -102,17 +120,27 @@ private void ProcessDataBlocking()
var resultsStringBuilder = new StringBuilder();
for (int i = 0; i < resultsLinesCount;)
{
line = reader.ReadLine();
if (line == null)
return Result.EndOfStream;

if (!line.StartsWith($"{CompositeInProcessDiagnoser.ResultsKey} "))
return Result.InvalidData;

// Strip the prepended "// InProcessDiagnoserResults ".
line = reader.ReadLine()!.Substring(CompositeInProcessDiagnoser.ResultsKey.Length + 1);
line = line.Substring(CompositeInProcessDiagnoser.ResultsKey.Length + 1);
resultsStringBuilder.Append(line);
if (++i < resultsLinesCount)
{
resultsStringBuilder.AppendLine();
}
}
compositeInProcessDiagnoser.DeserializeResults(diagnoserIndex, DiagnoserActionParameters.BenchmarkCase, resultsStringBuilder.ToString());
continue;
}
else if (Engine.Signals.TryGetSignal(line, out var signal))

// Handle HostSignal data
if (Engine.Signals.TryGetSignal(line, out var signal))
{
Diagnoser?.Handle(signal, DiagnoserActionParameters);

Expand All @@ -123,22 +151,20 @@ private void ProcessDataBlocking()
// The client has connected, we no longer need to keep the local copy of client handle alive.
// This allows server to detect that child process is done and hence avoid resource leak.
// Full explanation: https://stackoverflow.com/a/39700027
inputFromBenchmark.DisposeLocalCopyOfClientHandle();
acknowledgments.DisposeLocalCopyOfClientHandle();
DisposeLocalCopyOfClientHandles();
}
else if (signal == HostSignal.AfterAll)
{
// we have received the last signal so we can stop reading from the pipe
// if the process won't exit after this, its hung and needs to be killed
process.Exited -= OnProcessExited;
finished.Set();
return;
return Result.Success;
}

continue;
}
else if (!string.IsNullOrEmpty(line))
{
PrefixedOutput.Add(line);
}

// Other line that have "//" prefix.
PrefixedOutput.Add(line);
}
}
}
Expand Down
50 changes: 28 additions & 22 deletions src/BenchmarkDotNet/Toolchains/DotNetCli/DotNetCliExecutor.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.IO.Pipes;
Expand Down Expand Up @@ -79,12 +80,14 @@ private ExecuteResult Execute(BenchmarkCase benchmarkCase,

startInfo.SetEnvironmentVariables(benchmarkCase, resolver);

using (Process process = new() { StartInfo = startInfo })
using (ConsoleExitHandler consoleExitHandler = new(process, logger))
using (AsyncProcessOutputReader processOutputReader = new(process, logOutput: true, logger, readStandardError: false))
{
Broker broker = new(logger, process, diagnoser, compositeInProcessDiagnoser, benchmarkCase, benchmarkId, inputFromBenchmark, acknowledgments);
using Process process = new() { StartInfo = startInfo };
using ConsoleExitHandler consoleExitHandler = new(process, logger);
using AsyncProcessOutputReader processOutputReader = new(process, logOutput: true, logger, readStandardError: false);

List<string> results;
List<string> prefixedOutput;
using (Broker broker = new(logger, process, diagnoser, compositeInProcessDiagnoser, benchmarkCase, benchmarkId, inputFromBenchmark, acknowledgments))
{
logger.WriteLineInfo($"// Execute: {process.StartInfo.FileName} {process.StartInfo.Arguments} in {process.StartInfo.WorkingDirectory}");

diagnoser?.Handle(HostSignal.BeforeProcessStart, broker.DiagnoserActionParameters);
Expand All @@ -103,26 +106,29 @@ private ExecuteResult Execute(BenchmarkCase benchmarkCase,

broker.ProcessData();

if (!process.WaitForExit(milliseconds: (int)ExecuteParameters.ProcessExitTimeout.TotalMilliseconds))
{
logger.WriteLineInfo($"// The benchmarking process did not quit within {ExecuteParameters.ProcessExitTimeout.TotalSeconds} seconds, it's going to get force killed now.");
results = broker.Results;
prefixedOutput = broker.PrefixedOutput;
}

processOutputReader.CancelRead();
consoleExitHandler.KillProcessTree();
}
else
{
processOutputReader.StopRead();
}
if (!process.WaitForExit(milliseconds: (int) ExecuteParameters.ProcessExitTimeout.TotalMilliseconds))
{
logger.WriteLineInfo($"// The benchmarking process did not quit within {ExecuteParameters.ProcessExitTimeout.TotalSeconds} seconds, it's going to get force killed now.");

return new ExecuteResult(true,
process.HasExited ? process.ExitCode : null,
process.Id,
broker.Results,
broker.PrefixedOutput,
processOutputReader.GetOutputLines(),
launchIndex);
processOutputReader.CancelRead();
consoleExitHandler.KillProcessTree();
}
else
{
processOutputReader.StopRead();
}

return new ExecuteResult(true,
process.HasExited ? process.ExitCode : null,
process.Id,
results,
prefixedOutput,
processOutputReader.GetOutputLines(),
launchIndex);
}
}
}
81 changes: 44 additions & 37 deletions src/BenchmarkDotNet/Toolchains/Executor.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Diagnostics;
using System.IO;
Expand Down Expand Up @@ -45,55 +46,61 @@ private static ExecuteResult Execute(BenchmarkCase benchmarkCase, BenchmarkId be
{
try
{
using AnonymousPipeServerStream inputFromBenchmark = new(PipeDirection.In, HandleInheritability.Inheritable);
using AnonymousPipeServerStream acknowledgments = new(PipeDirection.Out, HandleInheritability.Inheritable);

string args = benchmarkId.ToArguments(inputFromBenchmark.GetClientHandleAsString(), acknowledgments.GetClientHandleAsString(), diagnoserRunMode);

using (Process process = new() { StartInfo = CreateStartInfo(benchmarkCase, artifactsPaths, args, resolver) })
using (ConsoleExitHandler consoleExitHandler = new(process, logger))
using (AsyncProcessOutputReader processOutputReader = new(process, logOutput: true, logger, readStandardError: false))
{
Broker broker = new(logger, process, diagnoser, compositeInProcessDiagnoser, benchmarkCase, benchmarkId, inputFromBenchmark, acknowledgments);

diagnoser?.Handle(HostSignal.BeforeProcessStart, new DiagnoserActionParameters(process, benchmarkCase, benchmarkId));

return Execute(process, benchmarkCase, broker, logger, consoleExitHandler, launchIndex, processOutputReader);
}
return ExecuteCore(benchmarkCase, benchmarkId, logger, artifactsPaths, diagnoser, compositeInProcessDiagnoser, resolver, launchIndex, diagnoserRunMode);
}
finally
{
diagnoser?.Handle(HostSignal.AfterProcessExit, new DiagnoserActionParameters(null, benchmarkCase, benchmarkId));
}
}

private static ExecuteResult Execute(Process process, BenchmarkCase benchmarkCase, Broker broker,
ILogger logger, ConsoleExitHandler consoleExitHandler, int launchIndex, AsyncProcessOutputReader processOutputReader)
private static ExecuteResult ExecuteCore(BenchmarkCase benchmarkCase, BenchmarkId benchmarkId, ILogger logger, ArtifactsPaths artifactsPaths,
IDiagnoser? diagnoser, CompositeInProcessDiagnoser compositeInProcessDiagnoser, IResolver resolver, int launchIndex,
Diagnosers.RunMode diagnoserRunMode)
{
logger.WriteLineInfo($"// Execute: {process.StartInfo.FileName} {process.StartInfo.Arguments} in {process.StartInfo.WorkingDirectory}");
using AnonymousPipeServerStream inputFromBenchmark = new(PipeDirection.In, HandleInheritability.Inheritable);
using AnonymousPipeServerStream acknowledgments = new(PipeDirection.Out, HandleInheritability.Inheritable);

try
{
process.Start();
}
catch (Win32Exception ex)
string args = benchmarkId.ToArguments(inputFromBenchmark.GetClientHandleAsString(), acknowledgments.GetClientHandleAsString(), diagnoserRunMode);

using Process process = new() { StartInfo = CreateStartInfo(benchmarkCase, artifactsPaths, args, resolver) };
using ConsoleExitHandler consoleExitHandler = new(process, logger);
using AsyncProcessOutputReader processOutputReader = new(process, logOutput: true, logger, readStandardError: false);

List<string> results;
List<string> prefixedOutput;
using (Broker broker = new(logger, process, diagnoser, compositeInProcessDiagnoser, benchmarkCase, benchmarkId, inputFromBenchmark, acknowledgments))
{
logger.WriteLineError($"// Failed to start the benchmark process: {ex}");
diagnoser?.Handle(HostSignal.BeforeProcessStart, new DiagnoserActionParameters(process, benchmarkCase, benchmarkId));

return new ExecuteResult(true, null, null, Array.Empty<string>(), Array.Empty<string>(), Array.Empty<string>(), launchIndex);
}
logger.WriteLineInfo($"// Execute: {process.StartInfo.FileName} {process.StartInfo.Arguments} in {process.StartInfo.WorkingDirectory}");

broker.Diagnoser?.Handle(HostSignal.AfterProcessStart, broker.DiagnoserActionParameters);
try
{
process.Start();
}
catch (Win32Exception ex)
{
logger.WriteLineError($"// Failed to start the benchmark process: {ex}");

processOutputReader.BeginRead();
return new ExecuteResult(true, null, null, [], [], [], launchIndex);
}

process.EnsureHighPriority(logger);
if (benchmarkCase.Job.Environment.HasValue(EnvironmentMode.AffinityCharacteristic))
{
process.TrySetAffinity(benchmarkCase.Job.Environment.Affinity, logger);
}
broker.Diagnoser?.Handle(HostSignal.AfterProcessStart, broker.DiagnoserActionParameters);

broker.ProcessData();
processOutputReader.BeginRead();

process.EnsureHighPriority(logger);
if (benchmarkCase.Job.Environment.HasValue(EnvironmentMode.AffinityCharacteristic))
{
process.TrySetAffinity(benchmarkCase.Job.Environment.Affinity, logger);
}

broker.ProcessData();

results = broker.Results;
prefixedOutput = broker.PrefixedOutput;
}

if (!process.WaitForExit(milliseconds: (int)ExecuteParameters.ProcessExitTimeout.TotalMilliseconds))
{
Expand All @@ -107,14 +114,14 @@ private static ExecuteResult Execute(Process process, BenchmarkCase benchmarkCas
processOutputReader.StopRead();
}

if (broker.Results.Any(line => line.Contains("BadImageFormatException")))
if (results.Any(line => line.Contains("BadImageFormatException")))
logger.WriteLineError("You are probably missing <PlatformTarget>AnyCPU</PlatformTarget> in your .csproj file.");

return new ExecuteResult(true,
process.HasExited ? process.ExitCode : null,
process.Id,
broker.Results,
broker.PrefixedOutput,
results,
prefixedOutput,
processOutputReader.GetOutputLines(),
launchIndex);
}
Expand Down