Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT DO NOT REVIEW] WAN transactions extracted to module #7810

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions boms/geode-all-bom/src/test/resources/expected-pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,12 @@
<version>${version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.geode</groupId>
<artifactId>geode-wan-txgrouping</artifactId>
<version>${version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.geode</groupId>
<artifactId>geode-web</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,11 @@
"Method org.apache.geode.cache.query.IndexStatistics.getReadLockCountLong()": "Added new methods.",
"Method org.apache.geode.management.MemberMXBean.getOffHeapFragments()": "Added new stat",
"Method org.apache.geode.management.MemberMXBean.getOffHeapFreedChunks()": "Added new stat",
"Method org.apache.geode.management.MemberMXBean.getOffHeapLargestFragment()": "Added new stat"
"Method org.apache.geode.management.MemberMXBean.getOffHeapLargestFragment()": "Added new stat",
"Class org.apache.geode.cache.wan.GatewaySender":"Added to support new types of gatewaysenders",
"Method org.apache.geode.cache.wan.GatewaySender.getType()":"Added to support new types of gatewaysenders",
"Class org.apache.geode.cache.wan.GatewaySenderFactory":"Added to support new types of gatewaysenders",
"Method org.apache.geode.cache.wan.GatewaySenderFactory.setType(java.lang.String)":"Added to support new types of gatewaysenders",
"Class org.apache.geode.management.GatewaySenderMXBean":"Added to support new types of gatewaysenders",
"Method org.apache.geode.management.GatewaySenderMXBean.getType()":"Added to support new types of gatewaysenders"
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

import org.apache.geode.cache.CacheFactory;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
import org.apache.geode.internal.cache.wan.GatewaySenderAttributesImpl;
import org.apache.geode.test.junit.categories.AEQTest;

@Category({AEQTest.class})
Expand Down Expand Up @@ -55,7 +55,7 @@ public void tearDown() {
@Test
public void testStopClearsStats() {

GatewaySenderAttributes attrs = new GatewaySenderAttributes();
GatewaySenderAttributesImpl attrs = new GatewaySenderAttributesImpl();
String tempId = AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX + "id";
attrs.setId(tempId);
SerialAsyncEventQueueImpl queue = new SerialAsyncEventQueueImpl(cache,
Expand All @@ -81,7 +81,7 @@ public void testStopClearsStats() {

@Test
public void testStopStart() {
GatewaySenderAttributes attrs = new GatewaySenderAttributes();
GatewaySenderAttributesImpl attrs = new GatewaySenderAttributesImpl();
String tempId = AsyncEventQueueImpl.ASYNC_EVENT_QUEUE_PREFIX + "id";
attrs.setId(tempId);
SerialAsyncEventQueueImpl queue = new SerialAsyncEventQueueImpl(cache,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.wan.AsyncEventQueueConfigurationException;
import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
import org.apache.geode.internal.cache.wan.GatewaySenderAttributesImpl;
import org.apache.geode.internal.cache.wan.InternalGatewaySender;
import org.apache.geode.internal.cache.xmlcache.AsyncEventQueueCreation;
import org.apache.geode.internal.cache.xmlcache.CacheCreation;
Expand All @@ -53,13 +53,14 @@ public class AsyncEventQueueFactoryImpl implements AsyncEventQueueFactory {
* Used internally to pass the attributes from this factory to the real GatewaySender it is
* creating.
*/
private final GatewaySenderAttributes gatewaySenderAttributes;
private final GatewaySenderAttributesImpl gatewaySenderAttributes;

public AsyncEventQueueFactoryImpl(InternalCache cache) {
this(cache, new GatewaySenderAttributes(), DEFAULT_BATCH_TIME_INTERVAL);
this(cache, new GatewaySenderAttributesImpl(), DEFAULT_BATCH_TIME_INTERVAL);
}

AsyncEventQueueFactoryImpl(InternalCache cache, GatewaySenderAttributes gatewaySenderAttributes,
AsyncEventQueueFactoryImpl(InternalCache cache,
GatewaySenderAttributesImpl gatewaySenderAttributes,
int batchTimeInterval) {
this.cache = cache;
this.gatewaySenderAttributes = gatewaySenderAttributes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,11 @@ public void setModifiedEventId(EntryEventImpl clonedEvent) {
clonedEvent.setEventId(newEventId);
}

@Override
public String getType() {
return "ParallelAsyncEventQueue";
}

private ThreadsMonitoring getThreadMonitorObj() {
DistributionManager distributionManager = cache.getDistributionManager();
if (distributionManager != null) {
Expand All @@ -218,4 +223,5 @@ private ThreadsMonitoring getThreadMonitorObj() {
return null;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,11 @@ public void setModifiedEventId(EntryEventImpl clonedEvent) {
clonedEvent.setEventId(newEventId);
}

@Override
public String getType() {
return "SerialAsyncEventQueue";
}

private ThreadsMonitoring getThreadMonitorObj() {
DistributionManager distributionManager = cache.getDistributionManager();
if (distributionManager != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2706,6 +2706,7 @@ public void setOverflowDirectory(String value) {
* &lt;attribute name="id" use="required" type="{http://www.w3.org/2001/XMLSchema}string" /&gt;
* &lt;attribute name="remote-distributed-system-id" use="required" type="{http://www.w3.org/2001/XMLSchema}string" /&gt;
* &lt;attribute name="parallel" type="{http://www.w3.org/2001/XMLSchema}boolean" /&gt;
* &lt;attribute name="type" type="{http://www.w3.org/2001/XMLSchema}string" /&gt;
* &lt;attribute name="manual-start" type="{http://www.w3.org/2001/XMLSchema}boolean" /&gt;
* &lt;attribute name="socket-buffer-size" type="{http://www.w3.org/2001/XMLSchema}string" /&gt;
* &lt;attribute name="socket-read-timeout" type="{http://www.w3.org/2001/XMLSchema}string" /&gt;
Expand Down Expand Up @@ -2746,6 +2747,8 @@ public static class GatewaySender {
protected String remoteDistributedSystemId;
@XmlAttribute(name = "parallel")
protected Boolean parallel;
@XmlAttribute(name = "type")
protected String type;
@XmlAttribute(name = "manual-start")
protected Boolean manualStart;
@XmlAttribute(name = "socket-buffer-size")
Expand Down Expand Up @@ -2913,11 +2916,26 @@ public void setRemoteDistributedSystemId(String value) {
remoteDistributedSystemId = value;
}

/**
* Gets the value of the mustGroupTransactionEvents property.
*
* possible object is
* {@link Boolean }
*
* @return the value of the property.
*/
public Boolean mustGroupTransactionEvents() {
return groupTransactionEvents;
}


/**
* Sets the value of the groupTransactionsEvent property.
*
* allowed object is
* {@link Boolean }
*
* @param value the value for the property.
*/
public void setGroupTransactionEvents(Boolean value) {
groupTransactionEvents = value;
}
Expand Down Expand Up @@ -2946,6 +2964,30 @@ public void setParallel(Boolean value) {
parallel = value;
}

/**
* Gets the value of the type property.
*
* possible object is
* {@link String }
*
* @return the type.
*/
public String getType() {
return type;
}

/**
* Sets the value of the type property.
*
* allowed object is
* {@link String }
*
* @param value the type.
*/
public void setType(String value) {
this.type = value;
}

/**
* Gets the value of the manualStart property.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
import java.util.List;

import org.apache.geode.annotations.Immutable;
import org.apache.geode.internal.lang.SystemProperty;
import org.apache.geode.internal.lang.SystemPropertyHelper;
import org.apache.geode.util.internal.GeodeGlossary;

/**
Expand Down Expand Up @@ -155,32 +153,6 @@ public interface GatewaySender {
int CONNECTION_RETRY_INTERVAL = Integer
.getInteger(GeodeGlossary.GEMFIRE_PREFIX + "gateway-connection-retry-interval", 1000);

/**
* Number of times to retry to get events for a transaction from the gateway sender queue when
* group-transaction-events is set to true.
* When group-transaction-events is set to true and a batch ready to be sent does not contain
* all the events for all the transactions to which the events belong, the gateway sender will try
* to get the missing events of the transactions from the queue to add them to the batch
* before sending it.
* If the missing events are not in the queue when the gateway sender tries to get them
* it will retry for a maximum of times equal to the value set in this parameter before
* delivering the batch without the missing events and logging an error.
* Setting this parameter to a very low value could cause that under heavy load and
* group-transaction-events set to true, batches are sent with incomplete transactions. Setting it
* to a high value could cause that under heavy load and group-transaction-events set to true,
* batches are held for some time before being sent.
*/
int GET_TRANSACTION_EVENTS_FROM_QUEUE_RETRIES =
Integer.getInteger(GeodeGlossary.GEMFIRE_PREFIX + "get-transaction-events-from-queue-retries",
10);
/**
* Milliseconds to wait before retrying to get events for a transaction from the
* gateway sender queue when group-transaction-events is true.
*/
int GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS =
SystemProperty.getProductIntegerProperty(
SystemPropertyHelper.GET_TRANSACTION_EVENTS_FROM_QUEUE_WAIT_TIME_MS).orElse(1);

/**
* The order policy. This enum is applicable only when concurrency-level is &gt; 1.
*
Expand Down Expand Up @@ -423,10 +395,13 @@ enum OrderPolicy {
*/
boolean isParallel();

String getType();

/**
* Returns groupTransactionEvents boolean property for this GatewaySender.
*
* @return groupTransactionEvents boolean property for this GatewaySender
* @deprecated use {@link #getType()}.
*
*/
boolean mustGroupTransactionEvents();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public interface GatewaySenderFactory {
*/
GatewaySenderFactory setGroupTransactionEvents(boolean groupTransactionEvents);

GatewaySenderFactory setType(String type);

/**
* Adds a <code>GatewayEventFilter</code>
*
Expand Down Expand Up @@ -212,7 +214,8 @@ public interface GatewaySenderFactory {
* @param filter The <code>GatewayEventSubstitutionFilter</code>
* @return this factory
*/
GatewaySenderFactory setGatewayEventSubstitutionFilter(GatewayEventSubstitutionFilter filter);
GatewaySenderFactory setGatewayEventSubstitutionFilter(
GatewayEventSubstitutionFilter<?, ?> filter);

/**
* If true, receiver member id is checked by all dispatcher threads when the connection is
Expand Down
Loading