mirror of
https://gh.wpcy.net/https://github.com/discourse/discourse.git
synced 2026-05-03 20:07:47 +08:00
Ruby's compact module syntax (`module Migrations::Database::Schema::DSL`) breaks lexical constant lookup — `Module.nesting` only includes the innermost constant, so every cross-module reference must be fully qualified. In practice this means writing `Migrations::Database::Schema::Helpers` even when you're already inside `Migrations::Database::Schema`. Nested module definitions restore the full nesting chain, which brings several practical benefits: - **Less verbose code**: references like `Schema::Helpers`, `Database::IntermediateDB`, or `Converters::Base::ProgressStep` work without repeating the full path from root - **Easier to write new code**: contributors don't need to remember which prefixes are required — if you're inside the namespace, short names just work - **Fewer aliasing workarounds**: removes the need for constants like `MappingType = Migrations::Importer::MappingType` that existed solely to shorten references - **Standard Ruby style**: consistent with how most Ruby projects and gems structure their namespaces The diff is large but mechanical — no logic changes, just module wrapping and shortening references that the nesting now resolves. Generated code (intermediate_db models/enums) keeps fully qualified references like `Migrations::Database.format_*` since it must work regardless of the configured output namespace. - Convert 138 lib files from compact to nested module definitions - Remove now-redundant fully qualified prefixes and aliases - Update model and enum writers to generate nested modules with correct indentation - Regenerate all intermediate_db models and enums
140 lines
3.3 KiB
Ruby
140 lines
3.3 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
require "etc"
|
|
require "colored2"
|
|
|
|
module Migrations
  module Uploader
    module Tasks
      # Abstract base for multi-threaded uploader tasks. Provides the shared
      # producer/consumer thread plumbing (work queue, status queue, worker
      # threads) plus retry and thread-sizing helpers. Subclasses implement
      # +run!+, +enqueue_jobs+ and +handle_status_update+.
      class Base
        # Raised when a subclass has not overridden a required hook.
        # Subclasses StandardError (unlike Ruby's built-in NotImplementedError,
        # which is a ScriptError) so it is caught by a bare +rescue+.
        class NotImplementedError < StandardError
        end

        TRANSACTION_SIZE = 1000
        QUEUE_SIZE = 1000
        DEFAULT_THREAD_FACTOR = 1.5

        attr_reader :uploads_db,
                    :intermediate_db,
                    :settings,
                    :work_queue,
                    :status_queue,
                    :discourse_store,
                    :error_count,
                    :current_count,
                    :missing_count,
                    :skipped_count

        # @param databases [Hash] expects +:uploads_db+ and +:intermediate_db+ entries
        # @param settings [Hash] task configuration (e.g. +:thread_count_factor+)
        def initialize(databases, settings)
          @uploads_db = databases[:uploads_db]
          @intermediate_db = databases[:intermediate_db]

          @settings = settings

          @work_queue = SizedQueue.new(QUEUE_SIZE)
          @status_queue = SizedQueue.new(QUEUE_SIZE)
          @discourse_store = Discourse.store

          @error_count = 0
          @current_count = 0
          @missing_count = 0
          @skipped_count = 0
        end

        # Entry point for the task; must be overridden by subclasses.
        def run!
          raise NotImplementedError
        end

        # Convenience: instantiate and immediately run the task.
        def self.run!(databases, settings)
          new(databases, settings).run!
        end

        private

        # Hook: subclasses consume a status-queue entry here.
        def handle_status_update
          raise NotImplementedError
        end

        # Hook: subclasses push work items onto +work_queue+ here.
        def enqueue_jobs
          raise NotImplementedError
        end

        # Per-worker resource bundle handed to +process_upload+.
        # Subclasses may override to supply e.g. DB connections.
        def instantiate_task_resource
          {}
        end

        # Drains the status queue until a +nil+ sentinel arrives,
        # logging progress after each update.
        def start_status_thread
          Thread.new do
            until (update = status_queue.pop).nil?
              handle_status_update(update)
              log_status
            end
          end
        end

        def start_consumer_threads
          Array.new(thread_count) { |worker_index| consumer_thread(worker_index) }
        end

        # One worker: pops items off the work queue until a falsy
        # sentinel stops the loop.
        def consumer_thread(worker_index)
          Thread.new do
            Thread.current.name = "worker-#{worker_index}"
            task_resource = instantiate_task_resource

            while (job = work_queue.pop)
              process_upload(job, task_resource)
            end
          end
        end

        def start_producer_thread
          Thread.new { enqueue_jobs }
        end

        def thread_count
          @thread_count ||= calculate_thread_count
        end

        # Prefixes +path+ with the current multisite DB name when running
        # in a multisite setup; otherwise returns it unchanged.
        def add_multisite_prefix(path)
          return path unless Rails.configuration.multisite

          File.join("uploads", RailsMultisite::ConnectionManagement.current_db, path)
        end

        # Checks existence either against the external object store
        # or the local public directory.
        def file_exists?(path)
          return discourse_store.object_from_path(path).exists? if discourse_store.external?

          File.exist?(File.join(discourse_store.public_dir, path))
        end

        # Yields up to +max+ times until the block returns a truthy value,
        # sleeping with linear backoff between attempts.
        # @return the first truthy block result, or +nil+ after +max+ attempts
        def with_retries(max: 3)
          attempts = 0

          loop do
            outcome = yield
            break outcome if outcome

            attempts += 1
            break nil if attempts >= max

            sleep(calculate_backoff(attempts))
          end
        end

        # Linear backoff: 0.25s, 0.5s, 0.75s, ...
        def calculate_backoff(retry_count)
          0.25 * retry_count
        end

        # Sizes the worker pool from CPU count, a configurable factor,
        # and a 2x multiplier when uploads go to an external store
        # (I/O-bound work tolerates more threads).
        def calculate_thread_count
          cores = Etc.nprocessors
          factor = settings.fetch(:thread_count_factor, DEFAULT_THREAD_FACTOR)
          store_multiplier = discourse_store.external? ? 2 : 1

          (cores * factor * store_multiplier).to_i
        end
      end
    end
  end
end
|