Skip to content

Commit

Permalink
Minor refactoring in ServerSentEvent
Browse files Browse the repository at this point in the history
Extract re-usable method to serialize SSE fields.

See gh-33975
  • Loading branch information
rstoyanchev committed Dec 11, 2024
1 parent 66f33a8 commit 640e570
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.springframework.lang.Nullable;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

/**
* Representation for a Server-Sent Event for use with Spring's reactive Web support.
Expand Down Expand Up @@ -102,6 +103,34 @@ public T data() {
return this.data;
}

/**
* Return a StringBuilder with the id, event, retry, and comment fields fully
* serialized, and also appending "data:" if there is data.
* @since 6.2.1
*/
public String format() {
StringBuilder sb = new StringBuilder();
if (this.id != null) {
appendAttribute("id", this.id, sb);
}
if (this.event != null) {
appendAttribute("event", this.event, sb);
}
if (this.retry != null) {
appendAttribute("retry", this.retry.toMillis(), sb);
}
if (this.comment != null) {
sb.append(':').append(StringUtils.replace(this.comment, "\n", "\n:")).append('\n');
}
if (this.data != null) {
sb.append("data:");
}
return sb.toString();
}

private void appendAttribute(String fieldName, Object fieldValue, StringBuilder sb) {
sb.append(fieldName).append(':').append(fieldValue).append('\n');
}

@Override
public boolean equals(@Nullable Object other) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,7 +17,6 @@
package org.springframework.http.codec;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -124,65 +123,42 @@ private Flux<Publisher<DataBuffer>> encode(Publisher<?> input, ResolvableType el
ServerSentEvent<?> sse = (element instanceof ServerSentEvent<?> serverSentEvent ?
serverSentEvent : ServerSentEvent.builder().data(element).build());

StringBuilder sb = new StringBuilder();
String id = sse.id();
String event = sse.event();
Duration retry = sse.retry();
String comment = sse.comment();
String sseText = sse.format();
Object data = sse.data();
if (id != null) {
writeField("id", id, sb);
}
if (event != null) {
writeField("event", event, sb);
}
if (retry != null) {
writeField("retry", retry.toMillis(), sb);
}
if (comment != null) {
sb.append(':').append(StringUtils.replace(comment, "\n", "\n:")).append('\n');
}
if (data != null) {
sb.append("data:");
}

Flux<DataBuffer> result;
if (data == null) {
result = Flux.just(encodeText(sb + "\n", mediaType, factory));
result = Flux.just(encodeText(sseText + "\n", mediaType, factory));
}
else if (data instanceof String text) {
text = StringUtils.replace(text, "\n", "\ndata:");
result = Flux.just(encodeText(sb + text + "\n\n", mediaType, factory));
result = Flux.just(encodeText(sseText + text + "\n\n", mediaType, factory));
}
else {
result = encodeEvent(sb, data, dataType, mediaType, factory, hints);
result = encodeEvent(sseText, data, dataType, mediaType, factory, hints);
}

return result.doOnDiscard(DataBuffer.class, DataBufferUtils::release);
});
}

@SuppressWarnings("unchecked")
private <T> Flux<DataBuffer> encodeEvent(StringBuilder eventContent, T data, ResolvableType dataType,
private <T> Flux<DataBuffer> encodeEvent(CharSequence sseText, T data, ResolvableType dataType,
MediaType mediaType, DataBufferFactory factory, Map<String, Object> hints) {

if (this.encoder == null) {
throw new CodecException("No SSE encoder configured and the data is not String.");
}

return Flux.defer(() -> {
DataBuffer startBuffer = encodeText(eventContent, mediaType, factory);
DataBuffer startBuffer = encodeText(sseText, mediaType, factory);
DataBuffer endBuffer = encodeText("\n\n", mediaType, factory);
DataBuffer dataBuffer = ((Encoder<T>) this.encoder).encodeValue(data, factory, dataType, mediaType, hints);
Hints.touchDataBuffer(dataBuffer, hints, logger);
return Flux.just(startBuffer, dataBuffer, endBuffer);
});
}

private void writeField(String fieldName, Object fieldValue, StringBuilder sb) {
sb.append(fieldName).append(':').append(fieldValue).append('\n');
}

private DataBuffer encodeText(CharSequence text, MediaType mediaType, DataBufferFactory bufferFactory) {
Assert.notNull(mediaType.getCharset(), "Expected MediaType with charset");
byte[] bytes = text.toString().getBytes(mediaType.getCharset());
Expand Down

0 comments on commit 640e570

Please sign in to comment.