Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Arrow batches structured types #1879

Merged
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
4d64dab
Initial version
sfc-gh-astachowski Aug 23, 2024
986967d
Initial tests
sfc-gh-astachowski Aug 26, 2024
c9f6287
Formatting
sfc-gh-astachowski Aug 26, 2024
0fc7b5a
Import formatting
sfc-gh-astachowski Aug 26, 2024
b409510
Added missing interface definitions
sfc-gh-astachowski Aug 26, 2024
1de2c39
Implemented review feedback
sfc-gh-astachowski Aug 27, 2024
969c59c
Further review feedback
sfc-gh-astachowski Aug 28, 2024
f658c45
Implementation and basic tests
sfc-gh-astachowski Aug 29, 2024
28c04b2
Formatting
sfc-gh-astachowski Aug 29, 2024
591eda4
Added internal annotations
sfc-gh-astachowski Aug 29, 2024
c50dff0
Merge branch 'SNOW-873466-arrow-batches' into arrow-batches-structure…
sfc-gh-astachowski Sep 3, 2024
3f4988a
Merge branch 'SNOW-873466-arrow-batches' into arrow-batches-structure…
sfc-gh-astachowski Sep 3, 2024
70d0bdb
Fixed memory leaks and added assertions of no memory leaks.
sfc-gh-astachowski Sep 3, 2024
867079a
Fixed a test
sfc-gh-astachowski Sep 5, 2024
b08efcf
Attempted test fix
sfc-gh-astachowski Sep 5, 2024
53dbc07
Merge branch 'SNOW-873466-arrow-batches' into arrow-batches-structure…
sfc-gh-astachowski Sep 6, 2024
e2a0d6e
Merge branch 'SNOW-873466-arrow-batches' into arrow-batches-structure…
sfc-gh-astachowski Sep 6, 2024
b8bb791
Formatting
sfc-gh-astachowski Sep 6, 2024
384efc8
Merge branch 'SNOW-873466-arrow-batches' into arrow-batches-structure…
sfc-gh-astachowski Sep 9, 2024
bdca1db
Merge branch 'SNOW-873466-arrow-batches' into arrow-batches-structure…
sfc-gh-astachowski Sep 19, 2024
4c9c8f2
Merge branch 'SNOW-873466-arrow-batches' into arrow-batches-structure…
sfc-gh-astachowski Sep 19, 2024
3504d4f
Merge fixes
sfc-gh-astachowski Sep 23, 2024
67ae0c4
Merge branch 'SNOW-873466-arrow-batches' into arrow-batches-structure…
sfc-gh-astachowski Sep 23, 2024
113af82
Added try-with-resources
sfc-gh-astachowski Sep 23, 2024
44f9c15
Merge fixes and closing in finally blocks
sfc-gh-astachowski Sep 24, 2024
273255f
Added exception on second convert() call
sfc-gh-astachowski Sep 27, 2024
c662531
Formatting
sfc-gh-astachowski Sep 27, 2024
0508e48
More general exception catch
sfc-gh-astachowski Sep 30, 2024
07bcb33
More general exception catches
sfc-gh-astachowski Sep 30, 2024
5ad9f3b
Added tests for SFArrowException
sfc-gh-astachowski Oct 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ static Types.MinorType deduceType(ValueVector vector) {
}
break;
}
case VECTOR:
return Types.MinorType.FIXED_SIZE_LIST;
case TIME:
return Types.MinorType.TIMEMILLI;
case TIMESTAMP_LTZ:
Expand Down Expand Up @@ -112,6 +114,19 @@ static FieldVector convert(
return new BigIntVectorConverter(allocator, vector, context, session, idx).convert();
case DECIMAL:
return new DecimalVectorConverter(allocator, vector, context, session, idx).convert();
case STRUCT:
return new StructVectorConverter(allocator, vector, context, session, idx, null)
.convert();
case LIST:
return new ListVectorConverter(allocator, vector, context, session, idx, null)
.convert();
case VARCHAR:
return new VarCharVectorConverter(allocator, vector, context, session, idx).convert();
case MAP:
return new MapVectorConverter(allocator, vector, context, session, idx, null).convert();
case FIXED_SIZE_LIST:
return new FixedSizeListVectorConverter(allocator, vector, context, session, idx, null)
.convert();
}
}
} catch (SFException ex) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package net.snowflake.client.core.arrow.fullvectorconverters;

