Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion csharp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ This implementation is under development and may not be suitable for use in prod
- Time64
- Binary (fixed-length)
- List
- Struct

### Type Metadata

Expand All @@ -119,7 +120,6 @@ This implementation is under development and may not be suitable for use in prod
- Tensor
- Table
- Arrays
- Struct
- Union
- Dense
- Sparse
Expand Down
29 changes: 20 additions & 9 deletions csharp/src/Apache.Arrow/Arrays/StructArray.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,44 @@
using Apache.Arrow.Types;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace Apache.Arrow
{
public class StructArray : Array
{
private readonly List<Array> _fields;
private IReadOnlyList<IArrowArray> _fields;

public IEnumerable<Array> Fields => _fields;
public IReadOnlyList<IArrowArray> Fields =>
LazyInitializer.EnsureInitialized(ref _fields, () => InitializeFields());

public StructArray(
IArrowType dataType, int length,
IEnumerable<Array> children,
IEnumerable<IArrowArray> children,
ArrowBuffer nullBitmapBuffer, int nullCount = 0, int offset = 0)
: this(new ArrayData(
dataType, length, nullCount, offset, new[] { nullBitmapBuffer },
children.Select(child => child.Data)))
{ }
: this(new ArrayData(
dataType, length, nullCount, offset, new[] { nullBitmapBuffer },
children.Select(child => child.Data)))
{
_fields = children.ToArray();
}

public StructArray(ArrayData data)
: base(data)
{
data.EnsureDataType(ArrowTypeId.Struct);

_fields = new List<Array>();
}

public override void Accept(IArrowArrayVisitor visitor) => Accept(this, visitor);

/// <summary>
/// Builds the child arrays of this struct by passing each entry of
/// <c>Data.Children</c> to <c>ArrowArrayFactory.BuildArray</c>, keeping the same order.
/// </summary>
/// <returns>An array with one element per entry in <c>Data.Children</c>.</returns>
private IReadOnlyList<IArrowArray> InitializeFields()
{
    var childData = Data.Children;
    var fields = new IArrowArray[childData.Length];

    int position = 0;
    foreach (var child in childData)
    {
        fields[position++] = ArrowArrayFactory.BuildArray(child);
    }

    return fields;
}
}
}
38 changes: 27 additions & 11 deletions csharp/src/Apache.Arrow/Ipc/ArrowReaderImplementation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ private List<IArrowArray> BuildArrays(
return arrays;
}


