mirror of
https://gh.wpcy.net/https://github.com/discourse/discourse.git
synced 2026-05-25 04:34:04 +08:00
Ruby's compact module syntax (`module Migrations::Database::Schema::DSL`) breaks lexical constant lookup — `Module.nesting` only includes the innermost constant, so every cross-module reference must be fully qualified. In practice this means writing `Migrations::Database::Schema::Helpers` even when you're already inside `Migrations::Database::Schema`.

Nested module definitions restore the full nesting chain, which brings several practical benefits:

- **Less verbose code**: references like `Schema::Helpers`, `Database::IntermediateDB`, or `Converters::Base::ProgressStep` work without repeating the full path from root
- **Easier to write new code**: contributors don't need to remember which prefixes are required — if you're inside the namespace, short names just work
- **Fewer aliasing workarounds**: removes the need for constants like `MappingType = Migrations::Importer::MappingType` that existed solely to shorten references
- **Standard Ruby style**: consistent with how most Ruby projects and gems structure their namespaces

The diff is large but mechanical — no logic changes, just module wrapping and shortening references that the nesting now resolves. Generated code (intermediate_db models/enums) keeps fully qualified references like `Migrations::Database.format_*` since it must work regardless of the configured output namespace.

- Convert 138 lib files from compact to nested module definitions
- Remove now-redundant fully qualified prefixes and aliases
- Update model and enum writers to generate nested modules with correct indentation
- Regenerate all intermediate_db models and enums
131 lines
3.5 KiB
Ruby
Vendored
131 lines
3.5 KiB
Ruby
Vendored
# frozen_string_literal: true
|
|
|
|
require "etc"
|
|
require "colored2"
|
|
|
|
module Migrations
  module Converters
    module Base
      # Runs a single converter step and reports its progress.
      #
      # The step's items are processed either one by one in the current
      # process (via SerialJob) or distributed to WORKER_COUNT workers (via
      # ParallelJob / Worker), depending on the step's `run_in_parallel?`
      # setting and the amount of work reported by `max_progress`.
      class ProgressStepExecutor
        # Leave 1 CPU free to do other work, but always keep at least one
        # worker: on a single-CPU machine `Etc.nprocessors - 1` is 0, which
        # would make MAX_QUEUE_SIZE 0 (`SizedQueue.new(0)` raises) and would
        # start no workers at all.
        WORKER_COUNT = [Etc.nprocessors - 1, 1].max
        # Steps with fewer items than this are executed serially.
        MIN_PARALLEL_ITEMS = WORKER_COUNT * 10
        # Upper bound for both the work queue and the worker output queue.
        MAX_QUEUE_SIZE = WORKER_COUNT * 100
        # The time spent calculating `max_progress` is only printed when it
        # exceeds this many seconds.
        PRINT_RUNTIME_AFTER_SECONDS = 5

        # @param step the step to execute. This class calls `max_progress`,
        #   `execute` and `items` on it, and `title` and `run_in_parallel?`
        #   on its class.
        def initialize(step)
          @step = step
        end

        # Calculates the maximum progress, prints the step's title, calls the
        # step's own `execute` and then processes all items of the step,
        # either in parallel or serially.
        def execute
          @max_progress = calculate_max_progress

          puts @step.class.title
          @step.execute

          if execute_in_parallel?
            execute_parallel
          else
            execute_serially
          end
        end

        private

        # Parallel execution is used only when the step opts in and the
        # amount of work is either unknown (nil) or above MIN_PARALLEL_ITEMS.
        def execute_in_parallel?
          @step.class.run_in_parallel? &&
            (@max_progress.nil? || @max_progress > MIN_PARALLEL_ITEMS)
        end

        # Processes every item in the current process and feeds the stats
        # returned by the job into the progress bar.
        def execute_serially
          job = SerialJob.new(@step)

          with_progressbar do |progressbar|
            @step.items.each do |item|
              stats = job.run(item)

              progressbar.update(
                increment_by: stats.progress,
                warning_count: stats.warning_count,
                error_count: stats.error_count,
              )
            end
          end
        end

        # Distributes the items to the workers through `work_queue`; a writer
        # thread consumes whatever the workers put on `worker_output_queue`.
        # Both queues are bounded by MAX_QUEUE_SIZE.
        def execute_parallel
          worker_output_queue = SizedQueue.new(MAX_QUEUE_SIZE)
          work_queue = SizedQueue.new(MAX_QUEUE_SIZE)

          workers = start_workers(work_queue, worker_output_queue)
          writer_thread = start_db_writer(worker_output_queue)
          push_work(work_queue)

          # The output queue may only be closed after all workers finished,
          # otherwise the writer thread would stop consuming too early.
          workers.each(&:wait)
          worker_output_queue.close
          writer_thread.join
        end

        # Asks the step for its maximum progress and prints how long that
        # took when it was slower than PRINT_RUNTIME_AFTER_SECONDS.
        #
        # @return whatever `@step.max_progress` returned
        def calculate_max_progress
          # A monotonic clock is used so that wall-clock adjustments can't
          # distort the measured duration.
          started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
          max_progress = @step.max_progress
          duration = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at

          if duration > PRINT_RUNTIME_AFTER_SECONDS
            message =
              I18n.t(
                "converter.max_progress_calculation",
                duration: DateHelper.human_readable_time(duration),
              )
            puts " #{message}"
          end

          max_progress
        end

        # Creates a progress bar for the current step and yields it.
        def with_progressbar
          ExtendedProgressBar
            .new(max_progress: @max_progress)
            .run { |progressbar| yield progressbar }
        end

        # Starts the thread that writes the workers' output into the
        # IntermediateDB and updates the progress bar.
        #
        # @param worker_output_queue [SizedQueue] each entry is destructured
        #   into a list of `[sql, parameters]` pairs and a stats object
        # @return [Thread] the writer thread
        def start_db_writer(worker_output_queue)
          Thread.new do
            Thread.current.name = "writer_thread"

            with_progressbar do |progressbar|
              # `pop` returns nil once the queue is closed and empty, which
              # ends the loop.
              while (output = worker_output_queue.pop)
                parametrized_insert_statements, stats = output

                parametrized_insert_statements.each do |sql, parameters|
                  Database::IntermediateDB.insert(sql, *parameters)
                end

                progressbar.update(
                  increment_by: stats.progress,
                  warning_count: stats.warning_count,
                  error_count: stats.error_count,
                )
              end
            end
          end
        end

        # Starts WORKER_COUNT workers, each with its own ParallelJob.
        # NOTE(review): workers are presumably forked processes, judging by
        # ForkManager.batch_forks — confirm against Worker.
        #
        # @return [Array] the values returned by `Worker#start`
        def start_workers(work_queue, worker_output_queue)
          workers = []

          Process.warmup

          ForkManager.batch_forks do
            WORKER_COUNT.times do |index|
              job = ParallelJob.new(@step)
              workers << Worker.new(index, work_queue, worker_output_queue, job).start
            end
          end

          workers
        end

        # Pushes all items of the step onto the work queue and closes the
        # queue afterwards.
        def push_work(work_queue)
          @step.items.each { |item| work_queue.push(item) }
          work_queue.close
        end
      end
    end
  end
end
|