Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -148,48 +148,43 @@ public final class CommandMessage extends RequestMessage {
* `PAYLOAD_TYPE_1_DOCUMENT_SEQUENCE` sections.
*/
BsonDocument getCommandDocument(final ByteBufferBsonOutput bsonOutput) {
List<ByteBuf> byteBuffers = bsonOutput.getByteBuffers();
CompositeByteBuf byteBuf = new CompositeByteBuf(bsonOutput.getByteBuffers());
try {
CompositeByteBuf byteBuf = new CompositeByteBuf(byteBuffers);
try {
byteBuf.position(firstDocumentPosition);
ByteBufBsonDocument byteBufBsonDocument = createOne(byteBuf);

// If true, it means there is at least one `PAYLOAD_TYPE_1_DOCUMENT_SEQUENCE` section in the OP_MSG
if (byteBuf.hasRemaining()) {
BsonDocument commandBsonDocument = byteBufBsonDocument.toBaseBsonDocument();

// Each loop iteration processes one Document Sequence
// When there are no more bytes remaining, there are no more Document Sequences
while (byteBuf.hasRemaining()) {
// skip reading the payload type, we know it is `PAYLOAD_TYPE_1`
byteBuf.position(byteBuf.position() + 1);
int sequenceStart = byteBuf.position();
int sequenceSizeInBytes = byteBuf.getInt();
int sectionEnd = sequenceStart + sequenceSizeInBytes;

String fieldName = getSequenceIdentifier(byteBuf);
// If this assertion fires, it means that the driver has started using document sequences for nested fields. If
// so, this method will need to change in order to append the value to the correct nested document.
assertFalse(fieldName.contains("."));

ByteBuf documentsByteBufSlice = byteBuf.duplicate().limit(sectionEnd);
try {
commandBsonDocument.append(fieldName, new BsonArray(createList(documentsByteBufSlice)));
} finally {
documentsByteBufSlice.release();
}
byteBuf.position(sectionEnd);
byteBuf.position(firstDocumentPosition);
ByteBufBsonDocument byteBufBsonDocument = createOne(byteBuf);

// If true, it means there is at least one `PAYLOAD_TYPE_1_DOCUMENT_SEQUENCE` section in the OP_MSG
if (byteBuf.hasRemaining()) {
BsonDocument commandBsonDocument = byteBufBsonDocument.toBaseBsonDocument();

// Each loop iteration processes one Document Sequence
// When there are no more bytes remaining, there are no more Document Sequences
while (byteBuf.hasRemaining()) {
// skip reading the payload type, we know it is `PAYLOAD_TYPE_1`
byteBuf.position(byteBuf.position() + 1);
int sequenceStart = byteBuf.position();
int sequenceSizeInBytes = byteBuf.getInt();
int sectionEnd = sequenceStart + sequenceSizeInBytes;

String fieldName = getSequenceIdentifier(byteBuf);
// If this assertion fires, it means that the driver has started using document sequences for nested fields. If
// so, this method will need to change in order to append the value to the correct nested document.
assertFalse(fieldName.contains("."));

ByteBuf documentsByteBufSlice = byteBuf.duplicate().limit(sectionEnd);
try {
commandBsonDocument.append(fieldName, new BsonArray(createList(documentsByteBufSlice)));
} finally {
documentsByteBufSlice.release();
}
return commandBsonDocument;
} else {
return byteBufBsonDocument;
byteBuf.position(sectionEnd);
}
} finally {
byteBuf.release();
return commandBsonDocument;
} else {
return byteBufBsonDocument;
}
} finally {
byteBuffers.forEach(ByteBuf::release);
byteBuf.release();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.mongodb.internal.connection;

import com.mongodb.assertions.Assertions;
import org.bson.ByteBuf;

import java.nio.Buffer;
Expand Down Expand Up @@ -50,7 +51,11 @@ class CompositeByteBuf implements ByteBuf {
}

CompositeByteBuf(final CompositeByteBuf from) {
components = from.components;
notNull("from", from);
components = new ArrayList<>(from.components.size());
from.components.forEach(component ->
components.add(new Component(component.buffer.duplicate(), component.offset))
);
position = from.position();
limit = from.limit();
}
Expand Down Expand Up @@ -306,6 +311,7 @@ public ByteBuf retain() {
referenceCount.decrementAndGet();
throw new IllegalStateException("Attempted to increment the reference count when it is already 0");
}
components.forEach(c -> c.buffer.retain());
return this;
}

Expand All @@ -315,6 +321,11 @@ public void release() {
referenceCount.incrementAndGet();
throw new IllegalStateException("Attempted to decrement the reference count below 0");
}
components.forEach(c -> c.buffer.release());
if (referenceCount.get() == 0) {
Assertions.assertTrue(components.stream().noneMatch(c -> c.buffer.getReferenceCount() > 0),
"Some buffers still had a reference to them even though the CompositeByteBuf was fully released");
}
}

private Component findComponent(final int index) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.atomic.AtomicInteger;

/**
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
public final class NettyByteBuf implements ByteBuf {

private final AtomicInteger referenceCount = new AtomicInteger(1);
private io.netty.buffer.ByteBuf proxied;
private boolean isWriting = true;

Expand Down Expand Up @@ -271,17 +272,25 @@ public ByteBuffer asNIO() {

@Override
public int getReferenceCount() {
return proxied.refCnt();
return referenceCount.get();
}

@Override
public ByteBuf retain() {
if (referenceCount.incrementAndGet() == 1) {
referenceCount.decrementAndGet();
throw new IllegalStateException("Attempted to increment the reference count when it is already 0");
}
proxied.retain();
return this;
}

@Override
public void release() {
if (referenceCount.decrementAndGet() < 0) {
referenceCount.incrementAndGet();
throw new IllegalStateException("Attempted to decrement the reference count below 0");
}
proxied.release();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class CommandHelperSpecification extends Specification {
InternalConnection connection

def setup() {
InternalStreamConnection.setRecordEverything(true) // Ensures implementation can log as expected
connection = new InternalStreamConnectionFactory(ClusterConnectionMode.SINGLE,
new NettyStreamFactory(SocketSettings.builder().build(), getSslSettings()),
getCredentialWithCache(), CLIENT_METADATA, [], LoggerSettings.builder().build(), null, getServerApi())
Expand Down
Loading