diff --git a/CHANGELOG.md b/CHANGELOG.md
index b2e854af8..cf343050a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,6 @@
## [Unreleased]
+### Added
+- New resource `elasticstack_elasticsearch_data_stream` to manage Elasticsearch [data streams](https://www.elastic.co/guide/en/elasticsearch/reference/current/data-streams.html) ([#45](https://github.com/elastic/terraform-provider-elasticstack/pull/45))
## [0.2.0] - 2022-01-27
### Added
diff --git a/docs/resources/elasticsearch_data_stream.md b/docs/resources/elasticsearch_data_stream.md
new file mode 100644
index 000000000..2b9390ce4
--- /dev/null
+++ b/docs/resources/elasticsearch_data_stream.md
@@ -0,0 +1,119 @@
+---
+subcategory: "Index"
+layout: ""
+page_title: "Elasticstack: elasticstack_elasticsearch_data_stream Resource"
+description: |-
+ Manages Elasticsearch Data Streams
+---
+
+# Resource: elasticstack_elasticsearch_data_stream
+
+Manages data streams. This resource can create and delete data streams, and reads back the attributes of an existing data stream. See: https://www.elastic.co/guide/en/elasticsearch/reference/current/data-stream-apis.html
+
+## Example Usage
+
+```terraform
+provider "elasticstack" {
+ elasticsearch {}
+}
+
+// Create an ILM policy for our data stream
+resource "elasticstack_elasticsearch_index_lifecycle" "my_ilm" {
+ name = "my_ilm_policy"
+
+ hot {
+ min_age = "1h"
+ set_priority {
+ priority = 10
+ }
+ rollover {
+ max_age = "1d"
+ }
+ readonly {}
+ }
+
+ delete {
+ min_age = "2d"
+ delete {}
+ }
+}
+
+// First we must have an index template created
+resource "elasticstack_elasticsearch_index_template" "my_data_stream_template" {
+ name = "my_data_stream"
+
+ index_patterns = ["my-stream*"]
+
+ template {
+ // make sure our template uses the prepared ILM policy
+ settings = jsonencode({
+ "lifecycle.name" = elasticstack_elasticsearch_index_lifecycle.my_ilm.name
+ })
+ }
+
+ data_stream {}
+}
+
+// and now we can create a data stream based on the index template
+resource "elasticstack_elasticsearch_data_stream" "my_data_stream" {
+ name = "my-stream"
+
+ // make sure the template is created before the data stream
+ depends_on = [
+ elasticstack_elasticsearch_index_template.my_data_stream_template
+ ]
+}
+```
+
+
+## Schema
+
+### Required
+
+- **name** (String) Name of the data stream to create.
+
+### Optional
+
+- **elasticsearch_connection** (Block List, Max: 1) Used to establish connection to Elasticsearch server. Overrides environment variables if present. (see [below for nested schema](#nestedblock--elasticsearch_connection))
+
+### Read-Only
+
+- **generation** (Number) Current generation for the data stream.
+- **hidden** (Boolean) If `true`, the data stream is hidden.
+- **id** (String) Internal identifier of the resource
+- **ilm_policy** (String) Name of the current ILM lifecycle policy in the stream’s matching index template.
+- **indices** (List of Object) Array of objects containing information about the data stream’s backing indices. The last item in this array contains information about the stream’s current write index. (see [below for nested schema](#nestedatt--indices))
+- **metadata** (String) Custom metadata for the stream, copied from the _meta object of the stream’s matching index template.
+- **replicated** (Boolean) If `true`, the data stream is created and managed by cross-cluster replication and the local cluster can not write into this data stream or change its mappings.
+- **status** (String) Health status of the data stream.
+- **system** (Boolean) If `true`, the data stream is created and managed by an Elastic stack component and cannot be modified through normal user interaction.
+- **template** (String) Name of the index template used to create the data stream’s backing indices.
+- **timestamp_field** (String) Contains information about the data stream’s @timestamp field.
+
+
+### Nested Schema for `elasticsearch_connection`
+
+Optional:
+
+- **ca_file** (String) Path to a custom Certificate Authority certificate
+- **endpoints** (List of String, Sensitive) A list of endpoints the Terraform provider will point to. They must include the http(s) schema and port number.
+- **insecure** (Boolean) Disable TLS certificate validation
+- **password** (String, Sensitive) A password to use for API authentication to Elasticsearch.
+- **username** (String) A username to use for API authentication to Elasticsearch.
+
+
+
+### Nested Schema for `indices`
+
+Read-Only:
+
+- **index_name** (String)
+- **index_uuid** (String)
+
+## Import
+
+Import is supported using the following syntax:
+
+```shell
+terraform import elasticstack_elasticsearch_data_stream.my_data_stream /
+```
diff --git a/examples/resources/elasticstack_elasticsearch_data_stream/import.sh b/examples/resources/elasticstack_elasticsearch_data_stream/import.sh
new file mode 100644
index 000000000..c54f48b8d
--- /dev/null
+++ b/examples/resources/elasticstack_elasticsearch_data_stream/import.sh
@@ -0,0 +1,2 @@
+terraform import elasticstack_elasticsearch_data_stream.my_data_stream /
+
diff --git a/examples/resources/elasticstack_elasticsearch_data_stream/resource.tf b/examples/resources/elasticstack_elasticsearch_data_stream/resource.tf
new file mode 100644
index 000000000..b3daccf4c
--- /dev/null
+++ b/examples/resources/elasticstack_elasticsearch_data_stream/resource.tf
@@ -0,0 +1,50 @@
+provider "elasticstack" {
+ elasticsearch {}
+}
+
+// Create an ILM policy for our data stream
+resource "elasticstack_elasticsearch_index_lifecycle" "my_ilm" {
+ name = "my_ilm_policy"
+
+ hot {
+ min_age = "1h"
+ set_priority {
+ priority = 10
+ }
+ rollover {
+ max_age = "1d"
+ }
+ readonly {}
+ }
+
+ delete {
+ min_age = "2d"
+ delete {}
+ }
+}
+
+// First we must have an index template created
+resource "elasticstack_elasticsearch_index_template" "my_data_stream_template" {
+ name = "my_data_stream"
+
+ index_patterns = ["my-stream*"]
+
+ template {
+ // make sure our template uses the prepared ILM policy
+ settings = jsonencode({
+ "lifecycle.name" = elasticstack_elasticsearch_index_lifecycle.my_ilm.name
+ })
+ }
+
+ data_stream {}
+}
+
+// and now we can create a data stream based on the index template
+resource "elasticstack_elasticsearch_data_stream" "my_data_stream" {
+ name = "my-stream"
+
+ // make sure the template is created before the data stream
+ depends_on = [
+ elasticstack_elasticsearch_index_template.my_data_stream_template
+ ]
+}
diff --git a/internal/clients/index.go b/internal/clients/index.go
index 3bba48abd..d90ee6be8 100644
--- a/internal/clients/index.go
+++ b/internal/clients/index.go
@@ -272,3 +272,58 @@ func (a *ApiClient) UpdateElasticsearchIndexMappings(index, mappings string) dia
}
return diags
}
+
+func (a *ApiClient) PutElasticsearchDataStream(dataStreamName string) diag.Diagnostics {
+ var diags diag.Diagnostics
+
+ res, err := a.es.Indices.CreateDataStream(dataStreamName)
+ if err != nil {
+ return diag.FromErr(err)
+ }
+ defer res.Body.Close()
+ if diags := utils.CheckError(res, fmt.Sprintf("Unable to create DataStream: %s", dataStreamName)); diags.HasError() {
+ return diags
+ }
+
+ return diags
+}
+
+func (a *ApiClient) GetElasticsearchDataStream(dataStreamName string) (*models.DataStream, diag.Diagnostics) {
+ var diags diag.Diagnostics
+ req := a.es.Indices.GetDataStream.WithName(dataStreamName)
+ res, err := a.es.Indices.GetDataStream(req)
+ if err != nil {
+ return nil, diag.FromErr(err)
+ }
+ defer res.Body.Close()
+ if res.StatusCode == http.StatusNotFound {
+ return nil, nil
+ }
+ if diags := utils.CheckError(res, fmt.Sprintf("Unable to get requested DataStream: %s", dataStreamName)); diags.HasError() {
+ return nil, diags
+ }
+
+ dStreams := make(map[string][]models.DataStream)
+ if err := json.NewDecoder(res.Body).Decode(&dStreams); err != nil {
+ return nil, diag.FromErr(err)
+ }
+ log.Printf("[TRACE] get data stream '%v' from ES api: %+v", dataStreamName, dStreams)
+ // when found, the requested data stream is the first item of the "data_streams" array
+ streams := dStreams["data_streams"]
+ if len(streams) == 0 {
+ return nil, diags
+ }
+ ds := streams[0]
+ return &ds, diags
+}
+
+func (a *ApiClient) DeleteElasticsearchDataStream(dataStreamName string) diag.Diagnostics {
+ var diags diag.Diagnostics
+
+ res, err := a.es.Indices.DeleteDataStream([]string{dataStreamName})
+ if err != nil {
+ return diag.FromErr(err)
+ }
+ defer res.Body.Close()
+ if diags := utils.CheckError(res, fmt.Sprintf("Unable to delete DataStream: %s", dataStreamName)); diags.HasError() {
+ return diags
+ }
+
+ return diags
+}
diff --git a/internal/elasticsearch/index/data_stream.go b/internal/elasticsearch/index/data_stream.go
new file mode 100644
index 000000000..5727c9ce0
--- /dev/null
+++ b/internal/elasticsearch/index/data_stream.go
@@ -0,0 +1,229 @@
+package index
+
+import (
+ "context"
+ "encoding/json"
+ "log"
+ "regexp"
+
+ "github.com/elastic/terraform-provider-elasticstack/internal/clients"
+ "github.com/elastic/terraform-provider-elasticstack/internal/utils"
+ "github.com/hashicorp/terraform-plugin-sdk/v2/diag"
+ "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
+ "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
+)
+
+func ResourceDataStream() *schema.Resource {
+ dataStreamSchema := map[string]*schema.Schema{
+ "id": {
+ Description: "Internal identifier of the resource",
+ Type: schema.TypeString,
+ Computed: true,
+ },
+ "name": {
+ Description: "Name of the data stream to create.",
+ Type: schema.TypeString,
+ Required: true,
+ ForceNew: true,
+ ValidateFunc: validation.All(
+ validation.StringLenBetween(1, 255),
+ validation.StringNotInSlice([]string{".", ".."}, true),
+ validation.StringMatch(regexp.MustCompile(`^[^-_+]`), "cannot start with -, _, +"),
+ validation.StringMatch(regexp.MustCompile(`^[a-z0-9!$%&'()+.;=@[\]^{}~_-]+$`), "must contain lower case alphanumeric characters and selected punctuation, see: https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-data-stream.html#indices-create-data-stream-api-path-params"),
+ ),
+ },
+ "timestamp_field": {
+ Description: "Contains information about the data stream’s @timestamp field.",
+ Type: schema.TypeString,
+ Computed: true,
+ },
+ "indices": {
+ Description: "Array of objects containing information about the data stream’s backing indices. The last item in this array contains information about the stream’s current write index.",
+ Type: schema.TypeList,
+ Computed: true,
+ Elem: &schema.Resource{
+ Schema: map[string]*schema.Schema{
+ "index_name": {
+ Description: "Name of the backing index.",
+ Type: schema.TypeString,
+ Computed: true,
+ },
+ "index_uuid": {
+ Description: "Universally unique identifier (UUID) for the index.",
+ Type: schema.TypeString,
+ Computed: true,
+ },
+ },
+ },
+ },
+ "generation": {
+ Description: "Current generation for the data stream.",
+ Type: schema.TypeInt,
+ Computed: true,
+ },
+ "metadata": {
+ Description: "Custom metadata for the stream, copied from the _meta object of the stream’s matching index template.",
+ Type: schema.TypeString,
+ Computed: true,
+ },
+ "status": {
+ Description: "Health status of the data stream.",
+ Type: schema.TypeString,
+ Computed: true,
+ },
+ "template": {
+ Description: "Name of the index template used to create the data stream’s backing indices.",
+ Type: schema.TypeString,
+ Computed: true,
+ },
+ "ilm_policy": {
+ Description: "Name of the current ILM lifecycle policy in the stream’s matching index template.",
+ Type: schema.TypeString,
+ Computed: true,
+ },
+ "hidden": {
+ Description: "If `true`, the data stream is hidden.",
+ Type: schema.TypeBool,
+ Computed: true,
+ },
+ "system": {
+ Description: "If `true`, the data stream is created and managed by an Elastic stack component and cannot be modified through normal user interaction.",
+ Type: schema.TypeBool,
+ Computed: true,
+ },
+ "replicated": {
+ Description: "If `true`, the data stream is created and managed by cross-cluster replication and the local cluster can not write into this data stream or change its mappings.",
+ Type: schema.TypeBool,
+ Computed: true,
+ },
+ }
+
+ utils.AddConnectionSchema(dataStreamSchema)
+
+ return &schema.Resource{
+ Description: "Managing Elasticsearch data streams, see: https://www.elastic.co/guide/en/elasticsearch/reference/current/data-stream-apis.html",
+
+ CreateContext: resourceDataStreamPut,
+ UpdateContext: resourceDataStreamPut,
+ ReadContext: resourceDataStreamRead,
+ DeleteContext: resourceDataStreamDelete,
+
+ Importer: &schema.ResourceImporter{
+ StateContext: schema.ImportStatePassthroughContext,
+ },
+
+ Schema: dataStreamSchema,
+ }
+}
+
+func resourceDataStreamPut(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
+ client, err := clients.NewApiClient(d, meta)
+ if err != nil {
+ return diag.FromErr(err)
+ }
+ dsId := d.Get("name").(string)
+ id, diags := client.ID(dsId)
+ if diags.HasError() {
+ return diags
+ }
+
+ if diags := client.PutElasticsearchDataStream(dsId); diags.HasError() {
+ return diags
+ }
+
+ d.SetId(id.String())
+ return resourceDataStreamRead(ctx, d, meta)
+}
+
+func resourceDataStreamRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
+ var diags diag.Diagnostics
+ client, err := clients.NewApiClient(d, meta)
+ if err != nil {
+ return diag.FromErr(err)
+ }
+ id := d.Id()
+ compId, diags := clients.CompositeIdFromStr(id)
+ if diags.HasError() {
+ return diags
+ }
+
+ ds, diags := client.GetElasticsearchDataStream(compId.ResourceId)
+ if ds == nil && diags == nil {
+ // no data stream found on ES side
+ d.SetId("")
+ return diags
+ }
+ if diags.HasError() {
+ return diags
+ }
+ log.Printf("[TRACE] read the data stream data: %+v", ds)
+
+ if err := d.Set("name", ds.Name); err != nil {
+ return diag.FromErr(err)
+ }
+ if err := d.Set("timestamp_field", ds.TimestampField.Name); err != nil {
+ return diag.FromErr(err)
+ }
+ if err := d.Set("generation", ds.Generation); err != nil {
+ return diag.FromErr(err)
+ }
+ if err := d.Set("status", ds.Status); err != nil {
+ return diag.FromErr(err)
+ }
+ if err := d.Set("template", ds.Template); err != nil {
+ return diag.FromErr(err)
+ }
+ if err := d.Set("ilm_policy", ds.IlmPolicy); err != nil {
+ return diag.FromErr(err)
+ }
+ if err := d.Set("hidden", ds.Hidden); err != nil {
+ return diag.FromErr(err)
+ }
+ if err := d.Set("system", ds.System); err != nil {
+ return diag.FromErr(err)
+ }
+ if err := d.Set("replicated", ds.Replicated); err != nil {
+ return diag.FromErr(err)
+ }
+ if ds.Meta != nil {
+ metadata, err := json.Marshal(ds.Meta)
+ if err != nil {
+ return diag.FromErr(err)
+ }
+ if err := d.Set("metadata", string(metadata)); err != nil {
+ return diag.FromErr(err)
+ }
+ }
+
+ indices := make([]interface{}, len(ds.Indices))
+ for i, idx := range ds.Indices {
+ index := make(map[string]interface{})
+ index["index_name"] = idx.IndexName
+ index["index_uuid"] = idx.IndexUUID
+ indices[i] = index
+ }
+ if err := d.Set("indices", indices); err != nil {
+ return diag.FromErr(err)
+ }
+
+ return diags
+}
+
+func resourceDataStreamDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
+ var diags diag.Diagnostics
+ client, err := clients.NewApiClient(d, meta)
+ if err != nil {
+ return diag.FromErr(err)
+ }
+ id := d.Id()
+ compId, diags := clients.CompositeIdFromStr(id)
+ if diags.HasError() {
+ return diags
+ }
+ if diags := client.DeleteElasticsearchDataStream(compId.ResourceId); diags.HasError() {
+ return diags
+ }
+
+ d.SetId("")
+ return diags
+}
diff --git a/internal/elasticsearch/index/data_stream_test.go b/internal/elasticsearch/index/data_stream_test.go
new file mode 100644
index 000000000..ca0c07160
--- /dev/null
+++ b/internal/elasticsearch/index/data_stream_test.go
@@ -0,0 +1,112 @@
+package index_test
+
+import (
+ "fmt"
+ "testing"
+
+ "github.com/elastic/terraform-provider-elasticstack/internal/acctest"
+ "github.com/elastic/terraform-provider-elasticstack/internal/clients"
+ sdkacctest "github.com/hashicorp/terraform-plugin-sdk/v2/helper/acctest"
+ "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
+ "github.com/hashicorp/terraform-plugin-sdk/v2/terraform"
+)
+
+func TestAccResourceDataStream(t *testing.T) {
+ // generate a random name
+ dsName := sdkacctest.RandStringFromCharSet(22, sdkacctest.CharSetAlpha)
+
+ resource.UnitTest(t, resource.TestCase{
+ PreCheck: func() { acctest.PreCheck(t) },
+ CheckDestroy: checkResourceDataStreamDestroy,
+ ProviderFactories: acctest.Providers,
+ Steps: []resource.TestStep{
+ {
+ Config: testAccResourceDataStreamCreate(dsName),
+ Check: resource.ComposeTestCheckFunc(
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_data_stream.test_ds", "name", dsName),
+ // check some computed fields
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_data_stream.test_ds", "indices.#", "1"),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_data_stream.test_ds", "template", dsName),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_data_stream.test_ds", "ilm_policy", dsName),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_data_stream.test_ds", "hidden", "false"),
+ resource.TestCheckResourceAttr("elasticstack_elasticsearch_data_stream.test_ds", "system", "false"),
+ ),
+ },
+ },
+ })
+}
+
+func testAccResourceDataStreamCreate(name string) string {
+ return fmt.Sprintf(`
+provider "elasticstack" {
+ elasticsearch {}
+}
+
+resource "elasticstack_elasticsearch_index_lifecycle" "test_ilm" {
+ name = "%s"
+
+ hot {
+ min_age = "1h"
+ set_priority {
+ priority = 10
+ }
+ rollover {
+ max_age = "1d"
+ }
+ readonly {}
+ }
+
+ delete {
+ min_age = "2d"
+ delete {}
+ }
+}
+
+resource "elasticstack_elasticsearch_index_template" "test_ds_template" {
+ name = "%s"
+
+ index_patterns = ["%s*"]
+
+ template {
+ // make sure our template uses the prepared ILM policy
+ settings = jsonencode({
+ "lifecycle.name" = elasticstack_elasticsearch_index_lifecycle.test_ilm.name
+ })
+ }
+
+ data_stream {}
+}
+
+// and now we can create a data stream based on the index template
+resource "elasticstack_elasticsearch_data_stream" "test_ds" {
+ name = "%s"
+
+ // make sure the template is created before the data stream
+ depends_on = [
+ elasticstack_elasticsearch_index_template.test_ds_template
+ ]
+}
+ `, name, name, name, name)
+}
+
+func checkResourceDataStreamDestroy(s *terraform.State) error {
+ client := acctest.Provider.Meta().(*clients.ApiClient)
+
+ for _, rs := range s.RootModule().Resources {
+ if rs.Type != "elasticstack_elasticsearch_data_stream" {
+ continue
+ }
+ compId, _ := clients.CompositeIdFromStr(rs.Primary.ID)
+
+ req := client.GetESClient().Indices.GetDataStream.WithName(compId.ResourceId)
+ res, err := client.GetESClient().Indices.GetDataStream(req)
+ if err != nil {
+ return err
+ }
+
+ if res.StatusCode != 404 {
+ return fmt.Errorf("Data Stream (%s) still exists", compId.ResourceId)
+ }
+ }
+ return nil
+}
diff --git a/internal/models/models.go b/internal/models/models.go
index 3b45ee1d4..978340816 100644
--- a/internal/models/models.go
+++ b/internal/models/models.go
@@ -133,3 +133,26 @@ type IndexAlias struct {
Routing string `json:"routing,omitempty"`
SearchRouting string `json:"search_routing,omitempty"`
}
+
+type DataStream struct {
+ Name string `json:"name"`
+ TimestampField TimestampField `json:"timestamp_field"`
+ Indices []DataStreamIndex `json:"indices"`
+ Generation uint64 `json:"generation"`
+ Meta map[string]interface{} `json:"_meta"`
+ Status string `json:"status"`
+ Template string `json:"template"`
+ IlmPolicy string `json:"ilm_policy"`
+ Hidden bool `json:"hidden"`
+ System bool `json:"system"`
+ Replicated bool `json:"replicated"`
+}
+
+type DataStreamIndex struct {
+ IndexName string `json:"index_name"`
+ IndexUUID string `json:"index_uuid"`
+}
+
+type TimestampField struct {
+ Name string `json:"name"`
+}
diff --git a/internal/provider/provider.go b/internal/provider/provider.go
index e71b8ac4a..9747eeabd 100644
--- a/internal/provider/provider.go
+++ b/internal/provider/provider.go
@@ -69,6 +69,7 @@ func New(version string) func() *schema.Provider {
},
ResourcesMap: map[string]*schema.Resource{
"elasticstack_elasticsearch_cluster_settings": cluster.ResourceSettings(),
+ "elasticstack_elasticsearch_data_stream": index.ResourceDataStream(),
"elasticstack_elasticsearch_index": index.ResourceIndex(),
"elasticstack_elasticsearch_index_lifecycle": index.ResourceIlm(),
"elasticstack_elasticsearch_index_template": index.ResourceTemplate(),
diff --git a/templates/resources/elasticsearch_data_stream.md.tmpl b/templates/resources/elasticsearch_data_stream.md.tmpl
new file mode 100644
index 000000000..4d7759bbb
--- /dev/null
+++ b/templates/resources/elasticsearch_data_stream.md.tmpl
@@ -0,0 +1,23 @@
+---
+subcategory: "Index"
+layout: ""
+page_title: "Elasticstack: elasticstack_elasticsearch_data_stream Resource"
+description: |-
+ Manages Elasticsearch Data Streams
+---
+
+# Resource: elasticstack_elasticsearch_data_stream
+
+Manages data streams. This resource can create and delete data streams, and reads back the attributes of an existing data stream. See: https://www.elastic.co/guide/en/elasticsearch/reference/current/data-stream-apis.html
+
+## Example Usage
+
+{{ tffile "examples/resources/elasticstack_elasticsearch_data_stream/resource.tf" }}
+
+{{ .SchemaMarkdown | trimspace }}
+
+## Import
+
+Import is supported using the following syntax:
+
+{{ codefile "shell" "examples/resources/elasticstack_elasticsearch_data_stream/import.sh" }}