Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions src/SeqCli/Forwarder/Filesystem/StoreDirectory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ protected virtual (string, StoreFile) CreateTemporary()
var tmpName = $"rc{Guid.NewGuid():N}.tmp";
return (tmpName, Create(tmpName));
}

/// <summary>
/// Create a new file with the given contents.
/// </summary>
/// <param name="name">The name of the file to create, relative to this directory.</param>
/// <param name="contents">The bytes the newly created file will contain.</param>
/// <returns>A handle to the file after its contents have been written.</returns>
public virtual StoreFile Create(string name, Span<byte> contents)
{
    // NOTE(review): the StoreFile returned by Create(name) is discarded here;
    // confirm it holds no handle or resource that needs disposal before the
    // contents are replaced via ReplaceContents below.
    Create(name);
    return ReplaceContents(name, contents);
}

/// <summary>
/// Delete a file with the given name, returning whether the file was deleted.
Expand All @@ -43,7 +52,7 @@ protected virtual (string, StoreFile) CreateTemporary()
/// <summary>
/// Atomically replace the contents of one file with another, creating it if it doesn't exist and deleting the other.
/// </summary>
public abstract StoreFile Replace(string toReplace, string replaceWith);
public abstract StoreFile Replace(string destinationPath, string sourcePath);

/// <summary>
/// Atomically replace the contents of a file.
Expand Down Expand Up @@ -73,10 +82,18 @@ public virtual StoreFile ReplaceContents(string name, Span<byte> contents, bool
}

/// <summary>
/// List all files in unspecified order.
/// List all files matching the given predicate in unspecified order.
/// </summary>
public abstract IEnumerable<(string Name, StoreFile File)> List(Func<string, bool> predicate);

/// <summary>
/// List all files in unspecified order.
/// </summary>
/// <returns>Every file in the directory, paired with its name.</returns>
// Listing with an always-true predicate is equivalent to listing everything.
public virtual IEnumerable<(string Name, StoreFile File)> ListAll() => List(_ => true);

/// <summary>
/// Try get a file by name.
/// </summary>
Expand Down
17 changes: 17 additions & 0 deletions src/SeqCli/Forwarder/Filesystem/StoreFile.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,23 @@ public virtual long CopyContentsTo(Span<byte> buffer)
return reader.CopyTo(buffer);
}

/// <summary>
/// Append the contents of the supplied buffer to the end of the file,
/// committing and syncing the write before returning.
/// </summary>
/// <param name="buffer">The bytes to append.</param>
/// <exception cref="System.IO.IOException">The file could not be opened for appending.</exception>
public virtual void Append(Span<byte> buffer)
{
    if (!TryOpenAppend(out var opened))
    {
        // Throw a specific exception type instead of the base `Exception`
        // (never throw `System.Exception` directly); callers catching
        // `Exception` continue to work, so this is backwards-compatible.
        throw new System.IO.IOException("Failed to open the file for appending");
    }

    using var appender = opened;

    // Write, then make the data durable: Commit publishes the write and
    // Sync flushes it to disk before the appender is disposed.
    appender.Append(buffer);
    appender.Commit();
    appender.Sync();
}

/// <summary>
/// Try open a reader to the file.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ public override bool TryDelete(string name)
}
}

public override SystemStoreFile Replace(string toReplace, string replaceWith)
public override SystemStoreFile Replace(string destinationPath, string sourcePath)
{
var filePath = Path.Combine(_directoryPath, toReplace);
var filePath = Path.Combine(_directoryPath, destinationPath);

File.Replace(Path.Combine(_directoryPath, replaceWith), filePath, null);
File.Replace(Path.Combine(_directoryPath, sourcePath), filePath, null);

return new SystemStoreFile(filePath);
}
Expand Down
168 changes: 108 additions & 60 deletions src/SeqCli/Forwarder/Storage/BufferReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,80 +71,107 @@ Scan through the offending chunk until a newline delimiter is found.
2. After discarding, attempt to fill a buffer with as much data as possible
from the underlying chunks.
*/

var chunkIndex = 0;

if (_discardingHead != null)
{
// We're discarding an oversize payload
var discardingRentedArray = ArrayPool<byte>.Shared.Rent(maxSize);

// NOTE: We don't use `maxSize` here, because we're discarding these bytes
// so it doesn't matter what size the target array is
var discardingBatchBuffer = discardingRentedArray.AsSpan();

while (_discardingHead != null)
while (_discardingHead != null && chunkIndex < _sortedChunks.Count)
{
var chunk = _sortedChunks[0];
var chunk = _sortedChunks[chunkIndex];

// If the chunk has changed (it may have been deleted externally)
// If the first chunk has changed (it may have been deleted externally)
// then stop discarding
if (chunk.Name.Id != _discardingHead.Value.ChunkId)
{
_discardingHead = null;

ArrayPool<byte>.Shared.Return(discardingRentedArray);
break;
}

var chunkHead = Extents(chunk);
// Try read to the end of the chunk
//
// If reading the chunk length fails then advance over it
if (!chunk.Chunk.TryGetLength(out var length))
{
chunkIndex += 1;

continue;
}

var chunkHead = new BufferReaderChunkExtents(Math.Min(length.Value, _discardingHead.Value.Offset), length.Value);

// Attempt to fill the buffer with data from the underlying chunk
//
// If reading from the chunk fails then advance over it
if (!TryFillChunk(chunk,
chunkHead with { CommitHead = _discardingHead.Value.Offset },
chunkHead,
discardingBatchBuffer,
out var fill))
out var filled))
{
// If attempting to read from the chunk fails then remove it and carry on
// This is also done below in the regular read-loop if reading fails
_sortedChunks.RemoveAt(0);
_discardingHead = null;
chunkIndex += 1;

ArrayPool<byte>.Shared.Return(discardingRentedArray);
break;
continue;
}

