Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Arrow batches complete (for testing) #1889

Draft
wants to merge 60 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
d8faeba
Refactor init converters for reuse in other code
sfc-gh-astachowski Aug 22, 2024
8499a52
Formatting
sfc-gh-astachowski Aug 22, 2024
982b13e
Removed star imports
sfc-gh-astachowski Aug 22, 2024
c82e155
Documentation style fix
sfc-gh-astachowski Aug 22, 2024
7a8382b
Moved init converters to an already public class
sfc-gh-astachowski Aug 22, 2024
7aa57d1
Moved init converters to an appropriate interface
sfc-gh-astachowski Aug 22, 2024
7246575
Merge branch 'master' into init-converters-refactor
sfc-gh-astachowski Aug 22, 2024
48c2435
Changed the version of japicmp to include a needed bugfix
sfc-gh-astachowski Aug 22, 2024
6142094
Refactored unnecessary static modifiers
sfc-gh-astachowski Aug 22, 2024
a9ca5e6
Reformat
sfc-gh-astachowski Aug 22, 2024
4d64dab
Initial version
sfc-gh-astachowski Aug 23, 2024
986967d
Initial tests
sfc-gh-astachowski Aug 26, 2024
c9f6287
Formatting
sfc-gh-astachowski Aug 26, 2024
0fc7b5a
Import formatting
sfc-gh-astachowski Aug 26, 2024
b409510
Added missing interface definitions
sfc-gh-astachowski Aug 26, 2024
1de2c39
Implemented review feedback
sfc-gh-astachowski Aug 27, 2024
969c59c
Further review feedback
sfc-gh-astachowski Aug 28, 2024
f658c45
Implementation and basic tests
sfc-gh-astachowski Aug 29, 2024
28c04b2
Formatting
sfc-gh-astachowski Aug 29, 2024
591eda4
Added internal annotations
sfc-gh-astachowski Aug 29, 2024
c3ab389
Arrow batches initial (#1876)
sfc-gh-astachowski Aug 30, 2024
a5e4c60
Merge branch 'master' into SNOW-873466-arrow-batches
sfc-gh-astachowski Aug 30, 2024
f12fdeb
Merge branch 'SNOW-873466-arrow-batches' into arrow-batches-all-simpl…
sfc-gh-astachowski Aug 30, 2024
c50dff0
Merge branch 'SNOW-873466-arrow-batches' into arrow-batches-structure…
sfc-gh-astachowski Sep 3, 2024
a1cfe08
Added handling of remaining types
sfc-gh-astachowski Sep 3, 2024
d803f9f
Merge remote-tracking branch 'origin/SNOW-873466-arrow-batches' into …
sfc-gh-astachowski Sep 3, 2024
3f4988a
Merge branch 'SNOW-873466-arrow-batches' into arrow-batches-structure…
sfc-gh-astachowski Sep 3, 2024
53638df
Merge branch 'SNOW-873466-arrow-batches' into arrow-batches-all-simpl…
sfc-gh-astachowski Sep 3, 2024
0fd7d0f
Add null time zone check
sfc-gh-astachowski Sep 3, 2024
73d6b4d
Removed timestamp support
sfc-gh-astachowski Sep 3, 2024
7424acf
Added timestamp support
sfc-gh-astachowski Sep 3, 2024
3dd20a9
Formatting
sfc-gh-astachowski Sep 3, 2024
28fb57a
Removed old comments
sfc-gh-astachowski Sep 3, 2024
afa1142
Fixed memory leak and added assertion of no leaks in tests.
sfc-gh-astachowski Sep 3, 2024
7c9ab7a
Fixed memory leaks and added assertions of no memory leaks.
sfc-gh-astachowski Sep 3, 2024
70d0bdb
Fixed memory leaks and added assertions of no memory leaks.
sfc-gh-astachowski Sep 3, 2024
867079a
Fixed a test
sfc-gh-astachowski Sep 5, 2024
b08efcf
Attempted test fix
sfc-gh-astachowski Sep 5, 2024
c705280
Added test category
sfc-gh-astachowski Sep 5, 2024
9347fd5
Added conditional ignore rule
sfc-gh-astachowski Sep 5, 2024
1ff3cc1
Merge branch 'master' into SNOW-873466-arrow-batches
sfc-gh-astachowski Sep 6, 2024
c94276f
Merge fixes
sfc-gh-astachowski Sep 6, 2024
53dbc07
Merge branch 'SNOW-873466-arrow-batches' into arrow-batches-structure…
sfc-gh-astachowski Sep 6, 2024
31bb11b
Merge branch 'SNOW-873466-arrow-batches' into arrow-batches-all-simpl…
sfc-gh-astachowski Sep 6, 2024
ec255ee
Merge branch 'arrow-batches-all-simple-types' into arrow-batches-time…
sfc-gh-astachowski Sep 6, 2024
69cfe59
Merge fixes
sfc-gh-astachowski Sep 6, 2024
c638ad1
Formatting
sfc-gh-astachowski Sep 6, 2024
e2a0d6e
Merge branch 'SNOW-873466-arrow-batches' into arrow-batches-structure…
sfc-gh-astachowski Sep 6, 2024
b8bb791
Formatting
sfc-gh-astachowski Sep 6, 2024
e26e56c
Merge branch 'SNOW-873466-arrow-batches' into arrow-batches-all-simpl…
sfc-gh-astachowski Sep 6, 2024
1c34bb9
Added null check
sfc-gh-astachowski Sep 6, 2024
de52645
Merge branch 'arrow-batches-all-simple-types' into arrow-batches-time…
sfc-gh-astachowski Sep 6, 2024
ef5238c
Formatting
sfc-gh-astachowski Sep 6, 2024
d94f7c7
Renamed test
sfc-gh-astachowski Sep 6, 2024
f4ace2d
Merge branch 'SNOW-873466-arrow-batches' into arrow-batches-all-simpl…
sfc-gh-astachowski Sep 9, 2024
660c0de
Merge branch 'arrow-batches-all-simple-types' into arrow-batches-time…
sfc-gh-astachowski Sep 9, 2024
384efc8
Merge branch 'SNOW-873466-arrow-batches' into arrow-batches-structure…
sfc-gh-astachowski Sep 9, 2024
e212d78
Merge branch 'arrow-batches-timestamps' into arrow-batches-complete
sfc-gh-astachowski Sep 10, 2024
93a35e7
Additional merge fixes
sfc-gh-astachowski Sep 10, 2024
36e8d66
Formatting
sfc-gh-astachowski Sep 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 95 additions & 33 deletions src/main/java/net/snowflake/client/core/SFArrowResultSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import net.snowflake.client.core.arrow.VarCharConverter;
import net.snowflake.client.core.arrow.VectorTypeConverter;
import net.snowflake.client.core.json.Converters;
import net.snowflake.client.jdbc.ArrowBatch;
import net.snowflake.client.jdbc.ArrowBatches;
import net.snowflake.client.jdbc.ArrowResultChunk;
import net.snowflake.client.jdbc.ArrowResultChunk.ArrowChunkIterator;
import net.snowflake.client.jdbc.ErrorCode;
Expand Down Expand Up @@ -112,6 +114,15 @@ public class SFArrowResultSet extends SFBaseResultSet implements DataConversionC
*/
private boolean formatDateWithTimezone;

/** The result set should be read either only as rows or only as batches */
private enum ReadingMode {
UNSPECIFIED,
ROW_MODE,
BATCH_MODE
}

private ReadingMode readingMode = ReadingMode.UNSPECIFIED;

@SnowflakeJdbcInternalApi protected Converters converters;

/**
Expand Down Expand Up @@ -238,6 +249,11 @@ public SFArrowResultSet(
}
}

@SnowflakeJdbcInternalApi
public long getAllocatedMemory() {
return rootAllocator.getAllocatedMemory();
}

private boolean fetchNextRow() throws SnowflakeSQLException {
if (sortResult) {
return fetchNextRowSorted();
Expand All @@ -246,6 +262,31 @@ private boolean fetchNextRow() throws SnowflakeSQLException {
}
}

private ArrowResultChunk fetchNextChunk() throws SnowflakeSQLException {
try {
logger.debug("Fetching chunk number " + nextChunkIndex);
eventHandler.triggerStateTransition(
BasicEvent.QueryState.CONSUMING_RESULT,
String.format(
BasicEvent.QueryState.CONSUMING_RESULT.getArgString(), queryId, nextChunkIndex));
ArrowResultChunk nextChunk = (ArrowResultChunk) chunkDownloader.getNextChunkToConsume();

if (nextChunk == null) {
throw new SnowflakeSQLLoggedException(
queryId,
session,
ErrorCode.INTERNAL_ERROR.getMessageCode(),
SqlState.INTERNAL_ERROR,
"Expect chunk but got null for chunk index " + nextChunkIndex);
}
logger.debug("Chunk number " + nextChunkIndex + " fetched successfully.");
return nextChunk;
} catch (InterruptedException ex) {
throw new SnowflakeSQLLoggedException(
queryId, session, ErrorCode.INTERRUPTED.getMessageCode(), SqlState.QUERY_CANCELED);
}
}

/**
* Goto next row. If end of current chunk, update currentChunkIterator to the beginning of next
* chunk, if any chunk not being consumed yet.
Expand All @@ -259,40 +300,19 @@ private boolean fetchNextRowUnsorted() throws SnowflakeSQLException {
return true;
} else {
if (nextChunkIndex < chunkCount) {
try {
eventHandler.triggerStateTransition(
BasicEvent.QueryState.CONSUMING_RESULT,
String.format(
BasicEvent.QueryState.CONSUMING_RESULT.getArgString(), queryId, nextChunkIndex));

ArrowResultChunk nextChunk = (ArrowResultChunk) chunkDownloader.getNextChunkToConsume();

if (nextChunk == null) {
throw new SnowflakeSQLLoggedException(
queryId,
session,
ErrorCode.INTERNAL_ERROR.getMessageCode(),
SqlState.INTERNAL_ERROR,
"Expect chunk but got null for chunk index " + nextChunkIndex);
}
ArrowResultChunk nextChunk = fetchNextChunk();

currentChunkIterator.getChunk().freeData();
currentChunkIterator = nextChunk.getIterator(this);
if (currentChunkIterator.next()) {

logger.debug(
"Moving to chunk index: {}, row count: {}",
nextChunkIndex,
nextChunk.getRowCount());

nextChunkIndex++;
return true;
} else {
return false;
}
} catch (InterruptedException ex) {
throw new SnowflakeSQLLoggedException(
queryId, session, ErrorCode.INTERRUPTED.getMessageCode(), SqlState.QUERY_CANCELED);
currentChunkIterator.getChunk().freeData();
currentChunkIterator = nextChunk.getIterator(this);
if (currentChunkIterator.next()) {

logger.debug(
"Moving to chunk index: {}, row count: {}", nextChunkIndex, nextChunk.getRowCount());

nextChunkIndex++;
return true;
} else {
return false;
}
} else {
// always free current chunk
Expand Down Expand Up @@ -431,6 +451,11 @@ public boolean next() throws SFException, SnowflakeSQLException {
if (isClosed()) {
return false;
}
if (readingMode == ReadingMode.BATCH_MODE) {
logger.warn("Cannot read rows after getArrowBatches() was called.");
return false;
}
readingMode = ReadingMode.ROW_MODE;

// otherwise try to fetch again
if (fetchNextRow()) {
Expand Down Expand Up @@ -763,6 +788,43 @@ public BigDecimal getBigDecimal(int columnIndex, int scale) throws SFException {
return bigDec == null ? null : bigDec.setScale(scale, RoundingMode.HALF_UP);
}

public ArrowBatches getArrowBatches() {
if (readingMode == ReadingMode.ROW_MODE) {
logger.warn("Cannot read arrow batches after next() was called.");
return null;
}
readingMode = ReadingMode.BATCH_MODE;
return new SFArrowBatchesIterator();
}

private class SFArrowBatchesIterator implements ArrowBatches {
private boolean firstFetched = false;

@Override
public long getRowCount() throws SQLException {
return resultSetSerializable.getRowCount();
}

@Override
public boolean hasNext() {
return nextChunkIndex < chunkCount || !firstFetched;
}

@Override
public ArrowBatch next() throws SQLException {
if (!firstFetched) {
firstFetched = true;
return currentChunkIterator
.getChunk()
.getArrowBatch(SFArrowResultSet.this, useSessionTimezone ? sessionTimeZone : null);
} else {
nextChunkIndex++;
return fetchNextChunk()
.getArrowBatch(SFArrowResultSet.this, useSessionTimezone ? sessionTimeZone : null);
}
}
}

@Override
public boolean isLast() {
return nextChunkIndex == chunkCount && currentChunkIterator.isLast();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import net.snowflake.client.core.json.Converters;
import net.snowflake.client.jdbc.ArrowBatches;
import net.snowflake.client.jdbc.ErrorCode;
import net.snowflake.client.jdbc.FieldMetadata;
import net.snowflake.client.jdbc.SnowflakeLoggedFeatureNotSupportedException;
import net.snowflake.client.jdbc.SnowflakeResultSetSerializable;
import net.snowflake.client.jdbc.SnowflakeResultSetSerializableV1;
import net.snowflake.client.jdbc.SnowflakeSQLException;
Expand Down Expand Up @@ -137,6 +139,10 @@ public SFBaseSession getSession() {
return this.session;
}

public ArrowBatches getArrowBatches() throws SnowflakeLoggedFeatureNotSupportedException {
throw new SnowflakeLoggedFeatureNotSupportedException(session);
}

// default implementation
public boolean next() throws SFException, SnowflakeSQLException {
logger.trace("boolean next()", false);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package net.snowflake.client.core.arrow.fullvectorconverters;

import java.util.Map;
import java.util.TimeZone;
import net.snowflake.client.core.DataConversionContext;
import net.snowflake.client.core.SFBaseSession;
import net.snowflake.client.core.SFException;
import net.snowflake.client.core.SnowflakeJdbcInternalApi;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import net.snowflake.client.jdbc.SnowflakeType;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.types.Types;

@SnowflakeJdbcInternalApi
public interface ArrowFullVectorConverter {
static Types.MinorType deduceType(ValueVector vector) {
Types.MinorType type = Types.getMinorTypeForArrowType(vector.getField().getType());
// each column's metadata
Map<String, String> customMeta = vector.getField().getMetadata();
if (type == Types.MinorType.DECIMAL) {
// Note: Decimal vector is different from others
return Types.MinorType.DECIMAL;
} else if (!customMeta.isEmpty()) {
SnowflakeType st = SnowflakeType.valueOf(customMeta.get("logicalType"));
switch (st) {
case FIXED:
{
String scaleStr = vector.getField().getMetadata().get("scale");
int sfScale = Integer.parseInt(scaleStr);
if (sfScale != 0) {
return Types.MinorType.DECIMAL;
}
break;
}
case VECTOR:
return Types.MinorType.FIXED_SIZE_LIST;
case TIME:
{
String scaleStr = vector.getField().getMetadata().get("scale");
int sfScale = Integer.parseInt(scaleStr);
if (sfScale == 0) {
return Types.MinorType.TIMESEC;
}
if (sfScale <= 3) {
return Types.MinorType.TIMEMILLI;
}
if (sfScale <= 6) {
return Types.MinorType.TIMEMICRO;
}
if (sfScale <= 9) {
return Types.MinorType.TIMENANO;
}
}
case TIMESTAMP_NTZ:
return Types.MinorType.TIMESTAMPNANO;
case TIMESTAMP_LTZ:
case TIMESTAMP_TZ:
return Types.MinorType.TIMESTAMPNANOTZ;
}
}
return type;
}

static FieldVector convert(
RootAllocator allocator,
ValueVector vector,
DataConversionContext context,
SFBaseSession session,
TimeZone timeZoneToUse,
int idx,
Object targetType)
throws SnowflakeSQLException {
try {
if (targetType == null) {
targetType = deduceType(vector);
}
if (targetType instanceof Types.MinorType) {
switch ((Types.MinorType) targetType) {
case TINYINT:
return new TinyIntVectorConverter(allocator, vector, context, session, idx).convert();
case SMALLINT:
return new SmallIntVectorConverter(allocator, vector, context, session, idx).convert();
case INT:
return new IntVectorConverter(allocator, vector, context, session, idx).convert();
case BIGINT:
return new BigIntVectorConverter(allocator, vector, context, session, idx).convert();
case DECIMAL:
return new DecimalVectorConverter(allocator, vector, context, session, idx).convert();
case FLOAT8:
return new FloatVectorConverter(allocator, vector, context, session, idx).convert();
case BIT:
return new BitVectorConverter(allocator, vector, context, session, idx).convert();
case VARBINARY:
return new BinaryVectorConverter(allocator, vector, context, session, idx).convert();
case DATEDAY:
return new DateVectorConverter(allocator, vector, context, session, idx, timeZoneToUse)
.convert();
case TIMESEC:
return new TimeSecVectorConverter(allocator, vector).convert();
case TIMEMILLI:
return new TimeMilliVectorConverter(allocator, vector).convert();
case TIMEMICRO:
return new TimeMicroVectorConverter(allocator, vector).convert();
case TIMENANO:
return new TimeNanoVectorConverter(allocator, vector).convert();
case TIMESTAMPNANOTZ:
return new TimestampVectorConverter(allocator, vector, context, timeZoneToUse, false)
.convert();
case TIMESTAMPNANO:
return new TimestampVectorConverter(allocator, vector, context, timeZoneToUse, true)
.convert();
case STRUCT:
return new StructVectorConverter(
allocator, vector, context, session, timeZoneToUse, idx, null)
.convert();
case LIST:
return new ListVectorConverter(
allocator, vector, context, session, timeZoneToUse, idx, null)
.convert();
case VARCHAR:
return new VarCharVectorConverter(allocator, vector, context, session, idx).convert();
case MAP:
return new MapVectorConverter(
allocator, vector, context, session, timeZoneToUse, idx, null)
.convert();
case FIXED_SIZE_LIST:
return new FixedSizeListVectorConverter(
allocator, vector, context, session, timeZoneToUse, idx, null)
.convert();
}
}
} catch (SFException ex) {
throw new SnowflakeSQLException(
ex.getCause(), ex.getSqlState(), ex.getVendorCode(), ex.getParams());
}
return null;
}

FieldVector convert() throws SFException, SnowflakeSQLException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package net.snowflake.client.core.arrow.fullvectorconverters;

import net.snowflake.client.core.DataConversionContext;
import net.snowflake.client.core.SFBaseSession;
import net.snowflake.client.core.SFException;
import net.snowflake.client.core.SnowflakeJdbcInternalApi;
import net.snowflake.client.core.arrow.ArrowVectorConverter;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.ValueVector;

@SnowflakeJdbcInternalApi
public class BigIntVectorConverter extends SimpleArrowFullVectorConverter<BigIntVector> {

public BigIntVectorConverter(
RootAllocator allocator,
ValueVector vector,
DataConversionContext context,
SFBaseSession session,
int idx) {
super(allocator, vector, context, session, idx);
}

@Override
protected boolean matchingType() {
return (vector instanceof BigIntVector);
}

@Override
protected BigIntVector initVector() {
BigIntVector resultVector = new BigIntVector(vector.getName(), allocator);
resultVector.allocateNew(vector.getValueCount());
return resultVector;
}

@Override
protected void convertValue(ArrowVectorConverter from, BigIntVector to, int idx)
throws SFException {
to.set(idx, from.toLong(idx));
}
}
Loading
Loading