@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* <p>http://www.apache.org/licenses/LICENSE-2.0
* <p>
* <p>Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.ozone.annotations;

import javax.annotation.processing.AbstractProcessor;
import javax.annotation.processing.RoundEnvironment;
import javax.annotation.processing.SupportedAnnotationTypes;
import javax.lang.model.element.Element;
import javax.lang.model.element.ExecutableElement;
import javax.lang.model.element.TypeElement;
import javax.lang.model.type.TypeMirror;
import javax.tools.Diagnostic.Kind;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeoutException;

/**
* Annotation processor that verifies that methods marked with the
* Replicate annotation have the proper signature, i.e. declare
* TimeoutException among their thrown exceptions.
*/
@SupportedAnnotationTypes("org.apache.hadoop.hdds.scm.metadata.Replicate")
public class ReplicateAnnotationProcessor extends AbstractProcessor {

private static final String ANNOTATION_SIMPLE_NAME = "Replicate";

@Override
public boolean process(Set<? extends TypeElement> annotations,
RoundEnvironment roundEnv) {
for (TypeElement annotation : annotations) {
if (!annotation.getSimpleName().contentEquals(ANNOTATION_SIMPLE_NAME)) {
continue;
}
roundEnv.getElementsAnnotatedWith(annotation)
.forEach(this::checkMethodSignature);
}
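// Returning false leaves the Replicate annotation unclaimed, so other
// annotation processors may still process it in this round.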
return false;
}

/**
* Checks if the method signature is correct.
*
* @param element the method element to be checked.
*/
private void checkMethodSignature(Element element) {
if (!(element instanceof ExecutableElement)) {
processingEnv.getMessager().printMessage(Kind.ERROR,
"Replicate annotation should be on method.");
return;
}
final ExecutableElement executableElement = (ExecutableElement) element;
final List<? extends TypeMirror> exceptions =
executableElement.getThrownTypes();

if (exceptions.stream().map(TypeMirror::toString)
.noneMatch(TimeoutException.class.getName()::equals)) {
processingEnv.getMessager().printMessage(Kind.ERROR,
"Method with Replicate annotation should declare " +
"TimeoutException in its signature.");
}
}
}
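For illustration, a minimal, hypothetical interface (not part of this change; ExampleStateManager and its methods are invented for the sketch) shows what the processor accepts and rejects, assuming the interface lives in the same package as the Replicate annotation:

package org.apache.hadoop.hdds.scm.metadata;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public interface ExampleStateManager {

  // Accepted: the method declares TimeoutException, so the processor is silent.
  @Replicate
  void addEntry(String key) throws IOException, TimeoutException;

  // Rejected at compile time with the error reported by checkMethodSignature,
  // because TimeoutException is missing from the throws clause.
  @Replicate
  void removeEntry(String key) throws IOException;
}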
@@ -13,4 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.ozone.annotations.RequestFeatureValidatorProcessor
org.apache.ozone.annotations.RequestFeatureValidatorProcessor
org.apache.ozone.annotations.ReplicateAnnotationProcessor
@@ -38,6 +38,7 @@
.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.security.KerberosInfo;
@@ -71,7 +72,7 @@ SCMVersionResponseProto getVersion(SCMVersionRequestProto versionRequest)
* @throws IOException
*/
SCMHeartbeatResponseProto sendHeartbeat(SCMHeartbeatRequestProto heartbeat)
throws IOException;
throws IOException, TimeoutException;

/**
* Register Datanode.
@@ -20,6 +20,7 @@
import static org.apache.hadoop.ozone.container.upgrade.UpgradeUtils.toLayoutVersionProto;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
@@ -120,7 +121,7 @@ public SCMDatanodeResponse processMessage(SCMDatanodeRequest request)
default:
throw new ServiceException("Unknown command type: " + cmdType);
}
} catch (IOException e) {
} catch (IOException | TimeoutException e) {
throw new ServiceException(e);
}
}
@@ -33,6 +33,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;

/**
* ScmBlockLocationProtocol is used by an HDFS node to find the set of nodes
@@ -63,7 +64,7 @@ public interface ScmBlockLocationProtocol extends Closeable {
@Deprecated
default List<AllocatedBlock> allocateBlock(long size, int numBlocks,
ReplicationType type, ReplicationFactor factor, String owner,
ExcludeList excludeList) throws IOException {
ExcludeList excludeList) throws IOException, TimeoutException {
return allocateBlock(size, numBlocks, ReplicationConfig
.fromProtoTypeAndFactor(type, factor), owner, excludeList);
}
@@ -35,6 +35,7 @@
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

/**
* This interface allows the DefaultCA to be portable and use different DB
@@ -61,7 +62,8 @@ public interface CertificateStore {
*/
@Replicate
void storeValidCertificate(BigInteger serialID,
X509Certificate certificate, NodeType role) throws IOException;
X509Certificate certificate, NodeType role)
throws IOException, TimeoutException;

void storeValidScmCertificate(BigInteger serialID,
X509Certificate certificate) throws IOException;
@@ -95,7 +97,7 @@ Optional<Long> revokeCertificates(List<BigInteger> serialIDs,
CRLReason reason,
Date revocationTime,
CRLApprover approver)
throws IOException;
throws IOException, TimeoutException;

/**
* Deletes an expired certificate from the store. Please note: We don't
@@ -60,6 +60,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
@@ -257,7 +258,8 @@ public Future<X509CertificateHolder> requestCertificate(
default:
return null; // cannot happen, keeping checkstyle happy.
}
} catch (CertificateException | IOException | OperatorCreationException e) {
} catch (CertificateException | IOException | OperatorCreationException |
TimeoutException e) {
LOG.error("Unable to issue a certificate.", e);
xcertHolder.completeExceptionally(
new SCMSecurityException(e, UNABLE_TO_ISSUE_CERTIFICATE));
@@ -268,7 +270,7 @@ public Future<X509CertificateHolder> requestCertificate(
private X509CertificateHolder signAndStoreCertificate(LocalDate beginDate,
LocalDate endDate, PKCS10CertificationRequest csr, NodeType role)
throws IOException,
OperatorCreationException, CertificateException {
OperatorCreationException, CertificateException, TimeoutException {

lock.lock();
X509CertificateHolder xcert;
@@ -313,7 +315,7 @@ public Future<Optional<Long>> revokeCertificates(
store.revokeCertificates(certificates,
getCACertificate(), reason, revocationTime, crlApprover)
);
} catch (IOException ex) {
} catch (IOException | TimeoutException ex) {
LOG.error("Revoking the certificate failed.", ex.getCause());
revoked.completeExceptionally(new SCMSecurityException(ex));
}
@@ -20,6 +20,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
@@ -43,7 +44,7 @@ public interface BlockManager extends Closeable {
*/
AllocatedBlock allocateBlock(long size, ReplicationConfig replicationConfig,
String owner,
ExcludeList excludeList) throws IOException;
ExcludeList excludeList) throws IOException, TimeoutException;

/**
* Deletes a list of blocks in an atomic operation. Internally, SCM
@@ -25,6 +25,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.client.ContainerBlockID;
@@ -159,7 +160,7 @@ public void stop() throws IOException {
public AllocatedBlock allocateBlock(final long size,
ReplicationConfig replicationConfig,
String owner, ExcludeList excludeList)
throws IOException {
throws IOException, TimeoutException {
if (LOG.isTraceEnabled()) {
LOG.trace("Size : {} , replicationConfig: {}", size, replicationConfig);
}
@@ -194,7 +195,7 @@ public AllocatedBlock allocateBlock(
* @return AllocatedBlock
*/
private AllocatedBlock newBlock(ContainerInfo containerInfo)
throws NotLeaderException {
throws NotLeaderException, TimeoutException {
try {
final Pipeline pipeline = pipelineManager
.getPipeline(containerInfo.getPipelineID());
@@ -255,7 +256,7 @@ public void deleteBlocks(List<BlockGroup> keyBlocksInfoList)

try {
deletedBlockLog.addTransactions(containerBlocks);
} catch (IOException e) {
} catch (IOException | TimeoutException e) {
throw new IOException("Skip writing the deleted blocks info to"
+ " the delLog because addTransaction fails. " + keyBlocksInfoList
.size() + "Keys skipped", e);
@@ -29,6 +29,7 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

/**
* The DeletedBlockLog is a persisted log in SCM to keep tracking
@@ -47,7 +48,7 @@ public interface DeletedBlockLog extends Closeable {
* @throws IOException
*/
DatanodeDeletedBlockTransactions getTransactions(int blockDeletionLimit)
throws IOException;
throws IOException, TimeoutException;

/**
* Return all failed transactions in the log. A transaction is considered
@@ -69,7 +70,7 @@ List<DeletedBlocksTransaction> getFailedTransactions()
* @param txIDs - transaction ID.
*/
void incrementCount(List<Long> txIDs)
throws IOException;
throws IOException, TimeoutException;

/**
* Commits a transaction means to delete all footprints of a transaction
@@ -95,7 +96,7 @@ void commitTransactions(List<DeleteBlockTransactionResult> transactionResults,
* @throws IOException
*/
void addTransactions(Map<Long, List<Long>> containerBlocksMap)
throws IOException;
throws IOException, TimeoutException;

/**
* Returns the total number of valid transactions. A transaction is
@@ -25,6 +25,7 @@
import java.util.LinkedHashSet;
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -152,7 +153,8 @@ public List<DeletedBlocksTransaction> getFailedTransactions()
* @throws IOException
*/
@Override
public void incrementCount(List<Long> txIDs) throws IOException {
public void incrementCount(List<Long> txIDs)
throws IOException, TimeoutException {
lock.lock();
try {
ArrayList<Long> txIDsToUpdate = new ArrayList<>();
@@ -264,7 +266,7 @@ public void commitTransactions(
try {
deletedBlockLogStateManager.removeTransactionsFromDB(txIDsToBeDeleted);
metrics.incrBlockDeletionTransactionCompleted(txIDsToBeDeleted.size());
} catch (IOException e) {
} catch (IOException | TimeoutException e) {
LOG.warn("Could not commit delete block transactions: "
+ txIDsToBeDeleted, e);
}
@@ -342,7 +344,7 @@ public void onFlush() {
*/
@Override
public void addTransactions(Map<Long, List<Long>> containerBlocksMap)
throws IOException {
throws IOException, TimeoutException {
lock.lock();
try {
ArrayList<DeletedBlocksTransaction> txsToBeAdded = new ArrayList<>();
@@ -388,7 +390,7 @@ private void getTransaction(DeletedBlocksTransaction tx,

@Override
public DatanodeDeletedBlockTransactions getTransactions(
int blockDeletionLimit) throws IOException {
int blockDeletionLimit) throws IOException, TimeoutException {
lock.lock();
try {
DatanodeDeletedBlockTransactions transactions =
@@ -25,6 +25,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.TimeoutException;

/**
* DeletedBlockLogStateManager interface to
@@ -33,15 +34,15 @@
public interface DeletedBlockLogStateManager {
@Replicate
void addTransactionsToDB(ArrayList<DeletedBlocksTransaction> txs)
throws IOException;
throws IOException, TimeoutException;

@Replicate
void removeTransactionsFromDB(ArrayList<Long> txIDs)
throws IOException;
throws IOException, TimeoutException;

@Replicate
void increaseRetryCountOfTransactionInDB(ArrayList<Long> txIDs)
throws IOException;
throws IOException, TimeoutException;

TableIterator<Long,
KeyValue<Long, DeletedBlocksTransaction>> getReadOnlyIterator();