Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4674,4 +4674,30 @@
default value of DEFAULT_RACK is returned for all node names.
</description>
</property>

<property>
<name>ozone.om.edekcacheloader.interval.ms</name>
<value>1000</value>
<description>When KeyProvider is configured, the interval time of warming
up edek cache on OM starts up. All edeks will be loaded
from KMS into provider cache. The edek cache loader will try to warm up the
cache until succeed or OM leaves active state.
</description>
</property>

<property>
<name>ozone.om.edekcacheloader.initial.delay.ms</name>
<value>3000</value>
<description>When KeyProvider is configured, the time delayed until the first
attempt to warm up edek cache on OM start up.
</description>
</property>

<property>
<name>ozone.om.edekcacheloader.max-retries</name>
<value>10</value>
<description>When KeyProvider is configured, the max retries allowed to attempt
warm up edek cache if none of key successful on OM start up.
</description>
</property>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -622,4 +622,17 @@ private OMConfigKeys() {
public static final String OZONE_OM_MAX_BUCKET =
"ozone.om.max.buckets";
public static final int OZONE_OM_MAX_BUCKET_DEFAULT = 100000;

public static final String OZONE_OM_EDEKCACHELOADER_INITIAL_DELAY_MS_KEY =
"ozone.om.edekcacheloader.initial.delay.ms";

public static final int OZONE_OM_EDEKCACHELOADER_INITIAL_DELAY_MS_DEFAULT = 3000;

public static final String OZONE_OM_EDEKCACHELOADER_INTERVAL_MS_KEY = "ozone.om.edekcacheloader.interval.ms";

public static final int OZONE_OM_EDEKCACHELOADER_INTERVAL_MS_DEFAULT = 1000;

public static final String OZONE_OM_EDEKCACHELOADER_MAX_RETRIES_KEY =
"ozone.om.edekcacheloader.max-retries";
public static final int OZONE_OM_EDEKCACHELOADER_MAX_RETRIES_DEFAULT = 10;
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,12 @@
import java.util.Random;
import java.util.TreeMap;
import java.util.UUID;
import java.util.function.BooleanSupplier;
import javax.xml.bind.DatatypeConverter;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
import org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider;
import org.apache.hadoop.crypto.key.kms.server.MiniKMS;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;
Expand Down Expand Up @@ -95,6 +97,7 @@
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.exceptions.OMException;
Expand All @@ -106,6 +109,7 @@
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.test.Whitebox;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.tag.Flaky;
import org.apache.ozone.test.tag.Unhealthy;
Expand Down Expand Up @@ -211,6 +215,34 @@ static void reInitClient() throws IOException {
store = ozClient.getObjectStore();
}

@Test
public void testWarmupEDEKCacheOnStartup() throws Exception {

createVolumeAndBucket("vol", "buck", BucketLayout.OBJECT_STORE);

@SuppressWarnings("unchecked") KMSClientProvider spy = getKMSClientProvider();
assertTrue(spy.getEncKeyQueueSize(TEST_KEY) > 0);

conf.setInt(OMConfigKeys.OZONE_OM_EDEKCACHELOADER_INITIAL_DELAY_MS_KEY, 0);
cluster.restartOzoneManager();

GenericTestUtils.waitFor(new BooleanSupplier() {
@Override
public boolean getAsBoolean() {
final KMSClientProvider kspy = getKMSClientProvider();
return kspy.getEncKeyQueueSize(TEST_KEY) > 0;
}
}, 1000, 60000);
}

private KMSClientProvider getKMSClientProvider() {
LoadBalancingKMSClientProvider lbkmscp =
(LoadBalancingKMSClientProvider) Whitebox.getInternalState(
cluster.getOzoneManager().getKmsProvider(), "extension");
assert lbkmscp.getProviders().length == 1;
return lbkmscp.getProviders()[0];
}


@ParameterizedTest
@EnumSource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_EDEKCACHELOADER_INITIAL_DELAY_MS_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_EDEKCACHELOADER_INITIAL_DELAY_MS_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_EDEKCACHELOADER_INTERVAL_MS_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_EDEKCACHELOADER_INTERVAL_MS_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_EDEKCACHELOADER_MAX_RETRIES_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_EDEKCACHELOADER_MAX_RETRIES_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HANDLER_COUNT_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HANDLER_COUNT_KEY;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_HTTP_AUTH_TYPE;
Expand Down Expand Up @@ -97,6 +103,7 @@
import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PrepareStatusResponse.PrepareStatus;
import static org.apache.hadoop.security.UserGroupInformation.getCurrentUser;
import static org.apache.hadoop.util.ExitUtil.terminate;
import static org.apache.hadoop.util.Time.monotonicNow;
import static org.apache.ozone.graph.PrintableGraph.GraphType.FILE_NAME;

import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -105,6 +112,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.BlockingService;
import com.google.protobuf.ProtocolMessageEnum;
import java.io.BufferedWriter;
Expand Down Expand Up @@ -137,6 +145,8 @@
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -386,6 +396,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private KeyManager keyManager;
private PrefixManagerImpl prefixManager;
private final UpgradeFinalizer<OzoneManager> upgradeFinalizer;
private ExecutorService edekCacheLoader = null;

/**
* OM super user / admin list.
Expand Down Expand Up @@ -727,6 +738,110 @@ private OzoneManager(OzoneConfiguration conf, StartupOption startupOption)
omHostName = HddsUtils.getHostName(conf);
}

public void initializeEdekCache(OzoneConfiguration conf) {
int edekCacheLoaderDelay =
conf.getInt(OZONE_OM_EDEKCACHELOADER_INITIAL_DELAY_MS_KEY, OZONE_OM_EDEKCACHELOADER_INITIAL_DELAY_MS_DEFAULT);
int edekCacheLoaderInterval =
conf.getInt(OZONE_OM_EDEKCACHELOADER_INTERVAL_MS_KEY, OZONE_OM_EDEKCACHELOADER_INTERVAL_MS_DEFAULT);
int edekCacheLoaderMaxRetries =
conf.getInt(OZONE_OM_EDEKCACHELOADER_MAX_RETRIES_KEY, OZONE_OM_EDEKCACHELOADER_MAX_RETRIES_DEFAULT);
if (kmsProvider != null) {
edekCacheLoader = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Warm Up EDEK Cache Thread #%d")
.build());
warmUpEdekCache(edekCacheLoader, edekCacheLoaderDelay, edekCacheLoaderInterval, edekCacheLoaderMaxRetries);
}
}

static class EDEKCacheLoader implements Runnable {
private final String[] keyNames;
private final KeyProviderCryptoExtension kp;
private int initialDelay;
private int retryInterval;
private int maxRetries;

EDEKCacheLoader(final String[] names, final KeyProviderCryptoExtension kp,
final int delay, final int interval, final int maxRetries) {
this.keyNames = names;
this.kp = kp;
this.initialDelay = delay;
this.retryInterval = interval;
this.maxRetries = maxRetries;
}

@Override
public void run() {
LOG.info("Warming up {} EDEKs... (initialDelay={}, "
+ "retryInterval={}, maxRetries={})", keyNames.length, initialDelay, retryInterval,
maxRetries);
try {
Thread.sleep(initialDelay);
} catch (InterruptedException ie) {
LOG.info("EDEKCacheLoader interrupted before warming up.");
return;
}

boolean success = false;
int retryCount = 0;
IOException lastSeenIOE = null;
long warmUpEDEKStartTime = monotonicNow();

while (!success && retryCount < maxRetries) {
try {
kp.warmUpEncryptedKeys(keyNames);
LOG.info("Successfully warmed up {} EDEKs.", keyNames.length);
success = true;
} catch (IOException ioe) {
lastSeenIOE = ioe;
LOG.info("Failed to warm up EDEKs.", ioe);
} catch (Exception e) {
LOG.error("Cannot warm up EDEKs.", e);
throw e;
}

if (!success) {
try {
Thread.sleep(retryInterval);
} catch (InterruptedException ie) {
LOG.info("EDEKCacheLoader interrupted during retry.");
break;
}
retryCount++;
}
}

long warmUpEDEKTime = monotonicNow() - warmUpEDEKStartTime;
LOG.debug("Time taken to load EDEK keys to the cache: {}", warmUpEDEKTime);
if (!success) {
LOG.warn("Max retry {} reached, unable to warm up EDEKs.", maxRetries);
if (lastSeenIOE != null) {
LOG.warn("Last seen exception:", lastSeenIOE);
}
}
}
}

public void warmUpEdekCache(final ExecutorService executor, final int delay, final int interval, int maxRetries) {
Set<String> keys = new HashSet<>();
try (
TableIterator<String, ? extends Table.KeyValue<String, OmBucketInfo>> iterator =
metadataManager.getBucketTable().iterator()) {
while (iterator.hasNext()) {
Table.KeyValue<String, OmBucketInfo> entry = iterator.next();
if (entry.getValue().getEncryptionKeyInfo() != null) {
String encKey = entry.getValue().getEncryptionKeyInfo().getKeyName();
keys.add(encKey);
}
}
} catch (IOException ex) {
LOG.error("Error while retrieving encryption keys for warming up EDEK cache", ex);
}
String[] edeks = new String[keys.size()];
edeks = keys.toArray(edeks);
executor.execute(new EDEKCacheLoader(edeks, getKmsProvider(), delay, interval, maxRetries));
}

public boolean isStopped() {
return omState == State.STOPPED;
}
Expand Down Expand Up @@ -2299,6 +2414,10 @@ public boolean stop() {
if (versionManager != null) {
versionManager.close();
}

if (edekCacheLoader != null) {
edekCacheLoader.shutdown();
}
return true;
} catch (Exception e) {
LOG.error("OzoneManager stop failed.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ public SnapshotInfo getLatestSnapshot() {
@Override
public void notifyLeaderChanged(RaftGroupMemberId groupMemberId,
RaftPeerId newLeaderId) {
RaftPeerId currentPeerId = groupMemberId.getPeerId();
if (newLeaderId.equals(currentPeerId)) {
// warmup cache
ozoneManager.initializeEdekCache(ozoneManager.getConfiguration());
}
// Initialize OMHAMetrics
ozoneManager.omHAMetricsInit(newLeaderId.toString());
LOG.info("{}: leader changed to {}", groupMemberId, newLeaderId);
Expand Down