Skip to content

Commit

Permalink
fix lint: avoid code duplication
Browse files Browse the repository at this point in the history
  • Loading branch information
Smejky338 committed Dec 11, 2023
1 parent 2ea5dfe commit 1ca9ced
Showing 1 changed file with 23 additions and 31 deletions.
54 changes: 23 additions & 31 deletions opl/post_kafka_times.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,40 +328,32 @@ def produce_thread(args, config, produce_here, save_here):
if args.acks != "all":
args.acks = int(args.acks)

kafka_host = f"{args.kafka_host}:{args.kafka_port}"
# Common parameters for both cases
common_params = {
'bootstrap_servers': [f"{args.kafka_host}:{args.kafka_port}"],
'acks': args.acks,
'retries': args.retries,
'batch_size': args.batch_size,
'buffer_memory': args.buffer_memory,
'linger_ms': args.linger_ms,
'max_block_ms': args.max_block_ms,
'request_timeout_ms': args.request_timeout_ms,
'compression_type': args.compression_type,
}

try:
logging.info(f"Creating SASL password-protected producer to {kafka_host}")
produce_here = KafkaProducer(
bootstrap_servers=kafka_host, # [args.kafka_host + ":" + str(args.kafka_port)],
acks=args.acks,
retries=args.retries,
batch_size=args.batch_size,
buffer_memory=args.buffer_memory,
linger_ms=args.linger_ms,
max_block_ms=args.max_block_ms,
request_timeout_ms=args.request_timeout_ms,
compression_type=args.compression_type,
security_protocol="SASL_SSL",
sasl_mechanism="SCRAM-SHA-512",
sasl_plain_username=args.kafka_username,
sasl_plain_password=args.kafka_password,
)
logging.info(f"Creating SASL password-protected producer to {args.kafka_host}")
sasl_params = {
'security_protocol': "SASL_SSL",
'sasl_mechanism': "SCRAM-SHA-512",
'sasl_plain_username': args.kafka_username,
'sasl_plain_password': args.kafka_password,
}
produce_here = KafkaProducer(**common_params, **sasl_params)
except AttributeError:
logging.info(
f"Creating passwordless producer to {args.kafka_host}:{args.kafka_port}"
)
produce_here = KafkaProducer(
bootstrap_servers=kafka_host, # [args.kafka_host + ":" + str(args.kafka_port)],
acks=args.acks,
retries=args.retries,
batch_size=args.batch_size,
buffer_memory=args.buffer_memory,
linger_ms=args.linger_ms,
max_block_ms=args.max_block_ms,
request_timeout_ms=args.request_timeout_ms,
compression_type=args.compression_type,
)
logging.info(f"Creating passwordless producer to {args.kafka_host}:{args.kafka_port}")
produce_here = KafkaProducer(**common_params)


logging.info(f"Loading queries definition from {args.tables_definition}")
queries_definition = yaml.load(args.tables_definition, Loader=yaml.SafeLoader)[
Expand Down

0 comments on commit 1ca9ced

Please sign in to comment.