discourse/plugins/discourse-workflows/lib/discourse_workflows/trigger_runtime.rb
Joffrey JAFFEUX 0914b9fb32
FEATURE: discourse-workflows (#40374)
Discourse Workflows is a visual automation plugin for admins. A workflow is a
versioned graph made of trigger, condition, action, and utility nodes.

## Architecture

The plugin has three main responsibilities:

- Store and version workflow graphs.
- Render an admin editor from node metadata.
- Execute published workflow versions and record their results.

The graph is stored as workflow nodes plus connections. Each node has a stable
type, a type version, editor position, configuration parameters, optional
credential references, and direct runtime settings. Published workflows create
immutable versions that executions can safely use even after the draft changes.
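
As a rough illustration of the storage model above, the sketch below models a
draft graph as plain Ruby hashes and publishes it as a frozen deep copy. The
`name`, `type`, `typeVersion`, `parameters`, and `credentials` keys mirror the
ones read in `trigger_runtime.rb`; the `position` key, the connection shape, the
node type names, and the `publish` and `deep_freeze` helpers are assumptions
made for illustration only.

```ruby
# Hypothetical draft graph: nodes plus connections.
draft = {
  "nodes" => [
    {
      "name" => "On topic created",
      "type" => "trigger:topic_created", # hypothetical node type
      "typeVersion" => "1.0",
      "position" => [0, 0],              # assumed key for editor position
      "parameters" => {},
      "credentials" => {},
    },
    {
      "name" => "Send message",
      "type" => "action:send_message",   # hypothetical node type
      "typeVersion" => "1.0",
      "position" => [240, 0],
      "parameters" => { "body" => "Welcome!" },
      "credentials" => {},
    },
  ],
  # Assumed connection shape: source node, output index, target node.
  "connections" => [{ "from" => "On topic created", "output" => 0, "to" => "Send message" }],
}

# Recursively freeze hashes, arrays, and their contents.
def deep_freeze(value)
  case value
  when Hash
    value.each do |key, inner|
      deep_freeze(key)
      deep_freeze(inner)
    end
  when Array
    value.each { |inner| deep_freeze(inner) }
  end
  value.freeze
end

# Publishing copies the draft, so later draft edits cannot leak into the version.
def publish(draft, version_id)
  snapshot = Marshal.load(Marshal.dump(draft))
  deep_freeze({ "version_id" => version_id, "graph" => snapshot })
end

published = publish(draft, 1)
draft["nodes"].first["parameters"]["category"] = "support" # edit the draft afterwards
```

After the draft edit, `published["graph"]["nodes"].first["parameters"]` is still
empty, and any attempt to mutate the published graph raises `FrozenError`.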

## Runtime Flow

A normal execution follows this path:

1. A trigger produces initial data from a Discourse event, schedule, webhook, or
   manual run.
2. Trigger data is wrapped as workflow items.
3. The executor queues downstream nodes from the graph connections.
4. Each node receives input items and returns one array per output.
5. The executor records the step, stores node output for expressions, and routes
   each output array to connected downstream nodes.
6. The execution finishes, fails, or enters a waiting state.
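
The steps above can be sketched as a small queue-driven loop. This is a toy
model, not the plugin's executor: the `NODES` and `CONNECTIONS` tables, the
`{ "json" => ... }` item wrapper, and the choice to skip empty output arrays are
all assumptions, and failure and waiting states are left out.

```ruby
# Each toy node takes input items and returns one array per output.
NODES = {
  "trigger" => ->(items) { [items] },
  # Two outputs: index 0 receives even items, index 1 receives odd items.
  "is_even" => ->(items) { items.partition { |item| item["json"]["n"].even? } },
  "double" => ->(items) { [items.map { |item| { "json" => { "n" => item["json"]["n"] * 2 } } }] },
  "label_odd" => ->(items) { [items.map { |item| { "json" => item["json"].merge("label" => "odd") } }] },
}.freeze

# [source node, output index] => downstream node names.
CONNECTIONS = {
  ["trigger", 0] => ["is_even"],
  ["is_even", 0] => ["double"],
  ["is_even", 1] => ["label_odd"],
}.freeze

def run(trigger_data)
  items = trigger_data.map { |data| { "json" => data } } # step 2: wrap as items
  queue = [["trigger", items]]
  steps = []
  node_outputs = {}

  until queue.empty?
    name, input = queue.shift
    outputs = NODES.fetch(name).call(input) # step 4: one array per output
    steps << name                           # step 5: record the step
    node_outputs[name] = outputs            # step 5: keep output for later lookup

    outputs.each_with_index do |output_items, index|
      next if output_items.empty?           # assumption: empty outputs route nowhere
      CONNECTIONS.fetch([name, index], []).each { |target| queue << [target, output_items] }
    end
  end

  { status: :finished, steps: steps, node_outputs: node_outputs }
end

result = run([{ "n" => 1 }, { "n" => 2 }])
```

Here `result[:steps]` lists the nodes in the order they ran, and
`result[:node_outputs]` holds the per-output arrays each node returned.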

Waiting nodes persist enough execution context to resume later. Resume requests
continue from the waiting node using the workflow snapshot saved with the
execution, not the latest draft.
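
A minimal sketch of that resume rule, assuming a hypothetical `Execution` record
that stores the waiting node's name, its context, and a copy of the workflow
snapshot. None of these names come from the plugin; they only illustrate why a
resume is unaffected by later draft edits.

```ruby
# Hypothetical execution record; field names are assumptions.
Execution = Struct.new(:status, :waiting_node, :context, :snapshot, keyword_init: true)

def pause(snapshot, node_name, context)
  Execution.new(status: :waiting, waiting_node: node_name, context: context, snapshot: snapshot)
end

def resume(execution, payload)
  raise ArgumentError, "execution is not waiting" unless execution.status == :waiting

  # Look the node up in the saved snapshot, never in the current draft.
  node = execution.snapshot.fetch("nodes").find { |n| n["name"] == execution.waiting_node }
  execution.status = :running
  { node: node, input: execution.context.merge("resume" => payload) }
end

draft = { "nodes" => [{ "name" => "Wait for approval", "parameters" => { "timeout" => 60 } }] }
snapshot = Marshal.load(Marshal.dump(draft)) # deep copy saved with the execution

execution = pause(snapshot, "Wait for approval", { "topic_id" => 7 })
draft["nodes"].first["parameters"]["timeout"] = 5 # the draft changes while waiting

resumed = resume(execution, { "approved" => true })
```

The resumed node still carries the `timeout` of 60 from the snapshot, even
though the draft now says 5.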

Expressions are resolved at runtime against the current item, node parameters,
workflow variables, previous node outputs, and execution context. The editor
stores expression values in parameters; node code should read them through the
execution context instead of parsing parameter values directly.

## Adding nodes

Workflow nodes are registered with
`DiscoursePluginRegistry.register_discourse_workflows_node`.

Core nodes live in `lib/discourse_workflows/nodes` and are registered during
plugin initialization. Other plugins can add their own nodes when
`DiscourseWorkflows` is loaded.

## Node API

Nodes inherit from `DiscourseWorkflows::NodeType`. A node class is responsible
for two things:

- Declaring its contract with `description(...)`.
- Implementing the runtime entry point for its node kind.

The description is the public contract between the node, editor, validator, and
runtime. It should include:

- `name`: stable identifier, usually prefixed by `trigger:`, `action:`,
  `condition:`, or `flow:`.
- `version`: implementation version used by stored workflow nodes.
- `defaults`: editor metadata such as icon and color.
- `group`: palette category.
- `inputs` and `outputs`: graph ports.
- `properties`: configuration fields rendered by the property engine.
- `credentials`: credential slots the node can use.
- `webhooks`: public or resume webhook declarations.
- `events`: Discourse events that should activate a trigger node.
- `capabilities`: feature flags such as manual triggering, waiting, or current
  user access.
- `available`: optional availability gate for nodes backed by another plugin or
  site setting.

Description data should stay declarative. Put business logic in the runtime
entry point or helper classes.
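
To make the split between declaration and logic concrete, here is a
self-contained sketch. `SketchNodeType` is a stand-in written for this example
and is not `DiscourseWorkflows::NodeType`; the shape of each field value, the
`input_items` accessor, and the `action:send_message` node are assumptions. Only
the field names come from the list above.

```ruby
# Stand-in base class: stores the declared description as frozen data.
class SketchNodeType
  def self.description(**attrs)
    return @description if attrs.empty?

    @description = attrs.freeze
  end
end

class SendMessage < SketchNodeType
  # Declarative contract: plain data, no branching or lookups.
  description(
    name: "action:send_message", # hypothetical identifier
    version: "1.0",
    defaults: { icon: "envelope", color: "blue" },
    group: "actions",
    inputs: ["main"],
    outputs: ["main"],
    properties: { recipient: { type: :string, required: true } },
  )

  # Business logic lives in the runtime entry point.
  def execute(exec_ctx)
    [exec_ctx.input_items.map { |item| item.merge("sent" => true) }]
  end
end

contract = SendMessage.description
```

An editor or validator can read `contract[:properties]` or `contract[:outputs]`
without instantiating the node or running any of its logic.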

### Action, Condition, And Flow Nodes

Action-like nodes implement `execute(exec_ctx)`. They receive input items
through the execution context and return positional output arrays: an outer
array holding one inner array of items per output. A one-output node returns an
outer array with a single inner array; a branching node returns one inner array
per branch.
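
The return shape is easiest to see side by side. In this sketch the
`FakeExecContext` struct and its `input_items` accessor are assumptions standing
in for the real execution context, and both node classes are hypothetical.

```ruby
# Assumed accessor name; the real execution context API may differ.
FakeExecContext = Struct.new(:input_items)

# One output: the outer array wraps a single inner array of items.
class UppercaseTitle
  def execute(exec_ctx)
    [exec_ctx.input_items.map { |item| item.merge("title" => item["title"].to_s.upcase) }]
  end
end

# Two outputs: position 0 is the "true" branch, position 1 the "false" branch.
class HasTitle
  def execute(exec_ctx)
    with_title, without_title = exec_ctx.input_items.partition { |item| !item["title"].to_s.empty? }
    [with_title, without_title]
  end
end

ctx = FakeExecContext.new([{ "title" => "hello" }, { "title" => "" }])
single = UppercaseTitle.new.execute(ctx)
branches = HasTitle.new.execute(ctx)
```

`single` has one inner array with both items, while `branches` has two inner
arrays with one item each, matched to outputs by position.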

### Trigger Nodes

Trigger nodes start executions. Event triggers declare `events` and usually
implement:

- `valid?` to ignore events that should not run workflows.
- `matches?(trigger_ctx)` to compare event data with node configuration.
- `output` to produce the initial workflow item data.
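
A sketch of how those three methods might fit together for a topic-created
event. The `TriggerContext` struct, the constructor taking a topic hash, the
`category_id` parameter, and the `fire` helper are all hypothetical; only the
method names `valid?`, `matches?`, and `output` come from the list above.

```ruby
# Hypothetical context exposing the node's configured parameters.
TriggerContext = Struct.new(:parameters)

class TopicCreatedTrigger
  def initialize(topic)
    @topic = topic
  end

  # Ignore events that should never start a workflow (here: private messages).
  def valid?
    @topic[:archetype] != "private_message"
  end

  # Compare event data with the node configuration.
  def matches?(trigger_ctx)
    wanted = trigger_ctx.parameters["category_id"]
    wanted.nil? || wanted == @topic[:category_id]
  end

  # Initial workflow item data.
  def output
    { "topic_id" => @topic[:id], "category_id" => @topic[:category_id] }
  end
end

# Assumed call order: validity first, then matching, then output.
def fire(trigger, trigger_ctx)
  return nil unless trigger.valid? && trigger.matches?(trigger_ctx)

  trigger.output
end

ctx = TriggerContext.new({ "category_id" => 3 })
matched = fire(TopicCreatedTrigger.new({ id: 7, category_id: 3, archetype: "regular" }), ctx)
skipped = fire(TopicCreatedTrigger.new({ id: 8, category_id: 4, archetype: "regular" }), ctx)
```

Only the first topic produces data; the second is in a different category, so
`fire` returns `nil` and no execution would start.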

### Webhook And Waiting Nodes

Webhook trigger nodes declare webhook metadata and produce initial data from the
incoming request. Waiting nodes pause an execution and resume it through a later
interaction or webhook.

### Dynamic Options

Nodes can provide dynamic property options with
`self.load_options_context(context)`. Use the context object to access current
parameters, filtered credentials, the search filter, the current user, guardian,
and shared helpers.
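
As an illustration only, the sketch below filters a fixed list by the search
filter. The `OptionsContext` struct, its `filter` and `parameters` accessors,
the in-memory `CATEGORIES` table, and the `{ value:, label: }` return shape are
assumptions; the real context object and expected return format may differ.

```ruby
# Hypothetical stand-in for the options context.
OptionsContext = Struct.new(:parameters, :filter, keyword_init: true)

class PickCategory
  # Stand-in data; a real node would query Discourse records instead.
  CATEGORIES = { 1 => "Support", 2 => "Feature requests", 3 => "Staff" }.freeze

  def self.load_options_context(context)
    needle = context.filter.to_s.downcase

    CATEGORIES
      .select { |_id, name| name.downcase.include?(needle) }
      .map { |id, name| { value: id, label: name } }
  end
end

options = PickCategory.load_options_context(OptionsContext.new(parameters: {}, filter: "sup"))
```

With the filter `"sup"` only the Support entry is returned; an empty or missing
filter returns every entry.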

### Errors And Logs

Raise `DiscourseWorkflows::NodeError` for failures admins can act on, such as
invalid configuration or missing Discourse records. Unexpected exceptions fail
the current execution.
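
The distinction can be sketched with a stand-in error class. `WorkflowSketch`,
its `NodeError`, and the `run_step` wrapper are written for this example; how
the real executor records each kind of failure is not described here, so the
`actionable` flag is an assumption.

```ruby
module WorkflowSketch
  # Stand-in for DiscourseWorkflows::NodeError.
  class NodeError < StandardError
  end

  # Raise the actionable error for a missing record.
  def self.find_topic!(topics, id)
    topics.fetch(id) { raise NodeError, "Topic #{id} not found" }
  end

  # Hypothetical step wrapper that tells the two failure kinds apart.
  def self.run_step
    { status: :ok, result: yield }
  rescue NodeError => e
    { status: :failed, actionable: true, message: e.message }
  rescue StandardError => e
    { status: :failed, actionable: false, message: "#{e.class}: #{e.message}" }
  end
end

topics = { 1 => "Welcome" }
found = WorkflowSketch.run_step { WorkflowSketch.find_topic!(topics, 1) }
missing = WorkflowSketch.run_step { WorkflowSketch.find_topic!(topics, 2) }
crashed = WorkflowSketch.run_step { Integer("not a number") }
```

`missing` carries a message an admin can act on, while `crashed` represents an
unexpected exception that simply fails the step.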

---------

Co-authored-by: Renato Atilio <3530+renato@users.noreply.github.com>
Co-authored-by: Martin Brennan <martin@discourse.org>
Co-authored-by: Jordan Vidrine <30537603+jordanvidrine@users.noreply.github.com>
2026-05-28 19:44:50 +02:00


# frozen_string_literal: true

module DiscourseWorkflows
  class TriggerRuntime
    class << self
      def tick!(now: Time.current.utc)
        now = scheduled_minute(now)
        published_triggers("trigger:schedule").each do |published_trigger|
          run_trigger(published_trigger, mode: :normal, activation_mode: :trigger, now:, tick: true)
        end
      end

      def activate_workflow!(workflow, workflow_version: workflow.active_version)
        return if workflow_version.nil?

        Webhook::Action::ActivateWebhooks.call(
          workflow: workflow,
          workflow_version: workflow_version,
        )

        activation_trigger_nodes(workflow_version).each do |node|
          published_trigger = PublishedTrigger.new(workflow:, workflow_version:, trigger_node: node)
          run_trigger(
            published_trigger,
            mode: :normal,
            activation_mode: :init,
            tick: false,
            dispatch: :none,
          )
        end
      end

      def deactivate_workflow!(workflow)
        Webhook::Action::DeactivateWebhooks.call(workflow: workflow)
      end

      def manual_trigger_data(workflow:, trigger_node:, user:)
        workflow_version = workflow.workflow_versions.find_by(version_id: workflow.version_id)
        node_type_class = node_type_for(trigger_node)
        return {} unless node_type_class&.capability_enabled?(:synthesizes_manual_data)

        published_trigger =
          PublishedTrigger.new(workflow:, workflow_version:, trigger_node: trigger_node)
        runtime_state = runtime_state_for(published_trigger)
        ctx =
          Executor::TriggerExecutionContext.new(
            published_trigger:,
            mode: :manual,
            activation_mode: :manual,
            dispatch: :collect,
            user:,
            runtime_state:,
          )

        result = instantiate(node_type_class, trigger_node).trigger(ctx)
        result[:manual_trigger_function]&.call if result.is_a?(Hash)
        commit_runtime_state!(published_trigger, runtime_state)
        runtime_state.collected_trigger_data.first || {}
      end

      private

      def published_triggers(trigger_type)
        Workflow::Action::FindPublishedTriggers.call(trigger_type:)
      end

      def activation_trigger_nodes(workflow_version)
        workflow_version.nodes.select { |node| activation_trigger_node?(node_type_for(node)) }
      end

      def run_trigger(
        published_trigger,
        mode:,
        activation_mode:,
        now: Time.current.utc,
        tick: false,
        dispatch: :enqueue
      )
        node_type_class = node_type_for(published_trigger.trigger_node)
        return unless activation_trigger_node?(node_type_class)

        published_trigger.workflow.with_lock do
          runtime_state = runtime_state_for(published_trigger, tick:)
          ctx =
            Executor::TriggerExecutionContext.new(
              published_trigger:,
              mode:,
              activation_mode:,
              now:,
              dispatch:,
              runtime_state:,
            )
          instantiate(node_type_class, published_trigger.trigger_node).trigger(ctx)
          commit_runtime_state!(published_trigger, runtime_state)
        end
      end

      def runtime_state_for(published_trigger, tick: false)
        workflow = published_trigger.workflow
        trigger_node = published_trigger.trigger_node
        node_name = trigger_node["name"].to_s

        Executor::TriggerExecutionContext::RuntimeState.new(
          trigger_state: workflow.node_trigger_state(published_trigger.trigger_node_id).deep_dup,
          static_data_global: workflow.global_static_data.deep_dup,
          static_data_node: workflow.node_static_data(node_name).deep_dup,
          tick:,
        )
      end

      def commit_runtime_state!(published_trigger, runtime_state)
        workflow = published_trigger.workflow
        node_name = published_trigger.trigger_node["name"].to_s

        # trigger_state holds engine bookkeeping (dedup, last_triggered_at).
        # static_data holds user-facing runtime state; we merge the trigger's
        # flat `node:<name>` slot back without disturbing other nodes' entries.
        # We skip writing the slot if it would be empty AND the slot didn't
        # already exist, to avoid creating noise entries for triggers that
        # never touched static data (e.g. activation-time runs).
        workflow.transaction do
          workflow.update_node_trigger_state!(
            published_trigger.trigger_node_id,
            runtime_state.trigger_state,
          )

          current_node_data = workflow.node_static_data_entries
          merged_node = current_node_data.dup
          if runtime_state.static_data_node.any? || current_node_data.key?(node_name)
            merged_node[node_name] = runtime_state.static_data_node
          end

          workflow.commit_static_data!(global: runtime_state.static_data_global, node: merged_node)
        end
      end

      def node_type_for(node)
        Registry.find_node_type(node["type"], version: node["typeVersion"])
      end

      def activation_trigger_node?(node_type_class)
        node_type_class&.capability_enabled?(:activation_trigger)
      end

      def instantiate(node_type_class, node)
        node_type_class.new(
          parameters: node["parameters"],
          credentials: node["credentials"],
          webhook_id: node["webhookId"],
        )
      end

      def scheduled_minute(time)
        time.utc.change(sec: 0, usec: 0)
      end
    end
  end
end
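
The slot rule described in the `commit_runtime_state!` comment can be isolated
as a pure function for illustration. `merge_node_slot` is a hypothetical name
and takes plain hashes instead of the workflow record; the conditional is the
same one used above.

```ruby
# Merge one node's static data back into the per-node entries.
# The slot is written when the node has data, or when the slot already exists
# (so a node can clear its own data). Otherwise no entry is created.
def merge_node_slot(current_entries, node_name, node_data)
  merged = current_entries.dup
  if node_data.any? || current_entries.key?(node_name)
    merged[node_name] = node_data
  end
  merged
end

entries = { "Schedule" => { "runs" => 3 } }

untouched = merge_node_slot(entries, "Webhook", {})               # no noise entry
written = merge_node_slot(entries, "Webhook", { "seen" => true }) # new slot
cleared = merge_node_slot(entries, "Schedule", {})                # existing slot emptied
```

In all three cases the other nodes' entries are carried over unchanged, and the
input hash itself is never mutated.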