Skip to content
This repository has been archived by the owner on Apr 18, 2018. It is now read-only.

Add support for SUBMIT_JOB_EPOCH #40

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 20 additions & 6 deletions gearman/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,22 @@ def __init__(self, host_list=None, random_unique_bytes=RANDOM_UNIQUE_BYTES):
# Ignores the fact if a request has been bound to a connection or not
self.request_to_rotating_connection_queue = weakref.WeakKeyDictionary(compat.defaultdict(collections.deque))

def submit_job(self, task, data, unique=None, priority=PRIORITY_NONE, background=False, wait_until_complete=True, max_retries=0, poll_timeout=None):
def submit_job(self, task, data, unique=None, priority=PRIORITY_NONE, when_to_run=None, background=False, wait_until_complete=True, max_retries=0, poll_timeout=None):
"""Submit a single job to any gearman server"""
job_info = dict(task=task, data=data, unique=unique, priority=priority)
assert (when_to_run is None or
(when_to_run and
priority == PRIORITY_NONE and
not background)
), "priority and background cannot be set with when_to_run"

job_info = dict(task=task, data=data, unique=unique, priority=priority, when_to_run=when_to_run)
completed_job_list = self.submit_multiple_jobs([job_info], background=background, wait_until_complete=wait_until_complete, max_retries=max_retries, poll_timeout=poll_timeout)
return gearman.util.unlist(completed_job_list)

def submit_multiple_jobs(self, jobs_to_submit, background=False, wait_until_complete=True, max_retries=0, poll_timeout=None):
"""Takes a list of jobs_to_submit with dicts of

{'task': task, 'data': data, 'unique': unique, 'priority': priority}
{'task': task, 'data': data, 'unique': unique, 'priority': priority, 'when_to_run': when_to_run}
"""
assert type(jobs_to_submit) in (list, tuple, set), "Expected multiple jobs, received 1?"

Expand Down Expand Up @@ -165,18 +171,26 @@ def continue_while_status_not_updated(any_activity):
return job_requests

def _create_request_from_dictionary(self, job_info, background=False, max_retries=0):
"""Takes a dictionary with fields {'task': task, 'unique': unique, 'data': data, 'priority': priority, 'background': background}"""
"""Takes a dictionary with fields {'task': task, 'unique': unique, 'data': data, 'priority': priority, 'when_to_run': when_to_run, 'background': background}"""
# Make sure we have a unique identifier for ALL our tasks
job_unique = job_info.get('unique')
if not job_unique:
job_unique = os.urandom(self.random_unique_bytes).encode('hex')

current_job = self.job_class(connection=None, handle=None, task=job_info['task'], unique=job_unique, data=job_info['data'])
run_later = False
if job_info.get('when_to_run'):
job_info['when_to_run'] = str(job_info.get('when_to_run'))
run_later = True
# run_later jobs always are in the background, so set to True
background = True
job_info['background'] = background

current_job = self.job_class(connection=None, handle=None, task=job_info['task'], unique=job_unique, when_to_run=job_info.get('when_to_run'), data=job_info['data'])

initial_priority = job_info.get('priority', PRIORITY_NONE)

max_attempts = max_retries + 1
current_request = self.job_request_class(current_job, initial_priority=initial_priority, background=background, max_attempts=max_attempts)
current_request = self.job_request_class(current_job, initial_priority=initial_priority, background=background, run_later=run_later, max_attempts=max_attempts)
return current_request

def establish_request_connection(self, current_request):
Expand Down
9 changes: 6 additions & 3 deletions gearman/client_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from gearman.command_handler import GearmanCommandHandler
from gearman.constants import JOB_UNKNOWN, JOB_PENDING, JOB_CREATED, JOB_FAILED, JOB_COMPLETE
from gearman.errors import InvalidClientState
from gearman.protocol import GEARMAN_COMMAND_GET_STATUS, submit_cmd_for_background_priority
from gearman.protocol import GEARMAN_COMMAND_GET_STATUS, submit_cmd_for_background_priority_run_later

gearman_logger = logging.getLogger(__name__)

Expand All @@ -29,10 +29,13 @@ def send_job_request(self, current_request):
gearman_job = current_request.job

# Handle the I/O for requesting a job - determine which COMMAND we need to send
cmd_type = submit_cmd_for_background_priority(current_request.background, current_request.priority)
cmd_type = submit_cmd_for_background_priority_run_later(current_request.background, current_request.priority, current_request.run_later)

outbound_data = self.encode_data(gearman_job.data)
self.send_command(cmd_type, task=gearman_job.task, unique=gearman_job.unique, data=outbound_data)
if current_request.run_later:
self.send_command(cmd_type, task=gearman_job.task, unique=gearman_job.unique, when_to_run=gearman_job.when_to_run, data=outbound_data)
else:
self.send_command(cmd_type, task=gearman_job.task, unique=gearman_job.unique, data=outbound_data)

