Skip to content
This repository has been archived by the owner on Aug 25, 2024. It is now read-only.

Commit

Permalink
high level: subflow typecast: Basic OpImpCtx helper
Browse files Browse the repository at this point in the history
Signed-off-by: John Andersen <[email protected]>
  • Loading branch information
pdxjohnny committed Jul 29, 2022
1 parent 8c0531e commit 85d57ad
Showing 1 changed file with 36 additions and 1 deletion.
37 changes: 36 additions & 1 deletion dffml/high_level/dataflow.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import inspect
import asyncio
from typing import Optional, Tuple, List, Union, Dict, Any, AsyncIterator
from typing import Optional, Tuple, List, Union, Dict, Any, AsyncIterator, Type

from ..overlay.overlay import (
Overlay,
Expand Down Expand Up @@ -230,3 +230,38 @@ async def run(
async with orchestrator(dataflow) as ctx:
async for ctx, results in ctx.run(*input_sets, strict=strict):
yield ctx, results


async def subflow_typecast(
opimp_ctx,
cls: Type,
input_set_context: Type[BaseInputSetContext],
value: Any,
) -> AsyncIterator[Tuple[BaseInputSetContext, Any]]:
dataflow, upstream = Overlay._static_dataflow_and_upstream(cls)
key, definition = list(opimp_ctx.parent.op.outputs.items())[0]
# TODO Run with opimp_ctx.subflow(), enable forwarding
async with opimp_ctx.octx.ictx.definitions(opimp_ctx.ctx) as definitions:
async for ctx, results in run(
dataflow,
{
input_set_context: [
Input(
value=value,
definition=definition,
parents=None,
origin=(opimp_ctx.parent.op.instance_name, key),
),
*[
item
async for item in definitions.inputs()
if (
item.definition in upstream.definitions.values()
and item.definition
not in opimp_ctx.parent.op.inputs.values()
)
],
],
},
):
yield ctx, results

0 comments on commit 85d57ad

Please sign in to comment.