Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…o feature/azure-vault-versioning
  • Loading branch information
chris-j-h committed Dec 18, 2018
2 parents 02a759a + 08fa1bb commit c248de1
Show file tree
Hide file tree
Showing 14 changed files with 195 additions and 277 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@
import com.quorum.tessera.client.P2pClient;
import com.quorum.tessera.node.model.Party;
import com.quorum.tessera.node.model.PartyInfo;
import com.quorum.tessera.sync.ResendPartyStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.ConnectException;
import java.util.Objects;
import java.util.Set;

/**
* Polls every so often to all known nodes for any new discoverable nodes This
Expand All @@ -23,24 +21,20 @@ public class PartyInfoPoller implements Runnable {

private final PartyInfoParser partyInfoParser;

private final ResendPartyStore resendPartyStore;

private final P2pClient p2pClient;

public PartyInfoPoller(final PartyInfoService partyInfoService,
final PartyInfoParser partyInfoParser,
final ResendPartyStore resendPartyStore,
final P2pClient p2pClient) {
final PartyInfoParser partyInfoParser,
final P2pClient p2pClient) {
this.partyInfoService = Objects.requireNonNull(partyInfoService);
this.partyInfoParser = Objects.requireNonNull(partyInfoParser);
this.resendPartyStore = Objects.requireNonNull(resendPartyStore);
this.p2pClient = Objects.requireNonNull(p2pClient);
}

/**
* Iterates over all known parties and contacts them for the current state
* of their known node discovery list
*
* <p>
* It then updates this nodes list of data with any new information
* collected
*/
Expand All @@ -53,18 +47,14 @@ public void run() {
final byte[] encodedPartyInfo = partyInfoParser.to(partyInfo);

partyInfo
.getParties()
.stream()
.filter(party -> !party.getUrl().equals(partyInfo.getUrl()))
.map(Party::getUrl)
.map(url -> pollSingleParty(url, encodedPartyInfo))
.filter(Objects::nonNull)
.map(partyInfoParser::from)
.forEach(newPartyInfo -> {
Set<Party> newPartiesFound = partyInfoService.findUnsavedParties(newPartyInfo);
resendPartyStore.addUnseenParties(newPartiesFound);
partyInfoService.updatePartyInfo(newPartyInfo);
});
.getParties()
.stream()
.filter(party -> !party.getUrl().equals(partyInfo.getUrl()))
.map(Party::getUrl)
.map(url -> pollSingleParty(url, encodedPartyInfo))
.filter(Objects::nonNull)
.map(partyInfoParser::from)
.forEach(partyInfoService::updatePartyInfo);

LOGGER.debug("Polled {}. PartyInfo : {}", getClass().getSimpleName(), partyInfo);
}
Expand All @@ -74,7 +64,7 @@ public void run() {
* connect to the target, it returns null, otherwise throws any exception
* that can be thrown from {@link javax.ws.rs.client.Client}
*
* @param url the target URL to call
* @param url the target URL to call
* @param encodedPartyInfo the encoded current party information
* @return the encoded partyinfo from the target node, or null is the node
* could not be reached
Expand All @@ -98,6 +88,4 @@ private byte[] pollSingleParty(final String url, final byte[] encodedPartyInfo)

}



}
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
package com.quorum.tessera.node;

import com.quorum.tessera.encryption.PublicKey;
import com.quorum.tessera.node.model.Party;
import com.quorum.tessera.node.model.PartyInfo;

import java.util.Set;

