From 72ed000da1cf3449e7b0a58d178c514e68056845 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 24 Mar 2022 19:52:39 -0700 Subject: [PATCH 1/3] Fixed Hudi's custom `ParquetWriter` implementations to rely on `ParquetWriter::getDataSize` in lieu of FileSystem wrapper that works incorrectly in the presence of caching --- .../org/apache/hudi/io/storage/HoodieParquetWriter.java | 7 +------ .../io/storage/row/HoodieInternalRowParquetWriter.java | 2 +- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java index 3cee8c816d41..957a0ff52e91 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetWriter.java @@ -97,7 +97,7 @@ public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOEx @Override public boolean canWrite() { - return fs.getBytesWritten(file) < maxFileSize; + return getDataSize() < maxFileSize; } @Override @@ -107,9 +107,4 @@ public void writeAvro(String key, IndexedRecord object) throws IOException { writeSupport.add(key); } } - - @Override - public long getBytesWritten() { - return fs.getBytesWritten(file); - } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java index 6c2e8c50cdf6..7e64d83879f0 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowParquetWriter.java @@ -56,7 +56,7 @@ public HoodieInternalRowParquetWriter(Path file, HoodieRowParquetConfig parquetC @Override public boolean canWrite() { - return fs.getBytesWritten(file) < maxFileSize; + return getDataSize() < maxFileSize; } @Override From cc90c74e6d37af59aaf94770241fb46799f3fe19 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 24 Mar 2022 19:52:50 -0700 Subject: [PATCH 2/3] Killing dead-code --- .../java/org/apache/hudi/io/storage/HoodieFileWriter.java | 2 -- .../java/org/apache/hudi/io/storage/HoodieHFileWriter.java | 5 ----- .../java/org/apache/hudi/io/storage/HoodieOrcWriter.java | 5 ----- 3 files changed, 12 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java index a5792349cad1..9f749566b255 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriter.java @@ -37,8 +37,6 @@ public interface HoodieFileWriter { void writeAvro(String key, R oldRecord) throws IOException; - long getBytesWritten(); - default void prepRecordWithMetadata(R avroRecord, HoodieRecord record, String instantTime, Integer partitionId, AtomicLong recordIndex, String fileName) { String seqId = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex.getAndIncrement()); HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, record.getRecordKey(), record.getPartitionPath(), fileName); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java index 5dcd2e0a32e5..be79f5033475 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java @@ -187,9 +187,4 @@ public void readFields(DataInput in) throws IOException { writer.close(); writer = null; } - - @Override - public long getBytesWritten() { - return fs.getBytesWritten(file); - } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java index 85d36cc685cd..3fe8be05c09f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/storage/HoodieOrcWriter.java @@ -165,9 +165,4 @@ public void close() throws IOException { writer.close(); } - - @Override - public long getBytesWritten() { - return fs.getBytesWritten(file); - } } From dd1c5ba905eb898ae3fe65b900bdafec751e4e9f Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Fri, 25 Mar 2022 14:16:40 -0700 Subject: [PATCH 3/3] Updated tests --- .../table/action/commit/TestJavaCopyOnWriteActionExecutor.java | 2 +- .../hudi/table/action/commit/TestCopyOnWriteActionExecutor.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java index 8f296d510617..1bf1b4cccbf5 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java @@ -402,7 +402,7 @@ public void testFileSizeUpsertRecords() throws Exception { counts++; } } - assertEquals(3, counts, "If the number of records are more than 1150, then there should be a new file"); + assertEquals(5, counts, "If the number of records are more than 1150, then there should be a new file"); } @Test diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java index 0b29cf25f9e3..4ae845636076 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java @@ -437,7 +437,7 @@ public void testFileSizeUpsertRecords() throws Exception { counts++; } } - assertEquals(3, counts, "If the number of records are more than 1150, then there should be a new file"); + assertEquals(5, counts, "If the number of records are more than 1150, then there should be a new file"); } @Test