Skip to content

Commit

Permalink
Adds a readme
Browse files Browse the repository at this point in the history
  • Loading branch information
hitchan committed Apr 23, 2020
1 parent 3286c54 commit 72cd9ed
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 38 deletions.
44 changes: 44 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Kafka Event Producer for Keycloak

A Keycloak SPI that publishes events to Kafka.

# Build
```
./gradlew clean uberJar
```

# Deploy
* Register the following Avro schemas with your Schema Registry.
* `src/main/avro/KeycloakAdminEvent.avsc`
* `src/main/avro/KeycloakEvent.avsc`
* Deploy (copy) the UberJar
* From - `build/libs/kafka-event-producer-{VERSION}-uber.jar`
* To - `{KEYCLOAK_HOME}/standalone/deployments`
* Edit your keycloak `standalone.xml` file (Located by default at `{KEYCLOAL_HOME}/standalone/configuration`) to configure Kafka Settings. The following section provides a sample configuration.

```
<spi name="eventsListener">
<provider name="kafka" enabled="true">
<properties>
<property name="bootstrap.servers" value="http://localhost:9092"/>
<property name="schema.registry.url" value="http://localhost:8081"/>
<property name="retries" value="0"/>
<property name="batch.size" value="0"/>
<property name="linger.ms" value="1"/>
<property name="buffer.memory" value="33554432"/>
<property name="topic.event.user" value="streaming.keycloak.events.user"/>
<property name="topic.event.admin" value="streaming.keycloak.events.admin"/>
</properties>
</provider>
</spi>
```

# Notes
* The name of the property directly corresponds to the kafka producer property name except for the following properties.
```
<property name="topic.event.user" value="Your.Kafka.Topic.For.User.Events"/>
<property name="topic.event.admin" value="Your.Kafka.Topic.For.Admin.Events"/>
```

# Contributing
* Pull requests are welcome!
90 changes: 52 additions & 38 deletions src/main/java/com/frit/keycloak/kafka/KafkaProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,8 @@
import org.keycloak.events.EventType;
import org.keycloak.events.admin.OperationType;

import java.util.Enumeration;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.*;
import java.util.function.Supplier;

@SuppressWarnings("unchecked")
public class KafkaProperties extends Properties implements KafkaConfiguration {
Expand All @@ -24,40 +22,48 @@ public class KafkaProperties extends Properties implements KafkaConfiguration {
private static final String TOPIC_EVENT_ADMIN_PROPERTY = "topic.event.admin";

public KafkaProperties(Config.Scope config) {
put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
put(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG, config.get(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG));
put(ProducerConfig.METADATA_MAX_AGE_CONFIG, config.get(ProducerConfig.METADATA_MAX_AGE_CONFIG));
put(ProducerConfig.BATCH_SIZE_CONFIG, config.get(ProducerConfig.BATCH_SIZE_CONFIG));
put(ProducerConfig.ACKS_CONFIG, config.get(ProducerConfig.ACKS_CONFIG));
put(ProducerConfig.LINGER_MS_CONFIG, config.get(ProducerConfig.LINGER_MS_CONFIG));
put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, config.get(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG));
put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, config.get(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG));
put(ProducerConfig.CLIENT_ID_CONFIG, config.get(ProducerConfig.CLIENT_ID_CONFIG));
put(ProducerConfig.SEND_BUFFER_CONFIG, config.get(ProducerConfig.SEND_BUFFER_CONFIG));
put(ProducerConfig.RECEIVE_BUFFER_CONFIG, config.get(ProducerConfig.RECEIVE_BUFFER_CONFIG));
put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, config.get(ProducerConfig.MAX_REQUEST_SIZE_CONFIG));
put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, config.get(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG));
put(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, config.get(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG));
put(ProducerConfig.MAX_BLOCK_MS_CONFIG, config.get(ProducerConfig.MAX_BLOCK_MS_CONFIG));
put(ProducerConfig.BUFFER_MEMORY_CONFIG, config.get(ProducerConfig.BUFFER_MEMORY_CONFIG));
put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.get(ProducerConfig.RETRY_BACKOFF_MS_CONFIG));
put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.get(ProducerConfig.COMPRESSION_TYPE_CONFIG));
put(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, config.get(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG));
put(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG, config.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG));
put(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG, config.get(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG));
put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, config.get(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG));
put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, config.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION));
put(ProducerConfig.RETRIES_CONFIG, config.get(ProducerConfig.RETRIES_CONFIG));
put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, config.get(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG));
put(ProducerConfig.PARTITIONER_CLASS_CONFIG, config.get(ProducerConfig.PARTITIONER_CLASS_CONFIG));
put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, config.get(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG));
put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, config.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG));
put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, config.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG));
put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, config.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG));
put(ProducerConfig.SECURITY_PROVIDERS_CONFIG, config.get(ProducerConfig.SECURITY_PROVIDERS_CONFIG));
put(SCHEMA_REGISTRY_URL_PROPERTY, config.get(SCHEMA_REGISTRY_URL_PROPERTY));
put(TOPIC_EVENT_USER_PROPERTY, config.get(TOPIC_EVENT_USER_PROPERTY));
put(TOPIC_EVENT_ADMIN_PROPERTY, config.get(TOPIC_EVENT_ADMIN_PROPERTY));

