Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple Connection Simple Execute #421

Merged
merged 7 commits into from
Jul 28, 2017
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
//
using System;
using System.Collections.Concurrent;
using System.Data.Common;
using System.Data.SqlClient;
using System.IO;
using System.Threading.Tasks;
using Microsoft.SqlTools.ServiceLayer.Connection;
using Microsoft.SqlTools.ServiceLayer.Connection.Contracts;
using Microsoft.SqlTools.Hosting.Protocol;
using Microsoft.SqlTools.ServiceLayer.QueryExecution.Contracts;
using Microsoft.SqlTools.ServiceLayer.QueryExecution.Contracts.ExecuteRequests;
Expand Down Expand Up @@ -173,82 +176,105 @@ internal Task HandleExecuteRequest(ExecuteRequestParamsBase executeParams,
/// <summary>
/// Handles a request to execute a string and return the result
/// </summary>
internal Task HandleSimpleExecuteRequest(SimpleExecuteParams executeParams,
internal async Task HandleSimpleExecuteRequest(SimpleExecuteParams executeParams,
RequestContext<SimpleExecuteResult> requestContext)
{
ExecuteStringParams executeStringParams = new ExecuteStringParams
{
Query = executeParams.QueryString,
// generate guid as the owner uri to make sure every query is unique
OwnerUri = Guid.NewGuid().ToString()
};

// get connection
ConnectionInfo connInfo;
if (!ConnectionService.TryFindConnection(executeParams.OwnerUri, out connInfo))
{
return requestContext.SendError(SR.QueryServiceQueryInvalidOwnerUri);
}

if (connInfo.ConnectionDetails.MultipleActiveResultSets == null || connInfo.ConnectionDetails.MultipleActiveResultSets == false) {
// if multipleActive result sets is not allowed, don't specific a connection and make the ownerURI the true owneruri
connInfo = null;
executeStringParams.OwnerUri = executeParams.OwnerUri;
}

Func<string, Task> queryCreateFailureAction = message => requestContext.SendError(message);

ResultOnlyContext<SimpleExecuteResult> newContext = new ResultOnlyContext<SimpleExecuteResult>(requestContext);

// handle sending event back when the query completes
Query.QueryAsyncEventHandler queryComplete = async q =>
try
{
Query removedQuery;
// check to make sure any results were recieved
if (q.Batches.Length == 0 || q.Batches[0].ResultSets.Count == 0)
string randomUri = Guid.NewGuid().ToString();
ExecuteStringParams executeStringParams = new ExecuteStringParams
{
await requestContext.SendError(SR.QueryServiceResultSetHasNoResults);
ActiveQueries.TryRemove(executeStringParams.OwnerUri, out removedQuery);
return;
}
Query = executeParams.QueryString,
// generate guid as the owner uri to make sure every query is unique
OwnerUri = randomUri
};

var rowCount = q.Batches[0].ResultSets[0].RowCount;
// check to make sure there is a safe amount of rows to load into memory
if (rowCount > Int32.MaxValue)
// get connection
ConnectionInfo connInfo;
if (!ConnectionService.TryFindConnection(executeParams.OwnerUri, out connInfo))
{
await requestContext.SendError(SR.QueryServiceResultSetTooLarge);
ActiveQueries.TryRemove(executeStringParams.OwnerUri, out removedQuery);
await requestContext.SendError(SR.QueryServiceQueryInvalidOwnerUri);
return;
}

SubsetParams subsetRequestParams = new SubsetParams
ConnectParams connectParams = new ConnectParams
{
OwnerUri = executeStringParams.OwnerUri,
BatchIndex = 0,
ResultSetIndex = 0,
RowsStartIndex = 0,
RowsCount = Convert.ToInt32(rowCount)
OwnerUri = randomUri,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By connecting with a random URI, doesn't this incur a lot of overhead by kicking off intellisense discovery and whatnot?

Connection = connInfo.ConnectionDetails,
Type = ConnectionType.Default
};
// get the data to send back
ResultSetSubset subset = await InterServiceResultSubset(subsetRequestParams);
SimpleExecuteResult result = new SimpleExecuteResult

await ConnectionService.Connect(connectParams);

ConnectionInfo newConn;
ConnectionService.TryFindConnection(randomUri, out newConn);

Func<string, Task> queryCreateFailureAction = message => requestContext.SendError(message);

ResultOnlyContext<SimpleExecuteResult> newContext = new ResultOnlyContext<SimpleExecuteResult>(requestContext);

// handle sending event back when the query completes
Query.QueryAsyncEventHandler queryComplete = async q =>
{
RowCount = q.Batches[0].ResultSets[0].RowCount,
ColumnInfo = q.Batches[0].ResultSets[0].Columns,
Rows = subset.Rows
try
{
// check to make sure any results were recieved
if (q.Batches.Length == 0 || q.Batches[0].ResultSets.Count == 0)
{
await requestContext.SendError(SR.QueryServiceResultSetHasNoResults);
return;
}

var rowCount = q.Batches[0].ResultSets[0].RowCount;
// check to make sure there is a safe amount of rows to load into memory
if (rowCount > Int32.MaxValue)
{
await requestContext.SendError(SR.QueryServiceResultSetTooLarge);
return;
}

SubsetParams subsetRequestParams = new SubsetParams
{
OwnerUri = randomUri,
BatchIndex = 0,
ResultSetIndex = 0,
RowsStartIndex = 0,
RowsCount = Convert.ToInt32(rowCount)
};
// get the data to send back
ResultSetSubset subset = await InterServiceResultSubset(subsetRequestParams);
SimpleExecuteResult result = new SimpleExecuteResult
{
RowCount = q.Batches[0].ResultSets[0].RowCount,
ColumnInfo = q.Batches[0].ResultSets[0].Columns,
Rows = subset.Rows
};
await requestContext.SendResult(result);
}
finally
{
Query removedQuery;
// remove the active query since we are done with it
ActiveQueries.TryRemove(randomUri, out removedQuery);
ConnectionService.Disconnect(new DisconnectParams(){
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you'll want to disconnect in all scenarios, in other words, I'd move this to the outside try/catch/finally block.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question @kburtram would know more.

OwnerUri = randomUri,
Type = null
});
}
};
await requestContext.SendResult(result);
// remove the active query since we are done with it
ActiveQueries.TryRemove(executeStringParams.OwnerUri, out removedQuery);
};

// handle sending error back when query fails
Query.QueryAsyncErrorEventHandler queryFail = async (q, e) =>
{
await requestContext.SendError(e);
};
// handle sending error back when query fails
Query.QueryAsyncErrorEventHandler queryFail = async (q, e) =>
{
await requestContext.SendError(e);
};

return InterServiceExecuteQuery(executeStringParams, connInfo, newContext, null, queryCreateFailureAction, queryComplete, queryFail);
await InterServiceExecuteQuery(executeStringParams, newConn, newContext, null, queryCreateFailureAction, queryComplete, queryFail);
}
catch(Exception ex)
{
await requestContext.SendError(ex.ToString());
}
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//

using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.SqlTools.ServiceLayer.QueryExecution;
using Microsoft.SqlTools.ServiceLayer.QueryExecution.Contracts.ExecuteRequests;
Expand Down Expand Up @@ -431,15 +432,14 @@ public async Task SimpleExecuteErrorWithNoResultsTest()
.Complete();
await queryService.HandleSimpleExecuteRequest(queryParams, efv.Object);

Query q;
queryService.ActiveQueries.TryGetValue(Constants.OwnerUri, out q);

// wait on the task to finish
Query q = queryService.ActiveQueries.Values.First();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice to have a test that actually verifies we can run 2 queries against the same URI using this approach.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resolved

Assert.NotNull(q);
q.ExecutionTask.Wait();

efv.Validate();

Assert.Equal(0, queryService.ActiveQueries.Count);

}

