Skip to content

Commit

Permalink
[ISSUE #8166] optimize: make compression type configurable in produce…
Browse files Browse the repository at this point in the history
…r clinet level
  • Loading branch information
humkum authored May 23, 2024
1 parent 1b42515 commit dcc88c6
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,6 @@
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.compression.CompressionType;
import org.apache.rocketmq.common.compression.Compressor;
import org.apache.rocketmq.common.compression.CompressorFactory;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
Expand Down Expand Up @@ -118,11 +115,6 @@ public class DefaultMQProducerImpl implements MQProducerInner {
private MQFaultStrategy mqFaultStrategy;
private ExecutorService asyncSenderExecutor;

// compression related
private int compressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
private CompressionType compressType = CompressionType.of(System.getProperty(MixAll.MESSAGE_COMPRESS_TYPE, "ZLIB"));
private final Compressor compressor = CompressorFactory.getCompressor(compressType);

// backpressure related
private Semaphore semaphoreAsyncSendNum;
private Semaphore semaphoreAsyncSendSize;
Expand Down Expand Up @@ -900,7 +892,7 @@ private SendResult sendKernelImpl(final Message msg,
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
sysFlag |= compressType.getCompressionFlag();
sysFlag |= this.defaultMQProducer.getCompressType().getCompressionFlag();
msgBodyCompressed = true;
}

Expand Down Expand Up @@ -1070,7 +1062,7 @@ private boolean tryToCompressMessage(final Message msg) {
if (body != null) {
if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
try {
byte[] data = compressor.compress(body, compressLevel);
byte[] data = this.defaultMQProducer.getCompressor().compress(body, this.defaultMQProducer.getCompressLevel());
if (data != null) {
msg.setBody(data);
return true;
Expand Down Expand Up @@ -1763,22 +1755,6 @@ public ConcurrentMap<String, TopicPublishInfo> getTopicPublishInfoTable() {
return topicPublishInfoTable;
}

public int getCompressLevel() {
return compressLevel;
}

public void setCompressLevel(int compressLevel) {
this.compressLevel = compressLevel;
}

public CompressionType getCompressType() {
return compressType;
}

public void setCompressType(CompressionType compressType) {
this.compressType = compressType;
}

public ServiceState getServiceState() {
return serviceState;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
import org.apache.rocketmq.client.trace.hook.EndTransactionTraceHookImpl;
import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.compression.CompressionType;
import org.apache.rocketmq.common.compression.Compressor;
import org.apache.rocketmq.common.compression.CompressorFactory;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
Expand Down Expand Up @@ -170,6 +173,21 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {

private RPCHook rpcHook = null;

/**
* Compress level of compress algorithm.
*/
private int compressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));

/**
* Compress type of compress algorithm, default using ZLIB.
*/
private CompressionType compressType = CompressionType.of(System.getProperty(MixAll.MESSAGE_COMPRESS_TYPE, "ZLIB"));

/**
* Compressor of compress algorithm.
*/
private Compressor compressor = CompressorFactory.getCompressor(compressType);

/**
* Default constructor.
*/
Expand Down Expand Up @@ -1344,4 +1362,25 @@ public void setStartDetectorEnable(boolean startDetectorEnable) {
super.setStartDetectorEnable(startDetectorEnable);
this.defaultMQProducerImpl.getMqFaultStrategy().setStartDetectorEnable(startDetectorEnable);
}

public int getCompressLevel() {
return compressLevel;
}

public void setCompressLevel(int compressLevel) {
this.compressLevel = compressLevel;
}

public CompressionType getCompressType() {
return compressType;
}

public void setCompressType(CompressionType compressType) {
this.compressType = compressType;
this.compressor = CompressorFactory.getCompressor(compressType);
}

public Compressor getCompressor() {
return compressor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ public static void main(String[] args) throws MQClientException {
String compressType = commandLine.hasOption("ct") ? commandLine.getOptionValue("ct").trim() : "ZLIB";
int compressLevel = commandLine.hasOption("cl") ? Integer.parseInt(commandLine.getOptionValue("cl")) : 5;
int compressOverHowMuch = commandLine.hasOption("ch") ? Integer.parseInt(commandLine.getOptionValue("ch")) : 4096;
producer.getDefaultMQProducerImpl().setCompressType(CompressionType.of(compressType));
producer.getDefaultMQProducerImpl().setCompressLevel(compressLevel);
producer.setCompressType(CompressionType.of(compressType));
producer.setCompressLevel(compressLevel);
producer.setCompressMsgBodyOverHowmuch(compressOverHowMuch);
System.out.printf("compressType: %s compressLevel: %s%n", compressType, compressLevel);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ public void run() {
String compressType = commandLine.hasOption("ct") ? commandLine.getOptionValue("ct").trim() : "ZLIB";
int compressLevel = commandLine.hasOption("cl") ? Integer.parseInt(commandLine.getOptionValue("cl")) : 5;
int compressOverHowMuch = commandLine.hasOption("ch") ? Integer.parseInt(commandLine.getOptionValue("ch")) : 4096;
producer.getDefaultMQProducerImpl().setCompressType(CompressionType.of(compressType));
producer.getDefaultMQProducerImpl().setCompressLevel(compressLevel);
producer.setCompressType(CompressionType.of(compressType));
producer.setCompressLevel(compressLevel);
producer.setCompressMsgBodyOverHowmuch(compressOverHowMuch);
System.out.printf("compressType: %s compressLevel: %s%n", compressType, compressLevel);
} else {
Expand Down

0 comments on commit dcc88c6

Please sign in to comment.