Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
Integration-Test-OpenSearch:
strategy:
matrix:
logstash: [ "7.13.2", "7.13.4" ]
logstash: [ "7.15.2", "7.14.2" ]
opensearch: [ "1.2.0" ]
secure: [ true, false ]

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/links.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
id: lychee
uses: lycheeverse/lychee-action@master
with:
args: --accept=200,403,429 "**/*.html" "**/*.md" "**/*.txt" "**/*.json" --exclude "https://github.com/\[your*" --exclude-mail
args: --accept=200,403,429 **/*.html **/*.md **/*.txt **/*.json --exclude https://github.com/\[your* --exclude-mail
env:
GITHUB_TOKEN: ${{secrets.GITHUB_TOKEN}}
- name: Fail if there were link errors
Expand Down
10 changes: 10 additions & 0 deletions logstash-output-opensearch.gemspec
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
# SPDX-License-Identifier: Apache-2.0
#
# The OpenSearch Contributors require contributions made to
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
#
# Modifications Copyright OpenSearch Contributors. See
# GitHub history for details.

Gem::Specification.new do |s|
s.name = 'logstash-output-opensearch'
s.version = '1.2.0'
Expand Down Expand Up @@ -37,4 +46,5 @@ Gem::Specification.new do |s|
s.add_development_dependency 'logstash-devutils'
s.add_development_dependency 'flores'
s.add_development_dependency 'cabin', ['~> 0.6']
s.add_development_dependency 'opensearch-ruby'
end
6 changes: 3 additions & 3 deletions spec/integration/outputs/compressed_indexing_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
}
subject { LogStash::Outputs::OpenSearch.new(config) }

let(:es_url) { "http://#{get_host_port}" }
let(:index_url) {"#{es_url}/#{index}"}
let(:opensearch_url) { "http://#{get_host_port}" }
let(:index_url) {"#{opensearch_url}/#{index}"}
let(:http_client_options) { {} }
let(:http_client) do
Manticore::Client.new(http_client_options)
Expand All @@ -49,7 +49,7 @@
it "ships events" do
subject.multi_receive(events)

http_client.post("#{es_url}/_refresh").call
http_client.post("#{opensearch_url}/_refresh").call

response = http_client.get("#{index_url}/_count?q=*")
result = LogStash::Json.load(response.body)
Expand Down
14 changes: 7 additions & 7 deletions spec/integration/outputs/create_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
describe "client create actions", :integration => true do
require "logstash/outputs/opensearch"

