Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package org.apache.spark.sql.streamv1

import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Column, DataFrame}


/**
 * Version A: A single StreamFrame abstraction to represent unwindowed stream and windowed stream.
 *
 * There are two alternatives for the blocking operations in StreamFrame when it is not windowed:
 *
 * A1. Blocking operations return new StreamFrames, and emit a new tuple for every update.
 *     As an example, sf.groupby("id").count() will emit a tuple every time we see a new record
 *     for "id", i.e. a running count. Note that these operations will be expensive, because
 *     they require ordering all the inputs by time.
 *
 *     NOTE(review): open questions raised on this paragraph -- what does "emitting" a tuple
 *     mean, and how does it tie in with event time (a query defined over event time, evaluated
 *     at several points in processing time to get different answers for the same event-time
 *     window)? If A1 means an unwindowed stream behaves like one with an infinite window, that
 *     should be stated explicitly.
 *
 * A2. Blocking operations throw runtime exceptions saying they are not supported.
 */
class StreamFrame {

  /////////////////////////////////////////////////////////////////////
  // Meta operations
  /////////////////////////////////////////////////////////////////////

  /** Schema of the stream. */
  def schema: StructType = ???

  /** Column name / data-type pairs. */
  def dtypes: Array[(String, String)] = ???

  /** Column names. */
  def columns: Array[String] = ???

  /** Prints the schema to the console. */
  def printSchema(): Unit = ???

  /** Prints the (optionally extended) query plan. */
  def explain(extended: Boolean): Unit = ???

  /////////////////////////////////////////////////////////////////////
  // Window specification
  /////////////////////////////////////////////////////////////////////

  /** Applies a window specification to this stream. */
  def window(window: WindowSpec): StreamFrame = ???

  /////////////////////////////////////////////////////////////////////
  // Pipelined (per-record) operations.
  // NOTE(review): the previous comment here ("works only within a bounded
  // dataset; throws runtime exception if called on an unbounded dataset")
  // describes the *blocking* section below and looks copy-pasted --
  // pipelined operations should apply per record regardless of windowing.
  // Confirm intent.
  /////////////////////////////////////////////////////////////////////

  def select(cols: Column*): StreamFrame = ???

  def filter(condition: Column): StreamFrame = ???

  def drop(col: Column): StreamFrame = ???

  def withColumn(colName: String, col: Column): StreamFrame = ???

  def withColumnRenamed(existingName: String, newName: String): StreamFrame = ???

  /** Joins this stream against a DataFrame. */
  def join(right: DataFrame): StreamFrame = ???

  /** Entry point for writing the stream out to a sink. */
  def write: StreamFrameWriter = ???

  /////////////////////////////////////////////////////////////////////
  // Blocking operations: work only within a window
  /////////////////////////////////////////////////////////////////////

  def agg(exprs: Column*): StreamFrame = ???

  // NOTE(review): how do the grouping operations interact with windows?
  // A separate window per key (e.g. sessionization) may be wanted; it
  // would be nice to see all possible window and grouping specs together.
  def groupby(cols: Column*): GroupedStreamFrame = ???

  def cube(cols: Column*): GroupedStreamFrame = ???

  def rollup(cols: Column*): GroupedStreamFrame = ???

  def sort(sortExprs: Column*): StreamFrame = ???

  def dropDuplicates(colNames: Seq[String]): StreamFrame = ???

  def distinct(): StreamFrame = ???
}


/**
 * Result of a grouping operation (groupby/cube/rollup) on a StreamFrame;
 * exposes aggregations that each produce a new StreamFrame.
 */
class GroupedStreamFrame {

  /** Aggregates each group using the given aggregate expressions. */
  def agg(exprs: Column*): StreamFrame = ???

  /** Average of the given columns for each group. */
  def avg(colNames: String*): StreamFrame = ???

  // ... (presumably further shorthands such as sum/min/max/count --
  // mirroring DataFrame's grouped API; not spelled out in this sketch)
}


/**
 * Placeholder for the output side of the API (returned by StreamFrame.write).
 * Members are intentionally unspecified in this design sketch.
 */
class StreamFrameWriter {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.apache.spark.sql.streamv1

/**
 * Specification of a window over a stream; implemented below by
 * TimeBasedWindow and GlobalWindow.
 *
 * NOTE(review, PR author): "Please ignore this file for now."
 */
trait WindowSpec

/**
 * A time-based window specification.
 *
 * The constructor is private; instances are built via the companion object,
 * e.g. TimeBasedWindow.over(60).every(10).
 */
class TimeBasedWindow private() extends WindowSpec {
  // Presumably `length` is the window length; time unit is unspecified -- TODO confirm.
  def over(length: Long): TimeBasedWindow = ???
  // Presumably `interval` is the slide/evaluation interval -- TODO confirm.
  def every(interval: Long): TimeBasedWindow = ???
}

/**
 * Builder entry points for TimeBasedWindow. As the companion object, it can
 * call the private constructor; each method starts a fresh window spec.
 */
object TimeBasedWindow {
  def over(length: Long): TimeBasedWindow = {
    new TimeBasedWindow().over(length)
  }

