Skip to content

Commit

Permalink
Infrastructure contexts & other bug fixes.
Browse files Browse the repository at this point in the history
- Implemented Infrastructure contexts (Fixes Issue ReactiveX#101)
- Fixed issue ReactiveX#55 (Wire debugging)
- Fixed issue ReactiveX#105 Made the global idle connection cleanup thread a daemon.
  • Loading branch information
Nitesh Kant committed May 6, 2014
1 parent 92178d8 commit ec00e52
Show file tree
Hide file tree
Showing 54 changed files with 2,123 additions and 506 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package io.reactivex.netty.contexts;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;

import java.util.Map;

/**
* This handler does the following:
*
* <ul>
<li>Writes any contexts available in {@link ContextsContainer} for the request id obtained by
{@link RequestIdProvider}</li>
<li>Reads any contexts written back from the server by calling
{@link ContextsContainer#consumeBidirectionalContextsFromResponse(ContextKeySupplier)}</li>
</ul>
*
* @author Nitesh Kant
*/
public abstract class AbstractClientContextHandler<R, W> extends AbstractContextHandler<R,W> {

protected final RequestIdProvider requestIdProvider;
protected final RequestCorrelator correlator;

protected AbstractClientContextHandler(RequestCorrelator correlator, RequestIdProvider requestIdProvider) {
this.correlator = correlator;
this.requestIdProvider = requestIdProvider;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

try {
if (isAcceptableToRead(msg)) {
@SuppressWarnings("unchecked")
R response = (R) msg;
String requestId = requestIdProvider.onClientResponse(ctx);

if (null != requestId) {
newRequestIdRead(requestId);
ContextsContainer container = ContextAttributeStorageHelper.getContainer(ctx, requestId);
if (null != container) {
ContextKeySupplier keySupplier = newKeySupplierForRead(response);
container.consumeBidirectionalContextsFromResponse(keySupplier);
}
}
}
super.channelRead(ctx, msg);
} finally {
String currentRequestId = getCurrentlyProcessingRequestId();
if (null != currentRequestId && isLastResponseFragmentToRead(msg)) {
correlator.onClientProcessingEnd(currentRequestId);
}
}
}

protected abstract void newRequestIdRead(String requestId);

protected abstract String getCurrentlyProcessingRequestId();

protected abstract boolean isLastResponseFragmentToRead(Object response);

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {

if (isAcceptableToWrite(msg)) {
String requestId = requestIdProvider.beforeClientRequest(ctx);

if (null != requestId) {
@SuppressWarnings("unchecked")
W request = (W) msg;

addKey(request, requestIdProvider.getRequestIdContextKeyName(), requestId);
ContextsContainer container = correlator.getContextForClientRequest(requestId);
ContextAttributeStorageHelper.setContainer(ctx, requestId, container);

if (null != container) {
Map<String,String> serializedContexts = container.getSerializedContexts();
for (Map.Entry<String, String> entry : serializedContexts.entrySet()) {
addKey(request, entry.getKey(), entry.getValue());
if (logger.isDebugEnabled()) {
logger.debug("Added an outbound context key. Name: " + entry.getKey() +
", value: " + entry.getValue());
}
}
}
}
}

super.write(ctx, msg, promise);
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof NewContextEvent) {
NewContextEvent newContextEvent = (NewContextEvent) evt;
correlator.beforeNewClientRequest(newContextEvent.getRequestId(), newContextEvent.getContainer());
}
super.userEventTriggered(ctx, evt);
}

public static class NewContextEvent {

private final String requestId;
private final ContextsContainer container;

public NewContextEvent(String requestId, ContextsContainer container) {
this.requestId = requestId;
this.container = container;
}

public ContextsContainer getContainer() {
return container;
}

public String getRequestId() {
return requestId;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package io.reactivex.netty.contexts;

import io.netty.channel.ChannelDuplexHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
*
* @param <R> The type of object this handler will read.
* @param <W> The type of object this handler will write.
*
* @author Nitesh Kant
*/
public abstract class AbstractContextHandler<R, W> extends ChannelDuplexHandler {

protected static final Logger logger = LoggerFactory.getLogger(AbstractContextHandler.class);

/**
* Asserts whether the passed message is an acceptable object type to read. If it is not, this handler will just pass
* this message further in the pipeline.
*
* @param msg Message that is being read.
*
* @return {@code true} if the message is acceptable.
*/
protected abstract boolean isAcceptableToRead(Object msg);

/**
* Asserts whether the passed message is an acceptable object type to write. If it is not, this handler will just pass
* this message further in the pipeline.
*
* @param msg Message that is being written.
*
* @return {@code true} if the message is acceptable.
*/
protected abstract boolean isAcceptableToWrite(Object msg);

/**
* Adds a key to the message that is written.
*
* @param msg Message that is being written.
* @param key Key name to add.
* @param value Key value to add.
*/
protected abstract void addKey(W msg, String key, String value);

/**
* Creates a new {@link ContextKeySupplier} for the passed message to be written.
*
* @param msg Message to be written.
*
* @return The newly created {@link ContextKeySupplier}
*/
protected abstract ContextKeySupplier newKeySupplierForWrite(W msg);

/**
* Creates a new {@link ContextKeySupplier} for the passed message that is being read.
*
* @param msg Message that is being read.
*
* @return The newly created {@link ContextKeySupplier}
*/
protected abstract ContextKeySupplier newKeySupplierForRead(R msg);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package io.reactivex.netty.contexts;

import com.netflix.server.context.BiDirectional;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/**
* A generic handler for all protocols that can handle {@link ContextsContainer}s for servers. This handler does the
* following:
*
* <ul>
<li>Read any serialized contexts in the incoming HTTP requests. If any contexts are found, it will add it to the
{@link ContextsContainer} for that request. This will create the {@link ContextsContainer} instance using the factory
method {@link #newContextContainer(ContextKeySupplier)}. In case, any changes to the implementation of
{@link ContextsContainer} is required, it should be done via overriding this factory method.</li>
<li>Write any {@link BiDirectional} contexts which are modified back to the response. The modified contexts are found
by the method {@link ContextsContainer#getModifiedBidirectionalContexts()}</li>
</ul>
*
* All protocol specific actions are delegated to the abstract methods.
*
* @param <R> The type of object this handler will read.
* @param <W> The type of object this handler will write.
*
* @author Nitesh Kant
*/
public abstract class AbstractServerContextHandler<R, W> extends AbstractContextHandler<R, W> {

protected static final Logger logger = LoggerFactory.getLogger(AbstractServerContextHandler.class);

protected final RequestIdProvider requestIdProvider;
protected final RequestCorrelator correlator;

protected AbstractServerContextHandler(RequestCorrelator correlator, RequestIdProvider requestIdProvider) {
this.correlator = correlator;
this.requestIdProvider = requestIdProvider;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (isAcceptableToRead(msg)) {
@SuppressWarnings("unchecked")
R request = (R) msg;
ContextKeySupplier keySupplier = newKeySupplierForRead(request);
String requestId = requestIdProvider.onServerRequest(keySupplier, ctx);
if (null == requestId) {
requestId = requestIdProvider.newRequestId(keySupplier, ctx);
}

ContextsContainer contextsContainer = newContextContainer(keySupplier);
ContextAttributeStorageHelper.setContainer(ctx, requestId, contextsContainer);

correlator.onNewServerRequest(requestId, contextsContainer);
}

super.channelRead(ctx, msg);
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
try {
if (isAcceptableToWrite(msg)) {
@SuppressWarnings("unchecked")
W response = (W) msg;
ContextKeySupplier keySupplier = newKeySupplierForWrite(response);
String requestId = requestIdProvider.beforeServerResponse(keySupplier, ctx);
if (null != requestId) {
newRequestIdWritten(requestId);
ContextsContainer container = ContextAttributeStorageHelper.getContainer(ctx, requestId);
if (null != container) {
Map<String,String> modifiedCtxs = container.getModifiedBidirectionalContexts();
for (Map.Entry<String, String> modifiedCtxEntry : modifiedCtxs.entrySet()) {
if (logger.isDebugEnabled()) {
logger.debug("Added a modified bi-directional context key. Name: " + modifiedCtxEntry.getKey()
+ ", value: " + modifiedCtxEntry.getValue());
}
addKey(response, modifiedCtxEntry.getKey(), modifiedCtxEntry.getValue());
}
}
}
}

super.write(ctx, msg, promise);
} finally {
String currentRequestId = getCurrentlyProcessingRequestId();
if (null != currentRequestId && isLastResponseFragmenTotWrite(msg)) {
correlator.onClientProcessingEnd(currentRequestId);
}
}
}

protected abstract void newRequestIdWritten(String requestId);

protected abstract String getCurrentlyProcessingRequestId();

protected abstract boolean isLastResponseFragmenTotWrite(Object response);

protected ContextsContainer newContextContainer(ContextKeySupplier keySupplier) {
return new ContextsContainerImpl(keySupplier);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.reactivex.netty.contexts;

import io.reactivex.netty.contexts.http.HttpClientContextConfigurator;
import io.reactivex.netty.contexts.http.HttpServerContextConfigurator;
import io.reactivex.netty.pipeline.PipelineConfigurator;
import io.reactivex.netty.pipeline.PipelineConfigurators;
import io.reactivex.netty.protocol.http.client.HttpClientRequest;
import io.reactivex.netty.protocol.http.client.HttpClientResponse;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.protocol.text.sse.ServerSentEvent;

/**
* A factory class for different {@link PipelineConfigurator} for the context module.
*
* @author Nitesh Kant
*/
public final class ContextPipelineConfigurators {

private ContextPipelineConfigurators() {
}


public static <I, O> PipelineConfigurator<HttpServerRequest<I>, HttpServerResponse<O>>
httpServerConfigurator(RequestIdProvider requestIdProvider, RequestCorrelator correlator,
PipelineConfigurator<HttpServerRequest<I>, HttpServerResponse<O>> httpConfigurator) {
return new HttpServerContextConfigurator<I, O>(requestIdProvider, correlator, httpConfigurator);
}

public static <I, O> PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>>
httpClientConfigurator(RequestIdProvider requestIdProvider, RequestCorrelator correlator,
PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>> httpConfigurator) {
return new HttpClientContextConfigurator<I, O>(requestIdProvider, correlator, httpConfigurator);
}

public static <I, O> PipelineConfigurator<HttpServerRequest<I>, HttpServerResponse<O>>
httpServerConfigurator(RequestIdProvider requestIdProvider, RequestCorrelator correlator) {
return httpServerConfigurator(requestIdProvider, correlator, PipelineConfigurators.<I, O>httpServerConfigurator());
}

public static <I, O> PipelineConfigurator<HttpClientResponse<O>, HttpClientRequest<I>>
httpClientConfigurator(RequestIdProvider requestIdProvider, RequestCorrelator correlator) {
return httpClientConfigurator(requestIdProvider, correlator, PipelineConfigurators.<I, O>httpClientConfigurator());
}

public static <I> PipelineConfigurator<HttpClientResponse<ServerSentEvent>, HttpClientRequest<I>>
sseClientConfigurator(RequestIdProvider requestIdProvider, RequestCorrelator correlator) {
return new HttpClientContextConfigurator<I, ServerSentEvent>(requestIdProvider, correlator,
PipelineConfigurators.<I>sseClientConfigurator());
}

public static <I> PipelineConfigurator<HttpServerRequest<I>, HttpServerResponse<ServerSentEvent>>
sseServerConfigurator(RequestIdProvider requestIdProvider, RequestCorrelator correlator) {
return new HttpServerContextConfigurator<I, ServerSentEvent>(requestIdProvider, correlator,
PipelineConfigurators.<I>sseServerConfigurator());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.reactivex.netty.contexts;

import java.util.HashMap;
import java.util.Map;

/**
Expand All @@ -26,6 +27,10 @@ public class MapBackedKeySupplier implements ContextKeySupplier {

private final Map<String, String> keys;

public MapBackedKeySupplier() {
this(new HashMap<String, String>());
}

public MapBackedKeySupplier(final Map<String, String> keys) {
this.keys = keys;
}
Expand All @@ -34,4 +39,8 @@ public MapBackedKeySupplier(final Map<String, String> keys) {
public String getContextValue(String key) {
return keys.get(key);
}

public void put(String key, String value) {
keys.put(key, value);
}
}
Loading

0 comments on commit ec00e52

Please sign in to comment.