Skip to content
This repository has been archived by the owner on Apr 8, 2020. It is now read-only.

Commit

Permalink
Split out 'socket' hosting model into a separate optional NuGet packa…
Browse files Browse the repository at this point in the history
…ge, since most developers won't need it
  • Loading branch information
SteveSandersonMS committed Nov 30, 2016
1 parent ebf5a18 commit 832da2a
Show file tree
Hide file tree
Showing 24 changed files with 244 additions and 135 deletions.
1 change: 1 addition & 0 deletions pack-local.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ versionSuffix=$1
dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
projects=(
./src/Microsoft.AspNetCore.NodeServices
./src/Microsoft.AspNetCore.NodeServices.Sockets
./src/Microsoft.AspNetCore.SpaServices
./src/Microsoft.AspNetCore.AngularServices
./src/Microsoft.AspNetCore.ReactServices
Expand Down
5 changes: 5 additions & 0 deletions samples/misc/LatencyTest/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.NodeServices;
using Microsoft.AspNetCore.NodeServices.Sockets;
using Microsoft.Extensions.DependencyInjection;

namespace ConsoleApplication
Expand All @@ -16,6 +17,10 @@ public static void Main(string[] args) {
// Set up the DI system
var services = new ServiceCollection();
services.AddNodeServices(options => {
// To compare with Socket hosting, uncomment the following line
// Since .NET Core 1.1, the HTTP hosting model has become basically as fast as the Socket hosting model
//options.UseSocketHosting();

options.ProjectPath = Directory.GetCurrentDirectory();
options.WatchFileExtensions = new string[] {}; // Don't watch anything
});
Expand Down
1 change: 1 addition & 0 deletions samples/misc/LatencyTest/project.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"type": "platform"
},
"Microsoft.AspNetCore.NodeServices": "1.1.0-*",
"Microsoft.AspNetCore.NodeServices.Sockets": "1.1.0-*",
"Microsoft.Extensions.DependencyInjection": "1.1.0"
},
"frameworks": {
Expand Down
3 changes: 3 additions & 0 deletions src/Microsoft.AspNetCore.NodeServices.Sockets/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/bin/
/node_modules/
yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,81 @@
/* 0 */
/***/ function(module, exports, __webpack_require__) {

module.exports = __webpack_require__(7);
module.exports = __webpack_require__(1);


/***/ },
/* 1 */
/***/ function(module, exports, __webpack_require__) {

"use strict";
// Limit dependencies to core Node modules. This means the code in this file has to be very low-level and unattractive,
// but simplifies things for the consumer of this module.
__webpack_require__(2);
var net = __webpack_require__(3);
var path = __webpack_require__(4);
var readline = __webpack_require__(5);
var ArgsUtil_1 = __webpack_require__(6);
var ExitWhenParentExits_1 = __webpack_require__(7);
var virtualConnectionServer = __webpack_require__(8);
// Webpack doesn't support dynamic requires for files not present at compile time, so grab a direct
// reference to Node's runtime 'require' function.
var dynamicRequire = eval('require');
// Signal to the .NET side when we're ready to accept invocations
var server = net.createServer().on('listening', function () {
console.log('[Microsoft.AspNetCore.NodeServices:Listening]');
});
// Each virtual connection represents a separate invocation
virtualConnectionServer.createInterface(server).on('connection', function (connection) {
readline.createInterface(connection, null).on('line', function (line) {
try {
// Get a reference to the function to invoke
var invocation = JSON.parse(line);
var invokedModule = dynamicRequire(path.resolve(process.cwd(), invocation.moduleName));
var invokedFunction = invocation.exportedFunctionName ? invokedModule[invocation.exportedFunctionName] : invokedModule;
// Prepare a callback for accepting non-streamed JSON responses
var hasInvokedCallback_1 = false;
var invocationCallback = function (errorValue, successValue) {
if (hasInvokedCallback_1) {
throw new Error('Cannot supply more than one result. The callback has already been invoked,'
+ ' or the result stream has already been accessed');
}
hasInvokedCallback_1 = true;
connection.end(JSON.stringify({
result: successValue,
errorMessage: errorValue && (errorValue.message || errorValue),
errorDetails: errorValue && (errorValue.stack || null)
}));
};
// Also support streamed binary responses
Object.defineProperty(invocationCallback, 'stream', {
enumerable: true,
get: function () {
hasInvokedCallback_1 = true;
return connection;
}
});
// Actually invoke it, passing through any supplied args
invokedFunction.apply(null, [invocationCallback].concat(invocation.args));
}
catch (ex) {
connection.end(JSON.stringify({
errorMessage: ex.message,
errorDetails: ex.stack
}));
}
});
});
// Begin listening now. The underlying transport varies according to the runtime platform.
// On Windows it's Named Pipes; on Linux/OSX it's Domain Sockets.
var useWindowsNamedPipes = /^win/.test(process.platform);
var parsedArgs = ArgsUtil_1.parseArgs(process.argv);
var listenAddress = (useWindowsNamedPipes ? '\\\\.\\pipe\\' : '/tmp/') + parsedArgs.listenAddress;
server.listen(listenAddress);
ExitWhenParentExits_1.exitWhenParentExits(parseInt(parsedArgs.parentPid));


/***/ },
/* 1 */,
/* 2 */
/***/ function(module, exports) {

Expand Down Expand Up @@ -90,14 +160,25 @@


/***/ },
/* 3 */,
/* 3 */
/***/ function(module, exports) {

module.exports = require("net");

/***/ },
/* 4 */
/***/ function(module, exports) {

module.exports = require("path");

/***/ },
/* 5 */
/***/ function(module, exports) {

module.exports = require("readline");

/***/ },
/* 6 */
/***/ function(module, exports) {

"use strict";
Expand All @@ -123,7 +204,7 @@


/***/ },
/* 6 */
/* 7 */
/***/ function(module, exports) {

/*
Expand Down Expand Up @@ -189,96 +270,13 @@
}


/***/ },
/* 7 */
/***/ function(module, exports, __webpack_require__) {

"use strict";
// Limit dependencies to core Node modules. This means the code in this file has to be very low-level and unattractive,
// but simplifies things for the consumer of this module.
__webpack_require__(2);
var net = __webpack_require__(8);
var path = __webpack_require__(4);
var readline = __webpack_require__(9);
var ArgsUtil_1 = __webpack_require__(5);
var ExitWhenParentExits_1 = __webpack_require__(6);
var virtualConnectionServer = __webpack_require__(10);
// Webpack doesn't support dynamic requires for files not present at compile time, so grab a direct
// reference to Node's runtime 'require' function.
var dynamicRequire = eval('require');
// Signal to the .NET side when we're ready to accept invocations
var server = net.createServer().on('listening', function () {
console.log('[Microsoft.AspNetCore.NodeServices:Listening]');
});
// Each virtual connection represents a separate invocation
virtualConnectionServer.createInterface(server).on('connection', function (connection) {
readline.createInterface(connection, null).on('line', function (line) {
try {
// Get a reference to the function to invoke
var invocation = JSON.parse(line);
var invokedModule = dynamicRequire(path.resolve(process.cwd(), invocation.moduleName));
var invokedFunction = invocation.exportedFunctionName ? invokedModule[invocation.exportedFunctionName] : invokedModule;
// Prepare a callback for accepting non-streamed JSON responses
var hasInvokedCallback_1 = false;
var invocationCallback = function (errorValue, successValue) {
if (hasInvokedCallback_1) {
throw new Error('Cannot supply more than one result. The callback has already been invoked,'
+ ' or the result stream has already been accessed');
}
hasInvokedCallback_1 = true;
connection.end(JSON.stringify({
result: successValue,
errorMessage: errorValue && (errorValue.message || errorValue),
errorDetails: errorValue && (errorValue.stack || null)
}));
};
// Also support streamed binary responses
Object.defineProperty(invocationCallback, 'stream', {
enumerable: true,
get: function () {
hasInvokedCallback_1 = true;
return connection;
}
});
// Actually invoke it, passing through any supplied args
invokedFunction.apply(null, [invocationCallback].concat(invocation.args));
}
catch (ex) {
connection.end(JSON.stringify({
errorMessage: ex.message,
errorDetails: ex.stack
}));
}
});
});
// Begin listening now. The underlying transport varies according to the runtime platform.
// On Windows it's Named Pipes; on Linux/OSX it's Domain Sockets.
var useWindowsNamedPipes = /^win/.test(process.platform);
var parsedArgs = ArgsUtil_1.parseArgs(process.argv);
var listenAddress = (useWindowsNamedPipes ? '\\\\.\\pipe\\' : '/tmp/') + parsedArgs.listenAddress;
server.listen(listenAddress);
ExitWhenParentExits_1.exitWhenParentExits(parseInt(parsedArgs.parentPid));


/***/ },
/* 8 */
/***/ function(module, exports) {

module.exports = require("net");

/***/ },
/* 9 */
/***/ function(module, exports) {

module.exports = require("readline");

/***/ },
/* 10 */
/***/ function(module, exports, __webpack_require__) {

"use strict";
var events_1 = __webpack_require__(11);
var VirtualConnection_1 = __webpack_require__(12);
var events_1 = __webpack_require__(9);
var VirtualConnection_1 = __webpack_require__(10);
// Keep this in sync with the equivalent constant in the .NET code. Both sides split up their transmissions into frames with this max length,
// and both will reject longer frames.
var MaxFrameBodyLength = 16 * 1024;
Expand Down Expand Up @@ -460,13 +458,13 @@


/***/ },
/* 11 */
/* 9 */
/***/ function(module, exports) {

module.exports = require("events");

/***/ },
/* 12 */
/* 10 */
/***/ function(module, exports, __webpack_require__) {

"use strict";
Expand All @@ -475,17 +473,18 @@
function __() { this.constructor = d; }
d.prototype = b === null ? Object.create(b) : (__.prototype = b.prototype, new __());
};
var stream_1 = __webpack_require__(13);
var stream_1 = __webpack_require__(11);
/**
* Represents a virtual connection. Multiple virtual connections may be multiplexed over a single physical socket connection.
*/
var VirtualConnection = (function (_super) {
__extends(VirtualConnection, _super);
function VirtualConnection(_beginWriteCallback) {
_super.call(this);
this._beginWriteCallback = _beginWriteCallback;
this._flowing = false;
this._receivedDataQueue = [];
var _this = _super.call(this) || this;
_this._beginWriteCallback = _beginWriteCallback;
_this._flowing = false;
_this._receivedDataQueue = [];
return _this;
}
VirtualConnection.prototype._read = function () {
this._flowing = true;
Expand Down Expand Up @@ -516,7 +515,7 @@


/***/ },
/* 13 */
/* 11 */
/***/ function(module, exports) {

module.exports = require("stream");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
using System.IO.Pipes;
using System.Threading.Tasks;

namespace Microsoft.AspNetCore.NodeServices.HostingModels.PhysicalConnections
namespace Microsoft.AspNetCore.NodeServices.Sockets.PhysicalConnections
{
internal class NamedPipeConnection : StreamConnection
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
using System.IO;
using System.Threading.Tasks;

namespace Microsoft.AspNetCore.NodeServices.HostingModels.PhysicalConnections
namespace Microsoft.AspNetCore.NodeServices.Sockets.PhysicalConnections
{
internal abstract class StreamConnection : IDisposable
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
using System.Net.Sockets;
using System.Threading.Tasks;

namespace Microsoft.AspNetCore.NodeServices.HostingModels.PhysicalConnections
namespace Microsoft.AspNetCore.NodeServices.Sockets.PhysicalConnections
{
internal class UnixDomainSocketConnection : StreamConnection
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using System.Net.Sockets;
using System.Text;

namespace Microsoft.AspNetCore.NodeServices.HostingModels.PhysicalConnections
namespace Microsoft.AspNetCore.NodeServices.Sockets.PhysicalConnections
{
// From System.IO.Pipes/src/System/Net/Sockets/UnixDomainSocketEndPoint.cs (an internal class in System.IO.Pipes)
internal sealed class UnixDomainSocketEndPoint : EndPoint
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.NodeServices.HostingModels.PhysicalConnections;
using Microsoft.AspNetCore.NodeServices.HostingModels.VirtualConnections;
using Microsoft.AspNetCore.NodeServices.HostingModels;
using Microsoft.AspNetCore.NodeServices.Sockets.PhysicalConnections;
using Microsoft.AspNetCore.NodeServices.Sockets.VirtualConnections;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;

namespace Microsoft.AspNetCore.NodeServices.HostingModels
namespace Microsoft.AspNetCore.NodeServices.Sockets
{
/// <summary>
/// A specialisation of the OutOfProcessNodeInstance base class that uses a lightweight binary streaming protocol
Expand Down Expand Up @@ -77,7 +76,7 @@ protected override async Task<T> InvokeExportAsync<T>(NodeInvocationInfo invocat
// wait for the same connection task. There's no reason why the first caller should have the
// special ability to cancel the connection process in a way that would affect subsequent
// callers. So, each caller just independently stops awaiting connection if that call is cancelled.
await EnsureVirtualConnectionClientCreated().OrThrowOnCancellation(cancellationToken);
await ThrowOnCancellation(EnsureVirtualConnectionClientCreated(), cancellationToken);
}

// For each invocation, we open a new virtual connection. This gives an API equivalent to opening a new
Expand Down Expand Up @@ -213,6 +212,17 @@ private static string MakeNewCommandLineOptions(string listenAddress)
return $"--listenAddress {listenAddress}";
}

private static Task ThrowOnCancellation(Task task, CancellationToken cancellationToken)
{
return task.IsCompleted
? task // If the task is already completed, no need to wrap it in a further layer of task
: task.ContinueWith(
_ => {}, // If the task completes, allow execution to continue
cancellationToken,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Default);
}

#pragma warning disable 649 // These properties are populated via JSON deserialization
private class RpcJsonResponse<TResult>
{
Expand Down
Loading

0 comments on commit 832da2a

Please sign in to comment.