Skip to content

Commit

Permalink
Implement job persistence
Browse files Browse the repository at this point in the history
  • Loading branch information
mxdev88 committed Apr 15, 2021
1 parent f916d9b commit efbdede
Show file tree
Hide file tree
Showing 10 changed files with 238 additions and 7 deletions.
11 changes: 11 additions & 0 deletions docs/config.rst
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,17 @@ Scrapyd includes an interface with a website to provide simple monitoring
and access to the application's webresources.
This setting must provide the root class of the twisted web resource.

jobstorage
----------

A class that stores finished jobs. There are 2 implementations provided:

* ``MemoryJobStorage`` (default): jobs are stored in memory and lost when the daemon is restarted
* ``SqliteJobStorage``: jobs are persisted in an SQLite database in ``dbs_dir``

If another backend is needed, you can provide your own class that implements the
``IJobStorage`` interface.

node_name
---------

Expand Down
1 change: 1 addition & 0 deletions docs/news.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Added
- Make project argument to listjobs.json optional,
so that we can easily query for all jobs.
- Python 3.7, 3.8 and 3.9 support
- Configuration option for job storage class

Removed
~~~~~~~
Expand Down
10 changes: 9 additions & 1 deletion scrapyd/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@
from twisted.cred.portal import Portal
from twisted.web.guard import HTTPAuthSessionWrapper, BasicCredentialFactory

from .interfaces import IEggStorage, IPoller, ISpiderScheduler, IEnvironment
from scrapy.utils.misc import load_object

from .interfaces import IEggStorage, IJobStorage, IPoller, ISpiderScheduler, IEnvironment
from .eggstorage import FilesystemEggStorage
from .jobstorage import MemoryJobStorage
from .scheduler import SpiderScheduler
from .poller import QueuePoller
from .environ import Environment
Expand Down Expand Up @@ -50,6 +53,11 @@ def application(config):
app.setComponent(ISpiderScheduler, scheduler)
app.setComponent(IEnvironment, environment)

jspath = config.get('jobstorage', 'scrapyd.jobstorage.MemoryJobStorage')
jscls = load_object(jspath)
jobstorage = jscls(config)
app.setComponent(IJobStorage, jobstorage)

