
implement job persistence #359

Closed · wants to merge 1 commit
11 changes: 11 additions & 0 deletions docs/config.rst
@@ -152,6 +152,17 @@ Scrapyd includes an interface with a website to provide simple monitoring
and access to the application's web resources.
This setting must provide the root class of the twisted web resource.

jobstorage
----------

A class that stores finished jobs. Two implementations are provided:

* ``MemoryJobStorage`` (default): jobs are stored in memory and lost when the daemon is restarted
Contributor:

The default is actually `scrapyd.jobstorage.MemoryJobStorage`. Let's put the full names here: `scrapyd.jobstorage.MemoryJobStorage` and `scrapyd.jobstorage.SqliteJobStorage`.

* ``SqliteJobStorage``: jobs are persisted in an SQLite database in ``dbs_dir``
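
For example, to keep finished jobs across daemon restarts, point the option at the SQLite backend in ``scrapyd.conf`` (the ``jobstorage`` key is the same one this pull request adds to ``default_scrapyd.conf``)::

    [scrapyd]
    jobstorage = scrapyd.jobstorage.SqliteJobStorage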

If another backend is needed, you can write your own class that implements the
``IJobStorage`` interface, as in the sketch below.
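
A minimal sketch of such a class (``NullJobStorage`` is a hypothetical example that simply discards finished jobs; only the ``IJobStorage`` method names and the ``config`` constructor argument are taken from this pull request)::

    from zope.interface import implementer

    from scrapyd.interfaces import IJobStorage


    @implementer(IJobStorage)
    class NullJobStorage(object):
        """Hypothetical backend that discards every finished job."""

        def __init__(self, config):
            # A real backend would read settings such as dbs_dir from config.
            pass

        def add(self, job):
            # A real backend would persist job.project, job.spider, job.job,
            # job.start_time and job.end_time here.
            pass

        def list(self):
            return []

        def __len__(self):
            return 0

        def __iter__(self):
            return iter([])

Pointing the ``jobstorage`` option at its dotted path (for example, the hypothetical ``mypackage.NullJobStorage``) would then activate it.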

node_name
---------

1 change: 1 addition & 0 deletions docs/news.rst
@@ -14,6 +14,7 @@ Added
- Make project argument to listjobs.json optional,
so that we can easily query for all jobs.
- Python 3.7, 3.8 and 3.9 support
- Configuration option for job storage class

Removed
~~~~~~~
10 changes: 9 additions & 1 deletion scrapyd/app.py
@@ -8,8 +8,11 @@
from twisted.cred.portal import Portal
from twisted.web.guard import HTTPAuthSessionWrapper, BasicCredentialFactory

from .interfaces import IEggStorage, IPoller, ISpiderScheduler, IEnvironment
from scrapy.utils.misc import load_object

from .interfaces import IEggStorage, IJobStorage, IPoller, ISpiderScheduler, IEnvironment
from .eggstorage import FilesystemEggStorage
from .jobstorage import MemoryJobStorage
from .scheduler import SpiderScheduler
from .poller import QueuePoller
from .environ import Environment
@@ -50,6 +53,11 @@ def application(config):
    app.setComponent(ISpiderScheduler, scheduler)
    app.setComponent(IEnvironment, environment)

    jspath = config.get('jobstorage', 'scrapyd.jobstorage.MemoryJobStorage')
    jscls = load_object(jspath)
    jobstorage = jscls(config)
    app.setComponent(IJobStorage, jobstorage)

    laupath = config.get('launcher', 'scrapyd.launcher.Launcher')
    laucls = load_object(laupath)
    launcher = laucls(config, app)
1 change: 1 addition & 0 deletions scrapyd/default_scrapyd.conf
@@ -14,6 +14,7 @@ username =
password =
debug = off
runner = scrapyd.runner
jobstorage = scrapyd.jobstorage.MemoryJobStorage
application = scrapyd.app.application
launcher = scrapyd.launcher.Launcher
webroot = scrapyd.website.Root
15 changes: 15 additions & 0 deletions scrapyd/interfaces.py
@@ -107,3 +107,18 @@ def get_environment(message, slot):
        `message` is the message received from the IPoller.next() method
        `slot` is the Launcher slot where the process will be running.
        """

class IJobStorage(Interface):
    """A component that handles storing and retrieving finished jobs."""

    def add(job):
        """Add a finished job to the storage."""

    def list():
        """Return a list of the finished jobs."""

    def __len__():
        """Return the number of finished jobs."""

    def __iter__():
        """Iterate over the finished jobs."""
64 changes: 64 additions & 0 deletions scrapyd/jobstorage.py
@@ -0,0 +1,64 @@
import os
from datetime import datetime

from zope.interface import implementer

from .interfaces import IJobStorage
from .sqlite import SqliteFinishedJobs


class Job(object):
    def __init__(self, project, spider, job=None, start_time=None, end_time=None):
        self.project = project
        self.spider = spider
        self.job = job
        self.start_time = start_time if start_time else datetime.now()
        self.end_time = end_time if end_time else datetime.now()


@implementer(IJobStorage)
class MemoryJobStorage(object):

    def __init__(self, config):
        self.jobs = []
        self.finished_to_keep = config.getint('finished_to_keep', 100)

    def add(self, job):
        self.jobs.append(job)
        del self.jobs[:-self.finished_to_keep]  # keep only the last `finished_to_keep` jobs

    def list(self):
        return self.jobs

    def __len__(self):
        return len(self.jobs)

    def __iter__(self):
        for j in self.jobs:
            yield j


@implementer(IJobStorage)
class SqliteJobStorage(object):

    def __init__(self, config):
        dbsdir = config.get('dbs_dir', 'dbs')
        if not os.path.exists(dbsdir):
            os.makedirs(dbsdir)
        dbpath = os.path.join(dbsdir, 'jobs.db')
        self.jstorage = SqliteFinishedJobs(dbpath, "finished_jobs")
        self.finished_to_keep = config.getint('finished_to_keep', 100)

    def add(self, job):
        self.jstorage.add(job)
        self.jstorage.clear(self.finished_to_keep)

    def list(self):
        return list(self)

    def __len__(self):
        return len(self.jstorage)

    def __iter__(self):
        for j in self.jstorage:
            yield Job(project=j[0], spider=j[1], job=j[2],
                      start_time=j[3], end_time=j[4])
9 changes: 4 additions & 5 deletions scrapyd/launcher.py
@@ -8,19 +8,19 @@

from scrapyd.utils import get_crawl_args, native_stringify_dict
from scrapyd import __version__
from .interfaces import IPoller, IEnvironment
from .interfaces import IPoller, IEnvironment, IJobStorage

class Launcher(Service):

    name = 'launcher'

    def __init__(self, config, app):
        self.processes = {}
        self.finished = []
        self.finished_to_keep = config.getint('finished_to_keep', 100)
        self.finished = app.getComponent(IJobStorage)
        self.max_proc = self._get_max_proc(config)
        self.runner = config.get('runner', 'scrapyd.runner')
        self.app = app


    def startService(self):
        for slot in range(self.max_proc):
@@ -50,8 +50,7 @@ def _spawn_process(self, message, slot):
    def _process_finished(self, _, slot):
        process = self.processes.pop(slot)
        process.end_time = datetime.now()
        self.finished.append(process)
        del self.finished[:-self.finished_to_keep]  # keep last 100 finished jobs
        self.finished.add(process)
        self._wait_for_project(slot)

    def _get_max_proc(self, config):
43 changes: 43 additions & 0 deletions scrapyd/sqlite.py
@@ -1,3 +1,4 @@
from datetime import datetime
import sqlite3
import json
try:
@@ -145,3 +146,45 @@ def encode(self, obj):

    def decode(self, text):
        return json.loads(bytes(text).decode('ascii'))


class SqliteFinishedJobs(object):
    """SQLite finished jobs."""

    def __init__(self, database=None, table="finished_jobs"):
        self.database = database or ':memory:'
        self.table = table
        # about check_same_thread: http://twistedmatrix.com/trac/ticket/4040
        self.conn = sqlite3.connect(self.database, check_same_thread=False)
        q = "create table if not exists %s (id integer primary key, " \
            "project text, spider text, job text, start_time datetime, end_time datetime)" % table
        self.conn.execute(q)

    def add(self, job):
        args = (job.project, job.spider, job.job, job.start_time, job.end_time)
        q = "insert into %s (project, spider, job, start_time, end_time) values (?,?,?,?,?)" % self.table
        self.conn.execute(q, args)
        self.conn.commit()

    def clear(self, finished_to_keep=None):
        w = ""
        if finished_to_keep:
            limit = len(self) - finished_to_keep
            if limit <= 0:
                return  # nothing to delete
            w = "where id <= (select max(id) from " \
                "(select id from %s order by end_time limit %d))" % (self.table, limit)
        q = "delete from %s %s" % (self.table, w)
        self.conn.execute(q)
        self.conn.commit()

    def __len__(self):
        q = "select count(*) from %s" % self.table
        return self.conn.execute(q).fetchone()[0]

    def __iter__(self):
        q = "select project, spider, job, start_time, end_time from %s order by end_time desc" % \
            self.table
        return ((j[0], j[1], j[2],
                 datetime.strptime(j[3], "%Y-%m-%d %H:%M:%S.%f"),
                 datetime.strptime(j[4], "%Y-%m-%d %H:%M:%S.%f")) for j in self.conn.execute(q))
61 changes: 61 additions & 0 deletions scrapyd/tests/test_jobstorage.py
@@ -0,0 +1,61 @@
from twisted.trial import unittest

from zope.interface.verify import verifyObject

from scrapyd.interfaces import IJobStorage
from scrapyd.config import Config
from scrapyd.jobstorage import Job, MemoryJobStorage, SqliteJobStorage

j1, j2, j3 = Job('p1', 's1'), Job('p2', 's2'), Job('p3', 's3')


class MemoryJobStorageTest(unittest.TestCase):

    def setUp(self):
        d = self.mktemp()
        config = Config(values={'dbs_dir': d, 'finished_to_keep': '2'})
        self.jobst = MemoryJobStorage(config)
        self.j1, self.j2, self.j3 = j1, j2, j3
        self.jobst.add(self.j1)
        self.jobst.add(self.j2)
        self.jobst.add(self.j3)

    def test_interface(self):
        verifyObject(IJobStorage, self.jobst)

    def test_add(self):
        self.assertEqual(len(self.jobst.list()), 2)

    def test_iter(self):
        l = [j for j in self.jobst]
        self.assertEqual(l[0], j2)
        self.assertEqual(l[1], j3)
        self.assertEqual(len(l), 2)

    def test_len(self):
        self.assertEqual(len(self.jobst), 2)


class SqliteJobStorageTest(unittest.TestCase):

    def setUp(self):
        d = self.mktemp()
        config = Config(values={'dbs_dir': d, 'finished_to_keep': '2'})
        self.jobst = SqliteJobStorage(config)
        self.j1, self.j2, self.j3 = j1, j2, j3

    def test_interface(self):
        verifyObject(IJobStorage, self.jobst)

    def test_add(self):
        self.jobst.add(self.j1)
        self.jobst.add(self.j2)
        self.jobst.add(self.j3)
        self.assertEqual(len(self.jobst.list()), 2)

    def test_iter(self):
        self.jobst.add(self.j1)
        self.jobst.add(self.j2)
        self.jobst.add(self.j3)
        l = [j for j in self.jobst]
        self.assertEqual(len(l), 2)
30 changes: 29 additions & 1 deletion scrapyd/tests/test_sqlite.py
@@ -3,7 +3,8 @@
from decimal import Decimal

from scrapy.http import Request
from scrapyd.sqlite import JsonSqlitePriorityQueue, JsonSqliteDict
from scrapyd.jobstorage import Job
from scrapyd.sqlite import JsonSqlitePriorityQueue, JsonSqliteDict, SqliteFinishedJobs


class JsonSqliteDictTest(unittest.TestCase):
@@ -128,3 +129,30 @@ def test_types(self):
        for x in self.supported_values:
            self.q.put(x)
            self.assertEqual(self.q.pop(), x)


class SqliteFinishedJobsTest(unittest.TestCase):

    def setUp(self):
        self.q = SqliteFinishedJobs(':memory:')
        self.j1, self.j2, self.j3 = Job('p1', 's1'), Job('p2', 's2'), Job('p3', 's3')
        self.q.add(self.j1)
        self.q.add(self.j2)
        self.q.add(self.j3)

    def test_add(self):
        self.assertEqual(len(self.q), 3)

    def test_clear_all(self):
        self.q.clear()
        self.assertEqual(len(self.q), 0)

    def test_clear_keep_2(self):
        self.q.clear(finished_to_keep=2)
        self.assertEqual(len(self.q), 2)

    def test__iter__(self):
        l = [j for j in self.q]
        self.assertEqual((l[0][0], l[0][1]), ('p3', 's3'))
        self.assertEqual((l[1][0], l[1][1]), ('p2', 's2'))
        self.assertEqual((l[2][0], l[2][1]), ('p1', 's1'))