diff --git a/.gitignore b/.gitignore index 842eb0fc..6ae77e3d 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,4 @@ tmp vendor/ synapse.jar +.ruby-version diff --git a/.travis.yml b/.travis.yml index ca11f3a5..bce1c9ba 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,5 +1,8 @@ language: ruby cache: bundler +sudo: false rvm: - 1.9.3 - + - 2.0.0 + - 2.1.6 + - 2.2.2 diff --git a/Gemfile.lock b/Gemfile.lock index 694406f1..9452160f 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - synapse (0.11.1) + synapse (0.12.1) aws-sdk (~> 1.39) docker-api (~> 1.7.2) zk (~> 1.9.4) @@ -11,31 +11,31 @@ GEM specs: addressable (2.3.6) archive-tar-minitar (0.5.2) - aws-sdk (1.47.0) + aws-sdk (1.64.0) + aws-sdk-v1 (= 1.64.0) + aws-sdk-v1 (1.64.0) json (~> 1.4) nokogiri (>= 1.4.4) coderay (1.0.9) crack (0.4.2) safe_yaml (~> 1.0.0) - diff-lcs (1.2.4) + diff-lcs (1.2.5) docker-api (1.7.6) archive-tar-minitar excon (>= 0.28) json - excon (0.38.0) + excon (0.45.4) ffi (1.9.3-java) - json (1.8.1) - json (1.8.1-java) + json (1.8.3) little-plugger (1.1.3) logging (1.8.2) little-plugger (>= 1.1.3) multi_json (>= 1.8.4) method_source (0.8.2) - mini_portile (0.6.0) - multi_json (1.10.1) - nokogiri (1.6.2.1) - mini_portile (= 0.6.0) - nokogiri (1.6.2.1-java) + mini_portile (0.6.2) + multi_json (1.11.2) + nokogiri (1.6.6.2) + mini_portile (~> 0.6.0) pry (0.9.12.2) coderay (~> 1.0.5) method_source (~> 0.8) @@ -48,30 +48,29 @@ GEM pry-nav (0.2.3) pry (~> 0.9.10) rake (10.1.1) - rspec (2.14.1) - rspec-core (~> 2.14.0) - rspec-expectations (~> 2.14.0) - rspec-mocks (~> 2.14.0) - rspec-core (2.14.5) - rspec-expectations (2.14.2) - diff-lcs (>= 1.1.3, < 2.0) - rspec-mocks (2.14.3) + rspec (3.1.0) + rspec-core (~> 3.1.0) + rspec-expectations (~> 3.1.0) + rspec-mocks (~> 3.1.0) + rspec-core (3.1.7) + rspec-support (~> 3.1.0) + rspec-expectations (3.1.2) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.1.0) + rspec-mocks (3.1.3) + rspec-support (~> 3.1.0) + rspec-support (3.1.2) safe_yaml (1.0.3) slop (3.4.6) - slyphon-log4j (1.2.15) - slyphon-zookeeper_jar (3.3.5-java) spoon (0.0.4) ffi webmock (1.18.0) addressable (>= 2.3.6) crack (>= 0.3.2) - zk (1.9.4) + zk (1.9.5) logging (~> 1.8.2) zookeeper (~> 1.4.0) - zookeeper (1.4.8) - zookeeper (1.4.8-java) - slyphon-log4j (= 1.2.15) - slyphon-zookeeper_jar (= 3.3.5) + zookeeper (1.4.10) PLATFORMS java @@ -81,6 +80,9 @@ DEPENDENCIES pry pry-nav rake - rspec + rspec (~> 3.1.0) synapse! webmock + +BUNDLED WITH + 1.10.5 diff --git a/README.md b/README.md index 62c0492b..6fc933e4 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ In an environment like Amazon's EC2, all of the available workarounds are subopt * Round-robin DNS: Slow to converge, and doesn't work when applications cache DNS lookups (which is frequent) * Elastic IPs: slow to converge, limited in number, public-facing-only, which makes them less useful for internal services -* ELB: Again, public-facing only, and only useful for HTTP +* ELB: ultimately uses DNS (see above), can't tune load balancing, have to launch a new one for every service * region, autoscaling doesn't happen fast enough One solution to this problem is a discovery service, like [Apache Zookeeper](http://zookeeper.apache.org/). However, Zookeeper and similar services have their own problems: @@ -92,38 +92,50 @@ HAProxy will be transparently reloaded, and your application will keep running w ## Installation -Add this line to your application's Gemfile: +To download and run the synapse binary, first install a version of ruby. Then, +install synapse with: - gem 'synapse' - -And then execute: +```bash +$ mkdir -p /opt/smartstack/synapse +# If you are on Ruby 2.X use --no-document instead of --no-ri --no-rdoc +$ gem install synapse --install-dir /opt/smartstack/synapse --no-ri --no-rdoc +``` - $ bundle +This will download synapse and its dependencies into /opt/smartstack/synapse. You +might wish to omit the `--install-dir` flag to use your system's default gem +path, however this will require you to run `gem install synapse` with root +permissions. -Or install it yourself as: +You can now run the synapse binary like: - $ gem install synapse - +```bash +export GEM_PATH=/opt/smartstack/synapse +/opt/smartstack/synapse/bin/synapse --help +``` -Don't forget to install HAProxy prior to installing Synapse. +Don't forget to install HAProxy too. ## Configuration ## Synapse depends on a single config file in JSON format; it's usually called `synapse.conf.json`. -The file has two main sections. -The first is the `services` section, which lists the services you'd like to connect. -The second is the `haproxy` section, which specifies how to configure and interact with HAProxy. +The file has three main sections. + +1. [`services`](#services): lists the services you'd like to connect. +2. [`haproxy`](#haproxy): specifies how to configure and interact with HAProxy. +3. [`file_output`](#file) (optional): specifies where to write service state to on the filesystem. + ### Configuring a Service ### -The services are a hash, where the keys are the `name` of the service to be configured. +The `services` section is a hash, where the keys are the `name` of the service to be configured. The name is just a human-readable string; it will be used in logs and notifications. Each value in the services hash is also a hash, and should contain the following keys: -* `discovery`: how synapse will discover hosts providing this service (see below) +* [`discovery`](#discovery): how synapse will discover hosts providing this service (see below) * `default_servers`: the list of default servers providing this service; synapse uses these if no others can be discovered -* `haproxy`: how will the haproxy section for this service be configured +* [`haproxy`](#haproxysvc): how will the haproxy section for this service be configured + #### Service Discovery #### We've included a number of `watchers` which provide service discovery. @@ -209,6 +221,11 @@ If you do not list any default servers, no proxy will be created. The `default_servers` will also be used in addition to discovered servers if the `keep_default_servers` option is set. +If you do not list any `default_servers`, and all backends for a service +disappear then the previous known backends will be used. Disable this behavior +by unsetting `use_previous_backends`. + + #### The `haproxy` Section #### This section is its own hash, which should contain the following keys: @@ -218,30 +235,53 @@ This section is its own hash, which should contain the following keys: * `server_options`: the haproxy options for each `server` line of the service in HAProxy config; it may be left out. * `frontend`: additional lines passed to the HAProxy config in the `frontend` stanza of this service * `backend`: additional lines passed to the HAProxy config in the `backend` stanza of this service +* `backend_name`: The name of the generated HAProxy backend for this service + (defaults to the service's key in the `services` section) * `listen`: these lines will be parsed and placed in the correct `frontend`/`backend` section as applicable; you can put lines which are the same for the frontend and backend here. * `shared_frontend`: optional: haproxy configuration directives for a shared http frontend (see below) + ### Configuring HAProxy ### -The `haproxy` section of the config file has the following options: +The top level `haproxy` section of the config file has the following options: * `reload_command`: the command Synapse will run to reload HAProxy * `config_file_path`: where Synapse will write the HAProxy config file * `do_writes`: whether or not the config file will be written (default to `true`) * `do_reloads`: whether or not Synapse will reload HAProxy (default to `true`) +* `do_socket`: whether or not Synapse will use the HAProxy socket commands to prevent reloads (default to `true`) * `global`: options listed here will be written into the `global` section of the HAProxy config * `defaults`: options listed here will be written into the `defaults` section of the HAProxy config * `extra_sections`: additional, manually-configured `frontend`, `backend`, or `listen` stanzas * `bind_address`: force HAProxy to listen on this address (default is localhost) -* `shared_fronted`: (OPTIONAL) additional lines passed to the HAProxy config used to configure a shared HTTP frontend (see below) +* `shared_frontend`: (OPTIONAL) additional lines passed to the HAProxy config used to configure a shared HTTP frontend (see below) +* `restart_interval`: number of seconds to wait between restarts of haproxy (default: 2) +* `restart_jitter`: percentage, expressed as a float, of jitter to multiply the `restart_interval` by when determining the next + restart time. Use this to help prevent healthcheck storms when HAProxy restarts. (default: 0.0) +* `state_file_path`: full path on disk (e.g. /tmp/synapse/state.json) for caching haproxy state between reloads. + If provided, synapse will store recently seen backends at this location and can "remember" backends across both synapse and + HAProxy restarts. Any backends that are "down" in the reporter but listed in the cache will be put into HAProxy disabled (default: nil) +* `state_file_ttl`: the number of seconds that backends should be kept in the state file cache. + This only applies if `state_file_path` is provided (default: 86400) Note that a non-default `bind_address` can be dangerous. If you configure an `address:port` combination that is already in use on the system, haproxy will fail to start. + +### Configuring `file_output` ### + +This section controls whether or not synapse will write out service state +to the filesystem in json format. This can be used for services that want to +use discovery information but not go through HAProxy. + +* `output_directory`: the path to a directory on disk that service registrations +should be written to. + + ### HAProxy shared HTTP Frontend ### For HTTP-only services, it is not always necessary or desirable to dedicate a TCP port per service, since HAProxy can route traffic based on host headers. -To support this, the optional `shared_fronted` section can be added to both the `haproxy` section and each indvidual service definition. +To support this, the optional `shared_frontend` section can be added to both the `haproxy` section and each indvidual service definition. Synapse will concatenate them all into a single frontend section in the generated haproxy.cfg file. Note that synapse does not assemble the routing ACLs for you; you have to do that yourself based on your needs. This is probably most useful in combination with the `service_conf_dir` directive in a case where the individual service config files are being distributed by a configuration manager such as puppet or chef, or bundled into service packages. @@ -249,7 +289,8 @@ For example: ```yaml haproxy: - shared_frontend: "bind 127.0.0.1:8081" + shared_frontend: + - "bind 127.0.0.1:8081" reload_command: "service haproxy reload" config_file_path: "/etc/haproxy/haproxy.cfg" socket_file_path: "/var/run/haproxy.sock" @@ -268,7 +309,8 @@ For example: discovery: method: "zookeeper" path: "/nerve/services/service1" - hosts: "0.zookeeper.example.com:2181" + hosts: + - "0.zookeeper.example.com:2181" haproxy: server_options: "check inter 2s rise 3 fall 2" shared_frontend: @@ -287,7 +329,8 @@ For example: shared_frontend: - "acl is_service1 hdr_dom(host) -i service2.lb.example.com" - "use_backend service2 if is_service2 - backend: "mode http" + backend: + - "mode http" ``` @@ -322,29 +365,5 @@ Non-HTTP backends such as MySQL or RabbitMQ will obviously continue to need thei ### Creating a Service Watcher ### -If you'd like to create a new service watcher: - -1. Create a file for your watcher in `service_watcher` dir -2. Use the following template: -```ruby -require 'synapse/service_watcher/base' - -module Synapse - class NewWatcher < BaseWatcher - def start - # write code which begins running service discovery - end - - private - def validate_discovery_opts - # here, validate any required options in @discovery - end - end -end -``` - -3. Implement the `start` and `validate_discovery_opts` methods -4. Implement whatever additional methods your discovery requires - -When your watcher detects a list of new backends, they should be written to `@backends`. -You should then call `@synapse.configure` to force synapse to update the HAProxy config. +See the Service Watcher [README](lib/synapse/service_watcher/README.md) for +how to add new Service Watchers. diff --git a/lib/synapse.rb b/lib/synapse.rb index 0b18fd00..af85ef04 100644 --- a/lib/synapse.rb +++ b/lib/synapse.rb @@ -1,25 +1,34 @@ +require 'logger' +require 'json' + require "synapse/version" -require "synapse/service_watcher/base" +require "synapse/log" require "synapse/haproxy" +require "synapse/file_output" require "synapse/service_watcher" -require "synapse/log" - -require 'logger' -require 'json' -include Synapse module Synapse class Synapse + include Logging + def initialize(opts={}) # create the service watchers for all our services raise "specify a list of services to connect in the config" unless opts.has_key?('services') @service_watchers = create_service_watchers(opts['services']) - # create the haproxy object + # create objects that need to be notified of service changes + @config_generators = [] + # create the haproxy config generator, this is mandatory raise "haproxy config section is missing" unless opts.has_key?('haproxy') - @haproxy = Haproxy.new(opts['haproxy']) + @config_generators << Haproxy.new(opts['haproxy']) + + # possibly create a file manifestation for services that do not + # want to communicate via haproxy, e.g. cassandra + if opts.has_key?('file_output') + @config_generators << FileOutput.new(opts['file_output']) + end # configuration is initially enabled to configure on first loop @config_updated = true @@ -47,10 +56,15 @@ def run if @config_updated @config_updated = false - log.info "synapse: regenerating haproxy config" - @haproxy.update_config(@service_watchers) - else - sleep 1 + @config_generators.each do |config_generator| + log.info "synapse: configuring #{config_generator.name}" + config_generator.update_config(@service_watchers) + end + end + + sleep 1 + @config_generators.each do |config_generator| + config_generator.tick(@service_watchers) end loops += 1 diff --git a/lib/synapse/file_output.rb b/lib/synapse/file_output.rb new file mode 100644 index 00000000..b26846fb --- /dev/null +++ b/lib/synapse/file_output.rb @@ -0,0 +1,56 @@ +require 'fileutils' +require 'tempfile' + +module Synapse + class FileOutput + include Logging + attr_reader :opts, :name + + def initialize(opts) + unless opts.has_key?("output_directory") + raise ArgumentError, "flat file generation requires an output_directory key" + end + + begin + FileUtils.mkdir_p(opts['output_directory']) + rescue SystemCallError => err + raise ArgumentError, "provided output directory #{opts['output_directory']} is not present or creatable" + end + + @opts = opts + @name = 'file_output' + end + + def tick(watchers) + end + + def update_config(watchers) + watchers.each do |watcher| + write_backends_to_file(watcher.name, watcher.backends) + end + end + + def write_backends_to_file(service_name, new_backends) + data_path = File.join(@opts['output_directory'], "#{service_name}.json") + begin + old_backends = JSON.load(File.read(data_path)) + rescue Errno::ENOENT + old_backends = nil + end + + if old_backends == new_backends + # Prevent modifying the file unless something has actually changed + # This way clients can set watches on this file and update their + # internal state only when the smartstack state has actually changed + return false + else + # Atomically write new sevice configuration file + temp_path = File.join(@opts['output_directory'], + ".#{service_name}.json.tmp") + File.open(temp_path, 'w', 0644) {|f| f.write(new_backends.to_json)} + FileUtils.mv(temp_path, data_path) + return true + end + end + end +end diff --git a/lib/synapse/haproxy.rb b/lib/synapse/haproxy.rb index 2093bb99..2d2d577a 100644 --- a/lib/synapse/haproxy.rb +++ b/lib/synapse/haproxy.rb @@ -1,10 +1,11 @@ -require 'synapse/log' +require 'fileutils' +require 'json' require 'socket' module Synapse class Haproxy include Logging - attr_reader :opts + attr_reader :opts, :name # these come from the documentation for haproxy 1.5 # http://haproxy.1wt.eu/download/1.5/doc/configuration.txt @@ -43,6 +44,7 @@ class Haproxy "option abortonclose", "option accept-invalid-http-response", "option allbackups", + "option allredisp", "option checkcache", "option forceclose", "option forwardfor", @@ -173,6 +175,7 @@ class Haproxy "option accept-invalid-http-request", "option accept-invalid-http-response", "option allbackups", + "option allredisp", "option checkcache", "option clitcpka", "option contstats", @@ -386,6 +389,7 @@ class Haproxy "option accept-invalid-http-request", "option accept-invalid-http-response", "option allbackups", + "option allredisp", "option checkcache", "option clitcpka", "option contstats", @@ -523,20 +527,53 @@ def initialize(opts) end @opts = opts + @name = 'haproxy' + + @opts['do_writes'] = true unless @opts.key?('do_writes') + @opts['do_socket'] = true unless @opts.key?('do_socket') + @opts['do_reloads'] = true unless @opts.key?('do_reloads') # how to restart haproxy - @restart_interval = 2 + @restart_interval = @opts.fetch('restart_interval', 2).to_i + @restart_jitter = @opts.fetch('restart_jitter', 0).to_f @restart_required = true - @last_restart = Time.new(0) + + # virtual clock bookkeeping for controlling how often haproxy restarts + @time = 0 + @next_restart = @time # a place to store the parsed haproxy config from each watcher @watcher_configs = {} + + @state_file_path = @opts['state_file_path'] + @state_file_ttl = @opts.fetch('state_file_ttl', 60 * 60 * 24).to_i + @seen = {} + + unless @state_file_path.nil? + begin + @seen = JSON.load(File.read(@state_file_path)) + rescue StandardError => e + # It's ok if the state file doesn't exist + end + end + end + + def tick(watchers) + if @time % 60 == 0 && !@state_file_path.nil? + update_state_file(watchers) + end + + @time += 1 + + # We potentially have to restart if the restart was rate limited + # in the original call to update_config + restart if @opts['do_reloads'] && @restart_required end def update_config(watchers) # if we support updating backends, try that whenever possible if @opts['do_socket'] - update_backends(watchers) unless @restart_required + update_backends(watchers) else @restart_required = true end @@ -650,23 +687,50 @@ def generate_frontend_stanza(watcher, config) "\nfrontend #{watcher.name}", config.map {|c| "\t#{c}"}, "\tbind #{@opts['bind_address'] || 'localhost'}:#{watcher.haproxy['port']}", - "\tdefault_backend #{watcher.name}" + "\tdefault_backend #{watcher.haproxy.fetch('backend_name', watcher.name)}" ] end def generate_backend_stanza(watcher, config) + backends = {} + + # The ordering here is important. First we add all the backends in the + # disabled state... + @seen.fetch(watcher.name, []).each do |backend_name, backend| + backends[backend_name] = backend.merge('enabled' => false) + end + + # ... and then we overwite any backends that the watchers know about, + # setting the enabled state. + watcher.backends.each do |backend| + backend_name = construct_name(backend) + # If we have information in the state file that allows us to detect + # server option changes, use that to potentially force a restart + if backends.has_key?(backend_name) + old_backend = backends[backend_name] + if (old_backend.fetch('haproxy_server_options', "") != + backend.fetch('haproxy_server_options', "")) + log.info "synapse: restart required because haproxy_server_options changed for #{backend_name}" + @restart_required = true + end + end + backends[backend_name] = backend.merge('enabled' => true) + end + if watcher.backends.empty? - log.warn "synapse: no backends found for watcher #{watcher.name}" + log.debug "synapse: no backends found for watcher #{watcher.name}" end stanza = [ - "\nbackend #{watcher.name}", + "\nbackend #{watcher.haproxy.fetch('backend_name', watcher.name)}", config.map {|c| "\t#{c}"}, - watcher.backends.shuffle.map {|backend| - backend_name = construct_name(backend) + backends.keys.shuffle.map {|backend_name| + backend = backends[backend_name] b = "\tserver #{backend_name} #{backend['host']}:#{backend['port']}" b = "#{b} cookie #{backend_name}" unless config.include?('mode tcp') - b = "#{b} #{watcher.haproxy['server_options']}" + b = "#{b} #{watcher.haproxy['server_options']}" if watcher.haproxy['server_options'] + b = "#{b} #{backend['haproxy_server_options']}" if backend['haproxy_server_options'] + b = "#{b} disabled" unless backend['enabled'] b } ] end @@ -704,27 +768,26 @@ def update_backends(watchers) next if watcher.backends.empty? unless cur_backends.include? watcher.name - log.debug "synapse: restart required because we added new section #{watcher.name}" + log.info "synapse: restart required because we added new section #{watcher.name}" @restart_required = true - return + next end watcher.backends.each do |backend| backend_name = construct_name(backend) - unless cur_backends[watcher.name].include? backend_name - log.debug "synapse: restart required because we have a new backend #{watcher.name}/#{backend_name}" + if cur_backends[watcher.name].include? backend_name + enabled_backends[watcher.name] << backend_name + else + log.info "synapse: restart required because we have a new backend #{watcher.name}/#{backend_name}" @restart_required = true - return end - - enabled_backends[watcher.name] << backend_name end end # actually enable the enabled backends, and disable the disabled ones cur_backends.each do |section, backends| backends.each do |backend| - if enabled_backends[section].include? backend + if enabled_backends.fetch(section, []).include? backend command = "enable server #{section}/#{backend}\n" else command = "disable server #{section}/#{backend}\n" @@ -738,12 +801,10 @@ def update_backends(watchers) rescue StandardError => e log.warn "synapse: unknown error writing to socket" @restart_required = true - return else unless output == "\n" log.warn "synapse: socket command #{command} failed: #{output}" @restart_required = true - return end end end @@ -769,18 +830,24 @@ def write_config(new_config) end end - # restarts haproxy + # restarts haproxy if the time is right def restart - # sleep if we restarted too recently - delay = (@last_restart - Time.now) + @restart_interval - sleep(delay) if delay > 0 + if @time < @next_restart + log.info "synapse: at time #{@time} waiting until #{@next_restart} to restart" + return + end + + @next_restart = @time + @restart_interval + @next_restart += rand(@restart_jitter * @restart_interval + 1) # do the actual restart res = `#{opts['reload_command']}`.chomp - raise "failed to reload haproxy via #{opts['reload_command']}: #{res}" unless $?.success? + unless $?.success? + log.error "failed to reload haproxy via #{opts['reload_command']}: #{res}" + return + end log.info "synapse: restarted haproxy" - @last_restart = Time.now() @restart_required = false end @@ -793,5 +860,43 @@ def construct_name(backend) return name end + + def update_state_file(watchers) + log.info "synapse: writing state file" + + timestamp = Time.now.to_i + + # Remove stale backends + @seen.each do |watcher_name, backends| + backends.each do |backend_name, backend| + ts = backend.fetch('timestamp', 0) + delta = (timestamp - ts).abs + if delta > @state_file_ttl + log.info "synapse: expiring #{backend_name} with age #{delta}" + backends.delete(backend_name) + end + end + end + + # Remove any services which no longer have any backends + @seen = @seen.reject{|watcher_name, backends| backends.keys.length == 0} + + # Add backends from watchers + watchers.each do |watcher| + unless @seen.key?(watcher.name) + @seen[watcher.name] = {} + end + + watcher.backends.each do |backend| + backend_name = construct_name(backend) + @seen[watcher.name][backend_name] = backend.merge('timestamp' => timestamp) + end + end + + # Atomically write new state file + tmp_state_file_path = @state_file_path + ".tmp" + File.write(tmp_state_file_path, JSON.pretty_generate(@seen)) + FileUtils.mv(tmp_state_file_path, @state_file_path) + end end end diff --git a/lib/synapse/service_watcher.rb b/lib/synapse/service_watcher.rb index 6404b187..44227465 100644 --- a/lib/synapse/service_watcher.rb +++ b/lib/synapse/service_watcher.rb @@ -1,24 +1,8 @@ +require "synapse/log" require "synapse/service_watcher/base" -require "synapse/service_watcher/zookeeper" -require "synapse/service_watcher/ec2tag" -require "synapse/service_watcher/dns" -require "synapse/service_watcher/docker" -require "synapse/service_watcher/zookeeper_dns" -require "synapse/service_watcher/marathon" module Synapse class ServiceWatcher - - @watchers = { - 'base' => BaseWatcher, - 'zookeeper' => ZookeeperWatcher, - 'ec2tag' => EC2Watcher, - 'dns' => DnsWatcher, - 'docker' => DockerWatcher, - 'zookeeper_dns' => ZookeeperDnsWatcher, - 'marathon' => MarathonWatcher, - } - # the method which actually dispatches watcher creation requests def self.create(name, opts, synapse) opts['name'] = name @@ -27,10 +11,16 @@ def self.create(name, opts, synapse) unless opts.has_key?('discovery') && opts['discovery'].has_key?('method') discovery_method = opts['discovery']['method'] - raise ArgumentError, "Invalid discovery method #{discovery_method}" \ - unless @watchers.has_key?(discovery_method) - - return @watchers[discovery_method].new(opts, synapse) + watcher = begin + method = discovery_method.downcase + require "synapse/service_watcher/#{method}" + # zookeeper_dns => ZookeeperDnsWatcher, ec2tag => Ec2tagWatcher, etc ... + method_class = method.split('_').map{|x| x.capitalize}.join.concat('Watcher') + self.const_get("#{method_class}") + rescue Exception => e + raise ArgumentError, "Specified a discovery method of #{discovery_method}, which could not be found: #{e}" + end + return watcher.new(opts, synapse) end end end diff --git a/lib/synapse/service_watcher/README.md b/lib/synapse/service_watcher/README.md new file mode 100644 index 00000000..8c446521 --- /dev/null +++ b/lib/synapse/service_watcher/README.md @@ -0,0 +1,84 @@ +## Watcher Classes + +Watchers are the piece of Synapse that watch an external service registry +and reflect those changes in the local HAProxy state. Watchers should conform +to the interface specified by `BaseWatcher` and when your watcher has received +an update from the service registry you should call +`set_backends(new_backends)` to trigger a sync of your watcher state with local +HAProxy state. See the [`Backend Interface`](#backend_interface) section for +what service registrations Synapse understands. + +```ruby +require "synapse/service_watcher/base" + +class Synapse::ServiceWatcher + class MyWatcher < BaseWatcher + def start + # write code which begins running service discovery + end + + def stop + # write code which tears down the service discovery + end + + def ping? + # write code to check in on the health of the watcher + end + + private + def validate_discovery_opts + # here, validate any required options in @discovery + end + + ... setup watches, poll, etc ... and call set_backends when you have new + ... backends to set + + end +end +``` + +### Watcher Plugin Inteface +Synapse deduces both the class path and class name from the `method` key within +the watcher configuration. Every watcher is passed configuration with the +`method` key, e.g. `zookeeper` or `ec2tag`. + +#### Class Location +Synapse expects to find your class at `synapse/service_watcher/#{method}`. You +must make your watcher available at that path, and Synapse can "just work" and +find it. + +#### Class Name +These method strings are then transformed into class names via the following +function: + +``` +method_class = method.split('_').map{|x| x.capitalize}.join.concat('Watcher') +``` + +This has the effect of taking the method, splitting on '_', capitalizing each +part and recombining with an added 'Watcher' on the end. So `zookeeper_dns` +becomes `ZookeeperDnsWatcher`, and `zookeeper` becomes `Zookeeper`. Make sure +your class name is correct. + + +### Backend interface +Synapse understands the following fields in service backends (which are pulled +from the service registries): + +`host` (string): The hostname of the service instance + +`port` (integer): The port running the service on `host` + +`name` (string, optional): The human readable name to refer to this service instance by + +`weight` (float, optional): The weight that this backend should get when load +balancing to this service instance. Full support for updating HAProxy based on +this is still a WIP. + +`haproxy_server_options` (string, optional): Any haproxy server options +specific to this particular server. They will be applied to the generated +`server` line in the HAProxy configuration. If you want Synapse to react to +changes in these lines you will need to enable the `state_file_path` option +in the main synapse configuration. In general the HAProxy backend level +`haproxy.server_options` setting is preferred to setting this per server +in your backends. diff --git a/lib/synapse/service_watcher/base.rb b/lib/synapse/service_watcher/base.rb index 3d4b0d48..77aa0035 100644 --- a/lib/synapse/service_watcher/base.rb +++ b/lib/synapse/service_watcher/base.rb @@ -1,8 +1,9 @@ require 'synapse/log' +require 'set' -module Synapse +class Synapse::ServiceWatcher class BaseWatcher - include Logging + include Synapse::Logging LEADER_WARN_INTERVAL = 30 @@ -42,6 +43,10 @@ def initialize(opts={}, synapse) @keep_default_servers = opts['keep_default_servers'] || false + # If there are no default servers and a watcher reports no backends, then + # use the previous backends that we already know about. + @use_previous_backends = opts.fetch('use_previous_backends', true) + # set a flag used to tell the watchers to exit # this is not used in every watcher @should_exit = false @@ -95,13 +100,46 @@ def validate_discovery_opts end def set_backends(new_backends) - if @keep_default_servers - @backends = @default_servers + new_backends + # Aggregate and deduplicate all potential backend service instances. + new_backends = (new_backends + @default_servers) if @keep_default_servers + new_backends = new_backends.uniq {|b| + [b['host'], b['port'], b.fetch('name', '')] + } + + if new_backends.to_set == @backends.to_set + return false + end + + if new_backends.empty? + if @default_servers.empty? + if @use_previous_backends + # Discard this update + log.warn "synapse: no backends for service #{@name} and no default" \ + " servers for service #{@name}; using previous backends: #{@backends.inspect}" + return false + else + log.warn "synapse: no backends for service #{@name}, no default" \ + " servers for service #{@name} and 'use_previous_backends' is disabled;" \ + " dropping all backends" + @backends.clear + end + else + log.warn "synapse: no backends for service #{@name};" \ + " using default servers: #{@default_servers.inspect}" + @backends = @default_servers + end else + log.info "synapse: discovered #{new_backends.length} backends for service #{@name}" @backends = new_backends end + + reconfigure! + + return true end + # Subclasses should not invoke this directly; it's only exposed so that it + # can be overridden in subclasses. def reconfigure! @synapse.reconfigure! end diff --git a/lib/synapse/service_watcher/dns.rb b/lib/synapse/service_watcher/dns.rb index d59c6971..898b866b 100644 --- a/lib/synapse/service_watcher/dns.rb +++ b/lib/synapse/service_watcher/dns.rb @@ -3,7 +3,7 @@ require 'thread' require 'resolv' -module Synapse +class Synapse::ServiceWatcher class DnsWatcher < BaseWatcher def start @check_interval = @discovery['check_interval'] || 30.0 @@ -89,21 +89,7 @@ def configure_backends(servers) end end - if new_backends.empty? - if @default_servers.empty? - log.warn "synapse: no backends and no default servers for service #{@name};" \ - " using previous backends: #{@backends.inspect}" - else - log.warn "synapse: no backends for service #{@name};" \ - " using default servers: #{@default_servers.inspect}" - @backends = @default_servers - end - else - log.info "synapse: discovered #{new_backends.length} backends for service #{@name}" - set_backends(new_backends) - end - - reconfigure! + set_backends(new_backends) end end end diff --git a/lib/synapse/service_watcher/docker.rb b/lib/synapse/service_watcher/docker.rb index 75c95434..0b967fd8 100644 --- a/lib/synapse/service_watcher/docker.rb +++ b/lib/synapse/service_watcher/docker.rb @@ -1,7 +1,7 @@ require "synapse/service_watcher/base" require 'docker' -module Synapse +class Synapse::ServiceWatcher class DockerWatcher < BaseWatcher def start @check_interval = @discovery['check_interval'] || 15.0 @@ -23,16 +23,10 @@ def validate_discovery_opts end def watch - last_containers = [] until @should_exit begin start = Time.now - current_containers = containers - unless last_containers == current_containers - last_containers = current_containers - configure_backends(last_containers) - end - + set_backends(containers) sleep_until_next_check(start) rescue Exception => e log.warn "synapse: error in watcher thread: #{e.inspect}" @@ -98,23 +92,5 @@ def containers log.warn "synapse: error while polling for containers: #{e.inspect}" [] end - - def configure_backends(new_backends) - if new_backends.empty? - if @default_servers.empty? - log.warn "synapse: no backends and no default servers for service #{@name};" \ - " using previous backends: #{@backends.inspect}" - else - log.warn "synapse: no backends for service #{@name};" \ - " using default servers: #{@default_servers.inspect}" - @backends = @default_servers - end - else - log.info "synapse: discovered #{new_backends.length} backends for service #{@name}" - set_backends(new_backends) - end - reconfigure! - end - end end diff --git a/lib/synapse/service_watcher/ec2tag.rb b/lib/synapse/service_watcher/ec2tag.rb index a3319c26..9df77490 100644 --- a/lib/synapse/service_watcher/ec2tag.rb +++ b/lib/synapse/service_watcher/ec2tag.rb @@ -1,8 +1,8 @@ require 'synapse/service_watcher/base' require 'aws-sdk' -module Synapse - class EC2Watcher < BaseWatcher +class Synapse::ServiceWatcher + class Ec2tagWatcher < BaseWatcher attr_reader :check_interval @@ -41,37 +41,33 @@ def validate_discovery_opts "Missing server_port_override for service #{@name} - which port are backends listening on?" end - unless @haproxy['server_port_override'].match(/^\d+$/) + unless @haproxy['server_port_override'].to_s.match(/^\d+$/) raise ArgumentError, "Invalid server_port_override value" end - # Required, but can use well-known environment variables. - %w[aws_access_key_id aws_secret_access_key aws_region].each do |attr| - unless (@discovery[attr] || ENV[attr.upcase]) - raise ArgumentError, "Missing #{attr} option or #{attr.upcase} environment variable" - end + # aws region is optional in the SDK, aws will use a default value if not provided + unless @discovery['aws_region'] || ENV['AWS_REGION'] + log.info "aws region is missing, will use default" + end + # access key id & secret are optional, might be using IAM instance profile for credentials + unless ((@discovery['aws_access_key_id'] || ENV['aws_access_key_id']) \ + && (@discovery['aws_secret_access_key'] || ENV['aws_secret_access_key'] )) + log.info "aws access key id & secret not set in config or env variables for service #{name}, will attempt to use IAM instance profile" end end def watch - last_backends = [] until @should_exit begin start = Time.now - current_backends = discover_instances - - if last_backends != current_backends + if set_backends(discover_instances) log.info "synapse: ec2tag watcher backends have changed." - last_backends = current_backends - configure_backends(current_backends) - else - log.info "synapse: ec2tag watcher backends are unchanged." end - - sleep_until_next_check(start) rescue Exception => e log.warn "synapse: error in ec2tag watcher thread: #{e.inspect}" log.warn e.backtrace + ensure + sleep_until_next_check(start) end end @@ -111,23 +107,6 @@ def instances_with_tags(tag_name, tag_value) .tagged_values(tag_value) .select { |i| i.status == :running } end - - def configure_backends(new_backends) - if new_backends.empty? - if @default_servers.empty? - log.warn "synapse: no backends and no default servers for service #{@name};" \ - " using previous backends: #{@backends.inspect}" - else - log.warn "synapse: no backends for service #{@name};" \ - " using default servers: #{@default_servers.inspect}" - @backends = @default_servers - end - else - log.info "synapse: discovered #{new_backends.length} backends for service #{@name}" - @backends = new_backends - end - @synapse.reconfigure! - end end end diff --git a/lib/synapse/service_watcher/zookeeper.rb b/lib/synapse/service_watcher/zookeeper.rb index 4a88e077..8c72a573 100644 --- a/lib/synapse/service_watcher/zookeeper.rb +++ b/lib/synapse/service_watcher/zookeeper.rb @@ -1,13 +1,18 @@ require "synapse/service_watcher/base" +require 'thread' require 'zk' -module Synapse +class Synapse::ServiceWatcher class ZookeeperWatcher < BaseWatcher NUMBERS_RE = /^\d+$/ + @@zk_pool = {} + @@zk_pool_count = {} + @@zk_pool_lock = Mutex.new + def start - @zk_hosts = @discovery['hosts'].shuffle.join(',') + @zk_hosts = @discovery['hosts'].sort.join(',') @watcher = nil @zk = nil @@ -22,6 +27,8 @@ def stop end def ping? + # @zk being nil implies no session *or* a lost session, do not remove + # the check on @zk being truthy @zk && @zk.connected? end @@ -45,7 +52,7 @@ def create(path) @zk.create(path, ignore: :node_exists) end - # find the current backends at the discovery path; sets @backends + # find the current backends at the discovery path def discover log.info "synapse: discovering backends for service #{@name}" @@ -54,7 +61,7 @@ def discover node = @zk.get("#{@discovery['path']}/#{id}") begin - host, port, name = deserialize_service_instance(node.first) + host, port, name, weight = deserialize_service_instance(node.first) rescue StandardError => e log.error "synapse: invalid data in ZK node #{id} at #{@discovery['path']}: #{e}" else @@ -65,35 +72,26 @@ def discover numeric_id = NUMBERS_RE =~ numeric_id ? numeric_id.to_i : nil log.debug "synapse: discovered backend #{name} at #{host}:#{server_port} for service #{@name}" - new_backends << { 'name' => name, 'host' => host, 'port' => server_port, 'id' => numeric_id} + new_backends << { 'name' => name, 'host' => host, 'port' => server_port, 'id' => numeric_id, 'weight' => weight } end end - if new_backends.empty? - if @default_servers.empty? - log.warn "synapse: no backends and no default servers for service #{@name}; using previous backends: #{@backends.inspect}" - else - log.warn "synapse: no backends for service #{@name}; using default servers: #{@default_servers.inspect}" - @backends = @default_servers - end - else - log.info "synapse: discovered #{new_backends.length} backends for service #{@name}" - set_backends(new_backends) - end + set_backends(new_backends) end # sets up zookeeper callbacks if the data at the discovery path changes def watch return if @zk.nil? + log.debug "synapse: setting watch at #{@discovery['path']}" - @watcher.unsubscribe unless @watcher.nil? - @watcher = @zk.register(@discovery['path'], &watcher_callback) + @watcher = @zk.register(@discovery['path'], &watcher_callback) unless @watcher # Verify that we actually set up the watcher. unless @zk.exists?(@discovery['path'], :watch => true) log.error "synapse: zookeeper watcher path #{@discovery['path']} does not exist!" - raise RuntimeError.new('could not set a ZK watch on a node that should exist') + zk_cleanup end + log.debug "synapse: set watch at #{@discovery['path']}" end # handles the event that a watched path has changed in zookeeper @@ -103,26 +101,55 @@ def watcher_callback watch # Rediscover discover - # send a message to calling class to reconfigure - reconfigure! end end def zk_cleanup log.info "synapse: zookeeper watcher cleaning up" - @watcher.unsubscribe unless @watcher.nil? - @watcher = nil - - @zk.close! unless @zk.nil? - @zk = nil + begin + @watcher.unsubscribe unless @watcher.nil? + @watcher = nil + ensure + @@zk_pool_lock.synchronize { + if @@zk_pool.has_key?(@zk_hosts) + @@zk_pool_count[@zk_hosts] -= 1 + # Last thread to use the connection closes it + if @@zk_pool_count[@zk_hosts] == 0 + log.info "synapse: closing zk connection to #{@zk_hosts}" + begin + @zk.close! unless @zk.nil? + ensure + @@zk_pool.delete(@zk_hosts) + end + end + end + @zk = nil + } + end log.info "synapse: zookeeper watcher cleaned up successfully" end def zk_connect log.info "synapse: zookeeper watcher connecting to ZK at #{@zk_hosts}" - @zk = ZK.new(@zk_hosts) + + # Ensure that all Zookeeper watcher re-use a single zookeeper + # connection to any given set of zk hosts. + @@zk_pool_lock.synchronize { + unless @@zk_pool.has_key?(@zk_hosts) + log.info "synapse: creating pooled connection to #{@zk_hosts}" + @@zk_pool[@zk_hosts] = ZK.new(@zk_hosts, :timeout => 5, :thread => :per_callback) + @@zk_pool_count[@zk_hosts] = 1 + log.info "synapse: successfully created zk connection to #{@zk_hosts}" + else + @@zk_pool_count[@zk_hosts] += 1 + log.info "synapse: re-using existing zookeeper connection to #{@zk_hosts}" + end + } + + @zk = @@zk_pool[@zk_hosts] + log.info "synapse: retrieved zk connection to #{@zk_hosts}" # handle session expiry -- by cleaning up zk, this will make `ping?` # fail and so synapse will exit @@ -146,8 +173,9 @@ def deserialize_service_instance(data) host = decoded['host'] || (raise ValueError, 'instance json data does not have host key') port = decoded['port'] || (raise ValueError, 'instance json data does not have port key') name = decoded['name'] || nil + weight = decoded['weight'] || nil - return host, port, name + return host, port, name, weight end end end diff --git a/lib/synapse/service_watcher/zookeeper_dns.rb b/lib/synapse/service_watcher/zookeeper_dns.rb index 31e54337..9acd9bc0 100644 --- a/lib/synapse/service_watcher/zookeeper_dns.rb +++ b/lib/synapse/service_watcher/zookeeper_dns.rb @@ -19,7 +19,7 @@ # for messages indicating that new servers are available, the check interval # has passed (triggering a re-resolve), or that the watcher should shut down. # The DNS watcher is responsible for the actual reconfiguring of backends. -module Synapse +class Synapse::ServiceWatcher class ZookeeperDnsWatcher < BaseWatcher # Valid messages that can be passed through the internal message queue @@ -46,7 +46,7 @@ class StopWatcher; end CHECK_INTERVAL_MESSAGE = CheckInterval.new end - class Dns < Synapse::DnsWatcher + class Dns < Synapse::ServiceWatcher::DnsWatcher # Overrides the discovery_servers method on the parent class attr_accessor :discovery_servers @@ -106,7 +106,7 @@ def validate_discovery_opts end end - class Zookeeper < Synapse::ZookeeperWatcher + class Zookeeper < Synapse::ServiceWatcher::ZookeeperWatcher def initialize(opts={}, synapse, message_queue) super(opts, synapse) diff --git a/lib/synapse/version.rb b/lib/synapse/version.rb index d48ce9e3..6e9338ee 100644 --- a/lib/synapse/version.rb +++ b/lib/synapse/version.rb @@ -1,3 +1,3 @@ module Synapse - VERSION = "0.11.1" + VERSION = "0.12.1" end diff --git a/spec/lib/synapse/haproxy_spec.rb b/spec/lib/synapse/haproxy_spec.rb index a90d2ec7..5e87e296 100644 --- a/spec/lib/synapse/haproxy_spec.rb +++ b/spec/lib/synapse/haproxy_spec.rb @@ -14,6 +14,15 @@ class MockWatcher; end; mockWatcher end + let(:mockwatcher_with_server_options) do + mockWatcher = double(Synapse::ServiceWatcher) + allow(mockWatcher).to receive(:name).and_return('example_service') + backends = [{ 'host' => 'somehost', 'port' => '5555', 'haproxy_server_options' => 'backup'}] + allow(mockWatcher).to receive(:backends).and_return(backends) + allow(mockWatcher).to receive(:haproxy).and_return({'server_options' => "check inter 2000 rise 3 fall 2"}) + mockWatcher + end + it 'updating the config' do expect(subject).to receive(:generate_config) subject.update_config([mockwatcher]) @@ -29,4 +38,8 @@ class MockWatcher; end; expect(subject.generate_backend_stanza(mockwatcher, mockConfig)).to eql(["\nbackend example_service", ["\tmode tcp"], ["\tserver somehost:5555 somehost:5555 check inter 2000 rise 3 fall 2"]]) end + it 'respects haproxy_server_options' do + mockConfig = [] + expect(subject.generate_backend_stanza(mockwatcher_with_server_options, mockConfig)).to eql(["\nbackend example_service", [], ["\tserver somehost:5555 somehost:5555 cookie somehost:5555 check inter 2000 rise 3 fall 2 backup"]]) + end end diff --git a/spec/lib/synapse/service_watcher_base_spec.rb b/spec/lib/synapse/service_watcher_base_spec.rb index af3e8172..663185ca 100644 --- a/spec/lib/synapse/service_watcher_base_spec.rb +++ b/spec/lib/synapse/service_watcher_base_spec.rb @@ -1,12 +1,12 @@ require 'spec_helper' -class Synapse::BaseWatcher +class Synapse::ServiceWatcher::BaseWatcher attr_reader :should_exit, :default_servers end -describe Synapse::BaseWatcher do +describe Synapse::ServiceWatcher::BaseWatcher do let(:mocksynapse) { double() } - subject { Synapse::BaseWatcher.new(args, mocksynapse) } + subject { Synapse::ServiceWatcher::BaseWatcher.new(args, mocksynapse) } let(:testargs) { { 'name' => 'foo', 'discovery' => { 'method' => 'base' }, 'haproxy' => {} }} def remove_arg(name) @@ -37,18 +37,76 @@ def remove_arg(name) end end - context "with default_servers" do - default_servers = ['server1', 'server2'] + context 'set_backends test' do + default_servers = [ + {'name' => 'default_server1', 'host' => 'default_server1', 'port' => 123}, + {'name' => 'default_server2', 'host' => 'default_server2', 'port' => 123} + ] + backends = [ + {'name' => 'server1', 'host' => 'server1', 'port' => 123}, + {'name' => 'server2', 'host' => 'server2', 'port' => 123} + ] let(:args) { testargs.merge({'default_servers' => default_servers}) } - it('sets default backends to default_servers') { expect(subject.backends).to equal(default_servers) } - context "with keep_default_servers set" do - let(:args) { testargs.merge({'default_servers' => default_servers, 'keep_default_servers' => true}) } - let(:new_backends) { ['discovered1', 'discovered2'] } + it 'sets backends' do + expect(subject).to receive(:'reconfigure!').exactly(:once) + expect(subject.send(:set_backends, backends)).to equal(true) + expect(subject.backends).to eq(backends) + end + + it 'removes duplicate backends' do + expect(subject).to receive(:'reconfigure!').exactly(:once) + duplicate_backends = backends + backends + expect(subject.send(:set_backends, duplicate_backends)).to equal(true) + expect(subject.backends).to eq(backends) + end + + it 'sets backends to default_servers if no backends discovered' do + expect(subject).to receive(:'reconfigure!').exactly(:once) + expect(subject.send(:set_backends, [])).to equal(true) + expect(subject.backends).to eq(default_servers) + end + + context 'with no default_servers' do + let(:args) { remove_arg 'default_servers' } + it 'uses previous backends if no default_servers set' do + expect(subject).to receive(:'reconfigure!').exactly(:once) + expect(subject.send(:set_backends, backends)).to equal(true) + expect(subject.send(:set_backends, [])).to equal(false) + expect(subject.backends).to eq(backends) + end + end + + context 'with no default_servers set and use_previous_backends disabled' do + let(:args) { + remove_arg 'default_servers' + testargs.merge({'use_previous_backends' => false}) + } + it 'removes all backends if no default_servers set and use_previous_backends disabled' do + expect(subject).to receive(:'reconfigure!').exactly(:twice) + expect(subject.send(:set_backends, backends)).to equal(true) + expect(subject.backends).to eq(backends) + expect(subject.send(:set_backends, [])).to equal(true) + expect(subject.backends).to eq([]) + end + end + + it 'calls reconfigure only once for duplicate backends' do + expect(subject).to receive(:'reconfigure!').exactly(:once) + expect(subject.send(:set_backends, backends)).to equal(true) + expect(subject.backends).to eq(backends) + expect(subject.send(:set_backends, backends)).to equal(false) + expect(subject.backends).to eq(backends) + end + context 'with keep_default_servers set' do + let(:args) { + testargs.merge({'default_servers' => default_servers, 'keep_default_servers' => true}) + } it('keeps default_servers when setting backends') do - subject.send(:set_backends, new_backends) - expect(subject.backends).to eq(default_servers + new_backends) + expect(subject).to receive(:'reconfigure!').exactly(:once) + expect(subject.send(:set_backends, backends)).to equal(true) + expect(subject.backends).to eq(backends + default_servers) end end end diff --git a/spec/lib/synapse/service_watcher_docker_spec.rb b/spec/lib/synapse/service_watcher_docker_spec.rb index e5411222..635ccce7 100644 --- a/spec/lib/synapse/service_watcher_docker_spec.rb +++ b/spec/lib/synapse/service_watcher_docker_spec.rb @@ -1,13 +1,14 @@ require 'spec_helper' +require 'synapse/service_watcher/docker' -class Synapse::DockerWatcher +class Synapse::ServiceWatcher::DockerWatcher attr_reader :check_interval, :watcher, :synapse attr_accessor :default_servers end -describe Synapse::DockerWatcher do +describe Synapse::ServiceWatcher::DockerWatcher do let(:mocksynapse) { double() } - subject { Synapse::DockerWatcher.new(testargs, mocksynapse) } + subject { Synapse::ServiceWatcher::DockerWatcher.new(testargs, mocksynapse) } let(:testargs) { { 'name' => 'foo', 'discovery' => { 'method' => 'docker', 'servers' => [{'host' => 'server1.local', 'name' => 'mainserver'}], 'image_name' => 'mycool/image', 'container_port' => 6379 }, 'haproxy' => {} }} before(:each) do allow(subject.log).to receive(:warn) @@ -46,12 +47,7 @@ def add_arg(name, value) end it('has a happy first run path, configuring backends') do expect(subject).to receive(:containers).and_return(['container1']) - expect(subject).to receive(:configure_backends).with(['container1']) - subject.send(:watch) - end - it('does not call configure_backends if there is no change') do - expect(subject).to receive(:containers).and_return([]) - expect(subject).to_not receive(:configure_backends) + expect(subject).to receive(:set_backends).with(['container1']) subject.send(:watch) end end @@ -65,33 +61,6 @@ def add_arg(name, value) end end - context "configure_backends tests" do - before(:each) do - expect(subject.synapse).to receive(:'reconfigure!').at_least(:once) - end - it 'runs' do - expect { subject.send(:configure_backends, []) }.not_to raise_error - end - it 'sets backends right' do - subject.send(:configure_backends, ['foo']) - expect(subject.backends).to eq(['foo']) - end - it 'resets to default backends if no container found' do - subject.default_servers = ['fallback1'] - subject.send(:configure_backends, ['foo']) - expect(subject.backends).to eq(['foo']) - subject.send(:configure_backends, []) - expect(subject.backends).to eq(['fallback1']) - end - it 'does not reset to default backends if there are no default backends' do - subject.default_servers = [] - subject.send(:configure_backends, ['foo']) - expect(subject.backends).to eq(['foo']) - subject.send(:configure_backends, []) - expect(subject.backends).to eq(['foo']) - end - end - context "rewrite_container_ports tests" do it 'doesnt break if Ports => nil' do subject.send(:rewrite_container_ports, nil) diff --git a/spec/lib/synapse/service_watcher_ec2tags_spec.rb b/spec/lib/synapse/service_watcher_ec2tags_spec.rb index 40ff8df3..bbcf15bd 100644 --- a/spec/lib/synapse/service_watcher_ec2tags_spec.rb +++ b/spec/lib/synapse/service_watcher_ec2tags_spec.rb @@ -1,7 +1,8 @@ require 'spec_helper' +require 'synapse/service_watcher/ec2tag' require 'logging' -class Synapse::EC2Watcher +class Synapse::ServiceWatcher::Ec2tagWatcher attr_reader :synapse attr_accessor :default_servers, :ec2 end @@ -28,9 +29,9 @@ def fake_address end end -describe Synapse::EC2Watcher do +describe Synapse::ServiceWatcher::Ec2tagWatcher do let(:mock_synapse) { double } - subject { Synapse::EC2Watcher.new(basic_config, mock_synapse) } + subject { Synapse::ServiceWatcher::Ec2tagWatcher.new(basic_config, mock_synapse) } let(:basic_config) do { 'name' => 'ec2tagtest', @@ -86,24 +87,24 @@ def munge_haproxy_arg(name, new_value) end context 'when missing arguments' do - it 'complains if aws_region is missing' do + it 'does not break if aws_region is missing' do expect { - Synapse::EC2Watcher.new(remove_discovery_arg('aws_region'), mock_synapse) - }.to raise_error(ArgumentError, /Missing aws_region/) + Synapse::ServiceWatcher::Ec2tagWatcher.new(remove_discovery_arg('aws_region'), mock_synapse) + }.not_to raise_error end - it 'complains if aws_access_key_id is missing' do + it 'does not break if aws_access_key_id is missing' do expect { - Synapse::EC2Watcher.new(remove_discovery_arg('aws_access_key_id'), mock_synapse) - }.to raise_error(ArgumentError, /Missing aws_access_key_id/) + Synapse::ServiceWatcher::Ec2tagWatcher.new(remove_discovery_arg('aws_access_key_id'), mock_synapse) + }.not_to raise_error end - it 'complains if aws_secret_access_key is missing' do + it 'does not break if aws_secret_access_key is missing' do expect { - Synapse::EC2Watcher.new(remove_discovery_arg('aws_secret_access_key'), mock_synapse) - }.to raise_error(ArgumentError, /Missing aws_secret_access_key/) + Synapse::ServiceWatcher::Ec2tagWatcher.new(remove_discovery_arg('aws_secret_access_key'), mock_synapse) + }.not_to raise_error end it 'complains if server_port_override is missing' do expect { - Synapse::EC2Watcher.new(remove_haproxy_arg('server_port_override'), mock_synapse) + Synapse::ServiceWatcher::Ec2tagWatcher.new(remove_haproxy_arg('server_port_override'), mock_synapse) }.to raise_error(ArgumentError, /Missing server_port_override/) end end @@ -111,7 +112,7 @@ def munge_haproxy_arg(name, new_value) context 'invalid data' do it 'complains if the haproxy server_port_override is not a number' do expect { - Synapse::EC2Watcher.new(munge_haproxy_arg('server_port_override', '80deadbeef'), mock_synapse) + Synapse::ServiceWatcher::Ec2tagWatcher.new(munge_haproxy_arg('server_port_override', '80deadbeef'), mock_synapse) }.to raise_error(ArgumentError, /Invalid server_port_override/) end end @@ -121,6 +122,27 @@ def munge_haproxy_arg(name, new_value) let(:instance1) { FakeAWSInstance.new } let(:instance2) { FakeAWSInstance.new } + context 'watch' do + + it 'discovers instances, configures backends, then sleeps' do + fake_backends = [1,2,3] + expect(subject).to receive(:discover_instances).and_return(fake_backends) + expect(subject).to receive(:set_backends).with(fake_backends) { subject.stop } + expect(subject).to receive(:sleep_until_next_check) + subject.send(:watch) + end + + it 'sleeps until next check if discover_instances fails' do + expect(subject).to receive(:discover_instances) do + subject.stop + raise "discover failed" + end + expect(subject).to receive(:sleep_until_next_check) + subject.send(:watch) + end + + end + context 'using the AWS API' do let(:ec2_client) { double('AWS::EC2') } let(:instance_collection) { double('AWS::EC2::InstanceCollection') } @@ -135,11 +157,11 @@ def munge_haproxy_arg(name, new_value) # done remotely; breaking into separate calls would result in # unnecessary data being retrieved. - subject.ec2.should_receive(:instances).and_return(instance_collection) + expect(subject.ec2).to receive(:instances).and_return(instance_collection) - instance_collection.should_receive(:tagged).with('foo').and_return(instance_collection) - instance_collection.should_receive(:tagged_values).with('bar').and_return(instance_collection) - instance_collection.should_receive(:select).and_return(instance_collection) + expect(instance_collection).to receive(:tagged).with('foo').and_return(instance_collection) + expect(instance_collection).to receive(:tagged_values).with('bar').and_return(instance_collection) + expect(instance_collection).to receive(:select).and_return(instance_collection) subject.send(:instances_with_tags, 'foo', 'bar') end @@ -147,27 +169,29 @@ def munge_haproxy_arg(name, new_value) context 'returned backend data structure' do before do - subject.stub(:instances_with_tags).and_return([instance1, instance2]) + allow(subject).to receive(:instances_with_tags).and_return([instance1, instance2]) end let(:backends) { subject.send(:discover_instances) } it 'returns an Array of backend name/host/port Hashes' do - - expect { backends.all? {|b| %w[name host port].each {|k| b.has_key?(k) }} }.to be_true + required_keys = %w[name host port] + expect( + backends.all?{|b| required_keys.each{|k| b.has_key?(k)}} + ).to be_truthy end it 'sets the backend port to server_port_override for all backends' do backends = subject.send(:discover_instances) - expect { + expect( backends.all? { |b| b['port'] == basic_config['haproxy']['server_port_override'] } - }.to be_true + ).to be_truthy end end context 'returned instance fields' do before do - subject.stub(:instances_with_tags).and_return([instance1]) + allow(subject).to receive(:instances_with_tags).and_return([instance1]) end let(:backend) { subject.send(:discover_instances).pop } @@ -181,40 +205,5 @@ def munge_haproxy_arg(name, new_value) end end end - - context "configure_backends tests" do - let(:backend1) { { 'name' => 'foo', 'host' => 'foo.backend.tld', 'port' => '123' } } - let(:backend2) { { 'name' => 'bar', 'host' => 'bar.backend.tld', 'port' => '456' } } - let(:fallback) { { 'name' => 'fall', 'host' => 'fall.backend.tld', 'port' => '789' } } - - before(:each) do - expect(subject.synapse).to receive(:'reconfigure!').at_least(:once) - end - - it 'runs' do - expect { subject.send(:configure_backends, []) }.not_to raise_error - end - - it 'sets backends correctly' do - subject.send(:configure_backends, [ backend1, backend2 ]) - expect(subject.backends).to eq([ backend1, backend2 ]) - end - - it 'resets to default backends if no instances are found' do - subject.default_servers = [ fallback ] - subject.send(:configure_backends, [ backend1 ]) - expect(subject.backends).to eq([ backend1 ]) - subject.send(:configure_backends, []) - expect(subject.backends).to eq([ fallback ]) - end - - it 'does not reset to default backends if there are no default backends' do - subject.default_servers = [] - subject.send(:configure_backends, [ backend1 ]) - expect(subject.backends).to eq([ backend1 ]) - subject.send(:configure_backends, []) - expect(subject.backends).to eq([ backend1 ]) - end - end end diff --git a/spec/lib/synapse/service_watcher_spec.rb b/spec/lib/synapse/service_watcher_spec.rb new file mode 100644 index 00000000..8d259d45 --- /dev/null +++ b/spec/lib/synapse/service_watcher_spec.rb @@ -0,0 +1,92 @@ +require 'spec_helper' +require 'synapse/service_watcher' + +describe Synapse::ServiceWatcher do + let(:mock_synapse) { double } + subject { Synapse::ServiceWatcher } + let(:config) do + { + 'haproxy' => { + 'port' => '8080', + 'server_port_override' => '8081', + }, + 'discovery' => { + 'method' => 'test' + } + } + end + + def replace_discovery(new_value) + args = config.clone + args['discovery'] = new_value + args + end + + context 'bogus arguments' do + it 'complains if discovery method is bogus' do + expect { + subject.create('test', config, mock_synapse) + }.to raise_error(ArgumentError) + end + end + + context 'service watcher dispatch' do + let (:zookeeper_config) {{ + 'method' => 'zookeeper', + 'hosts' => 'localhost:2181', + 'path' => '/smartstack', + }} + let (:dns_config) {{ + 'method' => 'dns', + 'servers' => ['localhost'], + }} + let (:docker_config) {{ + 'method' => 'docker', + 'servers' => 'localhost', + 'image_name' => 'servicefoo', + 'container_port' => 1234, + }} + let (:ec2_config) {{ + 'method' => 'ec2tag', + 'tag_name' => 'footag', + 'tag_value' => 'barvalue', + 'aws_access_key_id' => 'bogus', + 'aws_secret_access_key' => 'morebogus', + 'aws_region' => 'evenmorebogus', + }} + let (:zookeeper_dns_config) {{ + 'method' => 'zookeeper_dns', + 'hosts' => 'localhost:2181', + 'path' => '/smartstack', + }} + + it 'creates zookeeper correctly' do + expect { + subject.create('test', replace_discovery(zookeeper_config), mock_synapse) + }.not_to raise_error + end + it 'creates dns correctly' do + expect { + subject.create('test', replace_discovery(dns_config), mock_synapse) + }.not_to raise_error + end + it 'creates docker correctly' do + expect { + subject.create('test', replace_discovery(docker_config), mock_synapse) + }.not_to raise_error + end + it 'creates ec2tag correctly' do + expect { + subject.create('test', replace_discovery(ec2_config), mock_synapse) + }.not_to raise_error + end + it 'creates zookeeper_dns correctly' do + expect { + subject.create('test', replace_discovery(zookeeper_dns_config), mock_synapse) + }.not_to raise_error + end + end + +end + + diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index a0cb822a..9bce69ff 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -10,11 +10,16 @@ require 'webmock/rspec' RSpec.configure do |config| - config.treat_symbols_as_metadata_keys_with_true_values = true config.run_all_when_everything_filtered = true config.filter_run :focus config.include Configuration + # verify every double we can think of + config.mock_with :rspec do |mocks| + mocks.verify_doubled_constant_names = true + mocks.verify_partial_doubles = true + end + # Run specs in random order to surface order dependencies. If you find an # order dependency and want to debug it, you can fix the order by providing # the seed, which is printed after each run. diff --git a/spec/support/configuration.rb b/spec/support/configuration.rb index 1153523d..68dc4b49 100644 --- a/spec/support/configuration.rb +++ b/spec/support/configuration.rb @@ -1,9 +1,7 @@ require "yaml" module Configuration - def config @config ||= YAML::load_file(File.join(File.dirname(File.expand_path(__FILE__)), 'minimum.conf.yaml')) end - end diff --git a/synapse.gemspec b/synapse.gemspec index 36d946c6..f7b548d5 100644 --- a/synapse.gemspec +++ b/synapse.gemspec @@ -21,7 +21,7 @@ Gem::Specification.new do |gem| gem.add_runtime_dependency "zk", "~> 1.9.4" gem.add_development_dependency "rake" - gem.add_development_dependency "rspec" + gem.add_development_dependency "rspec", "~> 3.1.0" gem.add_development_dependency "pry" gem.add_development_dependency "pry-nav" gem.add_development_dependency "webmock"