diff --git a/lib/synapse/haproxy.rb b/lib/synapse/haproxy.rb index b1c0beb4..0fb199ce 100644 --- a/lib/synapse/haproxy.rb +++ b/lib/synapse/haproxy.rb @@ -1,7 +1,5 @@ require 'synapse/log' - require 'socket' -require 'digest' module Synapse class Haproxy @@ -750,8 +748,12 @@ def restart # used to build unique, consistent haproxy names for backends def construct_name(backend) - address_digest = Digest::SHA256.hexdigest(backend['host'])[0..7] - return "#{backend['name']}:#{backend['port']}_#{address_digest}" + name = "#{backend['host']}:#{backend['port']}" + if backend['name'] && !backend['name'].empty? + name = "#{name}_#{backend['name']}" + end + + return name end end end diff --git a/lib/synapse/service_watcher/base.rb b/lib/synapse/service_watcher/base.rb index 895098ad..c0193a43 100644 --- a/lib/synapse/service_watcher/base.rb +++ b/lib/synapse/service_watcher/base.rb @@ -3,7 +3,10 @@ module Synapse class BaseWatcher include Logging - attr_reader :name, :backends, :haproxy + + LEADER_WARN_INTERVAL = 30 + + attr_reader :name, :haproxy def initialize(opts={}, synapse) super() @@ -18,6 +21,9 @@ def initialize(opts={}, synapse) @name = opts['name'] @discovery = opts['discovery'] + @leader_election = opts['leader_election'] || false + @leader_last_warn = Time.now - LEADER_WARN_INTERVAL + # the haproxy config @haproxy = opts['haproxy'] @haproxy['server_options'] ||= "" @@ -58,6 +64,26 @@ def ping? true end + def backends + if @leader_election + if @backends.all?{|b| b.key?('id') && b['id']} + smallest = @backends.sort_by{ |b| b['id']}.first + log.debug "synapse: leader election chose one of #{@backends.count} backends " \ + "(#{smallest['host']}:#{smallest['port']} with id #{smallest['id']})" + + return [smallest] + elsif (Time.now - @leader_last_warn) > LEADER_WARN_INTERVAL + log.warn "synapse: service #{@name}: leader election failed; not all backends include an id" + @leader_last_warn = Time.now + end + + # if leader election fails, return no backends + return [] + end + + return @backends + end + private def validate_discovery_opts raise ArgumentError, "invalid discovery method '#{@discovery['method']}' for base watcher" \ diff --git a/lib/synapse/service_watcher/dns.rb b/lib/synapse/service_watcher/dns.rb index 78bc846b..b9c70559 100644 --- a/lib/synapse/service_watcher/dns.rb +++ b/lib/synapse/service_watcher/dns.rb @@ -78,7 +78,6 @@ def configure_backends(servers) new_backends = servers.flat_map do |(server, addresses)| addresses.map do |address| { - 'name' => server['name'], 'host' => address, 'port' => server['port'] } diff --git a/lib/synapse/service_watcher/zookeeper.rb b/lib/synapse/service_watcher/zookeeper.rb index 9a994720..59257ed2 100644 --- a/lib/synapse/service_watcher/zookeeper.rb +++ b/lib/synapse/service_watcher/zookeeper.rb @@ -4,6 +4,8 @@ module Synapse class ZookeeperWatcher < BaseWatcher + NUMBERS_RE = /^\d+$/ + def start zk_hosts = @discovery['hosts'].shuffle.join(',') @@ -53,18 +55,22 @@ def discover new_backends = [] begin - @zk.children(@discovery['path'], :watch => true).map do |name| - node = @zk.get("#{@discovery['path']}/#{name}") + @zk.children(@discovery['path'], :watch => true).each do |id| + node = @zk.get("#{@discovery['path']}/#{id}") begin - host, port = deserialize_service_instance(node.first) - rescue - log.error "synapse: invalid data in ZK node #{name} at #{@discovery['path']}" + host, port, name = deserialize_service_instance(node.first) + rescue StandardError => e + log.error "synapse: invalid data in ZK node #{id} at #{@discovery['path']}: #{e}" else server_port = @server_port_override ? @server_port_override : port + # find the numberic id in the node name; used for leader elections if enabled + numeric_id = id.split('_').last + numeric_id = NUMBERS_RE =~ numeric_id ? numeric_id.to_i : nil + log.debug "synapse: discovered backend #{name} at #{host}:#{server_port} for service #{@name}" - new_backends << { 'name' => name, 'host' => host, 'port' => server_port} + new_backends << { 'name' => name, 'host' => host, 'port' => server_port, 'id' => numeric_id} end end rescue ZK::Exceptions::NoNode @@ -106,28 +112,16 @@ def watcher_callback end end - # tries to extract host/port from a json hash - def parse_json(data) - begin - json = JSON.parse data - rescue Object => o - return false - end - raise 'instance json data does not have host key' unless json.has_key?('host') - raise 'instance json data does not have port key' unless json.has_key?('port') - return json['host'], json['port'] - end - # decode the data at a zookeeper endpoint def deserialize_service_instance(data) log.debug "synapse: deserializing process data" + decoded = JSON.parse(data) - # if that does not work, try json - host, port = parse_json(data) - return host, port if host + host = decoded['host'] || (raise ValueError, 'instance json data does not have host key') + port = decoded['port'] || (raise ValueError, 'instance json data does not have port key') + name = decoded['name'] || nil - # if we got this far, then we have a problem - raise "could not decode this data:\n#{data}" + return host, port, name end end end