Skip to content

Commit

Permalink
Code formatting (patch 7)
Browse files Browse the repository at this point in the history
  • Loading branch information
buffer committed Jan 16, 2024
1 parent 5c7ba65 commit 8d39ea0
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 137 deletions.
141 changes: 77 additions & 64 deletions tools/distributed/thugctrl.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,18 @@


class ThugCtrl(object):
""" Thug remote control
"""

def __init__(self, configfile, extensive = False, threshold = 0, referer = None, proxy = None, timeout = None):
""" Init Thugd using config file
"""
"""Thug remote control"""

def __init__(
self,
configfile,
extensive=False,
threshold=0,
referer=None,
proxy=None,
timeout=None,
):
"""Init Thugd using config file"""

self.host = "localhost"
self.queue = "thugctrl"
Expand All @@ -36,8 +42,7 @@ def __init__(self, configfile, extensive = False, threshold = 0, referer = None,
self.read_config()

def read_config(self):
""" Read config from config file
"""
"""Read config from config file"""

conf = configparser.ConfigParser()
conf.read(self.configfile)
Expand All @@ -48,48 +53,49 @@ def read_config(self):

def send_command(self, data):
credentials = pika.PlainCredentials(self.username, self.password)
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=self.host, credentials = credentials))
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self.host, credentials=credentials)
)
channel = connection.channel()

channel.queue_declare(queue=self.queue, durable=True)

message = json.dumps(data)
channel.basic_publish(exchange='',
routing_key=self.queue,
body=message,
properties=pika.BasicProperties(
delivery_mode=2,
))
channel.basic_publish(
exchange="",
routing_key=self.queue,
body=message,
properties=pika.BasicProperties(
delivery_mode=2,
),
)
print(" [x] Sent %r" % (message,))
connection.close()

def process(self, url):
""" Send URL to process
"""
"""Send URL to process"""

if url.find("://") < 0:
url = "http://" + url
o = urlparse(url)

jid = o.netloc + "_" + datetime.datetime.now().strftime(
"%Y_%m_%d__%H_%M_%S")
data = {"url": url,
"id": jid,
"threshold": self.threshold,
"extensive": self.extensive,
"referer": self.referer,
"proxy": self.proxy,
"timeout": self.timeout
}
jid = o.netloc + "_" + datetime.datetime.now().strftime("%Y_%m_%d__%H_%M_%S")
data = {
"url": url,
"id": jid,
"threshold": self.threshold,
"extensive": self.extensive,
"referer": self.referer,
"proxy": self.proxy,
"timeout": self.timeout,
}
print(data)

self.send_command(data)


class ThugCollect(object):
""" A class collecting thug results
"""
"""A class collecting thug results"""

def process(self, data):
print(data)
Expand All @@ -99,7 +105,6 @@ def callback(self, ch, method, properties, body):
ch.basic_ack(delivery_tag=method.delivery_tag)

def __init__(self, configfile):

self.configfile = configfile
self.host = "localhost"
self.queue = "thugctrl"
Expand All @@ -110,23 +115,26 @@ def __init__(self, configfile):
self.read_config()

credentials = pika.PlainCredentials(self.username, self.password)
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=self.rhost, credentials = credentials))
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=self.rhost, credentials=credentials)
)
channel = connection.channel()

channel.queue_declare(queue=self.rqueue, durable=True)
print(' [*] Waiting for messages on %s %s To exit press CTRL+C' % (
self.rhost, self.rqueue))
print(
" [*] Waiting for messages on %s %s To exit press CTRL+C"
% (self.rhost, self.rqueue)
)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(lambda c, m, p, b: self.callback(c, m, p, b),
queue=self.rqueue)
channel.basic_consume(
lambda c, m, p, b: self.callback(c, m, p, b), queue=self.rqueue
)

channel.start_consuming()

def read_config(self):
""" Read config from config file
"""
"""Read config from config file"""

conf = ConfigParser.ConfigParser()
conf.read(self.configfile)
Expand All @@ -141,35 +149,40 @@ def read_config(self):


if __name__ == "__main__":
parser = argparse.ArgumentParser(
description='Send urls to Thug daemons tp process')
parser.add_argument('--urls', type=str, nargs='+',
help='One or more URLs to process')
parser.add_argument('--config', help='Config file to use',
default="config.ini")
parser.add_argument('--collect_results',
help='Start a daemon to collect the results',
default=False, action="store_true")
parser.add_argument('--extensive',
help='In depth follow links',
default=False, action="store_true")
parser.add_argument('--threshold', type=int,
help='Maximum pages to fetch',
default=0)
parser.add_argument('--referer', type=str,
help='Referer to send',
default=None)
parser.add_argument('--proxy', type=str,
help='Proxy to use',
default=None)
parser.add_argument('--timeout', type=int,
help='Timeout in seconds for the analysis',
default=None)
parser = argparse.ArgumentParser(description="Send urls to Thug daemons tp process")
parser.add_argument(
"--urls", type=str, nargs="+", help="One or more URLs to process"
)
parser.add_argument("--config", help="Config file to use", default="config.ini")
parser.add_argument(
"--collect_results",
help="Start a daemon to collect the results",
default=False,
action="store_true",
)
parser.add_argument(
"--extensive", help="In depth follow links", default=False, action="store_true"
)
parser.add_argument(
"--threshold", type=int, help="Maximum pages to fetch", default=0
)
parser.add_argument("--referer", type=str, help="Referer to send", default=None)
parser.add_argument("--proxy", type=str, help="Proxy to use", default=None)
parser.add_argument(
"--timeout", type=int, help="Timeout in seconds for the analysis", default=None
)

