Merged

sync #17

91 commits
91cd06b
[SPARK-8981][CORE][FOLLOW-UP] Clean up MDC properties after running a…
Ngone51 Jun 11, 2020
11d3a74
[SPARK-31705][SQL] Push more possible predicates through Join via CNF…
gengliangwang Jun 11, 2020
b1adc3d
[SPARK-21117][SQL] Built-in SQL Function Support - WIDTH_BUCKET
maropu Jun 11, 2020
88a4e55
[SPARK-31765][WEBUI][TEST-MAVEN] Upgrade HtmlUnit >= 2.37.0
sarutak Jun 11, 2020
b87a342
[SPARK-31916][SQL] StringConcat can lead to StringIndexOutOfBoundsExc…
dilipbiswal Jun 12, 2020
78f9043
[SPARK-31912][SQL][TESTS] Normalize all binary comparison expressions
wangyum Jun 12, 2020
c259844
[SPARK-31959][SQL][TEST-JAVA11] Fix Gregorian-Julian micros rebasing …
MaxGekk Jun 12, 2020
ff89b11
[SPARK-31736][SQL] Nested column aliasing for RepartitionByExpression…
viirya Jun 12, 2020
d3a5e29
Revert "[SPARK-31860][BUILD] only push release tags on succes"
cloud-fan Jun 12, 2020
9b098f1
[SPARK-30119][WEBUI] Support pagination for streaming tab
iRakson Jun 12, 2020
28f131f
[SPARK-31979] Release script should not fail when remove non-existing…
cloud-fan Jun 12, 2020
78d08a8
[SPARK-31950][SQL][TESTS] Extract SQL keywords from the SqlBase.g4 file
maropu Jun 12, 2020
a620a2a
[SPARK-31977][SQL] Returns the plan directly from NestedColumnAliasing
HyukjinKwon Jun 12, 2020
f535004
[SPARK-31967][UI] Downgrade to vis.js 4.21.0 to fix Jobs UI loading t…
gengliangwang Jun 13, 2020
89c98a4
[SPARK-31944] Add instance weight support in LinearRegressionSummary
huaxingao Jun 13, 2020
610acb2
[SPARK-31644][BUILD][FOLLOWUP] Make Spark's guava version configurabl…
sarutak Jun 14, 2020
c2e5012
[SPARK-31632][CORE][WEBUI][FOLLOWUP] Enrich the exception message whe…
sarutak Jun 14, 2020
a4ea599
[SPARK-31968][SQL] Duplicate partition columns check when writing data
TJX2014 Jun 14, 2020
84815d0
[SPARK-24634][SS] Add a new metric regarding number of inputs later t…
HeartSaVioR Jun 14, 2020
1e40bcc
[SPARK-31593][SS] Remove unnecessary streaming query progress update
uncleGen Jun 14, 2020
54e702c
[SPARK-31970][CORE] Make MDC configuration step be consistent between…
Ngone51 Jun 14, 2020
f5f6eee
[SPARK-31642][FOLLOWUP] Fix Sorting for duration column and make Stat…
iRakson Jun 14, 2020
8282bbf
[SPARK-27633][SQL] Remove redundant aliases in NestedColumnAliasing
viirya Jun 15, 2020
a0187cd
[SPARK-31926][SQL][TEST-HIVE1.2][TEST-MAVEN] Fix concurrency issue fo…
yaooqinn Jun 15, 2020
9d95f1b
[SPARK-31992][SQL] Benchmark the EXCEPTION rebase mode
MaxGekk Jun 15, 2020
f83cb3c
[SPARK-31925][ML] Summary.totalIterations greater than maxIters
huaxingao Jun 15, 2020
7f7b4dd
[SPARK-31990][SS] Use toSet.toSeq in Dataset.dropDuplicates
maropu Jun 15, 2020
eae1747
[SPARK-31959][SQL][TESTS][FOLLOWUP] Adopt the test "SPARK-31959: JST …
MaxGekk Jun 15, 2020
3698a14
[SPARK-26905][SQL] Follow the SQL:2016 reserved keywords
maropu Jun 15, 2020
a7d0d35
[SPARK-31994][K8S] Docker image should use `https` urls for only deb.…
ScrapCodes Jun 15, 2020
5e89fbe
[SPARK-31824][CORE][TESTS] DAGSchedulerSuite: Improve and reuse compl…
beliefer Jun 15, 2020
75afd88
Revert "[SPARK-31926][SQL][TEST-HIVE1.2][TEST-MAVEN] Fix concurrency …
dongjoon-hyun Jun 16, 2020
e9145d4
[SPARK-31986][SQL] Fix Julian-Gregorian micros rebasing of overlappin…
MaxGekk Jun 16, 2020
fe68e95
[SPARK-24634][SS][FOLLOWUP] Rename the variable from "numLateInputs" …
HeartSaVioR Jun 16, 2020
f0e6d0e
[SPARK-31710][SQL] Fail casting numeric to timestamp by default
GuoPhilipse Jun 16, 2020
d24d27f
[SPARK-31997][SQL][TESTS] Drop test_udtf table when SingleSessionSuit…
LuciferYang Jun 16, 2020
6e9ff72
[SPARK-31984][SQL] Make micros rebasing functions via local timestamp…
MaxGekk Jun 16, 2020
3643565
[SPARK-31710][SQL][FOLLOWUP] Replace CAST by TIMESTAMP_SECONDS in ben…
MaxGekk Jun 16, 2020
8d57709
[SPARK-31705][SQL][FOLLOWUP] Avoid the unnecessary CNF computation fo…
maropu Jun 16, 2020
2ec9b86
[SPARK-31929][WEBUI] Close leveldbiterator when leveldb.close
zhli1142015 Jun 16, 2020
7f6a8ab
[SPARK-31777][ML][PYSPARK] Add user-specified fold column to CrossVal…
viirya Jun 16, 2020
eeb8120
[SPARK-31337][SQL] Support MS SQL Kerberos login in JDBC connector
gaborgsomogyi Jun 17, 2020
afd8a8b
[SPARK-31989][SQL] Generate JSON rebasing files w/ 30 minutes step
MaxGekk Jun 17, 2020
feeca63
[SPARK-32011][PYTHON][CORE] Remove warnings about pin-thread modes an…
HyukjinKwon Jun 17, 2020
93bb70f
[SPARK-29148][CORE][FOLLOWUP] Fix warning message to show a correct e…
dongjoon-hyun Jun 17, 2020
350aa85
[SPARK-32006][SQL] Create date/timestamp formatters once before colle…
MaxGekk Jun 17, 2020
4badef3
[SPARK-32000][CORE][TESTS] Fix the flaky test for partially launched …
Ngone51 Jun 17, 2020
9b79251
[SPARK-31960][YARN][BUILD] Only populate Hadoop classpath for no-hado…
dbtsai Jun 18, 2020
e4f5036
[SPARK-32020][SQL] Better error message when SPARK_HOME or spark.test…
dilipbiswal Jun 18, 2020
8a9ae01
[MINOR] update dev/create-release/known_translations
cloud-fan Jun 18, 2020
ac98a9a
[MINOR][DOCS] Update running-on-kubernetes.md
yuj Jun 18, 2020
8750363
[MINOR][DOCS] Emphasize the Streaming tab is for DStream API
xuanyuanking Jun 19, 2020
17a5007
[SPARK-30865][SQL][SS] Refactor DateTimeUtils
MaxGekk Jun 19, 2020
86b54f3
[SPARK-31894][SS] Introduce UnsafeRow format validation for streaming…
xuanyuanking Jun 19, 2020
abc8ccc
[SPARK-31926][SQL][TESTS][FOLLOWUP][TEST-HIVE1.2][TEST-MAVEN] Fix con…
yaooqinn Jun 19, 2020
6fe3bf6
[SPARK-31993][SQL] Build arrays for passing variables generated from …
HeartSaVioR Jun 19, 2020
5ee5cfd
[SPARK-31826][SQL] Support composed type of case class for typed Scal…
Ngone51 Jun 19, 2020
3c34e45
[SPARK-31029] Avoid using global execution context in driver main thr…
shanyu Jun 19, 2020
a9247c3
[SPARK-32033][SS][DSTEAMS] Use new poll API in Kafka connector execut…
gaborgsomogyi Jun 19, 2020
7b86838
[SPARK-31350][SQL] Coalesce bucketed tables for sort merge join if ap…
imback82 Jun 19, 2020
177a380
[SPARK-31980][SQL] Function sequence() fails if start and end of rang…
TJX2014 Jun 20, 2020
66ba356
[SPARK-32021][SQL] Increase precision of seconds and fractions of `ma…
MaxGekk Jun 20, 2020
93529a8
[SPARK-31957][SQL] Cleanup hive scratch dir for the developer api sta…
yaooqinn Jun 20, 2020
297016e
[SPARK-31893][ML] Add a generic ClassificationSummary trait
huaxingao Jun 20, 2020
9784934
[SPARK-32019][SQL] Add spark.sql.files.minPartitionNum config
ulysses-you Jun 21, 2020
d2a656c
[SPARK-27702][K8S] Allow using some alternatives for service accounts
Udbhav30 Jun 21, 2020
9f8e15b
[SPARK-32034][SQL] Port HIVE-14817: Shutdown the SessionManager timeo…
yaooqinn Jun 21, 2020
aa4c100
[SPARK-31798][SHUFFLE][API] Shuffle Writer API changes to return cust…
mccheah Jun 22, 2020
6fdea63
[SPARK-31905][SS] Add compatibility tests for streaming state store f…
xuanyuanking Jun 22, 2020
2e4557f
[SPARK-32038][SQL] NormalizeFloatingNumbers should also work on disti…
viirya Jun 22, 2020
6293c38
[MINOR][SQL] Add `IS [NOT] NULL` examples to ArrayFilter/ArrayExists
dilipbiswal Jun 22, 2020
338efee
[SPARK-32031][SQL] Fix the wrong references of the PartialMerge/Final…
Ngone51 Jun 22, 2020
979a8eb
[MINOR][SQL] Simplify DateTimeUtils.cleanLegacyTimestampStr
MaxGekk Jun 23, 2020
fcf9768
[SPARK-32052][SQL] Extract common code from date-time field expressions
MaxGekk Jun 23, 2020
2bcbe3d
[SPARK-32045][BUILD] Upgrade to Apache Commons Lang 3.10
williamhyun Jun 23, 2020
2dbfae8
[SPARK-32049][SQL][TESTS] Upgrade Oracle JDBC Driver 8
gaborgsomogyi Jun 23, 2020
e00f43c
[SPARK-32043][SQL] Replace Decimal by Int op in `make_interval` and `…
MaxGekk Jun 23, 2020
11d2b07
[SPARK-31918][R] Ignore S4 generic methods under SparkR namespace in …
HyukjinKwon Jun 24, 2020
b62e253
[SPARK-32073][R] Drop R < 3.5 support
HyukjinKwon Jun 24, 2020
eedc6cc
[SPARK-32028][WEBUI] fix app id link for multi attempts app in histor…
zhli1142015 Jun 24, 2020
986fa01
[SPARK-32075][DOCS] Fix a few issues in parameters table
sidedoorleftroad Jun 24, 2020
045106e
[SPARK-32072][CORE][TESTS] Fix table formatting with benchmark results
MaxGekk Jun 24, 2020
9f540fa
[SPARK-32062][SQL] Reset listenerRegistered in SparkSession
ulysses-you Jun 24, 2020
e29ec42
[SPARK-32074][BUILD][R] Update AppVeyor R version to 4.0.2
HyukjinKwon Jun 24, 2020
df04107
[SPARK-32080][SPARK-31998][SQL] Simplify ArrowColumnVector ListArray …
BryanCutler Jun 24, 2020
47fb9d6
[SPARK-32087][SQL] Allow UserDefinedType to use encoder to deserializ…
Ngone51 Jun 24, 2020
71b6d46
[SPARK-32089][R][BUILD] Upgrade R version to 4.0.2 in the release Doc…
HyukjinKwon Jun 24, 2020
d06604f
[SPARK-32078][DOC] Add a redirect to sql-ref from sql-reference
gatorsmile Jun 24, 2020
1af19a7
[SPARK-32098][PYTHON] Use iloc for positional slicing instead of dire…
HyukjinKwon Jun 25, 2020
594cb56
[SPARK-32100][CORE][TESTS] Add WorkerDecommissionExtendedSuite
dongjoon-hyun Jun 25, 2020
bbb2cba
[SPARK-32025][SQL] Csv schema inference problems with different types…
planga82 Jun 26, 2020
4 changes: 2 additions & 2 deletions R/WINDOWS.md
@@ -22,8 +22,8 @@ To build SparkR on Windows, the following steps are required

1. Make sure `bash` is available and in `PATH` if you already have a built-in `bash` on Windows. If you do not have, install [Cygwin](https://www.cygwin.com/).

2. Install R (>= 3.1) and [Rtools](https://cloud.r-project.org/bin/windows/Rtools/). Make sure to
include Rtools and R in `PATH`. Note that support for R prior to version 3.4 is deprecated as of Spark 3.0.0.
2. Install R (>= 3.5) and [Rtools](https://cloud.r-project.org/bin/windows/Rtools/). Make sure to
include Rtools and R in `PATH`.

3. Install JDK that SparkR supports (see `R/pkg/DESCRIPTION`), and set `JAVA_HOME` in the system environment variables.

8 changes: 7 additions & 1 deletion R/install-dev.bat
@@ -24,7 +24,13 @@ set SPARK_HOME=%~dp0..

MKDIR %SPARK_HOME%\R\lib

R.exe CMD INSTALL --library="%SPARK_HOME%\R\lib" %SPARK_HOME%\R\pkg\
rem When the package path is passed directly as an argument to R CMD INSTALL,
rem R 4.0 mis-parses it, for example as 'C:\projects\spark\R\..\R\pkg"'. To work
rem around this, go directly to the directory and install it from there.
rem See also SPARK-32074
pushd %SPARK_HOME%\R\pkg\
R.exe CMD INSTALL --library="%SPARK_HOME%\R\lib" .
popd

rem Zip the SparkR package so that it can be distributed to worker nodes on YARN
pushd %SPARK_HOME%\R\lib
2 changes: 1 addition & 1 deletion R/pkg/DESCRIPTION
@@ -15,7 +15,7 @@ URL: https://www.apache.org/ https://spark.apache.org/
BugReports: https://spark.apache.org/contributing.html
SystemRequirements: Java (>= 8, < 12)
Depends:
R (>= 3.1),
R (>= 3.5),
methods
Suggests:
knitr,
5 changes: 4 additions & 1 deletion R/pkg/R/utils.R
@@ -529,7 +529,10 @@ processClosure <- function(node, oldEnv, defVars, checkedFuncs, newEnv) {
# Namespaces other than "SparkR" will not be searched.
if (!isNamespace(func.env) ||
(getNamespaceName(func.env) == "SparkR" &&
!(nodeChar %in% getNamespaceExports("SparkR")))) {
!(nodeChar %in% getNamespaceExports("SparkR")) &&
# Note that generic S4 methods should not be set to the environment of
# cleaned closure. It does not work with R 4.0.0+. See also SPARK-31918.
nodeChar != "" && !methods::isGeneric(nodeChar, func.env))) {
# Only include SparkR internals.

# Set parameter 'inherits' to FALSE since we do not need to search in
4 changes: 0 additions & 4 deletions R/pkg/inst/profile/general.R
@@ -16,10 +16,6 @@
#

.First <- function() {
if (utils::compareVersion(paste0(R.version$major, ".", R.version$minor), "3.4.0") == -1) {
warning("Support for R prior to version 3.4 is deprecated since Spark 3.0.0")
}

packageDir <- Sys.getenv("SPARKR_PACKAGE_DIR")
dirs <- strsplit(packageDir, ",")[[1]]
.libPaths(c(dirs, .libPaths()))
4 changes: 0 additions & 4 deletions R/pkg/inst/profile/shell.R
@@ -16,10 +16,6 @@
#

.First <- function() {
if (utils::compareVersion(paste0(R.version$major, ".", R.version$minor), "3.4.0") == -1) {
warning("Support for R prior to version 3.4 is deprecated since Spark 3.0.0")
}

home <- Sys.getenv("SPARK_HOME")
.libPaths(c(file.path(home, "R", "lib"), .libPaths()))
Sys.setenv(NOAWT = 1)
4 changes: 3 additions & 1 deletion R/pkg/tests/fulltests/test_context.R
@@ -26,7 +26,9 @@ test_that("Check masked functions", {
"colnames", "colnames<-", "intersect", "rank", "rbind", "sample", "subset",
"summary", "transform", "drop", "window", "as.data.frame", "union", "not")
version <- packageVersion("base")
if (as.numeric(version$major) >= 3 && as.numeric(version$minor) >= 3) {
is33Above <- as.numeric(version$major) >= 3 && as.numeric(version$minor) >= 3
is40Above <- as.numeric(version$major) >= 4
if (is33Above || is40Above) {
namesOfMasked <- c("endsWith", "startsWith", namesOfMasked)
}
masked <- conflicts(detail = TRUE)$`package:SparkR`
18 changes: 9 additions & 9 deletions R/pkg/tests/fulltests/test_mllib_classification.R
@@ -34,7 +34,7 @@ test_that("spark.svmLinear", {
summary <- summary(model)

# test summary coefficients return matrix type
expect_true(class(summary$coefficients) == "matrix")
expect_true(any(class(summary$coefficients) == "matrix"))
expect_true(class(summary$coefficients[, 1]) == "numeric")

coefs <- summary$coefficients[, "Estimate"]
@@ -130,7 +130,7 @@ test_that("spark.logit", {
summary <- summary(model)

# test summary coefficients return matrix type
expect_true(class(summary$coefficients) == "matrix")
expect_true(any(class(summary$coefficients) == "matrix"))
expect_true(class(summary$coefficients[, 1]) == "numeric")

versicolorCoefsR <- c(1.52, 0.03, -0.53, 0.04, 0.00)
@@ -242,8 +242,8 @@ test_that("spark.logit", {
# Test binomial logistic regression against two classes with upperBoundsOnCoefficients
# and upperBoundsOnIntercepts
u <- matrix(c(1.0, 0.0, 1.0, 0.0), nrow = 1, ncol = 4)
model <- spark.logit(training, Species ~ ., upperBoundsOnCoefficients = u,
upperBoundsOnIntercepts = 1.0)
model <- suppressWarnings(spark.logit(training, Species ~ ., upperBoundsOnCoefficients = u,
upperBoundsOnIntercepts = 1.0))
summary <- summary(model)
coefsR <- c(-11.13331, 1.00000, 0.00000, 1.00000, 0.00000)
coefs <- summary$coefficients[, "Estimate"]
@@ -255,8 +255,8 @@ test_that("spark.logit", {
# Test binomial logistic regression against two classes with lowerBoundsOnCoefficients
# and lowerBoundsOnIntercepts
l <- matrix(c(0.0, -1.0, 0.0, -1.0), nrow = 1, ncol = 4)
model <- spark.logit(training, Species ~ ., lowerBoundsOnCoefficients = l,
lowerBoundsOnIntercepts = 0.0)
model <- suppressWarnings(spark.logit(training, Species ~ ., lowerBoundsOnCoefficients = l,
lowerBoundsOnIntercepts = 0.0))
summary <- summary(model)
coefsR <- c(0, 0, -1, 0, 1.902192)
coefs <- summary$coefficients[, "Estimate"]
@@ -268,9 +268,9 @@ test_that("spark.logit", {
# Test multinomial logistic regression with lowerBoundsOnCoefficients
# and lowerBoundsOnIntercepts
l <- matrix(c(0.0, -1.0, 0.0, -1.0, 0.0, -1.0, 0.0, -1.0), nrow = 2, ncol = 4)
model <- spark.logit(training, Species ~ ., family = "multinomial",
lowerBoundsOnCoefficients = l,
lowerBoundsOnIntercepts = as.array(c(0.0, 0.0)))
model <- suppressWarnings(spark.logit(training, Species ~ ., family = "multinomial",
lowerBoundsOnCoefficients = l,
lowerBoundsOnIntercepts = as.array(c(0.0, 0.0))))
summary <- summary(model)
versicolorCoefsR <- c(42.639465, 7.258104, 14.330814, 16.298243, 11.716429)
virginicaCoefsR <- c(0.0002970796, 4.79274, 7.65047, 25.72793, 30.0021)
2 changes: 1 addition & 1 deletion R/pkg/tests/fulltests/test_mllib_clustering.R
@@ -171,7 +171,7 @@ test_that("spark.kmeans", {
expect_equal(sort(collect(distinct(select(cluster, "prediction")))$prediction), c(0, 1))

# test summary coefficients return matrix type
expect_true(class(summary.model$coefficients) == "matrix")
expect_true(any(class(summary.model$coefficients) == "matrix"))
expect_true(class(summary.model$coefficients[1, ]) == "numeric")

# Test model save/load
2 changes: 1 addition & 1 deletion R/pkg/tests/fulltests/test_mllib_regression.R
@@ -116,7 +116,7 @@ test_that("spark.glm summary", {
rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species, data = dataset))

# test summary coefficients return matrix type
expect_true(class(stats$coefficients) == "matrix")
expect_true(any(class(stats$coefficients) == "matrix"))
expect_true(class(stats$coefficients[, 1]) == "numeric")

coefs <- stats$coefficients
@@ -19,8 +19,10 @@

import java.io.File;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -64,6 +66,13 @@ public class LevelDB implements KVStore {
private final ConcurrentMap<String, byte[]> typeAliases;
private final ConcurrentMap<Class<?>, LevelDBTypeInfo> types;

/**
* Closing a JNI LevelDB handle after the DB has been closed causes JVM crashes. This tracker
* ensures that all iterators are correctly closed before LevelDB is closed. Soft references
* are used so that an iterator can still be GCed when it is only referenced here.
*/
private final ConcurrentLinkedQueue<SoftReference<LevelDBIterator<?>>> iteratorTracker;

public LevelDB(File path) throws Exception {
this(path, new KVStoreSerializer());
}
@@ -94,6 +103,8 @@ public LevelDB(File path, KVStoreSerializer serializer) throws Exception {
aliases = new HashMap<>();
}
typeAliases = new ConcurrentHashMap<>(aliases);

iteratorTracker = new ConcurrentLinkedQueue<>();
}

@Override
@@ -189,7 +200,9 @@ public <T> KVStoreView<T> view(Class<T> type) throws Exception {
@Override
public Iterator<T> iterator() {
try {
return new LevelDBIterator<>(type, LevelDB.this, this);
LevelDBIterator<T> it = new LevelDBIterator<>(type, LevelDB.this, this);
iteratorTracker.add(new SoftReference<>(it));
return it;
} catch (Exception e) {
throw Throwables.propagate(e);
}
@@ -238,6 +251,14 @@ public void close() throws IOException {
}

try {
if (iteratorTracker != null) {
for (SoftReference<LevelDBIterator<?>> ref: iteratorTracker) {
LevelDBIterator<?> it = ref.get();
if (it != null) {
it.close();
}
}
}
_db.close();
} catch (IOException ioe) {
throw ioe;
@@ -252,6 +273,7 @@ public void close() throws IOException {
* with a closed DB can cause JVM crashes, so this ensures that situation does not happen.
*/
void closeIterator(LevelDBIterator<?> it) throws IOException {
notifyIteratorClosed(it);
synchronized (this._db) {
DB _db = this._db.get();
if (_db != null) {
@@ -260,6 +282,14 @@ void closeIterator(LevelDBIterator<?> it) throws IOException {
}
}

/**
* Remove an iterator from the iterator tracker. `LevelDBIterator` calls this to signal
* that the iterator has been closed.
*/
void notifyIteratorClosed(LevelDBIterator<?> it) {
iteratorTracker.removeIf(ref -> it.equals(ref.get()));
}

/** Returns metadata about indices for the given type. */
LevelDBTypeInfo getTypeInfo(Class<?> type) throws Exception {
LevelDBTypeInfo ti = types.get(type);
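Stepping back from the diff, the LevelDB change above boils down to a small resource-tracking pattern: register every open iterator through a soft reference, let explicitly closed iterators deregister themselves, and drain whatever is still alive when the store itself is closed. A minimal standalone sketch of the same idea, assuming nothing beyond the JDK (the ResourceTracker and Handle names are hypothetical, not Spark classes):

```java
import java.lang.ref.SoftReference;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical, simplified illustration of the tracking pattern added to LevelDB above:
// handles are registered through soft references, so unused handles stay eligible for GC,
// while close() can still reach and close every handle that is still alive.
class ResourceTracker implements AutoCloseable {
  private final ConcurrentLinkedQueue<SoftReference<Handle>> open = new ConcurrentLinkedQueue<>();

  Handle newHandle() {
    Handle h = new Handle(this);
    open.add(new SoftReference<>(h));
    return h;
  }

  // Mirrors notifyIteratorClosed(): a handle deregisters itself when closed explicitly.
  void notifyClosed(Handle h) {
    open.removeIf(ref -> h.equals(ref.get()));
  }

  @Override
  public void close() {
    for (SoftReference<Handle> ref : open) {
      Handle h = ref.get();
      if (h != null) {
        h.close(); // close live handles before the underlying resource goes away
      }
    }
    // ... release the underlying native resource here ...
  }

  static final class Handle implements AutoCloseable {
    private final ResourceTracker owner;
    private boolean closed;

    Handle(ResourceTracker owner) {
      this.owner = owner;
    }

    @Override
    public void close() {
      if (!closed) {
        owner.notifyClosed(this); // deregister first, as LevelDBIterator.close() does
        closed = true;
      }
    }
  }
}
```

The soft references keep the tracker from pinning iterators that callers have abandoned, which is the same reason LevelDB stores SoftReference<LevelDBIterator<?>> entries rather than strong references.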
@@ -185,6 +185,7 @@

@Override
public synchronized void close() throws IOException {
db.notifyIteratorClosed(this);
if (!closed) {
it.close();
closed = true;
@@ -19,6 +19,7 @@

import java.io.File;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
@@ -276,6 +277,41 @@ public void testNegativeIndexValues() throws Exception {
assertEquals(expected, results);
}

@Test
public void testCloseLevelDBIterator() throws Exception {
// SPARK-31929: test that when LevelDB.close() is called, related LevelDBIterators
// are closed, and that files opened by those iterators are also closed.
File dbPathForCloseTest = File
.createTempFile(
"test_db_close.",
".ldb");
dbPathForCloseTest.delete();
LevelDB dbForCloseTest = new LevelDB(dbPathForCloseTest);
for (int i = 0; i < 8192; i++) {
dbForCloseTest.write(createCustomType1(i));
}
String key = dbForCloseTest
.view(CustomType1.class).iterator().next().key;
assertEquals("key0", key);
Iterator<CustomType1> it0 = dbForCloseTest
.view(CustomType1.class).max(1).iterator();
while (it0.hasNext()) {
it0.next();
}
System.gc();
Iterator<CustomType1> it1 = dbForCloseTest
.view(CustomType1.class).iterator();
assertEquals("key0", it1.next().key);
try (KVStoreIterator<CustomType1> it2 = dbForCloseTest
.view(CustomType1.class).closeableIterator()) {
assertEquals("key0", it2.next().key);
}
dbForCloseTest.close();
assertTrue(dbPathForCloseTest.exists());
FileUtils.deleteQuietly(dbPathForCloseTest);
assertTrue(!dbPathForCloseTest.exists());
}

private CustomType1 createCustomType1(int i) {
CustomType1 t = new CustomType1();
t.key = "key" + i;
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -334,7 +334,7 @@
</dependency>
<dependency>
<groupId>org.seleniumhq.selenium</groupId>
<artifactId>selenium-htmlunit-driver</artifactId>
<artifactId>htmlunit-driver</artifactId>
<scope>test</scope>
</dependency>
<!-- Coerce sbt into honoring these dependency updates: -->
@@ -20,6 +20,7 @@
import java.io.IOException;

import org.apache.spark.annotation.Private;
import org.apache.spark.shuffle.api.metadata.MapOutputCommitMessage;

/**
* :: Private ::
@@ -60,10 +61,15 @@ public interface ShuffleMapOutputWriter {
* <p>
* This can also close any resources and clean up temporary state if necessary.
* <p>
* The returned array should contain, for each partition from (0) to (numPartitions - 1), the
* number of bytes written by the partition writer for that partition id.
* The returned commit message is a structure with two components:
* <p>
* 1) An array of longs, which should contain, for each partition from (0) to
* (numPartitions - 1), the number of bytes written by the partition writer
* for that partition id.
* <p>
* 2) An optional metadata blob that can be used by shuffle readers.
*/
long[] commitAllPartitions() throws IOException;
MapOutputCommitMessage commitAllPartitions() throws IOException;

/**
* Abort all of the writes done by any writers returned by {@link #getPartitionWriter(int)}.
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.api.metadata;

import java.util.Optional;

import org.apache.spark.annotation.Private;

/**
* :: Private ::
*
* Represents the result of writing map outputs for a shuffle map task.
* <p>
* Partition lengths represent the length of each block written in the map task. This can
* be used for downstream readers to allocate resources, such as in-memory buffers.
* <p>
* Map output writers can choose to attach arbitrary metadata tags to register with a
* shuffle output tracker (a module yet to be built in a future iteration of the
* shuffle storage APIs).
*/
@Private
public final class MapOutputCommitMessage {

private final long[] partitionLengths;
private final Optional<MapOutputMetadata> mapOutputMetadata;

private MapOutputCommitMessage(
long[] partitionLengths, Optional<MapOutputMetadata> mapOutputMetadata) {
this.partitionLengths = partitionLengths;
this.mapOutputMetadata = mapOutputMetadata;
}

public static MapOutputCommitMessage of(long[] partitionLengths) {
return new MapOutputCommitMessage(partitionLengths, Optional.empty());
}

public static MapOutputCommitMessage of(
long[] partitionLengths, MapOutputMetadata mapOutputMetadata) {
return new MapOutputCommitMessage(partitionLengths, Optional.of(mapOutputMetadata));
}

public long[] getPartitionLengths() {
return partitionLengths;
}

public Optional<MapOutputMetadata> getMapOutputMetadata() {
return mapOutputMetadata;
}
}
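
To make the new commitAllPartitions() contract concrete, here is a hedged sketch of a writer building its commit message. The ExampleMapOutputWriter class, its constructor, and the bookkeeping field are assumptions for illustration; only MapOutputCommitMessage.of and the changed return type come from the diff above, and the import paths are assumed to match the packages used there:

```java
import java.io.IOException;

import org.apache.spark.shuffle.api.ShuffleMapOutputWriter;
import org.apache.spark.shuffle.api.metadata.MapOutputCommitMessage;

// Hypothetical writer skeleton: only the commitAllPartitions() contract and the
// MapOutputCommitMessage factory come from the diff above; everything else is assumed.
abstract class ExampleMapOutputWriter implements ShuffleMapOutputWriter {

  // Bytes written per partition, filled in by the (omitted) partition writers.
  private final long[] partitionLengths;

  ExampleMapOutputWriter(int numPartitions) {
    this.partitionLengths = new long[numPartitions];
  }

  @Override
  public MapOutputCommitMessage commitAllPartitions() throws IOException {
    // No reader-visible metadata in this sketch, so the lengths-only factory is used.
    return MapOutputCommitMessage.of(partitionLengths);
  }
}
```

A writer that wants to expose reader-visible information would instead implement MapOutputMetadata and return MapOutputCommitMessage.of(partitionLengths, metadata).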