1 change: 1 addition & 0 deletions python/docs/index.rst
@@ -16,6 +16,7 @@ Contents:
pyspark.streaming
pyspark.ml
pyspark.mllib
pyspark.resource


Core classes:
11 changes: 11 additions & 0 deletions python/docs/pyspark.resource.rst
@@ -0,0 +1,11 @@
pyspark.resource module
=======================

Module Contents
---------------

.. automodule:: pyspark.resource
:members:
:undoc-members:
:inherited-members:

1 change: 1 addition & 0 deletions python/docs/pyspark.rst
@@ -11,6 +11,7 @@ Subpackages
pyspark.streaming
pyspark.ml
pyspark.mllib
pyspark.resource

Contents
--------
4 changes: 2 additions & 2 deletions python/docs/pyspark.sql.rst
@@ -1,8 +1,8 @@
pyspark.sql module
==================

-Module Context
---------------
+Module Contents
+---------------

.. automodule:: pyspark.sql
:members:
2 changes: 1 addition & 1 deletion python/pyspark/__init__.py
@@ -54,7 +54,7 @@
from pyspark.storagelevel import StorageLevel
from pyspark.accumulators import Accumulator, AccumulatorParam
from pyspark.broadcast import Broadcast
-from pyspark.resourceinformation import ResourceInformation
+from pyspark.resource.information import ResourceInformation
from pyspark.serializers import MarshalSerializer, PickleSerializer
from pyspark.status import *
from pyspark.taskcontext import TaskContext, BarrierTaskContext, BarrierTaskInfo
2 changes: 1 addition & 1 deletion python/pyspark/context.py
@@ -35,7 +35,7 @@
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
PairDeserializer, AutoBatchedSerializer, NoOpSerializer, ChunkedStream
from pyspark.storagelevel import StorageLevel
-from pyspark.resourceinformation import ResourceInformation
+from pyspark.resource.information import ResourceInformation
from pyspark.rdd import RDD, _load_from_socket, ignore_unicode_prefix
from pyspark.traceback_utils import CallSite, first_spark_call
from pyspark.status import StatusTracker
5 changes: 2 additions & 3 deletions python/pyspark/rdd.py
@@ -47,9 +47,8 @@
from pyspark.statcounter import StatCounter
from pyspark.rddsampler import RDDSampler, RDDRangeSampler, RDDStratifiedSampler
from pyspark.storagelevel import StorageLevel
-from pyspark.resource.executorrequests import ExecutorResourceRequests
-from pyspark.resource.resourceprofile import ResourceProfile
-from pyspark.resource.taskrequests import TaskResourceRequests
+from pyspark.resource.requests import ExecutorResourceRequests, TaskResourceRequests
+from pyspark.resource.profile import ResourceProfile
from pyspark.resultiterable import ResultIterable
from pyspark.shuffle import Aggregator, ExternalMerger, \
get_used_memory, ExternalSorter, ExternalGroupBy
11 changes: 6 additions & 5 deletions python/pyspark/resource/__init__.py
@@ -18,12 +18,13 @@
"""
APIs to let users manipulate resource requirements.
"""
-from pyspark.resource.executorrequests import ExecutorResourceRequest, ExecutorResourceRequests
-from pyspark.resource.taskrequests import TaskResourceRequest, TaskResourceRequests
-from pyspark.resource.resourceprofilebuilder import ResourceProfileBuilder
-from pyspark.resource.resourceprofile import ResourceProfile
+from pyspark.resource.information import ResourceInformation
+from pyspark.resource.requests import TaskResourceRequest, TaskResourceRequests, \
+    ExecutorResourceRequest, ExecutorResourceRequests
+from pyspark.resource.profile import ResourceProfile, ResourceProfileBuilder

__all__ = [
"TaskResourceRequest", "TaskResourceRequests", "ExecutorResourceRequest",
"ExecutorResourceRequests", "ResourceProfile", "ResourceProfileBuilder",
"ExecutorResourceRequests", "ResourceProfile", "ResourceInformation",
"ResourceProfileBuilder",
]
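
With the package flattened as above, every public class becomes importable directly from pyspark.resource. A minimal sketch of the import style this enables, using only the names listed in __all__:

from pyspark.resource import (ExecutorResourceRequests, ResourceInformation,
                              ResourceProfile, ResourceProfileBuilder,
                              TaskResourceRequests)
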
python/pyspark/resource/information.py
@@ -26,8 +26,10 @@ class ResourceInformation(object):

One example is GPUs, where the addresses would be the indices of the GPUs

-@param name the name of the resource
-@param addresses an array of strings describing the addresses of the resource
+:param name: the name of the resource
+:param addresses: an array of strings describing the addresses of the resource
+
+.. versionadded:: 3.0.0
Review comment from @HyukjinKwon (Member, Author), May 18, 2020:

Will open a PR to backport this change specifically to branch-3.0. This PR itself should go to master only.

Reply (Member):

Please make a PR to branch-3.0 for this part.
"""

def __init__(self, name, addresses):
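
For context on how ResourceInformation surfaces to users, here is a minimal sketch of querying allocated resources at runtime. It assumes a cluster configured with resource discovery; the "gpu" name and addresses are illustrative:

from pyspark import SparkContext

sc = SparkContext(appName="resource-info-demo")
# SparkContext.resources maps each resource name to a ResourceInformation.
for name, info in sc.resources.items():
    print(name, info.addresses)  # e.g. gpu ['0', '1']
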
python/pyspark/resource/profile.py
@@ -15,10 +15,61 @@
# limitations under the License.
#

-from pyspark.resource.executorrequests import ExecutorResourceRequest,\
-    ExecutorResourceRequests
-from pyspark.resource.resourceprofile import ResourceProfile
-from pyspark.resource.taskrequests import TaskResourceRequest, TaskResourceRequests
+from pyspark.resource.requests import TaskResourceRequest, TaskResourceRequests, \
+    ExecutorResourceRequests, ExecutorResourceRequest


class ResourceProfile(object):

"""
.. note:: Evolving

Resource profile to associate with an RDD. A :class:`pyspark.resource.ResourceProfile`
allows the user to specify executor and task requirements for an RDD that will get
applied during a stage. This allows the user to change the resource requirements between
stages. This is meant to be immutable so a user cannot change it after building.

.. versionadded:: 3.1.0
"""

def __init__(self, _java_resource_profile=None, _exec_req={}, _task_req={}):
if _java_resource_profile is not None:
self._java_resource_profile = _java_resource_profile
else:
self._java_resource_profile = None
self._executor_resource_requests = _exec_req
self._task_resource_requests = _task_req

@property
def id(self):
if self._java_resource_profile is not None:
return self._java_resource_profile.id()
else:
raise RuntimeError("SparkContext must be created to get the id, get the id "
"after adding the ResourceProfile to an RDD")

@property
def taskResources(self):
if self._java_resource_profile is not None:
taskRes = self._java_resource_profile.taskResourcesJMap()
result = {}
for k, v in taskRes.items():
result[k] = TaskResourceRequest(v.resourceName(), v.amount())
return result
else:
return self._task_resource_requests

@property
def executorResources(self):
if self._java_resource_profile is not None:
execRes = self._java_resource_profile.executorResourcesJMap()
result = {}
for k, v in execRes.items():
result[k] = ExecutorResourceRequest(v.resourceName(), v.amount(),
v.discoveryScript(), v.vendor())
return result
else:
return self._executor_resource_requests


class ResourceProfileBuilder(object):
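
Taken together, ResourceProfile and the (truncated) ResourceProfileBuilder support stage-level scheduling end to end. A hedged sketch of the intended flow, using the APIs visible in this diff plus RDD.withResources, whose import changes appear in rdd.py above; the discovery-script path is illustrative, and this generally assumes a cluster manager supporting stage-level scheduling rather than local mode:

from pyspark import SparkContext
from pyspark.resource import (ExecutorResourceRequests, ResourceProfileBuilder,
                              TaskResourceRequests)

sc = SparkContext(appName="resource-profile-demo")

# Executor needs: cores, memory, and two GPUs per executor.
ereqs = ExecutorResourceRequests().cores(4).memory("6g") \
    .resource("gpu", 2, discoveryScript="/opt/spark/getGpus.sh")
# Task needs: 2 cpus and half a GPU per task.
treqs = TaskResourceRequests().cpus(2).resource("gpu", 0.5)

# build is a property on the builder; the profile is immutable once built.
rp = ResourceProfileBuilder().require(ereqs).require(treqs).build
rdd = sc.parallelize(range(100)).withResources(rp)
print(rdd.getResourceProfile().taskResources)
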
python/pyspark/resource/requests.py
@@ -15,7 +15,6 @@
# limitations under the License.
#

-from pyspark.resource.taskrequests import TaskResourceRequest
from pyspark.util import _parse_memory


@@ -167,3 +166,89 @@ def requests(self):
return result
else:
return self._executor_resources


class TaskResourceRequest(object):
"""
.. note:: Evolving

A task resource request. This is used in conjunction with the
:class:`pyspark.resource.ResourceProfile` to programmatically specify the resources
needed for an RDD that will be applied at the stage level. The amount is specified
as a Double so that more than one task can run per resource; for example, an amount
of 0.5 means two tasks can share each resource. Valid values are whole numbers or
fractions less than or equal to 0.5.
Use the :class:`pyspark.resource.TaskResourceRequests` class as a convenience API.

:param resourceName: Name of the resource
:param amount: Amount requested as a Double to support fractional resource requests.
Valid values are whole numbers or fractions less than or equal to 0.5.

.. versionadded:: 3.1.0
"""
def __init__(self, resourceName, amount):
self._name = resourceName
self._amount = float(amount)

@property
def resourceName(self):
return self._name

@property
def amount(self):
return self._amount


class TaskResourceRequests(object):

"""
.. note:: Evolving

A set of task resource requests. This is used in conjunction with the
:class:`pyspark.resource.ResourceProfileBuilder` to programmatically specify the resources
needed for an RDD that will be applied at the stage level.

.. versionadded:: 3.1.0
"""

_CPUS = "cpus"

def __init__(self, _jvm=None, _requests=None):
from pyspark import SparkContext
_jvm = _jvm or SparkContext._jvm
if _jvm is not None:
self._java_task_resource_requests = \
SparkContext._jvm.org.apache.spark.resource.TaskResourceRequests()
if _requests is not None:
for k, v in _requests.items():
if k == self._CPUS:
self._java_task_resource_requests.cpus(int(v.amount))
else:
self._java_task_resource_requests.resource(v.resourceName, v.amount)
else:
self._java_task_resource_requests = None
self._task_resources = {}

def cpus(self, amount):
if self._java_task_resource_requests is not None:
self._java_task_resource_requests.cpus(amount)
else:
self._task_resources[self._CPUS] = TaskResourceRequest(self._CPUS, amount)
return self

def resource(self, resourceName, amount):
if self._java_task_resource_requests is not None:
self._java_task_resource_requests.resource(resourceName, float(amount))
else:
self._task_resources[resourceName] = TaskResourceRequest(resourceName, amount)
return self

@property
def requests(self):
if self._java_task_resource_requests is not None:
result = {}
taskRes = self._java_task_resource_requests.requestsJMap()
for k, v in taskRes.items():
result[k] = TaskResourceRequest(v.resourceName(), v.amount())
return result
else:
return self._task_resources
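
To make the fractional-amount rule concrete, a small sketch that exercises the pure-Python branches above; with no SparkContext created, _jvm is None and the requests are kept in _task_resources:

from pyspark.resource.requests import TaskResourceRequests

treqs = TaskResourceRequests().cpus(2).resource("gpu", 0.25)  # four tasks share one GPU
for name, req in treqs.requests.items():
    print(name, req.amount)  # cpus 2.0, then gpu 0.25
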
72 changes: 0 additions & 72 deletions python/pyspark/resource/resourceprofile.py

This file was deleted.