args = parser.parse_args()

if args.urls:
t = ThugCtrl(args.config, args.extensive, args.threshold, args.referer, args.proxy, args.timeout)
t = ThugCtrl(
args.config,
args.extensive,
args.threshold,
args.referer,
args.proxy,
args.timeout,
)
for aurl in args.urls:
t.process(aurl)

Expand Down
83 changes: 51 additions & 32 deletions tools/distributed/thugd.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@

class Thugd(object):
"""
A class waiting for jobs, starting thug, returning results
A class waiting for jobs, starting thug, returning results
"""
def __init__(self, configfile, clear = False):

def __init__(self, configfile, clear=False):
"""
@configfile: The configuration file to use
@clear: Clear the job chain
Expand All @@ -42,9 +43,9 @@ def _read_config(self, configfile):
@configfile: The configfile to use
"""
self.host = "localhost"
self.queue = "thugctrl"
self.rhost = "localhost"
self.host = "localhost"
self.queue = "thugctrl"
self.rhost = "localhost"
self.rqueue = "thugres"

if configfile is None:
Expand All @@ -61,47 +62,59 @@ def _read_config(self, configfile):
self.password = conf.get("credentials", "password")

def _chdir(self):
os.chdir(os.path.abspath(os.path.join(os.path.dirname(__file__),
os.pardir,
os.pardir,
'src')))
os.chdir(
os.path.abspath(
os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, "src")
)
)

def _run_queue(self):
credentials = pika.PlainCredentials(self.username, self.password)
parameters = pika.ConnectionParameters(host = self.host, credentials = credentials)
parameters = pika.ConnectionParameters(host=self.host, credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

channel.queue_declare(queue = self.queue, durable = True)
print("[*] Waiting for messages on %s %s (press CTRL+C to exit)" % (self.host, self.queue, ))

channel.basic_qos(prefetch_count = 1)
channel.basic_consume(lambda c, m, p, b: self.callback(c, m, p, b), queue = self.queue)
channel.queue_declare(queue=self.queue, durable=True)
print(
"[*] Waiting for messages on %s %s (press CTRL+C to exit)"
% (
self.host,
self.queue,
)
)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(
lambda c, m, p, b: self.callback(c, m, p, b), queue=self.queue
)
channel.start_consuming()

def runProcess(self, exe):
p = subprocess.Popen(exe, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
while(True):
p = subprocess.Popen(exe, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
while True:
retcode = p.poll()
line = p.stdout.readline()
yield line
if(retcode is not None):
if retcode is not None:
break

def send_results(self, data):
credentials = pika.PlainCredentials(self.username, self.password)
parameters = pika.ConnectionParameters(host = self.rhost, credentials = credentials)
parameters = pika.ConnectionParameters(host=self.rhost, credentials=credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

channel.queue_declare(queue = self.rqueue, durable = True)
channel.queue_declare(queue=self.rqueue, durable=True)

message = json.dumps(data)
channel.basic_publish(exchange = '',
routing_key = self.rqueue,
body = message,
properties = pika.BasicProperties(delivery_mode = 2,))
channel.basic_publish(
exchange="",
routing_key=self.rqueue,
body=message,
properties=pika.BasicProperties(
delivery_mode=2,
),
)

print("[x] Sent %r" % (message,))
connection.close()
Expand Down Expand Up @@ -146,19 +159,18 @@ def process(self, job):

for line in self.runProcess(command):
if line.startswith("["):
print(line, end = " ")
print(line, end=" ")

if line.find("] Saving log analysis at ") >= 0:
pathname = line.split(" ")[-1].strip()

rpath = self.copy_to_result(pathname, job)
res = {"id" : job["id"],
"rpath" : rpath}
res = {"id": job["id"], "rpath": rpath}

self.send_results(res)

def callback(self, ch, method, properties, body):
print("[x] Received %r" % (body, ))
print("[x] Received %r" % (body,))

if not self.clear:
self.process(json.loads(body))
Expand All @@ -167,10 +179,17 @@ def callback(self, ch, method, properties, body):

ch.basic_ack(delivery_tag=method.delivery_tag)


if __name__ == "__main__":
parser = argparse.ArgumentParser(description = 'Receives jobs and starts Thug to process them')
parser.add_argument('--config', help = 'Configuration file to use', default = "config.ini")
parser.add_argument('--clear', help = 'Clear the job chain', default = False, action = "store_true")
parser = argparse.ArgumentParser(
description="Receives jobs and starts Thug to process them"
)
parser.add_argument(
"--config", help="Configuration file to use", default="config.ini"
)
parser.add_argument(
"--clear", help="Clear the job chain", default=False, action="store_true"
)
args = parser.parse_args()

try:
Expand Down
Loading

0 comments on commit 8d39ea0

Please sign in to comment.