Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #8166]optimize: make compression type configurable in producer clinet level #8167

Merged
merged 1 commit into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,6 @@
import org.apache.rocketmq.common.ServiceState;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.compression.CompressionType;
import org.apache.rocketmq.common.compression.Compressor;
import org.apache.rocketmq.common.compression.CompressorFactory;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
Expand Down Expand Up @@ -118,11 +115,6 @@ public class DefaultMQProducerImpl implements MQProducerInner {
private MQFaultStrategy mqFaultStrategy;
private ExecutorService asyncSenderExecutor;

// compression related
private int compressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
private CompressionType compressType = CompressionType.of(System.getProperty(MixAll.MESSAGE_COMPRESS_TYPE, "ZLIB"));
private final Compressor compressor = CompressorFactory.getCompressor(compressType);

humkum marked this conversation as resolved.
Show resolved Hide resolved
// backpressure related
private Semaphore semaphoreAsyncSendNum;
private Semaphore semaphoreAsyncSendSize;
Expand Down Expand Up @@ -900,7 +892,7 @@ private SendResult sendKernelImpl(final Message msg,
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
sysFlag |= compressType.getCompressionFlag();
sysFlag |= this.defaultMQProducer.getCompressType().getCompressionFlag();
msgBodyCompressed = true;
}

Expand Down Expand Up @@ -1070,7 +1062,7 @@ private boolean tryToCompressMessage(final Message msg) {
if (body != null) {
if (body.length >= this.defaultMQProducer.getCompressMsgBodyOverHowmuch()) {
try {
byte[] data = compressor.compress(body, compressLevel);
byte[] data = this.defaultMQProducer.getCompressor().compress(body, this.defaultMQProducer.getCompressLevel());
if (data != null) {
msg.setBody(data);
return true;
Expand Down Expand Up @@ -1763,22 +1755,6 @@ public ConcurrentMap<String, TopicPublishInfo> getTopicPublishInfoTable() {
return topicPublishInfoTable;
}

public int getCompressLevel() {
return compressLevel;
}

public void setCompressLevel(int compressLevel) {
this.compressLevel = compressLevel;
}

public CompressionType getCompressType() {
return compressType;
}

public void setCompressType(CompressionType compressType) {
this.compressType = compressType;
}

public ServiceState getServiceState() {
return serviceState;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
import org.apache.rocketmq.client.trace.hook.EndTransactionTraceHookImpl;
import org.apache.rocketmq.client.trace.hook.SendMessageTraceHookImpl;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.compression.CompressionType;
import org.apache.rocketmq.common.compression.Compressor;
import org.apache.rocketmq.common.compression.CompressorFactory;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageBatch;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
Expand Down Expand Up @@ -170,6 +173,21 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer {

private RPCHook rpcHook = null;

/**
* Compress level of compress algorithm.
*/
private int compressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));

/**
* Compress type of compress algorithm, default using ZLIB.
*/
private CompressionType compressType = CompressionType.of(System.getProperty(MixAll.MESSAGE_COMPRESS_TYPE, "ZLIB"));

/**
* Compressor of compress algorithm.
*/
private Compressor compressor = CompressorFactory.getCompressor(compressType);

/**
* Default constructor.
*/
Expand Down Expand Up @@ -1344,4 +1362,25 @@ public void setStartDetectorEnable(boolean startDetectorEnable) {
super.setStartDetectorEnable(startDetectorEnable);
this.defaultMQProducerImpl.getMqFaultStrategy().setStartDetectorEnable(startDetectorEnable);
}

public int getCompressLevel() {
return compressLevel;
}

public void setCompressLevel(int compressLevel) {
this.compressLevel = compressLevel;
}

public CompressionType getCompressType() {
return compressType;
}

public void setCompressType(CompressionType compressType) {
this.compressType = compressType;
this.compressor = CompressorFactory.getCompressor(compressType);
}

public Compressor getCompressor() {
return compressor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ public static void main(String[] args) throws MQClientException {
String compressType = commandLine.hasOption("ct") ? commandLine.getOptionValue("ct").trim() : "ZLIB";
int compressLevel = commandLine.hasOption("cl") ? Integer.parseInt(commandLine.getOptionValue("cl")) : 5;
int compressOverHowMuch = commandLine.hasOption("ch") ? Integer.parseInt(commandLine.getOptionValue("ch")) : 4096;
producer.getDefaultMQProducerImpl().setCompressType(CompressionType.of(compressType));
producer.getDefaultMQProducerImpl().setCompressLevel(compressLevel);
producer.setCompressType(CompressionType.of(compressType));
producer.setCompressLevel(compressLevel);
producer.setCompressMsgBodyOverHowmuch(compressOverHowMuch);
System.out.printf("compressType: %s compressLevel: %s%n", compressType, compressLevel);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,8 @@ public void run() {
String compressType = commandLine.hasOption("ct") ? commandLine.getOptionValue("ct").trim() : "ZLIB";
int compressLevel = commandLine.hasOption("cl") ? Integer.parseInt(commandLine.getOptionValue("cl")) : 5;
int compressOverHowMuch = commandLine.hasOption("ch") ? Integer.parseInt(commandLine.getOptionValue("ch")) : 4096;
producer.getDefaultMQProducerImpl().setCompressType(CompressionType.of(compressType));
producer.getDefaultMQProducerImpl().setCompressLevel(compressLevel);
producer.setCompressType(CompressionType.of(compressType));
producer.setCompressLevel(compressLevel);
producer.setCompressMsgBodyOverHowmuch(compressOverHowMuch);
System.out.printf("compressType: %s compressLevel: %s%n", compressType, compressLevel);
} else {
Expand Down
Loading