diff --git a/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/CountingNettyResourceLeakDetector.java b/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/CountingNettyResourceLeakDetector.java index 535b0ef212..a79f0df222 100644 --- a/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/CountingNettyResourceLeakDetector.java +++ b/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/CountingNettyResourceLeakDetector.java @@ -24,7 +24,7 @@ public class CountingNettyResourceLeakDetector extends ResourceLeakDetector void wrapWithLeakChecks(ExtensionContext extensionContext, + Callable repeatCall, Callable finalCall) throws Throwable { if (allLeakChecksAreDisabled || getAnnotation(extensionContext).map(a -> a.disableLeakChecks()).orElse(false)) { CountingNettyResourceLeakDetector.deactivate(); diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java index c5e882ea5f..3625617997 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/SourceTargetCaptureTuple.java @@ -67,10 +67,10 @@ private JSONObject jsonFromHttpDataUnsafe(List data) throws IOException Scanner scanner = new Scanner(collatedStream, StandardCharsets.UTF_8); scanner.useDelimiter("\r\n\r\n"); // The headers are seperated from the body with two newlines. String head = scanner.next(); - int header_length = head.getBytes(StandardCharsets.UTF_8).length + 4; // The extra 4 bytes accounts for the two newlines. + int headerLength = head.getBytes(StandardCharsets.UTF_8).length + 4; // The extra 4 bytes accounts for the two newlines. 
// SequenceInputStreams cannot be reset, so it's recreated from the original data. SequenceInputStream bodyStream = ReplayUtils.byteArraysToInputStream(data); - bodyStream.skip(header_length); + for (long leftToSkip = headerLength; leftToSkip > 0; ) { long skipped = bodyStream.skip(leftToSkip); if (skipped > 0) { leftToSkip -= skipped; } else if (bodyStream.read() >= 0) { leftToSkip--; } else { break; /* end of stream reached before the full header length was skipped: stop rather than spin forever on skip() == 0 */ } } // There are several limitations introduced by using the HTTP.toJSONObject call. // 1. We need to replace "\r\n" with "\n" which could mask differences in the responses. @@ -78,7 +78,7 @@ private JSONObject jsonFromHttpDataUnsafe(List data) throws IOException // We deal with this in the code that reads these JSONs, but it's a more brittle and error-prone format // than it would be otherwise. // TODO: Refactor how messages are converted to JSON and consider using a more sophisticated HTTP parsing strategy. - JSONObject message = HTTP.toJSONObject(head.replaceAll("\r\n", "\n")); + JSONObject message = HTTP.toJSONObject(head.replace("\r\n", "\n")); String base64body = Base64.getEncoder().encodeToString(bodyStream.readAllBytes()); message.put("body", base64body); return message; @@ -100,7 +100,7 @@ private JSONObject jsonFromHttpData(@NonNull List data, Duration latency return message; } - private JSONObject toJSONObject(SourceTargetCaptureTuple triple) throws IOException { + private JSONObject toJSONObject(SourceTargetCaptureTuple triple) { // TODO: Use Netty to parse the packets as HTTP rather than json.org (we can also remove it as a dependency) JSONObject meta = new JSONObject(); meta.put("sourceRequest", jsonFromHttpData(triple.sourcePair.requestData.packetBytes)); diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java index 3fce03c896..cd9cb6565a 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java +++ 
b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java @@ -90,6 +90,19 @@ public class TrafficReplayer { private AtomicReference shutdownReasonRef; private AtomicReference> shutdownFutureRef; private AtomicReference>> nextChunkFutureRef; + private ConcurrentHashMap liveRequests = new ConcurrentHashMap<>(); + + /** Reports why a replay run ended, carrying both the recorded shutdown reason and the failure that surfaced it (either may be null). */ public static class TerminationException extends Exception { + public final Throwable shutdownCause; + public final Throwable immediateCause; + public TerminationException(Throwable shutdownCause, Throwable immediateCause) { + // use one of these two so that anybody handling this as any other exception can get + // at least one of the root errors + super(Optional.ofNullable(shutdownCause).orElse(immediateCause)); + this.shutdownCause = shutdownCause; + this.immediateCause = immediateCause; + } + } public static IJsonTransformer buildDefaultJsonTransformer(String newHostName) { var joltJsonTransformerBuilder = JsonJoltTransformer.newBuilder() @@ -297,7 +310,9 @@ public static Parameters parseArgs(String[] args) { } - public static void main(String[] args) throws IOException, InterruptedException, ExecutionException { + public static void main(String[] args) + throws IOException, InterruptedException, ExecutionException, TerminationException + { var params = parseArgs(args); URI uri; System.err.println("Starting Traffic Replayer"); @@ -316,14 +331,15 @@ public static void main(String[] args) throws IOException, InterruptedException, var bufferedOutputStream = new BufferedOutputStream(outputStream); var blockingTrafficStream = TrafficCaptureSourceFactory.createTrafficCaptureSource(params, Duration.ofSeconds(params.lookaheadTimeSeconds)); - var authTransformer = buildAuthTransformerFactory(params)) { + var authTransformer = buildAuthTransformerFactory(params)) + { var tr = new TrafficReplayer(uri, authTransformer, params.allowInsecureConnections, params.numClientThreads, params.maxConcurrentRequests); 
setupShutdownHookForReplayer(tr); var tupleWriter = new SourceTargetCaptureTuple.TupleToFileWriter(bufferedOutputStream); var timeShifter = new TimeShifter(params.speedupFactor); - tr.runReplayWithIOStreams(Duration.ofSeconds(params.observedPacketConnectionTimeout), - blockingTrafficStream, bufferedOutputStream, timeShifter, tupleWriter); + tr.setupRunAndWaitForReplayWithShutdownChecks(Duration.ofSeconds(params.observedPacketConnectionTimeout), + blockingTrafficStream, timeShifter, tupleWriter); log.info("Done processing TrafficStreams"); } } @@ -402,9 +418,9 @@ private static IAuthTransformerFactory buildAuthTransformerFactory(Parameters pa } String serviceName = serviceAndRegion[0]; String region = serviceAndRegion[1]; - DefaultCredentialsProvider defaultCredentialsProvider = DefaultCredentialsProvider.create(); return new IAuthTransformerFactory() { + DefaultCredentialsProvider defaultCredentialsProvider = DefaultCredentialsProvider.create(); @Override public IAuthTransformer getAuthTransformer(IHttpMessage httpMessage) { return new SigV4Signer(defaultCredentialsProvider, serviceName, region, "https", null); @@ -421,11 +437,10 @@ public void close() { } } - void runReplayWithIOStreams(Duration observedPacketConnectionTimeout, - BlockingTrafficSource trafficChunkStream, - BufferedOutputStream bufferedOutputStream, - TimeShifter timeShifter, - Consumer resultTupleConsumer) + void setupRunAndWaitForReplay(Duration observedPacketConnectionTimeout, + BlockingTrafficSource trafficChunkStream, + TimeShifter timeShifter, + Consumer resultTupleConsumer) throws InterruptedException, ExecutionException { var senderOrchestrator = new RequestSenderOrchestrator(clientConnectionPool); @@ -444,63 +459,75 @@ void runReplayWithIOStreams(Duration observedPacketConnectionTimeout, throw e; } finally { trafficToHttpTransactionAccumulator.close(); - var PRIMARY_LOG_LEVEL = Level.INFO; - var SECONDARY_LOG_LEVEL = Level.WARN; - var logLevel = PRIMARY_LOG_LEVEL; - for (var timeout = 
Duration.ofSeconds(60); ; timeout = timeout.multipliedBy(2)) { - if (shutdownFutureRef.get() != null) { - log.warn("Not waiting for work because the TrafficReplayer is shutting down."); - break; - } - try { - waitForRemainingWork(logLevel, timeout, replayEngine); - break; - } catch (TimeoutException e) { - log.atLevel(logLevel).log("Timed out while waiting for the remaining " + - "requests to be finalized..."); - logLevel = SECONDARY_LOG_LEVEL; - } finally { - shutdown(null).get(); - log.error("Done waiting for TrafficReplayer (" + this + ") to shut down"); - } - } - if (requestToFinalWorkFuturesMap.size() > 0 || - exceptionRequestCount.get() > 0) { - log.atWarn().setMessage("{} in-flight requests being dropped due to pending shutdown; " + - "{} requests to the target threw an exception; " + - "{} requests were successfully processed.") - .addArgument(requestToFinalWorkFuturesMap.size()) - .addArgument(exceptionRequestCount.get()) - .addArgument(successfulRequestCount.get()) - .log(); - } else { - log.info(successfulRequestCount.get() + " requests were successfully processed."); - } - log.info("# of connections created: {}; # of requests on reused keep-alive connections: {}; " + - "# of expired connections: {}; # of connections closed: {}; " + - "# of connections terminated upon accumulator termination: {}", - trafficToHttpTransactionAccumulator.numberOfConnectionsCreated(), - trafficToHttpTransactionAccumulator.numberOfRequestsOnReusedConnections(), - trafficToHttpTransactionAccumulator.numberOfConnectionsExpired(), - trafficToHttpTransactionAccumulator.numberOfConnectionsClosed(), - trafficToHttpTransactionAccumulator.numberOfRequestsTerminatedUponAccumulatorClose() - ); + wrapUpWorkAndEmitSummary(replayEngine, trafficToHttpTransactionAccumulator); if (shutdownFutureRef.get() == null) { assert requestToFinalWorkFuturesMap.size() == 0 : "expected to wait for all the in flight requests to fully flush and self destruct themselves"; - } else { - var e = 
shutdownReasonRef.get(); - if (e != null) { - throw e; // shutdown due to an error, preserve what the error was for any observers - } else { - // this was a shutdown due to a signal, so there's nothing to report or handle. - } } } } - ConcurrentHashMap liveRequests = new ConcurrentHashMap<>(); + private void wrapUpWorkAndEmitSummary(ReplayEngine replayEngine, CapturedTrafficToHttpTransactionAccumulator trafficToHttpTransactionAccumulator) throws ExecutionException, InterruptedException { + final var primaryLogLevel = Level.INFO; + final var secondaryLogLevel = Level.WARN; + var logLevel = primaryLogLevel; + for (var timeout = Duration.ofSeconds(60); ; timeout = timeout.multipliedBy(2)) { + if (shutdownFutureRef.get() != null) { + log.warn("Not waiting for work because the TrafficReplayer is shutting down."); + break; + } + try { + waitForRemainingWork(logLevel, timeout, replayEngine); + break; + } catch (TimeoutException e) { + log.atLevel(logLevel).log("Timed out while waiting for the remaining " + + "requests to be finalized..."); + logLevel = secondaryLogLevel; + } + } + if (requestToFinalWorkFuturesMap.size() > 0 || + exceptionRequestCount.get() > 0) { + log.atWarn().setMessage("{} in-flight requests being dropped due to pending shutdown; " + + "{} requests to the target threw an exception; " + + "{} requests were successfully processed.") + .addArgument(requestToFinalWorkFuturesMap.size()) + .addArgument(exceptionRequestCount.get()) + .addArgument(successfulRequestCount.get()) + .log(); + } else { + log.info(successfulRequestCount.get() + " requests were successfully processed."); + } + log.info("# of connections created: {}; # of requests on reused keep-alive connections: {}; " + + "# of expired connections: {}; # of connections closed: {}; " + + "# of connections terminated upon accumulator termination: {}", + trafficToHttpTransactionAccumulator.numberOfConnectionsCreated(), + trafficToHttpTransactionAccumulator.numberOfRequestsOnReusedConnections(), + 
trafficToHttpTransactionAccumulator.numberOfConnectionsExpired(), + trafficToHttpTransactionAccumulator.numberOfConnectionsClosed(), + trafficToHttpTransactionAccumulator.numberOfRequestsTerminatedUponAccumulatorClose() + ); + } + void setupRunAndWaitForReplayWithShutdownChecks(Duration observedPacketConnectionTimeout, + BlockingTrafficSource trafficChunkStream, + TimeShifter timeShifter, + Consumer resultTupleConsumer) + throws TerminationException, ExecutionException, InterruptedException { + try { + setupRunAndWaitForReplay(observedPacketConnectionTimeout, trafficChunkStream, + timeShifter, resultTupleConsumer); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new TerminationException(shutdownReasonRef.get(), e); + } catch (Throwable t) { + throw new TerminationException(shutdownReasonRef.get(), t); + } + if (shutdownReasonRef.get() != null) { + throw new TerminationException(shutdownReasonRef.get(), null); + } + // if nobody has run shutdown yet, do so now so that we can tear down the netty resources + shutdown(null).get(); // if somebody already HAD run shutdown, it will return the future already created + } @AllArgsConstructor class TrafficReplayerAccumulationCallbacks implements AccumulationCallbacks { @@ -623,18 +650,7 @@ private void waitForRemainingWork(Level logLevel, throws ExecutionException, InterruptedException, TimeoutException { Map.Entry>[] allRemainingWorkArray = requestToFinalWorkFuturesMap.entrySet().toArray(Map.Entry[]::new); - log.atLevel(logLevel).log("All remaining work to wait on " + allRemainingWorkArray.length); - if (log.isInfoEnabled()) { - LoggingEventBuilder loggingEventBuilderToUse = log.isTraceEnabled() ? log.atTrace() : log.atInfo(); - long itemLimit = log.isTraceEnabled() ? 
Long.MAX_VALUE : MAX_ITEMS_TO_SHOW_FOR_LEFTOVER_WORK_AT_INFO_LEVEL; - loggingEventBuilderToUse.setMessage(() -> " items: " + - Arrays.stream(allRemainingWorkArray) - .map(kvp -> kvp.getKey() + " --> " + - kvp.getValue().formatAsString(TrafficReplayer::formatWorkItem)) - .limit(itemLimit) - .collect(Collectors.joining("\n"))) - .log(); - } + writeStatusLogsForRemainingWork(logLevel, allRemainingWorkArray); // remember, this block is ONLY for the leftover items. Lots of other items have been processed // and were removed from the live map (hopefully) @@ -647,27 +663,8 @@ private void waitForRemainingWork(Level logLevel, if (allRemainingWorkFutureOrShutdownSignalRef.compareAndSet(null, allWorkFuture)) { allWorkFuture.get(timeout); } else { - try { - var finishedSignal = allRemainingWorkFutureOrShutdownSignalRef.get().future; - assert finishedSignal.isDone() : "Expected this reference to be EITHER the current work futures " + - "or a sentinel value indicating a shutdown has commenced. The signal, when set, should " + - "have been completed at the time that the reference was set"; - finishedSignal.get(); - log.debug("Did shutdown cleanly"); - } catch (ExecutionException e) { - var c = e.getCause(); - if (c instanceof Error) { - throw (Error) c; - } - else throw e; - } catch (Error t) { - log.atError().setCause(t).setMessage(() -> "Not waiting for all work to finish. 
" + - "The TrafficReplayer is shutting down").log(); - throw t; - } + handleAlreadySetFinishedSignal(); } - } catch (CancellationException e) { - throw shutdownReasonRef.get(); } catch (TimeoutException e) { var didCancel = allWorkFuture.future.cancel(true); if (!didCancel) { @@ -689,6 +686,43 @@ private void waitForRemainingWork(Level logLevel, // rest of the cleanup to finish, as per the name of the function } + private void handleAlreadySetFinishedSignal() throws InterruptedException, ExecutionException { + try { + var finishedSignal = allRemainingWorkFutureOrShutdownSignalRef.get().future; + assert finishedSignal.isDone() : "Expected this reference to be EITHER the current work futures " + + "or a sentinel value indicating a shutdown has commenced. The signal, when set, should " + + "have been completed at the time that the reference was set"; + finishedSignal.get(); + log.debug("Did shutdown cleanly"); + } catch (ExecutionException e) { + var c = e.getCause(); + if (c instanceof Error) { + throw (Error) c; + } else { + throw e; + } + } catch (Error t) { + log.atError().setCause(t).setMessage(() -> "Not waiting for all work to finish. " + + "The TrafficReplayer is shutting down").log(); + throw t; + } + } + + private static void writeStatusLogsForRemainingWork(Level logLevel, Map.Entry>[] allRemainingWorkArray) { + log.atLevel(logLevel).log("All remaining work to wait on " + allRemainingWorkArray.length); + if (log.isInfoEnabled()) { + LoggingEventBuilder loggingEventBuilderToUse = log.isTraceEnabled() ? log.atTrace() : log.atInfo(); + long itemLimit = log.isTraceEnabled() ? 
Long.MAX_VALUE : MAX_ITEMS_TO_SHOW_FOR_LEFTOVER_WORK_AT_INFO_LEVEL; + loggingEventBuilderToUse.setMessage(() -> " items: " + + Arrays.stream(allRemainingWorkArray) + .map(kvp -> kvp.getKey() + " --> " + + kvp.getValue().formatAsString(TrafficReplayer::formatWorkItem)) + .limit(itemLimit) + .collect(Collectors.joining("\n"))) + .log(); + } + } + private static String formatWorkItem(DiagnosticTrackableCompletableFuture cf) { try { var resultValue = cf.get(); @@ -750,7 +784,7 @@ private static SourceTargetCaptureTuple getSourceTargetCaptureTuple(UniqueReplay // It might be safer to chain this work directly inside the scheduleWork call above so that the // read buffer horizons aren't set after the transformation work finishes, but after the packets // are fully handled - var sendFuture = transformationCompleteFuture.thenCompose(transformedResult -> + return transformationCompleteFuture.thenCompose(transformedResult -> replayEngine.scheduleRequest(requestKey, start, end, transformedResult.transformedOutput.size(), transformedResult.transformedOutput.streamRetained()) @@ -765,7 +799,6 @@ private static SourceTargetCaptureTuple getSourceTargetCaptureTuple(UniqueReplay () -> "transitioning transformed packets onto the wire") .map(future->future.exceptionally(t->new TransformedTargetRequestAndResponse(null, null, t)), ()->"Checking for exception out of sending data to the target server"); - return sendFuture; } catch (Exception e) { log.debug("Caught exception in writeToSocket, so failing future"); return StringTrackableCompletableFuture.failedFuture(e, ()->"TrafficReplayer.writeToSocketAndClose"); @@ -829,9 +862,9 @@ public void stopReadingAsync() { break; } } - var rval = shutdownFutureRef.get(); + var shutdownFuture = shutdownFutureRef.get(); log.atWarn().setMessage(()->"Shutdown procedure has finished").log(); - return rval; + return shutdownFuture; } public void pullCaptureFromSourceToAccumulator( diff --git 
a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/FullTrafficReplayerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/FullTrafficReplayerTest.java index e2e4a6870f..5d61e10819 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/FullTrafficReplayerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/FullTrafficReplayerTest.java @@ -25,13 +25,11 @@ import org.opensearch.migrations.trafficcapture.protos.TrafficStream; import org.opensearch.migrations.trafficcapture.protos.TrafficStreamUtils; import org.opensearch.migrations.transform.StaticAuthTransformerFactory; +import org.slf4j.event.Level; import org.testcontainers.containers.KafkaContainer; import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.shaded.org.apache.commons.io.output.NullOutputStream; -import org.testcontainers.utility.DockerImageName; -import java.io.BufferedOutputStream; import java.io.EOFException; import java.time.Duration; import java.time.Instant; @@ -108,6 +106,16 @@ public void fullTest() throws Exception { } }; + runReplayerUntilSourceWasExhausted(httpServer, trafficSourceSupplier, tupleReceiver); + + //Assertions.assertEquals(); + log.error("done"); + } + + private static void runReplayerUntilSourceWasExhausted(SimpleNettyHttpServer httpServer, + Supplier trafficSourceSupplier, + Consumer tupleReceiver) + throws Exception { for (AtomicInteger runNumberRef = new AtomicInteger(); true; runNumberRef.incrementAndGet()) { int runNumber = runNumberRef.get(); try { @@ -115,17 +123,20 @@ public void fullTest() throws Exception { Assertions.assertEquals(runNumber, runNumberRef.get()); tupleReceiver.accept(t); }); - } catch (FabricatedErrorToKillTheReplayer e) { - if (e.doneWithTest) { - break; - } else { - log.error("broke out of the replayer, but the doneWithTest flag was 
false"); - } + // if this finished running without an exception, we need to stop the loop + break; + } catch (TrafficReplayer.TerminationException e) { + log.atLevel(e.shutdownCause instanceof FabricatedErrorToKillTheReplayer ? Level.INFO : Level.ERROR) + .setCause(e.shutdownCause) + .setMessage(()->"broke out of the replayer, with this shutdown reason") + .log(); + log.atLevel(e.immediateCause == null ? Level.INFO : Level.ERROR) + .setCause(e.immediateCause) + .setMessage(()->"broke out of the replayer, with the shutdown cause=" + e.shutdownCause + + " and this immediate reason") + .log(); } } - - //Assertions.assertEquals(); - log.error("done"); } private Tuple2, ToIntFunction> @@ -142,18 +153,13 @@ public void fullTest() throws Exception { var testCaseArr = generatedCases.toArray(TrafficStreamGenerator.RandomTrafficStreamAndTransactionSizes[]::new); var shuffledStreams = randomlyInterleaveStreams(r, Arrays.stream(testCaseArr).map(c->Arrays.stream(c.trafficStreams))); - var numExpectedRequests = Arrays.stream(testCaseArr).mapToInt(c->c.requestByteSizes.length).sum(); var previouslyCompletelyHandledItems = new ConcurrentHashMap(); return new Tuple2<>(shuffledStreams, t -> { var key = t.uniqueRequestKey; var keyString = key.getTrafficStreamKey() + "_" + key.getSourceRequestIndex(); previouslyCompletelyHandledItems.put(keyString, t); - var newSize = previouslyCompletelyHandledItems.size(); - if (newSize >= numExpectedRequests) { - throw new FabricatedErrorToKillTheReplayer(true); - } - return newSize; + return previouslyCompletelyHandledItems.size(); }); } @@ -335,14 +341,10 @@ private static void runTrafficReplayer(Supplier cap TrafficReplayer.buildDefaultJsonTransformer(httpServer.localhostEndpoint().getHost())); try (var os = new NullOutputStream(); - var bos = new BufferedOutputStream(os); var trafficSource = captureSourceSupplier.get(); var blockingTrafficSource = new BlockingTrafficSource(trafficSource, Duration.ofMinutes(2))) { - 
tr.runReplayWithIOStreams(Duration.ofSeconds(70), blockingTrafficSource, bos, + tr.setupRunAndWaitForReplayWithShutdownChecks(Duration.ofSeconds(70), blockingTrafficSource, new TimeShifter(10 * 1000), tupleReceiver); - } catch (Exception e) { - log.atError().setCause(e).setMessage(() -> "eating exception to check for memory leaks.").log(); - throw new RuntimeException(e); } } }