Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support TLS by socket/server plugin helper #1423

Merged
merged 17 commits into from
Jan 30, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
language: ruby
cache: bundler

# script: bundle exec rake test TESTOPTS=-v
script: bundle exec rake test TESTOPTS=-v

# http://rubies.travis-ci.org/
# See here for osx_image -> OSX versions: https://docs.travis-ci.com/user/languages/objective-c
Expand Down Expand Up @@ -43,6 +43,7 @@ branches:
- v0.14

sudo: false
dist: trusty # for TLSv1.2 support

addons:
apt:
Expand Down
14 changes: 14 additions & 0 deletions example/in_forward_tls.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<source>
@type forward
port 24224
<transport tls>
insecure true
</transport>
</source>

<match test>
@type stdout
# <buffer>
# flush_interval 10s
# </buffer>
</match>
18 changes: 18 additions & 0 deletions example/out_forward_tls.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<source>
@type dummy
tag test
</source>

<match test>
@type forward
transport tls
tls_insecure_mode true
<server>
# first server
host 127.0.0.1
port 24224
</server>
<buffer>
flush_interval 0
</buffer>
</match>
4 changes: 4 additions & 0 deletions lib/fluent/config/section.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ def [](key)
@params[key.to_sym]
end

def []=(key, value)
@params[key.to_sym] = value
end

def respond_to?(symbol, include_all=false)
case symbol
when :inspect, :nil?, :to_h, :+, :instance_of?, :kind_of?, :[], :respond_to?, :respond_to_missing?
Expand Down
36 changes: 31 additions & 5 deletions lib/fluent/configurable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,8 @@ def initialize
end
end

def configure(conf)
@config = conf

logger = self.respond_to?(:log) ? log : (defined?($log) ? $log : nil)
def configure_proxy_generate
proxy = self.class.merged_configure_proxy
conf.corresponding_proxies << proxy

if self.respond_to?(:owner) && self.owner
owner_proxy = owner.class.merged_configure_proxy
Expand All @@ -63,6 +59,36 @@ def configure(conf)
proxy.overwrite_defaults(owner_proxy) if owner_proxy
end

proxy
end

def configured_section_create(name, conf = nil)
conf ||= Fluent::Config::Element.new(name.to_s, '', {}, [])
root_proxy = configure_proxy_generate
proxy = if name.nil? # root
root_proxy
else
root_proxy.sections[name]
end
# take care to raise Fluent::ConfigError if conf mismatched to proxy
Fluent::Config::SectionGenerator.generate(proxy, conf, nil, nil)
end

def configure(conf)
@config = conf

logger = if self.respond_to?(:log)
self.log
elsif self.respond_to?(:owner) && self.owner.respond_to?(:log)
self.owner.log
elsif defined?($log)
$log
else
nil
end
proxy = configure_proxy_generate
conf.corresponding_proxies << proxy

# In the nested section, can't get plugin class through proxies so get plugin class here
plugin_class = Fluent::Plugin.lookup_type_from_class(proxy.name.to_s)
root = Fluent::Config::SectionGenerator.generate(proxy, conf, logger, plugin_class)
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/in_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def start

shared_socket = system_config.workers > 1

