diff --git a/python/docs/index.rst b/python/docs/index.rst
index 0e7b62361802a..6e059264e6bbb 100644
--- a/python/docs/index.rst
+++ b/python/docs/index.rst
@@ -16,6 +16,7 @@ Contents:
    pyspark.streaming
    pyspark.ml
    pyspark.mllib
+   pyspark.resource

 Core classes:

diff --git a/python/docs/pyspark.resource.rst b/python/docs/pyspark.resource.rst
new file mode 100644
index 0000000000000..7f3a79b9e5b52
--- /dev/null
+++ b/python/docs/pyspark.resource.rst
@@ -0,0 +1,11 @@
+pyspark.resource module
+=======================
+
+Module Contents
+---------------
+
+.. automodule:: pyspark.resource
+    :members:
+    :undoc-members:
+    :inherited-members:
+
diff --git a/python/docs/pyspark.rst b/python/docs/pyspark.rst
index 0df12c49ad033..402d6ce9eb016 100644
--- a/python/docs/pyspark.rst
+++ b/python/docs/pyspark.rst
@@ -11,6 +11,7 @@ Subpackages
    pyspark.streaming
    pyspark.ml
    pyspark.mllib
+   pyspark.resource

 Contents
 --------
diff --git a/python/docs/pyspark.sql.rst b/python/docs/pyspark.sql.rst
index b69562e845920..406ada701941a 100644
--- a/python/docs/pyspark.sql.rst
+++ b/python/docs/pyspark.sql.rst
@@ -1,8 +1,8 @@
 pyspark.sql module
 ==================

-Module Context
---------------
+Module Contents
+---------------

 .. automodule:: pyspark.sql
     :members:
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index 70c0b27a6aa33..2fb6f503359a8 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -54,7 +54,7 @@
 from pyspark.storagelevel import StorageLevel
 from pyspark.accumulators import Accumulator, AccumulatorParam
 from pyspark.broadcast import Broadcast
-from pyspark.resourceinformation import ResourceInformation
+from pyspark.resource.information import ResourceInformation
 from pyspark.serializers import MarshalSerializer, PickleSerializer
 from pyspark.status import *
 from pyspark.taskcontext import TaskContext, BarrierTaskContext, BarrierTaskInfo
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 6cc343e3e495c..4f29f2f0be1e8 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -35,7 +35,7 @@
 from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
     PairDeserializer, AutoBatchedSerializer, NoOpSerializer, ChunkedStream
 from pyspark.storagelevel import StorageLevel
-from pyspark.resourceinformation import ResourceInformation
+from pyspark.resource.information import ResourceInformation
 from pyspark.rdd import RDD, _load_from_socket, ignore_unicode_prefix
 from pyspark.traceback_utils import CallSite, first_spark_call
 from pyspark.status import StatusTracker
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index d0ac000ba3208..db0c1971cd2fe 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -47,9 +47,8 @@
 from pyspark.statcounter import StatCounter
 from pyspark.rddsampler import RDDSampler, RDDRangeSampler, RDDStratifiedSampler
 from pyspark.storagelevel import StorageLevel
-from pyspark.resource.executorrequests import ExecutorResourceRequests
-from pyspark.resource.resourceprofile import ResourceProfile
-from pyspark.resource.taskrequests import TaskResourceRequests
+from pyspark.resource.requests import ExecutorResourceRequests, TaskResourceRequests
+from pyspark.resource.profile import ResourceProfile
 from pyspark.resultiterable import ResultIterable
 from pyspark.shuffle import Aggregator, ExternalMerger, \
     get_used_memory, ExternalSorter, ExternalGroupBy
diff --git a/python/pyspark/resource/__init__.py b/python/pyspark/resource/__init__.py
index 89070ec4adc7e..b5f4c4a6b1825 100644
--- a/python/pyspark/resource/__init__.py
+++ b/python/pyspark/resource/__init__.py
@@ -18,12 +18,13 @@
 """
 APIs to let users manipulate resource requirements.
 """
-from pyspark.resource.executorrequests import ExecutorResourceRequest, ExecutorResourceRequests
-from pyspark.resource.taskrequests import TaskResourceRequest, TaskResourceRequests
-from pyspark.resource.resourceprofilebuilder import ResourceProfileBuilder
-from pyspark.resource.resourceprofile import ResourceProfile
+from pyspark.resource.information import ResourceInformation
+from pyspark.resource.requests import TaskResourceRequest, TaskResourceRequests, \
+    ExecutorResourceRequest, ExecutorResourceRequests
+from pyspark.resource.profile import ResourceProfile, ResourceProfileBuilder

 __all__ = [
     "TaskResourceRequest", "TaskResourceRequests", "ExecutorResourceRequest",
-    "ExecutorResourceRequests", "ResourceProfile", "ResourceProfileBuilder",
+    "ExecutorResourceRequests", "ResourceProfile", "ResourceInformation",
+    "ResourceProfileBuilder",
 ]
diff --git a/python/pyspark/resourceinformation.py b/python/pyspark/resource/information.py
similarity index 89%
rename from python/pyspark/resourceinformation.py
rename to python/pyspark/resource/information.py
index aaed21374b6ee..b0e41cced85b5 100644
--- a/python/pyspark/resourceinformation.py
+++ b/python/pyspark/resource/information.py
@@ -26,8 +26,10 @@ class ResourceInformation(object):

     One example is GPUs, where the addresses would be the indices of the GPUs

-    @param name the name of the resource
-    @param addresses an array of strings describing the addresses of the resource
+    :param name: the name of the resource
+    :param addresses: an array of strings describing the addresses of the resource
+
+    .. versionadded:: 3.0.0
     """

     def __init__(self, name, addresses):
diff --git a/python/pyspark/resource/resourceprofilebuilder.py b/python/pyspark/resource/profile.py
similarity index 69%
rename from python/pyspark/resource/resourceprofilebuilder.py
rename to python/pyspark/resource/profile.py
index 67654289d500f..3f6ae1ddd5e30 100644
--- a/python/pyspark/resource/resourceprofilebuilder.py
+++ b/python/pyspark/resource/profile.py
@@ -15,10 +15,61 @@
 # limitations under the License.
 #

-from pyspark.resource.executorrequests import ExecutorResourceRequest,\
-    ExecutorResourceRequests
-from pyspark.resource.resourceprofile import ResourceProfile
-from pyspark.resource.taskrequests import TaskResourceRequest, TaskResourceRequests
+from pyspark.resource.requests import TaskResourceRequest, TaskResourceRequests, \
+    ExecutorResourceRequests, ExecutorResourceRequest
+
+
+class ResourceProfile(object):
+
+    """
+    .. note:: Evolving
+
+    Resource profile to associate with an RDD. A :class:`pyspark.resource.ResourceProfile`
+    allows the user to specify executor and task requirements for an RDD that will get
+    applied during a stage. This allows the user to change the resource requirements between
+    stages. This is meant to be immutable so the user cannot change it after building.
+
+    .. versionadded:: 3.1.0
+    """
+
+    def __init__(self, _java_resource_profile=None, _exec_req={}, _task_req={}):
+        if _java_resource_profile is not None:
+            self._java_resource_profile = _java_resource_profile
+        else:
+            self._java_resource_profile = None
+            self._executor_resource_requests = _exec_req
+            self._task_resource_requests = _task_req
+
+    @property
+    def id(self):
+        if self._java_resource_profile is not None:
+            return self._java_resource_profile.id()
+        else:
+            raise RuntimeError("SparkContext must be created to get the id, get the id "
+                               "after adding the ResourceProfile to an RDD")
+
+    @property
+    def taskResources(self):
+        if self._java_resource_profile is not None:
+            taskRes = self._java_resource_profile.taskResourcesJMap()
+            result = {}
+            for k, v in taskRes.items():
+                result[k] = TaskResourceRequest(v.resourceName(), v.amount())
+            return result
+        else:
+            return self._task_resource_requests
+
+    @property
+    def executorResources(self):
+        if self._java_resource_profile is not None:
+            execRes = self._java_resource_profile.executorResourcesJMap()
+            result = {}
+            for k, v in execRes.items():
+                result[k] = ExecutorResourceRequest(v.resourceName(), v.amount(),
+                                                    v.discoveryScript(), v.vendor())
+            return result
+        else:
+            return self._executor_resource_requests


 class ResourceProfileBuilder(object):
diff --git a/python/pyspark/resource/executorrequests.py b/python/pyspark/resource/requests.py
similarity index 70%
rename from python/pyspark/resource/executorrequests.py
rename to python/pyspark/resource/requests.py
index 91a195c94b6e5..56ad6e8be9bcb 100644
--- a/python/pyspark/resource/executorrequests.py
+++ b/python/pyspark/resource/requests.py
@@ -15,7 +15,6 @@
 # limitations under the License.
 #

-from pyspark.resource.taskrequests import TaskResourceRequest
 from pyspark.util import _parse_memory


@@ -167,3 +166,89 @@ def requests(self):
             return result
         else:
             return self._executor_resources
+
+
+class TaskResourceRequest(object):
+    """
+    .. note:: Evolving
+
+    A task resource request. This is used in conjunction with the
+    :class:`pyspark.resource.ResourceProfile` to programmatically specify the resources
+    needed for an RDD that will be applied at the stage level. The amount is specified
+    as a Double to allow for saying you want more than 1 task per resource. Valid values
+    are less than or equal to 0.5 or whole numbers.
+    Use the :class:`pyspark.resource.TaskResourceRequests` class as a convenience API.
+
+    :param resourceName: Name of the resource
+    :param amount: Amount requesting as a Double to support fractional resource requests.
+        Valid values are less than or equal to 0.5 or whole numbers.
+
+    .. versionadded:: 3.1.0
+    """
+    def __init__(self, resourceName, amount):
+        self._name = resourceName
+        self._amount = float(amount)
+
+    @property
+    def resourceName(self):
+        return self._name
+
+    @property
+    def amount(self):
+        return self._amount
+
+
+class TaskResourceRequests(object):
+
+    """
+    .. note:: Evolving
+
+    A set of task resource requests. This is used in conjunction with the
+    :class:`pyspark.resource.ResourceProfileBuilder` to programmatically specify the resources
+    needed for an RDD that will be applied at the stage level.
+
+    .. versionadded:: 3.1.0
+    """
+
+    _CPUS = "cpus"
+
+    def __init__(self, _jvm=None, _requests=None):
+        from pyspark import SparkContext
+        _jvm = _jvm or SparkContext._jvm
+        if _jvm is not None:
+            self._java_task_resource_requests = \
+                SparkContext._jvm.org.apache.spark.resource.TaskResourceRequests()
+            if _requests is not None:
+                for k, v in _requests.items():
+                    if k == self._CPUS:
+                        self._java_task_resource_requests.cpus(int(v.amount))
+                    else:
+                        self._java_task_resource_requests.resource(v.resourceName, v.amount)
+        else:
+            self._java_task_resource_requests = None
+            self._task_resources = {}
+
+    def cpus(self, amount):
+        if self._java_task_resource_requests is not None:
+            self._java_task_resource_requests.cpus(amount)
+        else:
+            self._task_resources[self._CPUS] = TaskResourceRequest(self._CPUS, amount)
+        return self
+
+    def resource(self, resourceName, amount):
+        if self._java_task_resource_requests is not None:
+            self._java_task_resource_requests.resource(resourceName, float(amount))
+        else:
+            self._task_resources[resourceName] = TaskResourceRequest(resourceName, amount)
+        return self
+
+    @property
+    def requests(self):
+        if self._java_task_resource_requests is not None:
+            result = {}
+            taskRes = self._java_task_resource_requests.requestsJMap()
+            for k, v in taskRes.items():
+                result[k] = TaskResourceRequest(v.resourceName(), v.amount())
+            return result
+        else:
+            return self._task_resources
diff --git a/python/pyspark/resource/resourceprofile.py b/python/pyspark/resource/resourceprofile.py
deleted file mode 100644
index 59e9ccb4b6ea0..0000000000000
--- a/python/pyspark/resource/resourceprofile.py
+++ /dev/null
@@ -1,72 +0,0 @@
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from pyspark.resource.taskrequests import TaskResourceRequest
-from pyspark.resource.executorrequests import ExecutorResourceRequest
-
-
-class ResourceProfile(object):
-
-    """
-    .. note:: Evolving
-
-    Resource profile to associate with an RDD. A :class:`pyspark.resource.ResourceProfile`
-    allows the user to specify executor and task requirements for an RDD that will get
-    applied during a stage. This allows the user to change the resource requirements between
-    stages. This is meant to be immutable so user doesn't change it after building.
-
-    .. versionadded:: 3.1.0
-    """
-
-    def __init__(self, _java_resource_profile=None, _exec_req={}, _task_req={}):
-        if _java_resource_profile is not None:
-            self._java_resource_profile = _java_resource_profile
-        else:
-            self._java_resource_profile = None
-            self._executor_resource_requests = _exec_req
-            self._task_resource_requests = _task_req
-
-    @property
-    def id(self):
-        if self._java_resource_profile is not None:
-            return self._java_resource_profile.id()
-        else:
-            raise RuntimeError("SparkContext must be created to get the id, get the id "
-                               "after adding the ResourceProfile to an RDD")
-
-    @property
-    def taskResources(self):
-        if self._java_resource_profile is not None:
-            taskRes = self._java_resource_profile.taskResourcesJMap()
-            result = {}
-            for k, v in taskRes.items():
-                result[k] = TaskResourceRequest(v.resourceName(), v.amount())
-            return result
-        else:
-            return self._task_resource_requests
-
-    @property
-    def executorResources(self):
-        if self._java_resource_profile is not None:
-            execRes = self._java_resource_profile.executorResourcesJMap()
-            result = {}
-            for k, v in execRes.items():
-                result[k] = ExecutorResourceRequest(v.resourceName(), v.amount(),
-                                                    v.discoveryScript(), v.vendor())
-            return result
-        else:
-            return self._executor_resource_requests
diff --git a/python/pyspark/resource/taskrequests.py b/python/pyspark/resource/taskrequests.py
deleted file mode 100644
index e8dca98d14b61..0000000000000
--- a/python/pyspark/resource/taskrequests.py
+++ /dev/null
@@ -1,102 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-
-class TaskResourceRequest(object):
-    """
-    .. note:: Evolving
-
-    A task resource request. This is used in conjuntion with the
-    :class:`pyspark.resource.ResourceProfile` to programmatically specify the resources
-    needed for an RDD that will be applied at the stage level. The amount is specified
-    as a Double to allow for saying you want more then 1 task per resource. Valid values
-    are less than or equal to 0.5 or whole numbers.
-    Use :class:`pyspark.resource.TaskResourceRequests` class as a convenience API.
-
-    :param resourceName: Name of the resource
-    :param amount: Amount requesting as a Double to support fractional resource requests.
-        Valid values are less than or equal to 0.5 or whole numbers.
-
-    .. versionadded:: 3.1.0
-    """
-    def __init__(self, resourceName, amount):
-        self._name = resourceName
-        self._amount = float(amount)
-
-    @property
-    def resourceName(self):
-        return self._name
-
-    @property
-    def amount(self):
-        return self._amount
-
-
-class TaskResourceRequests(object):
-
-    """
-    .. note:: Evolving
-
-    A set of task resource requests. This is used in conjuntion with the
-    :class:`pyspark.resource.ResourceProfileBuilder` to programmatically specify the resources
-    needed for an RDD that will be applied at the stage level.
-
-    .. versionadded:: 3.1.0
-    """
-
-    _CPUS = "cpus"
-
-    def __init__(self, _jvm=None, _requests=None):
-        from pyspark import SparkContext
-        _jvm = _jvm or SparkContext._jvm
-        if _jvm is not None:
-            self._java_task_resource_requests = \
-                SparkContext._jvm.org.apache.spark.resource.TaskResourceRequests()
-            if _requests is not None:
-                for k, v in _requests.items():
-                    if k == self._CPUS:
-                        self._java_task_resource_requests.cpus(int(v.amount))
-                    else:
-                        self._java_task_resource_requests.resource(v.resourceName, v.amount)
-        else:
-            self._java_task_resource_requests = None
-            self._task_resources = {}
-
-    def cpus(self, amount):
-        if self._java_task_resource_requests is not None:
-            self._java_task_resource_requests.cpus(amount)
-        else:
-            self._task_resources[self._CPUS] = TaskResourceRequest(self._CPUS, amount)
-        return self
-
-    def resource(self, resourceName, amount):
-        if self._java_task_resource_requests is not None:
-            self._java_task_resource_requests.resource(resourceName, float(amount))
-        else:
-            self._task_resources[resourceName] = TaskResourceRequest(resourceName, amount)
-        return self
-
-    @property
-    def requests(self):
-        if self._java_task_resource_requests is not None:
-            result = {}
-            taskRes = self._java_task_resource_requests.requestsJMap()
-            for k, v in taskRes.items():
-                result[k] = TaskResourceRequest(v.resourceName(), v.amount())
-            return result
-        else:
-            return self._task_resources
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 988941e7550b9..5f4a8a2d2db1f 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -36,7 +36,7 @@
 from pyspark.java_gateway import local_connect_and_auth
 from pyspark.taskcontext import BarrierTaskContext, TaskContext
 from pyspark.files import SparkFiles
-from pyspark.resourceinformation import ResourceInformation
+from pyspark.resource import ResourceInformation
 from pyspark.rdd import PythonEvalType
 from pyspark.serializers import write_with_length, write_int, read_long, read_bool, \
     write_long, read_int, SpecialLengths, UTF8Deserializer, PickleSerializer, \
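
For reference, a minimal usage sketch of the relocated classes follows (not part of the patch). It assumes a Spark build that ships this reorganized pyspark.resource package together with the RDD stage-level scheduling methods withResources and getResourceProfile; the amounts shown are illustrative, and actually honoring a custom profile requires a cluster manager that supports stage-level scheduling.

    # Illustrative only: builds a ResourceProfile with the renamed modules and
    # attaches it to an RDD so it applies to the stage that computes that RDD.
    from pyspark import SparkContext
    from pyspark.resource import (
        ExecutorResourceRequests,
        ResourceProfileBuilder,
        TaskResourceRequests,
    )

    sc = SparkContext(appName="resource-profile-sketch")

    # Executor requirements for the stage: cores and heap memory per executor.
    executor_reqs = ExecutorResourceRequests().cores(4).memory("2g")

    # Task requirements: 2 CPUs per task. Fractional amounts are only valid for
    # custom resources such as "gpu", per the docstrings above.
    task_reqs = TaskResourceRequests().cpus(2)

    # Build an immutable ResourceProfile; note that `build` is a property.
    profile = ResourceProfileBuilder().require(executor_reqs).require(task_reqs).build

    # Attach the profile; the scheduler uses it for the stage computing this RDD.
    rdd = sc.parallelize(range(100), 4).withResources(profile)
    print(rdd.getResourceProfile().taskResources)

    sc.stop()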