Skip to content

Commit

Permalink
Revert "Revert "Removing Pyarrow dependency (ray-project#7146)" (ray-…
Browse files Browse the repository at this point in the history
  • Loading branch information
simon-mo authored Feb 19, 2020
1 parent f76ce83 commit e8941b1
Show file tree
Hide file tree
Showing 20 changed files with 153 additions and 382 deletions.
8 changes: 0 additions & 8 deletions build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,6 @@ fi

pushd "$BUILD_DIR"

# The following line installs pyarrow from S3, these wheels have been
# generated from https://github.com/ray-project/arrow-build from
# the commit listed in the command.
if [ -z "$SKIP_THIRDPARTY_INSTALL" ]; then
"$PYTHON_EXECUTABLE" -m pip install -q \
--target="$ROOT_DIR/python/ray/pyarrow_files" pyarrow==0.14.0.RAY \
--find-links https://s3-us-west-2.amazonaws.com/arrow-wheels/3a11193d9530fe8ec7fdb98057f853b708f6f6ae/index.html
fi

WORK_DIR=`mktemp -d`
pushd $WORK_DIR
Expand Down
2 changes: 1 addition & 1 deletion ci/travis/format.sh
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ YAPF_FLAGS=(
YAPF_EXCLUDES=(
'--exclude' 'python/ray/cloudpickle/*'
'--exclude' 'python/build/*'
'--exclude' 'python/ray/pyarrow_files/*'
'--exclude' 'python/ray/core/src/ray/gcs/*'
'--exclude' 'python/ray/thirdparty_files/*'
)
Expand Down Expand Up @@ -145,6 +144,7 @@ fi
# Ensure import ordering
# Make sure that for every import psutil; import setpproctitle
# There's a import ray above it.

python ci/travis/check_import_order.py . -s ci -s python/ray/pyarrow_files -s python/ray/thirdparty_files -s python/build

if ! git diff --quiet &>/dev/null; then
Expand Down
1 change: 0 additions & 1 deletion doc/requirements-doc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ mock
numpy
opencv-python-headless
pandas
pyarrow
pygments
psutil
pyyaml
Expand Down
47 changes: 0 additions & 47 deletions python/ray/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,24 +38,8 @@
from ctypes import CDLL
CDLL(so_path, ctypes.RTLD_GLOBAL)

# MUST import ray._raylet before pyarrow to initialize some global variables.
# It seems the library related to memory allocation in pyarrow will destroy the
# initialization of grpc if we import pyarrow at first.
# NOTE(JoeyJiang): See https://github.com/ray-project/ray/issues/5219 for more
# details.
import ray._raylet # noqa: E402

if "pyarrow" in sys.modules:
raise ImportError("Ray must be imported before pyarrow because Ray "
"requires a specific version of pyarrow (which is "
"packaged along with Ray).")

# Add the directory containing pyarrow to the Python path so that we find the
# pyarrow version packaged with ray and not a pre-existing pyarrow.
pyarrow_path = os.path.join(
os.path.abspath(os.path.dirname(__file__)), "pyarrow_files")
sys.path.insert(0, pyarrow_path)

# See https://github.com/ray-project/ray/issues/131.
helpful_message = """
Expand All @@ -64,37 +48,6 @@
conda install libgcc
"""

try:
import pyarrow # noqa: F401

# pyarrow is not imported inside of _raylet because of the issue described
# above. In order for Cython to compile _raylet, pyarrow is set to None
# in _raylet instead, so we give _raylet a real reference to it here.
# We first do the attribute checks here so that building the documentation
# succeeds without fully installing ray..
# TODO(edoakes): Fix this.
if hasattr(ray, "_raylet") and hasattr(ray._raylet, "pyarrow"):
ray._raylet.pyarrow = pyarrow
except ImportError as e:
if ((hasattr(e, "msg") and isinstance(e.msg, str)
and ("libstdc++" in e.msg or "CXX" in e.msg))):
# This code path should be taken with Python 3.
e.msg += helpful_message
elif (hasattr(e, "message") and isinstance(e.message, str)
and ("libstdc++" in e.message or "CXX" in e.message)):
# This code path should be taken with Python 2.
condition = (hasattr(e, "args") and isinstance(e.args, tuple)
and len(e.args) == 1 and isinstance(e.args[0], str))
if condition:
e.args = (e.args[0] + helpful_message, )
else:
if not hasattr(e, "args"):
e.args = ()
elif not isinstance(e.args, tuple):
e.args = (e.args, )
e.args += (helpful_message, )
raise

from ray._raylet import (
ActorCheckpointID,
ActorClassID,
Expand Down
1 change: 1 addition & 0 deletions python/ray/_raylet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ cdef class CoreWorker:
unique_ptr[CCoreWorker] core_worker
object async_thread
object async_event_loop
object plasma_event_handler

cdef _create_put_buffer(self, shared_ptr[CBuffer] &metadata,
size_t data_size, ObjectID object_id,
Expand Down
43 changes: 25 additions & 18 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,6 @@ from ray.ray_constants import (
DEFAULT_PUT_OBJECT_RETRIES,
)

# pyarrow cannot be imported until after _raylet finishes initializing
# (see ray/__init__.py for details).
# Unfortunately, Cython won't compile if 'pyarrow' is undefined, so we
# "forward declare" it here and then replace it with a reference to the
# imported package from ray/__init__.py.
# TODO(edoakes): Fix this.
pyarrow = None

cimport cpython

include "includes/unique_ids.pxi"
Expand Down Expand Up @@ -552,6 +544,14 @@ cdef CRayStatus task_execution_handler(

return CRayStatus.OK()

cdef void async_plasma_callback(CObjectID object_id,
int64_t data_size,
int64_t metadata_size) with gil:
message = [tuple([ObjectID(object_id.Binary()), data_size, metadata_size])]
core_worker = ray.worker.global_worker.core_worker
event_handler = core_worker.get_plasma_event_handler()
if event_handler is not None:
event_handler.process_notifications(message)

cdef CRayStatus check_signals() nogil:
with gil:
Expand All @@ -574,17 +574,20 @@ cdef shared_ptr[CBuffer] string_to_buffer(c_string& c_str):

cdef write_serialized_object(
serialized_object, const shared_ptr[CBuffer]& buf):
# avoid initializing pyarrow before raylet
from ray.serialization import Pickle5SerializedObject, RawSerializedObject

if isinstance(serialized_object, RawSerializedObject):
if buf.get() != NULL and buf.get().Size() > 0:
buffer = Buffer.make(buf)
# `Buffer` has a nullptr buffer underlying if size is 0,
# which will cause `pyarrow.py_buffer` crash
stream = pyarrow.FixedSizeBufferWriter(pyarrow.py_buffer(buffer))
stream.set_memcopy_threads(MEMCOPY_THREADS)
stream.write(pyarrow.py_buffer(serialized_object.value))
size = serialized_object.total_bytes
if MEMCOPY_THREADS > 1 and size > kMemcopyDefaultThreshold:
parallel_memcopy(buf.get().Data(),
<const uint8_t*> serialized_object.value,
size, kMemcopyDefaultBlocksize,
MEMCOPY_THREADS)
else:
memcpy(buf.get().Data(),
<const uint8_t*>serialized_object.value, size)

elif isinstance(serialized_object, Pickle5SerializedObject):
(<Pickle5Writer>serialized_object.writer).write_to(
serialized_object.inband, buf, MEMCOPY_THREADS)
Expand All @@ -597,9 +600,6 @@ cdef class CoreWorker:
def __cinit__(self, is_driver, store_socket, raylet_socket,
JobID job_id, GcsClientOptions gcs_options, log_dir,
node_ip_address, node_manager_port):
assert pyarrow is not None, ("Expected pyarrow to be imported from "
"outside _raylet. See __init__.py for "
"details.")

self.core_worker.reset(new CCoreWorker(
WORKER_TYPE_DRIVER if is_driver else WORKER_TYPE_WORKER,
Expand Down Expand Up @@ -628,6 +628,13 @@ cdef class CoreWorker:
def set_actor_title(self, title):
self.core_worker.get().SetActorTitle(title)

def subscribe_to_plasma(self, plasma_event_handler):
self.plasma_event_handler = plasma_event_handler
self.core_worker.get().SubscribeToAsyncPlasma(async_plasma_callback)

def get_plasma_event_handler(self):
return self.plasma_event_handler

def get_objects(self, object_ids, TaskID current_task_id,
int64_t timeout_ms=-1):
cdef:
Expand Down
6 changes: 6 additions & 0 deletions python/ray/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,13 @@ class RayTimeoutError(RayError):
pass


class PlasmaObjectNotAvailable(RayError):
"""Called when an object was not available within the given timeout."""
pass


RAY_EXCEPTION_TYPES = [
PlasmaObjectNotAvailable,
RayError,
RayTaskError,
RayWorkerError,
Expand Down
73 changes: 4 additions & 69 deletions python/ray/experimental/async_api.py
Original file line number Diff line number Diff line change
@@ -1,84 +1,22 @@
# Note: asyncio is only compatible with Python 3

import asyncio
import functools
import threading

import pyarrow.plasma as plasma

import ray
from ray.experimental.async_plasma import PlasmaProtocol, PlasmaEventHandler
from ray.experimental.async_plasma import PlasmaEventHandler
from ray.services import logger

handler = None
transport = None
protocol = None


class _ThreadSafeProxy:
"""This class is used to create a thread-safe proxy for a given object.
Every method call will be guarded with a lock.
Attributes:
orig_obj (object): the original object.
lock (threading.Lock): the lock object.
_wrapper_cache (dict): a cache from original object's methods to
the proxy methods.
"""

def __init__(self, orig_obj, lock):
self.orig_obj = orig_obj
self.lock = lock
self._wrapper_cache = {}

def __getattr__(self, attr):
orig_attr = getattr(self.orig_obj, attr)
if not callable(orig_attr):
# If the original attr is a field, just return it.
return orig_attr
else:
# If the orginal attr is a method,
# return a wrapper that guards the original method with a lock.
wrapper = self._wrapper_cache.get(attr)
if wrapper is None:

@functools.wraps(orig_attr)
def _wrapper(*args, **kwargs):
with self.lock:
return orig_attr(*args, **kwargs)

self._wrapper_cache[attr] = _wrapper
wrapper = _wrapper
return wrapper


def thread_safe_client(client, lock=None):
"""Create a thread-safe proxy which locks every method call
for the given client.
Args:
client: the client object to be guarded.
lock: the lock object that will be used to lock client's methods.
If None, a new lock will be used.
Returns:
A thread-safe proxy for the given client.
"""
if lock is None:
lock = threading.Lock()
return _ThreadSafeProxy(client, lock)


async def _async_init():
global handler, transport, protocol
global handler
if handler is None:
worker = ray.worker.global_worker
plasma_client = thread_safe_client(
plasma.connect(worker.node.plasma_store_socket_name, 300))
loop = asyncio.get_event_loop()
plasma_client.subscribe()
rsock = plasma_client.get_notification_socket()
handler = PlasmaEventHandler(loop, worker)
transport, protocol = await loop.create_connection(
lambda: PlasmaProtocol(plasma_client, handler), sock=rsock)
worker.core_worker.subscribe_to_plasma(handler)
logger.debug("AsyncPlasma Connection Created!")


Expand Down Expand Up @@ -126,10 +64,7 @@ def shutdown():
Cancels all related tasks and all the socket transportation.
"""
global handler, transport, protocol
global handler
if handler is not None:
handler.close()
transport.close()
handler = None
transport = None
protocol = None
Loading

0 comments on commit e8941b1

Please sign in to comment.