// Required
put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getValue(config, ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
.orElseThrow(missingProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)));
put(SCHEMA_REGISTRY_URL_PROPERTY, getValue(config, SCHEMA_REGISTRY_URL_PROPERTY)
.orElseThrow(missingProperty(SCHEMA_REGISTRY_URL_PROPERTY)));
put(TOPIC_EVENT_USER_PROPERTY, getValue(config, TOPIC_EVENT_USER_PROPERTY)
.orElseThrow(missingProperty(TOPIC_EVENT_USER_PROPERTY)));
put(TOPIC_EVENT_ADMIN_PROPERTY, getValue(config, TOPIC_EVENT_ADMIN_PROPERTY)
.orElseThrow(missingProperty(TOPIC_EVENT_ADMIN_PROPERTY)));

// Optional
getValue(config, ProducerConfig.RETRIES_CONFIG).ifPresent(value -> put(ProducerConfig.RETRIES_CONFIG, value));
getValue(config, ProducerConfig.BATCH_SIZE_CONFIG).ifPresent(value -> put(ProducerConfig.BATCH_SIZE_CONFIG, value));
getValue(config, ProducerConfig.LINGER_MS_CONFIG).ifPresent(value -> put(ProducerConfig.LINGER_MS_CONFIG, value));
getValue(config, ProducerConfig.BUFFER_MEMORY_CONFIG).ifPresent(value -> put(ProducerConfig.BUFFER_MEMORY_CONFIG, value));
getValue(config, ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG).ifPresent(value -> put(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG, value));
getValue(config, ProducerConfig.METADATA_MAX_AGE_CONFIG).ifPresent(value -> put(ProducerConfig.METADATA_MAX_AGE_CONFIG, value));
getValue(config, ProducerConfig.ACKS_CONFIG).ifPresent(value -> put(ProducerConfig.ACKS_CONFIG, value));
getValue(config, ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG).ifPresent(value -> put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, value));
getValue(config, ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG).ifPresent(value -> put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, value));
getValue(config, ProducerConfig.CLIENT_ID_CONFIG).ifPresent(value -> put(ProducerConfig.CLIENT_ID_CONFIG, value));
getValue(config, ProducerConfig.SEND_BUFFER_CONFIG).ifPresent(value -> put(ProducerConfig.SEND_BUFFER_CONFIG, value));
getValue(config, ProducerConfig.RECEIVE_BUFFER_CONFIG).ifPresent(value -> put(ProducerConfig.RECEIVE_BUFFER_CONFIG, value));
getValue(config, ProducerConfig.MAX_REQUEST_SIZE_CONFIG).ifPresent(value -> put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, value));
getValue(config, ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG).ifPresent(value -> put(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG, value));
getValue(config, ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG).ifPresent(value -> put(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG, value));
getValue(config, ProducerConfig.MAX_BLOCK_MS_CONFIG).ifPresent(value -> put(ProducerConfig.MAX_BLOCK_MS_CONFIG, value));
getValue(config, ProducerConfig.RETRY_BACKOFF_MS_CONFIG).ifPresent(value -> put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, value));
getValue(config, ProducerConfig.COMPRESSION_TYPE_CONFIG).ifPresent(value -> put(ProducerConfig.COMPRESSION_TYPE_CONFIG, value));
getValue(config, ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG).ifPresent(value -> put(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, value));
getValue(config, ProducerConfig.METRICS_NUM_SAMPLES_CONFIG).ifPresent(value -> put(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG, value));
getValue(config, ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG).ifPresent(value -> put(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG, value));
getValue(config, ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG).ifPresent(value -> put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, value));
getValue(config, ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION).ifPresent(value -> put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, value));
getValue(config, ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG).ifPresent(value -> put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, value));
getValue(config, ProducerConfig.PARTITIONER_CLASS_CONFIG).ifPresent(value -> put(ProducerConfig.PARTITIONER_CLASS_CONFIG, value));
getValue(config, ProducerConfig.INTERCEPTOR_CLASSES_CONFIG).ifPresent(value -> put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, value));
getValue(config, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG).ifPresent(value -> put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, value));
getValue(config, ProducerConfig.TRANSACTION_TIMEOUT_CONFIG).ifPresent(value -> put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, value));
getValue(config, ProducerConfig.TRANSACTIONAL_ID_CONFIG).ifPresent(value -> put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, value));
getValue(config, ProducerConfig.SECURITY_PROVIDERS_CONFIG).ifPresent(value -> put(ProducerConfig.SECURITY_PROVIDERS_CONFIG, value));

String[] excludedEvents = config.getArray(EXCLUDE_EVENTS_PROPERTY);
if (excludedEvents != null) {
Expand All @@ -78,6 +84,14 @@ public KafkaProperties(Config.Scope config) {
}
}

private Optional<String> getValue(Config.Scope config, String key) {
return Optional.ofNullable(config.get(key));
}

private Supplier<KafkaEventException> missingProperty(String property) {
return () -> new KafkaEventException(String.format("Missing required property: %s", property));
}

@Override
public String getBootstrapServers() {
return getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
Expand Down

0 comments on commit 72cd9ed

Please sign in to comment.