
Conversation

@uncleGen
Contributor

What changes were proposed in this pull request?

StatefulAggregationStrategy should check whether the logical plan is streaming or not.

Test code:

case class Record(key: Int, value: String)
// Batch side: a pre-aggregated DataFrame built from a local collection.
val df = spark.createDataFrame((1 to 100).map(i => Record(i, s"value_$i"))).groupBy("value").count
// Streaming side: read lines from a socket and split them into words.
val lines = spark.readStream.format("socket").option("host", "localhost").option("port", "9999").load
val words = lines.as[String].flatMap(_.split(" "))
// Join the streaming words with the aggregated batch DataFrame.
val result = words.join(df, "value")
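For reference, the before/after physical plans below were presumably obtained by explaining the joined Dataset; a minimal way to reproduce them, assuming the snippet above runs in spark-shell (or any SparkSession bound to the name spark):

result.explain()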

Before this PR:

== Physical Plan ==
*Project [value#13, count#19L]
+- *BroadcastHashJoin [value#13], [value#1], Inner, BuildRight
   :- *Filter isnotnull(value#13)
   :  +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#13]
   :     +- MapPartitions <function1>, obj#12: java.lang.String
   :        +- DeserializeToObject value#5.toString, obj#11: java.lang.String
   :           +- StreamingRelation textSocket, [value#5]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- *HashAggregate(keys=[value#1], functions=[count(1)])
         +- StateStoreSave [value#1], OperatorStateId(<unknown>,0,0), Append, 0
            +- *HashAggregate(keys=[value#1], functions=[merge_count(1)])
               +- StateStoreRestore [value#1], OperatorStateId(<unknown>,0,0)
                  +- *HashAggregate(keys=[value#1], functions=[merge_count(1)])
                     +- Exchange hashpartitioning(value#1, 200)
                        +- *HashAggregate(keys=[value#1], functions=[partial_count(1)])
                           +- *Project [value#1]
                              +- *Filter isnotnull(value#1)
                                 +- LocalTableScan [key#0, value#1]

After this PR:

== Physical Plan ==
*Project [value#13, count#19L]
+- *BroadcastHashJoin [value#13], [value#1], Inner, BuildRight
   :- *Filter isnotnull(value#13)
   :  +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true) AS value#13]
   :     +- MapPartitions <function1>, obj#12: java.lang.String
   :        +- DeserializeToObject value#5.toString, obj#11: java.lang.String
   :           +- StreamingRelation textSocket, [value#5]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, true]))
      +- *HashAggregate(keys=[value#1], functions=[count(1)])
         +- Exchange hashpartitioning(value#1, 200)
            +- *HashAggregate(keys=[value#1], functions=[partial_count(1)])
               +- *Project [value#1]
                  +- *Filter isnotnull(value#1)
                     +- LocalTableScan [key#0, value#1]

How was this patch tested?

Added a new unit test.

@SparkQA

SparkQA commented Feb 24, 2017

Test build #73407 has started for PR 17052 at commit e8a24e1.

@SparkQA

SparkQA commented Feb 24, 2017

Test build #73403 has finished for PR 17052 at commit 9eb57b7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 24, 2017

Test build #73412 has finished for PR 17052 at commit 9c15fcb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@uncleGen
Contributor Author

cc @zsxwing

Contributor Author


stateful indicates whether the aggregate is based on a streaming or a batch source; it is resolved by the ResolveStatefulAggregate rule.
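A hypothetical one-liner capturing that decision (stateful and ResolveStatefulAggregate are this PR's names, not upstream API; the helper below is made up purely for illustration):

import org.apache.spark.sql.catalyst.plans.logical.Aggregate

// Hypothetical helper: an Aggregate is treated as stateful exactly when its input plan is streaming.
def isStatefulAggregate(agg: Aggregate): Boolean = agg.child.isStreaming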

Contributor Author


When resolving an aggregate, determine whether it is stateful or not.

@zsxwing
Member

zsxwing commented Feb 24, 2017

Thanks for doing this. I'm wondering if you can fix isStreaming instead. We added it to be able to distinguish batch and streaming DataFrames; however, it doesn't work for batch DataFrames in a streaming query. My initial thought is to make sure every streaming Source's getBatch returns a DataFrame whose isStreaming is true. Could you try to resolve this issue in that direction?
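A minimal, hedged illustration of the invariant suggested here, rather than the actual fix; the helper name is made up for illustration:

import org.apache.spark.sql.DataFrame

// Hypothetical check: any DataFrame produced by Source.getBatch inside a streaming query
// should itself claim to be streaming, because Dataset.isStreaming is derived from its
// logical plan rather than from the query it happens to run in.
def assertStreamingBatch(df: DataFrame): DataFrame = {
  assert(df.isStreaming, "Source.getBatch must return a DataFrame whose isStreaming is true")
  df
}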

@uncleGen
Contributor Author

@zsxwing got it

@uncleGen
Contributor Author

retest this please.

@SparkQA

SparkQA commented Feb 27, 2017

Test build #73507 has finished for PR 17052 at commit 38e3a14.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@uncleGen
Contributor Author

Working on the unit test failure.

@uncleGen force-pushed the SPARK-19690 branch 2 times, most recently from c87651a to 59f4272 on February 28, 2017 03:53
@uncleGen
Contributor Author

retest this please.

@SparkQA

SparkQA commented Feb 28, 2017

Test build #73559 has finished for PR 17052 at commit 59f4272.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 28, 2017

Test build #73558 has finished for PR 17052 at commit c87651a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 28, 2017

Test build #73571 has started for PR 17052 at commit 9ffbad2.

-      SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1"
+      SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "1",
+      SQLConf.UNSUPPORTED_OPERATION_CHECK_ENABLED.key -> "false"
     ) {
Contributor Author


Close the "UNSUPPORTED_OPERATION_CHECK_ENABLED", as Source.getBatch returns DF whose isStreaming is true.

  } else {
    LocalRelation(projectList.map(_.toAttribute), data.map(projection))
  }
}
Contributor Author


In a streaming query, the stream source is transformed into batch LocalRelations whose isStreaming is true, so this rule should keep isStreaming set to true on the new LocalRelation it builds.
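For illustration, a hedged sketch of a ConvertToLocalRelation-style rewrite that carries the flag along; it assumes LocalRelation already exposes the streaming flag this PR adds as a third constructor field, and the guard is simplified compared to the real optimizer rule:

import org.apache.spark.sql.catalyst.expressions.InterpretedProjection
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule

object ConvertToLocalRelationSketch extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.transform {
    // Assumes the third LocalRelation field is the streaming flag introduced by this PR.
    case Project(projectList, LocalRelation(output, data, fromStreaming))
        if projectList.forall(_.deterministic) =>
      val projection = new InterpretedProjection(projectList, output)
      // Propagate the flag so a batch cut from a streaming source still reports isStreaming.
      LocalRelation(projectList.map(_.toAttribute), data.map(projection), fromStreaming)
  }
}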

case agg @ PhysicalAggregation(
    namedGroupingExpressions, aggregateExpressions, rewrittenResultExpressions, child)
    if agg.isStreaming =>

Contributor Author

@uncleGen Feb 28, 2017


Apply this strategy only if the logical plan is streaming.

@uncleGen
Contributor Author

retest this please.

@SparkQA

SparkQA commented Feb 28, 2017

Test build #73576 has finished for PR 17052 at commit 9ffbad2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@uncleGen
Contributor Author

uncleGen commented Mar 2, 2017

cc @zsxwing

@uncleGen
Contributor Author

uncleGen commented Mar 3, 2017

ping @zsxwing


private var _analyzed: Boolean = false

private var _incremental: Boolean = false
Member


Adding it here will break sameResult, equals and other methods. Could you add a new parameter to the constructor of LogicalRelation and LogicalRDD instead?
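A tiny REPL-style illustration of the concern, with a toy class standing in for the plan nodes (names are made up, not the PR's code):

// A mutable field declared outside a case class's constructor is ignored by the generated
// equals/hashCode, so plan comparison such as sameResult cannot observe it.
case class Node(id: Int) { var incremental: Boolean = false }

val a = Node(1)
val b = Node(1)
a.incremental = true
a == b  // true: the flag is invisible to equals, hence the suggestion to use a constructor parameter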

@uncleGen force-pushed the SPARK-19690 branch 2 times, most recently from c87651a to 67847e5 on March 7, 2017 11:56
case localRelation @ LocalRelation(_, _, false) =>
  localRelation.dataFromStreaming = true
  localRelation
}
Contributor Author


Added a new parameter dataFromStreaming to the constructors of LogicalRelation, LogicalRDD and LocalRelation. dataFromStreaming indicates whether the relation comes from a streaming source; in a streaming query, the stream relation is cut into a series of batch relations.
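A hedged sketch of the constructor-parameter shape described above, using this PR's name dataFromStreaming; the real node has more members and upstream Spark later settled on a flag called isStreaming, so this is only an illustration:

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LeafNode

// Sketch only: a minimal LocalRelation-like node with the proposed flag as a constructor
// parameter, so that equals, hashCode, copy() and sameResult all take it into account.
case class LocalRelationSketch(
    output: Seq[Attribute],
    data: Seq[InternalRow] = Nil,
    dataFromStreaming: Boolean = false) extends LeafNode {
  // The flag drives isStreaming, so batches cut from a streaming source keep reporting true.
  override def isStreaming: Boolean = dataFromStreaming
}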

@SparkQA

SparkQA commented Mar 7, 2017

Test build #74099 has finished for PR 17052 at commit 67847e5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class LocalRelation(

@SparkQA

SparkQA commented Mar 7, 2017

Test build #74100 has finished for PR 17052 at commit 3fc31d8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 7, 2017

Test build #74109 has finished for PR 17052 at commit 186be58.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 8, 2017

Test build #74206 has finished for PR 17052 at commit ff38db7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public static class LongWrapper
  • public static class IntWrapper
  • trait WatermarkSupport extends UnaryExecNode

@SparkQA

SparkQA commented Mar 20, 2017

Test build #74844 has finished for PR 17052 at commit a1f9327.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@uncleGen
Contributor Author

retest this please.

@SparkQA

SparkQA commented Mar 20, 2017

Test build #74852 has finished for PR 17052 at commit a1f9327.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@uncleGen
Contributor Author

retest this please.

@SparkQA

SparkQA commented Mar 21, 2017

Test build #74968 has finished for PR 17052 at commit a1f9327.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen
Member

srowen commented Mar 28, 2017

This is marked as "Critical" for 2.1.1, but I'm not clear it's a regression or that urgent?

@HyukjinKwon
Member

@uncleGen is this still active?

@uncleGen
Contributor Author

@HyukjinKwon Sorry! I've been busy for a while. Let me resolve the conflict.

@HyukjinKwon
Member

Yea, I just wanted to check if it is in progress in any way. Thanks for your input.

@SparkQA

SparkQA commented Jun 29, 2017

Test build #78895 has finished for PR 17052 at commit a1f9327.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@asfgit asfgit closed this in 3a45c7f Aug 5, 2017
zifeif2 pushed a commit to zifeif2/spark that referenced this pull request Nov 22, 2025
## What changes were proposed in this pull request?

This PR proposes to close stale PRs, mostly the same instances with apache#18017

Closes apache#14085 - [SPARK-16408][SQL] SparkSQL Added file get Exception: is a directory …
Closes apache#14239 - [SPARK-16593] [CORE] [WIP] Provide a pre-fetch mechanism to accelerate shuffle stage.
Closes apache#14567 - [SPARK-16992][PYSPARK] Python Pep8 formatting and import reorganisation
Closes apache#14579 - [SPARK-16921][PYSPARK] RDD/DataFrame persist()/cache() should return Python context managers
Closes apache#14601 - [SPARK-13979][Core] Killed executor is re spawned without AWS key…
Closes apache#14830 - [SPARK-16992][PYSPARK][DOCS] import sort and autopep8 on Pyspark examples
Closes apache#14963 - [SPARK-16992][PYSPARK] Virtualenv for Pylint and pep8 in lint-python
Closes apache#15227 - [SPARK-17655][SQL]Remove unused variables declarations and definations in a WholeStageCodeGened stage
Closes apache#15240 - [SPARK-17556] [CORE] [SQL] Executor side broadcast for broadcast joins
Closes apache#15405 - [SPARK-15917][CORE] Added support for number of executors in Standalone [WIP]
Closes apache#16099 - [SPARK-18665][SQL] set statement state to "ERROR" after user cancel job
Closes apache#16445 - [SPARK-19043][SQL]Make SparkSQLSessionManager more configurable
Closes apache#16618 - [SPARK-14409][ML][WIP] Add RankingEvaluator
Closes apache#16766 - [SPARK-19426][SQL] Custom coalesce for Dataset
Closes apache#16832 - [SPARK-19490][SQL] ignore case sensitivity when filtering hive partition columns
Closes apache#17052 - [SPARK-19690][SS] Join a streaming DataFrame with a batch DataFrame which has an aggregation may not work
Closes apache#17267 - [SPARK-19926][PYSPARK] Make pyspark exception more user-friendly
Closes apache#17371 - [SPARK-19903][PYSPARK][SS] window operator miss the `watermark` metadata of time column
Closes apache#17401 - [SPARK-18364][YARN] Expose metrics for YarnShuffleService
Closes apache#17519 - [SPARK-15352][Doc] follow-up: add configuration docs for topology-aware block replication
Closes apache#17530 - [SPARK-5158] Access kerberized HDFS from Spark standalone
Closes apache#17854 - [SPARK-20564][Deploy] Reduce massive executor failures when executor count is large (>2000)
Closes apache#17979 - [SPARK-19320][MESOS][WIP]allow specifying a hard limit on number of gpus required in each spark executor when running on mesos
Closes apache#18127 - [SPARK-6628][SQL][Branch-2.1] Fix ClassCastException when executing sql statement 'insert into' on hbase table
Closes apache#18236 - [SPARK-21015] Check field name is not null and empty in GenericRowWit…
Closes apache#18269 - [SPARK-21056][SQL] Use at most one spark job to list files in InMemoryFileIndex
Closes apache#18328 - [SPARK-21121][SQL] Support changing storage level via the spark.sql.inMemoryColumnarStorage.level variable
Closes apache#18354 - [SPARK-18016][SQL][CATALYST][BRANCH-2.1] Code Generation: Constant Pool Limit - Class Splitting
Closes apache#18383 - [SPARK-21167][SS] Set kafka clientId while fetch messages
Closes apache#18414 - [SPARK-21169] [core] Make sure to update application status to RUNNING if executors are accepted and RUNNING after recovery
Closes apache#18432 - resolve com.esotericsoftware.kryo.KryoException
Closes apache#18490 - [SPARK-21269][Core][WIP] Fix FetchFailedException when enable maxReqSizeShuffleToMem and KryoSerializer
Closes apache#18585 - SPARK-21359
Closes apache#18609 - Spark SQL merge small files to big files Update InsertIntoHiveTable.scala

Added:
Closes apache#18308 - [SPARK-21099][Spark Core] INFO Log Message Using Incorrect Executor I…
Closes apache#18599 - [SPARK-21372] spark writes one log file even I set the number of spark_rotate_log to 0
Closes apache#18619 - [SPARK-21397][BUILD]Maven shade plugin adding dependency-reduced-pom.xml to …
Closes apache#18667 - Fix the simpleString used in error messages
Closes apache#18782 - Branch 2.1

Added:
Closes apache#17694 - [SPARK-12717][PYSPARK] Resolving race condition with pyspark broadcasts when using multiple threads

Added:
Closes apache#16456 - [SPARK-18994] clean up the local directories for application in future by annother thread
Closes apache#18683 - [SPARK-21474][CORE] Make number of parallel fetches from a reducer configurable
Closes apache#18690 - [SPARK-21334][CORE] Add metrics reporting service to External Shuffle Server

Added:
Closes apache#18827 - Merge pull request 1 from apache/master

## How was this patch tested?

N/A

Author: hyukjinkwon <[email protected]>

Closes apache#18780 from HyukjinKwon/close-prs.
