-
Notifications
You must be signed in to change notification settings - Fork 96
GH-698: Improve and fix Avro read consumers #718
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
1872d41
d07b417
d2b33ae
1766e8b
6d77139
47572c0
331dabc
5361009
c2bd7b3
3d9faf6
8897832
eaef6c8
0f94366
2cfb81a
f7aba15
dde2dca
8525421
1ac4fae
b9cf11b
c61cc3a
84ac2c7
0fd70de
737ca3b
4e6efdb
8350f40
6788d51
7f7c4e7
e0550da
0eb69a3
bccc14a
577f8aa
1a03b8a
141b6fd
3c173ec
f8aa184
bff71c4
74717c3
6c75d87
9aaf5cf
62d08a9
d6538e6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -332,7 +332,17 @@ private static <T> T buildBaseTypeSchema( | |
|
|
||
| case List: | ||
| case FixedSizeList: | ||
| return buildArraySchema(builder.array(), field, namespace); | ||
| // Arrow uses "$data$" as the field name for list items, that is not a valid Avro name | ||
| Field itemField = field.getChildren().get(0); | ||
| if (ListVector.DATA_VECTOR_NAME.equals(itemField.getName())) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we perhaps want to check for invalid names more generally and mangle/normalize them?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Or just normalize all field names to something consistent in Avro?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm, I think for list / map types using the constant defined names for children makes sense, with "item" instead of "$data$" for list items. More generally, we could normalise illegal chars to "_" to match the Avro name rules. Per my understanding similar rules are already enforced in C++, but are not part of the Arrow spec or Java implementation. Very happy to put the normalisation in, it's probably a more useful behaviour than throwing an error in the adapter. Would you like me to do it? |
||
| Field safeItemField = | ||
| new Field("item", itemField.getFieldType(), itemField.getChildren()); | ||
| Field safeListField = | ||
| new Field(field.getName(), field.getFieldType(), List.of(safeItemField)); | ||
| return buildArraySchema(builder.array(), safeListField, namespace); | ||
| } else { | ||
| return buildArraySchema(builder.array(), field, namespace); | ||
| } | ||
|
|
||
| case Map: | ||
| return buildMapSchema(builder.map(), field, namespace); | ||
|
|
||
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,82 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.arrow.adapter.avro.consumers; | ||
|
|
||
| import java.io.IOException; | ||
| import org.apache.arrow.vector.FieldVector; | ||
| import org.apache.avro.io.Decoder; | ||
|
|
||
| /** | ||
| * Consumer wrapper which consumes nullable type values from avro decoder. Write the data to the | ||
| * underlying {@link FieldVector}. | ||
| * | ||
| * @param <T> The vector within consumer or its delegate. | ||
| */ | ||
| public class AvroNullableConsumer<T extends FieldVector> extends BaseAvroConsumer<T> { | ||
|
|
||
| private final Consumer<T> delegate; | ||
| private final int nullIndex; | ||
|
|
||
| /** Instantiate a AvroNullableConsumer. */ | ||
| @SuppressWarnings("unchecked") | ||
| public AvroNullableConsumer(Consumer<T> delegate, int nullIndex) { | ||
| super((T) delegate.getVector()); | ||
| this.delegate = delegate; | ||
| this.nullIndex = nullIndex; | ||
| } | ||
|
|
||
| @Override | ||
| public void consume(Decoder decoder) throws IOException { | ||
| int typeIndex = decoder.readInt(); | ||
| if (typeIndex == nullIndex) { | ||
| decoder.readNull(); | ||
| delegate.addNull(); | ||
| } else { | ||
| delegate.consume(decoder); | ||
| } | ||
| currentIndex++; | ||
| } | ||
|
|
||
| @Override | ||
| public void addNull() { | ||
| // Can be called by containers of nullable types | ||
| delegate.addNull(); | ||
| currentIndex++; | ||
| } | ||
|
|
||
| @Override | ||
| public void setPosition(int index) { | ||
| if (index < 0 || index > vector.getValueCount()) { | ||
| throw new IllegalArgumentException("Index out of bounds"); | ||
| } | ||
| delegate.setPosition(index); | ||
| super.setPosition(index); | ||
| } | ||
|
|
||
| @Override | ||
| public boolean resetValueVector(T vector) { | ||
| boolean delegateOk = delegate.resetValueVector(vector); | ||
| boolean thisOk = super.resetValueVector(vector); | ||
| return thisOk && delegateOk; | ||
| } | ||
|
|
||
| @Override | ||
| public void close() throws Exception { | ||
| super.close(); | ||
| delegate.close(); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,74 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.arrow.adapter.avro.consumers.logical; | ||
|
|
||
| import java.io.IOException; | ||
| import java.nio.ByteBuffer; | ||
| import org.apache.arrow.adapter.avro.consumers.BaseAvroConsumer; | ||
| import org.apache.arrow.util.Preconditions; | ||
| import org.apache.arrow.vector.Decimal256Vector; | ||
| import org.apache.avro.io.Decoder; | ||
|
|
||
| /** | ||
| * Consumer which consume 256-bit decimal type values from avro decoder. Write the data to {@link | ||
| * Decimal256Vector}. | ||
| */ | ||
| public abstract class AvroDecimal256Consumer extends BaseAvroConsumer<Decimal256Vector> { | ||
|
|
||
| protected AvroDecimal256Consumer(Decimal256Vector vector) { | ||
| super(vector); | ||
| } | ||
|
|
||
| /** Consumer for decimal logical type with 256 bit width and original bytes type. */ | ||
| public static class BytesDecimal256Consumer extends AvroDecimal256Consumer { | ||
|
|
||
| private ByteBuffer cacheBuffer; | ||
|
|
||
| /** Instantiate a BytesDecimal256Consumer. */ | ||
| public BytesDecimal256Consumer(Decimal256Vector vector) { | ||
| super(vector); | ||
| } | ||
|
|
||
| @Override | ||
| public void consume(Decoder decoder) throws IOException { | ||
| cacheBuffer = decoder.readBytes(cacheBuffer); | ||
| byte[] bytes = new byte[cacheBuffer.limit()]; | ||
| Preconditions.checkArgument(bytes.length <= 32, "Decimal bytes length should <= 32."); | ||
| cacheBuffer.get(bytes); | ||
| vector.setBigEndian(currentIndex++, bytes); | ||
| } | ||
| } | ||
|
|
||
| /** Consumer for decimal logical type with 256 bit width and original fixed type. */ | ||
| public static class FixedDecimal256Consumer extends AvroDecimal256Consumer { | ||
|
|
||
| private final byte[] reuseBytes; | ||
|
|
||
| /** Instantiate a FixedDecimal256Consumer. */ | ||
| public FixedDecimal256Consumer(Decimal256Vector vector, int size) { | ||
| super(vector); | ||
| Preconditions.checkArgument(size <= 32, "Decimal bytes length should <= 32."); | ||
| reuseBytes = new byte[size]; | ||
| } | ||
|
|
||
| @Override | ||
| public void consume(Decoder decoder) throws IOException { | ||
| decoder.readFixed(reuseBytes); | ||
| vector.setBigEndian(currentIndex++, reuseBytes); | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.arrow.adapter.avro.consumers.logical; | ||
|
|
||
| import java.io.IOException; | ||
| import org.apache.arrow.adapter.avro.consumers.BaseAvroConsumer; | ||
| import org.apache.arrow.vector.TimeStampMicroTZVector; | ||
| import org.apache.avro.io.Decoder; | ||
|
|
||
| /** | ||
| * Consumer which consumes timestamp-micros values from avro decoder. Write the data to {@link | ||
| * TimeStampMicroTZVector}. | ||
| */ | ||
| public class AvroTimestampMicrosTzConsumer extends BaseAvroConsumer<TimeStampMicroTZVector> { | ||
|
|
||
| /** Instantiate a AvroTimestampMicrosTzConsumer. */ | ||
| public AvroTimestampMicrosTzConsumer(TimeStampMicroTZVector vector) { | ||
| super(vector); | ||
| } | ||
|
|
||
| @Override | ||
| public void consume(Decoder decoder) throws IOException { | ||
| vector.set(currentIndex++, decoder.readLong()); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.arrow.adapter.avro.consumers.logical; | ||
|
|
||
| import java.io.IOException; | ||
| import org.apache.arrow.adapter.avro.consumers.BaseAvroConsumer; | ||
| import org.apache.arrow.vector.TimeStampMilliTZVector; | ||
| import org.apache.avro.io.Decoder; | ||
|
|
||
| /** | ||
| * Consumer which consume timestamp-millis values from avro decoder. Write the data to {@link | ||
| * TimeStampMilliTZVector}. | ||
| */ | ||
| public class AvroTimestampMillisTzConsumer extends BaseAvroConsumer<TimeStampMilliTZVector> { | ||
|
|
||
| /** Instantiate a AvroTimestampMillisTzConsumer. */ | ||
| public AvroTimestampMillisTzConsumer(TimeStampMilliTZVector vector) { | ||
| super(vector); | ||
| } | ||
|
|
||
| @Override | ||
| public void consume(Decoder decoder) throws IOException { | ||
| vector.set(currentIndex++, decoder.readLong()); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.arrow.adapter.avro.consumers.logical; | ||
|
|
||
| import java.io.IOException; | ||
| import org.apache.arrow.adapter.avro.consumers.BaseAvroConsumer; | ||
| import org.apache.arrow.vector.TimeStampNanoVector; | ||
| import org.apache.avro.io.Decoder; | ||
|
|
||
| /** | ||
| * Consumer which consume local-timestamp-nanos values from avro decoder. Write the data to {@link | ||
| * TimeStampNanoVector}. | ||
| */ | ||
| public class AvroTimestampNanosConsumer extends BaseAvroConsumer<TimeStampNanoVector> { | ||
|
|
||
| /** Instantiate a AvroTimestampNanosConsumer. */ | ||
| public AvroTimestampNanosConsumer(TimeStampNanoVector vector) { | ||
| super(vector); | ||
| } | ||
|
|
||
| @Override | ||
| public void consume(Decoder decoder) throws IOException { | ||
| vector.set(currentIndex++, decoder.readLong()); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The funny thing is, arrow-java shouldn't be doing that, it was just never corrected...