#!/usr/bin/env python3 import argparse from contextlib import closing import platform import socket is_windows = platform.system() == "Windows" def make_parser(): parser = argparse.ArgumentParser("Send or receive a multicast packet") parser.add_argument("--interface_ip", type=str, required=True, help="IP address of the network interface to use") parser.add_argument("--multicast_ip", type=str, default="239.255.0.1", help="IP address of the multicast group to use") parser.add_argument("--port", type=int, default=7400, help="Port to send or receive multicast packet on") parser.add_argument("--reuse_addr", action="store_true", help="Allow binding to an address and port already in use") modes = parser.add_subparsers(title="Modes", dest="mode") modes.add_parser("send", help="Send the packet 'hello!'") receive = modes.add_parser("receive", help="Receive a packet of up to 6B") receive.add_argument("--timeout", type=int, default=15, help="Timeout when listening for packets") return parser def multicast_send(interface_ip, multicast_ip, port): data = "hello!".encode('utf-8') with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)) as s: s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, socket.inet_aton(interface_ip)) s.sendto(data, (multicast_ip, port)) print("Sent:") print(data) def multicast_receive(interface_ip, multicast_ip, port, reuse_addr=False, timeout=15): with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)) as s: # Optionally allow binding to a port and socket already in use (common in multicast scenarios) if reuse_addr: s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # Only available in Linux if not is_windows: s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) # Bind the socket to listen on the multicast address # Windows cannot bind to the multicast address, instead bind to the interface IP: https://stackoverflow.com/a/75611507 bind_addr = interface_ip if is_windows else multicast_ip s.bind((bind_addr, port)) s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, socket.inet_aton(interface_ip)) # Set a timeout s.settimeout(timeout) # Join the multicast group membership = socket.inet_aton(multicast_ip) + socket.inet_aton(interface_ip) s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, membership) # Listen and print... try: print("Listening...") data, sender_addr = s.recvfrom(6) print(f"Received data: {data}") print(f"From: {sender_addr}") finally: # Leave the group when we're done s.setsockopt(socket.IPPROTO_IP, socket.IP_DROP_MEMBERSHIP, membership) def main(): args = make_parser().parse_args() if args.mode == "send": multicast_send(args.interface_ip, args.multicast_ip, args.port) elif args.mode == "receive": multicast_receive(args.interface_ip, args.multicast_ip, args.port, args.reuse_addr, args.timeout) else: raise ValueError(f"Unknowm mode '{args.mode}'") if __name__ == "__main__": main()