mirror of
https://gh.wpcy.net/https://github.com/discourse/discourse.git
synced 2026-05-21 01:53:41 +08:00
Checking whether a connection is available is probably not enough; even when a connection is available, we should still verify that it’s not stale.
158 lines
3.8 KiB
Ruby
Vendored
158 lines
3.8 KiB
Ruby
Vendored
# frozen_string_literal: true
|
|
require "weakref"
|
|
|
|
module Scheduler
|
|
module Deferrable
|
|
DEFAULT_TIMEOUT = 90
|
|
STATS_CACHE_SIZE = 100
|
|
|
|
attr_reader :async
|
|
|
|
# Puts the deferrable into its initial state: an empty work queue, fresh
# locks and stats, and no worker thread or reactor (both are created lazily
# by #start_thread).
def initialize
  # Jobs run inline in the test environment and on the worker thread otherwise.
  @async = !Rails.env.test?

  # NOTE(review): presumably work is balanced per :site first and per :user
  # second, with a bounded backlog per user — confirm against WorkQueue.
  fair_queue =
    WorkQueue::FairQueue.new(:site, 500) do
      WorkQueue::FairQueue.new(:user, 100) { WorkQueue::BoundedQueue.new(50) }
    end
  @queue = WorkQueue::ThreadSafeWrapper.new(fair_queue)

  # @mutex guards thread start-up and @timeout writes; @stats_mutex guards @stats.
  @mutex = Mutex.new
  @stats_mutex = Mutex.new

  # Worker machinery is absent until the first asynchronous job arrives.
  @thread = nil
  @reactor = nil

  @timeout = DEFAULT_TIMEOUT
  @stats = LruRedux::ThreadSafeCache.new(STATS_CACHE_SIZE)

  @paused = false
  @finish = false
end
|
|
|
|
# Sets how long a job may run before #do_work logs its "still running"
# warning. The write happens while holding @mutex.
#
# @param t [Numeric] timeout in seconds (the value is interpolated as
#   "seconds" in the warning message)
def timeout=(t)
  @mutex.synchronize do
    @timeout = t
  end
end
|
|
|
|
# Size of the work queue, exactly as reported by the queue object.
#
# @return [Integer] whatever @queue.size returns
def length
  @queue.size
end
|
|
|
|
# Returns the stats cache converted to an array, read while holding
# @stats_mutex. Each cached value is the counters hash created by #later
# (:queued, :finished, :duration, :errors).
#
# NOTE(review): presumably an array of [desc, counters] pairs — confirm
# against LruRedux::ThreadSafeCache#to_a.
def stats
  @stats_mutex.synchronize do
    @stats.to_a
  end
end
|
|
|
|
# Stops the worker and prevents #later from starting a new one until #resume
# is called. Jobs queued while paused stay in the queue.
#
# @return [true]
def pause
  # Raise the flag *before* stopping: #later only starts a thread when
  # @paused is false, so setting it afterwards would leave a window in which
  # a concurrent #later call could immediately restart the worker.
  @paused = true
  stop!
  true
end
|
|
|
|
# Clears the paused flag. No thread is started here; the next asynchronous
# #later call starts the worker again if it is not running.
def resume
  @paused = false
end
|
|
|
|
# for test and sidekiq
|
|
# Overrides the execution mode chosen in #initialize.
#
# @param val [Boolean] truthy: #later queues jobs for the worker thread;
#   falsy: #later runs each job inline
def async=(val)
  @async = val
end
|
|
|
|
# Schedules a block to run later, or runs it right away when async is off.
#
# The :queued counter for +desc+ is incremented in both modes.
#
# @param desc [Object, nil] label used as the stats key and in log messages
# @param db [Object] site/database the job runs against; also used as the
#   queue's :site key
# @param force [Boolean] passed straight through to @queue.push
#   (NOTE(review): presumably bypasses queue limits — confirm in WorkQueue)
# @param current_user [Object, nil] stored as the queue's :user key
# @yield the job to execute
# @return the block's result in inline mode, otherwise whatever
#   @queue.push returns
def later(
  desc = nil,
  db = RailsMultisite::ConnectionManagement.current_db,
  force: true,
  current_user: nil,
  &blk
)
  @stats_mutex.synchronize do
    entry = @stats[desc]
    entry = @stats[desc] = { queued: 0, finished: 0, duration: 0, errors: 0 } if !entry
    entry[:queued] += 1
  end

  # Inline mode: execute on the caller's thread and hand back the result.
  return blk.call if !@async

  # Lazily (re)start the worker unless it is running or we are paused.
  start_thread unless @paused || @thread&.alive?

  work_item = { site: db, user: current_user, db: db, job: blk, desc: desc }
  @queue.push(work_item, force: force)
