Skip to content

Commit

Permalink
MODULE: Extract out TX grouping factories.
Browse files Browse the repository at this point in the history
  • Loading branch information
jake-at-work authored and albertogpz committed Feb 3, 2022
1 parent 7a3d45c commit 35b072e
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.geode.cache.wan.GatewayTransportFilter;
import org.apache.geode.cache.wan.internal.parallel.ParallelGatewaySenderTypeFactory;
import org.apache.geode.cache.wan.internal.serial.SerialGatewaySenderTypeFactory;
import org.apache.geode.cache.wan.internal.txgrouping.parallel.TxGroupingParallelGatewaySenderTypeFactory;
import org.apache.geode.cache.wan.internal.txgrouping.serial.TxGroupingSerialGatewaySenderTypeFactory;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
Expand Down Expand Up @@ -231,9 +233,17 @@ public GatewaySenderFactory setEnforceThreadsConnectSameReceiver(
static @NotNull GatewaySenderTypeFactory getGatewaySenderTypeFactory(
final @NotNull GatewaySenderAttributes attributes) {
if (attributes.isParallel()) {
return new ParallelGatewaySenderTypeFactory();
if (attributes.mustGroupTransactionEvents()) {
return new TxGroupingParallelGatewaySenderTypeFactory();
} else {
return new ParallelGatewaySenderTypeFactory();
}
} else {
return new SerialGatewaySenderTypeFactory();
if (attributes.mustGroupTransactionEvents()) {
return new TxGroupingSerialGatewaySenderTypeFactory();
} else {
return new SerialGatewaySenderTypeFactory();
}
}
}

Expand Down Expand Up @@ -285,13 +295,6 @@ static void validate(final @NotNull InternalCache cache,
}
}
}

if (attributes.mustGroupTransactionEvents() && attributes.isBatchConflationEnabled()) {
throw new GatewaySenderException(
format(
"GatewaySender %s cannot be created with both group transaction events set to true and batch conflation enabled",
attributes.getId()));
}
}

@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@

