Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.async;

import org.apache.hudi.client.AbstractClusteringClient;
import org.apache.hudi.client.AbstractHoodieWriteClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

/**
 * Async clustering service that runs in a separate thread.
 * Currently, only one clustering thread is allowed to run at any time.
 *
 * <p>Worker threads repeatedly pull pending instants from the queue exposed by
 * {@link HoodieAsyncService} and hand each one to the {@link AbstractClusteringClient}.
 */
public abstract class AsyncClusteringService extends HoodieAsyncService {

  private static final long serialVersionUID = 1L;
  private static final Logger LOG = LogManager.getLogger(AsyncClusteringService.class);

  // Size of the worker pool; the constructor fixes this to 1.
  private final int maxConcurrentClustering;
  // Client that performs the actual clustering; transient, so it is not serialized with the service.
  private transient AbstractClusteringClient clusteringClient;

  /**
   * Creates the service with daemon mode turned off.
   *
   * @param writeClient write client used to build the clustering client
   */
  public AsyncClusteringService(AbstractHoodieWriteClient writeClient) {
    this(writeClient, false);
  }

  /**
   * Creates the service.
   *
   * @param writeClient     write client used to build the clustering client
   * @param runInDaemonMode whether the worker thread should be a daemon thread
   */
  public AsyncClusteringService(AbstractHoodieWriteClient writeClient, boolean runInDaemonMode) {
    super(runInDaemonMode);
    this.clusteringClient = createClusteringClient(writeClient);
    this.maxConcurrentClustering = 1;
  }

  /**
   * Builds the clustering client for the given write client.
   *
   * <p>Note: this is invoked from the constructor, so implementations must not depend on
   * subclass fields having been initialized.
   *
   * @param client write client to wrap
   * @return clustering client used by the worker thread
   */
  protected abstract AbstractClusteringClient createClusteringClient(AbstractHoodieWriteClient client);

  /**
   * Start clustering service.
   *
   * @return pair of (future that completes once every worker loop has finished, executor running the workers)
   */
  @Override
  protected Pair<CompletableFuture, ExecutorService> startService() {
    ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentClustering,
        r -> {
          Thread t = new Thread(r, "async_clustering_thread");
          t.setDaemon(isRunInDaemonMode());
          return t;
        });

    // One worker loop per allowed concurrent clustering (currently exactly one).
    CompletableFuture[] workers = IntStream.range(0, maxConcurrentClustering)
        .mapToObj(i -> CompletableFuture.supplyAsync(this::runClusteringLoop, executor))
        .toArray(CompletableFuture[]::new);

