Skip to content

Conversation

@shijinkui
Copy link
Contributor

Better support sending partial messages in Pregel API

1. the reqirement

In many iterative graph algorithms, only a part of the vertexes (we call them ActiveVertexes) need to send messages to their neighbours in each iteration. In many cases, ActiveVertexes are the vertexes that their attributes do not change between the previous and current iteration. To implement this requirement, we can use Pregel API + a flag (e.g., bool isAttrChanged) in each vertex's attribute.

However, after aggregateMessage or mapReduceTriplets of each iteration, we need to reset this flag to the init value in every vertex, which needs a heavy joinVertices.

We find a more efficient way to meet this requirement and want to discuss it here.

Look at a simple example as follows:

In i-th iteartion, the previous attribute of each vertex is Attr and the newly computed attribute is NewAttr:

VID Attr NewAttr Neighbours
1 4 5 2, 3
2 3 2 1, 4
3 2 2 1, 4
4 3 4 1, 2, 3

Our requirement is that:

  1. Set each vertex's Attr to be NewAttr in i-th iteration
  2. For each vertex whose Attr!=NewAttr, send message to its neighbours in the next iteration's aggregateMessage.

We found it is hard to implement this requirment using current Pregel API efficiently. The reason is that we not only need to perform pregel() to compute the NewAttr (2) but also need to perform outJoin() to satisfy (1).

A simple idea is to keep a isAttrChanged:Boolean (solution 1) or flag:Int (solution 2) in each vertex's attribute.

2. two solution


2.1 solution 1: label and reset isAttrChanged:Boolean of Vertex Attr

s1

  1. init message by aggregateMessage
    it return a messageRDD

  2. innerJoin
    compute the messages on the received vertex, return a new VertexRDD which have the computed value by customed logic function vprog, set isAttrChanged = true

  3. outerJoinVertices
    update the changed vertex to the whole graph. now the graph is new.

  4. aggregateMessage. it return a messageRDD

  5. joinVertices reset erery isAttrChanged of Vertex attr to false

    //  here reset the isAttrChanged to false
    g = updateG.joinVertices(updateG.vertices) {
        (vid, oriVertex, updateGVertex) => updateGVertex.reset()
        }
    

    here need to reset the vertex attribute object's variable as false

if don't reset the isAttrChanged, it will send message next iteration directly.

result:

  • Edge: 890041895
  • Vertex: 181640208
  • Iterate: 150 times
  • Cost total: 8.4h
  • can't run until the 0 message

solution 2. color vertex. (Choose this)

s2

iterate process:

  1. innerJoin
    vprog using as a partial function, looks like vprog(curIter, _: VertexId, _: VD, _: A)
    i = i + 1; val curIter = i.
    in vprog, user can fetch curIter and assign to falg.
  2. outerJoinVertices
    graph = graph.outerJoinVertices(changedVerts) { (vid, old, newOpt) => newOpt.getOrElse(old)}.cache()
  3. aggregateMessages
    sendMsg is partial function, looks like sendMsg(curIter, _: EdgeContext[VD, ED, A]
    in sendMsg, compare curIter with flag, determine whether sending message

result

raw data from

  • vertex: 181640208
  • edge: 890041895
iteration average cost 150 iteration cost 420 iteration cost
solution 1 188m 7.8h cannot finish
solution 2 24m 1.2h 3.1h
compare 7x 6.5x finished in 3.1

the end

i think the second solution(Pregel + a flag) is better.
this can really support the iterative graph algorithms which only part of the vertexes send messages to their neighbours in each iteration.

@liancheng
Copy link
Contributor

test this please

@SparkQA
Copy link

SparkQA commented Jan 4, 2015

Test build #25029 has finished for PR 3866 at commit 2d2195b.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@liancheng
Copy link
Contributor

ok to test

@SparkQA
Copy link

SparkQA commented Jan 4, 2015

Test build #25034 has finished for PR 3866 at commit 4f3c95a.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 5, 2015

Test build #25042 has finished for PR 3866 at commit a9668db.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 5, 2015

Test build #25045 has finished for PR 3866 at commit 1cb8770.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@shijinkui
Copy link
Contributor Author

@ankurdave @rxin

@shijinkui shijinkui force-pushed the pregle_with_cur_iterId branch from 1cb8770 to 6da1939 Compare January 6, 2015 13:07
@SparkQA
Copy link

SparkQA commented Jan 6, 2015

Test build #25098 has finished for PR 3866 at commit 6da1939.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Jan 6, 2015

Test build #25099 has finished for PR 3866 at commit b584ac4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ankurdave
Copy link
Contributor

I'm not sure I understand the problems you're trying to solve in this PR. Is this an accurate summary:

  1. Activeness based on whether a vertex attribute changed from one superstep to the next (not implemented in the code yet?). An alternative to using an isAttrChanged flag could be to use a VertexRDD join such as VertexRDD#diff.
  2. Arbitrary convergence tests based on iteration number and number of active messages (implemented using isTerminal). What is the use case for this?
  3. Optional initial message (implemented by making initialMsg an Option).

@shijinkui
Copy link
Contributor Author

@ankurdave thanks for reviewing :)

  1. Activeness is from innerJoin's result by vprog the message of pre iteration.isAttrChanged is from solution1. It can run, but the key problem is after 100 or more iteration, it progressive increase one second every iteration. So, I consider the solution 2. Solution 2 is surprised on both performance and code logic is more clear.
  2. take k-core algorithm for example. The coreness from 4 to 3 need iterating 68 times, from 3 to 2 need hundreds. But the sum coreness of 4, 2, 1 in the final result is not important, or user can ignore the effect of lacking computing coreness 4, 3, 2,1. So they want't to finish the loop ahead of time.
    isTerminal provide a custom condition of terminal by current iterate id curIter and messageCount.
    Default isTerminal is true.
  3. making initialMsg an Option, as sometimes the initalMsg is empty. when initalMsg is empty, aggregate message on every Vertex directly.

@shijinkui
Copy link
Contributor Author

@ankurdave
Solution 1 is better. It have to add joinVertices for reset isAttrChanged to default value.
It have two problem having no idea for solving:

  1. Progressive increase one second every iteration.
    We can't accept one second progressive increased, because after 600th iteration, the extra cost will become 600s. It like this:
iteration increase cost
100 1 150
101 1 151
102 1 152
103 1 153
104 1 154
.. 1 ..
700 1 850
701 1 851
703 1 852
704 1 853

2 joinVertices
joinVertices will own half cost of the whole Pregel-liked cost.
The more iteration the bigger of joinVertices. It's a linear increase.

@shijinkui
Copy link
Contributor Author

@rxin @ankurdave have any other problem?

@shijinkui
Copy link
Contributor Author

cc @andrewor14

@AmplabJenkins
Copy link

Can one of the admins verify this patch?

@rxin
Copy link
Contributor

rxin commented Dec 31, 2015

I'm going to close this pull request. If this is still relevant and you are interested in pushing it forward, please open a new pull request. Thanks!

@asfgit asfgit closed this in 7b4452b Dec 31, 2015
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants