Skip to content

Commit a1d1679

Browse files
chamonsmaririoschristothes
authored
[Synapse] Add LRO for Spark operations (#17739)
- Add a LRO for spark operations that require special Livy poling - The logic in these LRO is ported from test helpers written by the client team (and removed here as well) - This is a rebase of #17677 - I had to shut up a few tests due to more nullability issues (fixing in Azure/azure-rest-api-specs#12258) Co-authored-by: Mariana Rios Flores <[email protected]> Co-authored-by: Christopher Scott <[email protected]>
1 parent ff77085 commit a1d1679

27 files changed

+13317
-4097
lines changed

sdk/synapse/Azure.Analytics.Synapse.Artifacts/tests/samples/Sample6_HelloWorldLinkedService.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ namespace Azure.Analytics.Synapse.Artifacts.Samples
1212
{
1313
public partial class Sample6_HelloWorldLinkedService : SampleFixture
1414
{
15+
[Ignore("https://github.com/Azure/azure-sdk-for-net/issues/17455")]
1516
[Test]
1617
public async Task LinkedServiceSample()
1718
{

sdk/synapse/Azure.Analytics.Synapse.ManagedPrivateEndpoints/tests/samples/Sample1_HelloManangedPrivateEndpoint.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ namespace Azure.Analytics.Synapse.ManagedPrivateEndpoints.Samples
1313
{
1414
public partial class Sample1_HelloManangedPrivateEndpoint : SampleFixture
1515
{
16+
[Ignore("https://github.com/Azure/azure-sdk-for-net/issues/17455")]
1617
[Test]
1718
public void TestManagedPrivateEndpoint()
1819
{

sdk/synapse/Azure.Analytics.Synapse.Spark/README.md

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,13 +89,19 @@ SparkBatchJobOptions request = new SparkBatchJobOptions(name, file)
8989
ExecutorCount = 2
9090
};
9191

92-
SparkBatchJob jobCreated = client.CreateSparkBatchJob(request);
92+
SparkBatchOperation createOperation = client.StartCreateSparkBatchJob(request);
93+
while (!createOperation.HasCompleted)
94+
{
95+
System.Threading.Thread.Sleep(2000);
96+
createOperation.UpdateStatus();
97+
}
98+
SparkBatchJob jobCreated = createOperation.Value;
9399
```
94100

95101
### Cancel spark batch job
96102
Cancel a Spark batch job with Spark batch id under specific workspace and Spark pool.
97103

98-
```C# Snippet:DeleteSparkBatchJob
104+
```C# Snippet:CancelSparkBatchJob
99105
Response operation = client.CancelSparkBatchJob(jobCreated.Id);
100106
```
101107

sdk/synapse/Azure.Analytics.Synapse.Spark/api/Azure.Analytics.Synapse.Spark.netstandard2.0.cs

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,25 @@ public SparkBatchClient(System.Uri endpoint, string sparkPoolName, Azure.Core.To
77
public SparkBatchClient(System.Uri endpoint, string sparkPoolName, Azure.Core.TokenCredential credential, Azure.Analytics.Synapse.Spark.SparkClientOptions options) { }
88
public virtual Azure.Response CancelSparkBatchJob(int batchId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
99
public virtual System.Threading.Tasks.Task<Azure.Response> CancelSparkBatchJobAsync(int batchId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
10-
public virtual Azure.Response<Azure.Analytics.Synapse.Spark.Models.SparkBatchJob> CreateSparkBatchJob(Azure.Analytics.Synapse.Spark.Models.SparkBatchJobOptions sparkBatchJobOptions, bool? detailed = default(bool?), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
11-
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Analytics.Synapse.Spark.Models.SparkBatchJob>> CreateSparkBatchJobAsync(Azure.Analytics.Synapse.Spark.Models.SparkBatchJobOptions sparkBatchJobOptions, bool? detailed = default(bool?), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
1210
public virtual Azure.Response<Azure.Analytics.Synapse.Spark.Models.SparkBatchJob> GetSparkBatchJob(int batchId, bool? detailed = default(bool?), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
1311
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Analytics.Synapse.Spark.Models.SparkBatchJob>> GetSparkBatchJobAsync(int batchId, bool? detailed = default(bool?), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
1412
public virtual Azure.Response<Azure.Analytics.Synapse.Spark.Models.SparkBatchJobCollection> GetSparkBatchJobs(int? from = default(int?), int? size = default(int?), bool? detailed = default(bool?), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
1513
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Analytics.Synapse.Spark.Models.SparkBatchJobCollection>> GetSparkBatchJobsAsync(int? from = default(int?), int? size = default(int?), bool? detailed = default(bool?), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
14+
public virtual Azure.Analytics.Synapse.Spark.SparkBatchOperation StartCreateSparkBatchJob(Azure.Analytics.Synapse.Spark.Models.SparkBatchJobOptions sparkBatchJobOptions, bool? detailed = default(bool?), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
15+
public virtual System.Threading.Tasks.Task<Azure.Analytics.Synapse.Spark.SparkBatchOperation> StartCreateSparkBatchJobAsync(Azure.Analytics.Synapse.Spark.Models.SparkBatchJobOptions sparkBatchJobOptions, bool? detailed = default(bool?), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
16+
}
17+
public partial class SparkBatchOperation : Azure.Operation<Azure.Analytics.Synapse.Spark.Models.SparkBatchJob>
18+
{
19+
internal SparkBatchOperation() { }
20+
public override bool HasCompleted { get { throw null; } }
21+
public override bool HasValue { get { throw null; } }
22+
public override string Id { get { throw null; } }
23+
public override Azure.Analytics.Synapse.Spark.Models.SparkBatchJob Value { get { throw null; } }
24+
public override Azure.Response GetRawResponse() { throw null; }
25+
public override Azure.Response UpdateStatus(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
26+
public override System.Threading.Tasks.ValueTask<Azure.Response> UpdateStatusAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
27+
public override System.Threading.Tasks.ValueTask<Azure.Response<Azure.Analytics.Synapse.Spark.Models.SparkBatchJob>> WaitForCompletionAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
28+
public override System.Threading.Tasks.ValueTask<Azure.Response<Azure.Analytics.Synapse.Spark.Models.SparkBatchJob>> WaitForCompletionAsync(System.TimeSpan pollingInterval, System.Threading.CancellationToken cancellationToken) { throw null; }
1629
}
1730
public partial class SparkClientOptions : Azure.Core.ClientOptions
1831
{
@@ -31,10 +44,6 @@ public SparkSessionClient(System.Uri endpoint, string sparkPoolName, Azure.Core.
3144
public virtual System.Threading.Tasks.Task<Azure.Response> CancelSparkSessionAsync(int sessionId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
3245
public virtual Azure.Response<Azure.Analytics.Synapse.Spark.Models.SparkStatementCancellationResult> CancelSparkStatement(int sessionId, int statementId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
3346
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Analytics.Synapse.Spark.Models.SparkStatementCancellationResult>> CancelSparkStatementAsync(int sessionId, int statementId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
34-
public virtual Azure.Response<Azure.Analytics.Synapse.Spark.Models.SparkSession> CreateSparkSession(Azure.Analytics.Synapse.Spark.Models.SparkSessionOptions sparkSessionOptions, bool? detailed = default(bool?), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
35-
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Analytics.Synapse.Spark.Models.SparkSession>> CreateSparkSessionAsync(Azure.Analytics.Synapse.Spark.Models.SparkSessionOptions sparkSessionOptions, bool? detailed = default(bool?), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
36-
public virtual Azure.Response<Azure.Analytics.Synapse.Spark.Models.SparkStatement> CreateSparkStatement(int sessionId, Azure.Analytics.Synapse.Spark.Models.SparkStatementOptions sparkStatementOptions, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
37-
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Analytics.Synapse.Spark.Models.SparkStatement>> CreateSparkStatementAsync(int sessionId, Azure.Analytics.Synapse.Spark.Models.SparkStatementOptions sparkStatementOptions, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
3847
public virtual Azure.Response<Azure.Analytics.Synapse.Spark.Models.SparkSession> GetSparkSession(int sessionId, bool? detailed = default(bool?), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
3948
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Analytics.Synapse.Spark.Models.SparkSession>> GetSparkSessionAsync(int sessionId, bool? detailed = default(bool?), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
4049
public virtual Azure.Response<Azure.Analytics.Synapse.Spark.Models.SparkSessionCollection> GetSparkSessions(int? from = default(int?), int? size = default(int?), bool? detailed = default(bool?), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
@@ -45,6 +54,34 @@ public SparkSessionClient(System.Uri endpoint, string sparkPoolName, Azure.Core.
4554
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Analytics.Synapse.Spark.Models.SparkStatementCollection>> GetSparkStatementsAsync(int sessionId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
4655
public virtual Azure.Response ResetSparkSessionTimeout(int sessionId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
4756
public virtual System.Threading.Tasks.Task<Azure.Response> ResetSparkSessionTimeoutAsync(int sessionId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
57+
public virtual Azure.Analytics.Synapse.Spark.SparkSessionOperation StartCreateSparkSession(Azure.Analytics.Synapse.Spark.Models.SparkSessionOptions sparkSessionOptions, bool? detailed = default(bool?), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
58+
public virtual System.Threading.Tasks.Task<Azure.Analytics.Synapse.Spark.SparkSessionOperation> StartCreateSparkSessionAsync(Azure.Analytics.Synapse.Spark.Models.SparkSessionOptions sparkSessionOptions, bool? detailed = default(bool?), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
59+
}
60+
public partial class SparkSessionOperation : Azure.Operation<Azure.Analytics.Synapse.Spark.Models.SparkSession>
61+
{
62+
internal SparkSessionOperation() { }
63+
public override bool HasCompleted { get { throw null; } }
64+
public override bool HasValue { get { throw null; } }
65+
public override string Id { get { throw null; } }
66+
public override Azure.Analytics.Synapse.Spark.Models.SparkSession Value { get { throw null; } }
67+
public override Azure.Response GetRawResponse() { throw null; }
68+
public override Azure.Response UpdateStatus(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
69+
public override System.Threading.Tasks.ValueTask<Azure.Response> UpdateStatusAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
70+
public override System.Threading.Tasks.ValueTask<Azure.Response<Azure.Analytics.Synapse.Spark.Models.SparkSession>> WaitForCompletionAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
71+
public override System.Threading.Tasks.ValueTask<Azure.Response<Azure.Analytics.Synapse.Spark.Models.SparkSession>> WaitForCompletionAsync(System.TimeSpan pollingInterval, System.Threading.CancellationToken cancellationToken) { throw null; }
72+
}
73+
public partial class SparkStatementOperation : Azure.Operation<Azure.Analytics.Synapse.Spark.Models.SparkStatement>
74+
{
75+
internal SparkStatementOperation() { }
76+
public override bool HasCompleted { get { throw null; } }
77+
public override bool HasValue { get { throw null; } }
78+
public override string Id { get { throw null; } }
79+
public override Azure.Analytics.Synapse.Spark.Models.SparkStatement Value { get { throw null; } }
80+
public override Azure.Response GetRawResponse() { throw null; }
81+
public override Azure.Response UpdateStatus(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
82+
public override System.Threading.Tasks.ValueTask<Azure.Response> UpdateStatusAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
83+
public override System.Threading.Tasks.ValueTask<Azure.Response<Azure.Analytics.Synapse.Spark.Models.SparkStatement>> WaitForCompletionAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
84+
public override System.Threading.Tasks.ValueTask<Azure.Response<Azure.Analytics.Synapse.Spark.Models.SparkStatement>> WaitForCompletionAsync(System.TimeSpan pollingInterval, System.Threading.CancellationToken cancellationToken) { throw null; }
4885
}
4986
}
5087
namespace Azure.Analytics.Synapse.Spark.Models

sdk/synapse/Azure.Analytics.Synapse.Spark/samples/Sample1_SubmitSparkJob.md

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,21 @@ This sample demonstrates basic operations with two core classes in this library:
77
To interact with Spark jobs running on Azure Synapse, you need to instantiate a `SparkBatchClient`. It requires an endpoint URL and a `TokenCredential`.
88

99
```C# Snippet:CreateSparkBatchClient
10+
// Replace the strings below with the spark, endpoint, and file system information
11+
string sparkPoolName = "<my-spark-pool-name>";
12+
13+
string endpoint = "<my-endpoint-url>";
14+
15+
string storageAccount = "<my-storage-account-name>";
16+
17+
string fileSystem = "<my-storage-filesystem-name>";
18+
1019
SparkBatchClient client = new SparkBatchClient(new Uri(endpoint), sparkPoolName, new DefaultAzureCredential());
1120
```
1221

1322
## Submitting Spark jobs
1423

15-
To submit a Spark job, first create a `SparkBatchJob`, passing in an instance of `SparkBatchJobOptions` describing the job's parameters. Calling `CreateSparkBatchJob` with that job will submit it to Synapse.
24+
To submit a Spark job, first create a `SparkBatchJob`, passing in an instance of `SparkBatchJobOptions` describing the job's parameters. Calling `StartCreateSparkBatchJob` with that job will submit it to Synapse.
1625

1726
```C# Snippet:SubmitSparkBatchJob
1827
string name = $"batch-{Guid.NewGuid()}";
@@ -32,15 +41,21 @@ SparkBatchJobOptions request = new SparkBatchJobOptions(name, file)
3241
ExecutorCount = 2
3342
};
3443

35-
SparkBatchJob jobCreated = client.CreateSparkBatchJob(request);
44+
SparkBatchOperation createOperation = client.StartCreateSparkBatchJob(request);
45+
while (!createOperation.HasCompleted)
46+
{
47+
System.Threading.Thread.Sleep(2000);
48+
createOperation.UpdateStatus();
49+
}
50+
SparkBatchJob jobCreated = createOperation.Value;
3651
```
3752

3853
## Retrieve a Spark job
3954

40-
To retrieve the details of a Spark job call `GetSparkBatchJob`, passing in the Spark job ID.
55+
To retrieve the details of a Spark job call `StartGetSparkBatchJob`, passing in the Spark job ID.
4156

4257
```C# Snippet:GetSparkBatchJob
43-
SparkBatchJob retrievedJob = client.GetSparkBatchJob(jobCreated.Id);
58+
SparkBatchJob retrievedJob = client.GetSparkBatchJob (jobCreated.Id);
4459
Debug.WriteLine($"Job is returned with name {retrievedJob.Name} and state {retrievedJob.State}");
4560
```
4661

@@ -60,6 +75,6 @@ foreach (SparkBatchJob job in jobs.Value.Sessions)
6075

6176
To cancel a submitted Spark job call `CancelSparkBatchJob`, passing in the Spark job ID.
6277

63-
```C# Snippet:DeleteSparkBatchJob
78+
```C# Snippet:CancelSparkBatchJob
6479
Response operation = client.CancelSparkBatchJob(jobCreated.Id);
6580
```
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
# Create, Run and Cancel Synapse Spark jobs
2+
3+
This sample demonstrates basic asynchronous operations with two core classes in this library: `SparkBatchClient` and `SparkBatchJob`. `SparkBatchClient` is used to interact with Spark jobs running on Azure Synapse - each method call sends a request to the service's REST API. `SparkBatchJob` is an entity that represents a batched Spark job within Synapse. The sample walks through the basics of creating, running, and canceling job requests. To get started, you'll need a connection endpoint to Azure Synapse. See the [README](https://github.com/Azure/azure-sdk-for-net/blob/master/sdk/synapse/Azure.Analytics.Synapse.Spark/README.md) for links and instructions.
4+
5+
## Create Spark batch client
6+
7+
To interact with Spark jobs running on Azure Synapse, you need to instantiate a `SparkBatchClient`. It requires an endpoint URL and a `TokenCredential`.
8+
9+
```C# Snippet:CreateSparkBatchClientAsync
10+
// Replace the strings below with the spark, endpoint, and file system information
11+
string sparkPoolName = "<my-spark-pool-name>";
12+
13+
string endpoint = "<my-endpoint-url>";
14+
15+
string storageAccount = "<my-storage-account-name>";
16+
17+
string fileSystem = "<my-storage-filesystem-name>";
18+
19+
SparkBatchClient client = new SparkBatchClient(new Uri(endpoint), sparkPoolName, new DefaultAzureCredential());
20+
```
21+
22+
## Submitting Spark jobs
23+
24+
To submit a Spark job, first create a `SparkBatchJob`, passing in an instance of `SparkBatchJobOptions` describing the job's parameters. Calling `StartCreateSparkBatchJobAsync` with that job will submit it to Synapse.
25+
26+
```C# Snippet:SubmitSparkBatchJobAsync
27+
string name = $"batch-{Guid.NewGuid()}";
28+
string file = string.Format("abfss://{0}@{1}.dfs.core.windows.net/samples/net/wordcount/wordcount.zip", fileSystem, storageAccount);
29+
SparkBatchJobOptions request = new SparkBatchJobOptions(name, file)
30+
{
31+
ClassName = "WordCount",
32+
Arguments =
33+
{
34+
string.Format("abfss://{0}@{1}.dfs.core.windows.net/samples/net/wordcount/shakespeare.txt", fileSystem, storageAccount),
35+
string.Format("abfss://{0}@{1}.dfs.core.windows.net/samples/net/wordcount/result/", fileSystem, storageAccount),
36+
},
37+
DriverMemory = "28g",
38+
DriverCores = 4,
39+
ExecutorMemory = "28g",
40+
ExecutorCores = 4,
41+
ExecutorCount = 2
42+
};
43+
44+
SparkBatchOperation createOperation = await client.StartCreateSparkBatchJobAsync(request);
45+
SparkBatchJob jobCreated = await createOperation.WaitForCompletionAsync();
46+
```
47+
48+
## Retrieve a Spark job
49+
50+
To retrieve the details of a Spark job call `StartGetSparkBatchJobAsync`, passing in the Spark job ID.
51+
52+
```C# Snippet:GetSparkBatchJobAsync
53+
SparkBatchJob retrievedJob = await client.GetSparkBatchJobAsync (jobCreated.Id);
54+
Debug.WriteLine($"Job is returned with name {retrievedJob.Name} and state {retrievedJob.State}");
55+
```
56+
57+
## List Spark jobs
58+
59+
To enumerate all Spark jobs in the Synapse workspace call `GetSparkBatchJobs`.
60+
61+
```C# Snippet:ListSparkBatchJobsAsync
62+
Response<SparkBatchJobCollection> jobs = client.GetSparkBatchJobs();
63+
foreach (SparkBatchJob job in jobs.Value.Sessions)
64+
{
65+
Console.WriteLine(job.Name);
66+
}
67+
```
68+
69+
## Canceling a Spark job
70+
71+
To cancel a submitted Spark job call `CancelSparkBatchJob`, passing in the Spark job ID.
72+
73+
```C# Snippet:CancelSparkBatchJobAsync
74+
Response operation = client.CancelSparkBatchJob(jobCreated.Id);
75+
```

0 commit comments

Comments
 (0)