diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java index d04192fbff..5cd7d876e4 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java @@ -149,7 +149,7 @@ public void flush() { for (ColumnWriterBase memColumn : columns.values()) { long rows = rowCount - memColumn.getRowsWrittenSoFar(); if (rows > 0) { - memColumn.writePage(rowCount); + memColumn.writePage(); } memColumn.finalizeColumnChunk(); } @@ -195,7 +195,7 @@ private void sizeCheck() { long rows = rowCount - writer.getRowsWrittenSoFar(); long remainingMem = props.getPageSizeThreshold() - usedMem; if (remainingMem <= thresholdTolerance) { - writer.writePage(rowCount); + writer.writePage(); remainingMem = props.getPageSizeThreshold(); } long rowsToFillPage = diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java index 16085bb806..3788c82e46 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java @@ -20,7 +20,6 @@ import java.io.IOException; -import org.apache.parquet.Ints; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.ColumnWriter; import org.apache.parquet.column.ParquetProperties; @@ -52,6 +51,7 @@ abstract class ColumnWriterBase implements ColumnWriter { private Statistics statistics; private long rowsWrittenSoFar = 0; + private int pageRowCount; ColumnWriterBase( ColumnDescriptor path, @@ -84,6 +84,10 @@ private void definitionLevel(int definitionLevel) { private void repetitionLevel(int repetitionLevel) { repetitionLevelColumn.writeInteger(repetitionLevel); + assert pageRowCount == 0 ? repetitionLevel == 0 : true : "Every page shall start on record boundaries"; + if (repetitionLevel == 0) { + ++pageRowCount; + } } /** @@ -299,13 +303,9 @@ long getRowsWrittenSoFar() { /** * Writes the current data to a new page in the page store - * - * @param rowCount - * how many rows have been written so far */ - void writePage(long rowCount) { - int pageRowCount = Ints.checkedCast(rowCount - rowsWrittenSoFar); - this.rowsWrittenSoFar = rowCount; + void writePage() { + this.rowsWrittenSoFar += pageRowCount; if (DEBUG) LOG.debug("write page"); try { @@ -318,6 +318,7 @@ void writePage(long rowCount) { dataColumn.reset(); valueCount = 0; resetStatistics(); + pageRowCount = 0; } abstract void writePage(int rowCount, int valueCount, Statistics statistics, ValuesWriter repetitionLevels, diff --git a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java index d2d78c43d1..35fddaf0b0 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java @@ -65,10 +65,10 @@ public void test() throws Exception { for (int i = 0; i < rows; i++) { columnWriterV2.write(Binary.fromString("bar" + i % 10), 0, 0); if ((i + 1) % 1000 == 0) { - columnWriterV2.writePage(i); + columnWriterV2.writePage(); } } - columnWriterV2.writePage(rows); + columnWriterV2.writePage(); columnWriterV2.finalizeColumnChunk(); List pages = pageWriter.getPages(); int valueCount = 0; @@ -103,10 +103,10 @@ public void testOptional() throws Exception { for (int i = 0; i < rows; i++) { columnWriterV2.writeNull(0, 0); if ((i + 1) % 1000 == 0) { - columnWriterV2.writePage(i); + columnWriterV2.writePage(); } } - columnWriterV2.writePage(rows); + columnWriterV2.writePage(); columnWriterV2.finalizeColumnChunk(); List pages = pageWriter.getPages(); int valueCount = 0; diff --git a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java index c28649eef5..e5db38c945 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java @@ -138,13 +138,16 @@ public void testMemColumnSeveralPagesRepeated() throws Exception { int r = rs[i % rs.length]; int d = ds[i % ds.length]; LOG.debug("write i: {}", i); + if (i != 0 && r == 0) { + memColumnsStore.endRecord(); + } if (d == 2) { columnWriter.write((long)i, r, d); } else { columnWriter.writeNull(r, d); } - memColumnsStore.endRecord(); } + memColumnsStore.endRecord(); memColumnsStore.flush(); ColumnReader columnReader = getColumnReader(memPageStore, path, mt);