Skip to content

Commit

Permalink
static check changes
Browse files Browse the repository at this point in the history
Signed-off-by: arunjose696 <[email protected]>
  • Loading branch information
arunjose696 committed Mar 27, 2023
1 parent 5e84a30 commit b31b343
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 82 deletions.
33 changes: 19 additions & 14 deletions unidist/core/backends/mpi/core/controller/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@
from unidist.core.backends.mpi.core.controller.garbage_collector import (
garbage_collector,
)
from unidist.core.backends.mpi.core.controller.common import push_data, RoundRobin
from unidist.core.backends.mpi.core.controller.common import RoundRobin
from .api import workQueue


class ActorMethod:
"""
Class responsible to execute method of an actor.
Expand All @@ -40,13 +41,10 @@ def __call__(self, *args, num_returns=1, **kwargs):
unwrapped_args = [common.unwrap_data_ids(arg) for arg in args]
unwrapped_kwargs = {k: common.unwrap_data_ids(v) for k, v in kwargs.items()}

#push_data(self._actor._owner_rank, unwrapped_args)
#push_data(self._actor._owner_rank, unwrapped_kwargs)
from unidist.core.backends.common.data_id import DataID
breakpoint()
workQueue.put([1,(self._actor._owner_rank, [DataID("dadada")])])
workQueue.put([1,(self._actor._owner_rank, unwrapped_args)])
workQueue.put([1,(self._actor._owner_rank, unwrapped_kwargs)])
# push_data(self._actor._owner_rank, unwrapped_args)
# push_data(self._actor._owner_rank, unwrapped_kwargs)
workQueue.put([(self._actor._owner_rank, unwrapped_args)])
workQueue.put([(self._actor._owner_rank, unwrapped_kwargs)])
operation_type = common.Operation.ACTOR_EXECUTE
operation_data = {
"task": self._method_name,
Expand All @@ -55,12 +53,19 @@ def __call__(self, *args, num_returns=1, **kwargs):
"output": common.master_data_ids_to_base(output_id),
"handler": self._actor._handler_id.base_data_id(),
}

workQueue.put([2,(communication.MPIState.get_instance().comm,
operation_type,
operation_data,
self._actor._owner_rank,)])


workQueue.put(
[
2,
(
communication.MPIState.get_instance().comm,
operation_type,
operation_data,
self._actor._owner_rank,
),
]
)

return output_id


Expand Down
115 changes: 54 additions & 61 deletions unidist/core/backends/mpi/core/controller/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@
import atexit
import signal
import asyncio
import queue
import threading
import time
from collections import defaultdict
from datetime import datetime

try:
import mpi4py
from mpi4py import futures
Expand Down Expand Up @@ -51,47 +54,44 @@
# The global variable is responsible for if MPI backend has already been initialized
is_mpi_initialized = False

import queue
import threading
import time

exitFlag = 0

class myThread (threading.Thread):
def __init__(self, threadID, name, q):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.q = q
def run(self):
print ("Starting " + self.name)
process_data(self.name, self.q)
print ("Exiting " + self.name)

class myThread(threading.Thread):
def __init__(self, threadID, name, q):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.q = q

def run(self):
print("Starting " + self.name)
process_data(self.name, self.q)
print("Exiting " + self.name)


def process_data(threadName, q):
global getRequests
while not exitFlag:
queueLock.acquire()
#print("queue size is {} , time ={}".format(workQueue.qsize(),datetime.fromtimestamp(time.time())))
# print("queue size is {} , time ={}".format(workQueue.qsize(),datetime.fromtimestamp(time.time())))
if not workQueue.empty():
future, data = q.get()
future, data = q.get()
queueLock.release()
function, args = data
function, args = data
result = function(*args)
if future:
future.set_result(result)
else:
queueLock.release()
#time.sleep(.1)
# time.sleep(.1)
print("exited")


threadList = ["Thread-1"]
queueLock = threading.Lock()
workQueue = queue.Queue(0)
getQueue = queue.Queue(0)
threads = []


Expand Down Expand Up @@ -122,12 +122,12 @@ def init():
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
parent_comm = MPI.Comm.Get_parent()
if not threads and rank == 0 and parent_comm == MPI.COMM_NULL:
if not threads and rank == 0 and parent_comm == MPI.COMM_NULL:
for tName in threadList:
thread = myThread(1, tName, workQueue)
thread.start()
threads.append(thread)

# path to dynamically spawn MPI processes
if rank == 0 and parent_comm == MPI.COMM_NULL:
# Create new threads
Expand Down Expand Up @@ -231,7 +231,6 @@ def shutdown():
"""
global exitFlag
exitFlag = 1
print("here")
for t in threads:
t.join()
mpi_state = communication.MPIState.get_instance()
Expand All @@ -245,7 +244,6 @@ def shutdown():
async_operations.finish()
if not MPI.Is_finalized():
MPI.Finalize()



