Skip to content

Commit

Permalink
Fix mirror deletion on range key (#153)
Browse files Browse the repository at this point in the history
Closes #155
  • Loading branch information
raminqaf authored Jan 13, 2023
1 parent 6399340 commit 631916d
Show file tree
Hide file tree
Showing 27 changed files with 110 additions and 174 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.bakdata.quick.common.api.client.gateway;

import com.bakdata.quick.common.api.client.HttpClient;
import com.bakdata.quick.common.api.client.gateway.GatewayClient;
import com.bakdata.quick.common.api.model.gateway.SchemaData;
import com.bakdata.quick.common.exception.HttpClientException;
import com.bakdata.quick.common.security.SecurityConfig;
Expand All @@ -39,8 +38,7 @@
*/
@Singleton
public class DefaultGatewayClient implements GatewayClient {
private static final MediaType JSON
= MediaType.get("application/json; charset=utf-8");
private static final MediaType JSON = MediaType.get("application/json; charset=utf-8");
private static final String PREFIX = "quick-gateway";

private final HttpClient client;
Expand Down Expand Up @@ -104,5 +102,4 @@ private static Throwable handleError(final Response response) {
return new HttpClientException(httpStatus);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public interface GatewayClient {
/**
* Updates the GraphQL schema.
*
* @param gateway name of the gateway
* @param gateway name of the gateway
* @param graphQLSchema GraphQL schema
* @return completed if schema is updated
*/
Expand All @@ -40,7 +40,7 @@ public interface GatewayClient {
* Fetches a write-type of a gateway.
*
* @param gateway name of the gateway
* @param type name of the type
* @param type name of the type
* @return write schema if gateway and type exist
*/
Single<SchemaData> getWriteSchema(final String gateway, final String type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.bakdata.quick.common.api.model.KeyValuePair;
import io.micronaut.http.annotation.Body;
import io.micronaut.http.annotation.Delete;
import io.micronaut.http.annotation.Header;
import io.micronaut.http.annotation.PathVariable;
import io.micronaut.http.annotation.Post;
import io.micronaut.http.client.annotation.Client;
Expand All @@ -35,8 +36,6 @@ public interface IngestClient {
<K, V> Completable sendData(@PathVariable final String topic, @Body final List<KeyValuePair<K, V>> keyValuePairs);

@Delete("/{topic}")
@Header(name = "X-API-Key", value = "${quick.api.token}")
<K> Completable deleteData(@PathVariable final String topic, @Body final List<K> key);

@Delete("/{topic}/{key}")
<K> Completable deleteDataSingle(@PathVariable final String topic, final K key);
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,7 @@ private static MirrorClient<String, TopicData> createMirrorClient(final TopicReg
final HttpClient client) {
final KnownTypeResolver<TopicData> typeResolver =
new KnownTypeResolver<>(TopicData.class, client.objectMapper());
final MirrorHost mirrorHost =
MirrorHost.createWithNoPrefix(topicRegistryConfig.getServiceName());
final MirrorHost mirrorHost = MirrorHost.createWithNoPrefix(topicRegistryConfig.getServiceName());

final MirrorValueParser<TopicData> mirrorValueParser =
new MirrorValueParser<>(typeResolver, client.objectMapper());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
@Getter
public class SecurityConfig {
private final boolean securityEnabled;

private final String apiKey;

public SecurityConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,14 @@

package com.bakdata.quick.common.type;

import io.reactivex.Completable;
import io.reactivex.Single;

/**
* Service for retrieving topic information.
* Service for retrieving topic information and deleting topics.
*/
public interface TopicTypeService {
<K, V> Single<QuickTopicData<K, V>> getTopicData(final String topic);

default Single<QuickTopicType> getKeyType(final String topic) {
return this.getTopicData(topic).map(info -> info.getKeyData().getType());
}

default Single<QuickTopicType> getValueType(final String topic) {
return this.getTopicData(topic).map(info -> info.getValueData().getType());
}
Completable deleteFromTopicRegistry(String topic);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.bakdata.quick.common.type.TopicTypeService;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.reactivex.Completable;
import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
Expand Down Expand Up @@ -77,22 +78,8 @@ public <K, V> Single<QuickTopicData<K, V>> getTopicData(final String topic) {
}

@Override
public Single<QuickTopicType> getKeyType(final String topic) {
return this.getTopicData(topic).map(QuickTopicData::getKeyData).map(QuickData::getType);
}

@Override
public Single<QuickTopicType> getValueType(final String topic) {
return this.getTopicData(topic).map(QuickTopicData::getValueData).map(QuickData::getType);
}

@SuppressWarnings("unused") // nothing we can do with the disposable; the value will be in the future
private static CompletableFuture<QuickTopicData<?, ?>> singleToFuture(final Executor executor,
final Single<QuickTopicData<Object, Object>> single) {
final CompletableFuture<QuickTopicData<?, ?>> cf = new CompletableFuture<>();
final Disposable disposable = single.subscribeOn(Schedulers.from(executor))
.subscribe(cf::complete, cf::completeExceptionally);
return cf;
public Completable deleteFromTopicRegistry(final String topic) {
return this.topicRegistryClient.delete(topic);
}

private CompletableFuture<QuickTopicData<?, ?>> loadTopicData(final String key, final Executor executor) {
Expand Down Expand Up @@ -139,4 +126,13 @@ private <K> Single<TypeResolverWithSchema<K>> createResolver(final QuickTopicTyp
.doOnError(e -> log.error("No schema found for subject {}", subject, e))
.map(schema -> new TypeResolverWithSchema<>(this.conversionProvider.getTypeResolver(type, schema), schema));
}

@SuppressWarnings("unused") // nothing we can do with the disposable; the value will be in the future
private static CompletableFuture<QuickTopicData<?, ?>> singleToFuture(final Executor executor,
final Single<QuickTopicData<Object, Object>> single) {
final CompletableFuture<QuickTopicData<?, ?>> cf = new CompletableFuture<>();
final Disposable disposable = single.subscribeOn(Schedulers.from(executor))
.subscribe(cf::complete, cf::completeExceptionally);
return cf;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import edu.umd.cs.findbugs.annotations.Nullable;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.reactivex.Completable;
import io.reactivex.Single;
import java.util.Optional;
import java.util.function.Supplier;
Expand Down Expand Up @@ -78,6 +79,11 @@ public <K, V> Single<QuickTopicData<K, V>> getTopicData(final String topic) {
return Single.just(topicInfo);
}

@Override
public Completable deleteFromTopicRegistry(final String topic) {
return Completable.complete();
}

private ConversionProvider avroConversionProvider() {
final SchemaConfig schemaConfig = new SchemaConfig(Optional.of(SchemaFormat.AVRO), Optional.empty());
final KafkaConfig kafkaConfig = new KafkaConfig("localhost:9092", this.urlSupplier.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ public Completable deleteApplication(final String name) {
final Completable kafkaCleanUp = deployment
.map(k8sDeployment -> {
final Container container = k8sDeployment.getSpec().getTemplate().getSpec().getContainers().get(0);
return this.resources.createDeletionJob(deploymentName, container.getImage(), container.getArgs());
return this.resources.createDeletionJob(deploymentName, container.getImage(), container.getArgs(),
null);
})
.flatMapCompletable(this.kubeClient::deploy);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.bakdata.quick.manager.k8s;

import edu.umd.cs.findbugs.annotations.Nullable;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.batch.v1.Job;
Expand All @@ -38,7 +39,6 @@ public final class KubernetesResources {
*/
public static final int CONTAINER_PORT = 8080;
public static final int SERVICE_PORT = 80;
public static final String IMAGE_PULL_SECRET = "quick-image-secret";
public static final String QUICK_CONFIG_NAME = "quick-config";
public static final String QUICK_API_KEY_SECRET = "api-key-secret";
private final TemplateEngine engine;
Expand All @@ -60,8 +60,8 @@ public KubernetesResources() {
* Creates a dummy resource with a name.
*
* @param k8sResourceClass class of the resource
* @param name name of the resource
* @param <T> type of the resource
* @param name name of the resource
* @param <T> type of the resource
* @return a resource object with the given name
*/
@SuppressWarnings("OverlyBroadCatchBlock") // No need for handling reflective exceptions independently
Expand All @@ -87,20 +87,22 @@ public static <T extends HasMetadata> T forDeletion(final Class<T> k8sResourceCl
* <p>
* The job starts a pods that runs single time invoking the stream reset functionality of common kafka streams.
*
* @see
* <a href="https://github.com/bakdata/common-kafka-streams/blob/master/charts/streams-app-cleanup-job/templates/job.yaml">Helm
* Chart Definition</a>
* @see
* <a href="https://github.com/bakdata/common-kafka-streams/blob/master/src/main/java/com/bakdata/common_kafka_streams/CleanUpRunner.java">Clean
* Up Runner</a>
* @see <a
* href="https://github.com/bakdata/common-kafka-streams/blob/master/charts/streams-app-cleanup-job/templates/job
* .yaml">Helm Chart Definition</a>
* @see <a href="https://github.com/bakdata/common-kafka-streams/blob/master/src/main/java/com/bakdata
* /common_kafka_streams/CleanUpRunner.java">Clean Up Runner</a>
*/
public Job createDeletionJob(final String name, final String image, final Collection<String> arguments) {
public Job createDeletionJob(final String name, final String image, final Collection<String> arguments,
@Nullable final String apiKey) {
final Context root = new Context();
final String jobName = name + "-deletion";
root.setVariable("name", jobName);
root.setVariable("image", image);
root.setVariable("args", arguments);
root.setVariable("pullPolicy", "Always");
root.setVariable("apiKey", apiKey);

return this.loadResource(root, "streamsApp/deletion-job", Job.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.bakdata.quick.manager.mirror.resources.MirrorResources.MIRROR_IMAGE;

import com.bakdata.quick.common.api.model.manager.creation.MirrorCreationData;
import com.bakdata.quick.common.security.SecurityConfig;
import com.bakdata.quick.manager.config.DeploymentConfig;
import com.bakdata.quick.manager.k8s.ImageConfig;
import com.bakdata.quick.manager.k8s.KubernetesManagerClient;
Expand All @@ -43,18 +44,20 @@ public class KubernetesMirrorService implements MirrorService {
private final KubernetesManagerClient kubeClient;
private final DeploymentConfig deploymentConfig;
private final MirrorResourceLoader loader;
private final SecurityConfig securityConfig;

/**
* Injectable constructor.
*/
public KubernetesMirrorService(final KubernetesResources resources,
final KubernetesManagerClient kubernetesManagerClient,
final DeploymentConfig deploymentConfig,
final MirrorResourceLoader loader) {
final MirrorResourceLoader loader, final SecurityConfig securityConfig) {
this.resources = resources;
this.kubeClient = kubernetesManagerClient;
this.deploymentConfig = deploymentConfig;
this.loader = loader;
this.securityConfig = securityConfig;
}

@Override
Expand All @@ -77,10 +80,12 @@ public Completable deleteMirror(final String name) {
// we need this to properly delete all kafka related resources like the internal state store topic
final Single<Deployment> deployment = this.kubeClient.readDeployment(deploymentName);

final String apiKey = this.securityConfig.getApiKey();

// as well as all kafka related resources
final Completable kafkaCleanUp = deployment
.map(d -> d.getSpec().getTemplate().getSpec().getContainers().get(0).getArgs())
.map(list -> this.resources.createDeletionJob(deploymentName, imageConfig.asImageString(), list))
.map(list -> this.resources.createDeletionJob(deploymentName, imageConfig.asImageString(), list, apiKey))
.flatMapCompletable(this.kubeClient::deploy);

final Completable resourceDeletion = Single.fromCallable(() -> this.loader.forDeletion(deploymentName))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ public class KafkaTopicService implements TopicService {
* @param topicConfig configuration for Kafka topics
* @param kafkaConfig configuration for Kafka
*/

public KafkaTopicService(final TopicRegistryClient topicRegistryClient, final GatewayClient gatewayClient,
final GraphQLConverter graphQLConverter, final MirrorService mirrorService,
final GatewayService gatewayService, final QuickTopicConfig topicConfig,
Expand Down Expand Up @@ -165,15 +164,11 @@ public Completable createTopic(final String name, final QuickTopicType keyType,
@Override
public Completable deleteTopic(final String name) {
return Completable.defer(() -> {
log.debug("Delete topic {}", name);

// we don't need the cache, so make sure we get the current information
this.schemaRegistryClient.reset();
// deleting stuff that has to do with Kafka happens during the cleanup run
// the cleanup run is a job that is deployed when deleting the mirror
final Completable deleteMirror = this.deleteMirror(name);
final Completable deleteFromRegistry = this.topicRegistryClient.delete(name);
return deleteMirror.andThen(deleteFromRegistry);
return this.deleteMirror(name);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@ spec:
- configMapRef:
name: quick-config
env:
- name: JAVA_TOOL_OPTIONS
value: "-XX:MinRAMPercentage=45.0 -XX:MaxRAMPercentage=70.0 -Djdk.tls.client.protocols=TLSv1.2"
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: JAVA_TOOL_OPTIONS
value: "-XX:MinRAMPercentage=45.0 -XX:MaxRAMPercentage=70.0 -Djdk.tls.client.protocols=TLSv1.2"
[# th:if="${ apiKey }"]
- name: QUICK_API_TOKEN
value: [(${ apiKey })]
[/]
volumeMounts:
- name: log4j-config
mountPath: app/resources/log4j2.yaml
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import com.bakdata.quick.manager.k8s.resource.QuickResources.ResourcePrefix;
import edu.umd.cs.findbugs.annotations.Nullable;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.EnvVarSource;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.reactivex.Completable;
Expand Down Expand Up @@ -132,6 +134,14 @@ void shouldCreateDeletionJob() {
.satisfies(container -> {
assertThat(container.getImage()).isEqualTo(imageConfig.asImageString());
assertThat(container.getArgs()).contains("--input-topics=test");
assertThat(container.getEnv())
.hasSize(2)
.first()
.hasFieldOrPropertyWithValue("name", "POD_IP")
.extracting(EnvVar::getValueFrom)
.isNotNull()
.extracting(EnvVarSource::getFieldRef)
.hasFieldOrPropertyWithValue("fieldPath", "status.podIP");
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.bakdata.quick.manager.k8s;

import com.bakdata.quick.common.security.SecurityConfig;
import com.bakdata.quick.manager.TestUtil;
import com.bakdata.quick.manager.config.ApplicationSpecificationConfig;
import com.bakdata.quick.manager.config.DeploymentConfig;
Expand Down Expand Up @@ -53,6 +54,7 @@ public abstract class KubernetesTest {
private DeploymentConfig deploymentConfig = null;
private KubernetesManagerClient managerClient = null;
private ApplicationSpecificationConfig appSpecConfig = null;
private SecurityConfig securityConfig = null;

/**
* Set up new k8s server and config.
Expand All @@ -66,6 +68,7 @@ public void setUpTests() {
"websecure");
this.managerClient = new KubernetesManagerClient(this.client);
this.appSpecConfig = TestUtil.newAppSpec();
this.securityConfig = new SecurityConfig(true, "test-api-key");
}

@AfterEach
Expand Down Expand Up @@ -136,7 +139,7 @@ protected static Optional<HasMetadata> findResource(final QuickResources quickRe
* @return A generated string for the deployment image spec. The format is [DOCKER_REGISTRY]/[IMAGE_NAME]:[TAG]
*/
protected static String getImage(final String imageName, final String imageTag) {
return String.format("%s/%s:%s", KubernetesTest.DOCKER_REGISTRY, imageName, imageTag);
return String.format("%s/%s:%s", DOCKER_REGISTRY, imageName, imageTag);
}

protected enum ResourceKind {
Expand Down
Loading

0 comments on commit 631916d

Please sign in to comment.