discourse/migrations/lib/converters/base/worker.rb
Gerhard Schlager 89f26da39d
MT: Switch to nested module style across migrations/ (#38564)
Ruby's compact module syntax (`module
Migrations::Database::Schema::DSL`) breaks lexical constant lookup —
`Module.nesting` only includes the innermost constant, so every
cross-module reference must be fully qualified. In practice this means
writing `Migrations::Database::Schema::Helpers` even when you're already
inside `Migrations::Database::Schema`.

Nested module definitions restore the full nesting chain, which brings
several practical benefits:

- **Less verbose code**: references like `Schema::Helpers`,
`Database::IntermediateDB`, or `Converters::Base::ProgressStep` work
without repeating the full path from root
- **Easier to write new code**: contributors don't need to remember
which prefixes are required — if you're inside the namespace, short
names just work
- **Fewer aliasing workarounds**: removes the need for constants like
`MappingType = Migrations::Importer::MappingType` that existed solely to
shorten references
- **Standard Ruby style**: consistent with how most Ruby projects and
gems structure their namespaces

The diff is large but mechanical — no logic changes, just module
wrapping and shortening references that the nesting now resolves.
Generated code (intermediate_db models/enums) keeps fully qualified
references like `Migrations::Database.format_*` since it must work
regardless of the configured output namespace.

- Convert 138 lib files from compact to nested module definitions
- Remove now-redundant fully qualified prefixes and aliases
- Update model and enum writers to generate nested modules with correct
indentation
- Regenerate all intermediate_db models and enums
2026-03-19 18:15:19 +01:00

108 lines
2.8 KiB
Ruby

# frozen_string_literal: true
require "oj"
module Migrations
  module Converters
    module Base
      # Runs a job inside a child process created via `ForkManager.fork` and
      # moves work items between the parent's queues and that child over two
      # pipes, serialized with Oj.
      #
      # On the parent side two threads are used:
      # * the input thread pops items from `input_queue` and writes them to the
      #   child, one at a time, waiting for each result before sending the next;
      # * the output thread reads results from the child and pushes them onto
      #   `output_queue`.
      class Worker
        OJ_SETTINGS = { mode: :object, class_cache: true, symbol_keys: true }

        # @param index value interpolated into the child's process title and
        #   the names of the parent threads
        # @param input_queue object responding to `pop`; a falsy value returned
        #   by `pop` ends the input loop
        # @param output_queue object responding to `push`
        # @param job object responding to `run(data)` and `cleanup`
        def initialize(index, input_queue, output_queue, job)
          @index = index
          @input_queue = input_queue
          @output_queue = output_queue
          @job = job

          @threads = []
          @mutex = Mutex.new
          @data_processed = ConditionVariable.new
          # True while an item has been handed to the child and its result has
          # not yet been seen by the output thread. Guarded by @mutex.
          @awaiting_result = false
        end

        # Creates the pipes, starts the child process and both parent threads.
        #
        # @return [Worker] self, so calls can be chained
        def start
          parent_input_stream, parent_output_stream = IO.pipe
          fork_input_stream, fork_output_stream = IO.pipe

          worker_pid =
            start_fork(
              parent_input_stream,
              parent_output_stream,
              fork_input_stream,
              fork_output_stream,
            )

          # The parent only keeps the ends it uses itself: it writes to
          # parent_output_stream and reads from fork_input_stream.
          fork_output_stream.close
          parent_input_stream.close

          start_input_thread(parent_output_stream, worker_pid)
          start_output_thread(fork_input_stream)

          self
        end

        # Blocks until both parent threads have finished.
        def wait
          @threads.each(&:join)
        end

        private

        # Runs the job loop inside the block given to `ForkManager.fork`
        # (presumably executed in a forked child process -- ForkManager is
        # defined outside this file).
        #
        # @return whatever `ForkManager.fork` returns; the caller passes it to
        #   `Process.waitpid`
        def start_fork(
          parent_input_stream,
          parent_output_stream,
          fork_input_stream,
          fork_output_stream
        )
          ForkManager.fork do
            begin
              Process.setproctitle("worker_process#{@index}")

              # Close the pipe ends that belong to the parent.
              parent_output_stream.close
              fork_input_stream.close

              Oj.load(parent_input_stream, OJ_SETTINGS) do |data|
                result = @job.run(data)
                Oj.to_stream(fork_output_stream, result, OJ_SETTINGS)
              end
            rescue SignalException
              exit(1)
            ensure
              @job.cleanup
            end
          end
        end

        # Starts the thread that feeds items from the input queue to the child.
        # Only one item is in flight at a time: after writing an item the
        # thread waits until the output thread reports that a result arrived.
        def start_input_thread(output_stream, worker_pid)
          @threads << Thread.new do
            Thread.current.name = "worker_#{@index}_input"

            begin
              while (data = @input_queue.pop)
                # Raise the flag BEFORE writing. If the result comes back and
                # the output thread signals before this thread reaches `wait`,
                # the cleared flag is still observed and no wakeup is lost.
                @mutex.synchronize { @awaiting_result = true }

                Oj.to_stream(output_stream, data, OJ_SETTINGS)

                # Loop on the flag so spurious wakeups do not release the next
                # item early.
                @mutex.synchronize { @data_processed.wait(@mutex) while @awaiting_result }
              end
            ensure
              output_stream.close
              Process.waitpid(worker_pid)
            end
          end
        end

        # Starts the thread that reads results from the child and pushes them
        # onto the output queue.
        def start_output_thread(input_stream)
          @threads << Thread.new do
            Thread.current.name = "worker_#{@index}_output"

            begin
              Oj.load(input_stream, OJ_SETTINGS) do |data|
                @output_queue.push(data)
                signal_data_processed
              end
            ensure
              input_stream.close
              # Release the input thread even when reading ended without a
              # result (e.g. the stream was closed), so it cannot wait forever.
              signal_data_processed
            end
          end
        end

        # Clears the in-flight flag and wakes the input thread.
        def signal_data_processed
          @mutex.synchronize do
            @awaiting_result = false
            @data_processed.signal
          end
        end
      end
    end
  end
end