def cluster_resources():
Expand Down Expand Up @@ -283,12 +281,11 @@ def put(data):
unidist.core.backends.mpi.core.common.MasterDataID
An ID of an object in object storage.
"""
#data_id = object_store.generate_data_id(garbage_collector)
data_id = futures.Future()
workQueue.put([data_id, [ object_store.generate_data_id,[garbage_collector]] ])
# data_id = object_store.generate_data_id(garbage_collector)
data_id = futures.Future()
workQueue.put([data_id, [object_store.generate_data_id, [garbage_collector]]])
data_id = data_id.result()
workQueue.put([ None, [object_store.put,[data_id, data]] ])
#object_store.put(data_id, data)
workQueue.put([None, [object_store.put, [data_id, data]]])

logger.debug("PUT {} id".format(data_id._id))

Expand All @@ -311,26 +308,16 @@ def get(data_ids):
"""

def get_impl(data_id):
global getRequests,workQueue
global getRequests, workQueue
if object_store.contains(data_id):
value = object_store.get(data_id)
else:
queueLock.acquire()
print("size {} data_id={}".format(workQueue.qsize(), data_id))
future = futures.Future()
workQueue.put([future, [ request_worker_data, [data_id] ] ])
print("size {} data_id={}".format(workQueue.qsize(), data_id))
future = futures.Future()
workQueue.put([future, [request_worker_data, [data_id]]])
queueLock.release()
value = future.result()


print("got {}".format( data_id))
print("value {}".format( value))






value = future.result()

if isinstance(value, Exception):
raise value
Expand Down Expand Up @@ -449,8 +436,16 @@ def submit(task, *args, num_returns=1, **kwargs):
# dest_rank, garbage_collector, num_returns
# )
global workQueue
output_ids = futures.Future()
workQueue.put([output_ids, [ object_store.generate_output_data_id,[dest_rank, garbage_collector, num_returns]] ])
output_ids = futures.Future()
workQueue.put(
[
output_ids,
[
object_store.generate_output_data_id,
[dest_rank, garbage_collector, num_returns],
],
]
)
output_ids = output_ids.result()
logger.debug("REMOTE OPERATION")
logger.debug(
Expand All @@ -466,16 +461,13 @@ def submit(task, *args, num_returns=1, **kwargs):

unwrapped_args = [common.unwrap_data_ids(arg) for arg in args]
unwrapped_kwargs = {k: common.unwrap_data_ids(v) for k, v in kwargs.items()}



queueLock.acquire()
print(workQueue.qsize())
workQueue.put([ None, [ push_data, [dest_rank, unwrapped_args]]])

print(unwrapped_args, time.time())
workQueue.put([ None, [ push_data, [dest_rank, unwrapped_kwargs]]])
workQueue.put([None, [push_data, [dest_rank, unwrapped_args]]])


print(unwrapped_args, time.time())
workQueue.put([None, [push_data, [dest_rank, unwrapped_kwargs]]])

operation_type = common.Operation.EXECUTE
operation_data = {
Expand All @@ -484,19 +476,22 @@ def submit(task, *args, num_returns=1, **kwargs):
"kwargs": unwrapped_kwargs,
"output": common.master_data_ids_to_base(output_ids),
}

def send_operation_impl(comm, operation_type, operation_data, dest_rank):
async_operations = AsyncOperations.get_instance()
h_list, _ = communication.isend_complex_operation(
comm,
operation_type,
operation_data,
dest_rank,
comm,
operation_type,
operation_data,
dest_rank,
)
async_operations.extend(h_list)

comm = communication.MPIState.get_instance().comm
workQueue.put([ None, [ send_operation_impl, [comm, operation_type, operation_data, dest_rank]]])


workQueue.put(
[None, [send_operation_impl, [comm, operation_type, operation_data, dest_rank]]]
)

queueLock.release()
# Track the task execution
garbage_collector.increment_task_counter()
Expand All @@ -507,8 +502,6 @@ def send_operation_impl(comm, operation_type, operation_data, dest_rank):
# ---------------------------- #
# unidist termination handling #
# ---------------------------- #
#!/usr/bin/python



def _termination_handler():
Expand Down
15 changes: 8 additions & 7 deletions unidist/core/backends/mpi/core/controller/garbage_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,14 @@ def _send_cleanup_request(self, cleanup_list):
s_cleanup_list = SimpleDataSerializer().serialize_pickle(cleanup_list)
async_operations = AsyncOperations.get_instance()
for rank_id in range(initial_worker_number, mpi_state.world_size):
h_list = communication.isend_serialized_operation(
mpi_state.comm,
common.Operation.CLEANUP,
s_cleanup_list,
rank_id,
)
async_operations.extend(h_list)
if rank_id != mpi_state.rank:
h_list = communication.isend_serialized_operation(
mpi_state.comm,
common.Operation.CLEANUP,
s_cleanup_list,
rank_id,
)
async_operations.extend(h_list)

def increment_task_counter(self):
"""
Expand Down

0 comments on commit b31b343

Please sign in to comment.