From 3c267b3693fb281818fac57aaa104f5df9ab2884 Mon Sep 17 00:00:00 2001 From: Christopher Grote Date: Mon, 24 Jun 2024 21:49:27 +0100 Subject: [PATCH] Initial support for sending OpenLineage events via SDK Signed-off-by: Christopher Grote --- gradle/libs.versions.toml | 2 + sdk/build.gradle.kts | 1 + sdk/src/main/java/com/atlan/AtlanClient.java | 4 ++ .../java/com/atlan/api/ChronosEndpoint.java | 22 +++++++ .../com/atlan/api/OpenLineageEndpoint.java | 62 ++++++++++++++++++ .../java/com/atlan/exception/ErrorCode.java | 6 ++ .../exception/InvalidRequestException.java | 4 ++ .../java/com/atlan/model/core/AtlanError.java | 5 ++ .../atlan/model/lineage/OpenLineageEvent.java | 38 +++++++++++ .../main/java/com/atlan/net/ApiResource.java | 59 +++++++++++++++++ .../com/atlan/net/AtlanResponseGetter.java | 22 +++++++ .../atlan/net/LiveAtlanResponseGetter.java | 64 +++++++++++++++++-- 12 files changed, 283 insertions(+), 6 deletions(-) create mode 100644 sdk/src/main/java/com/atlan/api/ChronosEndpoint.java create mode 100644 sdk/src/main/java/com/atlan/api/OpenLineageEndpoint.java create mode 100644 sdk/src/main/java/com/atlan/model/lineage/OpenLineageEvent.java diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 26e7c4081c..3ddd6a35ba 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -26,6 +26,7 @@ pkl = "0.25.3" adls = "12.19.0" azure = "1.12.1" guava = "33.1.0-jre" +openlineage = "1.17.1" [libraries] jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", version.ref = "jackson" } @@ -62,6 +63,7 @@ jakarta-mail = { module = "com.sun.mail:jakarta.mail", version.ref = "jakarta-ma angus-mail = { module = "org.eclipse.angus:angus-mail", version.ref = "angus-mail" } pkl-config = { module = "org.pkl-lang:pkl-config-kotlin", version.ref = "pkl" } pkl-codegen = { module = "org.pkl-lang:pkl-codegen-kotlin", version.ref = "pkl" } +openlineage = { module = "io.openlineage:openlineage-java", version.ref = "openlineage" } [bundles] java-test = [ "jnanoid", "testng", "wiremock" ] diff --git a/sdk/build.gradle.kts b/sdk/build.gradle.kts index 92c211240f..0724604fbc 100644 --- a/sdk/build.gradle.kts +++ b/sdk/build.gradle.kts @@ -46,6 +46,7 @@ dependencies { api(libs.slf4j) api(libs.elasticsearch.java) api(libs.freemarker) + api(libs.openlineage) implementation(libs.classgraph) testImplementation(libs.bundles.java.test) testImplementation(libs.bundles.log4j) diff --git a/sdk/src/main/java/com/atlan/AtlanClient.java b/sdk/src/main/java/com/atlan/AtlanClient.java index 8a19d6b84b..1e7334231a 100644 --- a/sdk/src/main/java/com/atlan/AtlanClient.java +++ b/sdk/src/main/java/com/atlan/AtlanClient.java @@ -154,6 +154,9 @@ public class AtlanClient { /** Endpoint with operations to manage SSO configuration. */ public final SSOEndpoint sso; + /** Endpoint with operations to interact with OpenLineage. */ + public final OpenLineageEndpoint openLineage; + /** Client-aware asset deserializer. */ @Getter private final AssetDeserializer assetDeserializer; @@ -204,6 +207,7 @@ public class AtlanClient { credentials = new CredentialsEndpoint(this); tasks = new TaskEndpoint(this); sso = new SSOEndpoint(this); + openLineage = new OpenLineageEndpoint(this); atlanTagCache = new AtlanTagCache(typeDefs); customMetadataCache = new CustomMetadataCache(typeDefs); enumCache = new EnumCache(typeDefs); diff --git a/sdk/src/main/java/com/atlan/api/ChronosEndpoint.java b/sdk/src/main/java/com/atlan/api/ChronosEndpoint.java new file mode 100644 index 0000000000..5912888e72 --- /dev/null +++ b/sdk/src/main/java/com/atlan/api/ChronosEndpoint.java @@ -0,0 +1,22 @@ +/* SPDX-License-Identifier: Apache-2.0 + Copyright 2023 Atlan Pte. Ltd. */ +package com.atlan.api; + +import com.atlan.AtlanClient; +import com.atlan.exception.ApiConnectionException; + +/** + * Base class for all API endpoints that ultimately access Chronos-surfaced services. + */ +public abstract class ChronosEndpoint extends AbstractEndpoint { + private static final String PREFIX = "/events/openlineage"; + private static final String SERVICE = "http://chronos-service.kong.svc.cluster.local"; + + protected ChronosEndpoint(AtlanClient client) { + super(client); + } + + protected String getBaseUrl() throws ApiConnectionException { + return getBaseUrl(SERVICE, PREFIX); + } +} diff --git a/sdk/src/main/java/com/atlan/api/OpenLineageEndpoint.java b/sdk/src/main/java/com/atlan/api/OpenLineageEndpoint.java new file mode 100644 index 0000000000..c0eac95558 --- /dev/null +++ b/sdk/src/main/java/com/atlan/api/OpenLineageEndpoint.java @@ -0,0 +1,62 @@ +/* SPDX-License-Identifier: Apache-2.0 + Copyright 2022 Atlan Pte. Ltd. */ +package com.atlan.api; + +import com.atlan.AtlanClient; +import com.atlan.exception.AtlanException; +import com.atlan.exception.AuthenticationException; +import com.atlan.exception.ErrorCode; +import com.atlan.exception.InvalidRequestException; +import com.atlan.model.enums.AtlanConnectorType; +import com.atlan.model.lineage.OpenLineageEvent; +import com.atlan.net.ApiResource; +import com.atlan.net.RequestOptions; + +/** + * API endpoints for interacting with OpenLineage. + */ +public class OpenLineageEndpoint extends ChronosEndpoint { + + private static final String endpoint = "api/v1/lineage"; + + public OpenLineageEndpoint(AtlanClient client) { + super(client); + } + + /** + * Sends the OpenLineage event to Atlan to be consumed. + * + * @param request OpenLineage event to send + * @param connectorType of the connection that should receive the OpenLineage event + * @throws AtlanException on any issues with API communication + */ + public void send(OpenLineageEvent request, AtlanConnectorType connectorType) throws AtlanException { + send(request, connectorType, null); + } + + /** + * Sends the OpenLineage event to Atlan to be consumed. + * + * @param request OpenLineage event to send + * @param connectorType of the connection that should receive the OpenLineage event + * @param options to override default client settings + * @throws AtlanException on any issues with API communication + */ + public void send(OpenLineageEvent request, AtlanConnectorType connectorType, RequestOptions options) + throws AtlanException { + String url = String.format("%s/%s/%s", getBaseUrl(), connectorType.getValue(), endpoint); + try { + ApiResource.requestPlainText(client, ApiResource.RequestMethod.POST, url, request, options); + } catch (AuthenticationException e) { + if (e.getAtlanError() != null + && e.getAtlanError().getErrorMessage() != null + && e.getAtlanError() + .getErrorMessage() + .startsWith("Unauthorized: url path not configured to receive data, urlPath:")) { + throw new InvalidRequestException(ErrorCode.OPENLINEAGE_NOT_CONFIGURED, e, connectorType.getValue()); + } else { + throw e; + } + } + } +} diff --git a/sdk/src/main/java/com/atlan/exception/ErrorCode.java b/sdk/src/main/java/com/atlan/exception/ErrorCode.java index 508d755286..5c632d9bad 100644 --- a/sdk/src/main/java/com/atlan/exception/ErrorCode.java +++ b/sdk/src/main/java/com/atlan/exception/ErrorCode.java @@ -262,6 +262,12 @@ public enum ErrorCode implements ExceptionMessageSet { "ATLAN-JAVA-400-047", "Expected {0} database name(s) matching the given pattern {1} but found {2}.", "Use a more restrictive regular expression."), + OPENLINEAGE_NOT_CONFIGURED( + 400, + "ATLAN-JAVA-400-048", + "Requested OpenLineage connector type {0} is not configured.", + "You must first run the appropriate marketplace package to configure OpenLineage for this connector before you can send events for it."), + AUTHENTICATION_PASSTHROUGH( 401, "ATLAN-JAVA-401-000", diff --git a/sdk/src/main/java/com/atlan/exception/InvalidRequestException.java b/sdk/src/main/java/com/atlan/exception/InvalidRequestException.java index e1a737ad18..d269457ac4 100644 --- a/sdk/src/main/java/com/atlan/exception/InvalidRequestException.java +++ b/sdk/src/main/java/com/atlan/exception/InvalidRequestException.java @@ -31,4 +31,8 @@ public InvalidRequestException(ErrorCode error) { public InvalidRequestException(ErrorCode error, Throwable e) { super(error, e); } + + public InvalidRequestException(ErrorCode error, Throwable e, String... params) { + super(error, e, params); + } } diff --git a/sdk/src/main/java/com/atlan/model/core/AtlanError.java b/sdk/src/main/java/com/atlan/model/core/AtlanError.java index 0ef36729b3..e0b0e6e254 100644 --- a/sdk/src/main/java/com/atlan/model/core/AtlanError.java +++ b/sdk/src/main/java/com/atlan/model/core/AtlanError.java @@ -14,6 +14,11 @@ public class AtlanError extends AtlanObject { private static final long serialVersionUID = 2L; + public AtlanError(long code, String message) { + this.code = code; + this.errorMessage = message; + } + /** A numeric error code. */ Long code; diff --git a/sdk/src/main/java/com/atlan/model/lineage/OpenLineageEvent.java b/sdk/src/main/java/com/atlan/model/lineage/OpenLineageEvent.java new file mode 100644 index 0000000000..0dfd1512b6 --- /dev/null +++ b/sdk/src/main/java/com/atlan/model/lineage/OpenLineageEvent.java @@ -0,0 +1,38 @@ +/* SPDX-License-Identifier: Apache-2.0 + Copyright 2022 Atlan Pte. Ltd. */ +package com.atlan.model.lineage; + +import com.atlan.AtlanClient; +import com.atlan.model.core.AtlanObject; +import io.openlineage.client.OpenLineage; +import io.openlineage.client.OpenLineageClientUtils; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; +import lombok.experimental.SuperBuilder; +import lombok.extern.jackson.Jacksonized; + +/** + * Base class for handling OpenLineage events, passing through to the OpenLineage Java SDK + * but wrapping events such that they are handled appropriately in the Atlan Java SDK. + */ +@Getter +@Jacksonized +@SuperBuilder(toBuilder = true, builderMethodName = "_internal") +@EqualsAndHashCode(callSuper = false) +@ToString(callSuper = true) +public class OpenLineageEvent extends AtlanObject { + private static final long serialVersionUID = 2L; + + OpenLineage.BaseEvent event; + + public OpenLineageEvent(OpenLineage.BaseEvent event) { + this.event = event; + } + + /** {@inheritDoc} */ + @Override + public String toJson(AtlanClient client) { + return OpenLineageClientUtils.toJson(event); + } +} diff --git a/sdk/src/main/java/com/atlan/net/ApiResource.java b/sdk/src/main/java/com/atlan/net/ApiResource.java index 7940e9f764..97a38672cf 100644 --- a/sdk/src/main/java/com/atlan/net/ApiResource.java +++ b/sdk/src/main/java/com/atlan/net/ApiResource.java @@ -312,6 +312,29 @@ public static T request( return request(client, method, url, payload.toJson(client), clazz, options); } + /** + * Pass-through to the request-handling method after confirming that the provided payload is non-null. + * This uses special handling of the response, where the response is plain text rather than JSON. + * + * @param client connectivity to Atlan + * @param method for the request + * @param url of the request + * @param payload to send in the request + * @param options for sending the request (or null to use global defaults) + * @return the response + * @throws AtlanException on any API interaction problem + */ + public static String requestPlainText( + AtlanClient client, + ApiResource.RequestMethod method, + String url, + AtlanObject payload, + RequestOptions options) + throws AtlanException { + checkNullTypedParams(url, payload); + return requestPlainText(client, method, url, payload.toJson(client), options); + } + /** * Pass-through the request to the request-handling method. * This method wraps debug-level logging lines around the request to show precisely what was constructed and sent @@ -360,6 +383,42 @@ public static T request( return response; } + /** + * Pass-through the request to the request-handling method. + * This method wraps debug-level logging lines around the request to show precisely what was constructed and sent + * to Atlan and precisely what was returned (prior to deserialization). + * This handles the response as plain text rather than JSON. + * + * @param client connectivity to Atlan + * @param method for the request + * @param url of the request + * @param body to send in the request, if any (to not send any use an empty string) + * @param options for sending the request (or null to use global defaults) + * @return the response + * @throws AtlanException on any API interaction problem + */ + public static String requestPlainText( + AtlanClient client, ApiResource.RequestMethod method, String url, String body, RequestOptions options) + throws AtlanException { + // Create a unique ID for every request, and add it to the logging context and header + String requestId = UUID.randomUUID().toString(); + MDC.put("X-Atlan-Request-Id", requestId); + log.debug("({}) {} with: {}", method, url, body); + String response = + ApiResource.atlanResponseGetter.requestPlainText(client, method, url, body, options, requestId); + // Ensure we reset the Atlan request ID, so we always have the context from the original + // request that was made (even if it in turn triggered off other requests) + MDC.put("X-Atlan-Request-Id", requestId); + if (log.isDebugEnabled()) { + if (response != null) { + log.debug(" ... response: {}", response); + } else { + log.debug(" ... empty response."); + } + } + return response; + } + /** * Pass-through the request to the request-handling method, for file uploads. * This method wraps debug-level logging lines around the request to show precisely what was constructed and sent diff --git a/sdk/src/main/java/com/atlan/net/AtlanResponseGetter.java b/sdk/src/main/java/com/atlan/net/AtlanResponseGetter.java index ce9898ca4c..f0c3f0820b 100644 --- a/sdk/src/main/java/com/atlan/net/AtlanResponseGetter.java +++ b/sdk/src/main/java/com/atlan/net/AtlanResponseGetter.java @@ -59,6 +59,28 @@ T request( String requestId) throws AtlanException; + /** + * Send a request to an Atlan API, when a response is expected. + * This handles the response as plain text, rather than JSON. + * + * @param client connectivity to Atlan + * @param method to use for the request + * @param url of the endpoint (with all path and query parameters) for the request + * @param body payload for the request, if any + * @param options any alternative options to use for the request, or null to use default options + * @param requestId unique identifier (GUID) of a single request to Atlan + * @return the response of the request + * @throws AtlanException on any API interaction problem, indicating the type of problem encountered + */ + String requestPlainText( + AtlanClient client, + ApiResource.RequestMethod method, + String url, + String body, + RequestOptions options, + String requestId) + throws AtlanException; + /** * Send a request to an Atlan API, when an event-stream response is expected. * diff --git a/sdk/src/main/java/com/atlan/net/LiveAtlanResponseGetter.java b/sdk/src/main/java/com/atlan/net/LiveAtlanResponseGetter.java index fc75b3bf2c..b396397a8f 100644 --- a/sdk/src/main/java/com/atlan/net/LiveAtlanResponseGetter.java +++ b/sdk/src/main/java/com/atlan/net/LiveAtlanResponseGetter.java @@ -82,6 +82,20 @@ public T request( return request(request, clazz); } + /** {@inheritDoc} */ + @Override + public String requestPlainText( + AtlanClient client, + ApiResource.RequestMethod method, + String url, + String body, + RequestOptions options, + String requestId) + throws AtlanException { + AtlanRequest request = new AtlanRequest(client, method, url, body, options, requestId); + return requestPlainText(request); + } + /** {@inheritDoc} */ @Override public T requestStream( @@ -165,6 +179,26 @@ private T request(AtlanRequest request, Class return resource; } + /** + * Makes a request to Atlan's API. + * + * @param request bundled details of the request to make + * @return the response of the request + * @throws AtlanException on any API interaction problem, indicating the type of problem encountered + */ + private String requestPlainText(AtlanRequest request) throws AtlanException { + AtlanResponse response = httpClient.requestWithRetries(request); + + int responseCode = response.code(); + String responseBody = response.body(); + + if (responseCode < 200 || responseCode >= 300) { + handleApiError(responseCode, responseBody); + } + + return responseBody; + } + /** * Makes a request to Atlan's API. * @@ -259,7 +293,26 @@ private static void handleApiError(AtlanResponse response) throws AtlanException raiseMalformedJsonError(response.body(), response.code(), null); } - raiseError(response, error); + raiseError(response.code(), error); + } + + /** + * Detect specific exceptions based primarily on the response code received from Atlan. + * + * @param code numeric response code + * @param body of the response received from an API call + * @throws AtlanException a more specific exception, based on the details of that response + */ + private static void handleApiError(int code, String body) throws AtlanException { + + // Check for a 500 response first -- if found, we won't have a JSON body to parse, + // so preemptively exit with a generic ApiException pass-through. + if (code == 500) { + throw new ApiException(ErrorCode.ERROR_PASSTHROUGH, null, "" + code, body == null ? "" : body); + } + + AtlanError error = new AtlanError(code, body); + raiseError(code, error); } /** @@ -291,20 +344,19 @@ private static void handleApiError(AtlanEventStreamResponse response) throws Atl raiseMalformedJsonError(response.body().toString(), response.code(), null); } - raiseError(response, error); + raiseError(response.code(), error); } /** * Raise an Atlan-specific exception based on the response code. * - * @param response received from an API call + * @param code numeric response code received from an API call * @param error error details parsed from the response - * @param type of the response (unused here) * @throws AtlanException a more specific exception, based on the details of the response */ - private static void raiseError(AbstractAtlanResponse response, AtlanError error) throws AtlanException { + private static void raiseError(int code, AtlanError error) throws AtlanException { AtlanException exception; - switch (response.code()) { + switch (code) { case 400: exception = new InvalidRequestException( ErrorCode.INVALID_REQUEST_PASSTHROUGH, error.findCode(), error.findMessage());