Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
commit 4ce6664
Author: Alberto Gomez <[email protected]>
Date:   Tue Dec 14 19:30:37 2021 +0100

    Fix tests

commit 77db403
Author: Alberto Gomez <[email protected]>
Date:   Tue Dec 14 18:43:41 2021 +0100

    Added exceptions to public API changes

commit 8c2f77d
Author: Alberto Gomez <[email protected]>
Date:   Tue Dec 14 17:52:24 2021 +0100

    Use --type=TxGroupingParallelGatewaySender... as suggested in jbarrett's review

commit cbe859d
Author: Alberto Gomez <[email protected]>
Date:   Mon Dec 13 12:36:50 2021 +0100

    remove() refactored as suggested on jbarrett's review

commit 43194d0
Author: Alberto Gomez <[email protected]>
Date:   Thu Dec 2 06:52:38 2021 +0100

    Updated with Kirk's first review comments

commit 0c3d814
Author: Alberto Gomez <[email protected]>
Date:   Fri Nov 5 08:16:10 2021 +0100

    MODULE: Add gfsh create tx-grouping gw-sender cases.

commit a7416a3
Author: Alberto Gomez <[email protected]>
Date:   Fri Nov 5 08:15:46 2021 +0100

    MODULE: Do not allow changing group-transaction-events

commit 74f9274
Author: Alberto Gomez <[email protected]>
Date:   Fri Nov 5 08:15:28 2021 +0100

    MODULE: Fix some GatewaySenderFactoryImpl tests

commit 1959233
Author: Alberto Gomez <[email protected]>
Date:   Fri Nov 5 08:13:54 2021 +0100

    MODULE: Move remaining tx-grouping functionality and DUnit tests

commit 5aa6bc7
Author: Jacob Barrett <[email protected]>
Date:   Wed Oct 13 19:50:56 2021 -0700

    MODULE: Move tests

commit 07884e8
Author: Jacob Barrett <[email protected]>
Date:   Wed Oct 13 19:24:12 2021 -0700

    MODULE: ServiceLoader

commit 0be467b
Author: Jacob Barrett <[email protected]>
Date:   Wed Oct 13 16:31:02 2021 -0700

    MODULE: Stupid simple module.

commit 477a823
Author: Jacob Barrett <[email protected]>
Date:   Wed Oct 13 16:08:08 2021 -0700

    MODULE: Remove AbstractGatwaySender.mustGroupTransactionEvents()

commit f7c0d21
Author: Jacob Barrett <[email protected]>
Date:   Wed Oct 13 14:14:38 2021 -0700

    MODULE: Create TX grouping sender impls.

commit 35b072e
Author: Jacob Barrett <[email protected]>
Date:   Wed Oct 13 14:01:04 2021 -0700

    MODULE: Extract out TX grouping factories.

commit 7a3d45c
Author: Jacob Barrett <[email protected]>
Date:   Wed Oct 13 13:19:15 2021 -0700

    MODULE: Split out some factories.

commit ae74a42
Author: Jacob Barrett <[email protected]>
Date:   Thu Sep 30 15:24:56 2021 -0700

    MODULE: Extract interface for GatewaySenderAttributes.

commit 8c2d23e
Author: Jacob Barrett <[email protected]>
Date:   Thu Sep 30 17:22:04 2021 -0700

    GEODE-6588: Cleanup GatewaySenderFactoryImpl

commit 2457963
Author: Jacob Barrett <[email protected]>
Date:   Thu Sep 30 16:07:40 2021 -0700

    GEODE-6588: Cleanup AbstractGatewaySender

commit 042013d
Author: Jacob Barrett <[email protected]>
Date:   Thu Sep 30 14:54:03 2021 -0700

    GEODE-6588: Cleanup GatewaySenderAttributes
  • Loading branch information