log.info "listening a tcp port", port: @port, bind: @bind
log.info "listening port", port: @port, bind: @bind
server_create_connection(
:in_forward_server, @port,
bind: @bind,
Expand Down
2 changes: 1 addition & 1 deletion lib/fluent/plugin/in_tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def start
super

@buffer = ''
server_create(:in_tcp_server, @port, proto: :tcp, bind: @bind) do |data, conn|
server_create(:in_tcp_server, @port, bind: @bind) do |data, conn|
@buffer << data
begin
pos = 0
Expand Down
96 changes: 81 additions & 15 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,17 @@ class ConnectionClosedError < Error; end

LISTEN_PORT = 24224

desc 'The transport protocol.'
config_param :transport, :enum, list: [:tcp, :tls], default: :tcp
# TODO: TLS session cache/tickets
# TODO: Connection keepalive

desc 'The timeout time when sending event logs.'
config_param :send_timeout, :time, default: 60
# TODO: add linger_timeout, recv_timeout

desc 'The transport protocol to use for heartbeats.(udp,tcp,none)'
config_param :heartbeat_type, :enum, list: [:tcp, :udp, :none], default: :tcp
desc 'The protocol to use for heartbeats (default is the same with "transport").'
config_param :heartbeat_type, :enum, list: [:transport, :tcp, :udp, :none], default: :transport
desc 'The interval of the heartbeat packer.'
config_param :heartbeat_interval, :time, default: 1
desc 'The wait time before accepting a server fault recovery.'
Expand Down Expand Up @@ -75,6 +80,19 @@ class ConnectionClosedError < Error; end
desc 'Compress buffered data.'
config_param :compress, :enum, list: [:text, :gzip], default: :text

desc 'The default version of TLS transport.'
config_param :tls_version, :enum, list: Fluent::PluginHelper::Socket::TLS_SUPPORTED_VERSIONS, default: Fluent::PluginHelper::Socket::TLS_DEFAULT_VERSION
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using tls_ prefix instead of <tls> is for avoiding confustion with <transport tls>?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you mentioned, these parameters are named in different way from servers' <transport> section.
I think that almost users just specify one or (at most) two parameters in these tls_ prefixed parameters (tls_cert_path, and sometimes tls_allow_self_signed_cert). Many users don't change the default TLS version nor TLS ciphers.
In such cases, tls_cert_path ... is much more simple than <tls> tls_cert_path ... </tls>.

desc 'The cipher configuration of TLS transport.'
config_param :tls_ciphers, :string, default: Fluent::PluginHelper::Socket::CIPHERS_DEFAULT
desc 'Skip all verification of certificates or not.'
config_param :tls_insecure_mode, :bool, default: false
desc 'Allow self signed certificates or not.'
config_param :tls_allow_self_signed_cert, :bool, default: false
desc 'Verify hostname of servers and certificates or not in TLS transport.'
config_param :tls_verify_hostname, :bool, default: true
desc 'The additional CA certificate path for TLS.'
config_param :tls_cert_path, :array, value_type: :string, default: nil

config_section :security, required: false, multi: false do
desc 'The hostname'
config_param :self_hostname, :string
Expand All @@ -85,7 +103,7 @@ class ConnectionClosedError < Error; end
config_section :server, param_name: :servers do
desc "The IP address or host name of the server."
config_param :host, :string
desc "The name of the server. Used in log messages."
desc "The name of the server. Used for logging and certificate verification in TLS transport (when host is address)."
config_param :name, :string, default: nil
desc "The port number of the host."
config_param :port, :integer, default: LISTEN_PORT
Expand Down Expand Up @@ -136,9 +154,29 @@ def configure(conf)
@read_interval = @read_interval_msec / 1000.0
@recover_sample_size = @recover_wait / @heartbeat_interval

if @heartbeat_type == :tcp
log.warn "'heartbeat_type tcp' is deprecated. use 'transport' instead."
@heartbeat_type = :transport
end

if @dns_round_robin
if @heartbeat_type == :udp
raise Fluent::ConfigError, "forward output heartbeat type must be 'tcp' or 'none' to use dns_round_robin option"
raise Fluent::ConfigError, "forward output heartbeat type must be 'transport' or 'none' to use dns_round_robin option"
end
end

if @transport == :tls
if @tls_cert_path && !@tls_cert_path.empty?
@tls_cert_path.each do |path|
raise Fluent::ConfigError, "specified cert path does not exist:#{path}" unless File.exist?(path)
raise Fluent::ConfigError, "specified cert path is not readable:#{path}" unless File.readable?(path)
end
end

if @tls_insecure_mode
log.warn "TLS transport is configured in insecure way"
@tls_verify_hostname = false
@tls_allow_self_signed_cert = true
end
end

Expand Down Expand Up @@ -275,14 +313,34 @@ def select_a_healthy_node
raise NoNodesAvailable, "no nodes are available"
end

def create_transfer_socket(host, port, &block)
socket_create_tcp(
host, port,
linger_timeout: @send_timeout,
send_timeout: @send_timeout,
recv_timeout: @ack_response_timeout,
&block
)
def create_transfer_socket(host, port, hostname, &block)
case @transport
when :tls
socket_create_tls(
host, port,
version: @tls_version,
ciphers: @tls_ciphers,
insecure: @tls_insecure_mode,
verify_fqdn: @tls_verify_hostname,
fqdn: hostname,
allow_self_signed_cert: @tls_allow_self_signed_cert,
cert_paths: @tls_cert_path,
linger_timeout: @send_timeout,
send_timeout: @send_timeout,
recv_timeout: @ack_response_timeout,
&block
)
when :tcp
socket_create_tcp(
host, port,
linger_timeout: @send_timeout,
send_timeout: @send_timeout,
recv_timeout: @ack_response_timeout,
&block
)
else
raise "BUG: unknown transport protocol #{@transport}"
end
end

# MessagePack FixArray length is 3
Expand Down Expand Up @@ -458,6 +516,14 @@ def initialize(sender, server, failure:)
@available = true
@state = nil

# @hostname is used for certificate verification & TLS SNI
host_is_hostname = !(IPAddr.new(@host) rescue false)
@hostname = case
when host_is_hostname then @host
when @name then @name
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using @name is correct way for certificate verification?
<server>name xxx</server> is not limitation for now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I changed the meaning of name in <server> section.
But in past versions, this name is used to indicate the role (or other human readable thing) of this server node in logging.
The "human readable role" is just FQDN server name signed by server certificate, isn't it?

else nil
end

@usock = nil

@username = server.username
Expand Down Expand Up @@ -557,7 +623,7 @@ def send_data_actual(sock, tag, chunk)
end

def send_data(tag, chunk)
sock = @sender.create_transfer_socket(resolved_host, port)
sock = @sender.create_transfer_socket(resolved_host, port, @hostname)
begin
send_data_actual(sock, tag, chunk)
rescue
Expand Down Expand Up @@ -589,8 +655,8 @@ def send_heartbeat
end

case @sender.heartbeat_type
when :tcp
@sender.create_transfer_socket(dest_addr, port) do |sock|
when :transport
@sender.create_transfer_socket(dest_addr, port, @hostname) do |sock|
## don't send any data to not cause a compatibility problem
# sock.write FORWARD_TCP_HEARTBEAT_DATA

Expand Down
Loading