
Commit 0318b07

JkSelf authored and kou committed
ARROW-11776: [C++][Java] Support parquet write from ArrowReader to file (#14151)
This PR aims to support writing Parquet files from an ArrowReader.
Authored-by: Jia Ke <[email protected]>
Signed-off-by: David Li <[email protected]>
1 parent d39479f commit 0318b07
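For orientation, a minimal end-to-end sketch of the Java surface added by this commit: scan an existing Parquet file through the Dataset API, wrap the Scanner in the new ArrowScannerReader, and hand it to the new DatasetFileWriter. This is a hedged sketch, not code from the commit; the URIs, batch size, and class name are illustrative assumptions.

// Hypothetical example (not in this commit): round-trip an existing Parquet
// file through the new reader/writer pair.
import org.apache.arrow.dataset.file.DatasetFileWriter;
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.ArrowScannerReader;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

public class WriteFromReaderExample {
  public static void main(String[] args) throws Exception {
    String sourceUri = "file:///tmp/input.parquet"; // assumed existing Parquet file
    String targetUri = "file:///tmp/written-out";   // assumed writable directory
    try (BufferAllocator allocator = new RootAllocator()) {
      FileSystemDatasetFactory factory = new FileSystemDatasetFactory(
          allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, sourceUri);
      try (Dataset dataset = factory.finish();
           Scanner scanner = dataset.newScan(new ScanOptions(/*batchSize*/ 32768));
           ArrowScannerReader reader = new ArrowScannerReader(scanner, allocator)) {
        // DatasetFileWriter exports the reader over the C Data Interface and
        // calls FileSystemDataset::Write on the C++ side (see jni_wrapper.cc below).
        DatasetFileWriter.write(allocator, reader, FileFormat.PARQUET, targetUri);
      }
    }
  }
}

Because the writer drains an ArrowArrayStream, any ArrowReader implementation can be written out this way, not only scanner-backed ones.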

File tree

6 files changed: +410 −1 lines changed

dataset/pom.xml

Lines changed: 6 additions & 0 deletions
@@ -140,6 +140,12 @@
       <version>2.8.1</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <version>2.4</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <resources>

dataset/src/main/cpp/jni_wrapper.cc

Lines changed: 63 additions & 1 deletion
@@ -19,6 +19,8 @@

 #include "arrow/array.h"
 #include "arrow/array/concatenate.h"
+#include "arrow/c/bridge.h"
+#include "arrow/c/helpers.h"
 #include "arrow/dataset/api.h"
 #include "arrow/dataset/file_base.h"
 #include "arrow/filesystem/localfs.h"
@@ -176,6 +178,21 @@ class DisposableScannerAdaptor {
   }
 };

+arrow::Result<std::shared_ptr<arrow::Schema>> SchemaFromColumnNames(
+    const std::shared_ptr<arrow::Schema>& input,
+    const std::vector<std::string>& column_names) {
+  std::vector<std::shared_ptr<arrow::Field>> columns;
+  for (arrow::FieldRef ref : column_names) {
+    auto maybe_field = ref.GetOne(*input);
+    if (maybe_field.ok()) {
+      columns.push_back(std::move(maybe_field).ValueOrDie());
+    } else {
+      return arrow::Status::Invalid("Partition column '", ref.ToString(), "' is not in dataset schema");
+    }
+  }
+
+  return schema(std::move(columns))->WithMetadata(input->metadata());
+}
 }  // namespace

 using arrow::dataset::jni::CreateGlobalClassReference;
@@ -229,7 +246,6 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
       GetMethodID(env, java_reservation_listener_class, "unreserve", "(J)V"));

   default_memory_pool_id = reinterpret_cast<jlong>(arrow::default_memory_pool());
-
   return JNI_VERSION;
   JNI_METHOD_END(JNI_ERR)
 }
@@ -516,3 +532,49 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory(
   return CreateNativeRef(d);
   JNI_METHOD_END(-1L)
 }
+
+/*
+ * Class:     org_apache_arrow_dataset_file_JniWrapper
+ * Method:    writeFromScannerToFile
+ * Signature:
+ * (JJJLjava/lang/String;[Ljava/lang/String;ILjava/lang/String;)V
+ */
+JNIEXPORT void JNICALL
+Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile(
+    JNIEnv* env, jobject, jlong c_arrow_array_stream_address,
+    jlong file_format_id, jstring uri, jobjectArray partition_columns,
+    jint max_partitions, jstring base_name_template) {
+  JNI_METHOD_START
+  JavaVM* vm;
+  if (env->GetJavaVM(&vm) != JNI_OK) {
+    JniThrow("Unable to get JavaVM instance");
+  }
+
+  auto* arrow_stream = reinterpret_cast<ArrowArrayStream*>(c_arrow_array_stream_address);
+  std::shared_ptr<arrow::RecordBatchReader> reader =
+      JniGetOrThrow(arrow::ImportRecordBatchReader(arrow_stream));
+  std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder =
+      arrow::dataset::ScannerBuilder::FromRecordBatchReader(reader);
+  JniAssertOkOrThrow(scanner_builder->Pool(arrow::default_memory_pool()));
+  auto scanner = JniGetOrThrow(scanner_builder->Finish());
+
+  std::shared_ptr<arrow::Schema> schema = reader->schema();
+
+  std::shared_ptr<arrow::dataset::FileFormat> file_format =
+      JniGetOrThrow(GetFileFormat(file_format_id));
+  arrow::dataset::FileSystemDatasetWriteOptions options;
+  std::string output_path;
+  auto filesystem = JniGetOrThrow(
+      arrow::fs::FileSystemFromUri(JStringToCString(env, uri), &output_path));
+  std::vector<std::string> partition_column_vector =
+      ToStringVector(env, partition_columns);
+  options.file_write_options = file_format->DefaultWriteOptions();
+  options.filesystem = filesystem;
+  options.base_dir = output_path;
+  options.basename_template = JStringToCString(env, base_name_template);
+  options.partitioning = std::make_shared<arrow::dataset::HivePartitioning>(
+      SchemaFromColumnNames(schema, partition_column_vector).ValueOrDie());
+  options.max_partitions = max_partitions;
+  JniAssertOkOrThrow(arrow::dataset::FileSystemDataset::Write(options, scanner));
+  JNI_METHOD_END()
+}
dataset/src/main/java/org/apache/arrow/dataset/file/DatasetFileWriter.java

Lines changed: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.file;
+
+import org.apache.arrow.c.ArrowArrayStream;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+
+/**
+ * JNI-based utility to write datasets into files. It internally depends on C++ static method
+ * FileSystemDataset::Write.
+ */
+public class DatasetFileWriter {
+
+  /**
+   * Write the contents of an ArrowReader as a dataset.
+   *
+   * @param reader the datasource for writing
+   * @param format target file format
+   * @param uri target file uri
+   * @param maxPartitions maximum partitions to be included in written files
+   * @param partitionColumns columns used to partition output files. Empty to disable partitioning
+   * @param baseNameTemplate file name template used to make partitions. E.g. "dat_{i}", i is current partition
+   *                         ID around all written files.
+   */
+  public static void write(BufferAllocator allocator, ArrowReader reader, FileFormat format, String uri,
+      String[] partitionColumns, int maxPartitions, String baseNameTemplate) {
+    try (final ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) {
+      Data.exportArrayStream(allocator, reader, stream);
+      JniWrapper.get().writeFromScannerToFile(stream.memoryAddress(),
+          format.id(), uri, partitionColumns, maxPartitions, baseNameTemplate);
+    }
+  }
+
+  /**
+   * Write the contents of an ArrowReader as a dataset, with default partitioning settings.
+   *
+   * @param reader the datasource for writing
+   * @param format target file format
+   * @param uri target file uri
+   */
+  public static void write(BufferAllocator allocator, ArrowReader reader, FileFormat format, String uri) {
+    write(allocator, reader, format, uri, new String[0], 1024, "data_{i}");
+  }
+}
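The seven-argument overload above feeds options.partitioning on the C++ side, producing Hive-style partition directories. A hedged one-call sketch, assuming an allocator and a freshly constructed reader as in the example near the top of this page (a reader is consumed by a write, so it cannot be reused across calls); the "city" column name is an illustrative assumption:

// Hypothetical: partition the output by a "city" column into at most 100
// partition directories, with files named "dat_0", "dat_1", ... in each.
DatasetFileWriter.write(allocator, reader, FileFormat.PARQUET,
    "file:///tmp/partitioned-out", new String[] {"city"}, 100, "dat_{i}");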

dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java

Lines changed: 19 additions & 0 deletions
@@ -45,4 +45,23 @@ private JniWrapper() {
    */
   public native long makeFileSystemDatasetFactory(String uri, int fileFormat);

+  /**
+   * Write the content in a {@link org.apache.arrow.c.ArrowArrayStream} into files. This internally
+   * depends on C++ write API: FileSystemDataset::Write.
+   *
+   * @param streamAddress the ArrowArrayStream address
+   * @param fileFormat target file format (ID)
+   * @param uri target file uri
+   * @param partitionColumns columns used to partition output files
+   * @param maxPartitions maximum partitions to be included in written files
+   * @param baseNameTemplate file name template used to make partitions. E.g. "dat_{i}", i is current partition
+   *                         ID around all written files.
+   */
+  public native void writeFromScannerToFile(long streamAddress,
+                                            long fileFormat,
+                                            String uri,
+                                            String[] partitionColumns,
+                                            int maxPartitions,
+                                            String baseNameTemplate);
+
 }
dataset/src/main/java/org/apache/arrow/dataset/scanner/ArrowScannerReader.java

Lines changed: 124 additions & 0 deletions
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.scanner;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.VectorLoader;
+import org.apache.arrow.vector.VectorUnloader;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
+import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+/**
+ * An implementation of {@link ArrowReader} that reads
+ * the dataset from {@link Scanner}.
+ */
+public class ArrowScannerReader extends ArrowReader {
+  private final Scanner scanner;
+
+  private Iterator<? extends ScanTask> taskIterator;
+
+  private ScanTask currentTask = null;
+  private ArrowReader currentReader = null;
+
+  /**
+   * Constructs a scanner reader using a Scanner.
+   *
+   * @param scanner scanning data over dataset
+   * @param allocator to allocate new buffers
+   */
+  public ArrowScannerReader(Scanner scanner, BufferAllocator allocator) {
+    super(allocator);
+    this.scanner = scanner;
+    this.taskIterator = scanner.scan().iterator();
+    if (taskIterator.hasNext()) {
+      currentTask = taskIterator.next();
+      currentReader = currentTask.execute();
+    }
+  }
+
+  @Override
+  protected void loadRecordBatch(ArrowRecordBatch batch) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected void loadDictionary(ArrowDictionaryBatch dictionaryBatch) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean loadNextBatch() throws IOException {
+    if (currentReader == null) {
+      return false;
+    }
+    boolean result = currentReader.loadNextBatch();
+
+    if (!result) {
+      try {
+        currentTask.close();
+        currentReader.close();
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+
+      while (!result) {
+        if (!taskIterator.hasNext()) {
+          return false;
+        } else {
+          currentTask = taskIterator.next();
+          currentReader = currentTask.execute();
+          result = currentReader.loadNextBatch();
+        }
+      }
+    }
+
+    VectorLoader loader = new VectorLoader(this.getVectorSchemaRoot());
+    VectorUnloader unloader =
+        new VectorUnloader(currentReader.getVectorSchemaRoot());
+    try (ArrowRecordBatch recordBatch = unloader.getRecordBatch()) {
+      loader.load(recordBatch);
+    }
+    return true;
+  }
+
+  @Override
+  public long bytesRead() {
+    return 0L;
+  }
+
+  @Override
+  protected void closeReadSource() throws IOException {
+    try {
+      currentTask.close();
+      currentReader.close();
+      scanner.close();
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  protected Schema readSchema() throws IOException {
+    return scanner.schema();
+  }
+}
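ArrowScannerReader copies each batch from the underlying task reader into its own VectorSchemaRoot via VectorUnloader/VectorLoader, so it can be consumed like any other ArrowReader. A minimal sketch, assuming the scanner and allocator from the example near the top of this page, inside a method that may throw IOException:

// Drain the reader batch by batch; getVectorSchemaRoot() is refilled
// (via VectorLoader) on each successful loadNextBatch() call.
try (ArrowScannerReader reader = new ArrowScannerReader(scanner, allocator)) {
  while (reader.loadNextBatch()) {
    System.out.println("rows in batch: " + reader.getVectorSchemaRoot().getRowCount());
  }
}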
