Skip to content
This repository has been archived by the owner on Dec 18, 2018. It is now read-only.

[Testing] Combo changes #458

Closed
wants to merge 11 commits into from
19 changes: 11 additions & 8 deletions src/Microsoft.AspNet.Server.Kestrel/Http/Frame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,22 +70,25 @@ public abstract partial class Frame : FrameContext, IFrameControl
private readonly Action<IFeatureCollection> _prepareRequest;

private readonly string _pathBase;
protected readonly IStringCache _stringCache;

public Frame(ConnectionContext context)
: this(context, remoteEndPoint: null, localEndPoint: null, prepareRequest: null)
: this(context, remoteEndPoint: null, localEndPoint: null, prepareRequest: null, stringCache: null)
{
}

public Frame(ConnectionContext context,
IPEndPoint remoteEndPoint,
IPEndPoint localEndPoint,
Action<IFeatureCollection> prepareRequest)
Action<IFeatureCollection> prepareRequest,
IStringCache stringCache)
: base(context)
{
_remoteEndPoint = remoteEndPoint;
_localEndPoint = localEndPoint;
_prepareRequest = prepareRequest;
_pathBase = context.ServerAddress.PathBase;
_stringCache = stringCache;

FrameControl = this;
Reset();
Expand Down Expand Up @@ -702,7 +705,7 @@ protected bool TakeStartLine(SocketInput input)
{
return false;
}
var method = begin.GetAsciiString(scan);
var method = begin.GetAsciiString(scan, _stringCache);

scan.Take();
begin = scan;
Expand All @@ -726,7 +729,7 @@ protected bool TakeStartLine(SocketInput input)
{
return false;
}
queryString = begin.GetAsciiString(scan);
queryString = begin.GetAsciiString(scan, _stringCache);
}

scan.Take();
Expand All @@ -735,7 +738,7 @@ protected bool TakeStartLine(SocketInput input)
{
return false;
}
var httpVersion = begin.GetAsciiString(scan);
var httpVersion = begin.GetAsciiString(scan, _stringCache);

scan.Take();
if (scan.Take() != '\n')
Expand All @@ -756,7 +759,7 @@ protected bool TakeStartLine(SocketInput input)
else
{
// URI wasn't encoded, parse as ASCII
requestUrlPath = pathBegin.GetAsciiString(pathEnd);
requestUrlPath = pathBegin.GetAsciiString(pathEnd, _stringCache);
}

consumed = scan;
Expand Down Expand Up @@ -809,7 +812,7 @@ private bool RequestUrlStartsWithPathBase(string requestUrl, out bool caseMatche
return true;
}

