Skip to content

Commit

Permalink
PARQUET-2359: Add simple plain Parquet configuration implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
amousavigourabi committed Nov 8, 2023
1 parent 56f40e4 commit 98c04cf
Show file tree
Hide file tree
Showing 2 changed files with 230 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.parquet.conf.HadoopParquetConfiguration;
import org.apache.parquet.conf.ParquetConfiguration;
import org.apache.parquet.conf.PlainParquetConfiguration;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.api.WriteSupport;
Expand All @@ -79,31 +80,39 @@ public class TestReadWrite {
@Parameterized.Parameters
public static Collection<Object[]> data() {
Object[][] data = new Object[][] {
{ false, false, false }, // use the new converters with hadoop config
{ true, false, false }, // use the old converters with hadoop config
{ false, true, false }, // use a local disk location with hadoop config
{ false, false, true }, // use the new converters with parquet config interface
{ true, false, true }, // use the old converters with parquet config interface
{ false, true, true } }; // use a local disk location with parquet config interface
{ true, false, false, false }, // use the old converters with hadoop config
{ true, false, true, false }, // use the old converters with parquet config interface
{ false, false, false, false }, // use the new converters with hadoop config
{ false, true, false, false }, // use a local disk location with hadoop config
{ false, false, true, false }, // use the new converters with parquet config interface
{ false, true, true, false }, // use a local disk location with parquet config interface
{ false, false, true, true }, // use the new converters with plain parquet config
{ false, true, true, true } }; // use a local disk location with plain parquet config
return Arrays.asList(data);
}

private final boolean compat;
private final boolean local;
private final boolean confInterface;
private final boolean plainConf;
private final Configuration testConf = new Configuration();
private final ParquetConfiguration parquetConf = new HadoopParquetConfiguration(true);
private final ParquetConfiguration hadoopConfWithInterface = new HadoopParquetConfiguration();
private final ParquetConfiguration plainParquetConf = new PlainParquetConfiguration();

public TestReadWrite(boolean compat, boolean local, boolean confInterface) {
// Parameterized-test constructor: each boolean selects a code path under test.
//
// @param compat        use the old (compatibility) Avro converters
// @param local         write to a local-disk location instead of the default
// @param confInterface route configuration through the ParquetConfiguration interface
// @param plainConf     use the Hadoop-free PlainParquetConfiguration
//                      (only consulted when confInterface is true)
public TestReadWrite(boolean compat, boolean local, boolean confInterface, boolean plainConf) {
this.compat = compat;
this.local = local;
this.confInterface = confInterface;
this.plainConf = plainConf;
// Mirror the same three Avro settings into every configuration flavor so the
// selected path behaves identically regardless of which object is consulted.
this.testConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat);
this.testConf.setBoolean("parquet.avro.add-list-element-records", false);
this.testConf.setBoolean("parquet.avro.write-old-list-structure", false);
this.parquetConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat);
this.parquetConf.setBoolean("parquet.avro.add-list-element-records", false);
this.parquetConf.setBoolean("parquet.avro.write-old-list-structure", false);
this.hadoopConfWithInterface.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat);
this.hadoopConfWithInterface.setBoolean("parquet.avro.add-list-element-records", false);
this.hadoopConfWithInterface.setBoolean("parquet.avro.write-old-list-structure", false);
this.plainParquetConf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, compat);
this.plainParquetConf.setBoolean("parquet.avro.add-list-element-records", false);
this.plainParquetConf.setBoolean("parquet.avro.write-old-list-structure", false);
}

