
Commit

First pass at making the Confluent KDS able to write Avro back.
Signed-off-by: Jim Hughes <[email protected]>
Jim Hughes committed Dec 21, 2021
1 parent 617f886 commit 7740c8c
Showing 6 changed files with 166 additions and 29 deletions.
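Taken together, the changes let a feature be written back through the standard GeoTools writer API and serialized as Confluent Avro. Below is a minimal usage sketch pieced together from the test added in this commit; the broker, ZooKeeper, and schema-registry parameter names and URLs are assumptions (the test elides them), and the topic must already have an Avro schema registered so the data store can derive the SimpleFeatureType.

    // Usage sketch only: connection parameter names/values are placeholders;
    // "kafka.serialization.type" -> "avro" is the switch added in this commit.
    import org.geotools.data.{DataStoreFinder, Transaction}
    import org.locationtech.geomesa.features.ScalaSimpleFeature
    import org.locationtech.geomesa.kafka.data.KafkaDataStore
    import org.locationtech.geomesa.utils.geotools.FeatureUtils
    import org.locationtech.geomesa.utils.io.WithClose
    import scala.collection.JavaConversions._

    object ConfluentWriteBackSketch {
      def main(args: Array[String]): Unit = {
        val params = Map(
          "kafka.brokers"             -> "localhost:9092",        // placeholder
          "kafka.zookeepers"          -> "localhost:2181",        // placeholder
          "kafka.schema.registry.url" -> "http://localhost:8081", // placeholder
          "kafka.serialization.type"  -> "avro"
        )
        val kds = DataStoreFinder.getDataStore(params).asInstanceOf[KafkaDataStore]
        try {
          val topic = "confluent-kds-test"
          // the SFT is derived from the Avro schema registered for the topic
          val sft = kds.getSchema(topic)
          // attribute order must match the SFT derived from that schema
          val feature = ScalaSimpleFeature.create(sft, "7", "id", "POINT(2 2)", 1.234d, "2017-01-01T00:00:02.000Z")
          WithClose(kds.getFeatureWriterAppend(topic, Transaction.AUTO_COMMIT)) { writer =>
            FeatureUtils.write(writer, feature, useProvidedFid = true)
          }
        } finally {
          kds.dispose()
        }
      }
    }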
@@ -8,21 +8,23 @@

package org.locationtech.geomesa.kafka.confluent

import java.io.{InputStream, OutputStream}
import java.net.URL
import java.util.Date
import com.typesafe.scalalogging.LazyLogging
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.confluent.kafka.serializers.{KafkaAvroDeserializer, KafkaAvroSerializer}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.joda.time.format.{DateTimeFormatter, ISODateTimeFormat}
import org.locationtech.geomesa.features.SerializationOption.SerializationOption
import org.locationtech.geomesa.features.avro.AvroSimpleFeatureTypeParser.{GeoMesaAvroDateFormat, GeoMesaAvroGeomFormat, GeoMesaAvroVisibilityField}
import org.locationtech.geomesa.features.{ScalaSimpleFeature, SimpleFeatureSerializer}
import org.locationtech.geomesa.security.SecurityUtils
import org.locationtech.geomesa.utils.text.WKTUtils
import org.locationtech.jts.geom.Geometry
import org.opengis.feature.simple.{SimpleFeature, SimpleFeatureType}

import java.io.{InputStream, OutputStream}
import java.net.URL
import java.util.Date
import scala.collection.JavaConverters._
import scala.util.control.NonFatal

@@ -95,8 +97,73 @@ class ConfluentFeatureSerializer(
override def deserialize(id: String, bytes: Array[Byte], offset: Int, length: Int): SimpleFeature =
throw new NotImplementedError()

override def serialize(feature: SimpleFeature): Array[Byte] =
throw new NotImplementedError("ConfluentSerializer is read-only")
override def serialize(feature: SimpleFeature): Array[Byte] = {
// Strategy: we have the SFT, so:
// 1. Determine the Avro schema.
// 2. Build the Avro record.
// 3. Serialize it with io.confluent.kafka.serializers.KafkaAvroSerializer.

// TODO: Cribbed code / refactor
val schemaId = Option(sft.getUserData.get(ConfluentMetadata.SchemaIdKey))
.map(_.asInstanceOf[String].toInt).getOrElse {
throw new IllegalStateException(s"Cannot create ConfluentFeatureSerializer because SimpleFeatureType " +
s"'${sft.getTypeName}' does not have schema id at key '${ConfluentMetadata.SchemaIdKey}'")
}
val schema: Schema = schemaRegistryClient.getById(schemaId)

val record: GenericData.Record = new GenericData.Record(schema)
schema.getFields.asScala.foreach { field =>
val name = field.name()
val ad = sft.getDescriptor(name)

if (ad != null) {
println(s"Handling $name with type ${ad.getType.getBinding}")
if (ad.getType.getBinding.isAssignableFrom(classOf[Geometry]) ||
classOf[Geometry].isAssignableFrom(ad.getType.getBinding)) {
println("Doing geometry handling!")
// Handling the geometry field!
// TODO: Add handling for WKB by reading the metadata
val geom = feature.getAttribute(name).asInstanceOf[Geometry]
record.put(name, WKTUtils.write(geom))
} else if (ad.getType.getBinding.isAssignableFrom(classOf[java.util.Date]) ||
classOf[java.util.Date].isAssignableFrom(ad.getType.getBinding)) {
val date: Date = feature.getAttribute(name).asInstanceOf[java.util.Date]
val formatter: DateTimeFormatter = ISODateTimeFormat.dateTime()
record.put(name, formatter.print(date.getTime)) // JNH: This may not be formatted properly
} else {
record.put(name, feature.getAttribute(name))
}
} else {
// TODO: Fix this!
// This is to handle "visibility" / missing fields
record.put(name, "")
}
}

// sft.getAttributeDescriptors.asScala.foreach { ad =>
// val name = ad.getLocalName
// println(s"Handling $name with type ${ad.getType.getBinding}")
// if (ad.getType.getBinding.isAssignableFrom(classOf[Geometry]) ||
// classOf[Geometry].isAssignableFrom(ad.getType.getBinding)) {
// println("Doing geometry handling!")
// // Handling the geometry field!
// // TODO: Add handling for WKB by reading the metadata
// val geom = feature.getAttribute(name).asInstanceOf[Geometry]
// record.put(name, WKTUtils.write(geom))
// } else if (ad.getType.getBinding.isAssignableFrom(classOf[java.util.Date]) ||
// classOf[java.util.Date].isAssignableFrom(ad.getType.getBinding)) {
// val date: Date = feature.getAttribute(name).asInstanceOf[java.util.Date]
// val parser: DateTimeFormatter = ISODateTimeFormat.dateTimeParser()
// record.put(name, date.toString) // JNH: This may not be formatted properly
// } else {
// record.put(name, feature.getAttribute(name))
// }
// }

val kas = new KafkaAvroSerializer(schemaRegistryClient)
// TODO: Wire through topic instead of "confluent-kds-test"
kas.serialize("confluent-kds-test", record)
}

override def serialize(feature: SimpleFeature, out: OutputStream): Unit =
throw new NotImplementedError("ConfluentSerializer is read-only")
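The new serialize() method ends by handing the populated GenericData.Record to Confluent's KafkaAvroSerializer, following the same pattern shown above. Here is a standalone sketch of that round trip using Confluent's in-memory MockSchemaRegistryClient; the topic and subject names are illustrative only, and depending on the schema-registry client version the Schema may need to be wrapped in an AvroSchema when registering.

    import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient
    import io.confluent.kafka.serializers.{KafkaAvroDeserializer, KafkaAvroSerializer}
    import org.apache.avro.Schema
    import org.apache.avro.generic.{GenericData, GenericRecord}

    object ConfluentAvroRoundTripSketch {
      def main(args: Array[String]): Unit = {
        val schema: Schema = new Schema.Parser().parse(
          """{"type":"record","name":"example","fields":[{"name":"id","type":"string"}]}""")

        val registry = new MockSchemaRegistryClient()
        // pre-register under the default TopicNameStrategy subject ("<topic>-value")
        registry.register("confluent-kds-test-value", schema)

        val record = new GenericData.Record(schema)
        record.put("id", "1")

        // same pattern as the new serialize(): serializer built directly from the
        // registry client, keyed by topic name
        val serializer = new KafkaAvroSerializer(registry)
        val bytes: Array[Byte] = serializer.serialize("confluent-kds-test", record)

        val deserializer = new KafkaAvroDeserializer(registry)
        val back = deserializer.deserialize("confluent-kds-test", bytes).asInstanceOf[GenericRecord]
        println(back.get("id")) // prints: 1
      }
    }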
@@ -18,10 +18,10 @@ import org.locationtech.geomesa.kafka.utils.{GeoMessage, GeoMessageSerializer}
import org.opengis.feature.simple.SimpleFeatureType

class ConfluentGeoMessageSerializer(sft: SimpleFeatureType, serializer: ConfluentFeatureSerializer)
extends GeoMessageSerializer(sft, null, null, null, 0) {
extends GeoMessageSerializer(sft, serializer, null, null, 0) {

override def serialize(msg: GeoMessage): (Array[Byte], Array[Byte], Map[String, Array[Byte]]) =
throw new NotImplementedError("Confluent data store is read-only")
// override def serialize(msg: GeoMessage): (Array[Byte], Array[Byte], Map[String, Array[Byte]]) =
// throw new NotImplementedError("Confluent data store is read-only")

override def deserialize(
key: Array[Byte],
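With the feature serializer now passed to the parent GeoMessageSerializer and the read-only serialize override commented out, serializing a GeoMessage should flow through the Confluent path. A rough sketch, assuming sft, feature, and an already-constructed ConfluentFeatureSerializer instance are in scope (its constructor arguments are not part of these hunks):

    import org.locationtech.geomesa.kafka.utils.GeoMessage

    // sketch only: sft, feature, and confluentFeatureSerializer are assumed to exist
    val geoSerializer = new ConfluentGeoMessageSerializer(sft, confluentFeatureSerializer)
    val (key, value, headers) = geoSerializer.serialize(GeoMessage.change(feature))
    // value should now carry the Confluent Avro payload produced by ConfluentFeatureSerializer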
@@ -8,35 +8,49 @@

package org.locationtech.geomesa.kafka.confluent

import java.util.{Date, Properties}
import io.confluent.kafka.serializers.KafkaAvroSerializer
import io.confluent.kafka.serializers.{KafkaAvroDeserializer, KafkaAvroSerializer}
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.kafka.clients.consumer.ConsumerConfig.{BOOTSTRAP_SERVERS_CONFIG, KEY_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER_CLASS_CONFIG}
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecords, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.geotools.data.DataStoreFinder
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.geotools.data.{DataStoreFinder, Transaction}
import org.junit.runner.RunWith
import org.locationtech.geomesa.features.avro.AvroSimpleFeatureTypeParser.{GeoMesaAvroDateFormat, GeoMesaAvroExcludeField, GeoMesaAvroGeomDefault, GeoMesaAvroGeomFormat, GeoMesaAvroGeomType, GeoMesaAvroVisibilityField}
import org.locationtech.geomesa.features.ScalaSimpleFeature
import org.locationtech.geomesa.features.avro.AvroSimpleFeatureTypeParser._
import org.locationtech.geomesa.kafka.KafkaConsumerVersions
import org.locationtech.geomesa.kafka.data.KafkaDataStore
import org.locationtech.geomesa.security.SecurityUtils
import org.locationtech.geomesa.utils.collection.SelfClosingIterator
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes
import org.locationtech.geomesa.utils.geotools.{FeatureUtils, SimpleFeatureTypes}
import org.locationtech.geomesa.utils.io.WithClose
import org.locationtech.geomesa.utils.text.WKBUtils
import org.locationtech.jts.geom.{Coordinate, GeometryFactory, Point, Polygon}
import org.specs2.mutable.{After, Specification}
import org.specs2.runner.JUnitRunner

import java.nio.ByteBuffer
import java.util.concurrent.Executors
import java.util.{Date, Properties}
import scala.collection.JavaConversions._
import scala.concurrent.duration._

object ConfluentKafkaDataStoreTest {
val topic = "confluent-kds-test"
}

import org.locationtech.geomesa.kafka.confluent.ConfluentKafkaDataStoreTest._

@RunWith(classOf[JUnitRunner])
class ConfluentKafkaDataStoreTest extends Specification {
sequential

"ConfluentKafkaDataStore" should {

val topic = "confluent-kds-test"

// val topic = "confluent-kds-test"

val schemaJson1 =
s"""{
@@ -73,7 +87,7 @@ class ConfluentKafkaDataStoreTest extends Specification {
| }
| ]
|}""".stripMargin
val schema1 = new Schema.Parser().parse(schemaJson1)
val schema1: Schema = new Schema.Parser().parse(schemaJson1)
val encodedSft1 = s"id:String:cardinality=high,*position:Point,speed:Double,date:Date"

val schemaJson2 =
@@ -128,7 +142,7 @@

"deserialize simple features when the schema and records are valid" in new ConfluentKafkaTestContext {
private val id1 = "1"
private val record1 = new GenericData.Record(schema1)
private val record1: GenericData.Record = new GenericData.Record(schema1)
record1.put("id", id1)
record1.put("position", "POINT(10 20)")
record1.put("speed", 12.325d)
@@ -143,6 +157,33 @@
record2.put("date", "2021-12-07T17:23:25.372-05:00")
record2.put("visibility", "hidden")

val consumer = getConsumer
def runnable = new Runnable {
override def run(): Unit = {
val frequency = java.time.Duration.ofMillis(100)
try {
var interrupted = false
while (!interrupted) {
try {
val result: ConsumerRecords[String, GenericRecord] = KafkaConsumerVersions.poll(consumer, frequency)
import scala.collection.JavaConversions._
result.iterator().foreach { cr =>
val key = cr.key()
val value = cr.value()
println(s"Key: $key value: $value")
}
} catch {
case t: Throwable =>
t.printStackTrace()
interrupted = true
}
}
}
}
}
val es = Executors.newFixedThreadPool(1)
es.execute(runnable)

private val producer = getProducer
producer.send(new ProducerRecord[String, GenericRecord](topic, id1, record1)).get
producer.send(new ProducerRecord[String, GenericRecord](topic, id2, record2)).get
@@ -164,6 +205,18 @@
feature.getDefaultGeometry mustEqual expectedPosition
SecurityUtils.getVisibility(feature) mustEqual ""
}
Thread.sleep(1000)

val sft = kds.getSchema(topic)
println(s"SFT: $sft")

val f2 = ScalaSimpleFeature.create(sft, "7", "id", "POINT(2 2)", 1.234d, "2017-01-01T00:00:02.000Z")
println(s"SF: $f2")

WithClose(kds.getFeatureWriterAppend(topic, Transaction.AUTO_COMMIT)) { writer =>
FeatureUtils.write(writer, f2, useProvidedFid = true)
}

}
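The write-back at the end of this test is not asserted yet. One way it might eventually be verified is to poll the data store until the new feature appears, sketched here with the specs2 eventually helper (retry counts are arbitrary):

    // sketch: read back through the data store and expect the written feature
    eventually(40, 100.millis) {
      val features = SelfClosingIterator(kds.getFeatureSource(topic).getFeatures.features).toList
      features.map(_.getID) must contain("7")
    }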

"support the GeoMessage control operation for" >> {
Expand Down Expand Up @@ -296,6 +349,22 @@ class ConfluentKafkaDataStoreTest extends Specification {
new KafkaProducer[String, GenericRecord](producerProps)
}

def getConsumer: Consumer[String, GenericRecord] = {
val props = new Properties()
props.put(BOOTSTRAP_SERVERS_CONFIG, confluentKafka.brokers)
// props.put(ENABLE_AUTO_COMMIT_CONFIG, "false")
props.put(KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
props.put(VALUE_DESERIALIZER_CLASS_CONFIG, classOf[KafkaAvroDeserializer].getName)
props.put("schema.registry.url", confluentKafka.schemaRegistryUrl)
props.put("group.id", "test")
// properties.foreach { case (k, v) => props.put(k, v) }

val consumer = new KafkaConsumer[String, GenericRecord](props)
val listener = new NoOpConsumerRebalanceListener()
KafkaConsumerVersions.subscribe(consumer, topic, listener)
consumer
}

protected def getStore: KafkaDataStore = {
val params = Map(
"kafka.schema.registry.url" -> confluentKafka.schemaRegistryUrl,
@@ -305,7 +374,8 @@
"kafka.topic.replication" -> 1,
"kafka.consumer.read-back" -> "Inf",
"kafka.zk.path" -> "",
"kafka.consumer.count" -> 1
"kafka.consumer.count" -> 1,
"kafka.serialization.type" -> "avro"
)

DataStoreFinder.getDataStore(params).asInstanceOf[KafkaDataStore]
@@ -298,9 +298,9 @@ class KafkaDataStore(
val producer = getTransactionalProducer(transaction)
val writer =
if (sft.isVisibilityRequired) {
new ModifyKafkaFeatureWriter(sft, producer, config.serialization, filter) with RequiredVisibilityWriter
new ModifyKafkaFeatureWriter(sft, producer, serialization(sft, config.serialization), filter) with RequiredVisibilityWriter
} else {
new ModifyKafkaFeatureWriter(sft, producer, config.serialization, filter)
new ModifyKafkaFeatureWriter(sft, producer, serialization(sft, config.serialization), filter)
}
if (config.clearOnStart && cleared.add(typeName)) {
writer.clear()
@@ -318,9 +318,9 @@
val producer = getTransactionalProducer(transaction)
val writer =
if (sft.isVisibilityRequired) {
new AppendKafkaFeatureWriter(sft, producer, config.serialization) with RequiredVisibilityWriter
new AppendKafkaFeatureWriter(sft, producer, serialization(sft, config.serialization)) with RequiredVisibilityWriter
} else {
new AppendKafkaFeatureWriter(sft, producer, config.serialization)
new AppendKafkaFeatureWriter(sft, producer, serialization(sft, config.serialization))
}
if (config.clearOnStart && cleared.add(typeName)) {
writer.clear()
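The writer construction above now calls a serialization(sft, config.serialization) helper whose definition is not included in the shown hunks. Purely as an assumption about its intent, it presumably returns the Confluent-aware serializer for Confluent-backed feature types and otherwise builds the standard GeoMessageSerializer; a hypothetical shape:

    // Hypothetical sketch only: the real helper is not shown in this diff, and both
    // isConfluent and confluentFeatureSerializer are invented names for illustration.
    private def serialization(sft: SimpleFeatureType, serialization: SerializationType): GeoMessageSerializer = {
      if (isConfluent(sft)) {
        new ConfluentGeoMessageSerializer(sft, confluentFeatureSerializer(sft))
      } else {
        GeoMessageSerializer(sft, serialization) // the factory call the old writer code used directly
      }
    }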
@@ -41,12 +41,12 @@ object KafkaFeatureWriter {
class AppendKafkaFeatureWriter(
sft: SimpleFeatureType,
producer: KafkaFeatureProducer,
serialization: SerializationType
serializer: GeoMessageSerializer
) extends KafkaFeatureWriter with LazyLogging {

protected val topic: String = KafkaDataStore.topic(sft)

protected val serializer: GeoMessageSerializer = GeoMessageSerializer(sft, serialization)
// protected val serializer: GeoMessageSerializer = GeoMessageSerializer(sft, serialization)

protected val feature = new ScalaSimpleFeature(sft, "-1")

@@ -98,9 +98,9 @@ object KafkaFeatureWriter {
class ModifyKafkaFeatureWriter(
sft: SimpleFeatureType,
producer: KafkaFeatureProducer,
serialization: SerializationType,
serializer: GeoMessageSerializer,
filter: Filter
) extends AppendKafkaFeatureWriter(sft, producer, serialization) {
) extends AppendKafkaFeatureWriter(sft, producer, serializer) {

import scala.collection.JavaConversions._

@@ -130,7 +130,7 @@ object GeoMessageSerializer {
}

class GeoMessageSerializerFactory {
def apply(sft: SimpleFeatureType, serialization: SerializationType, `lazy`: Boolean): GeoMessageSerializer =
def apply(sft: SimpleFeatureType, serialization: SerializationType, `lazy`: Boolean = false): GeoMessageSerializer =
GeoMessageSerializer.apply(sft, serialization, `lazy`)
}
}
