This repository was archived by the owner on Aug 4, 2022. It is now read-only.

[REEF-2054] Elastic group communication: broadcast #1487

Open
wants to merge 29 commits into
base: master

Conversation

interesaaat
Contributor

This PR introduces the new elastic group communication framework. For the moment only the broadcast operator with flat topology is implemented. Further operators and topologies will be added in successive PRs.

This PR uses some of the pieces of PR #1479 so the latter should be merged before merging the former.

JIRA:
REEF-2054 Elastic Broadcast

@motus motus requested review from motus and markusweimer January 5, 2019 00:36
@motus motus changed the title Elastic group communication: broadcast [REEF-2054] Elastic group communication: broadcast Jan 5, 2019
Contributor

@motus motus left a comment

OK, I think I'll start it slow. I'll bundle my comments into groups of 20 or so, and keep reviewing as you push the updates. This way we can work in parallel.

if (testToRun.Equals("ElasticBroadcast".ToLower()) || testToRun.Equals("all"))
{
new ElasticBroadcastClient(runOnYarn, numNodes, startPort, portRange);
Console.WriteLine("ElasticRunBroadcast completed!!!");
Contributor

Shouldn't we write it to the log instead?

Contributor Author

Should we? This is the client, I see that often we print directly to console, no?

for (int pos = _position; pos < _operators.Count; pos++)
{
_operators[pos].ResetPosition();
}
Contributor

why not just

foreach (var op in _operators)
{
    op.ResetPosition();
}

Contributor Author

This is tricky. When you have iterators in the pipeline you only reset each time within the context of an iteration. The for loop is used because the position tells where to actually start to reset.

Contributor

oh, I see. I have not noticed that it starts from the _position. Then, are we sure that it never starts from -1?

Contributor Author

Yes. ResetOperatorPositions is private and only used inside MoveNext where the first operation is _position++.
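The invariant described in this thread can be illustrated with a minimal sketch. `SketchOperator` and `SketchPipeline` are hypothetical stand-ins, not the classes in the PR, and the sketch assumes a reset on every step purely to make the effect visible.

```csharp
using System.Collections.Generic;

// Hypothetical stand-in for an operator; it only counts how often it was reset.
public sealed class SketchOperator
{
    public int Resets { get; private set; }

    public void ResetPosition()
    {
        Resets++;
    }
}

// Hypothetical pipeline: MoveNext advances first, then resets from _position on.
public sealed class SketchPipeline
{
    private readonly IList<SketchOperator> _operators;
    private int _position = -1; // "before the first operator"

    public SketchPipeline(IList<SketchOperator> operators)
    {
        _operators = operators;
    }

    public bool MoveNext()
    {
        _position++; // first operation, so _position >= 0 from here on
        if (_position >= _operators.Count)
        {
            return false;
        }

        ResetOperatorPositions();
        return true;
    }

    private void ResetOperatorPositions()
    {
        // Operators before _position are deliberately left untouched.
        for (int pos = _position; pos < _operators.Count; pos++)
        {
            _operators[pos].ResetPosition();
        }
    }
}
```

Because the only caller increments `_position` before the loop runs, the loop index can never be -1, and a `foreach` over all operators would also reset the ones already passed.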

// Check if we need to iterate
if (_iteratorsPosition.Count > 0 && _position == _iteratorsPosition[0])
{
var iteratorOperator = _operators[_position] as IElasticIterator;
Contributor

I don't like that cast. Can we enforce that _operators is a list of IElasticIterator elements instead?

Contributor Author

@interesaaat interesaaat Jan 15, 2019

Not really because _operators contains the sequence of all operators composing the workflow. I could make all operators be an IElasticIterator of size 0, but this is less clean than using a cast I believe. I am open for suggestions tho.

Contributor

I see. I actually would not mind having an empty IElasticIterator implementation, but it is up to you here.

P.S. Note that we don't have to implement the actual iterator: IElasticOperator can just inherit from IEnumerable and return an empty sequence from .GetEnumerator() by default.

Contributor Author

What about this:

IElasticIterator iteratorOperator;
switch (_operators[_position])
{
    case IElasticIterator iter:
        iteratorOperator = iter;
        break;
    default:
        throw new IllegalStateException("Operator not Iterator");
}

@interesaaat
Contributor Author

Ready for the second round!

Contributor

@motus motus left a comment

A few very minor comments for today. Check the naming conventions and make sure you use {0}-style format strings in logging instead of string concatenation or $"" interpolation. (Note that for exceptions it's the other way around.)

Contributor

@motus motus left a comment

A few more substantial comments this time. I think we can shorten the code quite a bit

@motus
Contributor

motus commented Jan 31, 2019 via email

- Fixed some line lengths (< 120)
- Fixed some var initialization in constructors
- Log notation
- Added generics to drivers and client to decrease the number of files
Contributor

@motus motus left a comment

Added some more comments. will continue tomorrow

Fixed API for generic new threshold for the failure machine.
Added params for better setting of thresholds in the failure machine
Contributor

@motus motus left a comment

left a few minor comments. Will add more soon!

{
NumOfDataPoints = initalPoints;
NumOfFailedDataPoints = initalPoints;
State = new DefaultFailureState((int)initalState);
Contributor

we can define constructor

public DefaultFailureState(DefaultFailureStates state)

to avoid the explicit cast here

Contributor Author

Mmm not sure. Because I will have to do a cast later on because FailureState is an int (cannot be a DefaultFailureState because it's defined in the base class).

Contributor

Yeah we have to discuss that. I still cannot think of something elegant.

Contributor

@motus motus left a comment

Another round of comments - will add more tomorrow!

Contributor

@motus motus left a comment

added a few more comments. To be continued!

Contributor

@motus motus left a comment

few more things

Contributor

@motus motus left a comment

added a few more comments. will continue tomorrow

/// <param name="message">The task message for the operator</param>
/// <param name="returnMessages">A list of messages containing the instructions for the task</param>
/// <exception cref="IllegalStateException">If the message cannot be handled correctly or generates an incorrect state</exception>
void OnTaskMessage(ITaskMessage message, ref List<IElasticDriverMessage> returnMessages);
Contributor

Note to self: why ref? isn't it against the idea of asynchronous event handlers? Also, maybe we should separate input and output messages? i.e. do one of:

// messages in, messages out, in the most generic container
IEnumerable<IElasticDriverMessage> OnTaskMessage(
    ITaskMessage message, IEnumerable<IElasticDriverMessage> driverMessages);

// messages in, async handler for messages out
void OnTaskMessage(ITaskMessage message,
    IEnumerable<IElasticDriverMessage> driverMessages,
    EventHandler<IEnumerable<IElasticDriverMessage>> handler = null); // or a delegate

need to track the usage of this interface and give it more thought later...

/// <param name="buffer">The memory space where to copy the serialized update</param>
/// <param name="offset">Where to start writing in the buffer</param>
/// <param name="updates">The updates to serialize</param>
internal static void Serialize(byte[] buffer, ref int offset, List<TopologyUpdate> updates)
Contributor

By the way, why are we serializing everything like that? is it for Java interoperability?

Contributor Author

Not really. Were you suggesting to use codecs? I found that the previous group communication uses a similar strategy and I simply piggybacked on that. I actually like having the serialization logic within the message class.

Contributor

@motus motus left a comment

Here's another round of comments. Also, please remove the trailing spaces that popped up in the last commit 55aedfa


_hasProgress = true;
var id = Utils.GetContextNum(activeContext) - 1;
var taskId = Utils.BuildTaskId(StagesId, id + 1);
Contributor

man, this +/- 1 thing is really confusing. let's come back to it later and discuss some more elegant solution

Contributor Author

I know! the fact is that task infos are internally stored 0-indexed while externally they are used as 1-indexed. The solution here could be to just 0-index everywhere.

@interesaaat
Contributor Author

Here's another round of comments. Also, please remove the trailing spaces that popped up in the last commit 55aedfa

Sorry @motus which trailing spaces are you referring to?

Contributor

@motus motus left a comment

here's some more

NewEvaluatorNumCores = numCores;
NewEvaluatorMemorySize = memorySize;

System.Threading.Tasks.Task.Factory.StartNew(() => Clock.Run(), TaskCreationOptions.LongRunning);
Contributor

just

System.Threading.Tasks.Task.Factory.StartNew(Clock.Run, TaskCreationOptions.LongRunning);

Contributor

Also, I feel a bit uneasy about starting a new thread from the injectable constructor. it's hard to track when exactly it will run and when we'll stop it. I would rather have a separate .Start() method (maybe not in this class), and the corresponding Stop()/Dispose()

Contributor Author

I didn't 100% get it. Are you suggesting something like starting the clock AFTER (outside) the constructor? So the flow will be (1) inject the constructor, (2) invoke Start?
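The two-step flow discussed here (construct first, then start explicitly) might look roughly like the following sketch. `SketchClockRunner` is a hypothetical wrapper, not a class from the PR, and the delegate it takes stands in for `Clock.Run`.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical wrapper: the constructor only stores state, Start() launches the
// long-running work, and Dispose() cancels it and waits for it to finish.
public sealed class SketchClockRunner : IDisposable
{
    private readonly Action<CancellationToken> _run;
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private Task _task;
    private bool _disposed;

    public SketchClockRunner(Action<CancellationToken> run)
    {
        _run = run; // no thread is started here
    }

    public bool IsRunning
    {
        get { return _task != null && !_task.IsCompleted; }
    }

    public void Start()
    {
        if (_task != null)
        {
            throw new InvalidOperationException("Already started");
        }

        _task = Task.Factory.StartNew(
            () => _run(_cts.Token), _cts.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
    }

    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;
        _cts.Cancel();
        if (_task != null)
        {
            try
            {
                _task.Wait();
            }
            catch (AggregateException)
            {
                // Cancellation surfaces here if the task never got to run.
            }
        }

        _cts.Dispose();
    }
}
```

With this shape the injectable constructor has no side effects, and the owner decides when the thread starts and stops.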

{
if (!_completed)
{
_completed = _stages.Select(stage => stage.Value.IsCompleted).Aggregate((com1, com2) => com1 && com2);
Contributor

maybe

_completed |= _stages.Values.All(stage => stage.IsCompleted);

and there is no need for if

Contributor Author

Why is there no need for the if? I want to print only once, when we move from not complete to complete.
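One way to reconcile the two positions is to keep the guard but use `All` for the check, so the transition is announced exactly once. The sketch below uses hypothetical types (`SketchStage`, `SketchCompletionTracker`), and a counter stands in for the log line.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical stage with the single property the check needs.
public sealed class SketchStage
{
    public bool IsCompleted { get; set; }
}

public sealed class SketchCompletionTracker
{
    private readonly IDictionary<string, SketchStage> _stages;
    private bool _completed;

    public SketchCompletionTracker(IDictionary<string, SketchStage> stages)
    {
        _stages = stages;
    }

    // Stands in for the one-time "completed" log message.
    public int Announcements { get; private set; }

    public bool CheckCompleted()
    {
        // The guard keeps the announcement to the not-complete -> complete transition.
        if (!_completed && _stages.Values.All(stage => stage.IsCompleted))
        {
            _completed = true;
            Announcements++;
        }

        return _completed;
    }
}
```

Note that `All` returns true for an empty collection, whereas `Aggregate` without a seed throws `InvalidOperationException` on an empty sequence, so the two forms differ when there are no stages.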

@motus motus self-assigned this Feb 14, 2019
Contributor

@motus motus left a comment

few more comments

lock (_statusLock)
{
_failureStatus = _failureStatus.Merge(
new DefaultFailureState((int)DefaultFailureStates.ContinueAndReconfigure));
Contributor

I really don't like that we have three different representations for the failure state - int, enum, and class/interface. That leads to awkward casts and news. I am still thinking about the most elegant way to resolve this; one approach would be to remove the DefaultFailureStates enum, and go old Java style, like this:

public sealed class DefaultFailureState : IFailureState
{
    public static readonly DefaultFailureState Continue = new DefaultFailureState(0);
    public static readonly DefaultFailureState ContinueAndReconfigure = new DefaultFailureState(1);
    // ...

then here we can just write

_failureStatus = _failureStatus.Merge(DefaultFailureState.ContinueAndReconfigure);

but I am not 100% sure... maybe we can keep the enum, ditch the IFailureState interface, and use implicit operators to convert between the class and enum..

Contributor Author

@interesaaat interesaaat Feb 18, 2019

I agree that is weird. I personally like the IFailureState interface because it allows implementing different failure states and related ways of merging them (this is the behavior I want for failure states). If I have to choose, I would prefer to keep the interface and implement the default behavior in a different way (e.g., as you were suggesting, without the enum).
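Keeping the interface while dropping the enum could be sketched as below. All names are hypothetical stand-ins for `IFailureState` and `DefaultFailureState`; the value 2 for `ContinueAndReschedule` and the "larger value wins" merge rule are assumptions made only for illustration.

```csharp
// Hypothetical stand-in for IFailureState.
public interface ISketchFailureState
{
    int FailureState { get; }

    ISketchFailureState Merge(ISketchFailureState that);
}

// Default states as shared static instances instead of an enum plus casts.
public sealed class SketchFailureState : ISketchFailureState
{
    public static readonly SketchFailureState Continue = new SketchFailureState(0);
    public static readonly SketchFailureState ContinueAndReconfigure = new SketchFailureState(1);
    public static readonly SketchFailureState ContinueAndReschedule = new SketchFailureState(2); // assumed value

    private SketchFailureState(int state)
    {
        FailureState = state;
    }

    public int FailureState { get; private set; }

    // Assumption: merging keeps the more severe (numerically larger) state.
    public ISketchFailureState Merge(ISketchFailureState that)
    {
        return that.FailureState > FailureState ? that : this;
    }
}
```

Call sites then read `state.Merge(SketchFailureState.ContinueAndReconfigure)` with no `new` and no `(int)` cast, while other `ISketchFailureState` implementations remain free to define their own merge behavior.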

lock (_statusLock)
{
_failureStatus = _failureStatus.Merge(
new DefaultFailureState((int)DefaultFailureStates.ContinueAndReschedule));
Contributor

also, if we are OK with a mutable FailureState property (and it is currently fully read and write accessible from the outside - maybe we should tighten it up a bit?), we can write e.g.

_failureStatus.Update(DefaultFailureState.ContinueAndReschedule);

Contributor Author

Are you proposing to implement a default behavior for Merge when mutating? I like it!

@interesaaat
Contributor Author

interesaaat commented Feb 18, 2019

Let me keep track of the major changes I will have to make (once you are done with the "syntactic" pass). I will update this list as we agree on the changes.

  • remove the id - 1, +1 thing. Move everything to 0-indexed;
  • configurations are immutable: fix the methods passing ref configurations
  • change custom retry logic with Microsoft.Practices.EnterpriseLibrary.TransientFaultHandling
  • discuss on incompatibility between DefaultFailureStates and IFailureState. Should we remove enum or remove IFailureState?

Contributor

@motus motus left a comment

added a few minor things. Will write more later today

// and duration will update to reflect the new alarm's timestamp
for (long duration = _timer.GetDuration(_schedule.First().TimeStamp);
duration > 0;
duration = _timer.GetDuration(_schedule.First().TimeStamp))
Contributor

Note that we don't lock the _schedule here - is that Ok?

Contributor Author

I copied this from Wake. I think it is probably OK: additions are synchronized, so in the worst case you will remove an event that was just added and not the one you were waiting for.


DisposeActiveContext();

_isDisposed = true;
Contributor

Do we really need _isDisposed flag, when we already have separate ones for task and the context? I would simply write the .Dispose() method as

public void Dispose()
{
    DisposeTask();
    DisposeActiveContext();
}

-- after all, we don't call .Dispose() that often (and we don't set _isDisposed in .DropRuntime() anyway 😄)

Contributor Author

We should discuss on this. Disposing and dropping runtimes are different. You drop runtimes when the tasks fail, while you dispose them when the task manager finishes / fails. I think that more than removing _isDisposed I should add it in other places so that we cannot add a runtime to a disposed task for instance.
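The distinction drawn here between dropping and disposing could be sketched as follows. `SketchTaskInfo` is a hypothetical class, not the one in the PR; it only illustrates using `_isDisposed` as a guard on later mutations.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical holder: dropping runtimes is recoverable, disposing is final.
public sealed class SketchTaskInfo : IDisposable
{
    private readonly List<string> _runtimes = new List<string>();
    private bool _isDisposed;

    public int RuntimeCount
    {
        get { return _runtimes.Count; }
    }

    public void AddRuntime(string runtime)
    {
        // Guard: nothing can be attached once the object is disposed.
        if (_isDisposed)
        {
            throw new ObjectDisposedException("SketchTaskInfo");
        }

        _runtimes.Add(runtime);
    }

    // Failure path: the task failed, but a new runtime may be added later.
    public void DropRuntime()
    {
        _runtimes.Clear();
    }

    // Shutdown path: the task manager finished or failed.
    public void Dispose()
    {
        if (_isDisposed)
        {
            return;
        }

        _runtimes.Clear();
        _isDisposed = true;
    }
}
```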

Contributor

@motus motus left a comment

Done with the Failures folder. Left a few comments, all minor. Still, the thing that bothers me most is type incompatibility between DefaultFailureStates and IFailureState. We should think of something more elegant.

Contributor

@motus motus left a comment

added a few nit picks

/// </summary>
/// <typeparam name="T">The data type of the message</typeparam>
[Unstable("0.16", "API may change")]
public interface ISender<T>
Contributor

Should we make T contravariant? i.e.

public interface ISender<in T>

Contributor Author

I made it contravariant, but I am not sure about the use case. In the sense that users will specify anyway the type on the driver side. Did you have any specific use case in mind?
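As a generic illustration of what the `in` modifier permits (not a use case taken from the PR), the sketch below lets a sender of a base type stand in where a sender of a more derived type is expected. `ISketchSender` and `RecordingSender` are hypothetical names.

```csharp
using System.Collections.Generic;

// Hypothetical counterpart of ISender<T>; T appears only in input positions.
public interface ISketchSender<in T>
{
    void Send(T data);
}

// A sender that accepts any object and records what it was given.
public sealed class RecordingSender : ISketchSender<object>
{
    public readonly List<object> Sent = new List<object>();

    public void Send(object data)
    {
        Sent.Add(data);
    }
}

public static class SketchSenderDemo
{
    // Asks for a string sender; contravariance lets an object sender be passed.
    public static void SendGreeting(ISketchSender<string> sender)
    {
        sender.Send("hello");
    }
}
```

Without `in`, passing a `RecordingSender` to `SendGreeting` would not compile. Variance applies only to reference types, so it would not help for something like `ISketchSender<int>`.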

Contributor

@motus motus left a comment

Done with the logical operators. Switching to the physical ones now..

Contributor

@motus motus left a comment

halfway through the Task package; will add more comments tomorrow.

Buffer.BlockCopy(
BitConverter.GetBytes((ushort)TaskMessageType.JoinTopology), 0, message, offset, sizeof(ushort));
offset += sizeof(ushort);
Buffer.BlockCopy(BitConverter.GetBytes((ushort)operatorId), 0, message, offset, sizeof(ushort));
Contributor

we should probably think about using protobuf instead of our own serialization. Let's discuss it later.

Contributor

@motus motus left a comment

added a few more comments


IIdentifier destId = _idFactory.Create(destination);

for (int retry = 0; !Send(destId, message); retry++)
Contributor

extra space before !Send(); also, this code looks like another good candidate to be replaced with retry policy mechanism

Contributor

By the way, I read up on the retry policies available for .NET, and it looks like the Microsoft.Practices.TransientFaultHandling I've mentioned earlier (and used elsewhere in REEF) is now deprecated. The best replacement seems to be the Polly project. So we can use MS Practices now as we already have the dependency, and later switch to Polly (in a separate PR, of course)
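To make the idea of pulling the retry loop out of the send path concrete, here is a minimal hand-rolled sketch. It deliberately uses neither the MS Practices library nor Polly; `SketchRetry` and its parameters are hypothetical and only show the shape such a policy could take.

```csharp
using System;
using System.Threading;

// Hypothetical stand-alone retry helper with a fixed delay between attempts.
public static class SketchRetry
{
    // Runs attempt() until it returns true or maxRetries extra attempts are used up.
    public static bool Execute(Func<bool> attempt, int maxRetries, TimeSpan delay)
    {
        for (int retry = 0; retry <= maxRetries; retry++)
        {
            if (attempt())
            {
                return true;
            }

            // Do not sleep after the final failed attempt.
            if (retry < maxRetries)
            {
                Thread.Sleep(delay);
            }
        }

        return false;
    }
}
```

A call site in the style of the snippet above would then reduce to something like `SketchRetry.Execute(() => Send(destId, message), retries, delay)`, where `retries` and `delay` are whatever the configuration provides.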

Contributor

@motus motus left a comment

few more nit picks

Contributor

@motus motus left a comment

a few minor suggestions. will add more soon

Contributor

@motus motus left a comment

left a few comments on ITopology and its implementations

}

// This is required later in order to build the topology
if (_taskStage == string.Empty)
Contributor

_taskStage is now null by default. maybe,

_taskStage = _taskStage ?? Utils.GetTaskStages(taskId);

Contributor Author

But this will do a check and reassignment every time instead of just a check, no?
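On the cost question: with `??`, the assignment does run on every call, but the right-hand operand is evaluated only while the left one is null. The sketch below makes that observable; `SketchStageHolder` and `LookupStage` are hypothetical stand-ins for the class and for `Utils.GetTaskStages`.

```csharp
// Hypothetical holder that counts how often the expensive lookup really runs.
public sealed class SketchStageHolder
{
    private string _taskStage; // null by default

    public int Lookups { get; private set; }

    public string GetStage(string taskId)
    {
        // LookupStage is invoked only while _taskStage is still null;
        // afterwards this line just reassigns the existing reference.
        _taskStage = _taskStage ?? LookupStage(taskId);
        return _taskStage;
    }

    private string LookupStage(string taskId)
    {
        Lookups++;
        return "stage-of-" + taskId;
    }
}
```

An explicit `if (_taskStage == null)` avoids even the redundant reference assignment, so the choice is mostly about readability.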

output += rep + " ";
}

return output;
Contributor

shorter version:

return _rootId + "\n" + string.Join(" ", _root.Children.Select(
    node => node.FailState == DataNodeState.Reachable ? "" + node.TaskId : "X"));

(by the way, shouldn't we print IDs of unreachable tasks? say, $"UNREACHABLE:{node.TaskId}"?)

Contributor Author

I dunno. The failing node should be already present in the log because the task manager logs it once it receives a failure. This is more sort of like "hey we got a failure, this is the new state of world".

Contributor

@motus motus left a comment

Few more comments, and that concludes my second full pass over the entire code

protected readonly ConcurrentQueue<ElasticGroupCommunicationMessage> _sendQueue =
new ConcurrentQueue<ElasticGroupCommunicationMessage>();
protected readonly BlockingCollection<ElasticGroupCommunicationMessage> _messageQueue =
new BlockingCollection<ElasticGroupCommunicationMessage>();
Contributor

trim trailing spaces, add blank line after each multiline declaration

Contributor Author

Wait, no indentation on multiline declarations?


#region Empty Handlers

public void OnError(Exception error)
Contributor

is there a chance we'll need to override these handlers in derived classes? if yes, we should probably make them virtual

Contributor Author

I don't think OnError etc. will be ever used because I believe errors will surface directly as crashes.

{
var childTaskId = Utils.BuildTaskId(StageName, child);

_children.TryAdd(child, childTaskId);
Contributor

in fact, we can pass children into the OperatorTopologyWithDefaultCommunication constructor and initialize the dictionary once:

protected OperatorTopologyWithDefaultCommunication(
    // ...
    int disposeTimeout,
    IEnumerable<KeyValuePair<int, string>> children = null)
{
    _children = children == null
                ? new ConcurrentDictionary<int, string>()
                : new ConcurrentDictionary<int, string>(children);

and here do something like

: base(
    // ...
    disposeTimeout,
    children.Select(child =>
        new KeyValuePair<int, string>(child, Utils.BuildTaskId(stageName, child))))

Contributor Author

I am not sure about this. I want to maintain the population of _children in the subclass because it is operator specific.

@interesaaat
Contributor Author

@motus I am checking the protobuf thing and it is not that easy to do. I mean, I can probably change the serialization/deserialization of messages to use protobuf, but to use protobuf end to end I would have to change the network service (which is not my code), and this may take an indefinite amount of time. Do you think it is OK if I only try protobuf for my messages?

@motus
Contributor

motus commented Jul 19, 2019 via email
