MODULE: Remove AbstractGatewaySender.mustGroupTransactionEvents()
jake-at-work authored and albertogpz committed Feb 4, 2022
1 parent f7c0d21 commit 477a823
Showing 22 changed files with 579 additions and 490 deletions.
@@ -1,3 +1,17 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.wan;

import java.util.List;
@@ -20,6 +20,7 @@
import java.util.concurrent.BlockingQueue;

import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;

import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.EntryEvent;
@@ -77,8 +78,8 @@ protected void initializeMessageQueue(String id, boolean cleanQueues) {
logger.debug("The target Regions are(PGSEP) {}", targetRs);
}

ParallelGatewaySenderQueue queue =
new ParallelGatewaySenderQueue(sender, targetRs, index, nDispatcher, cleanQueues);
final ParallelGatewaySenderQueue queue =
createParallelGatewaySenderQueue(sender, targetRs, index, nDispatcher, cleanQueues);

queue.start();
this.queue = queue;
@@ -88,6 +89,14 @@ protected void initializeMessageQueue(String id, boolean cleanQueues) {
}
}

protected @NotNull ParallelGatewaySenderQueue createParallelGatewaySenderQueue(
final @NotNull AbstractGatewaySender sender,
final @NotNull Set<Region<?, ?>> targetRegions, final int index, final int dispatcherThreads,
final boolean cleanQueues) {
return new ParallelGatewaySenderQueue(sender, targetRegions, index, dispatcherThreads,
cleanQueues);
}

@Override
public int eventQueueSize() {
ParallelGatewaySenderQueue queue = (ParallelGatewaySenderQueue) getQueue();
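In the processor diff above, the direct new ParallelGatewaySenderQueue(...) call in initializeMessageQueue is replaced by a protected createParallelGatewaySenderQueue factory method, giving subclasses a seam to supply their own queue now that the base class no longer consults mustGroupTransactionEvents(). Below is a minimal sketch of such an override; the subclass name, the delegating constructor, and the TxGroupingParallelGatewaySenderQueue it returns (sketched after the ParallelGatewaySenderQueue diff further down) are illustrative assumptions, not part of this commit.

package org.apache.geode.internal.cache.wan.parallel; // assumed placement, alongside the class it extends

import java.util.Set;

import org.jetbrains.annotations.NotNull;

import org.apache.geode.cache.Region;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.monitoring.ThreadsMonitoring;

// Hypothetical subclass; only the createParallelGatewaySenderQueue() signature comes from this commit.
public class TxGroupingParallelGatewaySenderEventProcessor
    extends ParallelGatewaySenderEventProcessor {

  public TxGroupingParallelGatewaySenderEventProcessor(final AbstractGatewaySender sender,
      final ThreadsMonitoring tMonitoring, final boolean cleanQueues) {
    // Assumed super constructor signature.
    super(sender, tMonitoring, cleanQueues);
  }

  @Override
  protected @NotNull ParallelGatewaySenderQueue createParallelGatewaySenderQueue(
      final @NotNull AbstractGatewaySender sender,
      final @NotNull Set<Region<?, ?>> targetRegions, final int index,
      final int dispatcherThreads, final boolean cleanQueues) {
    // Supply a queue variant that restores transaction-event grouping.
    return new TxGroupingParallelGatewaySenderQueue(sender, targetRegions, index,
        dispatcherThreads, cleanQueues);
  }
}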
@@ -16,14 +16,12 @@

import static org.apache.geode.cache.Region.SEPARATOR;
import static org.apache.geode.cache.wan.GatewaySender.DEFAULT_BATCH_SIZE;
import static org.apache.geode.cache.wan.GatewaySender.GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS;
import static org.apache.geode.internal.cache.LocalRegion.InitializationLevel.BEFORE_INITIAL_IMAGE;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -38,6 +36,7 @@

import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;

import org.apache.geode.CancelException;
import org.apache.geode.SystemFailure;
@@ -162,14 +161,14 @@ boolean getCleanQueues() {
* The peekedEventsProcessing queue is used when the batch size is reduced due to a
* MessageTooLargeException
*/
private final BlockingQueue<GatewaySenderEventImpl> peekedEventsProcessing =
new LinkedBlockingQueue<>();
protected BlockingQueue<GatewaySenderEventImpl> peekedEventsProcessing =
new LinkedBlockingQueue<GatewaySenderEventImpl>();

/**
* The peekedEventsProcessingInProgress boolean denotes that processing existing peeked events is
* in progress
*/
private boolean peekedEventsProcessingInProgress = false;
protected boolean peekedEventsProcessingInProgress = false;

public final AbstractGatewaySender sender;

@@ -245,8 +244,7 @@ private Object deserialize(Object serializedBytes) {
private final MetaRegionFactory metaRegionFactory;

public ParallelGatewaySenderQueue(AbstractGatewaySender sender, Set<Region<?, ?>> userRegions,
int idx,
int nDispatcher, boolean cleanQueues) {
int idx, int nDispatcher, boolean cleanQueues) {
this(sender, userRegions, idx, nDispatcher, new MetaRegionFactory(), cleanQueues);
}

@@ -1340,10 +1338,7 @@ public List peek(int batchSize, int timeToWait) throws InterruptedException, Cac
}
}

if (batch.size() > 0) {
peekEventsFromIncompleteTransactions(batch, prQ);
}

postProcessBatch(prQ, batch);

if (isDebugEnabled) {
logger.debug("{}: Peeked a batch of {} entries. The size of the queue is {}. localSize is {}",
@@ -1356,6 +1351,9 @@ public List peek(int batchSize, int timeToWait) throws InterruptedException, Cac
return batch;
}

protected void postProcessBatch(final @NotNull PartitionedRegion partitionedRegion,
final @NotNull List<GatewaySenderEventImpl> batch) {}

private boolean stopPeekingDueToTime(int timeToWait, long end) {
final boolean isDebugEnabled = logger.isDebugEnabled();
// If time to wait is -1 (don't wait) or time interval has elapsed
@@ -1372,78 +1370,6 @@ private boolean stopPeekingDueToTime(int timeToWait, long end) {
return false;
}

protected boolean mustGroupTransactionEvents() {
return sender.mustGroupTransactionEvents();
}

@VisibleForTesting
void peekEventsFromIncompleteTransactions(List<GatewaySenderEventImpl> batch,
PartitionedRegion prQ) {
if (!mustGroupTransactionEvents()) {
return;
}

Map<TransactionId, Integer> incompleteTransactionIdsInBatch =
getIncompleteTransactionsInBatch(batch);
if (incompleteTransactionIdsInBatch.size() == 0) {
return;
}

int retries = 0;
while (true) {
for (Iterator<Map.Entry<TransactionId, Integer>> iter =
incompleteTransactionIdsInBatch.entrySet().iterator(); iter.hasNext();) {
Map.Entry<TransactionId, Integer> pendingTransaction = iter.next();
TransactionId transactionId = pendingTransaction.getKey();
int bucketId = pendingTransaction.getValue();
List<Object> events = peekEventsWithTransactionId(prQ, bucketId, transactionId);
for (Object object : events) {
GatewaySenderEventImpl event = (GatewaySenderEventImpl) object;
batch.add(event);
peekedEvents.add(event);
if (logger.isDebugEnabled()) {
logger.debug(
"Peeking extra event: {}, bucketId: {}, isLastEventInTransaction: {}, batch size: {}",
event.getKey(), bucketId, event.isLastEventInTransaction(), batch.size());
}
if (event.isLastEventInTransaction()) {
iter.remove();
}
}
}
if (incompleteTransactionIdsInBatch.size() == 0 ||
retries >= sender.getRetriesToGetTransactionEventsFromQueue()) {
break;
}
retries++;
try {
Thread.sleep(GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
if (incompleteTransactionIdsInBatch.size() > 0) {
logger.warn("Not able to retrieve all events for transactions: {} after {} retries of {}ms",
incompleteTransactionIdsInBatch, retries, GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS);
stats.incBatchesWithIncompleteTransactions();
}
}

private Map<TransactionId, Integer> getIncompleteTransactionsInBatch(
List<GatewaySenderEventImpl> batch) {
Map<TransactionId, Integer> incompleteTransactionsInBatch = new HashMap<>();
for (GatewaySenderEventImpl event : batch) {
if (event.getTransactionId() != null) {
if (event.isLastEventInTransaction()) {
incompleteTransactionsInBatch.remove(event.getTransactionId());
} else {
incompleteTransactionsInBatch.put(event.getTransactionId(), event.getBucketId());
}
}
}
return incompleteTransactionsInBatch;
}

@VisibleForTesting
static long calculateTimeToSleep(long timeToWait) {
if (timeToWait <= 0) {
@@ -1518,20 +1444,10 @@ private void addPeekedEvents(List<GatewaySenderEventImpl> batch, int batchSize)
}
}

private void addPreviouslyPeekedEvents(List<GatewaySenderEventImpl> batch, int batchSize) {
Set<TransactionId> incompleteTransactionsInBatch = new HashSet<>();
for (int i = 0; i < batchSize || incompleteTransactionsInBatch.size() != 0; i++) {
GatewaySenderEventImpl event = peekedEventsProcessing.remove();
batch.add(event);
if (mustGroupTransactionEvents()) {
if (event.getTransactionId() != null) {
if (event.isLastEventInTransaction()) {
incompleteTransactionsInBatch.remove(event.getTransactionId());
} else {
incompleteTransactionsInBatch.add(event.getTransactionId());
}
}
}
protected void addPreviouslyPeekedEvents(final @NotNull List<GatewaySenderEventImpl> batch,
final int batchSize) {
for (int i = 0; i < batchSize; i++) {
batch.add(peekedEventsProcessing.remove());
if (peekedEventsProcessing.isEmpty()) {
resetLastPeeked = false;
peekedEventsProcessingInProgress = false;
@@ -1590,6 +1506,7 @@ protected Object peekAhead(PartitionedRegion prQ, int bucketId) throws CacheExce
// finished with peeked object.
}

// TODO jbarrett move this
protected List<Object> peekEventsWithTransactionId(PartitionedRegion prQ, int bucketId,
TransactionId transactionId) throws CacheException {
List<Object> objects;
@@ -1611,11 +1528,13 @@ protected List<Object> peekEventsWithTransactionId(PartitionedRegion prQ, int bu
// finished with peeked objects.
}

// TODO jbarrett move this
@VisibleForTesting
public static Predicate<GatewaySenderEventImpl> getIsLastEventInTransactionPredicate() {
return GatewaySenderEventImpl::isLastEventInTransaction;
}

// TODO jbarrett move this
@VisibleForTesting
public static Predicate<GatewaySenderEventImpl> getHasTransactionIdPredicate(
TransactionId transactionId) {
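This file removes the transaction-grouping logic from the base ParallelGatewaySenderQueue: mustGroupTransactionEvents(), peekEventsFromIncompleteTransactions(), getIncompleteTransactionsInBatch(), and the transaction-aware loop in addPreviouslyPeekedEvents() are gone. In exchange, peek() now calls an empty postProcessBatch(PartitionedRegion, List) hook, and peekedEventsProcessing, peekedEventsProcessingInProgress, and addPreviouslyPeekedEvents() become protected, so a subclass can reinstate the grouping behavior. The sketch below shows a single-pass version of such a subclass; the class name is hypothetical, and a faithful port would also record the extra events as peeked and retry up to the sender's configured limit, as the removed code did.

package org.apache.geode.internal.cache.wan.parallel; // assumed placement, alongside the class it extends

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.jetbrains.annotations.NotNull;

import org.apache.geode.cache.Region;
import org.apache.geode.cache.TransactionId;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;

// Hypothetical subclass restoring transaction grouping via the new postProcessBatch() hook.
public class TxGroupingParallelGatewaySenderQueue extends ParallelGatewaySenderQueue {

  public TxGroupingParallelGatewaySenderQueue(final AbstractGatewaySender sender,
      final Set<Region<?, ?>> userRegions, final int idx, final int nDispatcher,
      final boolean cleanQueues) {
    super(sender, userRegions, idx, nDispatcher, cleanQueues);
  }

  @Override
  protected void postProcessBatch(final @NotNull PartitionedRegion partitionedRegion,
      final @NotNull List<GatewaySenderEventImpl> batch) {
    if (batch.isEmpty()) {
      return;
    }
    // Find transactions whose last event has not been peeked into this batch yet.
    final Map<TransactionId, Integer> incomplete = new HashMap<>();
    for (GatewaySenderEventImpl event : batch) {
      if (event.getTransactionId() != null) {
        if (event.isLastEventInTransaction()) {
          incomplete.remove(event.getTransactionId());
        } else {
          incomplete.put(event.getTransactionId(), event.getBucketId());
        }
      }
    }
    // Pull the remaining events of each incomplete transaction into the same batch.
    for (Map.Entry<TransactionId, Integer> entry : incomplete.entrySet()) {
      for (Object object : peekEventsWithTransactionId(partitionedRegion, entry.getValue(),
          entry.getKey())) {
        batch.add((GatewaySenderEventImpl) object);
      }
    }
  }
}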
@@ -28,15 +28,18 @@
import java.util.stream.Collectors;

import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import org.apache.geode.CancelException;
import org.apache.geode.SystemFailure;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.CacheListener;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.asyncqueue.AsyncEvent;
import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.distributed.DistributedSystem;
@@ -46,6 +49,7 @@
import org.apache.geode.internal.cache.EnumListenerEvent;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender.EventWrapper;
import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
@@ -125,13 +129,20 @@ protected void initializeMessageQueue(String id, boolean cleanQueues) {
final SerialSecondaryGatewayListener listener = getAndInitializeCacheListener();

// Create the region queue
queue = new SerialGatewaySenderQueue(sender, regionName, listener, cleanQueues);
queue = createRegionQueue(sender, regionName, listener, cleanQueues);

if (logger.isDebugEnabled()) {
logger.debug("Created queue: {}", queue);
}
}

protected @NotNull RegionQueue createRegionQueue(final @NotNull AbstractGatewaySender sender,
final @NotNull String regionName,
final @NotNull CacheListener<Long, AsyncEvent<?, ?>> listener,
final boolean cleanQueues) {
return new SerialGatewaySenderQueue(sender, regionName, listener, cleanQueues);
}

@Nullable
private SerialSecondaryGatewayListener getAndInitializeCacheListener() {
if (!sender.isPrimary()) {
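The serial processor gets the same treatment: initializeMessageQueue now obtains its queue from a protected createRegionQueue method that returns a RegionQueue, rather than instantiating SerialGatewaySenderQueue directly. A minimal override sketch follows; the subclass name and its constructor signature are assumptions, and the body returns the default queue as a placeholder, with the intended variant noted in a comment.

package org.apache.geode.internal.cache.wan.serial; // assumed placement, alongside the class it extends

import org.jetbrains.annotations.NotNull;

import org.apache.geode.cache.CacheListener;
import org.apache.geode.cache.asyncqueue.AsyncEvent;
import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.monitoring.ThreadsMonitoring;

// Hypothetical subclass; only the createRegionQueue() signature comes from this commit.
public class TxGroupingSerialGatewaySenderEventProcessor
    extends SerialGatewaySenderEventProcessor {

  public TxGroupingSerialGatewaySenderEventProcessor(final AbstractGatewaySender sender,
      final String id, final ThreadsMonitoring tMonitoring, final boolean cleanQueues) {
    // Assumed super constructor signature.
    super(sender, id, tMonitoring, cleanQueues);
  }

  @Override
  protected @NotNull RegionQueue createRegionQueue(final @NotNull AbstractGatewaySender sender,
      final @NotNull String regionName,
      final @NotNull CacheListener<Long, AsyncEvent<?, ?>> listener,
      final boolean cleanQueues) {
    // A real subclass would return a SerialGatewaySenderQueue variant that keeps the events
    // of a transaction together; the default queue is returned here as a placeholder.
    return new SerialGatewaySenderQueue(sender, regionName, listener, cleanQueues);
  }
}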
