diff --git a/bson/src/main/org/bson/ByteBuf.java b/bson/src/main/org/bson/ByteBuf.java
index 089bb67885c..bc37102f0b0 100644
--- a/bson/src/main/org/bson/ByteBuf.java
+++ b/bson/src/main/org/bson/ByteBuf.java
@@ -184,6 +184,26 @@ public interface ByteBuf  {
      */
     byte[] array();
 
+    /**
+     * 
States whether this buffer is backed by an accessible byte array.
+     *
+     * If this method returns {@code true} then the {@link #array()} and {@link #arrayOffset()} methods may safely be invoked.
+     *
+     * @return {@code true} if, and only if, this buffer is backed by an array and is not read-only
+     * @since 5.5
+     */
+    boolean hasArray();
+
+    /**
+     * Returns the offset of the first byte within the backing byte array of
+     * this buffer.
+     *
+     * @throws java.nio.ReadOnlyBufferException If this buffer is backed by an array but is read-only
+     * @throws UnsupportedOperationException if this buffer is not backed by an accessible array
+     * @since 5.5
+     */
+    int arrayOffset();
+
     /**
      * Returns this buffer's limit.
      *
diff --git a/bson/src/main/org/bson/ByteBufNIO.java b/bson/src/main/org/bson/ByteBufNIO.java
index ffb6584ac64..ba71625d764 100644
--- a/bson/src/main/org/bson/ByteBufNIO.java
+++ b/bson/src/main/org/bson/ByteBufNIO.java
@@ -132,6 +132,16 @@ public byte[] array() {
         return buf.array();
     }
 
+    @Override
+    public boolean hasArray() {
+        return buf.hasArray();
+    }
+
+    @Override
+    public int arrayOffset() {
+        return buf.arrayOffset();
+    }
+
     @Override
     public int limit() {
         return buf.limit();
diff --git a/bson/src/main/org/bson/io/OutputBuffer.java b/bson/src/main/org/bson/io/OutputBuffer.java
index 7c1a64b2f85..e0a1a314017 100644
--- a/bson/src/main/org/bson/io/OutputBuffer.java
+++ b/bson/src/main/org/bson/io/OutputBuffer.java
@@ -197,7 +197,7 @@ public void writeLong(final long value) {
         writeInt64(value);
     }
 
-    private int writeCharacters(final String str, final boolean checkForNullCharacters) {
+    protected int writeCharacters(final String str, final boolean checkForNullCharacters) {
         int len = str.length();
         int total = 0;
 
diff --git a/driver-core/src/main/com/mongodb/internal/connection/ByteBufferBsonOutput.java b/driver-core/src/main/com/mongodb/internal/connection/ByteBufferBsonOutput.java
index d53ffe7c683..a4d4bd1be57 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/ByteBufferBsonOutput.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/ByteBufferBsonOutput.java
@@ -16,6 +16,7 @@
 
 package com.mongodb.internal.connection;
 
+import org.bson.BsonSerializationException;
 import org.bson.ByteBuf;
 import org.bson.io.OutputBuffer;
 
@@ -25,8 +26,10 @@
 import java.util.ArrayList;
 import java.util.List;
 
+import static com.mongodb.assertions.Assertions.assertFalse;
 import static com.mongodb.assertions.Assertions.assertTrue;
 import static com.mongodb.assertions.Assertions.notNull;
+import static java.lang.String.format;
 
 /**
  * This class is not part of the public API and may be removed or changed at any time
@@ -178,11 +181,17 @@ private ByteBuf getCurrentByteBuffer() {
         return getByteBufferAtIndex(curBufferIndex);
     }
 
+    private ByteBuf getNextByteBuffer() {
+        assertFalse(bufferList.get(curBufferIndex).hasRemaining());
+        return getByteBufferAtIndex(++curBufferIndex);
+    }
+
     private ByteBuf getByteBufferAtIndex(final int index) {
         if (bufferList.size() < index + 1) {
-            bufferList.add(bufferProvider.getBuffer(index >= (MAX_SHIFT - INITIAL_SHIFT)
-                                                            ? MAX_BUFFER_SIZE
-                                                            : Math.min(INITIAL_BUFFER_SIZE << index, MAX_BUFFER_SIZE)));
+            ByteBuf buffer = bufferProvider.getBuffer(index >= (MAX_SHIFT - INITIAL_SHIFT)
+                    ? MAX_BUFFER_SIZE
+                    : Math.min(INITIAL_BUFFER_SIZE << index, MAX_BUFFER_SIZE));
+            bufferList.add(buffer);
         }
         return bufferList.get(index);
     }
@@ -225,6 +234,16 @@ public List getByteBuffers() {
         return buffers;
     }
 
+    public List getDuplicateByteBuffers() {
+        ensureOpen();
+
+        List buffers = new ArrayList<>(bufferList.size());
+        for (final ByteBuf cur : bufferList) {
+            buffers.add(cur.duplicate().order(ByteOrder.LITTLE_ENDIAN));
+        }
+        return buffers;
+    }
+
 
     @Override
     public int pipe(final OutputStream out) throws IOException {
@@ -233,14 +252,18 @@ public int pipe(final OutputStream out) throws IOException {
         byte[] tmp = new byte[INITIAL_BUFFER_SIZE];
 
         int total = 0;
-        for (final ByteBuf cur : getByteBuffers()) {
-            ByteBuf dup = cur.duplicate();
-            while (dup.hasRemaining()) {
-                int numBytesToCopy = Math.min(dup.remaining(), tmp.length);
-                dup.get(tmp, 0, numBytesToCopy);
-                out.write(tmp, 0, numBytesToCopy);
+        List byteBuffers = getByteBuffers();
+        try {
+            for (final ByteBuf cur : byteBuffers) {
+                while (cur.hasRemaining()) {
+                    int numBytesToCopy = Math.min(cur.remaining(), tmp.length);
+                    cur.get(tmp, 0, numBytesToCopy);
+                    out.write(tmp, 0, numBytesToCopy);
+                }
+                total += cur.limit();
             }
-            total += dup.limit();
+        } finally {
+            byteBuffers.forEach(ByteBuf::release);
         }
         return total;
     }
@@ -360,4 +383,165 @@ private static final class BufferPositionPair {
             this.position = position;
         }
     }
+
+    protected int writeCharacters(final String str, final boolean checkNullTermination) {
+        int stringLength = str.length();
+        int sp = 0;
+        int prevPos = position;
+
+        ByteBuf curBuffer = getCurrentByteBuffer();
+        int curBufferPos = curBuffer.position();
+        int curBufferLimit = curBuffer.limit();
+        int remaining = curBufferLimit - curBufferPos;
+
+        if (curBuffer.hasArray()) {
+            byte[] dst = curBuffer.array();
+            int arrayOffset = curBuffer.arrayOffset();
+            if (remaining >= str.length() + 1) {
+                // Write ASCII characters directly to the array until we hit a non-ASCII character.
+                sp = writeOnArrayAscii(str, dst, arrayOffset + curBufferPos, checkNullTermination);
+                curBufferPos += sp;
+                // If the whole string was written as ASCII, append the null terminator.
+                if (sp == stringLength) {
+                    dst[arrayOffset + curBufferPos++] = 0;
+                    position += sp + 1;
+                    curBuffer.position(curBufferPos);
+                    return sp + 1;
+                }
+                // Otherwise, update the position to reflect the partial write.
+                position += sp;
+                curBuffer.position(curBufferPos);
+            }
+        }
+
+        // We get here, when the buffer is not backed by an array, or when the string contains at least one non-ASCII characters.
+        return writeOnBuffers(str,
+                checkNullTermination,
+                sp,
+                stringLength,
+                curBufferLimit,
+                curBufferPos,
+                curBuffer,
+                prevPos);
+    }
+
+    private int writeOnBuffers(final String str,
+                               final boolean checkNullTermination,
+                               final int stringPointer,
+                               final int stringLength,
+                               final int bufferLimit,
+                               final int bufferPos,
+                               final ByteBuf buffer,
+                               final int prevPos) {
+        int remaining;
+        int sp = stringPointer;
+        int curBufferPos = bufferPos;
+        int curBufferLimit = bufferLimit;
+        ByteBuf curBuffer = buffer;
+        while (sp < stringLength) {
+            remaining = curBufferLimit - curBufferPos;
+            int c = str.charAt(sp);
+
+            if (checkNullTermination && c == 0x0) {
+                throw new BsonSerializationException(
+                        format("BSON cstring '%s' is not valid because it contains a null character " + "at index %d", str, sp));
+            }
+
+            if (c < 0x80) {
+                if (remaining == 0) {
+                    curBuffer = getNextByteBuffer();
+                    curBufferPos = 0;
+                    curBufferLimit = curBuffer.limit();
+                }
+                curBuffer.put((byte) c);
+                curBufferPos++;
+                position++;
+            } else if (c < 0x800) {
+                if (remaining < 2) {
+                    // Not enough space: use write() to handle buffer boundary
+                    write((byte) (0xc0 + (c >> 6)));
+                    write((byte) (0x80 + (c & 0x3f)));
+
+                    curBuffer = getCurrentByteBuffer();
+                    curBufferPos = curBuffer.position();
+                    curBufferLimit = curBuffer.limit();
+                } else {
+                    curBuffer.put((byte) (0xc0 + (c >> 6)));
+                    curBuffer.put((byte) (0x80 + (c & 0x3f)));
+                    curBufferPos += 2;
+                    position += 2;
+                }
+            } else {
+                // Handle multibyte characters (may involve surrogate pairs).
+                c = Character.codePointAt(str, sp);
+                /*
+                 Malformed surrogate pairs are encoded as-is (3 byte code unit) without substituting any code point.
+                 This known deviation from the spec and current functionality remains for backward compatibility.
+                 Ticket: JAVA-5575
+                */
+                if (c < 0x10000) {
+                    if (remaining < 3) {
+                        write((byte) (0xe0 + (c >> 12)));
+                        write((byte) (0x80 + ((c >> 6) & 0x3f)));
+                        write((byte) (0x80 + (c & 0x3f)));
+
+                        curBuffer = getCurrentByteBuffer();
+                        curBufferPos = curBuffer.position();
+                        curBufferLimit = curBuffer.limit();
+                    } else {
+                        curBuffer.put((byte) (0xe0 + (c >> 12)));
+                        curBuffer.put((byte) (0x80 + ((c >> 6) & 0x3f)));
+                        curBuffer.put((byte) (0x80 + (c & 0x3f)));
+                        curBufferPos += 3;
+                        position += 3;
+                    }
+                } else {
+                    if (remaining < 4) {
+                        write((byte) (0xf0 + (c >> 18)));
+                        write((byte) (0x80 + ((c >> 12) & 0x3f)));
+                        write((byte) (0x80 + ((c >> 6) & 0x3f)));
+                        write((byte) (0x80 + (c & 0x3f)));
+
+                        curBuffer = getCurrentByteBuffer();
+                        curBufferPos = curBuffer.position();
+                        curBufferLimit = curBuffer.limit();
+                    } else {
+                        curBuffer.put((byte) (0xf0 + (c >> 18)));
+                        curBuffer.put((byte) (0x80 + ((c >> 12) & 0x3f)));
+                        curBuffer.put((byte) (0x80 + ((c >> 6) & 0x3f)));
+                        curBuffer.put((byte) (0x80 + (c & 0x3f)));
+                        curBufferPos += 4;
+                        position += 4;
+                    }
+                }
+            }
+            sp += Character.charCount(c);
+        }
+
+        getCurrentByteBuffer().put((byte) 0);
+        position++;
+        return position - prevPos;
+    }
+
+    private static int writeOnArrayAscii(final String str,
+                                         final byte[] dst,
+                                         final int arrayPosition,
+                                         final boolean checkNullTermination) {
+        int pos = arrayPosition;
+        int sp = 0;
+        // Fast common path: This tight loop is JIT-friendly (simple, no calls, few branches),
+        // It might be unrolled for performance.
+        for (; sp < str.length(); sp++, pos++) {
+            char c = str.charAt(sp);
+            if (checkNullTermination && c == 0) {
+                throw new BsonSerializationException(
+                        format("BSON cstring '%s' is not valid because it contains a null character " + "at index %d", str, sp));
+            }
+            if (c >= 0x80) {
+                break;
+            }
+            dst[pos] = (byte) c;
+        }
+        return sp;
+    }
 }