def get_es_output(action, id, version=nil, version_type=nil)
def get_output(action, id, version=nil, version_type=nil)
settings = {
"manage_template" => true,
"index" => "logstash-create",
Expand All @@ -37,7 +37,7 @@ def get_es_output(action, id, version=nil, version_type=nil)

context "when action => create" do
it "should create new documents with or without id" do
subject = get_es_output("create", "id123")
subject = get_output("create", "id123")
subject.register
subject.multi_receive([LogStash::Event.new("message" => "sample message here")])
@client.indices.refresh
Expand All @@ -49,27 +49,27 @@ def get_es_output(action, id, version=nil, version_type=nil)
end

it "should allow default (internal) version" do
subject = get_es_output("create", "id123", 43)
subject = get_output("create", "id123", 43)
subject.register
end

it "should allow internal version" do
subject = get_es_output("create", "id123", 43, "internal")
subject = get_output("create", "id123", 43, "internal")
subject.register
end

it "should not allow external version" do
subject = get_es_output("create", "id123", 43, "external")
subject = get_output("create", "id123", 43, "external")
expect { subject.register }.to raise_error(LogStash::ConfigurationError)
end

it "should not allow external_gt version" do
subject = get_es_output("create", "id123", 43, "external_gt")
subject = get_output("create", "id123", 43, "external_gt")
expect { subject.register }.to raise_error(LogStash::ConfigurationError)
end

it "should not allow external_gte version" do
subject = get_es_output("create", "id123", 43, "external_gte")
subject = get_output("create", "id123", 43, "external_gte")
expect { subject.register }.to raise_error(LogStash::ConfigurationError)
end
end
Expand Down
16 changes: 8 additions & 8 deletions spec/integration/outputs/delete_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
describe "Versioned delete", :integration => true do
require "logstash/outputs/opensearch"

let(:es) { get_client }
let(:client) { get_client }

before :each do
# Delete all templates first.
# Clean ES of data before we start.
es.indices.delete_template(:name => "*")
client.indices.delete_template(:name => "*")
# This can fail if there are no indexes, ignore failure.
es.indices.delete(:index => "*") rescue nil
es.indices.refresh
client.indices.delete(:index => "*") rescue nil
client.indices.refresh
end

context "when delete only" do
Expand All @@ -48,25 +48,25 @@
it "should ignore non-monotonic external version updates" do
id = "ev2"
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_action" => "index", "message" => "foo", "my_version" => 99)])
r = es.get(:index => 'logstash-delete', :type => doc_type, :id => id, :refresh => true)
r = client.get(:index => 'logstash-delete', :type => doc_type, :id => id, :refresh => true)
expect(r['_version']).to eq(99)
expect(r['_source']['message']).to eq('foo')

subject.multi_receive([LogStash::Event.new("my_id" => id, "my_action" => "delete", "message" => "foo", "my_version" => 98)])
r2 = es.get(:index => 'logstash-delete', :type => doc_type, :id => id, :refresh => true)
r2 = client.get(:index => 'logstash-delete', :type => doc_type, :id => id, :refresh => true)
expect(r2['_version']).to eq(99)
expect(r2['_source']['message']).to eq('foo')
end

it "should commit monotonic external version updates" do
id = "ev3"
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_action" => "index", "message" => "foo", "my_version" => 99)])
r = es.get(:index => 'logstash-delete', :type => doc_type, :id => id, :refresh => true)
r = client.get(:index => 'logstash-delete', :type => doc_type, :id => id, :refresh => true)
expect(r['_version']).to eq(99)
expect(r['_source']['message']).to eq('foo')

subject.multi_receive([LogStash::Event.new("my_id" => id, "my_action" => "delete", "message" => "foo", "my_version" => 100)])
expect { es.get(:index => 'logstash-delete', :type => doc_type, :id => id, :refresh => true) }.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
expect { client.get(:index => 'logstash-delete', :type => doc_type, :id => id, :refresh => true) }.to raise_error(OpenSearch::Transport::Transport::Errors::NotFound)
end
end
end
10 changes: 5 additions & 5 deletions spec/integration/outputs/index_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@
let(:events) { event_count.times.map { event }.to_a }
subject { LogStash::Outputs::OpenSearch.new(config) }

let(:es_url) { "http://#{get_host_port}" }
let(:index_url) {"#{es_url}/#{index}"}
let(:opensearch_url) { "http://#{get_host_port}" }
let(:index_url) {"#{opensearch_url}/#{index}"}
let(:http_client_options) { {} }
let(:http_client) do
Manticore::Client.new(http_client_options)
Expand All @@ -78,7 +78,7 @@
it "ships events" do
subject.multi_receive(events)

http_client.post("#{es_url}/_refresh").call
http_client.post("#{opensearch_url}/_refresh").call

response = http_client.get("#{index_url}/_count?q=*")
result = LogStash::Json.load(response.body)
Expand Down Expand Up @@ -135,7 +135,7 @@
describe "a secured indexer", :secure_integration => true do
let(:user) { "admin" }
let(:password) { "admin" }
let(:es_url) {"https://integration:9200"}
let(:opensearch_url) {"https://integration:9200"}
let(:config) do
{
"hosts" => ["integration:9200"],
Expand Down Expand Up @@ -170,7 +170,7 @@
} }
let(:user) {options[:auth_type]["user"]}
let(:password) {options[:auth_type]["password"]}
let(:es_url) {"https://integration:9200"}
let(:opensearch_url) {"https://integration:9200"}
let(:config) do
{
"hosts" => ["integration:9200"],
Expand Down
22 changes: 11 additions & 11 deletions spec/integration/outputs/index_version_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@
describe "Versioned indexing", :integration => true do
require "logstash/outputs/opensearch"

let(:es) { get_client }
let(:client) { get_client }

before :each do
# Delete all templates first.
# Clean OpenSearch of data before we start.
es.indices.delete_template(:name => "*")
client.indices.delete_template(:name => "*")
# This can fail if there are no indexes, ignore failure.
es.indices.delete(:index => "*") rescue nil
es.indices.refresh
client.indices.delete(:index => "*") rescue nil
client.indices.refresh
end

context "when index only" do
Expand All @@ -46,11 +46,11 @@

it "should default to OpenSearch version" do
subject.multi_receive([LogStash::Event.new("my_id" => "123", "message" => "foo")])
r = es.get(:index => 'logstash-index', :type => doc_type, :id => "123", :refresh => true)
r = client.get(:index => 'logstash-index', :type => doc_type, :id => "123", :refresh => true)
expect(r["_version"]).to eq(1)
expect(r["_source"]["message"]).to eq('foo')
subject.multi_receive([LogStash::Event.new("my_id" => "123", "message" => "foobar")])
r2 = es.get(:index => 'logstash-index', :type => doc_type, :id => "123", :refresh => true)
r2 = client.get(:index => 'logstash-index', :type => doc_type, :id => "123", :refresh => true)
expect(r2["_version"]).to eq(2)
expect(r2["_source"]["message"]).to eq('foobar')
end
Expand All @@ -74,33 +74,33 @@
it "should respect the external version" do
id = "ev1"
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "99", "message" => "foo")])
r = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
r = client.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
expect(r["_version"]).to eq(99)
expect(r["_source"]["message"]).to eq('foo')
end

