The Kafka Connect REST API allows you to manage connectors that move data between Apache Kafka and other systems.
The Kafka Connect command line tool, also known as kc or kafka-connect, allows users to manage their Kafka Connect cluster and connectors. With this tool, users can retrieve information about the cluster and connectors, create new connectors, update existing connectors, delete connectors, and perform other actions.
This project aims to support all features of the Kafka Connect REST API.
pip install kafka-connect-py
To get basic Connect cluster information including the worker version, the commit it’s on, and its Kafka cluster ID, use the following command:
kc info
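For reference, `kc info` corresponds to the root endpoint of the Kafka Connect REST API (`GET /`), which returns the worker `version`, `commit`, and `kafka_cluster_id`. The following is a minimal standard-library sketch of the same call, not kafka-connect-py's implementation; it assumes a worker at `http://localhost:8083`, and the helper names are hypothetical.

```python
import json
import urllib.request

# Assumption: a Connect worker listening on the default REST port.
CONNECT_URL = "http://localhost:8083"


def cluster_info_request(base_url: str = CONNECT_URL) -> urllib.request.Request:
    """Hypothetical helper: build GET / (returns version, commit, kafka_cluster_id)."""
    return urllib.request.Request(f"{base_url.rstrip('/')}/", method="GET")


def get_cluster_info(base_url: str = CONNECT_URL) -> dict:
    """Send the request and decode the JSON body (needs a running worker)."""
    with urllib.request.urlopen(cluster_info_request(base_url), timeout=10) as resp:
        return json.load(resp)
```

Building the request separately from sending it keeps the URL logic testable without a running cluster.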
To list the plugins installed on the worker, use the following command:
kc list-plugins
To format the installed plugin list for easier reading, pipe the output to the jq command:
kc list-plugins | jq
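If jq is not available, the plugin list (the REST API's `GET /connector-plugins` response, a JSON array of objects with `class`, `type`, and `version`) can also be tidied in Python. A small sketch; `format_plugins` is a hypothetical helper and the sample versions are placeholders, not real releases.

```python
import json


def format_plugins(plugins: list) -> str:
    """Render a GET /connector-plugins payload one plugin per line, sorted by type then class."""
    rows = sorted(plugins, key=lambda p: (p.get("type", ""), p.get("class", "")))
    return "\n".join(
        f"{p.get('type', '?'):<8} {p.get('class', '?')} ({p.get('version', 'unknown')})"
        for p in rows
    )


# Illustrative payload in the documented shape; version strings are placeholders.
sample = json.loads(
    '[{"class": "io.debezium.connector.mysql.MySqlConnector", "type": "source", "version": "1.0.0"},'
    ' {"class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector", "type": "sink", "version": "1.0.0"}]'
)
print(format_plugins(sample))
```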
To create a connector instance, pass JSON data containing the connector’s configuration to the update sub-command:
kc update source-debezium-orders-00 -d '{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "'$SCHEMA_REGISTRY_URL'",
"value.converter.basic.auth.credentials.source": "'$BASIC_AUTH_CREDENTIALS_SOURCE'",
"value.converter.basic.auth.user.info": "'$SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO'",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "42",
"database.server.name": "asgard",
"table.whitelist": "demo.orders",
"database.history.kafka.bootstrap.servers": "'$BOOTSTRAP_SERVERS'",
"database.history.consumer.security.protocol": "SASL_SSL",
"database.history.consumer.sasl.mechanism": "PLAIN",
"database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"'$CLOUD_KEY'\" password=\"'$CLOUD_SECRET'\";",
"database.history.producer.security.protocol": "SASL_SSL",
"database.history.producer.sasl.mechanism": "PLAIN",
"database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"'$CLOUD_KEY'\" password=\"'$CLOUD_SECRET'\";",
"database.history.kafka.topic": "dbhistory.demo",
"topic.creation.default.replication.factor": "3",
"topic.creation.default.partitions": "3",
"decimal.handling.mode": "double",
"include.schema.changes": "true",
"transforms": "unwrap,addTopicPrefix",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.addTopicPrefix.regex":"(.*)",
"transforms.addTopicPrefix.replacement":"mysql-debezium-$1"
}'
Or create/update a connector instance with a JSON file:
kc update <connector> --config-file <config_file>
As mentioned above, if a connector already exists, you can use the update sub-command to amend its configuration (see the create example above). Because update both creates and updates connectors, it’s the standard command that you should use most of the time (which also means that you don’t have to completely rewrite your configs).
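The create-or-update behaviour matches the REST API's `PUT /connectors/{name}/config`, which creates the connector if it does not exist and otherwise replaces its configuration. Its body is the flat config map (the same shape as the `-d` payload or the config file), not the `{"name": ..., "config": ...}` wrapper used by `POST /connectors`. A minimal sketch under those assumptions; `put_config_request` and `load_config_file` are hypothetical helpers, not kc internals.

```python
import json
import urllib.parse
import urllib.request
from pathlib import Path


def put_config_request(base_url: str, name: str, config: dict) -> urllib.request.Request:
    """Build PUT /connectors/{name}/config: creates the connector if missing, else updates it."""
    url = f"{base_url.rstrip('/')}/connectors/{urllib.parse.quote(name, safe='')}/config"
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        url, data=body, method="PUT", headers={"Content-Type": "application/json"}
    )


def load_config_file(path: str) -> dict:
    """Read a JSON file holding the flat config map and check the one always-required key."""
    config = json.loads(Path(path).read_text(encoding="utf-8"))
    if "connector.class" not in config:
        raise ValueError(f"{path}: missing required key 'connector.class'")
    return config
```

Sending the request with `urllib.request.urlopen(...)` twice with the same config is harmless, which is why a single create-or-update path is convenient.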
Use the following command to list all existing connectors:
kc list [--expand=info|status] [--pattern=regex] [--state=running|paused|unassigned|failed]
Inspect the config for a given connector as follows:
kc config sink-elastic-orders-00
You can also look at a connector’s status. While the config command shows a connector’s static configuration, the status shows the connector as a runtime entity:
kc status sink-elastic-orders-00
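The status response (`GET /connectors/{name}/status` in the REST API) has a `connector` object with a `state`, a `tasks` array with one entry per task, and a `type`. A short sketch that condenses it into one line; `summarize_status` is hypothetical and the sample payload is illustrative.

```python
def summarize_status(status: dict) -> str:
    """Condense a GET /connectors/{name}/status payload into a single line."""
    connector_state = status["connector"]["state"]
    tasks = ", ".join(f"{t['id']}={t['state']}" for t in status.get("tasks", []))
    return (f"{status['name']} [{status.get('type', '?')}] "
            f"connector={connector_state} tasks: {tasks or 'none'}")


# Illustrative payload in the documented shape (worker_id values are made up).
sample_status = {
    "name": "sink-elastic-orders-00",
    "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "connect:8083"}],
    "type": "sink",
}
print(summarize_status(sample_status))
```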
You can also use list with the --expand=status option to show the status of many connectors at once. You can filter the response using a regex pattern, a connector state, or both.
Use the following to show all connectors whose names start with sink- and that are in a FAILED state (quoting the regex stops the shell from treating it as a glob):
kc list --expand=status -p 'sink-.*' -s failed
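The REST API's `GET /connectors?expand=status` returns a map of connector name to `{"status": {...}}` and has no name or state filter of its own, so filtering of this kind happens on the client. A sketch of such a filter; `filter_connectors` is hypothetical, and whether kc anchors the regex the same way (this sketch uses `fullmatch`) is an assumption.

```python
import re
from typing import Optional


def filter_connectors(expanded: dict, pattern: Optional[str] = None,
                      state: Optional[str] = None) -> list:
    """Return sorted connector names matching a name regex and/or connector state."""
    regex = re.compile(pattern) if pattern else None
    matches = []
    for name, info in expanded.items():
        # Assumption: the whole name must match, so 'sink-.*' does not hit 'my-sink-x'.
        if regex is not None and not regex.fullmatch(name):
            continue
        # The API reports states in upper case (e.g. RUNNING, PAUSED, FAILED).
        if state and info["status"]["connector"]["state"] != state.upper():
            continue
        matches.append(name)
    return sorted(matches)


def _entry(state):
    # Illustrative, trimmed-down entry of an expand=status response.
    return {"status": {"connector": {"state": state, "worker_id": "connect:8083"}, "tasks": []}}


expanded = {
    "sink-elastic-orders-00": _entry("FAILED"),
    "sink-jdbc-orders-00": _entry("RUNNING"),
    "source-debezium-orders-00": _entry("FAILED"),
}
print(filter_connectors(expanded, pattern="sink-.*", state="failed"))
```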
If something is wrong in your setup and you don’t think a config change would help, or if you simply don’t need a connector to run anymore, you can delete it by name:
kc delete sink-elastic-orders-00
The delete sub-command also supports bulk deletion using the --all option. Used on its own, --all applies the sub-command to every connector.
The following deletes all connectors whose names start with sink- and that are in a PAUSED state:
kc delete --all --pattern 'sink-.*' -s paused
The --all option is supported by several sub-commands, including delete, restart, resume, and pause. However, for better testing and control over the outcome of your actions, we recommend running the list sub-command with the same filters before executing any of these sub-commands. This way, you can confirm that your filters match only the connectors you intend to act on and avoid unintended consequences.
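One way to follow this advice in a script is to build the full list of requests first and print it as a dry run before sending anything. The sketch assumes the standard Kafka Connect REST endpoints for each action (`DELETE /connectors/{name}`, `PUT /connectors/{name}/pause`, `PUT /connectors/{name}/resume`, `POST /connectors/{name}/restart`); `plan_bulk_action`, `run_plan`, and the worker URL are hypothetical and not part of kc.

```python
import urllib.parse
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumption: default worker address

# HTTP method and path suffix for each bulk action in the Kafka Connect REST API.
ACTIONS = {
    "delete": ("DELETE", ""),
    "pause": ("PUT", "/pause"),
    "resume": ("PUT", "/resume"),
    "restart": ("POST", "/restart"),
}


def plan_bulk_action(base_url: str, action: str, names: list) -> list:
    """Build one request per connector without sending anything, so the plan can be reviewed."""
    method, suffix = ACTIONS[action]
    root = base_url.rstrip("/")
    return [
        urllib.request.Request(f"{root}/connectors/{urllib.parse.quote(n, safe='')}{suffix}",
                               method=method)
        for n in names
    ]


def run_plan(plan: list, dry_run: bool = True) -> None:
    """Print every request; only send them when dry_run is False."""
    for req in plan:
        print(f"{'DRY RUN ' if dry_run else ''}{req.get_method()} {req.full_url}")
        if not dry_run:
            urllib.request.urlopen(req, timeout=10).close()


run_plan(plan_bulk_action(CONNECT_URL, "pause", ["sink-elastic-orders-00"]))
```

The names would typically come from the same filter you checked with list, so the reviewed plan and the executed plan cannot drift apart.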
The following command returns the connector status:
kc status source-debezium-orders-00 | jq
If your connector fails, the details of the failure belong to the task, so to inspect the problem you’ll need to find the task’s stack trace. The task is the entity that actually runs the connector and converter code, so that is where the stack trace is recorded.
kc task-status source-debezium-orders-00 <task-id> | jq
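In the REST API, a failed task carries a `trace` field, both in the connector-level `GET /connectors/{name}/status` response (inside `tasks`) and in the per-task `GET /connectors/{name}/tasks/{id}/status` response. A sketch that pulls the first line of each failed task's trace from the connector-level payload; `failed_task_traces` is hypothetical and the payload is illustrative.

```python
def failed_task_traces(status: dict) -> dict:
    """Map task id -> first line of its stack trace for every FAILED task."""
    traces = {}
    for task in status.get("tasks", []):
        if task.get("state") == "FAILED":
            trace = task.get("trace", "")
            traces[task["id"]] = trace.splitlines()[0] if trace else "(no trace reported)"
    return traces


# Illustrative payload; the exception text is made up.
status = {
    "name": "my-connector",
    "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
    "tasks": [
        {"id": 0, "state": "FAILED", "worker_id": "connect:8083",
         "trace": "org.apache.kafka.connect.errors.ConnectException: example failure\n"
                  "\tat example.Frame(Example.java:1)"},
        {"id": 1, "state": "RUNNING", "worker_id": "connect:8083"},
    ],
    "type": "source",
}
print(failed_task_traces(status))
```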
If after inspecting a task, you have determined that it has failed and you have fixed the reason for the failure (perhaps restarted a database), you can restart the connector with the following:
kc restart source-debezium-orders-00
Keep in mind, though, that restarting the connector doesn’t restart its tasks. You will also need to restart the failed task and then check its status again:
kc task-status source-debezium-orders-00 <task-id>
Alternatively, you can restart the connector and only its failed tasks in one step:
kc restart source-debezium-orders-00 --include-tasks --failed-only
and check the status again:
kc status source-debezium-orders-00 | jq
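These restart variants map onto two REST endpoints: `POST /connectors/{name}/restart`, whose `includeTasks` and `onlyFailed` query parameters were added in Apache Kafka 3.0 (KIP-745), and `POST /connectors/{name}/tasks/{id}/restart` for a single task. A request-building sketch under those assumptions; both helper names are hypothetical.

```python
import urllib.parse
import urllib.request


def restart_connector_request(base_url: str, name: str, include_tasks: bool = False,
                              only_failed: bool = False) -> urllib.request.Request:
    """POST /connectors/{name}/restart; the query flags need Apache Kafka 3.0+."""
    url = f"{base_url.rstrip('/')}/connectors/{urllib.parse.quote(name, safe='')}/restart"
    query = {}
    if include_tasks:
        query["includeTasks"] = "true"
    if only_failed:
        query["onlyFailed"] = "true"
    if query:  # omit the query string entirely for a plain connector restart
        url += "?" + urllib.parse.urlencode(query)
    return urllib.request.Request(url, method="POST")


def restart_task_request(base_url: str, name: str, task_id: int) -> urllib.request.Request:
    """POST /connectors/{name}/tasks/{task_id}/restart restarts one task."""
    url = (f"{base_url.rstrip('/')}/connectors/{urllib.parse.quote(name, safe='')}"
           f"/tasks/{int(task_id)}/restart")
    return urllib.request.Request(url, method="POST")
```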
Unlike restarting, pausing a connector does pause its tasks. This happens asynchronously, though, so when you pause a connector, you can’t rely on it pausing all of its tasks at exactly the same time. The tasks are running in a thread pool, so there’s no fancy mechanism to make this happen simultaneously.
A connector and its tasks can be paused as follows:
kc pause source-debezium-orders-00
Just as easily, a connector and its tasks can be resumed:
kc resume source-debezium-orders-00
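Because pausing happens asynchronously, a script that pauses a connector may want to poll its status until the connector and every task report `PAUSED` (or `RUNNING` after a resume). A sketch with an injectable `fetch_status` callable so it can be exercised offline; in practice that callable might wrap `kc status` or `GET /connectors/{name}/status`. `wait_for_state` and its defaults are hypothetical.

```python
import time
from typing import Callable


def wait_for_state(fetch_status: Callable[[], dict], target: str = "PAUSED",
                   timeout_s: float = 30.0, interval_s: float = 1.0) -> bool:
    """Poll until the connector and every task report `target`; return False on timeout."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = fetch_status()
        states = [status["connector"]["state"]]
        states += [task["state"] for task in status.get("tasks", [])]
        if all(s == target for s in states):
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)


# Offline demo: a fake fetcher whose second call reports everything paused.
_responses = iter([
    {"connector": {"state": "PAUSED"}, "tasks": [{"id": 0, "state": "RUNNING"}]},
    {"connector": {"state": "PAUSED"}, "tasks": [{"id": 0, "state": "PAUSED"}]},
])
print(wait_for_state(lambda: next(_responses), interval_s=0))
```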
A convenient way to display all of a connector’s tasks at once is as follows:
kc list-tasks source-debezium-orders-00 | jq
This information is similar to what you can get from other APIs, but it is broken down by task, and the config for each task is shown.
Get a List of Topics Used by a Connector
As of Apache Kafka 2.5, it is possible to get a list of topics used by a connector:
kc list-topics source-debezium-orders-00 | jq
This shows the topics that a connector is consuming from or producing to. This is less useful for connectors that consume from or produce to a single topic. However, some developers use regular expressions for topic names in Connect, so this is a major benefit in situations where topic names are derived computationally.
This could also be useful with a source connector that is using SMTs to dynamically change the topic names to which it is producing.
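For example, with the RegexRouter transform from the Debezium example above, every produced topic should carry the `mysql-debezium-` prefix. The REST response (`GET /connectors/{name}/topics`) is shaped like `{"<name>": {"topics": [...]}}`, so a sketch can extract the list and flag topics that break the expected naming scheme. Both helpers are hypothetical and the topic names are illustrative.

```python
import re


def connector_topics(response: dict, name: str) -> list:
    """Pull the sorted topic list out of a GET /connectors/{name}/topics response."""
    return sorted(response.get(name, {}).get("topics", []))


def unexpected_topics(topics: list, expected_pattern: str) -> list:
    """Topics whose full name does not match the expected naming scheme."""
    regex = re.compile(expected_pattern)
    return [t for t in topics if not regex.fullmatch(t)]


# Illustrative response; topic names are made up.
response = {"source-debezium-orders-00": {"topics": ["other-topic", "mysql-debezium-asgard.demo.orders"]}}
topics = connector_topics(response, "source-debezium-orders-00")
print(unexpected_topics(topics, r"mysql-debezium-.*"))
```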
# Import the class
from kafka_connect import KafkaConnect
import json
# Instantiate the client
client = KafkaConnect(url="http://localhost:8083")
# Get the version and other details of the Kafka Connect cluster
cluster = client.get_cluster_info()
print(cluster)
# Get a list of active connectors
connectors = client.list_connectors(expand="status")
print(json.dumps(connectors, indent=2))
# Create a new connector
config = {
"name": "my-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"connection.url": "jdbc:postgresql://localhost:5432/mydatabase",
"connection.user": "myuser",
"connection.password": "mypassword",
"table.whitelist": "mytable",
"mode": "timestamp+incrementing",
"timestamp.column.name": "modified_at",
"validate.non.null": "false",
"incrementing.column.name": "id",
"topic.prefix": "my-connector-",
},
}
response = client.create_connector(config)
print(response)
# Update an existing connector
new_config = {
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"connection.url": "jdbc:postgresql://localhost:5432/mydatabase",
"connection.user": "myuser",
"connection.password": "mypassword",
"table.whitelist": "mytable",
"mode": "timestamp+incrementing",
"timestamp.column.name": "modified_at",
"validate.non.null": "false",
"incrementing.column.name": "id",
"topic.prefix": "my-connector-",
},
}
response = client.update_connector("my-connector", new_config)
print(response)
# Get status for a connector
response = client.get_connector_status("my-connector")
print(json.dumps(response, indent=2))
# Restart a connector
response = client.restart_connector("my-connector")
print(response)
# Delete a connector
response = client.delete_connector("my-connector")
print(response)
Apache 2.0 License - aidanmelen/kafka-connect-py
The entire Command Line Usage section was copied directly from Confluent's Kafka Connect REST API course.