  def every(interval: Long): TimeBasedWindow = {
    new TimeBasedWindow().every(interval)
  }
}


/**
 * A global (unwindowed) specification carrying an interval.
 * NOTE(review): `interval` is stored but its semantics and time unit are
 * unspecified in this sketch -- presumably "evaluate/emit every `interval`";
 * confirm.
 */
class GlobalWindow private (interval: Long) extends WindowSpec

/** Builder entry point; the only way to construct a GlobalWindow. */
object GlobalWindow {
  def every(interval: Long): GlobalWindow = {
    new GlobalWindow(interval)
  }
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package org.apache.spark.sql.streamv2

import org.apache.spark.sql.streamv1.WindowSpec
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Column}


/**
 * Version B: A StreamFrame, and a WindowedStreamFrame, which can be created by StreamFrame.window.
 *
 * Blocking operations are only available on WindowedStreamFrame.
 */
class StreamFrame {

  /////////////////////////////////////////////////////////////////////
  // Meta operations
  /////////////////////////////////////////////////////////////////////

  /** Schema of the stream. */
  def schema: StructType = ???

  /** Column name / data-type pairs. */
  def dtypes: Array[(String, String)] = ???

  /** Column names. */
  def columns: Array[String] = ???

  /** Prints the schema to the console. */
  def printSchema(): Unit = ???

  /** Prints the (optionally extended) query plan. */
  def explain(extended: Boolean): Unit = ???

  /////////////////////////////////////////////////////////////////////
  // Window specification
  /////////////////////////////////////////////////////////////////////

  /** Applies a window, enabling the blocking operations on the result. */
  def window(window: WindowSpec): WindowedStreamFrame = ???

  /////////////////////////////////////////////////////////////////////
  // Pipelined (per-record) operations.
  // NOTE(review): the previous comment here ("works only within a bounded
  // dataset; throws runtime exception if called on an unbounded dataset")
  // appears copy-pasted from the blocking section: in this design these
  // are exactly the operations available on the *unwindowed* stream, so
  // that restriction cannot apply. Confirm intent.
  /////////////////////////////////////////////////////////////////////

  def select(cols: Column*): StreamFrame = ???

  def filter(condition: Column): StreamFrame = ???

  def drop(col: Column): StreamFrame = ???

  def withColumn(colName: String, col: Column): StreamFrame = ???

  def withColumnRenamed(existingName: String, newName: String): StreamFrame = ???

  /** Joins this stream against a DataFrame. */
  def join(right: DataFrame): StreamFrame = ???

  /** Entry point for writing the stream out to a sink. */
  def write: StreamFrameWriter = ???

}


/**
 * A WindowedStreamFrame can run all the operations available on StreamFrame, and also blocking
 * operations.
 *
 * NOTE(review): discussion from the PR thread --
 *   Q: why can't this be a DataFrame?
 *   A (author): Half of the DataFrame functions are about interactivity -- it doesn't make
 *      sense to have them in a StreamFrame. We can however have a common ancestor for them --
 *      but that can be done for all the 3 alternatives here -- I'd like to defer that to a
 *      separate discussion later.
 */
class WindowedStreamFrame extends StreamFrame {

  /////////////////////////////////////////////////////////////////////
  // Blocking operations: work only within a window
  /////////////////////////////////////////////////////////////////////

  /** Aggregates the records of each window without grouping. */
  def agg(exprs: Column*): StreamFrame = ???

  /** Groups records within each window by the given columns. */
  def groupby(cols: Column*): GroupedStreamFrame = ???

  /** Cube aggregation within each window. */
  def cube(cols: Column*): GroupedStreamFrame = ???

  /** Rollup aggregation within each window. */
  def rollup(cols: Column*): GroupedStreamFrame = ???

  /** Sorts the records of each window. */
  def sort(sortExprs: Column*): StreamFrame = ???

  /** Drops duplicates (on the given columns) within each window. */
  def dropDuplicates(colNames: Seq[String]): StreamFrame = ???

  /** Drops fully duplicate records within each window. */
  def distinct(): StreamFrame = ???
}


/**
 * Result of a grouping operation (groupby/cube/rollup) on a windowed stream;
 * exposes aggregations that each produce a new StreamFrame.
 */
class GroupedStreamFrame {

  /** Aggregates each group using the given aggregate expressions. */
  def agg(exprs: Column*): StreamFrame = ???

  /** Average of the given columns for each group. */
  def avg(colNames: String*): StreamFrame = ???

  // ... (presumably further shorthands such as sum/min/max/count --
  // mirroring DataFrame's grouped API; not spelled out in this sketch)
}


/**
 * Placeholder for the output side of the API (returned by StreamFrame.write).
 * Members are intentionally unspecified in this design sketch.
 */
class StreamFrameWriter {

}