diff --git a/driver-core/src/main/com/mongodb/internal/connection/CompositeByteBuf.java b/driver-core/src/main/com/mongodb/internal/connection/CompositeByteBuf.java
index 47545753367..e7e0186e128 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/CompositeByteBuf.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/CompositeByteBuf.java
@@ -213,6 +213,16 @@ public byte[] array() {
         throw new UnsupportedOperationException("Not implemented yet!");
     }
 
+    @Override
+    public boolean hasArray() {
+        return false;
+    }
+
+    @Override
+    public int arrayOffset() {
+        throw new UnsupportedOperationException("Not implemented yet!");
+    }
+
     @Override
     public ByteBuf limit(final int newLimit) {
         if (newLimit < 0 || newLimit > capacity()) {
diff --git a/driver-core/src/main/com/mongodb/internal/connection/netty/NettyByteBuf.java b/driver-core/src/main/com/mongodb/internal/connection/netty/NettyByteBuf.java
index cb6ba587419..cbe50aaada0 100644
--- a/driver-core/src/main/com/mongodb/internal/connection/netty/NettyByteBuf.java
+++ b/driver-core/src/main/com/mongodb/internal/connection/netty/NettyByteBuf.java
@@ -124,6 +124,16 @@ public byte[] array() {
         return proxied.array();
     }
 
+    @Override
+    public boolean hasArray() {
+        return proxied.hasArray();
+    }
+
+    @Override
+    public int arrayOffset() {
+        return proxied.arrayOffset();
+    }
+
     @Override
     public int limit() {
         if (isWriting) {
diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/ByteBufSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/ByteBufSpecification.groovy
index 0e0755f65bd..d052d6b23f1 100644
--- a/driver-core/src/test/unit/com/mongodb/internal/connection/ByteBufSpecification.groovy
+++ b/driver-core/src/test/unit/com/mongodb/internal/connection/ByteBufSpecification.groovy
@@ -249,11 +249,7 @@ class ByteBufSpecification extends Specification {
         @Override
         ByteBuf getBuffer(final int size) {
             io.netty.buffer.ByteBuf buffer = allocator.directBuffer(size, size)
-            try {
-                new NettyByteBuf(buffer.retain())
-            } finally {
-                buffer.release();
-            }
+            new NettyByteBuf(buffer)
         }
     }
 }
diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/ByteBufferBsonOutputTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/ByteBufferBsonOutputTest.java
index 560e3177360..bd055461115 100644
--- a/driver-core/src/test/unit/com/mongodb/internal/connection/ByteBufferBsonOutputTest.java
+++ b/driver-core/src/test/unit/com/mongodb/internal/connection/ByteBufferBsonOutputTest.java
@@ -16,16 +16,20 @@
 
 package com.mongodb.internal.connection;
 
+import com.google.common.primitives.Ints;
+import com.mongodb.internal.connection.netty.NettyByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
 import org.bson.BsonSerializationException;
 import org.bson.ByteBuf;
 import org.bson.ByteBufNIO;
+import org.bson.io.OutputBuffer;
 import org.bson.types.ObjectId;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
 
@@ -34,22 +38,83 @@
 import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
+import java.util.stream.Stream;
 
-import static com.mongodb.assertions.Assertions.fail;
 import static com.mongodb.internal.connection.ByteBufferBsonOutput.INITIAL_BUFFER_SIZE;
 import static com.mongodb.internal.connection.ByteBufferBsonOutput.MAX_BUFFER_SIZE;
+import static java.lang.Character.MAX_CODE_POINT;
+import static java.lang.Character.MAX_HIGH_SURROGATE;
+import static java.lang.Character.MAX_LOW_SURROGATE;
+import static java.lang.Character.MIN_HIGH_SURROGATE;
+import static java.lang.Character.MIN_LOW_SURROGATE;
+import static java.lang.Integer.reverseBytes;
+import static java.lang.String.format;
 import static java.util.Arrays.asList;
 import static java.util.Arrays.copyOfRange;
+import static java.util.Collections.emptyList;
 import static java.util.stream.Collectors.toList;
+import static java.util.stream.IntStream.range;
+import static java.util.stream.IntStream.rangeClosed;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 final class ByteBufferBsonOutputTest {
+
+    private static final List ALL_CODE_POINTS_EXCLUDING_SURROGATES = Stream.concat(
+                    range(1, MIN_HIGH_SURROGATE).boxed(),
+                    rangeClosed(MAX_LOW_SURROGATE + 1, MAX_CODE_POINT).boxed())
+            .collect(toList());
+    private static final List ALL_SURROGATE_CODE_POINTS = Stream.concat(
+            range(MIN_LOW_SURROGATE, MAX_LOW_SURROGATE).boxed(),
+            range(MIN_HIGH_SURROGATE, MAX_HIGH_SURROGATE).boxed()).collect(toList());
+    public static final List ALL_UTF_16_CODE_POINTS_FORMED_BY_SURROGATE_PAIRS = rangeClosed(0x10000, MAX_CODE_POINT)
+            .boxed()
+            .collect(toList());
+
+    static Stream bufferProviders() {
+        return Stream.of(
+                size -> new NettyByteBuf(PooledByteBufAllocator.DEFAULT.directBuffer(size)),
+                size -> new NettyByteBuf(PooledByteBufAllocator.DEFAULT.heapBuffer(size)),
+                new PowerOfTwoBufferPool(),
+                size -> new ByteBufNIO(ByteBuffer.wrap(new byte[size + 5], 2, size).slice()),  //different array offsets
+                size -> new ByteBufNIO(ByteBuffer.wrap(new byte[size + 4], 3, size).slice()),  //different array offsets
+                size -> new ByteBufNIO(ByteBuffer.allocate(size)) {
+                    @Override
+                    public boolean hasArray() {
+                        return false;
+                    }
+
+                    @Override
+                    public byte[] array() {
+                        return Assertions.fail("array() is called, when hasArray() returns false");
+                    }
+
+                    @Override
+                    public int arrayOffset() {
+                        return Assertions.fail("arrayOffset() is called, when hasArray() returns false");
+                    }
+                }
+        );
+    }
+
+    public static Stream bufferProvidersWithBranches() {
+        List arguments = new ArrayList<>();
+        List collect = bufferProviders().collect(toList());
+        for (BufferProvider bufferProvider : collect) {
+            arguments.add(Arguments.of(true, bufferProvider));
+            arguments.add(Arguments.of(false, bufferProvider));
+        }
+        return arguments.stream();
+    }
+
+
     @DisplayName("constructor should throw if buffer provider is null")
     @Test
     @SuppressWarnings("try")
@@ -87,7 +152,7 @@ void positionAndSizeShouldBe0AfterConstructor(final String branchState) {
                     break;
                 }
                 default: {
-                    throw fail(branchState);
+                    throw com.mongodb.assertions.Assertions.fail(branchState);
                 }
             }
             assertEquals(0, out.getPosition());
@@ -97,9 +162,9 @@ void positionAndSizeShouldBe0AfterConstructor(final String branchState) {
 
     @DisplayName("should write a byte")
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    void shouldWriteByte(final boolean useBranch) {
-        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(new SimpleBufferProvider())) {
+    @MethodSource("bufferProvidersWithBranches")
+    void shouldWriteByte(final boolean useBranch, final BufferProvider bufferProvider) {
+        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(bufferProvider)) {
             byte v = 11;
             if (useBranch) {
                 try (ByteBufferBsonOutput.Branch branch = out.branch()) {
@@ -161,9 +226,9 @@ void shouldThrowExceptionWhenWriteByteAtInvalidPosition(final boolean useBranch)
 
     @DisplayName("should write a bytes")
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    void shouldWriteBytes(final boolean useBranch) {
-        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(new SimpleBufferProvider())) {
+    @MethodSource("bufferProvidersWithBranches")
+    void shouldWriteBytes(final boolean useBranch, final BufferProvider bufferProvider) {
+        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(bufferProvider)) {
             byte[] v = {1, 2, 3, 4};
             if (useBranch) {
                 try (ByteBufferBsonOutput.Branch branch = out.branch()) {
@@ -180,9 +245,9 @@ void shouldWriteBytes(final boolean useBranch) {
 
     @DisplayName("should write bytes from offset until length")
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    void shouldWriteBytesFromOffsetUntilLength(final boolean useBranch) {
-        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(new SimpleBufferProvider())) {
+    @MethodSource("bufferProvidersWithBranches")
+    void shouldWriteBytesFromOffsetUntilLength(final boolean useBranch, final BufferProvider bufferProvider) {
+        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(bufferProvider)) {
             byte[] v = {0, 1, 2, 3, 4, 5};
             if (useBranch) {
                 try (ByteBufferBsonOutput.Branch branch = out.branch()) {
@@ -199,9 +264,9 @@ void shouldWriteBytesFromOffsetUntilLength(final boolean useBranch) {
 
     @DisplayName("should write a little endian Int32")
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    void shouldWriteLittleEndianInt32(final boolean useBranch) {
-        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(new SimpleBufferProvider())) {
+    @MethodSource("bufferProvidersWithBranches")
+    void shouldWriteLittleEndianInt32(final boolean useBranch, final BufferProvider bufferProvider) {
+        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(bufferProvider)) {
             int v = 0x1020304;
             if (useBranch) {
                 try (ByteBufferBsonOutput.Branch branch = out.branch()) {
@@ -218,9 +283,9 @@ void shouldWriteLittleEndianInt32(final boolean useBranch) {
 
     @DisplayName("should write a little endian Int64")
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    void shouldWriteLittleEndianInt64(final boolean useBranch) {
-        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(new SimpleBufferProvider())) {
+    @MethodSource("bufferProvidersWithBranches")
+    void shouldWriteLittleEndianInt64(final boolean useBranch, final BufferProvider bufferProvider) {
+        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(bufferProvider)) {
             long v = 0x102030405060708L;
             if (useBranch) {
                 try (ByteBufferBsonOutput.Branch branch = out.branch()) {
@@ -237,9 +302,9 @@ void shouldWriteLittleEndianInt64(final boolean useBranch) {
 
     @DisplayName("should write a double")
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    void shouldWriteDouble(final boolean useBranch) {
-        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(new SimpleBufferProvider())) {
+    @MethodSource("bufferProvidersWithBranches")
+    void shouldWriteDouble(final boolean useBranch, final BufferProvider bufferProvider) {
+        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(bufferProvider)) {
             double v = Double.longBitsToDouble(0x102030405060708L);
             if (useBranch) {
                 try (ByteBufferBsonOutput.Branch branch = out.branch()) {
@@ -256,9 +321,9 @@ void shouldWriteDouble(final boolean useBranch) {
 
     @DisplayName("should write an ObjectId")
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    void shouldWriteObjectId(final boolean useBranch) {
-        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(new SimpleBufferProvider())) {
+    @MethodSource("bufferProvidersWithBranches")
+    void shouldWriteObjectId(final boolean useBranch, final BufferProvider bufferProvider) {
+        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(bufferProvider)) {
             byte[] objectIdAsByteArray = {12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1};
             ObjectId v = new ObjectId(objectIdAsByteArray);
             if (useBranch) {
@@ -276,9 +341,9 @@ void shouldWriteObjectId(final boolean useBranch) {
 
     @DisplayName("should write an empty string")
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    void shouldWriteEmptyString(final boolean useBranch) {
-        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(new SimpleBufferProvider())) {
+    @MethodSource("bufferProvidersWithBranches")
+    void shouldWriteEmptyString(final boolean useBranch, final BufferProvider bufferProvider) {
+        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(bufferProvider)) {
             String v = "";
             if (useBranch) {
                 try (ByteBufferBsonOutput.Branch branch = out.branch()) {
@@ -295,9 +360,9 @@ void shouldWriteEmptyString(final boolean useBranch) {
 
     @DisplayName("should write an ASCII string")
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    void shouldWriteAsciiString(final boolean useBranch) {
-        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(new SimpleBufferProvider())) {
+    @MethodSource("bufferProvidersWithBranches")
+    void shouldWriteAsciiString(final boolean useBranch, final BufferProvider bufferProvider) {
+        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(bufferProvider)) {
             String v = "Java";
             if (useBranch) {
                 try (ByteBufferBsonOutput.Branch branch = out.branch()) {
@@ -314,9 +379,9 @@ void shouldWriteAsciiString(final boolean useBranch) {
 
     @DisplayName("should write a UTF-8 string")
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    void shouldWriteUtf8String(final boolean useBranch) {
-        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(new SimpleBufferProvider())) {
+    @MethodSource("bufferProvidersWithBranches")
+    void shouldWriteUtf8String(final boolean useBranch, final BufferProvider bufferProvider) {
+        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(bufferProvider)) {
             String v = "\u0900";
             if (useBranch) {
                 try (ByteBufferBsonOutput.Branch branch = out.branch()) {
@@ -333,9 +398,9 @@ void shouldWriteUtf8String(final boolean useBranch) {
 
     @DisplayName("should write an empty CString")
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    void shouldWriteEmptyCString(final boolean useBranch) {
-        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(new SimpleBufferProvider())) {
+    @MethodSource("bufferProvidersWithBranches")
+    void shouldWriteEmptyCString(final boolean useBranch, final BufferProvider bufferProvider) {
+        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(bufferProvider)) {
             String v = "";
             if (useBranch) {
                 try (ByteBufferBsonOutput.Branch branch = out.branch()) {
@@ -352,9 +417,9 @@ void shouldWriteEmptyCString(final boolean useBranch) {
 
     @DisplayName("should write an ASCII CString")
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    void shouldWriteAsciiCString(final boolean useBranch) {
-        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(new SimpleBufferProvider())) {
+    @MethodSource("bufferProvidersWithBranches")
+    void shouldWriteAsciiCString(final boolean useBranch, final BufferProvider bufferProvider) {
+        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(bufferProvider)) {
             String v = "Java";
             if (useBranch) {
                 try (ByteBufferBsonOutput.Branch branch = out.branch()) {
@@ -371,9 +436,9 @@ void shouldWriteAsciiCString(final boolean useBranch) {
 
     @DisplayName("should write a UTF-8 CString")
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    void shouldWriteUtf8CString(final boolean useBranch) {
-        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(new SimpleBufferProvider())) {
+    @MethodSource("bufferProvidersWithBranches")
+    void shouldWriteUtf8CString(final boolean useBranch, final BufferProvider bufferProvider) {
+        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(bufferProvider)) {
             String v = "\u0900";
             if (useBranch) {
                 try (ByteBufferBsonOutput.Branch branch = out.branch()) {
@@ -390,9 +455,9 @@ void shouldWriteUtf8CString(final boolean useBranch) {
 
     @DisplayName("should get byte buffers as little endian")
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    void shouldGetByteBuffersAsLittleEndian(final boolean useBranch) {
-        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(new SimpleBufferProvider())) {
+    @MethodSource("bufferProvidersWithBranches")
+    void shouldGetByteBuffersAsLittleEndian(final boolean useBranch, final BufferProvider bufferProvider) {
+        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(bufferProvider)) {
             byte[] v = {1, 0, 0, 0};
             if (useBranch) {
                 try (ByteBufferBsonOutput.Branch branch = out.branch()) {
@@ -407,9 +472,9 @@ void shouldGetByteBuffersAsLittleEndian(final boolean useBranch) {
 
     @DisplayName("null character in CString should throw SerializationException")
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    void nullCharacterInCStringShouldThrowSerializationException(final boolean useBranch) {
-        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(new SimpleBufferProvider())) {
+    @MethodSource("bufferProvidersWithBranches")
+    void nullCharacterInCStringShouldThrowSerializationException(final boolean useBranch, final BufferProvider bufferProvider) {
+        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(bufferProvider)) {
             String v = "hell\u0000world";
             if (useBranch) {
                 try (ByteBufferBsonOutput.Branch branch = out.branch()) {
@@ -423,9 +488,9 @@ void nullCharacterInCStringShouldThrowSerializationException(final boolean useBr
 
     @DisplayName("null character in String should not throw SerializationException")
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    void nullCharacterInStringShouldNotThrowSerializationException(final boolean useBranch) {
-        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(new SimpleBufferProvider())) {
+    @MethodSource("bufferProvidersWithBranches")
+    void nullCharacterInStringShouldNotThrowSerializationException(final boolean useBranch, final BufferProvider bufferProvider) {
+        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(bufferProvider)) {
             String v = "h\u0000i";
             if (useBranch) {
                 try (ByteBufferBsonOutput.Branch branch = out.branch()) {
@@ -438,11 +503,25 @@ void nullCharacterInStringShouldNotThrowSerializationException(final boolean use
         }
     }
 
+
+    public static Stream writeInt32AtPositionShouldThrowWithInvalidPosition() {
+        return bufferProvidersWithBranches().flatMap(arguments -> {
+            Object[] args = arguments.get();
+            boolean useBranch = (boolean) args[0];
+            BufferProvider bufferProvider = (BufferProvider) args[1];
+            return Stream.of(
+                    Arguments.of(useBranch, -1, bufferProvider),
+                    Arguments.of(useBranch, 1, bufferProvider)
+            );
+        });
+    }
+
     @DisplayName("write Int32 at position should throw with invalid position")
     @ParameterizedTest
-    @CsvSource({"false, -1", "false, 1", "true, -1", "true, 1"})
-    void writeInt32AtPositionShouldThrowWithInvalidPosition(final boolean useBranch, final int position) {
-        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(new SimpleBufferProvider())) {
+    @MethodSource
+    void writeInt32AtPositionShouldThrowWithInvalidPosition(final boolean useBranch, final int position,
+                                                            final BufferProvider bufferProvider) {
+        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(bufferProvider)) {
             byte[] v = {1, 2, 3, 4};
             int v2 = 0x1020304;
             if (useBranch) {
@@ -459,9 +538,9 @@ void writeInt32AtPositionShouldThrowWithInvalidPosition(final boolean useBranch,
 
     @DisplayName("should write Int32 at position")
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    void shouldWriteInt32AtPosition(final boolean useBranch) {
-        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(new SimpleBufferProvider())) {
+    @MethodSource("bufferProvidersWithBranches")
+    void shouldWriteInt32AtPosition(final boolean useBranch, final BufferProvider bufferProvider) {
+        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(bufferProvider)) {
             Consumer lastAssertions = effectiveOut -> {
                 assertArrayEquals(new byte[] {4, 3, 2, 1}, copyOfRange(effectiveOut.toByteArray(), 1023, 1027), "the position is not in the first buffer");
                 assertEquals(1032, effectiveOut.getPosition());
@@ -492,9 +571,22 @@ void shouldWriteInt32AtPosition(final boolean useBranch) {
         }
     }
 
+    public static Stream truncateShouldThrowWithInvalidPosition() {
+        return bufferProvidersWithBranches().flatMap(arguments -> {
+                    Object[] args = arguments.get();
+                    boolean useBranch = (boolean) args[0];
+                    BufferProvider bufferProvider = (BufferProvider) args[1];
+                    return Stream.of(
+                            Arguments.of(useBranch, -1, bufferProvider),
+                            Arguments.of(useBranch, 5, bufferProvider)
+                    );
+                }
+        );
+    }
+
     @DisplayName("truncate should throw with invalid position")
     @ParameterizedTest
-    @CsvSource({"false, -1", "false, 5", "true, -1", "true, 5"})
+    @MethodSource
     void truncateShouldThrowWithInvalidPosition(final boolean useBranch, final int position) {
         try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(new SimpleBufferProvider())) {
             byte[] v = {1, 2, 3, 4};
@@ -512,9 +604,9 @@ void truncateShouldThrowWithInvalidPosition(final boolean useBranch, final int p
 
     @DisplayName("should truncate to position")
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    void shouldTruncateToPosition(final boolean useBranch) {
-        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(new SimpleBufferProvider())) {
+    @MethodSource("bufferProvidersWithBranches")
+    void shouldTruncateToPosition(final boolean useBranch, final BufferProvider bufferProvider) {
+        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(bufferProvider)) {
             byte[] v = {1, 2, 3, 4};
             byte[] v2 = new byte[1024];
             if (useBranch) {
@@ -536,15 +628,23 @@ void shouldTruncateToPosition(final boolean useBranch) {
 
     @DisplayName("should grow to maximum allowed size of byte buffer")
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    void shouldGrowToMaximumAllowedSizeOfByteBuffer(final boolean useBranch) {
-        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(new SimpleBufferProvider())) {
+    @MethodSource("bufferProvidersWithBranches")
+    void shouldGrowToMaximumAllowedSizeOfByteBuffer(final boolean useBranch, final BufferProvider bufferProvider) {
+        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(bufferProvider)) {
             byte[] v = new byte[0x2000000];
             ThreadLocalRandom.current().nextBytes(v);
-            Consumer assertByteBuffers = effectiveOut -> assertEquals(
-                    asList(1 << 10, 1 << 11, 1 << 12, 1 << 13, 1 << 14, 1 << 15, 1 << 16, 1 << 17, 1 << 18, 1 << 19, 1 << 20,
-                            1 << 21, 1 << 22, 1 << 23, 1 << 24, 1 << 24),
-                    effectiveOut.getByteBuffers().stream().map(ByteBuf::capacity).collect(toList()));
+            Consumer assertByteBuffers = effectiveOut -> {
+                List byteBuffers = new ArrayList<>();
+                try {
+                    byteBuffers = effectiveOut.getByteBuffers();
+                    assertEquals(
+                            asList(1 << 10, 1 << 11, 1 << 12, 1 << 13, 1 << 14, 1 << 15, 1 << 16, 1 << 17, 1 << 18, 1 << 19, 1 << 20,
+                                    1 << 21, 1 << 22, 1 << 23, 1 << 24, 1 << 24),
+                            byteBuffers.stream().map(ByteBuf::capacity).collect(toList()));
+                } finally {
+                    byteBuffers.forEach(ByteBuf::release);
+                }
+            };
             Consumer assertions = effectiveOut -> {
                 effectiveOut.writeBytes(v);
                 assertEquals(v.length, effectiveOut.size());
@@ -570,9 +670,9 @@ void shouldGrowToMaximumAllowedSizeOfByteBuffer(final boolean useBranch) {
 
     @DisplayName("should pipe")
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
-    void shouldPipe(final boolean useBranch) throws IOException {
-        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(new SimpleBufferProvider())) {
+    @MethodSource("bufferProvidersWithBranches")
+    void shouldPipe(final boolean useBranch, final BufferProvider bufferProvider) throws IOException {
+        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(bufferProvider)) {
             byte[] v = new byte[1027];
             BiConsumer assertions = (effectiveOut, baos) -> {
                 assertArrayEquals(v, baos.toByteArray());
@@ -606,10 +706,10 @@ void shouldPipe(final boolean useBranch) throws IOException {
 
     @DisplayName("should close")
     @ParameterizedTest
-    @ValueSource(booleans = {false, true})
+    @MethodSource("bufferProvidersWithBranches")
     @SuppressWarnings("try")
-    void shouldClose(final boolean useBranch) {
-        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(new SimpleBufferProvider())) {
+    void shouldClose(final boolean useBranch, final BufferProvider bufferProvider) {
+        try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(bufferProvider)) {
             byte[] v = new byte[1027];
             if (useBranch) {
                 try (ByteBufferBsonOutput.Branch branch = out.branch()) {
@@ -673,10 +773,11 @@ void shouldHandleMixedBranchingAndTruncating(final int reps) throws CharacterCod
         }
     }
 
-    @Test
+    @ParameterizedTest
     @DisplayName("should throw exception when calling writeInt32 at absolute position where integer would not fit")
-    void shouldThrowExceptionWhenIntegerDoesNotFitWriteInt32() {
-        try (ByteBufferBsonOutput output = new ByteBufferBsonOutput(new SimpleBufferProvider())) {
+    @MethodSource("bufferProviders")
+    void shouldThrowExceptionWhenIntegerDoesNotFitWriteInt32(final BufferProvider bufferProvider) {
+        try (ByteBufferBsonOutput output = new ByteBufferBsonOutput(bufferProvider)) {
             // Write 10 bytes (position becomes 10)
             for (int i = 0; i < 10; i++) {
                 output.writeByte(0);
@@ -689,18 +790,19 @@ void shouldThrowExceptionWhenIntegerDoesNotFitWriteInt32() {
         }
     }
 
-    @Test
+    @ParameterizedTest
     @DisplayName("should throw exception when calling writeInt32 with negative absolute position")
-    void shouldThrowExceptionWhenAbsolutePositionIsNegative() {
-        try (ByteBufferBsonOutput output = new ByteBufferBsonOutput(new SimpleBufferProvider())) {
+    @MethodSource("bufferProviders")
+    void shouldThrowExceptionWhenAbsolutePositionIsNegative(final BufferProvider bufferProvider) {
+        try (ByteBufferBsonOutput output = new ByteBufferBsonOutput(bufferProvider)) {
             Assertions.assertThrows(IllegalArgumentException.class, () ->
                     output.writeInt32(-1, 5678)
             );
         }
     }
 
-    static java.util.stream.Stream shouldWriteInt32AbsoluteValueWithinSpanningBuffers() {
-        return java.util.stream.Stream.of(
+    static Stream shouldWriteInt32AbsoluteValueWithinSpanningBuffers() {
+        return bufferProviders().flatMap(bufferProvider -> Stream.of(
                 Arguments.of(
                         0, // absolute position
                         0x09080706, // int value
@@ -711,22 +813,28 @@ static java.util.stream.Stream shouldWriteInt32AbsoluteValueWithinSpa
                         asList(
                                 // expected BsonByteBufferOutput data
                                 new byte[]{0x06, 0x07, 0x08, 0x09},
-                                new byte[]{4, 5, 6, 7})),
+                                new byte[]{4, 5, 6, 7}),
+                        bufferProvider // buffer to write data to
+                ),
                 Arguments.of(1, 0x09080706,
                         asList(new byte[]{0, 1, 2, 3}, new byte[]{4, 5, 6, 7}),
-                        asList(new byte[]{0, 0x06, 0x07, 0x08}, new byte[]{0x09, 5, 6, 7})),
+                        asList(new byte[]{0, 0x06, 0x07, 0x08}, new byte[]{0x09, 5, 6, 7}),
+                        bufferProvider),
                 Arguments.of(2, 0x09080706,
                         asList(new byte[]{0, 1, 2, 3}, new byte[]{4, 5, 6, 7}),
-                        asList(new byte[]{0, 1, 0x06, 0x07}, new byte[]{0x08, 0x09, 6, 7})
+                        asList(new byte[]{0, 1, 0x06, 0x07}, new byte[]{0x08, 0x09, 6, 7}),
+                        bufferProvider
                 ),
                 Arguments.of(3, 0x09080706,
                         asList(new byte[]{0, 1, 2, 3}, new byte[]{4, 5, 6, 7}),
-                        asList(new byte[]{0, 1, 2, 0x06}, new byte[]{0x07, 0x08, 0x09, 7})
+                        asList(new byte[]{0, 1, 2, 0x06}, new byte[]{0x07, 0x08, 0x09, 7}),
+                        bufferProvider
                 ),
                 Arguments.of(4, 0x09080706,
                         asList(new byte[]{0, 1, 2, 3}, new byte[]{4, 5, 6, 7}),
-                        asList(new byte[]{0, 1, 2, 3}, new byte[]{0x06, 0x07, 0x08, 0x09})
-                ));
+                        asList(new byte[]{0, 1, 2, 3}, new byte[]{0x06, 0x07, 0x08, 0x09}),
+                        bufferProvider
+                )));
     }
 
     @ParameterizedTest
@@ -735,10 +843,11 @@ void shouldWriteInt32AbsoluteValueWithinSpanningBuffers(
             final int absolutePosition,
             final int intValue,
             final List initialData,
-            final List expectedBuffers) {
+            final List expectedBuffers,
+            final BufferProvider bufferProvider) {
 
-        try (ByteBufferBsonOutput output =
-                     new ByteBufferBsonOutput(size -> new ByteBufNIO(ByteBuffer.allocate(4)))) {
+        List buffers = new ArrayList<>();
+        try (ByteBufferBsonOutput output = new ByteBufferBsonOutput(size -> bufferProvider.getBuffer(Integer.BYTES))) {
 
             //given
             initialData.forEach(output::writeBytes);
@@ -747,17 +856,16 @@ void shouldWriteInt32AbsoluteValueWithinSpanningBuffers(
             output.writeInt32(absolutePosition, intValue);
 
             //then
-            List buffers = output.getByteBuffers();
+            buffers = output.getByteBuffers();
             assertEquals(expectedBuffers.size(), buffers.size(), "Number of buffers mismatch");
-            for (int i = 0; i < expectedBuffers.size(); i++) {
-                assertArrayEquals(expectedBuffers.get(i), buffers.get(i).array(),
-                        "Buffer " + i + " contents mismatch");
-            }
+            assertBufferContents(expectedBuffers, buffers);
+        } finally {
+            buffers.forEach(ByteBuf::release);
         }
     }
 
-    static java.util.stream.Stream int32SpanningBuffersData() {
-        return java.util.stream.Stream.of(
+    static Stream int32SpanningBuffersData() {
+        return bufferProviders().flatMap(bufferProvider -> Stream.of(
                 // Test case 1: No initial data; entire int written into one buffer.
                 Arguments.of(0x09080706,
                         asList(
@@ -767,28 +875,33 @@ static java.util.stream.Stream int32SpanningBuffersData() {
                                 // expected BsonByteBufferOutput data
                                 new byte[]{0x06, 0x07, 0x08, 0x09}),
                         4, // expected overall position after write (0 + 4)
-                        4  // expected last buffer position (buffer fully written)
+                        4,  // expected last buffer position (buffer fully written)
+                        bufferProvider //buffer to write data to
                 ),
                 Arguments.of(0x09080706,
                         asList(new byte[]{0}),
-                        asList(new byte[]{0, 0x06, 0x07, 0x08}, new byte[]{0x09, 0, 0, 0}), 5, 1
+                        asList(new byte[]{0, 0x06, 0x07, 0x08}, new byte[]{0x09, 0, 0, 0}), 5, 1,
+                        bufferProvider
                 ),
                 Arguments.of(0x09080706,
                         asList(new byte[]{0, 1}),
-                        asList(new byte[]{0, 1, 0x06, 0x07}, new byte[]{0x08, 0x09, 0, 0}), 6, 2
+                        asList(new byte[]{0, 1, 0x06, 0x07}, new byte[]{0x08, 0x09, 0, 0}), 6, 2,
+                        bufferProvider
                 ),
                 Arguments.of(0x09080706,
                         asList(new byte[]{0, 1, 2}),
-                        asList(new byte[]{0, 1, 2, 0x06}, new byte[]{0x07, 0x08, 0x09, 0}), 7, 3
+                        asList(new byte[]{0, 1, 2, 0x06}, new byte[]{0x07, 0x08, 0x09, 0}), 7, 3,
+                        bufferProvider
                 ),
                 Arguments.of(0x09080706,
                         asList(new byte[]{0, 1, 2, 3}),
-                        asList(new byte[]{0, 1, 2, 3}, new byte[]{0x06, 0x07, 0x08, 0x09}), 8, 4
-                ));
+                        asList(new byte[]{0, 1, 2, 3}, new byte[]{0x06, 0x07, 0x08, 0x09}), 8, 4,
+                        bufferProvider
+                )));
     }
 
-    static java.util.stream.Stream int64SpanningBuffersData() {
-        return java.util.stream.Stream.of(
+    static Stream int64SpanningBuffersData() {
+        return bufferProviders().flatMap(bufferProvider -> Stream.of(
                 // Test case 1: No initial data; entire long written into one buffer.
                 Arguments.of(0x0A0B0C0D0E0F1011L,
                         asList(
@@ -799,48 +912,56 @@ static java.util.stream.Stream int64SpanningBuffersData() {
                                 new byte[]{0x11, 0x10, 0x0F, 0x0E, 0x0D, 0x0C, 0x0B, 0x0A}
                         ),
                         8, // expected overall position after write (0 + 8)
-                        8  // expected last buffer position (buffer fully written)
+                        8,  // expected last buffer position (buffer fully written)
+                        bufferProvider //buffer to write data to
                 ),
                 Arguments.of(0x0A0B0C0D0E0F1011L,
                         asList(new byte[]{0}),
                         asList(new byte[]{0, 0x11, 0x10, 0x0F, 0x0E, 0x0D, 0x0C, 0x0B}, new byte[]{0x0A, 0, 0, 0, 0, 0, 0, 0}),
-                        9, 1
+                        9, 1,
+                        bufferProvider
                 ),
                 Arguments.of(0x0A0B0C0D0E0F1011L,
                         asList(new byte[]{0, 1}),
                         asList(new byte[]{0, 1, 0x11, 0x10, 0x0F, 0x0E, 0x0D, 0x0C}, new byte[]{0x0B, 0x0A, 0, 0, 0, 0, 0, 0}),
-                        10, 2
+                        10, 2,
+                        bufferProvider
                 ),
                 Arguments.of(0x0A0B0C0D0E0F1011L,
                         asList(new byte[]{0, 1, 2}),
                         asList(new byte[]{0, 1, 2, 0x11, 0x10, 0x0F, 0x0E, 0x0D}, new byte[]{0x0C, 0x0B, 0x0A, 0, 0, 0, 0, 0}),
-                        11, 3
+                        11, 3,
+                        bufferProvider
                 ),
                 Arguments.of(0x0A0B0C0D0E0F1011L,
                         asList(new byte[]{0, 1, 2, 3}),
                         asList(new byte[]{0, 1, 2, 3, 0x11, 0x10, 0x0F, 0x0E}, new byte[]{0x0D, 0x0C, 0x0B, 0x0A, 0, 0, 0, 0}),
-                        12, 4
+                        12, 4,
+                        bufferProvider
                 ),
                 Arguments.of(0x0A0B0C0D0E0F1011L,
                         asList(new byte[]{0, 1, 2, 3, 4}),
                         asList(new byte[]{0, 1, 2, 3, 4, 0x11, 0x10, 0x0F}, new byte[]{0x0E, 0x0D, 0x0C, 0x0B, 0x0A, 0, 0, 0}),
-                        13, 5
+                        13, 5,
+                        bufferProvider
                 ),
                 Arguments.of(0x0A0B0C0D0E0F1011L,
                         asList(new byte[]{0, 1, 2, 3, 4, 5}),
                         asList(new byte[]{0, 1, 2, 3, 4, 5, 0x11, 0x10}, new byte[]{0x0F, 0x0E, 0x0D, 0x0C, 0x0B, 0x0A, 0, 0}),
-                        14, 6
+                        14, 6,
+                        bufferProvider
                 ), Arguments.of(0x0A0B0C0D0E0F1011L,
                         asList(new byte[]{0, 1, 2, 3, 4, 5, 6}),
                         asList(new byte[]{0, 1, 2, 3, 4, 5, 6, 0x11}, new byte[]{0x10, 0x0F, 0x0E, 0x0D, 0x0C, 0x0B, 0x0A, 0}),
-                        15, 7
+                        15, 7,
+                        bufferProvider
                 ),
                 Arguments.of(0x0A0B0C0D0E0F1011L,
                         asList(new byte[]{0, 1, 2, 3, 4, 5, 6, 7}),
                         asList(new byte[]{0, 1, 2, 3, 4, 5, 6, 7}, new byte[]{0x11, 0x10, 0x0F, 0x0E, 0x0D, 0x0C, 0x0B, 0x0A}),
-                        16, 8
-                )
-        );
+                        16, 8,
+                        bufferProvider
+                )));
     }
 
     @ParameterizedTest
@@ -850,10 +971,11 @@ void shouldWriteInt32WithinSpanningBuffers(
             final List initialData,
             final List expectedBuffers,
             final int expectedOutputPosition,
-            final int expectedLastBufferPosition) {
+            final int expectedLastBufferPosition,
+            final BufferProvider bufferProvider) {
 
         try (ByteBufferBsonOutput output =
-                     new ByteBufferBsonOutput(size -> new ByteBufNIO(ByteBuffer.allocate(4)))) {
+                     new ByteBufferBsonOutput(size -> bufferProvider.getBuffer(Integer.BYTES))) {
 
             //given
             initialData.forEach(output::writeBytes);
@@ -865,10 +987,7 @@ void shouldWriteInt32WithinSpanningBuffers(
             //getByteBuffers returns ByteBuffers with limit() set to position, position set to 0.
             List buffers = output.getByteBuffers();
             assertEquals(expectedBuffers.size(), buffers.size(), "Number of buffers mismatch");
-            for (int i = 0; i < expectedBuffers.size(); i++) {
-                assertArrayEquals(expectedBuffers.get(i), buffers.get(i).array(),
-                        "Buffer " + i + " contents mismatch");
-            }
+            assertBufferContents(expectedBuffers, buffers);
 
             assertEquals(expectedLastBufferPosition, buffers.get(buffers.size() - 1).limit());
             assertEquals(expectedOutputPosition, output.getPosition());
@@ -882,10 +1001,11 @@ void shouldWriteInt64WithinSpanningBuffers(
             final List initialData,
             final List expectedBuffers,
             final int expectedOutputPosition,
-            final int expectedLastBufferPosition) {
+            final int expectedLastBufferPosition,
+            final BufferProvider bufferProvider) {
 
         try (ByteBufferBsonOutput output =
-                     new ByteBufferBsonOutput(size -> new ByteBufNIO(ByteBuffer.allocate(8)))) {
+                     new ByteBufferBsonOutput(size -> bufferProvider.getBuffer(Long.BYTES))) {
 
             //given
             initialData.forEach(output::writeBytes);
@@ -897,10 +1017,7 @@ void shouldWriteInt64WithinSpanningBuffers(
             //getByteBuffers returns ByteBuffers with limit() set to position, position set to 0.
             List buffers = output.getByteBuffers();
             assertEquals(expectedBuffers.size(), buffers.size(), "Number of buffers mismatch");
-            for (int i = 0; i < expectedBuffers.size(); i++) {
-                assertArrayEquals(expectedBuffers.get(i), buffers.get(i).array(),
-                        "Buffer " + i + " contents mismatch");
-            }
+            assertBufferContents(expectedBuffers, buffers);
 
             assertEquals(expectedLastBufferPosition, buffers.get(buffers.size() - 1).limit());
             assertEquals(expectedOutputPosition, output.getPosition());
@@ -914,10 +1031,11 @@ void shouldWriteDoubleWithinSpanningBuffers(
             final List initialData,
             final List expectedBuffers,
             final int expectedOutputPosition,
-            final int expectedLastBufferPosition) {
+            final int expectedLastBufferPosition,
+            final BufferProvider bufferProvider) {
 
         try (ByteBufferBsonOutput output =
-                     new ByteBufferBsonOutput(size -> new ByteBufNIO(ByteBuffer.allocate(8)))) {
+                     new ByteBufferBsonOutput(size -> bufferProvider.getBuffer(Long.BYTES))) {
 
             //given
             initialData.forEach(output::writeBytes);
@@ -929,13 +1047,553 @@ void shouldWriteDoubleWithinSpanningBuffers(
             //getByteBuffers returns ByteBuffers with limit() set to position, position set to 0.
             List buffers = output.getByteBuffers();
             assertEquals(expectedBuffers.size(), buffers.size(), "Number of buffers mismatch");
-            for (int i = 0; i < expectedBuffers.size(); i++) {
-                assertArrayEquals(expectedBuffers.get(i), buffers.get(i).array(),
-                        "Buffer " + i + " contents mismatch");
-            }
+            assertBufferContents(expectedBuffers, buffers);
 
             assertEquals(expectedLastBufferPosition, buffers.get(buffers.size() - 1).limit());
             assertEquals(expectedOutputPosition, output.getPosition());
         }
     }
+
+    private static void assertBufferContents(final List expectedBuffersContent,
+                                             final List actualByteBuffers) {
+        for (int i = 0; i < expectedBuffersContent.size(); i++) {
+            ByteBuf byteBuf = actualByteBuffers.get(i);
+            byte[] expectedBufferBytes = expectedBuffersContent.get(i);
+            byte[] actualBufferBytes =
+                    new byte[byteBuf.capacity()]; //capacity is used because we want to compare internal ByteBuffer arrays.
+            byteBuf.get(actualBufferBytes, 0, byteBuf.limit());
+
+            assertEquals(expectedBufferBytes.length, byteBuf.capacity());
+            assertArrayEquals(expectedBufferBytes, actualBufferBytes,
+                    "Buffer " + i + " contents mismatch");
+        }
+    }
+
+    /*
+   Tests that all Unicode code points are correctly encoded in UTF-8 when:
+   - The buffer has just enough capacity for the UTF-8 string plus a null terminator.
+   - The encoded string may span multiple buffers.
+
+   To test edge conditions, the test writes a UTF-8 CString/String at various starting offsets. This simulates scenarios where data
+   doesn't start at index 0, forcing the string to span multiple buffers.
+
+   For example, assume the encoded string requires N bytes and null terminator:
+   1. startingOffset == 0:
+      [ S S S ... S NULL ]
+
+   2. startingOffset == 2:
+      ("X" represents dummy bytes written before the string.)
+      Buffer 1: [ X X | S S S ... ] (Buffer 1 runs out of space, the remaining bytes (including the NULL) are written in Buffer 2.)
+      Buffer 2: [ S NULL ...]
+
+   3. startingOffset == bufferAllocationSize:
+      Buffer 1: [ X X X ... X ]
+      Buffer 2: [ S S S ... S NULL ]
+  */
+    @Nested
+    @DisplayName("UTF-8 String and CString Buffer Boundary Tests")
+    class Utf8StringTests {
+
+        @DisplayName("should write UTF-8 CString across buffers")
+        @ParameterizedTest
+        @MethodSource("com.mongodb.internal.connection.ByteBufferBsonOutputTest#bufferProviders")
+        void shouldWriteCStringAcrossBuffersUTF8(final BufferProvider bufferProvider) throws IOException {
+            for (Integer codePoint : ALL_CODE_POINTS_EXCLUDING_SURROGATES) {
+                String stringToEncode = new String(Character.toChars(codePoint)) + "a";
+                byte[] expectedStringEncoding = stringToEncode.getBytes(StandardCharsets.UTF_8);
+                int bufferAllocationSize = expectedStringEncoding.length + "\u0000".length();
+                testWriteCStringAcrossBuffers(bufferProvider, codePoint, bufferAllocationSize, stringToEncode, expectedStringEncoding);
+            }
+        }
+
+        @DisplayName("should write UTF-8 CString across buffers with a branch")
+        @ParameterizedTest
+        @MethodSource("com.mongodb.internal.connection.ByteBufferBsonOutputTest#bufferProviders")
+        void shouldWriteCStringAcrossBuffersUTF8WithBranch(final BufferProvider bufferProvider) throws IOException {
+            for (Integer codePoint : ALL_CODE_POINTS_EXCLUDING_SURROGATES) {
+                String stringToEncode = new String(Character.toChars(codePoint)) + "a";
+                int bufferAllocationSize = stringToEncode.getBytes(StandardCharsets.UTF_8).length + "\u0000".length();
+                byte[] expectedEncoding = stringToEncode.getBytes(StandardCharsets.UTF_8);
+
+                testWriteCStringAcrossBufferWithBranch(bufferProvider, codePoint, bufferAllocationSize, stringToEncode, expectedEncoding);
+            }
+        }
+
+        @DisplayName("should write UTF-8 String across buffers")
+        @ParameterizedTest
+        @MethodSource("com.mongodb.internal.connection.ByteBufferBsonOutputTest#bufferProviders")
+        void shouldWriteStringAcrossBuffersUTF8(final BufferProvider bufferProvider) throws IOException {
+            for (Integer codePoint : ALL_CODE_POINTS_EXCLUDING_SURROGATES) {
+                // given
+                String stringToEncode = new String(Character.toChars(codePoint)) + "a";
+                //4 bytes for the length prefix, bytes for encoded String, and 1 byte for the null terminator
+                int bufferAllocationSize = Integer.BYTES + stringToEncode.getBytes(StandardCharsets.UTF_8).length + "\u0000".length();
+                byte[] expectedEncoding = stringToEncode.getBytes(StandardCharsets.UTF_8);
+                testWriteStringAcrossBuffers(bufferProvider,
+                        codePoint,
+                        bufferAllocationSize,
+                        stringToEncode,
+                        expectedEncoding);
+            }
+        }
+
+        @DisplayName("should write UTF-8 String across buffers with branch")
+        @ParameterizedTest
+        @MethodSource("com.mongodb.internal.connection.ByteBufferBsonOutputTest#bufferProviders")
+        void shouldWriteStringAcrossBuffersUTF8WithBranch(final BufferProvider bufferProvider) throws IOException {
+            for (Integer codePoint : ALL_CODE_POINTS_EXCLUDING_SURROGATES) {
+                String stringToEncode = new String(Character.toChars(codePoint)) + "a";
+                //4 bytes for the length prefix, bytes for encoded String, and 1 byte for the null terminator
+                int bufferAllocationSize = Integer.BYTES + stringToEncode.getBytes(StandardCharsets.UTF_8).length + "\u0000".length();
+                byte[] expectedEncoding = stringToEncode.getBytes(StandardCharsets.UTF_8);
+                testWriteStringAcrossBuffersWithBranch(
+                        bufferProvider,
+                        bufferAllocationSize,
+                        stringToEncode,
+                        codePoint,
+                        expectedEncoding);
+            }
+        }
+
+        /*
+           Tests that malformed surrogate pairs are encoded as-is without substituting any code point.
+           This known bug and corresponding test remain for backward compatibility.
+           Ticket: JAVA-5575
+         */
+        @DisplayName("should write malformed surrogate CString across buffers")
+        @ParameterizedTest
+        @MethodSource("com.mongodb.internal.connection.ByteBufferBsonOutputTest#bufferProviders")
+        void shouldWriteCStringWithMalformedSurrogates(final BufferProvider bufferProvider) throws IOException {
+            for (Integer surrogateCodePoint : ALL_SURROGATE_CODE_POINTS) {
+                byte[] expectedEncoding = new byte[]{
+                        (byte) (0xE0 | ((surrogateCodePoint >> 12) & 0x0F)),
+                        (byte) (0x80 | ((surrogateCodePoint >> 6) & 0x3F)),
+                        (byte) (0x80 | (surrogateCodePoint & 0x3F))
+                };
+                String str = new String(Character.toChars(surrogateCodePoint));
+                int bufferAllocationSize = expectedEncoding.length + "\u0000".length();
+
+                testWriteCStringAcrossBuffers(
+                        bufferProvider,
+                        surrogateCodePoint,
+                        bufferAllocationSize,
+                        str,
+                        expectedEncoding);
+            }
+        }
+
+        /*
+           Tests that malformed surrogate pairs are encoded as-is without substituting any code point.
+           This known bug and corresponding test remain for backward compatibility.
+           Ticket: JAVA-5575
+         */
+        @DisplayName("should write malformed surrogate CString across buffers with branch")
+        @ParameterizedTest
+        @MethodSource("com.mongodb.internal.connection.ByteBufferBsonOutputTest#bufferProviders")
+        void shouldWriteCStringWithMalformedSurrogatesWithBranch(final BufferProvider bufferProvider) throws IOException {
+            for (Integer surrogateCodePoint : ALL_SURROGATE_CODE_POINTS) {
+                byte[] expectedEncoding = new byte[]{
+                        (byte) (0xE0 | ((surrogateCodePoint >> 12) & 0x0F)),
+                        (byte) (0x80 | ((surrogateCodePoint >> 6) & 0x3F)),
+                        (byte) (0x80 | (surrogateCodePoint & 0x3F))
+                };
+                String str = new String(Character.toChars(surrogateCodePoint));
+                int bufferAllocationSize = expectedEncoding.length + "\u0000".length();
+
+                testWriteCStringAcrossBufferWithBranch(
+                        bufferProvider,
+                        surrogateCodePoint,
+                        bufferAllocationSize,
+                        str,
+                        expectedEncoding);
+            }
+        }
+
+        @DisplayName("should write surrogate CString across buffers")
+        @ParameterizedTest
+        @MethodSource("com.mongodb.internal.connection.ByteBufferBsonOutputTest#bufferProviders")
+        void shouldWriteCStringWithSurrogatePairs(final BufferProvider bufferProvider) throws IOException {
+            for (Integer surrogateCodePoint : ALL_UTF_16_CODE_POINTS_FORMED_BY_SURROGATE_PAIRS) {
+                String stringToEncode = new String(toSurrogatePair(surrogateCodePoint));
+                byte[] expectedEncoding = stringToEncode.getBytes(StandardCharsets.UTF_8);
+                int bufferAllocationSize = expectedEncoding.length + "\u0000".length();
+
+                testWriteCStringAcrossBuffers(
+                        bufferProvider,
+                        surrogateCodePoint,
+                        bufferAllocationSize,
+                        stringToEncode,
+                        expectedEncoding);
+            }
+        }
+
+        @DisplayName("should write surrogate CString across buffers with branch")
+        @ParameterizedTest
+        @MethodSource("com.mongodb.internal.connection.ByteBufferBsonOutputTest#bufferProviders")
+        void shouldWriteCStringWithSurrogatePairsWithBranch(final BufferProvider bufferProvider) throws IOException {
+            for (Integer surrogateCodePoint : ALL_UTF_16_CODE_POINTS_FORMED_BY_SURROGATE_PAIRS) {
+                String stringToEncode = new String(toSurrogatePair(surrogateCodePoint));
+                byte[] expectedEncoding = stringToEncode.getBytes(StandardCharsets.UTF_8);
+                int bufferAllocationSize = expectedEncoding.length + "\u0000".length();
+
+                testWriteCStringAcrossBufferWithBranch(
+                        bufferProvider,
+                        surrogateCodePoint,
+                        bufferAllocationSize,
+                        stringToEncode,
+                        expectedEncoding);
+            }
+        }
+
+        @DisplayName("should write surrogate String across buffers")
+        @ParameterizedTest
+        @MethodSource("com.mongodb.internal.connection.ByteBufferBsonOutputTest#bufferProviders")
+        void shouldWriteStringWithSurrogatePairs(final BufferProvider bufferProvider) throws IOException {
+            for (Integer surrogateCodePoint : ALL_UTF_16_CODE_POINTS_FORMED_BY_SURROGATE_PAIRS) {
+                String stringToEncode = new String(toSurrogatePair(surrogateCodePoint));
+                byte[] expectedEncoding = stringToEncode.getBytes(StandardCharsets.UTF_8);
+                int bufferAllocationSize = expectedEncoding.length + "\u0000".length();
+
+                testWriteStringAcrossBuffers(
+                        bufferProvider,
+                        surrogateCodePoint,
+                        bufferAllocationSize,
+                        stringToEncode,
+                        expectedEncoding);
+            }
+        }
+
+        @DisplayName("should write surrogate String across buffers with branch")
+        @ParameterizedTest
+        @MethodSource("com.mongodb.internal.connection.ByteBufferBsonOutputTest#bufferProviders")
+        void shouldWriteStringWithSurrogatePairsWithBranch(final BufferProvider bufferProvider) throws IOException {
+            for (Integer surrogateCodePoint : ALL_UTF_16_CODE_POINTS_FORMED_BY_SURROGATE_PAIRS) {
+                String stringToEncode = new String(toSurrogatePair(surrogateCodePoint));
+                byte[] expectedEncoding = stringToEncode.getBytes(StandardCharsets.UTF_8);
+                int bufferAllocationSize = expectedEncoding.length + "\u0000".length();
+
+                testWriteStringAcrossBuffersWithBranch(
+                        bufferProvider,
+                        bufferAllocationSize,
+                        stringToEncode,
+                        surrogateCodePoint,
+                        expectedEncoding);
+            }
+        }
+
+        /*
+           Tests that malformed surrogate pairs are encoded as-is without substituting any code point.
+           This known bug and corresponding test remain for backward compatibility.
+           Ticket: JAVA-5575
+         */
+        @DisplayName("should write malformed surrogate String across buffers")
+        @ParameterizedTest
+        @MethodSource("com.mongodb.internal.connection.ByteBufferBsonOutputTest#bufferProviders")
+        void shouldWriteStringWithMalformedSurrogates(final BufferProvider bufferProvider) throws IOException {
+            for (Integer surrogateCodePoint : ALL_SURROGATE_CODE_POINTS) {
+                byte[] expectedEncoding = new byte[]{
+                        (byte) (0xE0 | ((surrogateCodePoint >> 12) & 0x0F)),
+                        (byte) (0x80 | ((surrogateCodePoint >> 6) & 0x3F)),
+                        (byte) (0x80 | (surrogateCodePoint & 0x3F))
+                };
+                String stringToEncode = new String(Character.toChars(surrogateCodePoint));
+                int bufferAllocationSize = expectedEncoding.length + "\u0000".length();
+
+                testWriteStringAcrossBuffers(
+                        bufferProvider,
+                        surrogateCodePoint,
+                        bufferAllocationSize,
+                        stringToEncode,
+                        expectedEncoding);
+            }
+        }
+
+        /*
+          Tests that malformed surrogate pairs are encoded as-is without substituting any code point.
+          This known bug and corresponding test remain for backward compatibility.
+          Ticket: JAVA-5575
+        */
+        @DisplayName("should write malformed surrogate String across buffers with branch")
+        @ParameterizedTest
+        @MethodSource("com.mongodb.internal.connection.ByteBufferBsonOutputTest#bufferProviders")
+        void shouldWriteStringWithMalformedSurrogatesWithBranch(final BufferProvider bufferProvider) throws IOException {
+            for (Integer surrogateCodePoint : ALL_SURROGATE_CODE_POINTS) {
+                byte[] expectedEncoding = new byte[]{
+                        (byte) (0xE0 | ((surrogateCodePoint >> 12) & 0x0F)),
+                        (byte) (0x80 | ((surrogateCodePoint >> 6) & 0x3F)),
+                        (byte) (0x80 | (surrogateCodePoint & 0x3F))
+                };
+                String stringToEncode = new String(Character.toChars(surrogateCodePoint));
+                int bufferAllocationSize = expectedEncoding.length + "\u0000".length();
+
+                testWriteStringAcrossBuffersWithBranch(
+                        bufferProvider,
+                        bufferAllocationSize,
+                        stringToEncode,
+                        surrogateCodePoint,
+                        expectedEncoding);
+            }
+        }
+
+        private void testWriteCStringAcrossBuffers(final BufferProvider bufferProvider,
+                                                   final Integer surrogateCodePoint,
+                                                   final int bufferAllocationSize,
+                                                   final String str,
+                                                   final byte[] expectedEncoding) throws IOException {
+            for (int startingOffset = 0; startingOffset <= bufferAllocationSize; startingOffset++) {
+                //given
+                List actualByteBuffers = emptyList();
+
+                try (ByteBufferBsonOutput bsonOutput = new ByteBufferBsonOutput(
+                        size -> bufferProvider.getBuffer(bufferAllocationSize))) {
+                    // Write an initial startingOffset of empty bytes to shift the start position
+                    bsonOutput.write(new byte[startingOffset]);
+
+                    // when
+                    bsonOutput.writeCString(str);
+
+                    // then
+                    actualByteBuffers = bsonOutput.getDuplicateByteBuffers();
+                    byte[] actualFlattenedByteBuffersBytes = getBytes(bsonOutput);
+                    assertEncodedResult(surrogateCodePoint,
+                            startingOffset,
+                            expectedEncoding,
+                            bufferAllocationSize,
+                            actualByteBuffers,
+                            actualFlattenedByteBuffersBytes);
+                } finally {
+                    actualByteBuffers.forEach(ByteBuf::release);
+                }
+            }
+        }
+
+        private void testWriteStringAcrossBuffers(final BufferProvider bufferProvider,
+                                                  final Integer codePoint,
+                                                  final int bufferAllocationSize,
+                                                  final String stringToEncode,
+                                                  final byte[] expectedEncoding) throws IOException {
+            for (int startingOffset = 0; startingOffset <= bufferAllocationSize; startingOffset++) {
+                //given
+                List actualByteBuffers = emptyList();
+
+                try (ByteBufferBsonOutput actualBsonOutput = new ByteBufferBsonOutput(
+                        size -> bufferProvider.getBuffer(bufferAllocationSize))) {
+                    // Write an initial startingOffset of empty bytes to shift the start position
+                    actualBsonOutput.write(new byte[startingOffset]);
+
+                    // when
+                    actualBsonOutput.writeString(stringToEncode);
+
+                    // then
+                    actualByteBuffers = actualBsonOutput.getDuplicateByteBuffers();
+                    byte[] actualFlattenedByteBuffersBytes = getBytes(actualBsonOutput);
+
+                    assertEncodedStringSize(codePoint,
+                            expectedEncoding,
+                            actualFlattenedByteBuffersBytes,
+                            startingOffset);
+                    assertEncodedResult(codePoint,
+                            startingOffset + Integer.BYTES, // +4 bytes for the length prefix
+                            expectedEncoding,
+                            bufferAllocationSize,
+                            actualByteBuffers,
+                            actualFlattenedByteBuffersBytes);
+                } finally {
+                    actualByteBuffers.forEach(ByteBuf::release);
+                }
+            }
+        }
+
+        private void testWriteStringAcrossBuffersWithBranch(final BufferProvider bufferProvider,
+                                                            final int bufferAllocationSize,
+                                                            final String stringToEncode,
+                                                            final Integer codePoint,
+                                                            final byte[] expectedEncoding) throws IOException {
+            for (int startingOffset = 0; startingOffset <= bufferAllocationSize; startingOffset++) {
+                //given
+                List actualByteBuffers = emptyList();
+                List actualBranchByteBuffers = emptyList();
+
+                try (ByteBufferBsonOutput actualBsonOutput = new ByteBufferBsonOutput(
+                        size -> bufferProvider.getBuffer(bufferAllocationSize))) {
+
+                    try (ByteBufferBsonOutput.Branch branchOutput = actualBsonOutput.branch()) {
+                        // Write an initial startingOffset of empty bytes to shift the start position
+                        branchOutput.write(new byte[startingOffset]);
+
+                        // when
+                        branchOutput.writeString(stringToEncode);
+
+                        // then
+                        actualBranchByteBuffers = branchOutput.getDuplicateByteBuffers();
+                        byte[] actualFlattenedByteBuffersBytes = getBytes(branchOutput);
+                        assertEncodedStringSize(
+                                codePoint,
+                                expectedEncoding,
+                                actualFlattenedByteBuffersBytes,
+                                startingOffset);
+                        assertEncodedResult(codePoint,
+                                startingOffset + Integer.BYTES, // +4 bytes for the length prefix
+                                expectedEncoding,
+                                bufferAllocationSize,
+                                actualBranchByteBuffers,
+                                actualFlattenedByteBuffersBytes);
+                    }
+
+                    // then
+                    actualByteBuffers = actualBsonOutput.getDuplicateByteBuffers();
+                    byte[] actualFlattenedByteBuffersBytes = getBytes(actualBsonOutput);
+                    assertEncodedStringSize(
+                            codePoint,
+                            expectedEncoding,
+                            actualFlattenedByteBuffersBytes,
+                            startingOffset);
+                    assertEncodedResult(codePoint,
+                            startingOffset + Integer.BYTES, // +4 bytes for the length prefix
+                            expectedEncoding,
+                            bufferAllocationSize,
+                            actualByteBuffers,
+                            actualFlattenedByteBuffersBytes);
+
+                } finally {
+                    actualByteBuffers.forEach(ByteBuf::release);
+                    actualBranchByteBuffers.forEach(ByteBuf::release);
+                }
+            }
+        }
+
+        // Verify that the resulting byte array (excluding the starting offset and null terminator)
+        // matches the expected UTF-8 encoded length of the test string.
+        private void assertEncodedStringSize(final Integer codePoint,
+                                             final byte[] expectedStringEncoding,
+                                             final byte[] actualFlattenedByteBuffersBytes,
+                                             final int startingOffset) {
+            int littleEndianLength = reverseBytes(expectedStringEncoding.length + "\u0000".length());
+            byte[] expectedEncodedStringSize = Ints.toByteArray(littleEndianLength);
+            byte[] actualEncodedStringSize = copyOfRange(
+                    actualFlattenedByteBuffersBytes,
+                    startingOffset,
+                    startingOffset + Integer.BYTES);
+
+            assertArrayEquals(
+                    expectedEncodedStringSize,
+                    actualEncodedStringSize,
+                    () -> format("Encoded String size before the test String does not match expected size. "
+                                    + "Failed with code point: %s, startingOffset: %s",
+                            codePoint,
+                            startingOffset));
+        }
+
+        private void testWriteCStringAcrossBufferWithBranch(final BufferProvider bufferProvider,
+                                                            final Integer codePoint,
+                                                            final int bufferAllocationSize,
+                                                            final String str, final byte[] expectedEncoding) throws IOException {
+            for (int startingOffset = 0; startingOffset <= bufferAllocationSize; startingOffset++) {
+                List actualBranchByteBuffers = emptyList();
+                List actualByteBuffers = emptyList();
+
+                try (ByteBufferBsonOutput bsonOutput = new ByteBufferBsonOutput(
+                        size -> bufferProvider.getBuffer(bufferAllocationSize))) {
+
+                    try (ByteBufferBsonOutput.Branch branchOutput = bsonOutput.branch()) {
+                        // Write an initial startingOffset of empty bytes to shift the start position
+                        branchOutput.write(new byte[startingOffset]);
+
+                        // when
+                        branchOutput.writeCString(str);
+
+                        // then
+                        actualBranchByteBuffers = branchOutput.getDuplicateByteBuffers();
+                        byte[] actualFlattenedByteBuffersBytes = getBytes(branchOutput);
+                        assertEncodedResult(codePoint,
+                                startingOffset,
+                                expectedEncoding,
+                                bufferAllocationSize,
+                                actualBranchByteBuffers,
+                                actualFlattenedByteBuffersBytes);
+                    }
+
+                    // then
+                    actualByteBuffers = bsonOutput.getDuplicateByteBuffers();
+                    byte[] actualFlattenedByteBuffersBytes = getBytes(bsonOutput);
+                    assertEncodedResult(codePoint,
+                            startingOffset,
+                            expectedEncoding,
+                            bufferAllocationSize,
+                            actualByteBuffers,
+                            actualFlattenedByteBuffersBytes);
+                } finally {
+                    actualByteBuffers.forEach(ByteBuf::release);
+                    actualBranchByteBuffers.forEach(ByteBuf::release);
+                }
+            }
+        }
+
+        private void assertEncodedResult(final int codePoint,
+                                         final int startingOffset,
+                                         final byte[] expectedEncoding,
+                                         final int expectedBufferAllocationSize,
+                                         final List actualByteBuffers,
+                                         final byte[] actualFlattenedByteBuffersBytes) {
+            int expectedCodeUnitCount = expectedEncoding.length;
+            int byteCount = startingOffset + expectedCodeUnitCount + 1;
+            int expectedBufferCount = (byteCount + expectedBufferAllocationSize - 1) / expectedBufferAllocationSize;
+            int expectedLastBufferPosition = (byteCount % expectedBufferAllocationSize) == 0 ? expectedBufferAllocationSize
+                    : byteCount % expectedBufferAllocationSize;
+
+            assertEquals(
+                    expectedBufferCount,
+                    actualByteBuffers.size(),
+                    () -> format("expectedBufferCount failed with code point: %s, offset: %s",
+                            codePoint,
+                            startingOffset));
+            assertEquals(
+                    expectedLastBufferPosition,
+                    actualByteBuffers.get(actualByteBuffers.size() - 1).position(),
+                    () -> format("expectedLastBufferPosition failed  with code point: %s, offset: %s",
+                            codePoint,
+                            startingOffset));
+
+            for (ByteBuf byteBuf : actualByteBuffers.subList(0, actualByteBuffers.size() - 1)) {
+                assertEquals(
+                        byteBuf.position(),
+                        byteBuf.limit(),
+                        () -> format("All non-final buffers are not full. Code point: %s, offset: %s",
+                                codePoint,
+                                startingOffset));
+            }
+
+            // Verify that the final byte array (excluding the initial offset and null terminator)
+            // matches the expected UTF-8 encoding of the test string
+            assertArrayEquals(
+                    expectedEncoding,
+                    Arrays.copyOfRange(actualFlattenedByteBuffersBytes, startingOffset, actualFlattenedByteBuffersBytes.length - 1),
+                    () -> format("Expected UTF-8 encoding of the test string does not match actual encoding. Code point: %s, offset: %s",
+                            codePoint,
+                            startingOffset));
+            assertEquals(
+                    0,
+                    actualFlattenedByteBuffersBytes[actualFlattenedByteBuffersBytes.length - 1],
+                    () -> format("String does not end with null terminator. Code point: %s, offset: %s",
+                            codePoint,
+                            startingOffset));
+        }
+
+        public char[] toSurrogatePair(final int codePoint) {
+            if (!Character.isValidCodePoint(codePoint) || codePoint < 0x10000) {
+                throw new IllegalArgumentException("Invalid code point: " + codePoint);
+            }
+            char[] result = new char[2];
+            result[0] = Character.highSurrogate(codePoint);
+            result[1] = Character.lowSurrogate(codePoint);
+            return result;
+        }
+
+    }
+
+    private static byte[] getBytes(final OutputBuffer basicOutputBuffer) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(basicOutputBuffer.getSize());
+        basicOutputBuffer.pipe(baos);
+        return baos.toByteArray();
+    }
 }