Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## [Unreleased]
### Added
- New resource `elasticstack_elasticsearch_data_stream` to manage Elasticsearch [data streams](https://www.elastic.co/guide/en/elasticsearch/reference/current/data-streams.html) ([#45](https://github.com/elastic/terraform-provider-elasticstack/pull/45))
- New resource `elasticstack_elasticsearch_ingest_pipeline` to manage Elasticsearch [ingest pipelines](https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html) ([#56](https://github.com/elastic/terraform-provider-elasticstack/issues/56))

### Fixed
- Update only changed index settings ([#52](https://github.com/elastic/terraform-provider-elasticstack/issues/52))
Expand Down
82 changes: 82 additions & 0 deletions docs/resources/elasticsearch_ingest_pipeline.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
---
subcategory: "Ingest"
layout: ""
page_title: "Elasticstack: elasticstack_elasticsearch_ingest_pipeline Resource"
description: |-
Manages Ingest Pipelines
---

# Resource: elasticstack_elasticsearch_ingest_pipeline

Use ingest APIs to manage tasks and resources related to ingest pipelines and processors. See: https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest-apis.html

## Example Usage

```terraform
provider "elasticstack" {
elasticsearch {}
}

resource "elasticstack_elasticsearch_ingest_pipeline" "my_ingest_pipeline" {
name = "my_ingest_pipeline"
description = "My first ingest pipeline managed by Terraform"

// processors can be defined in different ways
processors = [
// using the jsonencode function, which is the recommended way if you want to provide a JSON object yourself
jsonencode({
set = {
description = "My set processor description"
field = "_meta"
value = "indexed"
}
}),
// or use the HERE DOC construct to provide the processor definition
<<EOF
{"json": {
"field": "data",
"target_field": "parsed_data"
}}
EOF
,
]
}
```

<!-- schema generated by tfplugindocs -->
## Schema

### Required

- **name** (String) The name of the ingest pipeline.
- **processors** (List of String) Processors used to perform transformations on documents before indexing. Processors run sequentially in the order specified. See: https://www.elastic.co/guide/en/elasticsearch/reference/current/processors.html. Each record must be a valid JSON document.

### Optional

- **description** (String) Description of the ingest pipeline.
- **elasticsearch_connection** (Block List, Max: 1) Used to establish connection to Elasticsearch server. Overrides environment variables if present. (see [below for nested schema](#nestedblock--elasticsearch_connection))
- **metadata** (String) Optional user metadata about the index template.
- **on_failure** (List of String) Processors to run immediately after a processor failure. Each processor supports a processor-level `on_failure` value. If a processor without an `on_failure` value fails, Elasticsearch uses this pipeline-level parameter as a fallback. The processors in this parameter run sequentially in the order specified. Elasticsearch will not attempt to run the pipeline’s remaining processors. See: https://www.elastic.co/guide/en/elasticsearch/reference/current/processors.html. Each record must be a valid JSON document

### Read-Only

- **id** (String) Internal identifier of the resource

<a id="nestedblock--elasticsearch_connection"></a>
### Nested Schema for `elasticsearch_connection`

Optional:

- **ca_file** (String) Path to a custom Certificate Authority certificate
- **endpoints** (List of String, Sensitive) A list of endpoints the Terraform provider will point to. They must include the http(s) schema and port number.
- **insecure** (Boolean) Disable TLS certificate validation
- **password** (String, Sensitive) A password to use for API authentication to Elasticsearch.
- **username** (String) A username to use for API authentication to Elasticsearch.

## Import

Import is supported using the following syntax:

```shell
terraform import elasticstack_elasticsearch_ingest_pipeline.my_ingest_pipeline <cluster_uuid>/<ingest pipeline name>
```
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
terraform import elasticstack_elasticsearch_ingest_pipeline.my_ingest_pipeline <cluster_uuid>/<ingest pipeline name>
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
provider "elasticstack" {
elasticsearch {}
}

resource "elasticstack_elasticsearch_ingest_pipeline" "my_ingest_pipeline" {
name = "my_ingest_pipeline"
description = "My first ingest pipeline managed by Terraform"

// processors can be defined in different ways
processors = [
// using the jsonencode function, which is the recommended way if you want to provide a JSON object yourself
jsonencode({
set = {
description = "My set processor description"
field = "_meta"
value = "indexed"
}
}),
// or use the HERE DOC construct to provide the processor definition
<<EOF
{"json": {
"field": "data",
"target_field": "parsed_data"
}}
EOF
,
]
}
60 changes: 60 additions & 0 deletions internal/clients/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,3 +327,63 @@ func (a *ApiClient) DeleteElasticsearchDataStream(dataStreamName string) diag.Di

return diags
}

// PutElasticsearchIngestPipeline serializes the given pipeline definition and
// submits it to the Elasticsearch ingest API, creating the pipeline or
// replacing an existing one with the same name. Any marshalling, transport,
// or API-level failure is reported via the returned diagnostics.
func (a *ApiClient) PutElasticsearchIngestPipeline(pipeline *models.IngestPipeline) diag.Diagnostics {
	var diags diag.Diagnostics

	body, err := json.Marshal(pipeline)
	if err != nil {
		return diag.FromErr(err)
	}
	log.Printf("[TRACE] creating ingest pipeline %s: %s", pipeline.Name, body)

	res, err := a.es.Ingest.PutPipeline(pipeline.Name, bytes.NewReader(body))
	if err != nil {
		return diag.FromErr(err)
	}
	defer res.Body.Close()

	// Surface non-2xx responses as diagnostics; otherwise report success.
	if d := utils.CheckError(res, fmt.Sprintf("Unable to create or update ingest pipeline: %s", pipeline.Name)); d.HasError() {
		return d
	}
	return diags
}

// GetElasticsearchIngestPipeline fetches the named ingest pipeline from the
// cluster. It returns (nil, nil) when the pipeline does not exist (HTTP 404),
// so callers can distinguish "absent" from a hard failure.
func (a *ApiClient) GetElasticsearchIngestPipeline(name *string) (*models.IngestPipeline, diag.Diagnostics) {
	var diags diag.Diagnostics

	res, err := a.es.Ingest.GetPipeline(a.es.Ingest.GetPipeline.WithPipelineID(*name))
	if err != nil {
		return nil, diag.FromErr(err)
	}
	defer res.Body.Close()

	// A missing pipeline is not an error for the caller.
	if res.StatusCode == http.StatusNotFound {
		return nil, nil
	}
	if d := utils.CheckError(res, fmt.Sprintf("Unable to get requested ingest pipeline: %s", *name)); d.HasError() {
		return nil, d
	}

	// The API keys the response by pipeline name; pull out the one we asked for.
	var byName map[string]models.IngestPipeline
	if err := json.NewDecoder(res.Body).Decode(&byName); err != nil {
		return nil, diag.FromErr(err)
	}
	found := byName[*name]
	found.Name = *name
	log.Printf("[TRACE] get ingest pipeline %s from ES API: %#+v", *name, found)

	return &found, diags
}

// DeleteElasticsearchIngestPipeline removes the named ingest pipeline from the
// cluster. Transport and API-level failures are reported via the returned
// diagnostics.
func (a *ApiClient) DeleteElasticsearchIngestPipeline(name *string) diag.Diagnostics {
	var diags diag.Diagnostics

	res, err := a.es.Ingest.DeletePipeline(*name)
	if err != nil {
		// Bug fix: previously this returned the empty diags value, silently
		// swallowing transport errors. Propagate them like the sibling
		// Put/Get functions do.
		return diag.FromErr(err)
	}
	defer res.Body.Close()
	if diags := utils.CheckError(res, fmt.Sprintf("Unable to delete ingest pipeline: %s", *name)); diags.HasError() {
		return diags
	}
	return diags
}
Loading