High-performance Go library to manipulate parquet files, initially developed at Twilio Segment.
Parquet has been established as a powerful solution to represent columnar data on persistent storage mediums, achieving levels of compression and query performance that enable managing data sets at scales that reach the petabytes. In addition, having intensive data applications sharing a common format creates opportunities for interoperation in our tool kits, providing greater leverage and value to engineers maintaining and operating those systems.
The creation and evolution of large scale data management systems, combined with realtime expectations come with challenging maintenance and performance requirements, that existing solutions to use parquet with Go were not addressing.
The parquet-go/parquet-go
package was designed and developed to respond to those
challenges, offering high level APIs to read and write parquet files, while
keeping a low compute and memory footprint in order to be used in environments
where data volumes and cost constraints require software to achieve high levels
of efficiency.
Columnar storage allows Parquet to store data more efficiently than, say, using JSON or Protobuf. For more information, refer to the Parquet Format Specification.
The package is distributed as a standard Go module that programs can take a dependency on and install with the following command:
go get github.com/parquet-go/parquet-go
Go 1.18 or later is required to use the package.
The package is currently released as a pre-v1 version, which gives maintainers the freedom to break backward compatibility to help improve the APIs as we learn which initial design decisions would need to be revisited to better support the use cases that the library solves for. These occurrences are expected to be rare in frequency and documentation will be produce to guide users on how to adapt their programs to breaking changes.
The following sections describe how to use APIs exposed by the library, highlighting the use cases with code examples to demonstrate how they are used in practice.
Writing Parquet Files: parquet.GenericWriter[T]
A parquet file is a collection of rows sharing the same schema, arranged in columns to support faster scan operations on subsets of the data set.
For simple use cases, the parquet.WriteFile[T]
function allows the creation
of parquet files on the file system from a slice of Go values representing the
rows to write to the file.
type RowType struct { FirstName, LastName string }
if err := parquet.WriteFile("file.parquet", []RowType{
{FirstName: "Bob"},
{FirstName: "Alice"},
}); err != nil {
...
}
The parquet.GenericWriter[T]
type denormalizes rows into columns, then encodes
the columns into a parquet file, generating row groups, column chunks, and pages
based on configurable heuristics.
type RowType struct { FirstName, LastName string }
writer := parquet.NewGenericWriter[RowType](output)
_, err := writer.Write([]RowType{
...
})
if err != nil {
...
}
// Closing the writer is necessary to flush buffers and write the file footer.
if err := writer.Close(); err != nil {
...
}
Explicit declaration of the parquet schema on a writer is useful when the
application needs to ensure that data written to a file adheres to a predefined
schema, which may differ from the schema derived from the writer's type
parameter. The parquet.Schema
type is a in-memory representation of the schema
of parquet rows, translated from the type of Go values, and can be used for this
purpose.
schema := parquet.SchemaOf(new(RowType))
writer := parquet.NewGenericWriter[any](output, schema)
...
Reading Parquet Files: parquet.GenericReader[T]
For simple use cases where the data set fits in memory and the program will
read most rows of the file, the parquet.ReadFile[T]
function returns a slice
of Go values representing the rows read from the file.
type RowType struct { FirstName, LastName string }
rows, err := parquet.ReadFile[RowType]("file.parquet")
if err != nil {
...
}
for _, c := range rows {
fmt.Printf("%+v\n", c)
}
The expected schema of rows can be explicitly declared when the reader is constructed, which is useful to ensure that the program receives rows matching an specific format; for example, when dealing with files from remote storage sources that applications cannot trust to have used an expected schema.
Configuring the schema of a reader is done by passing a parquet.Schema
instance as argument when constructing a reader. When the schema is declared,
conversion rules implemented by the package are applied to ensure that rows
read by the application match the desired format (see Evolving Parquet Schemas).
schema := parquet.SchemaOf(new(RowType))
reader := parquet.NewReader(file, schema)
...
Inspecting Parquet Files: parquet.File
Sometimes, lower-level APIs can be useful to leverage the columnar layout of
parquet files. The parquet.File
type is intended to provide such features to
Go applications, by exposing APIs to iterate over the various parts of a
parquet file.
f, err := parquet.OpenFile(file, size)
if err != nil {
...
}
for _, rowGroup := range f.RowGroups() {
for _, columnChunk := range rowGroup.ColumnChunks() {
...
}
}
Evolving Parquet Schemas: parquet.Convert
Parquet files embed all the metadata necessary to interpret their content, including a description of the schema of the tables represented by the rows and columns they contain.
Parquet files are also immutable; once written, there is not mechanism for updating a file. If their contents need to be changed, rows must be read, modified, and written to a new file.
Because applications evolve, the schema written to parquet files also tend to evolve over time. Those requirements creating challenges when applications need to operate on parquet files with heterogenous schemas: algorithms that expect new columns to exist may have issues dealing with rows that come from files with mismatching schema versions.
To help build applications that can handle evolving schemas, parquet-go/parquet-go
implements conversion rules that create views of row groups to translate between
schema versions.
The parquet.Convert
function is the low-level routine constructing conversion
rules from a source to a target schema. The function is used to build converted
views of parquet.RowReader
or parquet.RowGroup
, for example:
type RowTypeV1 struct { ID int64; FirstName string }
type RowTypeV2 struct { ID int64; FirstName, LastName string }
source := parquet.SchemaOf(RowTypeV1{})
target := parquet.SchemaOf(RowTypeV2{})
conversion, err := parquet.Convert(target, source)
if err != nil {
...
}
targetRowGroup := parquet.ConvertRowGroup(sourceRowGroup, conversion)
...
Conversion rules are automatically applied by the parquet.CopyRows
function
when the reader and writers passed to the function also implement the
parquet.RowReaderWithSchema
and parquet.RowWriterWithSchema
interfaces.
The copy determines whether the reader and writer schemas can be converted from
one to the other, and automatically applies the conversion rules to facilitate
the translation between schemas.
At this time, conversion rules only supports adding or removing columns from the schemas, there are no type conversions performed, nor ways to rename columns, etc... More advanced conversion rules may be added in the future.
Sorting Row Groups: parquet.GenericBuffer[T]
The parquet.GenericWriter[T]
type is optimized for minimal memory usage,
keeping the order of rows unchanged and flushing pages as soon as they are filled.
Parquet supports expressing columns by which rows are sorted through the declaration of sorting columns on row groups. Sorting row groups requires buffering all rows before ordering and writing them to a parquet file.
To help with those use cases, the parquet-go/parquet-go
package exposes the
parquet.GenericBuffer[T]
type which acts as a buffer of rows and implements
sort.Interface
to allow applications to sort rows prior to writing them
to a file.
The columns that rows are ordered by are configured when creating
parquet.GenericBuffer[T]
instances using the parquet.SortingColumns
function
to construct row group options configuring the buffer. The type of parquet
columns defines how values are compared, see Parquet Logical Types
for details.
When written to a file, the buffer is materialized into a single row group with
the declared sorting columns. After being written, buffers can be reused by
calling their Reset
method.
The following example shows how to use a parquet.GenericBuffer[T]
to order rows
written to a parquet file:
type RowType struct { FirstName, LastName string }
buffer := parquet.NewGenericBuffer[RowType](
parquet.SortingRowGroupConfig(
parquet.SortingColumns(
parquet.Ascending("LastName"),
parquet.Ascending("FistName"),
),
),
)
buffer.Write([]RowType{
{FirstName: "Luke", LastName: "Skywalker"},
{FirstName: "Han", LastName: "Solo"},
{FirstName: "Anakin", LastName: "Skywalker"},
})
sort.Sort(buffer)
writer := parquet.NewGenericWriter[RowType](output)
_, err := parquet.CopyRows(writer, buffer.Rows())
if err != nil {
...
}
if err := writer.Close(); err != nil {
...
}
Merging Row Groups: parquet.MergeRowGroups
Parquet files are often used as part of the underlying engine for data processing or storage layers, in which cases merging multiple row groups into one that contains more rows can be a useful operation to improve query performance; for example, bloom filters in parquet files are stored for each row group, the larger the row group, the fewer filters need to be stored and the more effective they become.
The parquet-go/parquet-go
package supports creating merged views of row groups,
where the view contains all the rows of the merged groups, maintaining the order
defined by the sorting columns of the groups.
There are a few constraints when merging row groups:
-
The sorting columns of all the row groups must be the same, or the merge operation must be explicitly configured a set of sorting columns which are a prefix of the sorting columns of all merged row groups.
-
The schemas of row groups must all be equal, or the merge operation must be explicitly configured with a schema that all row groups can be converted to, in which case the limitations of schema conversions apply.
Once a merged view is created, it may be written to a new parquet file or buffer in order to create a larger row group:
merge, err := parquet.MergeRowGroups(rowGroups)
if err != nil {
...
}
writer := parquet.NewGenericWriter[RowType](output)
_, err := parquet.CopyRows(writer, merge)
if err != nil {
...
}
if err := writer.Close(); err != nil {
...
}
Using Bloom Filters: parquet.BloomFilter
Parquet files can embed bloom filters to help improve the performance of point lookups in the files. The format of parquet bloom filters is documented in the parquet specification: Parquet Bloom Filter
By default, no bloom filters are created in parquet files, but applications can
configure the list of columns to create filters for using the parquet.BloomFilters
option when instantiating writers; for example:
type RowType struct {
FirstName string `parquet:"first_name"`
LastName string `parquet:"last_name"`
}
const filterBitsPerValue = 10
writer := parquet.NewGenericWriter[RowType](output,
parquet.BloomFilters(
// Configures the write to generate split-block bloom filters for the
// "first_name" and "last_name" columns of the parquet schema of rows
// witten by the application.
parquet.SplitBlockFilter(filterBitsPerValue, "first_name"),
parquet.SplitBlockFilter(filterBitsPerValue, "last_name"),
),
)
...
Generating bloom filters requires to know how many values exist in a column
chunk in order to properly size the filter, which requires buffering all the
values written to the column in memory. Because of it, the memory footprint
of parquet.GenericWriter[T]
increases linearly with the number of columns
that the writer needs to generate filters for. This extra cost is optimized
away when rows are copied from a parquet.GenericBuffer[T]
to a writer, since
in this case the number of values per column in known since the buffer already
holds all the values in memory.
When reading parquet files, column chunks expose the generated bloom filters
with the parquet.ColumnChunk.BloomFilter
method, returning a
parquet.BloomFilter
instance if a filter was available, or nil
when there
were no filters.
Using bloom filters in parquet files is useful when performing point-lookups in parquet files; searching for column rows matching a given value. Programs can quickly eliminate column chunks that they know does not contain the value they search for by checking the filter first, which is often multiple orders of magnitude faster than scanning the column.
The following code snippet hilights how filters are typically used:
var candidateChunks []parquet.ColumnChunk
for _, rowGroup := range file.RowGroups() {
columnChunk := rowGroup.ColumnChunks()[columnIndex]
bloomFilter := columnChunk.BloomFilter()
if bloomFilter != nil {
if ok, err := bloomFilter.Check(value); err != nil {
...
} else if !ok {
// Bloom filters may return false positives, but never return false
// negatives, we know this column chunk does not contain the value.
continue
}
}
candidateChunks = append(candidateChunks, columnChunk)
}
The following sections describe common optimization techniques supported by the library.
Lower level APIs used to read parquet files offer more efficient ways to access
column values. Consecutive sequences of values are grouped into pages which are
represented by the parquet.Page
interface.
A column chunk may contain multiple pages, each holding a section of the column
values. Applications can retrieve the column values either by reading them into
buffers of parquet.Value
, or type asserting the pages to read arrays of
primitive Go values. The following example demonstrates how to use both
mechanisms to read column values:
pages := column.Pages()
defer func() {
checkErr(pages.Close())
}()
for {
p, err := pages.ReadPage()
if err != nil {
... // io.EOF when there are no more pages
}
switch page := p.Values().(type) {
case parquet.Int32Reader:
values := make([]int32, page.NumValues())
_, err := page.ReadInt32s(values)
...
case parquet.Int64Reader:
values := make([]int64, page.NumValues())
_, err := page.ReadInt64s(values)
...
default:
values := make([]parquet.Value, page.NumValues())
_, err := page.ReadValues(values)
...
}
}
Reading arrays of typed values is often preferable when performing aggregations on the values as this model offers a more compact representation of the values in memory, and pairs well with the use of optimizations like SIMD vectorization.
Applications that deal with columnar storage are sometimes designed to work with columnar data throughout the abstraction layers; it then becomes possible to write columns of values directly instead of reconstructing rows from the column values. The package offers two main mechanisms to satisfy those use cases:
The first solution assumes that the program works with in-memory arrays of typed
values, for example slices of primitive Go types like []float32
; this would be
the case if the application is built on top of a framework like
Apache Arrow.
parquet.GenericBuffer[T]
is an implementation of the parquet.RowGroup
interface which maintains in-memory buffers of column values. Rows can be
written by either boxing primitive values into arrays of parquet.Value
,
or type asserting the columns to a access specialized versions of the write
methods accepting arrays of Go primitive types.
When using either of these models, the application is responsible for ensuring that the same number of rows are written to each column or the resulting parquet file will be malformed.
The following examples demonstrate how to use these two models to write columns of Go values:
type RowType struct { FirstName, LastName string }
func writeColumns(buffer *parquet.GenericBuffer[RowType], firstNames []string) error {
values := make([]parquet.Value, len(firstNames))
for i := range firstNames {
values[i] = parquet.ValueOf(firstNames[i])
}
_, err := buffer.ColumnBuffers()[0].WriteValues(values)
return err
}
type RowType struct { ID int64; Value float32 }
func writeColumns(buffer *parquet.GenericBuffer[RowType], ids []int64, values []float32) error {
if len(ids) != len(values) {
return fmt.Errorf("number of ids and values mismatch: ids=%d values=%d", len(ids), len(values))
}
columns := buffer.ColumnBuffers()
if err := columns[0].(parquet.Int64Writer).WriteInt64s(ids); err != nil {
return err
}
if err := columns[1].(parquet.FloatWriter).WriteFloats(values); err != nil {
return err
}
return nil
}
The latter is more efficient as it does not require boxing the input into an
intermediary array of parquet.Value
. However, it may not always be the right
model depending on the situation, sometimes the generic abstraction can be a
more expressive model.
Programs that need full control over the construction of row groups can choose
to provide their own implementation of the parquet.RowGroup
interface, which
includes defining implementations of parquet.ColumnChunk
and parquet.Page
to expose column values of the row group.
This model can be preferable when the underlying storage or in-memory
representation of the data needs to be optimized further than what can be
achieved by using an intermediary buffering layer with parquet.GenericBuffer[T]
.
See parquet.RowGroup for the full interface documentation.
When generating parquet files, the writer needs to buffer all pages before it can create the row group. This may require significant amounts of memory as the entire file content must be buffered prior to generating it. In some cases, the files might even be larger than the amount of memory available to the program.
The parquet.GenericWriter[T]
can be configured to use disk storage instead as
a scratch buffer when generating files, by configuring a different page buffer
pool using the parquet.ColumnPageBuffers
option and parquet.PageBufferPool
interface.
The parquet-go/parquet-go
package provides an implementation of the interface
which uses temporary files to store pages while a file is generated, allowing
programs to use local storage as swap space to hold pages and keep memory
utilization to a minimum. The following example demonstrates how to configure
a parquet writer to use on-disk page buffers:
type RowType struct { ... }
writer := parquet.NewGenericWriter[RowType](output,
parquet.ColumnPageBuffers(
parquet.NewFileBufferPool("", "buffers.*"),
),
)
When a row group is complete, pages buffered to disk need to be copied back to
the output file. This results in doubling I/O operations and storage space
requirements (the system needs to have enough free disk space to hold two copies
of the file). The resulting write amplification can often be optimized away by
the kernel if the file system supports copy-on-write of disk pages since copies
between os.File
instances are optimized using copy_file_range(2)
(on linux).
See parquet.PageBufferPool for the full interface documentation.
While initial design and development occurred at Twilio Segment, the project is now maintained by the open source community. We welcome external contributors. to participate in the form of discussions or code changes. Please review to the Contribution guidelines as well as the Code of Condution before submitting contributions.
The project uses Github Actions for CI.
The package has debugging capabilities built in which can be turned on using the
PARQUETGODEBUG
environment variable. The value follows a model similar to
GODEBUG
, it must be formatted as a comma-separated list of key=value
pairs.
The following debug flag are currently supported:
tracebuf=1
turns on tracing of internal buffers, which validates that reference counters are set to zero when buffers are reclaimed by the garbage collector. When the package detects that a buffer was leaked, it logs an error message along with the stack trace captured when the buffer was last used.