public interface PartyInfoService {

/**
Expand All @@ -17,7 +14,7 @@ public interface PartyInfoService {

/**
* Update the PartyInfo data store with the provided encoded data.This can happen when endpoint /partyinfo is triggered,
or by a response from this node hitting another node /partyinfo endpoint
* or by a response from this node hitting another node /partyinfo endpoint
*
* @param partyInfo
* @return updated PartyInfo object
Expand All @@ -32,13 +29,4 @@ public interface PartyInfoService {
*/
String getURLFromRecipientKey(PublicKey key);

/**
* Searches the provided {@link PartyInfo} for recipients that haven't yet been saved to
* the list of all known hosts
*
* @param partyInfoWithUnsavedRecipients received party info that should be diffed
* @return the list of all unknown recipients
*/
Set<Party> findUnsavedParties(PartyInfo partyInfoWithUnsavedRecipients);

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -37,11 +35,11 @@ public PartyInfoServiceImpl(final PartyInfoStore partyInfoStore,


final Set<Party> initialParties = configService
.getPeers()
.stream()
.map(Peer::getUrl)
.map(Party::new)
.collect(toSet());
.getPeers()
.stream()
.map(Peer::getUrl)
.map(Party::new)
.collect(toSet());

final Set<Recipient> ourKeys = enclave
.getPublicKeys()
Expand All @@ -50,7 +48,6 @@ public PartyInfoServiceImpl(final PartyInfoStore partyInfoStore,
.map(key -> new Recipient(key, advertisedUrl))
.collect(toSet());


partyInfoStore.store(new PartyInfo(advertisedUrl, ourKeys, initialParties));

}
Expand Down Expand Up @@ -96,8 +93,6 @@ public PartyInfo updatePartyInfo(final PartyInfo partyInfo) {
.filter(recipient -> Objects.equals(recipient.getUrl(), incomingUrl))
.collect(Collectors.toSet());

// TODO NL - check if we should add the unsaved parties to the resend party store (in the same way in which we are doing it in PartyInfoPoller)

//TODO: instead of adding the peers every time, if a new peer is added at runtime then this should be added separately
final Set<Party> parties = peerUrls.stream().map(Party::new).collect(toSet());

Expand All @@ -110,24 +105,14 @@ public PartyInfo updatePartyInfo(final PartyInfo partyInfo) {
public String getURLFromRecipientKey(final PublicKey key) {

final Recipient retrievedRecipientFromStore = partyInfoStore
.getPartyInfo()
.getRecipients()
.stream()
.filter(recipient -> key.equals(recipient.getKey()))
.findAny()
.orElseThrow(() -> new KeyNotFoundException("Recipient not found for key: "+ key.encodeToBase64()));
.getPartyInfo()
.getRecipients()
.stream()
.filter(recipient -> key.equals(recipient.getKey()))
.findAny()
.orElseThrow(() -> new KeyNotFoundException("Recipient not found for key: " + key.encodeToBase64()));

return retrievedRecipientFromStore.getUrl();
}

@Override
public Set<Party> findUnsavedParties(final PartyInfo partyInfoWithUnsavedRecipients) {
final Set<Party> knownHosts = this.getPartyInfo().getParties();

final Set<Party> incomingRecipients = new HashSet<>(partyInfoWithUnsavedRecipients.getParties());
incomingRecipients.removeAll(knownHosts);

return Collections.unmodifiableSet(incomingRecipients);
}

}
Original file line number Diff line number Diff line change
@@ -1,37 +1,31 @@
package com.quorum.tessera.sync;

import com.quorum.tessera.config.Config;
import com.quorum.tessera.config.Peer;
import com.quorum.tessera.node.model.Party;
import com.quorum.tessera.sync.model.SyncableParty;

import java.util.*;

import static java.util.stream.Collectors.toSet;

/**
* An in-memory store of outstanding parties to contact for transaction resending
*/
public class ResendPartyStoreImpl implements ResendPartyStore {

private Set<Party> allSeenParties;

private Queue<SyncableParty> outstandingParties;

public ResendPartyStoreImpl(final Config config) {
public ResendPartyStoreImpl() {
this.outstandingParties = new LinkedList<>();

final Set<Party> initialParties = config
.getPeers()
.stream()
.map(Peer::getUrl)
.map(Party::new)
.collect(toSet());

this.addUnseenParties(initialParties);
this.allSeenParties = new HashSet<>();
}

@Override
public void addUnseenParties(final Collection<Party> partiesToRequestFrom) {
partiesToRequestFrom
final Set<Party> knownParties = new HashSet<>(partiesToRequestFrom);
knownParties.removeAll(allSeenParties);
this.allSeenParties.addAll(knownParties);

knownParties
.stream()
.map(party -> new SyncableParty(party, 0))
.forEach(outstandingParties::add);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public SyncPoller(final ExecutorService executorService,
@Override
public void run() {

this.resendPartyStore.addUnseenParties(partyInfoService.getPartyInfo().getParties());

Optional<SyncableParty> nextPartyToSend = this.resendPartyStore.getNextParty();

while (nextPartyToSend.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
import com.quorum.tessera.client.P2pClient;
import com.quorum.tessera.encryption.Enclave;
import com.quorum.tessera.encryption.PublicKey;
import java.util.Base64;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Base64;
import java.util.Objects;

public class TransactionRequesterImpl implements TransactionRequester {
Expand All @@ -19,8 +19,7 @@ public class TransactionRequesterImpl implements TransactionRequester {

private final P2pClient client;

public TransactionRequesterImpl(final Enclave enclave,
final P2pClient client) {
public TransactionRequesterImpl(final Enclave enclave, final P2pClient client) {
this.enclave = Objects.requireNonNull(enclave);
this.client = Objects.requireNonNull(client);
}
Expand All @@ -31,18 +30,18 @@ public boolean requestAllTransactionsFromNode(final String uri) {
LOGGER.debug("Requesting transactions get resent for {}", uri);

return this.enclave
.getPublicKeys()
.stream()
.map(this::createRequestAllEntity)
.allMatch(req -> this.makeRequest(uri, req));
.getPublicKeys()
.stream()
.map(this::createRequestAllEntity)
.allMatch(req -> this.makeRequest(uri, req));

}

/**
* Will make the desired request until succeeds or max tries has been
* reached
*
* @param uri the URI to call
* @param uri the URI to call
* @param request the request object to send
*/
private boolean makeRequest(final String uri, final ResendRequest request) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,24 @@

package com.quorum.tessera.transaction;

import com.quorum.tessera.encryption.EncodedPayloadWithRecipients;
import com.quorum.tessera.encryption.KeyNotFoundException;
import com.quorum.tessera.encryption.PublicKey;


/**
* Publishes messages from one node to another
*/
public interface PayloadPublisher {

void publishPayload(EncodedPayloadWithRecipients encodedPayloadWithRecipients,PublicKey recipientKey);


/**
* Formats, encodes and publishes encrypted messages using the target
* public key as the identifier, instead of the URL
*
* @param encodedPayloadWithRecipients the pre-formatted payload object
* (i.e. with all recipients still present)
* @param recipientKey the target public key to publish the
* payload to
* @throws KeyNotFoundException if the target public key is not known
*/
void publishPayload(EncodedPayloadWithRecipients encodedPayloadWithRecipients, PublicKey recipientKey);

}
Loading

0 comments on commit c248de1

Please sign in to comment.