albertogpz committed Jun 17, 2022
1 parent 1a3c817 commit fca45a7
Show file tree
Hide file tree
Showing 96 changed files with 4,900 additions and 3,242 deletions.
6 changes: 6 additions & 0 deletions boms/geode-all-bom/src/test/resources/expected-pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,12 @@
<version>${version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.geode</groupId>
<artifactId>geode-wan-txgrouping</artifactId>
<version>${version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.geode</groupId>
<artifactId>geode-web</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,11 @@
"Method org.apache.geode.cache.query.IndexStatistics.getReadLockCountLong()": "Added new methods.",
"Method org.apache.geode.management.MemberMXBean.getOffHeapFragments()": "Added new stat",
"Method org.apache.geode.management.MemberMXBean.getOffHeapFreedChunks()": "Added new stat",
"Method org.apache.geode.management.MemberMXBean.getOffHeapLargestFragment()": "Added new stat"
"Method org.apache.geode.management.MemberMXBean.getOffHeapLargestFragment()": "Added new stat",
"Class org.apache.geode.cache.wan.GatewaySender":"Added to support new types of gatewaysenders",
"Method org.apache.geode.cache.wan.GatewaySender.getType()":"Added to support new types of gatewaysenders",
"Class org.apache.geode.cache.wan.GatewaySenderFactory":"Added to support new types of gatewaysenders",
"Method org.apache.geode.cache.wan.GatewaySenderFactory.setType(java.lang.String)":"Added to support new types of gatewaysenders",
"Class org.apache.geode.management.GatewaySenderMXBean":"Added to support new types of gatewaysenders",
"Method org.apache.geode.management.GatewaySenderMXBean.getType()":"Added to support new types of gatewaysenders"
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

import org.apache.geode.cache.CacheFactory;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
import org.apache.geode.internal.cache.wan.GatewaySenderAttributesImpl;
import org.apache.geode.test.junit.categories.AEQTest;

@Category({AEQTest.class})
Expand Down Expand Up @@ -55,7 +55,7 @@ public void tearDown() {
@Test
public void testStopClearsStats() {

GatewaySenderAttributes attrs = new GatewaySenderAttributes();
GatewaySenderAttributesImpl attrs = new GatewaySenderAttributesImpl();
String tempId = AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX + "id";
attrs.setId(tempId);
SerialAsyncEventQueueImpl queue = new SerialAsyncEventQueueImpl(cache,
Expand All @@ -81,7 +81,7 @@ public void testStopClearsStats() {

@Test
public void testStopStart() {
GatewaySenderAttributes attrs = new GatewaySenderAttributes();
GatewaySenderAttributesImpl attrs = new GatewaySenderAttributesImpl();
String tempId = AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX + "id";
attrs.setId(tempId);
SerialAsyncEventQueueImpl queue = new SerialAsyncEventQueueImpl(cache,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.wan.AsyncEventQueueConfigurationException;
import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
import org.apache.geode.internal.cache.wan.GatewaySenderAttributesImpl;
import org.apache.geode.internal.cache.wan.InternalGatewaySender;
import org.apache.geode.internal.cache.xmlcache.AsyncEventQueueCreation;
import org.apache.geode.internal.cache.xmlcache.CacheCreation;
Expand All @@ -53,13 +53,14 @@ public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory {
* Used internally to pass the attributes from this factory to the real GatewaySender it is
* creating.
*/
private final GatewaySenderAttributes gatewaySenderAttributes;
private final GatewaySenderAttributesImpl gatewaySenderAttributes;

public AsyncEventQueueFactoryImpl(InternalCache cache) {
this(cache, new GatewaySenderAttributes(), DEFAULT_BATCH_TIME_INTERVAL);
this(cache, new GatewaySenderAttributesImpl(), DEFAULT_BATCH_TIME_INTERVAL);
}

AsyncEventQueueFactoryImpl(InternalCache cache, GatewaySenderAttributes gatewaySenderAttributes,
AsyncEventQueueFactoryImpl(InternalCache cache,
GatewaySenderAttributesImpl gatewaySenderAttributes,
int batchTimeInterval) {
this.cache = cache;
this.gatewaySenderAttributes = gatewaySenderAttributes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,11 @@ public void setModifiedEventId(EntryEventImpl clonedEvent) {
clonedEvent.setEventId(newEventId);
}

@Override
public String getType() {
return "ParallelAsyncEventQueue";
}

private ThreadsMonitoring getThreadMonitorObj() {
DistributionManager distributionManager = cache.getDistributionManager();
if (distributionManager != null) {
Expand All @@ -218,4 +223,5 @@ private ThreadsMonitoring getThreadMonitorObj() {
return null;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,11 @@ public void setModifiedEventId(EntryEventImpl clonedEvent) {
clonedEvent.setEventId(newEventId);
}

@Override
public String getType() {
return "SerialAsyncEventQueue";
}

private ThreadsMonitoring getThreadMonitorObj() {
DistributionManager distributionManager = cache.getDistributionManager();
if (distributionManager != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2706,6 +2706,7 @@ public void setOverflowDirectory(String value) {
* &lt;attribute name="id" use="required" type="{http://www.w3.org/2001/XMLSchema}string" /&gt;
* &lt;attribute name="remote-distributed-system-id" use="required" type="{http://www.w3.org/2001/XMLSchema}string" /&gt;
* &lt;attribute name="parallel" type="{http://www.w3.org/2001/XMLSchema}boolean" /&gt;
* &lt;attribute name="type" type="{http://www.w3.org/2001/XMLSchema}string" /&gt;
* &lt;attribute name="manual-start" type="{http://www.w3.org/2001/XMLSchema}boolean" /&gt;
* &lt;attribute name="socket-buffer-size" type="{http://www.w3.org/2001/XMLSchema}string" /&gt;
* &lt;attribute name="socket-read-timeout" type="{http://www.w3.org/2001/XMLSchema}string" /&gt;
Expand Down Expand Up @@ -2746,6 +2747,8 @@ public static class GatewaySender {
protected String remoteDistributedSystemId;
@XmlAttribute(name = "parallel")
protected Boolean parallel;
@XmlAttribute(name = "type")
protected String type;
@XmlAttribute(name = "manual-start")
protected Boolean manualStart;
@XmlAttribute(name = "socket-buffer-size")
Expand Down Expand Up @@ -2946,6 +2949,29 @@ public void setParallel(Boolean value) {
parallel = value;
}

/**
* Gets the value of the parallel property.
*
* possible object is
* {@link String }
*
*/

public String getType() {
return type;
}

/**
* Sets the value of the type property.
*
* allowed object is
* {@link String }
*
*/
public void setType(String value) {
this.type = value;
}

/**
* Gets the value of the manualStart property.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
import java.util.List;

import org.apache.geode.annotations.Immutable;
import org.apache.geode.internal.lang.SystemProperty;
import org.apache.geode.internal.lang.SystemPropertyHelper;
import org.apache.geode.util.internal.GeodeGlossary;

/**
Expand Down Expand Up @@ -155,32 +153,6 @@ public interface GatewaySender {
int CONNECTION_RETRY_INTERVAL = Integer
.getInteger(GeodeGlossary.GEMFIRE_PREFIX + "gateway-connection-retry-interval", 1000);

/**
* Number of times to retry to get events for a transaction from the gateway sender queue when
* group-transaction-events is set to true.
* When group-transaction-events is set to true and a batch ready to be sent does not contain
* all the events for all the transactions to which the events belong, the gateway sender will try
* to get the missing events of the transactions from the queue to add them to the batch
* before sending it.
* If the missing events are not in the queue when the gateway sender tries to get them
* it will retry for a maximum of times equal to the value set in this parameter before
* delivering the batch without the missing events and logging an error.
* Setting this parameter to a very low value could cause that under heavy load and
* group-transaction-events set to true, batches are sent with incomplete transactions. Setting it
* to a high value could cause that under heavy load and group-transaction-events set to true,
* batches are held for some time before being sent.
*/
int GET_TRANSACTION_EVENTS_FROM_QUEUE_RETRIES =
Integer.getInteger(GeodeGlossary.GEMFIRE_PREFIX + "get-transaction-events-from-queue-retries",
10);
/**
* Milliseconds to wait before retrying to get events for a transaction from the
* gateway sender queue when group-transaction-events is true.
*/
int GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS =
SystemProperty.getProductIntegerProperty(
SystemPropertyHelper.GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS).orElse(1);

/**
* The order policy. This enum is applicable only when concurrency-level is &gt; 1.
*
Expand Down Expand Up @@ -423,10 +395,13 @@ enum OrderPolicy {
*/
boolean isParallel();

String getType();

/**
* Returns groupTransactionEvents boolean property for this GatewaySender.
*
* @return groupTransactionEvents boolean property for this GatewaySender
* @deprecated use {@link #getType()}.
*
*/
boolean mustGroupTransactionEvents();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public interface GatewaySenderFactory {
*/
GatewaySenderFactory setGroupTransactionEvents(boolean groupTransactionEvents);

GatewaySenderFactory setType(String type);

/**
* Adds a <code>GatewayEventFilter</code>
*
Expand Down Expand Up @@ -212,7 +214,8 @@ public interface GatewaySenderFactory {
* @param filter The <code>GatewayEventSubstitutionFilter</code>
* @return this factory
*/
GatewaySenderFactory setGatewayEventSubstitutionFilter(GatewayEventSubstitutionFilter filter);
GatewaySenderFactory setGatewayEventSubstitutionFilter(
GatewayEventSubstitutionFilter<?, ?> filter);

/**
* If true, receiver member id is checked by all dispatcher threads when the connection is
Expand Down
Loading

0 comments on commit fca45a7

Please sign in to comment.