The `elasticsearch_sender_api` makes the Elasticsearch sender functionality available to any processor. It is a Teraslice API that uses the API factory to create, cache, and manage multiple Elasticsearch senders. This API is the core of the Elasticsearch bulk operation and uses the standard metadata fields, e.g. `_key`, `_process_time`, `_ingest_time`, etc. See the metadata section for details about metadata fields.

The `elasticsearch_sender_api` will also look for the metadata field `_delete_id` in each record's metadata. If this field exists, it adds a delete operation for the id in the `_delete_id` field to the bulk request. This allows for an index (or any other action) and a delete operation in the same bulk request.
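As a rough sketch of that behavior (not the actual implementation), one record can contribute both an index action and a delete action to the same bulk body. The `RecordWithMeta` shape and `buildBulkActions` helper below are hypothetical, used only to illustrate the idea:

```typescript
// Hypothetical sketch of how a record's metadata could map to bulk actions.
// The real sender builds Elasticsearch bulk requests; here we only model the
// action metadata to show that one record can yield an index AND a delete.
interface RecordWithMeta {
    doc: Record<string, unknown>;
    metadata: { _key?: string; _delete_id?: string };
}

type BulkAction =
    | { index: { _id?: string } }
    | { delete: { _id: string } };

function buildBulkActions(records: RecordWithMeta[]): BulkAction[] {
    const actions: BulkAction[] = [];
    for (const record of records) {
        actions.push({ index: { _id: record.metadata._key } });
        // if _delete_id is present, add a delete action to the same request
        if (record.metadata._delete_id) {
            actions.push({ delete: { _id: record.metadata._delete_id } });
        }
    }
    return actions;
}

const actions = buildBulkActions([
    { doc: { name: 'foo' }, metadata: { _key: '1', _delete_id: 'old-1' } }
]);
// one index action and one delete action end up in a single bulk request
```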
A Teraslice job and the associated processor using the `elasticsearch_sender_api`:

### Example Job
```json
{
    "name": "example",
    "workers": 1,
    "slicers": 1,
    "lifecycle": "once",
    "assets": ["elasticsearch"],
    "apis": [
        {
            "_name": "elasticsearch_sender_api",
            "connection": "ELASTICSEARCH_CONNECTION",
            "index": "example_index",
            "type": "_doc",
            "size": 1000
        }
    ],
    "operations": [
        { "_op": "example-reader" },
        {
            "_op": "example_sender",
            "api_name": "elasticsearch_sender_api"
        }
    ]
}
```
The processor for the job described above:

```typescript
// located at /example_sender/processor.ts
import { BatchProcessor } from '@terascope/job-components';

export default class SomeSender extends BatchProcessor {
    async initialize() {
        await super.initialize();
        const apiManager = this.getAPI(this.opConfig.api_name);
        this.client = await apiManager.create('bulkSender', {});
    }

    async onBatch(data) {
        if (data == null || data.length === 0) return data;
        await this.client.send(data);
        // NOTE: it's important to return the original data so that
        // downstream operators can run
        return data;
    }
}
```
The api manager returned by `getAPI` exposes the following factory methods:

### size

Returns the number of separate sender apis.

### get

parameters:
- name: String

Fetches any sender api associated with the name provided.

### getConfig

parameters:
- name: String

Fetches any sender api config associated with the name provided.

### create

parameters:
- name: String
- configOverrides: Check options below, optional

Creates an instance of a sender api and caches it with the name given. Any config provided in the second argument will override what is specified in the apiConfig. Throws an error if you try creating another api with the same name.

### remove

parameters:
- name: String

Removes an instance of a sender api and runs any cleanup code specified in the api code.

### entries

Iterates over the cached names and clients.

### keys

Iterates over the cached names.

### values

Iterates over the cached clients.
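As a rough sketch, these iteration methods behave like those of a JavaScript `Map` keyed by client name. Below, a plain `Map` stands in for the factory's internal cache, and the client objects are placeholders, not real senders:

```typescript
// A plain Map standing in for the api factory's cache of name -> client
// pairs; the client shape here is a placeholder for illustration only.
const cache = new Map<string, { index: string }>([
    ['normalClient', { index: 'example_index' }],
    ['overrideClient', { index: 'other_index' }]
]);

// entries() yields [name, client] pairs; keys() the names; values() the clients
const summary: string[] = [];
for (const [name, client] of cache.entries()) {
    summary.push(`${name} -> ${client.index}`);
}
const names = [...cache.keys()];
```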
```typescript
// example of an api configuration
const apiConfig = {
    _name: "elasticsearch_sender_api",
    index: "new_index",
    size: 1000,
    type: "events",
    connection: "default"
};

const apiManager = this.getAPI(apiName);

apiManager.size() === 0;

// returns an api cached at "normalClient" that uses the default api config
const normalClient = await apiManager.create('normalClient', {});
apiManager.size() === 1;
apiManager.get('normalClient') === normalClient;

// returns an api cached at "overrideClient"; it uses the api config above,
// but overrides the index to "other_index" in the new instance
const overrideClient = await apiManager.create('overrideClient', { index: 'other_index', connection: "other", update: true });
apiManager.size() === 2;

// returns the full configuration for this client
apiManager.getConfig('overrideClient') === {
    _name: "elasticsearch_sender_api",
    index: "other_index",
    size: 1000,
    type: "events",
    connection: "other",
    update: true
};

await apiManager.remove('normalClient');

apiManager.size() === 1;
apiManager.get('normalClient') === undefined;
```
The sender class returned from the `create` method of the APIFactory follows our common sender api interface.
### send

```
(records: DataEntity[]) => Promise<void>
```

Formats an Elasticsearch bulk request and sends it to Elasticsearch.

parameters:
- records: an array of data-entities

### verify

```
(route?: string) => Promise<void>
```

Ensures that the index is created. The bulk index request will create the index if it doesn't exist, so this function is not strictly necessary for the Elasticsearch sender, but this might change in the future.

parameters:
- route: a string representing the index to create
```typescript
await api.send([
    DataEntity.make({
        some: 'data',
        name: 'someName',
        job: 'to be awesome!'
    })
]);
```
Configuration | Description | Type | Notes |
---|---|---|---|
_op | Name of the operation; it must reflect the exact name of the file | String | required |
size | the maximum number of docs to send in a given bulk request; anything past it will be split up and sent in multiple requests | Number | required. The index selector typically returns up to double the length of the original documents due to the metadata involved with bulk requests, so this number is effectively doubled to maintain the notion that we split by actual documents and not the metadata |
connection | Name of the elasticsearch connection to use when sending data | String | optional, defaults to the 'default' connection created for elasticsearch |
index | Index where the data will be sent; it must be lowercase | String | required |
type | Set the type of the data for elasticsearch | String | optional, defaults to '_doc'; required for elasticsearch v5 |
delete | Use the id_field from the incoming records to bulk delete documents | Boolean | optional, defaults to false |
upsert | Specify if the incoming records should be used to perform an upsert. If update_fields is also specified, then existing records will be updated with those fields; otherwise the full incoming record will be inserted | Boolean | optional, defaults to false |
create | Specify if the incoming records should be used to perform a create event ("put-if-absent" behavior) | Boolean | optional, defaults to false |
update | Specify if the data should update existing records; if false it will index them | Boolean | optional, defaults to false |
update_fields | if you are updating the documents, you can specify the fields to update here (it should be an array containing all the field names you want); it defaults to sending the entire document | Array | optional, defaults to [] |
script_file | Name of the script file to run as part of an update request | String | optional |
script | Inline script to include in each indexing request. Only very simple painless scripts are currently supported | String | optional |
script_params | key -> value parameter mappings. The value will be extracted from the incoming data and passed to the script as a param based on the key | Object | optional |
update_retry_on_conflict | If there is a version conflict from an update, how often it should be retried | Number | optional, defaults to 0 |
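As a sketch of how these options combine, a hypothetical `apis` entry that upserts incoming records, updates only selected fields on existing documents, and retries version conflicts might look like the following (the index name, field names, and values here are illustrative, not defaults):

```json
{
    "_name": "elasticsearch_sender_api",
    "index": "events_index",
    "size": 500,
    "upsert": true,
    "update_fields": ["count", "last_seen"],
    "update_retry_on_conflict": 3
}
```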