-
Notifications
You must be signed in to change notification settings - Fork 0
/
sz_rabbit_download.py
executable file
·98 lines (82 loc) · 2.81 KB
/
sz_rabbit_download.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#! /usr/bin/env python3
import argparse
import logging
import traceback
import importlib
import sys
import os
import time
import random
import pika
from werkzeug.utils import secure_filename
# Number of downloaded messages between two progress/rate reports.
INTERVAL = 1000

# Positions used to index each item yielded by the consume loop:
# the delivery frame (carries ``delivery_tag``) and the raw message body.
MSG_FRAME, MSG_BODY = 0, 2

# Record layout handed to ``logging.basicConfig``.
log_format = "%(asctime)s %(message)s"
# Script body: drain a RabbitMQ queue into a local text file, one message per
# line, acknowledging each message after it has been written.  The loop ends
# once no message arrives for one second.  Any failure prints a traceback and
# terminates the process with exit status -1.
try:
    # Translate the textual SENZING_LOG_LEVEL value into a logging level;
    # unknown values fall back to INFO.
    log_level_map = {
        "notset": logging.NOTSET,
        "debug": logging.DEBUG,
        "info": logging.INFO,
        "fatal": logging.FATAL,
        "warning": logging.WARNING,
        "error": logging.ERROR,
        "critical": logging.CRITICAL,
    }
    log_level_parameter = os.getenv("SENZING_LOG_LEVEL", "info").lower()
    log_level = log_level_map.get(log_level_parameter, logging.INFO)
    logging.basicConfig(format=log_format, level=log_level)
    # Keep pika's own logging down to warnings and above.
    logging.getLogger("pika").setLevel(logging.WARNING)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-u", "--url", dest="url", required=True, help="RabbitMQ server URL"
    )
    parser.add_argument(
        "-q", "--queue", dest="queue", required=True, help="source queue"
    )
    parser.add_argument(
        "-o",
        "--output",
        dest="output",
        required=True,
        help="output filename to append to",
    )
    # NOTE(review): --debugTrace is parsed but not read anywhere in this
    # script; it is kept so existing command lines continue to work.
    parser.add_argument(
        "-t",
        "--debugTrace",
        dest="debugTrace",
        action="store_true",
        default=False,
        help="output debug trace information",
    )
    args = parser.parse_args()

    prev_time = time.time()
    params = pika.URLParameters(args.url)
    with pika.BlockingConnection(params) as conn:
        # Message bodies are decoded as UTF-8 below, so the file is opened
        # as UTF-8 explicitly instead of relying on the locale default.
        with open(
            secure_filename(args.output), "a", encoding="utf-8"
        ) as fp_out:
            messages = 0
            ch = conn.channel()
            # passive=True only checks that the queue exists; it never
            # creates it.
            ch.queue_declare(queue=args.queue, passive=True)
            for msg in ch.consume(args.queue, inactivity_timeout=1):
                # On inactivity the generator yields (None, None, None).
                # Test the frame rather than the body so that a genuine
                # message with an empty body is not mistaken for a timeout
                # and left unacknowledged at the head of the queue.
                if not msg or msg[MSG_FRAME] is None:
                    break
                try:
                    now_time = time.time()
                    fp_out.write(msg[MSG_BODY].decode())
                    fp_out.write("\n")
                    # NOTE(review): the ack is sent while the line may still
                    # sit in the file buffer; a hard kill at this point could
                    # lose acknowledged records.
                    ch.basic_ack(msg[MSG_FRAME].delivery_tag)
                    messages += 1
                    if messages % INTERVAL == 0:  # display rate stats
                        diff = now_time - prev_time
                        # -1 marks "rate unknown" when no time has elapsed.
                        speed = int(INTERVAL / diff) if diff > 0.0 else -1
                        print(
                            f"Downloaded {messages} messages, {speed} records per second"
                        )
                        prev_time = now_time
                except Exception:
                    traceback.print_exc()
                    conn.close()
                    sys.exit(-1)
            # End the consumer explicitly so messages that were prefetched
            # but never processed are handed back to the queue.
            ch.cancel()
            print(f"Downloaded total of {messages} messages")
except Exception as err:
    print(err, file=sys.stderr)
    traceback.print_exc()
    sys.exit(-1)