Skip to content

Commit

Permalink
Merge branch 'headers'
Browse files Browse the repository at this point in the history
  • Loading branch information
Javierlj committed Oct 22, 2019
2 parents 6680e40 + 965759d commit c4053c7
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import org.slf4j.LoggerFactory
* @param contentType Type of content. It can be: ContentType.JSON or ContentType.Plain
* @param method HTTP Method. It can be: HTTPMethod.POST, HTTPMethod.PUT, HTTPMethod.PATCH
*/
case class OrionSinkObject(content: String, url: String, contentType: ContentType.Value, method: HTTPMethod.Value)
case class OrionSinkObject(content: String, url: String, contentType: ContentType.Value, method: HTTPMethod.Value, headers: Map[String,String]= (Map[String,String]()))

/**
* Content type of the HTTP message
Expand Down Expand Up @@ -73,7 +73,6 @@ object OrionSink {
stream.addSink( msg => {

val httpEntity : HttpEntityEnclosingRequestBase= createHttpMsg(msg)

val client = HttpClientBuilder.create.build

try {
Expand All @@ -97,6 +96,9 @@ object OrionSink {
def createHttpMsg(msg: OrionSinkObject) : HttpEntityEnclosingRequestBase= {
val httpEntity = getMethod(msg.method, msg.url)
httpEntity.setHeader("Content-type", msg.contentType.toString)
if(msg.headers.nonEmpty){
msg.headers.foreach({case(k,v) => httpEntity.setHeader(k,v)})
}
httpEntity.setEntity(new StringEntity(msg.content))
httpEntity
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
package org.fiware.cosmos.orion.flink.connector.tests

import java.net.{InetAddress, InetSocketAddress}
import java.net.InetSocketAddress

import io.netty.handler.codec.http.{DefaultFullHttpRequest, HttpMethod, HttpVersion}
import org.fiware.cosmos.orion.flink.connector.{NgsiEvent, OrionSink, OrionHttpHandler, OrionSinkObject, OrionHttpServer, ContentType, HTTPMethod}
import org.junit.{Assert, Test}
import io.netty.buffer.Unpooled
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.http.{DefaultFullHttpRequest, HttpMethod, HttpVersion}
import io.netty.util.CharsetUtil
import org.apache.http.client.methods.{HttpPatch, HttpPost, HttpPut}
import org.fiware.cosmos.orion.flink.connector.test.FlinkJobTest
import org.fiware.cosmos.orion.flink.connector._
import org.junit.{Assert, Test}
import org.mockito.Mockito.mock

object Utils {
final val Port = 9001
final val SleepTime = 10000
final val SleepTimeShort = 3000
final val SleepTime = 20000
final val SleepTimeShort = 6000
final val ServerAddress = "http://localhost:9001"
final val OrionAddress = "http://localhost:2026"
final val ContentType = "Content-Type"
Expand Down

0 comments on commit c4053c7

Please sign in to comment.