zmq parsing #262

Closed
rawmean opened this issue Dec 9, 2024 · 3 comments

Comments

rawmean commented Dec 9, 2024

I am using the code below to parse messages from fleet-telemetry (zmq is configured).
The code receives messages from fleet-telemetry, but parsing fails with:
Error parsing message with type 'telemetry.vehicle_connectivity.VehicleConnectivity'
I have compiled the protobuf (vehicle_connectivity_pb2).
What am I doing wrong?

import zmq
from vehicle_connectivity_pb2 import VehicleConnectivity
from google.protobuf.json_format import MessageToJson, Parse


def main():
    # Create a ZeroMQ context
    context = zmq.Context()

    # Create a SUB (subscriber) socket
    socket = context.socket(zmq.SUB)

    # Connect to the publisher at localhost:4444
    socket.connect("tcp://localhost:4444")

    # Subscribe to all messages
    socket.setsockopt_string(zmq.SUBSCRIBE, "")

    print("Listening for messages on localhost:4444...")

    try:
        while True:
            # Receive a message from the publisher
            message = socket.recv()
            try:
                # Parse the binary message to VehicleConnectivity protobuf
                vehicle_data = VehicleConnectivity()
                vehicle_data.ParseFromString(message)
                # Convert the protobuf message to JSON
                json_representation = MessageToJson(vehicle_data)
                # Print the JSON representation
                print(json_representation)
            except Exception as e:
                print(f"Failed to parse message: {e}")

    except KeyboardInterrupt:
        print("Interrupted by user")

    finally:
        # Clean up
        socket.close()
        context.term()

if __name__ == "__main__":
    main()

jbanyer commented Dec 9, 2024

The zmq producer publishes multipart messages. Here is example handling code in TypeScript. It assumes JSON messages, not protobuf, but it demonstrates the multipart handling.

    // the server sends 2-part messages:
    //  topic: "tesla_V" for vehicle telemetry records
    //  data: JSON-encoded telemetry record
    for await (const [topicBuf, dataBuf] of socket) {
      try {
        const topic = topicBuf.toString()
        await this.processTelemetryMessage(topic, dataBuf)
      }
      catch (err: any) {
        logger.error('processTelemetryEvent error: %s', err.toString())
      }
    }
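
For a Python consumer like the one in the question, a rough equivalent could look like the sketch below. With pyzmq, a plain `socket.recv()` returns one frame at a time, so the topic frame can end up being passed to `ParseFromString`; `recv_multipart()` returns all frames of a message together. The helper names `split_frames`, `listen`, and `handle` are made up for illustration, and the two-frame layout (topic, then data) is taken from the comment in the TypeScript example above.

```python
from typing import Callable, Sequence, Tuple


def split_frames(frames: Sequence[bytes]) -> Tuple[str, bytes]:
    """Split one multipart message into (topic, payload).

    Assumes the two-frame layout described above: topic first, data second.
    """
    if len(frames) != 2:
        raise ValueError(f"expected 2 frames (topic, data), got {len(frames)}")
    topic_frame, payload = frames
    return topic_frame.decode("utf-8"), payload


def listen(endpoint: str, handle: Callable[[str, bytes], None]) -> None:
    """Subscribe to every topic on `endpoint` and pass each record to `handle`."""
    import zmq  # pyzmq; imported here so split_frames can be used without it

    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect(endpoint)
    socket.setsockopt(zmq.SUBSCRIBE, b"")  # empty prefix: receive all topics
    try:
        while True:
            # recv_multipart() returns every frame of one message as a list of bytes
            topic, payload = split_frames(socket.recv_multipart())
            handle(topic, payload)
    except KeyboardInterrupt:
        print("Interrupted by user")
    finally:
        socket.close()
        context.term()
```

A `handle` callback would then check `topic` and parse only the data frame, for example with `VehicleConnectivity().ParseFromString(payload)` for the topic that carries connectivity records. Which topic carries which protobuf type is not stated in this thread, so that mapping is left to the caller.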

rawmean commented Dec 9, 2024

> The zmq producer publishes multipart messages. Here is example handling code in TypeScript. This example assumes JSON messages, not protobuf, but it demonstrates the multipart handling.

Thanks. Doesn't zmq publish binary messages? How did you get zmq from telemetry to publish json?

jbanyer commented Dec 9, 2024

> Thanks. Doesn't zmq publish binary messages? How did you get zmq from telemetry to publish json?

As per the README, set "transmit_decoded_records": true in your server_config.json. That will convert all messages to JSON for all publishers, not just ZMQ.
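
With that setting enabled, the data frame should be JSON text rather than protobuf bytes, so no compiled `_pb2` module is needed on the consumer side. A minimal Python sketch of decoding one such message follows; the function name `decode_json_record` is hypothetical, the two-frame (topic, data) layout is assumed from the earlier comment in this thread, and the sample field in the usage note is invented rather than taken from a real telemetry record.

```python
import json
from typing import Any, Sequence, Tuple


def decode_json_record(frames: Sequence[bytes]) -> Tuple[str, Any]:
    """Decode one multipart message whose data frame is JSON.

    Assumes two frames: the topic, then a JSON-encoded record.
    """
    if len(frames) != 2:
        raise ValueError(f"expected 2 frames (topic, data), got {len(frames)}")
    topic_frame, data_frame = frames
    # json.loads accepts bytes directly and detects the UTF encoding
    return topic_frame.decode("utf-8"), json.loads(data_frame)
```

For example, `decode_json_record([b"tesla_V", b'{"example_field": 1}'])` yields the topic string and a Python dict. In a pyzmq subscriber, the frames would come from `socket.recv_multipart()`.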

rawmean closed this as completed Jan 12, 2025