PLC Replica #24
Changes from all commits
LICENSE:

```diff
@@ -1,5 +1,5 @@
 Dual MIT/Apache-2.0 License

-Copyright (c) 2025 Bluesky Social PBC
+Copyright (c) 2025-2026 Bluesky Social PBC

 Except as otherwise noted in individual files, this software is licensed under the MIT license (<http://opensource.org/licenses/MIT>), or the Apache License, Version 2.0 (<http://www.apache.org/licenses/LICENSE-2.0>), at your option.
```
New file (+59 lines):

# PLC Replica Service

The `replica` command implements a `did:plc` read-replica service that syncs operations from an upstream PLC directory service and exposes the standard HTTP APIs for resolving and auditing DID documents.

It performs full cryptographic validation of all inbound PLC operations, including enforcing the constraints around operation nullification.
```
NAME:
   replica - PLC directory replica server

USAGE:
   replica [global options]

GLOBAL OPTIONS:
   --postgres-url string            PostgreSQL connection string (if set, uses Postgres instead of SQLite) [$POSTGRES_URL]
   --sqlite-path string             SQLite database file path (used when --postgres-url is not set) (default: "replica.db") [$SQLITE_PATH]
   --http-addr string               HTTP server listen address (default: ":8080") [$HTTP_ADDR]
   --metrics-addr string            Metrics HTTP server listen address (default: ":9464") [$METRICS_ADDR]
   --no-ingest                      Disable ingestion from upstream directory (default: false) [$NO_INGEST]
   --upstream-directory-url string  Upstream PLC directory base URL (default: "https://plc.directory") [$UPSTREAM_DIRECTORY_URL]
   --cursor-override int            Starting cursor (sequence number) for ingestion (default: -1) [$CURSOR_OVERRIDE]
   --num-workers int                Number of validation worker threads (0 = auto) (default: 0) [$NUM_WORKERS]
   --log-level string               Log level (debug, info, warn, error) (default: "info") [$LOG_LEVEL]
   --log-json                       Output logs in JSON format (default: false) [$LOG_JSON]
   --help, -h                       show help
```
## HTTP API

The service exposes the following endpoints, as described in the `did:plc` [spec](https://web.plc.directory/spec/v0.1/did-plc):

- `GET /{did}` (see Format Differences below)
- `GET /{did}/data`
- `GET /{did}/log`
- `GET /{did}/log/audit`
- `GET /{did}/log/last`

Some of these are not mentioned in the spec, but they appear in the [API docs](https://web.plc.directory/api/redoc) and are implemented by the [reference implementation](https://github.com/did-method-plc/did-method-plc/tree/main/packages/server).
The service does not support POSTing DID updates to `/{did}`; it only discovers new operations by importing them from the upstream instance.

It does not currently implement the `/export` and `/export/stream` endpoints, although it may in the future.
### DID Document Format Differences

The reference implementation returns DID documents in `application/did+ld+json` format, whereas this replica returns them in `application/did+json` format. Both are described in the [DID specification](https://www.w3.org/TR/did-1.0/); in practical terms, the difference is that the `@context` field is omitted.

Secondarily, service identifiers include the DID ([relevant issue](https://github.com/did-method-plc/did-method-plc/issues/90)).

Although these differences are spec-compliant, some PLC client libraries may have trouble with them.
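The practical effect of the `@context` difference can be sketched as follows; the two JSON bodies are hypothetical hand-written fragments, not captured responses.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// hasContext reports whether a DID document body carries a top-level
// "@context" member, as did+ld+json documents do.
func hasContext(body []byte) bool {
	var doc map[string]json.RawMessage
	if err := json.Unmarshal(body, &doc); err != nil {
		return false
	}
	_, ok := doc["@context"]
	return ok
}

func main() {
	// Reference-implementation style (application/did+ld+json): has @context.
	didLDJSON := []byte(`{"@context":["https://www.w3.org/ns/did/v1"],"id":"did:plc:example"}`)
	// This replica's style (application/did+json): no @context member.
	didJSON := []byte(`{"id":"did:plc:example"}`)
	fmt.Println(hasContext(didLDJSON)) // true
	fmt.Println(hasContext(didJSON))   // false
}
```

A client that dereferences `@context` unconditionally would break on the second body, which is the kind of library trouble noted above.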
## Databases

The service supports either PostgreSQL or SQLite. Postgres has more horizontal scaling headroom on the read path, but SQLite performs better when backfilling.
## Backfilling

When the service is started for the first time, it has to "backfill" the entire PLC operation history from the upstream instance. Until it catches up, it will not provide up-to-date responses to queries. Depending on your hardware, a backfill should take less than 24 hours to complete (at time of writing). Backfilling tends to be bottlenecked by database throughput.

> **Reviewer comment:** how would operators know that backfill is complete? other than looking at CPU utilization or something. does it log differently? maybe it should not return any API responses until backfill is complete? thinking of ways to prevent operational accidents, especially if this was running in kubernetes or a host was redeployed or something. wouldn't want old/bad API responses during the backfill, if possible.
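One way an operator-facing readiness check could work, as a minimal sketch: gate readiness on how far the local ingestion cursor lags the upstream one. The `backfillComplete` helper and its threshold are hypothetical and not part of this PR.

```go
package main

import "fmt"

// backfillComplete is a hypothetical readiness predicate: the replica is
// considered caught up once its local cursor is within maxLag sequence
// numbers of the upstream directory's latest cursor. A readiness probe
// (e.g. in Kubernetes) could return 503 until this is true.
func backfillComplete(localCursor, upstreamCursor, maxLag int64) bool {
	return upstreamCursor-localCursor <= maxLag
}

func main() {
	fmt.Println(backfillComplete(100, 1_000_000, 1_000))     // still backfilling
	fmt.Println(backfillComplete(999_900, 1_000_000, 1_000)) // caught up
}
```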
New file (+176 lines):

```go
package main

import (
	"context"
	"fmt"
	"log/slog"
	"net/http"
	"os"
	"runtime"

	"github.com/did-method-plc/go-didplc/replica"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/urfave/cli/v3"
	"golang.org/x/sync/errgroup"
)

func main() {
	cmd := &cli.Command{
		Name:  "plc-replica",
		Usage: "PLC directory replica server",
		Flags: []cli.Flag{
			&cli.StringFlag{
				Name: "postgres-url",
				// REVIEW (member): There is a convention to use
				Usage:   "PostgreSQL connection string (if set, uses Postgres instead of SQLite)",
				Sources: cli.EnvVars("POSTGRES_URL"),
			},
			&cli.StringFlag{
				Name:    "sqlite-path",
				Usage:   "SQLite database file path (used when --postgres-url is not set)",
				Value:   "replica.db",
				Sources: cli.EnvVars("SQLITE_PATH"),
			},
			&cli.StringFlag{
				Name:    "bind",
				Usage:   "HTTP server listen address",
				Value:   ":8080",
				Sources: cli.EnvVars("REPLICA_BIND"),
			},
			&cli.StringFlag{
				Name:    "metrics-addr",
				Usage:   "Metrics HTTP server listen address",
				Value:   ":9464",
				Sources: cli.EnvVars("METRICS_ADDR"),
			},
			&cli.BoolFlag{
				Name:    "no-ingest",
				Usage:   "Disable ingestion from upstream directory",
				Sources: cli.EnvVars("NO_INGEST"),
			},
			&cli.StringFlag{
				Name:    "upstream-directory-url",
				Usage:   "Upstream PLC directory base URL",
				Value:   "https://plc.directory",
				Sources: cli.EnvVars("UPSTREAM_DIRECTORY_URL"),
			},
			&cli.Int64Flag{
				Name:  "cursor-override",
				Usage: "Starting cursor (sequence number) for ingestion",
				// REVIEW (member): this says "starting" which implies it only works
				// when first creating the replica? should be more explicit either
				// way. if you ever need to change the upstream URL, you'd probably
				// need to change the cursor as well; we did that recently with the
				// relay rollout.
				Value:   -1,
				Sources: cli.EnvVars("CURSOR_OVERRIDE"),
			},
			&cli.IntFlag{
				Name:    "num-workers",
				Usage:   "Number of validation worker threads (0 = auto)",
				Value:   0,
				Sources: cli.EnvVars("NUM_WORKERS"),
			},
			&cli.StringFlag{
				Name:    "log-level",
				Usage:   "Log level (debug, info, warn, error)",
				Value:   "info",
				Sources: cli.EnvVars("LOG_LEVEL"),
			},
			&cli.BoolFlag{
				Name:    "log-json",
				Usage:   "Output logs in JSON format",
				Sources: cli.EnvVars("LOG_JSON"),
			},
		},
		Action: run,
	}

	if err := cmd.Run(context.Background(), os.Args); err != nil {
		slog.Error("fatal error", "error", err)
		os.Exit(1)
	}
}

func run(ctx context.Context, cmd *cli.Command) error {
	// Parse configuration
	postgresURL := cmd.String("postgres-url")
	sqlitePath := cmd.String("sqlite-path")
	httpAddr := cmd.String("bind")
	metricsAddr := cmd.String("metrics-addr")
	noIngest := cmd.Bool("no-ingest")
	directoryURL := cmd.String("upstream-directory-url")
	cursorOverride := cmd.Int64("cursor-override")
	numWorkers := cmd.Int("num-workers")
	logLevel := cmd.String("log-level")
	logJSON := cmd.Bool("log-json")

	// Initialize logger
	var level slog.Level
	switch logLevel {
	case "debug":
		level = slog.LevelDebug
	case "info":
		level = slog.LevelInfo
	case "warn":
		level = slog.LevelWarn
	case "error":
		level = slog.LevelError
	default:
		level = slog.LevelInfo
	}

	var handler slog.Handler
	opts := &slog.HandlerOptions{Level: level}
	if logJSON {
		handler = slog.NewJSONHandler(os.Stdout, opts)
	} else {
		handler = slog.NewTextHandler(os.Stdout, opts)
	}
	logger := slog.New(handler)
	slog.SetDefault(logger)

	if numWorkers <= 0 {
		numWorkers = runtime.NumCPU()
	}

	otelShutdown, err := setupOTel(ctx)
	if err != nil {
		return fmt.Errorf("otel setup: %w", err)
	}
	defer otelShutdown(context.Background())

	var store *replica.GormOpStore

	if postgresURL != "" {
		slog.Info("using database", "type", "postgres", "url", postgresURL)
		store, err = replica.NewGormOpStoreWithPostgres(postgresURL, logger)
		if err != nil {
			return fmt.Errorf("failed to create postgres store: %w", err)
		}
	} else {
		slog.Info("using database", "type", "sqlite", "path", sqlitePath)
		store, err = replica.NewGormOpStoreWithSqlite(sqlitePath, logger)
		if err != nil {
			return fmt.Errorf("failed to create sqlite store: %w", err)
		}
	}

	server := replica.NewServer(store, httpAddr, logger)
	g, gctx := errgroup.WithContext(ctx)

	g.Go(server.Run)

	g.Go(func() error {
		mux := http.NewServeMux()
		mux.Handle("/metrics", promhttp.Handler())
		slog.Info("metrics server listening", "addr", metricsAddr)
		return http.ListenAndServe(metricsAddr, mux)
	})

	if !noIngest {
		ingestor, err := replica.NewIngestor(store, directoryURL, cursorOverride, numWorkers, logger)
		if err != nil {
			return err
		}
		g.Go(func() error {
			return ingestor.Run(gctx)
		})
	}

	return g.Wait()
}
```
New file (+59 lines):

```go
package main

import (
	"context"
	"errors"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
	"go.opentelemetry.io/otel/exporters/prometheus"
	"go.opentelemetry.io/otel/propagation"
	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/resource"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
)

func setupOTel(ctx context.Context) (shutdown func(context.Context) error, err error) {
	res, err := resource.New(ctx,
		resource.WithAttributes(
			semconv.ServiceName("plc-replica"),
		),
	)
	if err != nil {
		return nil, err
	}

	// Traces: OTLP HTTP exporter, configured via OTEL_EXPORTER_OTLP_* env vars.
	traceExporter, err := otlptracehttp.New(ctx)
	if err != nil {
		return nil, err
	}

	tp := sdktrace.NewTracerProvider(
		sdktrace.WithBatcher(traceExporter),
		sdktrace.WithResource(res),
	)
	otel.SetTracerProvider(tp)
	otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
		propagation.TraceContext{},
		propagation.Baggage{},
	))

	// Metrics: Prometheus exporter, served via /metrics HTTP endpoint.
	promExporter, err := prometheus.New()
	if err != nil {
		return nil, err
	}

	mp := sdkmetric.NewMeterProvider(
		sdkmetric.WithReader(promExporter),
		sdkmetric.WithResource(res),
	)
	otel.SetMeterProvider(mp)

	shutdown = func(ctx context.Context) error {
		return errors.Join(tp.Shutdown(ctx), mp.Shutdown(ctx))
	}
	return shutdown, nil
}
```