[EPIC][WIP] Native IO Layer in Rust Tracking Issue #66

Closed · 9 tasks done
dmetasoul01 opened this issue Jul 24, 2022 · 0 comments
Labels: enhancement (New feature or request), epic

Motivations

The IO layer is a critical part of a table storage framework. However, the current IO implementation suffers from several drawbacks:

  1. The IO stack is tightly coupled with Spark, making it difficult to adapt to other computing frameworks, including both SQL engines and AI engines (Flink, Presto, Pandas, Dask, PyTorch, etc.).
  2. File IO still relies on the Hadoop FileSystem, which is inefficient on high-latency storage such as S3.
  3. The lack of expression evaluation capability makes it difficult to implement a compute-engine-independent MergeInto SQL with merge on read.

Goals

  1. Compute-engine neutral. The native IO layer implements self-contained IO logic such as merge on read and provides interfaces for compute engines to invoke.
  2. Native. Compute engines do not live only in the Java world. We would also like to support popular Python data processing frameworks like Pandas/Dask, and AI frameworks such as PyTorch with C++ at their core.
  3. Fast. IO on object stores usually has high latency and lacks file semantics, which drags down the overall execution pipeline. Things get worse when there are multiple files to merge. We would like the IO layer to enable concurrency and asynchrony on both the read and write paths.
  4. Feature-rich. The native IO layer should support commonly required data source features such as predicate pushdown, index filtering, atomic writes, etc. We would also like the reader to support MergeInto SQL within the IO layer so that the merge logic is transparent to the compute engines.
  5. Easy to use (embed). The native IO layer itself is a library that exposes its functionality via C interfaces with Java/Python wrappers. It should be easy to embed into compute engines.

Non-Goals

  1. The native IO layer is NOT a distributed execution framework. It is a library inside a single execution unit and is not itself aware of any distributed execution context. It is up to the higher-level engines to decide whether and how to read/write data in parallel (e.g., partitions in Spark, splits in Presto).
  2. The native IO layer is NOT a SQL engine. It is not designed to be a SQL execution engine. Though it will have some expression evaluation capability, it primarily aims to provide table data reads and writes on data lakes. It acts as a data source for compute engines and should be used together with LakeSoul's meta layer.

Design

We use Arrow (arrow-rs) + DataFusion to implement the new IO layer for the following reasons:

  • Arrow-rs provides asynchronous Parquet read and write, and the pipeline executes asynchronously, which is in line with our design goals (see the sketch after this list);
  • DataFusion brings a relatively complete implementation of physical plans and expressions and can easily support MergeInto SQL with merge on read;
  • Rust is efficient and memory-safe, with compiled native vectorized execution, and makes it easy to provide bindings for other languages.
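
To make the first point concrete, below is a minimal sketch (illustrative only, not part of this issue's code) of the async Parquet read path that arrow-rs already provides; it assumes the parquet crate with the async feature enabled and a tokio runtime:

    use futures::StreamExt;
    use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
    use tokio::fs::File;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let file = File::open("data.parquet").await?;
        // The footer and metadata are fetched asynchronously, and the result
        // is a Stream of Arrow RecordBatches rather than a blocking iterator.
        let mut stream = ParquetRecordBatchStreamBuilder::new(file)
            .await?
            .with_batch_size(8192)
            .build()?;
        while let Some(batch) = stream.next().await {
            println!("read a batch with {} rows", batch?.num_rows());
        }
        Ok(())
    }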

According to this design idea, the overall modules and execution logic are divided as follows:
(diagram: overall module hierarchy of the native IO layer)
The above diagram shows the logical hierarchy of the native IO layer. It has (roughly) the following parts, from the bottom up:

  • IO built on DataFusion's object store abstraction, with async reader/writer traits.

  • File format (Parquet) readers/writers built on the object store, with async IO.

  • Merge-on-read execution plan. The execution plan combines DataFusion's built-in hash join or sort-merge join with customized projection/filter plans to support MergeInto SQL. A typical plan with multiple files to merge would take the following form:

    (diagram: a typical merge-on-read execution plan)

  • Reader interface in Rust and C. Provides a simple interface to iterate over the merged Arrow record batches asynchronously. The Rust interface could look like the following:

    pub struct LakeSoulReaderConfig {
      files: Vec<String>,
      primary_keys: Vec<String>,                      // primary key columns
      columns: Vec<String>,                           // column projection pushdown
      filters: Vec<Expr>,                             // predicate filters pushdown
      object_store_options: HashMap<String, String>,  // object store options
    }

    pub struct LakeSoulReader {
      config: LakeSoulReaderConfig,
    }

    impl LakeSoulReader {
      pub fn new(config: LakeSoulReaderConfig) -> Self {
        Self { config }
      }

      pub fn read(&self) -> SendableRecordBatchStream {
        // Build the merge execution plan from the config and return its
        // output stream of Arrow record batches.
        todo!()
      }
    }

    We could also expose an extern "C" interface with a callback to support async IO (a hypothetical sketch follows after this list).

  • JNI/Python wrappers. In the JNI wrapper, we could provide a native method that accepts Arrow schema and array pointers together with a callback object, like the following:

    public native void nextBatch(Consumer<Boolean> callback, long schemaAddr, long arrayAddr);

    Here the native implementation asynchronously iterates the native stream, fetches the next available Arrow record batch, populates the Arrow C Data structs through the given pointers, and invokes the callback with a boolean argument indicating whether a batch was produced (false means end of stream). The expected usage pattern would look like this (in Scala):

    val p = Promise[Option[VectorSchemaRoot]]()
    
    tryWithResource(ArrowSchema.allocateNew(allocator)) { consumerSchema =>
      tryWithResource(ArrowArray.allocateNew(allocator)) { consumerArray =>
        val schemaPtr: Long = consumerSchema.memoryAddress
        val arrayPtr: Long = consumerArray.memoryAddress
        
        // `provider` is a DictionaryProvider for dictionary-encoded columns,
        // created alongside the allocator.
        reader.nextBatch((hasNext: Boolean) => {
          if (hasNext) {
            val root: VectorSchemaRoot =
              Data.importVectorSchemaRoot(allocator, consumerArray, consumerSchema, provider)
            p.success(Some(root))
          } else {
            p.success(None)
          }
        }, schemaPtr, arrayPtr)
      }
    }
    
    val fut: Future[Option[VectorSchemaRoot]] = p.future
    // consume the record batch from the future, either synchronously or asynchronously
  • Compute-engine adapters. For example, in Spark we could implement a vectorized reader based on the above interface and implement the DataSource V2 interfaces.
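
As mentioned above, the extern "C" interface could take the following hypothetical shape (all names here are illustrative placeholders, not a final API); the caller allocates the Arrow C Data Interface structs and the native side fills them before firing the callback:

    use std::os::raw::c_void;

    // Opaque handle to the reader defined on the Rust side.
    pub struct LakeSoulReader { /* ... */ }

    /// Invoked when the next batch is ready; `has_next == false` means end of stream.
    pub type NextBatchCallback = extern "C" fn(has_next: bool, user_data: *mut c_void);

    /// Hypothetical async entry point: `schema_addr`/`array_addr` point to
    /// caller-allocated FFI_ArrowSchema/FFI_ArrowArray structs.
    #[no_mangle]
    pub extern "C" fn lakesoul_reader_next_batch(
        reader: *mut LakeSoulReader,
        schema_addr: *mut c_void,
        array_addr: *mut c_void,
        callback: NextBatchCallback,
        user_data: *mut c_void,
    ) {
        // Sketch only: poll the reader's async stream on its runtime, export
        // the next RecordBatch into the provided C Data structs, then invoke
        // `callback(true, user_data)`, or `callback(false, user_data)` at EOF.
        todo!()
    }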

Plan

We plan to first implement a reader with the default overwrite merge logic, tracked under this issue. Further support for MergeInto SQL will be covered in a separate tracking issue.
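
For intuition, here is a toy sketch of what the default overwrite merge could mean, under the assumption that for rows sharing a primary key the newest file wins (an assumption for illustration; the real reader would merge sorted Arrow record batches via the execution plan above, not in-memory maps):

    use std::collections::HashMap;

    /// Merge rows from files ordered oldest -> newest; the newest row for
    /// each primary key overwrites earlier ones.
    fn overwrite_merge(files: Vec<Vec<(String, i64)>>) -> HashMap<String, i64> {
        let mut merged = HashMap::new();
        for file in files {
            for (pk, value) in file {
                merged.insert(pk, value); // later files overwrite earlier rows
            }
        }
        merged
    }

    fn main() {
        let base = vec![("a".into(), 1), ("b".into(), 2)];
        let delta = vec![("b".into(), 20), ("c".into(), 3)];
        let merged = overwrite_merge(vec![base, delta]);
        assert_eq!(merged["b"], 20); // "b" took the value from the newer file
    }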

Development Branch

develop/native_io

Tasks
