-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[PIP-39] Introduce system topic and topic policies service #4955
Conversation
run Integration Tests |
conf/broker.conf
Outdated
|
||
# Enable or disable topic level policies, topic level policies depends on the system topic | ||
# Please enable the system topic first. | ||
topicLevelPoliciesEnable=true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
topicLevelPoliciesEnabled
namespaceEventsSystemTopicService = new NamespaceEventsSystemTopicService(getClient()); | ||
} | ||
|
||
if (config.isTopicLevelPoliciesEnable()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't we need to check if system topic is enabled?
@@ -652,8 +652,9 @@ private void createPersistentTopic(final String topic, boolean createIfMissing, | |||
@Override | |||
public void openLedgerComplete(ManagedLedger ledger, Object ctx) { | |||
try { | |||
PersistentTopic persistentTopic = new PersistentTopic(topic, ledger, | |||
BrokerService.this); | |||
PersistentTopic persistentTopic = isSystemTopic(topic) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PersistentTopic persistentTopic = new PersistentTopic(topic, ledger, BrokerService.this, isSystemTopic(topic));
private final PulsarService pulsarService; | ||
private final TopicPoliciesCache topicPoliciesCache; | ||
private NamespaceEventsSystemTopicService namespaceEventsSystemTopicService; | ||
private Map<NamespaceName, SystemTopic.Reader> readers; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final
/** | ||
* Pulsar system event type | ||
*/ | ||
public enum EventType { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same comments as above.
|
||
public class NamespaceEventsSystemTopicFactory { | ||
|
||
public static final String LOCAL_TOPIC_NAME = "__change_events"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
define this constant in common
module.
@Builder | ||
@NoArgsConstructor | ||
@AllArgsConstructor | ||
public class PulsarEvent { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same comments as above
} | ||
|
||
private void refreshCacheIfNeeded(SystemTopic.Reader reader, CompletableFuture<Void> refreshFuture) { | ||
reader.hasMoreEventsAsync().whenComplete((has, ex) -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reader.hasMoreEventsAsync().whenComplete((has, ex) -> { | |
reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> { |
|
||
private final Map<TopicName, TopicPolicies> policiesCache = new ConcurrentHashMap<>(); | ||
|
||
private final LoadingCache<NamespaceName, CompletableFuture<SystemTopic.Reader>> readerCache; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we are using Guava cache, how do we backfill the cache if the entries are evicted out of the cache?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Topic policies service will check the reader is have no more events can read before get topic policies, so when a reader evicted, the topic policies service will create a new reader and cache it.
@sijie Thanks for the review, i have addressed your comment. |
run java8 tests |
retest this please |
fc8721a
to
1516044
Compare
ownershipCache.getOwnerAsync(bundle).thenAccept(nsData -> { | ||
if (!nsData.isPresent()) { | ||
// No one owns this bundle | ||
if (targetMap.get(bundle) != null && !targetMap.get(bundle).isDone()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the two get calls can potentially return different values. so as a good practice, I would recommend writing in the following way.
CompletableFuture<Optional<LookupResult>> future = targetMap.get(bundle);
if (future != null && !future.isDone()) {
// ...
}
if (!nsData.isPresent()) { | ||
// No one owns this bundle | ||
if (targetMap.get(bundle) != null && !targetMap.get(bundle).isDone()) { | ||
return findBrokerServiceUrlInternal(bundle, authoritative, readOnly); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't we return the future
here, since there is already an ongoing lookup, no?
@@ -691,8 +693,8 @@ private void createPersistentTopic(final String topic, boolean createIfMissing, | |||
@Override | |||
public void openLedgerComplete(ManagedLedger ledger, Object ctx) { | |||
try { | |||
PersistentTopic persistentTopic = new PersistentTopic(topic, ledger, | |||
BrokerService.this); | |||
PersistentTopic persistentTopic = new PersistentTopic( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am actually wondering if it is actually clearer if we create a class SystemTopic
extending PersistentTopic
. So it will be:
class SystemTopic extends PersistentTopic {
}
PersistentTopic persistentTopic;
if (isSystemTopic(topic)) {
persistentTopic = new PersistentTopic(topic, ledger, BrokerService.this);
} else {
persistentTopic = new SystemTopic(topic, ledger, BrokerService.this);
}
@@ -977,6 +979,9 @@ public BacklogQuotaManager getBacklogQuotaManager() { | |||
* @return determine if quota enforcement needs to be done for topic | |||
*/ | |||
public boolean isBacklogExceeded(PersistentTopic topic) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we move this method to PersistentTopic?
so the logic can be clear:
class PersistentTopic {
public boolean isBacklogExceed() {
// check the backlog logic
}
}
class SystemTopic {
public boolean isBacklogExceed() {
return false;
}
}
so the method can be simplified as
public boolean isBacklogExceeded(PersistentTopic topic) {
return topic.isBacklogExceeded();
}
* @param topicName topic name | ||
* @return future of the topic policies | ||
*/ | ||
CompletableFuture<TopicPolicies> getTopicPoliciesWithoutCacheAsync(TopicName topicName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CompletableFuture<TopicPolicies> getTopicPoliciesWithoutCacheAsync(TopicName topicName); | |
CompletableFuture<TopicPolicies> getTopicPoliciesBypassCacheAsync(TopicName topicName); |
@@ -151,6 +151,8 @@ | |||
private CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(COMPACTION_NEVER_RUN); | |||
private final CompactedTopic compactedTopic; | |||
|
|||
private final boolean isSystemTopic; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see my comment above about making a new SystemTopic
class and making it extend PersistentTopic
.
@@ -1031,7 +1042,7 @@ public void checkCompaction() { | |||
.orElseThrow(() -> new KeeperException.NoNodeException()); | |||
|
|||
|
|||
if (policies.compaction_threshold != 0 | |||
if (isSystemTopic || policies.compaction_threshold != 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (isSystemTopic || policies.compaction_threshold != 0 | |
if (isSystemTopic() || policies.compaction_threshold != 0 |
@@ -164,7 +164,7 @@ public String toString() { | |||
.add("clusterSubscribeRate", clusterSubscribeRate) | |||
.add("latency_stats_sample_rate", latency_stats_sample_rate) | |||
.add("antiAffinityGroup", antiAffinityGroup) | |||
.add("message_ttl_in_seconds", message_ttl_in_seconds).add("retention_policies", retention_policies) | |||
.add("message_ttl_in_seconds", message_ttl_in_seconds).add("retentionPolicies", retention_policies) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any reason why do you change this field here?
2fb4f9b
to
4a50ab0
Compare
fa7914a
to
97993d8
Compare
run java8 tests |
1 similar comment
run java8 tests |
@sijie I have addressed your comments, please take a look. |
run integration tests |
run java8 tests |
1 similar comment
run java8 tests |
0ffb5e7
to
ad2c296
Compare
run java8 tests |
run java8 tests |
a405454
to
1f5a174
Compare
/pulsarbot run-failure-checks |
/pulsarbot run-failure-checks |
3 similar comments
/pulsarbot run-failure-checks |
/pulsarbot run-failure-checks |
/pulsarbot run-failure-checks |
/pulsarbot run-failure-checks |
/pulsarbot run-failure-checks |
/pulsarbot run-failure-checks |
Pulsar docs to be updated (Proposed): @sijie , @codelipenghui , Please help check whether any doc updates are missed. |
@Huanli-Meng We don't need to add a document for this PR because this is a basic framework that can be used to develop topic level policy on it. After we add a topic level policy, that is a suitable time to add documents. |
@codelipenghui got it. Thanks. |
Fix: apache#4899 ### Motivation PIP-39 : (https://github.com/apache/pulsar/wiki/PIP-39%3A-Namespace-Change-Events) Introduce system topic and topic policies service which can be used support topic level policies overwrite feature. ### Modification Added system topic interface and topic policies system topic. Added topic policies interface and system topic based implementation.
Fix: apache#4899 ### Motivation PIP-39 : (https://github.com/apache/pulsar/wiki/PIP-39%3A-Namespace-Change-Events) Introduce system topic and topic policies service which can be used support topic level policies overwrite feature. ### Modification Added system topic interface and topic policies system topic. Added topic policies interface and system topic based implementation.
Not valid from 2.6.0 Thanks @gaoran10 > We introduce topic-level policies from 2.6.0, refer to apache/pulsar#4955, but I think we support retention policies at the topic level from 2.7.0, refer to apache/pulsar#7747. >Make the default value of the configuration topicLevelPoliciesEnabled as true from 2.11.0, refer to apache/pulsar#15619.
Fix: #4899
Motivation
PIP-39 : (https://github.com/apache/pulsar/wiki/PIP-39%3A-Namespace-Change-Events)
Introduce system topic and topic policies service which can be used support topic level policies overwrite feature.
Modification
Added system topic interface and topic policies system topic.
Added topic policies interface and system topic based implementation.