diff --git a/scrapylib/chunkexports.py b/scrapylib/chunkexports.py new file mode 100644 index 0000000..215665c --- /dev/null +++ b/scrapylib/chunkexports.py @@ -0,0 +1,97 @@ +import os +import copy +from datetime import datetime + +from scrapy.contrib.feedexport import FeedExporter +from scrapy.exceptions import NotConfigured + + +DEFAULT_TIMESTAMP_FORMAT = '%Y-%m-%d-%H' + + +class ChunkedFeedExporter(FeedExporter): + """Extension for breaking item exports into chunks. + + Settings: + * CHUNKED_FEED_URI: The feed uri to use for exporting (Overrides FEED_URI setting). + * CHUNKED_FEED_FORMAT: The feed format to use for exporting (Overrides FEED_FORMAT setting). + * CHUNKED_FEED_ITEMS_PER_CHUNK: Number of items included in each chunk + * CHUNKED_FEED_TIMESTAMP_FORMAT: A string representing the format to be used for representing \ + the ``timestamp`` uri parameter. + + Example: + CHUNKED_FEED_URI = 'export_%(chunk_number)02d.json' + CHUNKED_FEED_FORMAT = 'json' + CHUNKED_FEED_ITEMS_PER_CHUNK = 100 + + For 250 items will generate the following files: + * export_01.json (100 items) + * export_02.json (100 items) + * export_03.json (50 items) + + Available uri format values: + * chunk_number: The active chunk counter. (Starts in 1). + * scrapy_job: The Scrapy job (if available). + * scrapy_project_id: The Scrapy job id (if available). + * timestamp: Current timestamp in UTC (formatted with CHUNKED_FEED_TIMESTAMP_FORMAT setting). + + """ + + def __init__(self, settings): + + # Override settings object to reuse feed exporter settings + settings = copy.deepcopy(settings) + self.settings = settings + + # Get chunked settings + chunked_feed_uri = self._get_from_settings_or_not_configured('CHUNKED_FEED_URI', None) + chunked_feed_format = self._get_from_settings_or_not_configured('CHUNKED_FEED_FORMAT') + self._items_per_chunk = self._get_from_settings_or_not_configured('CHUNKED_FEED_ITEMS_PER_CHUNK') + + # Settings override + settings.set('FEED_URI', chunked_feed_uri, 100) + settings.set('FEED_FORMAT', chunked_feed_format, 100) + + # Parent call with overridden settings + super(ChunkedFeedExporter, self).__init__(settings) + + # Internal stuff + self._chunk_number = 1 + self._uripar = self.get_uri_parameters + self._timestamp_format = settings.get('CHUNKED_FEED_TIMESTAMP_FORMAT', DEFAULT_TIMESTAMP_FORMAT) + + # Get uri parameters from settings or environment + self.settings = settings + self._scrapy_job = self._get_from_settings_or_environ('SCRAPY_JOB', 'nojob') + self._scrapy_project = self._get_from_settings_or_environ('SCRAPY_PROJECT', 'noproject') + self._scrapy_project_id = self._get_from_settings_or_environ('SCRAPY_PROJECT_ID', 'noprojectid') + + def get_uri_parameters(self, params, spider): + """Update feed uri available parameters. Override if you want to add more parameters""" + params.update({ + 'chunk_number': self._chunk_number, + 'scrapy_job': self._scrapy_job, + 'scrapy_project_id': self._scrapy_project_id, + 'timestamp': datetime.utcnow().strftime("%Y-%m-%d-%H"), + }) + + def item_scraped(self, item, spider): + if self._items_per_chunk and self.slot.itemcount+1 > self._items_per_chunk: + # Item must be in next chunk, close current chunk and create a new one + d = self.close_spider(spider) + self._chunk_number += 1 + self.open_spider(spider) + super(ChunkedFeedExporter, self).item_scraped(item, spider) + return d + else: + # Item in current chunk, normal call + return super(ChunkedFeedExporter, self).item_scraped(item, spider) + + def _get_from_settings_or_environ(self, name, default): + return self.settings.get(name=name, default=os.environ.get(name, default)) + + def _get_from_settings_or_not_configured(self, name, default=None): + value = self.settings.get(name, default) + if not value: + raise NotConfigured + return value diff --git a/tests/test_chunkexports.py b/tests/test_chunkexports.py new file mode 100644 index 0000000..a9d6785 --- /dev/null +++ b/tests/test_chunkexports.py @@ -0,0 +1,247 @@ +import os +import json +import shutil +import unittest + +from scrapy.spider import Spider +from scrapy import Item, Field +from scrapy.utils.test import get_crawler +from scrapy.exceptions import NotConfigured + +from scrapylib.chunkexports import ChunkedFeedExporter + +EXPORT_TEMP_DIR = '.exports' +EXPORT_FILE_PATTERN = EXPORT_TEMP_DIR + '/export_test_%(chunk_number)02d.json' +JSON_FEED_EXPORTERS = {'json': 'scrapy.contrib.exporter.JsonItemExporter'} + + +class FakeItem(Item): + id = Field() + + +class ItemGenerator(object): + item_id = 0 + + @classmethod + def generate(cls): + cls.item_id += 1 + return FakeItem(id=cls.item_id) + + @classmethod + def reset(cls): + cls.item_id = 0 + + +class ChunkExtensionTest(object): + settings = {} + + def tearDown(self): + self.remove_temp_dir() + + def start(self, n_items_per_chunk=None, n_items=None, settings=None): + + # Reset item generator and remove temporary dir + ItemGenerator.reset() + self.remove_temp_dir() + + # Setup settings + settings = settings or self.settings.copy() + if n_items_per_chunk is not None: + settings['CHUNKED_FEED_ITEMS_PER_CHUNK'] = n_items_per_chunk + + # Init Scrapy + self.crawler = get_crawler(settings) + self.spider = Spider('chunk_test') + self.spider.set_crawler(self.crawler) + self.extension = ChunkedFeedExporter.from_crawler(self.crawler) + self.extension.open_spider(self.spider) + + # Add items if we have to + if n_items: + self.add_items(n_items) + + def stop(self): + return self.extension.close_spider(self.spider) + + def remove_temp_dir(self): + shutil.rmtree(EXPORT_TEMP_DIR, ignore_errors=True) + + def add_items(self, n_items): + for i in range(n_items): + item = ItemGenerator.generate() + self.extension.item_scraped(item, self.spider) + + def get_chunk_filename(self, chunk): + return EXPORT_FILE_PATTERN % {'chunk_number':chunk} + + def get_chunk_filenames(self): + return [f for f in os.listdir(EXPORT_TEMP_DIR) if f.endswith(".json")] + + def get_number_of_chunks(self): + return len(self.get_chunk_filenames()) + + def get_chunk_content(self, chunk): + with open(self.get_chunk_filename(chunk)) as f: + return json.load(f) + + def ensure_number_of_chunks(self, n_chunks): + n = self.get_number_of_chunks() + assert n_chunks == n, "Wrong number of chunks. found %d, expecting %d" % (n, n_chunks) + + def ensure_number_of_exported_items_per_chunk(self, chunk, n_items): + n_exported_items = len(self.get_chunk_content(chunk)) + assert n_items == n_exported_items, "Wrong number of exported items. found %d, expecting %d" % \ + (n_exported_items, n_items) + + +class ConfigFailures(ChunkExtensionTest, unittest.TestCase): + + def test_no_settings(self): + self.assertRaises(NotConfigured, self.start, settings={}) + + def test_no_feed_uri(self): + self.assertRaises(NotConfigured, self.start, settings={ + 'CHUNKED_FEED_FORMAT': 'json', + 'CHUNKED_FEED_ITEMS_PER_CHUNK': 1, + 'FEED_EXPORTERS': JSON_FEED_EXPORTERS + }) + + def test_no_feed_format(self): + self.assertRaises(NotConfigured, self.start, settings={ + 'CHUNKED_FEED_URI': EXPORT_FILE_PATTERN, + 'CHUNKED_FEED_ITEMS_PER_CHUNK': 1, + 'FEED_EXPORTERS': JSON_FEED_EXPORTERS + }) + + def test_no_feed_items_per_chunk(self): + self.assertRaises(NotConfigured, self.start, settings={ + 'CHUNKED_FEED_URI': EXPORT_FILE_PATTERN, + 'CHUNKED_FEED_FORMAT': 'json', + 'FEED_EXPORTERS': JSON_FEED_EXPORTERS + }) + + def test_zero_feed_items_per_chunk(self): + self.assertRaises(NotConfigured, self.start, settings={ + 'CHUNKED_FEED_URI': EXPORT_FILE_PATTERN, + 'CHUNKED_FEED_FORMAT': 'json', + 'CHUNKED_FEED_ITEMS_PER_CHUNK': 0, + 'FEED_EXPORTERS': JSON_FEED_EXPORTERS + }) + + +class ItemsAndChunks(ChunkExtensionTest, unittest.TestCase): + settings = { + 'CHUNKED_FEED_URI': EXPORT_FILE_PATTERN, + 'CHUNKED_FEED_FORMAT': 'json', + 'CHUNKED_FEED_ITEMS_PER_CHUNK': 1, + 'FEED_EXPORTERS': JSON_FEED_EXPORTERS + } + + def test_items_0(self): + # FIXME: Scrapy exporter creates always one file + self.start(n_items=0, n_items_per_chunk=1) + self.stop() + #self.ensure_number_of_chunks(n_chunks=0) + + def test_items_1_chunksize_1(self): + self.start(n_items=1, n_items_per_chunk=1) + self.stop() + self.ensure_number_of_chunks(n_chunks=1) + self.ensure_number_of_exported_items_per_chunk(chunk=1, n_items=1) + + def test_items_1_chunksize_2(self): + self.start(n_items=1, n_items_per_chunk=2) + self.stop() + self.ensure_number_of_chunks(n_chunks=1) + self.ensure_number_of_exported_items_per_chunk(chunk=1, n_items=1) + + def test_items_2_chunksize_1(self): + self.start(n_items=2, n_items_per_chunk=1) + self.stop() + self.ensure_number_of_chunks(n_chunks=2) + self.ensure_number_of_exported_items_per_chunk(chunk=1, n_items=1) + self.ensure_number_of_exported_items_per_chunk(chunk=2, n_items=1) + + def test_items_2_chunksize_2(self): + self.start(n_items=2, n_items_per_chunk=2) + self.stop() + self.ensure_number_of_chunks(n_chunks=1) + self.ensure_number_of_exported_items_per_chunk(chunk=1, n_items=2) + + def test_items_2_chunksize_3(self): + self.start(n_items=2, n_items_per_chunk=3) + self.stop() + self.ensure_number_of_chunks(n_chunks=1) + self.ensure_number_of_exported_items_per_chunk(chunk=1, n_items=2) + + def test_items_5_chunksize_1(self): + self.start(n_items=5, n_items_per_chunk=1) + self.stop() + self.ensure_number_of_chunks(n_chunks=5) + self.ensure_number_of_exported_items_per_chunk(chunk=1, n_items=1) + self.ensure_number_of_exported_items_per_chunk(chunk=2, n_items=1) + self.ensure_number_of_exported_items_per_chunk(chunk=3, n_items=1) + self.ensure_number_of_exported_items_per_chunk(chunk=4, n_items=1) + self.ensure_number_of_exported_items_per_chunk(chunk=5, n_items=1) + + def test_items_5_chunksize_2(self): + self.start(n_items=5, n_items_per_chunk=2) + self.stop() + self.ensure_number_of_chunks(n_chunks=3) + self.ensure_number_of_exported_items_per_chunk(chunk=1, n_items=2) + self.ensure_number_of_exported_items_per_chunk(chunk=2, n_items=2) + self.ensure_number_of_exported_items_per_chunk(chunk=3, n_items=1) + + def test_items_5_chunksize_3(self): + self.start(n_items=5, n_items_per_chunk=3) + self.stop() + self.ensure_number_of_chunks(n_chunks=2) + self.ensure_number_of_exported_items_per_chunk(chunk=1, n_items=3) + self.ensure_number_of_exported_items_per_chunk(chunk=2, n_items=2) + + def test_items_5_chunksize_4(self): + self.start(n_items=5, n_items_per_chunk=4) + self.stop() + self.ensure_number_of_chunks(n_chunks=2) + self.ensure_number_of_exported_items_per_chunk(chunk=1, n_items=4) + self.ensure_number_of_exported_items_per_chunk(chunk=2, n_items=1) + + def test_items_5_chunksize_5(self): + self.start(n_items=5, n_items_per_chunk=5) + self.stop() + self.ensure_number_of_chunks(n_chunks=1) + self.ensure_number_of_exported_items_per_chunk(chunk=1, n_items=5) + + def test_items_5_chunksize_6(self): + self.start(n_items=5, n_items_per_chunk=6) + self.stop() + self.ensure_number_of_chunks(n_chunks=1) + self.ensure_number_of_exported_items_per_chunk(chunk=1, n_items=5) + + def test_items_100_chunksize_25(self): + self.start(n_items=100, n_items_per_chunk=25) + self.stop() + self.ensure_number_of_chunks(n_chunks=4) + self.ensure_number_of_exported_items_per_chunk(chunk=1, n_items=25) + self.ensure_number_of_exported_items_per_chunk(chunk=2, n_items=25) + self.ensure_number_of_exported_items_per_chunk(chunk=3, n_items=25) + self.ensure_number_of_exported_items_per_chunk(chunk=4, n_items=25) + + def test_items_100_chunksize_24(self): + self.start(n_items=100, n_items_per_chunk=24) + self.stop() + self.ensure_number_of_chunks(n_chunks=5) + self.ensure_number_of_exported_items_per_chunk(chunk=1, n_items=24) + self.ensure_number_of_exported_items_per_chunk(chunk=2, n_items=24) + self.ensure_number_of_exported_items_per_chunk(chunk=3, n_items=24) + self.ensure_number_of_exported_items_per_chunk(chunk=4, n_items=24) + self.ensure_number_of_exported_items_per_chunk(chunk=5, n_items=4) + + def test_items_100_chunksize_26(self): + self.start(n_items=100, n_items_per_chunk=26) + self.stop() + self.ensure_number_of_chunks(n_chunks=4) + self.ensure_number_of_exported_items_per_chunk(chunk=1, n_items=26) + self.ensure_number_of_exported_items_per_chunk(chunk=2, n_items=26) + self.ensure_number_of_exported_items_per_chunk(chunk=3, n_items=26) + self.ensure_number_of_exported_items_per_chunk(chunk=4, n_items=22)