// Scan forwards for the next newline
var firstNewlineIndex = discardingBatchBuffer[..fill.Value].IndexOf((byte)'\n');

// If a newline was found then advance the reader to it and stop discarding
if (firstNewlineIndex >= 0) fill = firstNewlineIndex + 1;
var firstNewlineIndex = discardingBatchBuffer[..filled.Value].IndexOf((byte)'\n');
if (firstNewlineIndex >= 0) filled = firstNewlineIndex + 1;

_discardingHead = _discardingHead.Value with
{
Offset = _discardingHead.Value.Offset + fill.Value
Offset = _discardingHead.Value.Offset + filled.Value
};
_readHead = _discardingHead.Value;

var isChunkFinished = _discardingHead.Value.Offset == chunkHead.WriteHead;

// If the chunk is finished or a newline is found then stop discarding
if (firstNewlineIndex >= 0 || (isChunkFinished && _sortedChunks.Count > 1))
// If a newline was found then advance the reader to it and stop discarding
if (firstNewlineIndex >= 0)
{
_discardingHead = null;

ArrayPool<byte>.Shared.Return(discardingRentedArray);
break;
}

var isChunkFinished = chunkHead.CommitHead + filled == chunkHead.WriteHead;

// If we've discarded to the end of the chunk then update our state from the disk and return
//
// The next time we attempt to fill a chunk we'll resume from this point.
if (isChunkFinished)
{
// If there's no way new data can arrive to complete this event then advance over it.
// If the chunk is the last one then it's considered actively writable, and so we
// presume we're seeing a torn write here.
//
// A future sync from the files on disk will delete it.
if (_sortedChunks.Count > 1)
{
_discardingHead = null;

// If there's more data in the chunk to read then loop back through
if (!isChunkFinished) continue;

// If the chunk is finished but a newline wasn't found then refresh
// our set of chunks and loop back through
ReadChunks();
break;
}

// There's only a single chunk, update our state from the disk in case the writer
// has moved on to another chunk and return. We may end up coming back later and
// reading more to discard.
ReadChunks();

ArrayPool<byte>.Shared.Return(discardingRentedArray);
batch = null;
return false;
ArrayPool<byte>.Shared.Return(discardingRentedArray);
batch = null;
return false;
}
}

ReadChunks();

ArrayPool<byte>.Shared.Return(discardingRentedArray);
}

// Fill a buffer with newline-delimited values
Expand All @@ -154,45 +181,69 @@ from the underlying chunks.
var batchLength = 0;

BufferPosition? batchHead = null;
var chunkIndex = 0;

