Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consecutive publish() is clobbered or mixed up somehow #501

Closed
hoIIer opened this issue Sep 1, 2023 · 3 comments
Closed

Consecutive publish() is clobbered or mixed up somehow #501

hoIIer opened this issue Sep 1, 2023 · 3 comments
Labels
bug This issue is a bug. needs-triage This issue or PR still needs to be triaged.

Comments

@hoIIer
Copy link

hoIIer commented Sep 1, 2023

Describe the bug

I am trying to publish the same payload to two separate topics sequentially (serverless python api req).

        session.mqtt.publish(f'groups/{message.group_id}/messages', {
            'payload': <payload>,
        })
        session.mqtt.publish(f'groups/{message.group_id}/preview', {
            'payload': <payload>,
        })

The client is subscribed to groups/<group_id>/preview only.

What happens is the client receives an event from topic groups/<group_id>/messages, and does not receive one from the 2nd /preview topic.

I create the mqtt connection via a global session object as below (wondering if this is part of my problem?):

AWSSession:
    ...

    @cached_property
    def _mqtt(self):
        """
        Creates an mqtt connection to the AWS IoT Core service using
        the iot domain endpoint passed in from the environment.

        Additionally overrides the mqtt.publish function to set global
        QoS and enable parsing the payload dict in one place. This is
        to make the ergonomics of working with mqtt slightly nicer.

        Example: session.mqtt.publish('my/topic', {'message': 'hello'})
        """
        ...
        mqtt_conn = (
            mqtt_connection_builder
            .websockets_with_default_aws_signing(
                ...,
            )
        )

        # connect.
        conn = mqtt_conn.connect()

        # resolve the future.
        conn.result()

        return mqtt_conn

    @property
    def mqtt(self):
        conn = self._mqtt

        # use partial for publish defaults.
        pub_orig = conn.publish

        def publish(topic, payload):
            pub_orig(topic, json.dumps(payload), mqtt.QoS.AT_LEAST_ONCE)

        conn.publish = publish

        return conn

Expected Behavior

I expect that sequential mqtt publish() calls would work as expected and publish both messages to the correct topics without clobbering one or the other.

Current Behavior

The client receives the event from the first publish() despite not being subscribed to that topic and actually being subscribed to the topic from the second publish()

Reproduction Steps

Included my code above.

Possible Solution

No response

Additional Information/Context

No response

aws-crt-python version used

awscrt==0.16.21

Python version used

3.9.17

Operating System and version

amazon linux 4

@hoIIer hoIIer added bug This issue is a bug. needs-triage This issue or PR still needs to be triaged. labels Sep 1, 2023
@bretambrose
Copy link
Contributor

Rather than indirectly invoke publish through intermediate, dynamically-created function objects, what happens if you just, in one function, invoke connect -> subscribe -> publish1 -> publish2 -> wait-on-publish-received directly?

My first-glance reaction is that there is a scope or function object capture issue which is causing the actual publish call to be invoked with the wrong topic.

@hoIIer
Copy link
Author

hoIIer commented Sep 2, 2023

Ok I tried this directly where the calls to publish() happen, part of regular http request:

        mqtt_conn = (
            mqtt_connection_builder
            .websockets_with_default_aws_signing(
                ...,
            )
        )

        # connect.
        conn = mqtt_conn.connect()

        # resolve the future.
        conn.result()

        # publish the message to message topics.
        mqtt_conn.publish(f'groups/{message.group_id}/messages', json.dumps({
            'payload': <payload>,
        }), mqtt.QoS.AT_LEAST_ONCE)
        mqtt_conn.publish(f'groups/{message.group_id}/preview', json.dumps({
            'payload': <payload>,
        }), mqtt.QoS.AT_LEAST_ONCE)

It sends both events, however it's weird that the first one is received by the client because the client is only subscribed to the second topic groups/<group_id>/preview. Also the second event to the correct topic has an empty payload.

I've verified this to be sure!

Wonder how/what causes the session object to clobber it? I use Starlette (asgi) served via lambda.

@hoIIer
Copy link
Author

hoIIer commented Sep 5, 2023

I just tried logging out of the user I was testing against and noticed it seems to work fine for unauthenticated user OR a different user.

So I think that maybe somehow the user I was testing with may have stale subscription to the other topic somehow? Unsure how to wipe that session on iot but I think it works in other cases. :X

@hoIIer hoIIer closed this as completed Sep 5, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug This issue is a bug. needs-triage This issue or PR still needs to be triaged.
Projects
None yet
Development

No branches or pull requests

2 participants