Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds FateOperation type #5218

Merged
merged 5 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
import org.apache.accumulo.core.data.constraints.Constraint;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.manager.thrift.FateOperation;
import org.apache.accumulo.core.manager.thrift.TFateOperation;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.LocalityGroupUtil;
Expand Down Expand Up @@ -126,7 +126,7 @@ public void create(String namespace)
NEW_NAMESPACE_NAME.validate(namespace);

try {
doNamespaceFateOperation(FateOperation.NAMESPACE_CREATE,
doNamespaceFateOperation(TFateOperation.NAMESPACE_CREATE,
Arrays.asList(ByteBuffer.wrap(namespace.getBytes(UTF_8))), Collections.emptyMap(),
namespace);
} catch (NamespaceNotFoundException e) {
Expand Down Expand Up @@ -156,7 +156,7 @@ public void delete(String namespace) throws AccumuloException, AccumuloSecurityE
Map<String,String> opts = new HashMap<>();

try {
doNamespaceFateOperation(FateOperation.NAMESPACE_DELETE, args, opts, namespace);
doNamespaceFateOperation(TFateOperation.NAMESPACE_DELETE, args, opts, namespace);
} catch (NamespaceExistsException e) {
// should not happen
throw new AssertionError(e);
Expand All @@ -174,7 +174,7 @@ public void rename(String oldNamespaceName, String newNamespaceName)
List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(oldNamespaceName.getBytes(UTF_8)),
ByteBuffer.wrap(newNamespaceName.getBytes(UTF_8)));
Map<String,String> opts = new HashMap<>();
doNamespaceFateOperation(FateOperation.NAMESPACE_RENAME, args, opts, oldNamespaceName);
doNamespaceFateOperation(TFateOperation.NAMESPACE_RENAME, args, opts, oldNamespaceName);
}

@Override
Expand Down Expand Up @@ -385,7 +385,7 @@ public int addConstraint(String namespace, String constraintClassName)
return super.addConstraint(namespace, constraintClassName);
}

