diff --git a/extensions/gc/GarbageCollector.js b/extensions/gc/GarbageCollector.js index f6a302f29..f61ef9705 100644 --- a/extensions/gc/GarbageCollector.js +++ b/extensions/gc/GarbageCollector.js @@ -132,6 +132,8 @@ class GarbageCollector extends EventEmitter { kafka: { hosts: this._kafkaConfig.hosts, site: this._kafkaConfig.site, + compressionType: this._kafkaConfig.compressionType, + requiredAcks: this._kafkaConfig.requiredAcks, }, topic: this._gcConfig.topic, groupId: this._gcConfig.consumer.groupId, diff --git a/extensions/gc/GarbageCollectorProducer.js b/extensions/gc/GarbageCollectorProducer.js index 3a3e5b7d2..d93e5fd51 100644 --- a/extensions/gc/GarbageCollectorProducer.js +++ b/extensions/gc/GarbageCollectorProducer.js @@ -26,6 +26,8 @@ class GarbageCollectorProducer { const producer = new BackbeatProducer({ kafka: { hosts: this._kafkaConfig.hosts }, maxRequestSize: this._kafkaConfig.maxRequestSize, + compressionType: this._kafkaConfig.compressionType, + requiredAcks: this._kafkaConfig.requiredAcks, topic: this._topic, }); producer.once('error', () => {}); diff --git a/extensions/lifecycle/LifecycleQueuePopulator.js b/extensions/lifecycle/LifecycleQueuePopulator.js index d41f9ad06..18aa4705c 100644 --- a/extensions/lifecycle/LifecycleQueuePopulator.js +++ b/extensions/lifecycle/LifecycleQueuePopulator.js @@ -62,6 +62,8 @@ class LifecycleQueuePopulator extends QueuePopulatorExtension { const producer = new BackbeatProducer({ kafka: { hosts: this.kafkaConfig.hosts }, maxRequestSize: this.kafkaConfig.maxRequestSize, + compressionType: this.kafkaConfig.compressionType, + requiredAcks: this.kafkaConfig.requiredAcks, topic, }); producer.once('error', done); diff --git a/extensions/lifecycle/bucketProcessor/LifecycleBucketProcessor.js b/extensions/lifecycle/bucketProcessor/LifecycleBucketProcessor.js index 7b059f127..766eb461a 100644 --- a/extensions/lifecycle/bucketProcessor/LifecycleBucketProcessor.js +++ b/extensions/lifecycle/bucketProcessor/LifecycleBucketProcessor.js @@ -380,6 +380,8 @@ class LifecycleBucketProcessor { const producer = new BackbeatProducer({ kafka: { hosts: this._kafkaConfig.hosts }, maxRequestSize: this._kafkaConfig.maxRequestSize, + compressionType: this._kafkaConfig.compressionType, + requiredAcks: this._kafkaConfig.requiredAcks, topic: this._lcConfig.objectTasksTopic, }); producer.once('error', err => { @@ -418,6 +420,8 @@ class LifecycleBucketProcessor { hosts: this._kafkaConfig.hosts, site: this._kafkaConfig.site, backlogMetrics: this._kafkaConfig.backlogMetrics, + compressionType: this._kafkaConfig.compressionType, + requiredAcks: this._kafkaConfig.requiredAcks, }, topic: this._lcConfig.bucketTasksTopic, groupId: this._lcConfig.bucketProcessor.groupId, diff --git a/extensions/lifecycle/conductor/LifecycleConductor.js b/extensions/lifecycle/conductor/LifecycleConductor.js index 6456d032f..2f346f0ac 100644 --- a/extensions/lifecycle/conductor/LifecycleConductor.js +++ b/extensions/lifecycle/conductor/LifecycleConductor.js @@ -872,6 +872,8 @@ class LifecycleConductor { const producer = new BackbeatProducer({ kafka: { hosts: this.kafkaConfig.hosts }, maxRequestSize: this.kafkaConfig.maxRequestSize, + compressionType: this.kafkaConfig.compressionType, + requiredAcks: this.kafkaConfig.requiredAcks, topic: this.lcConfig.bucketTasksTopic, }); producer.once('error', cb); diff --git a/extensions/lifecycle/objectProcessor/LifecycleObjectProcessor.js b/extensions/lifecycle/objectProcessor/LifecycleObjectProcessor.js index d73914626..d1e1da2c8 100644 --- a/extensions/lifecycle/objectProcessor/LifecycleObjectProcessor.js +++ b/extensions/lifecycle/objectProcessor/LifecycleObjectProcessor.js @@ -78,6 +78,8 @@ class LifecycleObjectProcessor extends EventEmitter { hosts: this._kafkaConfig.hosts, site: this._kafkaConfig.site, backlogMetrics: this._kafkaConfig.backlogMetrics, + compressionType: this._kafkaConfig.compressionType, + requiredAcks: this._kafkaConfig.requiredAcks, }, topic, groupId: this._processConfig.groupId, diff --git a/extensions/lifecycle/objectProcessor/LifecycleObjectTransitionProcessor.js b/extensions/lifecycle/objectProcessor/LifecycleObjectTransitionProcessor.js index 84bc74695..e5b0a53ca 100644 --- a/extensions/lifecycle/objectProcessor/LifecycleObjectTransitionProcessor.js +++ b/extensions/lifecycle/objectProcessor/LifecycleObjectTransitionProcessor.js @@ -75,6 +75,8 @@ class LifecycleObjectTransitionProcessor extends LifecycleObjectProcessor { const producer = new BackbeatProducer({ kafka: { hosts: this._kafkaConfig.hosts }, maxRequestSize: this._kafkaConfig.maxRequestSize, + compressionType: this._kafkaConfig.compressionType, + requiredAcks: this._kafkaConfig.requiredAcks, }); producer.once('error', cb); producer.once('ready', () => { @@ -121,6 +123,8 @@ class LifecycleObjectTransitionProcessor extends LifecycleObjectProcessor { hosts: this._kafkaConfig.hosts, site: this._kafkaConfig.site, backlogMetrics: this._kafkaConfig.backlogMetrics, + compressionType: this._kafkaConfig.compressionType, + requiredAcks: this._kafkaConfig.requiredAcks, }, topic, groupId: this._processConfig.groupId, diff --git a/extensions/mongoProcessor/MongoQueueProcessor.js b/extensions/mongoProcessor/MongoQueueProcessor.js index 6811079f4..ee4790089 100644 --- a/extensions/mongoProcessor/MongoQueueProcessor.js +++ b/extensions/mongoProcessor/MongoQueueProcessor.js @@ -135,6 +135,8 @@ class MongoQueueProcessor { kafka: { hosts: this.kafkaConfig.hosts, site: this.kafkaConfig.site, + compressionType: this.kafkaConfig.compressionType, + requiredAcks: this.kafkaConfig.requiredAcks, }, queueProcessor: this.processKafkaEntry.bind(this), circuitBreaker: this.mongoProcessorConfig.circuitBreaker, diff --git a/extensions/notification/destination/KafkaNotificationDestination.js b/extensions/notification/destination/KafkaNotificationDestination.js index 75f0e0ea8..1f6fb4ef7 100644 --- a/extensions/notification/destination/KafkaNotificationDestination.js +++ b/extensions/notification/destination/KafkaNotificationDestination.js @@ -52,6 +52,8 @@ class KafkaNotificationDestination extends NotificationDestination { topic, pollIntervalMs, auth, + compressionType: 'none', + requiredAcks: '1', }); producer.once('error', done); producer.once('ready', () => { diff --git a/extensions/notification/queueProcessor/QueueProcessor.js b/extensions/notification/queueProcessor/QueueProcessor.js index 1ccac92b4..eb5591b55 100644 --- a/extensions/notification/queueProcessor/QueueProcessor.js +++ b/extensions/notification/queueProcessor/QueueProcessor.js @@ -157,6 +157,8 @@ class QueueProcessor extends EventEmitter { kafka: { hosts: this.kafkaConfig.hosts, site: this.kafkaConfig.site, + compressionType: this.kafkaConfig.compressionType, + requiredAcks: this.kafkaConfig.requiredAcks, }, topic: internalTopic, groupId: consumerGroupId, diff --git a/extensions/replication/failedCRR/FailedCRRConsumer.js b/extensions/replication/failedCRR/FailedCRRConsumer.js index 5bb7b042d..cd92212df 100644 --- a/extensions/replication/failedCRR/FailedCRRConsumer.js +++ b/extensions/replication/failedCRR/FailedCRRConsumer.js @@ -40,6 +40,8 @@ class FailedCRRConsumer { kafka: { hosts: this._kafkaConfig.hosts, site: this._kafkaConfig.site, + compressionType: this._kafkaConfig.compressionType, + requiredAcks: this._kafkaConfig.requiredAcks, }, topic: this._topic, groupId: 'backbeat-retry-group', diff --git a/extensions/replication/failedCRR/FailedCRRProducer.js b/extensions/replication/failedCRR/FailedCRRProducer.js index 333915f12..824d45719 100644 --- a/extensions/replication/failedCRR/FailedCRRProducer.js +++ b/extensions/replication/failedCRR/FailedCRRProducer.js @@ -28,6 +28,8 @@ class FailedCRRProducer { this._producer = new BackbeatProducer({ kafka: { hosts: this._kafkaConfig.hosts }, maxRequestSize: this._kafkaConfig.maxRequestSize, + compressionType: this._kafkaConfig.compressionType, + requiredAcks: this._kafkaConfig.requiredAcks, topic: this._topic, }); this._producer.once('error', () => {}); diff --git a/extensions/replication/queueProcessor/QueueProcessor.js b/extensions/replication/queueProcessor/QueueProcessor.js index 7d59cb282..a24d9046c 100644 --- a/extensions/replication/queueProcessor/QueueProcessor.js +++ b/extensions/replication/queueProcessor/QueueProcessor.js @@ -372,6 +372,8 @@ class QueueProcessor extends EventEmitter { const producer = new BackbeatProducer({ kafka: { hosts: this.kafkaConfig.hosts }, maxRequestSize: this.kafkaConfig.maxRequestSize, + compressionType: this.kafkaConfig.compressionType, + requiredAcks: this.kafkaConfig.requiredAcks, topic: this.repConfig.replicationStatusTopic, }); producer.once('error', done); @@ -401,6 +403,8 @@ class QueueProcessor extends EventEmitter { site: this.kafkaConfig.site, backlogMetrics: options && options.enableBacklogMetrics ? this.kafkaConfig.backlogMetrics : undefined, + compressionType: this.kafkaConfig.compressionType, + requiredAcks: this.kafkaConfig.requiredAcks, }, topic, groupId, diff --git a/extensions/replication/replay/ReplayProducer.js b/extensions/replication/replay/ReplayProducer.js index 8e0a6fc09..c4e05bf2d 100644 --- a/extensions/replication/replay/ReplayProducer.js +++ b/extensions/replication/replay/ReplayProducer.js @@ -28,6 +28,8 @@ class ReplayProducer { this._producer = new BackbeatProducer({ kafka: { hosts: this._kafkaConfig.hosts }, maxRequestSize: this._kafkaConfig.maxRequestSize, + compressionType: this._kafkaConfig.compressionType, + requiredAcks: this._kafkaConfig.requiredAcks, topic: this._topic, }); this._producer.once('error', () => {}); diff --git a/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js b/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js index 4c366473c..317ca0fef 100644 --- a/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js +++ b/extensions/replication/replicationStatusProcessor/ReplicationStatusProcessor.js @@ -390,6 +390,8 @@ class ReplicationStatusProcessor { kafka: { hosts: this.kafkaConfig.hosts, site: this.kafkaConfig.site, + compressionType: this.kafkaConfig.compressionType, + requiredAcks: this.kafkaConfig.requiredAcks, }, topic: this.repConfig.replicationStatusTopic, groupId: this.repConfig.replicationStatusProcessor.groupId, diff --git a/lib/BackbeatConsumer.js b/lib/BackbeatConsumer.js index 9832aa383..ae72b0c27 100644 --- a/lib/BackbeatConsumer.js +++ b/lib/BackbeatConsumer.js @@ -91,6 +91,9 @@ class BackbeatConsumer extends EventEmitter { }, site: joi.string(), maxPollIntervalMs: joi.number().min(45000).default(300000), + // Kafka producer params + compressionType: joi.string(), + requiredAcks: joi.string(), }).required(), topic: joi.string().required(), groupId: joi.string().required(), @@ -116,6 +119,8 @@ class BackbeatConsumer extends EventEmitter { this._kafkaHosts = kafka.hosts; this._kafkaBacklogMetricsConfig = kafka.backlogMetrics; this._maxPollIntervalMs = kafka.maxPollIntervalMs; + this._producerCompressionType = kafka.compressionType; + this._producerRequiredAcks = kafka.requiredAcks; this._site = kafka.site; this._fromOffset = fromOffset; this._log = new Logger(clientId); @@ -839,6 +844,8 @@ class BackbeatConsumer extends EventEmitter { assert.strictEqual(this._consumer, null); producer = new BackbeatProducer({ kafka: { hosts: this._kafkaHosts }, + compressionType: this._producerCompressionType, + requiredAcks: this._producerRequiredAcks, topic: this._topic, }); producer.on('ready', () => { @@ -973,6 +980,8 @@ class BackbeatConsumer extends EventEmitter { })); const producer = new BackbeatProducer({ kafka: { hosts: this._kafkaHosts }, + compressionType: this._producerCompressionType, + requiredAcks: this._producerRequiredAcks, topic: this._topic, }); return producer.on('ready', () => { diff --git a/lib/BackbeatProducer.js b/lib/BackbeatProducer.js index 6fe22b786..08297ead2 100644 --- a/lib/BackbeatProducer.js +++ b/lib/BackbeatProducer.js @@ -57,7 +57,7 @@ class BackbeatProducer extends EventEmitter { pollIntervalMs: joi.number().default(DEFAULT_POLL_INTERVAL), maxRequestSize: joi.number().default(KAFKA_PRODUCER_MESSAGE_MAX_BYTES), compressionType: joi.string().default(KAFKA_PRODUCER_DEFAULT_COMPRESSION_TYPE), - requiredAcks: joi.number().default(KAFKA_PRODUCER_DEFAULT_REQUIRED_ACKS), + requiredAcks: joi.string().default(KAFKA_PRODUCER_DEFAULT_REQUIRED_ACKS), } ); } diff --git a/lib/MetricsConsumer.js b/lib/MetricsConsumer.js index 9fdc4efda..28d1df0c4 100644 --- a/lib/MetricsConsumer.js +++ b/lib/MetricsConsumer.js @@ -63,6 +63,8 @@ class MetricsConsumer { kafka: { hosts: this.kafkaConfig.hosts, site: this.kafkaConfig.site, + compressionType: this.kafkaConfig.compressionType, + requiredAcks: this.kafkaConfig.requiredAcks, }, topic: this.mConfig.topic, groupId: `${this.mConfig.groupIdPrefix}-${this._id}`, diff --git a/lib/MetricsProducer.js b/lib/MetricsProducer.js index 0493234b1..e23afdad7 100644 --- a/lib/MetricsProducer.js +++ b/lib/MetricsProducer.js @@ -24,6 +24,8 @@ class MetricsProducer { const producer = new BackbeatProducer({ kafka: { hosts: this._kafkaConfig.hosts }, maxRequestSize: this._kafkaConfig.maxRequestSize, + compressionType: this._kafkaConfig.compressionType, + requiredAcks: this._kafkaConfig.requiredAcks, topic: this._topic, }); producer.once('error', done); diff --git a/lib/api/BackbeatAPI.js b/lib/api/BackbeatAPI.js index fa8722072..73cc1e537 100644 --- a/lib/api/BackbeatAPI.js +++ b/lib/api/BackbeatAPI.js @@ -1290,6 +1290,8 @@ class BackbeatAPI { const producer = new BackbeatProducer({ kafka: { hosts: this._kafkaConfig.hosts }, maxRequestSize: this._kafkaConfig.maxRequestSize, + compressionType: this._kafkaConfig.compressionType, + requiredAcks: this._kafkaConfig.requiredAcks, topic, }); diff --git a/lib/config.joi.js b/lib/config.joi.js index a83d92a80..12706a4be 100644 --- a/lib/config.joi.js +++ b/lib/config.joi.js @@ -33,7 +33,7 @@ const joiSchema = joi.object({ maxRequestSize: joi.number().default(KAFKA_PRODUCER_MESSAGE_MAX_BYTES), site: joi.string(), compressionType: joi.string().default(KAFKA_PRODUCER_DEFAULT_COMPRESSION_TYPE), - requiredAcks: joi.number().default(KAFKA_PRODUCER_DEFAULT_REQUIRED_ACKS), + requiredAcks: joi.string().default(KAFKA_PRODUCER_DEFAULT_REQUIRED_ACKS), }, transport: transportJoi, s3: hostPortJoi.required(), diff --git a/lib/queuePopulator/IngestionPopulator.js b/lib/queuePopulator/IngestionPopulator.js index c077ce0ef..08c95c170 100644 --- a/lib/queuePopulator/IngestionPopulator.js +++ b/lib/queuePopulator/IngestionPopulator.js @@ -171,6 +171,8 @@ class IngestionPopulator { const producer = new BackbeatProducer({ kafka: { hosts: this.kafkaConfig.hosts }, maxRequestSize: this.kafkaConfig.maxRequestSize, + compressionType: this.kafkaConfig.compressionType, + requiredAcks: this.kafkaConfig.requiredAcks, topic, pollIntervalMs: POLL_INTERVAL_MS, }); diff --git a/lib/queuePopulator/LogReader.js b/lib/queuePopulator/LogReader.js index b941582f8..cacc69b84 100644 --- a/lib/queuePopulator/LogReader.js +++ b/lib/queuePopulator/LogReader.js @@ -592,6 +592,8 @@ class LogReader { const producer = new BackbeatProducer({ kafka: { hosts: this.kafkaConfig.hosts }, maxRequestSize: this.kafkaConfig.maxRequestSize, + compressionType: this.kafkaConfig.compressionType, + requiredAcks: this.kafkaConfig.requiredAcks, topic, }); producer.once('error', done);