Skip to content

Commit

Permalink
MODULE: Split out some factories.
Browse files Browse the repository at this point in the history
  • Loading branch information
jake-at-work authored and albertogpz committed Feb 3, 2022
1 parent ae74a42 commit 7a3d45c
Show file tree
Hide file tree
Showing 7 changed files with 495 additions and 79 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.geode.internal.cache.wan;

import org.jetbrains.annotations.Nullable;

import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;

public interface MutableGatewaySenderAttributes extends GatewaySenderAttributes {

void setOrderPolicy(@Nullable OrderPolicy orderPolicy);

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
*/
package org.apache.geode.cache.wan.internal;

import static java.lang.String.format;

import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.logging.log4j.Logger;
Expand All @@ -27,17 +29,15 @@
import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
import org.apache.geode.cache.wan.GatewaySenderFactory;
import org.apache.geode.cache.wan.GatewayTransportFilter;
import org.apache.geode.cache.wan.internal.parallel.ParallelGatewaySenderImpl;
import org.apache.geode.cache.wan.internal.serial.SerialGatewaySenderImpl;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.cache.wan.internal.parallel.ParallelGatewaySenderTypeFactory;
import org.apache.geode.cache.wan.internal.serial.SerialGatewaySenderTypeFactory;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
import org.apache.geode.internal.cache.wan.GatewaySenderAttributesImpl;
import org.apache.geode.internal.cache.wan.GatewaySenderException;
import org.apache.geode.internal.cache.wan.InternalGatewaySenderFactory;
import org.apache.geode.internal.cache.xmlcache.CacheCreation;
import org.apache.geode.internal.cache.xmlcache.ParallelGatewaySenderCreation;
import org.apache.geode.internal.cache.xmlcache.SerialGatewaySenderCreation;
import org.apache.geode.internal.statistics.StatisticsClock;
import org.apache.geode.logging.internal.log4j.api.LogService;

Expand Down Expand Up @@ -217,119 +217,97 @@ public GatewaySenderFactory setEnforceThreadsConnectSameReceiver(

@Override
public @NotNull GatewaySender create(final @NotNull String id, final int remoteDSId) {
int myDSId = InternalDistributedSystem.getAnyInstance().getDistributionManager()
.getDistributedSystemId();
attrs.setId(id);
attrs.setRemoteDs(remoteDSId);

validate(cache, attrs);

final GatewaySenderTypeFactory factory = getGatewaySenderTypeFactory(attrs);
factory.validate(attrs);

return createGatewaySender(factory, cache, statisticsClock, attrs);
}

static @NotNull GatewaySenderTypeFactory getGatewaySenderTypeFactory(
final @NotNull GatewaySenderAttributes attributes) {
if (attributes.isParallel()) {
return new ParallelGatewaySenderTypeFactory();
} else {
return new SerialGatewaySenderTypeFactory();
}
}

static void validate(final @NotNull InternalCache cache,
final @NotNull GatewaySenderAttributesImpl attributes) {
final int myDSId = cache.getDistributionManager().getDistributedSystemId();
final int remoteDSId = attributes.getRemoteDSId();

if (remoteDSId == myDSId) {
throw new GatewaySenderException(
String.format(
format(
"GatewaySender %s cannot be created with remote DS Id equal to this DS Id. ",
id));
attributes.getId()));
}
if (remoteDSId < 0) {
throw new GatewaySenderException(
String.format("GatewaySender %s cannot be created with remote DS Id less than 0. ",
id));
format("GatewaySender %s cannot be created with remote DS Id less than 0. ",
attributes.getId()));
}
attrs.setId(id);
attrs.setRemoteDs(remoteDSId);
GatewaySender sender;

if (attrs.getDispatcherThreads() <= 0) {
if (attributes.getDispatcherThreads() <= 0) {
throw new GatewaySenderException(
String.format("GatewaySender %s can not be created with dispatcher threads less than 1",
id));
format("GatewaySender %s can not be created with dispatcher threads less than 1",
attributes.getId()));
}

// TODO jbarrett why only check these for a real cache.
// Verify socket read timeout if a proper logger is available
if (cache instanceof GemFireCacheImpl) {
// If socket read timeout is less than the minimum, log a warning.
// Ideally, this should throw a GatewaySenderException, but wan dunit tests
// were failing, and we were running out of time to change them.
if (attrs.getSocketReadTimeout() != 0
&& attrs.getSocketReadTimeout() < GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT) {
if (attributes.getSocketReadTimeout() != 0
&& attributes.getSocketReadTimeout() < GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT) {
logger.warn(
"{} cannot configure socket read timeout of {} milliseconds because it is less than the minimum of {} milliseconds. The default will be used instead.",
new Object[] {"GatewaySender " + id, attrs.getSocketReadTimeout(),
GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT});
attrs.setSocketReadTimeout(GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT);
"GatewaySender " + attributes.getId(), attributes.getSocketReadTimeout(),
GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT);
attributes.setSocketReadTimeout(GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT);
}

// Log a warning if the old system property is set.
if (GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY_CHECKED.compareAndSet(false, true)) {
if (System.getProperty(GatewaySender.GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY) != null) {
logger.warn(
"Obsolete java system property named {} was set to control {}. This property is no longer supported. Please use the GemFire API instead.",
new Object[] {GatewaySender.GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY,
"GatewaySender socket read timeout"});
GatewaySender.GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY,
"GatewaySender socket read timeout");
}
}
}
if (attrs.mustGroupTransactionEvents() && attrs.isBatchConflationEnabled()) {
throw new GatewaySenderException(
String.format(
"GatewaySender %s cannot be created with both group transaction events set to true and batch conflation enabled",
id));
}