end
|
|
|
|
# Shuts down the worker thread and the reactor.
#
# @param finish_work [Boolean] when true, lets the worker drain the queue
#   first: the finish flag is raised, a +{ finish: true }+ marker is pushed
#   to wake a worker blocked on an empty queue, and the thread is joined.
#   When false the thread is simply killed.
# @return [nil]
def stop!(finish_work: false)
  if finish_work
    @finish = true
    @queue.push({ finish: true }, force: true)
    @thread&.join
  end

  @thread.kill if @thread&.alive?
  @thread = nil

  # Clear the drain flag now that the worker is gone. The worker loop runs
  # `while !@finish || !@queue.empty?`, so leaving it set would make every
  # thread started afterwards exit as soon as the queue is empty, stranding
  # jobs queued later.
  @finish = false

  @reactor&.stop
  @reactor = nil
end
|
|
|
|
# test only
|
|
# True when there is no worker thread or the worker thread is no longer alive.
#
# @return [Boolean]
def stopped?
  worker = @thread
  worker.nil? || !worker.alive?
end
|
|
|
|
# Drains the queue on the calling thread, running one job per #do_work call
# in non-blocking mode until the queue reports empty.
#
# @return [nil]
def do_all_work
  # Pass the flag positionally: `do_work(non_block = true)` only created an
  # unused local variable that looked like a named argument.
  do_work(true) until @queue.empty?
end
|
|
|
|
private
|
|
|
|
# Starts the worker thread (and the timer reactor) unless a worker is already
# alive. Runs under @mutex so concurrent callers cannot start two workers.
#
# The worker keeps calling #do_work until a finish has been requested and
# the queue is empty.
#
# @return [Thread, nil] the new thread, or nil when one was already running
def start_thread
  @mutex.synchronize do
    @reactor ||= MessageBus::TimerThread.new

    if !@thread&.alive?
      @thread =
        Thread.new do
          # Configure the running thread through Thread.current. @thread may
          # not be assigned yet when this body starts executing (it could be
          # nil or the previous, dead thread).
          Thread.current.abort_on_exception = true if Rails.env.test?
          do_work while !@finish || !@queue.empty?
        end
    end
  end
end
|
|
|
|
# using non_block to match Ruby #deq
|
|
# Takes one item off the queue and runs its job against the item's database
# connection, recording the outcome in the stats for the job's description.
#
# Errors raised by the job are counted and reported through
# Discourse.handle_job_exception; errors raised elsewhere in this method are
# reported as queue-processing failures. Neither kind is re-raised.
#
# @param non_block [Boolean] when true the queue is asked not to block and no
#   "still running" warning timer is scheduled
def do_work(non_block = false)
  db, job, desc, finish = @queue.shift(block: !non_block).values_at(:db, :job, :desc, :finish)

  # The finish marker pushed by stop! carries no job: nothing to run or time.
  return if finish

  started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  db ||= RailsMultisite::ConnectionManagement::DEFAULT

  RailsMultisite::ConnectionManagement.with_connection(db) do
    watchdog = nil
    begin
      # Only the worker thread (blocking mode) schedules the slow-job warning.
      unless non_block
        watchdog =
          @reactor.queue(@timeout) do
            Rails.logger.error "'#{desc}' is still running after #{@timeout} seconds on db #{db}, this process may need to be restarted!"
          end
      end
      job.call
    rescue => ex
      @stats_mutex.synchronize do
        entry = @stats[desc]
        entry[:errors] += 1 if entry
      end
      Discourse.handle_job_exception(ex, message: "Running deferred code '#{desc}'")
    ensure
      # The job is done (or failed); the pending warning is no longer wanted.
      watchdog&.cancel
    end
  end
rescue => ex
  Discourse.handle_job_exception(ex, message: "Processing deferred code queue")
ensure
  # Verify the connection first and release active connections only when
  # verify! returns a truthy value.
  ActiveRecord::Base.connection_handler.clear_active_connections! if ActiveRecord::Base.connection&.verify!

  # started_at is nil when no job was started (finish marker or early failure).
  if started_at
    @stats_mutex.synchronize do
      entry = @stats[desc]
      if entry
        entry[:finished] += 1
        entry[:duration] += Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at
      end
    end
  end
end
|
|
end
|
|
|
|
# Deferrable whose state lives on the class object itself, so the API is
# called directly on the class (Defer.later, Defer.stop!, ...).
class Defer
  # Deferrable's methods become singleton methods of Defer.
  extend Deferrable

  # Because of the extend above, this resolves to Deferrable#initialize and
  # sets up the queue, locks and flags on the class object.
  initialize
end
|
|
end
|