Processing server extension (#1046) #1069

Merged
merged 20 commits into from
Sep 4, 2023

Conversation

MehmedGIT
Contributor

@MehmedGIT MehmedGIT commented Jul 4, 2023

This is the initial draft pull request to follow the development of changes discussed in #1046.

@MehmedGIT MehmedGIT changed the title from "Processing server extension that implements changes discussed in #1046" to "Processing server extension (#1046)" on Jul 4, 2023
)

# The workspace is currently locked (being processed)
# TODO: Do the proper caching here, after the refactored code is working
Member

  • Change being_processed in the DBWorkspace from boolean to string
  • Serialize data without page_id in a canonical way, set being_processed to that when locking
  • check here whether canonical serialization of data is the same as being_processed. If so, allow the request, even though workspace_db is being_processed.
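
A minimal sketch of the proposal above, assuming the request data is a plain dict and that an empty string stands for "not locked". The field names and the helpers `canonical_lock_key` and `may_proceed` are hypothetical and not part of the PR:

```python
import json


def canonical_lock_key(request: dict) -> str:
    """Serialize a processing request without page_id in a stable form."""
    data = {key: value for key, value in request.items() if key != "page_id"}
    # sort_keys and fixed separators make equal requests serialize identically
    return json.dumps(data, sort_keys=True, separators=(",", ":"))


def may_proceed(being_processed: str, request: dict) -> bool:
    """Allow a request if the workspace is free or locked by an identical request."""
    # Assumption of this sketch: "" means the workspace is not locked
    return being_processed == "" or being_processed == canonical_lock_key(request)
```

Two requests that differ only in page_id get the same key, so the second one would be let through while the first is still running.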

Collaborator

Objection by @MehmedGIT in today's call: Processing Server clients (like the Workflow Server) could decide to split up the job into several page ranges (esp. if they know there are multiple worker instances running in the back) and issue them concurrently. So instead of a boolean or string, we would need to synchronise over the actual set of pages of each workspace.

Contributor Author

@MehmedGIT MehmedGIT Jul 11, 2023

  • check here whether canonical serialization of data is the same as being_processed. If so, allow the request, even though workspace_db is being_processed.

This does not achieve proper workspace locking when the same request is submitted with overlapping page ranges for the same workspace.

we would need to synchronise over the actual set of pages of each workspace.

This is the way to achieve proper page-range locking and prevent collisions at the Processor Server/Processing Worker level, but it is also complex to implement. The more I think about it, the more problematic it seems once error handling is considered.

  1. Ideal scenario example: for a 200-page workspace, the Workflow Server will create 4 processing requests (50 pages per request), knowing that there are 4 workers of the specified processor. Each request's page_id field will then be in the form start_page_id..end_page_id. The being_processed boolean field will be replaced with a str field locked_pages in the form start_page_id1..end_page_id1,...,start_page_id4..end_page_id4. Then detecting overlapping single pages or page ranges is achievable to raise errors for the next coming requests. If the ranges don't overlap and match for the same processor, then the workspace would not be considered locked. For other processors, the workspace will still be considered locked. The pages of the workspace will then be unlocked by the internal callbacks from the Processor Server or Processing Worker to the Processing Server when the execution finishes or fails.

Problem: if either of the two fails to invoke the callback, say due to a crash, the workspace stays locked indefinitely. Choosing a proper timeout for a server or worker is hard.

  2. Another point:
    Considering that page_id could be:
  • a single page - PHYS0001
  • multiple pages separated by a comma - PHYS0003,PHYS0004,PHYS0005
  • a page range - PHYS0009..PHYS0015

Now assume a scenario where 3 processing requests with the different page_id formats above are submitted: the value of locked_pages for the 3 requests would be PHYS0001_PHYS0003,PHYS0004,PHYS0005_PHYS0009..PHYS0015, assuming that the separator is an underscore. Alternatively, it could be a list of strings with the values [PHYS0001][PHYS0003,PHYS0004,PHYS0005][PHYS0009..PHYS0015].

Potential problem: locked_pages is filled with a lot of values in case single page requests are passed for the entire workspace. Another thing to consider is collision detection - the incoming page_id can have 3 different forms, and hence, lots of extra overhead of handling data.

Collaborator

  1. [...] Then detecting overlapping single pages or page ranges is achievable to raise errors for the next coming requests.

Why raise errors? IMO, locking is simply about blocking requests until the associated resources become free again.

If the ranges don't overlap and match for the same processor, then the workspace would not be considered locked. For other processors, the workspace will still be considered locked.

Why make this contingent on the identity of the processor? IMO, if the other request concerns a disjunct set of pages, then there simply cannot be a dependency relation to the currently running processor(s). It could be because the other request is prior (lagging behind) or posterior (more advanced) in the overall workflow across pages. Or it could simply be an independent request. (Conversely, independent requests which do affect the same page range will gain an artificial dependency relation here, but that's not that much of a problem.)

If anything, if we want to look more closely into dependencies, we should consider pairs set(pages) · fileGrp, where

  • a new request must block if its pages and input fileGrp overlap the locked pairs, and
  • a new job will lock its pages and output fileGrp while running

Thus, even requests on the same pages but regarding distinct fileGrps would stay independent.
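
A minimal sketch of such pair-based locking for a single workspace, assuming pages are already resolved to sets of page IDs. The class `PageLocks` and its method names are hypothetical, not taken from the PR:

```python
class PageLocks:
    """Tracks locked pages per fileGrp for one workspace (hypothetical helper)."""

    def __init__(self):
        self._locked = {}  # fileGrp -> set of locked page IDs

    def must_block(self, pages, input_file_grps):
        # A new request blocks if any of its pages is locked in any input fileGrp
        return any(self._locked.get(grp, set()) & set(pages) for grp in input_file_grps)

    def lock(self, pages, output_file_grps):
        # A running job locks its pages in its output fileGrp(s)
        for grp in output_file_grps:
            self._locked.setdefault(grp, set()).update(pages)

    def unlock(self, pages, output_file_grps):
        for grp in output_file_grps:
            self._locked.get(grp, set()).difference_update(pages)
```

With this layout, a request on the same pages but reading from a fileGrp that no running job writes to does not block.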

Collaborator

Potential problem: locked_pages is filled with a lot of values

Not a problem: simply use the actual resolved list (or rather, set) of pages! Set operations (union for adding, intersection for testing, difference for clearing) are clear and efficient.
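
As an illustration, the three page_id forms can be resolved into one set before any locking is done. This sketch assumes the ordered page list of the workspace is available as `all_pages`; the function `resolve_page_ids` is hypothetical:

```python
def resolve_page_ids(page_id: str, all_pages: list) -> set:
    """Expand 'A', 'A,B,C' or 'A..B' (also mixed) into a set of page IDs.

    all_pages is the ordered page list of the workspace (assumed to be known).
    """
    resolved = set()
    for part in page_id.split(","):
        if ".." in part:
            start, end = part.split("..", 1)
            first, last = all_pages.index(start), all_pages.index(end)
            resolved.update(all_pages[first:last + 1])
        else:
            resolved.add(part)
    return resolved


all_pages = [f"PHYS{number:04d}" for number in range(1, 21)]
locked = set()
locked |= resolve_page_ids("PHYS0009..PHYS0015", all_pages)  # union: add a lock
overlap = locked & resolve_page_ids("PHYS0014,PHYS0016", all_pages)  # intersection: test
locked -= resolve_page_ids("PHYS0009..PHYS0015", all_pages)  # difference: clear
```

Once everything is a set, the original notation of the request no longer matters for collision detection.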

Contributor Author

Why raise errors? IMO, locking is simply about blocking requests until the associated resources become free again.

Right. I was just considering the cases where double executions of the same processor with overlapping pages are submitted. That would mostly end in error, wouldn't it? Unless overwrite is already set for the next requests.

Why make this contingent on the identity of the processor?

Because the workspace will be unlocked for the same processor requests that potentially can have different page ranges. The collisions for the same processor but different pages are handled by the METS server. However, if we get a different processor but the same pages, then the workspace is locked and that request goes into the waiting internal queue (CACHED). Consider cases when running processors cannot be done independently, i.e., the output of the first processor has to be used by the second processor.

IMO, if the other request concerns a disjunct set of pages, then there simply cannot be a dependency relation to the currently running processor(s)

Right, that means we must track processor · set(pages) · fileGrp (considering the info from the next paragraph). It becomes even harder to track (and potentially debug) what to submit directly to RabbitMQ and what to queue internally (to cache).

If anything, if we want to look more closely into dependencies, we should consider pairs set(pages) · fileGrp, where
- a new request must block if its pages and input fileGrp overlap the locked pairs, and
- a new job will lock its pages and output fileGrp while running
Thus, even requests on the same pages but regarding distinct fileGrps would stay independent.

Oh, right. We should also consider fileGrp in the mix of complexity...

Not a problem: simply use the actual resolved list (or rather, set) of pages! Set operations (union for adding, intersection for testing, difference for clearing) are clear and efficient.

This is of course simpler to manage although there will be more stress on the DB itself with reads/writes - which I guess is okay for now.

So we'll now have cached and queued and running job entries. For running jobs, we agreed there is a need for some kind of timeout anyway, universally

Yes, we have a dummy timeout right now based on the number of pages (set to 200 by default), multiplied by a timeout value per page. This could lead to timeout errors even when the output is correctly produced, say due to slower processing. The number of pages is still not part of the processing request.

BTW, IINM we still need to take care of atomicity in some places, i.e. use a mutex to query and change the internal queue...

Yes, there will be a race condition on the internal queue resource.
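
One way to guard the internal queue, sketched under the assumption that request handlers may run in separate threads (in an asyncio-only server, an asyncio.Lock would play the same role). The class `InternalQueues` and its methods are hypothetical:

```python
import threading
from collections import deque


class InternalQueues:
    """Per-workspace queues whose query-and-change steps are atomic."""

    def __init__(self):
        self._mutex = threading.Lock()
        self._queues = {}  # workspace key -> deque of cached requests

    def push(self, workspace_key, request):
        with self._mutex:
            self._queues.setdefault(workspace_key, deque()).append(request)

    def pop_first(self, workspace_key, is_runnable):
        """Remove and return the first runnable request, or None."""
        with self._mutex:
            queue = self._queues.get(workspace_key)
            if queue is None:
                return None
            for index, request in enumerate(queue):
                if is_runnable(request):
                    del queue[index]
                    if not queue:
                        # drop the queue once it is empty
                        del self._queues[workspace_key]
                    return request
            return None
```

Checking and removing happen under the same lock, so two callbacks arriving at once cannot both pick the same request.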

Member

IMO, if the other request concerns a disjunct set of pages, then there simply cannot be a dependency relation to the currently running processor(s)

Right, that means we must track processor · set(pages) · fileGrp (considering the info from the next paragraph). It becomes even harder to track (and potentially debug) what to submit directly to RabbitMQ and what to queue internally (to cache).

Sorry to add to the complexity but we also need the parameterization of the processor in the mix, so processor · parameters · set(pages) · fileGrp 😬

Collaborator

@bertsky bertsky Jul 12, 2023

I was just considering the cases where double executions of the same processor with overlapping pages are submitted. That would mostly end in error, wouldn't it? Unless overwrite is already set for the next requests.

Oh that. Yes, that's right. (If the METS already contains results for a page-fileGrp.) But for that we don't have to do anything (detecting overlaps etc) – this error will be coming from the processor itself (it already contains logic looking for conflicts in the output fileGrp / page list).

Why make this contingent on the identity of the processor?

Because the workspace will be unlocked for the same processor requests that potentially can have different page ranges. The collisions for the same processor but different pages are handled by the METS server. However, if we get a different processor but the same pages, then the workspace is locked and that request goes into the waiting internal queue (CACHED). Consider cases when running processors cannot be done independently, i.e., the output of the first processor has to be used by the second processor.

This sounds confusing to me. My above analysis still stands IMHO – I don't see any reason why we should look at the identity of the processor. (But I agree with @kba that if we do, then it would really have to be the combination of processor and parameters.) It's just the pages and the fileGrps that cause a dependency.

Regarding timeouts (using the actual number of pages in the Processor Server model, adding a timeout mechanism in the Processing Worker) – should we track that in a separate issue?

Contributor Author

Oh that. Yes, that's right. But for that we don't have to do anything

Agree, we should not try to do early prevention of errors since it makes the implementation more complex on the Processing Server side.

It's just the pages and the fileGrps that cause a dependency.

For the sake of keeping it simple - it should be just pages and fileGrp of a workspace then.

Collaborator

Regarding timeouts (using the actual number of pages in the Processor Server model, adding a timeout mechanism in the Processing Worker) – should we track that in a separate issue?

#1074

Agree, we should not try to do early prevention of errors since it makes the implementation more complex on the Processing Server side.

Yes. If we want to do anticipation of workflow conflicts then it would be what currently is done in ocrd process via ocrd.task_sequence.validate_tasks() – checking that

  • all processors exist
  • all parameters are valid for all steps
  • no output fileGrp exists prior to that step (in the data or in the workflow itself), except when using --overwrite
  • no input fileGrp will be missing for any step (because it is either already in the data or generated earlier in the workflow)

That could be done in the Workflow Server statically – before sending requests.

@tdoan2010
Contributor

So, this is what I've learned from our meeting today.

  1. We want to move the workflow logic into the Processing Server. It means that the Processing Server knows about all steps and dependencies.
  2. To achieve this, the Workflow Server will send all processing requests to the Processing Server. The Processing Server will put them in an internal queue.
  3. The Processing Server takes a request out from the internal queue, checks if the page that needs to be processed is locked or not. If not, process that page (submit a job to RabbitMQ). If the page is locked, just wait and try again later.
  4. There are many internal queues in a Processing Server. Each queue is for a specific workspace.

@MehmedGIT
Contributor Author

MehmedGIT commented Jul 13, 2023

Here are some more technical details (summary):

  1. The Workflow Server will push all processing requests to the Processing Server without waiting for specific steps to finish. The Workflow Server will also provide a callback URL to be used by the Processing Server to report back workflow execution status.

  2. The Processing Server will create a separate internal (python) queue for each specific workspace on demand. The workspace is identified with workspace_id or path_to_mets which will be the key field in the dictionary data structure. The value field will be a Python queue. Each processing request will be pushed to the specific queue of the workspace.

  3. Then the Processing Server will check the output fileGrp and the page_id fields of the most recently received processing request. If the output fileGrp is locked even for a single page in the provided range of page_id, then nothing more will happen. If the output fileGrp is not locked for a specific page_id field (a value or a range), then the processing request will be either:

  • pushed to the RabbitMQ queue (in case of a Processing Worker) OR
  • forwarded to a Processor Server
  4. Once the Processing Worker or Processor Server finishes processing, it will use an internal callback to the Processing Server with the result message. The result message should be either:
  • reworked to include details about output fileGrp and page_id to meet the new requirements OR
  • left as is, with the Processing Server querying that information from the DB.
    Which option is better also depends on the callback expectations of outside users. Currently, the result message holds only the following 4 fields: job_id, state, workspace_id, path_to_mets.
  5. When the Processing Server receives a result message at its internal callback endpoint, it will unlock the pages of the output fileGrp for the page_id range of the workspace. Then the Processing Server will check if there are other pending requests in the internal queue for that workspace. If there are, step 3 above will repeat for the processing request taken from the queue. If there are not, the Processing Server will delete the internal queue of that specific workspace.
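
The request/callback flow of steps 2 to 5 could be sketched roughly as follows. This is a simplified in-memory illustration, not the PR's implementation: requests are plain dicts with hypothetical keys (`workspace`, `output_file_grp`, `pages`), `dispatch` stands in for pushing to RabbitMQ or forwarding to a Processor Server, and a free request is dispatched right away instead of passing through the queue first.

```python
from collections import deque


class WorkspaceScheduler:
    def __init__(self, dispatch):
        self.dispatch = dispatch  # stand-in for RabbitMQ push / Processor Server call
        self.queues = {}          # workspace -> deque of cached requests
        self.locked = {}          # (workspace, output fileGrp) -> set of locked pages

    def _key(self, request):
        return (request["workspace"], request["output_file_grp"])

    def _is_free(self, request):
        return not (self.locked.get(self._key(request), set()) & request["pages"])

    def _start(self, request):
        self.locked.setdefault(self._key(request), set()).update(request["pages"])
        self.dispatch(request)

    def on_request(self, request):
        # Step 3: dispatch if no requested page is locked, otherwise cache
        if self._is_free(request):
            self._start(request)
        else:
            self.queues.setdefault(request["workspace"], deque()).append(request)

    def on_result(self, request):
        # Step 5: unlock, then push out whatever has become free at the queue head
        self.locked.get(self._key(request), set()).difference_update(request["pages"])
        queue = self.queues.get(request["workspace"])
        while queue and self._is_free(queue[0]):
            self._start(queue.popleft())
        if queue is not None and not queue:
            del self.queues[request["workspace"]]
```

The scheduler only acts when a request or a result callback arrives, so no polling loop is needed.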

One thing that is still not clear: how to detect which steps belong to a single workflow. The same workspace can be used by 2 different workflows at the same time if there is no collision in the file groups used by the workflows. In that case, all processing requests will end up in the same internal queue for the specific workspace. So it's unclear when the Processing Server should invoke the callback URL of the Workflow Server.

@bertsky
Collaborator

bertsky commented Jul 13, 2023

  1. We want to move the workflow logic into the Processing Server. It means that the Processing Server knows about all steps and dependencies.

Not quite: The Processing Server (which is the only central component – there could be many Workflow Servers or requests from other clients) needs to lock workspaces while they are operated on to prevent conflicts (not in the sense of atomicity, which will be the internal METS Server's responsibility, but for scheduling, so that no files are read while they are still being written). (And workspaces will not be locked entirely, but only the respective pages and fileGrp of active jobs.)

So from the order of incoming requests the PS has an implicit knowledge of the job dependencies. It still does not know about workflow jobs and workflow syntax. But the WS becomes free of active waiting (and listening to RabbitMQ) – it just decodes the workflow, responds with the workflow job, sends all respective step requests to the PS, and listens to the final step's callback.

  2. To achieve this, the Workflow Server will send all processing requests to the Processing Server. The Processing Server will put them in an internal queue.

As long as they cannot be put on the external queue, yes.

  3. The Processing Server takes a request out from the internal queue, checks if the page that needs to be processed is locked or not. If not, process that page (submit a job to RabbitMQ). If the page is locked, just wait and try again later.

No busy waiting involved: The consumers will notify the PS by calling an additional internal callback when finishing each job. So the PS just needs to act (check what can be pushed out) when callbacks or new requests come in.

  4. There are many internal queues in a Processing Server. Each queue is for a specific workspace.

Yes.

Yesterday's discussion also covered more topics:

What do we need Nextflow for, then? We cannot use its error handling facilities, we have to use Groovy code for the networking in the blocks, we get no live monitoring API or UI. All we need is a workflow syntax.

Should we define our own syntax – again, based on ocrd process syntax or a custom YAML notation – and implement it in Python/FastAPI all the way? Nextflow will always be needed for controlling the processor CLIs in Operandi, because it has a native Slurm interface. But for processor APIs it seems to be a total misfit.

In the workflow syntax, the two non-trivial concepts we need are (static) conditionals (i.e. allowing failed steps to fall back to alternative routes) and (dynamic) functions (i.e. making subsequent steps or step parameters dependent on an intermediate result beyond the annotation data itself).

Conditionals may be needed for all steps (evaluator interfaces for quality estimation, failures etc). Functions on the other hand are likely only needed for two use-cases: step/parameter selection based on detected language/script, and step/parameter selection (or simply skipping) based on detected page type.

But perhaps – to avoid dynamic workflows – we could reformulate these special cases into our core processing/annotation paradigm: Instead of adding dedicated parameters (like ocrd-tesserocr-recognize's xpath_parameters and xpath_model) to each processor and requiring the user to specify these mappings in their workflows, we could create a general mechanism which queries the annotation for (inherited, multifaceted) language and script of each segment and allows the user/workflow to opt in to dynamic parameterisation by providing a dict from language/script values to parameter values, treating the empty key as default.
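
To make the proposed mapping concrete, here is a small sketch of a lookup with the empty key as default. The function `select_parameters`, the script codes and the model names are placeholders for illustration only:

```python
def select_parameters(mapping: dict, value: str) -> dict:
    """Pick parameters for a detected language/script value.

    Falls back to the entry under the empty key, or to no parameters at all.
    """
    return mapping.get(value, mapping.get("", {}))


# Placeholder mapping: keys are script values, "" is the default
model_by_script = {
    "": {"model": "model-default"},
    "Grek": {"model": "model-greek"},
}
```

Here `select_parameters(model_by_script, "Grek")` picks the Greek entry, while any script without its own entry gets the default.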

Regardless, it was agreed that our error handling concept must be updated to reflect all the architectural revisions of the past months – and finalised/spelled out.

One problem we still did not fully grasp and uncover IIUC is that once we allow conditionals in workflow formulations, the Workflow Server cannot just send off all processing requests at once – it must do the branching itself (waiting for the callback of the first branch instead of the callback of the final step).

@tdoan2010
Contributor

I agree that there should be a locking mechanism so that we know which pages are being processed. But internal queues are not necessary. Before a worker processes a job, it can first check if the page is locked or not. If yes, simply do nothing and just re-queue the message.

@bertsky
Collaborator

bertsky commented Aug 16, 2023

But perhaps – to avoid dynamic workflows – we could reformulate these special cases into our core processing/annotation paradigm: Instead of adding dedicated parameters (like ocrd-tesserocr-recognize's xpath_parameters and xpath_model) to each processor and requiring the user to specify these mappings in their workflows, we could create a general mechanism which queries the annotation for (inherited, multifaceted) language and script of each segment and allows the user/workflow to opt in to dynamic parameterisation by providing a dict from language/script values to parameter values, treating the empty key as default.

We discussed this in the Kitodo-OCRD team yesterday, too. Here are some additional observations/assessments:

  1. A general mechanism (without the need to introduce new parameters) would be nice for the user indeed, but it will be difficult to implement in the processors anyway: depending on how the predictor models get loaded and the lines fed to it, each processor will need to make different adjustments. (For example, preload all models possibly needed, switch models in between or group lines by model choice.)
  2. The XPath expressions should be made as simple as possible (only @script, no necessity for @primaryScript or @secondaryScript or ancestor:... conditions). So we absolutely need the automatic inheritance here.
  3. We really must avoid truly dynamic workflows as soon as we think of local metadata (segment-wise script/language detection). So if the XPath querying / model mapping is needed anyway, why not use it for global metadata (script/language settings in document MODS) as well? Ideally the user does not even need to know where the information came from (line, region, page or document level; so the inheritance must even extend to the METS/MODS...)
  4. We really must use the document-level (MODS) script/language information in the workflow configuration (i.e. parameterisation) itself, because it is the user who will know which models to map for which case, not the administrator. (The latter could only provide a predefined mapping for placeholders in workflow files like ocrd-tesserocr-recognize -P model $TESSMODEL with TESSMODEL set dynamically. But this will be different for every recogniser, and really depend on the installed models and the input materials.)
  5. We should abandon Nextflow, implement the Workflow Server in core, and come up with our own YAML dialect for workflows (including static branching).

@MehmedGIT
Contributor Author

After a call with @joschrew, we identified cases that are missed when caching processing requests in the internal queue. The caching is done based on locking pages for the output file groups, without considering whether the input file group is missing and is not going to be produced in the near future (i.e., by a request that is already in the internal queue).

Consider the following scenario.
2 processing requests with their respective input-output file group pairs:

  1. DEFAULT - OCR-D-BIN
  2. OCR-D-BIN - OCR-D-CROP

The first request starts running since there are no pages locked for OCR-D-BIN; all pages for OCR-D-BIN are now locked.
The second request also starts running since there are no pages locked for OCR-D-CROP.
However, since the input file group of the second request has not been produced yet, the second request fails on the processing worker level.

Changes needed:

  1. Perform additional checks to verify that the input file group of a request exists
  2. When the input group of the current processing request does not exist:
  • But the internal cache already holds a processing request that would create that input group in the near future - then cache that request instead of forwarding it to a processing worker.
  • And is not going to be created - return an early exception on the Processing Server level.
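
The three outcomes could be sketched like this. The names `Decision` and `decide` are hypothetical; as an assumption of the sketch, `pending_output_file_grps` holds the output groups of cached requests and also of jobs that are still running, and a FORWARD result is still subject to the page locks described earlier:

```python
from enum import Enum


class Decision(Enum):
    FORWARD = "forward"  # input group exists: hand over to a worker
    CACHE = "cache"      # input group will be produced: keep in the internal queue
    REJECT = "reject"    # input group will never exist: fail early


def decide(input_file_grp, existing_file_grps, pending_output_file_grps):
    if input_file_grp in existing_file_grps:
        return Decision.FORWARD
    if input_file_grp in pending_output_file_grps:
        return Decision.CACHE
    return Decision.REJECT
```

In the scenario above, the second request (input OCR-D-BIN) would be cached rather than forwarded, because OCR-D-BIN is pending but not yet present.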

@kba
Member

kba commented Aug 22, 2023

And is not going to be created - return an early exception on the Processing Server level.

  • But the internal cache already holds a processing request that would create that input group in the near future - then cache that request instead of forwarding it to a processing worker.
  • And is not going to be created - return an early exception on the Processing Server level.

This is how we solve that in ocrd process (simplified, full code in https://github.com/OCR-D/core/blob/master/ocrd/ocrd/task_sequence.py#L89):

# file groups already present in the METS
prev_output_file_grps = workspace.mets.file_groups
first_task = tasks[0]
prev_output_file_grps += first_task.output_file_grps
for task in tasks[1:]:
    # each input file group must already exist or be the output of an earlier task
    for input_file_grp in task.input_file_grps:
        if input_file_grp not in prev_output_file_grps:
            report.add_error("Input file group not contained in METS or produced by previous steps: %s" % input_file_grp)
    prev_output_file_grps += task.output_file_grps

@kba
Member

kba commented Aug 22, 2023

So the check whether the workflow is wired correctly wrt. input/output file groups can already happen in the workflow endpoint.

Is there a way to express dependency between tasks, so that the workflow endpoint can post the tasks as a graph of dependent tasks and the processing server checks whether the prerequisites for a task are met when dequeuing a task?

@MehmedGIT
Contributor Author

So the check whether the workflow is wired correctly wrt. input/output file groups can already happen in the workflow endpoint.

This seems to be the simpler approach. However, that approach still does not solve the issues that can arise when independent processing requests are sent through the processing endpoint.

Is there a way to express dependency between tasks, so that the workflow endpoint can post the tasks as a graph of dependent tasks and the processing server checks whether the prerequisites for a task are met when dequeuing a task?

There is no dependency between the tasks in the Processing Server. The order is determined by the order of received processing requests and cached to some internal queue based on the used workspace. Each workspace has a separate internal queue. Hence, running 2 independent workflows with independent file groups on the same workspace would still queue all processing requests in the same internal queue. Then, it becomes hard to find out when one of the two workflows has already finished since there may be still requests left from the other workflow.

Btw, should we just include a workflow endpoint that accepts ocrd process like format to the Processing Server or should we create a new Workflow Server entity? For now, we thought to implement the workflow endpoint as a part of the Processing Server.

@kba
Member

kba commented Aug 22, 2023

So the check whether the workflow is wired correctly wrt. input/output file groups can already happen in the workflow endpoint.

This seems to be the simpler approach. However, that approach still does not solve the issues that can arise when independent processing requests are sent through the processing endpoint.

True, but for the MVP goal of having an ocrd process-like endpoint in the processing server, we should focus on that later.

There is no dependency between the tasks in the Processing Server. The order is determined by the order of received processing requests and cached to some internal queue based on the used workspace. Each workspace has a separate internal queue. Hence, running 2 independent workflows with independent file groups on the same workspace would still queue all processing requests in the same internal queue. Then, it becomes hard to find out when one of the two workflows has already finished since there may be still requests left from the other workflow.

How hard would it be to add such dependency management to the processing server? Suppose we had a depends_on field in the job model, the queue handling knew about it, and we worked under the assumption of completely deterministic workflows (as ocrd process does). Then, for a workflow with three processors A, B, and C, the workflow endpoint

  • checks the wiring of the file groups statically (see above)
  • generates jobs for A without depends_on, gets back [id_a1, id_a2...] (these can be started right away),
  • generates jobs for B with [depends_on id_a1, depends_on id_a2], gets back [id_b1, id_b2...] (these can be started once [id_a1, id_a2...] finish with SUCCESS),
  • generates jobs for C accordingly

The queue logic would need to take this into account and start only the jobs that either have no dependency or have their dependency met.
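
A minimal sketch of that queue logic, assuming an in-memory job registry. The `Job` dataclass, the state name `QUEUED`, and the helpers `has_dependencies_met` and `next_runnable` are hypothetical; only depends_on and SUCCESS come from the discussion:

```python
from dataclasses import dataclass, field


@dataclass
class Job:
    job_id: str
    depends_on: list = field(default_factory=list)  # IDs of prerequisite jobs
    state: str = "QUEUED"  # placeholder state name


def has_dependencies_met(job, jobs_by_id):
    # A job without dependencies is trivially runnable (all() of nothing is True)
    return all(jobs_by_id[dep].state == "SUCCESS" for dep in job.depends_on)


def next_runnable(queue, jobs_by_id):
    """Return the first queued job whose dependencies are met, or None."""
    return next((job for job in queue if has_dependencies_met(job, jobs_by_id)), None)
```

Jobs that are not yet runnable simply stay where they are in the queue, so the original order is preserved.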

The details are of course much more involved, like error handling, timeouts etc. but do you think such basic dependency management would be feasible?

Btw, should we just include a workflow endpoint that accepts ocrd process like format to the Processing Server or should we create a new Workflow Server entity? For now, we thought to implement the workflow endpoint as a part of the Processing Server.

The former, i.e. make it part of the processing server like @joschrew has proposed/begun working on.

@MehmedGIT
Contributor Author

How hard would it be to add such a dependency management to the processing server? If we had a field depends_on in the job model, the queue handling knew about this and we work under the assumption of completely deterministic workflows

That would just require an additional depends_on field in the job models. The workflow endpoint itself could then assign the depends_on field for the specific jobs when submitting to the processing endpoint.

Should we allow the user to set dependencies on the processing requests they're submitting? I.e., the user sends a processing request, receives a job_id, then submits a second request and sets the depends_on field to the value of the previously received job_id.

The queue logic would need to take this into account and start only the jobs that either have no dependency or have their dependency met.

What would happen to the jobs whose dependencies are not met yet but are in front of the queue and have been consumed from the queue? Should they be pushed to the back of the queue? In that case, this would potentially mess up the order inside the queue. This scenario could only happen when two workflows run on the same workspace in parallel.

The details are of course much more involved, like error handling, timeouts etc. but do you think such basic dependency management would be feasible?

My only concern is the potential blocking of tasks in the internal queue (cache) due to race conditions in the job dependencies and/or locked pages.

@kba
Member

kba commented Aug 23, 2023

Should we allow the user to set dependencies on the processing request they're submitting? I.e., the user sends a processing request, receives a job_id, then submits a second request and sets the depends_on field to the value of the previously received job_id.

Yes, this should be open to all requests, not just those by the workflow endpoint of the processing server.

What would happen to the jobs whose dependencies are not met yet but are in front of the queue and have been consumed from the queue? Should they be pushed to the back of the queue? In that case, this would potentially mess up the order inside the queue. This scenario could only happen when two workflows run on the same workspace in parallel.

With regard to two workflows working on the same workspace, as I said, there are a lot of pitfalls and edge cases but in the interest of moving ahead, I'd say we can live with that for now.

Why are jobs whose dependencies are not yet met consumed at all? I think they should stay in the queue and the dequeuing should use first(has_dependencies_met(job) for job in queue) logic.

My only concern is the potential blocking of tasks in the internal queue (cache) due to race conditions in the job dependencies and/or locked pages.

Under the assumption that one workflow per workspace is happening, I think we can live with that for now.

@MehmedGIT
Contributor Author

MehmedGIT commented Aug 23, 2023

With regard to two workflows working on the same workspace, as I said, there are a lot of pitfalls and edge cases but in the interest of moving ahead, I'd say we can live with that for now.

Under the assumption that one workflow per workspace is happening, I think we can live with that for now.

Okay.

Why are jobs whose dependencies are not yet met consumed at all? I think they should stay in the queue and the dequeuing should use first(has_dependencies_met(job) for job in queue) logic.

Because the queue is a FIFO data structure. The Processing server cannot decide to consume from the middle of the queue. Python queues, unlike Java queues, do not even support a peek() method, so the Processing server cannot simply look at the processing request at the front of the queue to see whether its dependencies are met. Once the Processing server consumes the front request from some internal queue, it has to process the request. If the request has unmet dependencies, then the only thing the Processing server could do is push the request into the same internal queue again, but at the back of the queue.

Btw, could someone check what is that 1 failing test about? It's related to the bashlib and not related to the changes I made in the last commit at all.

@joschrew
Contributor

Btw, could someone check what is that 1 failing test about? It's related to the bashlib and not related to the changes I made in the last commit at all.

Seems to me it is the pydantic error (pydantic version 2 not usable) already fixed in core. Can you sync latest core or do what is done in core's 5d3fdb3?

MehmedGIT and others added 2 commits August 23, 2023 14:04
# Conflicts:
#	ocrd_network/ocrd_network/processing_worker.py
@tdoan2010
Contributor

After reading your comments, it seems to me that the only problem we have now is the missing check of the input file group. So, according to Mehmed's example:

Consider the following scenario.
2 processing requests with their respective input-output file group pairs:

  1. DEFAULT - OCR-D-BIN
  2. OCR-D-BIN - OCR-D-CROP

We need to make sure that OCR-D-BIN exists and is not locked before sending the second processing request to the Processing Worker.


Why do we need to introduce the depends_on field? If I understand it correctly, it does not aim to solve the above problem but is a new feature. If so, we can just ignore it at the moment for our MVP.


The situation of 2 workflows running on the same workspace doesn't make sense to me. I believe such a situation won't happen. But to be sure, when we start working on a workspace, we need to lock it.

@tdoan2010
Contributor

Seems to me it is the pydantic error (pydantic version 2 not usable) already fixed in core

Be careful: the newer versions of FastAPI use Pydantic 2. So make sure that you use the correct version of FastAPI to avoid a Pydantic version conflict.

@kba
Member

kba commented Aug 23, 2023

We need to make sure that OCR-D-BIN exists and is not locked before sending the second processing request to the Processing Worker.

With the depends_on solution, this is handled by the processing server. With your proposal, the workflow endpoint would need to block, wait, and keep track of the dependencies itself, which is a lot more effort. So unless there are strong reasons against the depends_on solution, I think it is better.

@MehmedGIT
Contributor Author

MehmedGIT commented Aug 23, 2023

Okay, I get the idea, and indeed frequent reads from the METS file by the Processing Server won't be needed with that approach. No reads at all will be needed if we trust the user that the very first input group (DEFAULT in your example) will always exist. However, keep in mind that we work on page level when locking. So the fields would rather be pages_available and pages_locked. Only pages_locked is currently implemented. The data structure of pages_locked is a dictionary: each output file group is a key, and the value for each key is a set of integers representing the locked pages. If no pages were specified in the processing request, the value is the string all_pages.
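
For readers who want to picture the pages_locked structure described above, here is a minimal sketch. It assumes an in-memory dict; lock_pages and unlock_pages are hypothetical helper names and not the functions used in ocrd_network.

```python
from typing import Dict, Iterable, Optional, Set, Union

ALL_PAGES = "all_pages"
# Output file group -> set of locked page numbers, or the marker string.
PagesLocked = Dict[str, Union[Set[int], str]]


def lock_pages(pages_locked: PagesLocked, output_file_grp: str,
               page_ids: Optional[Iterable[int]] = None) -> bool:
    """Try to lock pages of an output file group; return False on conflict."""
    current = pages_locked.get(output_file_grp, set())
    if current == ALL_PAGES:
        return False  # the whole file group is already locked
    if page_ids is None:
        if current:
            return False  # some pages are locked, cannot lock everything
        pages_locked[output_file_grp] = ALL_PAGES
        return True
    requested = set(page_ids)
    if current & requested:
        return False  # at least one requested page is already locked
    pages_locked[output_file_grp] = current | requested
    return True


def unlock_pages(pages_locked: PagesLocked, output_file_grp: str,
                 page_ids: Optional[Iterable[int]] = None) -> None:
    """Release pages; a group locked as a whole is released as a whole."""
    current = pages_locked.get(output_file_grp, set())
    if page_ids is None or current == ALL_PAGES:
        pages_locked[output_file_grp] = set()
    else:
        pages_locked[output_file_grp] = current - set(page_ids)
```

Two requests on disjoint page ranges of the same output file group can both take their locks here, which is the case the page-level locking is meant to allow.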

Would it be a good solution to have pages_available as well? Then we would have to know how many pages are there in the specific workspace beforehand... Of course, we could use the same approach as in pages_locked. Assign all_pages as an initial value of pages_available indicating that all pages are available in the beginning.

Consider this example:
A binarization processing request comes for the first 10 pages of DEFAULT. pages_available["DEFAULT"] is set to "all_pages" by default. The first 10 pages of OCR-D-BIN will be locked -> pages_locked["OCR-D-BIN"] is a set of [1,2,...,10]. After the processing, the first 10 pages of OCR-D-BIN will be unlocked and made available -> pages_locked["OCR-D-BIN"] is an empty set and pages_available["OCR-D-BIN"] is a set of [1,2,...,10]. And so on.

However, this approach will fail miserably when the user uploads a workspace that already has some steps processed. Say the OCR-D-BIN file group is already available for all pages, or even worse, for just a subset of pages... Then our assumptions for pages_available will be wrong. Unless we read from the METS file which file groups are currently available and update the database accordingly.

@kba
Member

kba commented Aug 23, 2023

I still don't see how this approach with filegroup-based locking is better or simpler than explicit job-based dependencies, for which we would not need workspace-global state at all. I mean beyond the workspace-specific queue we already have.

Because the queue is a FIFO data structure. The Processing server cannot decide to consume from the middle of the queue. Python queues, unlike Java queues, do not even support a peek() method, so the Processing server cannot simply look at the processing request at the front of the queue to see whether its dependencies are met. Once the Processing server consumes the front request from some internal queue, it has to process the request. If the request has unmet dependencies, then the only thing the Processing server could do is push the request into the same internal queue again, but at the back of the queue.

Indeed, if we stick to the implementations in queue then we really just have put and get, so we would need our own implementation/subclass. The actual list of elements in the queue is available via the .queue attribute but that's not synchronized, so we cannot just modify it.

But I think implementing a queue with an additional method get(condition) returning the first element in the queue that fulfills condition (i.e. lambda job: not job.depends_on) is cleaner than keeping track of combinations of input/output file groups per workspace.
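
A minimal sketch of what such a subclass could look like, assuming jobs are plain dicts with a depends_on list. ConditionalQueue and get_first are hypothetical names, and the code relies on the mutex, queue and not_full attributes of CPython's queue.Queue, which are implementation details rather than a documented extension point.

```python
import queue
from typing import Any, Callable, Optional


class ConditionalQueue(queue.Queue):
    """queue.Queue with a non-blocking 'first item matching a condition' getter."""

    def get_first(self, condition: Callable[[Any], bool]) -> Optional[Any]:
        # Hold the queue's own lock so put()/get() cannot interleave with the scan.
        with self.mutex:
            for index, item in enumerate(self.queue):
                if condition(item):
                    # Remove the match and return immediately, so the deque
                    # is never iterated after it has been modified.
                    del self.queue[index]
                    # Wake up a producer that may be blocked on a full queue.
                    self.not_full.notify()
                    return item
        return None


jobs = ConditionalQueue()
jobs.put({"job_id": "b", "depends_on": ["a"]})
jobs.put({"job_id": "c", "depends_on": []})

# Same condition as in the comment above, adapted to dict-based jobs.
ready = jobs.get_first(lambda job: not job["depends_on"])
```

Unlike get(), this variant does not block when nothing matches; it returns None and leaves the queue untouched.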

Then our assumptions for pages_available will be wrong.

That is a strong argument against.

@MehmedGIT
Contributor Author

MehmedGIT commented Aug 24, 2023

I still don't see how this approach with filegroup-based locking is better or simpler than explicit job-based dependencies, for which we would not need workspace-global state at all.

We need both the locking and the job-based dependencies. Just the availability is not needed with the dependencies. The page locking prevents writes to the same output file group by multiple processing workers but still allows multiple workers to work on different page domains. The dependencies prevent the usage of an output file group (or page subset of it) before the results are there.

Indeed, if we stick to the implementations in queue then we really just have put and get, so we would need our own implementation/subclass.

But I think implementing a queue with an additional method get(condition) returning the first element in the queue that fulfills condition (i.e. lambda job: not job.depends_on) is cleaner than keeping track of combinations of input/output file groups per workspace.

I think this is not needed. The utilization of a Python list should be enough. When adding new requests, they will be appended to the end of the list. When consuming from the list, we will start checking if the dependencies are met from index 0 till we find a target. Then pop that element from the list. This should work.
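
Roughly, the list-based consumption described above could look like the following sketch. The request dicts, the finished set of job ids and the name pop_next_ready are assumptions for illustration; page locks are ignored here.

```python
from typing import Dict, List, Optional, Set


def pop_next_ready(cached_requests: List[Dict], finished: Set[str]) -> Optional[Dict]:
    """Pop the first request (starting at index 0) whose dependencies are all finished."""
    for index, request in enumerate(cached_requests):
        if all(dep in finished for dep in request["depends_on"]):
            return cached_requests.pop(index)
    return None  # nothing can be started yet


cached = [
    {"job_id": "crop", "depends_on": ["bin"]},
    {"job_id": "other", "depends_on": []},
]

first = pop_next_ready(cached, finished=set())
second = pop_next_ready(cached, finished=set())
third = pop_next_ready(cached, finished={"bin"})
```

New requests are appended to the end of the list, so requests that are ready keep their relative order even when an earlier one is still waiting.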

However, even though depends_on gives us the chaining, we still do not have a way to indicate the end of a workflow.

EDIT: There is a way with the user-defined callback mechanism.

@MehmedGIT
Contributor Author

Seems to me it is the pydantic error (pydantic version 2 not usable) already fixed in core. Can you sync latest core or do what is done in core's 5d3fdb3?

You can just merge master into the branch, like I did in e765bed

That messed up some of the code! It reverted some changes done in #1069...

@kba
Member

kba commented Aug 24, 2023

That messed up some of the code! It reverted some changes done in #1069...

What specifically? Something from #1075?

@MehmedGIT
Contributor Author

That messed up some of the code! It reverted some changes done in #1069...

What specifically? Something from #1075?

The internal callback mechanism is broken, investigating now.

@MehmedGIT
Contributor Author

MehmedGIT commented Aug 24, 2023

The depends_on feature is supported now. It is a list of job_ids. Example usage:

from time import sleep
from typing import List, Optional

import requests

SERVER_ADDRESS: str = "http://localhost:8080"
METS_FILE_PATH: str = "/home/mm/Desktop/example_ws2/data/mets.xml"

def post_processing_request(processor_name: str, mets_path: str, input_fileGrp: List[str], output_fileGrp: List[str], depends_on: Optional[List[str]] = None, parameters: Optional[dict] = None) -> str:
    # Avoid mutable default arguments
    if not depends_on:
        depends_on = []
    if not parameters:
        parameters = {}

    req_url = f"{SERVER_ADDRESS}/processor/{processor_name}"
    req_headers = {"accept": "application/json"}
    req_data = {
        "path_to_mets": mets_path,
        "description": f"Demo execution of {processor_name}",
        "input_file_grps": input_fileGrp,
        "output_file_grps": output_fileGrp,
        # Processor parameters supplied by the caller
        "parameters": parameters,
        "agent_type": "worker",
        # job_ids that must finish before this job may start
        "depends_on": depends_on
    }

    response = requests.post(url=req_url, headers=req_headers, json=req_data)
    response_json = response.json()
    return response_json['job_id']

def main():
    print(f"Processing Server address: {SERVER_ADDRESS}")
    job_id1 = post_processing_request("ocrd-cis-ocropy-binarize", METS_FILE_PATH, ["DEFAULT"], ["OCR-D-BIN"])
    sleep(5)
    # The crop job depends on the binarization job
    job_id2 = post_processing_request("ocrd-anybaseocr-crop", METS_FILE_PATH, ["OCR-D-BIN"], ["OCR-D-CROP"], [job_id1])
    sleep(5)

main()

The internal_callback_url mechanism (potentially the callback_url one too) is broken. It is not related to the merge of the core as I previously thought. When I submit 2 processing requests, the first one starts running, the second one gets cached. Once the first one finishes and uses the internal callback, the following error is produced:

20:54:06.318 ERROR ocrd_network.processing_server - <starlette.requests.Request object at 0x7f916740e2d0>: 1 validation error for Request body -> input_file_grps field required (type=value_error.missing)
INFO:     127.0.0.1:39152 - "POST /processor/result_callback HTTP/1.1" 422 Unprocessable Entity

This is a complete brain fuck, since the result message does not have input_file_grps, nor does the callback method of /processor/result_callback have anything to do with that parameter.

To reproduce the error, here is a simple Python script that sends a result message to the endpoint:

import requests

def post_result_callback(job_id: str, state: str, workspace_id: str, mets_path: str):
    req_url = "http://localhost:8080/processor/result_callback"
    req_headers = {"accept": "application/json"}
    req_data = {
        "job_id": job_id,
        "state": state,
        "workspace_id": workspace_id,
        "path_to_mets": mets_path
    }

    print(f"Posting result callback to: {req_url}")
    print(f"Request data: {req_data}")
    response = requests.post(url=req_url, headers=req_headers, json=req_data)
    print(response)
    response_json = response.json()
    print(response_json)

def main():
    post_result_callback(
      job_id="1604a308-02c7-49ea-9f32-a900886287dc", 
      state="SUCCESS", 
      workspace_id="c7f25615-fc17-4365-a74d-ad20e1ddbd0e", 
      mets_path="/home/mm/Desktop/example_ws2/data/mets.xml"
    )

main()

Just make sure to adapt the Processing Server's host and port address accordingly.

@kba
Member

kba commented Aug 25, 2023

This is a complete brain fuck, since the result message does not have input_file_grps, nor does the callback method of /processor/result_callback have anything to do with that parameter.

Could it be that the /processor/{processor_name} endpoint takes precedence over the /processor/result_callback endpoint, so that the request meant for the latter is sent to the former, and that is why it is looking for input_file_grps (and the other PyJobInput fields)? I.e. is result_callback misinterpreted as a processor name?

@MehmedGIT
Contributor Author

This is a complete brain fuck, since the result message does not have input_file_grps, nor does the callback method of /processor/result_callback have anything to do with that parameter.

Could it be that the /processor/{processor_name} endpoint takes precedence over the /processor/result_callback endpoint, so that the request meant for the latter is sent to the former, and that is why it is looking for input_file_grps (and the other PyJobInput fields)? I.e. is result_callback misinterpreted as a processor name?

That makes perfect sense now! The solution is simple and obvious: /processor/result_callback simply needs to be replaced with a different endpoint such as /result_callback.
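
The FastAPI documentation notes that path operations are evaluated in the order they are declared, which fits the behaviour seen here. The sketch below is a deliberately simplified first-match router using only the standard library, not FastAPI's or Starlette's actual implementation; the handler names are hypothetical.

```python
import re
from typing import List, Optional, Pattern, Tuple


def compile_path(template: str) -> Pattern[str]:
    """Turn '/processor/{processor_name}' into a regex matching one path segment per parameter."""
    pattern = re.sub(r"\{(\w+)\}", r"(?P<\1>[^/]+)", template)
    return re.compile(f"^{pattern}$")


class Router:
    """Returns the handler of the first registered route that matches."""

    def __init__(self) -> None:
        self.routes: List[Tuple[Pattern[str], str]] = []

    def add(self, template: str, handler_name: str) -> None:
        self.routes.append((compile_path(template), handler_name))

    def resolve(self, path: str) -> Optional[str]:
        for pattern, handler_name in self.routes:
            if pattern.match(path):
                return handler_name
        return None


# Parameterized route registered first: it swallows the callback path.
broken = Router()
broken.add("/processor/{processor_name}", "push_processor_job")
broken.add("/processor/result_callback", "result_callback")

# Moving the callback out of the /processor/ prefix removes the ambiguity.
fixed = Router()
fixed.add("/processor/{processor_name}", "push_processor_job")
fixed.add("/result_callback", "result_callback")
```

Registering the fixed path before the parameterized one would also avoid the clash in this model, but a separate prefix does not depend on declaration order.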

@MehmedGIT
Contributor Author

Everything seems to work fine now. I have fixed other bugs such as:

  • The found index evaluated as false even when an item was found (an item at index 0)
  • The processor name was injected twice when caching
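
The first bug in the list above is a common Python pitfall: 0 is a valid index but is falsy. The snippet below is only a guess at the general shape of such a bug, with hypothetical function names, and is not the actual code from this PR.

```python
from typing import Callable, List, Optional


def find_index(items: List[dict], condition: Callable[[dict], bool]) -> Optional[int]:
    """Return the index of the first matching item, or None if there is none."""
    for index, item in enumerate(items):
        if condition(item):
            return index
    return None


def has_match_buggy(items: List[dict], condition: Callable[[dict], bool]) -> bool:
    found_index = find_index(items, condition)
    # Wrong: a match at index 0 is treated like "nothing found".
    return bool(found_index)


def has_match(items: List[dict], condition: Callable[[dict], bool]) -> bool:
    found_index = find_index(items, condition)
    # Correct: compare against None explicitly.
    return found_index is not None


queue_items = [{"job_id": "a", "depends_on": []}]


def no_dependencies(item: dict) -> bool:
    return not item["depends_on"]
```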

@MehmedGIT
Contributor Author

@joschrew, why is the set swapped with a list for the locked pages in the last commit?

@joschrew
Contributor

joschrew commented Aug 29, 2023

@joschrew, why is the set swapped with a list for the locked pages in the last commit?

I wrote the reason into the commit message (extended message): MongoDB cannot store sets. It stores them as lists. When read back from Mongo, the value is a list, and all set method calls fail with AttributeError.
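
An alternative to switching the field to a list (not what the commit does) would be to convert at the database boundary. The sketch below assumes the locked pages are kept in a dict as described earlier in this thread; to_storable and from_stored are hypothetical helpers and no database is involved.

```python
from typing import Dict, List, Set, Union

Stored = Dict[str, Union[List[int], str]]
InMemory = Dict[str, Union[Set[int], str]]


def to_storable(pages_locked: InMemory) -> Stored:
    """Convert sets to sorted lists before writing; keep marker strings as they are."""
    return {
        grp: pages if isinstance(pages, str) else sorted(pages)
        for grp, pages in pages_locked.items()
    }


def from_stored(stored: Stored) -> InMemory:
    """Convert lists back to sets after reading, so set operations work again."""
    return {
        grp: pages if isinstance(pages, str) else set(pages)
        for grp, pages in stored.items()
    }


stored = to_storable({"OCR-D-BIN": {3, 1, 2}, "OCR-D-CROP": "all_pages"})
restored = from_stored(stored)
```

Without the conversion back, calling a set-only method such as add on the stored list raises AttributeError, which matches the failure described above.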

@MehmedGIT
Contributor Author

Update: Previously, every time a result callback was received, only the next available item was consumed from the internal queue of a workspace. Now, all available items are consumed and sent to the appropriate RabbitMQ queue to be consumed by processing workers.
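
As an illustration of the difference, consuming everything that is ready in one pass could look like this sketch. The names and the dict-based requests are assumptions, page locks are left out, and the step that publishes to RabbitMQ is not shown.

```python
from typing import Dict, List, Set


def pop_all_ready(cached_requests: List[Dict], finished: Set[str]) -> List[Dict]:
    """Remove and return every cached request whose dependencies are all finished."""
    ready: List[Dict] = []
    waiting: List[Dict] = []
    for request in cached_requests:
        if all(dep in finished for dep in request["depends_on"]):
            ready.append(request)
        else:
            waiting.append(request)
    # Keep the still-blocked requests in their original order.
    cached_requests[:] = waiting
    return ready


workspace_cache = [
    {"job_id": "crop-1", "depends_on": ["bin-1"]},
    {"job_id": "crop-2", "depends_on": ["bin-2"]},
    {"job_id": "ocr-1", "depends_on": ["crop-1"]},
]

# Result callbacks for both binarization jobs have arrived.
to_publish = pop_all_ready(workspace_cache, finished={"bin-1", "bin-2"})
```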

@MehmedGIT MehmedGIT requested a review from joschrew August 30, 2023 11:47
@MehmedGIT MehmedGIT marked this pull request as ready for review August 30, 2023 11:49
@kba kba merged commit e352a48 into master Sep 4, 2023
@kba kba deleted the processing_server_ext_1046 branch September 4, 2023 11:38