Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 26 additions & 14 deletions bson/src/main/org/bson/BsonBinaryWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,17 @@ public void pipe(final BsonReader reader) {
pipeDocument(reader, null);
}

@Override
public void pipe(final byte[] bytes, final int offset, final int length) {
if (getState() == State.VALUE) {
bsonOutput.writeByte(BsonType.DOCUMENT.getValue());
writeCurrentName();
}
int pipedDocumentStartPosition = bsonOutput.getPosition();
bsonOutput.writeBytes(bytes, offset, length);
completePipeDocument(pipedDocumentStartPosition);
}

@Override
public void pipe(final BsonReader reader, final List<BsonElement> extraElements) {
notNull("reader", reader);
Expand All @@ -355,9 +366,7 @@ private void pipeDocument(final BsonReader reader, final List<BsonElement> extra
}
int pipedDocumentStartPosition = bsonOutput.getPosition();
bsonOutput.writeInt32(size);
byte[] bytes = new byte[size - 4];
bsonInput.readBytes(bytes);
bsonOutput.writeBytes(bytes);
bsonInput.pipe(bsonOutput, size - 4);

binaryReader.setState(AbstractBsonReader.State.TYPE);

Expand All @@ -371,24 +380,27 @@ private void pipeDocument(final BsonReader reader, final List<BsonElement> extra
setContext(getContext().getParentContext());
}

if (getContext() == null) {
setState(State.DONE);
} else {
if (getContext().getContextType() == BsonContextType.JAVASCRIPT_WITH_SCOPE) {
backpatchSize(); // size of the JavaScript with scope value
setContext(getContext().getParentContext());
}
setState(getNextState());
}

validateSize(bsonOutput.getPosition() - pipedDocumentStartPosition);
completePipeDocument(pipedDocumentStartPosition);
} else if (extraElements != null) {
super.pipe(reader, extraElements);
} else {
super.pipe(reader);
}
}

private void completePipeDocument(final int pipedDocumentStartPosition) {
if (getContext() == null) {
setState(State.DONE);
} else {
if (getContext().getContextType() == BsonContextType.JAVASCRIPT_WITH_SCOPE) {
backpatchSize(); // size of the JavaScript with scope value
setContext(getContext().getParentContext());
}
setState(getNextState());
}
validateSize(bsonOutput.getPosition() - pipedDocumentStartPosition);
}

/**
* Sets a maximum size for documents from this point.
*
Expand Down
25 changes: 25 additions & 0 deletions bson/src/main/org/bson/BsonWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@

package org.bson;

import org.bson.io.ByteBufferBsonInput;
import org.bson.types.Decimal128;
import org.bson.types.ObjectId;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/**
* An interface for writing a logical BSON document using a push-oriented API.
*
Expand Down Expand Up @@ -357,4 +361,25 @@ public interface BsonWriter {
*/
void pipe(BsonReader reader);

/**
* Pipes a raw BSON document from the given byte array to this writer.
*
* <p>The default implementation wraps the bytes in a {@linkplain BsonBinaryReader}
* and calls {@link #pipe(BsonReader)}. Implementations may override this
* to write the raw bytes directly without intermediate object allocation.</p>
*
* @param bytes the byte array containing the BSON document
* @param offset the offset into the byte array
* @param length the length of the BSON document
* @since 5.7
*/
default void pipe(byte[] bytes, int offset, int length) {
try (BsonBinaryReader reader = new BsonBinaryReader(
new ByteBufferBsonInput(
new ByteBufNIO(ByteBuffer.wrap(bytes, offset, length)
.order(ByteOrder.LITTLE_ENDIAN))))) {
pipe(reader);
}
}

}
30 changes: 30 additions & 0 deletions bson/src/main/org/bson/RawBsonDocument.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,36 @@ public ByteBuf getByteBuffer() {
return new ByteBufNIO(buffer);
}

/**
* Returns the byte array backing this document. Changes to the returned array will be reflected in this document.
*
* @return the backing byte array
* @since 5.7
*/
public byte[] getByteBacking() {
return bytes;
}

/**
* Returns the offset into the {@linkplain #getByteBacking() backing byte array} where this document starts.
*
* @return the offset
* @since 5.7
*/
public int getByteOffset() {
return offset;
}

/**
* Returns the length of this document within the {@linkplain #getByteBacking() backing byte array}.
*
* @return the length
* @since 5.7
*/
public int getByteLength() {
return length;
}

/**
* Decode this into a document.
*
Expand Down
6 changes: 1 addition & 5 deletions bson/src/main/org/bson/codecs/RawBsonDocumentCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,11 @@

package org.bson.codecs;

import org.bson.BsonBinaryReader;
import org.bson.BsonBinaryWriter;
import org.bson.BsonReader;
import org.bson.BsonWriter;
import org.bson.RawBsonDocument;
import org.bson.io.BasicOutputBuffer;
import org.bson.io.ByteBufferBsonInput;

/**
* A simple BSONDocumentBuffer codec. It does not attempt to validate the contents of the underlying ByteBuffer. It assumes that it
Expand All @@ -40,9 +38,7 @@ public RawBsonDocumentCodec() {

@Override
public void encode(final BsonWriter writer, final RawBsonDocument value, final EncoderContext encoderContext) {
try (BsonBinaryReader reader = new BsonBinaryReader(new ByteBufferBsonInput(value.getByteBuffer()))) {
writer.pipe(reader);
}
writer.pipe(value.getByteBacking(), value.getByteOffset(), value.getByteLength());
}

@Override
Expand Down
9 changes: 9 additions & 0 deletions bson/src/main/org/bson/io/BsonInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,15 @@ public interface BsonInput extends Closeable {
*/
boolean hasRemaining();

/**
* Pipes the specified number of bytes from {@linkplain BsonInput this} input to the given {@linkplain BsonOutput output}.
*
* @param output the output to pipe to
* @param numBytes the number of bytes to pipe
* @since 5.7
*/
void pipe(BsonOutput output, int numBytes);

@Override
void close();
}
18 changes: 18 additions & 0 deletions bson/src/main/org/bson/io/ByteBufferBsonInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,24 @@ public boolean hasRemaining() {
return buffer.hasRemaining();
}

@Override
public void pipe(final BsonOutput output, final int numBytes) {
ensureOpen();
ensureAvailable(numBytes);

if (buffer.isBackedByArray()) {
int position = buffer.position();
int arrayOffset = buffer.arrayOffset();
output.writeBytes(buffer.array(), arrayOffset + position, numBytes);
buffer.position(position + numBytes);
} else {
// Fallback: use temporary buffer for non-array-backed buffers
byte[] temp = new byte[numBytes];
buffer.get(temp);
output.writeBytes(temp);
}
}

@Override
public void close() {
buffer.release();
Expand Down