    return Pair.of(CompletableFuture.allOf(workers), executor);
  }

  /**
   * Worker loop: clusters queued instants until shutdown is requested or the thread is interrupted.
   *
   * @return always {@code true} when the loop ends without an I/O failure
   * @throws HoodieIOException if the clustering client throws an {@link IOException}
   */
  private boolean runClusteringLoop() {
    try {
      while (!isShutdownRequested()) {
        // May be null when no instant became available within the poll timeout.
        final HoodieInstant instant = fetchNextAsyncServiceInstant();
        if (null != instant) {
          LOG.info("Starting clustering for instant " + instant);
          clusteringClient.cluster(instant);
          LOG.info("Finished clustering for instant " + instant);
        }
      }
      LOG.info("Clustering executor shutting down properly");
    } catch (InterruptedException ie) {
      LOG.warn("Clustering executor got interrupted exception! Stopping", ie);
      // Restore the interrupt flag so code further up the stack can still observe the interruption.
      Thread.currentThread().interrupt();
    } catch (IOException e) {
      LOG.error("Clustering executor failed", e);
      throw new HoodieIOException(e.getMessage(), e);
    }
    return true;
  }

  /**
   * Update the write client to be used for clustering.
   *
   * @param writeClient replacement write client, forwarded to the clustering client
   */
  public synchronized void updateWriteClient(AbstractHoodieWriteClient writeClient) {
    this.clusteringClient.updateWriteClient(writeClient);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,14 @@
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieIOException;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.IntStream;

/**
Expand All @@ -54,9 +50,6 @@ public abstract class AsyncCompactService extends HoodieAsyncService {
// Presumably the number of concurrent compaction workers; it is assigned in a constructor not shown here.
private final int maxConcurrentCompaction;
// Compactor used by this service; transient, so not serialized.
private transient AbstractCompactor compactor;
// Engine context; startService sets the compaction pool name property on it.
protected transient HoodieEngineContext context;
// Instants awaiting compaction: filled by enqueuePendingCompaction, drained by fetchNextCompactionInstant.
private transient BlockingQueue<HoodieInstant> pendingCompactions = new LinkedBlockingQueue<>();
// Lock that owns the 'consumed' condition; held while awaiting or signalling it (queue add/poll run without it).
private transient ReentrantLock queueLock = new ReentrantLock();
// Signalled each time an instant is polled off the queue, waking waitTillPendingCompactionsReducesTo.
private transient Condition consumed = queueLock.newCondition();

public AsyncCompactService(HoodieEngineContext context, AbstractHoodieWriteClient client) {
this(context, client, false);
Expand All @@ -71,51 +64,6 @@ public AsyncCompactService(HoodieEngineContext context, AbstractHoodieWriteClien

/**
 * Builds the {@link AbstractCompactor} for the given write client; concrete services supply the implementation.
 *
 * @param client write client handed to the compactor
 * @return the compactor instance
 */
protected abstract AbstractCompactor createCompactor(AbstractHoodieWriteClient client);

/**
 * Adds a pending compaction instant to the queue consumed by the compaction worker.
 *
 * @param instant compaction instant to queue
 */
public void enqueuePendingCompaction(HoodieInstant instant) {
  this.pendingCompactions.add(instant);
}

/**
 * Wait till the number of outstanding pending compactions drops to the passed in value.
 *
 * <p>The loop condition is re-evaluated each time the {@code consumed} condition wakes the
 * caller, and also ends once {@code isShutdown()} returns true.
 *
 * @param numPendingCompactions Maximum pending compactions allowed
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public void waitTillPendingCompactionsReducesTo(int numPendingCompactions) throws InterruptedException {
  // Acquire before entering try: if lock() were to fail, finally must not unlock a lock we never held.
  queueLock.lock();
  try {
    while (!isShutdown() && (pendingCompactions.size() > numPendingCompactions)) {
      consumed.await();
    }
  } finally {
    queueLock.unlock();
  }
}

/**
 * Fetch the next pending compaction if one becomes available within 10 seconds.
 *
 * <p>When an instant is taken off the queue, one thread waiting on the {@code consumed}
 * condition is signalled.
 *
 * @return the next pending compaction instant, or {@code null} if the poll timed out
 * @throws InterruptedException if interrupted while waiting on the queue
 */
private HoodieInstant fetchNextCompactionInstant() throws InterruptedException {
  // The message now matches the actual poll timeout below (it previously claimed 60 seconds).
  LOG.info("Compactor waiting for next instant for compaction upto 10 seconds");
  HoodieInstant instant = pendingCompactions.poll(10, TimeUnit.SECONDS);
  if (instant != null) {
    // Acquire before try so that finally only ever releases a lock this thread holds.
    queueLock.lock();
    try {
      // Signal waiting thread
      consumed.signal();
    } finally {
      queueLock.unlock();
    }
  }
  return instant;
}

/**
* Start Compaction Service.
*/
Expand All @@ -134,7 +82,7 @@ protected Pair<CompletableFuture, ExecutorService> startService() {
context.setProperty(EngineProperty.COMPACTION_POOL_NAME, COMPACT_POOL_NAME);

while (!isShutdownRequested()) {
final HoodieInstant instant = fetchNextCompactionInstant();
final HoodieInstant instant = fetchNextAsyncServiceInstant();

if (null != instant) {
LOG.info("Starting Compaction for instant " + instant);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,26 @@

package org.apache.hudi.async;

import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.collection.Pair;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.Serializable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

/**
* Base Class for running clean/delta-sync/compaction in separate thread and controlling their life-cycle.
* Base Class for running clean/delta-sync/compaction/clustering in separate thread and controlling their life-cycle.
*/
public abstract class HoodieAsyncService implements Serializable {

Expand All @@ -50,6 +55,12 @@ public abstract class HoodieAsyncService implements Serializable {
private transient CompletableFuture future;
// Run in daemon mode
private final boolean runInDaemonMode;
// Queue to hold pending compaction/clustering instants; filled by enqueuePendingAsyncServiceInstant
// and drained by fetchNextAsyncServiceInstant
private transient BlockingQueue<HoodieInstant> pendingInstants = new LinkedBlockingQueue<>();
// Lock backing the 'consumed' condition. It is held while waiting on or signalling that condition;
// the queue's own add/poll calls are made without it
private transient ReentrantLock queueLock = new ReentrantLock();
// Condition tied to queueLock, signalled whenever an instant is polled off pendingInstants
private transient Condition consumed = queueLock.newCondition();

protected HoodieAsyncService() {
this(false);
Expand Down Expand Up @@ -165,4 +176,51 @@ private void monitorThreads(Function<Boolean, Boolean> onShutdownCallback) {
/**
 * @return the daemon-mode flag held by this service
 */
public boolean isRunInDaemonMode() {
  return this.runInDaemonMode;
}

/**
 * Wait till outstanding pending compaction/clustering reduces to the passed in value.
 *
 * <p>The loop condition is re-evaluated each time the {@code consumed} condition wakes the
 * caller, and also ends once {@code isShutdown()} returns true.
 *
 * @param numPending Maximum pending compactions/clustering allowed
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public void waitTillPendingAsyncServiceInstantsReducesTo(int numPending) throws InterruptedException {
  // Acquire before entering try: if lock() were to fail, finally must not unlock a lock we never held.
  queueLock.lock();
  try {
    while (!isShutdown() && (pendingInstants.size() > numPending)) {
      consumed.await();
    }
  } finally {
    queueLock.unlock();
  }
}

/**
 * Enqueues new pending compaction/clustering instant.
 *
 * @param instant {@link HoodieInstant} to enqueue.
 */
public void enqueuePendingAsyncServiceInstant(HoodieInstant instant) {
  // The queue is shared by compaction and clustering, so the message must not claim "clustering" only.
  LOG.info("Enqueuing new pending compaction/clustering instant: " + instant.getTimestamp());
  pendingInstants.add(instant);
}

/**
 * Fetch next pending compaction/clustering instant if one becomes available within 10 seconds.
 *
 * <p>When an instant is taken off the queue, one thread waiting on the {@code consumed}
 * condition is signalled.
 *
 * @return {@link HoodieInstant} corresponding to the next pending compaction/clustering,
 *         or {@code null} if the poll timed out.
 * @throws InterruptedException if interrupted while waiting on the queue
 */
HoodieInstant fetchNextAsyncServiceInstant() throws InterruptedException {
  LOG.info("Waiting for next instant upto 10 seconds");
  HoodieInstant instant = pendingInstants.poll(10, TimeUnit.SECONDS);
  if (instant != null) {
    // Acquire before try so that finally only ever releases a lock this thread holds.
    queueLock.lock();
    try {
      // Signal waiting thread
      consumed.signal();
    } finally {
      queueLock.unlock();
    }
  }
  return instant;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.client;

import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;

import java.io.IOException;
import java.io.Serializable;

/**
* Client will run one round of clustering.
*/
public abstract class AbstractClusteringClient<T extends HoodieRecordPayload, I, K, O> implements Serializable {

private static final long serialVersionUID = 1L;

protected transient AbstractHoodieWriteClient<T, I, K, O> clusteringClient;

/**
 * Creates a clustering client that keeps a reference to the given write client.
 *
 * @param clusteringClient write client stored for use by this clustering client
 */
public AbstractClusteringClient(AbstractHoodieWriteClient<T, I, K, O> clusteringClient) {
  this.clusteringClient = clusteringClient;
}

/**
* Run clustering for the instant.
*
* @param instant the {@link HoodieInstant} to run clustering for
* @throws IOException if the implementation fails with an I/O error
*/
public abstract void cluster(HoodieInstant instant) throws IOException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, too, we could consider something like AsyncServiceClient.
Maybe we can have a common method, as below:

   public abstract void doAction(HoodieInstant instant) throws IOException;

same abstract class for both clustering and compaction.
Not too strong on this suggestion though. Let's see what others have to say.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, let's try to add Javadocs for all public methods. I do understand that AbstractCompactor does not have Javadocs; that's fine. At least for the code we write, let's try to add Javadocs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same abstract class for both clustering and compaction.
Not too strong on this suggestion though. Let's see what others have to say.

@satishkotha what are your thoughts about this?


/**
 * Replaces the write client held by this clustering client.
 *
 * @param writeClient write client to use from now on
 */
public void updateWriteClient(AbstractHoodieWriteClient<T, I, K, O> writeClient) {
  clusteringClient = writeClient;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,13 @@ public class HoodieClusteringConfig extends HoodieConfig {
.key("hoodie.clustering.inline.max.commits")
.defaultValue("4")
.sinceVersion("0.7.0")
.withDocumentation("Config to control frequency of clustering");
.withDocumentation("Config to control frequency of inline clustering");

// Frequency setting for async clustering; string-typed with default "4", available since 0.9.0.
public static final ConfigProperty<String> ASYNC_CLUSTERING_MAX_COMMIT_PROP = ConfigProperty
.key("hoodie.clustering.async.max.commits")
.defaultValue("4")
.sinceVersion("0.9.0")
.withDocumentation("Config to control frequency of async clustering");

// Any strategy specific params can be saved with this prefix
public static final String CLUSTERING_STRATEGY_PARAM_PREFIX = "hoodie.clustering.plan.strategy.";
Expand Down Expand Up @@ -177,6 +183,11 @@ public Builder withInlineClusteringNumCommits(int numCommits) {
return this;
}

/**
 * Sets the async clustering max-commits property on the config being built.
 *
 * @param numCommits value to store (kept as its decimal string form)
 * @return this builder
 */
public Builder withAsyncClusteringMaxCommits(int numCommits) {
  final String maxCommits = Integer.toString(numCommits);
  clusteringConfig.setValue(ASYNC_CLUSTERING_MAX_COMMIT_PROP, maxCommits);
  return this;
}

public Builder fromProperties(Properties props) {
this.clusteringConfig.getProps().putAll(props);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,10 @@ public int getInlineClusterMaxCommits() {
return getInt(HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMIT_PROP);
}

/**
 * @return the async clustering max-commits setting, read as an int
 */
public int getAsyncClusterMaxCommits() {
  final int asyncMaxCommits = getInt(HoodieClusteringConfig.ASYNC_CLUSTERING_MAX_COMMIT_PROP);
  return asyncMaxCommits;
}

/**
 * @return the value of the payload class property, read as a string
 */
public String getPayloadClass() {
  final String payloadClass = getString(HoodieCompactionConfig.PAYLOAD_CLASS_PROP);
  return payloadClass;
}
Expand Down
Loading