diff --git a/opl/post_kafka_times.py b/opl/post_kafka_times.py index 9ffa2c5..bb9d908 100644 --- a/opl/post_kafka_times.py +++ b/opl/post_kafka_times.py @@ -330,31 +330,34 @@ def produce_thread(args, config, produce_here, save_here): # Common parameters for both cases common_params = { - 'bootstrap_servers': [f"{args.kafka_host}:{args.kafka_port}"], - 'acks': args.acks, - 'retries': args.retries, - 'batch_size': args.batch_size, - 'buffer_memory': args.buffer_memory, - 'linger_ms': args.linger_ms, - 'max_block_ms': args.max_block_ms, - 'request_timeout_ms': args.request_timeout_ms, - 'compression_type': args.compression_type, + "bootstrap_servers": [f"{args.kafka_host}:{args.kafka_port}"], + "acks": args.acks, + "retries": args.retries, + "batch_size": args.batch_size, + "buffer_memory": args.buffer_memory, + "linger_ms": args.linger_ms, + "max_block_ms": args.max_block_ms, + "request_timeout_ms": args.request_timeout_ms, + "compression_type": args.compression_type, } try: - logging.info(f"Creating SASL password-protected producer to {args.kafka_host}") + logging.info( + f"Creating SASL password-protected producer to {args.kafka_host}" + ) sasl_params = { - 'security_protocol': "SASL_SSL", - 'sasl_mechanism': "SCRAM-SHA-512", - 'sasl_plain_username': args.kafka_username, - 'sasl_plain_password': args.kafka_password, + "security_protocol": "SASL_SSL", + "sasl_mechanism": "SCRAM-SHA-512", + "sasl_plain_username": args.kafka_username, + "sasl_plain_password": args.kafka_password, } produce_here = KafkaProducer(**common_params, **sasl_params) except AttributeError: - logging.info(f"Creating passwordless producer to {args.kafka_host}:{args.kafka_port}") + logging.info( + f"Creating passwordless producer to {args.kafka_host}:{args.kafka_port}" + ) produce_here = KafkaProducer(**common_params) - logging.info(f"Loading queries definition from {args.tables_definition}") queries_definition = yaml.load(args.tables_definition, Loader=yaml.SafeLoader)[ "queries"