Skip to content

Commit

Permalink
Revert serialization changes
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinrr888 committed Jan 2, 2025
1 parent 8aa8d77 commit ee7056a
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
*/
package org.apache.accumulo.core.fate;

import static java.nio.charset.StandardCharsets.UTF_8;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.Collections;
Expand All @@ -42,6 +45,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.accumulo.core.fate.Fate.TxInfo;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.util.CountDownTimer;
import org.apache.accumulo.core.util.Retry;
Expand Down Expand Up @@ -422,6 +426,31 @@ protected void seededTx() {
unreservedRunnableCount.increment();
}

protected static byte[] serializeTxInfo(Serializable so) {
if (so instanceof String) {
return ("S " + so).getBytes(UTF_8);
} else {
byte[] sera = serialize(so);
byte[] data = new byte[sera.length + 2];
System.arraycopy(sera, 0, data, 2, sera.length);
data[0] = 'O';
data[1] = ' ';
return data;
}
}

protected static Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) {
if (data[0] == 'O') {
byte[] sera = new byte[data.length - 2];
System.arraycopy(data, 2, sera, 0, sera.length);
return (Serializable) deserialize(sera);
} else if (data[0] == 'S') {
return new String(data, 2, data.length - 2, UTF_8);
} else {
throw new IllegalStateException("Bad node data " + txInfo);
}
}

/**
* this is a temporary method used to create a dummy lock when using a FateStore outside the
* context of a Manager (one example is testing) so reservations can still be made.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,10 @@ private boolean seedTransaction(Supplier<FateMutator<T>> mutatorFactory, String
int maxAttempts = 5;
for (int attempt = 0; attempt < maxAttempts; attempt++) {
var mutator = mutatorFactory.get();
mutator = mutator.putName(serialize(txName)).putRepo(1, repo).putStatus(TStatus.SUBMITTED);
mutator =
mutator.putName(serializeTxInfo(txName)).putRepo(1, repo).putStatus(TStatus.SUBMITTED);
if (autoCleanUp) {
mutator = mutator.putAutoClean(serialize(autoCleanUp));
mutator = mutator.putAutoClean(serializeTxInfo(autoCleanUp));
}
var status = mutator.tryMutate();
if (status == FateMutator.Status.ACCEPTED) {
Expand Down Expand Up @@ -432,7 +433,7 @@ public Serializable getTransactionInfo(TxInfo txInfo) {
}
scanner.fetchColumn(cq.getColumnFamily(), cq.getColumnQualifier());

return scanner.stream().map(e -> (Serializable) deserialize(e.getValue().get())).findFirst()
return scanner.stream().map(e -> deserializeTxInfo(txInfo, e.getValue().get())).findFirst()
.orElse(null);
} catch (TableNotFoundException e) {
throw new IllegalStateException(tableName + " not found!", e);
Expand Down Expand Up @@ -487,7 +488,7 @@ public void setStatus(TStatus status) {
public void setTransactionInfo(TxInfo txInfo, Serializable so) {
verifyReservedAndNotDeleted(true);

final byte[] serialized = serialize(so);
final byte[] serialized = serializeTxInfo(so);

newMutator(fateId).putTxInfo(txInfo, serialized).mutate();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,7 @@ private Map<TxInfo,Serializable> deserializeTxInfo(DataInputBuffer buffer) throw
while (length != 0) {
Preconditions.checkArgument(length >= 0);
TxInfo type = TxInfo.values()[buffer.readInt()];
txInfo.put(type, (Serializable) deserialize(buffer.readNBytes(length - 1)));
txInfo.put(type, AbstractFateStore.deserializeTxInfo(type, buffer.readNBytes(length - 1)));

// if we have reached the end of the buffer (= reached the end of the tx info data)
if (buffer.getPosition() == buffer.getLength()) {
Expand Down Expand Up @@ -747,7 +747,7 @@ private byte[] serialize() {
// tx info
if (!txInfo.isEmpty()) {
for (var elt : txInfo.entrySet()) {
byte[] serTxInfo = AbstractFateStore.serialize(elt.getValue());
byte[] serTxInfo = serializeTxInfo(elt.getValue());
dos.writeInt(1 + serTxInfo.length);
dos.writeInt(elt.getKey().ordinal());
dos.write(serTxInfo);
Expand Down

0 comments on commit ee7056a

Please sign in to comment.