it "should ignore non-monotonic external version updates" do
id = "ev2"
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "99", "message" => "foo")])
r = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
r = client.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
expect(r["_version"]).to eq(99)
expect(r["_source"]["message"]).to eq('foo')

subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "98", "message" => "foo")])
r2 = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
r2 = client.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
expect(r2["_version"]).to eq(99)
expect(r2["_source"]["message"]).to eq('foo')
end

it "should commit monotonic external version updates" do
id = "ev3"
subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "99", "message" => "foo")])
r = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
r = client.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
expect(r["_version"]).to eq(99)
expect(r["_source"]["message"]).to eq('foo')

subject.multi_receive([LogStash::Event.new("my_id" => id, "my_version" => "100", "message" => "foo")])
r2 = es.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
r2 = client.get(:index => 'logstash-index', :type => doc_type, :id => id, :refresh => true)
expect(r2["_version"]).to eq(100)
expect(r2["_source"]["message"]).to eq('foo')
end
Expand Down
2 changes: 0 additions & 2 deletions spec/integration/outputs/ingest_pipeline_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@

before :each do
# Delete all templates first.
require "elasticsearch"

# Clean OpenSearch of data before we start.
@client = get_client
@client.indices.delete_template(:name => "*")
Expand Down
2 changes: 0 additions & 2 deletions spec/integration/outputs/metrics_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
let(:document_level_metrics) { subject.instance_variable_get(:@document_level_metrics) }

before :each do
require "elasticsearch"

# Clean OpenSearch of data before we start.
@client = get_client
clean(@client)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

before :each do
# Delete all templates first.
require "elasticsearch"
allow(Stud).to receive(:stoppable_sleep)

# Clean OpenSearch of data before we start.
Expand Down
20 changes: 10 additions & 10 deletions spec/integration/outputs/painless_update_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
describe "Update actions using painless scripts", :integration => true, :update_tests => 'painless' do
require "logstash/outputs/opensearch"

