Merged

30 commits
4947b0e refactor of parallel on disk graph index writes (MarkWolters, Jan 13, 2026)
bfa658d refactor of parallel graph index writes (MarkWolters, Jan 14, 2026)
d2dd213 added unit test of 1 v 2 phase writes (MarkWolters, Jan 16, 2026)
d6b9c8a update for review comments (MarkWolters, Jan 16, 2026)
60dde25 fix for bug that could result in numTasks=0 (MarkWolters, Jan 16, 2026)
031716a initial impl (MarkWolters, Jan 23, 2026)
0d4c4a1 updates to javadoc comments and imports (MarkWolters, Jan 23, 2026)
dc54ea9 deprecated writeInline and moved error block (MarkWolters, Jan 26, 2026)
8395737 cleanup (MarkWolters, Jan 27, 2026)
c89177f Merge branch 'parallel_writer_v2' into refactor_parallel_writer (MarkWolters, Jan 27, 2026)
175453e use asyncFileChannel for inline writes (MarkWolters, Jan 27, 2026)
f18f30c preallocate disk space (MarkWolters, Jan 27, 2026)
b23e9f6 Revert "preallocate disk space" (MarkWolters, Jan 27, 2026)
94f3584 force() writes to disk instead of just OS buffer (MarkWolters, Feb 2, 2026)
5b980c5 debugging code for failing test (MarkWolters, Feb 3, 2026)
9624de8 removing debugging code for failing test (MarkWolters, Feb 3, 2026)
281d030 remove double write (MarkWolters, Feb 4, 2026)
9dcf774 check channel for write completion (MarkWolters, Feb 4, 2026)
99710f3 force inline writes before neighbors (MarkWolters, Feb 6, 2026)
d28846e remove async channel for inline (MarkWolters, Feb 6, 2026)
c0cde9e removing faulty test (MarkWolters, Feb 9, 2026)
968e492 removing faulty test (MarkWolters, Feb 9, 2026)
91ec6d9 removing faulty test (MarkWolters, Feb 9, 2026)
3cef608 removing faulty test (MarkWolters, Feb 10, 2026)
7cba6cb unit test parallel writes (MarkWolters, Feb 10, 2026)
a721c68 add invalidation for buffer cache (MarkWolters, Feb 10, 2026)
97c3a46 Merge branch 'main' into refactor_parallel_writer (MarkWolters, Feb 12, 2026)
3757b4a Jvector 606 (#624) (MarkWolters, Feb 12, 2026)
e442f5d continue to use standard writer in tests for now (MarkWolters, Feb 12, 2026)
7736065 Merge branch 'main' into refactor_parallel_writer (MarkWolters, Feb 12, 2026)
@@ -21,9 +21,7 @@
import io.github.jbellis.jvector.graph.ListRandomAccessVectorValues;
import io.github.jbellis.jvector.graph.NodesIterator;
import io.github.jbellis.jvector.graph.RandomAccessVectorValues;
import io.github.jbellis.jvector.graph.disk.OnDiskGraphIndex;
import io.github.jbellis.jvector.graph.disk.OnDiskGraphIndexWriter;
import io.github.jbellis.jvector.graph.disk.OrdinalMapper;
import io.github.jbellis.jvector.graph.disk.*;
import io.github.jbellis.jvector.graph.disk.feature.Feature;
import io.github.jbellis.jvector.graph.disk.feature.FeatureId;
import io.github.jbellis.jvector.graph.disk.feature.FusedPQ;
@@ -189,12 +187,17 @@ public void writeSequentialThenParallelAndVerify(Blackhole blackhole) throws IOE
private void writeGraph(ImmutableGraphIndex graph,
Path path,
boolean parallel) throws IOException {
try (var writer = new OnDiskGraphIndexWriter.Builder(graph, path)
.withParallelWrites(parallel)
.with(nvqFeature)
.with(fusedPQFeature)
.withMapper(identityMapper)
.build()) {
try (RandomAccessOnDiskGraphIndexWriter writer = parallel ?
new OnDiskParallelGraphIndexWriter.Builder(graph, path)
.with(nvqFeature)
.with(fusedPQFeature)
.withMapper(identityMapper)
.build() :
new OnDiskGraphIndexWriter.Builder(graph, path)
.with(nvqFeature)
.with(fusedPQFeature)
.withMapper(identityMapper)
.build()) {
var view = graph.getView();
Map<FeatureId, IntFunction<Feature.State>> writeSuppliers = new EnumMap<>(FeatureId.class);
writeSuppliers.put(FeatureId.NVQ_VECTORS, inlineSuppliers.get(FeatureId.NVQ_VECTORS));
@@ -92,6 +92,22 @@ public void flush() throws IOException {
stream.flush();
}

/**
* Invalidates any buffered read cache by seeking to the current position.
* This is necessary when external writes (e.g., via AsynchronousFileChannel)
* have modified the file, bypassing this writer's buffer.
* <p>
* This method flushes any pending writes and then seeks to the current position,
* which forces the underlying RandomAccessFile to re-read from disk on the next read.
*
* @throws IOException if an I/O error occurs
*/
public void invalidateCache() throws IOException {
flush();
long currentPos = raf.getFilePointer();
raf.seek(currentPos);
}

/**
* Returns the CRC32 checksum for the range [startOffset .. endOffset)
*
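To make the calling pattern behind invalidateCache concrete, here is a minimal sketch, assuming a writer shaped like the class patched above. The patched class's name is not visible in this hunk, so `CacheInvalidatingWriter` and the surrounding wiring are illustrative only:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class InvalidateCacheSketch {
    // Stand-in for the patched writer; only the method under review is modeled.
    interface CacheInvalidatingWriter {
        void invalidateCache() throws IOException;
    }

    // An external async write bypasses the writer's buffer, so the writer must
    // invalidate its cached state before its next read of the same region.
    static void externalWriteThenInvalidate(Path path, CacheInvalidatingWriter writer,
                                            long offset, ByteBuffer record) throws Exception {
        try (AsynchronousFileChannel channel =
                     AsynchronousFileChannel.open(path, StandardOpenOption.WRITE)) {
            channel.write(record, offset).get(); // blocks until this write completes
        }
        writer.invalidateCache(); // flush + seek forces a re-read from disk on the next read
    }
}
```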
@@ -34,8 +34,7 @@
* <p>
* Implementations support different strategies for writing graph data,
* including random access, sequential, and parallel writing modes.
* Use {@link #getBuilderFor(GraphIndexWriterTypes, ImmutableGraphIndex, IndexWriter)}
* or {@link #getBuilderFor(GraphIndexWriterTypes, ImmutableGraphIndex, Path)}
* Use {@link #getBuilderFor(GraphIndexWriterTypes, ImmutableGraphIndex, Path)}
* factory methods to obtain appropriate builder instances.
*
* @see GraphIndexWriterTypes
@@ -56,58 +55,31 @@ public interface GraphIndexWriter extends Closeable {
void write(Map<FeatureId, IntFunction<Feature.State>> featureStateSuppliers) throws IOException;

/**
* Factory method to obtain a builder for the specified writer type with an IndexWriter.
* <p>
* This overload accepts any IndexWriter but certain types have specific requirements:
* <ul>
* <li>ON_DISK requires a RandomAccessWriter (will throw IllegalArgumentException otherwise)</li>
* <li>ON_DISK_SEQUENTIAL accepts any IndexWriter</li>
* <li>ON_DISK_PARALLEL is not supported via this method (use the Path overload instead)</li>
* </ul>
* Factory method to obtain a builder for the specified writer type.
*
* @param type the type of writer to create
* @param graphIndex the graph index to write
* @param out the output writer
* @param out the Path to the output file
* @return a builder for the specified writer type
* @throws IllegalArgumentException if the type requires a specific writer type that wasn't provided
*/
static AbstractGraphIndexWriter.Builder<? extends AbstractGraphIndexWriter<?>, ? extends IndexWriter>
getBuilderFor(GraphIndexWriterTypes type, ImmutableGraphIndex graphIndex, IndexWriter out) {
static AbstractGraphIndexWriter.Builder<? extends AbstractGraphIndexWriter<?>, ? extends RandomAccessWriter>
getBuilderFor(GraphIndexWriterTypes type, ImmutableGraphIndex graphIndex, Path out) throws FileNotFoundException {
switch (type) {
case ON_DISK_PARALLEL:
if (!(out instanceof RandomAccessWriter)) {
throw new IllegalArgumentException("ON_DISK_PARALLEL requires a RandomAccessWriter");
}
return new OnDiskGraphIndexWriter.Builder(graphIndex, (RandomAccessWriter) out);
case ON_DISK_SEQUENTIAL:
return new OnDiskSequentialGraphIndexWriter.Builder(graphIndex, out);
case RANDOM_ACCESS:
return new OnDiskGraphIndexWriter.Builder(graphIndex, out);
case RANDOM_ACCESS_PARALLEL:
return new OnDiskParallelGraphIndexWriter.Builder(graphIndex, out);
default:
throw new IllegalArgumentException("Unknown GraphIndexWriterType: " + type);
throw new IllegalArgumentException("Unknown RandomAccess GraphIndexWriterType: " + type);
}
}

/**
* Factory method to obtain a builder for the specified writer type with a file Path.
* <p>
* This overload accepts a Path and is required for:
* <ul>
* <li>ON_DISK_PARALLEL - enables async I/O for improved throughput</li>
* </ul>
* Other writer types should use the {@link #getBuilderFor(GraphIndexWriterTypes, ImmutableGraphIndex, IndexWriter)}
* overload instead.
*
* @param type the type of writer to create (currently only ON_DISK_PARALLEL is supported)
* @param graphIndex the graph index to write
* @param out the output file path
* @return a builder for the specified writer type
* @throws FileNotFoundException if the file cannot be created or opened
* @throws IllegalArgumentException if the type is not supported via this method
*/
static AbstractGraphIndexWriter.Builder<? extends AbstractGraphIndexWriter<?>, ? extends IndexWriter>
getBuilderFor(GraphIndexWriterTypes type, ImmutableGraphIndex graphIndex, Path out) throws FileNotFoundException {
getBuilderFor(GraphIndexWriterTypes type, ImmutableGraphIndex graphIndex, IndexWriter out) {
switch (type) {
case ON_DISK_PARALLEL:
return new OnDiskGraphIndexWriter.Builder(graphIndex, out);
case ON_DISK_SEQUENTIAL:
return new OnDiskSequentialGraphIndexWriter.Builder(graphIndex, out);
default:
throw new IllegalArgumentException("Unknown GraphIndexWriterType: " + type);
}
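For review context, a usage sketch of the reshaped Path-based factory. The type and method names come from this diff; the package paths, the builder defaults (no `withMapper` call), and the overall wiring are assumptions rather than code from this PR:

```java
import java.io.IOException;
import java.nio.file.Path;
import java.util.Map;
import java.util.function.IntFunction;

import io.github.jbellis.jvector.graph.ImmutableGraphIndex;
import io.github.jbellis.jvector.graph.disk.GraphIndexWriter;
import io.github.jbellis.jvector.graph.disk.GraphIndexWriterTypes;
import io.github.jbellis.jvector.graph.disk.feature.Feature;
import io.github.jbellis.jvector.graph.disk.feature.FeatureId;

class WriterFactorySketch {
    // Path overload: RANDOM_ACCESS and RANDOM_ACCESS_PARALLEL both build
    // writers backed by a RandomAccessWriter opened on 'out'.
    static void writeParallel(ImmutableGraphIndex graph,
                              Map<FeatureId, IntFunction<Feature.State>> suppliers,
                              Path out) throws IOException {
        try (GraphIndexWriter writer = GraphIndexWriter
                .getBuilderFor(GraphIndexWriterTypes.RANDOM_ACCESS_PARALLEL, graph, out)
                .build()) {
            writer.write(suppliers);
        }
    }
}
```

The IndexWriter overload that follows in this diff keeps ON_DISK_SEQUENTIAL for callers that already hold an open IndexWriter.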
@@ -32,11 +32,19 @@ public enum GraphIndexWriterTypes {
*/
ON_DISK_SEQUENTIAL,

/**
* Random-access on-disk writer and the current default implementation.
* Writes the header in place at the start of the file via random access.
* Accepts any RandomAccessWriter.
*/
RANDOM_ACCESS,

/**
* Parallel on-disk writer that uses asynchronous I/O for improved throughput.
* Builds records in parallel across multiple threads and writes them
* asynchronously using AsynchronousFileChannel.
* Requires a Path to be provided for async file channel access.
*/
ON_DISK_PARALLEL
RANDOM_ACCESS_PARALLEL
}
@@ -21,21 +21,27 @@
import io.github.jbellis.jvector.graph.disk.feature.Feature;
import io.github.jbellis.jvector.graph.disk.feature.FeatureId;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.nio.channels.AsynchronousFileChannel;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.function.IntFunction;

/**
* A task that builds L0 records for a range of nodes in memory.
* A task that writes L0 records for a range of nodes directly to disk using synchronous position-based writes.
* <p>
* This task is designed to be executed in a thread pool, with each worker thread
* owning its own ImmutableGraphIndex.View for thread-safe neighbor iteration.
* Each task processes a contiguous range of ordinals to reduce task creation overhead.
* <p>
* The task writes directly to the AsynchronousFileChannel using position-based writes via
* writeFully, which guarantees all bytes are written before returning. This eliminates race
* conditions where the OS buffer cache has not yet flushed data before subsequent reads occur.
*/
class NodeRecordTask implements Callable<List<NodeRecordTask.Result>> {
class NodeRecordTask implements Callable<Void> {
private final int startOrdinal; // Inclusive
private final int endOrdinal; // Exclusive
private final OrdinalMapper ordinalMapper;
@@ -45,22 +51,8 @@ class NodeRecordTask implements Callable<List<NodeRecordTask.Result>> {
private final Map<FeatureId, IntFunction<Feature.State>> featureStateSuppliers;
private final int recordSize;
private final long baseOffset; // Base file offset for L0 (offsets calculated per-ordinal)
private final ByteBuffer buffer;

/**
* Result of building a node record.
*/
static class Result {
final int newOrdinal;
final long fileOffset;
final ByteBuffer data;

Result(int newOrdinal, long fileOffset, ByteBuffer data) {
this.newOrdinal = newOrdinal;
this.fileOffset = fileOffset;
this.data = data;
}
}
private final AsynchronousFileChannel channel;
private final ByteBuffer buffer; // Thread-local buffer for building record components

NodeRecordTask(int startOrdinal,
int endOrdinal,
@@ -71,6 +63,7 @@ static class Result {
Map<FeatureId, IntFunction<Feature.State>> featureStateSuppliers,
int recordSize,
long baseOffset,
AsynchronousFileChannel channel,
ByteBuffer buffer) {
this.startOrdinal = startOrdinal;
this.endOrdinal = endOrdinal;
@@ -81,41 +74,75 @@ static class Result {
this.featureStateSuppliers = featureStateSuppliers;
this.recordSize = recordSize;
this.baseOffset = baseOffset;
this.channel = channel;
this.buffer = buffer;
}

@Override
public List<Result> call() throws Exception {
List<Result> results = new ArrayList<>(endOrdinal - startOrdinal);
/**
* Writes a buffer fully to the channel at the specified position.
* Ensures all bytes are written by looping until the buffer is empty.
* This is critical for correctness as AsynchronousFileChannel.write() may not write all bytes in one call.
*
* @param channel the channel to write to
* @param buffer the buffer to write (will be fully consumed)
* @param position the file position to write at
* @throws IOException if an I/O error occurs
* @throws ExecutionException if the write operation fails
* @throws InterruptedException if interrupted while waiting for write completion
*/
private static void writeFully(AsynchronousFileChannel channel, ByteBuffer buffer, long position)
throws IOException, ExecutionException, InterruptedException {
long currentPosition = position;
while (buffer.hasRemaining()) {
int written = channel.write(buffer, currentPosition).get();
if (written < 0) {
throw new IOException("Channel closed while writing");
}
currentPosition += written;
}
}

@Override
public Void call() throws Exception {
// Reuse writer and buffer across all ordinals in this range
var writer = new ByteBufferIndexWriter(buffer);

for (int newOrdinal = startOrdinal; newOrdinal < endOrdinal; newOrdinal++) {
// Calculate file offset for this ordinal
long fileOffset = baseOffset + (long) newOrdinal * recordSize;
var originalOrdinal = ordinalMapper.newToOld(newOrdinal);
long recordOffset = baseOffset + (long) newOrdinal * recordSize;
long currentPosition = recordOffset;

// Reset buffer for this ordinal
writer.reset();

var originalOrdinal = ordinalMapper.newToOld(newOrdinal);

// Write node ordinal
writer.writeInt(newOrdinal);
ByteBuffer ordinalData = writer.cloneBuffer();
writeFully(channel, ordinalData, currentPosition);
currentPosition += Integer.BYTES;

// Handle OMITTED nodes (holes in ordinal space)
if (originalOrdinal == OrdinalMapper.OMITTED) {
// Write placeholder: skip inline features and write empty neighbor list
// Write placeholder: zeros for features and empty neighbor list
writer.reset();
for (var feature : inlineFeatures) {
// Write zeros for missing features
for (int i = 0; i < feature.featureSize(); i++) {
writer.writeByte(0);
}
}
ByteBuffer featureData = writer.cloneBuffer();
writeFully(channel, featureData, currentPosition);
currentPosition += featureData.remaining();

// Write empty neighbor list
writer.reset();
writer.writeInt(0); // neighbor count
for (int n = 0; n < graph.getDegree(0); n++) {
writer.writeInt(-1); // padding
}
ByteBuffer neighborData = writer.cloneBuffer();
writeFully(channel, neighborData, currentPosition);
} else {
// Validate node exists
if (!graph.containsNode(originalOrdinal)) {
@@ -124,20 +151,22 @@ public List<Result> call() throws Exception {
newOrdinal, originalOrdinal));
}

// Write inline features
// Write inline features (skip if supplier is null - feature was pre-written)
for (var feature : inlineFeatures) {
var supplier = featureStateSuppliers.get(feature.id());
if (supplier == null) {
// Write zeros for missing supplier
for (int i = 0; i < feature.featureSize(); i++) {
writer.writeByte(0);
}
} else {
if (supplier != null) {
// Feature not pre-written, write it now
writer.reset();
feature.writeInline(writer, supplier.apply(originalOrdinal));
ByteBuffer featureData = writer.cloneBuffer();
writeFully(channel, featureData, currentPosition);
}
// Skip to next feature position (whether we wrote it or not)
currentPosition += feature.featureSize();
}

// Write neighbors
writer.reset();
var neighbors = view.getNeighborsIterator(0, originalOrdinal);
if (neighbors.size() > graph.getDegree(0)) {
throw new IllegalStateException(
Expand All @@ -161,21 +190,13 @@ public List<Result> call() throws Exception {
for (; n < graph.getDegree(0); n++) {
writer.writeInt(-1);
}
}

// Verify we wrote exactly the expected amount
if (writer.bytesWritten() != recordSize) {
throw new IllegalStateException(
String.format("Record size mismatch for ordinal %d: expected %d bytes, wrote %d bytes",
newOrdinal, recordSize, writer.bytesWritten()));
ByteBuffer neighborData = writer.cloneBuffer();
writeFully(channel, neighborData, currentPosition);
}

// Writer handles flip, copy, and reset internally
// The copy ensures thread-local buffer can be safely reused for the next ordinal
ByteBuffer dataCopy = writer.cloneBuffer();
results.add(new Result(newOrdinal, fileOffset, dataCopy));
}

return results;
return null;
}
}
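To make the task's intended call pattern concrete, here is an illustrative driver (not code from this PR) that partitions the ordinal space into the contiguous [start, end) ranges NodeRecordTask expects; the `max(1, ...)` guard keeps the range size positive, echoing the numTasks=0 fix among the commits above:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

class RangePartitionSketch {
    // Splits [0, nodeCount) into contiguous [start, end) ranges and runs one
    // task per range. 'taskFactory' stands in for the NodeRecordTask wiring
    // (view, mapper, channel, per-thread buffer), which this sketch omits.
    static void runPartitioned(int nodeCount, int numTasks,
                               BiFunction<Integer, Integer, Callable<Void>> taskFactory)
            throws Exception {
        // Ceiling division; the guard keeps the range size positive.
        int rangeSize = Math.max(1, (nodeCount + numTasks - 1) / numTasks);
        ExecutorService pool = Executors.newFixedThreadPool(numTasks);
        try {
            List<Future<Void>> futures = new ArrayList<>();
            for (int start = 0; start < nodeCount; start += rangeSize) {
                int end = Math.min(start + rangeSize, nodeCount); // exclusive bound
                futures.add(pool.submit(taskFactory.apply(start, end)));
            }
            for (Future<Void> f : futures) {
                f.get(); // propagate any I/O failure from the worker threads
            }
        } finally {
            pool.shutdown();
        }
    }
}
```

Because each record's position is baseOffset + newOrdinal * recordSize, tasks can complete in any order without coordinating on a shared file pointer.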
