packages.wenpai.net/internal/packages/package.go
elementary 6434bcc8f3
Some checks failed
ci / Go Modules Tidy (push) Waiting to run
ci / Go Test (push) Waiting to run
ci / Integration Test (push) Waiting to run
govulncheck / govulncheck (push) Waiting to run
ci / GolangCI-Lint (push) Has been cancelled
refactor(db+cli): drop versions_json and pipeline tables; disable legacy commands
Migration:
- Add 027_migrate_mirror_mode.sql
- Drop packages.versions_json (versions now fetched from upstream p2)
- Drop sync_runs, builds, metadata_changes, monthly_installs, status_checks, status_check_changes

Packages layer:
- Remove versions_json references from UpsertPackage, UpsertShellPackage, BatchUpsertPackages, BatchUpsertShellPackages, GetPackagesNeedingUpdate
- Make AllocateSyncRunID/FinishSyncRun no-ops (mirror mode has no sync_runs table)

CLI:
- Disable discover, update, build, deploy, pipeline, check-status commands
- All return "not available in mirror mode" to keep scripts from breaking
2026-04-12 01:31:36 +08:00

609 lines
20 KiB
Go

package packages
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"strings"
"time"
"github.com/roots/wp-packages/internal/version"
)
type Package struct {
ID int64
Type string
Name string
DisplayName *string
Description *string
Author *string
Homepage *string
SlugURL *string
VersionsJSON string
Downloads int64
ActiveInstalls int64
CurrentVersion *string
WporgVersion *string
Rating *float64
NumRatings int
IsActive bool
LastCommitted *time.Time
LastSyncedAt *time.Time
LastSyncRunID *int64
TrunkRevision *int64
ContentHash *string
DeployedHash *string
ContentChangedAt *time.Time
WpPackagesInstallsTotal int
WpPackagesInstalls30d int
LastInstalledAt *time.Time
CreatedAt time.Time
UpdatedAt time.Time
// RawVersions holds the pre-normalization version map from the API.
// Not persisted directly — normalized into VersionsJSON before storage.
RawVersions map[string]string `json:"-"`
}
// NormalizeAndStoreVersions normalizes raw versions and serializes to VersionsJSON.
// It also sets CurrentVersion to the highest available version.
// Returns the number of valid versions.
func (p *Package) NormalizeAndStoreVersions() (int, error) {
if p.RawVersions == nil {
p.VersionsJSON = "{}"
return 0, nil
}
normalized := version.NormalizeVersions(p.RawVersions)
data, err := json.Marshal(normalized)
if err != nil {
return 0, fmt.Errorf("marshaling versions: %w", err)
}
p.VersionsJSON = string(data)
if latest := version.Latest(normalized); latest != "" {
p.CurrentVersion = &latest
}
return len(normalized), nil
}
func timeStr(t *time.Time) *string {
if t == nil {
return nil
}
s := t.UTC().Format(time.RFC3339)
return &s
}
// UpsertPackage inserts or updates a package record by (type, name).
func UpsertPackage(ctx context.Context, db *sql.DB, pkg *Package) error {
now := time.Now().UTC().Format(time.RFC3339)
_, err := db.ExecContext(ctx, `
INSERT INTO packages (
type, name, display_name, description, author, homepage, slug_url,
downloads, active_installs,
current_version, wporg_version, rating, num_ratings, is_active,
last_committed, last_synced_at, last_sync_run_id,
content_hash, content_changed_at,
created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(type, name) DO UPDATE SET
display_name = excluded.display_name,
description = excluded.description,
author = excluded.author,
homepage = excluded.homepage,
slug_url = excluded.slug_url,
downloads = excluded.downloads,
active_installs = excluded.active_installs,
current_version = excluded.current_version,
wporg_version = excluded.wporg_version,
rating = excluded.rating,
num_ratings = excluded.num_ratings,
is_active = excluded.is_active,
last_committed = CASE
WHEN excluded.last_committed > COALESCE(packages.last_committed, '')
THEN excluded.last_committed
ELSE packages.last_committed
END,
last_synced_at = COALESCE(excluded.last_synced_at, packages.last_synced_at),
last_sync_run_id = COALESCE(excluded.last_sync_run_id, packages.last_sync_run_id),
content_hash = COALESCE(excluded.content_hash, packages.content_hash),
content_changed_at = COALESCE(excluded.content_changed_at, packages.content_changed_at),
updated_at = excluded.updated_at`,
pkg.Type, pkg.Name, pkg.DisplayName, pkg.Description, pkg.Author,
pkg.Homepage, pkg.SlugURL,
pkg.Downloads, pkg.ActiveInstalls, pkg.CurrentVersion, pkg.WporgVersion, pkg.Rating,
pkg.NumRatings, boolToInt(pkg.IsActive),
timeStr(pkg.LastCommitted), timeStr(pkg.LastSyncedAt), pkg.LastSyncRunID,
pkg.ContentHash, timeStr(pkg.ContentChangedAt),
now, now,
)
if err != nil {
return fmt.Errorf("upserting package %s/%s: %w", pkg.Type, pkg.Name, err)
}
return nil
}
// UpsertShellPackage creates a minimal package record (for SVN discovery) or
// updates last_committed if the new date is more recent.
func UpsertShellPackage(ctx context.Context, db *sql.DB, pkgType, name string, lastCommitted *time.Time) error {
now := time.Now().UTC().Format(time.RFC3339)
_, err := db.ExecContext(ctx, `
INSERT INTO packages (type, name, last_committed, is_active, created_at, updated_at)
VALUES (?, ?, ?, 1, ?, ?)
ON CONFLICT(type, name) DO UPDATE SET
last_committed = CASE
WHEN excluded.last_committed > COALESCE(packages.last_committed, '')
THEN excluded.last_committed
ELSE packages.last_committed
END,
updated_at = excluded.updated_at`,
pkgType, name, timeStr(lastCommitted), now, now,
)
if err != nil {
return fmt.Errorf("upserting shell package %s/%s: %w", pkgType, name, err)
}
return nil
}
// ShellEntry holds minimal package data for batch SVN discovery upserts.
type ShellEntry struct {
Type string
Name string
LastCommitted *time.Time
}
// BatchUpsertShellPackages inserts or updates shell package records in a single transaction.
func BatchUpsertShellPackages(ctx context.Context, db *sql.DB, entries []ShellEntry) error {
if len(entries) == 0 {
return nil
}
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("beginning transaction: %w", err)
}
defer func() { _ = tx.Rollback() }()
stmt, err := tx.PrepareContext(ctx, `
INSERT INTO packages (type, name, last_committed, is_active, created_at, updated_at)
VALUES (?, ?, ?, 1, ?, ?)
ON CONFLICT(type, name) DO UPDATE SET
last_committed = CASE
WHEN excluded.last_committed > COALESCE(packages.last_committed, '')
THEN excluded.last_committed
ELSE packages.last_committed
END,
updated_at = excluded.updated_at`)
if err != nil {
return fmt.Errorf("preparing statement: %w", err)
}
defer func() { _ = stmt.Close() }()
now := time.Now().UTC().Format(time.RFC3339)
for _, e := range entries {
if _, err := stmt.ExecContext(ctx, e.Type, e.Name, timeStr(e.LastCommitted), now, now); err != nil {
return fmt.Errorf("upserting shell package %s/%s: %w", e.Type, e.Name, err)
}
}
return tx.Commit()
}
// BatchUpsertPackages inserts or updates full package records in a single transaction.
func BatchUpsertPackages(ctx context.Context, db *sql.DB, pkgs []*Package) error {
if len(pkgs) == 0 {
return nil
}
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("beginning transaction: %w", err)
}
defer func() { _ = tx.Rollback() }()
stmt, err := tx.PrepareContext(ctx, `
INSERT INTO packages (
type, name, display_name, description, author, homepage, slug_url,
downloads, active_installs,
current_version, wporg_version, rating, num_ratings, is_active,
last_committed, last_synced_at, last_sync_run_id,
content_hash, content_changed_at,
created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(type, name) DO UPDATE SET
display_name = excluded.display_name,
description = excluded.description,
author = excluded.author,
homepage = excluded.homepage,
slug_url = excluded.slug_url,
downloads = excluded.downloads,
active_installs = excluded.active_installs,
current_version = excluded.current_version,
wporg_version = excluded.wporg_version,
rating = excluded.rating,
num_ratings = excluded.num_ratings,
is_active = excluded.is_active,
last_committed = CASE
WHEN excluded.last_committed > COALESCE(packages.last_committed, '')
THEN excluded.last_committed
ELSE packages.last_committed
END,
last_synced_at = COALESCE(excluded.last_synced_at, packages.last_synced_at),
last_sync_run_id = COALESCE(excluded.last_sync_run_id, packages.last_sync_run_id),
content_hash = COALESCE(excluded.content_hash, packages.content_hash),
content_changed_at = COALESCE(excluded.content_changed_at, packages.content_changed_at),
updated_at = excluded.updated_at`)
if err != nil {
return fmt.Errorf("preparing statement: %w", err)
}
defer func() { _ = stmt.Close() }()
now := time.Now().UTC().Format(time.RFC3339)
for _, pkg := range pkgs {
if _, err := stmt.ExecContext(ctx,
pkg.Type, pkg.Name, pkg.DisplayName, pkg.Description, pkg.Author,
pkg.Homepage, pkg.SlugURL,
pkg.Downloads, pkg.ActiveInstalls, pkg.CurrentVersion, pkg.WporgVersion, pkg.Rating,
pkg.NumRatings, boolToInt(pkg.IsActive),
timeStr(pkg.LastCommitted), timeStr(pkg.LastSyncedAt), pkg.LastSyncRunID,
pkg.ContentHash, timeStr(pkg.ContentChangedAt),
now, now,
); err != nil {
return fmt.Errorf("upserting package %s/%s: %w", pkg.Type, pkg.Name, err)
}
}
return tx.Commit()
}
type UpdateQueryOpts struct {
Type string
Name string
Names []string // filter to these slugs only
Force bool
IncludeInactive bool
Limit int
}
// GetPackagesNeedingUpdate returns packages that should be updated.
func GetPackagesNeedingUpdate(ctx context.Context, db *sql.DB, opts UpdateQueryOpts) ([]*Package, error) {
query := `SELECT id, type, name, last_committed, last_synced_at, is_active, content_hash, trunk_revision FROM packages WHERE 1=1`
var args []any
if opts.Name != "" {
query += ` AND name = ?`
args = append(args, opts.Name)
}
if len(opts.Names) > 0 {
placeholders := make([]string, len(opts.Names))
for i, n := range opts.Names {
placeholders[i] = "?"
args = append(args, n)
}
query += ` AND name IN (` + strings.Join(placeholders, ",") + `)`
}
if opts.Type != "" && opts.Type != "all" {
query += ` AND type = ?`
args = append(args, opts.Type)
}
if !opts.Force && opts.Name == "" {
if opts.IncludeInactive {
query += ` AND (last_synced_at IS NULL OR last_committed > last_synced_at OR (is_active = 0 AND (last_synced_at IS NULL OR last_synced_at < datetime('now', '-30 days'))))`
} else {
query += ` AND is_active = 1 AND (last_synced_at IS NULL OR last_committed > last_synced_at)`
}
} else if !opts.IncludeInactive && opts.Name == "" {
query += ` AND is_active = 1`
}
query += ` ORDER BY last_synced_at ASC NULLS FIRST`
if opts.Limit > 0 {
query += fmt.Sprintf(` LIMIT %d`, opts.Limit)
}
rows, err := db.QueryContext(ctx, query, args...)
if err != nil {
return nil, fmt.Errorf("querying packages: %w", err)
}
defer func() { _ = rows.Close() }()
var pkgs []*Package
for rows.Next() {
var p Package
var isActive int
var lastCommitted, lastSyncedAt *string
if err := rows.Scan(&p.ID, &p.Type, &p.Name, &lastCommitted, &lastSyncedAt, &isActive, &p.ContentHash, &p.TrunkRevision); err != nil {
return nil, fmt.Errorf("scanning package row: %w", err)
}
p.IsActive = isActive == 1
if lastCommitted != nil {
if t, err := time.Parse(time.RFC3339, *lastCommitted); err == nil {
p.LastCommitted = &t
}
}
if lastSyncedAt != nil {
if t, err := time.Parse(time.RFC3339, *lastSyncedAt); err == nil {
p.LastSyncedAt = &t
}
}
pkgs = append(pkgs, &p)
}
return pkgs, rows.Err()
}
// DeactivatePackage sets is_active = 0 for a package.
func DeactivatePackage(ctx context.Context, db *sql.DB, id int64) error {
now := time.Now().UTC().Format(time.RFC3339)
_, err := db.ExecContext(ctx,
`UPDATE packages SET is_active = 0, updated_at = ?, content_changed_at = ? WHERE id = ?`,
now, now, id,
)
if err != nil {
return fmt.Errorf("deactivating package %d: %w", id, err)
}
return nil
}
// ReactivatePackage sets is_active = 1 for a package.
func ReactivatePackage(ctx context.Context, db *sql.DB, id int64) error {
now := time.Now().UTC().Format(time.RFC3339)
_, err := db.ExecContext(ctx,
`UPDATE packages SET is_active = 1, updated_at = ?, content_changed_at = ? WHERE id = ?`,
now, now, id,
)
if err != nil {
return fmt.Errorf("reactivating package %d: %w", id, err)
}
return nil
}
// GetAllPackages returns all packages, optionally filtered by type.
func GetAllPackages(ctx context.Context, db *sql.DB, pkgType string) ([]*Package, error) {
query := `SELECT id, type, name, is_active FROM packages WHERE 1=1`
var args []any
if pkgType != "" && pkgType != "all" {
query += ` AND type = ?`
args = append(args, pkgType)
}
rows, err := db.QueryContext(ctx, query, args...)
if err != nil {
return nil, fmt.Errorf("querying packages: %w", err)
}
defer func() { _ = rows.Close() }()
var pkgs []*Package
for rows.Next() {
var p Package
if err := rows.Scan(&p.ID, &p.Type, &p.Name, &p.IsActive); err != nil {
return nil, fmt.Errorf("scanning package: %w", err)
}
pkgs = append(pkgs, &p)
}
return pkgs, rows.Err()
}
// StartStatusCheck inserts a new status_checks row and returns its ID.
func StartStatusCheck(ctx context.Context, db *sql.DB, started time.Time) (int64, error) {
res, err := db.ExecContext(ctx,
`INSERT INTO status_checks (started_at, status) VALUES (?, 'running')`,
started.Format(time.RFC3339))
if err != nil {
return 0, fmt.Errorf("inserting status check: %w", err)
}
return res.LastInsertId()
}
// FinishStatusCheck updates a status_checks row with the final results.
func FinishStatusCheck(ctx context.Context, db *sql.DB, id int64, started time.Time,
checked, deactivated, reactivated, failed int64, runErr error) error {
now := time.Now().UTC()
status := "completed"
var errMsg *string
if runErr != nil {
status = "failed"
s := runErr.Error()
errMsg = &s
} else if failed > 0 {
status = "completed_with_errors"
}
_, err := db.ExecContext(ctx, `
UPDATE status_checks SET
finished_at = ?, status = ?, checked = ?, deactivated = ?,
reactivated = ?, failed = ?, duration_seconds = ?, error_message = ?
WHERE id = ?`,
now.Format(time.RFC3339), status, checked, deactivated,
reactivated, failed, int(now.Sub(started).Seconds()), errMsg, id)
if err != nil {
return fmt.Errorf("finishing status check %d: %w", id, err)
}
return nil
}
// RecordStatusCheckChange inserts a per-package deactivation or reactivation event.
func RecordStatusCheckChange(ctx context.Context, db *sql.DB, statusCheckID int64, pkgType, pkgName, action string) {
_, _ = db.ExecContext(ctx,
`INSERT INTO status_check_changes (status_check_id, package_type, package_name, action, created_at)
VALUES (?, ?, ?, ?, ?)`,
statusCheckID, pkgType, pkgName, action, time.Now().UTC().Format(time.RFC3339))
}
// StatusCheckChange represents a per-package event from a status check run.
type StatusCheckChange struct {
PackageType string
PackageName string
Action string
}
// GetStatusCheckChanges returns the per-package changes for a given status check.
func GetStatusCheckChanges(ctx context.Context, db *sql.DB, statusCheckID int64) ([]StatusCheckChange, error) {
rows, err := db.QueryContext(ctx,
`SELECT package_type, package_name, action
FROM status_check_changes WHERE status_check_id = ?
ORDER BY id`, statusCheckID)
if err != nil {
return nil, err
}
defer func() { _ = rows.Close() }()
var changes []StatusCheckChange
for rows.Next() {
var c StatusCheckChange
if err := rows.Scan(&c.PackageType, &c.PackageName, &c.Action); err != nil {
return nil, err
}
changes = append(changes, c)
}
return changes, rows.Err()
}
// StatusCheck represents a row from the status_checks table.
type StatusCheck struct {
ID int64
StartedAt string
FinishedAt string
Status string
Checked int64
Deactivated int64
Reactivated int64
Failed int64
DurationSeconds *int
ErrorMessage string
}
// GetStatusChecks returns the most recent status check runs.
func GetStatusChecks(ctx context.Context, db *sql.DB, limit int) ([]StatusCheck, error) {
rows, err := db.QueryContext(ctx, `
SELECT id, started_at, COALESCE(finished_at, ''), status,
checked, deactivated, reactivated, failed,
duration_seconds, COALESCE(error_message, '')
FROM status_checks ORDER BY started_at DESC LIMIT ?`, limit)
if err != nil {
return nil, fmt.Errorf("querying status checks: %w", err)
}
defer func() { _ = rows.Close() }()
var checks []StatusCheck
for rows.Next() {
var c StatusCheck
if err := rows.Scan(&c.ID, &c.StartedAt, &c.FinishedAt, &c.Status,
&c.Checked, &c.Deactivated, &c.Reactivated, &c.Failed,
&c.DurationSeconds, &c.ErrorMessage); err != nil {
return nil, fmt.Errorf("scanning status check: %w", err)
}
checks = append(checks, c)
}
return checks, rows.Err()
}
// RefreshSiteStats recomputes the package_stats row from the packages table.
func RefreshSiteStats(ctx context.Context, db *sql.DB) error {
_, err := db.ExecContext(ctx, `
INSERT OR REPLACE INTO package_stats (id, active_plugins, active_themes, plugin_installs, theme_installs, installs_30d, updated_at)
SELECT 1,
COALESCE(SUM(CASE WHEN type = 'plugin' THEN 1 ELSE 0 END), 0),
COALESCE(SUM(CASE WHEN type = 'theme' THEN 1 ELSE 0 END), 0),
COALESCE(SUM(CASE WHEN type = 'plugin' THEN wp_packages_installs_total ELSE 0 END), 0),
COALESCE(SUM(CASE WHEN type = 'theme' THEN wp_packages_installs_total ELSE 0 END), 0),
COALESCE(SUM(wp_packages_installs_30d), 0),
datetime('now')
FROM packages
WHERE is_active = 1`)
if err != nil {
return fmt.Errorf("refreshing package stats: %w", err)
}
return nil
}
// MarkPackagesChanged sets last_committed = now and trunk_revision for the given
// slugs of a specific type, so they'll be picked up by GetPackagesNeedingUpdate.
// slugRevisions maps each slug to its highest SVN revision from the changelog.
func MarkPackagesChanged(ctx context.Context, db *sql.DB, pkgType string, slugRevisions map[string]int64) (int64, error) {
if len(slugRevisions) == 0 {
return 0, nil
}
now := time.Now().UTC().Format(time.RFC3339)
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return 0, fmt.Errorf("beginning transaction: %w", err)
}
defer func() { _ = tx.Rollback() }()
stmt, err := tx.PrepareContext(ctx, `
UPDATE packages
SET last_committed = ?, updated_at = ?, trunk_revision = ?
WHERE type = ? AND name = ? AND is_active = 1`)
if err != nil {
return 0, fmt.Errorf("preparing statement: %w", err)
}
defer func() { _ = stmt.Close() }()
var affected int64
for slug, rev := range slugRevisions {
res, err := stmt.ExecContext(ctx, now, now, rev, pkgType, slug)
if err != nil {
return affected, fmt.Errorf("marking package %s/%s changed: %w", pkgType, slug, err)
}
n, _ := res.RowsAffected()
affected += n
}
if err := tx.Commit(); err != nil {
return 0, fmt.Errorf("committing: %w", err)
}
return affected, nil
}
// BackfillTrunkRevisions sets trunk_revision for active plugins that don't have one yet.
// Only updates rows where trunk_revision IS NULL to avoid overwriting newer data.
func BackfillTrunkRevisions(ctx context.Context, db *sql.DB, slugRevisions map[string]int64) (int64, error) {
if len(slugRevisions) == 0 {
return 0, nil
}
tx, err := db.BeginTx(ctx, nil)
if err != nil {
return 0, fmt.Errorf("beginning transaction: %w", err)
}
defer func() { _ = tx.Rollback() }()
stmt, err := tx.PrepareContext(ctx, `
UPDATE packages
SET trunk_revision = ?
WHERE type = 'plugin' AND name = ? AND is_active = 1 AND trunk_revision IS NULL`)
if err != nil {
return 0, fmt.Errorf("preparing statement: %w", err)
}
defer func() { _ = stmt.Close() }()
var affected int64
for slug, rev := range slugRevisions {
res, err := stmt.ExecContext(ctx, rev, slug)
if err != nil {
return affected, fmt.Errorf("backfilling trunk_revision for %s: %w", slug, err)
}
n, _ := res.RowsAffected()
affected += n
}
if err := tx.Commit(); err != nil {
return 0, fmt.Errorf("committing: %w", err)
}
return affected, nil
}
func boolToInt(b bool) int {
if b {
return 1
}
return 0
}