diff --git a/cpp/apidoc/index.md b/cpp/apidoc/index.md index 8389d16b4aa..ab9bbaa405a 100644 --- a/cpp/apidoc/index.md +++ b/cpp/apidoc/index.md @@ -39,6 +39,7 @@ Table of Contents * How to access [HDFS](HDFS.md) * Tutorials * [Convert a vector of row-wise data into an Arrow table](tutorials/row_wise_conversion.md) + * [Using the Plasma In-Memory Object Store](tutorials/plasma.md) Getting Started --------------- diff --git a/cpp/apidoc/tutorials/plasma.md b/cpp/apidoc/tutorials/plasma.md new file mode 100644 index 00000000000..9911546ed5c --- /dev/null +++ b/cpp/apidoc/tutorials/plasma.md @@ -0,0 +1,442 @@ + + +Using the Plasma In-Memory Object Store from C++ +================================================ + +Apache Arrow offers the ability to share your data structures among multiple +processes simultaneously through Plasma, an in-memory object store. + +Note that **the Plasma API is not stable**. + +Plasma clients are processes that run on the same machine as the object store. +They communicate with the object store over Unix domain sockets, and they read +and write data in the object store through shared memory. + +Plasma objects are immutable once they have been created. + +The following goes over the basics so you can begin using Plasma in your big +data applications. + +Starting the Plasma store +------------------------- + +To start running the Plasma object store so that clients may +connect and access the data, run the following command: + +``` +plasma_store -m 1000000000 -s /tmp/plasma +``` + +The `-m` flag specifies the size of the object store in bytes. The `-s` flag +specifies the path of the Unix domain socket that the store will listen at. + +Therefore, the above command initializes a Plasma store up to 1 GB of memory +and sets the socket to `/tmp/plasma.` + +The Plasma store will remain available as long as the `plasma_store` process is +running in a terminal window. Messages, such as alerts for disconnecting +clients, may occasionally be output. To stop running the Plasma store, you +can press `Ctrl-C` in the terminal window. + +Alternatively, you can run the Plasma store in the background and ignore all +message output with the following terminal command: + +``` +plasma_store -m 1000000000 -s /tmp/plasma 1> /dev/null 2> /dev/null & +``` + +The Plasma store will instead run silently in the background. To stop running +the Plasma store in this case, issue the command below: + +``` +killall plasma_store +``` + +Creating a Plasma client +------------------------ + +Now that the Plasma object store is up and running, it is time to make a client +process connect to it. To use the Plasma object store as a client, your +application should initialize a `plasma::PlasmaClient` object and tell it to +connect to the socket specified when starting up the Plasma object store. + +```cpp +#include + +using namespace plasma; + +int main(int argc, char** argv) { + // Start up and connect a Plasma client. + PlasmaClient client; + ARROW_CHECK_OK(client.Connect("/tmp/plasma", "", PLASMA_DEFAULT_RELEASE_DELAY)); + // Disconnect the Plasma client. + ARROW_CHECK_OK(client.Disconnect()); +} +``` + +Save this program in a file `test.cc` and compile it with + +``` +g++ test.cc `pkg-config --cflags --libs plasma` --std=c++11 +``` + +Note that multiple clients can be created within the same process. + +Note that a `PlasmaClient` object is **not thread safe**. + +If the Plasma store is still running, you can now execute the `a.out` executable +and the store will print something like + +``` +Disconnecting client on fd 5 +``` + +which shows that the client was successfully disconnected. + +Object IDs +---------- + +The Plasma object store uses twenty-byte identifiers for accessing objects +stored in shared memory. Each object in the Plasma store should be associated +with a unique ID. The Object ID is then a key that can be used by **any** client +to fetch that object from the Plasma store. + +Random generation of Object IDs is often good enough to ensure unique IDs: + +```cpp +// Randomly generate an Object ID. +ObjectID object_id = ObjectID::from_random(); +``` + +Now, any connected client that knows the object's Object ID can access the +same object from the Plasma object store. For easy transportation of Object IDs, +you can convert/serialize an Object ID into a binary string and back as +follows: + +```cpp +// From ObjectID to binary string +std:string id_string = object_id.binary(); + +// From binary string to ObjectID +ObjectID id_object = ObjectID::from_binary(&id_string); +``` + +You can also get a human readable representation of ObjectIDs in the same +format that git uses for commit hashes by running `ObjectID::hex`. + +Here is a test program you can run: + +```cpp +#include +#include +#include + +using namespace plasma; + +int main(int argc, char** argv) { + ObjectID object_id1 = ObjectID::from_random(); + std::cout << "object_id1 is " << object_id1.hex() << std::endl; + + std::string id_string = object_id1.binary(); + ObjectID object_id2 = ObjectID::from_binary(id_string); + std::cout << "object_id2 is " << object_id2.hex() << std::endl; +} +``` + +Creating an Object +------------------ + +Now that you learned about Object IDs that are used to refer to objects, +let's look at how objects can be stored in Plasma. + +Storing objects is a two-stage process. First a buffer is allocated with a call +to `Create`. Then it can be constructed in place by the client. Then it is made +immutable and shared with other clients via a call to `Seal`. + +The `Create` call blocks while the Plasma store allocates a buffer of the +appropriate size. The client will then map the buffer into its own address +space. At this point the object can be constructed in place using a pointer that +was written by the `Create` command. + +```cpp +int64_t data_size = 100; +// The address of the buffer allocated by the Plasma store will be written at +// this address. +uint8_t* data; +// Create a Plasma object by specifying its ID and size. +ARROW_CHECK_OK(client.Create(object_id, data_size, NULL, 0, &data)); +``` + +You can also specify metadata for the object; the third argument is the +metadata (as raw bytes) and the fourth argument is the size of the metadata. + +```cpp +// Create a Plasma object without metadata. +int64_t data_size = 100; +std::string metadata = "{'author': 'john'}"; +uint8_t* data; +client.Create(object_id, data_size, (uint8_t*) metadata.data(), metadata.size(), &data); +``` + +Now that we've obtained a pointer to our object's data, we can +write our data to it: + +```cpp +// Write some data for the Plasma object. +for (int64_t i = 0; i < data_size; i++) { + data[i] = static_cast(i % 4); +} +``` + +When the client is done, the client **seals** the buffer, making the object +immutable, and making it available to other Plasma clients: + +```cpp +// Seal the object. This makes it available for all clients. +client.Seal(object_id); +``` + +Here is an example that combines all these features: + +```cpp +#include + +using namespace plasma; + +int main(int argc, char** argv) { + // Start up and connect a Plasma client. + PlasmaClient client; + ARROW_CHECK_OK(client.Connect("/tmp/plasma", "", PLASMA_DEFAULT_RELEASE_DELAY)); + // Create an object with a random ObjectID. + ObjectID object_id = ObjectID::from_binary("00000000000000000000"); + int64_t data_size = 1000; + uint8_t *data; + std::string metadata = "{'author': 'john'}"; + ARROW_CHECK_OK(client.Create(object_id, data_size, (uint8_t*) metadata.data(), metadata.size(), &data)); + // Write some data into the object. + for (int64_t i = 0; i < data_size; i++) { + data[i] = static_cast(i % 4); + } + // Seal the object. + ARROW_CHECK_OK(client.Seal(object_id)); + // Disconnect the client. + ARROW_CHECK_OK(client.Disconnect()); +} +``` + +This example can be compiled with + +``` +g++ create.cc `pkg-config --cflags --libs plasma` --std=c++11 -o create +``` + +To verify that an object exists in the Plasma object store, you can +call `PlasmaClient::Contains()` to check if an object has +been created and sealed for a given Object ID. Note that this function +will still return False if the object has been created, but not yet +sealed: + +```cpp +// Check if an object has been created and sealed. +bool has_object; +client.Contains(object_id, &has_object); +if (has_object) { + // Object has been created and sealed, proceed +} +``` + +Getting an Object +----------------- + +After an object has been sealed, any client who knows the Object ID can get +the object. To store the retrieved object contents, you should create an +`ObjectBuffer`, then call `PlasmaClient::Get()` as follows: + +```cpp +// Get from the Plasma store by Object ID. +ObjectBuffer object_buffer; +client.Get(&object_id, 1, -1, &object_buffer); +``` + +`PlasmaClient::Get()` isn't limited to fetching a single object +from the Plasma store at once. You can specify an array of Object IDs and +`ObjectBuffers` to fetch at once, so long as you also specify the +number of objects being fetched: + +```cpp +// Get two objects at once from the Plasma store. This function +// call will block until both objects have been fetched. +ObjectBuffer multiple_buffers[2]; +ObjectID multiple_ids[2] = {object_id1, object_id2}; +client.Get(multiple_ids, 2, -1, multiple_buffers); +``` + +Since `PlasmaClient::Get()` is a blocking function call, it may be +necessary to limit the amount of time the function is allowed to take +when trying to fetch from the Plasma store. You can pass in a timeout +in milliseconds when calling `PlasmaClient::Get().` To use `PlasmaClient::Get()` +without a timeout, just pass in -1 like in the previous example calls: + +```cpp +// Make the function call give up fetching the object if it takes +// more than 100 milliseconds. +int64_t timeout = 100; +client.Get(&object_id, 1, timeout, &object_buffer); +``` + +Finally, to access the object, you can access the `data` and +`metadata` attributes of the `ObjectBuffer`. The `data` can be indexed +like any array: + +```cpp +// Access object data. +uint8_t* data = object_buffer.data; +int64_t data_size = object_buffer.data_size; + +// Access object metadata. +uint8_t* metadata = object_buffer.metadata; +uint8_t metadata_size = object_buffer.metadata_size; + +// Index into data array. +uint8_t first_data_byte = data[0]; +``` + +Here is a longer example that shows these capabilities: + +```cpp +#include + +using namespace plasma; + +int main(int argc, char** argv) { + // Start up and connect a Plasma client. + PlasmaClient client; + ARROW_CHECK_OK(client.Connect("/tmp/plasma", "", PLASMA_DEFAULT_RELEASE_DELAY)); + ObjectID object_id = ObjectID::from_binary("00000000000000000000"); + ObjectBuffer object_buffer; + ARROW_CHECK_OK(client.Get(&object_id, 1, -1, &object_buffer)); + + // Retrieve object data. + uint8_t* data = object_buffer.data; + int64_t data_size = object_buffer.data_size; + + // Check that the data agrees with what was written in the other process. + for (int64_t i = 0; i < data_size; i++) { + ARROW_CHECK(data[i] == static_cast(i % 4)); + } + + // Disconnect the client. + ARROW_CHECK_OK(client.Disconnect()); +} +``` + +If you compile it with + +``` +g++ get.cc `pkg-config --cflags --libs plasma` --std=c++11 -o get +``` + +and run it with `./get`, all the assertions will pass if you run the `create` +example from above on the same Plasma store. + + +Object Lifetime Management +-------------------------- + +The Plasma store internally does reference counting to make sure objects that +are mapped into the address space of one of the clients with `PlasmaClient::Get` +are accessible. To unmap objects from a client, call `PlasmaClient::Release`. +All objects that are mapped into a clients address space will automatically +be released when the client is disconnected from the store (this happens even +if the client process crashes or otherwise fails to call `Disconnect`). + +If a new object is created and there is not enough space in the Plasma store, +the store will evict the least recently used object (an object is in use if at +least one client has gotten it but not released it). + +Object notifications +-------------------- + +Additionally, you can arrange to have Plasma notify you when objects are +sealed in the object store. This may especially be handy when your +program is collaborating with other Plasma clients, and needs to know +when they make objects available. + +First, you can subscribe your current Plasma client to such notifications +by getting a file descriptor: + +```cpp +// Start receiving notifications into file_descriptor. +int fd; +ARROW_CHECK_OK(client.Subscribe(&fd)); +``` + +Once you have the file descriptor, you can have your current Plasma client +wait to receive the next object notification. Object notifications +include information such as Object ID, data size, and metadata size of +the next newly available object: + +```cpp +// Receive notification of the next newly available object. +// Notification information is stored in object_id, data_size, and metadata_size +ObjectID new_object_id; +int64_t data_size; +int64_t metadata_size; +ARROW_CHECK_OK(client.GetNotification(fd, &object_id, &data_size, &metadata_size)); + +// Get the newly available object. +ObjectBuffer object_buffer; +ARROW_CHECK_OK(client.Get(&object_id, 1, -1, &object_buffer)); +``` + +Here is a full program that shows this capability: + +```cpp +#include + +using namespace plasma; + +int main(int argc, char** argv) { + // Start up and connect a Plasma client. + PlasmaClient client; + ARROW_CHECK_OK(client.Connect("/tmp/plasma", "", PLASMA_DEFAULT_RELEASE_DELAY)); + + int fd; + ARROW_CHECK_OK(client.Subscribe(&fd)); + + ObjectID object_id; + int64_t data_size; + int64_t metadata_size; + while (true) { + ARROW_CHECK_OK(client.GetNotification(fd, &object_id, &data_size, &metadata_size)); + + std::cout << "Received object notification for object_id = " + << object_id.hex() << ", with data_size = " << data_size + << ", and metadata_size = " << metadata_size << std::endl; + } + + // Disconnect the client. + ARROW_CHECK_OK(client.Disconnect()); +} +``` + +If you compile it with + +``` +g++ subscribe.cc `pkg-config --cflags --libs plasma` --std=c++11 -o subscribe +``` + +and invoke `./create` and `./subscribe` while the Plasma store is running, +you can observe the new object arriving. diff --git a/python/doc/source/development.rst b/python/doc/source/development.rst index 55b3efdad17..4114ab0f3dc 100644 --- a/python/doc/source/development.rst +++ b/python/doc/source/development.rst @@ -159,12 +159,16 @@ Now build and install the Arrow C++ libraries: cmake -DCMAKE_BUILD_TYPE=$ARROW_BUILD_TYPE \ -DCMAKE_INSTALL_PREFIX=$ARROW_HOME \ -DARROW_PYTHON=on \ + -DARROW_PLASMA=on \ -DARROW_BUILD_TESTS=OFF \ .. make -j4 make install popd +If you don't want to build and install the Plasma in-memory object store, +you can omit the ``-DARROW_PLASMA=on`` flag. + Now, optionally build and install the Apache Parquet libraries in your toolchain: @@ -190,9 +194,10 @@ Now, build pyarrow: cd arrow/python python setup.py build_ext --build-type=$ARROW_BUILD_TYPE \ - --with-parquet --inplace + --with-parquet --with-plasma --inplace -If you did not build parquet-cpp, you can omit ``--with-parquet``. +If you did not build parquet-cpp, you can omit ``--with-parquet`` and if +you did not build with plasma, you can omit ``--with-plasma``. You should be able to run the unit tests with: @@ -224,9 +229,10 @@ You can build a wheel by running: .. code-block:: shell python setup.py build_ext --build-type=$ARROW_BUILD_TYPE \ - --with-parquet --bundle-arrow-cpp bdist_wheel + --with-parquet --with-plasma --bundle-arrow-cpp bdist_wheel -Again, if you did not build parquet-cpp, you should omit ``--with-parquet``. +Again, if you did not build parquet-cpp, you should omit ``--with-parquet`` and +if you did not build with plasma, you should omit ``--with-plasma``. Developing on Windows ===================== diff --git a/python/doc/source/index.rst b/python/doc/source/index.rst index a12853c4482..c2ae769b23e 100644 --- a/python/doc/source/index.rst +++ b/python/doc/source/index.rst @@ -40,6 +40,7 @@ structures. data ipc filesystems + plasma pandas parquet api diff --git a/python/doc/source/plasma.rst b/python/doc/source/plasma.rst new file mode 100644 index 00000000000..98dd62f97e9 --- /dev/null +++ b/python/doc/source/plasma.rst @@ -0,0 +1,337 @@ +.. Licensed to the Apache Software Foundation (ASF) under one +.. or more contributor license agreements. See the NOTICE file +.. distributed with this work for additional information +.. regarding copyright ownership. The ASF licenses this file +.. to you under the Apache License, Version 2.0 (the +.. "License"); you may not use this file except in compliance +.. with the License. You may obtain a copy of the License at + +.. http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, +.. software distributed under the License is distributed on an +.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +.. KIND, either express or implied. See the License for the +.. specific language governing permissions and limitations +.. under the License. + +.. currentmodule:: pyarrow +.. _io: + +The Plasma In-Memory Object Store +================================= + +.. contents:: Contents + :depth: 3 + + +The Plasma API +-------------- + +Starting the Plasma store +^^^^^^^^^^^^^^^^^^^^^^^^^ + +You can start the Plasma store by issuing a terminal command similar to the +following: + +.. code-block:: bash + + plasma_store -m 1000000000 -s /tmp/plasma + +The ``-m`` flag specifies the size of the store in bytes, and the ``-s`` flag +specifies the socket that the store will listen at. Thus, the above command +allows the Plasma store to use up to 1GB of memory, and sets the socket to +``/tmp/plasma``. + +Leaving the current terminal window open as long as Plasma store should keep +running. Messages, concerning such as disconnecting clients, may occasionally be +printed to the screen. To stop running the Plasma store, you can press +``Ctrl-C`` in the terminal. + +Creating a Plasma client +^^^^^^^^^^^^^^^^^^^^^^^^ + +To start a Plasma client from Python, call ``plasma.connect`` using the same +socket name: + +.. code-block:: python + + import pyarrow.plasma as plasma + client = plasma.connect("/tmp/plasma", "", 0) + +If the following error occurs from running the above Python code, that +means that either the socket given is incorrect, or the ``./plasma_store`` is +not currently running. Check to see if the Plasma store is still running. + +.. code-block:: shell + + >>> client = plasma.connect("/tmp/plasma", "", 0) + Connection to socket failed for pathname /tmp/plasma + Could not connect to socket /tmp/plasma + + +Object IDs +^^^^^^^^^^ + +Each object in the Plasma store should be associated with a unique ID. The +Object ID then serves as a key that any client can use to retrieve that object +from the Plasma store. You can form an ``ObjectID`` object from a byte string of +length 20. + +.. code-block:: shell + + # Create an ObjectID. + >>> id = plasma.ObjectID(20 * b"a") + + # The character "a" is encoded as 61 in hex. + >>> id + ObjectID(6161616161616161616161616161616161616161) + +The random generation of Object IDs is often good enough to ensure unique IDs. +You can easily create a helper function that randomly generates object IDs as +follows: + +.. code-block:: python + + import numpy as np + + def random_object_id(): + return plasma.ObjectID(np.random.bytes(20)) + + +Creating an Object +^^^^^^^^^^^^^^^^^^ + +Objects are created in Plasma in two stages. First, they are **created**, which +allocates a buffer for the object. At this point, the client can write to the +buffer and construct the object within the allocated buffer. + +To create an object for Plasma, you need to create an object ID, as well as +give the object's maximum size in bytes. + +.. code-block:: python + + # Create an object. + object_id = plasma.ObjectID(20 * b"a") + object_size = 1000 + buffer = memoryview(client.create(object_id, object_size)) + + # Write to the buffer. + for i in range(1000): + buffer[i] = i % 128 + +When the client is done, the client **seals** the buffer, making the object +immutable, and making it available to other Plasma clients. + +.. code-block:: python + + # Seal the object. This makes the object immutable and available to other clients. + client.seal(object_id) + + +Getting an Object +^^^^^^^^^^^^^^^^^ + +After an object has been sealed, any client who knows the object ID can get +the object. + +.. code-block:: python + + # Create a different client. Note that this second client could be + # created in the same or in a separate, concurrent Python session. + client2 = plasma.connect("/tmp/plasma", "", 0) + + # Get the object in the second client. This blocks until the object has been sealed. + object_id2 = plasma.ObjectID(20 * b"a") + [buffer2] = client2.get([object_id]) + +If the object has not been sealed yet, then the call to client.get will block +until the object has been sealed by the client constructing the object. Using +the ``timeout_ms`` argument to get, you can specify a timeout for this (in +milliseconds). After the timeout, the interpreter will yield control back. + +.. code-block:: shell + + >>> buffer + + >>> buffer[1] + 1 + >>> buffer2 + + >>> view2 = memoryview(buffer2) + >>> view2[1] + 1 + >>> view2[129] + 1 + >>> bytes(buffer[1:4]) + b'\x01\x02\x03' + >>> bytes(view2[1:4]) + b'\x01\x02\x03' + + +Using Arrow and Pandas with Plasma +---------------------------------- + +Storing Arrow Objects in Plasma +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +To store an Arrow object in Plasma, we must first **create** the object and then +**seal** it. However, Arrow objects such as ``Tensors`` may be more complicated +to write than simple binary data. + +To create the object in Plasma, you still need an ``ObjectID`` and a size to +pass in. To find out the size of your Arrow object, you can use pyarrow +API such as ``pyarrow.get_tensor_size``. + +.. code-block:: python + + import numpy as np + import pyarrow as pa + + # Create a pyarrow.Tensor object from a numpy random 2-dimensional array + data = np.random.randn(10, 4) + tensor = pa.Tensor.from_numpy(data) + + # Create the object in Plasma + object_id = plasma.ObjectID(np.random.bytes(20)) + data_size = pa.get_tensor_size(tensor) + buf = client.create(object_id, data_size) + +To write the Arrow ``Tensor`` object into the buffer, you can use Plasma to +convert the ``memoryview`` buffer into a ``pyarrow.FixedSizeBufferOutputStream`` +object. A ``pyarrow.FixedSizeBufferOutputStream`` is a format suitable for Arrow's +``pyarrow.write_tensor``: + +.. code-block:: python + + # Write the tensor into the Plasma-allocated buffer + stream = pa.FixedSizeBufferOutputStream(buf) + pa.write_tensor(tensor, stream) # Writes tensor's 552 bytes to Plasma stream + +To finish storing the Arrow object in Plasma, call ``seal``: + +.. code-block:: python + + # Seal the Plasma object + client.seal(object_id) + +Getting Arrow Objects from Plasma +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +To read the object, first retrieve it as a ``PlasmaBuffer`` using its object ID. + +.. code-block:: python + + # Get the arrow object by ObjectID. + [buf2] = client.get([object_id]) + +To convert the ``PlasmaBuffer`` back into an Arrow ``Tensor``, first create a +pyarrow ``BufferReader`` object from it. You can then pass the ``BufferReader`` +into ``pyarrow.read_tensor`` to reconstruct the Arrow ``Tensor`` object: + +.. code-block:: python + + # Reconstruct the Arrow tensor object. + reader = pa.BufferReader(buf2) + tensor2 = pa.read_tensor(reader) + +Finally, you can use ``pyarrow.read_tensor`` to convert the Arrow object +back into numpy data: + +.. code-block:: python + + # Convert back to numpy + array = tensor2.to_numpy() + +Storing Pandas DataFrames in Plasma +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Storing a Pandas ``DataFrame`` still follows the **create** then **seal** +process of storing an object in the Plasma store, however one cannot directly +write the ``DataFrame`` to Plasma with Pandas alone. Plasma also needs to know +the size of the ``DataFrame`` to allocate a buffer for. + +See :ref:`pandas` for more information on using Arrow with Pandas. + +You can create the pyarrow equivalent of a Pandas ``DataFrame`` by using +``pyarrow.from_pandas`` to convert it to a ``RecordBatch``. + +.. code-block:: python + + import pyarrow as pa + import pandas as pd + + # Create a Pandas DataFrame + d = {'one' : pd.Series([1., 2., 3.], index=['a', 'b', 'c']), + 'two' : pd.Series([1., 2., 3., 4.], index=['a', 'b', 'c', 'd'])} + df = pd.DataFrame(d) + + # Convert the Pandas DataFrame into a PyArrow RecordBatch + record_batch = pa.RecordBatch.from_pandas(df) + +Creating the Plasma object requires an ``ObjectID`` and the size of the +data. Now that we have converted the Pandas ``DataFrame`` into a PyArrow +``RecordBatch``, use the ``MockOutputStream`` to determine the +size of the Plasma object. + +.. code-block:: python + + # Create the Plasma object from the PyArrow RecordBatch. Most of the work here + # is done to determine the size of buffer to request from the object store. + object_id = plasma.ObjectID(np.random.bytes(20)) + mock_sink = pa.MockOutputStream() + stream_writer = pa.RecordBatchStreamWriter(mock_sink, record_batch.schema) + stream_writer.write_batch(record_batch) + stream_writer.close() + data_size = mock_sink.size() + buf = client.create(object_id, data_size) + +The DataFrame can now be written to the buffer as follows. + +.. code-block:: python + + # Write the PyArrow RecordBatch to Plasma + stream = pa.FixedSizeBufferOutputStream(buf) + stream_writer = pa.RecordBatchStreamWriter(stream, record_batch.schema) + stream_writer.write_batch(record_batch) + stream_writer.close() + +Finally, seal the finished object for use by all clients: + +.. code-block:: python + + # Seal the Plasma object + client.seal(object_id) + +Getting Pandas DataFrames from Plasma +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Since we store the Pandas DataFrame as a PyArrow ``RecordBatch`` object, +to get the object back from the Plasma store, we follow similar steps +to those specified in `Getting Arrow Objects from Plasma`_. + +We first have to convert the ``PlasmaBuffer`` returned from ``client.get`` +into an Arrow ``BufferReader`` object. + +.. code-block:: python + + # Fetch the Plasma object + [data] = client.get([object_id]) # Get PlasmaBuffer from ObjectID + buffer = pa.BufferReader(data) + +From the ``BufferReader``, we can create a specific ``RecordBatchStreamReader`` +in Arrow to reconstruct the stored PyArrow ``RecordBatch`` object. + +.. code-block:: python + + # Convert object back into an Arrow RecordBatch + reader = pa.RecordBatchStreamReader(buffer) + record_batch = reader.read_next_batch() + +The last step is to convert the PyArrow ``RecordBatch`` object back into +the original Pandas ``DataFrame`` structure. + +.. code-block:: python + + # Convert back into Pandas + result = record_batch.to_pandas()