Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/synapse/Azure.Analytics.Synapse.Spark/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 1.0.0-preview.8 (Unreleased)

### Features Added
- Enhanced the Long Running Operation (LRO) logic of `SparkBatchClient` to support both job-submission and job-execution completion scenarios.

### Breaking Changes

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;

namespace Azure.Analytics.Synapse.Spark.Models
{
/// <summary> Options used to configure the creation of a Spark batch job. </summary>
public partial class SparkBatchJobOptions
{
/// <summary> Initializes a new instance of SparkBatchJobOptions, specifying how the create operation is considered complete. </summary>
/// <param name="name"> The batch job name; forwarded unchanged to the two-argument constructor. </param>
/// <param name="file"> The batch job file; forwarded unchanged to the two-argument constructor. </param>
/// <param name="creationCompletionType"> Controls when the create operation is treated as complete; defaults to <see cref="SparkBatchOperationCompletionType.JobSubmission"/>. </param>
/// <exception cref="ArgumentNullException"> <paramref name="name"/> or <paramref name="file"/> is null. </exception>
public SparkBatchJobOptions(string name, string file, SparkBatchOperationCompletionType creationCompletionType = SparkBatchOperationCompletionType.JobSubmission) : this(name, file)
{
CreationCompletionType = creationCompletionType;
}

/// <summary>
/// Describes the different ways a Spark batch job operation can complete.
/// If <see cref="SparkBatchOperationCompletionType.JobSubmission"/> is used, the operation is considered complete when the Livy state is starting/running/error/dead/success/killed.
/// If <see cref="SparkBatchOperationCompletionType.JobExecution"/> is used, the operation is considered complete when the Livy state is error/dead/success/killed.
/// </summary>
public SparkBatchOperationCompletionType CreationCompletionType { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

namespace Azure.Analytics.Synapse.Spark.Models
{
/// <summary>
/// Describes the different ways a Spark batch job operation can complete.
/// If <see cref="JobSubmission"/> is used, the operation is considered complete when the Livy state is starting/running/error/dead/success/killed.
/// If <see cref="JobExecution"/> is used, the operation is considered complete when the Livy state is error/dead/success/killed.
/// </summary>
public enum SparkBatchOperationCompletionType
{
/// <summary> The operation completes once the job has been submitted: any Livy state of starting/running/error/dead/success/killed. </summary>
JobSubmission,
/// <summary> The operation completes only once the job has finished executing: a Livy state of error/dead/success/killed. </summary>
JobExecution
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ private async Task<SparkBatchOperation> StartCreateSparkBatchJobInternal (bool a
{
batchSession = RestClient.CreateSparkBatchJob(sparkBatchJobOptions, detailed, cancellationToken);
}
return new SparkBatchOperation(this, _clientDiagnostics, batchSession);
return new SparkBatchOperation(this, _clientDiagnostics, sparkBatchJobOptions.CreationCompletionType, batchSession);
}
catch (Exception e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Globalization;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -23,24 +22,24 @@ public class SparkBatchOperation : Operation<SparkBatchJob>

private readonly ClientDiagnostics _diagnostics;
private readonly SparkBatchClient _client;
private readonly SparkBatchJob _value;
private Response<SparkBatchJob> _response;
private bool _completed;
private RequestFailedException _requestFailedException;
private readonly SparkBatchOperationCompletionType _completionType;

/// <summary> Initializes the operation from the initial create-job response. </summary>
/// <param name="client"> Client used to poll job status. </param>
/// <param name="diagnostics"> Diagnostics scope factory for polling calls. </param>
/// <param name="completionType"> Determines which Livy states count as completion. </param>
/// <param name="response"> The initial service response; must carry a value. </param>
internal SparkBatchOperation(SparkBatchClient client, ClientDiagnostics diagnostics, SparkBatchOperationCompletionType completionType, Response<SparkBatchJob> response)
{
    // Fail fast: Id, Value and status polling all dereference _response.Value,
    // so a missing payload would otherwise surface later as a NullReferenceException.
    // (This guard existed before the _value field was removed in this refactor.)
    if (response == null)
    {
        throw new ArgumentNullException(nameof(response));
    }
    if (response.Value == null)
    {
        throw new InvalidOperationException("The response does not contain a value.");
    }

    _client = client;
    _response = response;
    _diagnostics = diagnostics;
    _completionType = completionType;
}

/// <summary> Initializes a new instance of <see cref="SparkBatchOperation" /> for mocking. </summary>
protected SparkBatchOperation() {}

/// <inheritdoc/>
public override string Id
{
    get
    {
        // Culture-invariant formatting keeps the operation id stable across locales.
        return _response.Value.Id.ToString(CultureInfo.InvariantCulture);
    }
}

/// <summary>
/// Gets the <see cref="SparkBatchJob"/>.
Expand All @@ -63,7 +62,7 @@ public override SparkBatchJob Value
throw _requestFailedException;
}
#pragma warning restore CA1065 // Do not raise exceptions in unexpected locations
return _value;
return _response.Value;
}
}

Expand Down Expand Up @@ -103,13 +102,13 @@ private async ValueTask<Response> UpdateStatusAsync(bool async, CancellationToke
{
if (async)
{
_response = await _client.RestClient.GetSparkBatchJobAsync(_value.Id, true, cancellationToken).ConfigureAwait(false);
_response = await _client.RestClient.GetSparkBatchJobAsync(_response.Value.Id, true, cancellationToken).ConfigureAwait(false);
}
else
{
_response = _client.RestClient.GetSparkBatchJob(_value.Id, true, cancellationToken);
_response = _client.RestClient.GetSparkBatchJob(_response.Value.Id, true, cancellationToken);
}
_completed = IsJobComplete(_response.Value.Result.ToString(), _response.Value.State);
_completed = IsJobComplete(_response.Value.Result ?? SparkBatchJobResultType.Uncertain, _response.Value.State.Value, _completionType);
}
catch (RequestFailedException e)
{
Expand All @@ -134,27 +133,20 @@ private async ValueTask<Response> UpdateStatusAsync(bool async, CancellationToke
return GetRawResponse();
}

/// <summary> Decides whether the batch operation is complete for the given job result, Livy state and completion type. </summary>
private static bool IsJobComplete(SparkBatchJobResultType jobState, LivyStates livyState, SparkBatchOperationCompletionType creationCompletionType)
{
    // A terminal job result always completes the operation, whatever the completion type.
    bool resultIsTerminal =
        jobState == SparkBatchJobResultType.Succeeded ||
        jobState == SparkBatchJobResultType.Failed ||
        jobState == SparkBatchJobResultType.Cancelled;
    if (resultIsTerminal)
    {
        return true;
    }

    // Without a terminal result, only the JobSubmission completion type can finish early.
    if (creationCompletionType != SparkBatchOperationCompletionType.JobSubmission)
    {
        return false;
    }

    // JobSubmission completes as soon as Livy reports the job has started (or already ended).
    return livyState == LivyStates.Starting
        || livyState == LivyStates.Running
        || livyState == LivyStates.Error
        || livyState == LivyStates.Dead
        || livyState == LivyStates.Success
        || livyState == LivyStates.Killed;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,13 @@ public class SparkSessionOperation : Operation<SparkSession>
private readonly ClientDiagnostics _diagnostics;

private readonly SparkSessionClient _client;
private readonly SparkSession _value;
private Response<SparkSession> _response;
private bool _completed;
private RequestFailedException _requestFailedException;

/// <summary> Initializes the operation from the initial create-session response. </summary>
/// <param name="client"> Client used to poll session status. </param>
/// <param name="diagnostics"> Diagnostics scope factory for polling calls. </param>
/// <param name="response"> The initial service response; must carry a value. </param>
internal SparkSessionOperation(SparkSessionClient client, ClientDiagnostics diagnostics, Response<SparkSession> response)
{
    // Fail fast: Id, Value and status polling all dereference _response.Value,
    // so a missing payload would otherwise surface later as a NullReferenceException.
    // (This guard existed before the _value field was removed in this refactor.)
    if (response == null)
    {
        throw new ArgumentNullException(nameof(response));
    }
    if (response.Value == null)
    {
        throw new InvalidOperationException("The response does not contain a value.");
    }

    _client = client;
    _response = response;
    _diagnostics = diagnostics;
}
Expand All @@ -41,7 +39,7 @@ internal SparkSessionOperation(SparkSessionClient client, ClientDiagnostics diag
protected SparkSessionOperation() {}

/// <inheritdoc/>
public override string Id
{
    get
    {
        // Culture-invariant formatting keeps the operation id stable across locales.
        return _response.Value.Id.ToString(CultureInfo.InvariantCulture);
    }
}

/// <summary>
/// Gets the <see cref="SparkSession"/>.
Expand All @@ -64,7 +62,7 @@ public override SparkSession Value
throw _requestFailedException;
}
#pragma warning restore CA1065 // Do not raise exceptions in unexpected locations
return _value;
return _response.Value;
}
}

Expand Down Expand Up @@ -104,13 +102,13 @@ private async ValueTask<Response> UpdateStatusAsync(bool async, CancellationToke
{
if (async)
{
_response = await _client.RestClient.GetSparkSessionAsync(_value.Id, true, cancellationToken).ConfigureAwait(false);
_response = await _client.RestClient.GetSparkSessionAsync(_response.Value.Id, true, cancellationToken).ConfigureAwait(false);
}
else
{
_response = _client.RestClient.GetSparkSession(_value.Id, true, cancellationToken);
_response = _client.RestClient.GetSparkSession(_response.Value.Id, true, cancellationToken);
}
_completed = IsJobComplete(_response.Value.Result.ToString(), _response.Value.State);
_completed = IsJobComplete(_response.Value.State.Value);
}
catch (RequestFailedException e)
{
Expand All @@ -135,24 +133,11 @@ private async ValueTask<Response> UpdateStatusAsync(bool async, CancellationToke
return GetRawResponse();
}

/// <summary> Decides whether the session operation is complete for the given Livy state. </summary>
private static bool IsJobComplete(LivyStates livyState)
{
    // Return the predicate directly instead of "if (...) return true; return false;".
    // The session operation is complete once Livy reports error/dead/success/killed/idle.
    return livyState == LivyStates.Error
        || livyState == LivyStates.Dead
        || livyState == LivyStates.Success
        || livyState == LivyStates.Killed
        || livyState == LivyStates.Idle;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,13 @@ public class SparkStatementOperation : Operation<SparkStatement>

private int _sessionId;
private readonly SparkSessionClient _client;
private readonly SparkStatement _value;
private Response<SparkStatement> _response;
private bool _completed;
private RequestFailedException _requestFailedException;

internal SparkStatementOperation(SparkSessionClient client, ClientDiagnostics diagnostics, Response<SparkStatement> response, int sessionId)
{
_client = client;
_value = response.Value ?? throw new InvalidOperationException("The response does not contain a value.");
_response = response;
_sessionId = sessionId;
_diagnostics = diagnostics;
Expand All @@ -43,7 +41,7 @@ internal SparkStatementOperation(SparkSessionClient client, ClientDiagnostics di
protected SparkStatementOperation() {}

/// <inheritdoc/>
public override string Id
{
    get
    {
        // Culture-invariant formatting keeps the operation id stable across locales.
        return _response.Value.Id.ToString(CultureInfo.InvariantCulture);
    }
}

/// <summary>
/// Gets the <see cref="SparkSession"/>.
Expand All @@ -66,7 +64,7 @@ public override SparkStatement Value
throw _requestFailedException;
}
#pragma warning restore CA1065 // Do not raise exceptions in unexpected locations
return _value;
return _response.Value;
}
}

Expand Down Expand Up @@ -106,13 +104,13 @@ private async ValueTask<Response> UpdateStatusAsync(bool async, CancellationToke
{
if (async)
{
_response = await _client.RestClient.GetSparkStatementAsync(_sessionId, _value.Id, cancellationToken).ConfigureAwait(false);
_response = await _client.RestClient.GetSparkStatementAsync(_sessionId, _response.Value.Id, cancellationToken).ConfigureAwait(false);
}
else
{
_response = _client.RestClient.GetSparkStatement(_sessionId, _value.Id, cancellationToken);
_response = _client.RestClient.GetSparkStatement(_sessionId, _response.Value.Id, cancellationToken);
}
_completed = IsJobComplete(_response.Value.State);
_completed = IsJobComplete(_response.Value.State.Value);
}
catch (RequestFailedException e)
{
Expand All @@ -137,17 +135,9 @@ private async ValueTask<Response> UpdateStatusAsync(bool async, CancellationToke
return GetRawResponse();
}

/// <summary> Decides whether the statement operation is complete for the given Livy statement state: available, error, or cancelled. </summary>
private static bool IsJobComplete(LivyStatementStates livyState) =>
    livyState == LivyStatementStates.Available
    || livyState == LivyStatementStates.Error
    || livyState == LivyStatementStates.Cancelled;
}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading