2 changes: 1 addition & 1 deletion docs/structured-streaming-programming-guide.md
@@ -10,7 +10,7 @@ title: Structured Streaming Programming Guide
# Overview
Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the [Dataset/DataFrame API](sql-programming-guide.html) in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write Ahead Logs. In short, *Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.*

Contributor:
The main title still says Experimental :P

Contributor Author:
haha, good catch


**Structured Streaming is still ALPHA in Spark 2.1** and the APIs are still experimental. In this guide, we are going to walk you through the programming model and the APIs. First, let's start with a simple example - a streaming word count.
In this guide, we are going to walk you through the programming model and the APIs. First, let's start with a simple example - a streaming word count.

# Quick Example
Let’s say you want to maintain a running word count of text data received from a data server listening on a TCP socket. Let’s see how you can express this using Structured Streaming. You can see the full code in
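For reference, a minimal sketch of that streaming word-count pipeline in Scala, assuming a data server sending lines on localhost:9999 (host, port, and app name are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("StructuredWordCount").getOrCreate()
import spark.implicits._

// Lines streamed from a TCP socket; host and port are illustrative.
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split each line into words and keep a running count per word.
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()

// Print the complete updated counts to the console after every trigger.
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()
```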
@@ -21,17 +21,14 @@

import scala.concurrent.duration.Duration;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.execution.streaming.OneTimeTrigger$;

/**
* :: Experimental ::
* Policy used to indicate how often results should be produced by a [[StreamingQuery]].
*
* @since 2.0.0
*/
@Experimental
@InterfaceStability.Evolving
public class Trigger {

Contributor @tdas (May 25, 2017):
This file has more places with ":: Experimental ::" in the scala docs

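For context on the `Trigger` class touched above, a sketch of attaching a trigger policy to a query, assuming a streaming DataFrame `counts` and the `Trigger.ProcessingTime`/`Trigger.Once` factories available in Spark 2.2 (interval and sink are illustrative):

```scala
import org.apache.spark.sql.streaming.Trigger

// Fire a micro-batch every 10 seconds (interval is illustrative).
val periodic = counts.writeStream
  .outputMode("complete")
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

// Or process whatever data is available in one batch and then stop.
val oneShot = counts.writeStream
  .outputMode("complete")
  .format("console")
  .trigger(Trigger.Once())
  .start()
```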
2 changes: 0 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2782,13 +2782,11 @@ class Dataset[T] private[sql](
}

/**
* :: Experimental ::
* Interface for saving the content of the streaming Dataset out into external storage.
*
* @group basic
* @since 2.0.0
*/
@Experimental
@InterfaceStability.Evolving
def writeStream: DataStreamWriter[T] = {
if (!isStreaming) {
@@ -17,10 +17,9 @@

package org.apache.spark.sql

import org.apache.spark.annotation.{Experimental, InterfaceStability}
import org.apache.spark.annotation.InterfaceStability

/**
* :: Experimental ::
* A class to consume data generated by a `StreamingQuery`. Typically this is used to send the
* generated data to external systems. Each partition will use a new deserialized instance, so you
* usually should do all the initialization (e.g. opening a connection or initiating a transaction)
@@ -66,7 +65,6 @@ import org.apache.spark.annotation.{Experimental, InterfaceStability}
* }}}
* @since 2.0.0
*/
@Experimental
@InterfaceStability.Evolving
abstract class ForeachWriter[T] extends Serializable {

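As the class doc above describes, each partition gets a fresh writer instance with an open/process/close lifecycle; a minimal sketch, assuming a streaming `Dataset[String]` named `words` and an illustrative file-per-partition sink:

```scala
import org.apache.spark.sql.ForeachWriter

val writer = new ForeachWriter[String] {
  private var out: java.io.PrintWriter = _

  // Called once per partition and epoch; return false to skip the partition.
  def open(partitionId: Long, version: Long): Boolean = {
    out = new java.io.PrintWriter(s"/tmp/foreach-$partitionId-$version.txt") // illustrative path
    true
  }

  // Called for every record in the partition.
  def process(value: String): Unit = out.println(value)

  // Always called at the end, whether or not an error occurred.
  def close(errorOrNull: Throwable): Unit = if (out != null) out.close()
}

val query = words.writeStream.foreach(writer).start()
```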
8 changes: 1 addition & 7 deletions sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -23,7 +23,7 @@ import scala.reflect.runtime.universe.{typeTag, TypeTag}
import scala.util.Try
import scala.util.control.NonFatal

import org.apache.spark.annotation.{Experimental, InterfaceStability}
import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedFunction}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -2800,8 +2800,6 @@ object functions {
* @group datetime_funcs
* @since 2.0.0
*/
@Experimental
@InterfaceStability.Evolving
Contributor:
did you intend to remove the evolving here?

Contributor Author @marmbrus (May 24, 2017):
Yes, I did. This has been out since 2.0 and works in batch, so I don't think we can change it at this point.

def window(
timeColumn: Column,
windowDuration: String,
@@ -2854,8 +2852,6 @@
* @group datetime_funcs
* @since 2.0.0
*/
@Experimental
@InterfaceStability.Evolving
def window(timeColumn: Column, windowDuration: String, slideDuration: String): Column = {
window(timeColumn, windowDuration, slideDuration, "0 second")
}
@@ -2893,8 +2889,6 @@
* @group datetime_funcs
* @since 2.0.0
*/
@Experimental
@InterfaceStability.Evolving
def window(timeColumn: Column, windowDuration: String): Column = {
window(timeColumn, windowDuration, windowDuration, "0 second")
}
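For reference, the `window` overloads above bucket rows by a time column for use in batch or streaming aggregations; a sketch of a sliding-window count, assuming a DataFrame `events` with `timestamp` and `word` columns (durations are illustrative):

```scala
import org.apache.spark.sql.functions.{col, window}

// Count words per 10-minute window, sliding every 5 minutes.
val windowedCounts = events
  .groupBy(window(col("timestamp"), "10 minutes", "5 minutes"), col("word"))
  .count()
```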
@@ -35,7 +35,6 @@ import org.apache.spark.sql.types.StructType
*
* @since 2.0.0
*/
@Experimental
Member:
importing Experimental in this file is not used now.

@InterfaceStability.Evolving
final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
/**
@@ -21,21 +21,19 @@ import java.util.Locale

import scala.collection.JavaConverters._

import org.apache.spark.annotation.{Experimental, InterfaceStability}
import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.sql.{AnalysisException, Dataset, ForeachWriter}
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.streaming.{ForeachSink, MemoryPlan, MemorySink}

/**
* :: Experimental ::
* Interface used to write a streaming `Dataset` to external storage systems (e.g. file systems,
* key-value stores, etc). Use `Dataset.writeStream` to access this.
*
* @since 2.0.0
*/
@Experimental
@InterfaceStability.Evolving
final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {

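The `DataStreamWriter` above is what `Dataset.writeStream` returns; a sketch of starting a file-sink query, assuming a streaming Dataset `words` (paths and the query name are illustrative):

```scala
// Append new words to Parquet files, tracking progress in a checkpoint directory.
val fileQuery = words.writeStream
  .queryName("words-to-parquet")
  .format("parquet")
  .option("path", "/tmp/output/words")
  .option("checkpointLocation", "/tmp/checkpoints/words")
  .outputMode("append")
  .start()
```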
@@ -19,16 +19,14 @@ package org.apache.spark.sql.streaming

import java.util.UUID

import org.apache.spark.annotation.{Experimental, InterfaceStability}
import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.sql.SparkSession

/**
* :: Experimental ::
* A handle to a query that is executing continuously in the background as new data arrives.
* All these methods are thread-safe.
* @since 2.0.0
*/
@Experimental
@InterfaceStability.Evolving
trait StreamingQuery {

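A `StreamingQuery` handle is returned by `DataStreamWriter.start()`; a short sketch of the common calls on it, assuming a running query named `query`:

```scala
println(query.id)      // identifier that persists across restarts from the same checkpoint
println(query.runId)   // identifier unique to this particular run
println(query.status)  // what the stream is doing right now

query.awaitTermination() // block until the query stops, rethrowing any failure
// query.stop()          // or stop it explicitly from another thread
```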
@@ -17,10 +17,9 @@

package org.apache.spark.sql.streaming

import org.apache.spark.annotation.{Experimental, InterfaceStability}
import org.apache.spark.annotation.InterfaceStability

/**
* :: Experimental ::
* Exception that stopped a [[StreamingQuery]]. Use `cause` get the actual exception
* that caused the failure.
* @param message Message of this exception
@@ -29,7 +28,6 @@ import org.apache.spark.annotation.{Experimental, InterfaceStability}
* @param endOffset Ending offset in json of the range of data in exception occurred
* @since 2.0.0
*/
@Experimental
@InterfaceStability.Evolving
class StreamingQueryException private[sql](
private val queryDebugString: String,
@@ -19,17 +19,15 @@ package org.apache.spark.sql.streaming

import java.util.UUID

import org.apache.spark.annotation.{Experimental, InterfaceStability}
import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.scheduler.SparkListenerEvent

/**
* :: Experimental ::
* Interface for listening to events related to [[StreamingQuery StreamingQueries]].
* @note The methods are not thread-safe as they may be called from different threads.
*
* @since 2.0.0
*/
@Experimental
@InterfaceStability.Evolving
abstract class StreamingQueryListener {

@@ -66,50 +64,41 @@ abstract class StreamingQueryListener {


/**
* :: Experimental ::
* Companion object of [[StreamingQueryListener]] that defines the listener events.
* @since 2.0.0
*/
@Experimental
@InterfaceStability.Evolving
object StreamingQueryListener {

/**
* :: Experimental ::
* Base type of [[StreamingQueryListener]] events
* @since 2.0.0
*/
@Experimental
@InterfaceStability.Evolving
trait Event extends SparkListenerEvent

/**
* :: Experimental ::
* Event representing the start of a query
* @param id An unique query id that persists across restarts. See `StreamingQuery.id()`.
* @param runId A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
* @param name User-specified name of the query, null if not specified.
* @since 2.1.0
*/
@Experimental
@InterfaceStability.Evolving
class QueryStartedEvent private[sql](
val id: UUID,
val runId: UUID,
val name: String) extends Event

/**
* :: Experimental ::
* Event representing any progress updates in a query.
* @param progress The query progress updates.
* @since 2.1.0
*/
@Experimental
@InterfaceStability.Evolving
class QueryProgressEvent private[sql](val progress: StreamingQueryProgress) extends Event

/**
* :: Experimental ::
* Event representing that termination of a query.
*
* @param id An unique query id that persists across restarts. See `StreamingQuery.id()`.
@@ -118,7 +107,6 @@ object StreamingQueryListener {
* with an exception. Otherwise, it will be `None`.
* @since 2.1.0
*/
@Experimental
@InterfaceStability.Evolving
class QueryTerminatedEvent private[sql](
val id: UUID,
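A sketch of registering a listener for the events defined above, assuming an active `SparkSession` named `spark` (the println logging is illustrative):

```scala
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener {
  def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"Started query ${event.id} (run ${event.runId}, name ${event.name})")

  def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"Progress: ${event.progress.numInputRows} input rows in the last trigger")

  def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"Terminated query ${event.id}, exception = ${event.exception}")
})
```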
@@ -24,7 +24,7 @@ import scala.collection.mutable

import org.apache.hadoop.fs.Path

import org.apache.spark.annotation.{Experimental, InterfaceStability}
import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
@@ -34,12 +34,10 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.{Clock, SystemClock, Utils}

/**
* :: Experimental ::
* A class to manage all the [[StreamingQuery]] active on a `SparkSession`.
* A class to manage all the [[StreamingQuery]] active in a `SparkSession`.
*
* @since 2.0.0
*/
@Experimental
@InterfaceStability.Evolving
class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Logging {

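The manager is reached through `SparkSession.streams`; a short sketch of typical use, assuming an active `SparkSession` named `spark`:

```scala
val manager = spark.streams

// List the queries currently running in this session.
manager.active.foreach(q => println(s"${q.name}: ${q.id}"))

// Block until any active query terminates (or fails).
manager.awaitAnyTermination()
```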
@@ -22,10 +22,9 @@ import org.json4s.JsonAST.JValue
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.annotation.{Experimental, InterfaceStability}
import org.apache.spark.annotation.InterfaceStability

/**
* :: Experimental ::
* Reports information about the instantaneous status of a streaming query.
*
* @param message A human readable description of what the stream is currently doing.
@@ -35,7 +34,6 @@ import org.apache.spark.annotation.{Experimental, InterfaceStability}
*
* @since 2.1.0
*/
@Experimental
@InterfaceStability.Evolving
class StreamingQueryStatus protected[sql](
val message: String,
@@ -29,13 +29,11 @@ import org.json4s.JsonAST.JValue
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.annotation.{Experimental, InterfaceStability}
import org.apache.spark.annotation.InterfaceStability

/**
* :: Experimental ::
* Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger.
*/
@Experimental
@InterfaceStability.Evolving
class StateOperatorProgress private[sql](
val numRowsTotal: Long,
@@ -54,7 +52,6 @@ class StateOperatorProgress private[sql](
}

/**
* :: Experimental ::
* Information about progress made in the execution of a [[StreamingQuery]] during
* a trigger. Each event relates to processing done for a single trigger of the streaming
* query. Events are emitted even when no new data is available to be processed.
@@ -80,7 +77,6 @@ class StateOperatorProgress private[sql](
* @param sources detailed statistics on data being read from each of the streaming sources.
* @since 2.1.0
*/
@Experimental
@InterfaceStability.Evolving
class StreamingQueryProgress private[sql](
val id: UUID,
@@ -139,7 +135,6 @@ class StreamingQueryProgress private[sql](
}

/**
* :: Experimental ::
* Information about progress made for a source in the execution of a [[StreamingQuery]]
* during a trigger. See [[StreamingQueryProgress]] for more information.
*
@@ -152,7 +147,6 @@ class StreamingQueryProgress private[sql](
* Spark.
* @since 2.1.0
*/
@Experimental
@InterfaceStability.Evolving
class SourceProgress protected[sql](
val description: String,
@@ -191,14 +185,12 @@ class SourceProgress protected[sql](
}

/**
* :: Experimental ::
* Information about progress made for a sink in the execution of a [[StreamingQuery]]
* during a trigger. See [[StreamingQueryProgress]] for more information.
*
* @param description Description of the source corresponding to this status.
* @since 2.1.0
*/
@Experimental
@InterfaceStability.Evolving
class SinkProgress protected[sql](
val description: String) extends Serializable {
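These classes surface through `StreamingQuery.lastProgress`, `recentProgress` and `status`; a short sketch of reading them, assuming a running query named `query`:

```scala
// Most recent progress report; null before the first trigger has completed.
val progress = query.lastProgress
if (progress != null) {
  println(progress.prettyJson)    // the full report as formatted JSON
  println(progress.numInputRows)  // rows read across all sources in the last trigger
  progress.sources.foreach(s => println(s"${s.description}: ${s.inputRowsPerSecond} rows/s"))
}

// Instantaneous status, e.g. a human readable message about current activity.
println(query.status.message)
```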