@@ -18,11 +18,6 @@

package org.apache.hudi.table;

import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
@@ -59,6 +54,7 @@
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -75,10 +71,17 @@
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.storage.HoodieLayoutFactory;
import org.apache.hudi.table.storage.HoodieStorageLayout;

import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import javax.annotation.Nonnull;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
@@ -776,8 +779,8 @@ public final Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeri
* @param triggeringInstantTimestamp - The instant that is triggering this metadata write
* @return instance of {@link HoodieTableMetadataWriter}
*/
public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
Option<T> actionMetadata) {
public <R extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
Option<R> actionMetadata) {
// Each engine is expected to override this and
// provide the actual metadata writer, if enabled.
return Option.empty();
@@ -786,4 +789,8 @@ public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetad
public HoodieTableMetadata getMetadataTable() {
return this.metadata;
}

public Runnable getPreExecuteRunnable() {
return Functions.noop();
}
}
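
The new getPreExecuteRunnable() hook defaults to Functions.noop() (added at the end of this PR) and is passed into BoundedInMemoryExecutor at the call sites changed below, so an engine-specific table can capture thread-local state on the calling thread and re-install it on the executor's worker threads before any records are produced or consumed. A minimal self-contained sketch of that pattern, using a plain ThreadLocal as a stand-in for Spark's TaskContext (illustrative only, not Hudi's executor code):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PreExecuteHookSketch {
  // Stand-in for engine-specific thread-local state (Spark's TaskContext in this PR).
  private static final ThreadLocal<String> ENGINE_CONTEXT = new ThreadLocal<>();

  public static void main(String[] args) throws Exception {
    ENGINE_CONTEXT.set("task-0");                              // state set on the calling thread
    String captured = ENGINE_CONTEXT.get();                    // captured when the hook is built
    Runnable preExecute = () -> ENGINE_CONTEXT.set(captured);  // re-installs it wherever it runs

    ExecutorService pool = Executors.newSingleThreadExecutor();
    pool.submit(() -> {
      preExecute.run();
      // Without the hook this prints null: ThreadLocal values do not
      // propagate to the worker threads of an executor service.
      System.out.println("worker sees: " + ENGINE_CONTEXT.get());
    }).get();
    pool.shutdown();
  }
}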
---- next file ----
@@ -91,7 +91,7 @@ public void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List

ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
wrapper = new BoundedInMemoryExecutor(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator),
wrapper = new BoundedInMemoryExecutor<>(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator),
Option.of(new UpdateHandler(mergeHandle)), record -> {
if (!externalSchemaTransformation) {
return record;
---- next file ----
@@ -91,7 +91,7 @@ public void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List

ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
wrapper = new BoundedInMemoryExecutor(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator),
wrapper = new BoundedInMemoryExecutor<>(table.getConfig().getWriteBufferLimitBytes(), new IteratorBasedQueueProducer<>(readerIterator),
Option.of(new UpdateHandler(mergeHandle)), record -> {
if (!externalSchemaTransformation) {
return record;
---- next file ----

This file was deleted.

---- next file ----
@@ -18,7 +18,6 @@

package org.apache.hudi.execution;

import org.apache.avro.Schema;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
@@ -30,6 +29,8 @@
import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.table.HoodieTable;

import org.apache.avro.Schema;

import java.util.Iterator;
import java.util.List;

@@ -84,8 +85,8 @@ protected List<WriteStatus> computeNext() {
schema = HoodieAvroUtils.addMetadataFields(schema);
}
bufferedIteratorExecutor =
new SparkBoundedInMemoryExecutor<>(hoodieConfig, inputItr, getInsertHandler(),
getTransformFunction(schema, hoodieConfig));
new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(), inputItr, getInsertHandler(),
getTransformFunction(schema, hoodieConfig), hoodieTable.getPreExecuteRunnable());
final List<WriteStatus> result = bufferedIteratorExecutor.execute();
assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
return result;
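
After this change the insert path builds the engine-agnostic executor directly; the only Spark-specific piece left is the pre-execute runnable supplied by the table. A sketch of the resulting call shape, assuming the five-argument constructor used above (buffer limit, record iterator, consumer, transform function, pre-execute runnable); the class, parameter, and type names here are illustrative:

import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;

class InsertExecutorSketch {
  static List<WriteStatus> runInsert(HoodieWriteConfig config,
                                     HoodieTable<?, ?, ?, ?> table,
                                     Iterator<HoodieRecord> inputItr,
                                     BoundedInMemoryQueueConsumer<HoodieRecord, List<WriteStatus>> consumer,
                                     Function<HoodieRecord, HoodieRecord> transformFn) {
    // Same argument order as the call site above: memory bound, producer iterator,
    // consumer, per-record transform, and the table-supplied pre-execute hook.
    BoundedInMemoryExecutor<HoodieRecord, HoodieRecord, List<WriteStatus>> executor =
        new BoundedInMemoryExecutor<>(config.getWriteBufferLimitBytes(), inputItr, consumer,
            transformFn, table.getPreExecuteRunnable());
    // execute() runs producer and consumer on the executor's own threads and
    // returns the consumer's result.
    return executor.execute();
  }
}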
---- next file ----
@@ -18,8 +18,6 @@

package org.apache.hudi.table;

import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.data.HoodieData;
@@ -39,6 +37,11 @@
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.action.HoodieWriteMetadata;

import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.fs.Path;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
import org.apache.spark.api.java.JavaRDD;

import java.io.IOException;
@@ -111,8 +114,8 @@ protected HoodieIndex getIndex(HoodieWriteConfig config, HoodieEngineContext con
* @return instance of {@link HoodieTableMetadataWriter}
*/
@Override
public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
Option<T> actionMetadata) {
public <R extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstantTimestamp,
Option<R> actionMetadata) {
if (config.isMetadataTableEnabled()) {
// Create the metadata table writer. First time after the upgrade this creation might trigger
// metadata table bootstrapping. Bootstrapping process could fail and checking the table
@@ -132,4 +135,10 @@ public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> getMetad

return Option.empty();
}

@Override
public Runnable getPreExecuteRunnable() {
final TaskContext taskContext = TaskContext.get();
return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
}
}
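
Spark stores the current TaskContext in a ThreadLocal, so it is visible only on the thread running the task. The override above captures it on the task thread and returns a Runnable that re-installs it; when BoundedInMemoryExecutor runs that Runnable on its producer and consumer threads, code on those threads that calls TaskContext.get() still sees the task's context. A small illustration of the capture/re-install step (setTaskContext is Scala package-private but reachable from Java through TaskContext$.MODULE$, exactly as in the override):

import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;

public class TaskContextPropagation {
  public static void main(String[] args) throws InterruptedException {
    // Inside a Spark task this returns the task's context; in this standalone
    // example it is simply null, which the worker thread then also observes.
    final TaskContext captured = TaskContext.get();
    Runnable preExecute = () -> TaskContext$.MODULE$.setTaskContext(captured);

    Thread worker = new Thread(() -> {
      preExecute.run();
      // After the hook runs, TaskContext.get() on this thread returns the same
      // context that was captured on the calling thread.
      System.out.println("worker task context: " + TaskContext.get());
    });
    worker.start();
    worker.join();
  }
}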
---- next file ----
@@ -29,7 +29,6 @@
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
import org.apache.hudi.io.HoodieBootstrapHandle;
import org.apache.hudi.keygen.KeyGeneratorInterface;
import org.apache.hudi.table.HoodieTable;
@@ -68,15 +67,15 @@ void executeBootstrap(HoodieBootstrapHandle<?, ?, ?, ?> bootstrapHandle, Path so
Reader orcReader = OrcFile.createReader(sourceFilePath, OrcFile.readerOptions(table.getHadoopConf()));
TypeDescription orcSchema = orcReader.getSchema();
try (RecordReader reader = orcReader.rows(new Reader.Options(table.getHadoopConf()).schema(orcSchema))) {
wrapper = new SparkBoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void>(config,
wrapper = new BoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void>(config.getWriteBufferLimitBytes(),
new OrcReaderIterator(reader, avroSchema, orcSchema), new BootstrapRecordConsumer(bootstrapHandle), inp -> {
String recKey = keyGenerator.getKey(inp).getRecordKey();
GenericRecord gr = new GenericData.Record(HoodieAvroUtils.RECORD_KEY_SCHEMA);
gr.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recKey);
BootstrapRecordPayload payload = new BootstrapRecordPayload(gr);
HoodieRecord rec = new HoodieAvroRecord(new HoodieKey(recKey, partitionPath), payload);
return rec;
});
}, table.getPreExecuteRunnable());
wrapper.execute();
} catch (Exception e) {
throw new HoodieException(e);
---- next file ----
@@ -28,7 +28,6 @@
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
import org.apache.hudi.io.HoodieBootstrapHandle;
import org.apache.hudi.keygen.KeyGeneratorInterface;
import org.apache.hudi.table.HoodieTable;
@@ -72,15 +71,15 @@ void executeBootstrap(HoodieBootstrapHandle<?, ?, ?, ?> bootstrapHandle,
try {
ParquetReader<IndexedRecord> reader =
AvroParquetReader.<IndexedRecord>builder(sourceFilePath).withConf(table.getHadoopConf()).build();
wrapper = new SparkBoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void>(config,
wrapper = new BoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void>(config.getWriteBufferLimitBytes(),
new ParquetReaderIterator(reader), new BootstrapRecordConsumer(bootstrapHandle), inp -> {
String recKey = keyGenerator.getKey(inp).getRecordKey();
GenericRecord gr = new GenericData.Record(HoodieAvroUtils.RECORD_KEY_SCHEMA);
gr.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recKey);
BootstrapRecordPayload payload = new BootstrapRecordPayload(gr);
HoodieRecord rec = new HoodieAvroRecord(new HoodieKey(recKey, partitionPath), payload);
return rec;
});
}, table.getPreExecuteRunnable());
wrapper.execute();
} catch (Exception e) {
throw new HoodieException(e);
---- next file ----
@@ -25,7 +25,6 @@
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.execution.SparkBoundedInMemoryExecutor;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -90,13 +89,13 @@ public void runMerge(HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>

ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
wrapper = new SparkBoundedInMemoryExecutor(table.getConfig(), readerIterator,
wrapper = new BoundedInMemoryExecutor(table.getConfig().getWriteBufferLimitBytes(), readerIterator,
new UpdateHandler(mergeHandle), record -> {
if (!externalSchemaTransformation) {
return record;
}
return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, (GenericRecord) record);
});
}, table.getPreExecuteRunnable());
wrapper.execute();
} catch (Exception e) {
throw new HoodieException(e);
---- next file ----
@@ -22,12 +22,15 @@
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.testutils.HoodieClientTestHarness;

import org.apache.avro.generic.IndexedRecord;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -44,7 +47,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class TestSparkBoundedInMemoryExecutor extends HoodieClientTestHarness {
public class TestBoundedInMemoryExecutorInSpark extends HoodieClientTestHarness {

private final String instantTime = HoodieActiveTimeline.createNewInstantTime();

@@ -58,6 +61,11 @@ public void tearDown() throws Exception {
cleanupResources();
}

private Runnable getPreExecuteRunnable() {
final TaskContext taskContext = TaskContext.get();
return () -> TaskContext$.MODULE$.setTaskContext(taskContext);
}

@Test
public void testExecutor() {

@@ -85,10 +93,10 @@ protected Integer getResult() {
}
};

SparkBoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor = null;
BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor = null;
try {
executor = new SparkBoundedInMemoryExecutor(hoodieWriteConfig, hoodieRecords.iterator(), consumer,
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
executor = new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer,
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable());
int result = executor.execute();
// It should buffer and write 100 records
assertEquals(100, result);
@@ -131,11 +139,11 @@ protected Integer getResult() {
}
};

SparkBoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor = null;
BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> executor = null;
try {
executor = new SparkBoundedInMemoryExecutor(hoodieWriteConfig, hoodieRecords.iterator(), consumer,
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA));
SparkBoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> finalExecutor = executor;
executor = new BoundedInMemoryExecutor(hoodieWriteConfig.getWriteBufferLimitBytes(), hoodieRecords.iterator(), consumer,
getTransformFunction(HoodieTestDataGenerator.AVRO_SCHEMA), getPreExecuteRunnable());
BoundedInMemoryExecutor<HoodieRecord, Tuple2<HoodieRecord, Option<IndexedRecord>>, Integer> finalExecutor = executor;

Thread.currentThread().interrupt();

---- next file ----
@@ -25,6 +25,11 @@
*/
public interface Functions {

static Runnable noop() {
return () -> {
};
}

/**
* A function which has not any parameter.
*/
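
Functions.noop() gives HoodieTable a safe default for the new hook: engines that do not override getPreExecuteRunnable() inherit a Runnable that does nothing. A trivial usage sketch:

import org.apache.hudi.common.util.Functions;

public class NoopHookExample {
  public static void main(String[] args) {
    Runnable hook = Functions.noop();
    hook.run(); // no side effects; safe to call on any thread, any number of times
  }
}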