Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds FateOperation type #5218

Merged
merged 5 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
import org.apache.accumulo.core.data.constraints.Constraint;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.manager.thrift.FateOperation;
import org.apache.accumulo.core.manager.thrift.TFateOperation;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.LocalityGroupUtil;
Expand Down Expand Up @@ -126,7 +126,7 @@ public void create(String namespace)
NEW_NAMESPACE_NAME.validate(namespace);

try {
doNamespaceFateOperation(FateOperation.NAMESPACE_CREATE,
doNamespaceFateOperation(TFateOperation.NAMESPACE_CREATE,
Arrays.asList(ByteBuffer.wrap(namespace.getBytes(UTF_8))), Collections.emptyMap(),
namespace);
} catch (NamespaceNotFoundException e) {
Expand Down Expand Up @@ -156,7 +156,7 @@ public void delete(String namespace) throws AccumuloException, AccumuloSecurityE
Map<String,String> opts = new HashMap<>();

try {
doNamespaceFateOperation(FateOperation.NAMESPACE_DELETE, args, opts, namespace);
doNamespaceFateOperation(TFateOperation.NAMESPACE_DELETE, args, opts, namespace);
} catch (NamespaceExistsException e) {
// should not happen
throw new AssertionError(e);
Expand All @@ -174,7 +174,7 @@ public void rename(String oldNamespaceName, String newNamespaceName)
List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(oldNamespaceName.getBytes(UTF_8)),
ByteBuffer.wrap(newNamespaceName.getBytes(UTF_8)));
Map<String,String> opts = new HashMap<>();
doNamespaceFateOperation(FateOperation.NAMESPACE_RENAME, args, opts, oldNamespaceName);
doNamespaceFateOperation(TFateOperation.NAMESPACE_RENAME, args, opts, oldNamespaceName);
}

@Override
Expand Down Expand Up @@ -385,7 +385,7 @@ public int addConstraint(String namespace, String constraintClassName)
return super.addConstraint(namespace, constraintClassName);
}

