discourse/plugins/discourse-ai/lib/completions/converse_message_processor.rb
Sam 65024326dd
FEATURE: Add AI authoring to Discourse Workflows (#40504)
Previously, admins could only build Discourse Workflows by manually
adding and connecting every trigger, condition, and action node on the
canvas.

This change adds an AI authoring assistant, gated behind
`discourse_workflows_ai_authoring_enabled` and DiscourseAi, that turns a
natural-language request into a server-validated, reviewable workflow
patch the admin applies. It only ever proposes drafts and never
publishes.

---------

Co-authored-by: chapoi <101828855+chapoi@users.noreply.github.com>
Co-authored-by: Rafael Silva <xfalcox@gmail.com>
Co-authored-by: Joffrey JAFFEUX <j.jaffeux@gmail.com>
2026-06-18 18:32:30 +02:00

220 lines
6.6 KiB
Ruby
Vendored

# frozen_string_literal: true
class DiscourseAi::Completions::ConverseMessageProcessor
PROVIDER_KEY = :bedrock_converse
class ConverseToolCall
attr_reader :name, :raw_json, :id
def initialize(name, id, partial_tool_calls: false)
@name = name
@id = id
@raw_json = +""
@tool_call = DiscourseAi::Completions::ToolCall.new(id: id, name: name, parameters: {})
@streaming_parser =
DiscourseAi::Completions::JsonStreamingTracker.new(self) if partial_tool_calls
end
def append(json)
@raw_json << json
@streaming_parser << json if @streaming_parser
end
def notify_progress(key, value)
@tool_call.partial = true
@tool_call.parameters[key.to_sym] = value
@has_new_data = true
end
def has_partial?
@has_new_data
end
def partial_tool_call
@has_new_data = false
@tool_call
end
def to_tool_call
parameters = DiscourseAi::Completions::ToolArgumentsParser.parse(raw_json)
@tool_call = @tool_call.dup
@tool_call.partial = false
@tool_call.parameters = parameters
@tool_call
end
end
attr_reader :tool_calls,
:input_tokens,
:output_tokens,
:cache_read_input_tokens,
:cache_write_input_tokens,
:output_thinking
def initialize(streaming_mode:, partial_tool_calls: false, output_thinking: false)
@streaming_mode = streaming_mode
@tool_calls = []
@current_tool_call = nil
@partial_tool_calls = partial_tool_calls
@output_thinking = output_thinking
@thinking = nil
end
def to_tool_calls
@tool_calls.map { |tool_call| tool_call.to_tool_call }
end
# Processes a streamed event from the Converse API.
# Events are hashes with symbolized keys matching the SDK event structure.
def process_streamed_message(parsed)
return if !parsed
result = nil
type = parsed[:type]
case type
when :content_block_start
start_data = parsed[:start]
if start_data&.dig(:tool_use)
tool = start_data[:tool_use]
result = @current_tool_call.to_tool_call if @current_tool_call
@current_tool_call =
ConverseToolCall.new(
tool[:name],
tool[:tool_use_id],
partial_tool_calls: @partial_tool_calls,
)
end
when :content_block_delta
delta = parsed[:delta]
if delta&.key?(:tool_use)
@current_tool_call&.append(delta[:tool_use][:input].to_s)
result = @current_tool_call.partial_tool_call if @current_tool_call&.has_partial?
elsif delta&.key?(:reasoning_content)
if @output_thinking
reasoning = delta[:reasoning_content]
if reasoning[:redacted_content]
result =
DiscourseAi::Completions::Thinking.new(
message: nil,
partial: false,
provider_info: {
PROVIDER_KEY => {
redacted: true,
redacted_content: reasoning[:redacted_content],
},
},
)
elsif reasoning[:signature]
# Signature delta — append to current thinking's provider_info
if @thinking
info = (@thinking.provider_info[PROVIDER_KEY] ||= { signature: +"", redacted: false })
info[:signature] ||= +""
info[:signature] << reasoning[:signature]
end
elsif reasoning[:text]
text = reasoning[:text].to_s
if @thinking
@thinking.message << text
result = DiscourseAi::Completions::Thinking.new(message: text, partial: true)
else
provider_info = { PROVIDER_KEY => { signature: +"", redacted: false } }
@thinking =
DiscourseAi::Completions::Thinking.new(
message: +text,
partial: true,
provider_info: provider_info,
)
result = @thinking.dup
end
end
end
elsif delta&.key?(:text)
text = delta[:text].to_s
result = text unless text.empty?
end
when :content_block_stop
if @thinking
@thinking.partial = false
result = @thinking
@thinking = nil
elsif @current_tool_call
result = @current_tool_call.to_tool_call
@current_tool_call = nil
end
when :message_stop
# nothing to do
when :metadata
usage = parsed[:usage]
if usage
@input_tokens = usage[:input_tokens]
@output_tokens = usage[:output_tokens]
@cache_read_input_tokens = usage[:cache_read_input_tokens]
@cache_write_input_tokens = usage[:cache_write_input_tokens]
end
when :message_start
# nothing to do
end
result
end
# Processes a complete (non-streaming) Converse API response hash.
def process_message(payload)
parsed = payload
parsed = JSON.parse(payload, symbolize_names: true) if payload.is_a?(String)
result = []
content = parsed.dig(:output, :message, :content)
if content.is_a?(Array)
content.each do |block|
if block.key?(:text)
result << block[:text]
elsif block.key?(:tool_use)
tool = block[:tool_use]
call = ConverseToolCall.new(tool[:name], tool[:tool_use_id])
call.append(tool[:input].to_json) if tool[:input]
result << call.to_tool_call
elsif block.key?(:reasoning_content)
if @output_thinking
reasoning = block[:reasoning_content]
if reasoning[:redacted_content]
result << DiscourseAi::Completions::Thinking.new(
message: nil,
provider_info: {
PROVIDER_KEY => {
redacted: true,
redacted_content: reasoning[:redacted_content],
},
},
)
elsif reasoning[:reasoning_text]
rt = reasoning[:reasoning_text]
result << DiscourseAi::Completions::Thinking.new(
message: rt[:text],
provider_info: {
PROVIDER_KEY => {
signature: rt[:signature],
redacted: false,
},
},
)
end
end
end
end
end
usage = parsed[:usage]
if usage
@input_tokens = usage[:input_tokens]
@output_tokens = usage[:output_tokens]
@cache_read_input_tokens = usage[:cache_read_input_tokens]
@cache_write_input_tokens = usage[:cache_write_input_tokens]
end
result
end
end