From 72cd9ed66e076cba393e0048d8fb28111b910eaf Mon Sep 17 00:00:00 2001 From: Nick Hitchan Date: Thu, 23 Apr 2020 15:31:10 -0400 Subject: [PATCH] Adds a readme --- README.md | 44 +++++++++ .../frit/keycloak/kafka/KafkaProperties.java | 90 +++++++++++-------- 2 files changed, 96 insertions(+), 38 deletions(-) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 0000000..25b5c48 --- /dev/null +++ b/README.md @@ -0,0 +1,44 @@ +# Kafka Event Producer for Keycloak + +A Keycloak SPI that publishes events to Kafka. + +# Build +``` +./gradlew clean uberJar +``` + +# Deploy +* Register the following Avro schemas with your Schema Registry. + * `src/main/avro/KeycloakAdminEvent.avsc` + * `src/main/avro/KeycloakEvent.avsc` +* Deploy (copy) the UberJar + * From - `build/libs/kafka-event-producer-{VERSION}-uber.jar` + * To - `{KEYCLOAK_HOME}/standalone/deployments` +* Edit your keycloak `standalone.xml` file (Located by default at `{KEYCLOAL_HOME}/standalone/configuration`) to configure Kafka Settings. The following section provides a sample configuration. + +``` + + + + + + + + + + + + + + +``` + +# Notes +* The name of the property directly corresponds to the kafka producer property name except for the following properties. +``` + + +``` + +# Contributing +* Pull requests are welcome! \ No newline at end of file diff --git a/src/main/java/com/frit/keycloak/kafka/KafkaProperties.java b/src/main/java/com/frit/keycloak/kafka/KafkaProperties.java index cffc04f..83f075a 100644 --- a/src/main/java/com/frit/keycloak/kafka/KafkaProperties.java +++ b/src/main/java/com/frit/keycloak/kafka/KafkaProperties.java @@ -5,10 +5,8 @@ import org.keycloak.events.EventType; import org.keycloak.events.admin.OperationType; -import java.util.Enumeration; -import java.util.HashSet; -import java.util.Properties; -import java.util.Set; +import java.util.*; +import java.util.function.Supplier; @SuppressWarnings("unchecked") public class KafkaProperties extends Properties implements KafkaConfiguration { @@ -24,40 +22,48 @@ public class KafkaProperties extends Properties implements KafkaConfiguration { private static final String TOPIC_EVENT_ADMIN_PROPERTY = "topic.event.admin"; public KafkaProperties(Config.Scope config) { - put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); - put(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG, config.get(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG)); - put(ProducerConfig.METADATA_MAX_AGE_CONFIG, config.get(ProducerConfig.METADATA_MAX_AGE_CONFIG)); - put(ProducerConfig.BATCH_SIZE_CONFIG, config.get(ProducerConfig.BATCH_SIZE_CONFIG)); - put(ProducerConfig.ACKS_CONFIG, config.get(ProducerConfig.ACKS_CONFIG)); - put(ProducerConfig.LINGER_MS_CONFIG, config.get(ProducerConfig.LINGER_MS_CONFIG)); - put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, config.get(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG)); - put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, config.get(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG)); - put(ProducerConfig.CLIENT_ID_CONFIG, config.get(ProducerConfig.CLIENT_ID_CONFIG)); - put(ProducerConfig.SEND_BUFFER_CONFIG, config.get(ProducerConfig.SEND_BUFFER_CONFIG)); - put(ProducerConfig.RECEIVE_BUFFER_CONFIG, config.get(ProducerConfig.RECEIVE_BUFFER_CONFIG)); - put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, config.get(ProducerConfig.MAX_REQUEST_SIZE_CONFIG)); - put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, config.get(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG)); - put(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, config.get(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG)); - put(ProducerConfig.MAX_BLOCK_MS_CONFIG, config.get(ProducerConfig.MAX_BLOCK_MS_CONFIG)); - put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.get(ProducerConfig.BUFFER_MEMORY_CONFIG)); - put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.get(ProducerConfig.RETRY_BACKOFF_MS_CONFIG)); - put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.get(ProducerConfig.COMPRESSION_TYPE_CONFIG)); - put(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, config.get(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG)); - put(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG, config.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)); - put(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG, config.get(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)); - put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, config.get(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG)); - put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, config.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)); - put(ProducerConfig.RETRIES_CONFIG, config.get(ProducerConfig.RETRIES_CONFIG)); - put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, config.get(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG)); - put(ProducerConfig.PARTITIONER_CLASS_CONFIG, config.get(ProducerConfig.PARTITIONER_CLASS_CONFIG)); - put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, config.get(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG)); - put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, config.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)); - put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, config.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)); - put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, config.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)); - put(ProducerConfig.SECURITY_PROVIDERS_CONFIG, config.get(ProducerConfig.SECURITY_PROVIDERS_CONFIG)); - put(SCHEMA_REGISTRY_URL_PROPERTY, config.get(SCHEMA_REGISTRY_URL_PROPERTY)); - put(TOPIC_EVENT_USER_PROPERTY, config.get(TOPIC_EVENT_USER_PROPERTY)); - put(TOPIC_EVENT_ADMIN_PROPERTY, config.get(TOPIC_EVENT_ADMIN_PROPERTY)); + + // Required + put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getValue(config, ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) + .orElseThrow(missingProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG))); + put(SCHEMA_REGISTRY_URL_PROPERTY, getValue(config, SCHEMA_REGISTRY_URL_PROPERTY) + .orElseThrow(missingProperty(SCHEMA_REGISTRY_URL_PROPERTY))); + put(TOPIC_EVENT_USER_PROPERTY, getValue(config, TOPIC_EVENT_USER_PROPERTY) + .orElseThrow(missingProperty(TOPIC_EVENT_USER_PROPERTY))); + put(TOPIC_EVENT_ADMIN_PROPERTY, getValue(config, TOPIC_EVENT_ADMIN_PROPERTY) + .orElseThrow(missingProperty(TOPIC_EVENT_ADMIN_PROPERTY))); + + // Optional + getValue(config, ProducerConfig.RETRIES_CONFIG).ifPresent(value -> put(ProducerConfig.RETRIES_CONFIG, value)); + getValue(config, ProducerConfig.BATCH_SIZE_CONFIG).ifPresent(value -> put(ProducerConfig.BATCH_SIZE_CONFIG, value)); + getValue(config, ProducerConfig.LINGER_MS_CONFIG).ifPresent(value -> put(ProducerConfig.LINGER_MS_CONFIG, value)); + getValue(config, ProducerConfig.BUFFER_MEMORY_CONFIG).ifPresent(value -> put(ProducerConfig.BUFFER_MEMORY_CONFIG, value)); + getValue(config, ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG).ifPresent(value -> put(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG, value)); + getValue(config, ProducerConfig.METADATA_MAX_AGE_CONFIG).ifPresent(value -> put(ProducerConfig.METADATA_MAX_AGE_CONFIG, value)); + getValue(config, ProducerConfig.ACKS_CONFIG).ifPresent(value -> put(ProducerConfig.ACKS_CONFIG, value)); + getValue(config, ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG).ifPresent(value -> put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, value)); + getValue(config, ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG).ifPresent(value -> put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, value)); + getValue(config, ProducerConfig.CLIENT_ID_CONFIG).ifPresent(value -> put(ProducerConfig.CLIENT_ID_CONFIG, value)); + getValue(config, ProducerConfig.SEND_BUFFER_CONFIG).ifPresent(value -> put(ProducerConfig.SEND_BUFFER_CONFIG, value)); + getValue(config, ProducerConfig.RECEIVE_BUFFER_CONFIG).ifPresent(value -> put(ProducerConfig.RECEIVE_BUFFER_CONFIG, value)); + getValue(config, ProducerConfig.MAX_REQUEST_SIZE_CONFIG).ifPresent(value -> put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, value)); + getValue(config, ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG).ifPresent(value -> put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, value)); + getValue(config, ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG).ifPresent(value -> put(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, value)); + getValue(config, ProducerConfig.MAX_BLOCK_MS_CONFIG).ifPresent(value -> put(ProducerConfig.MAX_BLOCK_MS_CONFIG, value)); + getValue(config, ProducerConfig.RETRY_BACKOFF_MS_CONFIG).ifPresent(value -> put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, value)); + getValue(config, ProducerConfig.COMPRESSION_TYPE_CONFIG).ifPresent(value -> put(ProducerConfig.COMPRESSION_TYPE_CONFIG, value)); + getValue(config, ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG).ifPresent(value -> put(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, value)); + getValue(config, ProducerConfig.METRICS_NUM_SAMPLES_CONFIG).ifPresent(value -> put(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG, value)); + getValue(config, ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG).ifPresent(value -> put(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG, value)); + getValue(config, ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG).ifPresent(value -> put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, value)); + getValue(config, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION).ifPresent(value -> put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, value)); + getValue(config, ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG).ifPresent(value -> put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, value)); + getValue(config, ProducerConfig.PARTITIONER_CLASS_CONFIG).ifPresent(value -> put(ProducerConfig.PARTITIONER_CLASS_CONFIG, value)); + getValue(config, ProducerConfig.INTERCEPTOR_CLASSES_CONFIG).ifPresent(value -> put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, value)); + getValue(config, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG).ifPresent(value -> put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, value)); + getValue(config, ProducerConfig.TRANSACTION_TIMEOUT_CONFIG).ifPresent(value -> put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, value)); + getValue(config, ProducerConfig.TRANSACTIONAL_ID_CONFIG).ifPresent(value -> put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, value)); + getValue(config, ProducerConfig.SECURITY_PROVIDERS_CONFIG).ifPresent(value -> put(ProducerConfig.SECURITY_PROVIDERS_CONFIG, value)); String[] excludedEvents = config.getArray(EXCLUDE_EVENTS_PROPERTY); if (excludedEvents != null) { @@ -78,6 +84,14 @@ public KafkaProperties(Config.Scope config) { } } + private Optional getValue(Config.Scope config, String key) { + return Optional.ofNullable(config.get(key)); + } + + private Supplier missingProperty(String property) { + return () -> new KafkaEventException(String.format("Missing required property: %s", property)); + } + @Override public String getBootstrapServers() { return getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);