Add support for Avro publishing #373

Open · wants to merge 5 commits into base: main

6 changes: 4 additions & 2 deletions gradle/libs.gradle
@@ -13,7 +13,8 @@ versions += [
rxjava2: "2.0.9",
slf4j: "1.8.0-beta2",
logback: "1.3.0-alpha5",
- mockito: "1.+"
+ mockito: "1.+",
+ avro: "1.11.1"
]

libs += [
@@ -29,7 +30,8 @@ libs += [
slf4jsimple: "org.slf4j:slf4j-simple:$versions.slf4j",
logback_core: "ch.qos.logback:logback-core:$versions.logback",
logback_classic: "ch.qos.logback:logback-classic:$versions.logback",
- mockito_core: "org.mockito:mockito-core:$versions.mockito"
+ mockito_core: "org.mockito:mockito-core:$versions.mockito",
+ avro: "org.apache.avro:avro:$versions.avro"
]

ext {
22 changes: 22 additions & 0 deletions nakadi-java-client/build.gradle
@@ -1,4 +1,9 @@
+ plugins {
+   id "com.github.davidmc24.gradle.plugin.avro" version "1.3.0"
+ }
+
apply plugin: 'com.github.johnrengelman.shadow'
+ apply plugin: "com.github.davidmc24.gradle.plugin.avro-base"

dependencies {
implementation project.libs.guava
@@ -7,6 +12,7 @@ dependencies {
implementation project.libs.okhttp3log
implementation project.libs.rxjava2
implementation project.libs.slf4j
+ implementation project.libs.avro

testImplementation project.libs.junit
testImplementation project.libs.logback_core
@@ -16,6 +22,11 @@ dependencies {
}

sourceSets {
+ main {
+   java {
+     srcDirs = ["src/main/java", "build/generated/sources"]
+   }
+ }
test {
java {
srcDir 'src/test/resources'
@@ -103,3 +114,14 @@ publishing {
signing {
sign publishing.publications.shadow
}

+ import com.github.davidmc24.gradle.plugin.avro.GenerateAvroJavaTask
+
+ def generateAvro = tasks.register("generateAvro", GenerateAvroJavaTask) {
+   source("src/main/resources/nakadi-envelope-schema", "src/test/resources/avro-schemas")
+   outputDir = file("build/generated/sources")
+ }
+
+ tasks.named("compileJava").configure {
+   source(generateAvro)
+ }
68 changes: 68 additions & 0 deletions nakadi-java-client/src/main/java/nakadi/AvroPayloadSerializer.java
@@ -0,0 +1,68 @@
package nakadi;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.zalando.nakadi.generated.avro.Envelope;
import org.zalando.nakadi.generated.avro.Metadata;
import org.zalando.nakadi.generated.avro.PublishingBatch;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class AvroPayloadSerializer implements PayloadSerializer {

private final Map<String, EventTypeSchemaPair<Schema>> etSchemas;

public AvroPayloadSerializer(Map<String, EventTypeSchemaPair<Schema>> etSchemas) {
this.etSchemas = etSchemas;
}

@Override
public <T> byte[] toBytes(final String eventTypeName, final Collection<T> events) {
try {
final List<Envelope> envelopes = events.stream()
.map(event -> {
EventEnvelope<?> realEvent = (EventEnvelope<?>) event;
try {
final ByteArrayOutputStream payloadOutStream = new ByteArrayOutputStream();
EventTypeSchemaPair<Schema> etSchemaPair = etSchemas.get(realEvent.getMetadata().eventType());
if (etSchemaPair == null) {
throw new InvalidEventTypeException("Unexpected event-type " + realEvent.getMetadata().eventType() + " provided during Avro serialization");
}
new GenericDatumWriter<>(etSchemaPair.schema()).write(realEvent.data(),
EncoderFactory.get().directBinaryEncoder(payloadOutStream, null));

final EventMetadata metadata = realEvent.getMetadata();
return Envelope.newBuilder()
.setMetadata(Metadata.newBuilder()
.setEventType(metadata.eventType())
.setVersion(etSchemaPair.version())
.setOccurredAt(metadata.occurredAt().toInstant())
.setEid(metadata.eid())
.setPartition(metadata.partition())
.setPartitionCompactionKey(metadata.partitionCompactionKey())
.build())
.setPayload(ByteBuffer.wrap(payloadOutStream.toByteArray()))
.build();
} catch (IOException io) {
throw new RuntimeException("Failed to serialize Avro event payload", io);
}
})
.collect(Collectors.toList());
return PublishingBatch.newBuilder().setEvents(envelopes)
.build().toByteBuffer().array();
} catch (IOException io) {
throw new RuntimeException("Failed to serialize Avro publishing batch", io);
}
}

@Override
public String payloadMimeType() {
return "application/avro-binary";
}
}
21 changes: 21 additions & 0 deletions nakadi-java-client/src/main/java/nakadi/EventEnvelope.java
@@ -0,0 +1,21 @@
package nakadi;

import org.apache.avro.generic.GenericRecord;

public class EventEnvelope<T extends GenericRecord> implements Event<T> {
private final T data;
private final EventMetadata metadata;

public EventEnvelope(final T data, final EventMetadata metadata) {
this.data = data;
this.metadata = metadata;
}

public T data() {
return data;
}

public EventMetadata getMetadata() {
return metadata;
}
}
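
A minimal usage sketch of the two new classes (not part of this diff). It assumes EventTypeSchemaPair can be built from a Schema plus a version string (the ofPair factory below is hypothetical; the diff only shows its schema() and version() accessors), and that EventMetadata exposes fluent setters such as eventType and occurredAt in the style of the version(String) setter added in the next file.

package nakadi;

import java.time.OffsetDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class AvroSendSketch {
  public static void main(String[] args) {
    // Payload schema built in code; in practice this would be the event
    // type's registered Avro schema.
    Schema orderSchema = SchemaBuilder.record("Order").fields()
        .requiredString("orderId")
        .requiredDouble("amount")
        .endRecord();

    // Hypothetical factory pairing a schema with its version string.
    Map<String, EventTypeSchemaPair<Schema>> schemas = new HashMap<>();
    schemas.put("order.created", EventTypeSchemaPair.ofPair(orderSchema, "1.0.0"));

    PayloadSerializer serializer = new AvroPayloadSerializer(schemas);

    GenericRecord order = new GenericData.Record(orderSchema);
    order.put("orderId", "o-123");
    order.put("amount", 42.0d);

    // Assumed fluent setters; optional metadata fields (partition,
    // compaction key) may need values if the envelope schema requires them.
    EventEnvelope<GenericRecord> event = new EventEnvelope<>(order,
        new EventMetadata()
            .eventType("order.created")
            .occurredAt(OffsetDateTime.now()));

    // Avro-encoded PublishingBatch bytes, ready for the events endpoint.
    byte[] batch = serializer.toBytes("order.created", Collections.singletonList(event));
  }
}
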
11 changes: 11 additions & 0 deletions nakadi-java-client/src/main/java/nakadi/EventMetadata.java
@@ -236,6 +236,17 @@ public EventMetadata spanCtx(Map<String, String> spanCtx) {
return this;
}

+ /**
+  * Set the schema version of the event.
+  *
+  * @param version schema version of the event
+  * @return this
+  */
+ public EventMetadata version(String version) {
+   this.version = version;
+   return this;
+ }

/**
* The version of the schema used to validate this event.
*
1 change: 0 additions & 1 deletion nakadi-java-client/src/main/java/nakadi/EventResource.java
@@ -1,7 +1,6 @@
package nakadi;

import java.util.Collection;
- import java.util.HashMap;
import java.util.List;
import java.util.Map;

103 changes: 52 additions & 51 deletions nakadi-java-client/src/main/java/nakadi/EventResourceReal.java
@@ -5,6 +5,9 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.reflect.TypeToken;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Collection;
@@ -14,8 +17,6 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;

public class EventResourceReal implements EventResource {

@@ -32,6 +33,7 @@ public class EventResourceReal implements EventResource {

private final NakadiClient client;
private final JsonSupport jsonSupport;
+ private final PayloadSerializer payloadSerializer;
private volatile RetryPolicy retryPolicy;
private volatile String flowId;
private boolean enablePublishingCompression;
@@ -44,12 +46,21 @@ public EventResourceReal(NakadiClient client) {

@VisibleForTesting
EventResourceReal(NakadiClient client, JsonSupport jsonSupport, CompressionSupport compressionSupport) {
+ this(client, jsonSupport, compressionSupport, new JsonPayloadSerializer(jsonSupport));
+ }
+
+ @VisibleForTesting
+ EventResourceReal(NakadiClient client,
+     JsonSupport jsonSupport,
+     CompressionSupport compressionSupport,
+     PayloadSerializer payloadSerializer) {
this.client = client;
this.jsonSupport = jsonSupport;
this.compressionSupport = compressionSupport;
if(client != null && client.enablePublishingCompression()) {
this.enablePublishingCompression = true;
}
+ this.payloadSerializer = payloadSerializer;
}

private static Response timed(Supplier<Response> sender, NakadiClient client, int eventCount) {
@@ -63,7 +74,7 @@ private static Response timed(Supplier<Response> sender, NakadiClient client, int eventCount) {
emitMetric(client, response, eventCount);
}
client.metricCollector().duration(
- MetricCollector.Timer.eventSend, (System.nanoTime() - start), TimeUnit.NANOSECONDS);
+ MetricCollector.Timer.eventSend, (System.nanoTime() - start), TimeUnit.NANOSECONDS);
}
}

@@ -100,11 +111,11 @@ private static void emitMetric(NakadiClient client, Response response, int event

@Override
public final <T> Response send(String eventTypeName, Collection<T> events) {
- return send(eventTypeName,events, SENTINEL_HEADERS);
+ return send(eventTypeName, events, SENTINEL_HEADERS);
}

@Override public <T> Response send(String eventTypeName, Collection<T> events,
- Map<String, Object> headers) {
+ Map<String, Object> headers) {
NakadiException.throwNonNull(eventTypeName, "Please provide an event type name");
NakadiException.throwNonNull(events, "Please provide one or more events");
NakadiException.throwNonNull(headers, "Please provide some headers");
Expand All @@ -113,14 +124,11 @@ public final <T> Response send(String eventTypeName, Collection<T> events) {
throw new NakadiException(Problem.localProblem("event send called with zero events", ""));
}

- List<EventRecord<T>> collect =
-     events.stream().map(e -> new EventRecord<>(eventTypeName, e)).collect(Collectors.toList());
-
- if (collect.get(0).event() instanceof String) {
+ if (new ArrayList<>(events).get(0) instanceof String) {
return sendUsingSupplier(eventTypeName,
- () -> ("[" + Joiner.on(",").join(events) + "]").getBytes(Charsets.UTF_8), headers);
+ () -> ("[" + Joiner.on(",").join(events) + "]").getBytes(Charsets.UTF_8), headers);
} else {
- return sendBatchOfEvents(collect, headers);
+ return sendBatchOfEvents(eventTypeName, events, headers);
}
}

@@ -159,7 +167,7 @@ public <T> Response send(String eventTypeName, T event) {
}

@Override public <T> BatchItemResponseCollection sendBatch(String eventTypeName, List<T> events,
- Map<String, Object> headers) {
+ Map<String, Object> headers) {

List<BatchItemResponse> items = Lists.newArrayList();
try (Response send = send(eventTypeName, events, headers)) {
@@ -172,69 +180,62 @@ public <T> Response send(String eventTypeName, T event) {
}

private Response sendUsingSupplier(String eventTypeName, ContentSupplier supplier,
- Map<String, Object> headers) {
+ Map<String, Object> headers) {
// todo: close
return timed(() -> client.resourceProvider()
- .newResource()
- .retryPolicy(retryPolicy)
- .postEventsThrowing(
-     collectionUri(eventTypeName).buildString(), options(headers), supplier),
- client,
- 1);
+ .newResource()
+ .retryPolicy(retryPolicy)
+ .postEventsThrowing(
+     collectionUri(eventTypeName).buildString(), options(headers), supplier),
+ client,
+ 1);
}

- private <T> Response sendBatchOfEvents(List<EventRecord<T>> events, Map<String, Object> headers) {
+ private <T> Response sendBatchOfEvents(final String eventTypeName, Collection<T> events, Map<String, Object> headers) {
NakadiException.throwNonNull(events, "Please provide one or more event records");

- String topic = events.get(0).eventType();
- List<Object> eventList =
-     events.stream().map(this::mapEventRecordToSerdes).collect(Collectors.toList());
-
ContentSupplier supplier;

if(enablePublishingCompression) {
- supplier = supplyObjectAsCompressedAndSetHeaders(eventList, headers);
+ supplier = supplyObjectAsCompressedAndSetHeaders(eventTypeName, events, headers);
} else {
- supplier = () -> jsonSupport.toJsonBytesCompressed(eventList);
+ supplier = () -> payloadSerializer.toBytes(eventTypeName, events);
}

final ContentSupplier finalSupplier = supplier;

// todo: close
return timed(() -> client.resourceProvider()
- .newResource()
- .retryPolicy(retryPolicy)
- .postEventsThrowing(
-     collectionUri(topic).buildString(),
-     options(headers),
-     finalSupplier),
- client,
- eventList.size());
- }
-
- @VisibleForTesting <T> Object mapEventRecordToSerdes(EventRecord<T> er) {
-   return jsonSupport.transformEventRecord(er);
+ .newResource()
+ .retryPolicy(retryPolicy)
+ .postEventsThrowing(
+     collectionUri(eventTypeName).buildString(),
+     options(headers),
+     finalSupplier),
+ client,
+ events.size());
}

private ResourceOptions options(Map<String, Object> headers) {
- final ResourceOptions options = ResourceSupport.options(APPLICATION_JSON);
- options.tokenProvider(client.resourceTokenProvider());
- if (flowId != null) {
-   options.flowId(flowId);
- }
- options.headers(headers);
- return options;
+ final ResourceOptions options = ResourceSupport.options(APPLICATION_JSON);
+ options.tokenProvider(client.resourceTokenProvider());
+ if (flowId != null) {
+   options.flowId(flowId);
+ }
+ options.header("Content-Type", payloadSerializer.payloadMimeType());
+ options.headers(headers);
+ return options;
}

private UriBuilder collectionUri(String topic) {
return UriBuilder.builder(client.baseURI())
- .path(PATH_EVENT_TYPES)
- .path(topic)
- .path(PATH_COLLECTION);
+ .path(PATH_EVENT_TYPES)
+ .path(topic)
+ .path(PATH_COLLECTION);
}

- private <T> ContentSupplier supplyObjectAsCompressedAndSetHeaders(T sending, Map<String, Object> headers) {
-   final byte[] json = jsonSupport.toJsonBytesCompressed(sending);
+ private <T> ContentSupplier supplyObjectAsCompressedAndSetHeaders(String eventTypeName, Collection<T> sending, Map<String, Object> headers) {
+   final byte[] json = payloadSerializer.toBytes(eventTypeName, sending);
return supplyBytesAsCompressedAndSetHeaders(json, headers);
}

@@ -248,7 +249,7 @@ private <T> ContentSupplier supplyStringAsCompressedAndSetHeaders(String sending
}

private ContentSupplier supplyBytesAsCompressedAndSetHeaders(
- byte[] json, Map<String, Object> headers) {
+ byte[] json, Map<String, Object> headers) {

// force the compression outside the lambda to access the length
final byte[] compressed = compressionSupport.compress(json);
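
For reviewers tracing how the new serializer reaches the client: the only seam this diff adds is the package-private @VisibleForTesting constructor on EventResourceReal, so the wiring sketch below has to live in the nakadi package (realistically a test) and is illustrative rather than a supported API; the client, jsonSupport, compressionSupport, and schema-map arguments are assumed to exist.

package nakadi;

import java.util.Collections;
import java.util.Map;
import org.apache.avro.Schema;

class AvroWiringSketch {
  static Response sendOne(NakadiClient client,
                          JsonSupport jsonSupport,
                          CompressionSupport compressionSupport,
                          Map<String, EventTypeSchemaPair<Schema>> schemas,
                          EventEnvelope<?> event) {
    // Swap the default JsonPayloadSerializer for the Avro one via the
    // constructor added in this diff.
    EventResource resource = new EventResourceReal(
        client, jsonSupport, compressionSupport, new AvroPayloadSerializer(schemas));
    // options() sets Content-Type to the serializer's payloadMimeType()
    // ("application/avro-binary"), and toBytes(...) builds the PublishingBatch.
    return resource.send(event.getMetadata().eventType(),
        Collections.singletonList(event));
  }
}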