Merged
33 changes: 28 additions & 5 deletions docker/compose/docker-compose_hadoop284_hive233_spark244.yml
@@ -26,6 +26,8 @@ services:
ports:
- "50070:50070"
- "8020:8020"
# JVM debugging port (will be mapped to a random port on host)
- "5005"
env_file:
- ./hadoop.env
healthcheck:
@@ -45,6 +47,8 @@
ports:
- "50075:50075"
- "50010:50010"
# JVM debugging port (will be mapped to a random port on host)
- "5005"
links:
- "namenode"
- "historyserver"
@@ -99,6 +103,8 @@ services:
SERVICE_PRECONDITION: "namenode:50070 hive-metastore-postgresql:5432"
ports:
- "9083:9083"
# JVM debugging port (will be mapped to a random port on host)
- "5005"
healthcheck:
test: ["CMD", "nc", "-z", "hivemetastore", "9083"]
interval: 30s
@@ -118,6 +124,8 @@
SERVICE_PRECONDITION: "hivemetastore:9083"
ports:
- "10000:10000"
# JVM debugging port (will be mapped to a random port on host)
- "5005"
depends_on:
- "hivemetastore"
links:
@@ -136,6 +144,8 @@
ports:
- "8080:8080"
- "7077:7077"
# JVM debugging port (will be mapped to a random port on host)
- "5005"
environment:
- INIT_DAEMON_STEP=setup_spark
links:
@@ -154,6 +164,8 @@
- sparkmaster
ports:
- "8081:8081"
# JVM debugging port (will be mapped to a random port on host)
- "5005"
environment:
- "SPARK_MASTER=spark://sparkmaster:7077"
links:
@@ -167,7 +179,7 @@ services:
hostname: zookeeper
container_name: zookeeper
ports:
- '2181:2181'
- "2181:2181"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes

@@ -176,7 +188,7 @@ services:
hostname: kafkabroker
container_name: kafkabroker
ports:
- '9092:9092'
- "9092:9092"
environment:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
@@ -186,7 +198,9 @@ services:
hostname: presto-coordinator-1
image: apachehudi/hudi-hadoop_2.8.4-prestobase_0.271:latest
ports:
- '8090:8090'
- "8090:8090"
# JVM debugging port (will be mapped to a random port on host)
- "5005"
environment:
- PRESTO_JVM_MAX_HEAP=512M
- PRESTO_QUERY_MAX_MEMORY=1GB
@@ -226,7 +240,9 @@ services:
hostname: trino-coordinator-1
image: apachehudi/hudi-hadoop_2.8.4-trinocoordinator_368:latest
ports:
- '8091:8091'
- "8091:8091"
# JVM debugging port (will be mapped to a random port on host)
- "5005"
links:
- "hivemetastore"
volumes:
@@ -239,7 +255,9 @@ services:
image: apachehudi/hudi-hadoop_2.8.4-trinoworker_368:latest
depends_on: [ "trino-coordinator-1" ]
ports:
- '8092:8092'
- "8092:8092"
# JVM debugging port (will be mapped to a random port on host)
- "5005"
links:
- "hivemetastore"
- "hiveserver"
@@ -268,6 +286,8 @@ services:
- sparkmaster
ports:
- '4040:4040'
# JVM debugging port (mapped to 5006 on the host)
- "5006:5005"
environment:
- "SPARK_MASTER=spark://sparkmaster:7077"
links:
@@ -286,6 +306,9 @@ services:
container_name: adhoc-2
env_file:
- ./hadoop.env
ports:
# JVM debugging port (mapped to 5005 on the host)
- "5005:5005"
depends_on:
- sparkmaster
environment:
@@ -20,6 +20,7 @@

import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -93,7 +94,8 @@ public HoodieConcatHandle(HoodieWriteConfig config, String instantTime, HoodieTa
public void write(GenericRecord oldRecord) {
String key = KeyGenUtils.getRecordKeyFromGenericRecord(oldRecord, keyGeneratorOpt);
try {
fileWriter.writeAvro(key, oldRecord);
// NOTE: We're enforcing preservation of the record metadata to keep the existing semantics
writeToFile(new HoodieKey(key, partitionPath), oldRecord, true);
} catch (IOException | RuntimeException e) {
String errMsg = String.format("Failed to write old record into new file for key %s from old file %s to new file %s with writerSchema %s",
key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true));
@@ -142,7 +142,7 @@ public void write(HoodieRecord record, Option<IndexedRecord> avroRecord) {
fileWriter.writeAvro(record.getRecordKey(),
rewriteRecordWithMetadata((GenericRecord) avroRecord.get(), path.getName()));
} else {
fileWriter.writeAvroWithMetadata(rewriteRecord((GenericRecord) avroRecord.get()), record);
fileWriter.writeAvroWithMetadata(record.getKey(), rewriteRecord((GenericRecord) avroRecord.get()));
}
// update the new location of record, so we know where to find it next
record.unseal();
@@ -22,6 +22,7 @@
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.model.HoodieRecord;
@@ -292,13 +293,7 @@ protected boolean writeRecord(HoodieRecord<T> hoodieRecord, Option<IndexedRecord
}
try {
if (indexedRecord.isPresent() && !isDelete) {
// Convert GenericRecord to GenericRecord with hoodie commit metadata in schema
if (preserveMetadata && useWriterSchemaForCompaction) { // useWriteSchema will be true only in case of compaction.
fileWriter.writeAvro(hoodieRecord.getRecordKey(),
rewriteRecordWithMetadata((GenericRecord) indexedRecord.get(), newFilePath.getName()));
} else {
fileWriter.writeAvroWithMetadata(rewriteRecord((GenericRecord) indexedRecord.get()), hoodieRecord);
}
writeToFile(hoodieRecord.getKey(), (GenericRecord) indexedRecord.get(), preserveMetadata && useWriterSchemaForCompaction);
recordsWritten++;
} else {
recordsDeleted++;
@@ -352,14 +347,9 @@ public void write(GenericRecord oldRecord) {
}

if (copyOldRecord) {
// this should work as it is, since this is an existing record
try {
// rewrite file names
// do not preserve FILENAME_METADATA_FIELD
if (preserveMetadata && useWriterSchemaForCompaction) {
oldRecord.put(HoodieRecord.FILENAME_METADATA_FIELD_POS, newFilePath.getName());
}
fileWriter.writeAvro(key, oldRecord);
// NOTE: We're enforcing preservation of the record metadata to keep the existing semantics
writeToFile(new HoodieKey(key, partitionPath), oldRecord, true);
} catch (IOException | RuntimeException e) {
String errMsg = String.format("Failed to merge old record into new file for key %s from old file %s to new file %s with writerSchema %s",
key, getOldFilePath(), newFilePath, writeSchemaWithMetaFields.toString(true));
@@ -370,6 +360,16 @@ public void write(GenericRecord oldRecord) {
}
}

protected void writeToFile(HoodieKey key, GenericRecord avroRecord, boolean shouldPreserveRecordMetadata) throws IOException {
if (shouldPreserveRecordMetadata) {
// NOTE: `FILENAME_METADATA_FIELD` has to be rewritten to correctly point to the
// file holding this record even in cases when overall metadata is preserved
fileWriter.writeAvro(key.getRecordKey(), rewriteRecordWithMetadata(avroRecord, newFilePath.getName()));
} else {
fileWriter.writeAvroWithMetadata(key, rewriteRecord(avroRecord));
}
}

Review thread on writeToFile:

Contributor: Can the create handle reuse this method?

Contributor (author): Totally. Previously it relied on the preserveMetadata fields, which were not available in CreateHandle, so I had to bail on moving it to the WriteHandle class, but then after the refactoring I forgot to update CreateHandle.

Contributor (author): I do want to review all the handles more holistically and clean up quite a bit of the duplication and unnecessary complication that we've currently amassed.

Contributor: It would be great if you can do that.
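For illustration only, here is a rough sketch of the reuse discussed in the thread above. It is not part of this diff; it assumes writeToFile has been hoisted into the shared write-handle base class, and the method name, the preserveMetadata and recordsWritten members, and the exception type are all assumptions that merely mirror the merge-handle code in this file:

// Hypothetical fragment of a create handle's write path (names are assumptions, not this PR's code)
protected void writeInsertedRecord(HoodieRecord<T> record, GenericRecord avroRecord) {
  try {
    // Let the shared helper decide between writeAvro and writeAvroWithMetadata
    writeToFile(record.getKey(), avroRecord, preserveMetadata);
    recordsWritten++;
  } catch (IOException e) {
    throw new HoodieInsertException("Failed to insert record " + record.getRecordKey(), e);
  }
}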

protected void writeIncomingRecords() throws IOException {
// write out any pending records (this can happen when inserts are turned into updates)
Iterator<HoodieRecord<T>> newRecordsItr = (keyToNewRecords instanceof ExternalSpillableMap)
@@ -47,7 +47,7 @@
*/
public class HoodieSortedMergeHandle<T extends HoodieRecordPayload, I, K, O> extends HoodieMergeHandle<T, I, K, O> {

private Queue<String> newRecordKeysSorted = new PriorityQueue<>();
private final Queue<String> newRecordKeysSorted = new PriorityQueue<>();

public HoodieSortedMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, TaskContextSupplier taskContextSupplier,
@@ -21,6 +21,7 @@
import java.util.concurrent.atomic.AtomicLong;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;

import org.apache.avro.generic.IndexedRecord;
@@ -29,17 +30,17 @@

public interface HoodieFileWriter<R extends IndexedRecord> {

void writeAvroWithMetadata(R newRecord, HoodieRecord record) throws IOException;
void writeAvroWithMetadata(HoodieKey key, R newRecord) throws IOException;

boolean canWrite();

void close() throws IOException;

void writeAvro(String key, R oldRecord) throws IOException;

default void prepRecordWithMetadata(R avroRecord, HoodieRecord record, String instantTime, Integer partitionId, AtomicLong recordIndex, String fileName) {
default void prepRecordWithMetadata(HoodieKey key, R avroRecord, String instantTime, Integer partitionId, AtomicLong recordIndex, String fileName) {
String seqId = HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex.getAndIncrement());
HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, record.getRecordKey(), record.getPartitionPath(), fileName);
HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, key.getRecordKey(), key.getPartitionPath(), fileName);
HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, instantTime, seqId);
return;
}
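To make the new key-based contract concrete, here is a minimal, hypothetical in-memory implementation. It is not part of this PR, its class name and package placement are invented, it assumes the interface contains only the methods shown in this hunk, and it ignores the populateMetaFields and bloom-filter handling that the real writers below deal with:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.io.storage.HoodieFileWriter;

// Hypothetical test double, only to illustrate the HoodieKey-based signatures
public class InMemoryHoodieFileWriter<R extends IndexedRecord> implements HoodieFileWriter<R> {

  private final Map<String, R> written = new HashMap<>();

  @Override
  public void writeAvroWithMetadata(HoodieKey key, R newRecord) throws IOException {
    // With the new signature the writer only needs the HoodieKey, not a full HoodieRecord
    writeAvro(key.getRecordKey(), newRecord);
  }

  @Override
  public void writeAvro(String key, R record) throws IOException {
    written.put(key, record);
  }

  @Override
  public boolean canWrite() {
    return true;
  }

  @Override
  public void close() throws IOException {
    written.clear();
  }
}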
@@ -18,16 +18,6 @@

package org.apache.hudi.io.storage;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
@@ -40,6 +30,15 @@
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.io.Writable;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;

import java.io.DataInput;
import java.io.DataOutput;
@@ -111,13 +110,13 @@ public HoodieHFileWriter(String instantTime, Path file, HoodieHFileConfig hfileC
}

@Override
public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
public void writeAvroWithMetadata(HoodieKey key, R avroRecord) throws IOException {
if (populateMetaFields) {
prepRecordWithMetadata(avroRecord, record, instantTime,
prepRecordWithMetadata(key, avroRecord, instantTime,
taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
writeAvro(record.getRecordKey(), (IndexedRecord) avroRecord);
writeAvro(key.getRecordKey(), avroRecord);
} else {
writeAvro(record.getRecordKey(), (IndexedRecord) avroRecord);
writeAvro(key.getRecordKey(), avroRecord);
}
}

@@ -18,34 +18,35 @@

package org.apache.hudi.io.storage;

import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.storage.ql.exec.vector.ColumnVector;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.AvroOrcUtils;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.apache.orc.storage.ql.exec.vector.ColumnVector;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_AVRO_BLOOM_FILTER_METADATA_KEY;
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_BLOOM_FILTER_TYPE_CODE;
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MAX_RECORD_KEY_FOOTER;
import static org.apache.hudi.avro.HoodieAvroWriteSupport.HOODIE_MIN_RECORD_KEY_FOOTER;

public class HoodieOrcWriter<T extends HoodieRecordPayload, R extends IndexedRecord>
implements HoodieFileWriter<R>, Closeable {
@@ -94,10 +95,10 @@ public HoodieOrcWriter(String instantTime, Path file, HoodieOrcConfig config, Sc
}

@Override
public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
prepRecordWithMetadata(avroRecord, record, instantTime,
public void writeAvroWithMetadata(HoodieKey key, R avroRecord) throws IOException {
prepRecordWithMetadata(key, avroRecord, instantTime,
taskContextSupplier.getPartitionIdSupplier().get(), RECORD_INDEX, file.getName());
writeAvro(record.getRecordKey(), avroRecord);
writeAvro(key.getRecordKey(), avroRecord);
}

@Override
@@ -18,16 +18,15 @@

package org.apache.hudi.io.storage;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroWriteSupport;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordPayload;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;

@@ -84,12 +83,12 @@ public HoodieParquetWriter(String instantTime,
}

@Override
public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws IOException {
public void writeAvroWithMetadata(HoodieKey key, R avroRecord) throws IOException {
if (populateMetaFields) {
prepRecordWithMetadata(avroRecord, record, instantTime,
prepRecordWithMetadata(key, avroRecord, instantTime,
taskContextSupplier.getPartitionIdSupplier().get(), recordIndex, file.getName());
super.write(avroRecord);
writeSupport.add(record.getRecordKey());
writeSupport.add(key.getRecordKey());
} else {
super.write(avroRecord);
}