diff --git a/bundler/lib/bundler/installer/parallel_installer.rb b/bundler/lib/bundler/installer/parallel_installer.rb index 79489a6a6c95..020db30b8443 100644 --- a/bundler/lib/bundler/installer/parallel_installer.rb +++ b/bundler/lib/bundler/installer/parallel_installer.rb @@ -24,6 +24,10 @@ def enqueued? state == :enqueued end + def enqueue_with_priority? + state == :installable && spec.extensions.any? + end + def failed? state == :failed end @@ -194,7 +198,7 @@ def process_specs(installed_specs) spec.state = :installable end - worker_pool.enq(spec) + worker_pool.enq(spec, priority: spec.enqueue_with_priority?) end def finished_installing? diff --git a/bundler/lib/bundler/worker.rb b/bundler/lib/bundler/worker.rb index 7137484cc6d0..77f4f004aa69 100644 --- a/bundler/lib/bundler/worker.rb +++ b/bundler/lib/bundler/worker.rb @@ -22,6 +22,7 @@ def initialize(exn) def initialize(size, name, func) @name = name @request_queue = Thread::Queue.new + @request_queue_with_priority = Thread::Queue.new @response_queue = Thread::Queue.new @func = func @size = size @@ -32,9 +33,10 @@ def initialize(size, name, func) # Enqueue a request to be executed in the worker pool # # @param obj [String] mostly it is name of spec that should be downloaded - def enq(obj) + def enq(obj, priority: false) + queue = priority ? @request_queue_with_priority : @request_queue create_threads unless @threads - @request_queue.enq obj + queue.enq obj end # Retrieves results of job function being executed in worker pool @@ -52,7 +54,13 @@ def stop def process_queue(i) loop do - obj = @request_queue.deq + obj = begin + @request_queue_with_priority.deq(true) + rescue ThreadError + @request_queue.deq(false, timeout: 0.05) + end + + next if obj.nil? break if obj.equal? POISON @response_queue.enq apply_func(obj, i) end diff --git a/bundler/spec/bundler/installer/parallel_installer_spec.rb b/bundler/spec/bundler/installer/parallel_installer_spec.rb new file mode 100644 index 000000000000..49bcb5310ba9 --- /dev/null +++ b/bundler/spec/bundler/installer/parallel_installer_spec.rb @@ -0,0 +1,79 @@ +# frozen_string_literal: true + +require "bundler/installer/parallel_installer" +require "bundler/rubygems_gem_installer" +require "rubygems/remote_fetcher" +require "bundler" + +RSpec.describe Bundler::ParallelInstaller do + describe "priority queue" do + before do + require "support/artifice/compact_index" + + @previous_client = Gem::Request::ConnectionPools.client + Gem::Request::ConnectionPools.client = Gem::Net::HTTP + Gem::RemoteFetcher.fetcher.close_all + + build_repo2 do + build_gem "gem_with_extension", &:add_c_extension + build_gem "gem_without_extension" + end + + gemfile <<~G + source "https://gem.repo2" + + gem "gem_with_extension" + gem "gem_without_extension" + G + lockfile <<~L + GEM + remote: https://gem.repo2/ + specs: + gem_with_extension (1.0) + gem_without_extension (1.0) + + DEPENDENCIES + gem_with_extension + gem_without_extension + L + + @old_ui = Bundler.ui + Bundler.ui = Bundler::UI::Silent.new + end + + after do + Bundler.ui = @old_ui + Gem::Request::ConnectionPools.client = @previous_client + Artifice.deactivate + end + + let(:definition) do + allow(Bundler).to receive(:root) { bundled_app } + + definition = Bundler::Definition.build(bundled_app.join("Gemfile"), bundled_app.join("Gemfile.lock"), false) + definition.tap(&:setup_domain!) + end + let(:installer) { Bundler::Installer.new(bundled_app, definition) } + + it "queues native extensions in priority" do + parallel_installer = Bundler::ParallelInstaller.new(installer, definition.specs, 2, false, true) + worker_pool = parallel_installer.send(:worker_pool) + expected = 6 # Enqueue to download bundler and the 2 gems. Enqueue to install Bundler and the 2 gems. + + expect(worker_pool).to receive(:enq).exactly(expected).times.and_wrap_original do |original_enq, spec, opts| + unless opts.nil? # Enqueued for download, no priority + if spec.name == "gem_with_extension" + expect(opts).to eq({ priority: true }) + else + expect(opts).to eq({ priority: false }) + end + end + + opts ||= {} + original_enq.call(spec, **opts) + end + + parallel_installer.call + end + end +end diff --git a/bundler/spec/bundler/worker_spec.rb b/bundler/spec/bundler/worker_spec.rb index e4ebbd2932cf..2ad2845e378c 100644 --- a/bundler/spec/bundler/worker_spec.rb +++ b/bundler/spec/bundler/worker_spec.rb @@ -20,6 +20,26 @@ end end + describe "priority queue" do + it "process elements from the priority queue first" do + processed_elements = [] + + function = proc do |element, _| + processed_elements << element + end + + worker = described_class.new(1, "Spec Worker", function) + worker.instance_variable_set(:@threads, []) # Prevent the enqueueing from starting work. + worker.enq("Normal element") + worker.enq("Priority element", priority: true) + worker.send(:create_threads) + + worker.stop + + expect(processed_elements).to eq(["Priority element", "Normal element"]) + end + end + describe "handling interrupts" do let(:status) do pid = Process.fork do diff --git a/bundler/spec/support/windows_tag_group.rb b/bundler/spec/support/windows_tag_group.rb index c41c446462c4..bd6acb9d55ca 100644 --- a/bundler/spec/support/windows_tag_group.rb +++ b/bundler/spec/support/windows_tag_group.rb @@ -137,6 +137,7 @@ module WindowsTagGroup "spec/bundler/build_metadata_spec.rb", "spec/bundler/current_ruby_spec.rb", "spec/bundler/installer/gem_installer_spec.rb", + "spec/bundler/installer/parallel_installer_spec.rb", "spec/bundler/cli_common_spec.rb", "spec/bundler/ci_detector_spec.rb", ],