Skip to content

Commit

Permalink
RATIS-1942. GrpcLogAppender has `ILLEGAL TRANSITION: STARTING -> STAR…
Browse files Browse the repository at this point in the history
…TING` (#994)
  • Loading branch information
adoroszlai authored Dec 14, 2023
1 parent cedcd2a commit 2728ba8
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 32 deletions.
27 changes: 17 additions & 10 deletions ratis-common/src/main/java/org/apache/ratis/util/LifeCycle.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,16 +160,20 @@ public void setName(String name) {

/** Transition from the current state to the given state. */
public void transition(final State to) {
final State from = current.getAndSet(to);
State.validate(name, from, to);
current.updateAndGet(from -> {
State.validate(name, from, to);
return to;
});
}

/** Transition from the current state to the given state if the current state is not equal to the given state. */
public void transitionIfNotEqual(final State to) {
final State from = current.getAndSet(to);
if (from != to) {
State.validate(name, from, to);
}
current.updateAndGet(from -> {
if (from != to) {
State.validate(name, from, to);
}
return to;
});
}

/**
Expand Down Expand Up @@ -226,11 +230,14 @@ public State transitionAndGet(UnaryOperator<State> operator) {
* @return true iff the current state is equal to the specified from state.
*/
public boolean compareAndTransition(final State from, final State to) {
if (current.compareAndSet(from, to)) {
final State previous = current.getAndUpdate(state -> {
if (state != from) {
return state;
}
State.validate(name, from, to);
return true;
}
return false;
return to;
});
return previous == from;
}

/** @return the current state. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class GrpcServerProtocolClient implements Closeable {
public GrpcServerProtocolClient(RaftPeer target, int flowControlWindow,
TimeDuration requestTimeout, GrpcTlsConfig tlsConfig, boolean separateHBChannel) {
raftPeerId = target.getId();
LOG.info("Build channel for {}", raftPeerId);
LOG.info("Build channel for {}", target);
useSeparateHBChannel = separateHBChannel;
channel = buildChannel(target, flowControlWindow, tlsConfig);
blockingStub = RaftServerProtocolServiceGrpc.newBlockingStub(channel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,27 +250,27 @@ public PeerChanges(RaftPeer[] all, RaftPeer[] newPeers, RaftPeer[] removed) {
public static RaftGroup initRaftGroup(Collection<String> ids, Collection<String> listenerIds) {
Iterator<InetSocketAddress> addresses = NetUtils.createLocalServerAddress(4 * (ids.size() + listenerIds.size())).iterator();
Stream<RaftPeer> peer = ids.stream()
.map(RaftPeerId::valueOf)
.map(id -> RaftPeer.newBuilder().setId(id)
.setAddress(addresses.next())
.setAdminAddress(addresses.next())
.setClientAddress(addresses.next())
.setDataStreamAddress(addresses.next())
.build());
.map(id -> RaftPeer.newBuilder().setId(id))
.map(p -> assignAddresses(p, addresses))
.map(RaftPeer.Builder::build);
Stream<RaftPeer> listener = listenerIds.stream()
.map(RaftPeerId::valueOf)
.map(id -> RaftPeer.newBuilder().setId(id)
.setAddress(addresses.next())
.setAdminAddress(addresses.next())
.setClientAddress(addresses.next())
.setDataStreamAddress(addresses.next())
.setStartupRole(RaftProtos.RaftPeerRole.LISTENER)
.build());
.map(id -> RaftPeer.newBuilder().setId(id))
.map(p -> assignAddresses(p, addresses))
.map(p -> p.setStartupRole(RaftProtos.RaftPeerRole.LISTENER))
.map(RaftPeer.Builder::build);
final RaftPeer[] peers = Stream.concat(peer, listener).toArray(RaftPeer[]::new);

return RaftGroup.valueOf(RaftGroupId.randomId(), peers);
}

private static RaftPeer.Builder assignAddresses(RaftPeer.Builder builder, Iterator<InetSocketAddress> addresses) {
return builder
.setAddress(addresses.next())
.setAdminAddress(addresses.next())
.setClientAddress(addresses.next())
.setDataStreamAddress(addresses.next());
}

private final Supplier<File> rootTestDir = JavaUtils.memoize(
() -> new File(BaseTest.getRootTestDir(),
JavaUtils.getClassSimpleName(getClass()) + Integer.toHexString(ThreadLocalRandom.current().nextInt())));
Expand Down Expand Up @@ -468,10 +468,13 @@ public PeerChanges addNewPeers(String[] ids, boolean startNewPeer,
if (emptyPeer) {
raftGroup = RaftGroup.valueOf(group.getGroupId(), Collections.emptyList());
} else {
Iterator<InetSocketAddress> addresses = NetUtils.createLocalServerAddress(4 * ids.length).iterator();
final Collection<RaftPeer> newPeers = StreamSupport.stream(peerIds.spliterator(), false)
.map(id -> RaftPeer.newBuilder().setId(id)
.setStartupRole(startRole)
.build()).collect(Collectors.toSet());
.setStartupRole(startRole))
.map(p -> assignAddresses(p, addresses))
.map(RaftPeer.Builder::build)
.collect(Collectors.toSet());
newPeers.addAll(group.getPeers());
raftGroup = RaftGroup.valueOf(group.getGroupId(), newPeers);
}
Expand Down
53 changes: 49 additions & 4 deletions ratis-test/src/test/java/org/apache/ratis/util/TestLifeCycle.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@
*/
package org.apache.ratis.util;

import org.junit.Assert;
import org.apache.ratis.util.function.TriConsumer;
import org.junit.Test;

import static org.apache.ratis.util.LifeCycle.State.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.*;

Expand All @@ -31,7 +34,7 @@ public class TestLifeCycle {
* while this test uses successors.
*/
@Test(timeout = 1000)
public void testIsValid() throws Exception {
public void testIsValid() {
final Map<LifeCycle.State, List<LifeCycle.State>> successors
= new EnumMap<>(LifeCycle.State.class);
put(NEW, successors, STARTING, CLOSED);
Expand All @@ -44,10 +47,52 @@ public void testIsValid() throws Exception {
put(CLOSED, successors);

final List<LifeCycle.State> states = Arrays.asList(LifeCycle.State.values());
states.stream().forEach(
states.forEach(
from -> states.forEach(
to -> Assert.assertEquals(from + " -> " + to,
to -> assertEquals(from + " -> " + to,
successors.get(from).contains(to),
isValid(from, to))));
}

@Test
public void validTransitions() {
testValidTransition((from, subject, to) -> assertTrue(subject.compareAndTransition(from, to)));
testValidTransition((from, subject, to) -> subject.transition(to));
testValidTransition((from, subject, to) -> assertEquals(to, subject.transitionAndGet(any -> to)));
testValidTransition((from, subject, to) -> subject.transitionIfNotEqual(to));
testValidTransition((from, subject, to) -> assertTrue(subject.transitionIfValid(to)));
}

private static void testValidTransition(TriConsumer<LifeCycle.State, LifeCycle, LifeCycle.State> op) {
LifeCycle subject = new LifeCycle("subject");
for (LifeCycle.State to : new LifeCycle.State[] { STARTING, RUNNING, PAUSING, PAUSED, CLOSING, CLOSED }) {
LifeCycle.State from = subject.getCurrentState();
op.accept(from, subject, to);
assertEquals(to, subject.getCurrentState());
}
}

@Test
public void invalidTransitions() {
testInvalidTransition((from, subject, to) -> subject.compareAndTransition(from, to), true);
testInvalidTransition((from, subject, to) -> subject.transition(to), true);
testInvalidTransition((from, subject, to) -> subject.transitionIfNotEqual(to), true);
testInvalidTransition((from, subject, to) -> assertFalse(subject.transitionIfValid(to)), false);
testInvalidTransition((from, subject, to) -> subject.transitionAndGet(any -> to), true);
}

private static void testInvalidTransition(TriConsumer<LifeCycle.State, LifeCycle, LifeCycle.State> op, boolean shouldThrow) {
LifeCycle subject = new LifeCycle("subject");
for (LifeCycle.State to : new LifeCycle.State[] { RUNNING, EXCEPTION, CLOSING }) {
LifeCycle.State from = subject.getCurrentState();
try {
op.accept(from, subject, to);
assertFalse(shouldThrow);
} catch (IllegalStateException e) {
assertTrue(shouldThrow);
assertEquals("Should be in original state", from, subject.getCurrentState());
}
}
}

}

0 comments on commit 2728ba8

Please sign in to comment.