Skip to content

Commit 1509012

Browse files
[HOCS-6784] Switch /correspondents to a streaming response (#996)
* [HOCS-6784] Switch `/correspondents` to a streaming response - Refactor streaming json from a database transaction into a helper class * [HOCS-6784] Fix user data -> format string exceptions by moving formatting to caller * [HOCS-6784] Fix user data -> format string exceptions by moving formatting to caller * [HOCS-6784] Build uuid -> fullname json objects in db and stream as strings * [HOCS-6784] Remove redundant Map.of() * [HOCS-6784] Tidy up JSON streamer - extract transaction surrounds to common method * [HOCS-6784] Update resource test to mock string version of JSON streamer * [HOCS-6784] Revert unintentional edit to COMP2.json
1 parent c0016a8 commit 1509012

18 files changed

+620
-196
lines changed

src/main/java/uk/gov/digital/ho/hocs/casework/api/CaseDataService.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -775,10 +775,13 @@ public void mapCaseDataValues(UUID caseUUID, Map<String, String> keyMappings) {
775775
Map<String, String> updatedCaseDataMap = new HashMap<>(caseData.getDataMap());
776776

777777
if (!updatedCaseDataMap.keySet().containsAll(keyMappings.keySet())) {
778-
String msg = "Requested keys to map do not exist in case data for caseUUID %s, requested mapping: %s";
778+
String msg = String.format(
779+
"Requested keys to map do not exist in case data for caseUUID %s, requested mapping: %s",
780+
caseUUID,
781+
keyMappings.keySet()
782+
);
779783
log.error(String.format(msg, caseUUID, keyMappings));
780-
throw new ApplicationExceptions.DataMappingException(msg, null, DATA_MAPPING_EXCEPTION, caseUUID,
781-
keyMappings.keySet());
784+
throw new ApplicationExceptions.DataMappingException(msg, null, DATA_MAPPING_EXCEPTION);
782785
}
783786

784787
keyMappings.forEach((String fromKey, String toKey) -> {

src/main/java/uk/gov/digital/ho/hocs/casework/api/CaseTagService.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,10 @@ public CaseDataTag addTagToCase(UUID caseUuid, String tag) {
3232
}
3333
return caseDataTag;
3434
} catch (DataIntegrityViolationException ex) {
35-
throw new ApplicationExceptions.DatabaseConflictException("Failed to add data tag for case",
36-
CASE_TAG_CONFLICT, ex);
35+
throw new ApplicationExceptions.DatabaseConflictException(
36+
String.format("Failed to add data tag for case - %s", ex.getMessage()),
37+
CASE_TAG_CONFLICT
38+
);
3739
}
3840
}
3941

src/main/java/uk/gov/digital/ho/hocs/casework/api/CorrespondentResource.java

+18-12
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,14 @@
1111
import org.springframework.web.bind.annotation.RequestBody;
1212
import org.springframework.web.bind.annotation.RequestParam;
1313
import org.springframework.web.bind.annotation.RestController;
14+
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
1415
import uk.gov.digital.ho.hocs.casework.api.dto.CorrespondentTypeDto;
1516
import uk.gov.digital.ho.hocs.casework.api.dto.CreateCorrespondentRequest;
16-
import uk.gov.digital.ho.hocs.casework.api.dto.GetCorrespondentOutlinesResponse;
1717
import uk.gov.digital.ho.hocs.casework.api.dto.GetCorrespondentResponse;
1818
import uk.gov.digital.ho.hocs.casework.api.dto.GetCorrespondentTypeResponse;
1919
import uk.gov.digital.ho.hocs.casework.api.dto.GetCorrespondentsResponse;
2020
import uk.gov.digital.ho.hocs.casework.api.dto.UpdateCorrespondentRequest;
21+
import uk.gov.digital.ho.hocs.casework.api.utils.JsonResponseStreamer;
2122
import uk.gov.digital.ho.hocs.casework.domain.model.Address;
2223
import uk.gov.digital.ho.hocs.casework.domain.model.Correspondent;
2324
import uk.gov.digital.ho.hocs.casework.domain.model.CorrespondentWithPrimaryFlag;
@@ -27,7 +28,6 @@
2728
import uk.gov.digital.ho.hocs.casework.security.Authorised;
2829

2930
import javax.validation.Valid;
30-
import java.util.Optional;
3131
import java.util.Set;
3232
import java.util.UUID;
3333

@@ -37,21 +37,27 @@ public class CorrespondentResource {
3737

3838
private final CorrespondentService correspondentService;
3939

40+
private final JsonResponseStreamer jsonResponseStreamer;
41+
4042
@Autowired
41-
public CorrespondentResource(CorrespondentService correspondentService) {
43+
public CorrespondentResource(CorrespondentService correspondentService, JsonResponseStreamer jsonResponseStreamer) {
4244
this.correspondentService = correspondentService;
45+
this.jsonResponseStreamer = jsonResponseStreamer;
4346
}
4447

4548
@GetMapping(value = "/correspondents")
46-
ResponseEntity<GetCorrespondentOutlinesResponse> getAllActiveCorrespondents(
47-
@RequestParam(value = "includeDeleted", required = false) Optional<Boolean> includeDeleted) {
48-
Set<Correspondent> correspondents = correspondentService.getAllCorrespondents(includeDeleted.orElse(false));
49-
return ResponseEntity.ok(GetCorrespondentOutlinesResponse.from(correspondents));
49+
ResponseEntity<StreamingResponseBody> getAllActiveCorrespondents(
50+
@RequestParam(value = "includeDeleted", defaultValue = "false") Boolean includeDeleted) {
51+
52+
return jsonResponseStreamer.jsonStringsWrappedTransactionalStreamingResponseBody(
53+
"correspondents",
54+
() -> correspondentService.streamCorrespondentOutlineJson(includeDeleted)
55+
);
5056
}
5157

5258
@Allocated(allocatedTo = AllocationLevel.USER_OR_TEAM)
5359
@PostMapping(value = "/case/{caseUUID}/stage/{stageUUID}/correspondent")
54-
ResponseEntity addCorrespondentToCase(@PathVariable UUID caseUUID,
60+
ResponseEntity<Void> addCorrespondentToCase(@PathVariable UUID caseUUID,
5561
@PathVariable UUID stageUUID,
5662
@Valid @RequestBody CreateCorrespondentRequest request) {
5763
Address address = new Address(request.getPostcode(), request.getAddress1(), request.getAddress2(),
@@ -102,10 +108,10 @@ ResponseEntity<GetCorrespondentResponse> deleteCorrespondent(@PathVariable UUID
102108

103109
@Allocated(allocatedTo = AllocationLevel.USER_OR_TEAM)
104110
@PutMapping(value = "/case/{caseUUID}/stage/{stageUUID}/correspondent/{correspondentUUID}")
105-
ResponseEntity updateCorrespondent(@PathVariable UUID caseUUID,
106-
@PathVariable UUID stageUUID,
107-
@PathVariable UUID correspondentUUID,
108-
@Valid @RequestBody UpdateCorrespondentRequest request) {
111+
ResponseEntity<Void> updateCorrespondent(@PathVariable UUID caseUUID,
112+
@SuppressWarnings("unused") @PathVariable UUID stageUUID,
113+
@PathVariable UUID correspondentUUID,
114+
@Valid @RequestBody UpdateCorrespondentRequest request) {
109115
correspondentService.updateCorrespondent(caseUUID, correspondentUUID, request);
110116
return ResponseEntity.ok().build();
111117
}

src/main/java/uk/gov/digital/ho/hocs/casework/api/CorrespondentService.java

+28-7
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Set;
2424
import java.util.UUID;
2525
import java.util.stream.Collectors;
26+
import java.util.stream.Stream;
2627

2728
import static net.logstash.logback.argument.StructuredArguments.value;
2829
import static uk.gov.digital.ho.hocs.casework.application.LogEvent.CORRESPONDENTS_RETRIEVED;
@@ -77,6 +78,14 @@ Set<Correspondent> getAllCorrespondents(boolean includeDeleted) {
7778
return correspondents;
7879
}
7980

81+
Stream<String> streamCorrespondentOutlineJson(boolean includeDeleted) {
82+
log.debug("Getting all active Correspondents");
83+
84+
return includeDeleted
85+
? correspondentRepository.findAllUuidToNameMappingJson()
86+
: correspondentRepository.findActiveUuidToNameMappingJson();
87+
}
88+
8089
Set<CorrespondentWithPrimaryFlag> getCorrespondents(UUID caseUUID) {
8190
log.debug("Getting all Correspondents for Case: {}", caseUUID);
8291

@@ -163,8 +172,14 @@ void createCorrespondent(UUID caseUUID,
163172
}
164173
} catch (DataIntegrityViolationException e) {
165174
throw new ApplicationExceptions.EntityCreationException(
166-
String.format("Failed to create correspondent %s for Case: %s", correspondent.getUuid(), caseUUID),
167-
CORRESPONDENT_CREATE_FAILURE, e);
175+
String.format(
176+
"Failed to create correspondent %s for Case: %s - %s",
177+
correspondent.getUuid(),
178+
caseUUID,
179+
e.getMessage()
180+
),
181+
CORRESPONDENT_CREATE_FAILURE
182+
);
168183
}
169184
log.info("Created Correspondent: {} for Case: {}", correspondent.getUuid(), caseUUID,
170185
value(EVENT, CORRESPONDENT_CREATED));
@@ -201,8 +216,14 @@ void updateCorrespondent(UUID caseUUID,
201216
auditClient.updateCorrespondentAudit(correspondent);
202217
} catch (DataIntegrityViolationException e) {
203218
throw new ApplicationExceptions.EntityCreationException(
204-
String.format("Failed to update correspondent %s for Case: %s", correspondent.getUuid(), caseUUID),
205-
CORRESPONDENT_UPDATE_FAILURE, e);
219+
String.format(
220+
"Failed to update correspondent %s for Case: %s - %s",
221+
correspondent.getUuid(),
222+
caseUUID,
223+
e.getMessage()
224+
),
225+
CORRESPONDENT_UPDATE_FAILURE
226+
);
206227
}
207228
log.info("Updated Correspondent: {} for Case: {}", correspondent.getUuid(), caseUUID,
208229
value(EVENT, CORRESPONDENT_UPDATED));
@@ -231,9 +252,9 @@ public void copyCorrespondents(UUID fromCase, UUID toCase) {
231252
// save the primary first and the existing logic will assign it within the case
232253
Optional<CorrespondentWithPrimaryFlag> primaryCorrespondent = correspondents.stream().filter(
233254
CorrespondentWithPrimaryFlag::getIsPrimary).findFirst();
234-
if (primaryCorrespondent.isPresent()) {
235-
createCorrespondent(toCase, primaryCorrespondent.get());
236-
}
255+
primaryCorrespondent.ifPresent(
256+
correspondentWithPrimaryFlag -> createCorrespondent(toCase, correspondentWithPrimaryFlag)
257+
);
237258

238259
// save the rest
239260
correspondents.stream().filter(correspondent -> !correspondent.getIsPrimary()).forEach(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
package uk.gov.digital.ho.hocs.casework.api.utils;
2+
3+
import com.fasterxml.jackson.core.JsonEncoding;
4+
import com.fasterxml.jackson.core.JsonFactory;
5+
import com.fasterxml.jackson.core.JsonGenerator;
6+
import com.fasterxml.jackson.databind.ObjectMapper;
7+
import lombok.extern.slf4j.Slf4j;
8+
import org.apache.http.entity.ContentType;
9+
import org.springframework.http.HttpHeaders;
10+
import org.springframework.http.HttpStatus;
11+
import org.springframework.http.ResponseEntity;
12+
import org.springframework.stereotype.Service;
13+
import org.springframework.transaction.TransactionStatus;
14+
import org.springframework.transaction.support.TransactionCallback;
15+
import org.springframework.transaction.support.TransactionTemplate;
16+
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
17+
import uk.gov.digital.ho.hocs.casework.application.LogEvent;
18+
import uk.gov.digital.ho.hocs.casework.domain.exception.ApplicationExceptions;
19+
20+
import java.io.IOException;
21+
import java.io.OutputStream;
22+
import java.util.Map;
23+
import java.util.concurrent.atomic.AtomicBoolean;
24+
import java.util.function.BiFunction;
25+
import java.util.function.Supplier;
26+
import java.util.stream.Stream;
27+
28+
@Service
29+
@Slf4j
30+
public class JsonResponseStreamer {
31+
32+
private final ObjectMapper objectMapper;
33+
34+
private final TransactionTemplate transactionTemplate;
35+
36+
public JsonResponseStreamer(ObjectMapper objectMapper, TransactionTemplate transactionTemplate) {
37+
this.objectMapper = objectMapper;
38+
this.transactionTemplate = transactionTemplate;
39+
}
40+
41+
/**
42+
* Convert a stream of objects that can be serialised to JSON by the default ObjectMapper into a json object
43+
* containing an array field with the serialised contents of the stream as the array's items. The stream will be
44+
* consumed within a transaction to prevent issues with the database query closing before the stream is closed.
45+
*
46+
* @param fieldName The field name attached to the array of streamed items
47+
* @param streamSupplier A callback that will supply the stream of items to serialise. This will be run within a
48+
* transaction
49+
* @return StreamingResponseBody producing a root json object with the stream items in an array field
50+
*/
51+
public ResponseEntity<StreamingResponseBody> jsonWrappedTransactionalStreamingResponseBody(
52+
String fieldName,
53+
Supplier<Stream<?>> streamSupplier)
54+
{
55+
return jsonWrappedTransactionalStreamingResponseBody(fieldName, streamSupplier, Map.of());
56+
}
57+
58+
/**
59+
* Convert a stream of objects that can be serialised to JSON by the default ObjectMapper into a json object
60+
* containing an array field with the serialised contents of the stream as the array's items. The stream will be
61+
* consumed within a transaction to prevent issues with the database query closing before the stream is closed.
62+
*
63+
* @param fieldName The field name attached to the array of streamed items
64+
* @param streamSupplier A callback that will supply the stream of items to serialise. This will be run within a
65+
* transaction
66+
* @param additionalFields These will be added as fields to the root JSON object. The keys will be used as the field
67+
* names and the values will be serialised to JSON
68+
* @return StreamingResponseBody producing a root json object with the stream items in an array field
69+
*/
70+
public ResponseEntity<StreamingResponseBody> jsonWrappedTransactionalStreamingResponseBody(
71+
String fieldName,
72+
Supplier<Stream<?>> streamSupplier,
73+
Map<String, Object> additionalFields)
74+
{
75+
return wrapStream(
76+
fieldName,
77+
(generator, outputStream) -> status -> {
78+
streamSupplier.get().forEach(streamItem -> {
79+
try {
80+
generator.writeObject(streamItem);
81+
} catch (IOException e) {
82+
throw new ApplicationExceptions.ReportBodyStreamingException(
83+
String.format(
84+
"Failed to write streaming response body for item: %s - %s",
85+
streamItem,
86+
e
87+
),
88+
LogEvent.STREAMING_JSON_SERIALISATION_EXCEPTION
89+
);
90+
}
91+
});
92+
93+
return null;
94+
},
95+
additionalFields
96+
);
97+
}
98+
99+
/**
100+
* Convert a stream of json strings into a json object containing an array field with the raw strings from the
101+
* stream as the array's items. The stream will be consumed within a transaction to prevent issues with the database
102+
* query closing before the stream is closed.
103+
*
104+
* @param fieldName The field name attached to the array of streamed items
105+
* @param streamSupplier A callback that will supply a stream of valid JSON Strings. This will be run within a
106+
* transaction.
107+
* @return StreamingResponseBody producing a root json object with the stream items in an array field
108+
*/
109+
public ResponseEntity<StreamingResponseBody> jsonStringsWrappedTransactionalStreamingResponseBody(
110+
String fieldName,
111+
Supplier<Stream<String>> streamSupplier)
112+
{
113+
return jsonStringsWrappedTransactionalStreamingResponseBody(fieldName, streamSupplier, Map.of());
114+
}
115+
116+
/**
117+
* Convert a stream of json strings into a json object containing an array field with the raw strings from the
118+
* stream as the array's items. The stream will be consumed within a transaction to prevent issues with the database
119+
* query closing before the stream is closed.
120+
*
121+
* @param fieldName The field name attached to the array of streamed items
122+
* @param streamSupplier A callback that will supply a stream of valid JSON Strings. This will be run within a
123+
* transaction.
124+
* @param additionalFields These will be added as fields to the root JSON object. The keys will be used as the field
125+
* names and the values will be serialised to JSON
126+
* @return StreamingResponseBody producing a root json object with the stream items in an array field
127+
*/
128+
public ResponseEntity<StreamingResponseBody> jsonStringsWrappedTransactionalStreamingResponseBody(
129+
String fieldName,
130+
Supplier<Stream<String>> streamSupplier,
131+
Map<String, Object> additionalFields
132+
) {
133+
return wrapStream(
134+
fieldName,
135+
(generator, outputStream) -> status -> {
136+
// Needs to be final to be used in stream.
137+
AtomicBoolean prefixComma = new AtomicBoolean(false);
138+
139+
streamSupplier.get().forEach(
140+
(streamItem) -> {
141+
try {
142+
if (prefixComma.get()) {
143+
outputStream.write(',');
144+
} else {
145+
prefixComma.set(true);
146+
}
147+
148+
outputStream.write(streamItem.getBytes());
149+
} catch (IOException e) {
150+
log.error("Failed to write {} to json response - {}", streamItem, e);
151+
}
152+
}
153+
);
154+
155+
return null;
156+
},
157+
additionalFields
158+
);
159+
}
160+
161+
private ResponseEntity<StreamingResponseBody> wrapStream(
162+
String fieldName,
163+
BiFunction<JsonGenerator, OutputStream, TransactionCallback<TransactionStatus>> transactionCallbackSupplier,
164+
Map<String, Object> additionalFields
165+
) {
166+
StreamingResponseBody body = outputStream -> {
167+
try {
168+
JsonFactory factory = new JsonFactory();
169+
170+
JsonGenerator generator = factory.createGenerator(outputStream, JsonEncoding.UTF8);
171+
generator.setCodec(objectMapper);
172+
173+
generator.writeStartObject();
174+
175+
additionalFields.forEach((nestedFieldName, object) -> {
176+
try {
177+
generator.writeObjectField(nestedFieldName, object);
178+
} catch (IOException e) {
179+
log.error("Failed to write {} to json response - {}", nestedFieldName, e);
180+
}
181+
});
182+
183+
generator.writeArrayFieldStart(fieldName);
184+
// jsonStringsWrappedTransactionalStreamingResponseBody writes JSON strings directly to the output
185+
// stream, so flush current json first
186+
generator.flush();
187+
188+
transactionTemplate.execute(transactionCallbackSupplier.apply(generator, outputStream));
189+
190+
generator.writeEndArray();
191+
generator.writeEndObject();
192+
generator.close();
193+
} catch (Exception e) {
194+
log.error("Failed to write streaming response body - {}", e);
195+
}
196+
};
197+
198+
HttpHeaders responseHeaders = new HttpHeaders();
199+
responseHeaders.add(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.getMimeType());
200+
201+
return new ResponseEntity<>(
202+
body,
203+
responseHeaders,
204+
HttpStatus.OK
205+
);
206+
}
207+
}

src/main/java/uk/gov/digital/ho/hocs/casework/application/LogEvent.java

+2
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ public enum LogEvent {
159159
STAGE_NOT_FOUND,
160160
STAGE_RECREATED,
161161
STAGE_TRANSITION_NOTE_UPDATED,
162+
STREAMING_JSON_SERIALISATION_EXCEPTION,
162163
TEAMS_STAGE_LIST_EMPTY,
163164
TEAMS_STAGE_LIST_RETRIEVED,
164165
TEAM_EMAIL_SENT,
@@ -172,5 +173,6 @@ public enum LogEvent {
172173

173174
public static final String EXCEPTION = "exception";
174175

176+
@SuppressWarnings("unused")
175177
public static final String STACKTRACE = "stacktrace";
176178
}

0 commit comments

Comments
 (0)