private ArrayData LoadPrimitiveField(
ref RecordBatchEnumerator recordBatchEnumerator,
Field field,
Expand All @@ -147,10 +146,10 @@ private ArrayData LoadPrimitiveField(
{

ArrowBuffer nullArrowBuffer = BuildArrowBuffer(bodyData, recordBatchEnumerator.CurrentBuffer);
recordBatchEnumerator.MoveNextBuffer();
ArrowBuffer valueArrowBuffer = BuildArrowBuffer(bodyData, recordBatchEnumerator.CurrentBuffer);
recordBatchEnumerator.MoveNextBuffer();

if (!recordBatchEnumerator.MoveNextBuffer())
{
throw new Exception("Unable to move to the next buffer.");
}

int fieldLength = (int)fieldNode.Length;
int fieldNullCount = (int)fieldNode.NullCount;
Expand All @@ -165,13 +164,24 @@ private ArrayData LoadPrimitiveField(
throw new InvalidDataException("Null count length must be >= 0"); // TODO:Localize exception message
}

ArrowBuffer[] arrowBuff = new[] { nullArrowBuffer, valueArrowBuffer };
ArrowBuffer[] arrowBuff;
if (field.DataType.TypeId == ArrowTypeId.Struct)
{
arrowBuff = new[] { nullArrowBuffer };
}
else
{
ArrowBuffer valueArrowBuffer = BuildArrowBuffer(bodyData, recordBatchEnumerator.CurrentBuffer);
recordBatchEnumerator.MoveNextBuffer();

arrowBuff = new[] { nullArrowBuffer, valueArrowBuffer };
}

ArrayData[] children = GetChildren(ref recordBatchEnumerator, field, bodyData);

return new ArrayData(field.DataType, fieldLength, fieldNullCount, 0, arrowBuff, children);
}


private ArrayData LoadVariableField(
ref RecordBatchEnumerator recordBatchEnumerator,
Field field,
Expand All @@ -180,9 +190,15 @@ private ArrayData LoadVariableField(
{

ArrowBuffer nullArrowBuffer = BuildArrowBuffer(bodyData, recordBatchEnumerator.CurrentBuffer);
recordBatchEnumerator.MoveNextBuffer();
if (!recordBatchEnumerator.MoveNextBuffer())
{
throw new Exception("Unable to move to the next buffer.");
}
ArrowBuffer offsetArrowBuffer = BuildArrowBuffer(bodyData, recordBatchEnumerator.CurrentBuffer);
recordBatchEnumerator.MoveNextBuffer();
if (!recordBatchEnumerator.MoveNextBuffer())
{
throw new Exception("Unable to move to the next buffer.");
}
ArrowBuffer valueArrowBuffer = BuildArrowBuffer(bodyData, recordBatchEnumerator.CurrentBuffer);
recordBatchEnumerator.MoveNextBuffer();

Expand Down Expand Up @@ -212,14 +228,14 @@ private ArrayData[] GetChildren(
{
if (!(field.DataType is NestedType type)) return null;

int childrenCount = type.Children.Count;
int childrenCount = type.Fields.Count;
var children = new ArrayData[childrenCount];
for (int index = 0; index < childrenCount; index++)
{
recordBatchEnumerator.MoveNextNode();
Flatbuf.FieldNode childFieldNode = recordBatchEnumerator.CurrentNode;

Field childField = type.Children[index];
Field childField = type.Fields[index];
ArrayData child = childField.DataType.IsFixedPrimitive()
? LoadPrimitiveField(ref recordBatchEnumerator, childField, in childFieldNode, bodyData)
: LoadVariableField(ref recordBatchEnumerator, childField, in childFieldNode, bodyData);
Expand Down
19 changes: 15 additions & 4 deletions csharp/src/Apache.Arrow/Ipc/ArrowStreamWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ internal class ArrowRecordBatchFlatBufferBuilder :
IArrowArrayVisitor<Date64Array>,
IArrowArrayVisitor<ListArray>,
IArrowArrayVisitor<StringArray>,
IArrowArrayVisitor<BinaryArray>
IArrowArrayVisitor<BinaryArray>,
IArrowArrayVisitor<StructArray>
{
public readonly struct Buffer
{
Expand Down Expand Up @@ -102,6 +103,16 @@ public void Visit(BinaryArray array)
_buffers.Add(CreateBuffer(array.ValueBuffer));
}

/// <summary>
/// Adds a buffer created from the struct array's null bitmap buffer, then has every
/// array in <c>array.Fields</c> accept this visitor, in list order.
/// </summary>
/// <param name="array">The struct array being visited.</param>
public void Visit(StructArray array)
{
    _buffers.Add(CreateBuffer(array.NullBitmapBuffer));

    foreach (var childArray in array.Fields)
    {
        childArray.Accept(this);
    }
}

private void CreateBuffers(BooleanArray array)
{
_buffers.Add(CreateBuffer(array.NullBitmapBuffer));
Expand Down Expand Up @@ -204,7 +215,7 @@ private void CountSelfAndChildrenNodes(IArrowType type, ref int count)
{
if (type is NestedType nestedType)
{
foreach (Field childField in nestedType.Children)
foreach (Field childField in nestedType.Fields)
{
CountSelfAndChildrenNodes(childField.DataType, ref count);
}
Expand Down Expand Up @@ -467,12 +478,12 @@ private ValueTask WriteBufferAsync(ArrowBuffer arrowBuffer, CancellationToken ca
return System.Array.Empty<Offset<Flatbuf.Field>>();
}

int childrenCount = type.Children.Count;
int childrenCount = type.Fields.Count;
var children = new Offset<Flatbuf.Field>[childrenCount];

for (int i = 0; i < childrenCount; i++)
{
Field childField = type.Children[i];
Field childField = type.Fields[i];
StringOffset childFieldNameOffset = Builder.CreateString(childField.Name);
ArrowTypeFlatbufferBuilder.FieldType childFieldType = _fieldTypeBuilder.BuildFieldType(childField);
VectorOffset childFieldChildrenVectorOffset = Builder.CreateVectorOfTables(GetChildrenFieldOffsets(childField));
Expand Down
19 changes: 13 additions & 6 deletions csharp/src/Apache.Arrow/Ipc/ArrowTypeFlatbufferBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public struct FieldType
public readonly int Offset;

public static FieldType Build<T>(Flatbuf.Type type, Offset<T> offset)
where T: struct =>
where T : struct =>
new FieldType(type, offset.Value);

public FieldType(Flatbuf.Type type, int offset)
Expand All @@ -40,7 +40,7 @@ public FieldType(Flatbuf.Type type, int offset)
}
}

class TypeVisitor :
class TypeVisitor :
IArrowTypeVisitor<BooleanType>,
IArrowTypeVisitor<Int8Type>,
IArrowTypeVisitor<Int16Type>,
Expand All @@ -60,7 +60,8 @@ class TypeVisitor :
IArrowTypeVisitor<BinaryType>,
IArrowTypeVisitor<TimestampType>,
IArrowTypeVisitor<ListType>,
IArrowTypeVisitor<UnionType>
IArrowTypeVisitor<UnionType>,
IArrowTypeVisitor<StructType>
{
private FlatBufferBuilder Builder { get; }

Expand Down Expand Up @@ -100,7 +101,7 @@ public void Visit(ListType type)
{
Flatbuf.List.StartList(Builder);
Result = FieldType.Build(
Flatbuf.Type.List,
Flatbuf.Type.List,
Flatbuf.List.EndList(Builder));
}

Expand All @@ -118,14 +119,14 @@ public void Visit(StringType type)
}

public void Visit(TimestampType type)
{
{
StringOffset timezoneStringOffset = default;

if (!string.IsNullOrWhiteSpace(type.Timezone))
timezoneStringOffset = Builder.CreateString(type.Timezone);

Result = FieldType.Build(
Flatbuf.Type.Timestamp,
Flatbuf.Type.Timestamp,
Flatbuf.Timestamp.CreateTimestamp(Builder, ToFlatBuffer(type.Unit), timezoneStringOffset));
}

Expand Down Expand Up @@ -171,6 +172,12 @@ public void Visit(Time64Type type)
Flatbuf.Time.CreateTime(Builder, ToFlatBuffer(type.Unit), 64));
}

/// <summary>
/// Starts and immediately ends a flatbuffer <c>Struct_</c> table (no members are added
/// between the two calls) and stores the resulting field type in <c>Result</c>.
/// </summary>
/// <param name="type">The struct type being visited; not read by this method.</param>
public void Visit(StructType type)
{
    Flatbuf.Struct_.StartStruct_(Builder);
    var structOffset = Flatbuf.Struct_.EndStruct_(Builder);

    Result = FieldType.Build(Flatbuf.Type.Struct_, structOffset);
}

private void CreateIntType(NumberType type)
{
Result = FieldType.Build(
Expand Down
23 changes: 20 additions & 3 deletions csharp/src/Apache.Arrow/Ipc/MessageSerializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// limitations under the License.

using System;
using System.Diagnostics;
using System.IO;

namespace Apache.Arrow.Ipc
Expand Down Expand Up @@ -58,15 +59,28 @@ internal static Schema GetSchema(Flatbuf.Schema schema)
{
Flatbuf.Field field = schema.Fields(i).GetValueOrDefault();

schemaBuilder.Field(
new Field(field.Name, GetFieldArrowType(field), field.Nullable));
schemaBuilder.Field(FieldFromFlatbuffer(field));
}

return schemaBuilder.Build();
}

/// <summary>
/// Converts a flatbuffer field into a <c>Field</c>, recursively converting its
/// children first so they can be handed to <c>GetFieldArrowType</c>.
/// </summary>
/// <param name="flatbufField">The flatbuffer field to convert.</param>
/// <returns>A field carrying the flatbuffer field's name, resolved type and nullability.</returns>
private static Field FieldFromFlatbuffer(Flatbuf.Field flatbufField)
{
    int childCount = flatbufField.ChildrenLength;

    // The child array stays null (rather than empty) when the field has no children.
    Field[] childFields = childCount > 0 ? new Field[childCount] : null;

    for (int index = 0; index < childCount; index++)
    {
        // .Value throws if the flatbuffer returns no child at this index.
        childFields[index] = FieldFromFlatbuffer(flatbufField.Children(index).Value);
    }

    return new Field(
        flatbufField.Name,
        GetFieldArrowType(flatbufField, childFields),
        flatbufField.Nullable);
}

private static Types.IArrowType GetFieldArrowType(Flatbuf.Field field)
private static Types.IArrowType GetFieldArrowType(Flatbuf.Field field, Field[] childFields = null)
{
switch (field.TypeType)
{
Expand Down Expand Up @@ -131,6 +145,9 @@ private static Types.IArrowType GetFieldArrowType(Flatbuf.Field field)
throw new InvalidDataException($"List type must have only one child.");
}
return new Types.ListType(GetFieldArrowType(field.Children(0).GetValueOrDefault()));
case Flatbuf.Type.Struct_:
Debug.Assert(childFields != null);
return new Types.StructType(childFields);
default:
throw new InvalidDataException($"Arrow primitive '{field.TypeType}' is unsupported.");
}
Expand Down
4 changes: 2 additions & 2 deletions csharp/src/Apache.Arrow/Types/ListType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ public sealed class ListType : NestedType
public override ArrowTypeId TypeId => ArrowTypeId.List;
public override string Name => "list";

public Field ValueField => Children[0];
public Field ValueField => Fields[0];

public IArrowType ValueDataType => Children[0].DataType;
public IArrowType ValueDataType => Fields[0].DataType;

public ListType(Field valueField)
: base(valueField) { }
Expand Down
21 changes: 12 additions & 9 deletions csharp/src/Apache.Arrow/Types/NestedType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,27 @@ namespace Apache.Arrow.Types
{
public abstract class NestedType : ArrowType
{
public IReadOnlyList<Field> Children { get; }
[Obsolete("Use `Fields` instead")]
public IReadOnlyList<Field> Children => Fields;

protected NestedType(IReadOnlyList<Field> children)
public IReadOnlyList<Field> Fields { get; }

protected NestedType(IReadOnlyList<Field> fields)
{
if (children == null || children.Count == 0)
if (fields == null || fields.Count == 0)
{
throw new ArgumentNullException(nameof(children));
throw new ArgumentNullException(nameof(fields));
}
Children = children;
Fields = fields;
}

protected NestedType(Field child)
protected NestedType(Field field)
{
if (child == null)
if (field == null)
{
throw new ArgumentNullException(nameof(child));
throw new ArgumentNullException(nameof(field));
}
Children = new List<Field> { child };
Fields = new Field[] { field };
}
}
}
Loading