@@ -149,6 +149,8 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
149149
150150 private long maxAge ;
151151
152+ private boolean configureSerializers = true ;
153+
152154 private volatile String transactionIdPrefix ;
153155
154156 private volatile String clientIdPrefix ;
@@ -178,16 +180,37 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
178180 @ Nullable Serializer <K > keySerializer ,
179181 @ Nullable Serializer <V > valueSerializer ) {
180182
181- this (configs , () -> keySerializer , () -> valueSerializer );
183+ this (configs , () -> keySerializer , () -> valueSerializer , true );
182184 }
183185
184186 /**
185- * Construct a factory with the provided configuration and {@link Serializer} Suppliers.
186- * Also configures a {@link #transactionIdPrefix} as a value from the
187- * {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} if provided.
188- * This config is going to be overridden with a suffix for target {@link Producer} instance.
189- * When the suppliers are invoked to get an instance, the serializers'
190- * {@code configure()} methods will be called with the configuration map.
187+ * Construct a factory with the provided configuration and {@link Serializer}s. Also
188+ * configures a {@link #transactionIdPrefix} as a value from the
189+ * {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} if provided. This config is going to
190+ * be overridden with a suffix for target {@link Producer} instance. The serializers'
191+ * {@code configure()} methods will be called with the configuration map unless
192+ * {@code configureSerializers} is false..
193+ * @param configs the configuration.
194+ * @param keySerializer the key {@link Serializer}.
195+ * @param valueSerializer the value {@link Serializer}.
196+ * @param configureSerializers set to false if serializers are already fully
197+ * configured.
198+ * @since 2.8.7
199+ */
200+ public DefaultKafkaProducerFactory (Map <String , Object > configs ,
201+ @ Nullable Serializer <K > keySerializer ,
202+ @ Nullable Serializer <V > valueSerializer , boolean configureSerializers ) {
203+
204+ this (configs , () -> keySerializer , () -> valueSerializer , configureSerializers );
205+ }
206+
207+ /**
208+ * Construct a factory with the provided configuration and {@link Serializer}
209+ * Suppliers. Also configures a {@link #transactionIdPrefix} as a value from the
210+ * {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} if provided. This config is going to
211+ * be overridden with a suffix for target {@link Producer} instance. When the
212+ * suppliers are invoked to get an instance, the serializers' {@code configure()}
213+ * methods will be called with the configuration map.
191214 * @param configs the configuration.
192215 * @param keySerializerSupplier the key {@link Serializer} supplier function.
193216 * @param valueSerializerSupplier the value {@link Serializer} supplier function.
@@ -197,7 +220,30 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
197220 @ Nullable Supplier <Serializer <K >> keySerializerSupplier ,
198221 @ Nullable Supplier <Serializer <V >> valueSerializerSupplier ) {
199222
223+ this (configs , keySerializerSupplier , valueSerializerSupplier , true );
224+ }
225+
226+ /**
227+ * Construct a factory with the provided configuration and {@link Serializer}
228+ * Suppliers. Also configures a {@link #transactionIdPrefix} as a value from the
229+ * {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} if provided. This config is going to
230+ * be overridden with a suffix for target {@link Producer} instance. When the
231+ * suppliers are invoked to get an instance, the serializers' {@code configure()}
232+ * methods will be called with the configuration map unless
233+ * {@code configureSerializers} is false.
234+ * @param configs the configuration.
235+ * @param keySerializerSupplier the key {@link Serializer} supplier function.
236+ * @param valueSerializerSupplier the value {@link Serializer} supplier function.
237+ * @param configureSerializers set to false if serializers are already fully
238+ * configured.
239+ * @since 2.8.7
240+ */
241+ public DefaultKafkaProducerFactory (Map <String , Object > configs ,
242+ @ Nullable Supplier <Serializer <K >> keySerializerSupplier ,
243+ @ Nullable Supplier <Serializer <V >> valueSerializerSupplier , boolean configureSerializers ) {
244+
200245 this .configs = new ConcurrentHashMap <>(configs );
246+ this .configureSerializers = configureSerializers ;
201247 this .keySerializerSupplier = keySerializerSupplier (keySerializerSupplier );
202248 this .valueSerializerSupplier = valueSerializerSupplier (valueSerializerSupplier );
203249 if (this .clientIdPrefix == null && configs .get (ProducerConfig .CLIENT_ID_CONFIG ) instanceof String ) {
@@ -212,6 +258,9 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
212258
213259 private Supplier <Serializer <K >> keySerializerSupplier (@ Nullable Supplier <Serializer <K >> keySerializerSupplier ) {
214260 this .rawKeySerializerSupplier = keySerializerSupplier ;
261+ if (!this .configureSerializers ) {
262+ return keySerializerSupplier ;
263+ }
215264 return keySerializerSupplier == null
216265 ? () -> null
217266 : () -> {
@@ -225,6 +274,9 @@ private Supplier<Serializer<K>> keySerializerSupplier(@Nullable Supplier<Seriali
225274
226275 private Supplier <Serializer <V >> valueSerializerSupplier (@ Nullable Supplier <Serializer <V >> valueSerializerSupplier ) {
227276 this .rawValueSerializerSupplier = valueSerializerSupplier ;
277+ if (!this .configureSerializers ) {
278+ return valueSerializerSupplier ;
279+ }
228280 return valueSerializerSupplier == null
229281 ? () -> null
230282 : () -> {
@@ -247,37 +299,79 @@ public void setBeanName(String name) {
247299 }
248300
249301 /**
250- * Set a key serializer.
302+ * Set a key serializer. The serializer will be configured using the producer
303+ * configuration, unless {@link #setConfigureSerializers(boolean)
304+ * configureSerializers} is false.
251305 * @param keySerializer the key serializer.
306+ * @see #setConfigureSerializers(boolean)
252307 */
253308 public void setKeySerializer (@ Nullable Serializer <K > keySerializer ) {
254309 this .keySerializerSupplier = keySerializerSupplier (() -> keySerializer );
255310 }
256311
257312 /**
258- * Set a value serializer.
313+ * Set a value serializer. The serializer will be configured using the producer
314+ * configuration, unless {@link #setConfigureSerializers(boolean)
315+ * configureSerializers} is false.
259316 * @param valueSerializer the value serializer.
317+ * @see #setConfigureSerializers(boolean)
260318 */
261319 public void setValueSerializer (@ Nullable Serializer <V > valueSerializer ) {
262320 this .valueSerializerSupplier = valueSerializerSupplier (() -> valueSerializer );
263321 }
264322
265323 /**
266- * Set a supplier to supply instances of the key serializer.
324+ * Set a supplier to supply instances of the key serializer. The serializer will be
325+ * configured using the producer configuration, unless
326+ * {@link #setConfigureSerializers(boolean) configureSerializers} is false.
267327 * @param keySerializerSupplier the supplier.
268328 * @since 2.8
329+ * @see #setConfigureSerializers(boolean)
269330 */
270331 public void setKeySerializerSupplier (Supplier <Serializer <K >> keySerializerSupplier ) {
271- this .keySerializerSupplier = keySerializerSupplier ;
332+ this .keySerializerSupplier = keySerializerSupplier ( keySerializerSupplier ) ;
272333 }
273334
274335 /**
275336 * Set a supplier to supply instances of the value serializer.
276- * @param valueSerializerSupplier the supplier.
337+ * @param valueSerializerSupplier the supplier. The serializer will be configured
338+ * using the producer configuration, unless {@link #setConfigureSerializers(boolean)
339+ * configureSerializers} is false.
277340 * @since 2.8
341+ * @see #setConfigureSerializers(boolean)
278342 */
279343 public void setValueSerializerSupplier (Supplier <Serializer <V >> valueSerializerSupplier ) {
280- this .valueSerializerSupplier = valueSerializerSupplier ;
344+ this .valueSerializerSupplier = valueSerializerSupplier (valueSerializerSupplier );
345+ }
346+
347+ /**
348+ * If true (default), programmatically provided serializers (via constructor or
349+ * setters) will be configured using the producer configuration. Set to false if the
350+ * serializers are already fully configured.
351+ * @return true to configure.
352+ * @since 2.8.7
353+ * @see #setKeySerializer(Serializer)
354+ * @see #setKeySerializerSupplier(Supplier)
355+ * @see #setValueSerializer(Serializer)
356+ * @see #setValueSerializerSupplier(Supplier)
357+ */
358+ public boolean isConfigureSerializers () {
359+ return this .configureSerializers ;
360+ }
361+
362+ /**
363+ * Set to false (default true) to prevent programmatically provided serializers (via
364+ * constructor or setters) from being configured using the producer configuration,
365+ * e.g. if the serializers are already fully configured.
366+ * @param configureSerializers false to not configure.
367+ * @since 2.8.7
368+ * @see #setKeySerializer(Serializer)
369+ * @see #setKeySerializerSupplier(Supplier)
370+ * @see #setValueSerializer(Serializer)
371+ * @see #setValueSerializerSupplier(Supplier)
372+ */
373+ public void setConfigureSerializers (boolean configureSerializers ) {
374+ this .configureSerializers = configureSerializers ;
281375 }
282376
283377 /**
@@ -441,10 +535,10 @@ public ProducerFactory<K, V> copyWithConfigurationOverride(Map<String, Object> o
441535 Map <String , Object > producerProperties = new HashMap <>(getConfigurationProperties ());
442536 producerProperties .putAll (overrideProperties );
443537 producerProperties = ensureExistingTransactionIdPrefixInProperties (producerProperties );
444- DefaultKafkaProducerFactory <K , V > newFactory =
445- new DefaultKafkaProducerFactory <>(producerProperties ,
538+ DefaultKafkaProducerFactory <K , V > newFactory = new DefaultKafkaProducerFactory <>(producerProperties ,
446539 getKeySerializerSupplier (),
447- getValueSerializerSupplier ());
540+ getValueSerializerSupplier (),
541+ isConfigureSerializers ());
448542 newFactory .setPhysicalCloseTimeout ((int ) getPhysicalCloseTimeout ().getSeconds ());
449543 newFactory .setProducerPerThread (isProducerPerThread ());
450544 for (ProducerPostProcessor <K , V > templatePostProcessor : getPostProcessors ()) {
0 commit comments