Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@
import java.util.UUID;

import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static io.airlift.units.Duration.nanosSince;
import static io.trino.plugin.raptor.legacy.DatabaseTesting.createTestingJdbi;
import static io.trino.plugin.raptor.legacy.metadata.SchemaDaoUtil.createTablesWithRetry;
import static io.trino.plugin.raptor.legacy.metadata.TestDatabaseShardManager.createShardManager;
import static io.trino.plugin.raptor.legacy.storage.organization.ShardOrganizationManager.createOrganizationSets;
import static io.trino.plugin.raptor.legacy.storage.organization.TestCompactionSetCreator.extractIndexes;
import static io.trino.plugin.raptor.legacy.storage.organization.TestShardOrganizer.createShardOrganizer;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.DateType.DATE;
import static io.trino.spi.type.TimestampType.TIMESTAMP_MILLIS;
Expand Down Expand Up @@ -208,4 +208,19 @@ private ShardOrganizationManager createShardOrganizationManager(long intervalMil
new Duration(intervalMillis, MILLISECONDS),
new Duration(5, MINUTES));
}

private static class MockJobFactory
implements JobFactory
{
@Override
public Runnable create(OrganizationSet organizationSet)
{
return () -> sleepUninterruptibly(10, MILLISECONDS);
}
}

private static ShardOrganizer createShardOrganizer()
{
return new ShardOrganizer(new MockJobFactory(), 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
import java.util.OptionalInt;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static com.google.common.util.concurrent.Uninterruptibles.awaitUninterruptibly;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;

public class TestShardOrganizer
Expand All @@ -33,7 +36,10 @@ public class TestShardOrganizer
public void testShardOrganizerInProgress()
throws Exception
{
ShardOrganizer organizer = createShardOrganizer();
CountDownLatch canComplete = new CountDownLatch(1);
ShardOrganizer organizer = new ShardOrganizer(
organizationSet -> () -> checkState(awaitUninterruptibly(canComplete, 10, SECONDS)),
1);

Set<UUID> shards = ImmutableSet.of(UUID.randomUUID());
OrganizationSet organizationSet = new OrganizationSet(1L, shards, OptionalInt.empty());
Expand All @@ -43,26 +49,12 @@ public void testShardOrganizerInProgress()
assertThat(organizer.inProgress(getOnlyElement(shards))).isTrue();
assertThat(organizer.getShardsInProgress()).isEqualTo(1);

canComplete.countDown();
while (organizer.inProgress(getOnlyElement(shards))) {
MILLISECONDS.sleep(10);
}
assertThat(organizer.inProgress(getOnlyElement(shards))).isFalse();
assertThat(organizer.getShardsInProgress()).isEqualTo(0);
organizer.shutdown();
}

private static class MockJobFactory
implements JobFactory
{
@Override
public Runnable create(OrganizationSet organizationSet)
{
return () -> sleepUninterruptibly(10, MILLISECONDS);
}
}

static ShardOrganizer createShardOrganizer()
{
return new ShardOrganizer(new MockJobFactory(), 1);
}
}