SNOW-873466 arrow batches #1885

Draft: wants to merge 24 commits into master from SNOW-873466-arrow-batches

Commits (24)
d8faeba  Refactor init converters for reuse in other code (sfc-gh-astachowski, Aug 22, 2024)
8499a52  Formatting (sfc-gh-astachowski, Aug 22, 2024)
982b13e  Removed star imports (sfc-gh-astachowski, Aug 22, 2024)
c82e155  Documentation style fix (sfc-gh-astachowski, Aug 22, 2024)
7a8382b  Moved init converters to an already public class (sfc-gh-astachowski, Aug 22, 2024)
7aa57d1  Moved init converters to an appropriate interface (sfc-gh-astachowski, Aug 22, 2024)
7246575  Merge branch 'master' into init-converters-refactor (sfc-gh-astachowski, Aug 22, 2024)
48c2435  Changed the version of japicmp to include a needed bugfix (sfc-gh-astachowski, Aug 22, 2024)
6142094  Refactored unnecessary static modifiers (sfc-gh-astachowski, Aug 22, 2024)
a9ca5e6  Reformat (sfc-gh-astachowski, Aug 22, 2024)
c3ab389  Arrow batches initial (#1876) (sfc-gh-astachowski, Aug 30, 2024)
a5e4c60  Merge branch 'master' into SNOW-873466-arrow-batches (sfc-gh-astachowski, Aug 30, 2024)
d803f9f  Merge remote-tracking branch 'origin/SNOW-873466-arrow-batches' into … (sfc-gh-astachowski, Sep 3, 2024)
c705280  Added test category (sfc-gh-astachowski, Sep 5, 2024)
9347fd5  Added conditional ignore rule (sfc-gh-astachowski, Sep 5, 2024)
1ff3cc1  Merge branch 'master' into SNOW-873466-arrow-batches (sfc-gh-astachowski, Sep 6, 2024)
c94276f  Merge fixes (sfc-gh-astachowski, Sep 6, 2024)
c638ad1  Formatting (sfc-gh-astachowski, Sep 6, 2024)
d94f7c7  Renamed test (sfc-gh-astachowski, Sep 6, 2024)
5e289cf  Review feedback (sfc-gh-astachowski, Sep 19, 2024)
7d93e03  Fix wrongful exception (sfc-gh-astachowski, Sep 19, 2024)
d11a12b  Arrow batches all simple types (#1883) (sfc-gh-astachowski, Sep 20, 2024)
8c209d2  Arrow batches structured types (#1879) (sfc-gh-astachowski, Oct 16, 2024)
c40b734  Added timestamp support (#1884) (sfc-gh-astachowski, Oct 17, 2024)

Files changed

130 changes: 97 additions & 33 deletions src/main/java/net/snowflake/client/core/SFArrowResultSet.java
@@ -30,6 +30,8 @@
import net.snowflake.client.core.arrow.VarCharConverter;
import net.snowflake.client.core.arrow.VectorTypeConverter;
import net.snowflake.client.core.json.Converters;
import net.snowflake.client.jdbc.ArrowBatch;
import net.snowflake.client.jdbc.ArrowBatches;
import net.snowflake.client.jdbc.ArrowResultChunk;
import net.snowflake.client.jdbc.ArrowResultChunk.ArrowChunkIterator;
import net.snowflake.client.jdbc.ErrorCode;
@@ -112,6 +114,15 @@ public class SFArrowResultSet extends SFBaseResultSet implements DataConversionC
*/
private boolean formatDateWithTimezone;

/** The result set should be read either only as rows or only as batches */
private enum ReadingMode {
UNSPECIFIED,
ROW_MODE,
BATCH_MODE
}

private ReadingMode readingMode = ReadingMode.UNSPECIFIED;

@SnowflakeJdbcInternalApi protected Converters converters;

/**
@@ -238,6 +249,11 @@ public SFArrowResultSet(
}
}

@SnowflakeJdbcInternalApi
public long getAllocatedMemory() {
return rootAllocator.getAllocatedMemory();
}

private boolean fetchNextRow() throws SnowflakeSQLException {
if (sortResult) {
return fetchNextRowSorted();
@@ -246,6 +262,31 @@ private boolean fetchNextRow() throws SnowflakeSQLException {
}
}

private ArrowResultChunk fetchNextChunk() throws SnowflakeSQLException {
try {
logger.debug("Fetching chunk number " + nextChunkIndex);
eventHandler.triggerStateTransition(
BasicEvent.QueryState.CONSUMING_RESULT,
String.format(
BasicEvent.QueryState.CONSUMING_RESULT.getArgString(), queryId, nextChunkIndex));
ArrowResultChunk nextChunk = (ArrowResultChunk) chunkDownloader.getNextChunkToConsume();

if (nextChunk == null) {
throw new SnowflakeSQLLoggedException(
queryId,
session,
ErrorCode.INTERNAL_ERROR.getMessageCode(),
SqlState.INTERNAL_ERROR,
"Expect chunk but got null for chunk index " + nextChunkIndex);
}
logger.debug("Chunk number " + nextChunkIndex + " fetched successfully.");
return nextChunk;
} catch (InterruptedException ex) {
throw new SnowflakeSQLLoggedException(
queryId, session, ErrorCode.INTERRUPTED.getMessageCode(), SqlState.QUERY_CANCELED);
}
}

/**
* Goto next row. If end of current chunk, update currentChunkIterator to the beginning of next
* chunk, if any chunk not being consumed yet.
@@ -259,40 +300,19 @@ private boolean fetchNextRowUnsorted() throws SnowflakeSQLException {
return true;
} else {
if (nextChunkIndex < chunkCount) {
try {
eventHandler.triggerStateTransition(
BasicEvent.QueryState.CONSUMING_RESULT,
String.format(
BasicEvent.QueryState.CONSUMING_RESULT.getArgString(), queryId, nextChunkIndex));

ArrowResultChunk nextChunk = (ArrowResultChunk) chunkDownloader.getNextChunkToConsume();

if (nextChunk == null) {
throw new SnowflakeSQLLoggedException(
queryId,
session,
ErrorCode.INTERNAL_ERROR.getMessageCode(),
SqlState.INTERNAL_ERROR,
"Expect chunk but got null for chunk index " + nextChunkIndex);
}
ArrowResultChunk nextChunk = fetchNextChunk();

currentChunkIterator.getChunk().freeData();
currentChunkIterator = nextChunk.getIterator(this);
if (currentChunkIterator.next()) {

logger.debug(
"Moving to chunk index: {}, row count: {}",
nextChunkIndex,
nextChunk.getRowCount());

nextChunkIndex++;
return true;
} else {
return false;
}
} catch (InterruptedException ex) {
throw new SnowflakeSQLLoggedException(
queryId, session, ErrorCode.INTERRUPTED.getMessageCode(), SqlState.QUERY_CANCELED);
currentChunkIterator.getChunk().freeData();
currentChunkIterator = nextChunk.getIterator(this);
if (currentChunkIterator.next()) {

logger.debug(
"Moving to chunk index: {}, row count: {}", nextChunkIndex, nextChunk.getRowCount());

nextChunkIndex++;
return true;
} else {
return false;
}
} else {
// always free current chunk
@@ -431,6 +451,11 @@ public boolean next() throws SFException, SnowflakeSQLException {
if (isClosed()) {
return false;
}
if (readingMode == ReadingMode.BATCH_MODE) {
logger.warn("Cannot read rows after getArrowBatches() was called.");
return false;
}
readingMode = ReadingMode.ROW_MODE;

// otherwise try to fetch again
if (fetchNextRow()) {
@@ -763,6 +788,45 @@ public BigDecimal getBigDecimal(int columnIndex, int scale) throws SFException {
return bigDec == null ? null : bigDec.setScale(scale, RoundingMode.HALF_UP);
}

public ArrowBatches getArrowBatches() {
if (readingMode == ReadingMode.ROW_MODE) {
logger.warn("Cannot read arrow batches after next() was called.");
return null;
}
readingMode = ReadingMode.BATCH_MODE;
return new SFArrowBatchesIterator();
}

private class SFArrowBatchesIterator implements ArrowBatches {
private boolean firstFetched = false;

@Override
public long getRowCount() throws SQLException {
return resultSetSerializable.getRowCount();
}

@Override
public boolean hasNext() {
return nextChunkIndex < chunkCount || !firstFetched;
}

@Override
public ArrowBatch next() throws SQLException {
if (!firstFetched) {
firstFetched = true;
return currentChunkIterator
.getChunk()
.getArrowBatch(
SFArrowResultSet.this, useSessionTimezone ? sessionTimeZone : null, nextChunkIndex);
} else {
nextChunkIndex++;
return fetchNextChunk()
.getArrowBatch(
SFArrowResultSet.this, useSessionTimezone ? sessionTimeZone : null, nextChunkIndex);
}
}
}

@Override
public boolean isLast() {
return nextChunkIndex == chunkCount && currentChunkIterator.isLast();
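Taken together, the SFArrowResultSet changes make row access and batch access mutually exclusive: getArrowBatches() returns null once next() has put the result set into row mode, and next() returns false once batch mode was entered, so callers have to commit to one access path up front. The sketch below is not part of this diff; it only illustrates the intended consumption order and assumes the caller already holds an SFArrowResultSet and that ArrowBatch exposes a fetch() method returning Arrow VectorSchemaRoots, an assumption since the ArrowBatch interface itself is not shown here.

// Usage sketch only; ArrowBatch.fetch() is assumed, not shown in this diff.
import java.sql.SQLException;
import java.util.List;
import net.snowflake.client.core.SFArrowResultSet;
import net.snowflake.client.jdbc.ArrowBatch;
import net.snowflake.client.jdbc.ArrowBatches;
import org.apache.arrow.vector.VectorSchemaRoot;

public class ArrowBatchesUsageSketch {
  public static long consume(SFArrowResultSet resultSet) throws SQLException {
    // Must be called before next(); otherwise the result set is locked into row mode.
    ArrowBatches batches = resultSet.getArrowBatches();
    if (batches == null) {
      return -1; // next() was already called; batch reading is no longer possible
    }
    long seenRows = 0;
    while (batches.hasNext()) {
      ArrowBatch batch = batches.next();
      // Hypothetical ArrowBatch API: materialize the chunk into Arrow vectors.
      List<VectorSchemaRoot> roots = batch.fetch();
      for (VectorSchemaRoot root : roots) {
        seenRows += root.getRowCount();
        root.close();
      }
    }
    return seenRows; // should equal batches.getRowCount()
  }
}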
src/main/java/net/snowflake/client/core/SFBaseResultSet.java
@@ -30,8 +30,10 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import net.snowflake.client.core.json.Converters;
import net.snowflake.client.jdbc.ArrowBatches;
import net.snowflake.client.jdbc.ErrorCode;
import net.snowflake.client.jdbc.FieldMetadata;
import net.snowflake.client.jdbc.SnowflakeLoggedFeatureNotSupportedException;
import net.snowflake.client.jdbc.SnowflakeResultSetSerializable;
import net.snowflake.client.jdbc.SnowflakeResultSetSerializableV1;
import net.snowflake.client.jdbc.SnowflakeSQLException;
@@ -137,6 +139,10 @@ public SFBaseSession getSession() {
return this.session;
}

public ArrowBatches getArrowBatches() throws SnowflakeLoggedFeatureNotSupportedException {
throw new SnowflakeLoggedFeatureNotSupportedException(session);
}

// default implementation
public boolean next() throws SFException, SnowflakeSQLException {
logger.trace("boolean next()", false);
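Because the base class defaults to throwing, batch reading is only available when the server returns Arrow-format results. A defensive sketch for code that may receive either format follows; the helper class is hypothetical and not part of this diff.

// Sketch only: treats Arrow batch support as optional, falling back to row reading.
import net.snowflake.client.core.SFBaseResultSet;
import net.snowflake.client.jdbc.ArrowBatches;
import net.snowflake.client.jdbc.SnowflakeLoggedFeatureNotSupportedException;

public class OptionalArrowBatches {
  /** Returns the batches iterator, or null if this result set format does not support it. */
  public static ArrowBatches tryGetBatches(SFBaseResultSet resultSet) {
    try {
      return resultSet.getArrowBatches();
    } catch (SnowflakeLoggedFeatureNotSupportedException e) {
      return null; // e.g. a JSON result set; read it row by row instead
    }
  }
}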
src/main/java/net/snowflake/client/core/arrow/ArrowVectorConverterUtil.java
@@ -20,6 +20,20 @@
public final class ArrowVectorConverterUtil {
private ArrowVectorConverterUtil() {}

public static int getScale(ValueVector vector, SFBaseSession session)
throws SnowflakeSQLLoggedException {
try {
String scaleStr = vector.getField().getMetadata().get("scale");
return Integer.parseInt(scaleStr);
} catch (NullPointerException | NumberFormatException e) {
throw new SnowflakeSQLLoggedException(
session,
ErrorCode.INTERNAL_ERROR.getMessageCode(),
SqlState.INTERNAL_ERROR,
"Invalid scale metadata");
}
}

/**
* Given an arrow vector (a single column in a single record batch), return an arrow vector
* converter. Note, converter is built on top of arrow vector, so that arrow data can be converted
@@ -102,8 +116,7 @@ public static ArrowVectorConverter initConverter(
return new DateConverter(vector, idx, context, getFormatDateWithTimeZone);

case FIXED:
String scaleStr = vector.getField().getMetadata().get("scale");
int sfScale = Integer.parseInt(scaleStr);
int sfScale = getScale(vector, session);
switch (type) {
case TINYINT:
if (sfScale == 0) {
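The extracted getScale helper centralizes parsing of the "scale" entry that Snowflake attaches to each FIXED column's Arrow field metadata, turning a missing or malformed value into a SnowflakeSQLLoggedException instead of an unchecked NullPointerException or NumberFormatException. A standalone sketch of that metadata convention is below; the hand-built vector, the null session, and the inferred import path for ArrowVectorConverterUtil are illustrative only, since real vectors arrive from downloaded Arrow chunks.

// Sketch only: shows the "scale" field-metadata convention that getScale parses.
import java.util.HashMap;
import java.util.Map;
import net.snowflake.client.core.arrow.ArrowVectorConverterUtil;
import net.snowflake.client.jdbc.SnowflakeSQLLoggedException;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;

public class ScaleMetadataSketch {
  public static int scaleOfSyntheticColumn() throws SnowflakeSQLLoggedException {
    Map<String, String> metadata = new HashMap<>();
    metadata.put("scale", "2"); // e.g. a NUMBER(38, 2) column shipped as a scaled integer
    Field field =
        new Field("AMOUNT", new FieldType(true, new ArrowType.Int(64, true), null, metadata), null);
    try (BufferAllocator allocator = new RootAllocator();
        FieldVector vector = field.createVector(allocator)) {
      // Missing or non-numeric "scale" metadata would raise SnowflakeSQLLoggedException here.
      return ArrowVectorConverterUtil.getScale(vector, null); // returns 2
    }
  }
}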
src/main/java/net/snowflake/client/core/arrow/fullvectorconverters/AbstractFullVectorConverter.java (new file)
@@ -0,0 +1,23 @@
package net.snowflake.client.core.arrow.fullvectorconverters;

import net.snowflake.client.core.SFException;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import org.apache.arrow.vector.FieldVector;

public abstract class AbstractFullVectorConverter implements ArrowFullVectorConverter {
private boolean converted;

protected abstract FieldVector convertVector()
throws SFException, SnowflakeSQLException, SFArrowException;

@Override
public FieldVector convert() throws SFException, SnowflakeSQLException, SFArrowException {
if (converted) {
throw new SFArrowException(
ArrowErrorCode.VECTOR_ALREADY_CONVERTED, "Convert has already been called");
} else {
converted = true;
return convertVector();
}
}
}
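AbstractFullVectorConverter turns the ArrowFullVectorConverter contract into a single-use template: subclasses supply convertVector(), and a second convert() call on the same instance fails with VECTOR_ALREADY_CONVERTED. A hypothetical minimal subclass is sketched below only to show the intended extension point; it is not one of the converters added by this PR.

// Sketch only: a trivial converter that returns its input unchanged, illustrating how
// the single-use convert() guard in AbstractFullVectorConverter is meant to be extended.
package net.snowflake.client.core.arrow.fullvectorconverters;

import org.apache.arrow.vector.FieldVector;

public class IdentityFullVectorConverter extends AbstractFullVectorConverter {
  private final FieldVector vector;

  public IdentityFullVectorConverter(FieldVector vector) {
    this.vector = vector;
  }

  @Override
  protected FieldVector convertVector() {
    // Real converters rebuild the input into its canonical Arrow vector type here.
    return vector;
  }
}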
src/main/java/net/snowflake/client/core/arrow/fullvectorconverters/ArrowErrorCode.java (new file)
@@ -0,0 +1,7 @@
package net.snowflake.client.core.arrow.fullvectorconverters;

public enum ArrowErrorCode {
VECTOR_ALREADY_CONVERTED,
CONVERT_FAILED,
CHUNK_FETCH_FAILED,
}
src/main/java/net/snowflake/client/core/arrow/fullvectorconverters/ArrowFullVectorConverter.java (new file)
@@ -0,0 +1,11 @@
package net.snowflake.client.core.arrow.fullvectorconverters;

import net.snowflake.client.core.SFException;
import net.snowflake.client.core.SnowflakeJdbcInternalApi;
import net.snowflake.client.jdbc.SnowflakeSQLException;
import org.apache.arrow.vector.FieldVector;

@SnowflakeJdbcInternalApi
public interface ArrowFullVectorConverter {
FieldVector convert() throws SFException, SnowflakeSQLException, SFArrowException;
}