diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index 3f72aefdb5fdb..f2b35d13e849c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -327,20 +327,6 @@ public ConnectorInfo connectorInfo(String connector) { ); } - protected Map> buildTasksConfig(String connector) { - final ClusterConfigState configState = configBackingStore.snapshot(); - - if (!configState.contains(connector)) - return Collections.emptyMap(); - - Map> configs = new HashMap<>(); - for (ConnectorTaskId cti : configState.tasks(connector)) { - configs.put(cti, configState.rawTaskConfig(cti)); - } - - return configs; - } - @Override public ConnectorStateInfo connectorStatus(String connName) { ConnectorStatus connector = statusBackingStore.get(connName); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java index fbdfcab09318b..52be401bbfaba 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java @@ -102,13 +102,6 @@ public interface Herder { */ void connectorConfig(String connName, Callback> callback); - /** - * Get the configuration for all tasks of a connector. - * @param connName name of the connector - * @param callback callback to invoke with the configuration - */ - void tasksConfig(String connName, Callback>> callback); - /** * Set the configuration for a connector. This supports creation and updating. * @param connName name of the connector diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 908c2929481b5..69310a91bcc5f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -906,26 +906,6 @@ public void connectorInfo(final String connName, final Callback c ); } - @Override - public void tasksConfig(String connName, final Callback>> callback) { - log.trace("Submitting tasks config request {}", connName); - - addRequest( - () -> { - if (checkRebalanceNeeded(callback)) - return null; - - if (!configState.contains(connName)) { - callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null); - } else { - callback.onCompletion(null, buildTasksConfig(connName)); - } - return null; - }, - forwardErrorAndTickThreadStages(callback) - ); - } - @Override protected Map rawConfig(String connName) { return configState.rawConnectorConfig(connName); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java index 5722012b741af..dec053b0a4105 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java @@ -176,18 +176,6 @@ public Map getConnectorConfig(final @PathParam("connector") Stri return requestHandler.completeRequest(cb); } - @GET - @Path("/{connector}/tasks-config") - @Operation(deprecated = true, summary = "Get the configuration of all tasks for the specified connector") - public Map> getTasksConfig( - final @PathParam("connector") String connector) throws Throwable { - log.warn("The 'GET /connectors/{connector}/tasks-config' endpoint is deprecated and will be removed in the next major release. " - + "Please use the 'GET /connectors/{connector}/tasks' endpoint instead."); - FutureCallback>> cb = new FutureCallback<>(); - herder.tasksConfig(connector, cb); - return requestHandler.completeRequest(cb); - } - @GET @Path("/{connector}/status") @Operation(summary = "Get the status for the specified connector") diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java index 19b539c3405e4..dee293b0c4ce0 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java @@ -149,7 +149,7 @@ public int generation() { public synchronized void connectors(Callback> callback) { callback.onCompletion(null, connectors()); } - + @Override public synchronized void connectorInfo(String connName, Callback callback) { ConnectorInfo connectorInfo = connectorInfo(connName); @@ -635,15 +635,4 @@ public int hashCode() { return Objects.hash(seq); } } - - @Override - public void tasksConfig(String connName, Callback>> callback) { - Map> tasksConfig = buildTasksConfig(connName); - if (tasksConfig.isEmpty()) { - callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null); - return; - } - callback.onCompletion(null, tasksConfig); - } - } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java index 1a78643950d12..195905f3b76d3 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java @@ -32,7 +32,6 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets; import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest; import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; -import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource; import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; @@ -55,7 +54,6 @@ import org.junit.jupiter.api.io.TempDir; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.slf4j.event.Level; import java.io.File; import java.io.FileOutputStream; @@ -568,35 +566,6 @@ public void testStoppedState() throws Exception { ); } - /** - * The GET /connectors/{connector}/tasks-config endpoint was deprecated in - * KIP-970 - * and is slated for removal in the next major release. This test verifies that the deprecation warning log is emitted on trying to use the - * deprecated endpoint. - */ - @Test - public void testTasksConfigDeprecation() throws Exception { - connect = connectBuilder.build(); - // start the clusters - connect.start(); - - connect.configureConnector(CONNECTOR_NAME, defaultSourceConnectorProps(TOPIC_NAME)); - connect.assertions().assertConnectorAndExactlyNumTasksAreRunning( - CONNECTOR_NAME, - NUM_TASKS, - "Connector tasks did not start in time" - ); - - try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(ConnectorsResource.class)) { - connect.requestGet(connect.endpointForResource("connectors/" + CONNECTOR_NAME + "/tasks-config")); - List logEvents = logCaptureAppender.getEvents(); - assertEquals(1, logEvents.size()); - assertEquals(Level.WARN.toString(), logEvents.get(0).getLevel()); - assertTrue(logEvents.get(0).getMessage().contains("deprecated")); - } - - } - @Test public void testCreateConnectorWithPausedInitialState() throws Exception { connect = connectBuilder.build(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index bbc074e97308f..6b4d066ca1016 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -2333,8 +2333,6 @@ public void testAccessors() throws Exception { herder.connectorConfig(CONN1, connectorConfigCb); FutureCallback> taskConfigsCb = new FutureCallback<>(); herder.taskConfigs(CONN1, taskConfigsCb); - FutureCallback>> tasksConfigCb = new FutureCallback<>(); - herder.tasksConfig(CONN1, tasksConfigCb); herder.tick(); assertTrue(listConnectorsCb.isDone()); @@ -2351,11 +2349,6 @@ public void testAccessors() throws Exception { new TaskInfo(TASK1, TASK_CONFIG), new TaskInfo(TASK2, TASK_CONFIG)), taskConfigsCb.get()); - Map> tasksConfig = new HashMap<>(); - tasksConfig.put(TASK0, TASK_CONFIG); - tasksConfig.put(TASK1, TASK_CONFIG); - tasksConfig.put(TASK2, TASK_CONFIG); - assertEquals(tasksConfig, tasksConfigCb.get()); // Config transformation should not occur when requesting connector or task info verify(configTransformer, never()).transform(eq(CONN1), any()); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java index d4675b51df5fb..392d082d3305e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java @@ -486,7 +486,7 @@ public void testGetConnectorConfigConnectorNotFound() { } @Test - public void testGetTasksConfig() throws Throwable { + public void testGetTaskConfigs() throws Throwable { final ConnectorTaskId connectorTask0 = new ConnectorTaskId(CONNECTOR_NAME, 0); final Map connectorTask0Configs = new HashMap<>(); connectorTask0Configs.put("connector-task0-config0", "123"); @@ -498,31 +498,22 @@ public void testGetTasksConfig() throws Throwable { final ConnectorTaskId connector2Task0 = new ConnectorTaskId(CONNECTOR2_NAME, 0); final Map connector2Task0Configs = Collections.singletonMap("connector2-task0-config0", "789"); - final Map> expectedTasksConnector = new HashMap<>(); - expectedTasksConnector.put(connectorTask0, connectorTask0Configs); - expectedTasksConnector.put(connectorTask1, connectorTask1Configs); - final Map> expectedTasksConnector2 = new HashMap<>(); - expectedTasksConnector2.put(connector2Task0, connector2Task0Configs); + final List expectedTasksConnector = new ArrayList<>(); + expectedTasksConnector.add(new TaskInfo(connectorTask0, connectorTask0Configs)); + expectedTasksConnector.add(new TaskInfo(connectorTask1, connectorTask1Configs)); - final ArgumentCaptor>>> cb1 = ArgumentCaptor.forClass(Callback.class); - expectAndCallbackResult(cb1, expectedTasksConnector).when(herder).tasksConfig(eq(CONNECTOR_NAME), cb1.capture()); - final ArgumentCaptor>>> cb2 = ArgumentCaptor.forClass(Callback.class); - expectAndCallbackResult(cb2, expectedTasksConnector2).when(herder).tasksConfig(eq(CONNECTOR2_NAME), cb2.capture()); + final List expectedTasksConnector2 = new ArrayList<>(); + expectedTasksConnector2.add(new TaskInfo(connector2Task0, connector2Task0Configs)); - Map> tasksConfig = connectorsResource.getTasksConfig(CONNECTOR_NAME); - assertEquals(expectedTasksConnector, tasksConfig); - Map> tasksConfig2 = connectorsResource.getTasksConfig(CONNECTOR2_NAME); - assertEquals(expectedTasksConnector2, tasksConfig2); - } + final ArgumentCaptor>> cb1 = ArgumentCaptor.forClass(Callback.class); + expectAndCallbackResult(cb1, expectedTasksConnector).when(herder).taskConfigs(eq(CONNECTOR_NAME), cb1.capture()); + final ArgumentCaptor>> cb2 = ArgumentCaptor.forClass(Callback.class); + expectAndCallbackResult(cb2, expectedTasksConnector2).when(herder).taskConfigs(eq(CONNECTOR2_NAME), cb2.capture()); - @Test - public void testGetTasksConfigConnectorNotFound() { - final ArgumentCaptor>>> cb = ArgumentCaptor.forClass(Callback.class); - expectAndCallbackException(cb, new NotFoundException("not found")) - .when(herder).tasksConfig(eq(CONNECTOR_NAME), cb.capture()); - - assertThrows(NotFoundException.class, () -> - connectorsResource.getTasksConfig(CONNECTOR_NAME)); + List taskConfigs = connectorsResource.getTaskConfigs(CONNECTOR_NAME); + assertEquals(expectedTasksConnector, taskConfigs); + List taskConfigs2 = connectorsResource.getTaskConfigs(CONNECTOR2_NAME); + assertEquals(expectedTasksConnector2, taskConfigs2); } @Test diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java index 857bbb77b6388..c220ca9c70d0c 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java @@ -677,16 +677,13 @@ public void testAccessors() throws Exception { Callback connectorInfoCb = mock(Callback.class); Callback> connectorConfigCb = mock(Callback.class); Callback> taskConfigsCb = mock(Callback.class); - Callback>> tasksConfigCb = mock(Callback.class); // Check accessors with empty worker doNothing().when(listConnectorsCb).onCompletion(null, Collections.EMPTY_SET); doNothing().when(connectorInfoCb).onCompletion(any(NotFoundException.class), isNull()); doNothing().when(taskConfigsCb).onCompletion(any(NotFoundException.class), isNull()); - doNothing().when(tasksConfigCb).onCompletion(any(NotFoundException.class), isNull()); doNothing().when(connectorConfigCb).onCompletion(any(NotFoundException.class), isNull()); - expectAdd(SourceSink.SOURCE); expectConfigValidation(SourceSink.SOURCE, connConfig); @@ -699,16 +696,11 @@ public void testAccessors() throws Exception { TaskInfo taskInfo = new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig(SourceSink.SOURCE)); doNothing().when(taskConfigsCb).onCompletion(null, singletonList(taskInfo)); - Map> tasksConfig = Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 0), - taskConfig(SourceSink.SOURCE)); - doNothing().when(tasksConfigCb).onCompletion(null, tasksConfig); - // All operations are synchronous for StandaloneHerder, so we don't need to actually wait after making each call herder.connectors(listConnectorsCb); herder.connectorInfo(CONNECTOR_NAME, connectorInfoCb); herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb); herder.taskConfigs(CONNECTOR_NAME, taskConfigsCb); - herder.tasksConfig(CONNECTOR_NAME, tasksConfigCb); herder.putConnectorConfig(CONNECTOR_NAME, connConfig, false, createCallback); Herder.Created connectorInfo = createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS); @@ -719,7 +711,6 @@ public void testAccessors() throws Exception { herder.connectorInfo(CONNECTOR_NAME, connectorInfoCb); herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb); herder.taskConfigs(CONNECTOR_NAME, taskConfigsCb); - herder.tasksConfig(CONNECTOR_NAME, tasksConfigCb); // Config transformation should not occur when requesting connector or task info verify(transformer, never()).transform(eq(CONNECTOR_NAME), any()); } diff --git a/docs/connect.html b/docs/connect.html index 3724ad8f68dea..16cbf0f39ea54 100644 --- a/docs/connect.html +++ b/docs/connect.html @@ -294,7 +294,6 @@

REST API

  • PATCH /connectors/{name}/config - patch the configuration parameters for a specific connector, where null values in the JSON body indicates removing of the key from the final configuration
  • GET /connectors/{name}/status - get current status of the connector, including if it is running, failed, paused, etc., which worker it is assigned to, error information if it has failed, and the state of all its tasks
  • GET /connectors/{name}/tasks - get a list of tasks currently running for a connector along with their configurations
  • -
  • GET /connectors/{name}/tasks-config - get the configuration of all tasks for a specific connector. This endpoint is deprecated and will be removed in the next major release. Please use the GET /connectors/{name}/tasks endpoint instead. Note that the response structures of the two endpoints differ slightly, please refer to the OpenAPI documentation for more details
  • GET /connectors/{name}/tasks/{taskid}/status - get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed
  • PUT /connectors/{name}/pause - pause the connector and its tasks, which stops message processing until the connector is resumed. Any resources claimed by its tasks are left allocated, which allows the connector to begin processing data quickly once it is resumed.
  • PUT /connectors/{name}/stop - stop the connector and shut down its tasks, deallocating any resources claimed by its tasks. This is more efficient from a resource usage standpoint than pausing the connector, but can cause it to take longer to begin processing data once resumed. Note that the offsets for a connector can be only modified via the offsets management endpoints if it is in the stopped state