Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## [Unreleased]
### Added
- New resource `elasticstack_elasticsearch_data_stream` to manage Elasticsearch [data streams](https://www.elastic.co/guide/en/elasticsearch/reference/current/data-streams.html) ([#45](https://github.com/elastic/terraform-provider-elasticstack/pull/45))
- New resource `elasticstack_elasticsearch_ingest_pipeline` to manage Elasticsearch [ingest pipelines](https://www.elastic.co/guide/en/elasticsearch/reference/7.16/ingest.html) ([#56](https://github.com/elastic/terraform-provider-elasticstack/issues/56))

### Fixed
- Update only changed index settings ([#52](https://github.com/elastic/terraform-provider-elasticstack/issues/52))
Expand Down
82 changes: 82 additions & 0 deletions docs/resources/elasticsearch_ingest_pipeline.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
---
subcategory: "Ingest"
layout: ""
page_title: "Elasticstack: elasticstack_elasticsearch_ingest_pipeline Resource"
description: |-
Manages Ingest Pipelines
---

# Resource: elasticstack_elasticsearch_ingest_pipeline

Use ingest APIs to manage tasks and resources related to ingest pipelines and processors. See: https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest-apis.html

## Example Usage

```terraform
provider "elasticstack" {
elasticsearch {}
}

resource "elasticstack_elasticsearch_ingest_pipeline" "my_ingest_pipeline" {
name = "my_ingest_pipeline"
description = "My first ingest pipeline managed by Terraform"

// processors can be defined in different way
processors = [
// using the jsonencode function, which is the recomended way if you want to provide JSON object by yourself
jsonencode({
set = {
description = "My set processor descirption"
field = "_meta"
value = "indexed"
}
}),
// or use the HERE DOC construct to provide the processor definition
<<EOF
{"json": {
"field": "data",
"target_field": "parsed_data"
}}
EOF
,
]
}
```

<!-- schema generated by tfplugindocs -->
## Schema

### Required

- **name** (String) The name of the ingest pipeline.
- **processors** (List of String) Processors used to perform transformations on documents before indexing. Processors run sequentially in the order specified. See: https://www.elastic.co/guide/en/elasticsearch/reference/current/processors.html. Each record must be a valid JSON document.

### Optional

- **description** (String) Description of the ingest pipeline.
- **elasticsearch_connection** (Block List, Max: 1) Used to establish connection to Elasticsearch server. Overrides environment variables if present. (see [below for nested schema](#nestedblock--elasticsearch_connection))
- **metadata** (String) Optional user metadata about the index template.
- **on_failure** (List of String) Processors to run immediately after a processor failure. Each processor supports a processor-level `on_failure` value. If a processor without an `on_failure` value fails, Elasticsearch uses this pipeline-level parameter as a fallback. The processors in this parameter run sequentially in the order specified. Elasticsearch will not attempt to run the pipeline’s remaining processors. See: https://www.elastic.co/guide/en/elasticsearch/reference/current/processors.html. Each record must be a valid JSON document

### Read-Only

- **id** (String) Internal identifier of the resource

<a id="nestedblock--elasticsearch_connection"></a>
### Nested Schema for `elasticsearch_connection`

Optional:

- **ca_file** (String) Path to a custom Certificate Authority certificate
- **endpoints** (List of String, Sensitive) A list of endpoints the Terraform provider will point to. They must include the http(s) schema and port number.
- **insecure** (Boolean) Disable TLS certificate validation
- **password** (String, Sensitive) A password to use for API authentication to Elasticsearch.
- **username** (String) A username to use for API authentication to Elasticsearch.

## Import

Import is supported using the following syntax:

```shell
terraform import elasticstack_elasticsearch_ingest_pipeline.my_ingest_pipeline <cluster_uuid>/<ingest pipeline name>
```
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
terraform import elasticstack_elasticsearch_ingest_pipeline.my_ingest_pipeline <cluster_uuid>/<ingest pipeline name>
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
provider "elasticstack" {
elasticsearch {}
}

resource "elasticstack_elasticsearch_ingest_pipeline" "my_ingest_pipeline" {
name = "my_ingest_pipeline"
description = "My first ingest pipeline managed by Terraform"

// processors can be defined in different way
processors = [
// using the jsonencode function, which is the recomended way if you want to provide JSON object by yourself
jsonencode({
set = {
description = "My set processor descirption"
field = "_meta"
value = "indexed"
}
}),
// or use the HERE DOC construct to provide the processor definition
<<EOF
{"json": {
"field": "data",
"target_field": "parsed_data"
}}
EOF
,
]
}
60 changes: 60 additions & 0 deletions internal/clients/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,3 +327,63 @@ func (a *ApiClient) DeleteElasticsearchDataStream(dataStreamName string) diag.Di

return diags
}

func (a *ApiClient) PutElasticsearchIngestPipeline(pipeline *models.IngestPipeline) diag.Diagnostics {
var diags diag.Diagnostics
pipelineBytes, err := json.Marshal(pipeline)
if err != nil {
return diag.FromErr(err)
}
log.Printf("[TRACE] creating ingest pipeline %s: %s", pipeline.Name, pipelineBytes)

res, err := a.es.Ingest.PutPipeline(pipeline.Name, bytes.NewReader(pipelineBytes))
if err != nil {
return diag.FromErr(err)
}
defer res.Body.Close()
if diags := utils.CheckError(res, fmt.Sprintf("Unable to create or update ingest pipeline: %s", pipeline.Name)); diags.HasError() {
return diags
}

return diags
}

func (a *ApiClient) GetElasticsearchIngestPipeline(name *string) (*models.IngestPipeline, diag.Diagnostics) {
var diags diag.Diagnostics
req := a.es.Ingest.GetPipeline.WithPipelineID(*name)
res, err := a.es.Ingest.GetPipeline(req)
if err != nil {
return nil, diag.FromErr(err)
}
defer res.Body.Close()
if res.StatusCode == http.StatusNotFound {
return nil, nil
}
if diags := utils.CheckError(res, fmt.Sprintf("Unable to get requested ingest pipeline: %s", *name)); diags.HasError() {
return nil, diags
}

pipelines := make(map[string]models.IngestPipeline)
if err := json.NewDecoder(res.Body).Decode(&pipelines); err != nil {
return nil, diag.FromErr(err)
}
pipeline := pipelines[*name]
pipeline.Name = *name
log.Printf("[TRACE] get ingest pipeline %s from ES API: %#+v", *name, pipeline)

return &pipeline, diags
}

func (a *ApiClient) DeleteElasticsearchIngestPipeline(name *string) diag.Diagnostics {
var diags diag.Diagnostics

res, err := a.es.Ingest.DeletePipeline(*name)
if err != nil {
return diags
}
defer res.Body.Close()
if diags := utils.CheckError(res, fmt.Sprintf("Unable to delete ingest pipeline: %s", *name)); diags.HasError() {
return diags
}
return diags
}
Loading