Skip to content

Commit

Permalink
Merge pull request #781 from jpmorganchase/feature/add-jpa-pagination…
Browse files Browse the repository at this point in the history
…-for-resend

add jpa pagination for retrieve all transaction query
  • Loading branch information
Krish1979 authored Jun 4, 2019
2 parents 36ec657 + 070df2a commit 48d4208
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,26 @@ public interface EncryptedTransactionDAO {
Optional<EncryptedTransaction> retrieveByHash(MessageHash hash);

/**
* Retrieves a list of all transactions stored in the database
* Retrieves a list of transactions stored in the database
*
* @return The list of all rows in the database
* @param offset the start offset
* @param maxResult the maximum number of records to return
* @return The list of requested rows from the database
*/
List<EncryptedTransaction> retrieveAllTransactions();
List<EncryptedTransaction> retrieveTransactions(int offset, int maxResult);

/**
* Retrieve the total transaction count.
*
* @return the transaction count
*/
long transactionCount();

/**
* Deletes a transaction that has the given hash as its digest
*
* @param hash The hash of the message to be deleted
* @throws javax.persistence.EntityNotFoundException
* @throws javax.persistence.EntityNotFoundException
*/
void delete(MessageHash hash);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import javax.persistence.PersistenceContext;
import java.util.List;
import java.util.Optional;
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaQuery;
import javax.transaction.Transactional;

/**
Expand All @@ -21,9 +23,9 @@ public class EncryptedTransactionDAOImpl implements EncryptedTransactionDAO {
private static final Logger LOGGER = LoggerFactory.getLogger(EncryptedTransactionDAOImpl.class);

private static final String FIND_HASH_EQUAL
= "SELECT et FROM EncryptedTransaction et WHERE et.hash.hashBytes = :hash";
= "SELECT et FROM EncryptedTransaction et WHERE et.hash.hashBytes = :hash";

private static final String FIND_ALL = "SELECT et FROM EncryptedTransaction et";
private static final String FIND_ALL = "SELECT et FROM EncryptedTransaction et ORDER BY et.timestamp,et.hash";

@PersistenceContext(unitName = "tessera")
private EntityManager entityManager;
Expand All @@ -40,31 +42,46 @@ public Optional<EncryptedTransaction> retrieveByHash(final MessageHash hash) {
LOGGER.info("Retrieving payload with hash {}", hash);

return entityManager
.createQuery(FIND_HASH_EQUAL, EncryptedTransaction.class)
.setParameter("hash", hash.getHashBytes())
.getResultStream()
.findAny();
.createQuery(FIND_HASH_EQUAL, EncryptedTransaction.class)
.setParameter("hash", hash.getHashBytes())
.getResultStream()
.findAny();
}

@Override
public List<EncryptedTransaction> retrieveAllTransactions() {
LOGGER.info("Fetching all EncryptedTransaction database rows");
public List<EncryptedTransaction> retrieveTransactions(int offset, int maxResult) {
LOGGER.info("Fetching batch(offset:{},maxResult:{}) EncryptedTransaction database rows", offset, maxResult);

return entityManager
.createQuery(FIND_ALL, EncryptedTransaction.class)
.getResultList();
.createQuery(FIND_ALL, EncryptedTransaction.class)
.setFirstResult(offset)
.setMaxResults(maxResult)
.getResultList();
}

@Override
public long transactionCount() {

CriteriaBuilder criteriaBuilder = entityManager.getCriteriaBuilder();

CriteriaQuery<Long> countQuery = criteriaBuilder.createQuery(Long.class);
countQuery.select(criteriaBuilder.count(countQuery.from(EncryptedTransaction.class)));

return entityManager.createQuery(countQuery)
.getSingleResult();

}

@Override
public void delete(final MessageHash hash) {
LOGGER.info("Deleting transaction with hash {}", hash);

final EncryptedTransaction message = entityManager
.createQuery(FIND_HASH_EQUAL, EncryptedTransaction.class)
.setParameter("hash", hash.getHashBytes())
.getResultStream()
.findAny()
.orElseThrow(EntityNotFoundException::new);
.createQuery(FIND_HASH_EQUAL, EncryptedTransaction.class)
.setParameter("hash", hash.getHashBytes())
.getResultStream()
.findAny()
.orElseThrow(EntityNotFoundException::new);

entityManager.remove(message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ public class TransactionManagerImpl implements TransactionManager {
private final MessageHashFactory messageHashFactory = MessageHashFactory.create();

public TransactionManagerImpl(
Base64Decoder base64Decoder,
PayloadEncoder payloadEncoder,
EncryptedTransactionDAO encryptedTransactionDAO,
PayloadPublisher payloadPublisher,
Enclave enclave,
EncryptedRawTransactionDAO encryptedRawTransactionDAO,
ResendManager resendManager) {
Base64Decoder base64Decoder,
PayloadEncoder payloadEncoder,
EncryptedTransactionDAO encryptedTransactionDAO,
PayloadPublisher payloadPublisher,
Enclave enclave,
EncryptedRawTransactionDAO encryptedRawTransactionDAO,
ResendManager resendManager) {

this.base64Decoder = Objects.requireNonNull(base64Decoder);
this.payloadEncoder = Objects.requireNonNull(payloadEncoder);
Expand All @@ -77,20 +77,20 @@ public SendResponse send(SendRequest sendRequest) {
final String sender = sendRequest.getFrom();

final PublicKey senderPublicKey = Optional.ofNullable(sender)
.map(base64Decoder::decode)
.map(PublicKey::from)
.orElseGet(enclave::defaultPublicKey);
.map(base64Decoder::decode)
.map(PublicKey::from)
.orElseGet(enclave::defaultPublicKey);

final byte[][] recipients = Stream.of(sendRequest)
.filter(sr -> Objects.nonNull(sr.getTo()))
.flatMap(s -> Stream.of(s.getTo()))
.map(base64Decoder::decode)
.toArray(byte[][]::new);
.filter(sr -> Objects.nonNull(sr.getTo()))
.flatMap(s -> Stream.of(s.getTo()))
.map(base64Decoder::decode)
.toArray(byte[][]::new);

final List<PublicKey> recipientList = Stream
.of(recipients)
.map(PublicKey::from)
.collect(Collectors.toList());
.of(recipients)
.map(PublicKey::from)
.collect(Collectors.toList());

recipientList.add(senderPublicKey);

Expand All @@ -101,8 +101,8 @@ public SendResponse send(SendRequest sendRequest) {
final EncodedPayload payload = enclave.encryptPayload(raw, senderPublicKey, recipientList);

final MessageHash transactionHash = Optional.of(payload)
.map(EncodedPayload::getCipherText)
.map(messageHashFactory::createFromCipherText).get();
.map(EncodedPayload::getCipherText)
.map(messageHashFactory::createFromCipherText).get();

final EncryptedTransaction newTransaction
= new EncryptedTransaction(transactionHash, this.payloadEncoder.encode(payload));
Expand Down Expand Up @@ -172,16 +172,21 @@ public ResendResponse resend(ResendRequest request) {
PublicKey recipientPublicKey = PublicKey.from(publicKeyData);
if (request.getType() == ResendRequestType.ALL) {

encryptedTransactionDAO
.retrieveAllTransactions()
.stream()
.map(EncryptedTransaction::getEncodedPayload)
.map(payloadEncoder::decode)
.filter(payload -> {
final boolean isRecipient = payload.getRecipientKeys().contains(recipientPublicKey);
final boolean isSender = Objects.equals(payload.getSenderKey(), recipientPublicKey);
return isRecipient || isSender;
}).forEach(payload -> {
int offset = 0;
final int maxResult = 10000;

while (offset < encryptedTransactionDAO.transactionCount()) {

encryptedTransactionDAO
.retrieveTransactions(offset, maxResult)
.stream()
.map(EncryptedTransaction::getEncodedPayload)
.map(payloadEncoder::decode)
.filter(payload -> {
final boolean isRecipient = payload.getRecipientKeys().contains(recipientPublicKey);
final boolean isSender = Objects.equals(payload.getSenderKey(), recipientPublicKey);
return isRecipient || isSender;
}).forEach(payload -> {

final EncodedPayload prunedPayload;

Expand Down Expand Up @@ -209,6 +214,9 @@ public ResendResponse resend(ResendRequest request) {

});

offset += maxResult;
}

return new ResendResponse();
} else {

Expand Down Expand Up @@ -277,21 +285,21 @@ public ReceiveResponse receive(ReceiveRequest request) {
final byte[] key = base64Decoder.decode(request.getKey());

final Optional<byte[]> to = Optional
.ofNullable(request.getTo())
.filter(str -> !str.isEmpty())
.map(base64Decoder::decode);
.ofNullable(request.getTo())
.filter(str -> !str.isEmpty())
.map(base64Decoder::decode);

final MessageHash hash = new MessageHash(key);
LOGGER.info("Lookup transaction {}",hash);

final EncryptedTransaction encryptedTransaction = encryptedTransactionDAO
.retrieveByHash(hash)
.orElseThrow(() -> new TransactionNotFoundException("Message with hash " + hash + " was not found"));
.retrieveByHash(hash)
.orElseThrow(() -> new TransactionNotFoundException("Message with hash " + hash + " was not found"));

final EncodedPayload payload = Optional.of(encryptedTransaction)
.map(EncryptedTransaction::getEncodedPayload)
.map(payloadEncoder::decode)
.orElseThrow(() -> new IllegalStateException("Unable to decode previously encoded payload"));
.map(EncryptedTransaction::getEncodedPayload)
.map(payloadEncoder::decode)
.orElseThrow(() -> new IllegalStateException("Unable to decode previously encoded payload"));

PublicKey recipientKey = to.map(PublicKey::from)
.orElse(searchForRecipientKey(payload)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,9 @@ public void fetchingAllTransactionsReturnsAll() {
).peek(entityManager::persist)
.collect(Collectors.toList());

final List<EncryptedTransaction> retrievedList = encryptedTransactionDAO.retrieveAllTransactions();
final List<EncryptedTransaction> retrievedList = encryptedTransactionDAO.retrieveTransactions(0, Integer.MAX_VALUE);

assertThat(encryptedTransactionDAO.transactionCount()).isEqualTo(payloads.size());
assertThat(retrievedList).hasSameSizeAs(payloads);
assertThat(retrievedList).hasSameElementsAs(payloads);

Expand Down Expand Up @@ -304,8 +305,9 @@ public void fetchingAllTransactionsReturnsAll() {
).peek(entityManager::persist)
.collect(Collectors.toList());

final List<EncryptedTransaction> retrievedList = encryptedTransactionDAO.retrieveAllTransactions();
final List<EncryptedTransaction> retrievedList = encryptedTransactionDAO.retrieveTransactions(0, Integer.MAX_VALUE);

assertThat(encryptedTransactionDAO.transactionCount()).isEqualTo(payloads.size());
assertThat(retrievedList).hasSameSizeAs(payloads);
assertThat(retrievedList).hasSameElementsAs(payloads);

Expand Down Expand Up @@ -479,8 +481,9 @@ public void fetchingAllTransactionsReturnsAll() {
).peek(entityManager::persist)
.collect(Collectors.toList());

final List<EncryptedTransaction> retrievedList = encryptedTransactionDAO.retrieveAllTransactions();
final List<EncryptedTransaction> retrievedList = encryptedTransactionDAO.retrieveTransactions(0, Integer.MAX_VALUE);

assertThat(encryptedTransactionDAO.transactionCount()).isEqualTo(payloads.size());
assertThat(retrievedList).hasSameSizeAs(payloads);
assertThat(retrievedList).hasSameElementsAs(payloads);

Expand Down
Loading

0 comments on commit 48d4208

Please sign in to comment.