Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial support for sending OpenLineage events via SDK #701

Merged
merged 2 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pkl = "0.25.3"
adls = "12.19.0"
azure = "1.12.1"
guava = "33.1.0-jre"
openlineage = "1.17.1"

[libraries]
jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", version.ref = "jackson" }
Expand Down Expand Up @@ -62,6 +63,7 @@ jakarta-mail = { module = "com.sun.mail:jakarta.mail", version.ref = "jakarta-ma
angus-mail = { module = "org.eclipse.angus:angus-mail", version.ref = "angus-mail" }
pkl-config = { module = "org.pkl-lang:pkl-config-kotlin", version.ref = "pkl" }
pkl-codegen = { module = "org.pkl-lang:pkl-codegen-kotlin", version.ref = "pkl" }
openlineage = { module = "io.openlineage:openlineage-java", version.ref = "openlineage" }

[bundles]
java-test = [ "jnanoid", "testng", "wiremock" ]
Expand Down
1 change: 1 addition & 0 deletions sdk/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ dependencies {
api(libs.slf4j)
api(libs.elasticsearch.java)
api(libs.freemarker)
api(libs.openlineage)
implementation(libs.classgraph)
testImplementation(libs.bundles.java.test)
testImplementation(libs.bundles.log4j)
Expand Down
4 changes: 4 additions & 0 deletions sdk/src/main/java/com/atlan/AtlanClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ public class AtlanClient {
/** Endpoint with operations to manage SSO configuration. */
public final SSOEndpoint sso;

/** Endpoint with operations to interact with OpenLineage. */
public final OpenLineageEndpoint openLineage;

/** Client-aware asset deserializer. */
@Getter
private final AssetDeserializer assetDeserializer;
Expand Down Expand Up @@ -204,6 +207,7 @@ public class AtlanClient {
credentials = new CredentialsEndpoint(this);
tasks = new TaskEndpoint(this);
sso = new SSOEndpoint(this);
openLineage = new OpenLineageEndpoint(this);
atlanTagCache = new AtlanTagCache(typeDefs);
customMetadataCache = new CustomMetadataCache(typeDefs);
enumCache = new EnumCache(typeDefs);
Expand Down
22 changes: 22 additions & 0 deletions sdk/src/main/java/com/atlan/api/ChronosEndpoint.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/* SPDX-License-Identifier: Apache-2.0
Copyright 2023 Atlan Pte. Ltd. */
package com.atlan.api;

import com.atlan.AtlanClient;
import com.atlan.exception.ApiConnectionException;

/**
* Base class for all API endpoints that ultimately access Chronos-surfaced services.
*/
public abstract class ChronosEndpoint extends AbstractEndpoint {
private static final String PREFIX = "/events/openlineage";
private static final String SERVICE = "http://chronos-service.kong.svc.cluster.local";

protected ChronosEndpoint(AtlanClient client) {
super(client);
}

protected String getBaseUrl() throws ApiConnectionException {
return getBaseUrl(SERVICE, PREFIX);
}
}
62 changes: 62 additions & 0 deletions sdk/src/main/java/com/atlan/api/OpenLineageEndpoint.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/* SPDX-License-Identifier: Apache-2.0
Copyright 2022 Atlan Pte. Ltd. */
package com.atlan.api;

import com.atlan.AtlanClient;
import com.atlan.exception.AtlanException;
import com.atlan.exception.AuthenticationException;
import com.atlan.exception.ErrorCode;
import com.atlan.exception.InvalidRequestException;
import com.atlan.model.enums.AtlanConnectorType;
import com.atlan.model.lineage.OpenLineageEvent;
import com.atlan.net.ApiResource;
import com.atlan.net.RequestOptions;

/**
* API endpoints for interacting with OpenLineage.
*/
public class OpenLineageEndpoint extends ChronosEndpoint {

private static final String endpoint = "api/v1/lineage";

public OpenLineageEndpoint(AtlanClient client) {
super(client);
}

/**
* Sends the OpenLineage event to Atlan to be consumed.
*
* @param request OpenLineage event to send
* @param connectorType of the connection that should receive the OpenLineage event
* @throws AtlanException on any issues with API communication
*/
public void send(OpenLineageEvent request, AtlanConnectorType connectorType) throws AtlanException {
send(request, connectorType, null);
}

/**
* Sends the OpenLineage event to Atlan to be consumed.
*
* @param request OpenLineage event to send
* @param connectorType of the connection that should receive the OpenLineage event
* @param options to override default client settings
* @throws AtlanException on any issues with API communication
*/
public void send(OpenLineageEvent request, AtlanConnectorType connectorType, RequestOptions options)
throws AtlanException {
String url = String.format("%s/%s/%s", getBaseUrl(), connectorType.getValue(), endpoint);
try {
ApiResource.requestPlainText(client, ApiResource.RequestMethod.POST, url, request, options);
} catch (AuthenticationException e) {
if (e.getAtlanError() != null
&& e.getAtlanError().getErrorMessage() != null
&& e.getAtlanError()
.getErrorMessage()
.startsWith("Unauthorized: url path not configured to receive data, urlPath:")) {
throw new InvalidRequestException(ErrorCode.OPENLINEAGE_NOT_CONFIGURED, e, connectorType.getValue());
} else {
throw e;
}
}
}
}
6 changes: 6 additions & 0 deletions sdk/src/main/java/com/atlan/exception/ErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,12 @@ public enum ErrorCode implements ExceptionMessageSet {
"ATLAN-JAVA-400-047",
"Expected {0} database name(s) matching the given pattern {1} but found {2}.",
"Use a more restrictive regular expression."),
OPENLINEAGE_NOT_CONFIGURED(
400,
"ATLAN-JAVA-400-048",
"Requested OpenLineage connector type {0} is not configured.",
"You must first run the appropriate marketplace package to configure OpenLineage for this connector before you can send events for it."),

AUTHENTICATION_PASSTHROUGH(
401,
"ATLAN-JAVA-401-000",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,8 @@ public InvalidRequestException(ErrorCode error) {
public InvalidRequestException(ErrorCode error, Throwable e) {
super(error, e);
}

public InvalidRequestException(ErrorCode error, Throwable e, String... params) {
super(error, e, params);
}
}
5 changes: 5 additions & 0 deletions sdk/src/main/java/com/atlan/model/core/AtlanError.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@
public class AtlanError extends AtlanObject {
private static final long serialVersionUID = 2L;

public AtlanError(long code, String message) {
this.code = code;
this.errorMessage = message;
}

/** A numeric error code. */
Long code;

Expand Down
38 changes: 38 additions & 0 deletions sdk/src/main/java/com/atlan/model/lineage/OpenLineageEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/* SPDX-License-Identifier: Apache-2.0
Copyright 2022 Atlan Pte. Ltd. */
package com.atlan.model.lineage;

import com.atlan.AtlanClient;
import com.atlan.model.core.AtlanObject;
import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineageClientUtils;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import lombok.extern.jackson.Jacksonized;

/**
* Base class for handling OpenLineage events, passing through to the OpenLineage Java SDK
* but wrapping events such that they are handled appropriately in the Atlan Java SDK.
*/
@Getter
@Jacksonized
@SuperBuilder(toBuilder = true, builderMethodName = "_internal")
@EqualsAndHashCode(callSuper = false)
@ToString(callSuper = true)
public class OpenLineageEvent extends AtlanObject {
private static final long serialVersionUID = 2L;

OpenLineage.BaseEvent event;

public OpenLineageEvent(OpenLineage.BaseEvent event) {
this.event = event;
}

/** {@inheritDoc} */
@Override
public String toJson(AtlanClient client) {
return OpenLineageClientUtils.toJson(event);
}
}
59 changes: 59 additions & 0 deletions sdk/src/main/java/com/atlan/net/ApiResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,29 @@ public static <T extends ApiResource> T request(
return request(client, method, url, payload.toJson(client), clazz, options);
}

/**
* Pass-through to the request-handling method after confirming that the provided payload is non-null.
* This uses special handling of the response, where the response is plain text rather than JSON.
*
* @param client connectivity to Atlan
* @param method for the request
* @param url of the request
* @param payload to send in the request
* @param options for sending the request (or null to use global defaults)
* @return the response
* @throws AtlanException on any API interaction problem
*/
public static String requestPlainText(
AtlanClient client,
ApiResource.RequestMethod method,
String url,
AtlanObject payload,
RequestOptions options)
throws AtlanException {
checkNullTypedParams(url, payload);
return requestPlainText(client, method, url, payload.toJson(client), options);
}

/**
* Pass-through the request to the request-handling method.
* This method wraps debug-level logging lines around the request to show precisely what was constructed and sent
Expand Down Expand Up @@ -360,6 +383,42 @@ public static <T extends ApiResource> T request(
return response;
}

/**
* Pass-through the request to the request-handling method.
* This method wraps debug-level logging lines around the request to show precisely what was constructed and sent
* to Atlan and precisely what was returned (prior to deserialization).
* This handles the response as plain text rather than JSON.
*
* @param client connectivity to Atlan
* @param method for the request
* @param url of the request
* @param body to send in the request, if any (to not send any use an empty string)
* @param options for sending the request (or null to use global defaults)
* @return the response
* @throws AtlanException on any API interaction problem
*/
public static String requestPlainText(
AtlanClient client, ApiResource.RequestMethod method, String url, String body, RequestOptions options)
throws AtlanException {
// Create a unique ID for every request, and add it to the logging context and header
String requestId = UUID.randomUUID().toString();
MDC.put("X-Atlan-Request-Id", requestId);
log.debug("({}) {} with: {}", method, url, body);
String response =
ApiResource.atlanResponseGetter.requestPlainText(client, method, url, body, options, requestId);
// Ensure we reset the Atlan request ID, so we always have the context from the original
// request that was made (even if it in turn triggered off other requests)
MDC.put("X-Atlan-Request-Id", requestId);
if (log.isDebugEnabled()) {
if (response != null) {
log.debug(" ... response: {}", response);
} else {
log.debug(" ... empty response.");
}
}
return response;
}

/**
* Pass-through the request to the request-handling method, for file uploads.
* This method wraps debug-level logging lines around the request to show precisely what was constructed and sent
Expand Down
22 changes: 22 additions & 0 deletions sdk/src/main/java/com/atlan/net/AtlanResponseGetter.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,28 @@ <T extends AtlanResponseInterface> T request(
String requestId)
throws AtlanException;

/**
* Send a request to an Atlan API, when a response is expected.
* This handles the response as plain text, rather than JSON.
*
* @param client connectivity to Atlan
* @param method to use for the request
* @param url of the endpoint (with all path and query parameters) for the request
* @param body payload for the request, if any
* @param options any alternative options to use for the request, or null to use default options
* @param requestId unique identifier (GUID) of a single request to Atlan
* @return the response of the request
* @throws AtlanException on any API interaction problem, indicating the type of problem encountered
*/
String requestPlainText(
AtlanClient client,
ApiResource.RequestMethod method,
String url,
String body,
RequestOptions options,
String requestId)
throws AtlanException;

/**
* Send a request to an Atlan API, when an event-stream response is expected.
*
Expand Down
Loading
Loading