// Try fill the buffer with as much data as possible
// by walking over all chunks
while (chunkIndex < _sortedChunks.Count)
{
var chunk = _sortedChunks[chunkIndex];
var chunkHead = Extents(chunk);

BufferReaderChunkExtents chunkHead;
if (chunk.Name.Id == _readHead.ChunkId)
{
// The chunk is the one we're currently reading; resume from where we left off
// If the file was truncated externally then we'll treat it as complete
chunkHead = chunk.Chunk.TryGetLength(out var length)
? new BufferReaderChunkExtents(Math.Min(_readHead.Offset, length.Value), length.Value)
: new BufferReaderChunkExtents(_readHead.Offset, _readHead.Offset);
}
else
{
// The chunk is not the one we've been reading; start from the beginning
chunk.Chunk.TryGetLength(out var length);
chunkHead = new BufferReaderChunkExtents(0, length ?? 0);
}

if (!TryFillChunk(chunk, chunkHead, batchBuffer[batchLength..], out var fill))
if (!TryFillChunk(chunk, chunkHead, batchBuffer[batchLength..], out var filled))
{
// If we can't read from this chunk anymore then remove it and continue
_sortedChunks.RemoveAt(chunkIndex);
// If we can't read from this chunk anymore then step over it
chunkIndex += 1;
continue;
}

var isBufferFull = batchLength + fill == maxSize;
var isChunkFinished = fill == chunkHead.WriteHead;
var isBufferFull = batchLength + filled == maxSize;
var isChunkFinished = chunkHead.CommitHead + filled == chunkHead.WriteHead;

// If either the buffer has been filled or we've reached the end of a chunk
// then scan to the last newline
// If either the buffer has been filled or we've reached the end of the chunk
// then scan backwards to the last newline delimiter
if (isBufferFull || isChunkFinished)
{
// If the chunk is finished then we expect this to immediately find a trailing newline
// If the chunk is valid and finished then we expect this to immediately find a trailing newline
// NOTE: `Span.LastIndexOf` and similar methods are vectorized
var lastNewlineIndex = batchBuffer[batchLength..(batchLength + fill.Value)].LastIndexOf((byte)'\n');
var lastNewlineIndex = batchBuffer[batchLength..(batchLength + filled.Value)].LastIndexOf((byte)'\n');
if (lastNewlineIndex == -1)
{
// If this isn't the last chunk then discard the trailing data and move on
// The data we wrote didn't contain any newline delimiters

// If there's no way new data can arrive to complete this event then advance over it.
// If the chunk is the last one then it's considered actively writable, and so we
// presume we're seeing a torn write here.
//
// A subsequent attempt to fill will overwrite the incomplete data in it, and
// a future sync from the files on disk will delete it
if (isChunkFinished && chunkIndex < _sortedChunks.Count)
{
chunkIndex += 1;
continue;
}

// If this is the first chunk then we've hit an oversize payload
// If we're looking at the first chunk then start discarding
//
// We'll hit this point if we happen to start from an oversized payload, or if our last attempt
// to fill a batch advanced up to an oversize event
if (chunkIndex == 0)
{
_discardingHead = new BufferPosition(chunk.Name.Id, chunkHead.CommitHead + fill.Value);
_discardingHead = new BufferPosition(chunk.Name.Id, chunkHead.CommitHead + filled.Value);

// Ensures we don't attempt to yield the data we've read
batchHead = null;
Expand All @@ -202,11 +253,12 @@ from the underlying chunks.
break;
}

fill = lastNewlineIndex + 1;
// Only consider the read data up to the last newline
filled = lastNewlineIndex + 1;
}

batchLength += fill.Value;
batchHead = new BufferPosition(chunk.Name.Id, chunkHead.CommitHead + fill.Value);
batchLength += filled.Value;
batchHead = new BufferPosition(chunk.Name.Id, chunkHead.CommitHead + filled.Value);

chunkIndex += 1;
}
Expand Down Expand Up @@ -245,6 +297,7 @@ public void AdvanceTo(BufferPosition newReaderHead)
// The remainder of the chunk is being skipped
if (chunk.Name.Id < newReaderHead.ChunkId)
{
chunk.Dispose();
_storeDirectory.TryDelete(chunk.Name.ToString());
}
else
Expand All @@ -261,17 +314,12 @@ public void AdvanceTo(BufferPosition newReaderHead)
_sortedChunks.RemoveRange(0, removeLength);
}

BufferReaderChunkExtents Extents(BufferReaderChunk chunk)
{
if (chunk.Name.Id == _readHead.ChunkId)
return chunk.Chunk.TryGetLength(out var writeHead)
? new BufferReaderChunkExtents(Math.Min(_readHead.Offset, writeHead.Value), writeHead.Value)
: new BufferReaderChunkExtents(_readHead.Offset, _readHead.Offset);

chunk.Chunk.TryGetLength(out var length);
return new BufferReaderChunkExtents(0, length ?? 0);
}

/// <summary>
/// Read the current state of the store from files on disk.
/// </summary>
/// <remarks>
/// This method will delete any files it finds before the current read head.
/// </remarks>
void ReadChunks()
{
List<BufferReaderChunk> chunks = [];
Expand Down
20 changes: 18 additions & 2 deletions src/SeqCli/Forwarder/Storage/BufferReaderChunkExtents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,28 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using System;

namespace SeqCli.Forwarder.Storage;

/// <summary>
/// The current read and write positions in a <see cref="BufferReaderChunk" />.
/// </summary>
readonly record struct BufferReaderChunkExtents
{
    /// <summary>
    /// Construct extents, validating that the commit head does not pass the write head.
    /// </summary>
    /// <param name="commitHead">The offset the reader has committed up to.</param>
    /// <param name="writeHead">The offset up to which data has been written.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="commitHead" /> is greater than <paramref name="writeHead" />.
    /// </exception>
    public BufferReaderChunkExtents(long commitHead, long writeHead)
    {
        if (commitHead > writeHead)
        {
            // Fix: the paramName passed to ArgumentOutOfRangeException must name the
            // offending *parameter* (`commitHead`), not the `CommitHead` property (CA2208).
            throw new ArgumentOutOfRangeException(nameof(commitHead), "The commit head cannot be greater than the write head");
        }

        CommitHead = commitHead;
        WriteHead = writeHead;
    }

    /// <summary>
    /// The offset the reader has committed up to.
    /// </summary>
    public long CommitHead { get; }

    /// <summary>
    /// The offset up to which data has been written.
    /// </summary>
    public long WriteHead { get; }

    /// <summary>
    /// The number of written bytes not yet advanced over.
    /// </summary>
    /// <remarks>
    /// The constructor guarantees WriteHead &gt;= CommitHead, but a default-initialized
    /// struct bypasses the constructor, so the result is still clamped at zero.
    /// </remarks>
    public long Unadvanced => Math.Max(0, WriteHead - CommitHead);
}
Loading
Loading