private String doNamespaceFateOperation(FateOperation op, List<ByteBuffer> args,
private String doNamespaceFateOperation(TFateOperation op, List<ByteBuffer> args,
Map<String,String> opts, String namespace) throws AccumuloSecurityException,
AccumuloException, NamespaceExistsException, NamespaceNotFoundException {
// caller should validate the namespace name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,11 @@
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.manager.thrift.FateOperation;
import org.apache.accumulo.core.manager.thrift.FateService;
import org.apache.accumulo.core.manager.thrift.ManagerClientService;
import org.apache.accumulo.core.manager.thrift.TFateId;
import org.apache.accumulo.core.manager.thrift.TFateInstanceType;
import org.apache.accumulo.core.manager.thrift.TFateOperation;
import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.TabletState;
Expand Down Expand Up @@ -274,7 +274,7 @@ public void create(String tableName, NewTableConfiguration ntc)
Map<String,String> opts = ntc.getProperties();

try {
doTableFateOperation(tableName, AccumuloException.class, FateOperation.TABLE_CREATE, args,
doTableFateOperation(tableName, AccumuloException.class, TFateOperation.TABLE_CREATE, args,
opts);
} catch (TableNotFoundException e) {
// should not happen
Expand Down Expand Up @@ -304,7 +304,7 @@ private TFateId beginFateOperation(TFateInstanceType type)

// This method is for retrying in the case of network failures;
// anything else it passes to the caller to deal with
private void executeFateOperation(TFateId opid, FateOperation op, List<ByteBuffer> args,
private void executeFateOperation(TFateId opid, TFateOperation op, List<ByteBuffer> args,
Map<String,String> opts, boolean autoCleanUp)
throws ThriftSecurityException, TException, ThriftTableOperationException {
while (true) {
Expand Down Expand Up @@ -372,7 +372,7 @@ public String doBulkFateOperation(List<ByteBuffer> args, String tableName)
EXISTING_TABLE_NAME.validate(tableName);

try {
return doFateOperation(FateOperation.TABLE_BULK_IMPORT2, args, Collections.emptyMap(),
return doFateOperation(TFateOperation.TABLE_BULK_IMPORT2, args, Collections.emptyMap(),
tableName);
} catch (TableExistsException | NamespaceExistsException e) {
// should not happen
Expand Down Expand Up @@ -427,14 +427,14 @@ private <T> T handleFateOperation(FateOperationExecutor<T> executor, String tabl
}
}

String doFateOperation(FateOperation op, List<ByteBuffer> args, Map<String,String> opts,
String doFateOperation(TFateOperation op, List<ByteBuffer> args, Map<String,String> opts,
String tableOrNamespaceName)
throws AccumuloSecurityException, TableExistsException, TableNotFoundException,
AccumuloException, NamespaceExistsException, NamespaceNotFoundException {
return doFateOperation(op, args, opts, tableOrNamespaceName, true);
}

String doFateOperation(FateOperation op, List<ByteBuffer> args, Map<String,String> opts,
String doFateOperation(TFateOperation op, List<ByteBuffer> args, Map<String,String> opts,
String tableOrNamespaceName, boolean wait)
throws AccumuloSecurityException, TableExistsException, TableNotFoundException,
AccumuloException, NamespaceExistsException, NamespaceNotFoundException {
Expand Down Expand Up @@ -521,7 +521,7 @@ public void addSplits(String tableName, SortedSet<Text> splits)
return handleFateOperation(() -> {
TFateInstanceType t = FateInstanceType.fromNamespaceOrTableName(tableName).toThrift();
TFateId opid = beginFateOperation(t);
executeFateOperation(opid, FateOperation.TABLE_SPLIT, args, Map.of(), false);
executeFateOperation(opid, TFateOperation.TABLE_SPLIT, args, Map.of(), false);
return new Pair<>(opid, splitsForTablet.getValue());
}, tableName);
} catch (TableExistsException | NamespaceExistsException | NamespaceNotFoundException
Expand Down Expand Up @@ -645,8 +645,8 @@ public void merge(String tableName, Text start, Text end)
end == null ? EMPTY : TextUtil.getByteBuffer(end));
Map<String,String> opts = new HashMap<>();
try {
doTableFateOperation(tableName, TableNotFoundException.class, FateOperation.TABLE_MERGE, args,
opts);
doTableFateOperation(tableName, TableNotFoundException.class, TFateOperation.TABLE_MERGE,
args, opts);
} catch (TableExistsException e) {
// should not happen
throw new AssertionError(e);
Expand All @@ -665,7 +665,7 @@ public void deleteRows(String tableName, Text start, Text end)
Map<String,String> opts = new HashMap<>();
try {
doTableFateOperation(tableName, TableNotFoundException.class,
FateOperation.TABLE_DELETE_RANGE, args, opts);
TFateOperation.TABLE_DELETE_RANGE, args, opts);
} catch (TableExistsException e) {
// should not happen
throw new AssertionError(e);
Expand Down Expand Up @@ -760,7 +760,7 @@ public void delete(String tableName)
List<ByteBuffer> args = List.of(ByteBuffer.wrap(tableName.getBytes(UTF_8)));
Map<String,String> opts = new HashMap<>();
try {
doTableFateOperation(tableName, TableNotFoundException.class, FateOperation.TABLE_DELETE,
doTableFateOperation(tableName, TableNotFoundException.class, TFateOperation.TABLE_DELETE,
args, opts);
} catch (TableExistsException e) {
// should not happen
Expand Down Expand Up @@ -800,7 +800,7 @@ public void clone(String srcTableName, String newTableName, CloneConfiguration c

prependPropertiesToExclude(opts, config.getPropertiesToExclude());

doTableFateOperation(newTableName, AccumuloException.class, FateOperation.TABLE_CLONE, args,
doTableFateOperation(newTableName, AccumuloException.class, TFateOperation.TABLE_CLONE, args,
opts);
}

Expand All @@ -813,7 +813,7 @@ public void rename(String oldTableName, String newTableName) throws AccumuloSecu
List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(oldTableName.getBytes(UTF_8)),
ByteBuffer.wrap(newTableName.getBytes(UTF_8)));
Map<String,String> opts = new HashMap<>();
doTableFateOperation(oldTableName, TableNotFoundException.class, FateOperation.TABLE_RENAME,
doTableFateOperation(oldTableName, TableNotFoundException.class, TFateOperation.TABLE_RENAME,
args, opts);
}

Expand Down Expand Up @@ -892,7 +892,7 @@ public void compact(String tableName, CompactionConfig config)
Map<String,String> opts = new HashMap<>();

try {
doFateOperation(FateOperation.TABLE_COMPACT, args, opts, tableName, config.getWait());
doFateOperation(TFateOperation.TABLE_COMPACT, args, opts, tableName, config.getWait());
} catch (TableExistsException | NamespaceExistsException e) {
// should not happen
throw new AssertionError(e);
Expand All @@ -912,7 +912,7 @@ public void cancelCompaction(String tableName)

try {
doTableFateOperation(tableName, TableNotFoundException.class,
FateOperation.TABLE_CANCEL_COMPACT, args, opts);
TFateOperation.TABLE_CANCEL_COMPACT, args, opts);
} catch (TableExistsException e) {
// should not happen
throw new AssertionError(e);
Expand Down Expand Up @@ -1455,17 +1455,17 @@ private void changeTableState(String tableName, boolean wait, TableState newStat

TableId tableId = context.getTableId(tableName);

FateOperation op = null;
TFateOperation op = null;
switch (newState) {
case OFFLINE:
op = FateOperation.TABLE_OFFLINE;
op = TFateOperation.TABLE_OFFLINE;
if (tableName.equals(AccumuloTable.METADATA.tableName())
|| tableName.equals(AccumuloTable.ROOT.tableName())) {
throw new AccumuloException("Cannot set table to offline state");
}
break;
case ONLINE:
op = FateOperation.TABLE_ONLINE;
op = TFateOperation.TABLE_ONLINE;
if (tableName.equals(AccumuloTable.METADATA.tableName())
|| tableName.equals(AccumuloTable.ROOT.tableName())) {
// Don't submit a Fate operation for this, these tables can only be online.
Expand Down Expand Up @@ -1694,7 +1694,7 @@ public void importTable(String tableName, Set<String> importDirs, ImportConfigur
checkedImportDirs.stream().map(s -> s.getBytes(UTF_8)).map(ByteBuffer::wrap).forEach(args::add);

try {
doTableFateOperation(tableName, AccumuloException.class, FateOperation.TABLE_IMPORT, args,
doTableFateOperation(tableName, AccumuloException.class, TFateOperation.TABLE_IMPORT, args,
Collections.emptyMap());
} catch (TableNotFoundException e) {
// should not happen
Expand Down Expand Up @@ -1727,7 +1727,7 @@ public void exportTable(String tableName, String exportDir)
Map<String,String> opts = Collections.emptyMap();

try {
doTableFateOperation(tableName, TableNotFoundException.class, FateOperation.TABLE_EXPORT,
doTableFateOperation(tableName, TableNotFoundException.class, TFateOperation.TABLE_EXPORT,
args, opts);
} catch (TableExistsException e) {
// should not happen
Expand Down Expand Up @@ -1782,7 +1782,7 @@ public int addConstraint(String tableName, String constraintClassName)
}

private void doTableFateOperation(String tableOrNamespaceName,
Class<? extends Exception> namespaceNotFoundExceptionClass, FateOperation op,
Class<? extends Exception> namespaceNotFoundExceptionClass, TFateOperation op,
List<ByteBuffer> args, Map<String,String> opts) throws AccumuloSecurityException,
AccumuloException, TableExistsException, TableNotFoundException {
try {
Expand Down Expand Up @@ -2212,7 +2212,7 @@ public void setTabletAvailability(String tableName, Range range, TabletAvailabil

try {
doTableFateOperation(tableName, AccumuloException.class,
FateOperation.TABLE_TABLET_AVAILABILITY, args, opts);
TFateOperation.TABLE_TABLET_AVAILABILITY, args, opts);
} catch (TableNotFoundException | TableExistsException e) {
// should not happen
throw new AssertionError(e);
Expand Down
11 changes: 7 additions & 4 deletions core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,15 @@ public static class TransactionStatus {
private final FateId fateId;
private final FateInstanceType instanceType;
private final TStatus status;
private final String txName;
private final Fate.FateOperation txName;
private final List<String> hlocks;
private final List<String> wlocks;
private final String top;
private final long timeCreated;

private TransactionStatus(FateId fateId, FateInstanceType instanceType, TStatus status,
String txName, List<String> hlocks, List<String> wlocks, String top, Long timeCreated) {
Fate.FateOperation txName, List<String> hlocks, List<String> wlocks, String top,
Long timeCreated) {

this.fateId = fateId;
this.instanceType = instanceType;
Expand Down Expand Up @@ -115,7 +116,7 @@ public TStatus getStatus() {
/**
* @return The name of the transaction running.
*/
public String getTxName() {
public Fate.FateOperation getTxName() {
return txName;
}

Expand Down Expand Up @@ -361,7 +362,9 @@ public static <T> FateStatus getTransactionStatus(
fateIds.forEach(fateId -> {

ReadOnlyFateTxStore<T> txStore = store.read(fateId);
String txName = (String) txStore.getTransactionInfo(Fate.TxInfo.TX_NAME);
// tx name will not be set if the tx is not seeded with work (it is NEW)
Fate.FateOperation txName = txStore.getTransactionInfo(Fate.TxInfo.TX_NAME) == null ? null
: ((Fate.FateOperation) txStore.getTransactionInfo(Fate.TxInfo.TX_NAME));

List<String> hlocks = heldLocks.remove(fateId);

Expand Down
55 changes: 52 additions & 3 deletions core/src/main/java/org/apache/accumulo/core/fate/Fate.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.accumulo.core.fate.FateStore.FateTxStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
import org.apache.accumulo.core.logging.FateLogger;
import org.apache.accumulo.core.manager.thrift.TFateOperation;
import org.apache.accumulo.core.util.ShutdownUtil;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.UtilWaitThread;
Expand Down Expand Up @@ -88,6 +89,53 @@ public enum TxInfo {
TX_NAME, AUTO_CLEAN, EXCEPTION, TX_AGEOFF, RETURN_VALUE
}

public enum FateOperation {
COMMIT_COMPACTION(null),
NAMESPACE_CREATE(TFateOperation.NAMESPACE_CREATE),
NAMESPACE_DELETE(TFateOperation.NAMESPACE_DELETE),
NAMESPACE_RENAME(TFateOperation.NAMESPACE_RENAME),
SHUTDOWN_TSERVER(null),
SYSTEM_SPLIT(null),
TABLE_BULK_IMPORT2(TFateOperation.TABLE_BULK_IMPORT2),
TABLE_CANCEL_COMPACT(TFateOperation.TABLE_CANCEL_COMPACT),
TABLE_CLONE(TFateOperation.TABLE_CLONE),
TABLE_COMPACT(TFateOperation.TABLE_COMPACT),
TABLE_CREATE(TFateOperation.TABLE_CREATE),
TABLE_DELETE(TFateOperation.TABLE_DELETE),
TABLE_DELETE_RANGE(TFateOperation.TABLE_DELETE_RANGE),
TABLE_EXPORT(TFateOperation.TABLE_EXPORT),
TABLE_IMPORT(TFateOperation.TABLE_IMPORT),
TABLE_MERGE(TFateOperation.TABLE_MERGE),
TABLE_OFFLINE(TFateOperation.TABLE_OFFLINE),
TABLE_ONLINE(TFateOperation.TABLE_ONLINE),
TABLE_RENAME(TFateOperation.TABLE_RENAME),
TABLE_SPLIT(TFateOperation.TABLE_SPLIT),
TABLE_TABLET_AVAILABILITY(TFateOperation.TABLE_TABLET_AVAILABILITY);

private final TFateOperation top;
private static final EnumSet<FateOperation> nonThriftOps =
EnumSet.of(COMMIT_COMPACTION, SHUTDOWN_TSERVER, SYSTEM_SPLIT);

FateOperation(TFateOperation top) {
this.top = top;
}

public static FateOperation fromThrift(TFateOperation top) {
return FateOperation.valueOf(top.name());
kevinrr888 marked this conversation as resolved.
Show resolved Hide resolved
}

public static EnumSet<FateOperation> getNonThriftOps() {
return nonThriftOps;
}

public TFateOperation toThrift() {
if (top == null) {
throw new IllegalStateException(this + " does not have an equivalent thrift form");
}
return top;
}
}

/**
* A single thread that finds transactions to work on and queues them up. Do not want each worker
* thread going to the store and looking for work as it would place more load on the store.
Expand Down Expand Up @@ -437,14 +485,15 @@ public FateId startTransaction() {
return store.create();
}

public void seedTransaction(String txName, FateKey fateKey, Repo<T> repo, boolean autoCleanUp) {
public void seedTransaction(FateOperation txName, FateKey fateKey, Repo<T> repo,
boolean autoCleanUp) {
store.seedTransaction(txName, fateKey, repo, autoCleanUp);
}

// start work in the transaction.. it is safe to call this
// multiple times for a transaction... but it will only seed once
public void seedTransaction(String txName, FateId fateId, Repo<T> repo, boolean autoCleanUp,
String goalMessage) {
public void seedTransaction(FateOperation txName, FateId fateId, Repo<T> repo,
boolean autoCleanUp, String goalMessage) {
log.info("Seeding {} {}", fateId, goalMessage);
store.seedTransaction(txName, fateId, repo, autoCleanUp);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> {
FateId create();

/**
* Seeds a transaction with the given repo if it does not exists. A fateId will be derived from
* the fateKey. If seeded, sets the following data for the fateId in the store.
* Seeds a transaction with the given repo if it does not exist. A fateId will be derived from the
* fateKey. If seeded, sets the following data for the fateId in the store.
*
* <ul>
* <li>Set the tx name</li>
Expand All @@ -66,7 +66,7 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> {
* empty optional otherwise. If there was a failure this could return an empty optional
* when it actually succeeded.
*/
Optional<FateId> seedTransaction(String txName, FateKey fateKey, Repo<T> repo,
Optional<FateId> seedTransaction(Fate.FateOperation txName, FateKey fateKey, Repo<T> repo,
boolean autoCleanUp);

/**
Expand All @@ -84,7 +84,8 @@ Optional<FateId> seedTransaction(String txName, FateKey fateKey, Repo<T> repo,
* failures. When there are no failures returns true if seeded and false otherwise. If
* there was a failure this could return false when it actually succeeded.
*/
boolean seedTransaction(String txName, FateId fateId, Repo<T> repo, boolean autoCleanUp);
boolean seedTransaction(Fate.FateOperation txName, FateId fateId, Repo<T> repo,
boolean autoCleanUp);

/**
* An interface that allows read/write access to the data related to a single fate operation.
Expand Down
Loading
Loading