Merged

87 commits
925f620
[SPARK-28302][CORE] Make sure to generate unique output file for Spar…
Ngone51 Jul 9, 2019
019efaa
[SPARK-28029][SQL][TEST] Port int2.sql
wangyum Jul 9, 2019
a32c92c
[SPARK-28140][MLLIB][PYTHON] Accept DataFrames in RowMatrix and Index…
henrydavidge Jul 9, 2019
1b23267
[SPARK-28136][SQL][TEST] Port int8.sql
wangyum Jul 10, 2019
90c64ea
[SPARK-28267][DOC] Update building-spark.md(support build with hadoop…
wangyum Jul 10, 2019
bbc2be4
[SPARK-28294][CORE] Support `spark.history.fs.cleaner.maxNum` configu…
dongjoon-hyun Jul 10, 2019
b89c3de
[SPARK-28310][SQL] Support (FIRST_VALUE|LAST_VALUE)(expr[ (IGNORE|RES…
zhulipeng Jul 10, 2019
a6506f0
[SPARK-28290][CORE][SQL] Use SslContextFactory.Server instead of SslC…
dongjoon-hyun Jul 10, 2019
579edf4
[SPARK-28335][DSTREAMS][TEST] DirectKafkaStreamSuite wait for Kafka a…
gaborgsomogyi Jul 10, 2019
7858e53
[SPARK-28323][SQL][PYTHON] PythonUDF should be able to use in join co…
viirya Jul 10, 2019
f84cca2
[SPARK-28234][CORE][PYTHON] Add python and JavaSparkContext support t…
tgravescs Jul 11, 2019
06ac7d5
[SPARK-27922][SQL][PYTHON][TESTS] Convert and port 'natural-join.sql'…
Jul 11, 2019
b598dfd
[SPARK-28275][SQL][PYTHON][TESTS] Convert and port 'count.sql' into U…
vinodkc Jul 11, 2019
8d686f3
[SPARK-28271][SQL][PYTHON][TESTS] Convert and port 'pgSQL/aggregates_…
imback82 Jul 11, 2019
3a94fb3
[SPARK-28281][SQL][PYTHON][TESTS] Convert and port 'having.sql' into …
huaxingao Jul 11, 2019
d26642d
[SPARK-28107][SQL] Support 'DAY TO (HOUR|MINUTE|SECOND)', 'HOUR TO (M…
zhulipeng Jul 11, 2019
ec821b4
[SPARK-27919][SQL] Add v2 session catalog
rdblue Jul 11, 2019
6532153
[SPARK-28015][SQL] Check stringToDate() consumes entire input for the…
MaxGekk Jul 11, 2019
92e051c
[SPARK-28270][SQL][PYTHON] Convert and port 'pgSQL/aggregates_part1.s…
HyukjinKwon Jul 11, 2019
3f375c8
[SPARK-28339][SQL] Rename Spark SQL adaptive execution configuration …
carsonwang Jul 11, 2019
0197628
[SPARK-28342][SQL][TESTS] Replace REL_12_BETA1 to REL_12_BETA2 in Pos…
HyukjinKwon Jul 11, 2019
7021588
[SPARK-28306][SQL] Make NormalizeFloatingNumbers rule idempotent
yeshengm Jul 11, 2019
19bcce1
[SPARK-28270][SQL][FOLLOW-UP] Explicitly cast into int/long/decimal i…
HyukjinKwon Jul 11, 2019
8dff711
[SPARK-28213][SQL] Replace ColumnarBatchScan with equivilant from Col…
revans2 Jul 11, 2019
e83583e
[MINOR][SQL] Clean up ObjectProducerExec operators
jaceklaskowski Jul 11, 2019
d1ef6be
[SPARK-26978][SQL][FOLLOWUP] Initialize date-time constants by foldab…
MaxGekk Jul 11, 2019
d47c219
[SPARK-28055][SS][DSTREAMS] Add delegation token custom AdminClient c…
gaborgsomogyi Jul 11, 2019
f830005
[SPARK-23472][CORE] Add defaultJavaOptions for driver and executor.
gaborgsomogyi Jul 11, 2019
9eca58e
[SPARK-28334][SQL][TEST] Port select.sql
wangyum Jul 11, 2019
507b745
[SPARK-28139][SQL] Add v2 ALTER TABLE implementation.
rdblue Jul 12, 2019
a5c88ec
[SPARK-28321][SQL] 0-args Java UDF should not be called only once
HyukjinKwon Jul 12, 2019
27e41d6
[SPARK-28270][TEST-MAVEN][FOLLOW-UP][SQL][PYTHON][TESTS] Avoid cast i…
HyukjinKwon Jul 12, 2019
42b80ae
[SPARK-28257][SQL] Use ConfigEntry for hardcoded configs in SQL
WangGuangxin Jul 12, 2019
fe22faa
[SPARK-28034][SQL][TEST] Port with.sql
peter-toth Jul 12, 2019
1c29212
[SPARK-28357][CORE][TEST] Fix Flaky Test - FileAppenderSuite.rollingf…
dongjoon-hyun Jul 12, 2019
13ae9eb
[SPARK-28354][INFRA] Use JIRA user name instead of JIRA user key
dongjoon-hyun Jul 12, 2019
1a26126
[SPARK-28228][SQL] Fix substitution order of nested WITH clauses
peter-toth Jul 12, 2019
687dd4e
[SPARK-28260][SQL] Add CLOSED state to ExecutionState
wangyum Jul 12, 2019
aa41dce
[SPARK-28159][ML][FOLLOWUP] fix typo & (0 until v.size).toList => Lis…
zhengruifeng Jul 12, 2019
60b89cf
[SPARK-28361][SQL][TEST] Test equality of generated code with id in c…
gatorsmile Jul 12, 2019
79e2047
[SPARK-28355][CORE][PYTHON] Use Spark conf for threshold at which com…
jessecai Jul 13, 2019
b5a9baa
[SPARK-28247][SS] Fix flaky test "query without test harness" on Cont…
HeartSaVioR Jul 13, 2019
7f9da2b
[SPARK-28371][SQL] Make Parquet "StartsWith" filter null-safe
Jul 13, 2019
fab75db
[SPARK-28370][BUILD][TEST] Upgrade Mockito to 2.28.2
dongjoon-hyun Jul 13, 2019
707411f
[SPARK-28378][PYTHON] Remove usage of cgi.escape
viirya Jul 14, 2019
76079fa
[SPARK-28343][SQL][TEST] Enabling cartesian product and ansi mode for…
wangyum Jul 14, 2019
7548a88
[SPARK-28199][SS] Move Trigger implementations to Triggers.scala and …
HeartSaVioR Jul 14, 2019
591de42
[SPARK-28381][PYSPARK] Upgraded version of Pyrolite to 4.30
viirya Jul 15, 2019
a2f71a8
[SPARK-28133][SQL] Add acosh/asinh/atanh functions to SQL
Jul 15, 2019
e238ebe
[SPARK-28387][SQL][TEST] Port select_having.sql
wangyum Jul 15, 2019
72cc853
[SPARK-28384][SQL][TEST] Port select_distinct.sql
wangyum Jul 15, 2019
a7a02a8
[SPARK-28392][SQL][TESTS] Add traits for UDF and PostgreSQL tests to …
HyukjinKwon Jul 15, 2019
f241fc7
[SPARK-28389][SQL] Use Java 8 API in add_months
MaxGekk Jul 15, 2019
8ecbb67
[SPARK-28311][SQL] Fix STS OpenSession failed return wrong origin PRO…
Jul 15, 2019
8d1e87a
[SPARK-28150][CORE][FOLLOW-UP] Don't try to log in when impersonating.
Jul 15, 2019
2f3997f
[SPARK-28306][SQL][FOLLOWUP] Fix NormalizeFloatingNumbers rule idempo…
yeshengm Jul 15, 2019
8f7ccc5
[SPARK-28404][SS] Fix negative timeout value in RateStreamContinuousP…
gaborgsomogyi Jul 15, 2019
d8996fd
[SPARK-28152][SQL] Mapped ShortType to SMALLINT and FloatType to REAL…
shivsood Jul 15, 2019
8e26d4d
[SPARK-28408][SQL][TEST] Restrict test values for DateType, Timestamp…
MaxGekk Jul 16, 2019
b94fa97
[SPARK-28345][SQL][PYTHON] PythonUDF predicate should be able to push…
viirya Jul 16, 2019
be4a552
[SPARK-28106][SQL] When Spark SQL use "add jar" , before add to Spa…
Jul 16, 2019
6926849
[SPARK-28395][SQL] Division operator support integral division
wangyum Jul 16, 2019
9a7f01d
[SPARK-28201][SQL][TEST][FOLLOWUP] Fix Integration test suite accordi…
dongjoon-hyun Jul 16, 2019
421d9d5
[SPARK-27485] EnsureRequirements.reorder should handle duplicate expr…
hvanhovell Jul 16, 2019
d1a1376
[SPARK-28356][SQL] Do not reduce the number of partitions for reparti…
carsonwang Jul 16, 2019
f74ad3d
[SPARK-28129][SQL][TEST] Port float8.sql
wangyum Jul 16, 2019
113f62d
[SPARK-27485][FOLLOWUP] Do not reduce the number of partitions for re…
gaborgsomogyi Jul 16, 2019
282a12d
[SPARK-27944][ML] Unify the behavior of checking empty output column …
zhengruifeng Jul 16, 2019
71882f1
[SPARK-28343][FOLLOW-UP][SQL][TEST] Enable spark.sql.function.preferI…
wangyum Jul 16, 2019
43d68cd
[SPARK-27959][YARN] Change YARN resource configs to use .amount
tgravescs Jul 16, 2019
1134fae
[SPARK-18299][SQL] Allow more aggregations on KeyValueGroupedDataset
nooberfsh Jul 16, 2019
2ddeff9
[SPARK-27963][CORE] Allow dynamic allocation without a shuffle service.
Jul 16, 2019
66179fa
[SPARK-28418][PYTHON][SQL] Wait for event process in 'test_query_exec…
HyukjinKwon Jul 17, 2019
28774cd
[SPARK-28359][SQL][PYTHON][TESTS] Make integrated UDF tests robust by…
HyukjinKwon Jul 17, 2019
eb5dc74
[SPARK-28097][SQL] Map ByteType to SMALLINT for PostgresDialect
mojodna Jul 17, 2019
70073b1
[SPARK-27609][PYTHON] Convert values of function options to strings
MaxGekk Jul 18, 2019
971e832
[SPARK-28411][PYTHON][SQL] InsertInto with overwrite is not honored
huaxingao Jul 18, 2019
4645ffb
[SPARK-28276][SQL][PYTHON][TEST] Convert and port 'cross-join.sql' in…
viirya Jul 18, 2019
62004f1
[SPARK-28283][SQL][PYTHON][TESTS] Convert and port 'intersect-all.sql…
imback82 Jul 18, 2019
eaaf1aa
[SPARK-28278][SQL][PYTHON][TESTS] Convert and port 'except-all.sql' i…
imback82 Jul 18, 2019
4b86510
[SPARK-28286][SQL][PYTHON][TESTS] Convert and port 'pivot.sql' into U…
chitralverma Jul 18, 2019
8acc22c
[SPARK-28138][SQL][TEST] Port timestamp.sql
wangyum Jul 18, 2019
2cf0491
[SPARK-28388][SQL][TEST] Port select_implicit.sql
wangyum Jul 18, 2019
a0c2fa6
[SPARK-28439][PYTHON][SQL] Add support for count: Column in array_repeat
zero323 Jul 18, 2019
3776fbd
[SPARK-28430][UI] Fix stage table rendering when some tasks' metrics …
JoshRosen Jul 18, 2019
0c21404
[SPARK-28312][SQL][TEST] Port numeric.sql
wangyum Jul 18, 2019
54e058d
[SPARK-28416][SQL] Use java.time API in timestampAddInterval
MaxGekk Jul 18, 2019
5 changes: 5 additions & 0 deletions bin/spark-class2.cmd
@@ -63,7 +63,12 @@ if not "x%JAVA_HOME%"=="x" (

rem The launcher library prints the command to be executed in a single line suitable for being
rem executed by the batch interpreter. So read all the output of the launcher into a variable.
:gen
set LAUNCHER_OUTPUT=%temp%\spark-class-launcher-output-%RANDOM%.txt
rem SPARK-28302: %RANDOM% can return the same number if it is called again immediately
rem after the previous call, so make sure to generate a unique file name to avoid
rem concurrent processes writing into the same file.
if exist %LAUNCHER_OUTPUT% goto :gen
"%RUNNER%" -Xmx128m -cp "%LAUNCH_CLASSPATH%" org.apache.spark.launcher.Main %* > %LAUNCHER_OUTPUT%
for /f "tokens=*" %%i in (%LAUNCHER_OUTPUT%) do (
set SPARK_CMD=%%i
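The batch logic above loops back to `:gen` until `%RANDOM%` produces a file name that is not already in use. A minimal Python sketch of the same retry idea (the function name and file-name format are illustrative, not part of the PR):

```python
import os
import random
import tempfile

def unique_launcher_output(tmp_dir=None):
    """Keep drawing random names until one does not exist on disk.

    Mirrors the SPARK-28302 batch fix: a fast second call can observe the same
    %RANDOM% value (range 0-32767), so an existence check is needed before
    using the candidate name.
    """
    tmp_dir = tmp_dir or tempfile.gettempdir()
    while True:
        candidate = os.path.join(
            tmp_dir, "spark-class-launcher-output-%d.txt" % random.randint(0, 32767))
        if not os.path.exists(candidate):
            return candidate
```

This trades one extra `exists` check per attempt for a guarantee that two concurrently started launchers never write into the same output file.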
Original file line number Diff line number Diff line change
@@ -54,8 +54,8 @@ private static String unitRegex(String unit) {
private static Pattern yearMonthPattern =
Pattern.compile("^(?:['|\"])?([+|-])?(\\d+)-(\\d+)(?:['|\"])?$");

private static Pattern dayTimePattern =
Pattern.compile("^(?:['|\"])?([+|-])?((\\d+) )?(\\d+):(\\d+):(\\d+)(\\.(\\d+))?(?:['|\"])?$");
private static Pattern dayTimePattern = Pattern.compile(
"^(?:['|\"])?([+|-])?((\\d+) )?((\\d+):)?(\\d+):(\\d+)(\\.(\\d+))?(?:['|\"])?$");

private static Pattern quoteTrimPattern = Pattern.compile("^(?:['|\"])?(.*?)(?:['|\"])?$");

@@ -160,6 +160,20 @@ public static CalendarInterval fromYearMonthString(String s) throws IllegalArgum
* adapted from HiveIntervalDayTime.valueOf
*/
public static CalendarInterval fromDayTimeString(String s) throws IllegalArgumentException {
return fromDayTimeString(s, "day", "second");
}

/**
* Parse dayTime string in form: [-]d HH:mm:ss.nnnnnnnnn and [-]HH:mm:ss.nnnnnnnnn
*
* adapted from HiveIntervalDayTime.valueOf.
* The following interval conversion patterns are supported:
* - DAY TO (HOUR|MINUTE|SECOND)
* - HOUR TO (MINUTE|SECOND)
* - MINUTE TO SECOND
*/
public static CalendarInterval fromDayTimeString(String s, String from, String to)
throws IllegalArgumentException {
CalendarInterval result = null;
if (s == null) {
throw new IllegalArgumentException("Interval day-time string was null");
@@ -174,12 +188,40 @@ public static CalendarInterval fromDayTimeString(String s) throws IllegalArgumen
int sign = m.group(1) != null && m.group(1).equals("-") ? -1 : 1;
long days = m.group(2) == null ? 0 : toLongWithRange("day", m.group(3),
0, Integer.MAX_VALUE);
long hours = toLongWithRange("hour", m.group(4), 0, 23);
long minutes = toLongWithRange("minute", m.group(5), 0, 59);
long seconds = toLongWithRange("second", m.group(6), 0, 59);
long hours = 0;
long minutes;
long seconds = 0;
if (m.group(5) != null || from.equals("minute")) { // 'HH:mm:ss' or 'mm:ss minute'
hours = toLongWithRange("hour", m.group(5), 0, 23);
minutes = toLongWithRange("minute", m.group(6), 0, 59);
seconds = toLongWithRange("second", m.group(7), 0, 59);
} else if (m.group(8) != null) { // 'mm:ss.nn'
minutes = toLongWithRange("minute", m.group(6), 0, 59);
seconds = toLongWithRange("second", m.group(7), 0, 59);
} else { // 'HH:mm'
hours = toLongWithRange("hour", m.group(6), 0, 23);
minutes = toLongWithRange("minute", m.group(7), 0, 59);
}
// Hive allows nanosecond-precision intervals
String nanoStr = m.group(8) == null ? null : (m.group(8) + "000000000").substring(0, 9);
String nanoStr = m.group(9) == null ? null : (m.group(9) + "000000000").substring(0, 9);
long nanos = toLongWithRange("nanosecond", nanoStr, 0L, 999999999L);
switch (to) {
case "hour":
minutes = 0;
seconds = 0;
nanos = 0;
break;
case "minute":
seconds = 0;
nanos = 0;
break;
case "second":
// No-op
break;
default:
throw new IllegalArgumentException(
String.format("Cannot support (interval '%s' %s to %s) expression", s, from, to));
}
result = new CalendarInterval(0, sign * (
days * MICROS_PER_DAY + hours * MICROS_PER_HOUR + minutes * MICROS_PER_MINUTE +
seconds * MICROS_PER_SECOND + nanos / 1000L));
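The widened `dayTimePattern` makes the hour group optional so one regex can cover the `d HH:mm:ss`, `HH:mm:ss`, and `mm:ss` shapes, and the `switch (to)` then zeroes every field finer than the target unit. A rough, simplified Python sketch of that parse-then-truncate flow (the function is hypothetical and only keys on whether the optional hour group matched, unlike the fuller Java branching above):

```python
import re

# Simplified analogue of the widened dayTimePattern: optional sign, optional
# days, optional hours, then minutes:seconds with an optional fraction.
DAY_TIME = re.compile(r"^([+-])?((\d+) )?((\d+):)?(\d+):(\d+)(\.(\d+))?$")

MICROS_PER_SECOND = 1_000_000
MICROS_PER_MINUTE = 60 * MICROS_PER_SECOND
MICROS_PER_HOUR = 60 * MICROS_PER_MINUTE
MICROS_PER_DAY = 24 * MICROS_PER_HOUR

def parse_day_time(s, to_unit="second"):
    """Parse '[+|-][d ][HH:]mm:ss[.frac]' into signed microseconds,
    truncated at to_unit ('hour' | 'minute' | 'second')."""
    m = DAY_TIME.match(s.strip())
    if m is None:
        raise ValueError("Interval string does not match day-time format: %r" % s)
    sign = -1 if m.group(1) == "-" else 1
    days = int(m.group(3) or 0)
    hours = int(m.group(5) or 0)          # group 5 is the optional 'HH:' part
    minutes, seconds = int(m.group(6)), int(m.group(7))
    frac = m.group(9)
    # Like the Java code: right-pad the fraction to nanoseconds, keep 9 digits.
    nanos = int((frac + "000000000")[:9]) if frac else 0
    if to_unit == "hour":
        minutes = seconds = nanos = 0
    elif to_unit == "minute":
        seconds = nanos = 0
    elif to_unit != "second":
        raise ValueError("Cannot truncate interval to %r" % to_unit)
    return sign * (days * MICROS_PER_DAY + hours * MICROS_PER_HOUR +
                   minutes * MICROS_PER_MINUTE + seconds * MICROS_PER_SECOND +
                   nanos // 1000)
```

Truncation simply discards the finer-grained fields after parsing, which is why `DAY TO MINUTE` and `DAY TO SECOND` can share one pattern.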
Original file line number Diff line number Diff line change
@@ -185,6 +185,14 @@ public void fromDayTimeStringTest() {
} catch (IllegalArgumentException e) {
assertTrue(e.getMessage().contains("not match day-time format"));
}

try {
input = "5 1:12:20";
fromDayTimeString(input, "hour", "microsecond");
fail("Expected to throw an exception for the invalid conversion type");
} catch (IllegalArgumentException e) {
assertTrue(e.getMessage().contains("Cannot support (interval"));
}
}

@Test
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -378,7 +378,7 @@
<dependency>
<groupId>net.razorvine</groupId>
<artifactId>pyrolite</artifactId>
<version>4.23</version>
<version>4.30</version>
<exclusions>
<exclusion>
<groupId>net.razorvine</groupId>
Original file line number Diff line number Diff line change
@@ -94,6 +94,7 @@ private[spark] class ExecutorAllocationManager(
client: ExecutorAllocationClient,
listenerBus: LiveListenerBus,
conf: SparkConf,
cleaner: Option[ContextCleaner] = None,
clock: Clock = new SystemClock())
extends Logging {

@@ -148,7 +149,7 @@ private[spark] class ExecutorAllocationManager(
// Listener for Spark events that impact the allocation policy
val listener = new ExecutorAllocationListener

val executorMonitor = new ExecutorMonitor(conf, client, clock)
val executorMonitor = new ExecutorMonitor(conf, client, listenerBus, clock)

// Executor that handles the scheduling task.
private val executor =
@@ -194,11 +195,13 @@ private[spark] class ExecutorAllocationManager(
throw new SparkException(
s"${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 0!")
}
// Require external shuffle service for dynamic allocation
// Otherwise, we may lose shuffle files when killing executors
if (!conf.get(config.SHUFFLE_SERVICE_ENABLED) && !testing) {
throw new SparkException("Dynamic allocation of executors requires the external " +
"shuffle service. You may enable this through spark.shuffle.service.enabled.")
if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING)) {
logWarning("Dynamic allocation without a shuffle service is an experimental feature.")
} else if (!testing) {
throw new SparkException("Dynamic allocation of executors requires the external " +
"shuffle service. You may enable this through spark.shuffle.service.enabled.")
}
}

if (executorAllocationRatio > 1.0 || executorAllocationRatio <= 0.0) {
@@ -214,6 +217,7 @@ private[spark] class ExecutorAllocationManager(
def start(): Unit = {
listenerBus.addToManagementQueue(listener)
listenerBus.addToManagementQueue(executorMonitor)
cleaner.foreach(_.attachListener(executorMonitor))

val scheduleTask = new Runnable() {
override def run(): Unit = {
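With the change above, a missing external shuffle service is no longer always fatal: when shuffle tracking is enabled, the manager only logs a warning. A stand-alone Python sketch of that decision, using a plain dict in place of `SparkConf` (the helper name and return strings are made up; the tracking flag is assumed to be keyed `spark.dynamicAllocation.shuffleTracking.enabled`):

```python
def check_dynamic_allocation(conf, testing=False):
    """Mirror of the relaxed ExecutorAllocationManager validation above."""
    shuffle_service = conf.get("spark.shuffle.service.enabled", "false") == "true"
    shuffle_tracking = conf.get(
        "spark.dynamicAllocation.shuffleTracking.enabled", "false") == "true"
    if not shuffle_service:
        if shuffle_tracking:
            # New path in this PR: allowed, but flagged as experimental.
            return "warn: dynamic allocation without a shuffle service is experimental"
        if not testing:
            raise RuntimeError(
                "Dynamic allocation of executors requires the external shuffle "
                "service. You may enable this through spark.shuffle.service.enabled.")
    return "ok"
```

The ordering matters: the tracking flag is consulted first, so the hard failure is reserved for configurations that enable neither mechanism.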
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -68,7 +68,7 @@ private[spark] case class SSLOptions(
*/
def createJettySslContextFactory(): Option[SslContextFactory] = {
if (enabled) {
val sslContextFactory = new SslContextFactory()
val sslContextFactory = new SslContextFactory.Server()

keyStore.foreach(file => sslContextFactory.setKeyStorePath(file.getAbsolutePath))
keyStorePassword.foreach(sslContextFactory.setKeyStorePassword)
54 changes: 39 additions & 15 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -553,14 +553,22 @@ class SparkContext(config: SparkConf) extends Logging {
None
}

// Optionally scale number of executors dynamically based on workload. Exposed for testing.
_cleaner =
if (_conf.get(CLEANER_REFERENCE_TRACKING)) {
Some(new ContextCleaner(this))
} else {
None
}
_cleaner.foreach(_.start())

val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
_executorAllocationManager =
if (dynamicAllocationEnabled) {
schedulerBackend match {
case b: ExecutorAllocationClient =>
Some(new ExecutorAllocationManager(
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
cleaner = cleaner))
case _ =>
None
}
@@ -569,14 +577,6 @@
}
_executorAllocationManager.foreach(_.start())

_cleaner =
if (_conf.get(CLEANER_REFERENCE_TRACKING)) {
Some(new ContextCleaner(this))
} else {
None
}
_cleaner.foreach(_.start())

setupAndStartListenerBus()
postEnvironmentUpdate()
postApplicationStart()
@@ -1791,7 +1791,7 @@ class SparkContext(config: SparkConf) extends Logging {
* @note A path can be added only once. Subsequent additions of the same path are ignored.
*/
def addJar(path: String) {
def addJarFile(file: File): String = {
def addLocalJarFile(file: File): String = {
try {
if (!file.exists()) {
throw new FileNotFoundException(s"Jar ${file.getAbsolutePath} not found")
@@ -1808,12 +1808,36 @@
}
}

def checkRemoteJarFile(path: String): String = {
val hadoopPath = new Path(path)
val scheme = new URI(path).getScheme
if (!Array("http", "https", "ftp").contains(scheme)) {
try {
val fs = hadoopPath.getFileSystem(hadoopConfiguration)
if (!fs.exists(hadoopPath)) {
throw new FileNotFoundException(s"Jar ${path} not found")
}
if (fs.isDirectory(hadoopPath)) {
throw new IllegalArgumentException(
s"Directory ${path} is not allowed for addJar")
}
path
} catch {
case NonFatal(e) =>
logError(s"Failed to add $path to Spark environment", e)
null
}
} else {
path
}
}

if (path == null) {
logWarning("null specified as parameter to addJar")
} else {
val key = if (path.contains("\\")) {
// For local paths with backslashes on Windows, URI throws an exception
addJarFile(new File(path))
addLocalJarFile(new File(path))
} else {
val uri = new URI(path)
// SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
@@ -1822,12 +1846,12 @@
// A JAR file which exists only on the driver node
case null =>
// SPARK-22585 path without schema is not url encoded
addJarFile(new File(uri.getRawPath))
addLocalJarFile(new File(uri.getRawPath))
// A JAR file which exists only on the driver node
case "file" => addJarFile(new File(uri.getPath))
case "file" => addLocalJarFile(new File(uri.getPath))
// A JAR file which exists locally on every worker node
case "local" => "file:" + uri.getPath
case _ => path
case _ => checkRemoteJarFile(path)
}
}
if (key != null) {
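`addJar` now dispatches on the URI scheme: driver-local files are served by the driver, `local:` paths are assumed present on every worker, `http`/`https`/`ftp` URLs pass through unchecked, and any other remote scheme (e.g. `hdfs://`) gets an existence and not-a-directory check through the Hadoop `FileSystem`. A rough Python classification of that dispatch (the function and its return labels are illustrative only):

```python
from urllib.parse import urlparse

def classify_jar_path(path):
    """Label a path the way the addJar logic above routes it."""
    if path is None:
        return "warn-null"              # addJar just logs a warning
    if "\\" in path:
        return "local-file"             # Windows path; URI parsing would fail
    scheme = urlparse(path).scheme
    if scheme in ("", "file"):
        return "local-file"             # a jar that exists only on the driver
    if scheme == "local":
        return "worker-local"           # already present on every worker node
    if scheme in ("http", "https", "ftp"):
        return "remote-unchecked"       # fetched later; existence not verified
    return "remote-fs-check"            # e.g. hdfs://: verified via Hadoop FS
```

Catching a bad remote path at `addJar` time, rather than at task launch, is the point of the new `checkRemoteJarFile` branch.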
Original file line number Diff line number Diff line change
@@ -35,6 +35,7 @@ import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD}
import org.apache.spark.resource.ResourceInformation

/**
* A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
@@ -114,6 +115,8 @@ class JavaSparkContext(val sc: SparkContext) extends Closeable {

def appName: String = sc.appName

def resources: JMap[String, ResourceInformation] = sc.resources.asJava

def jars: util.List[String] = sc.jars.asJava

def startTime: java.lang.Long = sc.startTime
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -281,6 +281,16 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
dataOut.writeInt(context.partitionId())
dataOut.writeInt(context.attemptNumber())
dataOut.writeLong(context.taskAttemptId())
val resources = context.resources()
dataOut.writeInt(resources.size)
resources.foreach { case (k, v) =>
PythonRDD.writeUTF(k, dataOut)
PythonRDD.writeUTF(v.name, dataOut)
dataOut.writeInt(v.addresses.size)
v.addresses.foreach { case addr =>
PythonRDD.writeUTF(addr, dataOut)
}
}
val localProps = context.getLocalProperties.asScala
dataOut.writeInt(localProps.size)
localProps.foreach { case (k, v) =>
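The new lines above push each task resource (its key, name, and address list) over the worker socket before the local properties. Assuming `PythonRDD.writeUTF` frames a string as a 4-byte big-endian length followed by UTF-8 bytes, the byte layout can be sketched in Python (helper names here are illustrative):

```python
import struct
from io import BytesIO

def write_utf(buf, s):
    # Assumed writeUTF framing: 4-byte big-endian length, then UTF-8 payload.
    data = s.encode("utf-8")
    buf.write(struct.pack(">i", len(data)))
    buf.write(data)

def write_resources(buf, resources):
    """Serialize {key: (name, [addresses])} the way the diff above does:
    resource count, then per resource its key, name, and address list."""
    buf.write(struct.pack(">i", len(resources)))
    for key, (name, addresses) in resources.items():
        write_utf(buf, key)
        write_utf(buf, name)
        buf.write(struct.pack(">i", len(addresses)))
        for addr in addresses:
            write_utf(buf, addr)
```

Because both sides agree on this fixed framing, the Python worker can read the section with plain length-prefixed reads, no delimiter scanning required.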
Original file line number Diff line number Diff line change
@@ -81,4 +81,8 @@ private[spark] object PythonUtils {
def isEncryptionEnabled(sc: JavaSparkContext): Boolean = {
sc.conf.get(org.apache.spark.internal.config.IO_ENCRYPTION_ENABLED)
}

def getBroadcastThreshold(sc: JavaSparkContext): Long = {
sc.conf.get(org.apache.spark.internal.config.BROADCAST_FOR_UDF_COMPRESSION_THRESHOLD)
}
}
Original file line number Diff line number Diff line change
@@ -186,9 +186,6 @@ private[spark] object SerDeUtil extends Logging {
val unpickle = new Unpickler
iter.flatMap { row =>
val obj = unpickle.loads(row)
// `Opcodes.MEMOIZE` of Protocol 4 (Python 3.4+) will store objects in internal map
// of `Unpickler`. This map is cleared when calling `Unpickler.close()`.
unpickle.close()
if (batched) {
obj match {
case array: Array[Any] => array.toSeq