ADMIN-3 | Send notifications to Kafka topic after planning is done for tag propagation #4085

Open
wants to merge 10 commits into
base: tagpropv1master

@abhijeet-atlan commented Feb 4, 2025

Change description

Key Changes:

  • Kafka Integration:
    • Added TAG_PROP_EVENTS topic for tag propagation messages.
    • Defined a partition count (TAG_PROP_EVENTS_PARTITION_COUNT = 5) for better load distribution.
    • Updated AtlasTopicCreator and KafkaNotification to support message partitioning.
  • Notification Improvements:
    • Introduced partition-aware Kafka message handling for classification propagation.
    • Updated NotificationInterface and AbstractNotification to support partitioned message sending.

Ref: V1.5

Type of change

  • Bug fix (fixes an issue)
  • New feature (adds functionality)

Related issues

Fix #1

Checklists

Development

  • Lint rules pass locally
  • Application changes have been tested thoroughly
  • Automated tests covering modified code pass

Security

  • Security impact of change has been considered
  • Code follows company security practices and guidelines

Code review

  • Pull request has a descriptive title and context useful to a reviewer. Screenshots or screencasts are attached as necessary
  • "Ready for review" label attached and reviewers assigned
  • Changes have been reviewed by at least one other contributor
  • Pull request linked to task tracker where applicable

@@ -39,4 +39,6 @@ private AtlasConstants() {
public static final String DEFAULT_TYPE_VERSION = "1.0";
public static final int ATLAS_SHUTDOWN_HOOK_PRIORITY = 30;
public static final int TASK_WAIT_TIME_MS = 180_000;
public static final String ATLAS_KAFKA_TAG_TOPIC = "TAG_PROP_EVENTS";

We will create a new topic with a more generic name that will also handle other object propagation use cases.

That is upcoming in a TRD. Will share and we can make changes accordingly.

Author

Sure, will wait. Please include me in the discussions too.

"We will create a new topic which will handle other object propagation use-cases w a more generic name."
Can we change this to a generic name now, then?

Or do you want to go ahead with this name for now?

Let's change the name to OBJECT_PROP_EVENTS.

@@ -480,6 +480,9 @@ public enum SupportedFileExtensions { XLSX, XLS, CSV }
public static final String REQUEST_HEADER_USER_AGENT = "User-Agent";
public static final String REQUEST_HEADER_HOST = "Host";

//kafka partition value for TAG_PROP_EVENTS
public static final String TAG_PROP_EVENTS_PARTITION_COUNT = "5";

Partition count might also get changed once we determine how we combine more use-cases into the same Kafka topic.

Author

Sure, will wait. Please include me in the discussions too.

@@ -418,4 +478,18 @@ private boolean isKafkaConsumerOpen(KafkaConsumer consumer) {
return ret;
}

public Map<String, Object> createKafkaMessage(AtlasVertex vertex, AtlasGraph graph, String classificationType, String tagVertedId) {

The function name is too generic, while its body does something very specific to tag propagation. Please rename the function.

Author

Changed it to createTagPropKafkaMessage.

@@ -70,6 +71,11 @@ public void sendInternal(NotificationType type, List<String> messages) {
}
}

@Override
public void sendInternal(NotificationType notificationType, List<String> messages, Integer partition) throws NotificationException {

Raise a NotImplemented exception from this method if it's only written to satisfy an interface.

Author

done
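
For reference, a minimal sketch of the pattern requested above. In Java the usual choice for "not implemented" is `UnsupportedOperationException`. The interface and class names here (`PartitionedSender`, `SpoolOnlySender`) are hypothetical stand-ins, not the actual Atlas types touched by this PR.

```java
import java.util.List;

// Hypothetical stand-in for a notification interface with a partition-aware overload.
interface PartitionedSender {
    void send(List<String> messages);

    void send(List<String> messages, Integer partition);
}

// Hypothetical implementation with no notion of partitions. The overload exists
// only to satisfy the interface, so it fails loudly instead of silently doing nothing.
final class SpoolOnlySender implements PartitionedSender {
    private int sentCount = 0;

    @Override
    public void send(List<String> messages) {
        // Placeholder for the real delivery logic: just count the messages.
        sentCount += messages.size();
    }

    @Override
    public void send(List<String> messages, Integer partition) {
        throw new UnsupportedOperationException(
                "Partitioned send is not supported by SpoolOnlySender");
    }

    int sentCount() {
        return sentCount;
    }
}
```

Callers that accidentally route a partitioned send to such an implementation then get an immediate, descriptive failure rather than a dropped message.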

Comment on lines 335 to 342
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-notification</artifactId>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-notification</artifactId>
</dependency>

Duplicated?

Author

removed

@@ -98,21 +100,9 @@ public abstract class DeleteHandlerV1 {
private final TaskUtil taskUtil;
private static final int CHUNK_SIZE = AtlasConfiguration.TASKS_GRAPH_COMMIT_CHUNK_SIZE.getInt();

public void updateTaskVertexProperty(String propertyKey, long value, boolean isIncremental, BiConsumer<AtlasTask, Long> taskSetter) {

Rebase and bring the latest v1 changes into this branch. Will continue reviewing once that is done.

Author

done

@abhijeet-atlan abhijeet-atlan requested a review from jnkrmg February 5, 2025 04:36
@@ -39,4 +39,6 @@ private AtlasConstants() {
public static final String DEFAULT_TYPE_VERSION = "1.0";
public static final int ATLAS_SHUTDOWN_HOOK_PRIORITY = 30;
public static final int TASK_WAIT_TIME_MS = 180_000;
public static final String ATLAS_KAFKA_TAG_TOPIC = "TAG_PROP_EVENTS";

The Python script also reads atlas.notification.propagation.topic.name from the application properties. I think we should do the same here instead of hard-coding the topic name.

Referring to distro/src/bin/atlas_config.py
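
A minimal sketch of that suggestion, assuming the property key quoted above and falling back to the constant from this PR when the key is absent. `java.util.Properties` is used only as a stand-in for whatever configuration object the codebase actually reads, and the class name `PropagationTopicConfig` is hypothetical.

```java
import java.util.Properties;

// Hypothetical helper: resolves the propagation topic name from configuration.
final class PropagationTopicConfig {
    // Property key quoted in the review comment above.
    static final String TOPIC_NAME_KEY = "atlas.notification.propagation.topic.name";
    // Fallback taken from the constant in this PR; the final name is still under discussion.
    static final String DEFAULT_TOPIC_NAME = "TAG_PROP_EVENTS";

    private PropagationTopicConfig() {
    }

    static String resolveTopicName(Properties props) {
        String configured = props.getProperty(TOPIC_NAME_KEY);
        // Treat a missing or blank value as "not configured".
        if (configured == null || configured.trim().isEmpty()) {
            return DEFAULT_TOPIC_NAME;
        }
        return configured.trim();
    }
}
```

With this shape, renaming the topic later becomes a configuration change rather than a code change.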

@@ -108,6 +108,38 @@ public void createTopics(List<String> topicNames, int numPartitions, int replica
}
}

public void createTopics(List<String[]> topicDetails, int replicationFactor)

Why do we need this method if the script distro/src/bin/atlas_config.py also creates topics?

List<String[]> topicDetails = new ArrayList<>();

for (String topicName : topicNames) {
if (AtlasConfiguration.NOTIFICATION_PROPAGATION_TOPIC_NAME.getString().equals(topicName)) {

We could have created an enum covering all Kafka topics, holding the details each topic needs, to avoid special handling like this for one particular topic. It is fine for now; sharing this for future reference.
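
A rough sketch of such an enum, for future reference. The enum name `KafkaTopicSpec` and the partition counts for the first two topics are placeholders chosen for illustration; only the `TAG_PROP_EVENTS` name and its count of 5 come from this PR.

```java
// Hypothetical enum: one entry per Kafka topic, carrying the details needed to create it.
enum KafkaTopicSpec {
    // Partition counts of 1 below are illustrative placeholders, not confirmed values.
    ATLAS_HOOK("ATLAS_HOOK", 1),
    ATLAS_ENTITIES("ATLAS_ENTITIES", 1),
    // Name and partition count taken from this PR.
    TAG_PROP_EVENTS("TAG_PROP_EVENTS", 5);

    private final String topicName;
    private final int partitions;

    KafkaTopicSpec(String topicName, int partitions) {
        this.topicName = topicName;
        this.partitions = partitions;
    }

    String topicName() {
        return topicName;
    }

    int partitions() {
        return partitions;
    }

    // Looks up the partition count by topic name, falling back to a default
    // for topics that are not listed in the enum.
    static int partitionsFor(String name, int defaultPartitions) {
        for (KafkaTopicSpec spec : values()) {
            if (spec.topicName.equals(name)) {
                return spec.partitions;
            }
        }
        return defaultPartitions;
    }
}
```

Topic creation could then iterate over the topic names and call `partitionsFor` for each one, with no `if` branch for any single topic.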

List<MessageContext> messageContexts = new ArrayList<>();

for (String message : messages) {
ProducerRecord record = new ProducerRecord(topic, partition, null, message);

Once we accept a message key instead of a partition, use the ProducerRecord constructor that takes topic, key, and value here.

Comment on lines +3522 to +3526
Map<String, Object> kafkaMessage = kfknotif.createTagPropKafkaMessage(vertex, graph, CLASSIFICATION_PROPAGATION_ADD, classificationVertex.getIdForDisplay());
int partition = Math.abs((Integer) kafkaMessage.get("parentTaskGuid")) % numPartitions;
LOG.debug("sending message with guid={} to partition={}",kafkaMessage.get("parentTaskVertexId"), partition);
kfknotif.sendInternal(NotificationInterface.NotificationType.EMIT_PLANNED_RELATIONSHIPS, Collections.singletonList(kafkaMessage.toString()), partition);
LOG.debug("Message with guid={} sent to partition={} sent successfully.",kafkaMessage.get("parentTaskVertexId"), partition );

Try to fold these two method calls into a single method call, perhaps inside kfknotif.
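
One detail worth checking in the snippet above while it is being reworked: `Math.abs(x) % n` can still produce a negative result, because `Math.abs(Integer.MIN_VALUE)` is itself negative. The sketch below shows a safer mapping with `Math.floorMod`. It assumes the GUID is available as a `String`, which this thread does not confirm, and the class name `TagPropPartitioner` is hypothetical.

```java
// Hypothetical helper: maps a task GUID to a partition index in [0, numPartitions).
final class TagPropPartitioner {
    private TagPropPartitioner() {
    }

    // floorMod always returns a value in [0, numPartitions) for a positive divisor,
    // even when the hash is Integer.MIN_VALUE.
    static int partitionForHash(int hash, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        return Math.floorMod(hash, numPartitions);
    }

    // Assumption: the GUID is a String; its hashCode is used only for illustration.
    static int partitionFor(String guid, int numPartitions) {
        return partitionForHash(guid.hashCode(), numPartitions);
    }
}
```

Keeping this in one helper also fits the request above, since the send path then needs only the message and the GUID.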

Comment on lines +1 to +12
#!/usr/bin/env python3

import os
import subprocess
import sys
import platform
import time





I think this file should not be in your PR; please rebase once.

@@ -65,6 +65,8 @@
import static org.apache.atlas.repository.Constants.INDEX_PREFIX;
import static org.apache.atlas.repository.Constants.VERTEX_INDEX;

import org.apache.atlas.hook.AtlasTopicCreator;

I think this is not needed; please remove all changes in this file.

int partition = Math.abs((Integer) kafkaMessage.get("parentTaskGuid")) % numPartitions;
LOG.debug("sending message with guid={} to partition={}",kafkaMessage.get("parentTaskVertexId"), partition);
kfknotif.sendInternal(NotificationInterface.NotificationType.EMIT_PLANNED_RELATIONSHIPS, Collections.singletonList(kafkaMessage.toString()), partition);
LOG.debug("Message with guid={} sent to partition={} sent successfully.",kafkaMessage.get("parentTaskVertexId"), partition );

Do we really need one debug log per message? Consider removing these and, if needed, writing a single generic log outside the for loop.

This applies to all other similar places as well.
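
A small sketch of the "one log outside the loop" shape. It uses `java.util.logging` only to stay self-contained; the PR's `LOG.debug` calls would take its place. The class name `BatchSendLogging` and the `Consumer<String>` sender are hypothetical.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical helper: sends a batch and emits a single summary log line.
final class BatchSendLogging {
    private static final Logger LOG = Logger.getLogger(BatchSendLogging.class.getName());

    private BatchSendLogging() {
    }

    static int sendAll(List<String> messages, int partition, Consumer<String> sender) {
        for (String message : messages) {
            // No per-message logging inside the loop.
            sender.accept(message);
        }
        // One summary line per batch, guarded so arguments are only built when needed.
        if (LOG.isLoggable(Level.FINE)) {
            LOG.log(Level.FINE, "Sent {0} message(s) to partition {1}",
                    new Object[] {messages.size(), partition});
        }
        return messages.size();
    }
}
```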

@manikant-prasad manikant-prasad changed the title DG1925 | Send notifications to Kafka topic after planning is done for tag propagation ADMIN-3 | Send notifications to Kafka topic after planning is done for tag propagation Feb 10, 2025
4 participants