Skip to content

Commit

Permalink
MODULE: Create TX grouping sender impls.
Browse files Browse the repository at this point in the history
  • Loading branch information
jake-at-work authored and albertogpz committed Feb 3, 2022
1 parent 35b072e commit f7c0d21
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,6 @@ public abstract class AbstractGatewaySender implements InternalGatewaySender, Di

protected boolean isParallel;

protected boolean groupTransactionEvents;

protected int retriesToGetTransactionEventsFromQueue;

protected boolean isForInternalUse;
Expand Down Expand Up @@ -257,7 +255,6 @@ public AbstractGatewaySender(InternalCache cache, StatisticsClock statisticsCloc
alertThreshold = attrs.getAlertThreshold();
copyDeprecatedAttributes(attrs);
isParallel = attrs.isParallel();
groupTransactionEvents = attrs.mustGroupTransactionEvents();
retriesToGetTransactionEventsFromQueue = attrs.getRetriesToGetTransactionEventsFromQueue();
isForInternalUse = attrs.isForInternalUse();
diskStoreName = attrs.getDiskStoreName();
Expand Down Expand Up @@ -564,7 +561,7 @@ public boolean isParallel() {

@Override
public boolean mustGroupTransactionEvents() {
return groupTransactionEvents;
return false;
}

/**
Expand Down Expand Up @@ -743,7 +740,7 @@ public void setBatchTimeInterval(int batchTimeInterval) {
*
*/
public void setGroupTransactionEvents(boolean groupTransactionEvents) {
this.groupTransactionEvents = groupTransactionEvents;
// TODO jbarrett remove this
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.geode.cache.wan.internal.txgrouping.parallel;

import org.jetbrains.annotations.NotNull;

import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
import org.apache.geode.internal.cache.xmlcache.ParallelGatewaySenderCreation;

public class TxGroupingParallelGatewaySenderCreation extends ParallelGatewaySenderCreation {

public TxGroupingParallelGatewaySenderCreation(final @NotNull InternalCache cache,
final @NotNull GatewaySenderAttributes attributes) {
super(cache, attributes);
}

@Override
public boolean mustGroupTransactionEvents() {
return false;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.geode.cache.wan.internal.txgrouping.parallel;

import org.jetbrains.annotations.NotNull;

import org.apache.geode.cache.wan.internal.parallel.ParallelGatewaySenderImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
import org.apache.geode.internal.statistics.StatisticsClock;

public class TxGroupingParallelGatewaySenderImpl extends ParallelGatewaySenderImpl {

public TxGroupingParallelGatewaySenderImpl(final @NotNull InternalCache cache,
final @NotNull StatisticsClock clock,
final @NotNull GatewaySenderAttributes attributes) {
super(cache, clock, attributes);
}

@Override
public boolean mustGroupTransactionEvents() {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@
import org.jetbrains.annotations.NotNull;

import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.cache.wan.internal.parallel.ParallelGatewaySenderImpl;
import org.apache.geode.cache.wan.internal.parallel.ParallelGatewaySenderTypeFactory;
import org.apache.geode.cache.wan.internal.txgrouping.CommonTxGroupingGatewaySenderFactory;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
import org.apache.geode.internal.cache.wan.GatewaySenderException;
import org.apache.geode.internal.cache.wan.MutableGatewaySenderAttributes;
import org.apache.geode.internal.cache.xmlcache.ParallelGatewaySenderCreation;
import org.apache.geode.internal.statistics.StatisticsClock;

public class TxGroupingParallelGatewaySenderTypeFactory extends ParallelGatewaySenderTypeFactory {
Expand All @@ -47,12 +45,12 @@ public void validate(final @NotNull MutableGatewaySenderAttributes attributes)
public GatewaySender create(final @NotNull InternalCache cache,
final @NotNull StatisticsClock clock,
final @NotNull GatewaySenderAttributes attributes) {
return new ParallelGatewaySenderImpl(cache, clock, attributes);
return new TxGroupingParallelGatewaySenderImpl(cache, clock, attributes);
}

@Override
public GatewaySender createCreation(final @NotNull InternalCache cache,
final @NotNull GatewaySenderAttributes attributes) {
return new ParallelGatewaySenderCreation(cache, attributes);
return new TxGroupingParallelGatewaySenderCreation(cache, attributes);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.geode.cache.wan.internal.txgrouping.serial;

import org.jetbrains.annotations.NotNull;

import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
import org.apache.geode.internal.cache.xmlcache.SerialGatewaySenderCreation;

public class TxGroupingSerialGatewaySenderCreation extends SerialGatewaySenderCreation {

public TxGroupingSerialGatewaySenderCreation(final @NotNull InternalCache cache,
final @NotNull GatewaySenderAttributes attributes) {
super(cache, attributes);
}

@Override
public boolean mustGroupTransactionEvents() {
return true;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.geode.cache.wan.internal.txgrouping.serial;

import org.jetbrains.annotations.NotNull;

import org.apache.geode.cache.wan.internal.serial.SerialGatewaySenderImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
import org.apache.geode.internal.statistics.StatisticsClock;

public class TxGroupingSerialGatewaySenderImpl extends SerialGatewaySenderImpl {

public TxGroupingSerialGatewaySenderImpl(final @NotNull InternalCache cache,
final @NotNull StatisticsClock clock,
final @NotNull GatewaySenderAttributes attributes) {
super(cache, clock, attributes);
}

@Override
public boolean mustGroupTransactionEvents() {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,12 @@
import org.jetbrains.annotations.NotNull;

import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.cache.wan.internal.serial.SerialGatewaySenderImpl;
import org.apache.geode.cache.wan.internal.serial.SerialGatewaySenderTypeFactory;
import org.apache.geode.cache.wan.internal.txgrouping.CommonTxGroupingGatewaySenderFactory;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.wan.GatewaySenderAttributes;
import org.apache.geode.internal.cache.wan.GatewaySenderException;
import org.apache.geode.internal.cache.wan.MutableGatewaySenderAttributes;
import org.apache.geode.internal.cache.xmlcache.SerialGatewaySenderCreation;
import org.apache.geode.internal.statistics.StatisticsClock;

public class TxGroupingSerialGatewaySenderTypeFactory extends SerialGatewaySenderTypeFactory {
Expand Down Expand Up @@ -58,13 +56,13 @@ public void validate(final @NotNull MutableGatewaySenderAttributes attributes)
public GatewaySender create(final @NotNull InternalCache cache,
final @NotNull StatisticsClock clock,
final @NotNull GatewaySenderAttributes attributes) {
return new SerialGatewaySenderImpl(cache, clock, attributes);
return new TxGroupingSerialGatewaySenderImpl(cache, clock, attributes);
}

@Override
public GatewaySender createCreation(final @NotNull InternalCache cache,
final @NotNull GatewaySenderAttributes attributes) {
return new SerialGatewaySenderCreation(cache, attributes);
return new TxGroupingSerialGatewaySenderCreation(cache, attributes);
}

}

0 comments on commit f7c0d21

Please sign in to comment.