Skip to content

Commit

Permalink
fix flaky test in tiered storage
Browse files Browse the repository at this point in the history
  • Loading branch information
ShadowySpirits committed Feb 20, 2023
1 parent 0f70499 commit d80c6a0
Show file tree
Hide file tree
Showing 11 changed files with 78 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,17 @@ public class TieredContainerManager {
private final TieredMessageStoreConfig storeConfig;

public static TieredContainerManager getInstance(TieredMessageStoreConfig storeConfig) {
if (storeConfig == null) {
return instance;
}

if (instance == null) {
synchronized (TieredContainerManager.class) {
if (instance == null) {
try {
instance = new TieredContainerManager(storeConfig);
} catch (Exception ignored) {
} catch (Exception e) {
logger.error("TieredContainerManager#getInstance: create container manager failed", e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,10 @@ public long getCommitMsgQueueOffset() {
}
}

private void loadFromMetadata() {
protected void loadFromMetadata() {
fileSegmentList.clear();
needCommitFileSegmentList.clear();

metadataStore.iterateFileSegment(fileType, messageQueue.getTopic(), messageQueue.getQueueId(), metadata -> {
if (metadata.getStatus() == FileSegmentMetadata.STATUS_DELETED) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ public static boolean isSystemTopic(final String topic) {
}

public static TieredMetadataStore getMetadataStore(TieredMessageStoreConfig storeConfig) {
if (storeConfig == null) {
return metadataStoreInstance;
}

if (metadataStoreInstance == null) {
synchronized (TieredMetadataStore.class) {
if (metadataStoreInstance == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ public void setUp() {
@After
public void tearDown() throws IOException {
FileUtils.deleteDirectory(new File(storePath));
TieredStoreUtil.getMetadataStore(storeConfig).destroy();
TieredContainerManager.getInstance(storeConfig).cleanup();
TieredStoreTestUtil.destroyMetadataStore();
TieredStoreTestUtil.destroyContainerManager();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ public void setUp() {
@After
public void tearDown() throws IOException {
FileUtils.deleteDirectory(new File(storePath));
TieredStoreUtil.getMetadataStore(storeConfig).destroy();
TieredContainerManager.getInstance(storeConfig).cleanup();
TieredStoreTestUtil.destroyMetadataStore();
TieredStoreTestUtil.destroyContainerManager();
}

public Triple<TieredMessageFetcher, ByteBuffer, ByteBuffer> buildFetcher() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.apache.rocketmq.tieredstore;

import java.lang.reflect.Field;
import org.apache.rocketmq.tieredstore.container.TieredContainerManager;
import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
import org.junit.Assert;

public class TieredStoreTestUtil {
public static void destroyMetadataStore() {
TieredMetadataStore metadataStore = TieredStoreUtil.getMetadataStore(null);
if (metadataStore != null) {
metadataStore.destroy();
}
try {
Field field = TieredStoreUtil.class.getDeclaredField("metadataStoreInstance");
field.setAccessible(true);
field.set(null, null);
} catch (NoSuchFieldException | IllegalAccessException e) {
Assert.fail(e.getClass().getCanonicalName() + ": " + e.getMessage());
}
}

public static void destroyContainerManager() {
TieredContainerManager containerManager = TieredContainerManager.getInstance(null);
if (containerManager != null) {
containerManager.cleanup();
}
try {
Field field = TieredContainerManager.class.getDeclaredField("instance");
field.setAccessible(true);
field.set(null, null);
} catch (NoSuchFieldException | IllegalAccessException e) {
Assert.fail(e.getClass().getCanonicalName() + ": " + e.getMessage());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
Expand Down Expand Up @@ -51,8 +52,8 @@ public void setUp() {
@After
public void tearDown() throws IOException {
FileUtils.deleteDirectory(new File(storePath));
TieredStoreUtil.getMetadataStore(storeConfig).destroy();
TieredContainerManager.getInstance(storeConfig).cleanup();
TieredStoreTestUtil.destroyMetadataStore();
TieredStoreTestUtil.destroyContainerManager();
}


Expand All @@ -66,7 +67,9 @@ public void testLoadAndDestroy() {
boolean load = containerManager.load();
Assert.assertTrue(load);

Awaitility.await().atMost(3, TimeUnit.SECONDS).until(() -> containerManager.getAllMQContainer().size() == 2);
Awaitility.await()
.atMost(3, TimeUnit.SECONDS)
.until(() -> containerManager.getAllMQContainer().size() == 2);

TieredMessageQueueContainer container = containerManager.getMQContainer(mq);
Assert.assertNotNull(container);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
import org.apache.rocketmq.tieredstore.mock.MemoryFileSegment;
Expand Down Expand Up @@ -49,7 +50,7 @@ public void setUp() {
@After
public void tearDown() throws IOException {
FileUtils.deleteDirectory(new File(storePath));
TieredStoreUtil.getMetadataStore(storeConfig).destroy();
TieredStoreTestUtil.destroyMetadataStore();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.commons.io.FileUtils;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
import org.apache.rocketmq.tieredstore.common.AppendResult;
import org.apache.rocketmq.tieredstore.common.BoundaryType;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
Expand Down Expand Up @@ -60,8 +61,8 @@ public void setUp() {
@After
public void tearDown() throws IOException {
FileUtils.deleteDirectory(new File(storePath));
TieredStoreUtil.getMetadataStore(storeConfig).destroy();
TieredContainerManager.getInstance(storeConfig).cleanup();
TieredStoreTestUtil.destroyMetadataStore();
TieredStoreTestUtil.destroyContainerManager();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.apache.rocketmq.tieredstore.container.TieredCommitLog;
import org.apache.rocketmq.tieredstore.mock.MemoryFileSegment;
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -58,7 +58,7 @@ public void setUp() {
@After
public void tearDown() throws IOException {
FileUtils.deleteDirectory(new File(storePath));
TieredStoreUtil.getMetadataStore(storeConfig).destroy();
TieredStoreTestUtil.destroyMetadataStore();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,22 @@
package org.apache.rocketmq.tieredstore.metrics;

import io.opentelemetry.sdk.OpenTelemetrySdk;
import java.io.IOException;
import org.apache.rocketmq.tieredstore.TieredMessageFetcher;
import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
import org.junit.After;
import org.junit.Test;

public class TieredStoreMetricsManagerTest {

@After
public void tearDown() throws IOException {
TieredStoreTestUtil.destroyMetadataStore();
TieredStoreTestUtil.destroyContainerManager();
}


@Test
public void getMetricsView() {
TieredStoreMetricsManager.getMetricsView();
Expand Down

0 comments on commit d80c6a0

Please sign in to comment.