private String doNamespaceFateOperation(FateOperation op, List<ByteBuffer> args,
private String doNamespaceFateOperation(TFateOperation op, List<ByteBuffer> args,
Map<String,String> opts, String namespace) throws AccumuloSecurityException,
AccumuloException, NamespaceExistsException, NamespaceNotFoundException {
// caller should validate the namespace name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,11 @@
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.manager.thrift.FateOperation;
import org.apache.accumulo.core.manager.thrift.FateService;
import org.apache.accumulo.core.manager.thrift.ManagerClientService;
import org.apache.accumulo.core.manager.thrift.TFateId;
import org.apache.accumulo.core.manager.thrift.TFateInstanceType;
import org.apache.accumulo.core.manager.thrift.TFateOperation;
import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.TabletState;
Expand Down Expand Up @@ -274,7 +274,7 @@ public void create(String tableName, NewTableConfiguration ntc)
Map<String,String> opts = ntc.getProperties();

try {
doTableFateOperation(tableName, AccumuloException.class, FateOperation.TABLE_CREATE, args,
doTableFateOperation(tableName, AccumuloException.class, TFateOperation.TABLE_CREATE, args,
opts);
} catch (TableNotFoundException e) {
// should not happen
Expand Down Expand Up @@ -304,7 +304,7 @@ private TFateId beginFateOperation(TFateInstanceType type)

// This method is for retrying in the case of network failures;
// anything else it passes to the caller to deal with
private void executeFateOperation(TFateId opid, FateOperation op, List<ByteBuffer> args,
private void executeFateOperation(TFateId opid, TFateOperation op, List<ByteBuffer> args,
Map<String,String> opts, boolean autoCleanUp)
throws ThriftSecurityException, TException, ThriftTableOperationException {
while (true) {
Expand Down Expand Up @@ -372,7 +372,7 @@ public String doBulkFateOperation(List<ByteBuffer> args, String tableName)
EXISTING_TABLE_NAME.validate(tableName);

try {
return doFateOperation(FateOperation.TABLE_BULK_IMPORT2, args, Collections.emptyMap(),
return doFateOperation(TFateOperation.TABLE_BULK_IMPORT2, args, Collections.emptyMap(),
tableName);
} catch (TableExistsException | NamespaceExistsException e) {
// should not happen
Expand Down Expand Up @@ -427,14 +427,14 @@ private <T> T handleFateOperation(FateOperationExecutor<T> executor, String tabl
}
}

String doFateOperation(FateOperation op, List<ByteBuffer> args, Map<String,String> opts,
String doFateOperation(TFateOperation op, List<ByteBuffer> args, Map<String,String> opts,
String tableOrNamespaceName)
throws AccumuloSecurityException, TableExistsException, TableNotFoundException,
AccumuloException, NamespaceExistsException, NamespaceNotFoundException {
return doFateOperation(op, args, opts, tableOrNamespaceName, true);
}

String doFateOperation(FateOperation op, List<ByteBuffer> args, Map<String,String> opts,
String doFateOperation(TFateOperation op, List<ByteBuffer> args, Map<String,String> opts,
String tableOrNamespaceName, boolean wait)
throws AccumuloSecurityException, TableExistsException, TableNotFoundException,
AccumuloException, NamespaceExistsException, NamespaceNotFoundException {
Expand Down Expand Up @@ -521,7 +521,7 @@ public void addSplits(String tableName, SortedSet<Text> splits)
return handleFateOperation(() -> {
TFateInstanceType t = FateInstanceType.fromNamespaceOrTableName(tableName).toThrift();
TFateId opid = beginFateOperation(t);
executeFateOperation(opid, FateOperation.TABLE_SPLIT, args, Map.of(), false);
executeFateOperation(opid, TFateOperation.TABLE_SPLIT, args, Map.of(), false);
return new Pair<>(opid, splitsForTablet.getValue());
}, tableName);
} catch (TableExistsException | NamespaceExistsException | NamespaceNotFoundException
Expand Down Expand Up @@ -645,8 +645,8 @@ public void merge(String tableName, Text start, Text end)
end == null ? EMPTY : TextUtil.getByteBuffer(end));
Map<String,String> opts = new HashMap<>();
try {
doTableFateOperation(tableName, TableNotFoundException.class, FateOperation.TABLE_MERGE, args,
opts);
doTableFateOperation(tableName, TableNotFoundException.class, TFateOperation.TABLE_MERGE,
args, opts);
} catch (TableExistsException e) {
// should not happen
throw new AssertionError(e);
Expand All @@ -665,7 +665,7 @@ public void deleteRows(String tableName, Text start, Text end)
Map<String,String> opts = new HashMap<>();
try {
doTableFateOperation(tableName, TableNotFoundException.class,
FateOperation.TABLE_DELETE_RANGE, args, opts);
TFateOperation.TABLE_DELETE_RANGE, args, opts);
} catch (TableExistsException e) {
// should not happen
throw new AssertionError(e);
Expand Down Expand Up @@ -760,7 +760,7 @@ public void delete(String tableName)
List<ByteBuffer> args = List.of(ByteBuffer.wrap(tableName.getBytes(UTF_8)));
Map<String,String> opts = new HashMap<>();
try {
doTableFateOperation(tableName, TableNotFoundException.class, FateOperation.TABLE_DELETE,
doTableFateOperation(tableName, TableNotFoundException.class, TFateOperation.TABLE_DELETE,
args, opts);
} catch (TableExistsException e) {
// should not happen
Expand Down Expand Up @@ -800,7 +800,7 @@ public void clone(String srcTableName, String newTableName, CloneConfiguration c

prependPropertiesToExclude(opts, config.getPropertiesToExclude());

doTableFateOperation(newTableName, AccumuloException.class, FateOperation.TABLE_CLONE, args,
doTableFateOperation(newTableName, AccumuloException.class, TFateOperation.TABLE_CLONE, args,
opts);
}

Expand All @@ -813,7 +813,7 @@ public void rename(String oldTableName, String newTableName) throws AccumuloSecu
List<ByteBuffer> args = Arrays.asList(ByteBuffer.wrap(oldTableName.getBytes(UTF_8)),
ByteBuffer.wrap(newTableName.getBytes(UTF_8)));
Map<String,String> opts = new HashMap<>();
doTableFateOperation(oldTableName, TableNotFoundException.class, FateOperation.TABLE_RENAME,
doTableFateOperation(oldTableName, TableNotFoundException.class, TFateOperation.TABLE_RENAME,
args, opts);
}

Expand Down Expand Up @@ -892,7 +892,7 @@ public void compact(String tableName, CompactionConfig config)
Map<String,String> opts = new HashMap<>();

try {
doFateOperation(FateOperation.TABLE_COMPACT, args, opts, tableName, config.getWait());
doFateOperation(TFateOperation.TABLE_COMPACT, args, opts, tableName, config.getWait());
} catch (TableExistsException | NamespaceExistsException e) {
// should not happen
throw new AssertionError(e);
Expand All @@ -912,7 +912,7 @@ public void cancelCompaction(String tableName)

try {
doTableFateOperation(tableName, TableNotFoundException.class,
FateOperation.TABLE_CANCEL_COMPACT, args, opts);
TFateOperation.TABLE_CANCEL_COMPACT, args, opts);
} catch (TableExistsException e) {
// should not happen
throw new AssertionError(e);
Expand Down Expand Up @@ -1455,17 +1455,17 @@ private void changeTableState(String tableName, boolean wait, TableState newStat

TableId tableId = context.getTableId(tableName);

FateOperation op = null;
TFateOperation op = null;
switch (newState) {
case OFFLINE:
op = FateOperation.TABLE_OFFLINE;
op = TFateOperation.TABLE_OFFLINE;
if (tableName.equals(AccumuloTable.METADATA.tableName())
|| tableName.equals(AccumuloTable.ROOT.tableName())) {
throw new AccumuloException("Cannot set table to offline state");
}
break;
case ONLINE:
op = FateOperation.TABLE_ONLINE;
op = TFateOperation.TABLE_ONLINE;
if (tableName.equals(AccumuloTable.METADATA.tableName())
|| tableName.equals(AccumuloTable.ROOT.tableName())) {
// Don't submit a Fate operation for this, these tables can only be online.
Expand Down Expand Up @@ -1694,7 +1694,7 @@ public void importTable(String tableName, Set<String> importDirs, ImportConfigur
checkedImportDirs.stream().map(s -> s.getBytes(UTF_8)).map(ByteBuffer::wrap).forEach(args::add);

try {
doTableFateOperation(tableName, AccumuloException.class, FateOperation.TABLE_IMPORT, args,
doTableFateOperation(tableName, AccumuloException.class, TFateOperation.TABLE_IMPORT, args,
Collections.emptyMap());
} catch (TableNotFoundException e) {
// should not happen
Expand Down Expand Up @@ -1727,7 +1727,7 @@ public void exportTable(String tableName, String exportDir)
Map<String,String> opts = Collections.emptyMap();

try {
doTableFateOperation(tableName, TableNotFoundException.class, FateOperation.TABLE_EXPORT,
doTableFateOperation(tableName, TableNotFoundException.class, TFateOperation.TABLE_EXPORT,
args, opts);
} catch (TableExistsException e) {
// should not happen
Expand Down Expand Up @@ -1782,7 +1782,7 @@ public int addConstraint(String tableName, String constraintClassName)
}

private void doTableFateOperation(String tableOrNamespaceName,
Class<? extends Exception> namespaceNotFoundExceptionClass, FateOperation op,
Class<? extends Exception> namespaceNotFoundExceptionClass, TFateOperation op,
List<ByteBuffer> args, Map<String,String> opts) throws AccumuloSecurityException,
AccumuloException, TableExistsException, TableNotFoundException {
try {
Expand Down Expand Up @@ -2212,7 +2212,7 @@ public void setTabletAvailability(String tableName, Range range, TabletAvailabil

try {
doTableFateOperation(tableName, AccumuloException.class,
FateOperation.TABLE_TABLET_AVAILABILITY, args, opts);
TFateOperation.TABLE_TABLET_AVAILABILITY, args, opts);
} catch (TableNotFoundException | TableExistsException e) {
// should not happen
throw new AssertionError(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,11 @@
*/
package org.apache.accumulo.core.fate;

import static java.nio.charset.StandardCharsets.UTF_8;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.Collections;
Expand All @@ -45,7 +42,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.accumulo.core.fate.Fate.TxInfo;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.util.CountDownTimer;
import org.apache.accumulo.core.util.Retry;
Expand Down Expand Up @@ -426,31 +422,6 @@ protected void seededTx() {
unreservedRunnableCount.increment();
}

protected static byte[] serializeTxInfo(Serializable so) {
if (so instanceof String) {
return ("S " + so).getBytes(UTF_8);
} else {
byte[] sera = serialize(so);
byte[] data = new byte[sera.length + 2];
System.arraycopy(sera, 0, data, 2, sera.length);
data[0] = 'O';
data[1] = ' ';
return data;
}
}

protected static Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) {
if (data[0] == 'O') {
byte[] sera = new byte[data.length - 2];
System.arraycopy(data, 2, sera, 0, sera.length);
return (Serializable) deserialize(sera);
} else if (data[0] == 'S') {
return new String(data, 2, data.length - 2, UTF_8);
} else {
throw new IllegalStateException("Bad node data " + txInfo);
}
}

/**
* this is a temporary method used to create a dummy lock when using a FateStore outside the
* context of a Manager (one example is testing) so reservations can still be made.
Expand Down
11 changes: 7 additions & 4 deletions core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,15 @@ public static class TransactionStatus {
private final FateId fateId;
private final FateInstanceType instanceType;
private final TStatus status;
private final String txName;
private final Fate.FateOperation txName;
private final List<String> hlocks;
private final List<String> wlocks;
private final String top;
private final long timeCreated;

private TransactionStatus(FateId fateId, FateInstanceType instanceType, TStatus status,
String txName, List<String> hlocks, List<String> wlocks, String top, Long timeCreated) {
Fate.FateOperation txName, List<String> hlocks, List<String> wlocks, String top,
Long timeCreated) {

this.fateId = fateId;
this.instanceType = instanceType;
Expand Down Expand Up @@ -115,7 +116,7 @@ public TStatus getStatus() {
/**
* @return The name of the transaction running.
*/
public String getTxName() {
public Fate.FateOperation getTxName() {
return txName;
}

Expand Down Expand Up @@ -361,7 +362,9 @@ public static <T> FateStatus getTransactionStatus(
fateIds.forEach(fateId -> {

ReadOnlyFateTxStore<T> txStore = store.read(fateId);
String txName = (String) txStore.getTransactionInfo(Fate.TxInfo.TX_NAME);
// tx name will not be set if the tx is not seeded with work (it is NEW)
Fate.FateOperation txName = txStore.getTransactionInfo(Fate.TxInfo.TX_NAME) == null ? null
: ((Fate.FateOperation) txStore.getTransactionInfo(Fate.TxInfo.TX_NAME));

List<String> hlocks = heldLocks.remove(fateId);

Expand Down
36 changes: 33 additions & 3 deletions core/src/main/java/org/apache/accumulo/core/fate/Fate.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.accumulo.core.fate.FateStore.FateTxStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
import org.apache.accumulo.core.logging.FateLogger;
import org.apache.accumulo.core.manager.thrift.TFateOperation;
import org.apache.accumulo.core.util.ShutdownUtil;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.UtilWaitThread;
Expand Down Expand Up @@ -88,6 +89,34 @@ public enum TxInfo {
TX_NAME, AUTO_CLEAN, EXCEPTION, TX_AGEOFF, RETURN_VALUE
}

public enum FateOperation {
COMMIT_COMPACTION,
NAMESPACE_CREATE,
NAMESPACE_DELETE,
NAMESPACE_RENAME,
SHUTDOWN_TSERVER,
SYSTEM_SPLIT,
TABLE_BULK_IMPORT2,
TABLE_CANCEL_COMPACT,
TABLE_CLONE,
TABLE_COMPACT,
TABLE_CREATE,
TABLE_DELETE,
TABLE_DELETE_RANGE,
TABLE_EXPORT,
TABLE_IMPORT,
TABLE_MERGE,
TABLE_OFFLINE,
TABLE_ONLINE,
TABLE_RENAME,
TABLE_SPLIT,
TABLE_TABLET_AVAILABILITY;

public static FateOperation fromThrift(TFateOperation tFateOp) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if you have a test for this or not, but it would be good to add one that verifies that every TFateOperation has a FateOperation value and vice-versa. This would help with identifying additions / deletions in future releases.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are not exactly the same. FateOperation has a few more types than TFateOperation. TFateOperation only includes the types used in thrift. Some operations are performed outside of thrift (for example see CompactionCoordinator.java).

Could probably still write some sort of test for this PR though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might look a little redundant, but I wonder if FateOperation should be FateOperation(TFateOperation) with null being an acceptable value. You could still write a test to ensure that all TFateOperations have an associated FateOperation.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree a test would be good here, you could write something that iterates over TFateOperation values and verifies there's a matching value in FateOperation as @dlmarion mentioned. Because FateOperation is a super set this would catch any future additions to TFateOperation that were not properly added to FateOperation without having to change the test. There's probably not a good way to test the other way though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestions @dlmarion and @cshannon. I will add a test for this on Monday.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if FateOperation should be FateOperation(TFateOperation) with null being an acceptable value

I think this is a good way to connect the two more explicitly. I added this and the test in 30e4753

return FateOperation.valueOf(tFateOp.name());
}
}

/**
* A single thread that finds transactions to work on and queues them up. Do not want each worker
* thread going to the store and looking for work as it would place more load on the store.
Expand Down Expand Up @@ -437,14 +466,15 @@ public FateId startTransaction() {
return store.create();
}

public void seedTransaction(String txName, FateKey fateKey, Repo<T> repo, boolean autoCleanUp) {
public void seedTransaction(FateOperation txName, FateKey fateKey, Repo<T> repo,
boolean autoCleanUp) {
store.seedTransaction(txName, fateKey, repo, autoCleanUp);
}

// start work in the transaction.. it is safe to call this
// multiple times for a transaction... but it will only seed once
public void seedTransaction(String txName, FateId fateId, Repo<T> repo, boolean autoCleanUp,
String goalMessage) {
public void seedTransaction(FateOperation txName, FateId fateId, Repo<T> repo,
boolean autoCleanUp, String goalMessage) {
log.info("Seeding {} {}", fateId, goalMessage);
store.seedTransaction(txName, fateId, repo, autoCleanUp);
}
Expand Down
Loading
Loading