Directly compressing StreamOutput #140502

Merged
DaveCTurner merged 2 commits into elastic:main from DaveCTurner:2026/01/12/threadLocalStreamOutput
Jan 12, 2026

@DaveCTurner DaveCTurner commented Jan 12, 2026

Today `Compressor#threadLocalOutputStream` returns a bare (buffered)
`OutputStream` which many callers then adapt into a `StreamOutput` using
the inefficient `OutputStreamStreamOutput`. This commit adjusts the
compressor to return a `BufferedStreamOutput`, avoiding the need for an
extra wrapper while also supporting more efficient structured
write operations such as `writeVInt` and `writeString`.
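
To make the motivation concrete, here is a minimal sketch in plain Java, not the Elasticsearch implementation. `VIntWriter`, `CountingOutputStream`, and `VIntDemo` are hypothetical names invented for this illustration, `DeflaterOutputStream` stands in for the compressor's stream, and the 7-bits-per-byte variable-length encoding is assumed for illustration. It compares how many write calls reach the compressing stream when structured writes are forwarded byte by byte versus collected in a buffer first.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;

/** Counts how many write calls reach the wrapped (compressing) stream. */
class CountingOutputStream extends OutputStream {
    private final OutputStream delegate;
    int calls;

    CountingOutputStream(OutputStream delegate) { this.delegate = delegate; }

    @Override public void write(int b) throws IOException { calls++; delegate.write(b); }

    @Override public void write(byte[] b, int off, int len) throws IOException {
        calls++;
        delegate.write(b, off, len);
    }

    @Override public void close() throws IOException { delegate.close(); }
}

/** Hypothetical writer: encodes variable-length ints, optionally buffering the bytes. */
class VIntWriter implements AutoCloseable {
    private final OutputStream out;
    private final byte[] buffer; // a zero-length buffer means "unbuffered"
    private int position;

    VIntWriter(OutputStream out, int bufferSize) {
        this.out = out;
        this.buffer = new byte[bufferSize];
    }

    void writeVInt(int value) throws IOException {
        // 7 payload bits per byte; the high bit marks "more bytes follow".
        while ((value & ~0x7F) != 0) {
            writeByte((byte) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        writeByte((byte) value);
    }

    private void writeByte(byte b) throws IOException {
        if (buffer.length == 0) {
            out.write(b); // unbuffered: every byte is a separate downstream call
            return;
        }
        if (position == buffer.length) {
            flushBuffer();
        }
        buffer[position++] = b;
    }

    private void flushBuffer() throws IOException {
        if (position > 0) {
            out.write(buffer, 0, position);
            position = 0;
        }
    }

    @Override public void close() throws IOException {
        flushBuffer();
        out.close();
    }
}

class VIntDemo {
    /** Writes 0..999 through a deflating stream into sink; returns the downstream call count. */
    static int writeAll(int bufferSize, ByteArrayOutputStream sink) {
        CountingOutputStream counting = new CountingOutputStream(new DeflaterOutputStream(sink));
        try (VIntWriter writer = new VIntWriter(counting, bufferSize)) {
            for (int i = 0; i < 1000; i++) {
                writer.writeVInt(i);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return counting.calls;
    }

    /** Decompresses the bytes produced by writeAll. */
    static byte[] inflate(byte[] compressed) {
        try (InflaterInputStream in = new InflaterInputStream(new ByteArrayInputStream(compressed))) {
            return in.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In this sketch, `VIntDemo.writeAll(0, sink)` makes one downstream call per encoded byte, while `VIntDemo.writeAll(1024, sink)` makes only a handful, and both sinks decompress to the same bytes. The real classes may differ in how and where they buffer.
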
@DaveCTurner DaveCTurner requested a review from a team as a code owner January 12, 2026 09:54
@DaveCTurner DaveCTurner added >non-issue :Distributed/Network Http and internode communication implementations v9.4.0 labels Jan 12, 2026
@elasticsearchmachine elasticsearchmachine added the Team:Distributed Coordination (obsolete) Meta label for Distributed Coordination team. Obsolete. Please do not use. label Jan 12, 2026
@elasticsearchmachine
Collaborator

Pinging @elastic/es-distributed-coordination (Team:Distributed Coordination)

Contributor

@joshua-adams-1 joshua-adams-1 left a comment

LGTM

assert clusterState.nodes().isLocalNodeElectedMaster();

try (var bytesStream = transportService.newNetworkBytesStream()) {
    try (
Contributor

I'm surprised no tests needed updating? I would have assumed there was a test for the branch where this try block failed

Contributor Author

I don't follow. This is just removing the now-unnecessary OutputStreamStreamOutput wrapper, there's no change in failure handling or anything here is there?

Contributor

The original code was:

try (var bytesStream = transportService.newNetworkBytesStream()) {
    try (
        var stream = new OutputStreamStreamOutput(
            CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(bytesStream))
        )
    ) {
        stream.setTransportVersion(version);
        clusterState.writeTo(stream);
    } catch (IOException e) {
        throw new ElasticsearchException("failed to serialize cluster state for publishing to node {}", e, discoveryNode);
    }
    ...

I would have expected there to be a test for the ElasticsearchException that used mocklog or equivalent to throw an exception when the OutputStreamStreamOutput was created. Since we've changed this now to:

var stream = CompressorFactory.COMPRESSOR.threadLocalStreamOutput(Streams.flushOnCloseStream(bytesStream))

I suspected that any test similar to the above would need to be updated. Checking JoinValidationServiceTests here yields no test for this code branch

Contributor Author

Creating an OutputStreamStreamOutput throws no exceptions?

Contributor

My mistake - I didn't check the constructor for OutputStreamStreamOutput. I didn't realise this try/catch block was only for auto-closable, I thought it might be protecting against an IOException too.

As a note, I still don't think the ElasticsearchException is tested: I cannot find any reference to it in JoinValidationServiceTests, nor the message `failed to serialize cluster state for publishing to node` anywhere else in the Elasticsearch codebase. That isn't relevant to this change, though.
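
For readers following this thread, a small sketch of what a `catch` attached to a try-with-resources statement covers in Java may help. Under the language rules it handles exceptions from resource initialization, from the body, and from the automatic `close()`. `FailingResource` and `TryWithResourcesDemo` are hypothetical names invented for this illustration and are unrelated to the Elasticsearch classes quoted above.

```java
import java.io.IOException;

/** Hypothetical resource that can be told to fail when opened or when closed. */
class FailingResource implements AutoCloseable {
    private final boolean failOnClose;

    FailingResource(boolean failOnOpen, boolean failOnClose) throws IOException {
        if (failOnOpen) {
            throw new IOException("open failed");
        }
        this.failOnClose = failOnClose;
    }

    @Override
    public void close() throws IOException {
        if (failOnClose) {
            throw new IOException("close failed");
        }
    }
}

class TryWithResourcesDemo {
    /** Reports which failure, if any, reached the catch clause. */
    static String run(boolean failOnOpen, boolean failInBody, boolean failOnClose) {
        try (FailingResource resource = new FailingResource(failOnOpen, failOnClose)) {
            if (failInBody) {
                throw new IOException("body failed");
            }
            return "ok";
        } catch (IOException e) {
            // Reached for failures in initialization, the body, or close().
            // If both the body and close() fail, the body's exception wins and
            // the close() exception is attached to it as a suppressed exception.
            return "caught: " + e.getMessage();
        }
    }
}
```

So the `catch` would cover a throwing resource expression as well. In the quoted code, per the reply above, constructing the wrapper simply does not throw.
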

public void close() throws IOException {
flush();
delegate.close();
if (isClosed == false) {
Contributor

This is a change of behaviour. Can it be documented in a Javadoc please? (Either at the method or class level)

Contributor Author

This is actually the expected behaviour of close() - see java.io.Closeable#close. Turns out we hardly use or test that fact.

Contributor

Ah, interesting! Is this worth adding to a github issue or other relevant documentation to track this testing gap?

Contributor Author

I don't think we'd ever get around to acting on such an issue tbh, it's too broad. I think it's enough to have assertions etc in the places where it actually matters.
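
For reference, the `java.io.Closeable#close` contract mentioned in this thread says that closing an already-closed stream has no effect. A minimal sketch of a wrapper that honours it with an `isClosed` flag, in the spirit of the diff context above, could look like the following. `CloseOnceOutputStream`, `CloseCountingStream`, and `CloseOnceDemo` are hypothetical names for illustration only, not the Elasticsearch code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

/** Hypothetical sink that records how often close() reaches it. */
class CloseCountingStream extends ByteArrayOutputStream {
    int closeCalls;

    @Override
    public void close() throws IOException {
        closeCalls++;
        super.close();
    }
}

/** Hypothetical wrapper whose close() is idempotent. */
class CloseOnceOutputStream extends OutputStream {
    private final OutputStream delegate;
    private boolean isClosed;

    CloseOnceOutputStream(OutputStream delegate) { this.delegate = delegate; }

    @Override public void write(int b) throws IOException { delegate.write(b); }

    @Override public void flush() throws IOException { delegate.flush(); }

    @Override
    public void close() throws IOException {
        if (isClosed == false) {
            isClosed = true; // mark first so a repeated call is a no-op
            try {
                flush();
            } finally {
                delegate.close();
            }
        }
    }
}

class CloseOnceDemo {
    /** Closes the wrapper twice and returns how many closes reached the sink. */
    static int closeTwice() {
        CloseCountingStream sink = new CloseCountingStream();
        try {
            CloseOnceOutputStream out = new CloseOnceOutputStream(sink);
            out.write(42);
            out.close();
            out.close(); // second call has no effect
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.closeCalls;
    }
}
```

Marking the stream closed before flushing is one possible design choice here: it keeps a second `close()` from retrying a flush that already failed.
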

@DaveCTurner DaveCTurner merged commit 993e57f into elastic:main Jan 12, 2026
35 checks passed
@DaveCTurner DaveCTurner deleted the 2026/01/12/threadLocalStreamOutput branch January 12, 2026 14:29
szybia added a commit to szybia/elasticsearch that referenced this pull request Jan 12, 2026
…i-project-tests

* upstream/main: (23 commits)
  Fix `testAckListenerReceivesNacksIfPublicationTimesOut` (elastic#140514)
  Reduce priority of clear-cache tasks (elastic#139685)
  Add docs and tests about `StreamOutput` to memory (elastic#140365)
  ES|QL - dense_vector support for COUNT, PRESENT, ABSENT aggregator functions (elastic#139914)
  Add release notes for v9.2.4 release (elastic#140487)
  Add release notes for v9.1.10 release (elastic#140488)
  Add conncectors release notes for 9.1.10, 9.2.4 (elastic#140499)
  Add parameter support in PromQL query durations (elastic#139873)
  Improve testing of STS credentials reloading (elastic#140114)
  Fix zstd native binary publishing script to support newer versions (elastic#140485)
  Add FlattenedFieldBinaryVsSortedSetDocValuesSyntheticSourceIT (elastic#140489)
  Store fallback match only text fields in binary doc values (elastic#140189)
  [DiskBBQ] Use the new merge executor for intra-merge parallelism (elastic#139942)
  ESQL: introduce support for mapping-unavailable fields (elastic#140463)
  Add ESNextOSQVectorsScorerTests (elastic#140436)
  Disable high cardinality tests on release builds (elastic#140503)
  ESQL: TRange timezone support (elastic#139911)
  Directly compressing `StreamOutput` (elastic#140502)
  ES|QL - fix dense vector enrich bug (elastic#139774)
  Use CrossProjectModeDecider in RemoteClusterService (elastic#140481)
  ...
spinscale pushed a commit to spinscale/elasticsearch that referenced this pull request Jan 21, 2026
Today `Compressor#threadLocalOutputStream` returns a bare (buffered)
`OutputStream` which many callers then adapt into a `StreamOutput` using
the inefficient `OutputStreamStreamOutput`. This commit adjusts the
compressor to return a `BufferedStreamOutput`, avoiding the need for an
extra wrapper while also supporting more efficient structured
write operations such as `writeVInt` and `writeString`.