Skip to content

Commit

Permalink
GEODE-6588: Cleanup GatewaySenderFactoryImpl
Browse files Browse the repository at this point in the history
  • Loading branch information
jake-at-work authored and albertogpz committed Feb 3, 2022
1 parent 2457963 commit 8c2d23e
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 543 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ public interface GatewaySenderFactory {
*
* @param filter The <code>GatewayEventSubstitutionFilter</code>
*/
GatewaySenderFactory setGatewayEventSubstitutionFilter(GatewayEventSubstitutionFilter filter);
GatewaySenderFactory setGatewayEventSubstitutionFilter(
GatewayEventSubstitutionFilter<?, ?> filter);

/**
* If true, receiver member id is checked by all dispatcher threads when the connection is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ public interface InternalGatewaySenderFactory extends GatewaySenderFactory {

GatewaySenderFactory setBucketSorted(boolean bucketSorted);

GatewaySender create(String senderIdFromAsyncEventQueueId);

void configureGatewaySender(GatewaySender senderCreation);

GatewaySenderFactory setLocatorDiscoveryCallback(LocatorDiscoveryCallback myLocatorCallback);
Expand Down
1 change: 0 additions & 1 deletion geode-wan/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ dependencies {
compileOnly('org.apache.logging.log4j:log4j-api')
compileOnly('org.jetbrains:annotations')


testImplementation(project(':geode-junit'))
testImplementation('org.assertj:assertj-core')
testImplementation('junit:junit')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2008,68 +2008,6 @@ public static String createSenderWithDiskStore(String dsName, int remoteDsId, bo
return persistentDirectory.getName();
}


public static void createSenderWithListener(String dsName, int remoteDsName, boolean isParallel,
Integer maxMemory, Integer batchSize, boolean isConflation, boolean isPersistent,
GatewayEventFilter filter, boolean attachTwoListeners, boolean isManualStart) {
File persistentDirectory =
new File(dsName + "_disk_" + System.currentTimeMillis() + "_" + VM.getCurrentVMNum());
persistentDirectory.mkdir();
DiskStoreFactory dsf = cache.createDiskStoreFactory();
File[] dirs1 = new File[] {persistentDirectory};

if (isParallel) {
GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
gateway.setParallel(true);
gateway.setMaximumQueueMemory(maxMemory);
gateway.setBatchSize(batchSize);
gateway.setManualStart(isManualStart);
// set dispatcher threads
gateway.setDispatcherThreads(numDispatcherThreadsForTheRun);
((InternalGatewaySenderFactory) gateway).setLocatorDiscoveryCallback(new MyLocatorCallback());
if (filter != null) {
gateway.addGatewayEventFilter(filter);
}
if (isPersistent) {
gateway.setPersistenceEnabled(true);
gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
} else {
DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
gateway.setDiskStoreName(store.getName());
}
gateway.setBatchConflationEnabled(isConflation);
gateway.create(dsName, remoteDsName);

} else {
GatewaySenderFactory gateway = cache.createGatewaySenderFactory();
gateway.setMaximumQueueMemory(maxMemory);
gateway.setBatchSize(batchSize);
gateway.setManualStart(isManualStart);
// set dispatcher threads
gateway.setDispatcherThreads(numDispatcherThreadsForTheRun);
((InternalGatewaySenderFactory) gateway).setLocatorDiscoveryCallback(new MyLocatorCallback());
if (filter != null) {
gateway.addGatewayEventFilter(filter);
}
gateway.setBatchConflationEnabled(isConflation);
if (isPersistent) {
gateway.setPersistenceEnabled(true);
gateway.setDiskStoreName(dsf.setDiskDirs(dirs1).create(dsName).getName());
} else {
DiskStore store = dsf.setDiskDirs(dirs1).create(dsName);
gateway.setDiskStoreName(store.getName());
}

eventListener1 = new MyGatewaySenderEventListener();
((InternalGatewaySenderFactory) gateway).addAsyncEventListener(eventListener1);
if (attachTwoListeners) {
eventListener2 = new MyGatewaySenderEventListener2();
((InternalGatewaySenderFactory) gateway).addAsyncEventListener(eventListener2);
}
((InternalGatewaySenderFactory) gateway).create(dsName);
}
}

public static void createReceiverInVMs(int maximumTimeBetweenPings, VM... vms) {
for (VM vm : vms) {
vm.invoke(() -> createReceiverWithMaximumTimeBetweenPings(maximumTimeBetweenPings));
Expand Down
Loading

0 comments on commit 8c2d23e

Please sign in to comment.