Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public ColumnWriteStore newColumnWriteStore(MessageType schema,
PageWriteStore pageStore) {
switch (writerVersion) {
case PARQUET_1_0:
return new ColumnWriteStoreV1(pageStore, this);
return new ColumnWriteStoreV1(schema, pageStore, this);
case PARQUET_2_0:
return new ColumnWriteStoreV2(schema, pageStore, this);
default:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.column.impl;

import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.util.Collections.unmodifiableMap;

import java.util.Arrays;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.column.ColumnWriter;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.schema.MessageType;

/**
* Base implementation for {@link ColumnWriteStore} to be extended to specialize for V1 and V2 pages.
*/
abstract class ColumnWriteStoreBase implements ColumnWriteStore {

// Used to support the deprecated workflow of ColumnWriteStoreV1 (lazy init of ColumnWriters)
private interface ColumnWriterProvider {
ColumnWriter getColumnWriter(ColumnDescriptor path);
}

private final ColumnWriterProvider columnWriterProvider;

// will flush even if size bellow the threshold by this much to facilitate page alignment
private static final float THRESHOLD_TOLERANCE_RATIO = 0.1f; // 10 %

private final Map<ColumnDescriptor, ColumnWriterBase> columns;
private final ParquetProperties props;
private final long thresholdTolerance;
private long rowCount;
private long rowCountForNextSizeCheck;

// To be used by the deprecated constructor of ColumnWriteStoreV1
@Deprecated
ColumnWriteStoreBase(
final PageWriteStore pageWriteStore,
final ParquetProperties props) {
this.props = props;
this.thresholdTolerance = (long) (props.getPageSizeThreshold() * THRESHOLD_TOLERANCE_RATIO);

this.columns = new TreeMap<>();

this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();

columnWriterProvider = new ColumnWriterProvider() {
@Override
public ColumnWriter getColumnWriter(ColumnDescriptor path) {
ColumnWriterBase column = columns.get(path);
if (column == null) {
column = createColumnWriter(path, pageWriteStore.getPageWriter(path), props);
columns.put(path, column);
}
return column;
}
};
}

ColumnWriteStoreBase(
MessageType schema,
PageWriteStore pageWriteStore,
ParquetProperties props) {
this.props = props;
this.thresholdTolerance = (long) (props.getPageSizeThreshold() * THRESHOLD_TOLERANCE_RATIO);
Map<ColumnDescriptor, ColumnWriterBase> mcolumns = new TreeMap<>();
for (ColumnDescriptor path : schema.getColumns()) {
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
mcolumns.put(path, createColumnWriter(path, pageWriter, props));
}
this.columns = unmodifiableMap(mcolumns);

this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();

columnWriterProvider = new ColumnWriterProvider() {
@Override
public ColumnWriter getColumnWriter(ColumnDescriptor path) {
return columns.get(path);
}
};
}

abstract ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props);

public ColumnWriter getColumnWriter(ColumnDescriptor path) {
return columnWriterProvider.getColumnWriter(path);
}

public Set<ColumnDescriptor> getColumnDescriptors() {
return columns.keySet();
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
for (Entry<ColumnDescriptor, ColumnWriterBase> entry : columns.entrySet()) {
sb.append(Arrays.toString(entry.getKey().getPath())).append(": ");
sb.append(entry.getValue().getTotalBufferedSize()).append(" bytes");
sb.append("\n");
}
return sb.toString();
}

@Override
public long getAllocatedSize() {
long total = 0;
for (ColumnWriterBase memColumn : columns.values()) {
total += memColumn.allocatedSize();
}
return total;
}

@Override
public long getBufferedSize() {
long total = 0;
for (ColumnWriterBase memColumn : columns.values()) {
total += memColumn.getTotalBufferedSize();
}
return total;
}

@Override
public void flush() {
for (ColumnWriterBase memColumn : columns.values()) {
long rows = rowCount - memColumn.getRowsWrittenSoFar();
if (rows > 0) {
memColumn.writePage(rowCount);
}
memColumn.finalizeColumnChunk();
}
}

public String memUsageString() {
StringBuilder b = new StringBuilder("Store {\n");
for (ColumnWriterBase memColumn : columns.values()) {
b.append(memColumn.memUsageString(" "));
}
b.append("}\n");
return b.toString();
}

public long maxColMemSize() {
long max = 0;
for (ColumnWriterBase memColumn : columns.values()) {
max = Math.max(max, memColumn.getBufferedSizeInMemory());
}
return max;
}

@Override
public void close() {
flush(); // calling flush() here to keep it consistent with the behavior before merging with master
for (ColumnWriterBase memColumn : columns.values()) {
memColumn.close();
}
}

@Override
public void endRecord() {
++rowCount;
if (rowCount >= rowCountForNextSizeCheck) {
sizeCheck();
}
}

private void sizeCheck() {
long minRecordToWait = Long.MAX_VALUE;
for (ColumnWriterBase writer : columns.values()) {
long usedMem = writer.getCurrentPageBufferedSize();
long rows = rowCount - writer.getRowsWrittenSoFar();
long remainingMem = props.getPageSizeThreshold() - usedMem;
if (remainingMem <= thresholdTolerance) {
writer.writePage(rowCount);
remainingMem = props.getPageSizeThreshold();
}
long rowsToFillPage =
usedMem == 0 ?
props.getMaxRowCountForPageSizeCheck()
: (long) ((float) rows) / usedMem * remainingMem;
if (rowsToFillPage < minRecordToWait) {
minRecordToWait = rowsToFillPage;
}
}
if (minRecordToWait == Long.MAX_VALUE) {
minRecordToWait = props.getMinRowCountForPageSizeCheck();
}

if (props.estimateNextSizeCheck()) {
// will check again halfway if between min and max
rowCountForNextSizeCheck = rowCount +
min(
max(minRecordToWait / 2, props.getMinRowCountForPageSizeCheck()),
props.getMaxRowCountForPageSizeCheck());
} else {
rowCountForNextSizeCheck = rowCount + props.getMinRowCountForPageSizeCheck();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,121 +18,26 @@
*/
package org.apache.parquet.column.impl;

import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;

import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.column.ColumnWriter;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.schema.MessageType;

public class ColumnWriteStoreV1 implements ColumnWriteStore {

private final Map<ColumnDescriptor, ColumnWriterV1> columns = new TreeMap<ColumnDescriptor, ColumnWriterV1>();
private final PageWriteStore pageWriteStore;
private final ParquetProperties props;

public ColumnWriteStoreV1(PageWriteStore pageWriteStore,
ParquetProperties props) {
this.pageWriteStore = pageWriteStore;
this.props = props;
}

public ColumnWriter getColumnWriter(ColumnDescriptor path) {
ColumnWriterV1 column = columns.get(path);
if (column == null) {
column = newMemColumn(path);
columns.put(path, column);
}
return column;
}

public Set<ColumnDescriptor> getColumnDescriptors() {
return columns.keySet();
}

private ColumnWriterV1 newMemColumn(ColumnDescriptor path) {
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
return new ColumnWriterV1(path, pageWriter, props);
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
for (Entry<ColumnDescriptor, ColumnWriterV1> entry : columns.entrySet()) {
sb.append(Arrays.toString(entry.getKey().getPath())).append(": ");
sb.append(entry.getValue().getBufferedSizeInMemory()).append(" bytes");
sb.append("\n");
}
return sb.toString();
}

@Override
public long getAllocatedSize() {
Collection<ColumnWriterV1> values = columns.values();
long total = 0;
for (ColumnWriterV1 memColumn : values) {
total += memColumn.allocatedSize();
}
return total;
}

@Override
public long getBufferedSize() {
Collection<ColumnWriterV1> values = columns.values();
long total = 0;
for (ColumnWriterV1 memColumn : values) {
total += memColumn.getBufferedSizeInMemory();
}
return total;
}

@Override
public String memUsageString() {
StringBuilder b = new StringBuilder("Store {\n");
Collection<ColumnWriterV1> values = columns.values();
for (ColumnWriterV1 memColumn : values) {
b.append(memColumn.memUsageString(" "));
}
b.append("}\n");
return b.toString();
}
public class ColumnWriteStoreV1 extends ColumnWriteStoreBase {

public long maxColMemSize() {
Collection<ColumnWriterV1> values = columns.values();
long max = 0;
for (ColumnWriterV1 memColumn : values) {
max = Math.max(max, memColumn.getBufferedSizeInMemory());
}
return max;
public ColumnWriteStoreV1(MessageType schema, PageWriteStore pageWriteStore, ParquetProperties props) {
super(schema, pageWriteStore, props);
}

@Override
public void flush() {
Collection<ColumnWriterV1> values = columns.values();
for (ColumnWriterV1 memColumn : values) {
memColumn.flush();
}
@Deprecated
public ColumnWriteStoreV1(final PageWriteStore pageWriteStore,
final ParquetProperties props) {
super(pageWriteStore, props);
}

@Override
public void endRecord() {
// V1 does not take record boundaries into account
}

public void close() {
Collection<ColumnWriterV1> values = columns.values();
for (ColumnWriterV1 memColumn : values) {
memColumn.close();
}
ColumnWriterBase createColumnWriter(ColumnDescriptor path, PageWriter pageWriter, ParquetProperties props) {
return new ColumnWriterV1(path, pageWriter, props);
}

}
Loading