Skip to content

Commit

Permalink
KAFKA-15387: Remove Connect's deprecated task configurations endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
yashmayya committed Oct 8, 2024
1 parent 5624bc7 commit 4fb5f43
Show file tree
Hide file tree
Showing 10 changed files with 15 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -327,20 +327,6 @@ public ConnectorInfo connectorInfo(String connector) {
);
}

protected Map<ConnectorTaskId, Map<String, String>> buildTasksConfig(String connector) {
final ClusterConfigState configState = configBackingStore.snapshot();

if (!configState.contains(connector))
return Collections.emptyMap();

Map<ConnectorTaskId, Map<String, String>> configs = new HashMap<>();
for (ConnectorTaskId cti : configState.tasks(connector)) {
configs.put(cti, configState.rawTaskConfig(cti));
}

return configs;
}

@Override
public ConnectorStateInfo connectorStatus(String connName) {
ConnectorStatus connector = statusBackingStore.get(connName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,6 @@ public interface Herder {
*/
void connectorConfig(String connName, Callback<Map<String, String>> callback);

/**
* Get the configuration for all tasks of a connector.
* @param connName name of the connector
* @param callback callback to invoke with the configuration
*/
void tasksConfig(String connName, Callback<Map<ConnectorTaskId, Map<String, String>>> callback);

/**
* Set the configuration for a connector. This supports creation and updating.
* @param connName name of the connector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -904,26 +904,6 @@ public void connectorInfo(final String connName, final Callback<ConnectorInfo> c
);
}

@Override
public void tasksConfig(String connName, final Callback<Map<ConnectorTaskId, Map<String, String>>> callback) {
log.trace("Submitting tasks config request {}", connName);

addRequest(
() -> {
if (checkRebalanceNeeded(callback))
return null;

if (!configState.contains(connName)) {
callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
} else {
callback.onCompletion(null, buildTasksConfig(connName));
}
return null;
},
forwardErrorAndTickThreadStages(callback)
);
}

@Override
protected Map<String, String> rawConfig(String connName) {
return configState.rawConnectorConfig(connName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,18 +176,6 @@ public Map<String, String> getConnectorConfig(final @PathParam("connector") Stri
return requestHandler.completeRequest(cb);
}

@GET
@Path("/{connector}/tasks-config")
@Operation(deprecated = true, summary = "Get the configuration of all tasks for the specified connector")
public Map<ConnectorTaskId, Map<String, String>> getTasksConfig(
final @PathParam("connector") String connector) throws Throwable {
log.warn("The 'GET /connectors/{connector}/tasks-config' endpoint is deprecated and will be removed in the next major release. "
+ "Please use the 'GET /connectors/{connector}/tasks' endpoint instead.");
FutureCallback<Map<ConnectorTaskId, Map<String, String>>> cb = new FutureCallback<>();
herder.tasksConfig(connector, cb);
return requestHandler.completeRequest(cb);
}

@GET
@Path("/{connector}/status")
@Operation(summary = "Get the status for the specified connector")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public int generation() {
public synchronized void connectors(Callback<Collection<String>> callback) {
callback.onCompletion(null, connectors());
}

@Override
public synchronized void connectorInfo(String connName, Callback<ConnectorInfo> callback) {
ConnectorInfo connectorInfo = connectorInfo(connName);
Expand Down Expand Up @@ -636,15 +636,4 @@ public int hashCode() {
return Objects.hash(seq);
}
}

@Override
public void tasksConfig(String connName, Callback<Map<ConnectorTaskId, Map<String, String>>> callback) {
Map<ConnectorTaskId, Map<String, String>> tasksConfig = buildTasksConfig(connName);
if (tasksConfig.isEmpty()) {
callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
return;
}
callback.onCompletion(null, tasksConfig);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
Expand All @@ -54,7 +53,6 @@
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

import java.io.File;
import java.io.FileOutputStream;
Expand Down Expand Up @@ -566,35 +564,6 @@ public void testStoppedState() throws Exception {
);
}

/**
* The <strong><em>GET /connectors/{connector}/tasks-config</em></strong> endpoint was deprecated in
* <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-970%3A+Deprecate+and+remove+Connect%27s+redundant+task+configurations+endpoint">KIP-970</a>
* and is slated for removal in the next major release. This test verifies that the deprecation warning log is emitted on trying to use the
* deprecated endpoint.
*/
@Test
public void testTasksConfigDeprecation() throws Exception {
connect = connectBuilder.build();
// start the clusters
connect.start();

connect.configureConnector(CONNECTOR_NAME, defaultSourceConnectorProps(TOPIC_NAME));
connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
CONNECTOR_NAME,
NUM_TASKS,
"Connector tasks did not start in time"
);

try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(ConnectorsResource.class)) {
connect.requestGet(connect.endpointForResource("connectors/" + CONNECTOR_NAME + "/tasks-config"));
List<LogCaptureAppender.Event> logEvents = logCaptureAppender.getEvents();
assertEquals(1, logEvents.size());
assertEquals(Level.WARN.toString(), logEvents.get(0).getLevel());
assertTrue(logEvents.get(0).getMessage().contains("deprecated"));
}

}

@Test
public void testCreateConnectorWithPausedInitialState() throws Exception {
connect = connectBuilder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2333,8 +2333,6 @@ public void testAccessors() throws Exception {
herder.connectorConfig(CONN1, connectorConfigCb);
FutureCallback<List<TaskInfo>> taskConfigsCb = new FutureCallback<>();
herder.taskConfigs(CONN1, taskConfigsCb);
FutureCallback<Map<ConnectorTaskId, Map<String, String>>> tasksConfigCb = new FutureCallback<>();
herder.tasksConfig(CONN1, tasksConfigCb);

herder.tick();
assertTrue(listConnectorsCb.isDone());
Expand All @@ -2351,11 +2349,6 @@ public void testAccessors() throws Exception {
new TaskInfo(TASK1, TASK_CONFIG),
new TaskInfo(TASK2, TASK_CONFIG)),
taskConfigsCb.get());
Map<ConnectorTaskId, Map<String, String>> tasksConfig = new HashMap<>();
tasksConfig.put(TASK0, TASK_CONFIG);
tasksConfig.put(TASK1, TASK_CONFIG);
tasksConfig.put(TASK2, TASK_CONFIG);
assertEquals(tasksConfig, tasksConfigCb.get());

// Config transformation should not occur when requesting connector or task info
verify(configTransformer, never()).transform(eq(CONN1), any());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ public void testGetConnectorConfigConnectorNotFound() {
}

@Test
public void testGetTasksConfig() throws Throwable {
public void testGetTaskConfigs() throws Throwable {
final ConnectorTaskId connectorTask0 = new ConnectorTaskId(CONNECTOR_NAME, 0);
final Map<String, String> connectorTask0Configs = new HashMap<>();
connectorTask0Configs.put("connector-task0-config0", "123");
Expand All @@ -498,31 +498,22 @@ public void testGetTasksConfig() throws Throwable {
final ConnectorTaskId connector2Task0 = new ConnectorTaskId(CONNECTOR2_NAME, 0);
final Map<String, String> connector2Task0Configs = Collections.singletonMap("connector2-task0-config0", "789");

final Map<ConnectorTaskId, Map<String, String>> expectedTasksConnector = new HashMap<>();
expectedTasksConnector.put(connectorTask0, connectorTask0Configs);
expectedTasksConnector.put(connectorTask1, connectorTask1Configs);
final Map<ConnectorTaskId, Map<String, String>> expectedTasksConnector2 = new HashMap<>();
expectedTasksConnector2.put(connector2Task0, connector2Task0Configs);
final List<TaskInfo> expectedTasksConnector = new ArrayList<>();
expectedTasksConnector.add(new TaskInfo(connectorTask0, connectorTask0Configs));
expectedTasksConnector.add(new TaskInfo(connectorTask1, connectorTask1Configs));

final ArgumentCaptor<Callback<Map<ConnectorTaskId, Map<String, String>>>> cb1 = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb1, expectedTasksConnector).when(herder).tasksConfig(eq(CONNECTOR_NAME), cb1.capture());
final ArgumentCaptor<Callback<Map<ConnectorTaskId, Map<String, String>>>> cb2 = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb2, expectedTasksConnector2).when(herder).tasksConfig(eq(CONNECTOR2_NAME), cb2.capture());
final List<TaskInfo> expectedTasksConnector2 = new ArrayList<>();
expectedTasksConnector2.add(new TaskInfo(connector2Task0, connector2Task0Configs));

Map<ConnectorTaskId, Map<String, String>> tasksConfig = connectorsResource.getTasksConfig(CONNECTOR_NAME);
assertEquals(expectedTasksConnector, tasksConfig);
Map<ConnectorTaskId, Map<String, String>> tasksConfig2 = connectorsResource.getTasksConfig(CONNECTOR2_NAME);
assertEquals(expectedTasksConnector2, tasksConfig2);
}
final ArgumentCaptor<Callback<List<TaskInfo>>> cb1 = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb1, expectedTasksConnector).when(herder).taskConfigs(eq(CONNECTOR_NAME), cb1.capture());
final ArgumentCaptor<Callback<List<TaskInfo>>> cb2 = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackResult(cb2, expectedTasksConnector2).when(herder).taskConfigs(eq(CONNECTOR2_NAME), cb2.capture());

@Test
public void testGetTasksConfigConnectorNotFound() {
final ArgumentCaptor<Callback<Map<ConnectorTaskId, Map<String, String>>>> cb = ArgumentCaptor.forClass(Callback.class);
expectAndCallbackException(cb, new NotFoundException("not found"))
.when(herder).tasksConfig(eq(CONNECTOR_NAME), cb.capture());

assertThrows(NotFoundException.class, () ->
connectorsResource.getTasksConfig(CONNECTOR_NAME));
List<TaskInfo> taskConfigs = connectorsResource.getTaskConfigs(CONNECTOR_NAME);
assertEquals(expectedTasksConnector, taskConfigs);
List<TaskInfo> taskConfigs2 = connectorsResource.getTaskConfigs(CONNECTOR2_NAME);
assertEquals(expectedTasksConnector2, taskConfigs2);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -677,16 +677,13 @@ public void testAccessors() throws Exception {
Callback<ConnectorInfo> connectorInfoCb = mock(Callback.class);
Callback<Map<String, String>> connectorConfigCb = mock(Callback.class);
Callback<List<TaskInfo>> taskConfigsCb = mock(Callback.class);
Callback<Map<ConnectorTaskId, Map<String, String>>> tasksConfigCb = mock(Callback.class);

// Check accessors with empty worker
doNothing().when(listConnectorsCb).onCompletion(null, Collections.EMPTY_SET);
doNothing().when(connectorInfoCb).onCompletion(any(NotFoundException.class), isNull());
doNothing().when(taskConfigsCb).onCompletion(any(NotFoundException.class), isNull());
doNothing().when(tasksConfigCb).onCompletion(any(NotFoundException.class), isNull());
doNothing().when(connectorConfigCb).onCompletion(any(NotFoundException.class), isNull());


expectAdd(SourceSink.SOURCE);
expectConfigValidation(SourceSink.SOURCE, connConfig);

Expand All @@ -699,16 +696,11 @@ public void testAccessors() throws Exception {
TaskInfo taskInfo = new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig(SourceSink.SOURCE));
doNothing().when(taskConfigsCb).onCompletion(null, singletonList(taskInfo));

Map<ConnectorTaskId, Map<String, String>> tasksConfig = Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 0),
taskConfig(SourceSink.SOURCE));
doNothing().when(tasksConfigCb).onCompletion(null, tasksConfig);

// All operations are synchronous for StandaloneHerder, so we don't need to actually wait after making each call
herder.connectors(listConnectorsCb);
herder.connectorInfo(CONNECTOR_NAME, connectorInfoCb);
herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb);
herder.taskConfigs(CONNECTOR_NAME, taskConfigsCb);
herder.tasksConfig(CONNECTOR_NAME, tasksConfigCb);

