Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Documentation for channel classes #12

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 54 additions & 3 deletions distalg/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@
from distalg.message import Message
from multipledispatch import dispatch


class UnreliableDelayedChannel:
class TerminateToken(Message):
"""
A special kind of message which when sent into a channel
breaks/terminates the channel
"""
def __init__(self):
super(UnreliableDelayedChannel.TerminateToken, self).__init__()


class PoppedMsgsAsyncIterable:
def __init__(self, outer_instance):
self.outer = outer_instance
Expand All @@ -32,7 +36,7 @@ def __init__(self,
Every message sent into the channel is sent individually to all the receiving processes.
All units are in milliseconds
:param delay_mean: mean delay for a message to reach from in end to out end
:param delay_std_dev: variation in delay for a message to reach fro in end to out end
:param delay_std_dev: variation in delay for a message to reach from in end to out end
:param min_delay: guarantee that the delay won't be less than this value
:param max_delay: guarantee that the delay won't be more than this value
:param reliability: The reliability with which a message is delivered. [0.0, 1.0]
Expand All @@ -51,20 +55,29 @@ def __init__(self,

@property
def in_end(self):
"""
:return: The Process that is sending messages into the channel
"""
return self._in_end

@property
def out_end(self):
"""
:return: The Process that is receiving messages from the channel
"""
return self._out_end

@property
def back(self):
"""
:return: The channel that connects the participating processes in the reverse direction
"""
return self._back

async def __deliver(self, message):
"""
:param message: The Message object to be delivered
:return:
:return: None
"""
sample = random.random() # generates [0.0, 1.0)
if sample >= self.reliability:
Expand All @@ -80,21 +93,51 @@ async def __deliver(self, message):
await self._out_end.incoming_msgs.put(message)

async def start(self):
"""
The function needs to be called in the event loop for the channel to function.
This function directs the channel to start conducting messages from the in_end to out_end.
:return: None
"""
async for msg in self.obtain_msgs():
if isinstance(msg, UnreliableDelayedChannel.TerminateToken):
return
await self.__deliver(msg)

async def send(self, message):
"""
Submits the message to the channel to be delivered at the
receiving end. May not be received at the other end if
channel is unreliable.
:param message: The message to be sent across the channel
:return: None
"""
message._channel = self
await self.started.put(message)

async def close(self):
"""
The function closes the channel. All the pending messages before issuing this
function will be delivered and then the channel will stop.
This also means that all the messages submitted after issuing this
command will not be delivered unless the channel is started again.
:return: None
"""
await self.started.put(UnreliableDelayedChannel.TerminateToken())


class DelayedChannel(UnreliableDelayedChannel):
def __init__(self, delay_mean=100, delay_std_dev=10, min_delay=1, max_delay=500):
"""
A kind of channel that is reliable but has a delay between when send is
called and when the message reaches at the receiving end.
:param delay_mean: (default 100 ms) The average delay in receiving a message
:param delay_std_dev: (default 10 ms) The standard deviation in the
normal distribution from which the actual delay is sampled
:param min_delay: (default 1 ms) The minimum time message is going
to take to reach the receiving end.
:param max_delay: (default 500 ms) The maximum delay. Any message won't take
longer than this to reach the receiving end.
"""
super(DelayedChannel, self).__init__(
delay_mean=delay_mean,
delay_std_dev=delay_std_dev,
Expand All @@ -106,6 +149,11 @@ def __init__(self, delay_mean=100, delay_std_dev=10, min_delay=1, max_delay=500)

class UnreliableChannel(UnreliableDelayedChannel):
def __init__(self, reliability=0.9):
"""
A kind of channel that is instant with no delay but is unreliable.
:param reliability: The probability with which the message will
reach the receiving end and won't get lost midway
"""
super(UnreliableChannel, self).__init__(
delay_mean=0,
delay_std_dev=0,
Expand All @@ -117,6 +165,9 @@ def __init__(self, reliability=0.9):

class Channel(DelayedChannel):
def __init__(self):
"""
A normal reliable channel with no transit delays.
"""
super(Channel, self).__init__(0, 0, 0, 0)

async def __deliver(self, message):
Expand Down