Skip to content

Conversation

@djnewbould
Copy link
Contributor

  • Service setup now runs as a cron job that runs every 5 minutes rather than just at start-up and on helm upgrade. This ensures the cluster configuration maintains a desired state.
  • Authorizers have been added to the hivemq plugin, these run on publish and subscribe to check if the principle has permission for the mqtt action.

This requires more testing before merging

import java.util.concurrent.ConcurrentHashMap;

public class ClientSessionStore {
private static final ConcurrentHashMap<String, String> sessionMap = new ConcurrentHashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to use a global here. A ClientSessionStore instance can be created in FPKrbMain and passed down through the Providers. If necessary this class could be reworked to an FPKrbContext which holds both the ServiceClient and this session store map; this avoids passing down multiple context parameters.

var clientId = publishAuthorizerInput.getClientInformation().getClientId();
var clientUsername = ClientSessionStore.getUsername(clientId);
var topic = publishAuthorizerInput.getPublishPacket().getTopic();
try{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You shouldn't need to try/catch here; errors will be asynchronous. You do want to use the second argument to subscribe, which is called for async errors.

isPermissionAllowed(getACLforPrincipal(clientUsername), topic, TopicPermission.MqttActivity.PUBLISH)
.subscribe(result -> {
if(result){
publishAuthorizerOutput.authorizeSuccessfully();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because you are handling this asynchronously you need to use the async method on the PublishAuthorizerOutput. Call async synchronously (not within Rx) to give you an Async, and then call the getOutput and resume methods on that object from within the Rx callback. Otherwise I think you end up blocking the whole broker...

import java.util.UUID;
import java.util.stream.Collectors;

public class FPKrbAuthorizer implements SubscriptionAuthorizer, PublishAuthorizer {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A SubscriptionAuthorizer is only called when a client makes a new SUBSCRIBE request. If ACLs change while a client is subscribed these changes will not be picked up. I think the only way to handle this correctly is with the Interceptor API.

.delay(5, TimeUnit.SECONDS))
.subscribe(() -> log.info("Registered service successfully"),
e -> log.error("Failed to register service: {}",
e.toString()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this bit should have been removed? But we still need to call http().start().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've based this off of main instead of testing v4, it might be better to rebase to v4.

To prevent a service setup failure from not inserting config. Service setup runs every 5 minutes to maintain a constant config state.
The string comparison did not account for MQTT special characters (#, +). The added comparison accounts for this.
The previous sync implementation could cause the broker the hang, using the async authorizers remediates this issue.
The added interceptor framework allows for dynamic checking of acl's for messages sent to subscribers.
The public interceptor checks if client subscribed to the publish topic still have permission to subscribe, if not, they're kicked.
The publish inbound interceptor usage was incorrect, the publish outbound interceptor ensures packet delivery can be prevented.
@djnewbould djnewbould force-pushed the dn/service-setup-cron branch from 71c53fa to 5e00113 Compare March 12, 2025 15:42
@djnewbould djnewbould changed the base branch from main to testing/v4 March 12, 2025 15:42
This is now done through service setup.
RequestCache now has a clean-up job to remove expired entries to account for ACL changes.
When preventing packet delivery to a client, force reconnect.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants