
Conversation

@bijaybisht
Contributor

Topic: Spark Streaming using Event Timestamps

Spark Streaming creates a batch (RDD) of events every duration T. Batches are formed on a schedule, and the timestamp associated with a batch is the time at which it was scheduled by Spark. This Spark-applied timestamp may be less relevant than the timestamp at which the event was originally meant to occur.

The fundamental reasons the event timestamp differs from the Spark timestamp are the delay in event generation in the upstream system and the delay in transporting the event to Spark after it is generated. The problem is compounded for events that have both a start and an end, with both timestamps packed into a single record generated after the event ends, as illustrated in the following diagram.

(upstream) -------s--------e----g---------------->
(spark)    ------------------------r------------->

The horizontal axis is time. The event starts at s, ends at e, and the event record is generated at g, which is then received by Spark at r.

So there is a need to create batches that contain only the relevant events, or the relevant proportion of each event, according to the original timestamps passed to Spark as part of the received tuples. Let's refer to a batch that has all the events occurring in the time window it represents as a bin. So a bin from T1 to T2 will 'only have events' that occurred in the period T1 to T2. The definition of the bin can be extended to include 'all the events' that occurred in a given period; however, the second constraint is harder to satisfy in practice, because events can be arbitrarily delayed.

For the rest of the discussion, the definitions of batch and bin are as given in the previous paragraph.

Bin size determines the granularity of the time series and is an independent consideration in itself, i.e. independent of the batch size, the events, and the delays.

Let's say that the batch size is T, the bin size is nT, and an event's reception is delayed by at most dT. So in order to generate a complete bin, n + d batches of size T are required.

Conversely, every batch contributes to bins reaching from the current bin back through the last ceil((n + d) / n) bins.

For the batch at t, the contents can be seen as T1 @ t (where the notation T1 @ t denotes the events corresponding to bin T1 from the batch at t), T1 - 1 @ t, T1 - 2 @ t, ..., T1 - m @ t (where T1 - 1 represents the bin previous to T1, and m = ceil((n + d) / n)).
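
For example, with a batch interval T = 10 seconds, bins of n = 6 batches (one minute), and a maximum delay of d = 3 batches, m = ceil((6 + 3) / 6) = 2, so the batch at t carries events for T1 @ t, T1 - 1 @ t, and T1 - 2 @ t.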

We can then de-multiplex the contributions from the batch at t into bins T1, T1 - 1, T1 - 2, T1 - 3, ..., resulting in streams that represent partial bins relative to the batch stream. So stream i represents the partial bin T1 - i received at t. This way the Spark application can deliver incremental bins downstream as close to real time as possible. Depending on how the downstream application can handle partial bins, the definition and generation of the streams need to be controlled.

Cases:

  1. The downstream application can handle incremental updates to a bin (i.e. partial bin = current partial bin + latest partial bin). For this, what is required is m streams which send updates every interval T, where

Stream 1: T1 @ t
Stream 2: T1 - 1 @ t
...
Stream m: T1 - m @ t

  2. The downstream application can only handle full updates to a bin (i.e. partial bin = latest partial bin). For this, what is required is m streams which send updates every interval T, where

Stream 1: T1 @ t
Stream 2: T1 - 1 @ t + T1 - 1 @ (t - 1)
...
Stream m: T1 - m @ t + ... + T1 - m @ (t - m)

i.e. a bin is updated every T until the bin is final. The first stream represents the most current bin with the latest cumulative update, the next stream represents the previous bin with its latest cumulative update, and so on, until the last stream, which represents a final bin.

  3. The downstream application cannot handle updates to a bin at all. This is basically the last stream from case 2 (Stream m), with the exception that it slides by nT and not T. Note that the next bin after T1 @ t is T1 + 1 @ (t + nT), because the size of a bin is nT. A sketch of how a downstream might consume each of the three cases follows below.
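
To make the three cases concrete, here is a minimal Scala sketch of how a downstream store might consume each kind of stream. BinUpdate, store, and the apply* helpers are hypothetical names used only for illustration; they are not part of the proposed API.

import scala.collection.mutable

// Hypothetical shape of one update: the index of the bin it belongs to
// and the (event, prorated weight) pairs it carries.
case class BinUpdate[T](binIndex: Long, events: Seq[(T, Double)])

// Case 1: incremental updates are merged into whatever is already stored.
def applyIncremental[T](store: mutable.Map[Long, Seq[(T, Double)]], u: BinUpdate[T]): Unit =
  store(u.binIndex) = store.getOrElse(u.binIndex, Seq.empty) ++ u.events

// Case 2: each update carries the full cumulative partial bin, so it replaces the old value.
def applyFull[T](store: mutable.Map[Long, Seq[(T, Double)]], u: BinUpdate[T]): Unit =
  store(u.binIndex) = u.events

// Case 3: only final bins arrive (one every nT), so a plain insert suffices.
def applyFinal[T](store: mutable.Map[Long, Seq[(T, Double)]], u: BinUpdate[T]): Unit =
  store += (u.binIndex -> u.events)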

Typically each stream needs to be treated similarly because it represents the same kind of content; however, there can be use cases where the streams may need to be treated differently. This is a consideration for the API.

Implementation:

In order to transform the batch stream into partial bin streams, we can filter the events and put the prorated events into bin streams representing T1 @ t, T1 - 1 @ t, and so on.

For this we can define a new DStream that generates its output by prorating the data from the batch to the bin corresponding to the stream.

For use case 2, which requires progressively accumulating all the events for a bin, a new DStream is required that generates a pulsating window going from (sn + 1) to (sn + n) batches, where s is the partial stream index. A stream index of 0 implies the most current partial bin stream.
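
As a sketch of the pulsating-window arithmetic, here is a hypothetical helper (not the PR's code), assuming bins are aligned so that a new bin starts whenever batchIndex % n == 0:

def pulsatingWindowWidth(batchIndex: Long, s: Int, n: Int): Int = {
  // How many batches of the current bin have arrived so far, minus one.
  val phase = (batchIndex % n).toInt
  // Widths cycle through (s*n + 1) .. (s*n + n): stream s always reaches back
  // to the first batch that can contribute to bin T1 - s, then resets.
  s * n + phase + 1
}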

APIs

BinStreamer[T](ds: DStream[T], start: T => Time, end: T => Time)
This will return a BinStreamer object.

The BinStreamer object can be used to generate incremental bin streams (case 1), a final bin stream (case 3), or updated bin streams (case 2) using the following APIs.

BinStreamer.incrementalStreams(sizeInNumBatches: Int, delayIndex: Int, numStreams: Int) : Seq[BinStream[(T,percentage)]]

BinStreamer.finalStream(sizeInNumBatches: Int, delayIndex: Int) : BinStream[(T,percentage)]

BinStreamer.updatedStreams(sizeInNumBatches: Int, delayIndex: Int, numStreams: Int) : Seq[BinStream[(T,percentage)]]

    DStream[T] : The batch stream.
    start : Closure to get the start time from a record.
    end : Closure to get the end time from a record.
    sizeInNumBatches : The size of a bin as a multiple of the batch size.
    delayIndex : The maximum delay, in batches, between event relevance and event reception.
    numStreams : The number of bin streams. Even though it is fixed by the batch size, the bin size, and the delayIndex, this optional parameter can control the number of output streams, and it does so by delaying the most current bin.

Each BinStream will wrap a DStream.
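
A hypothetical usage sketch of the proposed API. The record type Flow and all parameter values are illustrative only; the constructor shape follows the BinStreamer definition above.

import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.DStream

case class Flow(startMs: Long, endMs: Long, bytes: Long)

val ssc: StreamingContext = ???          // batch interval T, e.g. Seconds(10)
val flows: DStream[Flow] = ???           // the batch stream

val streamer = new BinStreamer[Flow](flows,
  (f: Flow) => Time(f.startMs),          // start: closure extracting the start time
  (f: Flow) => Time(f.endMs))            // end: closure extracting the end time

// Bins of 6 batches with up to 3 batches of delay: m = ceil((6 + 3) / 6) = 2 streams.
val incremental = streamer.incrementalStreams(sizeInNumBatches = 6, delayIndex = 3, numStreams = 2)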


def prorate(binStart: Time, binEnd: Time)(x: T): (T, Double) = {

  val sx = startFunc(x)
  val ex = endFunc(x)

  // Even though binStart is not inclusive, binStart here stands for the limit
  // as time approaches binStart from the right.
  val s = if (sx > binStart) sx else binStart

  val e = if (ex < binEnd) ex else binEnd

  if (ex == sx) {
    // A point event: after filtering it is known to lie inside the bin,
    // so it carries its full weight.
    (x, 1.0)
  } else {
    // Fraction of the event's duration that falls inside this bin.
    (x, (e - s).milliseconds.toDouble / (ex - sx).milliseconds.toDouble)
  }
}
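
For example, against a bin covering (10s, 20s], an event with sx = 5s and ex = 25s is clipped to s = 10s and e = 20s, so it receives a weight of (20 - 10) / (25 - 5) = 0.5: half of the event is attributed to this bin.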

def filter(binStart: Time, binEnd: Time)(x: T): Boolean = {

  // The flow starts in a subsequent bin
  if (startFunc(x) > binEnd) false

  // The flow ended in a prior bin (binStart itself belongs to the previous bin)
  else if (endFunc(x) <= binStart) false

  // The flow starts exactly at binEnd and extends beyond it, so it belongs to the next bin
  else if (startFunc(x) == binEnd && endFunc(x) > binEnd) false

  else true
}
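
As a sketch, the two functions could compose per bin as follows, assuming an RDD[T] holding the events of the contributing batches and a ClassTag for T in scope (a hypothetical helper, not the PR's ProratedEventDStream internals):

import org.apache.spark.rdd.RDD

// Drop events that do not overlap (binStart, binEnd], then attach each
// survivor's prorated weight.
def binContents(binStart: Time, binEnd: Time)(events: RDD[T]): RDD[(T, Double)] =
  events.filter(filter(binStart, binEnd)).map(prorate(binStart, binEnd))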

@andrewor14
Contributor

ok to test. Hey @bijaybisht can you open an associated JIRA for this PR? See how other PRs are opened.

@SparkQA

SparkQA commented Oct 2, 2014

QA tests have started for PR 2633 at commit 2002151.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 2, 2014

QA tests have finished for PR 2633 at commit 2002151.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • println(s"Failed to load main class $childMainClass.")
    • case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
    • class BinStream[T: ClassTag](@transient ds: DStream[T], sizeInNumBatches: Int, delayInNumBatches: Int)
    • class BinStreamer[T: ClassTag](@transient ds: DStream[T], getStartTime: (T) => Time, getEndTime: (T) => Time) extends Serializable
    • class BinAlignedWindowDStream[T: ClassTag](
    • class ProratedEventDStream[T: ClassTag](parent: DStream[T],
    • class PulsatingWindowDStream[T: ClassTag](parent: DStream[T],

@bijaybisht
Contributor Author

Fixed the Apache license headers.

@SparkQA

SparkQA commented Oct 3, 2014

QA tests have started for PR 2633 at commit 9dd7826.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 3, 2014

QA tests have finished for PR 2633 at commit 9dd7826.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
    • protected case class Keyword(str: String)
    • class BinStream[T: ClassTag](@transient ds: DStream[T], sizeInNumBatches: Int, delayInNumBatches: Int)
    • class BinStreamer[T: ClassTag](@transient ds: DStream[T], getStartTime: (T) => Time, getEndTime: (T) => Time) extends Serializable
    • class BinAlignedWindowDStream[T: ClassTag](
    • class ProratedEventDStream[T: ClassTag](parent: DStream[T],
    • class PulsatingWindowDStream[T: ClassTag](parent: DStream[T],

@SparkQA

SparkQA commented Oct 3, 2014

QA tests have started for PR 2633 at commit 9dd7826.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 3, 2014

QA tests have finished for PR 2633 at commit 9dd7826.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
    • protected case class Keyword(str: String)
    • class BinStream[T: ClassTag](@transient ds: DStream[T], sizeInNumBatches: Int, delayInNumBatches: Int)
    • class BinStreamer[T: ClassTag](@transient ds: DStream[T], getStartTime: (T) => Time, getEndTime: (T) => Time) extends Serializable
    • class BinAlignedWindowDStream[T: ClassTag](
    • class ProratedEventDStream[T: ClassTag](parent: DStream[T],
    • class PulsatingWindowDStream[T: ClassTag](parent: DStream[T],

@bijaybisht
Contributor Author

Ran dev/run-tests and fixed the Scala style errors.

@bijaybisht
Contributor Author

Don't understand why it failed this time. Can test be re-fired?

@andrewor14
Contributor

Yes, but please update the title of the PR. Jenkins retest this please

@SparkQA

SparkQA commented Oct 5, 2014

QA tests have started for PR 2633 at commit bfe9502.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Oct 5, 2014

QA tests have finished for PR 2633 at commit bfe9502.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class BinStream[T: ClassTag](
    • class BinStreamer[T: ClassTag](
    • class BinAlignedWindowDStream[T: ClassTag](
    • class ProratedEventDStream[T: ClassTag](parent: DStream[T],
    • class PulsatingWindowDStream[T: ClassTag](parent: DStream[T],

@AmplabJenkins

Can one of the admins verify this patch?

@bijaybisht
Contributor Author

Anything I need to do?

@tdas
Contributor

tdas commented Nov 11, 2014

@bijaybisht Hey, this is a pretty cool PR! Sorry that we hadn't been able to review this. Could you please update the title with the JIRA number (see other PRs to understand what it should look like)?

@bijaybisht changed the title from "Event proration based on event timestamps." to "[https://issues.apache.org/jira/browse/SPARK-4392] Event proration based on event timestamps." on Nov 14, 2014
@tdas
Contributor

tdas commented Dec 25, 2014

Hey @bijaybisht

Sorry for no response from our side for a while. This feature is a very good extension of the streaming functionality. However, right now, we are focusing on fault-tolerance, robustness, and stability of Spark Streaming and this feature does not quite fit in the roadmap.

On that note, we have started spark-packages.org as a way for active community members such as yourself to contribute features to the community and maintain them yourselves. This decouples the speed at which the maintainers manage the overall roadmap of the project from the speed at which contributors would like to develop applications and features on Spark. Please consider adding this extension to spark-packages.org so that others can find and use it.

In the meantime, mind closing this PR?

@asfgit closed this in 534f24b on Dec 27, 2014
@mnd999

mnd999 commented Nov 26, 2015

Did this ever make it to spark-packages.org? If not, it's a shame, but would not be entirely surprising given how this PR was handled.

@hiboyang

hiboyang commented Apr 25, 2016

Now Spark has improved a lot in fault-tolerance, robustness, and stability. Could we revisit this PR?

There is a lot of value in supporting event timestamps, since they are often needed for real production streaming data processing. The lack of support may be a blocking issue for some people adopting Spark Streaming.

@saurabhdaftary

This would be a very useful feature to add. As @boy-uber stated, I would love to see this get in.

@Mobe91

Mobe91 commented May 8, 2017

When is this going to be merged? It would be really useful.
