core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -35,6 +35,7 @@ import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD}
import org.apache.spark.resource.ResourceInformation

/**
* A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
@@ -114,6 +115,8 @@ class JavaSparkContext(val sc: SparkContext) extends Closeable {

def appName: String = sc.appName

def resources: JMap[String, ResourceInformation] = sc.resources.asJava

def jars: util.List[String] = sc.jars.asJava

def startTime: java.lang.Long = sc.startTime
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -281,6 +281,16 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
dataOut.writeInt(context.partitionId())
dataOut.writeInt(context.attemptNumber())
dataOut.writeLong(context.taskAttemptId())
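// Write the task's resources for the Python worker: the number of resources, then for each
// entry its map key, the resource name, the number of addresses, and each address string.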
val resources = context.resources()
dataOut.writeInt(resources.size)
resources.foreach { case (k, v) =>
PythonRDD.writeUTF(k, dataOut)
PythonRDD.writeUTF(v.name, dataOut)
dataOut.writeInt(v.addresses.size)
v.addresses.foreach { addr =>
PythonRDD.writeUTF(addr, dataOut)
}
}
val localProps = context.getLocalProperties.asScala
dataOut.writeInt(localProps.size)
localProps.foreach { case (k, v) =>
3 changes: 2 additions & 1 deletion python/pyspark/__init__.py
@@ -54,6 +54,7 @@
from pyspark.storagelevel import StorageLevel
from pyspark.accumulators import Accumulator, AccumulatorParam
from pyspark.broadcast import Broadcast
from pyspark.resourceinformation import ResourceInformation
from pyspark.serializers import MarshalSerializer, PickleSerializer
from pyspark.status import *
from pyspark.taskcontext import TaskContext, BarrierTaskContext, BarrierTaskInfo
@@ -118,5 +119,5 @@ def wrapper(self, *args, **kwargs):
"SparkConf", "SparkContext", "SparkFiles", "RDD", "StorageLevel", "Broadcast",
"Accumulator", "AccumulatorParam", "MarshalSerializer", "PickleSerializer",
"StatusTracker", "SparkJobInfo", "SparkStageInfo", "Profiler", "BasicProfiler", "TaskContext",
"RDDBarrier", "BarrierTaskContext", "BarrierTaskInfo",
"RDDBarrier", "BarrierTaskContext", "BarrierTaskInfo", "ResourceInformation",
]
12 changes: 12 additions & 0 deletions python/pyspark/context.py
@@ -37,6 +37,7 @@
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
PairDeserializer, AutoBatchedSerializer, NoOpSerializer, ChunkedStream
from pyspark.storagelevel import StorageLevel
from pyspark.resourceinformation import ResourceInformation
from pyspark.rdd import RDD, _load_from_socket, ignore_unicode_prefix
from pyspark.traceback_utils import CallSite, first_spark_call
from pyspark.status import StatusTracker
@@ -1105,6 +1106,17 @@ def getConf(self):
conf.setAll(self._conf.getAll())
return conf

@property
def resources(self):
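"""
Return the resources allocated to this SparkContext (the driver), as a dict mapping
each resource name to a :class:`ResourceInformation` instance.
"""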
resources = {}
jresources = self._jsc.resources()
for x in jresources:
name = jresources[x].name()
jaddresses = jresources[x].addresses()
addrs = [addr for addr in jaddresses]
resources[name] = ResourceInformation(name, addrs)
return resources


def _test():
import atexit
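For illustration only (not part of the diff), a minimal sketch of using the new SparkContext.resources property; the discovery-script path is hypothetical, and the config keys are the ones exercised in the resource tests below:

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .set("spark.driver.resource.gpu.amount", "1")
        # hypothetical script that prints {"name": "gpu", "addresses": ["0"]}
        .set("spark.driver.resource.gpu.discoveryScript", "/path/to/getGpus.sh"))
sc = SparkContext("local-cluster[2,1,1024]", "resources-demo", conf=conf)

for name, info in sc.resources.items():
    print(name, info.addresses)  # e.g. gpu ['0']

sc.stop()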
43 changes: 43 additions & 0 deletions python/pyspark/resourceinformation.py
@@ -0,0 +1,43 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


class ResourceInformation(object):

"""
.. note:: Evolving

Class to hold information about a type of Resource. A resource could be a GPU, FPGA, etc.
The array of addresses are resource specific and its up to the user to interpret the address.

One example is GPUs, where the addresses would be the indices of the GPUs

@param name the name of the resource
@param addresses an array of strings describing the addresses of the resource
"""

def __init__(self, name, addresses):
self._name = name
self._addresses = addresses

@property
def name(self):
return self._name

@property
def addresses(self):
return self._addresses
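As a quick illustration (not part of the diff), ResourceInformation is a plain value holder; the values below are made up:

from pyspark.resourceinformation import ResourceInformation

# Hypothetical resource: a GPU type exposing two device indices.
gpu = ResourceInformation("gpu", ["0", "1"])
assert gpu.name == "gpu"
assert gpu.addresses == ["0", "1"]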
8 changes: 8 additions & 0 deletions python/pyspark/taskcontext.py
@@ -38,6 +38,7 @@ class TaskContext(object):
_stageId = None
_taskAttemptId = None
_localProperties = None
_resources = None

def __new__(cls):
"""Even if users construct TaskContext instead of using get, give them the singleton."""
@@ -95,6 +96,13 @@ def getLocalProperty(self, key):
"""
return self._localProperties.get(key, None)

def resources(self):
"""
Resources allocated to the task. The key is the resource name and the value is a
:class:`ResourceInformation` instance for that resource.
"""
return self._resources


BARRIER_FUNCTION = 1

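A minimal sketch (not part of the diff) of reading task resources from inside an RDD operation; the config keys match the ones used in the task-context tests below, and the discovery-script path is hypothetical:

from pyspark import SparkConf, SparkContext, TaskContext

conf = (SparkConf()
        .set("spark.task.resource.gpu.amount", "1")
        .set("spark.executor.resource.gpu.amount", "1")
        # hypothetical script that prints {"name": "gpu", "addresses": ["0"]}
        .set("spark.executor.resource.gpu.discoveryScript", "/path/to/getGpus.sh"))
sc = SparkContext("local-cluster[2,1,1024]", "task-resources-demo", conf=conf)

# Each task sees its assigned addresses through TaskContext.get().resources().
addrs = sc.parallelize(range(2), 2) \
    .map(lambda _: TaskContext.get().resources()["gpu"].addresses) \
    .collect()
sc.stop()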
35 changes: 34 additions & 1 deletion python/pyspark/tests/test_context.py
@@ -16,13 +16,14 @@
#
import os
import shutil
import stat
import tempfile
import threading
import time
import unittest
from collections import namedtuple

from pyspark import SparkFiles, SparkContext
from pyspark import SparkConf, SparkFiles, SparkContext
from pyspark.testing.utils import ReusedPySparkTestCase, PySparkTestCase, QuietTest, SPARK_HOME


@@ -256,6 +257,38 @@ def test_forbid_insecure_gateway(self):
SparkContext(gateway=mock_insecure_gateway)
self.assertIn("insecure Py4j gateway", str(context.exception))

def test_resources(self):
"""Test the resources are empty by default."""
with SparkContext() as sc:
resources = sc.resources
self.assertEqual(len(resources), 0)


class ContextTestsWithResources(unittest.TestCase):

def setUp(self):
class_name = self.__class__.__name__
self.tempFile = tempfile.NamedTemporaryFile(delete=False)
self.tempFile.write(b'echo {\\"name\\": \\"gpu\\", \\"addresses\\": [\\"0\\"]}')
self.tempFile.close()
os.chmod(self.tempFile.name, stat.S_IRWXU | stat.S_IXGRP | stat.S_IRGRP |
stat.S_IROTH | stat.S_IXOTH)
conf = SparkConf().set("spark.driver.resource.gpu.amount", "1")
conf = conf.set("spark.driver.resource.gpu.discoveryScript", self.tempFile.name)
self.sc = SparkContext('local-cluster[2,1,1024]', class_name, conf=conf)

def test_resources(self):
"""Test the resources are available."""
resources = self.sc.resources
self.assertEqual(len(resources), 1)
self.assertTrue('gpu' in resources)
self.assertEqual(resources['gpu'].name, 'gpu')
self.assertEqual(resources['gpu'].addresses, ['0'])

def tearDown(self):
os.unlink(self.tempFile.name)
self.sc.stop()


if __name__ == "__main__":
from pyspark.tests.test_context import *
38 changes: 38 additions & 0 deletions python/pyspark/tests/test_taskcontext.py
@@ -16,7 +16,9 @@
#
import os
import random
import stat
import sys
import tempfile
import time
import unittest

@@ -43,6 +45,15 @@ def test_stage_id(self):
self.assertEqual(stage1 + 2, stage3)
self.assertEqual(stage2 + 1, stage3)

def test_resources(self):
"""Test the resources are empty by default."""
rdd = self.sc.parallelize(range(10))
resources1 = rdd.map(lambda x: TaskContext.get().resources()).take(1)[0]
# Test using the constructor directly rather than the get()
resources2 = rdd.map(lambda x: TaskContext().resources()).take(1)[0]
self.assertEqual(len(resources1), 0)
self.assertEqual(len(resources2), 0)

def test_partition_id(self):
"""Test the partition id."""
rdd1 = self.sc.parallelize(range(10), 1)
@@ -174,6 +185,33 @@ def tearDown(self):
self.sc.stop()


class TaskContextTestsWithResources(unittest.TestCase):

def setUp(self):
class_name = self.__class__.__name__
self.tempFile = tempfile.NamedTemporaryFile(delete=False)
self.tempFile.write(b'echo {\\"name\\": \\"gpu\\", \\"addresses\\": [\\"0\\"]}')
self.tempFile.close()
os.chmod(self.tempFile.name, stat.S_IRWXU | stat.S_IXGRP | stat.S_IRGRP |
stat.S_IROTH | stat.S_IXOTH)
conf = SparkConf().set("spark.task.resource.gpu.amount", "1")
conf = conf.set("spark.executor.resource.gpu.amount", "1")
conf = conf.set("spark.executor.resource.gpu.discoveryScript", self.tempFile.name)
self.sc = SparkContext('local-cluster[2,1,1024]', class_name, conf=conf)

def test_resources(self):
"""Test the resources are available."""
rdd = self.sc.parallelize(range(10))
resources = rdd.map(lambda x: TaskContext.get().resources()).take(1)[0]
self.assertEqual(len(resources), 1)
self.assertTrue('gpu' in resources)
self.assertEqual(resources['gpu'].name, 'gpu')
self.assertEqual(resources['gpu'].addresses, ['0'])

def tearDown(self):
os.unlink(self.tempFile.name)
self.sc.stop()

if __name__ == "__main__":
import unittest
from pyspark.tests.test_taskcontext import *
11 changes: 11 additions & 0 deletions python/pyspark/worker.py
@@ -35,6 +35,7 @@
from pyspark.java_gateway import local_connect_and_auth
from pyspark.taskcontext import BarrierTaskContext, TaskContext
from pyspark.files import SparkFiles
from pyspark.resourceinformation import ResourceInformation
from pyspark.rdd import PythonEvalType
from pyspark.serializers import write_with_length, write_int, read_long, read_bool, \
write_long, read_int, SpecialLengths, UTF8Deserializer, PickleSerializer, \
@@ -432,6 +433,16 @@ def main(infile, outfile):
taskContext._partitionId = read_int(infile)
taskContext._attemptNumber = read_int(infile)
taskContext._taskAttemptId = read_long(infile)
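# Read the resources for this task, mirroring the write order in BasePythonRunner:
# the resource count, then for each resource its dict key, its name, and its addresses.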
taskContext._resources = {}
for r in range(read_int(infile)):
key = utf8_deserializer.loads(infile)
name = utf8_deserializer.loads(infile)
addresses = []
for a in range(read_int(infile)):
addresses.append(utf8_deserializer.loads(infile))
taskContext._resources[key] = ResourceInformation(name, addresses)

taskContext._localProperties = dict()
for i in range(read_int(infile)):
k = utf8_deserializer.loads(infile)