@@ -48,9 +48,16 @@ public abstract class AbstractAsyncService implements Serializable {
private transient ExecutorService executor;
// Future tracking delta-sync/compaction
private transient CompletableFuture future;
// Whether to run service threads in daemon mode (so they do not block JVM exit)
private final boolean runInDaemonMode;

protected AbstractAsyncService() {
this(false);
}

protected AbstractAsyncService(boolean runInDaemonMode) {
shutdownRequested = false;
this.runInDaemonMode = runInDaemonMode;
}

protected boolean isShutdownRequested() {
@@ -129,26 +136,33 @@ public void start(Function<Boolean, Boolean> onShutdownCallback) {
*/
private void monitorThreads(Function<Boolean, Boolean> onShutdownCallback) {
LOG.info("Submitting monitor thread !!");
Executors.newSingleThreadExecutor().submit(() -> {
Executors.newSingleThreadExecutor(r -> {
Thread t = new Thread(r, "Monitor Thread");
t.setDaemon(isRunInDaemonMode());
return t;
}).submit(() -> {
boolean error = false;
try {
LOG.info("Monitoring thread(s) !!");
future.get();
} catch (ExecutionException ex) {
LOG.error("Monitor noticed one or more threads failed. Requesting graceful shutdown of other threads", ex);
error = true;
shutdown(false);
} catch (InterruptedException ie) {
LOG.error("Got interrupted Monitoring threads", ie);
error = true;
shutdown(false);
} finally {
// Mark as shutdown
shutdown = true;
if (null != onShutdownCallback) {
onShutdownCallback.apply(error);
}
shutdown(false);
}
});
}

public boolean isRunInDaemonMode() {
return runInDaemonMode;
}
}
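
For context, here is a minimal sketch of how a subclass might use the new daemon-mode constructor. The HeartbeatService name, its thread name, and the sleep interval are hypothetical; the startService() contract (returning a Pair of the service future and its executor) follows the AsyncCompactService added below.

import org.apache.hudi.common.util.collection.Pair;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical subclass, for illustration only.
public class HeartbeatService extends AbstractAsyncService {

  public HeartbeatService() {
    super(true); // daemon mode: service threads will not block JVM exit
  }

  @Override
  protected Pair<CompletableFuture, ExecutorService> startService() {
    ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
      Thread t = new Thread(r, "heartbeat_thread");
      t.setDaemon(isRunInDaemonMode()); // propagate the daemon flag, as the monitor thread does
      return t;
    });
    return Pair.of(CompletableFuture.supplyAsync(() -> {
      while (!isShutdownRequested()) {
        try {
          Thread.sleep(1000); // hypothetical unit of periodic work
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
          break;
        }
      }
      return true;
    }, executor), executor);
  }
}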
@@ -0,0 +1,161 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.async;

import org.apache.hudi.client.Compactor;
import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;

import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;

/**
* Async compaction service that runs in a separate thread. Currently, only one compactor is allowed to run at any time.
*/
public class AsyncCompactService extends AbstractAsyncService {

private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(AsyncCompactService.class);

/**
* This is the Spark scheduler pool used by async compaction jobs.
*/
public static final String COMPACT_POOL_NAME = "hoodiecompact";

private final int maxConcurrentCompaction;
private transient Compactor compactor;
private transient JavaSparkContext jssc;
private transient BlockingQueue<HoodieInstant> pendingCompactions = new LinkedBlockingQueue<>();
private transient ReentrantLock queueLock = new ReentrantLock();
private transient Condition consumed = queueLock.newCondition();

public AsyncCompactService(JavaSparkContext jssc, HoodieWriteClient client) {
this(jssc, client, false);
}

public AsyncCompactService(JavaSparkContext jssc, HoodieWriteClient client, boolean runInDaemonMode) {
super(runInDaemonMode);
this.jssc = jssc;
this.compactor = new Compactor(client, jssc);
this.maxConcurrentCompaction = 1;
}

/**
* Enqueues a new pending compaction instant.
*/
public void enqueuePendingCompaction(HoodieInstant instant) {
pendingCompactions.add(instant);
}

/**
* Wait until the number of outstanding pending compactions drops to the passed-in value.
*
* @param numPendingCompactions Maximum number of pending compactions allowed
* @throws InterruptedException if interrupted while waiting
*/
public void waitTillPendingCompactionsReducesTo(int numPendingCompactions) throws InterruptedException {
try {
queueLock.lock();
while (!isShutdown() && (pendingCompactions.size() > numPendingCompactions)) {
consumed.await();
}
} finally {
queueLock.unlock();
}
}

/**
* Fetch the next pending compaction instant, if available.
*
* @return the next pending compaction instant, or null if none arrived within the poll timeout
* @throws InterruptedException if interrupted while polling
*/
private HoodieInstant fetchNextCompactionInstant() throws InterruptedException {
LOG.info("Compactor waiting for next instant for compaction upto 60 seconds");
HoodieInstant instant = pendingCompactions.poll(10, TimeUnit.SECONDS);
if (instant != null) {
try {
queueLock.lock();
// Signal waiting thread
consumed.signal();
} finally {
queueLock.unlock();
}
}
return instant;
}

/**
* Start Compaction Service.
*/
@Override
protected Pair<CompletableFuture, ExecutorService> startService() {
ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentCompaction,
r -> {
Thread t = new Thread(r, "async_compact_thread");
t.setDaemon(isRunInDaemonMode());
return t;
});
return Pair.of(CompletableFuture.allOf(IntStream.range(0, maxConcurrentCompaction).mapToObj(i -> CompletableFuture.supplyAsync(() -> {
try {
// Set the Spark scheduler pool name so users can prioritize compaction jobs
LOG.info("Setting Spark Pool name for compaction to " + COMPACT_POOL_NAME);
jssc.setLocalProperty("spark.scheduler.pool", COMPACT_POOL_NAME);

while (!isShutdownRequested()) {
final HoodieInstant instant = fetchNextCompactionInstant();

if (null != instant) {
LOG.info("Starting Compaction for instant " + instant);
compactor.compact(instant);
LOG.info("Finished Compaction for instant " + instant);
}
}
LOG.info("Compactor shutting down properly!!");
} catch (InterruptedException ie) {
LOG.warn("Compactor executor thread got interrupted exception. Stopping", ie);
} catch (IOException e) {
LOG.error("Compactor executor failed", e);
throw new HoodieIOException(e.getMessage(), e);
}
return true;
}, executor)).toArray(CompletableFuture[]::new)), executor);
}


/**
* Check whether the compactor thread needs to be stopped.
* @return true if the compactor thread should stop
*/
protected boolean shouldStopCompactor() {
return false;
}
}
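
A minimal usage sketch of the service above (hypothetical driver code, not part of this PR; the Spark context, write client, and an already-scheduled compaction instant are assumed to exist):

import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;

import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical driver, for illustration only.
public class CompactionDriver {

  public static void runOneCompaction(JavaSparkContext jssc, HoodieWriteClient client,
      HoodieInstant compactionInstant) throws InterruptedException {
    // Run compaction threads as daemons so they do not keep the JVM alive
    AsyncCompactService service = new AsyncCompactService(jssc, client, true);
    service.start(error -> {
      // Invoked by the monitor thread once the compactor future completes;
      // 'error' is true if a compaction thread failed
      return true;
    });
    service.enqueuePendingCompaction(compactionInstant);
    // Blocks until the enqueued instant has been dequeued by a compactor thread
    service.waitTillPendingCompactionsReducesTo(0);
  }
}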
@@ -16,10 +16,8 @@
* limitations under the License.
*/

package org.apache.hudi.utilities.deltastreamer;
package org.apache.hudi.client;

import org.apache.hudi.client.HoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
@@ -599,6 +599,26 @@ public List<HoodieRecord> generateUpdates(String instantTime, Integer n) throws
return updates;
}

/**
* Generates an update for every existing record in the dataset.
*
* @param instantTime Instant time to use for the generated updates
* @return list of updated records
* @throws HoodieIOException if generating an update record fails
*/
public List<HoodieRecord> generateUpdatesForAllRecords(String instantTime) {
List<HoodieRecord> updates = new ArrayList<>();
Map<Integer, KeyPartition> existingKeys = existingKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
existingKeys.values().forEach(kp -> {
try {
HoodieRecord record = generateUpdateRecord(kp.key, instantTime);
updates.add(record);
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
}
});
return updates;
}

public List<HoodieRecord> generateUpdatesAsPerSchema(String commitTime, Integer n, String schemaStr) {
return generateUniqueUpdatesStream(commitTime, n, schemaStr).collect(Collectors.toList());
}
@@ -72,8 +72,8 @@ public void generateDataByHoodieJavaApp(String hiveTableName, String tableType,
}

// Run Hoodie Java App
String cmd = String.format("%s %s --hive-sync --table-path %s --hive-url %s --table-type %s --hive-table %s" +
" --commit-type %s --table-name %s", HOODIE_JAVA_APP, "HoodieJavaGenerateApp", hdfsUrl, HIVE_SERVER_JDBC_URL,
String cmd = String.format("%s --hive-sync --table-path %s --hive-url %s --table-type %s --hive-table %s" +
" --commit-type %s --table-name %s", HOODIE_GENERATE_APP, hdfsUrl, HIVE_SERVER_JDBC_URL,
tableType, hiveTableName, commitType, hoodieTableName);
if (partitionType == PartitionType.MULTI_KEYS_PARTITIONED) {
cmd = cmd + " --use-multi-partition-keys";
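For illustration, with the ITTestBase constants below, the assembled command resolves to roughly the following; the angle-bracketed values are runtime arguments, not fixed by this change, and the final flag is appended only for multi-key partitioning:

/var/hoodie/ws/hudi-spark/run_hoodie_generate_app.sh --hive-sync --table-path <hdfsUrl> --hive-url <HIVE_SERVER_JDBC_URL> --table-type <tableType> --hive-table <hiveTableName> --commit-type <commitType> --table-name <hoodieTableName> [--use-multi-partition-keys]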
@@ -58,6 +58,8 @@ public abstract class ITTestBase {
protected static final String PRESTO_COORDINATOR = "/presto-coordinator-1";
protected static final String HOODIE_WS_ROOT = "/var/hoodie/ws";
protected static final String HOODIE_JAVA_APP = HOODIE_WS_ROOT + "/hudi-spark/run_hoodie_app.sh";
protected static final String HOODIE_GENERATE_APP = HOODIE_WS_ROOT + "/hudi-spark/run_hoodie_generate_app.sh";
protected static final String HOODIE_JAVA_STREAMING_APP = HOODIE_WS_ROOT + "/hudi-spark/run_hoodie_streaming_app.sh";
Member:
More importantly, we should also re-enable the test in TestDataSource.

Contributor Author:
Enabled it after adding timed retry logic to wait for commits.
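
The timed retry logic mentioned in the reply is not shown in this diff. A minimal sketch of one way to wait for commits (CommitWaiter and waitForCommits are hypothetical names; how the commit count is obtained is left to the caller):

import java.util.function.Supplier;

// Hypothetical helper, for illustration only: polls a commit counter until it reaches
// the expected count or the timeout elapses.
public final class CommitWaiter {

  public static boolean waitForCommits(Supplier<Integer> commitCounter, int expectedCommits,
      long pollIntervalMs, long timeoutMs) throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
      if (commitCounter.get() >= expectedCommits) {
        return true; // the expected commits showed up in time
      }
      Thread.sleep(pollIntervalMs);
    }
    return false; // timed out waiting for commits
  }
}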

protected static final String HUDI_HADOOP_BUNDLE =
HOODIE_WS_ROOT + "/docker/hoodie/hadoop/hive_base/target/hoodie-hadoop-mr-bundle.jar";
protected static final String HUDI_HIVE_SYNC_BUNDLE =