@@ -87,7 +87,8 @@ private static <T extends HoodieRecordPayload, R extends IndexedRecord> HoodieFi
     BloomFilter filter = createBloomFilter(config);
     HoodieHFileConfig hfileConfig = new HoodieHFileConfig(hoodieTable.getHadoopConf(),
         config.getHFileCompressionAlgorithm(), config.getHFileBlockSize(), config.getHFileMaxFileSize(),
-        PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION, filter, HFILE_COMPARATOR);
+        HoodieHFileReader.KEY_FIELD_NAME, PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION,
+        filter, HFILE_COMPARATOR);
 
     return new HoodieHFileWriter<>(instantTime, path, hfileConfig, schema, taskContextSupplier, config.populateMetaFields());
   }
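Note: this hunk and the `HoodieHFileConfig` changes below thread the name of the Avro field that carries the record key (`HoodieHFileReader.KEY_FIELD_NAME`) from the writer factory into the HFile writer config. The writer uses it further down to avoid storing each key twice, once as the HFile cell key and again inside the serialized Avro payload.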
@@ -43,9 +43,10 @@ public class HoodieHFileConfig {
   private final Configuration hadoopConf;
   private final BloomFilter bloomFilter;
   private final KeyValue.KVComparator hfileComparator;
+  private final String keyFieldName;
 
   public HoodieHFileConfig(Configuration hadoopConf, Compression.Algorithm compressionAlgorithm, int blockSize,
-                           long maxFileSize, boolean prefetchBlocksOnOpen, boolean cacheDataInL1,
+                           long maxFileSize, String keyFieldName, boolean prefetchBlocksOnOpen, boolean cacheDataInL1,
                            boolean dropBehindCacheCompaction, BloomFilter bloomFilter, KeyValue.KVComparator hfileComparator) {
     this.hadoopConf = hadoopConf;
     this.compressionAlgorithm = compressionAlgorithm;
@@ -56,6 +57,7 @@ public HoodieHFileConfig(Configuration hadoopConf, Compression.Algorithm compres
     this.dropBehindCacheCompaction = dropBehindCacheCompaction;
     this.bloomFilter = bloomFilter;
     this.hfileComparator = hfileComparator;
+    this.keyFieldName = keyFieldName;
   }
 
   public Configuration getHadoopConf() {
@@ -97,4 +99,8 @@ public BloomFilter getBloomFilter() {
   public KeyValue.KVComparator getHfileComparator() {
     return hfileComparator;
   }
+
+  public String getKeyFieldName() {
+    return keyFieldName;
+  }
 }

@@ -38,6 +38,8 @@
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
 import org.apache.hadoop.io.Writable;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -63,6 +65,8 @@ public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends IndexedR
   private final String instantTime;
   private final TaskContextSupplier taskContextSupplier;
   private final boolean populateMetaFields;
+  private final Schema schema;
+  private final Option<Schema.Field> keyFieldSchema;
   private HFile.Writer writer;
   private String minRecordKey;
   private String maxRecordKey;
@@ -77,6 +81,8 @@ public HoodieHFileWriter(String instantTime, Path file, HoodieHFileConfig hfileC
     this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, conf);
     this.fs = (HoodieWrapperFileSystem) this.file.getFileSystem(conf);
     this.hfileConfig = hfileConfig;
+    this.schema = schema;
+    this.keyFieldSchema = Option.ofNullable(schema.getField(hfileConfig.getKeyFieldName()));
 
     // TODO - compute this compression ratio dynamically by looking at the bytes written to the
     // stream and the actual file size reported by HDFS
@@ -121,8 +127,25 @@ public boolean canWrite() {
   }
 
   @Override
-  public void writeAvro(String recordKey, IndexedRecord object) throws IOException {
-    byte[] value = HoodieAvroUtils.avroToBytes((GenericRecord)object);
+  public void writeAvro(String recordKey, IndexedRecord record) throws IOException {
+    byte[] value = null;
+    boolean isRecordSerialized = false;
+    if (keyFieldSchema.isPresent()) {
+      GenericRecord keyExcludedRecord = (GenericRecord) record;
+      int keyFieldPos = this.keyFieldSchema.get().pos();
+      boolean isKeyAvailable = (record.get(keyFieldPos) != null && !(record.get(keyFieldPos).toString().isEmpty()));
+      if (isKeyAvailable) {
+        Object originalKey = keyExcludedRecord.get(keyFieldPos);
+        keyExcludedRecord.put(keyFieldPos, StringUtils.EMPTY_STRING);
+        value = HoodieAvroUtils.avroToBytes(keyExcludedRecord);
+        keyExcludedRecord.put(keyFieldPos, originalKey);
+        isRecordSerialized = true;
+      }
+    }
+    if (!isRecordSerialized) {
+      value = HoodieAvroUtils.avroToBytes((GenericRecord) record);
+    }
+
     KeyValue kv = new KeyValue(recordKey.getBytes(), null, null, value);
     writer.append(kv);

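The writer now blanks out the key field before serializing, since the record key is already persisted as the HFile cell key; keeping it in the Avro payload as well would store every key twice. Restoring the original value after serialization keeps the in-memory record intact for callers. A reader consuming these files has to reverse the trick. A minimal sketch of that counterpart (not part of this diff; the class and method names are illustrative):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.common.util.Option;

final class KeyFieldRestorer {
  // Re-materializes the record key into a deserialized payload: if the key
  // field was blanked out at write time, copy the HFile cell key (passed in
  // as recordKey) back into that field.
  static GenericRecord restoreKey(String recordKey, GenericRecord record,
                                  Option<Schema.Field> keyFieldSchema) {
    if (keyFieldSchema.isPresent()) {
      int pos = keyFieldSchema.get().pos();
      Object current = record.get(pos);
      if (current == null || current.toString().isEmpty()) {
        record.put(pos, recordKey);
      }
    }
    return record;
  }
}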
@@ -96,7 +96,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta

   // Virtual keys support for metadata table. This Field is
   // from the metadata payload schema.
-  private static final String RECORD_KEY_FIELD = HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY;
+  private static final String RECORD_KEY_FIELD_NAME = HoodieMetadataPayload.KEY_FIELD_NAME;
 
   protected HoodieWriteConfig metadataWriteConfig;
   protected HoodieWriteConfig dataWriteConfig;
@@ -217,8 +217,8 @@ private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfi

     // RecordKey properties are needed for the metadata table records
     final Properties properties = new Properties();
-    properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), RECORD_KEY_FIELD);
-    properties.put("hoodie.datasource.write.recordkey.field", RECORD_KEY_FIELD);
+    properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), RECORD_KEY_FIELD_NAME);
+    properties.put("hoodie.datasource.write.recordkey.field", RECORD_KEY_FIELD_NAME);
     builder.withProperties(properties);
 
     if (writeConfig.isMetricsOn()) {
@@ -454,7 +454,7 @@ private boolean bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
         .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
         .setPayloadClassName(HoodieMetadataPayload.class.getName())
         .setBaseFileFormat(HoodieFileFormat.HFILE.toString())
-        .setRecordKeyFields(RECORD_KEY_FIELD)
+        .setRecordKeyFields(RECORD_KEY_FIELD_NAME)
         .setPopulateMetaFields(dataWriteConfig.getMetadataConfig().populateMetaFields())
         .setKeyGeneratorClassProp(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
         .initTable(hadoopConf.get(), metadataWriteConfig.getBasePath());
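The renamed constant now feeds three places that must agree on the metadata table's key field: the table config (`HoodieTableConfig.RECORDKEY_FIELDS`), the datasource write option (`hoodie.datasource.write.recordkey.field`), and `setRecordKeyFields(...)` during bootstrap. The key generator change below reads the same `HoodieMetadataPayload.KEY_FIELD_NAME`, so record keys are extracted from the payload's key field rather than from the populated meta columns.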
@@ -42,7 +42,7 @@ public HoodieTableMetadataKeyGenerator(TypedProperties config) {

   @Override
   public String getRecordKey(GenericRecord record) {
-    return KeyGenUtils.getRecordKey(record, HoodieMetadataPayload.SCHEMA_FIELD_ID_KEY, isConsistentLogicalTimestampEnabled());
+    return KeyGenUtils.getRecordKey(record, HoodieMetadataPayload.KEY_FIELD_NAME, isConsistentLogicalTimestampEnabled());
   }
 
   @Override

@@ -103,7 +103,7 @@ private HoodieHFileWriter createHFileWriter(Schema avroSchema, boolean populateM
     String instantTime = "000";
 
     HoodieHFileConfig hoodieHFileConfig = new HoodieHFileConfig(conf, Compression.Algorithm.GZ, 1024 * 1024, 120 * 1024 * 1024,
-        PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION, filter, HFILE_COMPARATOR);
+        HoodieHFileReader.KEY_FIELD_NAME, PREFETCH_ON_OPEN, CACHE_DATA_IN_L1, DROP_BEHIND_CACHE_COMPACTION, filter, HFILE_COMPARATOR);
     return new HoodieHFileWriter(instantTime, filePath, hoodieHFileConfig, avroSchema, mockTaskContextSupplier, populateMetaFields);
   }