mirror of
https://gh.wpcy.net/https://github.com/discourse/discourse.git
synced 2026-05-04 08:00:01 +08:00
Ruby's compact module syntax (`module Migrations::Database::Schema::DSL`) breaks lexical constant lookup — `Module.nesting` only includes the innermost constant, so every cross-module reference must be fully qualified. In practice this means writing `Migrations::Database::Schema::Helpers` even when you're already inside `Migrations::Database::Schema`.

Nested module definitions restore the full nesting chain, which brings several practical benefits:

- **Less verbose code**: references like `Schema::Helpers`, `Database::IntermediateDB`, or `Converters::Base::ProgressStep` work without repeating the full path from root
- **Easier to write new code**: contributors don't need to remember which prefixes are required — if you're inside the namespace, short names just work
- **Fewer aliasing workarounds**: removes the need for constants like `MappingType = Migrations::Importer::MappingType` that existed solely to shorten references
- **Standard Ruby style**: consistent with how most Ruby projects and gems structure their namespaces

The diff is large but mechanical — no logic changes, just module wrapping and shortening references that the nesting now resolves. Generated code (intermediate_db models/enums) keeps fully qualified references like `Migrations::Database.format_*` since it must work regardless of the configured output namespace.

- Convert 138 lib files from compact to nested module definitions
- Remove now-redundant fully qualified prefixes and aliases
- Update model and enum writers to generate nested modules with correct indentation
- Regenerate all intermediate_db models and enums
108 lines
2.8 KiB
Ruby
108 lines
2.8 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
require "oj"
|
|
|
|
module Migrations
  module Converters
    module Base
      # Runs a job inside a child process created through `ForkManager.fork`
      # and moves work items to and from it over two pipes.
      #
      # Items popped from the input queue are serialized with Oj and written
      # to the child; the child calls `job.run` on each item and writes the
      # result back; results are pushed onto the output queue. Only one item
      # is in flight at a time: the input thread does not send the next item
      # until the output thread has received the result of the previous one.
      class Worker
        # Options handed to every `Oj.load` / `Oj.to_stream` call in this
        # class. Frozen because it is shared by all workers and threads.
        OJ_SETTINGS = { mode: :object, class_cache: true, symbol_keys: true }.freeze

        # @param index identifier interpolated into the child's process title
        #   and into the names of the two I/O threads
        # @param input_queue [#pop] source of work items; a `nil`/`false`
        #   item ends the input loop
        # @param output_queue [#push] receives every result read back from
        #   the child
        # @param job [#run, #cleanup] executed inside the child process
        def initialize(index, input_queue, output_queue, job)
          @index = index
          @input_queue = input_queue
          @output_queue = output_queue
          @job = job

          @threads = []
          @mutex = Mutex.new
          @data_processed = ConditionVariable.new

          # Handshake state shared by the input and output threads.
          # Both flags are only read or written while holding @mutex.
          @awaiting_result = false
          @output_closed = false
        end

        # Creates the pipes, starts the child process and the two I/O threads.
        #
        # @return [Worker] self, so calls can be chained
        def start
          parent_input_stream, parent_output_stream = IO.pipe
          fork_input_stream, fork_output_stream = IO.pipe

          worker_pid =
            start_fork(
              parent_input_stream,
              parent_output_stream,
              fork_input_stream,
              fork_output_stream,
            )

          # This process only writes to the first pipe and only reads from
          # the second one, so the opposite ends are closed here.
          fork_output_stream.close
          parent_input_stream.close

          start_input_thread(parent_output_stream, worker_pid)
          start_output_thread(fork_input_stream)

          self
        end

        # Blocks until both I/O threads have finished.
        #
        # @return [Array<Thread>] the joined threads
        def wait
          @threads.each(&:join)
        end

        private

        # Runs the job loop inside the block given to `ForkManager.fork`.
        # NOTE(review): presumably the block executes in a forked child and
        # the return value is its pid (it is later passed to
        # `Process.waitpid`) - confirm against ForkManager.
        def start_fork(
          parent_input_stream,
          parent_output_stream,
          fork_input_stream,
          fork_output_stream
        )
          ForkManager.fork do
            begin
              Process.setproctitle("worker_process#{@index}")

              # Mirror image of the closes done in #start.
              parent_output_stream.close
              fork_input_stream.close

              Oj.load(parent_input_stream, OJ_SETTINGS) do |data|
                result = @job.run(data)
                Oj.to_stream(fork_output_stream, result, OJ_SETTINGS)
              end
            rescue SignalException
              exit(1)
            ensure
              @job.cleanup
            end
          end
        end

        # Starts the thread that feeds work items to the child process.
        # When the loop ends (or fails) the write end of the pipe is closed
        # and the child is reaped.
        def start_input_thread(output_stream, worker_pid)
          @threads << Thread.new do
            Thread.current.name = "worker_#{@index}_input"

            begin
              while (data = @input_queue.pop)
                # The flag is raised *before* the item is written so that a
                # result arriving before this thread starts waiting is not
                # missed (lost wakeup).
                @mutex.synchronize { @awaiting_result = true }
                Oj.to_stream(output_stream, data, OJ_SETTINGS)
                wait_for_result
              end
            ensure
              output_stream.close
              Process.waitpid(worker_pid)
            end
          end
        end

        # Starts the thread that reads results from the child process and
        # pushes them onto the output queue.
        def start_output_thread(input_stream)
          @threads << Thread.new do
            Thread.current.name = "worker_#{@index}_output"

            begin
              Oj.load(input_stream, OJ_SETTINGS) do |data|
                @output_queue.push(data)
                @mutex.synchronize do
                  @awaiting_result = false
                  @data_processed.signal
                end
              end
            ensure
              input_stream.close
              # No more results will ever arrive; release a waiting input
              # thread and keep it from blocking again.
              @mutex.synchronize do
                @output_closed = true
                @data_processed.signal
              end
            end
          end
        end

        # Blocks the calling thread until the pending result has been
        # received or the output side has shut down. The loop re-checks the
        # flags after every wakeup, so neither a spurious wakeup nor a signal
        # sent before the wait started can break the handshake.
        def wait_for_result
          @mutex.synchronize do
            @data_processed.wait(@mutex) while @awaiting_result && !@output_closed
          end
        end
      end
    end
  end
end
|