Calling enqueueWithData: when queue is not running can cause duplicate calls to processJob: delegate method #10

Open
ruddct opened this issue Oct 21, 2013 · 7 comments

Repro steps:

// With [EDQueue sharedInstance] stopped
[[EDQueue sharedInstance] enqueueWithData:@{@"test" : @"data"} forTask:@"task"];
[[EDQueue sharedInstance] enqueueWithData:@{@"test" : @"data"} forTask:@"task2"];
[[EDQueue sharedInstance] start];

Expected:
The queue should ask its delegate to process "task", wait for it to complete, and then ask its delegate to process "task2".

Actual:
A race condition occurs which can cause the queue to ask its delegate to process "task" and/or "task2" multiple times.

Bug description:
Here's my understanding of why this is happening:

  • enqueueWithData: calls tick after inserting the job into the storage engine.
  • start also calls tick after setting isRunning to true.
  • tick dispatches a block to a background queue that asks the delegate to process the job at the head of the queue; isRunning is checked inside that async block to determine whether the delegate actually gets asked.
  • Calling enqueueWithData: before calling start therefore dispatches multiple async blocks, each of which can ask the delegate to process the first job in the queue. Every block that runs after isRunning is set to true asks the delegate to process that same first job, so the delegate receives duplicate calls for it (see the sketch below).
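
Roughly what the described tick does (paraphrased from the bullets above, not the actual EDQueue source):

- (void)tick
{
    dispatch_queue_t gcd = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_BACKGROUND, 0);
    dispatch_async(gcd, ^{
        if (self.isRunning) {
            // Fetch the job at the head of the queue and ask the delegate to
            // process it. Every block dispatched while the queue was stopped
            // sees this same head job once start flips isRunning to true.
        }
    });
}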

It seems like the solution is to guard the tick call in enqueueWithData: with an isRunning check, but I would love feedback on whether that would break use cases I haven't thought of.

Here's a modified enqueueWithData:

- (void)enqueueWithData:(id)data forTask:(NSString *)task
{
    if (data == nil) data = @{};
    [self.engine createJob:data forTask:task];
    // Only tick if the queue is running; start will tick once it sets isRunning.
    if (self.isRunning) [self tick];
}
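
With that guard in place, start becomes the only path that drains jobs enqueued while stopped. A sketch of that path, based on the bullets above rather than the actual source:

- (void)start
{
    if (!self.isRunning) {
        self.isRunning = YES;
        [self tick]; // drains anything enqueued while the queue was stopped
    }
}
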
@thisandagain (Owner) commented

Thanks so much for the detailed issue. I'll dig into this asap.

@ryandotsmith (Collaborator) commented

Hello. I suspect this bug still exists in the project. I have given this type of problem a lot of thought over the years through my experience with queue_classic, so I will submit a few ideas for consideration.

One initial idea is to make some adjustments to the database model so that we eliminate potential race conditions. In most cases I would curse an RDBMS for a lack of concurrency, but in our case, SQLite's non-concurrent promise comes in handy: we are promised that data updates are serialized by SQLite. Therefore, we could add an additional column to the queue table that would indicate whether a job is locked. We can use a transaction to select an unlocked job, update its locked column, then commit the transaction (sketched below).
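
A minimal sketch of that transaction, assuming a raw sqlite3 handle and a new locked column; the table and column names here are illustrative, not the actual EDQueue schema:

#import <sqlite3.h>

// Claim one unlocked job inside a transaction. Returns the claimed job id,
// or -1 if nothing is available. BEGIN IMMEDIATE takes the write lock up
// front, so no other writer can lock the same row between our SELECT and
// our UPDATE.
sqlite3_int64 EDClaimNextJob(sqlite3 *db)
{
    sqlite3_int64 jobId = -1;
    sqlite3_exec(db, "BEGIN IMMEDIATE TRANSACTION", NULL, NULL, NULL);

    sqlite3_stmt *select = NULL;
    sqlite3_prepare_v2(db, "SELECT id FROM queue WHERE locked = 0 ORDER BY id LIMIT 1", -1, &select, NULL);
    if (sqlite3_step(select) == SQLITE_ROW) {
        jobId = sqlite3_column_int64(select, 0);

        sqlite3_stmt *update = NULL;
        sqlite3_prepare_v2(db, "UPDATE queue SET locked = 1 WHERE id = ?", -1, &update, NULL);
        sqlite3_bind_int64(update, 1, jobId);
        sqlite3_step(update);
        sqlite3_finalize(update);
    }
    sqlite3_finalize(select);

    sqlite3_exec(db, "COMMIT", NULL, NULL, NULL);
    return jobId;
}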

Assuming we use a data model which handles locking at the database level, we can then start to simplify and potentially improve the method in which jobs are processed. For instance, consider the case in which several jobs have failed (possibly due to a bad network connection) and consequently our queue depth has increased. The current technique we use to process jobs is such that we would attempt to work off the jobs one by one. But if we have a data model that ensures that jobs can only be locked by a single, logical process, we could take a different approach to processing jobs. We could have a job scheduler, whose only responsibility is to lock a job, and then dispatch the job on a thread to be worked. In the aforementioned scenario, we could be working the backlog off in parallel.
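
A sketch of that scheduler, building on the EDClaimNextJob helper above; the db property and workJobWithId: are hypothetical names, not existing EDQueue API:

// Lock every available job and fan each one out to a background thread.
// Because EDClaimNextJob hands a given job to exactly one caller, the
// workers can safely run in parallel.
- (void)schedule
{
    sqlite3_int64 jobId = EDClaimNextJob(self.db);
    while (jobId != -1) {
        sqlite3_int64 claimed = jobId;
        dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_BACKGROUND, 0), ^{
            [self workJobWithId:claimed]; // load the row, ask the delegate to process it
        });
        jobId = EDClaimNextJob(self.db);
    }
}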

I am sure there are some details that need to be addressed with my meager proposal, but I trust that I have demonstrated that by moving towards a more robust data model we can open the door to faster, safer job processing.

@ryandotsmith (Collaborator) commented

I will also say that if @thisandagain is interested in this direction, I would be more than happy to start working on a pull request.

@thisandagain (Owner) commented

@ryandotsmith Totally agree... "locking" rows in the data model is a nice / simple way to address this and guarantee some safety and add some flexibility. 👍

@dre-hh commented May 27, 2014

Therefore, we could add an additional column to the queue table that would indicate whether a job is locked

We should definitely do that.

We can use a transaction to select an unlocked job, update it's locked column, then commit the transaction.

But we can do something much simpler. This is actually what the popular delayed_job ruby gem does.
https://github.com/collectiveidea/delayed_job_active_record/blob/master/lib/delayed/backend/active_record.rb

We can just perform the lock update with a WHERE clause:

UPDATE jobs SET locked_at = <time> WHERE id = <job_id> AND locked_at IS NULL

If this statement is executed concurrently by two or more threads/processes for the same job, only one of them can succeed, because after the first update locked_at is no longer NULL.
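
In SQLite the winner can be detected by checking how many rows the UPDATE changed. A minimal sketch, assuming the locked_at column described above (table and variable names are illustrative):

// Try to claim the job optimistically; whoever changes the row wins.
sqlite3_stmt *stmt = NULL;
sqlite3_prepare_v2(db, "UPDATE queue SET locked_at = ? WHERE id = ? AND locked_at IS NULL", -1, &stmt, NULL);
sqlite3_bind_double(stmt, 1, [[NSDate date] timeIntervalSince1970]);
sqlite3_bind_int64(stmt, 2, jobId);
sqlite3_step(stmt);
sqlite3_finalize(stmt);

// sqlite3_changes reports the rows modified by the most recent statement:
// 1 means we won the lock, 0 means another thread/process already holds it.
BOOL lockAcquired = (sqlite3_changes(db) == 1);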

@thisandagain (Owner) commented

Great feedback. Thanks @dre-hh

@tin4g commented Aug 31, 2016

A similar problem happens when I create tasks concurrently. In the end I simply patched the tick instance method in EDQueue.m so that tasks are activated in sequence:

  // EDQueue.m
  - dispatch_queue_t gcd = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_BACKGROUND, 0);
  + dispatch_queue_t gcd = dispatch_get_main_queue();

I think the cause is that several threads read the same isActive property value, which is still false, at the same instant.
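
A variation on the same idea (my suggestion, not from the patch above): a private serial queue gives the same one-at-a-time ordering without putting the work on the main thread. The queue label below is illustrative:

// EDQueue.m (sketch): serialize tick on a dedicated queue instead of main.
static dispatch_queue_t EDTickQueue(void)
{
    static dispatch_queue_t queue;
    static dispatch_once_t once;
    dispatch_once(&once, ^{
        queue = dispatch_queue_create("com.example.edqueue.tick", DISPATCH_QUEUE_SERIAL);
    });
    return queue;
}

Dispatching tick's block onto EDTickQueue() serializes the isActive reads just like the main queue does, but keeps the work off the main thread.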
