Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/synapse/Azure.Analytics.Synapse.Spark/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## 1.0.0-preview.8 (Unreleased)

### Features Added
- Enhance the Long Running Operation (LRO) logic of `SparkBatchClient` to support both job submission and job execution completion scenarios.

### Breaking Changes

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

using System;

namespace Azure.Analytics.Synapse.Spark.Models
{
    /// <summary> Options used when creating a Spark batch job. </summary>
    public partial class SparkBatchJobOptions
    {
        /// <summary> Initializes a new instance of SparkBatchJobOptions. </summary>
        /// <param name="name"> The name of the Spark batch job. </param>
        /// <param name="file"> The file to execute for the Spark batch job. </param>
        /// <param name="creationCompletionType"> Determines when the creation operation is considered complete; defaults to completing at job submission. </param>
        /// <exception cref="ArgumentNullException"> <paramref name="name"/> or <paramref name="file"/> is null. </exception>
        public SparkBatchJobOptions(string name, string file, SparkBatchOperationCompletionType creationCompletionType = SparkBatchOperationCompletionType.JobSubmission)
            : this(name, file)
            => CreationCompletionType = creationCompletionType;

        /// <summary>
        /// Describes the different ways a Spark batch job operation could complete.
        /// If <see cref="SparkBatchOperationCompletionType.JobSubmission"/> is used, the operation will be considered complete when the Livy state is starting/running/error/dead/success/killed.
        /// If <see cref="SparkBatchOperationCompletionType.JobExecution"/> is used, the operation will be considered complete when the Livy state is error/dead/success/killed.
        /// </summary>
        public SparkBatchOperationCompletionType CreationCompletionType { get; set; }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

namespace Azure.Analytics.Synapse.Spark.Models
{
    /// <summary>
    /// Describes the different ways a Spark batch job operation could complete.
    /// If <see cref="JobSubmission"/> is used, the operation will be considered complete when the Livy state is starting/running/error/dead/success/killed.
    /// If <see cref="JobExecution"/> is used, the operation will be considered complete when the Livy state is error/dead/success/killed.
    /// </summary>
    public enum SparkBatchOperationCompletionType
    {
        /// <summary> The operation completes once the job has been submitted (Livy state starting/running/error/dead/success/killed). </summary>
        JobSubmission,
        /// <summary> The operation completes only once the job has finished executing (Livy state error/dead/success/killed). </summary>
        JobExecution
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ private async Task<SparkBatchOperation> StartCreateSparkBatchJobInternal (bool a
{
batchSession = RestClient.CreateSparkBatchJob(sparkBatchJobOptions, detailed, cancellationToken);
}
return new SparkBatchOperation(this, _clientDiagnostics, batchSession);
return new SparkBatchOperation(this, _clientDiagnostics, batchSession, sparkBatchJobOptions.CreationCompletionType);
}
catch (Exception e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
// Licensed under the MIT License.

using System;
using System.Collections.Generic;
using System.Globalization;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -21,26 +20,56 @@ public class SparkBatchOperation : Operation<SparkBatchJob>
{
private static readonly TimeSpan s_defaultPollingInterval = TimeSpan.FromSeconds(5);

/// <summary>
/// Provides tools for exception creation in case of failure.
/// </summary>
private readonly ClientDiagnostics _diagnostics;

/// <summary>
/// Gets the completion type of the Spark batch job operation.
/// </summary>
private readonly SparkBatchOperationCompletionType _completionType;

/// <summary>
/// The client used to check for completion.
/// </summary>
private readonly SparkBatchClient _client;
private readonly SparkBatchJob _value;
private Response<SparkBatchJob> _response;
private bool _completed;
private RequestFailedException _requestFailedException;

internal SparkBatchOperation(SparkBatchClient client, ClientDiagnostics diagnostics, Response<SparkBatchJob> response)
{
_client = client;
_value = response.Value ?? throw new InvalidOperationException("The response does not contain a value.");
_response = response;
_diagnostics = diagnostics;
}
/// <summary>
/// Whether the operation has completed.
/// </summary>
private bool _hasCompleted;

/// <summary> Initializes a new instance of <see cref="SparkBatchOperation" /> for mocking. </summary>
protected SparkBatchOperation() {}
/// <summary>
/// Gets the created Spark batch job.
/// </summary>
private SparkBatchJob _value;

/// <summary>
/// Raw HTTP response.
/// </summary>
private Response _rawResponse;

/// <summary>
/// <c>true</c> if the long-running operation has a value. Otherwise, <c>false</c>.
/// </summary>
private bool _hasValue;

/// <summary>
/// Gets the Id of the created Spark batch job.
/// </summary>
private int _batchId;

/// <summary>
/// Gets a value indicating whether the operation has completed.
/// </summary>
public override bool HasCompleted => _hasCompleted;

/// <inheritdoc/>
public override bool HasValue => _hasValue;

/// <inheritdoc/>
public override string Id => _value.Id.ToString(CultureInfo.InvariantCulture);
public override string Id => _batchId.ToString(CultureInfo.InvariantCulture);

/// <summary>
/// Gets the <see cref="SparkBatchJob"/>.
Expand All @@ -49,34 +78,38 @@ protected SparkBatchOperation() {}
/// <remarks>
/// Azure Synapse will return a <see cref="SparkBatchJob"/> immediately but may take time for the session to be ready.
/// </remarks>
public override SparkBatchJob Value
public override SparkBatchJob Value => OperationHelpers.GetValue(ref _value);

/// <summary>
/// Get the completion type of Spark batch job operation.
/// </summary>
public SparkBatchOperationCompletionType CompletionType => _completionType;

/// <summary>
/// Initializes a new instance of the <see cref="SparkBatchOperation"/> class.
/// </summary>
/// <param name="batchId">The ID of the Spark batch job.</param>
/// <param name="client">The client used to check for completion.</param>
/// <param name="completionType">The operation completion type.</param>
public SparkBatchOperation(int batchId, SparkBatchClient client, SparkBatchOperationCompletionType completionType = SparkBatchOperationCompletionType.JobSubmission)
{
get
{
#pragma warning disable CA1065 // Do not raise exceptions in unexpected locations
if (!HasCompleted)
{
throw new InvalidOperationException("The operation is not complete.");
}
if (_requestFailedException != null)
{
throw _requestFailedException;
}
#pragma warning restore CA1065 // Do not raise exceptions in unexpected locations
return _value;
}
_batchId = batchId;
_client = client;
_completionType = completionType;
}

/// <inheritdoc/>
public override bool HasCompleted => _completed;

/// <inheritdoc/>
public override bool HasValue => !_responseHasError && HasCompleted;
internal SparkBatchOperation(SparkBatchClient client, ClientDiagnostics diagnostics, Response<SparkBatchJob> response, SparkBatchOperationCompletionType completionType)
: this(response.Value.Id, client, completionType)
{
_diagnostics = diagnostics;
_rawResponse = response.GetRawResponse();
}

private bool _responseHasError => StringComparer.OrdinalIgnoreCase.Equals ("error", _response?.Value?.State);
/// <summary> Initializes a new instance of <see cref="SparkBatchOperation" /> for mocking. </summary>
protected SparkBatchOperation() {}

/// <inheritdoc/>
public override Response GetRawResponse() => _response.GetRawResponse();
public override Response GetRawResponse() => _rawResponse;

/// <inheritdoc/>
public override Response UpdateStatus(CancellationToken cancellationToken = default) => UpdateStatusAsync(false, cancellationToken).EnsureCompleted();
Expand All @@ -94,67 +127,53 @@ public override ValueTask<Response<SparkBatchJob>> WaitForCompletionAsync(TimeSp

private async ValueTask<Response> UpdateStatusAsync(bool async, CancellationToken cancellationToken)
{
if (!_completed)
if (!_hasCompleted)
{
using DiagnosticScope scope = _diagnostics.CreateScope($"{nameof(SparkSessionOperation)}.{nameof(UpdateStatus)}");
scope.Start();
using DiagnosticScope? scope = _diagnostics?.CreateScope($"{nameof(SparkSessionOperation)}.{nameof(UpdateStatus)}");
scope?.Start();

try
{
if (async)
// Get the latest status
Response<SparkBatchJob> update = async
? await _client.GetSparkBatchJobAsync(_batchId, true, cancellationToken).ConfigureAwait(false)
: _client.GetSparkBatchJob(_batchId, true, cancellationToken);

// Check if the operation is no longer running
_hasCompleted = IsJobComplete(update.Value.Result ?? SparkBatchJobResultType.Uncertain, update.Value.State.Value, _completionType);
if (_hasCompleted)
{
_response = await _client.RestClient.GetSparkBatchJobAsync(_value.Id, true, cancellationToken).ConfigureAwait(false);
_hasValue = true;
_value = update.Value;
}
else
{
_response = _client.RestClient.GetSparkBatchJob(_value.Id, true, cancellationToken);
}
_completed = IsJobComplete(_response.Value.Result.ToString(), _response.Value.State);
}
catch (RequestFailedException e)
{
_requestFailedException = e;
scope.Failed(e);
throw;

// Update raw response
_rawResponse = update.GetRawResponse();
}
catch (Exception e)
{
_requestFailedException = new RequestFailedException("Unexpected failure", e);
scope.Failed(e);
throw _requestFailedException;
}
if (_responseHasError)
{
_requestFailedException = new RequestFailedException("SparkBatchOperation ended in state: 'error'");
scope.Failed(_requestFailedException);
throw _requestFailedException;
scope?.Failed(e);
throw;
}
}

return GetRawResponse();
}

private static bool IsJobComplete(string jobState, string livyState)
private static bool IsJobComplete(SparkBatchJobResultType jobState, LivyStates livyState, SparkBatchOperationCompletionType creationCompletionType)
{
switch (jobState)
{
case "succeeded":
case "failed":
case "cancelled":
return true;
}

switch (livyState)
if (jobState == SparkBatchJobResultType.Succeeded || jobState == SparkBatchJobResultType.Failed || jobState == SparkBatchJobResultType.Cancelled)
{
case "starting":
case "error":
case "dead":
case "success":
case "killed":
return true;
return true;
}

return false;
return creationCompletionType == SparkBatchOperationCompletionType.JobSubmission
&& (livyState == LivyStates.Starting
|| livyState == LivyStates.Running
|| livyState == LivyStates.Error
|| livyState == LivyStates.Dead
|| livyState == LivyStates.Success
|| livyState == LivyStates.Killed);
}
}
}
Loading