packages.wenpai.net/internal/og/generate.go
Ben Word b8904937fb
Fix OG image generation and simplify pipeline (#54)
Remove OG image generation from the pipeline — the 3 AM systemd timer
handles all generation via `generate-og`. Remove the `--limit` flag so
the nightly job processes all packages that need updates. Add
concurrency (20 goroutines) for render + upload with serial DB writes.
Only regenerate images when Composer install counts change, ignoring
WP.org install count changes.

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-22 18:08:07 -05:00

214 lines
5.8 KiB
Go

package og
import (
"context"
"database/sql"
"fmt"
"log/slog"
"sync"
"sync/atomic"
"time"
)
const uploadConcurrency = 20
// PackageOGRow holds the data needed for OG image generation decisions.
type PackageOGRow struct {
ID int64
Type string
Name string
DisplayName string
Description string
CurrentVersion string
ActiveInstalls int64
WpPackagesInstallsTotal int64
OGImageGeneratedAt *string
OGImageInstalls int64
OGImageWpInstalls int64
}
// FormatInstalls returns a human-readable install count with rounding
// to reduce unnecessary OG image regeneration when counts change slightly.
func FormatInstalls(n int64) string {
if n >= 1_000_000 {
return fmt.Sprintf("%.1fM", float64(n)/1_000_000)
}
if n >= 1_000 {
return fmt.Sprintf("%.0fK", float64(n)/1_000)
}
if n >= 100 {
rounded := (n / 100) * 100
return fmt.Sprintf("%d+", rounded)
}
if n >= 10 {
rounded := (n / 10) * 10
return fmt.Sprintf("%d+", rounded)
}
return fmt.Sprintf("%d", n)
}
// getPackagesNeedingOG returns packages that need OG image generation:
// - Never generated (og_image_generated_at IS NULL)
// - Composer install count changed since last generation
func getPackagesNeedingOG(ctx context.Context, db *sql.DB) ([]PackageOGRow, error) {
q := `SELECT id, type, name, COALESCE(display_name, ''), COALESCE(description, ''),
COALESCE(current_version, ''), active_installs, wp_packages_installs_total,
og_image_generated_at, og_image_installs, og_image_wp_installs
FROM packages
WHERE is_active = 1
AND (
og_image_generated_at IS NULL
OR wp_packages_installs_total != og_image_wp_installs
)
ORDER BY active_installs DESC`
rows, err := db.QueryContext(ctx, q)
if err != nil {
return nil, fmt.Errorf("querying packages for OG: %w", err)
}
defer func() { _ = rows.Close() }()
var pkgs []PackageOGRow
for rows.Next() {
var p PackageOGRow
if err := rows.Scan(&p.ID, &p.Type, &p.Name, &p.DisplayName, &p.Description,
&p.CurrentVersion, &p.ActiveInstalls, &p.WpPackagesInstallsTotal,
&p.OGImageGeneratedAt, &p.OGImageInstalls, &p.OGImageWpInstalls); err != nil {
return nil, fmt.Errorf("scanning OG row: %w", err)
}
pkgs = append(pkgs, p)
}
return pkgs, rows.Err()
}
// MarkOGGenerated updates the OG tracking columns after successful generation.
func markOGGenerated(ctx context.Context, db *sql.DB, id, activeInstalls, wpInstalls int64) error {
now := time.Now().UTC().Format(time.RFC3339)
_, err := db.ExecContext(ctx, `UPDATE packages SET
og_image_generated_at = ?,
og_image_installs = ?,
og_image_wp_installs = ?
WHERE id = ?`, now, activeInstalls, wpInstalls, id)
if err != nil {
return fmt.Errorf("marking OG generated for package %d: %w", id, err)
}
return nil
}
// GenerateResult holds the outcome of a generation run.
type GenerateResult struct {
Generated int
Skipped int
Errors int
}
// GenerateAll generates OG images for all packages that need them.
// Rendering and uploading run concurrently; DB writes stay serial.
func GenerateAll(ctx context.Context, db *sql.DB, uploader *Uploader, logger *slog.Logger) (GenerateResult, error) {
pkgs, err := getPackagesNeedingOG(ctx, db)
if err != nil {
return GenerateResult{}, err
}
if len(pkgs) == 0 {
logger.Info("no packages need OG image generation")
return GenerateResult{}, nil
}
// Filter out packages whose formatted display values haven't changed.
var work []PackageOGRow
var skipped int
for _, pkg := range pkgs {
if pkg.OGImageGeneratedAt != nil &&
FormatInstalls(pkg.WpPackagesInstallsTotal) == FormatInstalls(pkg.OGImageWpInstalls) {
skipped++
continue
}
work = append(work, pkg)
}
logger.Info("generating OG images", "packages", len(work), "skipped", skipped)
if len(work) == 0 {
return GenerateResult{Skipped: skipped}, nil
}
type dbUpdate struct {
id, installs, wpInstalls int64
}
var generated, errCount atomic.Int64
ch := make(chan PackageOGRow)
updates := make(chan dbUpdate, uploadConcurrency*2)
// Render + upload concurrently.
var wg sync.WaitGroup
for range uploadConcurrency {
wg.Add(1)
go func() {
defer wg.Done()
for pkg := range ch {
data := PackageData{
DisplayName: pkg.DisplayName,
Name: pkg.Name,
Type: pkg.Type,
CurrentVersion: pkg.CurrentVersion,
Description: pkg.Description,
ActiveInstalls: FormatInstalls(pkg.ActiveInstalls),
WpPackagesInstalls: FormatInstalls(pkg.WpPackagesInstallsTotal),
}
pngBytes, err := GeneratePackageImage(data)
if err != nil {
logger.Error("generating OG image", "package", pkg.Name, "error", err)
errCount.Add(1)
continue
}
key := fmt.Sprintf("social/%s/%s.png", pkg.Type, pkg.Name)
if err := uploader.Upload(ctx, key, pngBytes); err != nil {
logger.Error("uploading OG image", "package", pkg.Name, "error", err)
errCount.Add(1)
continue
}
updates <- dbUpdate{pkg.ID, pkg.ActiveInstalls, pkg.WpPackagesInstallsTotal}
if n := generated.Add(1); n%500 == 0 {
logger.Info("OG generation progress", "generated", n, "total", len(work))
}
}
}()
}
// Serial DB writer to avoid SQLite contention.
var dbErrors atomic.Int64
var dbWg sync.WaitGroup
dbWg.Add(1)
go func() {
defer dbWg.Done()
for u := range updates {
if err := markOGGenerated(ctx, db, u.id, u.installs, u.wpInstalls); err != nil {
logger.Error("marking OG generated", "id", u.id, "error", err)
dbErrors.Add(1)
}
}
}()
for _, pkg := range work {
if ctx.Err() != nil {
break
}
ch <- pkg
}
close(ch)
wg.Wait()
close(updates)
dbWg.Wait()
return GenerateResult{
Generated: int(generated.Load()),
Skipped: skipped,
Errors: int(errCount.Load() + dbErrors.Load()),
}, nil
}