import java.util.ArrayList;
import net.snowflake.client.core.DataConversionContext;
import net.snowflake.client.core.SFBaseSession;
import net.snowflake.client.core.SFException;
import net.snowflake.client.core.SnowflakeJdbcInternalApi;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.complex.FixedSizeListVector;
import org.apache.arrow.vector.types.pojo.Field;

@SnowflakeJdbcInternalApi
public class FixedSizeListVectorConverter implements ArrowFullVectorConverter {
protected RootAllocator allocator;
protected ValueVector vector;
protected DataConversionContext context;
protected SFBaseSession session;
protected int idx;
protected Object valueTargetType;

FixedSizeListVectorConverter(
RootAllocator allocator,
ValueVector vector,
DataConversionContext context,
SFBaseSession session,
int idx,
Object valueTargetType) {
this.allocator = allocator;
this.vector = vector;
this.context = context;
this.session = session;
this.idx = idx;
this.valueTargetType = valueTargetType;
}

@Override
public FieldVector convert() throws SFException, SnowflakeSQLException {
FixedSizeListVector listVector = (FixedSizeListVector) vector;
FieldVector dataVector = listVector.getDataVector();
FieldVector convertedDataVector =
ArrowFullVectorConverter.convert(
allocator, dataVector, context, session, 0, valueTargetType);
FixedSizeListVector convertedListVector =
FixedSizeListVector.empty(listVector.getName(), listVector.getListSize(), allocator);
ArrayList<Field> fields = new ArrayList<>();
fields.add(convertedDataVector.getField());
convertedListVector.initializeChildrenFromFields(fields);
convertedListVector.allocateNew();
convertedListVector.setValueCount(listVector.getValueCount());
ArrowBuf validityBuffer = listVector.getValidityBuffer();
convertedListVector
.getValidityBuffer()
.setBytes(0L, validityBuffer, 0L, validityBuffer.capacity());
convertedDataVector.makeTransferPair(convertedListVector.getDataVector()).transfer();

vector.close();
sfc-gh-astachowski marked this conversation as resolved.
Show resolved Hide resolved
return convertedListVector;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package net.snowflake.client.core.arrow.fullvectorconverters;

import java.util.ArrayList;
import net.snowflake.client.core.DataConversionContext;
import net.snowflake.client.core.SFBaseSession;
import net.snowflake.client.core.SFException;
import net.snowflake.client.core.SnowflakeJdbcInternalApi;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.types.pojo.Field;

/**
 * Full-vector converter for Arrow {@link ListVector}s.
 *
 * <p>The element (data) vector is converted through {@link ArrowFullVectorConverter#convert} and
 * moved into a new list vector created by {@link #initVector(String, Field)}; offsets, validity
 * bits, value count and the last-set index are copied from the source vector.
 */
@SnowflakeJdbcInternalApi
public class ListVectorConverter implements ArrowFullVectorConverter {
  protected RootAllocator allocator;
  protected ValueVector vector;
  protected DataConversionContext context;
  protected SFBaseSession session;
  // Stored for parity with the other converters; convert() in this class passes 0 to the child.
  protected int idx;
  // Target type handed through unchanged to the conversion of the element vector.
  protected Object valueTargetType;

  /**
   * @param allocator allocator used for the converted vector
   * @param vector source vector; {@link #convert()} casts it to {@link ListVector}
   * @param context conversion context forwarded to the element conversion
   * @param session session forwarded to the element conversion
   * @param idx column index (kept on the instance, not read by {@link #convert()})
   * @param valueTargetType target type forwarded to the element conversion, may be {@code null}
   */
  ListVectorConverter(
      RootAllocator allocator,
      ValueVector vector,
      DataConversionContext context,
      SFBaseSession session,
      int idx,
      Object valueTargetType) {
    this.allocator = allocator;
    this.vector = vector;
    this.context = context;
    this.session = session;
    this.idx = idx;
    this.valueTargetType = valueTargetType;
  }

  /**
   * Creates the empty result vector with a single child described by {@code field}.
   *
   * <p>Subclasses override this to produce a different list-like vector type.
   *
   * @param name name of the result vector
   * @param field field describing the only child of the result vector
   * @return a new, not yet allocated list vector
   */
  protected ListVector initVector(String name, Field field) {
    ListVector convertedListVector = ListVector.empty(name, allocator);
    ArrayList<Field> fields = new ArrayList<>();
    fields.add(field);
    convertedListVector.initializeChildrenFromFields(fields);
    return convertedListVector;
  }

  /**
   * Builds the converted list vector and closes the source vector.
   *
   * <p>The source vector is closed in a {@code finally} block so that it is released even when
   * the element conversion or the allocation of the result fails.
   *
   * @return a new list vector holding the converted elements
   * @throws SFException propagated from the element conversion
   * @throws SnowflakeSQLException propagated from the element conversion
   */
  @Override
  public FieldVector convert() throws SFException, SnowflakeSQLException {
    try {
      ListVector listVector = (ListVector) vector;
      FieldVector dataVector = listVector.getDataVector();
      FieldVector convertedDataVector =
          ArrowFullVectorConverter.convert(
              allocator, dataVector, context, session, 0, valueTargetType);

      // NOTE(review): the child is initialised from the *source* data vector's field, while
      // FixedSizeListVectorConverter uses the converted vector's field - confirm this is
      // intended when the element conversion changes the element type.
      ListVector convertedListVector = initVector(vector.getName(), dataVector.getField());
      convertedListVector.allocateNew();
      convertedListVector.setValueCount(listVector.getValueCount());

      // Offsets and null bitmap are copied verbatim from the source list.
      convertedListVector.getOffsetBuffer().setBytes(0, listVector.getOffsetBuffer());
      ArrowBuf validityBuffer = listVector.getValidityBuffer();
      convertedListVector
          .getValidityBuffer()
          .setBytes(0L, validityBuffer, 0L, validityBuffer.capacity());
      convertedListVector.setLastSet(listVector.getLastSet());

      // Move the converted elements into the child slot of the new list vector.
      convertedDataVector.makeTransferPair(convertedListVector.getDataVector()).transfer();
      return convertedListVector;
    } finally {
      vector.close();
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package net.snowflake.client.core.arrow.fullvectorconverters;

import java.util.ArrayList;
import net.snowflake.client.core.DataConversionContext;
import net.snowflake.client.core.SFBaseSession;
import net.snowflake.client.core.SnowflakeJdbcInternalApi;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.complex.MapVector;
import org.apache.arrow.vector.types.pojo.Field;

/**
 * Variant of {@link ListVectorConverter} whose result vector is a {@link MapVector}.
 *
 * <p>Only the creation of the result vector differs; the conversion itself is inherited.
 */
@SnowflakeJdbcInternalApi
public class MapVectorConverter extends ListVectorConverter {

  /** Passes every argument straight through to {@link ListVectorConverter}. */
  MapVectorConverter(
      RootAllocator allocator,
      ValueVector vector,
      DataConversionContext context,
      SFBaseSession session,
      int idx,
      Object valueTargetType) {
    super(allocator, vector, context, session, idx, valueTargetType);
  }

  /**
   * Creates an empty {@link MapVector} (created with {@code keysSorted = false}) whose single
   * child is described by {@code field}.
   *
   * @param name name of the result vector
   * @param field field describing the only child of the result vector
   * @return the new map vector, typed as its {@link ListVector} supertype
   */
  @Override
  protected ListVector initVector(String name, Field field) {
    ArrayList<Field> childFields = new ArrayList<>();
    childFields.add(field);

    MapVector result = MapVector.empty(name, allocator, false);
    result.initializeChildrenFromFields(childFields);
    return result;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package net.snowflake.client.core.arrow.fullvectorconverters;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import net.snowflake.client.core.DataConversionContext;
import net.snowflake.client.core.SFBaseSession;
import net.snowflake.client.core.SFException;
import net.snowflake.client.core.SnowflakeJdbcInternalApi;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.util.TransferPair;

@SnowflakeJdbcInternalApi
public class StructVectorConverter implements ArrowFullVectorConverter {
protected RootAllocator allocator;
protected ValueVector vector;
protected DataConversionContext context;
protected SFBaseSession session;
protected int idx;
protected Map<String, Object> targetTypes;

StructVectorConverter(
RootAllocator allocator,
ValueVector vector,
DataConversionContext context,
SFBaseSession session,
int idx,
Map<String, Object> targetTypes) {
this.allocator = allocator;
this.vector = vector;
this.context = context;
this.session = session;
this.idx = idx;
this.targetTypes = targetTypes;
}

public FieldVector convert() throws SFException, SnowflakeSQLException {
StructVector structVector = (StructVector) vector;
List<FieldVector> childVectors = structVector.getChildrenFromFields();
List<FieldVector> convertedVectors = new ArrayList<>();
for (FieldVector childVector : childVectors) {
Object targetType = null;
if (targetTypes != null) {
targetType = targetTypes.get(childVector.getName());
}
convertedVectors.add(
ArrowFullVectorConverter.convert(
allocator, childVector, context, session, idx, targetType));
}

List<Field> convertedFields =
convertedVectors.stream().map(ValueVector::getField).collect(Collectors.toList());
StructVector converted = StructVector.empty(vector.getName(), allocator);
converted.allocateNew();
converted.initializeChildrenFromFields(convertedFields);
for (FieldVector convertedVector : convertedVectors) {
TransferPair transferPair =
convertedVector.makeTransferPair(converted.getChild(convertedVector.getName()));
transferPair.transfer();
}
ArrowBuf validityBuffer = structVector.getValidityBuffer();
converted.getValidityBuffer().setBytes(0L, validityBuffer, 0L, validityBuffer.capacity());
converted.setValueCount(vector.getValueCount());

vector.close();
return converted;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package net.snowflake.client.core.arrow.fullvectorconverters;

import net.snowflake.client.core.DataConversionContext;
import net.snowflake.client.core.SFBaseSession;
import net.snowflake.client.core.SFException;
import net.snowflake.client.core.SnowflakeJdbcInternalApi;
import net.snowflake.client.core.arrow.ArrowVectorConverter;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.util.Text;

/**
 * Converter that produces a {@link VarCharVector} by writing the string form of every source
 * value, as returned by {@link ArrowVectorConverter#toString(int)}.
 */
@SnowflakeJdbcInternalApi
public class VarCharVectorConverter extends SimpleArrowFullVectorConverter<VarCharVector> {
  /** Passes every argument straight through to {@link SimpleArrowFullVectorConverter}. */
  public VarCharVectorConverter(
      RootAllocator allocator,
      ValueVector vector,
      DataConversionContext context,
      SFBaseSession session,
      int idx) {
    super(allocator, vector, context, session, idx);
  }

  /** @return {@code true} when the source vector already is a {@link VarCharVector} */
  @Override
  protected boolean matchingType() {
    return (vector instanceof VarCharVector);
  }

  /**
   * Creates the result vector, named like the source vector and allocated for the source
   * vector's value count.
   */
  @Override
  protected VarCharVector initVector() {
    VarCharVector resultVector = new VarCharVector(vector.getName(), allocator);
    resultVector.allocateNew(vector.getValueCount());
    return resultVector;
  }

  /**
   * Writes the string form of the value at {@code idx} into {@code to}.
   *
   * <p>A {@code null} string is written as a null entry instead of being wrapped in a
   * {@link Text}. Whether the base class already filters out null rows is not visible from this
   * class, so the check is kept here defensively.
   *
   * @param from converter used to read the source value as a string
   * @param to result vector being filled
   * @param idx row index, used for both reading and writing
   * @throws SFException propagated from {@code from.toString(idx)}
   */
  @Override
  protected void convertValue(ArrowVectorConverter from, VarCharVector to, int idx)
      throws SFException {
    String value = from.toString(idx);
    if (value == null) {
      to.setNull(idx);
      return;
    }
    // setSafe grows the buffers on demand; initVector() only sizes them by value count, not by
    // the total byte length of the strings.
    to.setSafe(idx, new Text(value));
  }
}
Loading