diff --git a/app/controllers/servers_controller.rb b/app/controllers/servers_controller.rb index dc03765ba..9b83870cc 100644 --- a/app/controllers/servers_controller.rb +++ b/app/controllers/servers_controller.rb @@ -97,7 +97,7 @@ def unsuspend private def safe_params(*extras) - params.require(:server).permit(:name, :mode, :ip_pool_id, *extras) + params.require(:server).permit(:name, :mode, :priority, :ip_pool_id, *extras) end end diff --git a/app/lib/worker/jobs/process_queued_messages_job.rb b/app/lib/worker/jobs/process_queued_messages_job.rb index 18489b578..9c0995229 100644 --- a/app/lib/worker/jobs/process_queued_messages_job.rb +++ b/app/lib/worker/jobs/process_queued_messages_job.rb @@ -40,14 +40,18 @@ def local_ip?(ip) end # Obtain a queued message from the database for processing + # Messages are prioritized based on the server's priority (higher priority first) + # and then by the message ID (ascending order). # # @return [void] def lock_message_for_processing - QueuedMessage.where(ip_address_id: [nil, @ip_addresses]) - .where(locked_by: nil, locked_at: nil) - .ready_with_delayed_retry - .limit(1) - .update_all(locked_by: @locker, locked_at: @lock_time) + QueuedMessage.joins(:server) + .where(ip_address_id: [nil, @ip_addresses]) + .where(locked_by: nil, locked_at: nil) + .ready_with_delayed_retry + .order("servers.priority DESC, queued_messages.id ASC") + .limit(1) + .update_all(locked_by: @locker, locked_at: @lock_time) end # Get a full list of all messages which we can process (i.e. those which have just diff --git a/app/models/server.rb b/app/models/server.rb index b90f121f4..57d65ad01 100644 --- a/app/models/server.rb +++ b/app/models/server.rb @@ -79,7 +79,12 @@ class Server < ApplicationRecord validates :mode, inclusion: { in: MODES } validates :permalink, presence: true, uniqueness: { scope: :organization_id, case_sensitive: false }, format: { with: /\A[a-z0-9-]*\z/ }, exclusion: { in: RESERVED_PERMALINKS } validate :validate_ip_pool_belongs_to_organization - + validates :priority, presence: true, numericality: { + only_integer: true, + greater_than_or_equal_to: 0, + less_than_or_equal_to: 32767, + message: "must be a whole number between 0 and 32,767" + } before_validation(on: :create) do self.token = token.downcase if token end diff --git a/app/views/servers/_form.html.haml b/app/views/servers/_form.html.haml index 511970f18..7153d4fcc 100644 --- a/app/views/servers/_form.html.haml +++ b/app/views/servers/_form.html.haml @@ -20,6 +20,15 @@ e-mail will be routed normally to the intended recipients. When in Development mode, outgoing & incoming mail will be held and only visible in the web interface and will not be sent to any recipients or HTTP endpoints. + .fieldSet__field + = f.label :priority, "Sending Priority", :class => 'fieldSet__label' + .fieldSet__input + = f.text_field :priority, + :class => 'input input--text', + :placeholder => "e.g. 10" + %p.fieldSet__text + Set a priority for this server's outgoing mail. Messages from servers with a higher number will be sent first. + The default is 0. - if Postal.ip_pools? .fieldSet__field diff --git a/db/migrate/20250915065902_add_priority_to_server.rb b/db/migrate/20250915065902_add_priority_to_server.rb new file mode 100644 index 000000000..2b2e424ae --- /dev/null +++ b/db/migrate/20250915065902_add_priority_to_server.rb @@ -0,0 +1,5 @@ +class AddPriorityToServer < ActiveRecord::Migration[7.0] + def change + add_column :servers, :priority, :integer, limit: 2, unsigned: true, default: 0 + end +end diff --git a/db/schema.rb b/db/schema.rb index e0ba83581..e8fb67f5e 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[7.0].define(version: 2025_07_16_102600) do +ActiveRecord::Schema[7.0].define(version: 2025_09_15_065902) do create_table "additional_route_endpoints", id: :integer, charset: "utf8mb4", collation: "utf8mb4_general_ci", force: :cascade do |t| t.integer "route_id" t.string "endpoint_type" @@ -256,6 +256,7 @@ t.index ["permalink"], name: "index_servers_on_permalink", length: 6 t.index ["token"], name: "index_servers_on_token", length: 6 t.index ["uuid"], name: "index_servers_on_uuid", length: 8 + t.integer "priority", limit: 2, default: 0, unsigned: true end create_table "smtp_endpoints", id: :integer, charset: "utf8mb4", collation: "utf8mb4_general_ci", force: :cascade do |t|