This example shows how to create a custom producer interceptor. The Java class link:interceptors/src/main/java/org/hifly/kafka/interceptor/producer/CreditCardProducerInterceptor.java[_CreditCardProducerInterceptor_] masks sensitive info on the producer record (credit card number).
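For reference, a minimal sketch of a masking producer interceptor is shown below; it is not the repository's _CreditCardProducerInterceptor_, and the blanket digit-masking rule is a simplifying assumption:

[source,java]
----
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

// Hedged sketch of a masking interceptor (not the repository's class):
// it blanks out digits in the record value before the record is serialized and sent.
public class MaskingInterceptorSketch implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // naive masking: replace every digit with '*'
        // (a real interceptor would target only the credit card field of the payload)
        String masked = record.value() == null ? null : record.value().replaceAll("\\d", "*");
        return new ProducerRecord<>(record.topic(), record.partition(),
                record.timestamp(), record.key(), masked, record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) { }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}
----

An interceptor of this kind is enabled by adding its class name to the producer's _interceptor.classes_ property.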
Run 2 consumer instances (2 different shells/terminals) belonging to the same consumer group and subscribed to the _user_ and _user_clicks_ topics. Consumers use
link:https://kafka.apache.org/37/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html[_org.apache.kafka.clients.consumer.RangeAssignor_] to distribute partition ownership.
Run 2 consumer instances (2 different shells/terminals) belonging to the same consumer group and subscribed to the _user_ and _user_clicks_ topics; consumers use
link:https://kafka.apache.org/37/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html[_org.apache.kafka.clients.consumer.RoundRobinAssignor_] to distribute partition ownership.
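For reference, a minimal consumer sketch is shown below (group id and bootstrap address are assumptions, not the values used by this example); both instances must use the same _partition.assignment.strategy_:

[source,java]
----
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.List;
import java.util.Properties;

// Minimal sketch: run two instances of this class to observe how partitions are assigned.
public class AssignorConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumption
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "assignor-demo-group");        // assumption
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // swap RoundRobinAssignor for RangeAssignor to compare the resulting partition ownership
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("user", "user_clicks"));
            // poll loop omitted; see the repository's consumer examples for the full loop
        }
    }
}
----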
=== Read from the closest replica
This example shows how to use the feature (available since Apache Kafka® 2.4) that lets consumers read messages from the closest replica, even if it is not the leader of the partition.
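As a rough sketch, a consumer opts in to follower fetching by declaring the rack it runs in via _client.rack_ (the group id and bootstrap address below are assumptions, not the values used by this example):

[source,java]
----
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

// Sketch only: the consumer declares which "rack" (here a data center id) it lives in,
// so the broker-side replica selector can serve its fetches from the closest replica.
public class ClosestReplicaConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumption
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "closest-replica-group");      // assumption
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // must match the broker.rack value of the brokers in the same data center (e.g. dc1)
        props.put(ConsumerConfig.CLIENT_RACK_CONFIG, "dc1");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // subscribe/poll as usual; fetches may now be served by a follower located in dc1
    }
}
----

On the broker side, each broker declares its _broker.rack_ and uses _org.apache.kafka.common.replica.RackAwareReplicaSelector_ as _replica.selector.class_.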
Start a cluster with 3 brokers on 3 different racks, _dc1_, _dc2_ and _dc3_:
This solution could be implemented on the consumer side to handle errors in processing records without blocking the input topic (a minimal consumer sketch follows the list).

. Consumer processes records and commits the offset (_auto-commit_).
. If a record can't be processed _(the simple condition used here to raise an error is the existence of a specific message HEADER named ERROR)_, it is sent to a retry topic, if the number of retries is not yet exhausted.
. When the number of retries is exhausted, the record is sent to a DLQ topic.
. The number of retries is set at the Consumer instance level.
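A minimal consumer-side sketch of this flow is shown below; the topic names, the _RETRIES_ counter header and the retry limit are illustrative assumptions (only the _ERROR_ header condition comes from the example above):

[source,java]
----
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.*;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class RetryDlqConsumerSketch {
    private static final int MAX_RETRIES = 3; // assumption: retry limit set per consumer instance

    public static void main(String[] args) {
        Properties cProps = new Properties();
        cProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // assumption
        cProps.put(ConsumerConfig.GROUP_ID_CONFIG, "retry-dlq-group");           // assumption
        cProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");            // offsets committed automatically
        cProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        cProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        Properties pProps = new Properties();
        pProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        pProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        pProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(pProps)) {
            consumer.subscribe(List.of("input-topic"));                          // assumed topic name
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                    if (record.headers().lastHeader("ERROR") == null) {
                        System.out.printf("processed %s%n", record.value());     // happy path
                        continue;
                    }
                    // failed record: route to the retry topic until attempts are exhausted, then to the DLQ
                    Header attempts = record.headers().lastHeader("RETRIES");
                    int retries = attempts == null ? 0 : Integer.parseInt(new String(attempts.value()));
                    String target = retries < MAX_RETRIES ? "input-topic-retry" : "input-topic-dlq";
                    ProducerRecord<String, String> out =
                            new ProducerRecord<>(target, record.key(), record.value());
                    out.headers().add("RETRIES", Integer.toString(retries + 1).getBytes());
                    producer.send(out);
                }
            }
        }
    }
}
----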
Implementation of a sample Kafka Connect Source Connector; it executes _unix commands_ (e.g. _fortune_, _ls -ltr_, _netstat_) and sends their output to a topic.
IMPORTANT: unix commands are executed on the Connect worker node.
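To give an idea of what such a source connector involves, below is a hedged sketch of a source task that runs a command and emits its output as a record; it is not the repository's implementation, and the _command_ and _topic_ property names are assumptions:

[source,java]
----
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.*;
import java.util.stream.Collectors;

// Generic sketch of a command-running source task (not the repository's class).
public class UnixCommandSourceTaskSketch extends SourceTask {
    private String command;
    private String topic;

    @Override
    public void start(Map<String, String> props) {
        command = props.get("command");   // e.g. "fortune" -- assumed property name
        topic = props.get("topic");       // assumed property name
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        Thread.sleep(5000);               // naive fixed polling interval
        try {
            Process process = new ProcessBuilder(command.split(" ")).start();
            String output;
            try (BufferedReader reader =
                         new BufferedReader(new InputStreamReader(process.getInputStream()))) {
                output = reader.lines().collect(Collectors.joining("\n"));
            }
            // no meaningful source partition/offset for a one-shot command, so empty maps are used
            return List.of(new SourceRecord(Map.of(), Map.of(), topic, Schema.STRING_SCHEMA, output));
        } catch (Exception e) {
            return Collections.emptyList(); // errors are swallowed in this sketch
        }
    }

    @Override
    public void stop() { }

    @Override
    public String version() { return "0.0.1"; }
}
----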
This connector relies on Confluent Schema Registry to convert messages using an Avro converter:
This will create an image based on link:https://hub.docker.com/r/confluentinc/cp-kafka-connect-base/tags[_confluentinc/cp-kafka-connect-base_] using a custom link:kafka-unixcommand-connector/Dockerfile[_Dockerfile_].
It will use the link:https://docs.confluent.io/kafka-connectors/confluent-hub/client.html[_confluent-hub install_] utility to install the plugin in Connect.
Deploy the connector:
Original json messages will be sent to the _test_ topic.
The sink connector will apply the SMT and store the records in the MongoDB _pets_ collection of the _Tutorial2_ database, using a key generated by the SMT.
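For context, a key-generating SMT generally looks like the hedged sketch below; it is a generic example, not the custom SMT used in this module, and the UUID key is an assumption:

[source,java]
----
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.transforms.Transformation;

import java.util.Map;
import java.util.UUID;

// Generic sketch of a key-generating SMT (not the repository's class).
public class GenerateKeySketch<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        // replace the (possibly null) key with a generated UUID string; the value is untouched
        return record.newRecord(
                record.topic(), record.kafkaPartition(),
                Schema.STRING_SCHEMA, UUID.randomUUID().toString(),
                record.valueSchema(), record.value(),
                record.timestamp());
    }

    @Override
    public ConfigDef config() { return new ConfigDef(); }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}
----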
Teardown:
Send json messages to the _test_ topic (the second message is a malformed json message):
Run the example:

[source,bash]
----
scripts/bootstrap-connect-sink-http.sh
----
A web application exposing REST APIs will start up, listening on port _8010_.
An HTTP sink connector will be created with this link:kafka-connect-sink-http/config/http_sink.json[config]:
The sink connector will read messages from the _gaming-player-activity_ topic and store them in the S3 bucket _gaming-player-activity-bucket_, using _io.confluent.connect.s3.format.avro.AvroFormat_ as the format class.
The sink connector will generate a new object storage entry every 100 messages (_flush_size_).
Same example, but the sink connector will read Avro messages from the _gaming-player-activity_ topic and store them in the S3 bucket _gaming-player-activity-bucket_, using _io.confluent.connect.s3.format.parquet.ParquetFormat_ as the format class.
The format of data stored in MinIO will be Parquet.
In this example, some SMT transformations (chained) are used to create an Event Router starting from an input _outbox table_.
The outbox table contains different operations for the same aggregate (_Consumer Loan_); the different operations are sent to specific topics following these routing rules: