diff --git a/.gitignore b/.gitignore index cb8a3b7e..4cd4e4f0 100644 --- a/.gitignore +++ b/.gitignore @@ -20,6 +20,7 @@ # Maven target/ +dependency-reduced-pom.xml # Gradle .gradle diff --git a/invoker/core/src/main/java/com/google/cloud/functions/invoker/BackgroundFunctionExecutor.java b/invoker/core/src/main/java/com/google/cloud/functions/invoker/BackgroundFunctionExecutor.java index 98b9bc8a..a35f2225 100644 --- a/invoker/core/src/main/java/com/google/cloud/functions/invoker/BackgroundFunctionExecutor.java +++ b/invoker/core/src/main/java/com/google/cloud/functions/invoker/BackgroundFunctionExecutor.java @@ -21,6 +21,7 @@ import com.google.cloud.functions.CloudEventsFunction; import com.google.cloud.functions.Context; import com.google.cloud.functions.RawBackgroundFunction; +import com.google.cloud.functions.invoker.gcf.ExecutionIdUtil; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.TypeAdapter; @@ -51,6 +52,7 @@ public final class BackgroundFunctionExecutor extends HttpServlet { private static final Logger logger = Logger.getLogger("com.google.cloud.functions.invoker"); private final FunctionExecutor functionExecutor; + private final ExecutionIdUtil executionIdUtil = new ExecutionIdUtil(); private BackgroundFunctionExecutor(FunctionExecutor functionExecutor) { this.functionExecutor = functionExecutor; @@ -323,6 +325,7 @@ void serviceCloudEvent(CloudEvent cloudEvent) throws Exception { public void service(HttpServletRequest req, HttpServletResponse res) throws IOException { String contentType = req.getContentType(); try { + executionIdUtil.storeExecutionId(req); if ((contentType != null && contentType.startsWith("application/cloudevents+json")) || req.getHeader("ce-specversion") != null) { serviceCloudEvent(req); @@ -333,6 +336,8 @@ public void service(HttpServletRequest req, HttpServletResponse res) throws IOEx } catch (Throwable t) { res.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); logger.log(Level.SEVERE, "Failed to execute " + functionExecutor.functionName(), t); + } finally { + executionIdUtil.removeExecutionId(); } } @@ -359,7 +364,14 @@ private void serviceCloudEvent(HttpServletRequest req) throws Exce // ServiceLoader.load // will throw ServiceConfigurationError. At this point we're still running with the default // context ClassLoader, which is the system ClassLoader that has loaded the code here. - runWithContextClassLoader(() -> executor.serviceCloudEvent(reader.toEvent(data -> data))); + try { + executionIdUtil.storeExecutionId(req); + runWithContextClassLoader(() -> executor.serviceCloudEvent(reader.toEvent(data -> data))); + } catch (Throwable t) { + logger.log(Level.SEVERE, "Failed to execute " + executor.functionName(), t); + } finally { + executionIdUtil.removeExecutionId(); + } // The data->data is a workaround for a bug fixed since Milestone 4 of the SDK, in // https://github.com/cloudevents/sdk-java/pull/259. } diff --git a/invoker/core/src/main/java/com/google/cloud/functions/invoker/HttpFunctionExecutor.java b/invoker/core/src/main/java/com/google/cloud/functions/invoker/HttpFunctionExecutor.java index 401e22a2..01f07e74 100644 --- a/invoker/core/src/main/java/com/google/cloud/functions/invoker/HttpFunctionExecutor.java +++ b/invoker/core/src/main/java/com/google/cloud/functions/invoker/HttpFunctionExecutor.java @@ -15,6 +15,7 @@ package com.google.cloud.functions.invoker; import com.google.cloud.functions.HttpFunction; +import com.google.cloud.functions.invoker.gcf.ExecutionIdUtil; import com.google.cloud.functions.invoker.http.HttpRequestImpl; import com.google.cloud.functions.invoker.http.HttpResponseImpl; import java.util.logging.Level; @@ -28,6 +29,7 @@ public class HttpFunctionExecutor extends HttpServlet { private static final Logger logger = Logger.getLogger("com.google.cloud.functions.invoker"); private final HttpFunction function; + private final ExecutionIdUtil executionIdUtil = new ExecutionIdUtil(); private HttpFunctionExecutor(HttpFunction function) { this.function = function; @@ -68,6 +70,7 @@ public void service(HttpServletRequest req, HttpServletResponse res) { HttpResponseImpl respImpl = new HttpResponseImpl(res); ClassLoader oldContextLoader = Thread.currentThread().getContextClassLoader(); try { + executionIdUtil.storeExecutionId(req); Thread.currentThread().setContextClassLoader(function.getClass().getClassLoader()); function.service(reqImpl, respImpl); } catch (Throwable t) { @@ -75,6 +78,7 @@ public void service(HttpServletRequest req, HttpServletResponse res) { res.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); } finally { Thread.currentThread().setContextClassLoader(oldContextLoader); + executionIdUtil.removeExecutionId(); respImpl.flush(); } } diff --git a/invoker/core/src/main/java/com/google/cloud/functions/invoker/gcf/ExecutionIdUtil.java b/invoker/core/src/main/java/com/google/cloud/functions/invoker/gcf/ExecutionIdUtil.java new file mode 100644 index 00000000..7987317d --- /dev/null +++ b/invoker/core/src/main/java/com/google/cloud/functions/invoker/gcf/ExecutionIdUtil.java @@ -0,0 +1,63 @@ +package com.google.cloud.functions.invoker.gcf; + +import java.util.Base64; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; +import java.util.logging.Handler; +import java.util.logging.Logger; +import javax.servlet.http.HttpServletRequest; + +/** + * A helper class that either fetches a unique execution id from request HTTP headers or generates a + * random id. + */ +public final class ExecutionIdUtil { + private static final Logger rootLogger = Logger.getLogger(""); + private static final int EXECUTION_ID_LENGTH = 12; + private static final String EXECUTION_ID_HTTP_HEADER = "HTTP_FUNCTION_EXECUTION_ID"; + private static final String LOG_EXECUTION_ID_ENV_NAME = "LOG_EXECUTION_ID"; + + private final Random random = ThreadLocalRandom.current(); + + /** + * Add mapping to root logger from current thread id to execution id. This mapping will be used to + * append the execution id to log lines. + */ + public void storeExecutionId(HttpServletRequest request) { + if (!executionIdLoggingEnabled()) { + return; + } + for (Handler handler : rootLogger.getHandlers()) { + if (handler instanceof JsonLogHandler) { + String id = getOrGenerateExecutionId(request); + ((JsonLogHandler) handler).addExecutionId(Thread.currentThread().getId(), id); + } + } + } + + /** Remove mapping from curent thread to request execution id */ + public void removeExecutionId() { + if (!executionIdLoggingEnabled()) { + return; + } + for (Handler handler : rootLogger.getHandlers()) { + if (handler instanceof JsonLogHandler) { + ((JsonLogHandler) handler).removeExecutionId(Thread.currentThread().getId()); + } + } + } + + private String getOrGenerateExecutionId(HttpServletRequest request) { + String executionId = request.getHeader(EXECUTION_ID_HTTP_HEADER); + if (executionId == null) { + byte[] array = new byte[EXECUTION_ID_LENGTH]; + random.nextBytes(array); + executionId = Base64.getEncoder().encodeToString(array); + } + return executionId; + } + + private boolean executionIdLoggingEnabled() { + return Boolean.parseBoolean(System.getenv().getOrDefault(LOG_EXECUTION_ID_ENV_NAME, "false")); + } +} diff --git a/invoker/core/src/main/java/com/google/cloud/functions/invoker/gcf/JsonLogHandler.java b/invoker/core/src/main/java/com/google/cloud/functions/invoker/gcf/JsonLogHandler.java index 9c94b92a..b21f78a1 100644 --- a/invoker/core/src/main/java/com/google/cloud/functions/invoker/gcf/JsonLogHandler.java +++ b/invoker/core/src/main/java/com/google/cloud/functions/invoker/gcf/JsonLogHandler.java @@ -5,6 +5,8 @@ import java.io.StringWriter; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.logging.Handler; import java.util.logging.Level; import java.util.logging.LogRecord; @@ -24,6 +26,14 @@ public final class JsonLogHandler extends Handler { private final PrintStream out; private final boolean closePrintStreamOnClose; + // This map is used to track execution id for currently running Jetty requests. Mapping thread + // id to request works because of an implementation detail of Jetty thread pool handling. + // Jetty worker threads completely handle a request before beginning work on a new request. + // NOTE: Store thread id as a string to avoid comparison failures between int and long. + // + // Jetty Documentation (https://jetty.org/docs/jetty/10/programming-guide/arch/threads.html) + private static final ConcurrentMap executionIdByThreadMap = + new ConcurrentHashMap<>(); public JsonLogHandler(PrintStream out, boolean closePrintStreamOnClose) { this.out = out; @@ -38,6 +48,7 @@ public void publish(LogRecord record) { StringBuilder json = new StringBuilder("{"); appendSeverity(json, record); appendSourceLocation(json, record); + appendExecutionId(json, record); appendMessage(json, record); // must be last, see appendMessage json.append("}"); // We must output the log all at once (should only call println once per call to publish) @@ -96,6 +107,12 @@ private static void appendSourceLocation(StringBuilder json, LogRecord record) { json.append(SOURCE_LOCATION_KEY).append("{").append(String.join(", ", entries)).append("}, "); } + private void appendExecutionId(StringBuilder json, LogRecord record) { + json.append("\"execution_id\": \"") + .append(executionIdByThreadMap.get(Integer.toString(record.getThreadID()))) + .append("\", "); + } + private static String escapeString(String s) { return s.replace("\\", "\\\\").replace("\"", "\\\"").replace("\n", "\\n").replace("\r", "\\r"); } @@ -117,4 +134,12 @@ public void close() throws SecurityException { out.close(); } } + + public void addExecutionId(long threadId, String executionId) { + executionIdByThreadMap.put(Long.toString(threadId), executionId); + } + + public void removeExecutionId(long threadId) { + executionIdByThreadMap.remove(Long.toString(threadId)); + } } diff --git a/invoker/core/src/test/java/com/google/cloud/functions/invoker/IntegrationTest.java b/invoker/core/src/test/java/com/google/cloud/functions/invoker/IntegrationTest.java index 82197547..37d35ebd 100644 --- a/invoker/core/src/test/java/com/google/cloud/functions/invoker/IntegrationTest.java +++ b/invoker/core/src/test/java/com/google/cloud/functions/invoker/IntegrationTest.java @@ -28,6 +28,8 @@ import com.google.common.truth.Expect; import com.google.gson.Gson; import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.stream.JsonReader; import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; import io.cloudevents.core.format.EventFormat; @@ -39,6 +41,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.io.StringReader; import java.io.UncheckedIOException; import java.net.ServerSocket; import java.net.URI; @@ -89,6 +92,8 @@ public class IntegrationTest { @Rule public final TestName testName = new TestName(); private static final String SERVER_READY_STRING = "Started ServerConnector"; + private static final String EXECUTION_ID_HTTP_HEADER = "HTTP_FUNCTION_EXECUTION_ID"; + private static final String EXECUTION_ID = "1234abcd"; private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool(); @@ -286,7 +291,10 @@ public void exceptionHttp() throws Exception { String exceptionExpectedOutput = "\"severity\": \"ERROR\", \"logging.googleapis.com/sourceLocation\": {\"file\":" + " \"com/google/cloud/functions/invoker/HttpFunctionExecutor.java\", \"method\":" - + " \"service\"}, \"message\": \"Failed to execute" + + " \"service\"}, \"execution_id\": \"" + + EXECUTION_ID + + "\"," + + " \"message\": \"Failed to execute" + " com.google.cloud.functions.invoker.testfunctions.ExceptionHttp\\n" + "java.lang.RuntimeException: exception thrown for test"; testHttpFunction( @@ -294,6 +302,7 @@ public void exceptionHttp() throws Exception { ImmutableList.of( TestCase.builder() .setExpectedResponseCode(500) + .setHttpHeaders(ImmutableMap.of(EXECUTION_ID_HTTP_HEADER, EXECUTION_ID)) .setExpectedOutput(exceptionExpectedOutput) .build())); } @@ -303,7 +312,10 @@ public void exceptionBackground() throws Exception { String exceptionExpectedOutput = "\"severity\": \"ERROR\", \"logging.googleapis.com/sourceLocation\": {\"file\":" + " \"com/google/cloud/functions/invoker/BackgroundFunctionExecutor.java\", \"method\":" - + " \"service\"}, \"message\": \"Failed to execute" + + " \"service\"}, \"execution_id\": \"" + + EXECUTION_ID + + "\", " + + "\"message\": \"Failed to execute" + " com.google.cloud.functions.invoker.testfunctions.ExceptionBackground\\n" + "java.lang.RuntimeException: exception thrown for test"; @@ -317,6 +329,7 @@ public void exceptionBackground() throws Exception { ImmutableList.of( TestCase.builder() .setRequestText(gcfRequestText) + .setHttpHeaders(ImmutableMap.of(EXECUTION_ID_HTTP_HEADER, EXECUTION_ID)) .setExpectedResponseCode(500) .setExpectedOutput(exceptionExpectedOutput) .build()), @@ -359,13 +372,21 @@ public void stackDriverLogging() throws Exception { + "\"logging.googleapis.com/sourceLocation\": " + "{\"file\": \"com/google/cloud/functions/invoker/testfunctions/Log.java\"," + " \"method\": \"service\"}," + + " \"execution_id\": \"" + + EXECUTION_ID + + "\"," + " \"message\": \"blim\"}"; TestCase simpleTestCase = - TestCase.builder().setUrl("/?message=blim").setExpectedOutput(simpleExpectedOutput).build(); + TestCase.builder() + .setUrl("/?message=blim") + .setHttpHeaders(ImmutableMap.of(EXECUTION_ID_HTTP_HEADER, EXECUTION_ID)) + .setExpectedOutput(simpleExpectedOutput) + .build(); String quotingExpectedOutput = "\"message\": \"foo\\nbar\\\""; TestCase quotingTestCase = TestCase.builder() .setUrl("/?message=" + URLEncoder.encode("foo\nbar\"", "UTF-8")) + .setHttpHeaders(ImmutableMap.of(EXECUTION_ID_HTTP_HEADER, EXECUTION_ID)) .setExpectedOutput(quotingExpectedOutput) .build(); String exceptionExpectedOutput = @@ -373,11 +394,15 @@ public void stackDriverLogging() throws Exception { + "\"logging.googleapis.com/sourceLocation\": " + "{\"file\": \"com/google/cloud/functions/invoker/testfunctions/Log.java\", " + "\"method\": \"service\"}, " + + "\"execution_id\": \"" + + EXECUTION_ID + + "\", " + "\"message\": \"oops\\njava.lang.Exception: disaster\\n" + " at com.google.cloud.functions.invoker.testfunctions.Log.service(Log.java:"; TestCase exceptionTestCase = TestCase.builder() .setUrl("/?message=oops&level=severe&exception=disaster") + .setHttpHeaders(ImmutableMap.of(EXECUTION_ID_HTTP_HEADER, EXECUTION_ID)) .setExpectedOutput(exceptionExpectedOutput) .build(); testHttpFunction( @@ -753,7 +778,11 @@ private void testFunction( for (TestCase testCase : testCases) { testCase .expectedOutput() - .ifPresent(output -> expect.that(serverProcess.output()).contains(output)); + .ifPresent( + (output) -> { + expect.that(serverProcess.output()).contains(output); + parseLogJson(serverProcess.output()); + }); } // Wait for the output monitor task to terminate. If it threw an exception, we will get an // ExecutionException here. @@ -842,7 +871,9 @@ private ServerProcess startServer( "FUNCTION_SIGNATURE_TYPE", signatureType.toString(), "FUNCTION_TARGET", - target); + target, + "LOG_EXECUTION_ID", + "true"); processBuilder.environment().putAll(environment); processBuilder.environment().putAll(environmentVariables); Process serverProcess = processBuilder.start(); @@ -879,4 +910,12 @@ private void monitorOutput( throw new UncheckedIOException(e); } } + + // Attempt to parse Json object, throws on parse failure + private void parseLogJson(String json) throws RuntimeException { + System.out.println("trying to parse the following object "); + System.out.println(json); + JsonReader reader = new JsonReader(new StringReader(json)); + JsonParser.parseReader(reader); + } }