Add Kafka Transport support in Trino Open Lineage Plugin #22998

Closed
alprusty wants to merge 1 commit into trinodb:master from alprusty:add-kafka-sink-ol

Contributor

@alprusty alprusty commented Aug 9, 2024

Description

Currently, the Trino OpenLineage plugin supports the Console and HTTP transports. This change adds Kafka transport support.

Additional context and related issues

Fixes #21599

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text:

# General
* Add support for Kafka transport in the Trino OpenLineage plugin. (https://github.com/trinodb/trino/issues/21599)

Member

@sopel39 sopel39 left a comment

lgtm % testing comment


import java.util.Properties;

public class OpenLineageKafkaTransport
Member

Could we add a test like TestOpenLineageEventListenerMarquezIntegration.java, but with Kafka?

See also #22888 for a similar test with Kafka. cc @marton-bod

Contributor

+1 for integration tests with Kafka

<failOnWarning>false</failOnWarning>
<ignoredUnusedDeclaredDependencies>
<!-- kafka-clients is needed on class path for KafkaTransport to work in openlineage-java lib -->
<ignoredUnusedDeclaredDependency>kafka-clients:::</ignoredUnusedDeclaredDependency>
Member

Can we declare them explicitly as a runtime dependency?

Comment on lines +32 to +37
private final String acks = "acks";
private final String acksDefault = "all";
private final String keySerializer = "key.serializer";
private final String keySerializerDefault = "org.apache.kafka.common.serialization.StringSerializer";
private final String valueSerializer = "value.serializer";
private final String valueSerializerDefault = "org.apache.kafka.common.serialization.StringSerializer";
Member

How about making them static final variables?

Member

@sopel39 sopel39 Aug 12, 2024

Yup. Either static final variables, or configurable via OpenLineageClientKafkaTransportConfig if (some of) them need to be changed (probably not).
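A minimal sketch of the static final variant, for illustration only. The class name `KafkaProducerDefaults` and the method `producerProperties` are hypothetical and not part of this PR; the property names are the standard Kafka producer setting names, written as plain strings so the example has no dependency on kafka-clients.

```java
import java.util.Properties;

// Hypothetical helper (not the PR's code): producer setting names and
// their defaults are shared constants instead of per-instance fields.
final class KafkaProducerDefaults
{
    // Standard Kafka producer property names
    static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
    static final String ACKS = "acks";
    static final String KEY_SERIALIZER = "key.serializer";
    static final String VALUE_SERIALIZER = "value.serializer";

    // Default values taken from the lines under review
    static final String ACKS_DEFAULT = "all";
    static final String STRING_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";

    private KafkaProducerDefaults() {}

    // Builds producer properties for the given broker list using the defaults above
    static Properties producerProperties(String brokerEndpoints)
    {
        Properties properties = new Properties();
        properties.setProperty(BOOTSTRAP_SERVERS, brokerEndpoints);
        properties.setProperty(ACKS, ACKS_DEFAULT);
        properties.setProperty(KEY_SERIALIZER, STRING_SERIALIZER);
        properties.setProperty(VALUE_SERIALIZER, STRING_SERIALIZER);
        return properties;
    }
}
```

Usage would be along the lines of `KafkaProducerDefaults.producerProperties("localhost:9092")`, with the result passed to the producer constructor.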

Contributor

I think all of these producer configuration options should be configurable via OpenLineageClientKafkaTransportConfig, with the said defaults. These are also often required:

security.protocol
ssl.keystore.location
ssl.keystore.password
ssl.truststore.location
ssl.truststore.password

Member

For the config keys, we could use something like:

        kafkaProperties.put(BOOTSTRAP_SERVERS_CONFIG, brokerEndpoints);
        kafkaProperties.put(ACKS_CONFIG, acksDefault);
        kafkaProperties.put(KEY_SERIALIZER_CLASS_CONFIG, keySerializerDefault);
        kafkaProperties.put(VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerDefault);

Member

In the case of a custom protocol, we might need to capture them as a separate config.

Member

Like KafkaSslConfig in Trino

Contributor

Yeah, but I would really avoid keeping defaults hardcoded without a way for a user to change them. KafkaTransportConfig would be more flexible.
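The configurable approach discussed in this thread could be sketched roughly as follows: defaults are applied first, then any user-supplied producer settings (for example `security.protocol` or the `ssl.*` options listed earlier) override them. The class name `ConfigurableProducerProperties` and the `overrides` map are assumptions made for illustration; how such overrides would actually be exposed through the plugin's config class is left open here.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch (not the PR's code): hardcoded values act only as
// defaults, and user-provided producer settings take precedence.
final class ConfigurableProducerProperties
{
    private static final String STRING_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";

    // Defaults from the lines under review, keyed by standard Kafka property names
    private static final Map<String, String> DEFAULTS = Map.of(
            "acks", "all",
            "key.serializer", STRING_SERIALIZER,
            "value.serializer", STRING_SERIALIZER);

    private ConfigurableProducerProperties() {}

    static Properties build(String brokerEndpoints, Map<String, String> overrides)
    {
        Properties properties = new Properties();
        // Start from the defaults
        DEFAULTS.forEach(properties::setProperty);
        // User-supplied values replace defaults and may add new keys
        overrides.forEach(properties::setProperty);
        // The broker list comes from its own dedicated setting
        properties.setProperty("bootstrap.servers", brokerEndpoints);
        return properties;
    }
}
```

With this shape, a user who needs TLS would pass `security.protocol` and the keystore and truststore settings as overrides, while users who pass nothing get the same behavior as the hardcoded version.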

Contributor

@mgorsk1 mgorsk1 left a comment

A few comments. Also, can you please update the docs (event-listeners-openlineage.md)?

return this;
}

public String getMessageKey()
Contributor

Shouldn't we have a default value for this, since there is no @NotNull annotation?
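One possible way to address this, sketched under assumptions: model the optional message key as an `Optional<String>` that defaults to empty, so the getter never returns null. The class name `MessageKeyConfigSketch` is hypothetical, and the `@Config` annotation that the real setter would carry is omitted to keep the example free of dependencies.

```java
import java.util.Optional;

// Hypothetical sketch (not the PR's code): an optional setting with an
// explicit "absent" default instead of a nullable field.
final class MessageKeyConfigSketch
{
    // Empty by default, so callers never see null
    private Optional<String> messageKey = Optional.empty();

    Optional<String> getMessageKey()
    {
        return messageKey;
    }

    // Accepts a nullable String, as a config setter would
    MessageKeyConfigSketch setMessageKey(String messageKey)
    {
        this.messageKey = Optional.ofNullable(messageKey);
        return this;
    }
}
```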


import io.airlift.configuration.ConfigDescription;
import jakarta.validation.constraints.NotNull;

public class OpenLineageClientKafkaTransportConfig
Contributor

Can you add a corresponding TestOpenLineageClientKafkaTransportConfig to the tests?


@Config("openlineage-event-listener.transport.kafka.topic")
@ConfigDescription("String specifying the topic to which events will be sent")
public OpenLineageClientKafkaTransportConfig setTopicName(String topicName)
Contributor

If the config is openlineage-event-listener.transport.kafka.topic, can we unify this and use topic here instead of topicName?

<artifactId>jakarta.validation-api</artifactId>
</dependency>

<dependency>
Member

> Add Kafka as one of the OL transport types

Please avoid using abbreviations.

}

@NotNull
public String getBrokerEndpoints()
Member

Please swap the order of the getters/setters for messageKey and brokerEndpoints, for consistency with the field order.

return topicName;
}

@Config("openlineage-event-listener.transport.kafka.topic")
Member

@ebyhr ebyhr Aug 15, 2024

I would recommend renaming transport.kafka to kafka-transport. Same for others.

@@ -0,0 +1,66 @@
/*
Member

@ebyhr ebyhr Aug 15, 2024

Please add a new test to TestOpenLineagePlugin with Kafka transport.

CONSOLE,
HTTP,
/**/
KAFKA
Member

Suggested change
- KAFKA
+ KAFKA,
+ /**/

* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.openlineage.config.kafka;
Member

@ebyhr ebyhr Aug 15, 2024

Please use io.trino.plugin.openlineage.transport.kafka package instead. We usually don't add a new package just for config classes.

github-actions bot commented Oct 4, 2024

This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

@github-actions github-actions bot added the stale label Oct 4, 2024
Member

mosabua commented Oct 4, 2024

👋 @alprusty, can you rebase and address the comments from @ebyhr, @mgorsk1, and @sopel39?

Contributor Author

alprusty commented Oct 4, 2024

Thanks @mosabua for following up here.
Yes, I will rebase and take care of the review comments in a follow-up commit.

@github-actions github-actions bot removed the stale label Oct 7, 2024

This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

@github-actions github-actions bot added the stale label Oct 29, 2024

Closing this pull request, as it has been stale for six weeks. Feel free to re-open at any time.

@github-actions github-actions bot closed this Nov 20, 2024
Member

mosabua commented Nov 20, 2024

I am reopening and adding the stale-ignore label under the assumption that you will continue this work, @alprusty.

@mosabua mosabua reopened this Nov 20, 2024
@mosabua mosabua added the stale-ignore label and removed the stale label Nov 20, 2024
@ebyhr ebyhr removed the stale-ignore label Feb 26, 2025

This pull request has gone a while without any activity. Tagging for triage help: @mosabua

@github-actions github-actions bot added the stale label Mar 19, 2025
Member

ebyhr commented Mar 28, 2025

Closing as a stale PR. Please feel free to reopen if you continue the work.

Development

Successfully merging this pull request may close these issues.

Add support for Kafka transport in Trino Open lineage Plugin

6 participants