Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: multiple patches during long running tests for LMQ over RocksDB #8915

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ public BrokerController(
this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new LmqBrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat()) : new BrokerStatsManager(this.brokerConfig.getBrokerClusterName(), this.brokerConfig.isEnableDetailStat());
this.broadcastOffsetManager = new BroadcastOffsetManager(this);
if (ConfigManagerVersion.V2.getVersion().equals(brokerConfig.getConfigManagerVersion())) {
this.configStorage = new ConfigStorage(messageStoreConfig.getStorePathRootDir());
this.configStorage = new ConfigStorage(messageStoreConfig);
this.topicConfigManager = new TopicConfigManagerV2(this, configStorage);
this.subscriptionGroupManager = new SubscriptionGroupManagerV2(this, configStorage);
this.consumerOffsetManager = new ConsumerOffsetManagerV2(this, configStorage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,22 @@
*/
package org.apache.rocketmq.broker.config.v2;

import io.netty.buffer.PooledByteBufAllocatorMetric;
import io.netty.util.internal.PlatformDependent;
import java.io.File;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.config.AbstractRocksDBStorage;
import org.apache.rocketmq.common.config.ConfigHelper;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.ReadOptions;
Expand All @@ -43,8 +50,37 @@ public class ConfigStorage extends AbstractRocksDBStorage {
public static final String DATA_VERSION_KEY = "data_version";
public static final byte[] DATA_VERSION_KEY_BYTES = DATA_VERSION_KEY.getBytes(StandardCharsets.UTF_8);

public ConfigStorage(String storePath) {
super(storePath + File.separator + "config" + File.separator + "rdb");
private final ScheduledExecutorService scheduledExecutorService;

/**
* Number of write ops since previous flush.
*/
private final AtomicInteger writeOpsCounter;

private final MessageStoreConfig messageStoreConfig;

public ConfigStorage(MessageStoreConfig messageStoreConfig) {
super(messageStoreConfig.getStorePathRootDir() + File.separator + "config" + File.separator + "rdb");
this.messageStoreConfig = messageStoreConfig;
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("config_storage_"));
writeOpsCounter = new AtomicInteger(0);
}

private void statNettyMemory() {
PooledByteBufAllocatorMetric metric = AbstractRocksDBStorage.POOLED_ALLOCATOR.metric();
LOGGER.info("Netty Memory Usage: {}", metric);
}

@Override
public synchronized boolean start() {
boolean started = super.start();
if (started) {
scheduledExecutorService.scheduleWithFixedDelay(() -> statRocksdb(LOGGER), 1, 10, TimeUnit.SECONDS);
scheduledExecutorService.scheduleWithFixedDelay(this::statNettyMemory, 10, 10, TimeUnit.SECONDS);
} else {
LOGGER.error("Failed to start config storage");
}
return started;
}

@Override
Expand All @@ -58,15 +94,15 @@ protected boolean postLoad() {
initOptions();
List<ColumnFamilyDescriptor> cfDescriptors = new ArrayList<>();

ColumnFamilyOptions defaultOptions = ConfigHelper.createConfigOptions();
ColumnFamilyOptions defaultOptions = ConfigHelper.createConfigColumnFamilyOptions();
this.cfOptions.add(defaultOptions);
cfDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, defaultOptions));

// Start RocksDB instance
open(cfDescriptors);

this.defaultCFHandle = cfHandles.get(0);
} catch (final Exception e) {
} catch (Exception e) {
AbstractRocksDBStorage.LOGGER.error("postLoad Failed. {}", this.dbPath, e);
return false;
}
Expand All @@ -75,7 +111,7 @@ protected boolean postLoad() {

@Override
protected void preShutdown() {

scheduledExecutorService.shutdown();
}

protected void initOptions() {
Expand Down Expand Up @@ -105,6 +141,15 @@ public byte[] get(ByteBuffer key) throws RocksDBException {

public void write(WriteBatch writeBatch) throws RocksDBException {
db.write(ableWalWriteOptions, writeBatch);
accountWriteOpsForWalFlush();
}

private void accountWriteOpsForWalFlush() throws RocksDBException {
int writeCount = writeOpsCounter.incrementAndGet();
if (writeCount >= messageStoreConfig.getRocksdbFlushWalFrequency()) {
this.db.flushWal(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may be flushWal(true) better

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The frequency of commit-offset may be very high, I prefer to keep this periodic flush async.

writeOpsCounter.getAndAdd(-writeCount);
}
}

public RocksIterator iterate(ByteBuffer beginKey, ByteBuffer endKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ public void updateSubscriptionGroupConfig(final SubscriptionGroupConfig config)
long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion);
configStorage.write(writeBatch);
// fdatasync on core metadata change
persist();
} catch (RocksDBException e) {
log.error("update subscription group config error", e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ public void updateTopicConfig(final TopicConfig topicConfig) {
long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
ConfigHelper.stampDataVersion(writeBatch, dataVersion, stateMachineVersion);
configStorage.write(writeBatch);
// fdatasync on core metadata change
this.persist();
} catch (RocksDBException e) {
log.error("Failed to update topic config", e);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -44,6 +45,8 @@ public class ConsumerOffsetManagerV2Test {
@Mock
private BrokerController controller;

private MessageStoreConfig messageStoreConfig;

@Rule
public TemporaryFolder tf = new TemporaryFolder();

Expand All @@ -60,7 +63,9 @@ public void setUp() throws IOException {
Mockito.doReturn(brokerConfig).when(controller).getBrokerConfig();

File configStoreDir = tf.newFolder();
configStorage = new ConfigStorage(configStoreDir.getAbsolutePath());
messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setStorePathRootDir(configStoreDir.getAbsolutePath());
configStorage = new ConfigStorage(messageStoreConfig);
configStorage.start();
consumerOffsetManagerV2 = new ConsumerOffsetManagerV2(controller, configStorage);
}
Expand All @@ -84,7 +89,9 @@ public void testCommitOffset_Standard() {
consumerOffsetManagerV2.getOffsetTable().clear();
Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group, topic, queueId));

configStorage = new ConfigStorage(messageStoreConfig);
configStorage.start();
consumerOffsetManagerV2 = new ConsumerOffsetManagerV2(controller, configStorage);
consumerOffsetManagerV2.load();
Assert.assertEquals(queueOffset, consumerOffsetManagerV2.queryOffset(group, topic, queueId));
}
Expand All @@ -106,7 +113,9 @@ public void testCommitOffset_LMQ() {

configStorage.shutdown();

configStorage = new ConfigStorage(messageStoreConfig);
configStorage.start();
consumerOffsetManagerV2 = new ConsumerOffsetManagerV2(controller, configStorage);
consumerOffsetManagerV2.load();
Assert.assertEquals(queueOffset, consumerOffsetManagerV2.queryOffset(group, topic, queueId));
}
Expand All @@ -129,7 +138,9 @@ public void testCommitPullOffset_LMQ() {

configStorage.shutdown();

configStorage = new ConfigStorage(messageStoreConfig);
configStorage.start();
consumerOffsetManagerV2 = new ConsumerOffsetManagerV2(controller, configStorage);
consumerOffsetManagerV2.load();
Assert.assertEquals(queueOffset, consumerOffsetManagerV2.queryPullOffset(group, topic, queueId));
}
Expand Down Expand Up @@ -157,7 +168,10 @@ public void testRemoveByTopicAtGroup() {
Assert.assertEquals(queueOffset, consumerOffsetManagerV2.queryOffset(group, topic2, queueId));

configStorage.shutdown();

configStorage = new ConfigStorage(messageStoreConfig);
configStorage.start();
consumerOffsetManagerV2 = new ConsumerOffsetManagerV2(controller, configStorage);
consumerOffsetManagerV2.load();
Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group, topic, queueId));
Assert.assertEquals(queueOffset, consumerOffsetManagerV2.queryOffset(group, topic2, queueId));
Expand All @@ -184,7 +198,10 @@ public void testRemoveByGroup() {
Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group, topic2, queueId));

configStorage.shutdown();

configStorage = new ConfigStorage(messageStoreConfig);
configStorage.start();
consumerOffsetManagerV2 = new ConsumerOffsetManagerV2(controller, configStorage);
consumerOffsetManagerV2.load();
Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group, topic, queueId));
Assert.assertEquals(-1L, consumerOffsetManagerV2.queryOffset(group, topic2, queueId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.rocketmq.remoting.protocol.subscription.GroupRetryPolicyType;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -38,6 +39,9 @@

@RunWith(MockitoJUnitRunner.class)
public class SubscriptionGroupManagerV2Test {

private MessageStoreConfig messageStoreConfig;

private ConfigStorage configStorage;

private SubscriptionGroupManagerV2 subscriptionGroupManagerV2;
Expand Down Expand Up @@ -68,7 +72,9 @@ public void setUp() throws IOException {
Mockito.doReturn(1L).when(messageStore).getStateMachineVersion();

File configStoreDir = tf.newFolder();
configStorage = new ConfigStorage(configStoreDir.getAbsolutePath());
messageStoreConfig = new MessageStoreConfig();
messageStoreConfig.setStorePathRootDir(configStoreDir.getAbsolutePath());
configStorage = new ConfigStorage(messageStoreConfig);
configStorage.start();
subscriptionGroupManagerV2 = new SubscriptionGroupManagerV2(controller, configStorage);
}
Expand Down Expand Up @@ -98,7 +104,10 @@ public void testUpdateSubscriptionGroupConfig() {

subscriptionGroupManagerV2.getSubscriptionGroupTable().clear();
configStorage.shutdown();

configStorage = new ConfigStorage(messageStoreConfig);
configStorage.start();
subscriptionGroupManagerV2 = new SubscriptionGroupManagerV2(controller, configStorage);
subscriptionGroupManagerV2.load();
found = subscriptionGroupManagerV2.findSubscriptionGroupConfig(subscriptionGroupConfig.getGroupName());
Assert.assertEquals(subscriptionGroupConfig, found);
Expand Down Expand Up @@ -132,7 +141,11 @@ public void testDeleteSubscriptionGroupConfig() {
Assert.assertNull(found);

configStorage.shutdown();

configStorage = new ConfigStorage(messageStoreConfig);
configStorage.start();

subscriptionGroupManagerV2 = new SubscriptionGroupManagerV2(controller, configStorage);
subscriptionGroupManagerV2.load();
found = subscriptionGroupManagerV2.findSubscriptionGroupConfig(subscriptionGroupConfig.getGroupName());
Assert.assertNull(found);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
import org.junit.Assert;
Expand All @@ -35,17 +36,19 @@
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;


@RunWith(value = MockitoJUnitRunner.class)
public class TopicConfigManagerV2Test {

private ConfigStorage configStorage;
private MessageStoreConfig messageStoreConfig;

private TopicConfigManagerV2 topicConfigManagerV2;
private ConfigStorage configStorage;

@Mock
private BrokerController controller;

@Mock
private MessageStore messageStore;

@Rule
public TemporaryFolder tf = new TemporaryFolder();

Expand All @@ -61,17 +64,22 @@ public void setUp() throws IOException {
BrokerConfig brokerConfig = new BrokerConfig();
Mockito.doReturn(brokerConfig).when(controller).getBrokerConfig();

MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
messageStoreConfig = new MessageStoreConfig();
Mockito.doReturn(messageStoreConfig).when(controller).getMessageStoreConfig();
Mockito.doReturn(messageStore).when(controller).getMessageStore();

File configStoreDir = tf.newFolder();
configStorage = new ConfigStorage(configStoreDir.getAbsolutePath());
messageStoreConfig.setStorePathRootDir(configStoreDir.getAbsolutePath());

configStorage = new ConfigStorage(messageStoreConfig);
configStorage.start();
topicConfigManagerV2 = new TopicConfigManagerV2(controller, configStorage);
}

@Test
public void testUpdateTopicConfig() {
TopicConfigManagerV2 topicConfigManagerV2 = new TopicConfigManagerV2(controller, configStorage);
topicConfigManagerV2.load();

TopicConfig topicConfig = new TopicConfig();
String topicName = "T1";
topicConfig.setTopicName(topicName);
Expand All @@ -86,7 +94,9 @@ public void testUpdateTopicConfig() {

topicConfigManagerV2.getTopicConfigTable().clear();

configStorage = new ConfigStorage(messageStoreConfig);
Assert.assertTrue(configStorage.start());
topicConfigManagerV2 = new TopicConfigManagerV2(controller, configStorage);
Assert.assertTrue(topicConfigManagerV2.load());

TopicConfig loaded = topicConfigManagerV2.selectTopicConfig(topicName);
Expand All @@ -111,12 +121,15 @@ public void testRemoveTopicConfig() {
topicConfig.setWriteQueueNums(4);
topicConfig.setOrder(true);
topicConfig.setTopicSysFlag(4);
TopicConfigManagerV2 topicConfigManagerV2 = new TopicConfigManagerV2(controller, configStorage);
topicConfigManagerV2.updateTopicConfig(topicConfig);
topicConfigManagerV2.removeTopicConfig(topicName);
Assert.assertFalse(topicConfigManagerV2.containsTopic(topicName));
Assert.assertTrue(configStorage.shutdown());

configStorage = new ConfigStorage(messageStoreConfig);
Assert.assertTrue(configStorage.start());
topicConfigManagerV2 = new TopicConfigManagerV2(controller, configStorage);
Assert.assertTrue(topicConfigManagerV2.load());
Assert.assertFalse(topicConfigManagerV2.containsTopic(topicName));
}
Expand Down
Loading
Loading