@@ -33,9 +33,10 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset, SimpleStreamingScanConfig, SimpleStreamingScanConfigBuilder}
 import org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchReadSupport
 import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
-import org.apache.spark.sql.sources.v2.{CustomMetrics, DataSourceOptions}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset, SupportsCustomReaderMetrics}
+import org.apache.spark.sql.sources.v2.streaming.CustomMetrics
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.UninterruptibleThread

@@ -31,6 +31,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSessio
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.sources.v2._
+import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupportProvider, MicroBatchReadSupportProvider, StreamingWriteSupportProvider}
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType

@@ -22,13 +22,14 @@
 import org.apache.spark.sql.sources.v2.reader.InputPartition;
 import org.apache.spark.sql.sources.v2.reader.ScanConfig;
 import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
+import org.apache.spark.sql.sources.v2.streaming.ContinuousReadSupportProvider;

 /**
  * An interface that defines how to load the data from data source for continuous streaming
  * processing.
  *
  * The execution engine will get an instance of this interface from a data source provider
- * (e.g. {@link org.apache.spark.sql.sources.v2.ContinuousReadSupportProvider}) at the start of a
+ * (e.g. {@link ContinuousReadSupportProvider}) at the start of a
  * streaming query, then call {@link #newScanConfigBuilder(Offset)} and create an instance of
  * {@link ScanConfig} for the duration of the streaming query or until
  * {@link #needsReconfiguration(ScanConfig)} is true. The {@link ScanConfig} will be used to create
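As context for this hunk's Javadoc: the engine builds one ScanConfig per (re)configuration and keeps it until the source asks to be reconfigured. A simplified engine-side sketch in Scala (not ContinuousExecution's actual code; partition planning and job launch are elided):

    import org.apache.spark.sql.sources.v2.reader.ScanConfig
    import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, Offset}

    // Build the ScanConfig that the long-running continuous job will use.
    def configureScan(readSupport: ContinuousReadSupport, start: Offset): ScanConfig =
      readSupport.newScanConfigBuilder(start).build()

    // Polled by the engine; true means tear down the running job and build a
    // fresh ScanConfig (e.g. after the source's partitioning changed).
    def mustReconfigure(readSupport: ContinuousReadSupport, config: ScanConfig): Boolean =
      readSupport.needsReconfiguration(config)
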
@@ -20,13 +20,14 @@
 import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
 import org.apache.spark.sql.sources.v2.reader.*;
+import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupportProvider;

 /**
  * An interface that defines how to scan the data from data source for micro-batch streaming
  * processing.
  *
  * The execution engine will get an instance of this interface from a data source provider
- * (e.g. {@link org.apache.spark.sql.sources.v2.MicroBatchReadSupportProvider}) at the start of a
+ * (e.g. {@link MicroBatchReadSupportProvider}) at the start of a
  * streaming query, then call {@link #newScanConfigBuilder(Offset, Offset)} and create an instance
  * of {@link ScanConfig} for each micro-batch. The {@link ScanConfig} will be used to create input
  * partitions and reader factory to scan a micro-batch with a Spark job. At the end {@link #stop()}
@@ -18,14 +18,14 @@
 package org.apache.spark.sql.sources.v2.reader.streaming;

 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.CustomMetrics;
+import org.apache.spark.sql.sources.v2.streaming.CustomMetrics;

 /**
  * A mix in interface for {@link StreamingReadSupport}. Data sources can implement this interface
  * to report custom metrics that gets reported under the
  * {@link org.apache.spark.sql.streaming.SourceProgress}
  */
-@InterfaceStability.Evolving
+@InterfaceStability.Unstable
 public interface SupportsCustomReaderMetrics extends StreamingReadSupport {

   /**
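The mix-in is discovered with a type check at progress-reporting time (see the ProgressReporter hunk further down, which imports both the mix-in and CustomMetrics). A hedged consumer-side sketch; the getCustomMetrics name is an assumption, since this hunk cuts off before the method declarations:

    import org.apache.spark.sql.sources.v2.reader.streaming.{StreamingReadSupport, SupportsCustomReaderMetrics}

    // Returns the source's custom-metrics JSON if the source opted in.
    def readerMetricsJson(readSupport: StreamingReadSupport): Option[String] =
      readSupport match {
        case r: SupportsCustomReaderMetrics => Option(r.getCustomMetrics).map(_.json())
        case _ => None
      }
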
@@ -15,10 +15,12 @@
  * limitations under the License.
  */

-package org.apache.spark.sql.sources.v2;
+package org.apache.spark.sql.sources.v2.streaming;

 import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
 import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport;
 import org.apache.spark.sql.types.StructType;

@@ -15,14 +15,14 @@
  * limitations under the License.
  */

-package org.apache.spark.sql.sources.v2;
+package org.apache.spark.sql.sources.v2.streaming;

 import org.apache.spark.annotation.InterfaceStability;

 /**
  * An interface for reporting custom metrics from streaming sources and sinks
  */
-@InterfaceStability.Evolving
+@InterfaceStability.Unstable
 public interface CustomMetrics {
   /**
    * Returns a JSON serialized representation of custom metrics
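A minimal implementation of this interface, assuming its single method is String json() (the declaration sits just below the visible part of the hunk); the metric payload here is made up for illustration:

    import org.apache.spark.sql.sources.v2.streaming.CustomMetrics

    // Surfaces under SourceProgress/SinkProgress as an opaque, source-defined
    // JSON blob; Spark does not interpret the payload.
    case class BacklogMetrics(numOffsetsBehind: Long) extends CustomMetrics {
      override def json(): String = s"""{"numOffsetsBehind":$numOffsetsBehind}"""
    }
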
@@ -15,10 +15,12 @@
  * limitations under the License.
  */

-package org.apache.spark.sql.sources.v2;
+package org.apache.spark.sql.sources.v2.streaming;

 import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
 import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReadSupport;
 import org.apache.spark.sql.types.StructType;

@@ -15,10 +15,12 @@
  * limitations under the License.
  */

-package org.apache.spark.sql.sources.v2;
+package org.apache.spark.sql.sources.v2.streaming;

 import org.apache.spark.annotation.InterfaceStability;
 import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.DataSourceV2;
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport;
 import org.apache.spark.sql.streaming.OutputMode;
 import org.apache.spark.sql.types.StructType;

@@ -18,14 +18,14 @@
 package org.apache.spark.sql.sources.v2.writer.streaming;

 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.sources.v2.CustomMetrics;
+import org.apache.spark.sql.sources.v2.streaming.CustomMetrics;

 /**
  * A mix in interface for {@link StreamingWriteSupport}. Data sources can implement this interface
  * to report custom metrics that gets reported under the
  * {@link org.apache.spark.sql.streaming.SinkProgress}
  */
-@InterfaceStability.Evolving
+@InterfaceStability.Unstable
 public interface SupportsCustomWriterMetrics extends StreamingWriteSupport {

   /**
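The writer-side mix-in mirrors the reader-side one, with metrics landing under SinkProgress instead of SourceProgress. A hypothetical sketch of a sink wiring in a row counter (getCustomMetrics is again an assumed name):

    import java.util.concurrent.atomic.AtomicLong

    import org.apache.spark.sql.sources.v2.streaming.CustomMetrics
    import org.apache.spark.sql.sources.v2.writer.streaming.SupportsCustomWriterMetrics

    // Mixed into a concrete StreamingWriteSupport implementation; commit()
    // would bump the counter as batches land.
    trait CountingSinkMetrics { self: SupportsCustomWriterMetrics =>
      protected val rowsCommitted = new AtomicLong(0L)

      def getCustomMetrics: CustomMetrics = new CustomMetrics {
        override def json(): String = s"""{"rowsCommitted":${rowsCommitted.get()}}"""
      }
    }
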
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relat
 import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWritSupport, RateControlMicroBatchReadSupport}
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset => OffsetV2}
+import org.apache.spark.sql.sources.v2.streaming.{MicroBatchReadSupportProvider, StreamingWriteSupportProvider}
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
 import org.apache.spark.util.{Clock, Utils}

@@ -33,8 +33,8 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExec, WriteToDataSourceV2Exec}
 import org.apache.spark.sql.execution.streaming.sources.MicroBatchWritSupport
-import org.apache.spark.sql.sources.v2.CustomMetrics
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, SupportsCustomReaderMetrics}
+import org.apache.spark.sql.sources.v2.streaming.CustomMetrics
 import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingWriteSupport, SupportsCustomWriterMetrics}
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.streaming.StreamingQueryListener.QueryProgressEvent

@@ -25,7 +25,8 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.execution.LeafExecNode
 import org.apache.spark.sql.execution.datasources.DataSource
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceV2}
+import org.apache.spark.sql.sources.v2.DataSourceV2
+import org.apache.spark.sql.sources.v2.streaming.ContinuousReadSupportProvider

 object StreamingRelation {
   def apply(dataSource: DataSource): StreamingRelation = {

@@ -20,7 +20,8 @@ package org.apache.spark.sql.execution.streaming
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.streaming.sources.ConsoleWriteSupport
 import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister}
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, StreamingWriteSupportProvider}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2}
+import org.apache.spark.sql.sources.v2.streaming.StreamingWriteSupportProvider
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType

@@ -32,8 +32,9 @@ import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExec, StreamingDataSourceV2Relation}
 import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
 import org.apache.spark.sql.sources.v2
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceOptions, StreamingWriteSupportProvider}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, PartitionOffset}
+import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupportProvider, StreamingWriteSupportProvider}
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
 import org.apache.spark.util.{Clock, Utils}

@@ -30,9 +30,10 @@ import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.sql.{Encoder, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.streaming.{Offset => _, _}
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceOptions}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader.{InputPartition, ScanConfig, ScanConfigBuilder}
 import org.apache.spark.sql.sources.v2.reader.streaming._
+import org.apache.spark.sql.sources.v2.streaming.ContinuousReadSupportProvider
 import org.apache.spark.util.RpcUtils

 /**

@@ -22,7 +22,8 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.python.PythonForeachWriter
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamingWriteSupportProvider}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.streaming.StreamingWriteSupportProvider
 import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}
 import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport}
 import org.apache.spark.sql.streaming.OutputMode

@@ -22,6 +22,7 @@ import org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousR
 import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, MicroBatchReadSupport}
+import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupportProvider, MicroBatchReadSupportProvider}
 import org.apache.spark.sql.types._

 /**

@@ -35,7 +35,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
 import org.apache.spark.sql.execution.streaming.{MemorySinkBase, Sink}
-import org.apache.spark.sql.sources.v2.{CustomMetrics, DataSourceOptions, DataSourceV2, StreamingWriteSupportProvider}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2}
+import org.apache.spark.sql.sources.v2.streaming.{CustomMetrics, StreamingWriteSupportProvider}
 import org.apache.spark.sql.sources.v2.writer._
 import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWriteSupport, SupportsCustomWriterMetrics}
 import org.apache.spark.sql.streaming.OutputMode

@@ -34,9 +34,10 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.streaming.{LongOffset, SimpleStreamingScanConfig, SimpleStreamingScanConfigBuilder}
 import org.apache.spark.sql.execution.streaming.continuous.TextSocketContinuousReadSupport
 import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceOptions, DataSourceV2, MicroBatchReadSupportProvider}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2}
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, MicroBatchReadSupport, Offset}
+import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupportProvider, MicroBatchReadSupportProvider}
 import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
 import org.apache.spark.unsafe.types.UTF8String

@@ -28,8 +28,9 @@ import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2}
 import org.apache.spark.sql.sources.StreamSourceProvider
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceOptions, MicroBatchReadSupportProvider}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, MicroBatchReadSupport}
+import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupportProvider, MicroBatchReadSupportProvider}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils

@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
 import org.apache.spark.sql.execution.streaming.sources._
-import org.apache.spark.sql.sources.v2.StreamingWriteSupportProvider
+import org.apache.spark.sql.sources.v2.streaming.StreamingWriteSupportProvider

 /**
  * Interface used to write a streaming `Dataset` to external storage systems (e.g. file systems,

@@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution,
 import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.STREAMING_QUERY_LISTENERS
-import org.apache.spark.sql.sources.v2.StreamingWriteSupportProvider
+import org.apache.spark.sql.sources.v2.streaming.StreamingWriteSupportProvider
 import org.apache.spark.util.{Clock, SystemClock, Utils}

 /**

@@ -28,8 +28,9 @@ import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceOptions, MicroBatchReadSupportProvider}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader.streaming.Offset
+import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupportProvider, MicroBatchReadSupportProvider}
 import org.apache.spark.sql.streaming.StreamTest
 import org.apache.spark.util.ManualClock

@@ -33,8 +33,9 @@ import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupportProvider}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader.streaming.Offset
+import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupportProvider
 import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._

@@ -26,6 +26,7 @@ import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReaderFactory, ScanConfig, ScanConfigBuilder}
 import org.apache.spark.sql.sources.v2.reader.streaming._
+import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupportProvider, MicroBatchReadSupportProvider, StreamingWriteSupportProvider}
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
 import org.apache.spark.sql.streaming.{OutputMode, StreamTest, Trigger}
 import org.apache.spark.sql.types.StructType