Feature: custom outputs #315

Status: Open. Wants to merge 82 commits into base: master.

Commits (82)
4b7ae82
Temp version change
showerst Dec 4, 2017
8d5a2d5
Add GCPS settings
showerst Dec 4, 2017
fd267ae
pluggable backends initial commit
showerst Dec 4, 2017
7b38107
move settings from cmd line to env
showerst Dec 4, 2017
17d8a3a
add google-cloud-pubsub install requirement
Dec 5, 2017
8090d65
try setting up conditional import of module based on env var
Dec 5, 2017
5253ffc
tiny var name change since not really dealing w/ class
Dec 5, 2017
6685f30
try to set up pub/sub for env var auth (since it will be difficult to…
Dec 5, 2017
e2d2889
small style changes to abide by flake8
Dec 5, 2017
c95963f
tiny styling change
Dec 5, 2017
48e215b
add in obj pre_save method to match file writing save
Dec 5, 2017
cbf13f0
change env var for output switch and remove from settings
Dec 5, 2017
6d5e4a8
bunch o name changes
Dec 5, 2017
8d5622b
remove unused import_lib
Dec 5, 2017
774a53b
remove extra whitespace (causing diff change)
Dec 5, 2017
1a1a1da
ditto
Dec 5, 2017
9b8eca9
remove all just because hehe
Dec 5, 2017
2a6e749
single quote consistency
Dec 5, 2017
9e72ffb
add debuggering line
Dec 6, 2017
fd6eb6a
more debug printing
Dec 6, 2017
9ca4c3d
remove debugging
Dec 6, 2017
4a39f4a
(try) add scopes and tiny flake8 patch
Dec 6, 2017
5c5c69a
try another way for scopes
Dec 6, 2017
a5bc525
oops, forgot to specify keyword for arg
Dec 6, 2017
0d26c22
try a tuple instead?
Dec 6, 2017
6e42604
meh, trying this, maybe i missed something
Dec 6, 2017
ef688c3
try another way
Dec 6, 2017
dad6321
patch validation to reference caller
Dec 6, 2017
5e02928
one last shot, for this nite my friend
Dec 6, 2017
d9362d1
try again
Dec 6, 2017
5696342
trying json file loaded into single env var
Dec 6, 2017
951f92b
debug step
Dec 6, 2017
ef28c48
undo debug step
Dec 6, 2017
6564f03
add reference to issue on github that helped me so dearly
Dec 6, 2017
4cbe0a3
remove kafka for now to make the diff/pr focused on googel cloude
Dec 6, 2017
68222b8
small version change
Dec 6, 2017
3a4ef00
match env var used in google cloud documentation for the project id
Dec 7, 2017
8c5f9da
remove project_id var and just use env var during topic setup
Dec 7, 2017
db461d7
ditto, remove topic var and just use env var during topic setup
Dec 7, 2017
4542caf
refactor google auth to allow for automatic detection of credentials,…
Dec 7, 2017
77d41ad
add bill_identifier prop to vote_event for a more reliable way to ref…
Dec 13, 2017
25b72ac
nevermind re: new field; fallback to env var switch option
Dec 13, 2017
9a717bd
remove some whitespace from stringified json sent to pub/sub and expl…
Dec 14, 2017
3509b2c
use pub/sub topic_path publisher helper method to obtain topic path w…
Dec 14, 2017
7c5fe64
remove automatic topic creation; assume user will create the topic pr…
Dec 14, 2017
70ab25c
do not require ordering before publishing to pubsub
Dec 14, 2017
31a8b09
update google pubsub dependency version
Dec 23, 2017
8f30f76
tiny update to service account env var name
Dec 27, 2017
9276645
merge upstream and settle conflicts
Jan 17, 2018
301dabe
add base output class that wraps some before and after save handling
Jan 18, 2018
7be4e6e
add local file output subclass to handle/replace default/existing scr…
Jan 18, 2018
e7bf5fe
remove save_object method from scraper class as it has been moved to …
Jan 18, 2018
a77eb67
update google cloud pubsub class to use new output base class
Jan 18, 2018
957a0bd
remove local file output init method
Jan 19, 2018
6f8f8a4
Merge pull request #1 from doubleswirve/custom-export--dry-it-up
Jan 19, 2018
ce3456c
patch scraper reference prop in google cloud output
Jan 19, 2018
cd967b1
add amazon sqs output
Jan 21, 2018
757e907
add amazon sqs output target to condition
Jan 21, 2018
99afa3e
add boto3 dep
Jan 21, 2018
337beab
do not encode for aws sqs
Jan 21, 2018
4165197
update pubsub filename
Jan 21, 2018
f550a37
update initialization of publisher with new env var
Jan 21, 2018
c985014
break some more helper methods out to the parent class and remove pub…
Jan 22, 2018
4cf74ac
try renewing the publisher client every publish...
Jan 22, 2018
71dc7b7
patch order so var error does not occur
Jan 22, 2018
a01dc61
smore patches
Jan 22, 2018
2e55902
try to debug http version
Jan 22, 2018
1c832dd
remove http version testing and revert to instantiating the publisher…
Jan 22, 2018
289f9a0
add debug helper method to parent class and update improts
Jan 22, 2018
e2c950b
try upgrading the grpc lib version
Jan 23, 2018
c6e4d71
remove grpc dep, no dice
Jan 23, 2018
0adc9a5
Merge pull request #2 from doubleswirve/custom-export--google-pubsub-env
Jan 23, 2018
585dc90
Merge branch 'master' into custom-export
Jan 23, 2018
0b5d707
Merge branch 'custom-export' into custom-export--amazon-sqs
Jan 23, 2018
90bc055
update info logging
Jan 23, 2018
5ade2fe
port over some more helper to sqs
Jan 23, 2018
838b36e
add s3 handling for messages that are too large
Jan 23, 2018
9d5e9d2
update s3 key format
Jan 23, 2018
820d395
update number formatting for py version
Jan 24, 2018
9db57ba
Merge pull request #3 from doubleswirve/custom-export--amazon-sqs
Mar 27, 2018
81bb820
resolve conflicts
doubleswirve Aug 15, 2018
2978f4c
Update opencivicdata min version
showerst Dec 11, 2018
2 changes: 1 addition & 1 deletion pupa/__init__.py
@@ -1 +1 @@
-__version__ = '0.9.0'  # pragma: no cover
+__version__ = '0.9.0.dev2'  # pragma: no cover
46 changes: 13 additions & 33 deletions pupa/scrape/base.py
@@ -72,37 +72,17 @@ def __init__(self, jurisdiction, datadir, *, strict_validation=True, fastmode=Fa
         self.error = self.logger.error
         self.critical = self.logger.critical
 
-    def save_object(self, obj):
-        """
-        Save object to disk as JSON.
-
-        Generally shouldn't be called directly.
-        """
-        obj.pre_save(self.jurisdiction.jurisdiction_id)
-
-        filename = '{0}_{1}.json'.format(obj._type, obj._id).replace('/', '-')
-
-        self.info('save %s %s as %s', obj._type, obj, filename)
-        self.debug(json.dumps(OrderedDict(sorted(obj.as_dict().items())),
-                              cls=utils.JSONEncoderPlus, indent=4, separators=(',', ': ')))
-
-        self.output_names[obj._type].add(filename)
-
-        with open(os.path.join(self.datadir, filename), 'w') as f:
-            json.dump(obj.as_dict(), f, cls=utils.JSONEncoderPlus)
-
-        # validate after writing, allows for inspection on failure
-        try:
-            obj.validate()
-        except ValueError as ve:
-            if self.strict_validation:
-                raise ve
-            else:
-                self.warning(ve)
-
-        # after saving and validating, save subordinate objects
-        for obj in obj._related:
-            self.save_object(obj)
+        self.output_target = self.get_output_target(os.environ.get('OUTPUT_TARGET'))
+
+    def get_output_target(self, output_target_name):
+        if output_target_name == 'GOOGLE_CLOUD_PUBSUB':
+            from pupa.scrape.outputs.google_cloud_pubsub import GoogleCloudPubSub
+            return GoogleCloudPubSub(self)
+        if output_target_name == 'AMAZON_SQS':
+            from pupa.scrape.outputs.amazon_sqs import AmazonSQS
+            return AmazonSQS(self)
+        from pupa.scrape.outputs.local_file import LocalFile
+        return LocalFile(self)
 
     def do_scrape(self, **kwargs):
         record = {'objects': defaultdict(int)}
@@ -111,9 +91,9 @@ def do_scrape(self, **kwargs):
         for obj in self.scrape(**kwargs) or []:
             if hasattr(obj, '__iter__'):
                 for iterobj in obj:
-                    self.save_object(iterobj)
+                    self.output_target.save_object(iterobj)
             else:
-                self.save_object(obj)
+                self.output_target.save_object(obj)
         record['end'] = utils.utcnow()
         record['skipped'] = getattr(self, 'skipped', 0)
         if not self.output_names:
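The backend is chosen once, at scraper construction time, from the `OUTPUT_TARGET` environment variable, with local-file output as the default. A minimal standalone sketch of that dispatch, with stub classes standing in for the real backends so the selection logic is runnable on its own:

```python
import os


# Stand-ins for the real backend classes; only the selection logic
# below mirrors Scraper.get_output_target from the diff.
class LocalFile:
    pass


class GoogleCloudPubSub:
    pass


class AmazonSQS:
    pass


def get_output_target(output_target_name):
    # An unrecognized or missing value falls through to local-file
    # output, preserving pupa's existing behavior when OUTPUT_TARGET
    # is unset.
    if output_target_name == 'GOOGLE_CLOUD_PUBSUB':
        return GoogleCloudPubSub()
    if output_target_name == 'AMAZON_SQS':
        return AmazonSQS()
    return LocalFile()


target = get_output_target(os.environ.get('OUTPUT_TARGET'))
```

A run such as `OUTPUT_TARGET=AMAZON_SQS pupa update <jurisdiction>` would therefore route every saved object through the SQS backend instead of the data directory.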
Empty file added pupa/scrape/outputs/__init__.py
43 changes: 43 additions & 0 deletions pupa/scrape/outputs/amazon_sqs.py
@@ -0,0 +1,43 @@
+import boto3
+import os
+import json
+import uuid
+from collections import OrderedDict
+
+from pupa import utils
+from pupa.scrape.outputs.output import Output
+
+MAX_BYTE_LENGTH = 230000
+
+
+class AmazonSQS(Output):
+
+    def __init__(self, scraper):
+        super().__init__(scraper)
+
+        self.sqs = boto3.resource('sqs')
+        self.queue_name = os.environ.get('AMAZON_SQS_QUEUE')
+        self.queue = self.sqs.get_queue_by_name(QueueName=self.queue_name)
+
+        self.s3 = boto3.resource('s3')
+        self.bucket_name = os.environ.get('AMAZON_S3_BUCKET')
+
+    def handle_output(self, obj):
+        self.scraper.info('send %s %s to queue %s', obj._type, obj,
+                          self.queue_name)
+        self.debug_obj(obj)
+
+        self.add_output_name(obj, self.queue_name)
+        obj_str = self.stringify_obj(obj, True, True)
+        encoded_obj_str = obj_str.encode('utf-8')
+
+        if len(encoded_obj_str) > MAX_BYTE_LENGTH:
+            key = 'S3:{}'.format(str(uuid.uuid4()))
+
+            self.scraper.info('put %s %s to bucket %s/%s', obj._type, obj,
+                              self.bucket_name, key)
+
+            self.s3.Object(self.bucket_name, key).put(Body=encoded_obj_str)
+            self.queue.send_message(MessageBody=key)
+        else:
+            self.queue.send_message(MessageBody=obj_str)
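The `MAX_BYTE_LENGTH` check works around SQS's 256 KB message-size limit: oversized payloads are uploaded to S3 and the queue carries only the `S3:<uuid>` key, so a consumer has to reverse the indirection. The sketch below is a hypothetical consumer-side helper, not part of the PR; `fetch_from_s3` stands in for a real `boto3` `get_object` call.

```python
import json


def resolve_message(body, fetch_from_s3):
    """Decode a message body produced by the AmazonSQS backend.

    Small objects arrive as JSON directly; oversized ones arrive as an
    'S3:<uuid>' pointer whose S3 object holds the real payload.
    """
    if body.startswith('S3:'):
        # The whole 'S3:...' string is used as the bucket key in the PR,
        # so it can be passed to the fetcher as-is.
        return json.loads(fetch_from_s3(body))
    return json.loads(body)
```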
26 changes: 26 additions & 0 deletions pupa/scrape/outputs/google_cloud_pubsub.py
@@ -0,0 +1,26 @@
+import os
+
+from pupa.scrape.outputs.output import Output
+
+from google.cloud import pubsub
+
+
+class GoogleCloudPubSub(Output):
+
+    def __init__(self, scraper):
+        super().__init__(scraper)
+
+        project = os.environ.get('GOOGLE_CLOUD_PROJECT')
+        topic_name = os.environ.get('GOOGLE_CLOUD_PUBSUB_TOPIC')
+        self.publisher = pubsub.PublisherClient()
+        self.topic_path = self.publisher.topic_path(project, topic_name)
+
+    def handle_output(self, obj):
+        self.scraper.info('publish %s %s to topic %s', obj._type, obj,
+                          self.topic_path)
+        self.debug_obj(obj)
+
+        self.add_output_name(obj, self.topic_path)
+        obj_str = self.stringify_obj(obj, True, True)
+
+        self.publisher.publish(self.topic_path, obj_str.encode('utf-8'))
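Both queue backends call `stringify_obj(obj, True, True)`, so each published message is the object's dict tagged with `jurisdiction` and `type` keys and serialized without whitespace. A self-contained sketch of that envelope; the sample bill dict and jurisdiction id are illustrative:

```python
import json


def stringify(obj_dict, jurisdiction_id, obj_type):
    # Mirrors Output.get_obj_as_dict plus stringify_obj_dict: tag the
    # dict, then dump compactly (no spaces after separators).
    tagged = dict(obj_dict)
    tagged['jurisdiction'] = jurisdiction_id
    tagged['type'] = obj_type
    return json.dumps(tagged, separators=(',', ':'))


payload = stringify({'identifier': 'HB 1'}, 'ocd-jurisdiction/country:us', 'bill')
# payload == '{"identifier":"HB 1","jurisdiction":"ocd-jurisdiction/country:us","type":"bill"}'
```

The compact separators keep messages as small as possible, which matters most for the SQS backend's size check.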
19 changes: 19 additions & 0 deletions pupa/scrape/outputs/local_file.py
@@ -0,0 +1,19 @@
+import json
+import os
+
+from pupa import utils
+from pupa.scrape.outputs.output import Output
+
+
+class LocalFile(Output):
+
+    def handle_output(self, obj):
+        filename = '{0}_{1}.json'.format(obj._type, obj._id).replace('/', '-')
+
+        self.scraper.info('save %s %s as %s', obj._type, obj, filename)
+        self.debug_obj(obj)
+
+        self.add_output_name(obj, filename)
+
+        with open(os.path.join(self.scraper.datadir, filename), 'w') as f:
+            json.dump(obj.as_dict(), f, cls=utils.JSONEncoderPlus)
60 changes: 60 additions & 0 deletions pupa/scrape/outputs/output.py
@@ -0,0 +1,60 @@
+import json
+
+from abc import ABCMeta, abstractmethod
+from collections import OrderedDict
+from datetime import datetime, timezone
+
+from pupa import utils
+
+
+class Output(metaclass=ABCMeta):
+
+    def __init__(self, scraper):
+        self.scraper = scraper
+
+    def add_output_name(self, obj, output_name):
+        self.scraper.output_names[obj._type].add(output_name)
+
+    def debug_obj(self, obj):
+        self.scraper.debug(json.dumps(OrderedDict(sorted(obj.as_dict().items())),
+                                      cls=utils.JSONEncoderPlus,
+                                      indent=4, separators=(',', ': ')))
+
+    def get_obj_as_dict(self, obj, add_jurisdiction=False, add_type=False):
+        obj_dict = obj.as_dict()
+        if add_jurisdiction and self.scraper.jurisdiction:
+            obj_dict['jurisdiction'] = self.scraper.jurisdiction.jurisdiction_id
+        if add_type:
+            obj_dict['type'] = obj._type
+        return obj_dict
+
+    @abstractmethod
+    def handle_output(self, obj):
+        pass
+
+    def save_object(self, obj):
+        obj.pre_save(self.scraper.jurisdiction.jurisdiction_id)
+
+        # actual output handling, to be handled by subclass
+        self.handle_output(obj)
+
+        # validate after writing, allows for inspection on failure
+        try:
+            obj.validate()
+        except ValueError as ve:
+            if self.scraper.strict_validation:
+                raise ve
+            else:
+                self.scraper.warning(ve)
+
+        # after saving and validating, save subordinate objects
+        for obj in obj._related:
+            self.save_object(obj)
+
+    def stringify_obj(self, obj, add_jurisdiction=False, add_type=False):
+        obj_dict = self.get_obj_as_dict(obj, add_jurisdiction, add_type)
+        return self.stringify_obj_dict(obj_dict)
+
+    def stringify_obj_dict(self, obj_dict):
+        return json.dumps(obj_dict, cls=utils.JSONEncoderPlus,
+                          separators=(',', ':'))
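`Output.save_object` is a template method: the shared lifecycle (pre_save, validation, recursion into `_related` objects) lives in the base class, and each backend supplies only `handle_output`. A self-contained sketch of that pattern, using plain dicts as stand-ins for pupa models and a hypothetical in-memory backend:

```python
from abc import ABCMeta, abstractmethod


class Output(metaclass=ABCMeta):
    """Simplified mirror of the PR's base class."""

    @abstractmethod
    def handle_output(self, obj):
        pass

    def save_object(self, obj):
        # Backend-specific write first, then subordinate objects,
        # matching the recursion over obj._related in the real class.
        self.handle_output(obj)
        for related in obj.get('_related', []):
            self.save_object(related)


class ListOutput(Output):
    """Hypothetical backend that just collects ids in memory."""

    def __init__(self):
        self.saved = []

    def handle_output(self, obj):
        self.saved.append(obj['id'])


out = ListOutput()
out.save_object({'id': 'bill-1', '_related': [{'id': 'vote-1'}]})
# out.saved == ['bill-1', 'vote-1']
```

A fourth backend (say, Kafka, which an earlier commit removed to keep the PR focused) would slot in the same way: subclass, implement `handle_output`, add a branch to the dispatcher.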
8 changes: 7 additions & 1 deletion pupa/scrape/vote_event.py
@@ -1,3 +1,5 @@
+import os
+
 from ..utils import _make_pseudo_id
 from .base import BaseModel, cleanup_list, SourceMixin
 from .bill import Bill
@@ -47,7 +49,11 @@ def set_bill(self, bill_or_identifier, *, chamber=None):
         elif isinstance(bill_or_identifier, Bill):
             if chamber:
                 raise ScrapeValueError("set_bill takes no arguments when using a `Bill` object")
-            self.bill = bill_or_identifier._id
+            if os.environ.get('VOTE_EVENT_NO_BILL_UUID') == 'true':
+                kwargs = {'identifier': bill_or_identifier.identifier}
+                self.bill = _make_pseudo_id(**kwargs)
+            else:
+                self.bill = bill_or_identifier._id
         else:
             if chamber is None:
                 chamber = 'legislature'
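The `VOTE_EVENT_NO_BILL_UUID` switch makes a vote event reference its bill by pseudo id (matched on the human-readable identifier at import time) rather than the scrape-time `_id` UUID, which helps when vote events and bills come from separate scrape runs. A rough reimplementation of pupa's `_make_pseudo_id` to show the resulting value; the real helper may differ in detail:

```python
import json


def make_pseudo_id(**kwargs):
    # pupa pseudo ids are a '~' followed by a JSON blob of the fields
    # the importer should match on (approximation of pupa's helper).
    return '~' + json.dumps(kwargs, sort_keys=True)


bill_ref = make_pseudo_id(identifier='HB 1')
# bill_ref == '~{"identifier": "HB 1"}'
```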
4 changes: 3 additions & 1 deletion setup.py
@@ -19,12 +19,14 @@
 pupa = pupa.cli.__main__:main''',
     install_requires=[
         'Django>=1.11',
-        'opencivicdata>=2.1.0',
+        'opencivicdata>=2.1.2',
         'dj_database_url>=0.3.0',
         'scrapelib>=1.0',
         'jsonschema>=2.6.0',
         'psycopg2',
         'pytz',
+        'google-cloud-pubsub==0.30.1',
+        'boto3==1.5.18',
     ],
     extras_require={
         'dev': [