Skip to content

Commit

Permalink
Merge pull request #701 from atlanhq/DVX-522
Browse files Browse the repository at this point in the history
Initial support for sending OpenLineage events via SDK
  • Loading branch information
cmgrote authored Jun 24, 2024
2 parents 7c0301d + d58c412 commit 48a1653
Show file tree
Hide file tree
Showing 12 changed files with 283 additions and 6 deletions.
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pkl = "0.25.3"
adls = "12.19.0"
azure = "1.12.1"
guava = "33.1.0-jre"
openlineage = "1.17.1"

[libraries]
jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", version.ref = "jackson" }
Expand Down Expand Up @@ -62,6 +63,7 @@ jakarta-mail = { module = "com.sun.mail:jakarta.mail", version.ref = "jakarta-ma
angus-mail = { module = "org.eclipse.angus:angus-mail", version.ref = "angus-mail" }
pkl-config = { module = "org.pkl-lang:pkl-config-kotlin", version.ref = "pkl" }
pkl-codegen = { module = "org.pkl-lang:pkl-codegen-kotlin", version.ref = "pkl" }
openlineage = { module = "io.openlineage:openlineage-java", version.ref = "openlineage" }

[bundles]
java-test = [ "jnanoid", "testng", "wiremock" ]
Expand Down
1 change: 1 addition & 0 deletions sdk/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ dependencies {
api(libs.slf4j)
api(libs.elasticsearch.java)
api(libs.freemarker)
api(libs.openlineage)
implementation(libs.classgraph)
testImplementation(libs.bundles.java.test)
testImplementation(libs.bundles.log4j)
Expand Down
4 changes: 4 additions & 0 deletions sdk/src/main/java/com/atlan/AtlanClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ public class AtlanClient {
/** Endpoint with operations to manage SSO configuration. */
public final SSOEndpoint sso;

/** Endpoint with operations to interact with OpenLineage. */
public final OpenLineageEndpoint openLineage;

/** Client-aware asset deserializer. */
@Getter
private final AssetDeserializer assetDeserializer;
Expand Down Expand Up @@ -204,6 +207,7 @@ public class AtlanClient {
credentials = new CredentialsEndpoint(this);
tasks = new TaskEndpoint(this);
sso = new SSOEndpoint(this);
openLineage = new OpenLineageEndpoint(this);
atlanTagCache = new AtlanTagCache(typeDefs);
customMetadataCache = new CustomMetadataCache(typeDefs);
enumCache = new EnumCache(typeDefs);
Expand Down
22 changes: 22 additions & 0 deletions sdk/src/main/java/com/atlan/api/ChronosEndpoint.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/* SPDX-License-Identifier: Apache-2.0
Copyright 2023 Atlan Pte. Ltd. */
package com.atlan.api;

import com.atlan.AtlanClient;
import com.atlan.exception.ApiConnectionException;

/**
* Base class for all API endpoints that ultimately access Chronos-surfaced services.
*/
public abstract class ChronosEndpoint extends AbstractEndpoint {
private static final String PREFIX = "/events/openlineage";
private static final String SERVICE = "http://chronos-service.kong.svc.cluster.local";

protected ChronosEndpoint(AtlanClient client) {
super(client);
}

protected String getBaseUrl() throws ApiConnectionException {
return getBaseUrl(SERVICE, PREFIX);
}
}
62 changes: 62 additions & 0 deletions sdk/src/main/java/com/atlan/api/OpenLineageEndpoint.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/* SPDX-License-Identifier: Apache-2.0
Copyright 2022 Atlan Pte. Ltd. */
package com.atlan.api;

import com.atlan.AtlanClient;
import com.atlan.exception.AtlanException;
import com.atlan.exception.AuthenticationException;
import com.atlan.exception.ErrorCode;
import com.atlan.exception.InvalidRequestException;
import com.atlan.model.enums.AtlanConnectorType;
import com.atlan.model.lineage.OpenLineageEvent;
import com.atlan.net.ApiResource;
import com.atlan.net.RequestOptions;

/**
* API endpoints for interacting with OpenLineage.
*/
public class OpenLineageEndpoint extends ChronosEndpoint {

private static final String endpoint = "api/v1/lineage";

public OpenLineageEndpoint(AtlanClient client) {
super(client);
}

/**
* Sends the OpenLineage event to Atlan to be consumed.
*
* @param request OpenLineage event to send
* @param connectorType of the connection that should receive the OpenLineage event
* @throws AtlanException on any issues with API communication
*/
public void send(OpenLineageEvent request, AtlanConnectorType connectorType) throws AtlanException {
send(request, connectorType, null);
}

/**
* Sends the OpenLineage event to Atlan to be consumed.
*
* @param request OpenLineage event to send
* @param connectorType of the connection that should receive the OpenLineage event
* @param options to override default client settings
* @throws AtlanException on any issues with API communication
*/
public void send(OpenLineageEvent request, AtlanConnectorType connectorType, RequestOptions options)
throws AtlanException {
String url = String.format("%s/%s/%s", getBaseUrl(), connectorType.getValue(), endpoint);
try {
ApiResource.requestPlainText(client, ApiResource.RequestMethod.POST, url, request, options);
} catch (AuthenticationException e) {
if (e.getAtlanError() != null
&& e.getAtlanError().getErrorMessage() != null
&& e.getAtlanError()
.getErrorMessage()
.startsWith("Unauthorized: url path not configured to receive data, urlPath:")) {
throw new InvalidRequestException(ErrorCode.OPENLINEAGE_NOT_CONFIGURED, e, connectorType.getValue());
} else {
throw e;
}
}
}
}
6 changes: 6 additions & 0 deletions sdk/src/main/java/com/atlan/exception/ErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,12 @@ public enum ErrorCode implements ExceptionMessageSet {
"ATLAN-JAVA-400-047",
"Expected {0} database name(s) matching the given pattern {1} but found {2}.",
"Use a more restrictive regular expression."),
OPENLINEAGE_NOT_CONFIGURED(
400,
"ATLAN-JAVA-400-048",
"Requested OpenLineage connector type {0} is not configured.",
"You must first run the appropriate marketplace package to configure OpenLineage for this connector before you can send events for it."),

AUTHENTICATION_PASSTHROUGH(
401,
"ATLAN-JAVA-401-000",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,8 @@ public InvalidRequestException(ErrorCode error) {
public InvalidRequestException(ErrorCode error, Throwable e) {
super(error, e);
}

public InvalidRequestException(ErrorCode error, Throwable e, String... params) {
super(error, e, params);
}
}
5 changes: 5 additions & 0 deletions sdk/src/main/java/com/atlan/model/core/AtlanError.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@
public class AtlanError extends AtlanObject {
private static final long serialVersionUID = 2L;

public AtlanError(long code, String message) {
this.code = code;
this.errorMessage = message;
}

/** A numeric error code. */
Long code;

Expand Down
38 changes: 38 additions & 0 deletions sdk/src/main/java/com/atlan/model/lineage/OpenLineageEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/* SPDX-License-Identifier: Apache-2.0
Copyright 2022 Atlan Pte. Ltd. */
package com.atlan.model.lineage;

import com.atlan.AtlanClient;
import com.atlan.model.core.AtlanObject;
import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineageClientUtils;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import lombok.extern.jackson.Jacksonized;

/**
* Base class for handling OpenLineage events, passing through to the OpenLineage Java SDK
* but wrapping events such that they are handled appropriately in the Atlan Java SDK.
*/
@Getter
@Jacksonized
@SuperBuilder(toBuilder = true, builderMethodName = "_internal")
@EqualsAndHashCode(callSuper = false)
@ToString(callSuper = true)
public class OpenLineageEvent extends AtlanObject {
private static final long serialVersionUID = 2L;

OpenLineage.BaseEvent event;

public OpenLineageEvent(OpenLineage.BaseEvent event) {
this.event = event;
}

/** {@inheritDoc} */
@Override
public String toJson(AtlanClient client) {
return OpenLineageClientUtils.toJson(event);
}
}
59 changes: 59 additions & 0 deletions sdk/src/main/java/com/atlan/net/ApiResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,29 @@ public static <T extends ApiResource> T request(
return request(client, method, url, payload.toJson(client), clazz, options);
}

/**
* Pass-through to the request-handling method after confirming that the provided payload is non-null.
* This uses special handling of the response, where the response is plain text rather than JSON.
*
* @param client connectivity to Atlan
* @param method for the request
* @param url of the request
* @param payload to send in the request
* @param options for sending the request (or null to use global defaults)
* @return the response
* @throws AtlanException on any API interaction problem
*/
public static String requestPlainText(
AtlanClient client,
ApiResource.RequestMethod method,
String url,
AtlanObject payload,
RequestOptions options)
throws AtlanException {
checkNullTypedParams(url, payload);
return requestPlainText(client, method, url, payload.toJson(client), options);
}

/**
* Pass-through the request to the request-handling method.
* This method wraps debug-level logging lines around the request to show precisely what was constructed and sent
Expand Down Expand Up @@ -360,6 +383,42 @@ public static <T extends ApiResource> T request(
return response;
}

/**
* Pass-through the request to the request-handling method.
* This method wraps debug-level logging lines around the request to show precisely what was constructed and sent
* to Atlan and precisely what was returned (prior to deserialization).
* This handles the response as plain text rather than JSON.
*
* @param client connectivity to Atlan
* @param method for the request
* @param url of the request
* @param body to send in the request, if any (to not send any use an empty string)
* @param options for sending the request (or null to use global defaults)
* @return the response
* @throws AtlanException on any API interaction problem
*/
public static String requestPlainText(
AtlanClient client, ApiResource.RequestMethod method, String url, String body, RequestOptions options)
throws AtlanException {
// Create a unique ID for every request, and add it to the logging context and header
String requestId = UUID.randomUUID().toString();
MDC.put("X-Atlan-Request-Id", requestId);
log.debug("({}) {} with: {}", method, url, body);
String response =
ApiResource.atlanResponseGetter.requestPlainText(client, method, url, body, options, requestId);
// Ensure we reset the Atlan request ID, so we always have the context from the original
// request that was made (even if it in turn triggered off other requests)
MDC.put("X-Atlan-Request-Id", requestId);
if (log.isDebugEnabled()) {
if (response != null) {
log.debug(" ... response: {}", response);
} else {
log.debug(" ... empty response.");
}
}
return response;
}

/**
* Pass-through the request to the request-handling method, for file uploads.
* This method wraps debug-level logging lines around the request to show precisely what was constructed and sent
Expand Down
22 changes: 22 additions & 0 deletions sdk/src/main/java/com/atlan/net/AtlanResponseGetter.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,28 @@ <T extends AtlanResponseInterface> T request(
String requestId)
throws AtlanException;

/**
* Send a request to an Atlan API, when a response is expected.
* This handles the response as plain text, rather than JSON.
*
* @param client connectivity to Atlan
* @param method to use for the request
* @param url of the endpoint (with all path and query parameters) for the request
* @param body payload for the request, if any
* @param options any alternative options to use for the request, or null to use default options
* @param requestId unique identifier (GUID) of a single request to Atlan
* @return the response of the request
* @throws AtlanException on any API interaction problem, indicating the type of problem encountered
*/
String requestPlainText(
AtlanClient client,
ApiResource.RequestMethod method,
String url,
String body,
RequestOptions options,
String requestId)
throws AtlanException;

/**
* Send a request to an Atlan API, when an event-stream response is expected.
*
Expand Down
Loading

0 comments on commit 48a1653

Please sign in to comment.