Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,10 @@ public Connection connect(

if (conn == null) {

this.bgStatus = this.pluginService.getStatus(BlueGreenStatus.class, this.bgdId);
BlueGreenStatus latestStatus = this.pluginService.getStatus(BlueGreenStatus.class, this.bgdId);
if (latestStatus != null) {
this.bgStatus = latestStatus;
}

routing = this.bgStatus.getConnectRouting().stream()
.filter(r -> r.isMatch(hostSpec, hostRole))
Expand Down Expand Up @@ -273,7 +276,10 @@ public <T, E extends Exception> T execute(

if (!result.isPresent()) {

this.bgStatus = this.pluginService.getStatus(BlueGreenStatus.class, this.bgdId);
BlueGreenStatus latestStatus = this.pluginService.getStatus(BlueGreenStatus.class, this.bgdId);
if (latestStatus != null) {
this.bgStatus = latestStatus;
}

routing = this.bgStatus.getExecuteRouting().stream()
.filter(r -> r.isMatch(currentHostSpec, hostRole))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,14 @@ protected Properties getMonitoringProperties() {
}

protected void prepareStatus(
final BlueGreenRole role,
final BlueGreenInterimStatus interimStatus) {
final @NonNull BlueGreenRole role,
final @NonNull BlueGreenInterimStatus interimStatus) {

this.processStatusLock.lock();
try {

// Detect changes
int statusHash = interimStatus == null ? 0 : interimStatus.getCustomHashCode();
int statusHash = interimStatus.getCustomHashCode();
int contextHash = this.getContextHash();
if (this.interimStatusHashes[role.getValue()] == statusHash
&& this.lastContextHash == contextHash) {
Expand Down Expand Up @@ -240,10 +240,8 @@ protected void prepareStatus(
}

protected void updatePhase(BlueGreenRole role, BlueGreenInterimStatus interimStatus) {

BlueGreenPhase latestInterimPhase = this.interimStatuses[role.getValue()] == null
? BlueGreenPhase.NOT_CREATED
: this.interimStatuses[role.getValue()].blueGreenPhase;
BlueGreenInterimStatus roleStatus = this.interimStatuses[role.getValue()];
BlueGreenPhase latestInterimPhase = roleStatus == null ? BlueGreenPhase.NOT_CREATED : roleStatus.blueGreenPhase;

if (latestInterimPhase != null
&& interimStatus.blueGreenPhase != null
Expand Down Expand Up @@ -292,10 +290,10 @@ protected void updateStatusCache() {
protected void updateCorrespondingNodes() {
this.correspondingNodes.clear();

if (this.interimStatuses[BlueGreenRole.SOURCE.getValue()] != null
&& !Utils.isNullOrEmpty(this.interimStatuses[BlueGreenRole.SOURCE.getValue()].startTopology)
&& this.interimStatuses[BlueGreenRole.TARGET.getValue()] != null
&& !Utils.isNullOrEmpty(this.interimStatuses[BlueGreenRole.TARGET.getValue()].startTopology)) {
BlueGreenInterimStatus sourceInterimStatus = this.interimStatuses[BlueGreenRole.SOURCE.getValue()];
BlueGreenInterimStatus targetInterimStatus = this.interimStatuses[BlueGreenRole.TARGET.getValue()];
if (sourceInterimStatus != null && !Utils.isNullOrEmpty(sourceInterimStatus.startTopology)
&& targetInterimStatus != null && !Utils.isNullOrEmpty(targetInterimStatus.startTopology)) {

HostSpec blueWriterHostSpec = getWriterHost(BlueGreenRole.SOURCE);
HostSpec greenWriterHostSpec = getWriterHost(BlueGreenRole.TARGET);
Expand All @@ -308,29 +306,30 @@ protected void updateCorrespondingNodes() {
blueWriterHostSpec.getHost(), Pair.create(blueWriterHostSpec, greenWriterHostSpec));
}

if (!sortedGreenReaderHostSpecs.isEmpty()) {
int greenIndex = 0;
for (HostSpec blueHostSpec : sortedBlueReaderHostSpecs) {
this.correspondingNodes.put(
blueHostSpec.getHost(), Pair.create(blueHostSpec, sortedGreenReaderHostSpecs.get(greenIndex++)));
greenIndex %= sortedGreenReaderHostSpecs.size();
}
} else {
// There's no green reader nodes. We have to map all blue reader nodes to a green writer.
for (HostSpec blueHostSpec : sortedBlueReaderHostSpecs) {
this.correspondingNodes.put(
blueHostSpec.getHost(), Pair.create(blueHostSpec, greenWriterHostSpec));
if (!Utils.isNullOrEmpty(sortedBlueReaderHostSpecs)) {
// Map blue readers to green nodes.
if (!Utils.isNullOrEmpty(sortedGreenReaderHostSpecs)) {
int greenIndex = 0;
for (HostSpec blueHostSpec : sortedBlueReaderHostSpecs) {
this.correspondingNodes.put(
blueHostSpec.getHost(), Pair.create(blueHostSpec, sortedGreenReaderHostSpecs.get(greenIndex++)));
greenIndex %= sortedGreenReaderHostSpecs.size();
}
} else {
// There's no green reader nodes. We have to map all blue reader nodes to the green writer.
for (HostSpec blueHostSpec : sortedBlueReaderHostSpecs) {
this.correspondingNodes.put(
blueHostSpec.getHost(), Pair.create(blueHostSpec, greenWriterHostSpec));
}
}
}
}

if (this.interimStatuses[BlueGreenRole.SOURCE.getValue()] != null
&& !Utils.isNullOrEmpty(this.interimStatuses[BlueGreenRole.SOURCE.getValue()].hostNames)
&& this.interimStatuses[BlueGreenRole.TARGET.getValue()] != null
&& !Utils.isNullOrEmpty(this.interimStatuses[BlueGreenRole.TARGET.getValue()].hostNames)) {
if (sourceInterimStatus != null && !Utils.isNullOrEmpty(sourceInterimStatus.hostNames)
&& targetInterimStatus != null && !Utils.isNullOrEmpty(targetInterimStatus.hostNames)) {

Set<String> blueHosts = this.interimStatuses[BlueGreenRole.SOURCE.getValue()].hostNames;
Set<String> greenHosts = this.interimStatuses[BlueGreenRole.TARGET.getValue()].hostNames;
Set<String> blueHosts = sourceInterimStatus.hostNames;
Set<String> greenHosts = targetInterimStatus.hostNames;

// Find corresponding cluster hosts
String blueClusterHost = blueHosts.stream().filter(this.rdsUtils::isWriterClusterDns)
Expand Down Expand Up @@ -380,15 +379,25 @@ protected void updateCorrespondingNodes() {
}
}

protected HostSpec getWriterHost(final BlueGreenRole role) {
return this.interimStatuses[role.getValue()].startTopology.stream()
protected @Nullable HostSpec getWriterHost(final BlueGreenRole role) {
BlueGreenInterimStatus interimStatus = this.interimStatuses[role.getValue()];
if (interimStatus == null) {
return null;
}

return interimStatus.startTopology.stream()
.filter(x -> x.getRole() == HostRole.WRITER)
.findFirst()
.orElse(null);
}

protected List<HostSpec> getReaderHosts(final BlueGreenRole role) {
return this.interimStatuses[role.getValue()].startTopology.stream()
protected @Nullable List<HostSpec> getReaderHosts(final BlueGreenRole role) {
BlueGreenInterimStatus interimStatus = this.interimStatuses[role.getValue()];
if (interimStatus == null) {
return null;
}

return interimStatus.startTopology.stream()
.filter(x -> x.getRole() != HostRole.WRITER)
.sorted(Comparator.comparing(HostSpec::getHost))
.collect(Collectors.toList());
Expand Down Expand Up @@ -608,31 +617,43 @@ protected BlueGreenStatus getStatusOfPreparation() {

protected List<ConnectRouting> addSubstituteBlueWithIpAddressConnectRouting() {
List<ConnectRouting> connectRouting = new ArrayList<>();
this.roleByHost.entrySet().stream()
.filter(x -> x.getValue() == BlueGreenRole.SOURCE && this.correspondingNodes.containsKey(x.getKey()))
.forEach(x -> {
HostSpec hostSpec = this.correspondingNodes.get(x.getKey()).getValue1();
Optional<String> blueIp = this.hostIpAddresses.get(hostSpec.getHost());
HostSpec substituteHostSpecWithIp = blueIp == null || !blueIp.isPresent()
? hostSpec
: this.hostSpecBuilder.copyFrom(hostSpec).host(blueIp.get()).build();

connectRouting.add(new SubstituteConnectRouting(
x.getKey(),
x.getValue(),
substituteHostSpecWithIp,
Collections.singletonList(hostSpec),
null));

connectRouting.add(new SubstituteConnectRouting(
this.getHostAndPort(
x.getKey(),
this.interimStatuses[x.getValue().getValue()].port),
x.getValue(),
substituteHostSpecWithIp,
Collections.singletonList(hostSpec),
null));
});
for (Map.Entry<String, BlueGreenRole> entry : this.roleByHost.entrySet()) {
String host = entry.getKey();
BlueGreenRole role = entry.getValue();
Pair<HostSpec, HostSpec> nodePair = this.correspondingNodes.get(host);
if (role == BlueGreenRole.TARGET || nodePair == null) {
continue;
}

HostSpec blueHostSpec = nodePair.getValue1();
Optional<String> blueIp = this.hostIpAddresses.get(blueHostSpec.getHost());
HostSpec blueIpHostSpec;
if (blueIp == null || !blueIp.isPresent()) {
blueIpHostSpec = blueHostSpec;
} else {
blueIpHostSpec = this.hostSpecBuilder.copyFrom(blueHostSpec).host(blueIp.get()).build();
}

connectRouting.add(new SubstituteConnectRouting(
host,
role,
blueIpHostSpec,
Collections.singletonList(blueHostSpec),
null));

BlueGreenInterimStatus interimStatus = this.interimStatuses[role.getValue()];
if (interimStatus == null) {
continue;
}

connectRouting.add(new SubstituteConnectRouting(
this.getHostAndPort(host, interimStatus.port),
role,
blueIpHostSpec,
Collections.singletonList(blueHostSpec),
null));
}

return connectRouting;
}

Expand Down Expand Up @@ -815,8 +836,10 @@ protected void createPostRouting(List<ConnectRouting> connectRouting) {
.forEach(x -> {
final String blueHost = x.getKey();
final boolean isBlueHostInstance = rdsUtils.isRdsInstance(blueHost);
HostSpec blueHostSpec = this.correspondingNodes.get(x.getKey()).getValue1();
HostSpec greenHostSpec = this.correspondingNodes.get(x.getKey()).getValue2();

Pair<HostSpec, HostSpec> nodePair = this.correspondingNodes.get(x.getKey());
HostSpec blueHostSpec = nodePair == null ? null : nodePair.getValue1();
HostSpec greenHostSpec = nodePair == null ? null : nodePair.getValue2();

if (greenHostSpec == null) {

Expand All @@ -826,13 +849,16 @@ protected void createPostRouting(List<ConnectRouting> connectRouting) {
x.getValue(),
this.bgdId));

BlueGreenInterimStatus interimStatus = this.interimStatuses[x.getValue().getValue()];
if (interimStatus == null) {
// Cannot find port using interimStatus; continue to the next roleByHost entry.
return;
}

connectRouting.add(new SuspendUntilCorrespondingNodeFoundConnectRouting(
this.getHostAndPort(
blueHost,
this.interimStatuses[x.getValue().getValue()].port),
this.getHostAndPort(blueHost, interimStatus.port),
x.getValue(),
this.bgdId));

} else {
final String greenHost = greenHostSpec.getHost();
Optional<String> greenIp = this.hostIpAddresses.get(greenHostSpec.getHost());
Expand All @@ -841,11 +867,16 @@ protected void createPostRouting(List<ConnectRouting> connectRouting) {
: this.hostSpecBuilder.copyFrom(greenHostSpec).host(greenIp.get()).build();

// Check whether green host is already been connected with blue (no-prefixes) IAM host name.
List<HostSpec> iamHosts = this.isAlreadySuccessfullyConnected(greenHost, blueHost)
// Green node has already changed its name, and it's not a new blue node (no prefixes).
? Collections.singletonList(blueHostSpec)
// Green node isn't yet changed its name, so we need to try both possible IAM host options.
: Arrays.asList(greenHostSpec, blueHostSpec);
List<HostSpec> iamHosts;
if (this.isAlreadySuccessfullyConnected(greenHost, blueHost)) {
// Green node has already changed its name, and it's not a new blue node (no prefixes).
iamHosts = blueHostSpec == null ? null : Collections.singletonList(blueHostSpec);
} else {
// Green node isn't yet changed its name, so we need to try both possible IAM host options.
iamHosts = blueHostSpec == null
? Collections.singletonList(greenHostSpec)
: Arrays.asList(greenHostSpec, blueHostSpec);
}

connectRouting.add(new SubstituteConnectRouting(
blueHost,
Expand All @@ -854,10 +885,14 @@ protected void createPostRouting(List<ConnectRouting> connectRouting) {
iamHosts,
isBlueHostInstance ? (iamHost) -> this.registerIamHost(greenHost, iamHost) : null));

BlueGreenInterimStatus interimStatus = this.interimStatuses[x.getValue().getValue()];
if (interimStatus == null) {
// Cannot find port; continue to the next roleByHost entry.
return;
}

connectRouting.add(new SubstituteConnectRouting(
this.getHostAndPort(
blueHost,
this.interimStatuses[x.getValue().getValue()].port),
this.getHostAndPort(blueHost, interimStatus.port),
x.getValue(),
greenHostSpecWithIp,
iamHosts,
Expand Down Expand Up @@ -996,9 +1031,9 @@ protected void logSwitchoverFinalSummary() {
return;
}

PhaseTimeInfo timeZero = this.rollback
? this.phaseTimeNano.get(BlueGreenPhase.PREPARATION.name())
: this.phaseTimeNano.get(BlueGreenPhase.IN_PROGRESS.name());
BlueGreenPhase timeZeroPhase = this.rollback ? BlueGreenPhase.PREPARATION : BlueGreenPhase.IN_PROGRESS;
String timeZeroKey = this.rollback ? timeZeroPhase.name() + " (rollback)" : timeZeroPhase.name();
PhaseTimeInfo timeZero = this.phaseTimeNano.get(timeZeroKey);
String divider = "----------------------------------------------------------------------------------\n";

String logMessage =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package software.amazon.jdbc.plugin.bluegreen;

@FunctionalInterface
public interface OnBlueGreenStatusChange {
void onBlueGreenStatusChanged(BlueGreenRole role, BlueGreenInterimStatus interimStatus);
}
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ public void testSwitchover(TestDriver testDriver) throws SQLException, Interrupt
hostId, host, testInstance.getPort(), dbName, startLatchAtomic, stop, finishLatchAtomic,
results.get(hostId)));
threadCount++;
threadFinishCount++;

threads.add(getBlueDnsMonitoringThread(
hostId, host, startLatchAtomic, stop, finishLatchAtomic, results.get(hostId)));
Expand Down
22 changes: 15 additions & 7 deletions wrapper/src/test/java/integration/host/TestEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -824,9 +824,15 @@ private static void deAuthorizeIP(TestEnvironment env) {
throw new RuntimeException(e);
}
}
env.auroraUtil.ec2DeauthorizesIP(env.runnerIP);
LOGGER.finest(String.format("Test runner IP %s de-authorized. Usage count: %d",
env.runnerIP, ipAddressUsageRefCount.get()));

if (!env.reuseDb) {
env.auroraUtil.ec2DeauthorizesIP(env.runnerIP);
LOGGER.finest(String.format("Test runner IP %s de-authorized. Usage count: %d",
env.runnerIP, ipAddressUsageRefCount.get()));
} else {
LOGGER.finest("The IP address usage count hit 0, but the REUSE_RDS_DB was set to true, so IP "
+ "de-authorization was skipped.");
}
} else {
LOGGER.finest("IP usage count: " + ipAddressUsageRefCount.get());
}
Expand Down Expand Up @@ -1515,10 +1521,12 @@ private void deleteBlueGreenDeployment() throws InterruptedException {
}

private void deleteCustomClusterParameterGroup(String groupName) {
try {
this.auroraUtil.deleteCustomClusterParameterGroup(groupName);
} catch (Exception ex) {
LOGGER.finest(String.format("Error deleting cluster parameter group %s. %s", groupName, ex));
if (!this.reuseDb) {
try {
this.auroraUtil.deleteCustomClusterParameterGroup(groupName);
} catch (Exception ex) {
LOGGER.finest(String.format("Error deleting cluster parameter group %s. %s", groupName, ex));
}
}
}

Expand Down
Loading