Skip to content

Commit

Permalink
Cluster parallel mode (#3)
Browse files Browse the repository at this point in the history
* run cluster in parallel mode

* add test cases for cluster parallel mode

* additional test cases

* prepare 1.0.0.beta1

* update readme

Co-authored-by: Ryan <rmammina@gmail.com>
  • Loading branch information
ryan-avamia and rmammina authored Feb 27, 2022
1 parent e525eaa commit 01dc73f
Show file tree
Hide file tree
Showing 16 changed files with 390 additions and 119 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## Kanrisuru 1.0.0.beta1 (February 27, 2022) ##
* Add parallel mode for cluster. Hosts can now run commands concurrently, which reduces total run time because most of that time is spent blocked on network I/O.
* Add test cases for cluster parallel mode.
* Clean up methods on cluster class to use map and each methods in a simplified manner.
* Remove `/spec` dir from code coverage.

## Kanrisuru 0.20.0 (February 21, 2022) ##
* Allow hosts to be connected via proxy host. This is much like using a bastion / jump server.
* Add integration test cases for proxy host connection.
Expand Down
19 changes: 18 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,24 @@ host = Kanrisuru::Remote::Host.new(host: 'remote-host-4', username: 'rhel', keys
cluster << host
```

Kanrisuru at this point only runs commands sequentially. We plan on creating a parallel run mode in a future release.
#### Run cluster in parallel mode to reduce time waiting on blocking IO
```ruby
Benchmark.measure do
cluster.each do |host|
puts host.pwd
end
end
# => 0.198980 0.029681 0.228661 ( 5.258496)

cluster.parallel = true

Benchmark.measure do
cluster.each do |host|
puts host.pwd
end
end
# => 0.016478 0.007956 0.024434 ( 0.120066)
```

#### To run across all hosts with a single command, cluster will return an array of result hashes
```ruby
Expand Down
1 change: 1 addition & 0 deletions lib/kanrisuru.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
require_relative 'kanrisuru/mode'
require_relative 'kanrisuru/os_package'
require_relative 'kanrisuru/command'
require_relative 'kanrisuru/processor_count'
require_relative 'kanrisuru/remote'
require_relative 'kanrisuru/result'
require_relative 'kanrisuru/core'
Expand Down
9 changes: 4 additions & 5 deletions lib/kanrisuru/os_package/collection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ def os_collection(mod, opts = {})
os_method_names.each do |method_name|
define_method method_name do |*args, &block|
cluster = namespace_instance.instance_variable_get(:@cluster)
hosts = cluster.instance_variable_get(:@hosts)
hosts.map do |host_addr, host|
{ host: host_addr, result: host.send(namespace).send(method_name, *args, &block) }
cluster.map do |host|
host.send(namespace).send(method_name, *args, &block)
end
end
end
Expand All @@ -45,8 +44,8 @@ def os_collection(mod, opts = {})
class_eval do
os_method_names.each do |method_name|
define_method method_name do |*args, &block|
@hosts.map do |host_addr, host|
{ host: host_addr, result: host.send(method_name, *args, &block) }
map do |host|
host.send(method_name, *args, &block)
end
end
end
Expand Down
45 changes: 45 additions & 0 deletions lib/kanrisuru/processor_count.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# frozen_string_literal: true
require 'etc'

module Kanrisuru
  # Helpers for discovering how many processors the local machine has.
  #
  # Adapted from
  # https://github.com/grosser/parallel/blob/master/lib/parallel/processor_count.rb
  module ProcessorCount
    # Number of processors seen by the OS, used for process scheduling.
    #
    # The PARALLEL_PROCESSOR_COUNT environment variable, when set, overrides
    # the value reported by Etc.nprocessors. The result is memoized.
    #
    # @return [Integer]
    # @raise [ArgumentError] if PARALLEL_PROCESSOR_COUNT is not an integer string
    def self.processor_count
      @processor_count ||= Integer(ENV['PARALLEL_PROCESSOR_COUNT'] || Etc.nprocessors)
    end

    # Number of physical processor cores on the current system.
    #
    # Falls back to {processor_count} when the platform is not recognised or
    # the platform-specific probe yields no usable (positive) value. The
    # result is memoized.
    #
    # @return [Integer]
    def self.physical_processor_count
      @physical_processor_count ||= begin
        ppc =
          case RbConfig::CONFIG['target_os']
          when /darwin[12]/
            # Block form closes the pipe as soon as the output has been read.
            IO.popen('/usr/sbin/sysctl -n hw.physicalcpu', &:read).to_i
          when /linux/
            linux_physical_core_count
          when /mswin|mingw/
            windows_physical_core_count
          else
            processor_count
          end
        # fall back to logical count if physical info is invalid
        ppc.positive? ? ppc : processor_count
      end
    end

    # Counts unique "physical id:core id" pairs listed in /proc/cpuinfo.
    # Returns 0 when the file cannot be read or carries no such entries, so
    # the caller falls back to the logical count.
    def self.linux_physical_core_count
      cores = {} # unique physical ID / core ID combinations
      phy = 0
      File.read('/proc/cpuinfo').scan(/^physical id.*|^core id.*/) do |ln|
        if ln.start_with?('physical')
          phy = ln[/\d+/]
        elsif ln.start_with?('core')
          cores["#{phy}:#{ln[/\d+/]}"] = true
        end
      end
      cores.count
    rescue SystemCallError
      0
    end
    private_class_method :linux_physical_core_count

    # Sums NumberOfCores over every Win32_Processor row reported by WMI.
    # Returns 0 for an empty result set instead of nil.
    def self.windows_physical_core_count
      require 'win32ole' # only available on Windows, so loaded lazily
      result_set = WIN32OLE.connect('winmgmts://').ExecQuery(
        'select NumberOfCores from Win32_Processor'
      )
      result_set.to_enum.collect(&:NumberOfCores).reduce(0, :+)
    end
    private_class_method :windows_physical_core_count
  end
end
107 changes: 78 additions & 29 deletions lib/kanrisuru/remote/cluster.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
# frozen_string_literal: true
require 'thread'

module Kanrisuru
module Remote
class Cluster
extend OsPackage::Collection
include Enumerable

attr_accessor :parallel, :concurrency

def initialize(*hosts)
@parallel = false
@concurrency = local_concurrency

@hosts = {}
hosts.each do |host_opts|
add_host(host_opts)
Expand All @@ -30,53 +36,94 @@ def delete(host)
end

def execute(command)
@hosts.map do |host_addr, host|
## Need to evaluate each host independently for the command.
cmd = create_command(command)

{ host: host_addr, result: host.execute(cmd) }
end
map { |host| host.execute(create_command(command)) }
end

def execute_shell(command)
@hosts.map do |host_addr, host|
## Need to evaluate each host independently for the command.
cmd = create_command(command)

{ host: host_addr, result: host.execute_shell(cmd) }
end
end

def each(&block)
@hosts.each { |_host_addr, host| block.call(host) }
map { |host| host.execute_shell(create_command(command)) }
end

def hostname
map_host_results(:hostname)
map { |host| host.send(:hostname) }
end

def ping?
map_host_results(:ping?)
map { |host| host.send(:ping?) }
end

def su(user)
@hosts.each { |_, host| host.su(user) }
each { |host| host.su(user) }
end

def chdir(path = '~')
cd(path)
end

def cd(path = '~')
@hosts.each { |_, host| host.cd(path) }
each { |host| host.cd(path) }
end

def disconnect
@hosts.each { |_, host| host.disconnect }
each { |host| host.disconnect }
end

# Calls the block once per host and collects the outcomes.
#
# Dispatches to the threaded runner when the cluster is in parallel mode,
# otherwise to the sequential one; either way the collected results are
# returned (preserve: true) rather than the cluster itself.
def map(&block)
  return each_parallel(preserve: true, &block) if parallel?

  each_sequential(preserve: true, &block)
end

# Calls the block once per host, discarding the block's return values.
#
# Dispatches to the threaded runner when the cluster is in parallel mode,
# otherwise to the sequential one, asking either not to keep results
# (preserve: false).
def each(&block)
  return each_parallel(preserve: false, &block) if parallel?

  each_sequential(preserve: false, &block)
end

# Whether the cluster runs host commands concurrently.
#
# Always returns a strict boolean, even if a non-boolean value (e.g. nil)
# was assigned through the parallel= writer, so it mirrors sequential?.
#
# @return [Boolean]
def parallel?
  @parallel ? true : false
end

# Inverse of parallel?: true when hosts are processed one after another.
#
# @return [Boolean]
def sequential?
  parallel? ? false : true
end

private

# Runs the block against every host, one at a time, in @hosts order.
#
# @param opts [Hash] pass preserve: true to get the collected results back
# @return [Array<Hash>, self] an array of { host:, result: } hashes keyed by
#   the @hosts key when opts[:preserve] is truthy, otherwise the receiver
def each_sequential(opts = {}, &block)
  collected = @hosts.each_with_object([]) do |(address, node), acc|
    acc << { host: address, result: block.call(node) }
  end

  return self unless opts[:preserve]

  collected
end

# Runs the block against every host using a pool of worker threads.
#
# Each host is processed exactly once. An exception raised by the block is
# captured and stored as that host's result instead of propagating, so one
# failing host does not abort the others.
#
# Results are reported in @hosts order and keyed by the @hosts key, matching
# each_sequential, regardless of which host finishes first.
#
# @param opts [Hash] pass preserve: true to get the collected results back
# @return [Array<Hash>, self] an array of { host:, result: } hashes when
#   opts[:preserve] is truthy, otherwise the receiver
def each_parallel(opts = {}, &block)
  jobs = Queue.new
  @hosts.each_with_index { |(host_addr, host), index| jobs << [index, host_addr, host] }

  # One slot per host: workers write into their own index so completion
  # order never affects the order of the returned array.
  results = Array.new(jobs.length)
  mutex = Mutex.new

  ## No need to spawn more threads than number of hosts in cluster.
  ## A nil or non-positive concurrency still gets one worker so no host is skipped.
  pool_size = [[@concurrency.to_i, 1].max, jobs.length].min

  workers = Array.new(pool_size) do
    Thread.new do
      loop do
        job =
          begin
            jobs.pop(true) # non-blocking: raises ThreadError once drained
          rescue ThreadError
            nil
          end
        break unless job

        index, host_addr, host = job
        outcome =
          begin
            block.call(host)
          # NOTE(review): kept as broad as before so that any failure is
          # reported as the host's result; consider narrowing to StandardError.
          rescue Exception => e # rubocop:disable Lint/RescueException
            e
          end

        mutex.synchronize { results[index] = { host: host_addr, result: outcome } }
      end
    end
  end

  workers.each(&:join)

  opts[:preserve] ? results : self
end

def create_command(command)
case command
when String
Expand All @@ -88,12 +135,6 @@ def create_command(command)
end
end

def map_host_results(action)
@hosts.map do |host_addr, host|
{ host: host_addr, result: host.send(action) }
end
end

def remove_host(host)
if host.instance_of?(Kanrisuru::Remote::Host)
removed = false
Expand All @@ -119,14 +160,22 @@ def remove_host(host)
end

def add_host(host_opts)
if host_opts.instance_of?(Hash)
case host_opts
when Hash
@hosts[host_opts[:host]] = Kanrisuru::Remote::Host.new(host_opts)
elsif host_opts.instance_of?(Kanrisuru::Remote::Host)
when Kanrisuru::Remote::Host
@hosts[host_opts.host] = host_opts
when Kanrisuru::Remote::Cluster
host_opts.send(:each_sequential) { |host| @hosts[host.host] = host }
else
raise ArgumentError, 'Invalid host option'
end
end

# Default worker count: the number of physical processor cores reported
# for the machine running this code.
def local_concurrency
  Kanrisuru::ProcessorCount.physical_processor_count
end

end
end
end
2 changes: 1 addition & 1 deletion lib/kanrisuru/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module Kanrisuru
VERSION = '0.20.0'
VERSION = '1.0.0.beta1'
end
17 changes: 17 additions & 0 deletions spec/functional/processor_count_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# frozen_string_literal: true

require 'spec_helper'

# Sanity checks for the local processor-count helpers.
RSpec.describe Kanrisuru::ProcessorCount do
  let(:logical) { described_class.processor_count }
  let(:physical) { described_class.physical_processor_count }

  it 'gets processor count' do
    expect(logical).to be_positive
  end

  it 'gets physical processor count' do
    expect(physical).to be_positive
  end

  # NOTE(review): assumes the logical count is a whole multiple of the
  # physical count; this may not hold on hybrid-core CPUs or when the
  # process is restricted to a subset of CPUs — confirm.
  it "is even factor of logical cpus" do
    expect(logical.modulo(physical)).to be_zero
  end
end
Loading

0 comments on commit 01dc73f

Please sign in to comment.