public static bool TakeMessageHeaders(SocketInput input, FrameRequestHeaders requestHeaders)
public static bool TakeMessageHeaders(SocketInput input, FrameRequestHeaders requestHeaders, IStringCache stringCache)
{
var scan = input.ConsumingStart();
var consumed = scan;
Expand Down Expand Up @@ -900,7 +903,7 @@ public static bool TakeMessageHeaders(SocketInput input, FrameRequestHeaders req
}

var name = beginName.GetArraySegment(endName);
var value = beginValue.GetAsciiString(endValue);
var value = beginValue.GetAsciiString(endValue, stringCache);
if (wrapping)
{
value = value.Replace("\r\n", " ");
Expand Down
12 changes: 8 additions & 4 deletions src/Microsoft.AspNet.Server.Kestrel/Http/FrameOfT.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using System.Threading.Tasks;
using Microsoft.AspNet.Hosting.Server;
using Microsoft.AspNet.Http.Features;
using Microsoft.AspNet.Server.Kestrel.Infrastructure;
using Microsoft.Extensions.Logging;

namespace Microsoft.AspNet.Server.Kestrel.Http
Expand All @@ -16,16 +17,17 @@ public class Frame<TContext> : Frame

public Frame(IHttpApplication<TContext> application,
ConnectionContext context)
: this(application, context, remoteEndPoint: null, localEndPoint: null, prepareRequest: null)
: this(application, context, remoteEndPoint: null, localEndPoint: null, prepareRequest: null, stringCache: null)
{
}

public Frame(IHttpApplication<TContext> application,
ConnectionContext context,
IPEndPoint remoteEndPoint,
IPEndPoint localEndPoint,
Action<IFeatureCollection> prepareRequest)
: base(context, remoteEndPoint, localEndPoint, prepareRequest)
Action<IFeatureCollection> prepareRequest,
IStringCache stringCache)
: base(context, remoteEndPoint, localEndPoint, prepareRequest, stringCache)
{
_application = application;
}
Expand All @@ -42,6 +44,8 @@ public override async Task RequestProcessingAsync()
{
while (!_requestProcessingStopping)
{
_stringCache?.MarkStart();

while (!_requestProcessingStopping && !TakeStartLine(SocketInput))
{
if (SocketInput.RemoteIntakeFin)
Expand All @@ -51,7 +55,7 @@ public override async Task RequestProcessingAsync()
await SocketInput;
}

while (!_requestProcessingStopping && !TakeMessageHeaders(SocketInput, _requestHeaders))
while (!_requestProcessingStopping && !TakeMessageHeaders(SocketInput, _requestHeaders, _stringCache))
{
if (SocketInput.RemoteIntakeFin)
{
Expand Down
116 changes: 95 additions & 21 deletions src/Microsoft.AspNet.Server.Kestrel/Http/SocketOutput.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public class SocketOutput : ISocketOutput
private const int _maxPendingWrites = 3;
private const int _maxBytesPreCompleted = 65536;
private const int _initialTaskQueues = 64;
private const int _maxPooledWriteContexts = 32;

private static WaitCallback _returnBlocks = (state) => ReturnBlocks((MemoryPoolBlock2)state);

Expand All @@ -39,6 +40,7 @@ public class SocketOutput : ISocketOutput

// This locks access to to all of the below fields
private readonly object _contextLock = new object();
private bool _isDisposed = false;

// The number of write operations that have been scheduled so far
// but have not completed.
Expand All @@ -49,6 +51,7 @@ public class SocketOutput : ISocketOutput
private WriteContext _nextWriteContext;
private readonly Queue<TaskCompletionSource<object>> _tasksPending;
private readonly Queue<TaskCompletionSource<object>> _tasksCompleted;
private readonly Queue<WriteContext> _writeContextPool;

public SocketOutput(
KestrelThread thread,
Expand All @@ -67,6 +70,7 @@ public SocketOutput(
_threadPool = threadPool;
_tasksPending = new Queue<TaskCompletionSource<object>>(_initialTaskQueues);
_tasksCompleted = new Queue<TaskCompletionSource<object>>(_initialTaskQueues);
_writeContextPool = new Queue<WriteContext>(_maxPooledWriteContexts);

_head = memory.Lease();
_tail = _head;
Expand All @@ -93,7 +97,14 @@ public Task WriteAsync(
{
if (_nextWriteContext == null)
{
_nextWriteContext = new WriteContext(this);
if (_writeContextPool.Count > 0)
{
_nextWriteContext = _writeContextPool.Dequeue();
}
else
{
_nextWriteContext = new WriteContext(this);
}
}

if (socketShutdownSend)
Expand Down Expand Up @@ -272,9 +283,12 @@ private void WriteAllPending()
}

// This is called on the libuv event loop
private void OnWriteCompleted(int bytesWritten, int status, Exception error)
private void OnWriteCompleted(WriteContext writeContext)
{
_log.ConnectionWriteCallback(_connectionId, status);
var bytesWritten = writeContext.ByteCount;
var status = writeContext.WriteStatus;
var error = writeContext.WriteError;


if (error != null)
{
Expand All @@ -288,6 +302,7 @@ private void OnWriteCompleted(int bytesWritten, int status, Exception error)

lock (_contextLock)
{
PoolWriteContext(writeContext);
if (_nextWriteContext != null)
{
scheduleWrite = true;
Expand Down Expand Up @@ -330,11 +345,11 @@ private void OnWriteCompleted(int bytesWritten, int status, Exception error)
}
}

_log.ConnectionWriteCallback(_connectionId, status);

if (scheduleWrite)
{
// ScheduleWrite();
// on right thread, fairness issues?
WriteAllPending();
ScheduleWrite();
}

_tasksCompleted.Clear();
Expand Down Expand Up @@ -368,6 +383,32 @@ private void ReturnAllBlocks()
}
}

private void PoolWriteContext(WriteContext writeContext)
{
// called inside _contextLock
if (!_isDisposed && _writeContextPool.Count < _maxPooledWriteContexts)
{
writeContext.Reset();
_writeContextPool.Enqueue(writeContext);
}
else
{
writeContext.Dispose();
}
}

private void Dispose()
{
lock (_contextLock)
{
_isDisposed = true;
while (_writeContextPool.Count > 0)
{
_writeContextPool.Dequeue().Dispose();
}
}
}

void ISocketOutput.Write(ArraySegment<byte> buffer, bool immediate)
{
var task = WriteAsync(buffer, immediate);
Expand All @@ -387,14 +428,14 @@ Task ISocketOutput.WriteAsync(ArraySegment<byte> buffer, bool immediate, Cancell
return WriteAsync(buffer, immediate);
}

private class WriteContext
private class WriteContext : IDisposable
{
private static WaitCallback _returnWrittenBlocks = (state) => ReturnWrittenBlocks((MemoryPoolBlock2)state);

private MemoryPoolIterator2 _lockedStart;
private MemoryPoolIterator2 _lockedEnd;
private int _bufferCount;
private int _byteCount;
public int ByteCount;

public SocketOutput Self;

Expand All @@ -404,11 +445,15 @@ private class WriteContext
public int WriteStatus;
public Exception WriteError;

private UvWriteReq _writeReq;

public int ShutdownSendStatus;

public WriteContext(SocketOutput self)
{
Self = self;
_writeReq = new UvWriteReq(Self._log);
_writeReq.Init(Self._thread.Loop);
}

/// <summary>
Expand All @@ -418,27 +463,28 @@ public void DoWriteIfNeeded()
{
LockWrite();

if (_byteCount == 0 || Self._socket.IsClosed)
if (ByteCount == 0 || Self._socket.IsClosed)
{
DoShutdownIfNeeded();
return;
}

var writeReq = new UvWriteReq(Self._log);
writeReq.Init(Self._thread.Loop);
// Sample values locally in case write completes inline
// to allow block to be Reset and still complete this function
var lockedEndBlock = _lockedEnd.Block;
var lockedEndIndex = _lockedEnd.Index;

writeReq.Write(Self._socket, _lockedStart, _lockedEnd, _bufferCount, (_writeReq, status, error, state) =>
_writeReq.Write(Self._socket, _lockedStart, _lockedEnd, _bufferCount, (_writeReq, status, error, state) =>
{
_writeReq.Dispose();
var _this = (WriteContext)state;
_this.ScheduleReturnFullyWrittenBlocks();
_this.WriteStatus = status;
_this.WriteError = error;
_this.DoShutdownIfNeeded();
}, this);

Self._head = _lockedEnd.Block;
Self._head.Start = _lockedEnd.Index;
Self._head = lockedEndBlock;
Self._head.Start = lockedEndIndex;
}

/// <summary>
Expand Down Expand Up @@ -471,21 +517,28 @@ public void DoShutdownIfNeeded()
/// </summary>
public void DoDisconnectIfNeeded()
{
if (SocketDisconnect == false || Self._socket.IsClosed)
if (SocketDisconnect == false)
{
Complete();
return;
}
else if (Self._socket.IsClosed)
{
Self.Dispose();
Complete();
return;
}

Self._socket.Dispose();
Self.ReturnAllBlocks();
Self.Dispose();
Self._log.ConnectionStop(Self._connectionId);
Complete();
}

public void Complete()
{
Self.OnWriteCompleted(_byteCount, WriteStatus, WriteError);
Self.OnWriteCompleted(this);
}

private void ScheduleReturnFullyWrittenBlocks()
Expand Down Expand Up @@ -537,23 +590,44 @@ private void LockWrite()

if (_lockedStart.Block == _lockedEnd.Block)
{
_byteCount = _lockedEnd.Index - _lockedStart.Index;
ByteCount = _lockedEnd.Index - _lockedStart.Index;
_bufferCount = 1;
return;
}

_byteCount = _lockedStart.Block.Data.Offset + _lockedStart.Block.Data.Count - _lockedStart.Index;
ByteCount = _lockedStart.Block.Data.Offset + _lockedStart.Block.Data.Count - _lockedStart.Index;
_bufferCount = 1;

for (var block = _lockedStart.Block.Next; block != _lockedEnd.Block; block = block.Next)
{
_byteCount += block.Data.Count;
ByteCount += block.Data.Count;
_bufferCount++;
}

_byteCount += _lockedEnd.Index - _lockedEnd.Block.Data.Offset;
ByteCount += _lockedEnd.Index - _lockedEnd.Block.Data.Offset;
_bufferCount++;
}

public void Reset()
{
_lockedStart = default(MemoryPoolIterator2);
_lockedEnd = default(MemoryPoolIterator2);
_bufferCount = 0;
ByteCount = 0;

SocketShutdownSend = false;
SocketDisconnect = false;

WriteStatus = 0;
WriteError = null;

ShutdownSendStatus = 0;
}

public void Dispose()
{
_writeReq.Dispose();
}
}
}
}
Loading