Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions lib/logstash/outputs/opensearch/http_client/pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,7 @@ def sniff!
update_urls(check_sniff)
end

ES1_SNIFF_RE_URL = /\[([^\/]*)?\/?([^:]*):([0-9]+)\]/
ES2_AND_ABOVE_SNIFF_RE_URL = /([^\/]*)?\/?([^:]*):([0-9]+)/
SNIFF_RE_URL = /([^\/]*)?\/?([^:]*):([0-9]+)/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it better to use ES2_AND_ABOVE_SNIFF_RE_URL?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

never mind. You are already using ES2_AND_ABOVE_SNIFF_RE_URL.

# Sniffs and returns the results. Does not update internal URLs!
def check_sniff
_, url_meta, resp = perform_request(:get, @sniffing_path)
Expand Down Expand Up @@ -197,7 +196,7 @@ def sniff(nodes)
end

def address_str_to_uri(addr_str)
matches = addr_str.match(ES1_SNIFF_RE_URL) || addr_str.match(ES2_AND_ABOVE_SNIFF_RE_URL)
matches = addr_str.match(SNIFF_RE_URL)
if matches
host = matches[1].empty? ? matches[2] : matches[1]
::LogStash::Util::SafeURI.new("#{host}:#{matches[3]}")
Expand All @@ -221,7 +220,7 @@ def start_resurrectionist
end

def health_check_request(url)
logger.debug("Running health check to see if an ES connection is working", url: url.sanitized.to_s, path: @healthcheck_path)
logger.debug("Running health check to see if an OpenSearch connection is working", url: url.sanitized.to_s, path: @healthcheck_path)
perform_request_to_url(url, :head, @healthcheck_path)
end

Expand All @@ -231,16 +230,16 @@ def healthcheck!
begin
health_check_request(url)
# If no exception was raised it must have succeeded!
logger.warn("Restored connection to ES instance", url: url.sanitized.to_s)
# We reconnected to this node, check its ES version
logger.warn("Restored connection to OpenSearch instance", url: url.sanitized.to_s)
# We reconnected to this node, check its version
version = get_version(url)
@state_mutex.synchronize do
meta[:version] = version
set_last_version(version, url)
meta[:state] = :alive
end
rescue HostUnreachableError, BadResponseCodeError => e
logger.warn("Attempted to resurrect connection to dead ES instance, but got an error", url: url.sanitized.to_s, exception: e.class, message: e.message)
logger.warn("Attempted to resurrect connection to dead OpenSearch instance, but got an error", url: url.sanitized.to_s, exception: e.class, message: e.message)
end
end
end
Expand Down Expand Up @@ -312,7 +311,7 @@ def update_urls(new_urls)
logger.info? && logger.info("OpenSearch pool URLs updated", :changes => state_changes)
end

# Run an inline healthcheck anytime URLs are updated
# Run an inline health check anytime URLs are updated
# This guarantees that during startup / post-startup
# sniffing we don't have idle periods waiting for the
# periodic sniffer to allow new hosts to come online
Expand Down
8 changes: 4 additions & 4 deletions lib/logstash/plugin_mixins/opensearch/common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ module Common
DOC_SUCCESS_CODES = [200, 201]
DOC_CONFLICT_CODE = 409

# Perform some ES options validations and Build the HttpClient.
# Perform some OpenSearch options validations and Build the HttpClient.
# Note that this methods may sets the @user, @password, @hosts and @client ivars as a side effect.
# @return [HttpClient] the new http client
def build_client
Expand Down Expand Up @@ -53,7 +53,7 @@ def hosts_default?(hosts)
private :hosts_default?


# Plugin initialization extension point (after a successful ES connection).
# Plugin initialization extension point (after a successful OpenSearch connection).
def finish_register
end
protected :finish_register
Expand All @@ -70,7 +70,7 @@ def successful_connection?
!!maximum_seen_major_version
end

# launch a thread that waits for an initial successful connection to the ES cluster to call the given block
# launch a thread that waits for an initial successful connection to the OpenSearch cluster to call the given block
# @param block [Proc] the block to execute upon initial successful connection
# @return [Thread] the successful connection wait thread
def after_successful_connection(&block)
Expand Down Expand Up @@ -253,7 +253,7 @@ def safe_bulk(actions)
message = "Encountered a retryable error (will retry with exponential backoff)"