public interface GatewaySenderTypeFactory {

@NotNull
String getType();

void validate(@NotNull MutableGatewaySenderAttributes attributes) throws GatewaySenderException;

GatewaySender create(@NotNull InternalCache cache, @NotNull StatisticsClock clock,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,20 @@
import org.apache.geode.internal.statistics.StatisticsClock;

public class ParallelGatewaySenderTypeFactory implements GatewaySenderTypeFactory {

@Override
public @NotNull String getType() {
return "ParallelGatewaySender";
}

@Override
public void validate(final @NotNull MutableGatewaySenderAttributes attributes)
throws GatewaySenderException {
if ((attributes.getOrderPolicy() != null)
&& attributes.getOrderPolicy().equals(GatewaySender.OrderPolicy.THREAD)) {
throw new GatewaySenderException(
format("Parallel Gateway Sender %s can not be created with OrderPolicy %s",
attributes.getId(), attributes.getOrderPolicy()));
format("%s %s can not be created with OrderPolicy %s",
getType(), attributes.getId(), attributes.getOrderPolicy()));
}
}

Expand All @@ -52,4 +58,5 @@ public GatewaySender createCreation(final @NotNull InternalCache cache,
final @NotNull GatewaySenderAttributes attributes) {
return new ParallelGatewaySenderCreation(cache, attributes);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,21 @@
import org.apache.geode.internal.statistics.StatisticsClock;

public class SerialGatewaySenderTypeFactory implements GatewaySenderTypeFactory {

@Override
public @NotNull String getType() {
return "SerialGatewaySender";
}

@Override
public void validate(final @NotNull MutableGatewaySenderAttributes attributes)
throws GatewaySenderException {

if (!attributes.getAsyncEventListeners().isEmpty()) {
throw new GatewaySenderException(
format(
"SerialGatewaySender %s cannot define a remote site because at least AsyncEventListener is already added. Both listeners and remote site cannot be defined for the same gateway sender.",
attributes.getId()));
}

if (attributes.mustGroupTransactionEvents() && attributes.getDispatcherThreads() > 1) {
throw new GatewaySenderException(
format(
"SerialGatewaySender %s cannot be created with group transaction events set to true when dispatcher threads is greater than 1",
attributes.getId()));
"%s %s cannot define a remote site because at least AsyncEventListener is already added. Both listeners and remote site cannot be defined for the same gateway sender.",
getType(), attributes.getId()));
}

if (attributes.getOrderPolicy() == null && attributes.getDispatcherThreads() > 1) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.geode.cache.wan.internal.txgrouping;

import static java.lang.String.format;

import org.jetbrains.annotations.NotNull;

import org.apache.geode.cache.wan.internal.GatewaySenderTypeFactory;
import org.apache.geode.internal.cache.wan.GatewaySenderException;
import org.apache.geode.internal.cache.wan.MutableGatewaySenderAttributes;

public abstract class CommonTxGroupingGatewaySenderFactory {
public static void validate(final @NotNull GatewaySenderTypeFactory factory,
final @NotNull MutableGatewaySenderAttributes attributes) {
if (attributes.isBatchConflationEnabled()) {
throw new GatewaySenderException(
format(
"%s %s cannot be created with both group transaction events set to true and batch conflation enabled",
factory.getType(), attributes.getId()));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.geode.cache.wan.internal.txgrouping.parallel;

import org.jetbrains.annotations.NotNull;

import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.cache.wan.internal.parallel.ParallelGatewaySenderImpl;
import org.apache.geode.cache.wan.internal.parallel.ParallelGatewaySenderTypeFactory;
import org.apache.geode.cache.wan.internal.txgrouping.CommonTxGroupingGatewaySenderFactory;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
import org.apache.geode.internal.cache.wan.GatewaySenderException;
import org.apache.geode.internal.cache.wan.MutableGatewaySenderAttributes;
import org.apache.geode.internal.cache.xmlcache.ParallelGatewaySenderCreation;
import org.apache.geode.internal.statistics.StatisticsClock;

public class TxGroupingParallelGatewaySenderTypeFactory extends ParallelGatewaySenderTypeFactory {

@Override
public @NotNull String getType() {
return "TxGroupingParallelGatewaySender";
}

@Override
public void validate(final @NotNull MutableGatewaySenderAttributes attributes)
throws GatewaySenderException {
super.validate(attributes);

CommonTxGroupingGatewaySenderFactory.validate(this, attributes);
}

@Override
public GatewaySender create(final @NotNull InternalCache cache,
final @NotNull StatisticsClock clock,
final @NotNull GatewaySenderAttributes attributes) {
return new ParallelGatewaySenderImpl(cache, clock, attributes);
}

@Override
public GatewaySender createCreation(final @NotNull InternalCache cache,
final @NotNull GatewaySenderAttributes attributes) {
return new ParallelGatewaySenderCreation(cache, attributes);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.geode.cache.wan.internal.txgrouping.serial;

import static java.lang.String.format;

import org.jetbrains.annotations.NotNull;

import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.cache.wan.internal.serial.SerialGatewaySenderImpl;
import org.apache.geode.cache.wan.internal.serial.SerialGatewaySenderTypeFactory;
import org.apache.geode.cache.wan.internal.txgrouping.CommonTxGroupingGatewaySenderFactory;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
import org.apache.geode.internal.cache.wan.GatewaySenderException;
import org.apache.geode.internal.cache.wan.MutableGatewaySenderAttributes;
import org.apache.geode.internal.cache.xmlcache.SerialGatewaySenderCreation;
import org.apache.geode.internal.statistics.StatisticsClock;

public class TxGroupingSerialGatewaySenderTypeFactory extends SerialGatewaySenderTypeFactory {

@Override
public @NotNull String getType() {
return "TxGroupingSerialGatewaySender";
}

@Override
public void validate(final @NotNull MutableGatewaySenderAttributes attributes)
throws GatewaySenderException {

super.validate(attributes);

CommonTxGroupingGatewaySenderFactory.validate(this, attributes);

if (attributes.getDispatcherThreads() > 1) {
throw new GatewaySenderException(
format(
"%s %s cannot be created with group transaction events set to true when dispatcher threads is greater than 1",
getType(), attributes.getId()));
}

}

@Override
public GatewaySender create(final @NotNull InternalCache cache,
final @NotNull StatisticsClock clock,
final @NotNull GatewaySenderAttributes attributes) {
return new SerialGatewaySenderImpl(cache, clock, attributes);
}

@Override
public GatewaySender createCreation(final @NotNull InternalCache cache,
final @NotNull GatewaySenderAttributes attributes) {
return new SerialGatewaySenderCreation(cache, attributes);
}

}

0 comments on commit 35b072e

Please sign in to comment.