@Test
Expand Down Expand Up @@ -891,9 +900,15 @@ private ParquetWriter<GenericRecord> writer(String file, Schema schema) throws I
.withSchema(schema);
}
if (confInterface) {
return writerBuilder
.withConf(parquetConf)
.build();
// BUG FIX: the branches were swapped. plainConf == true must exercise the
// Hadoop-free PlainParquetConfiguration (per the data() parameter comments:
// "use the new converters with plain parquet config"); the Hadoop-backed
// interface wrapper is the fallback. As written, plainParquetConf was never
// used on the plainConf path at all.
if (plainConf) {
  return writerBuilder
      .withConf(plainParquetConf)
      .build();
} else {
  return writerBuilder
      .withConf(hadoopConfWithInterface)
      .build();
}
} else {
return writerBuilder
.withConf(testConf)
Expand All @@ -911,9 +926,15 @@ private ParquetReader<GenericRecord> reader(String file) throws IOException {
return new AvroParquetReader<>(testConf, new Path(file));
}
if (confInterface) {
return readerBuilder
.withConf(parquetConf)
.build();
// BUG FIX: the branches were swapped (mirror of the same defect in writer()).
// plainConf == true must read through the Hadoop-free PlainParquetConfiguration;
// the Hadoop-backed interface wrapper is the fallback.
if (plainConf) {
  return readerBuilder
      .withConf(plainParquetConf)
      .build();
} else {
  return readerBuilder
      .withConf(hadoopConfWithInterface)
      .build();
}
} else {
return readerBuilder
.withConf(testConf)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.parquet.conf;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * A plain, in-memory {@link ParquetConfiguration} backed by a {@code HashMap},
 * with no Hadoop dependency.
 *
 * <p>All typed setters store values as strings; all typed getters parse them back,
 * returning the supplied default when the key is absent. This class is not
 * thread-safe; callers requiring concurrent access must synchronize externally.
 */
public class PlainParquetConfiguration implements ParquetConfiguration {

  private final Map<String, String> map;

  /** Creates an empty configuration. */
  public PlainParquetConfiguration() {
    map = new HashMap<>();
  }

  /**
   * Creates a configuration pre-populated with the given properties.
   * The map is defensively copied, so later changes to {@code properties}
   * do not affect this configuration.
   *
   * @param properties initial key/value pairs
   */
  public PlainParquetConfiguration(Map<String, String> properties) {
    map = new HashMap<>(properties);
  }

  /**
   * Sets a property, overwriting any previous value for the key.
   *
   * @param name  property key
   * @param value property value
   */
  @Override
  public void set(String name, String value) {
    map.put(name, value);
  }

  @Override
  public void setLong(String name, long value) {
    set(name, String.valueOf(value));
  }

  @Override
  public void setInt(String name, int value) {
    set(name, String.valueOf(value));
  }

  @Override
  public void setBoolean(String name, boolean value) {
    set(name, String.valueOf(value));
  }

  /**
   * Stores the values as a single comma-separated string. An empty array is
   * stored as the empty string.
   *
   * @param name  property key
   * @param value values to join and store
   */
  @Override
  public void setStrings(String name, String... value) {
    if (value.length > 0) {
      StringBuilder sb = new StringBuilder(value[0]);
      for (int i = 1; i < value.length; ++i) {
        sb.append(',');
        sb.append(value[i]);
      }
      set(name, sb.toString());
    } else {
      set(name, "");
    }
  }

  /**
   * Stores the fully-qualified name of {@code value}, after verifying that it
   * implements/extends {@code xface}.
   *
   * @throws RuntimeException if {@code value} is not assignable to {@code xface}
   */
  @Override
  public void setClass(String name, Class<?> value, Class<?> xface) {
    if (xface.isAssignableFrom(value)) {
      set(name, value.getName());
    } else {
      throw new RuntimeException(xface.getCanonicalName() + " is not assignable from " + value.getCanonicalName());
    }
  }

  /** @return the raw value for {@code name}, or {@code null} if absent */
  @Override
  public String get(String name) {
    return map.get(name);
  }

  @Override
  public String get(String name, String defaultValue) {
    String value = get(name);
    return value != null ? value : defaultValue;
  }

  /** @throws NumberFormatException if the stored value is not a valid long */
  @Override
  public long getLong(String name, long defaultValue) {
    String value = get(name);
    return value != null ? Long.parseLong(value) : defaultValue;
  }

  /** @throws NumberFormatException if the stored value is not a valid int */
  @Override
  public int getInt(String name, int defaultValue) {
    String value = get(name);
    return value != null ? Integer.parseInt(value) : defaultValue;
  }

  @Override
  public boolean getBoolean(String name, boolean defaultValue) {
    String value = get(name);
    return value != null ? Boolean.parseBoolean(value) : defaultValue;
  }

  /** @return the stored value with surrounding whitespace removed, or {@code null} if absent */
  @Override
  public String getTrimmed(String name) {
    String value = get(name);
    return value != null ? value.trim() : null;
  }

  @Override
  public String getTrimmed(String name, String defaultValue) {
    String value = get(name);
    return value != null ? value.trim() : defaultValue;
  }

  /**
   * Splits the stored comma-separated value back into an array.
   *
   * <p>BUG FIX: an empty stored value (as produced by {@code setStrings(name)})
   * now round-trips to an empty array rather than {@code [""]}, which
   * {@code String.split} would otherwise return.
   *
   * @return the stored values, or {@code defaultValue} if the key is absent
   */
  @Override
  public String[] getStrings(String name, String[] defaultValue) {
    String value = get(name);
    if (value == null) {
      return defaultValue;
    }
    if (value.isEmpty()) {
      return new String[0];
    }
    return value.split(",");
  }

  /**
   * Loads the class named by the stored value.
   *
   * @throws RuntimeException wrapping {@link ClassNotFoundException} if the
   *         named class cannot be loaded
   */
  @Override
  public Class<?> getClass(String name, Class<?> defaultValue) {
    String value = get(name);
    if (value != null) {
      try {
        return Class.forName(value);
      } catch (ClassNotFoundException e) {
        throw new RuntimeException(e);
      }
    } else {
      return defaultValue;
    }
  }

  /**
   * Loads the configured class and checks it against the expected interface.
   *
   * <p>BUG FIX: the assignability test was inverted
   * ({@code value.isAssignableFrom(xface)} asks whether {@code xface} is a
   * subtype of {@code value}). The cast to {@code Class<? extends U>} is only
   * sound when {@code xface.isAssignableFrom(value)}, mirroring the check in
   * {@link #setClass}. A configured class that does not implement
   * {@code xface} now fails loudly instead of being silently replaced by the
   * default.
   *
   * @throws RuntimeException if the configured class does not implement {@code xface}
   */
  @Override
  @SuppressWarnings("unchecked")
  public <U> Class<? extends U> getClass(String name, Class<? extends U> defaultValue, Class<U> xface) {
    Class<?> value = getClass(name, defaultValue);
    if (value == null) {
      return defaultValue;
    }
    if (xface.isAssignableFrom(value)) {
      return (Class<? extends U>) value;
    }
    throw new RuntimeException(xface.getCanonicalName() + " is not assignable from " + value.getCanonicalName());
  }

  @Override
  public Class<?> getClassByName(String name) throws ClassNotFoundException {
    return Class.forName(name);
  }

  /** @return an iterator over the live entry set; removals write through to this configuration */
  @Override
  public Iterator<Map.Entry<String, String>> iterator() {
    return map.entrySet().iterator();
  }
}

0 comments on commit 98c04cf

Please sign in to comment.