diff --git a/gearman/client.py b/gearman/client.py index ea6639f..dd30941 100644 --- a/gearman/client.py +++ b/gearman/client.py @@ -32,16 +32,22 @@ def __init__(self, host_list=None, random_unique_bytes=RANDOM_UNIQUE_BYTES): # Ignores the fact if a request has been bound to a connection or not self.request_to_rotating_connection_queue = weakref.WeakKeyDictionary(compat.defaultdict(collections.deque)) - def submit_job(self, task, data, unique=None, priority=PRIORITY_NONE, background=False, wait_until_complete=True, max_retries=0, poll_timeout=None): + def submit_job(self, task, data, unique=None, priority=PRIORITY_NONE, when_to_run=None, background=False, wait_until_complete=True, max_retries=0, poll_timeout=None): """Submit a single job to any gearman server""" - job_info = dict(task=task, data=data, unique=unique, priority=priority) + assert (when_to_run is None or + (when_to_run and + priority == PRIORITY_NONE and + not background) + ), "priority and background cannot be set with when_to_run" + + job_info = dict(task=task, data=data, unique=unique, priority=priority, when_to_run=when_to_run) completed_job_list = self.submit_multiple_jobs([job_info], background=background, wait_until_complete=wait_until_complete, max_retries=max_retries, poll_timeout=poll_timeout) return gearman.util.unlist(completed_job_list) def submit_multiple_jobs(self, jobs_to_submit, background=False, wait_until_complete=True, max_retries=0, poll_timeout=None): """Takes a list of jobs_to_submit with dicts of - {'task': task, 'data': data, 'unique': unique, 'priority': priority} + {'task': task, 'data': data, 'unique': unique, 'priority': priority, 'when_to_run': when_to_run} """ assert type(jobs_to_submit) in (list, tuple, set), "Expected multiple jobs, received 1?" @@ -165,18 +171,26 @@ def continue_while_status_not_updated(any_activity): return job_requests def _create_request_from_dictionary(self, job_info, background=False, max_retries=0): - """Takes a dictionary with fields {'task': task, 'unique': unique, 'data': data, 'priority': priority, 'background': background}""" + """Takes a dictionary with fields {'task': task, 'unique': unique, 'data': data, 'priority': priority, 'when_to_run': when_to_run, 'background': background}""" # Make sure we have a unique identifier for ALL our tasks job_unique = job_info.get('unique') if not job_unique: job_unique = os.urandom(self.random_unique_bytes).encode('hex') - current_job = self.job_class(connection=None, handle=None, task=job_info['task'], unique=job_unique, data=job_info['data']) + run_later = False + if job_info.get('when_to_run'): + job_info['when_to_run'] = str(job_info.get('when_to_run')) + run_later = True + # run_later jobs always are in the background, so set to True + background = True + job_info['background'] = background + + current_job = self.job_class(connection=None, handle=None, task=job_info['task'], unique=job_unique, when_to_run=job_info.get('when_to_run'), data=job_info['data']) initial_priority = job_info.get('priority', PRIORITY_NONE) max_attempts = max_retries + 1 - current_request = self.job_request_class(current_job, initial_priority=initial_priority, background=background, max_attempts=max_attempts) + current_request = self.job_request_class(current_job, initial_priority=initial_priority, background=background, run_later=run_later, max_attempts=max_attempts) return current_request def establish_request_connection(self, current_request): diff --git a/gearman/client_handler.py b/gearman/client_handler.py index 7ce38cf..3e287e9 100644 --- a/gearman/client_handler.py +++ b/gearman/client_handler.py @@ -6,7 +6,7 @@ from gearman.command_handler import GearmanCommandHandler from gearman.constants import JOB_UNKNOWN, JOB_PENDING, JOB_CREATED, JOB_FAILED, JOB_COMPLETE from gearman.errors import InvalidClientState -from gearman.protocol import GEARMAN_COMMAND_GET_STATUS, submit_cmd_for_background_priority +from gearman.protocol import GEARMAN_COMMAND_GET_STATUS, submit_cmd_for_background_priority_run_later gearman_logger = logging.getLogger(__name__) @@ -29,10 +29,13 @@ def send_job_request(self, current_request): gearman_job = current_request.job # Handle the I/O for requesting a job - determine which COMMAND we need to send - cmd_type = submit_cmd_for_background_priority(current_request.background, current_request.priority) + cmd_type = submit_cmd_for_background_priority_run_later(current_request.background, current_request.priority, current_request.run_later) outbound_data = self.encode_data(gearman_job.data) - self.send_command(cmd_type, task=gearman_job.task, unique=gearman_job.unique, data=outbound_data) + if current_request.run_later: + self.send_command(cmd_type, task=gearman_job.task, unique=gearman_job.unique, when_to_run=gearman_job.when_to_run, data=outbound_data) + else: + self.send_command(cmd_type, task=gearman_job.task, unique=gearman_job.unique, data=outbound_data) # Once this command is sent, our request needs to wait for a handle current_request.state = JOB_PENDING diff --git a/gearman/job.py b/gearman/job.py index 82195b1..a58ea31 100644 --- a/gearman/job.py +++ b/gearman/job.py @@ -3,27 +3,29 @@ class GearmanJob(object): """Represents the basics of a job... used in GearmanClient / GearmanWorker to represent job states""" - def __init__(self, connection, handle, task, unique, data): + def __init__(self, connection, handle, task, unique, data, when_to_run): self.connection = connection self.handle = handle self.task = task self.unique = unique self.data = data + self.when_to_run = when_to_run def to_dict(self): - return dict(task=self.task, job_handle=self.handle, unique=self.unique, data=self.data) + return dict(task=self.task, job_handle=self.handle, unique=self.unique, data=self.data, when_to_run=self.when_to_run) def __repr__(self): - return '' % (self.connection, self.handle, self.task, self.unique, self.data) + return '' % (self.connection, self.handle, self.task, self.unique, self.data, self.when_to_run) class GearmanJobRequest(object): """Represents a job request... used in GearmanClient to represent job states""" - def __init__(self, gearman_job, initial_priority=PRIORITY_NONE, background=False, max_attempts=1): + def __init__(self, gearman_job, initial_priority=PRIORITY_NONE, background=False, run_later=False, max_attempts=1): self.gearman_job = gearman_job self.priority = initial_priority self.background = background + self.run_later = run_later self.connection_attempts = 0 self.max_connection_attempts = max_attempts @@ -79,5 +81,5 @@ def complete(self): return actually_complete def __repr__(self): - formatted_representation = '' - return formatted_representation % (self.job.task, self.job.unique, self.priority, self.background, self.state, self.timed_out) + formatted_representation = '' + return formatted_representation % (self.job.task, self.job.unique, self.job.when_to_run, self.priority, self.background, self.run_later, self.state, self.timed_out) diff --git a/gearman/protocol.py b/gearman/protocol.py index cd67da2..f135732 100644 --- a/gearman/protocol.py +++ b/gearman/protocol.py @@ -50,6 +50,8 @@ GEARMAN_COMMAND_SUBMIT_JOB_LOW = 33 GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG = 34 +GEARMAN_COMMAND_SUBMIT_JOB_EPOCH = 36 + # Fake command code GEARMAN_COMMAND_TEXT_COMMAND = 9999 @@ -95,6 +97,8 @@ GEARMAN_COMMAND_SUBMIT_JOB_LOW: ['task', 'unique', 'data'], GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG: ['task', 'unique', 'data'], + GEARMAN_COMMAND_SUBMIT_JOB_EPOCH: ['task', 'unique', 'when_to_run', 'data'], + # Fake gearman command GEARMAN_COMMAND_TEXT_COMMAND: ['raw_text'] } @@ -140,6 +144,8 @@ GEARMAN_COMMAND_SUBMIT_JOB_LOW: 'GEARMAN_COMMAND_SUBMIT_JOB_LOW', GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG: 'GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG', + GEARMAN_COMMAND_SUBMIT_JOB_EPOCH: 'GEARMAN_COMMAND_SUBMIT_JOB_EPOCH', + GEARMAN_COMMAND_TEXT_COMMAND: 'GEARMAN_COMMAND_TEXT_COMMAND' } @@ -156,16 +162,17 @@ def get_command_name(cmd_type): return GEARMAN_COMMAND_TO_NAME.get(cmd_type, cmd_type) -def submit_cmd_for_background_priority(background, priority): +def submit_cmd_for_background_priority_run_later(background, priority, run_later): cmd_type_lookup = { - (True, PRIORITY_NONE): GEARMAN_COMMAND_SUBMIT_JOB_BG, - (True, PRIORITY_LOW): GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG, - (True, PRIORITY_HIGH): GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG, - (False, PRIORITY_NONE): GEARMAN_COMMAND_SUBMIT_JOB, - (False, PRIORITY_LOW): GEARMAN_COMMAND_SUBMIT_JOB_LOW, - (False, PRIORITY_HIGH): GEARMAN_COMMAND_SUBMIT_JOB_HIGH + (True, PRIORITY_NONE, False): GEARMAN_COMMAND_SUBMIT_JOB_BG, + (True, PRIORITY_LOW, False): GEARMAN_COMMAND_SUBMIT_JOB_LOW_BG, + (True, PRIORITY_HIGH, False): GEARMAN_COMMAND_SUBMIT_JOB_HIGH_BG, + (False, PRIORITY_NONE, False): GEARMAN_COMMAND_SUBMIT_JOB, + (False, PRIORITY_LOW, False): GEARMAN_COMMAND_SUBMIT_JOB_LOW, + (False, PRIORITY_HIGH, False): GEARMAN_COMMAND_SUBMIT_JOB_HIGH, + (True, PRIORITY_NONE, True): GEARMAN_COMMAND_SUBMIT_JOB_EPOCH } - lookup_tuple = (background, priority) + lookup_tuple = (background, priority, run_later) cmd_type = cmd_type_lookup[lookup_tuple] return cmd_type diff --git a/gearman/worker.py b/gearman/worker.py index e221914..bdc0824 100644 --- a/gearman/worker.py +++ b/gearman/worker.py @@ -210,7 +210,7 @@ def send_job_warning(self, current_job, data, poll_timeout=None): def create_job(self, command_handler, job_handle, task, unique, data): """Create a new job using our self.job_class""" current_connection = self.handler_to_connection_map[command_handler] - return self.job_class(current_connection, job_handle, task, unique, data) + return self.job_class(current_connection, job_handle, task, unique, data, None) def on_job_execute(self, current_job): try: @@ -248,10 +248,10 @@ def set_job_lock(self, command_handler, lock): self.command_handler_holding_job_lock = None return True - + def has_job_lock(self): return bool(self.command_handler_holding_job_lock is not None) - + def check_job_lock(self, command_handler): """Check to see if we hold the job lock""" return bool(self.command_handler_holding_job_lock == command_handler) diff --git a/tests/_core_testing.py b/tests/_core_testing.py index 8d1aa65..cd5a2ab 100644 --- a/tests/_core_testing.py +++ b/tests/_core_testing.py @@ -78,15 +78,15 @@ def setup_command_handler(self): self.command_handler = self.connection_manager.connection_to_handler_map[self.connection] def generate_job(self): - return self.job_class(self.connection, handle=str(random.random()), task='__test_ability__', unique=str(random.random()), data=str(random.random())) + return self.job_class(self.connection, handle=str(random.random()), task='__test_ability__', unique=str(random.random()), data=str(random.random()), when_to_run=None) def generate_job_dict(self): current_job = self.generate_job() return current_job.to_dict() - def generate_job_request(self, priority=PRIORITY_NONE, background=False): + def generate_job_request(self, priority=PRIORITY_NONE, when_to_run=None, background=False): job_handle = str(random.random()) - current_job = self.job_class(connection=self.connection, handle=job_handle, task='__test_ability__', unique=str(random.random()), data=str(random.random())) + current_job = self.job_class(connection=self.connection, handle=job_handle, task='__test_ability__', unique=str(random.random()), data=str(random.random()), when_to_run=when_to_run) current_request = self.job_request_class(current_job, initial_priority=priority, background=background) self.assertEqual(current_request.state, JOB_UNKNOWN) diff --git a/tests/client_tests.py b/tests/client_tests.py index 824c865..2f138d8 100644 --- a/tests/client_tests.py +++ b/tests/client_tests.py @@ -7,7 +7,7 @@ from gearman.constants import PRIORITY_NONE, PRIORITY_HIGH, PRIORITY_LOW, JOB_UNKNOWN, JOB_PENDING, JOB_CREATED, JOB_FAILED, JOB_COMPLETE from gearman.errors import ExceededConnectionAttempts, ServerUnavailable, InvalidClientState -from gearman.protocol import submit_cmd_for_background_priority, GEARMAN_COMMAND_STATUS_RES, GEARMAN_COMMAND_GET_STATUS, GEARMAN_COMMAND_JOB_CREATED, \ +from gearman.protocol import submit_cmd_for_background_priority_run_later, GEARMAN_COMMAND_STATUS_RES, GEARMAN_COMMAND_GET_STATUS, GEARMAN_COMMAND_JOB_CREATED, \ GEARMAN_COMMAND_WORK_STATUS, GEARMAN_COMMAND_WORK_FAIL, GEARMAN_COMMAND_WORK_COMPLETE, GEARMAN_COMMAND_WORK_DATA, GEARMAN_COMMAND_WORK_WARNING from tests._core_testing import _GearmanAbstractTest, MockGearmanConnectionManager, MockGearmanConnection @@ -312,7 +312,7 @@ def test_send_job_request(self): queued_request = self.command_handler.requests_awaiting_handles.popleft() self.assertEqual(queued_request, current_request) - expected_cmd_type = submit_cmd_for_background_priority(background, priority) + expected_cmd_type = submit_cmd_for_background_priority_run_later(background, priority, False) self.assert_sent_command(expected_cmd_type, task=gearman_job.task, data=gearman_job.data, unique=gearman_job.unique) def test_get_status_of_job(self):