# Once this command is sent, our request needs to wait for a handle
current_request.state = JOB_PENDING
Expand Down
14 changes: 8 additions & 6 deletions gearman/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,29 @@

class GearmanJob(object):
"""Represents the basics of a job... used in GearmanClient / GearmanWorker to represent job states"""
def __init__(self, connection, handle, task, unique, data):
def __init__(self, connection, handle, task, unique, data, when_to_run):
self.connection = connection
self.handle = handle

self.task = task
self.unique = unique
self.data = data
self.when_to_run = when_to_run

def to_dict(self):
return dict(task=self.task, job_handle=self.handle, unique=self.unique, data=self.data)
return dict(task=self.task, job_handle=self.handle, unique=self.unique, data=self.data, when_to_run=self.when_to_run)

def __repr__(self):
return '<GearmanJob connection/handle=(%r, %r), task=%s, unique=%s, data=%r>' % (self.connection, self.handle, self.task, self.unique, self.data)
return '<GearmanJob connection/handle=(%r, %r), task=%s, unique=%s, data=%r, when_to_run=%d>' % (self.connection, self.handle, self.task, self.unique, self.data, self.when_to_run)

class GearmanJobRequest(object):
"""Represents a job request... used in GearmanClient to represent job states"""
def __init__(self, gearman_job, initial_priority=PRIORITY_NONE, background=False, max_attempts=1):
def __init__(self, gearman_job, initial_priority=PRIORITY_NONE, background=False, run_later=False, max_attempts=1):
self.gearman_job = gearman_job

self.priority = initial_priority
self.background = background
self.run_later = run_later

self.connection_attempts = 0
self.max_connection_attempts = max_attempts
Expand Down Expand Up @@ -79,5 +81,5 @@ def complete(self):
return actually_complete

def __repr__(self):
formatted_representation = '<GearmanJobRequest task=%r, unique=%r, priority=%r, background=%r, state=%r, timed_out=%r>'
return formatted_representation % (self.job.task, self.job.unique, self.priority, self.background, self.state, self.timed_out)
formatted_representation = '<GearmanJobRequest task=%r, unique=%r, when_to_run=%r, priority=%r, background=%r, run_later=%r, state=%r, timed_out=%r>'
return formatted_representation % (self.job.task, self.job.unique, self.job.when_to_run, self.priority, self.background, self.run_later, self.state, self.timed_out)
23 changes: 15 additions & 8 deletions gearman/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
GEARMAN_COMMAND_SUBMIT_JOB_LOW = 33
GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG = 34

GEARMAN_COMMAND_SUBMIT_JOB_EPOCH = 36

# Fake command code
GEARMAN_COMMAND_TEXT_COMMAND = 9999

Expand Down Expand Up @@ -95,6 +97,8 @@
GEARMAN_COMMAND_SUBMIT_JOB_LOW: ['task', 'unique', 'data'],
GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG: ['task', 'unique', 'data'],

GEARMAN_COMMAND_SUBMIT_JOB_EPOCH: ['task', 'unique', 'when_to_run', 'data'],

# Fake gearman command
GEARMAN_COMMAND_TEXT_COMMAND: ['raw_text']
}
Expand Down Expand Up @@ -140,6 +144,8 @@
GEARMAN_COMMAND_SUBMIT_JOB_LOW: 'GEARMAN_COMMAND_SUBMIT_JOB_LOW',
GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG: 'GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG',

GEARMAN_COMMAND_SUBMIT_JOB_EPOCH: 'GEARMAN_COMMAND_SUBMIT_JOB_EPOCH',

GEARMAN_COMMAND_TEXT_COMMAND: 'GEARMAN_COMMAND_TEXT_COMMAND'
}

Expand All @@ -156,16 +162,17 @@
def get_command_name(cmd_type):
return GEARMAN_COMMAND_TO_NAME.get(cmd_type, cmd_type)