[Fact]
Expand All @@ -452,8 +452,9 @@ public async Task SimpleExecuteVerifyResultsTest()
.Complete();
await queryService.HandleSimpleExecuteRequest(queryParams, efv.Object);

Query q;
queryService.ActiveQueries.TryGetValue(Constants.OwnerUri, out q);
Query q = queryService.ActiveQueries.Values.First();

Assert.NotNull(q);

// wait on the task to finish
q.ExecutionTask.Wait();
Expand All @@ -463,6 +464,38 @@ public async Task SimpleExecuteVerifyResultsTest()
Assert.Equal(0, queryService.ActiveQueries.Count);
}

[Fact]
public async Task SimpleExecuteMultipleQueriesTest()
{
var queryService = Common.GetPrimedExecutionService(Common.StandardTestDataSet, true, false, null);
var queryParams = new SimpleExecuteParams { OwnerUri = Constants.OwnerUri, QueryString = Constants.StandardQuery };
var efv1 = new EventFlowValidator<SimpleExecuteResult>()
.AddSimpleExecuteQueryResultValidator(Common.StandardTestDataSet)
.Complete();
var efv2 = new EventFlowValidator<SimpleExecuteResult>()
.AddSimpleExecuteQueryResultValidator(Common.StandardTestDataSet)
.Complete();
Task qT1 = queryService.HandleSimpleExecuteRequest(queryParams, efv1.Object);
Task qT2 = queryService.HandleSimpleExecuteRequest(queryParams, efv2.Object);

await Task.WhenAll(qT1, qT2);

var queries = queryService.ActiveQueries.Values.Take(2).ToArray();
Query q1 = queries[0];
Query q2 = queries[1];

Assert.NotNull(q1);
Assert.NotNull(q2);

// wait on the task to finish
await Task.WhenAll(q1.ExecutionTask, q2.ExecutionTask);

efv1.Validate();
efv2.Validate();

Assert.Equal(0, queryService.ActiveQueries.Count);
}

#endregion

private static WorkspaceService<SqlToolsSettings> GetDefaultWorkspaceService(string query)
Expand Down