Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void flush() {
for (ColumnWriterBase memColumn : columns.values()) {
long rows = rowCount - memColumn.getRowsWrittenSoFar();
if (rows > 0) {
memColumn.writePage(rowCount);
memColumn.writePage();
}
memColumn.finalizeColumnChunk();
}
Expand Down Expand Up @@ -195,7 +195,7 @@ private void sizeCheck() {
long rows = rowCount - writer.getRowsWrittenSoFar();
long remainingMem = props.getPageSizeThreshold() - usedMem;
if (remainingMem <= thresholdTolerance) {
writer.writePage(rowCount);
writer.writePage();
remainingMem = props.getPageSizeThreshold();
}
long rowsToFillPage =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.io.IOException;

import org.apache.parquet.Ints;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnWriter;
import org.apache.parquet.column.ParquetProperties;
Expand Down Expand Up @@ -52,6 +51,7 @@ abstract class ColumnWriterBase implements ColumnWriter {

private Statistics<?> statistics;
private long rowsWrittenSoFar = 0;
private int pageRowCount;

ColumnWriterBase(
ColumnDescriptor path,
Expand Down Expand Up @@ -84,6 +84,10 @@ private void definitionLevel(int definitionLevel) {

private void repetitionLevel(int repetitionLevel) {
repetitionLevelColumn.writeInteger(repetitionLevel);
assert pageRowCount == 0 ? repetitionLevel == 0 : true : "Every page shall start on record boundaries";

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the reasoning behind adding this verification here? I have encountered a situation where the value count is 0 but the repetition level is not 0. Is that situation itself normal? Why do you need to add this check after the column index change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zhaochengzhch, I think the assertion message describes it. We require pages to start and end at record boundaries, so the repetition level must be 0 when the page row count is 0 (which means we are starting a new page). If the repetition level is not 0 at that point, it violates this requirement, which column indexes depend on.

if (repetitionLevel == 0) {
++pageRowCount;
}
}

/**
Expand Down Expand Up @@ -299,13 +303,9 @@ long getRowsWrittenSoFar() {

/**
* Writes the current data to a new page in the page store
*
* @param rowCount
* how many rows have been written so far
*/
void writePage(long rowCount) {
int pageRowCount = Ints.checkedCast(rowCount - rowsWrittenSoFar);
this.rowsWrittenSoFar = rowCount;
void writePage() {
this.rowsWrittenSoFar += pageRowCount;
if (DEBUG)
LOG.debug("write page");
try {
Expand All @@ -318,6 +318,7 @@ void writePage(long rowCount) {
dataColumn.reset();
valueCount = 0;
resetStatistics();
pageRowCount = 0;
}

abstract void writePage(int rowCount, int valueCount, Statistics<?> statistics, ValuesWriter repetitionLevels,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ public void test() throws Exception {
for (int i = 0; i < rows; i++) {
columnWriterV2.write(Binary.fromString("bar" + i % 10), 0, 0);
if ((i + 1) % 1000 == 0) {
columnWriterV2.writePage(i);
columnWriterV2.writePage();
}
}
columnWriterV2.writePage(rows);
columnWriterV2.writePage();
columnWriterV2.finalizeColumnChunk();
List<DataPage> pages = pageWriter.getPages();
int valueCount = 0;
Expand Down Expand Up @@ -103,10 +103,10 @@ public void testOptional() throws Exception {
for (int i = 0; i < rows; i++) {
columnWriterV2.writeNull(0, 0);
if ((i + 1) % 1000 == 0) {
columnWriterV2.writePage(i);
columnWriterV2.writePage();
}
}
columnWriterV2.writePage(rows);
columnWriterV2.writePage();
columnWriterV2.finalizeColumnChunk();
List<DataPage> pages = pageWriter.getPages();
int valueCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,16 @@ public void testMemColumnSeveralPagesRepeated() throws Exception {
int r = rs[i % rs.length];
int d = ds[i % ds.length];
LOG.debug("write i: {}", i);
if (i != 0 && r == 0) {
memColumnsStore.endRecord();
}
if (d == 2) {
columnWriter.write((long)i, r, d);
} else {
columnWriter.writeNull(r, d);
}
memColumnsStore.endRecord();
}
memColumnsStore.endRecord();
memColumnsStore.flush();

ColumnReader columnReader = getColumnReader(memPageStore, path, mt);
Expand Down