Changes from all commits
17 commits
fe44676
[SPARK-39963][SQL][TESTS][FOLLOW-UP] Disable ANSI mode for test of ca…
HyukjinKwon Aug 8, 2022
1986747
[SPARK-39988][CORE] Use `try-with-resource` to ensure `DBIterator` is…
LuciferYang Aug 8, 2022
bf42b89
[MINOR][SQL] Improve the comments about null tracking for UnsafeRow
beliefer Aug 8, 2022
b02316c
[SPARK-39764][SQL] Make PhysicalOperation the same as ScanOperation
cloud-fan Aug 8, 2022
f97515e
[SPARK-39973][CORE] Suppress error logs when the number of timers is …
HyukjinKwon Aug 8, 2022
5c9175c
[SPARK-39912][SPARK-39828][SQL] Refine CatalogImpl
cloud-fan Aug 8, 2022
87b312a
[SPARK-39965][K8S] Skip PVC cleanup when driver doesn't own PVCs
dongjoon-hyun Aug 8, 2022
1fed903
[SPARK-40004][CORE] Remove redundant `LevelDB.get` in `RemoteBlockPus…
LuciferYang Aug 8, 2022
0eef968
[SPARK-40003][PYTHON][SQL] Add 'median' to functions
zhengruifeng Aug 9, 2022
c9156e5
[SPARK-40002][SQL] Don't push down limit through window using ntile
bersprockets Aug 9, 2022
527cce5
[SPARK-40006][PYTHON][DOCS] Make pyspark.sql.group examples self-cont…
HyukjinKwon Aug 9, 2022
b012cb7
[SPARK-40007][PYTHON][SQL] Add 'mode' to functions
zhengruifeng Aug 9, 2022
07a5e5f
[SPARK-39819][SQL] DS V2 aggregate push down can work with Top N or P…
beliefer Aug 9, 2022
04ea0ac
[SPARK-40008][SQL] Support casting of integrals to ANSI intervals
MaxGekk Aug 9, 2022
8dc1064
[SPARK-39863][BUILD] Upgrade Hadoop to 3.3.4
sunchao Aug 9, 2022
fc82bc4
[SPARK-39863][BUILD][FOLLOWUP] Update dependency manifest for Hadoop 3
dongjoon-hyun Aug 9, 2022
663b208
[SPARK-40010][PYTHON][DOCS] Make pyspark.sql.window examples self-con…
dcoliversun Aug 9, 2022
5 changes: 2 additions & 3 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -4098,14 +4098,13 @@ test_that("catalog APIs, listTables, getTable, listColumns, listFunctions, funct
c("name", "description", "dataType", "nullable", "isPartition", "isBucket"))
expect_equal(collect(c)[[1]][[1]], "speed")
expect_error(listColumns("zxwtyswklpf", "default"),
paste("Error in listColumns : analysis error - Table",
"'zxwtyswklpf' does not exist in database 'default'"))
paste("Table or view not found: spark_catalog.default.zxwtyswklpf"))

f <- listFunctions()
expect_true(nrow(f) >= 200) # 250
expect_equal(colnames(f),
c("name", "catalog", "namespace", "description", "className", "isTemporary"))
expect_equal(take(orderBy(f, "className"), 1)$className,
expect_equal(take(orderBy(filter(f, "className IS NOT NULL"), "className"), 1)$className,
"org.apache.spark.sql.catalyst.expressions.Abs")
expect_error(listFunctions("zxwtyswklpf_db"),
paste("Error in listFunctions : no such database - Database",
@@ -457,18 +457,20 @@ static ConcurrentMap<AppExecId, ExecutorShuffleInfo> reloadRegisteredExecutors(D
throws IOException {
ConcurrentMap<AppExecId, ExecutorShuffleInfo> registeredExecutors = Maps.newConcurrentMap();
if (db != null) {
DBIterator itr = db.iterator();
itr.seek(APP_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
while (itr.hasNext()) {
Map.Entry<byte[], byte[]> e = itr.next();
String key = new String(e.getKey(), StandardCharsets.UTF_8);
if (!key.startsWith(APP_KEY_PREFIX)) {
break;
try (DBIterator itr = db.iterator()) {
itr.seek(APP_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
while (itr.hasNext()) {
Map.Entry<byte[], byte[]> e = itr.next();
String key = new String(e.getKey(), StandardCharsets.UTF_8);
if (!key.startsWith(APP_KEY_PREFIX)) {
break;
}
AppExecId id = parseDbAppExecKey(key);
logger.info("Reloading registered executors: " + id.toString());
ExecutorShuffleInfo shuffleInfo =
mapper.readValue(e.getValue(), ExecutorShuffleInfo.class);
registeredExecutors.put(id, shuffleInfo);
}
AppExecId id = parseDbAppExecKey(key);
logger.info("Reloading registered executors: " + id.toString());
ExecutorShuffleInfo shuffleInfo = mapper.readValue(e.getValue(), ExecutorShuffleInfo.class);
registeredExecutors.put(id, shuffleInfo);
}
}
return registeredExecutors;
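The SPARK-39988 hunks in this PR all apply the same fix: wrap the LevelDB `DBIterator` in try-with-resources so it is closed even when deserialization fails mid-scan. Below is a minimal, self-contained sketch of that pattern against the `org.iq80.leveldb` API; it is not the Spark code itself, and `forEachWithPrefix` is a hypothetical helper used only for illustration.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.function.BiConsumer;

import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;

class PrefixScan {
  /** Hypothetical helper: visit every entry whose key starts with {@code prefix}. */
  static void forEachWithPrefix(DB db, String prefix, BiConsumer<String, byte[]> visitor)
      throws IOException {
    // DBIterator is Closeable; try-with-resources closes it even if the loop body throws.
    try (DBIterator itr = db.iterator()) {
      itr.seek(prefix.getBytes(StandardCharsets.UTF_8));
      while (itr.hasNext()) {
        Map.Entry<byte[], byte[]> e = itr.next();
        String key = new String(e.getKey(), StandardCharsets.UTF_8);
        if (!key.startsWith(prefix)) {
          break; // keys are sorted, so the prefix range has ended
        }
        visitor.accept(key, e.getValue());
      }
    }
  }
}
```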
@@ -417,9 +417,7 @@ void removeAppAttemptPathInfoFromDB(String appId, int attemptId) {
if (db != null) {
try {
byte[] key = getDbAppAttemptPathsKey(appAttemptId);
if (db.get(key) != null) {
db.delete(key);
}
db.delete(key);
} catch (Exception e) {
logger.error("Failed to remove the application attempt {} local path in DB",
appAttemptId, e);
@@ -909,39 +907,40 @@ void reloadAndCleanUpAppShuffleInfo(DB db) throws IOException {
List<byte[]> reloadActiveAppAttemptsPathInfo(DB db) throws IOException {
List<byte[]> dbKeysToBeRemoved = new ArrayList<>();
if (db != null) {
DBIterator itr = db.iterator();
itr.seek(APP_ATTEMPT_PATH_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
while (itr.hasNext()) {
Map.Entry<byte[], byte[]> entry = itr.next();
String key = new String(entry.getKey(), StandardCharsets.UTF_8);
if (!key.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
break;
}
AppAttemptId appAttemptId = parseDbAppAttemptPathsKey(key);
AppPathsInfo appPathsInfo = mapper.readValue(entry.getValue(), AppPathsInfo.class);
logger.debug("Reloading Application paths info for application {}", appAttemptId);
appsShuffleInfo.compute(appAttemptId.appId,
(appId, existingAppShuffleInfo) -> {
if (existingAppShuffleInfo == null ||
existingAppShuffleInfo.attemptId < appAttemptId.attemptId) {
if (existingAppShuffleInfo != null) {
AppAttemptId existingAppAttemptId = new AppAttemptId(
existingAppShuffleInfo.appId, existingAppShuffleInfo.attemptId);
try {
// Add the former outdated DB key to deletion list
dbKeysToBeRemoved.add(getDbAppAttemptPathsKey(existingAppAttemptId));
} catch (IOException e) {
logger.error("Failed to get the DB key for {}", existingAppAttemptId, e);
try (DBIterator itr = db.iterator()) {
itr.seek(APP_ATTEMPT_PATH_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
while (itr.hasNext()) {
Map.Entry<byte[], byte[]> entry = itr.next();
String key = new String(entry.getKey(), StandardCharsets.UTF_8);
if (!key.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
break;
}
AppAttemptId appAttemptId = parseDbAppAttemptPathsKey(key);
AppPathsInfo appPathsInfo = mapper.readValue(entry.getValue(), AppPathsInfo.class);
logger.debug("Reloading Application paths info for application {}", appAttemptId);
appsShuffleInfo.compute(appAttemptId.appId,
(appId, existingAppShuffleInfo) -> {
if (existingAppShuffleInfo == null ||
existingAppShuffleInfo.attemptId < appAttemptId.attemptId) {
if (existingAppShuffleInfo != null) {
AppAttemptId existingAppAttemptId = new AppAttemptId(
existingAppShuffleInfo.appId, existingAppShuffleInfo.attemptId);
try {
// Add the former outdated DB key to deletion list
dbKeysToBeRemoved.add(getDbAppAttemptPathsKey(existingAppAttemptId));
} catch (IOException e) {
logger.error("Failed to get the DB key for {}", existingAppAttemptId, e);
}
}
return new AppShuffleInfo(
appAttemptId.appId, appAttemptId.attemptId, appPathsInfo);
} else {
// Add the current DB key to deletion list as it is outdated
dbKeysToBeRemoved.add(entry.getKey());
return existingAppShuffleInfo;
}
return new AppShuffleInfo(
appAttemptId.appId, appAttemptId.attemptId, appPathsInfo);
} else {
// Add the current DB key to deletion list as it is outdated
dbKeysToBeRemoved.add(entry.getKey());
return existingAppShuffleInfo;
}
});
});
}
}
}
return dbKeysToBeRemoved;
@@ -954,41 +953,44 @@ List<byte[]> reloadActiveAppAttemptsPathInfo(DB db) throws IOException {
List<byte[]> reloadFinalizedAppAttemptsShuffleMergeInfo(DB db) throws IOException {
List<byte[]> dbKeysToBeRemoved = new ArrayList<>();
if (db != null) {
DBIterator itr = db.iterator();
itr.seek(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
while (itr.hasNext()) {
Map.Entry<byte[], byte[]> entry = itr.next();
String key = new String(entry.getKey(), StandardCharsets.UTF_8);
if (!key.startsWith(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX)) {
break;
}
AppAttemptShuffleMergeId partitionId = parseDbAppAttemptShufflePartitionKey(key);
logger.debug("Reloading finalized shuffle info for partitionId {}", partitionId);
AppShuffleInfo appShuffleInfo = appsShuffleInfo.get(partitionId.appId);
if (appShuffleInfo != null && appShuffleInfo.attemptId == partitionId.attemptId) {
appShuffleInfo.shuffles.compute(partitionId.shuffleId,
(shuffleId, existingMergePartitionInfo) -> {
if (existingMergePartitionInfo == null ||
existingMergePartitionInfo.shuffleMergeId < partitionId.shuffleMergeId) {
if (existingMergePartitionInfo != null) {
AppAttemptShuffleMergeId appAttemptShuffleMergeId =
new AppAttemptShuffleMergeId(appShuffleInfo.appId, appShuffleInfo.attemptId,
shuffleId, existingMergePartitionInfo.shuffleMergeId);
try{
dbKeysToBeRemoved.add(
getDbAppAttemptShufflePartitionKey(appAttemptShuffleMergeId));
} catch (Exception e) {
logger.error("Error getting the DB key for {}", appAttemptShuffleMergeId, e);
try (DBIterator itr = db.iterator()) {
itr.seek(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
while (itr.hasNext()) {
Map.Entry<byte[], byte[]> entry = itr.next();
String key = new String(entry.getKey(), StandardCharsets.UTF_8);
if (!key.startsWith(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX)) {
break;
}
AppAttemptShuffleMergeId partitionId = parseDbAppAttemptShufflePartitionKey(key);
logger.debug("Reloading finalized shuffle info for partitionId {}", partitionId);
AppShuffleInfo appShuffleInfo = appsShuffleInfo.get(partitionId.appId);
if (appShuffleInfo != null && appShuffleInfo.attemptId == partitionId.attemptId) {
appShuffleInfo.shuffles.compute(partitionId.shuffleId,
(shuffleId, existingMergePartitionInfo) -> {
if (existingMergePartitionInfo == null ||
existingMergePartitionInfo.shuffleMergeId < partitionId.shuffleMergeId) {
if (existingMergePartitionInfo != null) {
AppAttemptShuffleMergeId appAttemptShuffleMergeId =
new AppAttemptShuffleMergeId(
appShuffleInfo.appId, appShuffleInfo.attemptId,
shuffleId, existingMergePartitionInfo.shuffleMergeId);
try{
dbKeysToBeRemoved.add(
getDbAppAttemptShufflePartitionKey(appAttemptShuffleMergeId));
} catch (Exception e) {
logger.error("Error getting the DB key for {}",
appAttemptShuffleMergeId, e);
}
}
return new AppShuffleMergePartitionsInfo(partitionId.shuffleMergeId, true);
} else {
dbKeysToBeRemoved.add(entry.getKey());
return existingMergePartitionInfo;
}
return new AppShuffleMergePartitionsInfo(partitionId.shuffleMergeId, true);
} else {
dbKeysToBeRemoved.add(entry.getKey());
return existingMergePartitionInfo;
}
});
} else {
dbKeysToBeRemoved.add(entry.getKey());
});
} else {
dbKeysToBeRemoved.add(entry.getKey());
}
}
}
}
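Besides the same try-with-resources change, the `removeAppAttemptPathInfoFromDB` hunk above drops a redundant read (SPARK-40004): LevelDB's `delete` is effectively a no-op when the key is absent, so guarding it with a `get` only costs an extra lookup. A hedged before/after sketch, assuming the `org.iq80.leveldb` API:

```java
import org.iq80.leveldb.DB;

class DeleteWithoutGet {
  // Before: an extra read just to decide whether to delete.
  static void removeGuarded(DB db, byte[] key) {
    if (db.get(key) != null) {
      db.delete(key);
    }
  }

  // After: delete directly; it simply does nothing if the key does not exist.
  static void remove(DB db, byte[] key) {
    db.delete(key);
  }
}
```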
@@ -341,18 +341,19 @@ private void loadSecretsFromDb() throws IOException {
logger.info("Recovery location is: " + secretsFile.getPath());
if (db != null) {
logger.info("Going to reload spark shuffle data");
DBIterator itr = db.iterator();
itr.seek(APP_CREDS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
while (itr.hasNext()) {
Map.Entry<byte[], byte[]> e = itr.next();
String key = new String(e.getKey(), StandardCharsets.UTF_8);
if (!key.startsWith(APP_CREDS_KEY_PREFIX)) {
break;
try (DBIterator itr = db.iterator()) {
itr.seek(APP_CREDS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
while (itr.hasNext()) {
Map.Entry<byte[], byte[]> e = itr.next();
String key = new String(e.getKey(), StandardCharsets.UTF_8);
if (!key.startsWith(APP_CREDS_KEY_PREFIX)) {
break;
}
String id = parseDbAppKey(key);
ByteBuffer secret = mapper.readValue(e.getValue(), ByteBuffer.class);
logger.info("Reloading tokens for app: " + id);
secretManager.registerApp(id, secret);
}
String id = parseDbAppKey(key);
ByteBuffer secret = mapper.readValue(e.getValue(), ByteBuffer.class);
logger.info("Reloading tokens for app: " + id);
secretManager.registerApp(id, secret);
}
}
}
@@ -931,8 +931,12 @@ package object config {
private[spark] val LISTENER_BUS_METRICS_MAX_LISTENER_CLASSES_TIMED =
ConfigBuilder("spark.scheduler.listenerbus.metrics.maxListenerClassesTimed")
.internal()
.doc("The number of listeners that have timers to track the elapsed time of" +
"processing events. If 0 is set, disables this feature. If -1 is set," +
"it sets no limit to the number.")
.version("2.3.0")
.intConf
.checkValue(_ >= -1, "The number of listeners should be larger than -1.")
.createWithDefault(128)

private[spark] val LISTENER_BUS_LOG_SLOW_EVENT_ENABLED =
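A hedged usage sketch of the semantics documented above: `0` disables per-listener timing entirely (and, with SPARK-39973, the accompanying error log), while `-1` removes the cap. The key is an internal configuration, so this is illustrative only; the class and app names are placeholders.

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class ListenerMetricsDemo {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setMaster("local")
        .setAppName("ListenerMetricsDemo")
        // 0 = do not time any listener classes; -1 = time every listener class.
        .set("spark.scheduler.listenerbus.metrics.maxListenerClassesTimed", "0");
    try (JavaSparkContext sc = new JavaSparkContext(conf)) {
      // Register listeners and run jobs as usual; no timer-related error is logged.
    }
  }
}
```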
@@ -292,10 +292,14 @@ private[spark] class LiveListenerBusMetrics(conf: SparkConf)
val maxTimed = conf.get(LISTENER_BUS_METRICS_MAX_LISTENER_CLASSES_TIMED)
perListenerClassTimers.get(className).orElse {
if (perListenerClassTimers.size == maxTimed) {
logError(s"Not measuring processing time for listener class $className because a " +
s"maximum of $maxTimed listener classes are already timed.")
if (maxTimed != 0) {
// Explicitly disabled.
logError(s"Not measuring processing time for listener class $className because a " +
s"maximum of $maxTimed listener classes are already timed.")
}
None
} else {
// maxTimed is either -1 (no limit), or an explicit number.
perListenerClassTimers(className) =
metricRegistry.timer(MetricRegistry.name("listenerProcessingTime", className))
perListenerClassTimers.get(className)
@@ -599,6 +599,22 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
assert(bus.getQueueCapacity(EVENT_LOG_QUEUE) == Some(2))
}

test("SPARK-39973: Suppress error logs when the number of timers is set to 0") {
sc = new SparkContext(
"local",
"SparkListenerSuite",
new SparkConf().set(
LISTENER_BUS_METRICS_MAX_LISTENER_CLASSES_TIMED.key, 0.toString))
val testAppender = new LogAppender("Error logger for timers")
withLogAppender(testAppender) {
sc.addSparkListener(new SparkListener { })
sc.addSparkListener(new SparkListener { })
}
assert(!testAppender.loggingEvents
.exists(_.getMessage.getFormattedMessage.contains(
"Not measuring processing time for listener")))
}

/**
* Assert that the given list of numbers has an average that is greater than zero.
*/
24 changes: 11 additions & 13 deletions dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -25,7 +25,7 @@ automaton/1.11-8//automaton-1.11-8.jar
avro-ipc/1.11.0//avro-ipc-1.11.0.jar
avro-mapred/1.11.0//avro-mapred-1.11.0.jar
avro/1.11.0//avro-1.11.0.jar
aws-java-sdk-bundle/1.11.1026//aws-java-sdk-bundle-1.11.1026.jar
aws-java-sdk-bundle/1.12.262//aws-java-sdk-bundle-1.12.262.jar
azure-data-lake-store-sdk/2.3.9//azure-data-lake-store-sdk-2.3.9.jar
azure-keyvault-core/1.0.0//azure-keyvault-core-1.0.0.jar
azure-storage/7.0.1//azure-storage-7.0.1.jar
@@ -52,7 +52,6 @@ commons-math3/3.6.1//commons-math3-3.6.1.jar
commons-pool/1.5.4//commons-pool-1.5.4.jar
commons-text/1.9//commons-text-1.9.jar
compress-lzf/1.1//compress-lzf-1.1.jar
cos_api-bundle/5.6.19//cos_api-bundle-5.6.19.jar
curator-client/2.13.0//curator-client-2.13.0.jar
curator-framework/2.13.0//curator-framework-2.13.0.jar
curator-recipes/2.13.0//curator-recipes-2.13.0.jar
@@ -66,18 +65,17 @@ generex/1.0.2//generex-1.0.2.jar
gmetric4j/1.0.10//gmetric4j-1.0.10.jar
gson/2.2.4//gson-2.2.4.jar
guava/14.0.1//guava-14.0.1.jar
hadoop-aliyun/3.3.3//hadoop-aliyun-3.3.3.jar
hadoop-annotations/3.3.3//hadoop-annotations-3.3.3.jar
hadoop-aws/3.3.3//hadoop-aws-3.3.3.jar
hadoop-azure-datalake/3.3.3//hadoop-azure-datalake-3.3.3.jar
hadoop-azure/3.3.3//hadoop-azure-3.3.3.jar
hadoop-client-api/3.3.3//hadoop-client-api-3.3.3.jar
hadoop-client-runtime/3.3.3//hadoop-client-runtime-3.3.3.jar
hadoop-cloud-storage/3.3.3//hadoop-cloud-storage-3.3.3.jar
hadoop-cos/3.3.3//hadoop-cos-3.3.3.jar
hadoop-openstack/3.3.3//hadoop-openstack-3.3.3.jar
hadoop-aliyun/3.3.4//hadoop-aliyun-3.3.4.jar
hadoop-annotations/3.3.4//hadoop-annotations-3.3.4.jar
hadoop-aws/3.3.4//hadoop-aws-3.3.4.jar
hadoop-azure-datalake/3.3.4//hadoop-azure-datalake-3.3.4.jar
hadoop-azure/3.3.4//hadoop-azure-3.3.4.jar
hadoop-client-api/3.3.4//hadoop-client-api-3.3.4.jar
hadoop-client-runtime/3.3.4//hadoop-client-runtime-3.3.4.jar
hadoop-cloud-storage/3.3.4//hadoop-cloud-storage-3.3.4.jar
hadoop-openstack/3.3.4//hadoop-openstack-3.3.4.jar
hadoop-shaded-guava/1.1.1//hadoop-shaded-guava-1.1.1.jar
hadoop-yarn-server-web-proxy/3.3.3//hadoop-yarn-server-web-proxy-3.3.3.jar
hadoop-yarn-server-web-proxy/3.3.4//hadoop-yarn-server-web-proxy-3.3.4.jar
hive-beeline/2.3.9//hive-beeline-2.3.9.jar
hive-cli/2.3.9//hive-cli-2.3.9.jar
hive-common/2.3.9//hive-common-2.3.9.jar
2 changes: 1 addition & 1 deletion pom.xml
@@ -115,7 +115,7 @@
<slf4j.version>1.7.36</slf4j.version>
<log4j.version>2.18.0</log4j.version>
<!-- make sure to update IsolatedClientLoader whenever this version is changed -->
<hadoop.version>3.3.3</hadoop.version>
<hadoop.version>3.3.4</hadoop.version>
<protobuf.version>2.5.0</protobuf.version>
<yarn.version>${hadoop.version}</yarn.version>
<zookeeper.version>3.6.2</zookeeper.version>
2 changes: 2 additions & 0 deletions python/docs/source/reference/pyspark.sql/functions.rst
@@ -235,8 +235,10 @@ Aggregate Functions
max
max_by
mean
median
min
min_by
mode
percentile_approx
product
skewness
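A hedged example of the new `median` and `mode` aggregates from the JVM side, calling the underlying SQL functions through `expr`. It assumes a build that contains SPARK-40003 and SPARK-40007; the column and app names are made up for illustration.

```java
import static org.apache.spark.sql.functions.expr;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MedianModeDemo {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local").appName("MedianModeDemo").getOrCreate();
    // Values 0,1,2,0,1,2,... over ids 0..9: the median is 1 and the most frequent value (mode) is 0.
    Dataset<Row> df = spark.range(0, 10).withColumn("v", expr("id % 3"));
    df.agg(expr("median(v)"), expr("mode(v)")).show();
    spark.stop();
  }
}
```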
6 changes: 3 additions & 3 deletions python/pyspark/sql/catalog.py
@@ -164,7 +164,7 @@ def getDatabase(self, dbName: str) -> Database:
Examples
--------
>>> spark.catalog.getDatabase("default")
Database(name='default', catalog=None, description='default database', ...
Database(name='default', catalog='spark_catalog', description='default database', ...
>>> spark.catalog.getDatabase("spark_catalog.default")
Database(name='default', catalog='spark_catalog', description='default database', ...
"""
@@ -376,9 +376,9 @@ def getFunction(self, functionName: str) -> Function:
--------
>>> func = spark.sql("CREATE FUNCTION my_func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'")
>>> spark.catalog.getFunction("my_func1")
Function(name='my_func1', catalog=None, namespace=['default'], ...
Function(name='my_func1', catalog='spark_catalog', namespace=['default'], ...
>>> spark.catalog.getFunction("default.my_func1")
Function(name='my_func1', catalog=None, namespace=['default'], ...
Function(name='my_func1', catalog='spark_catalog', namespace=['default'], ...
>>> spark.catalog.getFunction("spark_catalog.default.my_func1")
Function(name='my_func1', catalog='spark_catalog', namespace=['default'], ...
>>> spark.catalog.getFunction("my_func2")
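The docstring updates above reflect SPARK-39912/SPARK-39828: objects in the session catalog now report `spark_catalog` instead of `None`. A hedged JVM-side sketch, assuming the `catalog()` accessor exposed by the refined Catalog API; the app name is a placeholder.

```java
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalog.Database;

public class CatalogDemo {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local").appName("CatalogDemo").getOrCreate();
    Database db = spark.catalog().getDatabase("default");
    // Previously null; after this change the owning catalog is reported explicitly.
    System.out.println(db.catalog()); // expected: spark_catalog
    spark.stop();
  }
}
```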