Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
type: fix
issue: 5636
jira: SMILE-7648
title: "Previously, the number of threads allocated to the $expunge operation in certain cases could be more
than configured, this would cause hundreds of threads to be created and all available database connections
to be consumed. This has been fixed."
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
import java.util.function.Consumer;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.isOneOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.oneOf;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;

Expand Down Expand Up @@ -98,10 +99,10 @@ public void tenItemsBatch5() throws InterruptedException {
getPartitionRunner(5).runInPartitionedThreads(resourceIds, partitionConsumer);
List<HookParams> calls = myLatch.awaitExpected();
PartitionCall partitionCall1 = (PartitionCall) PointcutLatch.getLatchInvocationParameter(calls, 0);
assertThat(partitionCall1.threadName, isOneOf(TEST_THREADNAME_1, TEST_THREADNAME_2));
assertThat(partitionCall1.threadName, is(oneOf(TEST_THREADNAME_1, TEST_THREADNAME_2)));
assertEquals(5, partitionCall1.size);
PartitionCall partitionCall2 = (PartitionCall) PointcutLatch.getLatchInvocationParameter(calls, 1);
assertThat(partitionCall2.threadName, isOneOf(TEST_THREADNAME_1, TEST_THREADNAME_2));
assertThat(partitionCall2.threadName, is(oneOf(TEST_THREADNAME_1, TEST_THREADNAME_2)));
assertEquals(5, partitionCall2.size);
assertNotEquals(partitionCall1.threadName, partitionCall2.threadName);
}
Expand All @@ -119,14 +120,38 @@ public void nineItemsBatch5() throws InterruptedException {
getPartitionRunner(5).runInPartitionedThreads(resourceIds, partitionConsumer);
List<HookParams> calls = myLatch.awaitExpected();
PartitionCall partitionCall1 = (PartitionCall) PointcutLatch.getLatchInvocationParameter(calls, 0);
assertThat(partitionCall1.threadName, isOneOf(TEST_THREADNAME_1, TEST_THREADNAME_2));
assertThat(partitionCall1.threadName, is(oneOf(TEST_THREADNAME_1, TEST_THREADNAME_2)));
assertEquals(true, nums.remove(partitionCall1.size));
PartitionCall partitionCall2 = (PartitionCall) PointcutLatch.getLatchInvocationParameter(calls, 1);
assertThat(partitionCall2.threadName, isOneOf(TEST_THREADNAME_1, TEST_THREADNAME_2));
assertThat(partitionCall2.threadName, is(oneOf(TEST_THREADNAME_1, TEST_THREADNAME_2)));
assertEquals(true, nums.remove(partitionCall2.size));
assertNotEquals(partitionCall1.threadName, partitionCall2.threadName);
}



/**
* See #5636 $expunge operation ignoring ExpungeThreadCount setting in certain cases
*/
@Test
public void testExpunge_withTasksSizeBiggerThanExecutorQueue_usesConfiguredNumberOfThreads() throws InterruptedException {
// setup
List<IResourcePersistentId> resourceIds = buildPidList(2500);
Consumer<List<IResourcePersistentId>> partitionConsumer = buildPartitionConsumer(myLatch);
// with batch size = 2 we expect 2500/2 runnableTasks to be created
myLatch.setExpectedCount(1250);

// execute
getPartitionRunner(2, 2).runInPartitionedThreads(resourceIds, partitionConsumer);
List<HookParams> calls = myLatch.awaitExpected();

// validate - only two threads should be used for execution
for (int i = 0; i < 1250; i++) {
PartitionCall partitionCall = (PartitionCall) PointcutLatch.getLatchInvocationParameter(calls, i);
assertThat(partitionCall.threadName, is(oneOf(TEST_THREADNAME_1, TEST_THREADNAME_2)));
}
}

@Test
public void tenItemsOneThread() throws InterruptedException {
List<IResourcePersistentId> resourceIds = buildPidList(10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,13 @@ private ExecutorService buildExecutor(int numberOfTasks) {
}
ourLog.info("Slot become available after {}ms", sw.getMillis());
};

// setting corePoolSize and maximumPoolSize to be the same as threadCount
// to ensure that the number of allocated threads for the expunge operation does not exceed the configured limit
// see ThreadPoolExecutor documentation for details
return new ThreadPoolExecutor(
threadCount,
MAX_POOL_SIZE,
threadCount,
0L,
TimeUnit.MILLISECONDS,
executorQueue,
Expand Down