diff --git a/bin/jetpants b/bin/jetpants index 5c4fadc..42f6a3a 100755 --- a/bin/jetpants +++ b/bin/jetpants @@ -268,13 +268,13 @@ module Jetpants end end - spares_needed.map do |role, needed| + spares_needed.each do |role, needed| next if needed == 0 available = Jetpants.topology.count_spares(role: "#{role}_slave".to_sym, like: compare) raise "Not enough spare machines with role of #{role} slave! Requested #{needed} but only have #{available} available." if needed > available end - spares_needed.map do |role, needed| + spares_needed.each do |role, needed| next if needed == 0 targets.concat Jetpants.topology.claim_spares(needed, role: "#{role}_slave".to_sym, like: compare) end @@ -407,10 +407,17 @@ module Jetpants method_option :max_id, :desc => 'Maximum ID of parent shard to split' method_option :ranges, :desc => 'Optional comma-separated list of ranges per child ie "1000-1999,2000-2499" (default if omitted: split evenly)' method_option :count, :desc => 'How many child shards to split the parent into (only necessary if the ranges option is omitted)' + method_option :shard_pool, :desc => 'The sharding pool for which to perform the split' def shard_split shard_min = options[:min_id] || ask('Please enter min ID of the parent shard: ') shard_max = options[:max_id] || ask('Please enter max ID of the parent shard: ') - s = Jetpants.topology.shard shard_min, shard_max + + shard_pool = options[:shard_pool] || ask("Please enter the sharding pool for which to perform the split (enter for default pool, #{Jetpants.topology.default_shard_pool}): ") + shard_pool = Jetpants.topology.default_shard_pool if shard_pool.empty? + + output "Using shard pool `#{shard_pool}`" + + s = Jetpants.topology.shard(shard_min, shard_max, shard_pool) raise "Shard not found" unless s raise "Shard isn't in ready state" unless s.state == :ready @@ -503,10 +510,14 @@ module Jetpants desc 'shard_cutover', 'truncate the current last shard range, and add a new shard after it' method_option :cutover_id, :desc => 'Minimum ID of new last shard being created' + method_option :shard_pool, :desc => 'The sharding pool for which to perform the cutover' def shard_cutover cutover_id = options[:cutover_id] || ask('Please enter min ID of the new shard to be created: ') cutover_id = cutover_id.to_i - last_shard = Jetpants.topology.shards.select {|s| s.max_id == 'INFINITY' && s.in_config?}.first + shard_pool = options[:shard_pool] || ask("Please enter the sharding pool for which to perform the split (enter for default pool, #{Jetpants.topology.default_shard_pool}): ") + shard_pool = Jetpants.topology.default_shard_pool if shard_pool.empty? + + last_shard = Jetpants.topology.shards(shard_pool).select {|s| s.max_id == 'INFINITY' && s.in_config?}.first last_shard_master = last_shard.master # Simple sanity-check that the cutover ID is greater than the current last shard's MIN id. @@ -528,7 +539,7 @@ module Jetpants # has the same master/slaves but now has a non-infinity max ID. 
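The pool-aware lookup that the split and cutover commands now share can be sketched as follows; the ID range and pool name are placeholders, and an empty answer falls back to the configured default pool:

    shard_pool = options[:shard_pool] ||
                 ask("Shard pool (enter for default, #{Jetpants.topology.default_shard_pool}): ")
    shard_pool = Jetpants.topology.default_shard_pool if shard_pool.empty?

    s = Jetpants.topology.shard(1000, 1999, shard_pool)  # min_id, max_id, pool name
    raise "Shard not found" unless s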
last_shard.state = :recycle last_shard.sync_configuration - last_shard_replace = Shard.new(last_shard.min_id, cutover_id - 1, last_shard_master) + last_shard_replace = Shard.new(last_shard.min_id, cutover_id - 1, last_shard_master, :ready, shard_pool) last_shard_replace.sync_configuration Jetpants.topology.add_pool last_shard_replace @@ -557,12 +568,12 @@ module Jetpants end end - new_last_shard = Shard.new(cutover_id, 'INFINITY', new_last_shard_master) + new_last_shard = Shard.new(cutover_id, 'INFINITY', new_last_shard_master, :ready, shard_pool) new_last_shard.sync_configuration Jetpants.topology.add_pool new_last_shard # Create tables on the new shard's master, obtaining schema from previous last shard - tables = Table.from_config 'sharded_tables' + tables = Table.from_config('sharded_tables', shard_pool) last_shard_master.export_schemata tables last_shard_master.host.fast_copy_chain(Jetpants.export_location, new_last_shard_master.host, files: ["create_tables_#{last_shard_master.port}.sql"]) new_last_shard_master.import_schemata! @@ -581,9 +592,12 @@ module Jetpants method_option :min_id, :desc => 'Minimum ID of shard involved in master promotion' method_option :max_id, :desc => 'Maximum ID of shard involved in master promotion' method_option :new_master, :desc => 'New node to become master of the shard' + method_option :shard_pool, :desc => 'The sharding pool for which to perform the promotion' def shard_promote_master + shard_pool = options[:shard_pool] || ask("Please enter the sharding pool for which to perform the master promotion (enter for default, #{Jetpants.topology.default_shard_pool}): ") + shard_pool = Jetpants.topology.default_shard_pool if shard_pool.empty? # find the shard we are going to do master promotion on - s = ask_shard_being_promoted(:prep, options[:max_id], options[:max_id]) + s = ask_shard_being_promoted(:prep, options[:max_id], options[:max_id], shard_pool) new_master = ask_node "Please enter the IP of the new master for #{s}: ", options[:new_master] raise "New master node #{new_master} is not currently a slave in shard #{s}" unless s.slaves && s.slaves.include?(new_master) @@ -610,9 +624,12 @@ module Jetpants desc 'shard_promote_master_reads', 'Lockless shard master promotion (step 2 of 4): move reads to new master' method_option :min_id, :desc => 'Minimum ID of shard involved in master promotion' method_option :max_id, :desc => 'Maximum ID of shard involved in master promotion' + method_option :shard_pool, :desc => 'The sharding pool for which to perform the promotion' def shard_promote_master_reads + shard_pool = options[:shard_pool] || ask("Please enter the sharding pool for which to perform the master promotion (enter for default pool, #{Jetpants.topology.default_shard_pool}): ") + shard_pool = Jetpants.topology.default_shard_pool if shard_pool.empty? 
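For reference, Shard#initialize now takes an explicit state and shard pool name as its fourth and fifth arguments; a minimal sketch, with an invented master IP and pool name:

    master = '10.1.2.3'.to_db                 # any Jetpants::DB object works here
    s = Shard.new(2000, 2999, master, :ready, 'posts')
    s.sync_configuration                      # record it with the asset tracker
    Jetpants.topology.add_pool s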
# find the shard we are going to do master promotion on - s = ask_shard_being_promoted(:prep, options[:max_id], options[:max_id]) + s = ask_shard_being_promoted(:prep, options[:max_id], options[:max_id], shard_pool) # at this point we only have one slave, which is the new master new_master = s.master.slaves.last @@ -637,8 +654,11 @@ module Jetpants end desc 'shard_promote_master_writes', 'Lockless shard master promotion (step 3 of 4): move writes to new master' + method_option :shard_pool, :desc => 'The sharding pool for which to perform the promotion' def shard_promote_master_writes - s = ask_shard_being_promoted :writes + shard_pool = options[:shard_pool] || ask("Please enter the sharding pool for which to perform the master promotion (enter for default pool, #{Jetpants.topology.default_shard_pool}): ") + shard_pool = Jetpants.topology.default_shard_pool if shard_pool.empty? + s = ask_shard_being_promoted(:writes, nil, nil, shard_pool) if s.state != :child raise "Shard #{s} is in wrong state to perform this action! Expected :child, found #{s.state}" end @@ -659,8 +679,11 @@ module Jetpants end desc 'shard_promote_master_cleanup', 'Lockless shard master promotion (step 4 of 4): clean up shard and eject old master' + method_option :shard_pool, :desc => 'The sharding pool for which to perform the promotion' def shard_promote_master_cleanup - s = ask_shard_being_promoted :cleanup + shard_pool = options[:shard_pool] || ask("Please enter the sharding pool for which to perform the master promotion (enter for default pool, #{Jetpants.topology.default_shard_pool}): ") + shard_pool = Jetpants.topology.default_shard_pool if shard_pool.empty? + s = ask_shard_being_promoted(:cleanup, nil, nil, shard_pool) if s.state != :needs_cleanup raise "Shard #{s} is in wrong state to perform this action! Expected :needs_cleanup, found #{s.state}" end @@ -794,13 +817,17 @@ module Jetpants output 'Which shard would you like to perform this action on?' shard_min = ask('Please enter min ID of the shard: ') shard_max = ask('Please enter max ID of the shard: ') - s = Jetpants.topology.shard shard_min, shard_max + shard_pool = ask("Please enter the sharding pool which to perform the action on (enter for default pool, #{Jetpants.topology.default_shard_pool}): ") + shard_pool = Jetpants.topology.default_shard_pool if shard_pool.empty? + s = Jetpants.topology.shard(shard_min, shard_max, shard_pool) raise 'Shard not found' unless s s end def ask_shard_being_split - shards_being_split = Jetpants.shards.select {|s| s.children.count > 0} + shard_pool = ask("Enter shard pool to take action on (enter for default pool, #{Jetpants.topology.default_shard_pool}):") + shard_pool = Jetpants.topology.default_shard_pool if shard_pool.empty? + shards_being_split = Jetpants.shards(shard_pool).select {|s| s.children.count > 0} if shards_being_split.count == 0 raise 'No shards are currently being split. You can only use this task after running "jetpants shard_split".' 
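Shard enumeration is now scoped to a keyspace, so helpers like the ones above filter within one pool; a quick sketch of the lookups involved (pool name is illustrative):

    Jetpants.topology.shards('posts')     # every shard in the 'posts' keyspace
    Jetpants.topology.shards              # same, but against the default shard pool
    Jetpants.topology.shards('posts').select { |s| s.children.count > 0 }  # splits in flight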
elsif shards_being_split.count == 1 @@ -814,9 +841,9 @@ module Jetpants s end - def ask_shard_being_promoted(stage = :prep, min_id = nil, max_id = nil) + def ask_shard_being_promoted(stage = :prep, min_id = nil, max_id = nil, shard_pool) if stage == :writes || stage == :cleanup - shards_being_promoted = Jetpants.shards.select do |s| + shards_being_promoted = Jetpants.shards(shard_pool).select do |s| [:reads, :child, :needs_cleanup].include?(s.state) && !s.parent && s.master.master end @@ -836,7 +863,7 @@ module Jetpants max_id = ask("Enter max id of shard to perform master promotion: ") max_id = Integer(max_id) rescue max_id.upcase - s = Jetpants.topology.shard(min_id, max_id) + s = Jetpants.topology.shard(min_id, max_id, shard_pool) end raise "Invalid shard selected!" unless s.is_a? Shard diff --git a/lib/jetpants.rb b/lib/jetpants.rb index 2c835d4..d0353ad 100644 --- a/lib/jetpants.rb +++ b/lib/jetpants.rb @@ -9,7 +9,7 @@ module Jetpants; end $LOAD_PATH.unshift File.join(File.dirname(__FILE__), 'jetpants'), File.join(File.dirname(__FILE__), '..', 'plugins') -%w(output callback table host db pool topology shard monkeypatch commandsuite).each {|g| require g} +%w(output callback table host db pool topology shard shardpool monkeypatch commandsuite).each {|g| require g} # Since Jetpants is extremely multi-threaded, we need to force uncaught exceptions to # kill all threads in order to have any kind of sane error handling. @@ -37,7 +37,7 @@ module Jetpants 'verify_replication' => true, # raise exception if the 2 repl threads are in different states, or if actual repl topology differs from Jetpants' understanding of it 'plugins' => {}, # hash of plugin name => arbitrary plugin data (usually a nested hash of settings) 'ssh_keys' => nil, # array of SSH key file locations - 'sharded_tables' => [], # array of name => {sharding_key=>X, chunks=>Y} hashes + 'sharded_tables' => [], # hash of {shard_pool => {name => {sharding_key=>X, chunks=>Y}} hashes 'compress_with' => false, # command line to use for compression in large file transfers 'decompress_with' => false, # command line to use for decompression in large file transfers 'private_interface' => 'bond0', # network interface corresponding to private IP @@ -48,6 +48,14 @@ module Jetpants 'log_file' => '/var/log/jetpants.log', # where to log all output from the jetpants commands 'local_private_interface' => nil, # local network interface corresponding to private IP of the machine jetpants is running on 'free_mem_min_mb' => 0, # Minimum amount of free memory in MB to be maintained on the node while performing the task (eg. 
network copy) + 'default_shard_pool' => nil, # default pool for sharding operations + 'import_without_indices' => false, + 'ssl_ca_path' => '/var/lib/mysql/ca.pem', + 'ssl_client_cert_path' => '/var/lib/mysql/client-cert.pem', + 'ssl_client_key_path' => '/var/lib/mysql/client-key.pem', + 'encrypt_with' => false, # command line stream encryption binary + 'decrypt_with' => false, # command line stream decryption binary + 'encrypt_file_transfers' => false # flag to use stream encryption } config_paths = ["/etc/jetpants.yaml", "~/.jetpants.yml", "~/.jetpants.yaml"] @@ -156,5 +164,6 @@ def with_retries(retries = nil, max_retry_backoff = nil) # Finally, initialize topology object @topology = Topology.new + @topology.load_shard_pools unless @config['lazy_load_pools'] @topology.load_pools unless @config['lazy_load_pools'] end diff --git a/lib/jetpants/db/import_export.rb b/lib/jetpants/db/import_export.rb index 23d6194..0845e4c 100644 --- a/lib/jetpants/db/import_export.rb +++ b/lib/jetpants/db/import_export.rb @@ -274,7 +274,7 @@ def rebuild!(tables=false, min_id=false, max_id=false) p = pool if p.is_a?(Shard) - tables ||= Table.from_config 'sharded_tables' + tables ||= Table.from_config('sharded_tables', p.shard_pool.name) min_id ||= p.min_id max_id ||= p.max_id if p.max_id != 'INFINITY' end @@ -307,8 +307,40 @@ def rebuild!(tables=false, min_id=false, max_id=false) export_schemata tables export_data tables, min_id, max_id import_schemata! - alter_schemata if respond_to? :alter_schemata + if respond_to? :alter_schemata + alter_schemata + # re-retrieve table metadata in the case that we alter the tables + pool.probe_tables + tables = pool.tables.select{|t| pool.tables.map(&:name).include?(t.name)} + end + + index_list = {} + db_prefix = "USE #{app_schema};" + + if Jetpants.import_without_indices + tables.each do |t| + index_list[t] = t.indexes + + t.indexes.each do |index_name, index_info| + drop_idx_cmd = t.drop_index_query(index_name) + output "Dropping index #{index_name} from #{t.name} prior to import" + mysql_root_cmd("#{db_prefix}#{drop_idx_cmd}") + end + end + end + import_data tables, min_id, max_id + + if Jetpants.import_without_indices + index_list.each do |table, indexes| + next if indexes.keys.empty? + + create_idx_cmd = table.create_index_query(indexes) + index_names = indexes.keys.join(", ") + output "Recreating indexes #{index_names} for #{table.name} after import" + mysql_root_cmd("#{db_prefix}#{create_idx_cmd}") + end + end restart_mysql catch_up_to_master if is_slave? @@ -345,8 +377,9 @@ def clone_to!(*targets) targets.concurrent_each {|t| t.ssh_cmd "rm -rf #{t.mysql_directory}/ib_logfile*"} files = (databases + ['ibdata1', app_schema]).uniq + files += ['*.tokudb', 'tokudb.*', 'log*.tokulog*'] if ssh_cmd("test -f #{mysql_directory}/tokudb.environment 2>/dev/null; echo $?").chomp.to_i == 0 files << 'ib_lru_dump' if ssh_cmd("test -f #{mysql_directory}/ib_lru_dump 2>/dev/null; echo $?").chomp.to_i == 0 - + fast_copy_chain(mysql_directory, destinations, :port => 3306, :files => files, :overwrite => true) clone_settings_to!(*targets) diff --git a/lib/jetpants/db/replication.rb b/lib/jetpants/db/replication.rb index ca5f2cd..c583ac0 100644 --- a/lib/jetpants/db/replication.rb +++ b/lib/jetpants/db/replication.rb @@ -28,17 +28,39 @@ def change_master_to(new_master, option_hash={}) repl_user = option_hash[:user] || replication_credentials[:user] repl_pass = option_hash[:password] || replication_credentials[:pass] + use_ssl = new_master.use_ssl_replication? && use_ssl_replication? 
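With 'sharded_tables' keyed by pool name, the relevant settings end up shaped roughly as below (pool and table names are examples only); 'default_shard_pool' names the keyspace used whenever a command is not told otherwise:

    Jetpants.default_shard_pool   # e.g. "posts"
    Jetpants.sharded_tables
    # => { "posts"    => { "post_data" => { "sharding_key" => "user_id",    "chunks" => 16 } },
    #      "payments" => { "ledger"    => { "sharding_key" => "account_id", "chunks" => 8  } } }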
pause_replication if @master && !@repl_paused - result = mysql_root_cmd "CHANGE MASTER TO " + + cmd_str = "CHANGE MASTER TO " + "MASTER_HOST='#{new_master.ip}', " + "MASTER_PORT=#{new_master.port}, " + "MASTER_LOG_FILE='#{logfile}', " + "MASTER_LOG_POS=#{pos}, " + "MASTER_USER='#{repl_user}', " + "MASTER_PASSWORD='#{repl_pass}'" - - output "Changing master to #{new_master} with coordinates (#{logfile}, #{pos}). #{result}" + + if use_ssl + ssl_ca_path = option_hash[:ssl_ca_path] || Jetpants.ssl_ca_path + ssl_client_cert_path = option_hash[:ssl_client_cert_path] || Jetpants.ssl_client_cert_path + ssl_client_key_path = option_hash[:ssl_client_key_path] || Jetpants.ssl_client_key_path + + cmd_str += ", MASTER_SSL=1" + cmd_str += ", MASTER_SSL_CA='#{ssl_ca_path}'" if ssl_ca_path + + if ssl_client_cert_path && ssl_client_key_path + cmd_str += + ", MASTER_SSL_CERT='#{ssl_client_cert_path}', " + + "MASTER_SSL_KEY='#{ssl_client_key_path}'" + end + end + + result = mysql_root_cmd cmd_str + + msg = "Changing master to #{new_master}" + msg += " using SSL" if use_ssl + msg += " with coordinates (#{logfile}, #{pos}). #{result}" + output msg + @master.slaves.delete(self) if @master rescue nil @master = new_master @repl_paused = true diff --git a/lib/jetpants/db/schema.rb b/lib/jetpants/db/schema.rb index 7db532c..43fe2f0 100644 --- a/lib/jetpants/db/schema.rb +++ b/lib/jetpants/db/schema.rb @@ -24,6 +24,14 @@ def detect_table_schema(table_name) 'columns' => connection.schema(table_name).map{|schema| schema[0]} } + if pool.is_a? Shard + config_params = Jetpants.send('sharded_tables')[pool.shard_pool.name.downcase] + + unless(config_params[table_name].nil?) + params.merge!(config_params[table_name]) + end + end + Table.new(table_name, params) end diff --git a/lib/jetpants/db/server.rb b/lib/jetpants/db/server.rb index 375e0f8..593b9d0 100644 --- a/lib/jetpants/db/server.rb +++ b/lib/jetpants/db/server.rb @@ -11,7 +11,7 @@ def stop_mysql output "Attempting to shutdown MySQL" disconnect if @db output service(:stop, 'mysql') - running = ssh_cmd "netstat -ln | grep ':#{@port}' | wc -l" + running = ssh_cmd "netstat -ln | grep \":#{@port}\\s\" | wc -l" raise "[#{@ip}] Failed to shut down MySQL: Something is still listening on port #{@port}" unless running.chomp == '0' @options = [] @running = false diff --git a/lib/jetpants/db/state.rb b/lib/jetpants/db/state.rb index 687f3b2..aa4869a 100644 --- a/lib/jetpants/db/state.rb +++ b/lib/jetpants/db/state.rb @@ -363,6 +363,11 @@ def avg_buffer_pool_hit_rate ((buffer_pool_hit_rate.split[4].to_f * 100) / buffer_pool_hit_rate.split[6].to_f).round(2) end + # Determine whether a server should use ssl as a replication source + def use_ssl_replication? 
+ global_variables[:have_ssl] && global_variables[:have_ssl].downcase == "yes" + end + ###### Private methods ##################################################### private diff --git a/lib/jetpants/host.rb b/lib/jetpants/host.rb index 1bd7784..3c2f02e 100644 --- a/lib/jetpants/host.rb +++ b/lib/jetpants/host.rb @@ -214,6 +214,19 @@ def fast_copy_chain(base_dir, targets, options={}) else output "Compression disabled -- no compression method specified in Jetpants config file" end + + should_encrypt = false + targets.each do |t| + should_encrypt = should_encrypt || should_encrypt_with?(t) + end + + if Jetpants.encrypt_with && Jetpants.decrypt_with && should_encrypt + enc_bin = Jetpants.encrypt_with.split(' ')[0] + confirm_installed enc_bin + output "Using #{enc_bin} for encryption" + else + output "Not encrypting data stream, either no encryption method specified or encryption unneeded with target" + end # On each destination host, do any initial setup (and optional validation/erasing), # and then listen for new files. If there are multiple destination hosts, all of them @@ -228,6 +241,12 @@ def fast_copy_chain(base_dir, targets, options={}) decomp_bin = Jetpants.decompress_with.split(' ')[0] t.confirm_installed decomp_bin end + + if Jetpants.encrypt_with && Jetpants.decrypt_with && should_encrypt + decrypt_bin = Jetpants.decrypt_with.split(' ')[0] + t.confirm_installed decrypt_bin + end + t.ssh_cmd "mkdir -p #{dir}" # Check if contents already exist / non-empty. @@ -239,8 +258,9 @@ def fast_copy_chain(base_dir, targets, options={}) end decompression_pipe = Jetpants.decompress_with ? "| #{Jetpants.decompress_with}" : '' + decryption_pipe = (Jetpants.decrypt_with && should_encrypt) ? "| #{Jetpants.decrypt_with}" : '' if i == 0 - workers << Thread.new { t.ssh_cmd "cd #{dir} && nc -l #{port} #{decompression_pipe} | tar xv" } + workers << Thread.new { t.ssh_cmd "cd #{dir} && nc -l #{port} #{decryption_pipe} #{decompression_pipe} | tar xv" } t.confirm_listening_on_port port t.output "Listening with netcat." else @@ -249,7 +269,7 @@ def fast_copy_chain(base_dir, targets, options={}) workers << Thread.new { t.ssh_cmd "cd #{dir} && mkfifo #{fifo} && nc #{tt.ip} #{port} <#{fifo} && rm #{fifo}" } checker_th = Thread.new { t.ssh_cmd "while [ ! -p #{dir}/#{fifo} ] ; do sleep 1; done" } raise "FIFO not found on #{t} after 10 tries" unless checker_th.join(10) - workers << Thread.new { t.ssh_cmd "cd #{dir} && nc -l #{port} | tee #{fifo} #{decompression_pipe} | tar xv" } + workers << Thread.new { t.ssh_cmd "cd #{dir} && nc -l #{port} | tee #{fifo} #{decryption_pipe} #{decompression_pipe} | tar xv" } t.confirm_listening_on_port port t.output "Listening with netcat, and chaining to #{tt}." end @@ -259,7 +279,8 @@ def fast_copy_chain(base_dir, targets, options={}) # Start the copy chain. output "Sending files over to #{targets[0]}: #{file_list}" compression_pipe = Jetpants.compress_with ? "| #{Jetpants.compress_with}" : '' - ssh_cmd "cd #{base_dir} && tar vc #{file_list} #{compression_pipe} | nc #{targets[0].ip} #{port}" + encryption_pipe = (Jetpants.encrypt_with && should_encrypt) ? "| #{Jetpants.encrypt_with}" : '' + ssh_cmd "cd #{base_dir} && tar vc #{file_list} #{compression_pipe} #{encryption_pipe} | nc #{targets[0].ip} #{port}" workers.each {|th| th.join} output "File copy complete." @@ -270,6 +291,12 @@ def fast_copy_chain(base_dir, targets, options={}) compare_dir base_dir, destinations, options output "Verification successful." 
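Putting the new pipes together: with compression and encryption both configured and should_encrypt_with? returning true for the target, the chain built above ends up shaped like this (the actual binaries come from the compress_with/encrypt_with settings):

    # sender:    cd BASE && tar vc FILES | <compress_with> | <encrypt_with> | nc first_target port
    # receiver:  cd DEST && nc -l port | <decrypt_with> | <decompress_with> | tar xv
    # middle hops additionally tee into a fifo so the stream keeps chaining to the next target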
end + + # Add a hook point to determine whether a host should encrypt a data stream between two hosts + # This is useful to avoid encryption latency in a secure environment + def should_encrypt_with?(host) + Jetpants.encrypt_file_transfers + end # Given the name of a directory or single file, returns a hash of filename => size of each file present. # Subdirectories will be returned with a size of '/', so you can process these differently as needed. diff --git a/lib/jetpants/pool.rb b/lib/jetpants/pool.rb index d210534..23a1c96 100644 --- a/lib/jetpants/pool.rb +++ b/lib/jetpants/pool.rb @@ -354,7 +354,7 @@ def sync_configuration end # Callback to ensure that a sync'ed pool is already in Topology.pools - def before_sync_configuration + def after_sync_configuration unless Jetpants.topology.pools.include? self Jetpants.topology.add_pool self end @@ -377,6 +377,10 @@ def method_missing(name, *args, &block) def respond_to?(name, include_private=false) super || @master.respond_to?(name) end + + def slave_for_clone + backup_slaves.empty? ? standby_slaves.last : backup_slaves.last + end end end diff --git a/lib/jetpants/shard.rb b/lib/jetpants/shard.rb index 4a889b6..5f490c9 100644 --- a/lib/jetpants/shard.rb +++ b/lib/jetpants/shard.rb @@ -37,26 +37,32 @@ class Shard < Pool # :deprecated -- Parent shard that has been split but children are still in :child or :needs_cleanup state. Shard may still be in production for writes / replication not torn down yet. # :recycle -- Parent shard that has been split and children are now in the :ready state. Shard no longer in production, replication to children has been torn down. attr_accessor :state + + # the sharding pool to which this shard belongs + attr_reader :shard_pool # Constructor for Shard -- # * min_id: int # * max_id: int or the string "INFINITY" # * master: string (IP address) or a Jetpants::DB object # * state: one of the above state symbols - def initialize(min_id, max_id, master, state=:ready) + def initialize(min_id, max_id, master, state=:ready, shard_pool_name=nil) @min_id = min_id.to_i @max_id = (max_id.to_s.upcase == 'INFINITY' ? 'INFINITY' : max_id.to_i) @state = state @children = [] # array of shards being initialized by splitting this one @parent = nil + shard_pool_name = Jetpants.topology.default_shard_pool if shard_pool_name.nil? + @shard_pool = Jetpants.topology.shard_pool(shard_pool_name) super(generate_name, master) end # Generates a string containing the shard's min and max IDs. Plugin may want to override. def generate_name - "shard-#{min_id}-#{max_id.to_s.downcase}" + prefix = (@shard_pool.nil?) ? 'anon' : @shard_pool.name.downcase + "#{prefix}-#{min_id}-#{max_id.to_s.downcase}" end # Returns true if the shard state is one of the values that indicates it's @@ -107,20 +113,20 @@ def db(mode=:read) # Override the probe_tables method to accommodate shard topology - # delegate everything to the first shard. 
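Because shard names are now prefixed with their pool, identical ID ranges in different keyspaces no longer collide; for example (pool names invented, master being any DB object):

    Shard.new(1, 999_999, master, :ready, 'posts').name      # => "posts-1-999999"
    Shard.new(1, 999_999, master, :ready, 'payments').name   # => "payments-1-999999"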
def probe_tables - if Jetpants.topology.shards.first == self + if Jetpants.topology.shards(self.shard_pool.name).first == self super else - Jetpants.topology.shards.first.probe_tables + Jetpants.topology.shards(self.shard_pool.name).first.probe_tables end end # Override the tables accessor to accommodate shard topology - delegate # everything to the first shard def tables - if Jetpants.topology.shards.first == self + if Jetpants.topology.shards(self.shard_pool.name).first == self super else - Jetpants.topology.shards.first.tables + Jetpants.topology.shards(self.shard_pool.name).first.tables end end @@ -221,7 +227,7 @@ def prune_data! raise "Shard #{self} is not in a state compatible with calling prune_data! (current state=#{@state})" end - tables = Table.from_config 'sharded_tables' + tables = Table.from_config('sharded_tables', shard_pool.name) if @state == :initializing @state = :exporting @@ -315,7 +321,7 @@ def cleanup! # situation A - clean up after a shard split if @state == :deprecated && @children.size > 0 - tables = Table.from_config 'sharded_tables' + tables = Table.from_config('sharded_tables', pool.shard_pool.name) @master.revoke_all_access! @children.concurrent_each do |child_shard| raise "Child state does not indicate cleanup is needed" unless child_shard.state == :needs_cleanup @@ -413,7 +419,7 @@ def init_child_shard_masters(id_ranges) spare = Jetpants.topology.claim_spare(role: :master, like: master) spare.disable_read_only! if (spare.running? && spare.read_only?) spare.output "Will be master for new shard with ID range of #{my_range.first} to #{my_range.last} (inclusive)" - child_shard = Shard.new(my_range.first, my_range.last, spare, :initializing) + child_shard = Shard.new(my_range.first, my_range.last, spare, :initializing, shard_pool.name) child_shard.sync_configuration add_child(child_shard) Jetpants.topology.add_pool child_shard diff --git a/lib/jetpants/shardpool.rb b/lib/jetpants/shardpool.rb new file mode 100644 index 0000000..1812058 --- /dev/null +++ b/lib/jetpants/shardpool.rb @@ -0,0 +1,24 @@ +module Jetpants + # A ShardPool is a sharding keyspace in Jetpants that contains + # many Shards. All shards within the pool partition a logically coherent + # keyspace + + class ShardPool + include CallbackHandler + include Output + + attr_reader :name + + def initialize(name) + @name = name + end + + def shards + Jetpants.topology.shards(@name) + end + + def to_s + @name.downcase + end + end +end diff --git a/lib/jetpants/table.rb b/lib/jetpants/table.rb index 15f9b18..33ac36e 100644 --- a/lib/jetpants/table.rb +++ b/lib/jetpants/table.rb @@ -106,6 +106,42 @@ def max_pk_val_query end return sql end + + # generates a query to drop a specified index named by + # the symbol passed in to the method + def drop_index_query(index_name) + raise "Unable to find index #{index_name}!" unless indexes.has_key? index_name + + "ALTER TABLE #{name} DROP INDEX #{index_name}" + end + + # generates a query to create a specified index, given + # a hash of columns, a name, and a unique flag as show below: + # {:index_name=> + # {:columns=>[:column_one, :column_two], :unique=>false}}, + # + def create_index_query(index_specs) + index_defs = [] + + index_specs.each do |index_name, index_opts| + raise "Cannot determine index name!" if index_name.nil? + + raise "Cannot determine index metadata for new index #{index_name}!" 
unless index_opts[:columns].kind_of?(Array) + + index_opts[:columns].each do |col| + raise "Table #{name} does not have column #{col}" unless columns.include?(col) + end + + unique = "" + if index_opts[:unique] + unique = "UNIQUE" + end + + index_defs << "ADD #{unique} INDEX #{index_name} (#{index_opts[:columns].join(',')})" + end + + "ALTER TABLE #{name} #{index_defs.join(", ")}" + end # Returns the first column of the primary key, or nil if there isn't one def first_pk_col @@ -125,9 +161,10 @@ def belongs_to?(pool) # of the given label. # TODO: integrate better with table schema detection code. Consider auto-detecting chunk # count based on file size and row count estimate. - def Table.from_config(label) - result = [] - Jetpants.send(label).map {|name, attributes| Table.new name, attributes} + def Table.from_config(label, shard_pool_name = nil) + shard_pool_name = Jetpants.topology.default_shard_pool if shard_pool_name.nil? + raise "Unable to find sharded tables for shard pool `#{shard_pool_name.downcase}`" if Jetpants.send(label)[shard_pool_name.downcase].nil? + Jetpants.send(label)[shard_pool_name.downcase].map {|name, attributes| Table.new name, attributes} end def to_s diff --git a/lib/jetpants/topology.rb b/lib/jetpants/topology.rb index c59f999..c30ad2c 100644 --- a/lib/jetpants/topology.rb +++ b/lib/jetpants/topology.rb @@ -12,16 +12,33 @@ def initialize # initialize @pools to an empty state @pools = nil + # initialize shard pools to empty + @shard_pools = nil + # We intentionally don't call load_pools here. The caller must do that. # This allows Jetpants module to create Jetpants.topology object, and THEN # invoke load_pools, which might then refer back to Jetpants.topology. end + def to_s + "Jetpants.topology" + end + def pools load_pools if @pools.nil? @pools end + def shard_pools + load_shard_pools if @shard_pools.nil? + @shard_pools + end + + def default_shard_pool + raise "Default shard pool not defined!" if Jetpants.default_shard_pool.nil? + Jetpants.default_shard_pool + end + ###### Class methods ####################################################### # Metaprogramming hackery to create a "synchronized" method decorator @@ -69,12 +86,24 @@ def load_pools output "Notice: no plugin has overridden Topology#load_pools, so *no* pools are imported automatically" end + synchronized + # Plugin should override this to initialize @shard_pools + def load_shard_pools + output "Notice: no plugin has overridden Topology#load_shard_pools, so *no* shard pools are imported automaticaly" + end + synchronized # Plugin should override so that this adds the given pool to the current topology (@pools) def add_pool(pool) output "Notice: no plugin has overridden Topology#add_pool, so the pool was *not* added to the topology" end + synchronized + # Plugin should override so that this adds the given shard pool to the current topology (@shard_pools) + def add_shard_pool(shard_pool) + output "Notice: no plugin has overridden Topology#add_shard_pool, so the shard pool was *not* added to the topology" + end + synchronized # Plugin should override so that it writes a configuration file or commits a # configuration change to a config service. @@ -127,8 +156,12 @@ def slave_roles ###### Instance Methods #################################################### # Returns array of this topology's Jetpants::Pool objects of type Jetpants::Shard - def shards - pools.select {|p| p.is_a? Shard} + def shards(shard_pool_name = nil) + if shard_pool_name.nil? 
+ shard_pool_name = default_shard_pool + output "Using default shard pool #{default_shard_pool}" + end + pools.select {|p| p.is_a? Shard}.select { |p| p.shard_pool && p.shard_pool.name.downcase == shard_pool_name.downcase } end # Returns array of this topology's Jetpants::Pool objects that are NOT of type Jetpants::Shard @@ -146,22 +179,23 @@ def pool(target) end end - # Finds and returns a single Jetpants::Shard. Pass in one of these: - # * a min ID and a max ID - # * just a min ID - # * a Range object - def shard(*args) - if args.count == 2 || args[0].is_a?(Array) - args.flatten! - args.map! {|x| x.to_s.upcase == 'INFINITY' ? 'INFINITY' : x.to_i} - shards.select {|s| s.min_id == args[0] && s.max_id == args[1]}.first - elsif args[0].is_a?(Range) - shards.select {|s| s.min_id == args[0].min && s.max_id == args[0].max}.first + # Finds and returns a single Jetpants::Shard + def shard(min_id, max_id, shard_pool_name = nil) + shard_pool_name = default_shard_pool if shard_pool_name.nil? + if max_id.is_a?(String) && max_id.upcase == 'INFINITY' + max_id.upcase! else - result = shards.select {|s| s.min_id == args[0].to_i} - raise "Multiple shards found with that min_id!" if result.count > 1 - result.first + max_id = max_id.to_i end + + min_id = min_id.to_i + + shards(shard_pool_name).select {|s| s.min_id == min_id && s.max_id == max_id}.first + end + + # Finds a ShardPool object by name + def shard_pool(name) + shard_pools.select{|sp| sp.name.downcase == name.downcase}.first end # Returns the Jetpants::Shard that handles the given ID. @@ -170,8 +204,9 @@ def shard(*args) # child is fully built / in production, this method will always return # the child shard. However, Shard#db(:write) will correctly delegate writes # to the parent shard when appropriate in this case. (see also: Topology#shard_db_for_id) - def shard_for_id(id) - choices = shards.select {|s| s.min_id <= id && (s.max_id == 'INFINITY' || s.max_id >= id)} + def shard_for_id(id, shard_pool = nil) + shard_pool = default_shard_pool if shard_pool.nil? + choices = shards(shard_pool).select {|s| s.min_id <= id && (s.max_id == 'INFINITY' || s.max_id >= id)} choices.reject! {|s| s.parent && ! 
s.in_config?} # filter out child shards that are still being built # Preferentially return child shards at this point @@ -184,8 +219,8 @@ def shard_for_id(id) # Returns the Jetpants::DB that handles the given ID with the specified # mode (either :read or :write) - def shard_db_for_id(id, mode=:read) - shard_for_id(id).db(mode) + def shard_db_for_id(id, mode=:read, shard_pool = nil) + shard_for_id(id, shard_pool).db(mode) end # Nicer inteface into claim_spares when only one DB is desired -- returns @@ -210,7 +245,8 @@ def normalize_roles(*roles) synchronized # Clears the pool list and nukes cached DB and Host object lookup tables def clear - @pools = [] + @pools = nil + @shard_pools = nil DB.clear Host.clear end @@ -218,6 +254,7 @@ def clear # Empties and then reloads the pool list def refresh clear + load_shard_pools load_pools true end diff --git a/plugins/jetpants_collins/asset.rb b/plugins/jetpants_collins/asset.rb index af586d4..d586a36 100644 --- a/plugins/jetpants_collins/asset.rb +++ b/plugins/jetpants_collins/asset.rb @@ -15,8 +15,16 @@ def to_host raise "Can only call to_host on SERVER_NODE assets, but #{self} has type #{type}" unless type.upcase == 'SERVER_NODE' backend_ip_address.to_host end - - + + # Convert a Collins:Asset to a Jetpants::ShardPool + def to_shard_pool + raise "Can only call to_shard_pool on CONFIGURATION assets, but #{self} has type #{type}" unless type.upcase == 'CONFIGURATION' + raise "Unknown primary role #{primary_role} for configuration asset #{self}" unless primary_role.upcase == 'MYSQL_SHARD_POOL' + raise "No shard_pool attribute set on asset #{self}" unless shard_pool && shard_pool.length > 0 + + Jetpants::ShardPool.new(shard_pool) + end + # Convert a Collins::Asset to either a Jetpants::Pool or a Jetpants::Shard, depending # on the value of PRIMARY_ROLE. Requires asset TYPE to be CONFIGURATION. def to_pool @@ -65,7 +73,8 @@ def to_pool result = Jetpants::Shard.new(shard_min_id.to_i, shard_max_id == 'INFINITY' ? 'INFINITY' : shard_max_id.to_i, master_assets.first.to_db, - shard_state.downcase.to_sym) + shard_state.downcase.to_sym, + shard_pool) # We'll need to set up the parent/child relationship if a shard split is in progress, # BUT we need to wait to do that later since the shards may have been returned by diff --git a/plugins/jetpants_collins/commandsuite.rb b/plugins/jetpants_collins/commandsuite.rb index 26fb34d..8eaf68e 100644 --- a/plugins/jetpants_collins/commandsuite.rb +++ b/plugins/jetpants_collins/commandsuite.rb @@ -29,11 +29,11 @@ def self.jetpants_collins_before_dispatch(task) method_option :name, :name => 'unique name of new pool to be created' method_option :master, :master => 'ip of pre-configured master for new pool' def create_pool - name = options[:name] || ask('Please enter the name of the new pool.') - if configuration_assets('MYSQL_POOL').map(&:pool).include? name.upcase + name = options[:name] || ask('Please enter the name of the new pool: ') + if Jetpants.topology.configuration_assets('MYSQL_POOL').map(&:pool).include? name.upcase error "Pool #{name} already exists" end - master = options[:master] || ask("Please enter the ip of the master, or 'none' if one does not yet exist.") + master = options[:master] || ask("Please enter the ip of the master, or 'none' if one does not yet exist: ") if (master.downcase != 'none') && ! (is_ip? master) error "Master must either be 'none' or a valid ip." 
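Key-based routing accepts the pool as well; a short sketch (ID and pool name are placeholders, and omitting the pool still routes against the default keyspace):

    Jetpants.topology.shard_for_id(123_456, 'posts')             # Shard covering that ID
    Jetpants.topology.shard_db_for_id(123_456, :write, 'posts')  # DB to send writes to
    Jetpants.topology.shard_pool('posts').shards                 # the whole keyspace, via ShardPool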
end diff --git a/plugins/jetpants_collins/jetpants_collins.rb b/plugins/jetpants_collins/jetpants_collins.rb index b8a3f5d..49a1402 100644 --- a/plugins/jetpants_collins/jetpants_collins.rb +++ b/plugins/jetpants_collins/jetpants_collins.rb @@ -303,4 +303,4 @@ def collins_status_state # load all the monkeypatches for other Jetpants classes -%w(monkeypatch asset host db pool shard topology commandsuite).each {|mod| require "jetpants_collins/#{mod}"} +%w(monkeypatch asset host db pool shard topology shardpool commandsuite).each {|mod| require "jetpants_collins/#{mod}"} diff --git a/plugins/jetpants_collins/shard.rb b/plugins/jetpants_collins/shard.rb index da7418e..ea3a8e2 100644 --- a/plugins/jetpants_collins/shard.rb +++ b/plugins/jetpants_collins/shard.rb @@ -7,7 +7,7 @@ class Shard < Pool include Plugin::JetCollins - collins_attr_accessor :shard_min_id, :shard_max_id, :shard_state, :shard_parent + collins_attr_accessor :shard_min_id, :shard_max_id, :shard_state, :shard_parent, :shard_pool # Returns a Collins::Asset for this pool def collins_asset(create_if_missing=false) @@ -15,9 +15,10 @@ def collins_asset(create_if_missing=false) operation: 'and', details: true, type: 'CONFIGURATION', - primary_role: 'MYSQL_SHARD', + primary_role: '^MYSQL_SHARD$', shard_min_id: "^#{@min_id}$", shard_max_id: "^#{@max_id}$", + shard_pool: "^#{@shard_pool.name}$" } selector[:remoteLookup] = true if Jetpants.plugins['jetpants_collins']['remote_lookup'] @@ -45,7 +46,8 @@ def collins_asset(create_if_missing=false) primary_role: 'MYSQL_SHARD', pool: @name.upcase, shard_min_id: @min_id, - shard_max_id: @max_id + shard_max_id: @max_id, + shard_pool: @shard_pool.name.upcase Plugin::JetCollins.get new_tag elsif results.count == 0 && !create_if_missing raise "Could not find configuration asset for pool #{name}" diff --git a/plugins/jetpants_collins/shardpool.rb b/plugins/jetpants_collins/shardpool.rb new file mode 100644 index 0000000..fa9fed1 --- /dev/null +++ b/plugins/jetpants_collins/shardpool.rb @@ -0,0 +1,56 @@ +module Jetpants + # A ShardPool is a sharding keyspace in Jetpants that contains + # many Shards. All shards within the pool partition a logically coherent + # keyspace + + class ShardPool + + ##### JETCOLLINS MIX-IN #################################################### + + include Plugin::JetCollins + + collins_attr_accessor :shard_pool + + def collins_asset(create_if_missing=false) + selector = { + operation: 'and', + details: true, + type: 'CONFIGURATION', + primary_role: '^MYSQL_SHARD_POOL$', + shard_pool: "^#{@name.upcase}$", + status: 'Allocated', + } + selector[:remoteLookup] = true if Jetpants.plugins['jetpants_collins']['remote_lookup'] + + results = Plugin::JetCollins.find selector, !create_if_missing + + # If we got back multiple results, try ignoring the remote datacenter ones + if results.count > 1 + filtered_results = results.select {|a| a.location.nil? 
|| a.location.upcase == Plugin::JetCollins.datacenter} + results = filtered_results if filtered_results.count > 0 + end + + if results.count > 1 + raise "Multiple configuration assets found for pool #{@name}" + elsif results.count == 0 && create_if_missing + output "Could not find configuration asset for pool; creating now" + new_tag = 'mysql-shard-pool-' + @name + asset = Collins::Asset.new type: 'CONFIGURATION', tag: new_tag, status: 'Allocated' + begin + Plugin::JetCollins.create!(asset) + rescue + collins_set asset: asset, + status: 'Allocated' + end + collins_set asset: asset, + primary_role: 'MYSQL_SHARD_POOL', + shard_pool: @name.upcase + Plugin::JetCollins.get new_tag + elsif results.count == 0 && !create_if_missing + raise "Could not find configuration asset for pool #{name}" + else + results.first + end + end + end +end diff --git a/plugins/jetpants_collins/topology.rb b/plugins/jetpants_collins/topology.rb index e654a2e..bc25758 100644 --- a/plugins/jetpants_collins/topology.rb +++ b/plugins/jetpants_collins/topology.rb @@ -38,8 +38,18 @@ def process_spare_selector_options(selector, options) ##### METHOD OVERRIDES ##################################################### + def load_shard_pools + @shard_pools = configuration_assets('MYSQL_SHARD_POOL').map(&:to_shard_pool) + @shard_pools.compact! + @shard_pools.sort_by! { |p| p.name } + + true + end + # Initializes list of pools + shards from Collins def load_pools + load_shard_pools if @shard_pools.nil? + # We keep a cache of Collins::Asset objects, organized as pool_name => role => [asset, asset, ...] @pool_role_assets = {} @@ -71,6 +81,15 @@ def add_pool(pool) true end + def add_shard_pool(shard_pool) + raise 'Attempt to add a non shard pool to the sharding pools topology' unless shard_pool.is_a?(ShardPool) + + unless shard_pools.include? shard_pool + @shard_pools << shard_pool + @shard_pools.sort_by! { |sp| sp.name } + end + end + # Returns (count) DB objects. Pulls from machines in the spare state # and converts them to the Allocated status. # You can pass in :role to request spares with a particular secondary_role @@ -117,9 +136,9 @@ def spares(options={}) ##### NEW METHODS ########################################################## - def db_location_report(shards_only = false) - if shards_only - pools_to_consider = shards + def db_location_report(shards_only = nil) + unless shards_only.nil? + pools_to_consider = shards(shards_only) else pools_to_consider = pools end @@ -209,19 +228,18 @@ def configuration_assets(*primary_roles) operation: 'and', details: true, size: per_page, - query: 'status != ^DECOMMISSIONED$', + query: 'status != ^DECOMMISSIONED$ AND type = ^CONFIGURATION$', } if primary_roles.count == 1 - selector[:type] = '^CONFIGURATION$' selector[:primary_role] = primary_roles.first else values = primary_roles.map {|r| "primary_role = ^#{r}$"} - selector[:query] += ' AND type = ^CONFIGURATION$ AND (' + values.join(' OR ') + ')' + selector[:query] += ' AND (' + values.join(' OR ') + ')' end selector[:remoteLookup] = true if Jetpants.plugins['jetpants_collins']['remote_lookup'] - + done = false page = 0 assets = [] @@ -307,11 +325,8 @@ def query_spare_assets(count, options={}) keep_assets = [] - # Probe concurrently for speed reasons nodes.map(&:to_db).concurrent_each {|db| db.probe rescue nil} - - # Now iterate in a single-threaded way for simplicity - nodes.each do |node| + nodes.concurrent_each do |node| db = node.to_db if(db.usable_spare? 
&& ( @@ -355,17 +370,19 @@ def sort_assets_for_pool(pool, assets) def sort_pools_callback(pool) asset = pool.collins_asset role = asset.primary_role.upcase + shard_pool_name = '' case role when 'MYSQL_POOL' position = (asset.config_sort_order || 0).to_i when 'MYSQL_SHARD' position = asset.shard_min_id.to_i + shard_pool_name = pool.shard_pool.name else position = 0 end - [role, position] + [role, shard_pool_name, position] end end diff --git a/plugins/merge_helper/lib/aggregator.rb b/plugins/merge_helper/lib/aggregator.rb index 19011cd..81f618f 100644 --- a/plugins/merge_helper/lib/aggregator.rb +++ b/plugins/merge_helper/lib/aggregator.rb @@ -347,7 +347,7 @@ def cleanup! # WARNING! This will pause replication on the nodes this machine aggregates from # And perform expensive row count operations on them def validate_aggregate_row_counts(restart_monitoring = true, tables = false) - tables = Table.from_config 'sharded_tables' unless tables + tables = Table.from_config('sharded_tables', aggregating_nodes.first.pool.shard_pool.name) unless tables query_nodes = [ slaves, aggregating_nodes ].flatten aggregating_nodes.concurrent_each do |node| node.disable_monitoring diff --git a/plugins/merge_helper/lib/commandsuite.rb b/plugins/merge_helper/lib/commandsuite.rb index e5b6f3b..dff22bf 100644 --- a/plugins/merge_helper/lib/commandsuite.rb +++ b/plugins/merge_helper/lib/commandsuite.rb @@ -78,16 +78,18 @@ def merge_shards raise "Invalid aggregate node!" unless aggregate_node.aggregator? # claim the slaves further along in the process - aggregate_shard_master = ask_node("Enter the IP address of the new master or press enter to select a spare:") + aggregate_shard_master_ip = ask("Enter the IP address of the new master or press enter to select a spare:") - if aggregate_shard_master + unless aggregate_shard_master_ip.empty? + error "Node (#{aggregate_shard_master_ip.blue}) does not appear to be an IP address." unless is_ip? aggregate_shard_master_ip + aggregate_shard_master = aggregate_shard_master_ip.to_db aggregate_shard_master.claim! if aggregate_shard_master.is_spare? else aggregate_shard_master = Jetpants.topology.claim_spare(role: :master, like: shards_to_merge.first.master) end # claim node for the new shard master - spare_count = shards_to_merge.first.slaves_layout[:standby_slave] + 1; + spare_count = shards_to_merge.first.slaves_layout[:standby_slave]; raise "Not enough spares available!" unless Jetpants.count_spares(like: aggregate_shard_master) >= spare_count raise "Not enough backup_slave role spare machines!" unless Jetpants.topology.count_spares(role: :backup_slave) >= shards_to_merge.first.slaves_layout[:backup_slave] @@ -118,7 +120,7 @@ def merge_shards aggregate_shard_master.catch_up_to_master - aggregate_shard = Shard.new(shards_to_merge.first.min_id, shards_to_merge.last.max_id, aggregate_shard_master, :initializing) + aggregate_shard = Shard.new(shards_to_merge.first.min_id, shards_to_merge.last.max_id, aggregate_shard_master, :initializing, shards_to_merge.first.shard_pool.name) # ensure a record is present in collins aggregate_shard.sync_configuration Jetpants.topology.add_pool aggregate_shard @@ -283,7 +285,10 @@ def validate_merge_replication no_tasks do def ask_merge_shards - shards_to_merge = Jetpants.shards.select{ |shard| !shard.combined_shard.nil? } + shard_pool_name = ask("Enter shard pool name performing a merge operation (enter for default #{Jetpants.topology.default_shard_pool}):") + shard_pool_name = Jetpants.topology.default_shard_pool if shard_pool_name.empty? 
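Since every shard now points at a MYSQL_SHARD_POOL configuration asset, a new keyspace has to exist in Collins before its first shard does; one plausible bootstrap, with an example pool name:

    pool = Jetpants::ShardPool.new('posts')
    pool.collins_asset(true)               # find-or-create the MYSQL_SHARD_POOL asset
    Jetpants.topology.add_shard_pool(pool)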
+ shards_to_merge = Jetpants.shards(shard_pool_name).select{ |shard| !shard.combined_shard.nil? } + raise("No shards detected as merging!") if shards_to_merge.empty? shards_str = shards_to_merge.join(', ') answer = ask "Detected shards to merge as #{shards_str}, proceed (enter YES in all caps if so)?" raise "Aborting on user input" unless answer == "YES" @@ -292,11 +297,14 @@ def ask_merge_shards end def ask_merge_shard_ranges + shard_pool = ask("Please enter the sharding pool which to perform the action on (enter for default pool #{Jetpants.topology.default_shard_pool}): ") + shard_pool = Jetpants.topology.default_shard_pool if shard_pool.empty? + min_id = ask("Please provide the min ID of the shard range to merge:") max_id = ask("Please provide the max ID of the shard range to merge:") # for now we assume we'll never merge the shard at the head of the list - shards_to_merge = Jetpants.shards.select do |shard| + shards_to_merge = Jetpants.shards(shard_pool).select do |shard| shard.min_id.to_i >= min_id.to_i && shard.max_id.to_i <= max_id.to_i && shard.max_id != 'INFINITY' diff --git a/plugins/merge_helper/lib/shard.rb b/plugins/merge_helper/lib/shard.rb index 4769212..99ccc79 100644 --- a/plugins/merge_helper/lib/shard.rb +++ b/plugins/merge_helper/lib/shard.rb @@ -4,7 +4,7 @@ module Jetpants class Shard # Runs queries against a slave in the pool to verify sharding key values def validate_shard_data - tables = Table.from_config 'sharded_tables' + tables = Table.from_config('sharded_tables', shard_pool.name) table_statuses = {} tables.limited_concurrent_map(8) { |table| table.sharding_keys.each do |col| @@ -125,7 +125,7 @@ def self.find_duplicate_keys(shards, table, key, min_key_val = nil, max_key_val # Generate a list of filenames for exported data def table_export_filenames(full_path = true, tables = false) export_filenames = [] - tables = Table.from_config 'sharded_tables' unless tables + tables = Table.from_config('sharded_tables', shard_pool.name) unless tables export_filenames = tables.map { |table| table.export_filenames(@min_id, @max_id) }.flatten export_filenames.map!{ |filename| File.basename filename } unless full_path @@ -159,7 +159,7 @@ def self.set_up_aggregate_node(shards_to_merge, aggregate_node, new_shard_master slaves_to_replicate = shards_to_merge.map { |shard| shard.standby_slaves.last } # sharded table list to ship - tables = Plugin::MergeHelper.tables_to_merge + tables = Plugin::MergeHelper.tables_to_merge(shards_to_merge.first.shard_pool.name) # data export counts for validation later export_counts = {} @@ -235,7 +235,7 @@ def self.set_up_aggregate_node(shards_to_merge, aggregate_node, new_shard_master end def combined_shard - Jetpants.shards.select { |shard| ( + Jetpants.shards(shard_pool.name).select { |shard| ( shard.min_id.to_i <= @min_id.to_i \ && shard.max_id.to_i >= @max_id.to_i \ && shard.max_id != 'INFINITY' \ diff --git a/plugins/merge_helper/merge_helper.rb b/plugins/merge_helper/merge_helper.rb index 47cde4f..34ab732 100644 --- a/plugins/merge_helper/merge_helper.rb +++ b/plugins/merge_helper/merge_helper.rb @@ -3,8 +3,8 @@ module Plugin module MergeHelper class << self # Provide a config hook to specify a list of tables to merge, overriding the sharded_tables list - def tables_to_merge - tables = Table.from_config 'sharded_tables' + def tables_to_merge(shard_pool) + tables = Table.from_config('sharded_tables', shard_pool) table_list = [] if (!Jetpants.plugins['merge_helper'].nil? 
&& Jetpants.plugins['merge_helper'].has_key?('table_list')) table_list = Jetpants.plugins['merge_helper']['table_list'] diff --git a/plugins/online_schema_change/lib/commandsuite.rb b/plugins/online_schema_change/lib/commandsuite.rb index 4022c6e..b7c5f28 100644 --- a/plugins/online_schema_change/lib/commandsuite.rb +++ b/plugins/online_schema_change/lib/commandsuite.rb @@ -11,6 +11,7 @@ class CommandSuite < Thor method_option :table, :desc => 'Table to run the alter table on' method_option :all_shards, :desc => 'To run on all the shards', :type => :boolean method_option :no_check_plan, :desc => 'Do not check the query execution plan', :type => :boolean + method_option :shard_pool, :desc => 'The sharding pool for which to perform the alter' def alter_table unless options[:all_shards] pool_name = options[:pool] || ask('Please enter a name of a pool: ') @@ -23,7 +24,9 @@ def alter_table alter = options[:alter] || ask('Please enter a alter table statement (eg ADD COLUMN c1 INT): ') if options[:all_shards] - Jetpants.topology.alter_table_shards(database, table, alter, options[:dry_run], options[:no_check_plan]) + shard_pool = options[:shard_pool] || ask('Please enter the sharding pool for which to perform the split (enter for default pool): ') + shard_pool = default_shard_pool if shard_pool.empty? + Jetpants.topology.alter_table_shards(database, table, alter, options[:dry_run], options[:no_check_plan], shard_pool) else unless pool.alter_table(database, table, alter, options[:dry_run], false, options[:no_check_plan]) output "Check for errors during online schema change".red, :error @@ -36,6 +39,7 @@ def alter_table method_option :table, :desc => 'Table you ran the alter table on' method_option :database, :desc => 'Database you ran the alter table on' method_option :all_shards, :desc => 'To run on all the shards', :type => :boolean + method_option :shard_pool, :desc => 'The sharding pool for which to drop the old table' def alter_table_drop unless options[:all_shards] pool_name = options[:pool] || ask('Please enter a name of a pool: ') @@ -47,7 +51,9 @@ def alter_table_drop table = options[:table] || ask('Please enter a name of a table: ') if options[:all_shards] - Jetpants.topology.drop_old_alter_table_shards(database, table) + shard_pool = options[:shard_pool] || ask('Please enter the sharding pool for which to perform the split (enter for default pool): ') + shard_pool = default_shard_pool if shard_pool.empty? + Jetpants.topology.drop_old_alter_table_shards(database, table, shard_pool) else pool.drop_old_alter_table(database, table) end diff --git a/plugins/online_schema_change/lib/topology.rb b/plugins/online_schema_change/lib/topology.rb index 904d802..808fde6 100644 --- a/plugins/online_schema_change/lib/topology.rb +++ b/plugins/online_schema_change/lib/topology.rb @@ -5,13 +5,14 @@ class Topology # if you specify dry run it will run a dry run on all the shards # otherwise it will run on the first shard and ask if you want to # continue on the rest of the shards, 10 shards at a time - def alter_table_shards(database, table, alter, dry_run=true, no_check_plan=false) - my_shards = shards.dup + def alter_table_shards(database, table, alter, dry_run=true, no_check_plan=false, shard_pool = nil) + shard_pool = Jetpants.topology.default_shard_pool if shard_pool.nil? 
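Driving an online schema change across a single keyspace might then look like this (database, table, and pool names are invented):

    Jetpants.topology.alter_table_shards('myapp', 'post_data',
                                         'ADD COLUMN flags INT UNSIGNED NOT NULL DEFAULT 0',
                                         true,     # dry_run first
                                         false,    # no_check_plan
                                         'posts')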
+ my_shards = shards(shard_pool).dup first_shard = my_shards.shift - print "Will run on first shard and prompt for going past the dry run only on the first shard\n\n" - print "[#{Time.now.to_s.blue}] #{first_shard.pool.to_s}\n" + output "Will run on first shard and prompt for going past the dry run only on the first shard\n\n" + output "#{first_shard.pool.to_s}\n" unless first_shard.alter_table(database, table, alter, dry_run, false) - print "First shard had an error, please check output\n" + output "First shard had an error, please check output\n" return end @@ -20,12 +21,12 @@ def alter_table_shards(database, table, alter, dry_run=true, no_check_plan=false errors = [] my_shards.limited_concurrent_map(10) do |shard| - print "[#{Time.now.to_s.blue}] #{shard.pool.to_s}\n" + output "#{shard.pool.to_s}\n" errors << shard unless shard.alter_table(database, table, alter, dry_run, true, no_check_plan) end errors.each do |shard| - print "check #{shard.name} for errors during online schema change\n" + output "check #{shard.name} for errors during online schema change\n" end end end @@ -34,17 +35,18 @@ def alter_table_shards(database, table, alter, dry_run=true, no_check_plan=false # this is because we do not drop the old table in the osc # also I will do the first shard and ask if you want to # continue, after that it will do each table serially - def drop_old_alter_table_shards(database, table) - my_shards = shards.dup + def drop_old_alter_table_shards(database, table, shard_pool = nil) + shard_pool = Jetpants.topology.default_shard_pool if shard_pool.nil? + my_shards = shards(shard_pool).dup first_shard = my_shards.shift - print "Will run on first shard and prompt before going on to the rest\n\n" - print "[#{Time.now.to_s.blue}] #{first_shard.pool.to_s}\n" + output "Will run on first shard and prompt before going on to the rest\n\n" + output "#{first_shard.pool.to_s}\n" first_shard.drop_old_alter_table(database, table) continue = ask('First shard complete would you like to continue with the rest of the shards?: (YES/no) - YES has to be in all caps and fully typed') if continue == 'YES' my_shards.each do |shard| - print "[#{Time.now.to_s.blue}] #{shard.pool.to_s}\n" + print "#{shard.pool.to_s}\n" shard.drop_old_alter_table(database, table) end end diff --git a/plugins/simple_tracker/lib/shard.rb b/plugins/simple_tracker/lib/shard.rb index 7971762..d632f91 100644 --- a/plugins/simple_tracker/lib/shard.rb +++ b/plugins/simple_tracker/lib/shard.rb @@ -26,7 +26,7 @@ def self.from_hash(h) # we just return the shard for now... we have to wait until later to # set up children + parents, since it's easier to grab the corresponding # objects once all pools have been initialized. 
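Once the alter has been applied everywhere, the leftover shadow tables are dropped per pool through the same entry point (same placeholder names as above):

    Jetpants.topology.drop_old_alter_table_shards('myapp', 'post_data', 'posts')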
- Shard.new(h['min_id'], h['max_id'], h['master'], h['state'].to_sym) + Shard.new(h['min_id'], h['max_id'], h['master'], h['state'].to_sym, h['shard_pool']) end # Sets up parent/child relationships for the shard represented by the @@ -82,6 +82,7 @@ def to_hash(for_app_config=false) 'state' => state, 'master' => master, 'slaves' => slave_data, + 'shard_pool' => shard_pool, } end end diff --git a/plugins/simple_tracker/lib/shardpool.rb b/plugins/simple_tracker/lib/shardpool.rb new file mode 100644 index 0000000..f46dcc1 --- /dev/null +++ b/plugins/simple_tracker/lib/shardpool.rb @@ -0,0 +1,29 @@ +module Jetpants + class ShardPool + + def sync_configuration + Jetpants.topology.update_tracker_data + end + + ##### NEW CLASS-LEVEL METHODS ############################################## + + # Converts a hash (from asset tracker json file) into a Shard. + def self.from_hash(h) + # we just return the shard for now... we have to wait until later to + # set up children + parents, since it's easier to grab the corresponding + # objects once all pools have been initialized. + ShardPool.new(h['shard_pool']) + end + + ##### NEW METHODS ########################################################## + + # Converts a Shard to a hash, for use in either the internal asset tracker + # json (for_app_config=false) or for use in the application config file yaml + # (for_app_config=true) + def to_hash(for_app_config = true) + { + shard_pool: @name + } + end + end +end diff --git a/plugins/simple_tracker/lib/topology.rb b/plugins/simple_tracker/lib/topology.rb index fcdf2a7..be89b91 100644 --- a/plugins/simple_tracker/lib/topology.rb +++ b/plugins/simple_tracker/lib/topology.rb @@ -1,39 +1,55 @@ module Jetpants class Topology - attr_accessor :tracker + def self.tracker + @tracker ||= Jetpants::Plugin::SimpleTracker.new + + @tracker + end ##### METHOD OVERRIDES ##################################################### # Populates @pools by reading asset tracker data def load_pools - @tracker = Jetpants::Plugin::SimpleTracker.new # Create Pool and Shard objects - @pools = @tracker.global_pools.map {|h| Pool.from_hash(h)}.compact - all_shards = @tracker.shards.map {|h| Shard.from_hash(h)}.reject {|s| s.state == :recycle} + @pools = self.class.tracker.global_pools.map {|h| Pool.from_hash(h)}.compact + all_shards = self.class.tracker.shards.map {|h| Shard.from_hash(h)}.reject {|s| s.state == :recycle} @pools.concat all_shards # Now that all shards exist, we can safely assign parent/child relationships - @tracker.shards.each {|h| Shard.assign_relationships(h, all_shards)} + self.class.tracker.shards.each {|h| Shard.assign_relationships(h, all_shards)} + end + + # Populate @shard_pools by reading asset tracker data + def load_shard_pools + @shard_pools = self.class.tracker.shard_pools.map{|h| ShardPool.from_hash(h) }.compact end def add_pool(pool) @pools << pool unless pools.include? pool end + def add_shard_pool(shard_pool) + @shard_pools << shard_pool unless shard_pools.include? 
diff --git a/plugins/simple_tracker/lib/topology.rb b/plugins/simple_tracker/lib/topology.rb
index fcdf2a7..be89b91 100644
--- a/plugins/simple_tracker/lib/topology.rb
+++ b/plugins/simple_tracker/lib/topology.rb
@@ -1,39 +1,55 @@
 module Jetpants
   class Topology
-    attr_accessor :tracker
+    def self.tracker
+      @tracker ||= Jetpants::Plugin::SimpleTracker.new
+
+      @tracker
+    end

     ##### METHOD OVERRIDES #####################################################

     # Populates @pools by reading asset tracker data
     def load_pools
-      @tracker = Jetpants::Plugin::SimpleTracker.new
       # Create Pool and Shard objects
-      @pools = @tracker.global_pools.map {|h| Pool.from_hash(h)}.compact
-      all_shards = @tracker.shards.map {|h| Shard.from_hash(h)}.reject {|s| s.state == :recycle}
+      @pools = self.class.tracker.global_pools.map {|h| Pool.from_hash(h)}.compact
+      all_shards = self.class.tracker.shards.map {|h| Shard.from_hash(h)}.reject {|s| s.state == :recycle}
       @pools.concat all_shards

       # Now that all shards exist, we can safely assign parent/child relationships
-      @tracker.shards.each {|h| Shard.assign_relationships(h, all_shards)}
+      self.class.tracker.shards.each {|h| Shard.assign_relationships(h, all_shards)}
+    end
+
+    # Populate @shard_pools by reading asset tracker data
+    def load_shard_pools
+      @shard_pools = self.class.tracker.shard_pools.map{|h| ShardPool.from_hash(h) }.compact
     end

     def add_pool(pool)
       @pools << pool unless pools.include? pool
     end

+    def add_shard_pool(shard_pool)
+      @shard_pools << shard_pool unless shard_pools.include? shard_pool
+    end
+
     # Generates a database configuration file for a hypothetical web application
     def write_config
-      config_file_path = @tracker.app_config_file_path
+      config_file_path = self.class.tracker.app_config_file_path

       # Convert the pool list into a hash
       db_data = {
         'database' => {
           'pools' => functional_partitions.map {|p| p.to_hash(true)},
-          'shards' => shards.select {|s| s.in_config?}.map {|s| s.to_hash(true)},
-        }
+        },
+        'shard_pools' => {}
       }

+      shard_pools.each do |shard_pool|
+        db_data['shard_pools'][shard_pool.name] = shard_pool.shards.select {|s| s.in_config?}.map {|s| s.to_hash(true)}
+      end
+
       # Convert that hash to YAML and write it to a file
       File.open(config_file_path, 'w') do |f|
         f.write db_data.to_yaml
@@ -44,8 +60,8 @@ def write_config

     # simple_tracker completely ignores any options like :role or :like
     def claim_spares(count, options={})
-      raise "Not enough spare machines -- requested #{count}, only have #{@tracker.spares.count}" if @tracker.spares.count < count
-      hashes = @tracker.spares.shift(count)
+      raise "Not enough spare machines -- requested #{count}, only have #{self.class.tracker.spares.count}" if self.class.tracker.spares.count < count
+      hashes = self.class.tracker.spares.shift(count)
       update_tracker_data

       dbs = hashes.map {|h| h.is_a?(Hash) && h['node'] ? h['node'].to_db : h.to_db}
@@ -60,11 +76,11 @@ def claim_spares(count, options={})
     end

     def count_spares(options={})
-      @tracker.spares.count
+      self.class.tracker.spares.count
     end

     def spares(options={})
-      @tracker.spares.map(&:to_db)
+      self.class.tracker.spares.map(&:to_db)
     end


@@ -76,9 +92,10 @@ def spares(options={})
     # instead Pool#sync_configuration could just update the info for that pool
     # only.
     def update_tracker_data
-      @tracker.global_pools = functional_partitions.map &:to_hash
-      @tracker.shards = shards.reject {|s| s.state == :recycle}.map &:to_hash
-      @tracker.save
+      self.class.tracker.global_pools = functional_partitions.map &:to_hash
+      self.class.tracker.shards = pools.select{|p| p.is_a? Shard}.reject {|s| s.state == :recycle}.map &:to_hash
+      self.class.tracker.shard_pools = shard_pools.map(&:to_hash)
+      self.class.tracker.save
     end

   end
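
To make the write_config change concrete, here is a sketch (not from the patch) of the hash it now serializes: shard definitions move out from under the 'database' key into a top-level 'shard_pools' mapping keyed by pool name, so any application reading databases.yaml has to be updated accordingly. The individual pool and shard hashes are abbreviated here; their exact keys come from Pool#to_hash and Shard#to_hash.

  require 'yaml'

  # Roughly what the revised write_config builds for one functional partition
  # and one hypothetical shard pool named 'user' (entries abbreviated).
  db_data = {
    'database' => {
      'pools' => [{ 'master' => '10.0.0.1' }],
    },
    'shard_pools' => {
      'user' => [
        { 'min_id' => 1, 'max_id' => 'INFINITY', 'master' => '10.0.0.2' },
      ],
    },
  }
  puts db_data.to_yaml
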
diff --git a/plugins/simple_tracker/simple_tracker.rb b/plugins/simple_tracker/simple_tracker.rb
index 678b6ed..e37da0a 100644
--- a/plugins/simple_tracker/simple_tracker.rb
+++ b/plugins/simple_tracker/simple_tracker.rb
@@ -17,6 +17,9 @@ class SimpleTracker
       # Array of hashes, each containing info from Shard#to_hash
       attr_accessor :shards
+
+      # Array of hashes, each containing info from ShardPool#to_hash
+      attr_accessor :shard_pools

       # Clean state DB nodes that are ready for use. Array of any of the following:
       #   * hashes each containing key 'node'. could expand to include 'role' or other metadata as well,
@@ -29,15 +32,16 @@
       def initialize
         @tracker_data_file_path = Jetpants.plugins['simple_tracker']['tracker_data_file_path'] || '/etc/jetpants_tracker.json'
         @app_config_file_path = Jetpants.plugins['simple_tracker']['app_config_file_path'] || '/var/lib/mysite/config/databases.yaml'
-        data = JSON.parse(File.read(@tracker_data_file_path)) rescue {'pools' => {}, 'shards' => [], 'spares' => []}
+        data = JSON.parse(File.read(@tracker_data_file_path)) rescue {'pools' => {}, 'shards' => [], 'spares' => [], 'shard_pools' => []}
         @global_pools = data['pools']
         @shards = data['shards']
         @spares = data['spares']
+        @shard_pools = data['shard_pools']
       end

       def save
         File.open(@tracker_data_file_path, 'w') do |f|
-          data = {'pools' => @global_pools, 'shards' => @shards, 'spares' => @spares}
+          data = {'pools' => @global_pools, 'shards' => @shards, 'spares' => @spares, 'shard_pools' => @shard_pools}
           f.puts JSON.pretty_generate(data)
           f.close
         end
@@ -71,4 +75,4 @@ def determine_slaves(ip, port=3306)
 end

 # load all the monkeypatches for other Jetpants classes
-%w(pool shard topology db commandsuite).each { |mod| require "simple_tracker/lib/#{mod}" }
+%w(pool shard topology db shardpool commandsuite).each { |mod| require "simple_tracker/lib/#{mod}" }
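
For reference, a sketch (not part of the patch) of what the tracker data file now carries with its fourth top-level key; the pool, shard, and spare entries are abbreviated and the output path is hypothetical:

  require 'json'

  tracker_data = {
    'pools'       => {},
    'shards'      => [],
    'spares'      => ['10.0.0.9'],
    'shard_pools' => [{ 'shard_pool' => 'user' }],
  }
  File.write('/tmp/jetpants_tracker_example.json', JSON.pretty_generate(tracker_data))

One thing worth noting in review: the rescue default only kicks in when the file is missing or unparseable, so an existing tracker file that parses fine but predates this change will leave @shard_pools set to nil rather than an empty array.
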
diff --git a/plugins/upgrade_helper/commandsuite.rb b/plugins/upgrade_helper/commandsuite.rb
index f8e46ec..66fe4fa 100644
--- a/plugins/upgrade_helper/commandsuite.rb
+++ b/plugins/upgrade_helper/commandsuite.rb
@@ -17,8 +17,8 @@ def upgrade_clone_slave
     puts "You may clone to particular IP address(es), or can type \"spare\" to claim a node from the spare pool."
     target = options[:target] || ask('Please enter comma-separated list of targets (IPs or "spare") to clone to: ')

-    spares_needed = target.split(',').count {|t| t.strip.upcase == 'SPARE'}
     target = 'spare' if target.strip == '' || target.split(',').length == 0
+    spares_needed = target.split(',').count {|t| t.strip.upcase == 'SPARE'}
     if spares_needed > 0
       spares_available = Jetpants.topology.count_spares(role: :standby_slave, like: source, version: Plugin::UpgradeHelper.new_version)
       raise "Not enough upgraded spares with role of standby slave! Requested #{spares_needed} but only have #{spares_available} available." if spares_needed > spares_available
@@ -122,10 +122,14 @@ def self.after_upgrade_promotion
   method_option :reads, :desc => 'Move reads to the new master', :type => :boolean
   method_option :writes, :desc => 'Move writes to new master', :type => :boolean
   method_option :cleanup, :desc => 'Tear down the old-version nodes', :type => :boolean
+  method_option :shard_pool, :desc => 'The sharding pool for which to perform the upgrade'
  def shard_upgrade
+    shard_pool = options[:shard_pool] || ask('Please enter the sharding pool for which to perform the upgrade (enter for default pool): ')
+    shard_pool = Jetpants.topology.default_shard_pool if shard_pool.empty?
+
    if options[:reads]
      raise 'The --reads, --writes, and --cleanup options are mutually exclusive' if options[:writes] || options[:cleanup]
-      s = ask_shard_being_upgraded :reads
+      s = ask_shard_being_upgraded(:reads, shard_pool)
      s.branched_upgrade_move_reads
      Jetpants.topology.write_config
      self.class.reminders(
@@ -136,7 +140,7 @@ def shard_upgrade
      )
    elsif options[:writes]
      raise 'The --reads, --writes, and --cleanup options are mutually exclusive' if options[:reads] || options[:cleanup]
-      s = ask_shard_being_upgraded :writes
+      s = ask_shard_being_upgraded(:writes, shard_pool)
      s.branched_upgrade_move_writes
      Jetpants.topology.write_config
      self.class.reminders(
@@ -148,7 +152,7 @@ def shard_upgrade
    elsif options[:cleanup]
      raise 'The --reads, --writes, and --cleanup options are mutually exclusive' if options[:reads] || options[:writes]
-      s = ask_shard_being_upgraded :cleanup
+      s = ask_shard_being_upgraded(:cleanup, shard_pool)
      s.cleanup!

    else
@@ -156,7 +160,7 @@ def shard_upgrade
       'This process may take an hour or two. You probably want to run this from a screen session.',
       'Be especially careful if you are relying on SSH Agent Forwarding for your root key, since this is not screen-friendly.'
      )
-      s = ask_shard_being_upgraded :prep
+      s = ask_shard_being_upgraded(:prep, shard_pool)
      s.branched_upgrade_prep
      self.class.reminders(
        'Proceed to next step: jetpants shard_upgrade --reads'
@@ -208,8 +212,8 @@ def check_pool_queries
  end

  no_tasks do
-    def ask_shard_being_upgraded(stage=:prep)
-      shards_being_upgraded = Jetpants.shards.select {|s| [:child, :needs_cleanup].include?(s.state) && !s.parent && s.master.master}
+    def ask_shard_being_upgraded(stage = :prep, shard_pool = nil)
+      shards_being_upgraded = Jetpants.shards(shard_pool).select {|s| [:child, :needs_cleanup].include?(s.state) && !s.parent && s.master.master}
      if stage == :writes || stage == :cleanup
        if shards_being_upgraded.size == 0
          raise 'No shards are currently being upgraded. You can only use this task after running "jetpants shard_upgrade".'
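
The reorder in the upgrade_clone_slave hunk above is easy to miss, so here is a standalone illustration (not taken from the codebase) of why it matters: when the operator just presses enter, the old ordering computed spares_needed from the empty string before target was defaulted to 'spare', so the spare-availability check was silently skipped.

  target = ''   # operator pressed enter at the prompt

  # old ordering: count first, apply the default afterwards
  old_needed = target.split(',').count { |t| t.strip.upcase == 'SPARE' }   # => 0
  target = 'spare' if target.strip == '' || target.split(',').length == 0

  # new ordering: apply the default first, then count against the defaulted value
  new_needed = target.split(',').count { |t| t.strip.upcase == 'SPARE' }   # => 1

  puts "old check would cover #{old_needed} spare(s); new check covers #{new_needed}"
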
diff --git a/plugins/upgrade_helper/shard.rb b/plugins/upgrade_helper/shard.rb
index 46c81f0..9b9526a 100644
--- a/plugins/upgrade_helper/shard.rb
+++ b/plugins/upgrade_helper/shard.rb
@@ -5,11 +5,23 @@ class Shard
    def branched_upgrade_prep
      raise "Shard #{self} in wrong state to perform this action! expected :ready, found #{@state}" unless @state == :ready
      raise "Not enough standby slaves of this shard!" unless standby_slaves.size >= slaves_layout[:standby_slave]
-      source = standby_slaves.last
-      spares_available = Jetpants.topology.count_spares(role: :standby_slave, like: source, version: Plugin::UpgradeHelper.new_version)
-      raise "Not enough spares available!" unless spares_available >= 1 + slaves_layout[:standby_slave]
-
-      targets = Jetpants.topology.claim_spares(1 + slaves_layout[:standby_slave], role: :standby_slave, like: source, version: Plugin::UpgradeHelper.new_version)
+      source = slave_for_clone
+
+      spares_needed = {'standby' => slaves_layout[:standby_slave] + 1, 'backup' => slaves_layout[:backup_slave]}
+
+      # Array to hold all the target nodes
+      targets = []
+
+      spares_needed.each do |role, needed|
+        next if needed == 0
+        available = Jetpants.topology.count_spares(role: "#{role}_slave".to_sym, like: source, version: Plugin::UpgradeHelper.new_version)
+        raise "Not enough spare machines with role of #{role} slave! Requested #{needed} but only have #{available} available." if needed > available
+      end
+
+      spares_needed.each do |role, needed|
+        next if needed == 0
+        targets.concat Jetpants.topology.claim_spares(needed, role: "#{role}_slave".to_sym, like: source, version: Plugin::UpgradeHelper.new_version)
+      end

      # Disable fast shutdown on the source
      source.mysql_root_cmd 'SET GLOBAL innodb_fast_shutdown = 0'
@@ -28,11 +40,14 @@
      # Make the 1st new slave be the "future master" which the other new
      # slaves will replicate from
      future_master = targets.shift
-      targets.each do |t|
-        future_master.pause_replication_with t
-        t.change_master_to future_master
-        [future_master, t].each {|db| db.resume_replication; db.catch_up_to_master}
+      future_master.pause_replication_with *targets
+      targets.concurrent_each do |slave|
+        slave.change_master_to future_master
+        slave.resume_replication
+        slave.catch_up_to_master
      end
+      future_master.resume_replication
+      future_master.catch_up_to_master
    end

    # Hack the pool configuration to send reads to the new master, but still send