
Conversation

@uncleGen (Contributor) commented Mar 21, 2017

What changes were proposed in this pull request?

Code to reproduce:

import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, window
bootstrapServers = sys.argv[1]
subscribeType = sys.argv[2]
topics = sys.argv[3]
spark = SparkSession\
  .builder\
  .appName("StructuredKafkaWordCount")\
  .getOrCreate()

lines = spark\
  .readStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", bootstrapServers)\
  .option(subscribeType, topics)\
  .load()\
  .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")

words = lines.select(explode(split(lines.value, ' ')).alias('word'), lines.timestamp)

windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy(
    window(words.timestamp, "30 seconds", "30 seconds"), words.word
).count()

query = windowedCounts\
  .writeStream\
  .outputMode('append')\
  .format('console')\
  .option("truncate", "false")\
  .start()
query.awaitTermination()

An exception was thrown:

pyspark.sql.utils.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
Aggregate [window#32, word#21], [window#32 AS window#26, word#21, count(1) AS count#31L]
+- Filter ((timestamp#16 >= window#32.start) && (timestamp#16 < window#32.end))
   +- Expand [ArrayBuffer(named_struct(start, ...]
      +- EventTimeWatermark timestamp#16: timestamp, interval 10 seconds
         +- Project [word#21, timestamp#16]
            +- Generate explode(split(value#15,  )), true, false, [word#21]
               +- Project [cast(value#1 as string) AS value#15, cast(timestamp#5 as timestamp) AS timestamp#16]
                  +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession ...]

IIUC, the root cause is: words.withWatermark("timestamp", "30 seconds") adds the watermark metadata to the time column, but in groupBy(window(words.timestamp, "30 seconds", "30 seconds"), words.word) the words.timestamp column misses that metadata, so the query fails the check shown below.

To borrow @viirya's clearer explanation:

For now, after withWatermark, we only update the metadata of the event-time column. The expression id stays the same. So once we use the column obtained before adding the watermark, words.timestamp, as the grouping expression, it binds to the old attribute from before watermarking.

if (watermarkAttributes.isEmpty) {
  throwError(
    s"$outputMode output mode not supported when there are streaming aggregations on " +
      s"streaming DataFrames/DataSets without watermark")(plan)
}

In this PR, an UnresolvedAttribute is passed to window instead of a Column.
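
A minimal sketch of what that means on the user side (it assumes the same DataFrames as the reproduction above and is only an illustration, not the patch itself): when the time column reaches window as an unresolved name instead of a pre-watermark Column, the analyzer resolves it against the watermarked plan, so the grouping attribute keeps the watermark metadata and the append-mode check passes.

# Illustration only: group by column *names* so they resolve against the
# watermarked plan rather than binding to the old words.timestamp attribute.
windowedCounts = words\
  .withWatermark("timestamp", "30 seconds")\
  .groupBy(window("timestamp", "30 seconds", "30 seconds"), "word")\
  .count()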

How was this patch tested?

Jenkins


sc = SparkContext._active_spark_context
time_col = _to_java_column(timeColumn)
if isinstance(timeColumn, Column):
Member:

hmm, doesn't this break the current API?

Contributor (Author):

IIUC, it is OK for the current codebase. Am I missing something?

Member:

After this change, you can't pass in a Column, but that is supported today.
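
For reference, both call styles work against the current pyspark.sql.functions.window API, which is why dropping Column support would be a behaviour change (the snippet assumes the words DataFrame from the PR description):

from pyspark.sql.functions import window

w_by_name = window("timestamp", "30 seconds", "30 seconds")        # string column name
w_by_column = window(words.timestamp, "30 seconds", "30 seconds")  # Column object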

Contributor (Author):

@viirya Sounds reasonable. I pushed an update, please take a look.

@SparkQA commented Mar 21, 2017

Test build #74959 has finished for PR 17371 at commit 654c512.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 21, 2017

Test build #74967 has finished for PR 17371 at commit 890c6e6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member) commented Mar 21, 2017

For now, after withWatermark, we only update the metadata of the event-time column. The expression id stays the same. So once we use the column obtained before adding the watermark, words.timestamp, as the grouping expression, it binds to the old attribute from before watermarking.

I am wondering: should we create a new expression id for the watermarked column in withWatermark? Then the query would have to be written like:

wordsWithWatermark = words.withWatermark("timestamp", "30 seconds")
windowedCounts = wordsWithWatermark.groupBy(
    window(wordsWithWatermark.timestamp, "30 seconds", "30 seconds"),
    wordsWithWatermark.word
).count()

@uncleGen (Contributor, Author):

@viirya Great, that is a clearer explanation.

I am wondering: should we create a new expression id for the watermarked column in withWatermark? Then the query would have to be written like:

That would indeed fix this problem, but it is not very user-friendly.

@viirya (Member) commented Mar 21, 2017

IMHO, the output after withWatermark should be a new attribute with a new expression id. Maybe @zsxwing @marmbrus have more insights on this?

Btw, does this issue also happen in Scala code?

@marmbrus (Contributor):

I really think the core problem here is that we allow you to use resolved attributes at all in the user API. Unfortunately we are somewhat stuck with that bad decision. Personally, I never use df['col'] and only ever use col("col") since that avoids the problem.

However, I don't think that piecemeal switching to unresolved attributes is a good idea.
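
A sketch of that style (again assuming the words DataFrame from the PR description): col("timestamp") stays unresolved until analysis, so it binds to the watermarked attribute rather than to the attribute that words.timestamp captured before withWatermark.

from pyspark.sql.functions import col, window

windowedCounts = words\
  .withWatermark("timestamp", "30 seconds")\
  .groupBy(window(col("timestamp"), "30 seconds", "30 seconds"), col("word"))\
  .count()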

@viirya (Member) commented Mar 21, 2017

Unfortunately, yes, allowing resolved attributes in the user API causes this kind of trouble.

However, I don't think that piecemeal switching to unresolved attributes is a good idea.

Agreed. Should we create new attributes after withWatermark to avoid the problem? It might be cumbersome from the user side, however.

@marmbrus (Contributor):

I don't think that will solve the problem though. You will just get a different error message.

@viirya (Member) commented Mar 22, 2017

Yeah, I just tried it. IncrementalExecution re-creates the attribute for each batch. Although we could replace the attribute...

@marmbrus (Contributor) commented Jun 2, 2017

Can we add an analysis rule that just pulls up missing metadata from attributes in the child? It could run once after other rules.

@HyukjinKwon (Member):

gentle ping @uncleGen, is this PR still active?

asfgit closed this in 3a45c7f on Aug 5, 2017
zifeif2 pushed a commit to zifeif2/spark that referenced this pull request Nov 22, 2025
## What changes were proposed in this pull request?

This PR proposes to close stale PRs, mostly the same instances with apache#18017

Closes apache#14085 - [SPARK-16408][SQL] SparkSQL Added file get Exception: is a directory …
Closes apache#14239 - [SPARK-16593] [CORE] [WIP] Provide a pre-fetch mechanism to accelerate shuffle stage.
Closes apache#14567 - [SPARK-16992][PYSPARK] Python Pep8 formatting and import reorganisation
Closes apache#14579 - [SPARK-16921][PYSPARK] RDD/DataFrame persist()/cache() should return Python context managers
Closes apache#14601 - [SPARK-13979][Core] Killed executor is re spawned without AWS key…
Closes apache#14830 - [SPARK-16992][PYSPARK][DOCS] import sort and autopep8 on Pyspark examples
Closes apache#14963 - [SPARK-16992][PYSPARK] Virtualenv for Pylint and pep8 in lint-python
Closes apache#15227 - [SPARK-17655][SQL]Remove unused variables declarations and definations in a WholeStageCodeGened stage
Closes apache#15240 - [SPARK-17556] [CORE] [SQL] Executor side broadcast for broadcast joins
Closes apache#15405 - [SPARK-15917][CORE] Added support for number of executors in Standalone [WIP]
Closes apache#16099 - [SPARK-18665][SQL] set statement state to "ERROR" after user cancel job
Closes apache#16445 - [SPARK-19043][SQL]Make SparkSQLSessionManager more configurable
Closes apache#16618 - [SPARK-14409][ML][WIP] Add RankingEvaluator
Closes apache#16766 - [SPARK-19426][SQL] Custom coalesce for Dataset
Closes apache#16832 - [SPARK-19490][SQL] ignore case sensitivity when filtering hive partition columns
Closes apache#17052 - [SPARK-19690][SS] Join a streaming DataFrame with a batch DataFrame which has an aggregation may not work
Closes apache#17267 - [SPARK-19926][PYSPARK] Make pyspark exception more user-friendly
Closes apache#17371 - [SPARK-19903][PYSPARK][SS] window operator miss the `watermark` metadata of time column
Closes apache#17401 - [SPARK-18364][YARN] Expose metrics for YarnShuffleService
Closes apache#17519 - [SPARK-15352][Doc] follow-up: add configuration docs for topology-aware block replication
Closes apache#17530 - [SPARK-5158] Access kerberized HDFS from Spark standalone
Closes apache#17854 - [SPARK-20564][Deploy] Reduce massive executor failures when executor count is large (>2000)
Closes apache#17979 - [SPARK-19320][MESOS][WIP]allow specifying a hard limit on number of gpus required in each spark executor when running on mesos
Closes apache#18127 - [SPARK-6628][SQL][Branch-2.1] Fix ClassCastException when executing sql statement 'insert into' on hbase table
Closes apache#18236 - [SPARK-21015] Check field name is not null and empty in GenericRowWit…
Closes apache#18269 - [SPARK-21056][SQL] Use at most one spark job to list files in InMemoryFileIndex
Closes apache#18328 - [SPARK-21121][SQL] Support changing storage level via the spark.sql.inMemoryColumnarStorage.level variable
Closes apache#18354 - [SPARK-18016][SQL][CATALYST][BRANCH-2.1] Code Generation: Constant Pool Limit - Class Splitting
Closes apache#18383 - [SPARK-21167][SS] Set kafka clientId while fetch messages
Closes apache#18414 - [SPARK-21169] [core] Make sure to update application status to RUNNING if executors are accepted and RUNNING after recovery
Closes apache#18432 - resolve com.esotericsoftware.kryo.KryoException
Closes apache#18490 - [SPARK-21269][Core][WIP] Fix FetchFailedException when enable maxReqSizeShuffleToMem and KryoSerializer
Closes apache#18585 - SPARK-21359
Closes apache#18609 - Spark SQL merge small files to big files Update InsertIntoHiveTable.scala

Added:
Closes apache#18308 - [SPARK-21099][Spark Core] INFO Log Message Using Incorrect Executor I…
Closes apache#18599 - [SPARK-21372] spark writes one log file even I set the number of spark_rotate_log to 0
Closes apache#18619 - [SPARK-21397][BUILD]Maven shade plugin adding dependency-reduced-pom.xml to …
Closes apache#18667 - Fix the simpleString used in error messages
Closes apache#18782 - Branch 2.1

Added:
Closes apache#17694 - [SPARK-12717][PYSPARK] Resolving race condition with pyspark broadcasts when using multiple threads

Added:
Closes apache#16456 - [SPARK-18994] clean up the local directories for application in future by annother thread
Closes apache#18683 - [SPARK-21474][CORE] Make number of parallel fetches from a reducer configurable
Closes apache#18690 - [SPARK-21334][CORE] Add metrics reporting service to External Shuffle Server

Added:
Closes apache#18827 - Merge pull request 1 from apache/master

## How was this patch tested?

N/A

Author: hyukjinkwon <[email protected]>

Closes apache#18780 from HyukjinKwon/close-prs.