laupath = config.get('launcher', 'scrapyd.launcher.Launcher')
laucls = load_object(laupath)
launcher = laucls(config, app)
Expand Down
1 change: 1 addition & 0 deletions scrapyd/default_scrapyd.conf
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ username =
password =
debug = off
runner = scrapyd.runner
jobstorage = scrapyd.jobstorage.MemoryJobStorage
application = scrapyd.app.application
launcher = scrapyd.launcher.Launcher
webroot = scrapyd.website.Root
Expand Down
15 changes: 15 additions & 0 deletions scrapyd/interfaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,18 @@ def get_environment(message, slot):
`message` is the message received from the IPoller.next() method
`slot` is the Launcher slot where the process will be running.
"""

class IJobStorage(Interface):
    """A component that handles storing and retrieving finished jobs."""

    def add(job):
        """Add a finished job to the storage.

        `job` is the finished job to record.
        """

    def list():
        """Return a list of the finished jobs."""

    def __len__():
        """Return the number of finished jobs."""

    def __iter__():
        """Iterate over the finished jobs."""
64 changes: 64 additions & 0 deletions scrapyd/jobstorage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import os
from datetime import datetime
from zope.interface import implementer

from .interfaces import IJobStorage
from .sqlite import SqliteFinishedJobs


class Job(object):
    """Plain record describing one job run.

    `project` and `spider` identify what ran, `job` is an optional job
    identifier, and `start_time` / `end_time` each fall back to the current
    local time when they are omitted (or otherwise falsy).
    """

    def __init__(self, project, spider, job=None, start_time=None, end_time=None):
        self.project, self.spider, self.job = project, spider, job
        # Each fallback reads the clock on its own, start first, then end.
        self.start_time = start_time or datetime.now()
        self.end_time = end_time or datetime.now()


@implementer(IJobStorage)
class MemoryJobStorage(object):
    """Keep finished jobs in a plain in-process list.

    The list is capped at the ``finished_to_keep`` config value (default
    100); the oldest entries are dropped first.
    """

    def __init__(self, config):
        """Read ``finished_to_keep`` from `config` and start with no jobs."""
        self.jobs = []
        self.finished_to_keep = config.getint('finished_to_keep', 100)

    def add(self, job):
        """Append `job`, then drop the oldest entries beyond the cap."""
        self.jobs.append(job)
        # A negative-index slice (``del jobs[:-keep]``) silently does nothing
        # for keep == 0 and misbehaves for negative values, so compute the
        # surplus explicitly. A cap of zero or less keeps nothing, which is
        # what SqliteJobStorage does for the same setting.
        surplus = len(self.jobs) - max(self.finished_to_keep, 0)
        if surplus > 0:
            del self.jobs[:surplus]

    def list(self):
        """Return the underlying list of jobs, oldest first (not a copy)."""
        return self.jobs

    def __len__(self):
        """Return the number of jobs currently kept."""
        return len(self.jobs)

    def __iter__(self):
        """Iterate over the kept jobs, oldest first."""
        return iter(self.jobs)


@implementer(IJobStorage)
class SqliteJobStorage(object):
    """Persist finished jobs in ``jobs.db`` inside the ``dbs_dir`` directory.

    At most ``finished_to_keep`` (default 100) jobs are retained; the cap is
    applied after every insertion.
    """

    def __init__(self, config):
        """Create ``dbs_dir`` if it is missing and open the jobs table."""
        directory = config.get('dbs_dir', 'dbs')
        if not os.path.exists(directory):
            os.makedirs(directory)
        self.jstorage = SqliteFinishedJobs(
            os.path.join(directory, 'jobs.db'), "finished_jobs")
        self.finished_to_keep = config.getint('finished_to_keep', 100)

    def add(self, job):
        """Store `job`, then trim the table down to the configured cap."""
        backend = self.jstorage
        backend.add(job)
        backend.clear(self.finished_to_keep)

    def list(self):
        """Return every stored job as a list of ``Job`` objects."""
        return list(iter(self))

    def __len__(self):
        """Return the number of stored jobs."""
        return len(self.jstorage)

    def __iter__(self):
        """Yield a ``Job`` for each row produced by the backing table."""
        for project, spider, job_id, started, ended in self.jstorage:
            yield Job(project=project, spider=spider, job=job_id,
                      start_time=started, end_time=ended)
9 changes: 4 additions & 5 deletions scrapyd/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,19 @@

from scrapyd.utils import get_crawl_args, native_stringify_dict
from scrapyd import __version__
from .interfaces import IPoller, IEnvironment
from .interfaces import IPoller, IEnvironment, IJobStorage

class Launcher(Service):

name = 'launcher'

def __init__(self, config, app):
self.processes = {}
self.finished = []
self.finished_to_keep = config.getint('finished_to_keep', 100)
self.finished = app.getComponent(IJobStorage)
self.max_proc = self._get_max_proc(config)
self.runner = config.get('runner', 'scrapyd.runner')
self.app = app


def startService(self):
for slot in range(self.max_proc):
Expand Down Expand Up @@ -50,8 +50,7 @@ def _spawn_process(self, message, slot):
def _process_finished(self, _, slot):
process = self.processes.pop(slot)
process.end_time = datetime.now()
self.finished.append(process)
del self.finished[:-self.finished_to_keep] # keep last 100 finished jobs
self.finished.add(process)
self._wait_for_project(slot)

def _get_max_proc(self, config):
Expand Down
43 changes: 43 additions & 0 deletions scrapyd/sqlite.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime
import sqlite3
import json
try:
Expand Down Expand Up @@ -145,3 +146,45 @@ def encode(self, obj):

def decode(self, text):
return json.loads(bytes(text).decode('ascii'))


class SqliteFinishedJobs(object):
    """SQLite table of finished jobs.

    Each row holds ``(project, spider, job, start_time, end_time)``.
    Timestamps are written as ``YYYY-MM-DD HH:MM:SS.ffffff`` text; on read,
    the same layout without the fractional part is also accepted.
    """

    # Layout used when writing timestamps.
    _TIME_FORMAT = "%Y-%m-%d %H:%M:%S.%f"
    # Layouts accepted when reading. The second one covers rows written via
    # ``datetime.isoformat(" ")``, which omits the fraction when the
    # microsecond field is 0.
    _TIME_FORMATS = (_TIME_FORMAT, "%Y-%m-%d %H:%M:%S")

    def __init__(self, database=None, table="finished_jobs"):
        """Open `database` (in-memory when falsy) and ensure `table` exists."""
        self.database = database or ':memory:'
        self.table = table
        # about check_same_thread: http://twistedmatrix.com/trac/ticket/4040
        self.conn = sqlite3.connect(self.database, check_same_thread=False)
        # Table names cannot be bound as SQL parameters, hence the
        # interpolation; `table` comes from code, not from request data.
        q = "create table if not exists %s (id integer primary key, " \
            "project text, spider text, job text, start_time datetime, " \
            "end_time datetime)" % table
        self.conn.execute(q)

    @classmethod
    def _encode_time(cls, value):
        """Return `value` as timestamp text; non-datetimes pass through."""
        if isinstance(value, datetime):
            # Always include microseconds so every row has the same layout,
            # and so we do not depend on sqlite3's implicit datetime adapter.
            return value.strftime(cls._TIME_FORMAT)
        return value

    @classmethod
    def _decode_time(cls, text):
        """Parse stored timestamp `text`; raise ValueError if no layout fits."""
        for fmt in cls._TIME_FORMATS:
            try:
                return datetime.strptime(text, fmt)
            except ValueError:
                continue
        raise ValueError("unrecognised timestamp %r in table %s"
                         % (text, cls.__name__))

    def add(self, job):
        """Insert `job` (needs project, spider, job, start_time, end_time)."""
        args = (job.project, job.spider, job.job,
                self._encode_time(job.start_time),
                self._encode_time(job.end_time))
        q = "insert into %s (project, spider, job, start_time, end_time) " \
            "values (?,?,?,?,?)" % self.table
        self.conn.execute(q, args)
        self.conn.commit()

    def clear(self, finished_to_keep=None):
        """Delete old rows, keeping the `finished_to_keep` most recent.

        A falsy `finished_to_keep` (``None`` or 0) empties the table.
        """
        if finished_to_keep:
            surplus = len(self) - finished_to_keep
            if surplus <= 0:
                return  # nothing to delete
            # Delete exactly the `surplus` oldest rows. Selecting them by id
            # (rather than "id <= max(id)") stays correct when insertion
            # order and end_time order disagree.
            q = "delete from %s where id in " \
                "(select id from %s order by end_time, id limit ?)" \
                % (self.table, self.table)
            self.conn.execute(q, (surplus,))
        else:
            self.conn.execute("delete from %s" % self.table)
        self.conn.commit()

    def __len__(self):
        """Return the number of rows in the table."""
        q = "select count(*) from %s" % self.table
        return self.conn.execute(q).fetchone()[0]

    def __iter__(self):
        """Yield ``(project, spider, job, start_time, end_time)`` tuples.

        Rows come newest ``end_time`` first (ties: newest insertion first),
        with both timestamps parsed back into ``datetime`` objects.
        """
        q = "select project, spider, job, start_time, end_time from %s " \
            "order by end_time desc, id desc" % self.table
        decode = self._decode_time
        return ((row[0], row[1], row[2], decode(row[3]), decode(row[4]))
                for row in self.conn.execute(q))
61 changes: 61 additions & 0 deletions scrapyd/tests/test_jobstorage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from twisted.trial import unittest

from zope.interface.verify import verifyObject

from scrapyd.interfaces import IJobStorage
from scrapyd.config import Config
from scrapyd.jobstorage import Job, MemoryJobStorage, SqliteJobStorage

# Fixture jobs shared by the test cases below; no times are passed, so each
# Job falls back to its default start/end times when this module is imported.
j1, j2, j3 = Job('p1', 's1'), Job('p2', 's2'), Job('p3', 's3')


class MemoryJobStorageTest(unittest.TestCase):
    """MemoryJobStorage capped at two jobs after three have been added."""

    def setUp(self):
        tmpdir = self.mktemp()
        settings = {'dbs_dir': tmpdir, 'finished_to_keep': '2'}
        self.jobst = MemoryJobStorage(Config(values=settings))
        self.j1, self.j2, self.j3 = j1, j2, j3
        for finished in (self.j1, self.j2, self.j3):
            self.jobst.add(finished)

    def test_interface(self):
        verifyObject(IJobStorage, self.jobst)

    def test_add(self):
        kept = self.jobst.list()
        self.assertEqual(len(kept), 2)

    def test_iter(self):
        # Only the two most recently added jobs survive, oldest first.
        kept = list(iter(self.jobst))
        self.assertEqual(kept[0], j2)
        self.assertEqual(kept[1], j3)
        self.assertEqual(len(kept), 2)

    def test_len(self):
        self.assertEqual(len(self.jobst), 2)


class SqliteJobsStorageTest(unittest.TestCase):
    """SqliteJobStorage capped at two jobs, backed by a temporary dbs_dir."""

    def setUp(self):
        tmpdir = self.mktemp()
        settings = {'dbs_dir': tmpdir, 'finished_to_keep': '2'}
        self.jobst = SqliteJobStorage(Config(values=settings))
        self.j1, self.j2, self.j3 = j1, j2, j3

    def test_interface(self):
        verifyObject(IJobStorage, self.jobst)

    def test_add(self):
        for finished in (self.j1, self.j2, self.j3):
            self.jobst.add(finished)
        kept = self.jobst.list()
        self.assertEqual(len(kept), 2)

    def test_iter(self):
        for finished in (self.j1, self.j2, self.j3):
            self.jobst.add(finished)
        kept = list(iter(self.jobst))
        self.assertEqual(len(kept), 2)
30 changes: 29 additions & 1 deletion scrapyd/tests/test_sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from decimal import Decimal

from scrapy.http import Request
from scrapyd.sqlite import JsonSqlitePriorityQueue, JsonSqliteDict
from scrapyd.jobstorage import Job
from scrapyd.sqlite import JsonSqlitePriorityQueue, JsonSqliteDict, SqliteFinishedJobs


class JsonSqliteDictTest(unittest.TestCase):
Expand Down Expand Up @@ -128,3 +129,30 @@ def test_types(self):
for x in self.supported_values:
self.q.put(x)
self.assertEqual(self.q.pop(), x)


class SqliteFinishedJobsTest(unittest.TestCase):
    """In-memory SqliteFinishedJobs table pre-loaded with three jobs."""

    def setUp(self):
        self.q = SqliteFinishedJobs(':memory:')
        self.j1 = Job('p1', 's1')
        self.j2 = Job('p2', 's2')
        self.j3 = Job('p3', 's3')
        for finished in (self.j1, self.j2, self.j3):
            self.q.add(finished)

    def test_add(self):
        self.assertEqual(len(self.q), 3)

    def test_clear_all(self):
        self.q.clear()
        self.assertEqual(len(self.q), 0)

    def test_clear_keep_2(self):
        self.q.clear(finished_to_keep=2)
        self.assertEqual(len(self.q), 2)

    def test__iter__(self):
        # Rows are expected most recently finished first.
        rows = list(iter(self.q))
        self.assertEqual((rows[0][0], rows[0][1]), ('p3', 's3'))
        self.assertEqual((rows[1][0], rows[1][1]), ('p2', 's2'))
        self.assertEqual((rows[2][0], rows[2][1]), ('p1', 's1'))

0 comments on commit efbdede

Please sign in to comment.