herder.putConnectorConfig(CONNECTOR_NAME, connConfig, false, createCallback);
Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
Expand All @@ -719,7 +711,6 @@ public void testAccessors() throws Exception {
herder.connectorInfo(CONNECTOR_NAME, connectorInfoCb);
herder.connectorConfig(CONNECTOR_NAME, connectorConfigCb);
herder.taskConfigs(CONNECTOR_NAME, taskConfigsCb);
herder.tasksConfig(CONNECTOR_NAME, tasksConfigCb);
// Config transformation should not occur when requesting connector or task info
verify(transformer, never()).transform(eq(CONNECTOR_NAME), any());
}
Expand Down
1 change: 0 additions & 1 deletion docs/connect.html
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,6 @@ <h4><a id="connect_rest" href="#connect_rest">REST API</a></h4>
<li><code>PATCH /connectors/{name}/config</code> - patch the configuration parameters for a specific connector, where <code>null</code> values in the JSON body indicates removing of the key from the final configuration</li>
<li><code>GET /connectors/{name}/status</code> - get current status of the connector, including if it is running, failed, paused, etc., which worker it is assigned to, error information if it has failed, and the state of all its tasks</li>
<li><code>GET /connectors/{name}/tasks</code> - get a list of tasks currently running for a connector along with their configurations</li>
<li><code>GET /connectors/{name}/tasks-config</code> - get the configuration of all tasks for a specific connector. This endpoint is deprecated and will be removed in the next major release. Please use the <code>GET /connectors/{name}/tasks</code> endpoint instead. Note that the response structures of the two endpoints differ slightly, please refer to the <a href="/{{version}}/generated/connect_rest.yaml">OpenAPI documentation</a> for more details</li>
<li><code>GET /connectors/{name}/tasks/{taskid}/status</code> - get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed</li>
<li><code>PUT /connectors/{name}/pause</code> - pause the connector and its tasks, which stops message processing until the connector is resumed. Any resources claimed by its tasks are left allocated, which allows the connector to begin processing data quickly once it is resumed.</li>
<li id="connect_stopconnector"><code>PUT /connectors/{name}/stop</code> - stop the connector and shut down its tasks, deallocating any resources claimed by its tasks. This is more efficient from a resource usage standpoint than pausing the connector, but can cause it to take longer to begin processing data once resumed. Note that the offsets for a connector can be only modified via the offsets management endpoints if it is in the stopped state</li>
Expand Down

0 comments on commit 4fb5f43

Please sign in to comment.