diff --git a/sdk/digitaltwins/Azure.DigitalTwins.Core/samples/DigitalTwinsClientSample/ModelLifecycleSamples.cs b/sdk/digitaltwins/Azure.DigitalTwins.Core/samples/DigitalTwinsClientSample/ModelLifecycleSamples.cs
index 56b08dad8a4b..7f0cda30e668 100644
--- a/sdk/digitaltwins/Azure.DigitalTwins.Core/samples/DigitalTwinsClientSample/ModelLifecycleSamples.cs
+++ b/sdk/digitaltwins/Azure.DigitalTwins.Core/samples/DigitalTwinsClientSample/ModelLifecycleSamples.cs
@@ -30,7 +30,7 @@ public async Task RunSamplesAsync()
{
PrintHeader("MODEL LIFECYCLE SAMPLE");
- // For the purpose of this example We will create temporary models using random model Ids and then decommission a model.
+ // For the purpose of this example we will create temporary models using random model Ids and then decommission a model.
// We have to make sure these model Ids are unique within the DT instance.
string newComponentModelId = await GetUniqueModelIdAsync(SamplesConstants.TemporaryComponentModelPrefix, DigitalTwinsClient).ConfigureAwait(false);
diff --git a/sdk/digitaltwins/Azure.DigitalTwins.Core/samples/DigitalTwinsClientSample/Program.cs b/sdk/digitaltwins/Azure.DigitalTwins.Core/samples/DigitalTwinsClientSample/Program.cs
index 4502ccebc464..06bbace8c7ba 100644
--- a/sdk/digitaltwins/Azure.DigitalTwins.Core/samples/DigitalTwinsClientSample/Program.cs
+++ b/sdk/digitaltwins/Azure.DigitalTwins.Core/samples/DigitalTwinsClientSample/Program.cs
@@ -40,6 +40,9 @@ public static async Task Main(string[] args)
var componentSamples = new ComponentSamples(dtClient);
await componentSamples.RunSamplesAsync().ConfigureAwait(false);
+ var publishTelemetrySamples = new PublishTelemetrySamples(dtClient);
+ await publishTelemetrySamples.RunSamplesAsync().ConfigureAwait(false);
+
httpClient.Dispose();
}
diff --git a/sdk/digitaltwins/Azure.DigitalTwins.Core/samples/DigitalTwinsClientSample/PublishTelemetrySamples.cs b/sdk/digitaltwins/Azure.DigitalTwins.Core/samples/DigitalTwinsClientSample/PublishTelemetrySamples.cs
new file mode 100644
index 000000000000..7314aea31ce5
--- /dev/null
+++ b/sdk/digitaltwins/Azure.DigitalTwins.Core/samples/DigitalTwinsClientSample/PublishTelemetrySamples.cs
@@ -0,0 +1,108 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+using System;
+using System.Collections.Generic;
+using System.Net;
+using System.Text.Json;
+using System.Threading.Tasks;
+using Azure.DigitalTwins.Core;
+using Azure.DigitalTwins.Core.Samples;
+using static Azure.DigitalTwins.Core.Samples.SampleLogger;
+using static Azure.DigitalTwins.Core.Samples.UniqueIdHelper;
+
+namespace Azure.DigitalTwins.Samples
+{
+ internal class PublishTelemetrySamples
+ {
+ private DigitalTwinsClient DigitalTwinsClient { get; }
+
+ public PublishTelemetrySamples(DigitalTwinsClient dtClient)
+ {
+ DigitalTwinsClient = dtClient;
+ }
+
+ ///
+ /// Create a temporary component model, twin model and digital twin instance.
+ /// Publish a telemetry message and a component telemetry message to the digital twin instance.
+ ///
+ public async Task RunSamplesAsync()
+ {
+ PrintHeader("PUBLISH TELEMETRY MESSAGE SAMPLE");
+
+ // For the purpose of this example we will create temporary models using a random model Ids.
+ // We will also create temporary twin instances to publish the telemetry to.
+
+ string componentModelId = await GetUniqueModelIdAsync(SamplesConstants.TemporaryComponentModelPrefix, DigitalTwinsClient).ConfigureAwait(false);
+ string modelId = await GetUniqueModelIdAsync(SamplesConstants.TemporaryModelPrefix, DigitalTwinsClient).ConfigureAwait(false);
+ string twinId = await GetUniqueTwinIdAsync(SamplesConstants.TemporaryTwinPrefix, DigitalTwinsClient).ConfigureAwait(false);
+
+ string newComponentModelPayload = SamplesConstants.TemporaryComponentModelPayload
+ .Replace(SamplesConstants.ComponentId, componentModelId);
+
+ string newModelPayload = SamplesConstants.TemporaryModelPayload
+ .Replace(SamplesConstants.ModelId, modelId)
+ .Replace(SamplesConstants.ComponentId, componentModelId);
+
+ // Then we create the models.
+ await DigitalTwinsClient
+ .CreateModelsAsync(new[] { newComponentModelPayload, newModelPayload })
+ .ConfigureAwait(false);
+
+ Console.WriteLine($"Successfully created models with Ids: {componentModelId}, {modelId}");
+
+ // Create digital twin with Component payload.
+ string twinPayload = SamplesConstants.TemporaryTwinPayload
+ .Replace(SamplesConstants.ModelId, modelId)
+ .Replace(SamplesConstants.ComponentId, componentModelId);
+
+ await DigitalTwinsClient.CreateDigitalTwinAsync(twinId, twinPayload).ConfigureAwait(false);
+ Console.WriteLine($"Created digital twin {twinId}.");
+
+ try
+ {
+ #region Snippet:DigitalTwinsSamplePublishTelemetry
+
+ // construct your json telemetry payload by hand.
+ Response publishTelemetryResponse = await DigitalTwinsClient.PublishTelemetryAsync(twinId, "{\"Telemetry1\": 5}");
+ Console.WriteLine($"Successfully published telemetry message, status: {publishTelemetryResponse.Status}");
+
+ #endregion Snippet:DigitalTwinsSamplePublishTelemetry
+
+ #region Snippet:DigitalTwinsSamplePublishComponentTelemetry
+
+ // construct your json telemetry payload by serializing a dictionary.
+ var telemetryPayload = new Dictionary
+ {
+ { "ComponentTelemetry1", 9}
+ };
+ Response publishTelemetryToComponentResponse = await DigitalTwinsClient.PublishComponentTelemetryAsync(twinId, "Component1", JsonSerializer.Serialize(telemetryPayload));
+ Console.WriteLine($"Successfully published component telemetry message, status: {publishTelemetryToComponentResponse.Status}");
+
+ #endregion Snippet:DigitalTwinsSamplePublishComponentTelemetry
+ }
+ catch (Exception ex)
+ {
+ FatalError($"Failed to publish a telemetry message due to {ex.Message}");
+ }
+
+ try
+ {
+ // Delete the twin.
+ await DigitalTwinsClient.DeleteDigitalTwinAsync(twinId).ConfigureAwait(false);
+
+ // Delete the models.
+ await DigitalTwinsClient.DeleteModelAsync(modelId).ConfigureAwait(false);
+ await DigitalTwinsClient.DeleteModelAsync(componentModelId).ConfigureAwait(false);
+ }
+ catch (RequestFailedException ex) when (ex.Status == (int)HttpStatusCode.NotFound)
+ {
+ // Digital twin or models do not exist.
+ }
+ catch (RequestFailedException ex)
+ {
+ FatalError($"Failed to delete due to {ex.Message}");
+ }
+ }
+ }
+}
diff --git a/sdk/digitaltwins/Azure.DigitalTwins.Core/samples/DigitalTwinsClientSample/SamplesConstants.cs b/sdk/digitaltwins/Azure.DigitalTwins.Core/samples/DigitalTwinsClientSample/SamplesConstants.cs
index b2c63feb2630..3cec544fa6c3 100644
--- a/sdk/digitaltwins/Azure.DigitalTwins.Core/samples/DigitalTwinsClientSample/SamplesConstants.cs
+++ b/sdk/digitaltwins/Azure.DigitalTwins.Core/samples/DigitalTwinsClientSample/SamplesConstants.cs
@@ -74,6 +74,11 @@ public static class SamplesConstants
""@type"": ""Component"",
""name"": ""Component1"",
""schema"": ""COMPONENT_ID""
+ },
+ {
+ ""@type"": ""Telemetry"",
+ ""name"": ""Telemetry1"",
+ ""schema"": ""integer""
}
]
}";
@@ -102,6 +107,11 @@ public static class SamplesConstants
""@type"": ""Property"",
""name"": ""ComponentProp2"",
""schema"": ""string""
+ },
+ {
+ ""@type"": ""Telemetry"",
+ ""name"": ""ComponentTelemetry1"",
+ ""schema"": ""integer""
}
]
}";
diff --git a/sdk/digitaltwins/Azure.DigitalTwins.Core/samples/Readme.md b/sdk/digitaltwins/Azure.DigitalTwins.Core/samples/Readme.md
index d0bae7181ac5..479863439f0d 100644
--- a/sdk/digitaltwins/Azure.DigitalTwins.Core/samples/Readme.md
+++ b/sdk/digitaltwins/Azure.DigitalTwins.Core/samples/Readme.md
@@ -5,7 +5,8 @@ Sample project demonstrates the following:
* Create, query and delete a Digital Twin
* Get and update components for a Digital Twin
* Create, get and delete relationships between Digital Twins
-* Create, get and delete eventroutes for Digital Twin
+* Create, get and delete event routes for Digital Twin
+* Publish telemetry messages to a Digital Twin and Digital Twin component
## Creating Digital Twin Client
@@ -265,3 +266,25 @@ Delete an event route given event route id
```C# Snippet:DigitalTwinSampleDeleteEventRoute
Response response = await DigitalTwinsClient.DeleteEventRouteAsync(_eventRouteId).ConfigureAwait(false);
```
+
+### Publish telemetry messages to a Digital Twin
+
+To publish a telemetry message to a digital twin, you need to provide the digital twin id, along with the payload on which telemetry that needs the update.
+
+```C# Snippet:DigitalTwinsSamplePublishTelemetry
+// construct your json telemetry payload by hand.
+Response publishTelemetryResponse = await DigitalTwinsClient.PublishTelemetryAsync(twinId, "{\"Telemetry1\": 5}");
+Console.WriteLine($"Successfully published telemetry message, status: {publishTelemetryResponse.Status}");
+```
+
+You can also publish a telemetry message to a specific component in a digital twin. In addition to the digital twin id and payload, you need to specify the target component id.
+
+```C# Snippet:DigitalTwinsSamplePublishComponentTelemetry
+// construct your json telemetry payload by serializing a dictionary.
+var telemetryPayload = new Dictionary
+{
+ { "ComponentTelemetry1", 9}
+};
+Response publishTelemetryToComponentResponse = await DigitalTwinsClient.PublishComponentTelemetryAsync(twinId, "Component1", JsonSerializer.Serialize(telemetryPayload));
+Console.WriteLine($"Successfully published component telemetry message, status: {publishTelemetryToComponentResponse.Status}");
+```
diff --git a/sdk/digitaltwins/Azure.DigitalTwins.Core/src/Customized/DigitalTwinsRestClient.cs b/sdk/digitaltwins/Azure.DigitalTwins.Core/src/Customized/DigitalTwinsRestClient.cs
index 7284b713f2aa..bb48c8c7ea37 100644
--- a/sdk/digitaltwins/Azure.DigitalTwins.Core/src/Customized/DigitalTwinsRestClient.cs
+++ b/sdk/digitaltwins/Azure.DigitalTwins.Core/src/Customized/DigitalTwinsRestClient.cs
@@ -629,40 +629,70 @@ internal Response UpdateComponent(string id, string componentPath, strin
}
}
- public async Task SendTelemetryAsync(string id, string ceId, string ceDataschema, string ceSpecversion, string ceType, string ceDatacontenttype = null, string ceSubject = null, string ceTime = null, string telemetry = null, CancellationToken cancellationToken = default)
+ internal async Task SendTelemetryAsync(string id, string dtId, string telemetry, string dtTimestamp = null, CancellationToken cancellationToken = default)
{
if (id == null)
{
throw new ArgumentNullException(nameof(id));
}
- if (ceId == null)
+ if (dtId == null)
{
- throw new ArgumentNullException(nameof(ceId));
+ throw new ArgumentNullException(nameof(dtId));
}
- if (ceDataschema == null)
+ if (telemetry == null)
{
- throw new ArgumentNullException(nameof(ceDataschema));
+ throw new ArgumentNullException(nameof(telemetry));
}
- if (ceSpecversion == null)
+
+ using DiagnosticScope scope = _clientDiagnostics.CreateScope("DigitalTwinsClient.SendTelemetry");
+ scope.Start();
+ try
+ {
+ using HttpMessage message = CreateSendTelemetryRequest(id, dtId, telemetry, dtTimestamp);
+ await _pipeline.SendAsync(message, cancellationToken).ConfigureAwait(false);
+ switch (message.Response.Status)
+ {
+ case 204:
+ return message.Response;
+ default:
+ throw await _clientDiagnostics.CreateRequestFailedExceptionAsync(message.Response).ConfigureAwait(false);
+ }
+ }
+ catch (Exception e)
{
- throw new ArgumentNullException(nameof(ceSpecversion));
+ scope.Failed(e);
+ throw;
}
- if (ceType == null)
+ }
+
+ internal Response SendTelemetry(string id, string dtId, string telemetry, string dtTimestamp = null, CancellationToken cancellationToken = default)
+ {
+ if (id == null)
{
- throw new ArgumentNullException(nameof(ceType));
+ throw new ArgumentNullException(nameof(id));
+ }
+ if (dtId == null)
+ {
+ throw new ArgumentNullException(nameof(dtId));
+ }
+ if (telemetry == null)
+ {
+ throw new ArgumentNullException(nameof(telemetry));
}
using DiagnosticScope scope = _clientDiagnostics.CreateScope("DigitalTwinsClient.SendTelemetry");
scope.Start();
try
{
- using HttpMessage message = CreateSendTelemetryRequest(id, ceId, ceDataschema, ceSpecversion, ceType, ceDatacontenttype, ceSubject, ceTime, telemetry);
- await _pipeline.SendAsync(message, cancellationToken).ConfigureAwait(false);
- return message.Response.Status switch
+ using HttpMessage message = CreateSendTelemetryRequest(id, dtId, telemetry, dtTimestamp);
+ _pipeline.Send(message, cancellationToken);
+ switch (message.Response.Status)
{
- 204 => message.Response,
- _ => throw await _clientDiagnostics.CreateRequestFailedExceptionAsync(message.Response).ConfigureAwait(false),
- };
+ case 204:
+ return message.Response;
+ default:
+ throw _clientDiagnostics.CreateRequestFailedException(message.Response);
+ }
}
catch (Exception e)
{
@@ -671,40 +701,78 @@ public async Task SendTelemetryAsync(string id, string ceId, string ce
}
}
- public Response SendTelemetry(string id, string ceId, string ceDataschema, string ceSpecversion, string ceType, string ceDatacontenttype = null, string ceSubject = null, string ceTime = null, string telemetry = null, CancellationToken cancellationToken = default)
+ internal async Task SendComponentTelemetryAsync(string id, string componentPath, string dtId, string telemetry, string dtTimestamp = null, CancellationToken cancellationToken = default)
{
if (id == null)
{
throw new ArgumentNullException(nameof(id));
}
- if (ceId == null)
+ if (componentPath == null)
{
- throw new ArgumentNullException(nameof(ceId));
+ throw new ArgumentNullException(nameof(componentPath));
}
- if (ceDataschema == null)
+ if (dtId == null)
{
- throw new ArgumentNullException(nameof(ceDataschema));
+ throw new ArgumentNullException(nameof(dtId));
}
- if (ceSpecversion == null)
+ if (telemetry == null)
{
- throw new ArgumentNullException(nameof(ceSpecversion));
+ throw new ArgumentNullException(nameof(telemetry));
}
- if (ceType == null)
+
+ using DiagnosticScope scope = _clientDiagnostics.CreateScope("DigitalTwinsClient.SendComponentTelemetry");
+ scope.Start();
+ try
{
- throw new ArgumentNullException(nameof(ceType));
+ using HttpMessage message = CreateSendComponentTelemetryRequest(id, componentPath, dtId, telemetry, dtTimestamp);
+ await _pipeline.SendAsync(message, cancellationToken).ConfigureAwait(false);
+ switch (message.Response.Status)
+ {
+ case 204:
+ return message.Response;
+ default:
+ throw await _clientDiagnostics.CreateRequestFailedExceptionAsync(message.Response).ConfigureAwait(false);
+ }
}
+ catch (Exception e)
+ {
+ scope.Failed(e);
+ throw;
+ }
+ }
- using DiagnosticScope scope = _clientDiagnostics.CreateScope("DigitalTwinsClient.SendTelemetry");
+ internal Response SendComponentTelemetry(string id, string componentPath, string dtId, string telemetry, string dtTimestamp = null, CancellationToken cancellationToken = default)
+ {
+ if (id == null)
+ {
+ throw new ArgumentNullException(nameof(id));
+ }
+ if (componentPath == null)
+ {
+ throw new ArgumentNullException(nameof(componentPath));
+ }
+ if (dtId == null)
+ {
+ throw new ArgumentNullException(nameof(dtId));
+ }
+ if (telemetry == null)
+ {
+ throw new ArgumentNullException(nameof(telemetry));
+ }
+
+ using DiagnosticScope scope = _clientDiagnostics.CreateScope("DigitalTwinsClient.SendComponentTelemetry");
scope.Start();
try
{
- using HttpMessage message = CreateSendTelemetryRequest(id, ceId, ceDataschema, ceSpecversion, ceType, ceDatacontenttype, ceSubject, ceTime, telemetry);
+ using HttpMessage message = CreateSendComponentTelemetryRequest(id, componentPath, dtId, telemetry, dtTimestamp);
_pipeline.Send(message, cancellationToken);
- return message.Response.Status switch
+ switch (message.Response.Status)
{
- 204 => message.Response,
- _ => throw _clientDiagnostics.CreateRequestFailedException(message.Response),
- };
+ case 204:
+ return message.Response;
+ default:
+ throw _clientDiagnostics.CreateRequestFailedException(message.Response);
+ }
}
catch (Exception e)
{
@@ -804,34 +872,51 @@ private HttpMessage CreateUpdateComponentRequest(string id, string componentPath
return message;
}
- private HttpMessage CreateSendTelemetryRequest(string id, string ceId, string ceDataschema, string ceSpecversion, string ceType, string ceDatacontenttype, string ceSubject, string ceTime, string telemetry)
+ private HttpMessage CreateSendTelemetryRequest(string id, string dtId, string telemetry, string dtTimestamp)
{
- HttpMessage message = _pipeline.CreateMessage();
- Request request = message.Request;
+ var message = _pipeline.CreateMessage();
+ var request = message.Request;
request.Method = RequestMethod.Post;
var uri = new RawRequestUriBuilder();
uri.Reset(endpoint);
uri.AppendPath("/digitaltwins/", false);
uri.AppendPath(id, true);
uri.AppendPath("/telemetry", false);
+ uri.AppendQuery("api-version", apiVersion, true);
request.Uri = uri;
- request.Headers.Add("ce-id", ceId);
- if (ceDatacontenttype != null)
+ request.Headers.Add("dt-id", dtId);
+ if (dtTimestamp != null)
{
- request.Headers.Add("ce-datacontenttype", ceDatacontenttype);
+ request.Headers.Add("dt-timestamp", dtTimestamp);
}
- request.Headers.Add("ce-dataschema", ceDataschema);
- request.Headers.Add("ce-specversion", ceSpecversion);
- if (ceSubject != null)
+ request.Headers.Add("Content-Type", "application/json");
+ if (telemetry != null)
{
- request.Headers.Add("ce-subject", ceSubject);
+ request.Content = new StringRequestContent(telemetry);
}
- if (ceTime != null)
+ return message;
+ }
+
+ private HttpMessage CreateSendComponentTelemetryRequest(string id, string componentPath, string dtId, string telemetry, string dtTimestamp)
+ {
+ var message = _pipeline.CreateMessage();
+ var request = message.Request;
+ request.Method = RequestMethod.Post;
+ var uri = new RawRequestUriBuilder();
+ uri.Reset(endpoint);
+ uri.AppendPath("/digitaltwins/", false);
+ uri.AppendPath(id, true);
+ uri.AppendPath("/components/", false);
+ uri.AppendPath(componentPath, true);
+ uri.AppendPath("/telemetry", false);
+ uri.AppendQuery("api-version", apiVersion, true);
+ request.Uri = uri;
+ request.Headers.Add("dt-id", dtId);
+ if (dtTimestamp != null)
{
- request.Headers.Add("ce-time", ceTime);
+ request.Headers.Add("dt-timestamp", dtTimestamp);
}
- request.Headers.Add("ce-type", ceType);
- request.Headers.Add("Content-Type", "application/json; charset=utf-8");
+ request.Headers.Add("Content-Type", "application/json");
if (telemetry != null)
{
request.Content = new StringRequestContent(telemetry);
@@ -853,26 +938,36 @@ private HttpMessage CreateSendTelemetryRequest(string id, string ceId, string ce
private Response