if (attrs.isParallel()) {
sender = createParallelGatewaySender(id);
} else {
sender = createSerialGatewaySender(id);
}
return sender;
}

private @NotNull GatewaySender createSerialGatewaySender(final @NotNull String id) {
if (attrs.getAsyncEventListeners().size() > 0) {
throw new GatewaySenderException(
String.format(
"SerialGatewaySender %s cannot define a remote site because at least AsyncEventListener is already added. Both listeners and remote site cannot be defined for the same gateway sender.",
id));
}
if (attrs.mustGroupTransactionEvents() && attrs.getDispatcherThreads() > 1) {
if (attributes.mustGroupTransactionEvents() && attributes.isBatchConflationEnabled()) {
throw new GatewaySenderException(
String.format(
"SerialGatewaySender %s cannot be created with group transaction events set to true when dispatcher threads is greater than 1",
id));
}
if (attrs.getOrderPolicy() == null && attrs.getDispatcherThreads() > 1) {
attrs.setOrderPolicy(GatewaySender.DEFAULT_ORDER_POLICY);
}

GatewaySender sender = null;
if (cache instanceof GemFireCacheImpl) {
sender = new SerialGatewaySenderImpl(cache, statisticsClock, attrs);
cache.addGatewaySender(sender);
if (!attrs.isManualStart()) {
sender.start();
}
} else if (cache instanceof CacheCreation) {
sender = new SerialGatewaySenderCreation(cache, attrs);
cache.addGatewaySender(sender);
} else {
throw new IllegalStateException();
format(
"GatewaySender %s cannot be created with both group transaction events set to true and batch conflation enabled",
attributes.getId()));
}
return sender;
}