def submit_cmd_for_background_priority(background, priority):
def submit_cmd_for_background_priority_run_later(background, priority, run_later):
cmd_type_lookup = {
(True, PRIORITY_NONE): GEARMAN_COMMAND_SUBMIT_JOB_BG,
(True, PRIORITY_LOW): GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG,
(True, PRIORITY_HIGH): GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG,
(False, PRIORITY_NONE): GEARMAN_COMMAND_SUBMIT_JOB,
(False, PRIORITY_LOW): GEARMAN_COMMAND_SUBMIT_JOB_LOW,
(False, PRIORITY_HIGH): GEARMAN_COMMAND_SUBMIT_JOB_HIGH
(True, PRIORITY_NONE, False): GEARMAN_COMMAND_SUBMIT_JOB_BG,
(True, PRIORITY_LOW, False): GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG,
(True, PRIORITY_HIGH, False): GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG,
(False, PRIORITY_NONE, False): GEARMAN_COMMAND_SUBMIT_JOB,
(False, PRIORITY_LOW, False): GEARMAN_COMMAND_SUBMIT_JOB_LOW,
(False, PRIORITY_HIGH, False): GEARMAN_COMMAND_SUBMIT_JOB_HIGH,
(True, PRIORITY_NONE, True): GEARMAN_COMMAND_SUBMIT_JOB_EPOCH
}
lookup_tuple = (background, priority)
lookup_tuple = (background, priority, run_later)
cmd_type = cmd_type_lookup[lookup_tuple]
return cmd_type

Expand Down
6 changes: 3 additions & 3 deletions gearman/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ def send_job_warning(self, current_job, data, poll_timeout=None):
def create_job(self, command_handler, job_handle, task, unique, data):
"""Create a new job using our self.job_class"""
current_connection = self.handler_to_connection_map[command_handler]
return self.job_class(current_connection, job_handle, task, unique, data)
return self.job_class(current_connection, job_handle, task, unique, data, None)

def on_job_execute(self, current_job):
try:
Expand Down Expand Up @@ -248,10 +248,10 @@ def set_job_lock(self, command_handler, lock):
self.command_handler_holding_job_lock = None

return True

def has_job_lock(self):
return bool(self.command_handler_holding_job_lock is not None)

def check_job_lock(self, command_handler):
"""Check to see if we hold the job lock"""
return bool(self.command_handler_holding_job_lock == command_handler)
6 changes: 3 additions & 3 deletions tests/_core_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,15 @@ def setup_command_handler(self):
self.command_handler = self.connection_manager.connection_to_handler_map[self.connection]

def generate_job(self):
return self.job_class(self.connection, handle=str(random.random()), task='__test_ability__', unique=str(random.random()), data=str(random.random()))
return self.job_class(self.connection, handle=str(random.random()), task='__test_ability__', unique=str(random.random()), data=str(random.random()), when_to_run=None)

def generate_job_dict(self):
current_job = self.generate_job()
return current_job.to_dict()

def generate_job_request(self, priority=PRIORITY_NONE, background=False):
def generate_job_request(self, priority=PRIORITY_NONE, when_to_run=None, background=False):
job_handle = str(random.random())
current_job = self.job_class(connection=self.connection, handle=job_handle, task='__test_ability__', unique=str(random.random()), data=str(random.random()))
current_job = self.job_class(connection=self.connection, handle=job_handle, task='__test_ability__', unique=str(random.random()), data=str(random.random()), when_to_run=when_to_run)
current_request = self.job_request_class(current_job, initial_priority=priority, background=background)

self.assertEqual(current_request.state, JOB_UNKNOWN)
Expand Down
4 changes: 2 additions & 2 deletions tests/client_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from gearman.constants import PRIORITY_NONE, PRIORITY_HIGH, PRIORITY_LOW, JOB_UNKNOWN, JOB_PENDING, JOB_CREATED, JOB_FAILED, JOB_COMPLETE
from gearman.errors import ExceededConnectionAttempts, ServerUnavailable, InvalidClientState
from gearman.protocol import submit_cmd_for_background_priority, GEARMAN_COMMAND_STATUS_RES, GEARMAN_COMMAND_GET_STATUS, GEARMAN_COMMAND_JOB_CREATED, \
from gearman.protocol import submit_cmd_for_background_priority_run_later, GEARMAN_COMMAND_STATUS_RES, GEARMAN_COMMAND_GET_STATUS, GEARMAN_COMMAND_JOB_CREATED, \
GEARMAN_COMMAND_WORK_STATUS, GEARMAN_COMMAND_WORK_FAIL, GEARMAN_COMMAND_WORK_COMPLETE, GEARMAN_COMMAND_WORK_DATA, GEARMAN_COMMAND_WORK_WARNING

from tests._core_testing import _GearmanAbstractTest, MockGearmanConnectionManager, MockGearmanConnection
Expand Down Expand Up @@ -312,7 +312,7 @@ def test_send_job_request(self):
queued_request = self.command_handler.requests_awaiting_handles.popleft()
self.assertEqual(queued_request, current_request)

expected_cmd_type = submit_cmd_for_background_priority(background, priority)
expected_cmd_type = submit_cmd_for_background_priority_run_later(background, priority, False)
self.assert_sent_command(expected_cmd_type, task=gearman_job.task, data=gearman_job.data, unique=gearman_job.unique)

def test_get_status_of_job(self):
Expand Down