-
Notifications
You must be signed in to change notification settings - Fork 27
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Overhauled to Arrow Back-End and Better Memory Safety #78
Changes from 34 commits
2ec0117
d437526
c33a8f0
f6f2f70
7d616f6
37dde32
a0cf08e
7ee89b0
24b7651
dd3c7a6
c81675a
5a632ac
9470501
3603fbd
5988002
c38aa23
82abe14
de6dc61
1caf39f
cf8094f
23d8ce4
486caf2
cbb2b89
bb582f1
7f75e47
d859ca1
f1cabdc
1ab7749
d92bd16
9d78a51
ddda3f6
ad838e2
472522f
748f286
b5bc7c5
5f317cd
1fb65b7
2b08439
030de0e
666cb8d
8c5dcc0
50c1426
fdc4ef2
eae7b7e
cbccde1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
julia 0.6 | ||
Arrow | ||
FlatBuffers 0.3.0 | ||
CategoricalArrays 0.3.0 | ||
DataFrames 0.11.0 | ||
DataStreams 0.3.0 | ||
WeakRefStrings 0.4.0 |
This file was deleted.
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,13 +2,14 @@ module Metadata | |
|
||
if Base.VERSION < v"0.7.0-DEV.2575" | ||
const Dates = Base.Dates | ||
using Compat | ||
else | ||
import Dates | ||
end | ||
|
||
using FlatBuffers | ||
|
||
@enum(Type_, BOOL = 0, INT8 = 1, INT16 = 2, INT32 = 3, INT64 = 4, | ||
@enum(DType, BOOL = 0, INT8 = 1, INT16 = 2, INT32 = 3, INT64 = 4, | ||
UINT8 = 5, UINT16 = 6, UINT32 = 7, UINT64 = 8, | ||
FLOAT = 9, DOUBLE = 10, UTF8 = 11, BINARY = 12, | ||
CATEGORY = 13, TIMESTAMP = 14, DATE = 15, TIME = 16) | ||
|
@@ -20,14 +21,15 @@ using FlatBuffers | |
# FlatBuffers.enumsizeof(::Type{TimeUnit}) = UInt8 | ||
|
||
mutable struct PrimitiveArray | ||
type_::Type_ | ||
dtype::DType | ||
encoding::Encoding | ||
offset::Int64 | ||
length::Int64 | ||
null_count::Int64 | ||
total_bytes::Int64 | ||
end | ||
|
||
# TODO why are these done this way rather with an abstract type??? | ||
mutable struct CategoryMetadata | ||
levels::PrimitiveArray | ||
ordered::Bool | ||
|
@@ -47,7 +49,7 @@ mutable struct TimeMetadata | |
unit::TimeUnit | ||
end | ||
|
||
@UNION TypeMetadata (Void,CategoryMetadata,TimestampMetadata,DateMetadata,TimeMetadata) | ||
@UNION TypeMetadata (Nothing,CategoryMetadata,TimestampMetadata,DateMetadata,TimeMetadata) | ||
|
||
mutable struct Column | ||
name::String | ||
|
@@ -57,8 +59,10 @@ mutable struct Column | |
user_metadata::String | ||
end | ||
|
||
function Column(name::String, values::PrimitiveArray, metadata::TypeMetadata=nothing, user_metadata::String="") | ||
return Column(name, values, FlatBuffers.typeorder(TypeMetadata, typeof(metadata)), metadata, user_metadata) | ||
function Column(name::String, values::PrimitiveArray, metadata::TypeMetadata=nothing, | ||
user_metadata::String="") | ||
Column(name, values, FlatBuffers.typeorder(TypeMetadata, typeof(metadata)), | ||
metadata, user_metadata) | ||
end | ||
|
||
mutable struct CTable | ||
|
@@ -73,8 +77,8 @@ end # module | |
|
||
# wesm/feather/cpp/src/metadata_generated.h | ||
# wesm/feather/cpp/src/types.h | ||
const Type_2julia = Dict{Metadata.Type_,DataType}( | ||
Metadata.BOOL => Arrow.Bool, | ||
const JULIA_TYPE_DICT = Dict{Metadata.DType,DataType}( | ||
Metadata.BOOL => Bool, | ||
Metadata.INT8 => Int8, | ||
Metadata.INT16 => Int16, | ||
Metadata.INT32 => Int32, | ||
|
@@ -85,15 +89,15 @@ const Type_2julia = Dict{Metadata.Type_,DataType}( | |
Metadata.UINT64 => UInt64, | ||
Metadata.FLOAT => Float32, | ||
Metadata.DOUBLE => Float64, | ||
Metadata.UTF8 => WeakRefString{UInt8}, | ||
Metadata.UTF8 => String, # can also be WeakRefString{UInt8} | ||
Metadata.BINARY => Vector{UInt8}, | ||
Metadata.CATEGORY => Int64, | ||
Metadata.TIMESTAMP => Int64, | ||
Metadata.DATE => Int64, | ||
Metadata.TIME => Int64 | ||
) | ||
|
||
const julia2Type_ = Dict{DataType,Metadata.Type_}( | ||
const MDATA_TYPE_DICT = Dict{DataType,Metadata.DType}( | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. we should just spell out [inline code lost in extraction] There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. will change |
||
Bool => Metadata.BOOL, | ||
Int8 => Metadata.INT8, | ||
Int16 => Metadata.INT16, | ||
|
@@ -106,18 +110,63 @@ const julia2Type_ = Dict{DataType,Metadata.Type_}( | |
Float32 => Metadata.FLOAT, | ||
Float64 => Metadata.DOUBLE, | ||
String => Metadata.UTF8, | ||
Vector{UInt8} => Metadata.BINARY, | ||
Dates.DateTime => Metadata.INT64, | ||
Dates.Date => Metadata.INT32, | ||
WeakRefString{UInt8} => Metadata.UTF8 | ||
Vector{UInt8} => Metadata.BINARY, | ||
Dates.Time => Metadata.INT64, | ||
Dates.DateTime => Metadata.INT64, | ||
Dates.Date => Metadata.INT32, | ||
# WeakRefString{UInt8} => Metadata.UTF8 # not currently being used | ||
) | ||
|
||
const NON_PRIMITIVE_TYPES = Set([Metadata.UTF8, Metadata.BINARY]) | ||
|
||
const TimeUnit2julia = Dict{Metadata.TimeUnit,DataType}( | ||
Metadata.SECOND => Arrow.Second, | ||
Metadata.MILLISECOND => Arrow.Millisecond, | ||
Metadata.MICROSECOND => Arrow.Microsecond, | ||
Metadata.NANOSECOND => Arrow.Nanosecond | ||
const JULIA_TIME_DICT = Dict{Metadata.TimeUnit,DataType}( | ||
Metadata.SECOND => Dates.Second, | ||
Metadata.MILLISECOND => Dates.Millisecond, | ||
Metadata.MICROSECOND => Dates.Microsecond, | ||
Metadata.NANOSECOND => Dates.Nanosecond | ||
) | ||
const julia2TimeUnit = Dict{DataType,Metadata.TimeUnit}([(v, k) for (k,v) in TimeUnit2julia]) | ||
const MDATA_TIME_DICT = Dict{DataType,Metadata.TimeUnit}(v=>k for (k,v) in JULIA_TIME_DICT) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. spell out in full here too |
||
|
||
|
||
"""
    isprimitivetype(t::Metadata.DType)

Return `true` unless `t` is a member of `NON_PRIMITIVE_TYPES`.
"""
isprimitivetype(t::Metadata.DType) = !(t in NON_PRIMITIVE_TYPES)
|
||
|
||
# `juliatype(meta, values_type)` resolves the Julia element type of a column from its type
# metadata (`meta`) and the feather type tag of its stored values (`values_type`).

# No type metadata: look the value tag up directly.
function juliatype(meta::Nothing, values_type::Metadata.DType)
    return JULIA_TYPE_DICT[values_type]
end

# Convenience form for callers that only hold the value tag.
function juliatype(values_type::Metadata.DType)
    return juliatype(nothing, values_type)
end

# Categorical column: the tag of the levels array is looked up; `values_type` is not used.
juliatype(meta::Metadata.CategoryMetadata, values_type::Metadata.DType) =
    JULIA_TYPE_DICT[meta.levels.dtype]

# Timestamp column: parameterised by the period type matching `meta.unit`.
juliatype(meta::Metadata.TimestampMetadata, values_type::Metadata.DType) =
    Timestamp{JULIA_TIME_DICT[meta.unit]}

# Time-of-day column: parameterised by the period type and by the storage type of the values.
function juliatype(meta::Metadata.TimeMetadata, values_type::Metadata.DType)
    period = JULIA_TIME_DICT[meta.unit]
    storage = JULIA_TYPE_DICT[values_type]
    return TimeOfDay{period,storage}
end

# Date column: always `Datestamp`, whatever the value tag is.
function juliatype(meta::Metadata.DateMetadata, values_type::Metadata.DType)
    return Datestamp
end
|
||
"""
    juliatype(col::Metadata.Column)

Return the Julia element type for `col`: the type resolved from the column's metadata and
value tag, widened to `Union{T,Missing}` when the column's `null_count` is non-zero.
"""
function juliatype(col::Metadata.Column)
    eltyp = juliatype(col.metadata, col.values.dtype)
    if col.values.null_count == 0
        return eltyp
    end
    return Union{eltyp,Missing}
end
|
||
# `feathertype(T)` maps a Julia element type to the feather type tag used to store it.

# Generic case: table lookup.
function feathertype(::Type{T}) where {T}
    return MDATA_TYPE_DICT[T]
end

# `Union{T,Missing}` uses the tag of `T`.
function feathertype(::Type{Union{T,Missing}}) where {T}
    return feathertype(T)
end

# Arrow date/time wrappers map to fixed integer tags.
function feathertype(::Type{<:Arrow.Datestamp})
    return Metadata.INT32
end

function feathertype(::Type{<:Arrow.Timestamp})
    return Metadata.INT64
end

# For `TimeOfDay` the tag follows the second type parameter.
function feathertype(::Type{<:Arrow.TimeOfDay{P,Int32}}) where {P}
    return Metadata.INT32
end

function feathertype(::Type{<:Arrow.TimeOfDay{P,Int64}}) where {P}
    return Metadata.INT64
end
|
||
# `getmetadata(io, T, A)` builds the type metadata stored alongside a column with element
# type `T` backed by the Arrow vector `A`. Only the `DictEncoding` method uses `io`.

# Default: no type metadata.
function getmetadata(io::IO, ::Type{T}, A::ArrowVector) where {T}
    return nothing
end

# `Union{T,Missing}` defers to the method for `T`.
function getmetadata(io::IO, ::Type{Union{T,Missing}}, A::ArrowVector) where {T}
    return getmetadata(io, T, A)
end

function getmetadata(io::IO, ::Type{Arrow.Datestamp}, A::ArrowVector)
    return Metadata.DateMetadata()
end

# Timestamp: the unit comes from the period parameter; the second field is an empty string.
getmetadata(io::IO, ::Type{Arrow.Timestamp{T}}, A::ArrowVector) where {T} =
    Metadata.TimestampMetadata(MDATA_TIME_DICT[T], "")

# Time of day: only the period parameter `P` is recorded.
getmetadata(io::IO, ::Type{Arrow.TimeOfDay{P,T}}, A::ArrowVector) where {P,T} =
    Metadata.TimeMetadata(MDATA_TIME_DICT[P])

# TODO Arrow standard says nothing about specifying whether DictEncoding is ordered!
# Dictionary-encoded column: the levels are written to `io` here and described by the
# returned metadata, which is always flagged `true` (ordered).
function getmetadata(io::IO, ::Type{T}, A::DictEncoding) where {T}
    return Metadata.CategoryMetadata(writecontents(Metadata.PrimitiveArray, io, levels(A)),
                                     true)
end
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
|
||
# TODO currently there is no appending or anything like that | ||
|
||
"""
    Sink <: Data.Sink

DataStreams sink that collects Arrow-formatted columns and writes them to a feather file
when `Data.close!` is called.
"""
mutable struct Sink <: Data.Sink
    # path of the output file
    filename::String
    # schema of the data being written
    schema::Data.Schema
    # feather metadata table; replaced in `Data.close!` once the columns are written
    ctable::Metadata.CTable
    # handle of the output file
    io::IO
    # free-form strings passed to `Metadata.CTable`
    description::String
    metadata::String
    # one Arrow vector per column, assigned by `Data.streamto!`
    columns::Vector{ArrowVector}
end
|
||
"""
    Sink(filename, sch=Data.Schema(), cols=Vector{ArrowVector}(undef, size(sch,2));
         description="", metadata="")
    Sink(filename, df::DataFrame; description="", metadata="")

Open `filename` in mode `"w+"` and return a `Sink` for it. The second form takes the schema
from `df`. `cols` defaults to an uninitialised vector with one slot per schema column.
"""
function Sink(filename::AbstractString, sch::Data.Schema=Data.Schema(),
              cols::AbstractVector{<:ArrowVector}=Vector{ArrowVector}(undef, size(sch,2));
              description::AbstractString="", metadata::AbstractString="")
    # Placeholder table (zero count, no columns); `Data.close!` builds the real one.
    table = Metadata.CTable(description, 0, Metadata.Column[], FEATHER_VERSION, metadata)
    handle = open(filename, "w+")
    return Sink(filename, sch, table, handle, description, metadata, cols)
end
function Sink(filename::AbstractString, df::DataFrame; description::AbstractString="",
              metadata::AbstractString="")
    sch = Data.schema(df)
    return Sink(filename, sch; description=description, metadata=metadata)
end
|
||
# required by DataStreams | ||
# Build a sink for `file` from a schema alone. `append`, `reference` and any extra keyword
# arguments are accepted but not used.
function Sink(sch::Data.Schema, ::Type{Data.Column}, append::Bool, file::AbstractString;
              reference::Vector{UInt8}=UInt8[], kwargs...)
    return Sink(file, sch)
end

# Build a sink from an existing one, keeping its file name and column vector but using the
# new schema `sch`. `append` and `reference` are not used.
# NOTE(review): this goes through the filename constructor, so the file is opened again
# with "w+" and `description`/`metadata` fall back to their defaults. Confirm intended.
function Sink(sink::Sink, sch::Data.Schema, ::Type{Data.Column}, append::Bool;
              reference::Vector{UInt8}=UInt8[])
    path, cols = sink.filename, sink.columns
    return Sink(path, sch, cols)
end
|
||
# The sink only accepts whole columns.
function Data.streamtypes(::Type{Sink})
    return [Data.Column]
end

# A sink reports the dimensions of its schema.
function size(sink::Sink)
    return size(sink.schema)
end
function size(sink::Sink, i::Integer)
    return size(sink.schema, i)
end
|
||
|
||
""" | ||
write(filename::AbstractString, df::AbstractDataFrame) | ||
|
||
Write the dataframe `df` to the feather formatted file `filename`. | ||
""" | ||
function write(filename::AbstractString, df::AbstractDataFrame)
    out = Feather.Sink(filename, df)
    # Stream the columns into the sink; nothing reaches the file until `Data.close!`.
    Data.stream!(df, out)
    return Data.close!(out)
end
|
||
|
||
# Store column number `col`: convert `val` with `arrowformat` and keep the result in
# `sink.columns`. `row` is not used. Returns the stored Arrow vector.
function Data.streamto!(sink::Sink, ::Type{Data.Column}, val::AbstractVector{T}, row,
                        col) where {T}
    formatted = arrowformat(val)
    sink.columns[col] = formatted
    return formatted
end
|
||
# NOTE: the below is very inefficient, but we are forced to do it by the Feather format
#
# Store column number `col` when its element type admits `missing`. The values are scanned
# once; if no `missing` is actually present, the column is first converted to
# `AbstractVector{T}` so that `arrowformat` receives the narrower element type.
# `row` is not used. Returns the value assigned to `sink.columns[col]`.
function Data.streamto!(sink::Sink, ::Type{Data.Column},
                        val::AbstractVector{Union{T,Missing}}, row, col) where {T}
    # `any` answers the yes/no question directly, instead of comparing the index returned
    # by `findfirst` against `nothing` with `==`.
    data = any(ismissing, val) ? val : convert(AbstractVector{T}, val)
    return sink.columns[col] = arrowformat(data)
end
|
||
|
||
# Describe an Arrow vector that has been written at byte offset `off` and occupies
# `nbytes` bytes. The encoding is always `Metadata.PLAIN`; the length and null count are
# read from `A`.
function Metadata.PrimitiveArray(A::ArrowVector{J}, off::Integer, nbytes::Integer) where {J}
    tag = feathertype(J)
    return Metadata.PrimitiveArray(tag, Metadata.PLAIN, off, length(A), nullcount(A),
                                   nbytes)
end

# Dictionary-encoded vectors are described by the element type of their references.
function Metadata.PrimitiveArray(A::DictEncoding, off::Integer, nbytes::Integer)
    tag = feathertype(eltype(references(A)))
    return Metadata.PrimitiveArray(tag, Metadata.PLAIN, off, length(A), nullcount(A),
                                   nbytes)
end
|
||
|
||
# `writecontents(io, A)` hands the Arrow vector `A` to `writepadded`, together with the
# accessor functions listed for each vector kind.
function writecontents(io::IO, A::Primitive)
    return writepadded(io, A)
end
function writecontents(io::IO, A::NullablePrimitive)
    return writepadded(io, A, bitmask, values)
end
function writecontents(io::IO, A::List)
    return writepadded(io, A, offsets, values)
end
function writecontents(io::IO, A::NullableList)
    return writepadded(io, A, bitmask, offsets, values)
end
function writecontents(io::IO, A::BitPrimitive)
    return writepadded(io, A, values)
end
function writecontents(io::IO, A::NullableBitPrimitive)
    return writepadded(io, A, bitmask, values)
end

# For a dictionary encoding only the references are written here.
function writecontents(io::IO, A::DictEncoding)
    return writecontents(io, references(A))
end

# Write `A` and return a `Metadata.PrimitiveArray` recording where it was written: the
# stream position before the write, and the number of bytes the position advanced.
function writecontents(::Type{Metadata.PrimitiveArray}, io::IO, A::ArrowVector)
    start = position(io)
    writecontents(io, A)
    stop = position(io)
    return Metadata.PrimitiveArray(A, start, stop - start)
end
|
||
|
||
# Write one column to `io` and return its `Metadata.Column` record, with empty user
# metadata. The values are written before the type metadata is built, because
# `getmetadata` may itself write to `io`.
function writecolumn(io::IO, name::AbstractString, A::ArrowVector{J}) where {J}
    vals = writecontents(Metadata.PrimitiveArray, io, A)
    typemeta = getmetadata(io, J, A)
    return Metadata.Column(String(name), vals, typemeta, "")
end

# Write the sink's column named `col`; the schema is indexed by name to find its position.
function writecolumn(sink::Sink, col::String)
    idx = sink.schema[col]
    return writecolumn(sink.io, col, sink.columns[idx])
end

# Write every column in schema header order and collect the metadata records.
function writecolumns(sink::Sink)
    records = Metadata.Column[]
    for n in Data.header(sink.schema)
        push!(records, writecolumn(sink, n))
    end
    return records
end
|
||
|
||
# Serialise `ctable` with FlatBuffers, hand the bytes after `head` to `writepadded`, and
# return the number of those bytes as an `Int32`.
function writemetadata(io::IO, ctable::Metadata.CTable)
    built = FlatBuffers.build!(ctable)
    lo = built.head + 1
    hi = length(built.bytes)
    span = lo:hi
    writepadded(io, view(built.bytes, span))
    return Int32(length(span))
end

# Convenience form using the sink's own stream and metadata table.
function writemetadata(sink::Sink)
    return writemetadata(sink.io, sink.ctable)
end
|
||
|
||
function Data.close!(sink::Sink) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. this is nice how all the writing happens in one place. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Like I said I was contemplating breaking it up a little, but I'm not determined to do it. I definitely agree that it has to be done in such a way that it does not obscure where in the IO buffer you are at any point in the write process. Anyway, I'll leave this alone then, I didn't have any particularly inspired ideas about it: I think I just wanted to formalize how we deal with the header and tailer bytes a bit. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I like it just like this; before it was a little all over the place in terms of where all the IO happened. |
||
writepadded(sink.io, FEATHER_MAGIC_BYTES) | ||
cols = writecolumns(sink) | ||
ctable = Metadata.CTable(sink.description, size(sink,1), cols, FEATHER_VERSION, sink.metadata) | ||
sink.ctable = ctable | ||
len = writemetadata(sink) | ||
write(sink.io, Int32(len)) # these two writes combined are properly aligned | ||
write(sink.io, FEATHER_MAGIC_BYTES) | ||
close(sink.io) | ||
sink | ||
end | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you expound this comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't even remember. I think I wrote that comment very early on, probably not realizing that a `@UNION` was coming from FlatBuffers. Will delete.