private @NotNull GatewaySender createParallelGatewaySender(final @NotNull String id) {
if ((attrs.getOrderPolicy() != null)
&& attrs.getOrderPolicy().equals(OrderPolicy.THREAD)) {
throw new GatewaySenderException(
String.format("Parallel Gateway Sender %s can not be created with OrderPolicy %s",
id, attrs.getOrderPolicy()));
}

@NotNull
private static GatewaySender createGatewaySender(final @NotNull GatewaySenderTypeFactory factory,
final @NotNull InternalCache cache,
final @NotNull StatisticsClock clock,
final @NotNull GatewaySenderAttributesImpl attributes) {
final GatewaySender sender;
if (cache instanceof GemFireCacheImpl) {
sender = new ParallelGatewaySenderImpl(cache, statisticsClock, attrs);
sender = factory.create(cache, clock, attributes);
cache.addGatewaySender(sender);

if (!attrs.isManualStart()) {
if (!attributes.isManualStart()) {
sender.start();
}
} else if (cache instanceof CacheCreation) {
sender = new ParallelGatewaySenderCreation(cache, attrs);
sender = factory.createCreation(cache, attributes);
cache.addGatewaySender(sender);
} else {
throw new IllegalStateException();
Expand Down Expand Up @@ -383,4 +361,5 @@ public void configureGatewaySender(GatewaySender senderCreation) {
attrs.setEnforceThreadsConnectSameReceiver(
senderCreation.getEnforceThreadsConnectSameReceiver());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.geode.cache.wan.internal;

import org.jetbrains.annotations.NotNull;

import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
import org.apache.geode.internal.cache.wan.GatewaySenderException;
import org.apache.geode.internal.cache.wan.MutableGatewaySenderAttributes;
import org.apache.geode.internal.statistics.StatisticsClock;

public interface GatewaySenderTypeFactory {

void validate(@NotNull MutableGatewaySenderAttributes attributes) throws GatewaySenderException;

GatewaySender create(@NotNull InternalCache cache, @NotNull StatisticsClock clock,
@NotNull GatewaySenderAttributes attributes);

GatewaySender createCreation(@NotNull InternalCache cache,
@NotNull GatewaySenderAttributes attributes);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.geode.cache.wan.internal.parallel;

import static java.lang.String.format;

import org.jetbrains.annotations.NotNull;

import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.cache.wan.internal.GatewaySenderTypeFactory;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
import org.apache.geode.internal.cache.wan.GatewaySenderException;
import org.apache.geode.internal.cache.wan.MutableGatewaySenderAttributes;
import org.apache.geode.internal.cache.xmlcache.ParallelGatewaySenderCreation;
import org.apache.geode.internal.statistics.StatisticsClock;

public class ParallelGatewaySenderTypeFactory implements GatewaySenderTypeFactory {
@Override
public void validate(final @NotNull MutableGatewaySenderAttributes attributes)
throws GatewaySenderException {
if ((attributes.getOrderPolicy() != null)
&& attributes.getOrderPolicy().equals(GatewaySender.OrderPolicy.THREAD)) {
throw new GatewaySenderException(
format("Parallel Gateway Sender %s can not be created with OrderPolicy %s",
attributes.getId(), attributes.getOrderPolicy()));
}
}

@Override
public GatewaySender create(final @NotNull InternalCache cache,
final @NotNull StatisticsClock clock,
final @NotNull GatewaySenderAttributes attributes) {
return new ParallelGatewaySenderImpl(cache, clock, attributes);
}

@Override
public GatewaySender createCreation(final @NotNull InternalCache cache,
final @NotNull GatewaySenderAttributes attributes) {
return new ParallelGatewaySenderCreation(cache, attributes);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.geode.cache.wan.internal.serial;

import static java.lang.String.format;

import org.jetbrains.annotations.NotNull;

import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.cache.wan.internal.GatewaySenderTypeFactory;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
import org.apache.geode.internal.cache.wan.GatewaySenderException;
import org.apache.geode.internal.cache.wan.MutableGatewaySenderAttributes;
import org.apache.geode.internal.cache.xmlcache.SerialGatewaySenderCreation;
import org.apache.geode.internal.statistics.StatisticsClock;

public class SerialGatewaySenderTypeFactory implements GatewaySenderTypeFactory {
@Override
public void validate(final @NotNull MutableGatewaySenderAttributes attributes)
throws GatewaySenderException {

if (!attributes.getAsyncEventListeners().isEmpty()) {
throw new GatewaySenderException(
format(
"SerialGatewaySender %s cannot define a remote site because at least AsyncEventListener is already added. Both listeners and remote site cannot be defined for the same gateway sender.",
attributes.getId()));
}

if (attributes.mustGroupTransactionEvents() && attributes.getDispatcherThreads() > 1) {
throw new GatewaySenderException(
format(
"SerialGatewaySender %s cannot be created with group transaction events set to true when dispatcher threads is greater than 1",
attributes.getId()));
}

if (attributes.getOrderPolicy() == null && attributes.getDispatcherThreads() > 1) {
attributes.setOrderPolicy(GatewaySender.DEFAULT_ORDER_POLICY);
}

}

@Override
public GatewaySender create(final @NotNull InternalCache cache,
final @NotNull StatisticsClock clock,
final @NotNull GatewaySenderAttributes attributes) {
return new SerialGatewaySenderImpl(cache, clock, attributes);
}

@Override
public GatewaySender createCreation(final @NotNull InternalCache cache,
final @NotNull GatewaySenderAttributes attributes) {
return new SerialGatewaySenderCreation(cache, attributes);
}
}
Loading

0 comments on commit 7a3d45c

Please sign in to comment.