# We treat 429s as a special case because these really aren't errors, but
# rather just ES telling us to back off a bit, which we do.
# rather just OpenSearch telling us to back off a bit, which we do.
# The other retryable code is 503, which are true errors
# Even though we retry the user should be made aware of these
if e.response_code == 429
Expand Down
2 changes: 1 addition & 1 deletion spec/integration/outputs/create_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def get_es_output(action, id, version=nil, version_type=nil)
before :each do
@es = get_client
# Delete all templates first.
# Clean ES of data before we start.
# Clean OpenSearch of data before we start.
@es.indices.delete_template(:name => "*")
# This can fail if there are no indexes, ignore failure.
@es.indices.delete(:index => "*") rescue nil
Expand Down
4 changes: 2 additions & 2 deletions spec/integration/outputs/index_version_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

before :each do
# Delete all templates first.
# Clean ES of data before we start.
# Clean OpenSearch of data before we start.
es.indices.delete_template(:name => "*")
# This can fail if there are no indexes, ignore failure.
es.indices.delete(:index => "*") rescue nil
Expand All @@ -44,7 +44,7 @@
}
end

it "should default to ES version" do
it "should default to OpenSearch version" do
subject.multi_receive([LogStash::Event.new("my_id" => "123", "message" => "foo")])
r = es.get(:index => 'logstash-index', :type => doc_type, :id => "123", :refresh => true)
expect(r["_version"]).to eq(1)
Expand Down
2 changes: 1 addition & 1 deletion spec/integration/outputs/ingest_pipeline_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
# Delete all templates first.
require "elasticsearch"

# Clean ES of data before we start.
# Clean OpenSearch of data before we start.
@es = get_client
@es.indices.delete_template(:name => "*")

Expand Down
2 changes: 1 addition & 1 deletion spec/integration/outputs/metrics_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
before :each do
require "elasticsearch"

# Clean ES of data before we start.
# Clean OpenSearch of data before we start.
@es = get_client
clean(@es)
subject.register
Expand Down
2 changes: 1 addition & 1 deletion spec/integration/outputs/no_opensearch_on_startup_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
require "elasticsearch"
allow(Stud).to receive(:stoppable_sleep)

# Clean ES of data before we start.
# Clean OpenSearch of data before we start.
@es = get_client
@es.indices.delete_template(:name => "*")
@es.indices.delete(:index => "*")
Expand Down
2 changes: 1 addition & 1 deletion spec/integration/outputs/painless_update_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def get_es_output( options={} )
before :each do
@es = get_client
# Delete all templates first.
# Clean ES of data before we start.
# Clean OpenSearch of data before we start.
@es.indices.delete_template(:name => "*")
# This can fail if there are no indexes, ignore failure.
@es.indices.delete(:index => "*") rescue nil
Expand Down
2 changes: 1 addition & 1 deletion spec/integration/outputs/retry_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def mock_actions_with_response(*resp)
require "elasticsearch"
allow(Stud).to receive(:stoppable_sleep)

# Clean ES of data before we start.
# Clean OpenSearch of data before we start.
@es = get_client
@es.indices.delete_template(:name => "*")
@es.indices.delete(:index => "*")
Expand Down
2 changes: 1 addition & 1 deletion spec/integration/outputs/templates_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
# Delete all templates first.
require "elasticsearch"

# Clean ES of data before we start.
# Clean OpenSearch of data before we start.
@es = get_client
@es.indices.delete_template(:name => "*")

Expand Down
2 changes: 1 addition & 1 deletion spec/integration/outputs/update_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def get_es_output( options={} )
before :each do
@es = get_client
# Delete all templates first.
# Clean ES of data before we start.
# Clean OpenSearch of data before we start.
@es.indices.delete_template(:name => "*")
# This can fail if there are no indexes, ignore failure.
@es.indices.delete(:index => "*") rescue nil
Expand Down
2 changes: 1 addition & 1 deletion spec/unit/outputs/opensearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

allow(subject).to receive(:finish_register) # stub-out thread completion (to avoid error log entries)

# emulate 'successful' ES connection on the same thread
# emulate 'successful' OpenSearch connection on the same thread
allow(subject).to receive(:after_successful_connection) { |&block| block.call }.
and_return after_successful_connection_thread_mock
allow(subject).to receive(:stop_after_successful_connection_thread)
Expand Down