Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow multiprocess dep instead of multiprocessing #94

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions pypeln/process/queue.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import multiprocessing
from multiprocessing.queues import Empty, Queue
import sys
import traceback
import typing as tp

if "multiprocess" in sys.modules:
from multiprocess import get_context
from multiprocess.queues import Empty, Queue
else:
from multiprocessing import get_context
from multiprocessing.queues import Empty, Queue


from pypeln import utils as pypeln_utils

Expand All @@ -18,13 +23,13 @@

class IterableQueue(Queue, tp.Generic[T], tp.Iterable[T]):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this inheritance of Queue might complicate things

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typehints should stay the same, the important thing would be to use a different module when initializing Queue and Process at runtime.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

heres a wip impl #100

def __init__(self, maxsize: int = 0, total_sources: int = 1):
super().__init__(maxsize=maxsize, ctx=multiprocessing.get_context())
super().__init__(maxsize=maxsize, ctx=get_context())

self.namespace = utils.Namespace(
remaining=total_sources, exception=False, force_stop=False
)
self.exception_queue: Queue[PipelineException] = Queue(
ctx=multiprocessing.get_context()
ctx=get_context()
)

def get(self, block: bool = True, timeout: tp.Optional[float] = None) -> T:
Expand Down
1 change: 0 additions & 1 deletion pypeln/process/supervisor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from dataclasses import dataclass
import multiprocessing
import threading
import time
import typing as tp
Expand Down
12 changes: 8 additions & 4 deletions pypeln/process/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import multiprocessing
import multiprocessing.synchronize
import typing as tp
import sys

if "multiprocess" in sys.modules:
from multiprocess import Manager, Lock
else:
from multiprocessing import Manager, Lock

from pypeln import utils as pypeln_utils

Expand All @@ -12,10 +16,10 @@ def __init__(self, **kwargs):
global MANAGER

if MANAGER is None:
MANAGER = multiprocessing.Manager()
MANAGER = Manager()

self.__dict__["_namespace"] = MANAGER.Namespace(**kwargs)
self.__dict__["_lock"] = multiprocessing.Lock()
self.__dict__["_lock"] = Lock()

def __getattr__(self, key) -> tp.Any:
if key in ("_namespace", "_lock"):
Expand Down
19 changes: 10 additions & 9 deletions pypeln/process/worker.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import abc
from copy import copy
from dataclasses import dataclass, field
import functools
import multiprocessing
from multiprocessing import synchronize
import threading
import time
import typing as tp
import sys

if "multiprocess" in sys.modules:
from multiprocess import Process
else:
from multiprocessing import Process

import stopit

Expand Down Expand Up @@ -63,7 +64,7 @@ class Worker(tp.Generic[T]):
namespace: utils.Namespace = field(
default_factory=lambda: utils.Namespace(done=False, task_start_time=None)
)
process: tp.Optional[tp.Union[multiprocessing.Process, threading.Thread]] = None
process: tp.Optional[tp.Union[Process, threading.Thread]] = None

def __call__(self):

Expand Down Expand Up @@ -138,7 +139,7 @@ def stop(self):
if not self.process.is_alive():
return

if isinstance(self.process, multiprocessing.Process):
if isinstance(self.process, Process):
self.process.terminate()
else:
stopit.async_raise(
Expand Down Expand Up @@ -227,7 +228,7 @@ def start_workers(
args: tp.Tuple[tp.Any, ...] = tuple(),
kwargs: tp.Optional[tp.Dict[tp.Any, tp.Any]] = None,
use_threads: bool = False,
) -> tp.Union[tp.List[multiprocessing.Process], tp.List[threading.Thread]]:
) -> tp.Union[tp.List[Process], tp.List[threading.Thread]]:
if kwargs is None:
kwargs = {}

Expand All @@ -237,7 +238,7 @@ def start_workers(
if use_threads:
t = threading.Thread(target=target, args=args, kwargs=kwargs)
else:
t = multiprocessing.Process(target=target, args=args, kwargs=kwargs)
t = Process(target=target, args=args, kwargs=kwargs)
t.daemon = True
t.start()
workers.append(t)
Expand Down
1 change: 0 additions & 1 deletion pypeln/thread/supervisor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from dataclasses import dataclass
import multiprocessing
import threading
import time
import typing as tp
Expand Down