Skip to content

Commit

Permalink
lint fix - post_kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
Smejky338 committed Dec 11, 2023
1 parent 1ca9ced commit 95ea160
Showing 1 changed file with 19 additions and 16 deletions.
35 changes: 19 additions & 16 deletions opl/post_kafka_times.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,31 +330,34 @@ def produce_thread(args, config, produce_here, save_here):

# Common parameters for both cases
common_params = {
'bootstrap_servers': [f"{args.kafka_host}:{args.kafka_port}"],
'acks': args.acks,
'retries': args.retries,
'batch_size': args.batch_size,
'buffer_memory': args.buffer_memory,
'linger_ms': args.linger_ms,
'max_block_ms': args.max_block_ms,
'request_timeout_ms': args.request_timeout_ms,
'compression_type': args.compression_type,
"bootstrap_servers": [f"{args.kafka_host}:{args.kafka_port}"],
"acks": args.acks,
"retries": args.retries,
"batch_size": args.batch_size,
"buffer_memory": args.buffer_memory,
"linger_ms": args.linger_ms,
"max_block_ms": args.max_block_ms,
"request_timeout_ms": args.request_timeout_ms,
"compression_type": args.compression_type,
}

try:
logging.info(f"Creating SASL password-protected producer to {args.kafka_host}")
logging.info(
f"Creating SASL password-protected producer to {args.kafka_host}"
)
sasl_params = {
'security_protocol': "SASL_SSL",
'sasl_mechanism': "SCRAM-SHA-512",
'sasl_plain_username': args.kafka_username,
'sasl_plain_password': args.kafka_password,
"security_protocol": "SASL_SSL",
"sasl_mechanism": "SCRAM-SHA-512",
"sasl_plain_username": args.kafka_username,
"sasl_plain_password": args.kafka_password,
}
produce_here = KafkaProducer(**common_params, **sasl_params)
except AttributeError:
logging.info(f"Creating passwordless producer to {args.kafka_host}:{args.kafka_port}")
logging.info(
f"Creating passwordless producer to {args.kafka_host}:{args.kafka_port}"
)
produce_here = KafkaProducer(**common_params)


logging.info(f"Loading queries definition from {args.tables_definition}")
queries_definition = yaml.load(args.tables_definition, Loader=yaml.SafeLoader)[
"queries"
Expand Down

0 comments on commit 95ea160

Please sign in to comment.