Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions lib/synapse/haproxy.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
require 'synapse/log'

require 'socket'
require 'digest'

module Synapse
class Haproxy
Expand Down Expand Up @@ -750,8 +748,12 @@ def restart

# used to build unique, consistent haproxy names for backends
def construct_name(backend)
address_digest = Digest::SHA256.hexdigest(backend['host'])[0..7]
return "#{backend['name']}:#{backend['port']}_#{address_digest}"
name = "#{backend['host']}:#{backend['port']}"
if backend['name'] && !backend['name'].empty?
name = "#{name}_#{backend['name']}"
end

return name
end
end
end
28 changes: 27 additions & 1 deletion lib/synapse/service_watcher/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
module Synapse
class BaseWatcher
include Logging
attr_reader :name, :backends, :haproxy

LEADER_WARN_INTERVAL = 30

attr_reader :name, :haproxy

def initialize(opts={}, synapse)
super()
Expand All @@ -18,6 +21,9 @@ def initialize(opts={}, synapse)
@name = opts['name']
@discovery = opts['discovery']

@leader_election = opts['leader_election'] || false
@leader_last_warn = Time.now - LEADER_WARN_INTERVAL

# the haproxy config
@haproxy = opts['haproxy']
@haproxy['server_options'] ||= ""
Expand Down Expand Up @@ -58,6 +64,26 @@ def ping?
true
end

def backends
if @leader_election
if @backends.all?{|b| b.key?('id') && b['id']}
smallest = @backends.sort_by{ |b| b['id']}.first
log.debug "synapse: leader election chose one of #{@backends.count} backends " \
"(#{smallest['host']}:#{smallest['port']} with id #{smallest['id']})"

return [smallest]
elsif (Time.now - @leader_last_warn) > LEADER_WARN_INTERVAL
log.warn "synapse: service #{@name}: leader election failed; not all backends include an id"
@leader_last_warn = Time.now
end

# if leader election fails, return no backends
return []
end

return @backends
end

private
def validate_discovery_opts
raise ArgumentError, "invalid discovery method '#{@discovery['method']}' for base watcher" \
Expand Down
1 change: 0 additions & 1 deletion lib/synapse/service_watcher/dns.rb
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ def configure_backends(servers)
new_backends = servers.flat_map do |(server, addresses)|
addresses.map do |address|
{
'name' => server['name'],
'host' => address,
'port' => server['port']
}
Expand Down
40 changes: 17 additions & 23 deletions lib/synapse/service_watcher/zookeeper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

module Synapse
class ZookeeperWatcher < BaseWatcher
NUMBERS_RE = /^\d+$/

def start
zk_hosts = @discovery['hosts'].shuffle.join(',')

Expand Down Expand Up @@ -53,18 +55,22 @@ def discover

new_backends = []
begin
@zk.children(@discovery['path'], :watch => true).map do |name|
node = @zk.get("#{@discovery['path']}/#{name}")
@zk.children(@discovery['path'], :watch => true).each do |id|
node = @zk.get("#{@discovery['path']}/#{id}")

begin
host, port = deserialize_service_instance(node.first)
rescue
log.error "synapse: invalid data in ZK node #{name} at #{@discovery['path']}"
host, port, name = deserialize_service_instance(node.first)
rescue StandardError => e
log.error "synapse: invalid data in ZK node #{id} at #{@discovery['path']}: #{e}"
else
server_port = @server_port_override ? @server_port_override : port

# find the numberic id in the node name; used for leader elections if enabled
numeric_id = id.split('_').last
numeric_id = NUMBERS_RE =~ numeric_id ? numeric_id.to_i : nil

log.debug "synapse: discovered backend #{name} at #{host}:#{server_port} for service #{@name}"
new_backends << { 'name' => name, 'host' => host, 'port' => server_port}
new_backends << { 'name' => name, 'host' => host, 'port' => server_port, 'id' => numeric_id}
end
end
rescue ZK::Exceptions::NoNode
Expand Down Expand Up @@ -106,28 +112,16 @@ def watcher_callback
end
end

# tries to extract host/port from a json hash
def parse_json(data)
begin
json = JSON.parse data
rescue Object => o
return false
end
raise 'instance json data does not have host key' unless json.has_key?('host')
raise 'instance json data does not have port key' unless json.has_key?('port')
return json['host'], json['port']
end

# decode the data at a zookeeper endpoint
def deserialize_service_instance(data)
log.debug "synapse: deserializing process data"
decoded = JSON.parse(data)

# if that does not work, try json
host, port = parse_json(data)
return host, port if host
host = decoded['host'] || (raise ValueError, 'instance json data does not have host key')
port = decoded['port'] || (raise ValueError, 'instance json data does not have port key')
name = decoded['name'] || nil

# if we got this far, then we have a problem
raise "could not decode this data:\n#{data}"
return host, port, name
end
end
end