def get_es_output( options={} )
def get_output( options={} )
settings = {
"manage_template" => true,
"index" => "logstash-update",
Expand Down Expand Up @@ -42,7 +42,7 @@ def get_es_output( options={} )
context "scripted updates" do

it "should increment a counter with event/doc 'count' variable with inline script" do
subject = get_es_output({
subject = get_output({
'document_id' => "123",
'script' => 'ctx._source.counter += params.event.counter',
'script_type' => 'inline'
Expand All @@ -54,7 +54,7 @@ def get_es_output( options={} )
end

it "should increment a counter with event/doc 'count' variable with event/doc as upsert and inline script" do
subject = get_es_output({
subject = get_output({
'document_id' => "123",
'doc_as_upsert' => true,
'script' => 'if( ctx._source.containsKey("counter") ){ ctx._source.counter += params.event.counter; } else { ctx._source.counter = params.event.counter; }',
Expand All @@ -67,7 +67,7 @@ def get_es_output( options={} )
end

it "should, with new doc, set a counter with event/doc 'count' variable with event/doc as upsert and inline script" do
subject = get_es_output({
subject = get_output({
'document_id' => "456",
'doc_as_upsert' => true,
'script' => 'if( ctx._source.containsKey("counter") ){ ctx._source.counter += params.event.counter; } else { ctx._source.counter = params.event.counter; }',
Expand All @@ -90,7 +90,7 @@ def get_es_output( options={} )

plugin_parameters.merge!('script_lang' => '')

subject = get_es_output(plugin_parameters)
subject = get_output(plugin_parameters)
subject.register
subject.multi_receive([LogStash::Event.new("count" => 4 )])
r = @client.get(:index => 'logstash-update', :type => doc_type, :id => "123", :refresh => true)
Expand All @@ -101,23 +101,23 @@ def get_es_output( options={} )

context "when update with upsert" do
it "should create new documents with provided upsert" do
subject = get_es_output({ 'document_id' => "456", 'upsert' => '{"message": "upsert message"}' })
subject = get_output({ 'document_id' => "456", 'upsert' => '{"message": "upsert message"}' })
subject.register
subject.multi_receive([LogStash::Event.new("message" => "sample message here")])
r = @client.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true)
expect(r["_source"]["message"]).to eq('upsert message')
end

it "should create new documents with event/doc as upsert" do
subject = get_es_output({ 'document_id' => "456", 'doc_as_upsert' => true })
subject = get_output({ 'document_id' => "456", 'doc_as_upsert' => true })
subject.register
subject.multi_receive([LogStash::Event.new("message" => "sample message here")])
r = @client.get(:index => 'logstash-update', :type => doc_type, :id => "456", :refresh => true)
expect(r["_source"]["message"]).to eq('sample message here')
end

it "should fail on documents with event/doc as upsert at external version" do
subject = get_es_output({ 'document_id' => "456", 'doc_as_upsert' => true, 'version' => 999, "version_type" => "external" })
subject = get_output({ 'document_id' => "456", 'doc_as_upsert' => true, 'version' => 999, "version_type" => "external" })
expect { subject.register }.to raise_error(LogStash::ConfigurationError)
end
end
Expand All @@ -126,7 +126,7 @@ def get_es_output( options={} )

context 'with an inline script' do
it "should create new documents with upsert content" do
subject = get_es_output({ 'document_id' => "456", 'script' => 'ctx._source.counter = params.event.counter', 'upsert' => '{"message": "upsert message"}', 'script_type' => 'inline' })
subject = get_output({ 'document_id' => "456", 'script' => 'ctx._source.counter = params.event.counter', 'upsert' => '{"message": "upsert message"}', 'script_type' => 'inline' })
subject.register

subject.multi_receive([LogStash::Event.new("message" => "sample message here")])
Expand All @@ -135,7 +135,7 @@ def get_es_output( options={} )
end

it "should create new documents with event/doc as script params" do
subject = get_es_output({ 'document_id' => "456", 'script' => 'ctx._source.counter = params.event.counter', 'scripted_upsert' => true, 'script_type' => 'inline' })
subject = get_output({ 'document_id' => "456", 'script' => 'ctx._source.counter = params.event.counter', 'scripted_upsert' => true, 'script_type' => 'inline' })
subject.register
subject.multi_receive([LogStash::Event.new("counter" => 1)])
@client.indices.refresh
Expand Down
1 change: 0 additions & 1 deletion spec/integration/outputs/retry_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ def mock_actions_with_response(*resp)

before :each do
# Delete all templates first.
require "elasticsearch"
allow(Stud).to receive(:stoppable_sleep)

# Clean OpenSearch of data before we start.
Expand Down
Loading