PLC Replica (Take 2) #28
Merged
30 commits
33e79ff move didplc lib into subdir (DavidBuchanan314)
2df04b6 transplant replica impl (DavidBuchanan314)
360e4a1 go.mod updates (DavidBuchanan314)
60d50da update replica impl to conform to interface changes (DavidBuchanan314)
907f9eb remove GetOperationLog, GetOperationLogAudit (DavidBuchanan314)
ea660c9 simplify marshaling (DavidBuchanan314)
23cf599 fix json marshal error handling in server.go (DavidBuchanan314)
9cc0979 Document InFlight better, use utils.Int64Comparator (DavidBuchanan314)
ed8bb07 More InFlight doc (DavidBuchanan314)
ffb228d increase ascii art by 100% (DavidBuchanan314)
90151f7 remove SkipDefaultTransaction (DavidBuchanan314)
2e111dd add seq column (unused) (DavidBuchanan314)
9f49248 standardize db arg, add back pg helper (DavidBuchanan314)
5c6f307 CommitOperations returns ErrHeadMismatch when appropriate (DavidBuchanan314)
f862544 set Last-Modified in handleDIDDoc (DavidBuchanan314)
59b1df4 use host:port for cursor peristence (DavidBuchanan314)
82352c7 log error response body (DavidBuchanan314)
4ed7348 return http 410 for tombstoned DIDs (DavidBuchanan314)
752d4a8 update README (DavidBuchanan314)
0f854f5 update dependencies (DavidBuchanan314)
d2b8324 replica dockerfile (DavidBuchanan314)
d0c5251 report lastCommittedOpTime in _health (DavidBuchanan314)
ee7856c enable gorm error translation (fixes ErrHeadMismatch detection) (DavidBuchanan314)
1f7394b more tests (DavidBuchanan314)
73a1e45 stop tampering with db connection params (DavidBuchanan314)
d2aea3f update readme (DavidBuchanan314)
ddac1a5 disable otel reporting unless explicitly configured (DavidBuchanan314)
5eb92f8 change default service port to 6780 (DavidBuchanan314)
2344251 distinguish replays from truly invalid ops (DavidBuchanan314)
ad68733 update README (DavidBuchanan314)
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,12 @@ | ||
| FROM golang:1.25 AS builder | ||
| WORKDIR /src | ||
| COPY go.mod go.sum ./ | ||
| RUN go mod download | ||
| COPY . . | ||
| RUN go build -o /plc-replica ./cmd/replica | ||
| | ||
| FROM debian:bookworm-slim | ||
| RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/* | ||
| COPY --from=builder /plc-replica /usr/local/bin/plc-replica | ||
| WORKDIR /data | ||
| ENTRYPOINT ["plc-replica"] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,64 @@ | ||
| # PLC Replica Service | ||
| | ||
| The `replica` command implements a `did:plc` read-replica service that syncs operations from an upstream PLC directory service, and exposes the standard HTTP APIs for resolving and auditing DID documents. | ||
| | ||
| It performs full cryptographic validation of all inbound PLC operations, including enforcing constraints around operation nullification. | ||
| | ||
| ``` | ||
| NAME: | ||
| plc-replica - PLC directory replica server | ||
| | ||
| USAGE: | ||
| plc-replica [global options] | ||
| | ||
| GLOBAL OPTIONS: | ||
| --db-url string Database URL (e.g. sqlite://replica.db?_journal_mode=WAL, postgres://user:pass@host/db) (default: "sqlite://replica.db?mode=rwc&cache=shared&_journal_mode=WAL") [$DATABASE_URL] | ||
| --bind string HTTP server listen address (default: ":6780") [$REPLICA_BIND] | ||
| --metrics-addr string Metrics HTTP server listen address (default: ":9464") [$METRICS_ADDR] | ||
| --no-ingest Disable ingestion from upstream directory [$NO_INGEST] | ||
| --upstream-directory-url string Upstream PLC directory base URL (default: "https://plc.directory") [$UPSTREAM_DIRECTORY_URL] | ||
| --cursor-override int Initial cursor value used to sync from the upstream host. May be useful when switching the upstream host (default: -1) [$CURSOR_OVERRIDE] | ||
| --num-workers int Number of validation worker threads (0 = auto) (default: 0) [$NUM_WORKERS] | ||
| --log-level string Log level (debug, info, warn, error) (default: "info") [$LOG_LEVEL] | ||
| --log-json Output logs in JSON format [$LOG_JSON] | ||
| --help, -h show help | ||
| ``` | ||
| | ||
| ## HTTP API | ||
| | ||
| It exposes the following endpoints. Some are described in the `did:plc` [spec](https://web.plc.directory/spec/v0.1/did-plc); others are not mentioned there, but appear in the [API docs](https://web.plc.directory/api/redoc) and are implemented by the [reference implementation](https://github.com/did-method-plc/did-method-plc/tree/main/packages/server): | ||
| | ||
| - `GET /{did}` (see Format Differences below) | ||
| - `GET /{did}/data` | ||
| - `GET /{did}/log` | ||
| - `GET /{did}/log/audit` | ||
| - `GET /{did}/log/last` | ||
| | ||
| It does not support POSTing DID updates to `/{did}` - it only discovers new operations by importing from the upstream instance. | ||
| | ||
| It does not currently implement the `/export` and `/export/stream` endpoints, although it may in the future. | ||
| | ||
| ### DID Document Format Differences | ||
| | ||
| The reference implementation returns DID documents in `application/did+ld+json` format, whereas this replica returns them in `application/did+json` format. Both are described in the [DID specification](https://www.w3.org/TR/did-1.0/), but in practical terms the difference is that the `@context` field is missing. | ||
| | ||
| Secondarily, service identifiers include the DID ([relevant issue](https://github.com/did-method-plc/did-method-plc/issues/90)). | ||
| | ||
| Although these differences are spec-compliant, some PLC client libraries may have trouble with them. | ||
| | ||
| | ||
| ## Databases | ||
| | ||
| The service supports either PostgreSQL or SQLite. Postgres has more horizontal scaling headroom on the read path, but SQLite performs better when backfilling. | ||
| | ||
| When using PostgreSQL, you may wish to set `synchronous_commit` to `off`. This can improve ingest performance, at the cost of potentially losing some recently committed data after e.g. a power failure. Since this is a replica service, it should be able to quickly re-sync from the upstream host if that happens, so no data is truly lost. | ||
| | ||
| ## Backfilling | ||
| | ||
| When the service is started for the first time, it has to "backfill" the entire PLC operation history from the upstream instance. Until it "catches up", it will not provide up-to-date responses to queries. Depending on your hardware, it should take less than 24h to complete a backfill (at time of writing). Backfilling tends to be bottlenecked by database throughput. | ||
| | ||
| ## Metrics and Tracing | ||
| | ||
| In addition to the `--metrics-addr` CLI flag, the [`OTEL_EXPORTER_OTLP_ENDPOINT`](https://opentelemetry.io/docs/languages/sdk-configuration/otlp-exporter/#otel_exporter_otlp_endpoint) env var may be set to configure trace reporting. |
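The read endpoints listed in the README can be exercised with a small Go client. The sketch below is illustrative only and is not part of this PR: the helper names (`docURL`, `classify`, `resolve`) are hypothetical, the 410 mapping follows the "return http 410 for tombstoned DIDs" commit, and the 404 case for unknown DIDs is an assumption about the replica's behavior. `main` talks to an in-process stand-in server, not a real replica.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

var (
	ErrNotFound   = errors.New("DID not found")   // assumed: 404 for unknown DIDs
	ErrTombstoned = errors.New("DID is tombstoned") // 410, per commit 4ed7348
)

// docURL builds the `GET /{did}` URL for a replica base URL.
func docURL(base, did string) string {
	return strings.TrimRight(base, "/") + "/" + did
}

// classify maps an HTTP status code to a resolution outcome.
func classify(status int) error {
	switch status {
	case http.StatusOK:
		return nil
	case http.StatusNotFound:
		return ErrNotFound
	case http.StatusGone:
		return ErrTombstoned
	default:
		return fmt.Errorf("unexpected status %d", status)
	}
}

// resolve fetches a DID document and decodes it as generic JSON.
func resolve(ctx context.Context, c *http.Client, base, did string) (map[string]any, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, docURL(base, did), nil)
	if err != nil {
		return nil, err
	}
	resp, err := c.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	if err := classify(resp.StatusCode); err != nil {
		return nil, err
	}
	var doc map[string]any
	if err := json.NewDecoder(resp.Body).Decode(&doc); err != nil {
		return nil, err
	}
	return doc, nil
}

func main() {
	// Stand-in for a replica: one tombstoned DID, everything else resolves.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		did := strings.TrimPrefix(r.URL.Path, "/")
		if did == "did:plc:gone" {
			w.WriteHeader(http.StatusGone)
			return
		}
		w.Header().Set("Content-Type", "application/did+json")
		json.NewEncoder(w).Encode(map[string]string{"id": did})
	}))
	defer srv.Close()

	ctx := context.Background()
	doc, err := resolve(ctx, srv.Client(), srv.URL, "did:plc:example")
	fmt.Println(doc["id"], err)

	_, err = resolve(ctx, srv.Client(), srv.URL, "did:plc:gone")
	fmt.Println("tombstoned:", errors.Is(err, ErrTombstoned))
}
```

Against a locally running replica, the base URL would presumably be `http://localhost:6780`, matching the default `--bind` value.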
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,160 @@ | ||
| package main | ||
| | ||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "log/slog" | ||
| "net/http" | ||
| "os" | ||
| "runtime" | ||
| | ||
| "github.com/did-method-plc/go-didplc/replica" | ||
| "github.com/prometheus/client_golang/prometheus/promhttp" | ||
| "github.com/urfave/cli/v3" | ||
| "golang.org/x/sync/errgroup" | ||
| ) | ||
| | ||
| func main() { | ||
| cmd := &cli.Command{ | ||
| Name: "plc-replica", | ||
| Usage: "PLC directory replica server", | ||
| Flags: []cli.Flag{ | ||
| &cli.StringFlag{ | ||
| Name: "db-url", | ||
| Usage: "Database URL (e.g. sqlite://replica.db?_journal_mode=WAL, postgres://user:pass@host/db)", | ||
| Value: "sqlite://replica.db?mode=rwc&cache=shared&_journal_mode=WAL", | ||
| Sources: cli.EnvVars("DATABASE_URL"), | ||
| }, | ||
| &cli.StringFlag{ | ||
| Name: "bind", | ||
| Usage: "HTTP server listen address", | ||
| Value: ":6780", | ||
| Sources: cli.EnvVars("REPLICA_BIND"), | ||
| }, | ||
| &cli.StringFlag{ | ||
| Name: "metrics-addr", | ||
| Usage: "Metrics HTTP server listen address", | ||
| Value: ":9464", | ||
| Sources: cli.EnvVars("METRICS_ADDR"), | ||
| }, | ||
| &cli.BoolFlag{ | ||
| Name: "no-ingest", | ||
| Usage: "Disable ingestion from upstream directory", | ||
| Sources: cli.EnvVars("NO_INGEST"), | ||
| }, | ||
| &cli.StringFlag{ | ||
| Name: "upstream-directory-url", | ||
| Usage: "Upstream PLC directory base URL", | ||
| Value: "https://plc.directory", | ||
| Sources: cli.EnvVars("UPSTREAM_DIRECTORY_URL"), | ||
| }, | ||
| &cli.Int64Flag{ | ||
| Name: "cursor-override", | ||
| Usage: "Initial cursor value used to sync from the upstream host. May be useful when switching the upstream host", | ||
| Value: -1, | ||
| Sources: cli.EnvVars("CURSOR_OVERRIDE"), | ||
| }, | ||
| &cli.IntFlag{ | ||
| Name: "num-workers", | ||
| Usage: "Number of validation worker threads (0 = auto)", | ||
| Value: 0, | ||
| Sources: cli.EnvVars("NUM_WORKERS"), | ||
| }, | ||
| &cli.StringFlag{ | ||
| Name: "log-level", | ||
| Usage: "Log level (debug, info, warn, error)", | ||
| Value: "info", | ||
| Sources: cli.EnvVars("LOG_LEVEL"), | ||
| }, | ||
| &cli.BoolFlag{ | ||
| Name: "log-json", | ||
| Usage: "Output logs in JSON format", | ||
| Sources: cli.EnvVars("LOG_JSON"), | ||
| }, | ||
| }, | ||
| Action: run, | ||
| } | ||
| | ||
| if err := cmd.Run(context.Background(), os.Args); err != nil { | ||
| slog.Error("fatal error", "error", err) | ||
| os.Exit(1) | ||
| } | ||
| } | ||
| | ||
| func run(ctx context.Context, cmd *cli.Command) error { | ||
| // Parse configuration | ||
| dbURL := cmd.String("db-url") | ||
| httpAddr := cmd.String("bind") | ||
| metricsAddr := cmd.String("metrics-addr") | ||
| noIngest := cmd.Bool("no-ingest") | ||
| directoryURL := cmd.String("upstream-directory-url") | ||
| cursorOverride := cmd.Int64("cursor-override") | ||
| numWorkers := cmd.Int("num-workers") | ||
| logLevel := cmd.String("log-level") | ||
| logJSON := cmd.Bool("log-json") | ||
| | ||
| // Initialize logger | ||
| var level slog.Level | ||
| switch logLevel { | ||
| case "debug": | ||
| level = slog.LevelDebug | ||
| case "info": | ||
| level = slog.LevelInfo | ||
| case "warn": | ||
| level = slog.LevelWarn | ||
| case "error": | ||
| level = slog.LevelError | ||
| default: | ||
| level = slog.LevelInfo | ||
| } | ||
| | ||
| var handler slog.Handler | ||
| opts := &slog.HandlerOptions{Level: level} | ||
| if logJSON { | ||
| handler = slog.NewJSONHandler(os.Stdout, opts) | ||
| } else { | ||
| handler = slog.NewTextHandler(os.Stdout, opts) | ||
| } | ||
| logger := slog.New(handler) | ||
| slog.SetDefault(logger) | ||
| | ||
| if numWorkers <= 0 { | ||
| numWorkers = runtime.NumCPU() | ||
| } | ||
| | ||
| otelShutdown, err := setupOTel(ctx) | ||
| if err != nil { | ||
| return fmt.Errorf("otel setup: %w", err) | ||
| } | ||
| defer otelShutdown(context.Background()) | ||
| | ||
| store, err := replica.NewGormOpStore(dbURL, logger) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to create store: %w", err) | ||
| } | ||
| | ||
| state := replica.NewReplicaState() | ||
| server := replica.NewServer(store, state, httpAddr, logger) | ||
| g, gctx := errgroup.WithContext(ctx) | ||
| | ||
| g.Go(server.Run) | ||
| | ||
| g.Go(func() error { | ||
| mux := http.NewServeMux() | ||
| mux.Handle("/metrics", promhttp.Handler()) | ||
| slog.Info("metrics server listening", "addr", metricsAddr) | ||
| return http.ListenAndServe(metricsAddr, mux) | ||
| }) | ||
| | ||
| if !noIngest { | ||
| ingestor, err := replica.NewIngestor(store, state, directoryURL, cursorOverride, numWorkers, logger) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| g.Go(func() error { | ||
| return ingestor.Run(gctx) | ||
| }) | ||
| } | ||
| | ||
| return g.Wait() | ||
| } |
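In `run`, the metrics goroutine calls `http.ListenAndServe`, which blocks until the listener fails and does not observe cancellation of `gctx`. As a hedged sketch (not part of this PR), a server can be tied to a context using only the standard library. The names `serveUntilDone` and `runDemo`, the placeholder handler, and the 5-second drain timeout are all hypothetical choices.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net/http"
	"time"
)

// serveUntilDone runs srv until it fails or ctx is cancelled. On cancellation
// it shuts the server down gracefully and returns nil for a clean stop.
func serveUntilDone(ctx context.Context, srv *http.Server) error {
	errCh := make(chan error, 1)
	go func() { errCh <- srv.ListenAndServe() }()

	select {
	case err := <-errCh:
		// The listener failed on its own (e.g. the address is in use).
		return err
	case <-ctx.Done():
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		if err := srv.Shutdown(shutdownCtx); err != nil {
			return err
		}
		// After Shutdown, ListenAndServe reports http.ErrServerClosed.
		if err := <-errCh; !errors.Is(err, http.ErrServerClosed) {
			return err
		}
		return nil
	}
}

// runDemo serves a placeholder /metrics endpoint on an ephemeral port and
// stops it via a short context timeout.
func runDemo() error {
	mux := http.NewServeMux()
	mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "# placeholder; the PR uses promhttp.Handler() here")
	})
	srv := &http.Server{Addr: "127.0.0.1:0", Handler: mux}

	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	return serveUntilDone(ctx, srv)
}

func main() {
	fmt.Println("server stopped, err =", runDemo())
}
```

Inside an errgroup, such a helper could be started as `g.Go(func() error { return serveUntilDone(gctx, srv) })`, so that a failure in any other goroutine also stops the metrics listener and lets `g.Wait()` return.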
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,70 @@ | ||
| package main | ||
| | ||
| import ( | ||
| "context" | ||
| "errors" | ||
| "os" | ||
| | ||
| "go.opentelemetry.io/otel" | ||
| "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" | ||
| "go.opentelemetry.io/otel/exporters/prometheus" | ||
| "go.opentelemetry.io/otel/propagation" | ||
| sdkmetric "go.opentelemetry.io/otel/sdk/metric" | ||
| "go.opentelemetry.io/otel/sdk/resource" | ||
| sdktrace "go.opentelemetry.io/otel/sdk/trace" | ||
| semconv "go.opentelemetry.io/otel/semconv/v1.26.0" | ||
| ) | ||
| | ||
| func setupOTel(ctx context.Context) (shutdown func(context.Context) error, err error) { | ||
| res, err := resource.New(ctx, | ||
| resource.WithAttributes( | ||
| semconv.ServiceName("plc-replica"), | ||
| ), | ||
| ) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| | ||
| var shutdowns []func(context.Context) error | ||
| | ||
| // Traces: OTLP HTTP exporter, only enabled if an endpoint is configured. | ||
| if os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT") != "" || os.Getenv("OTEL_EXPORTER_OTLP_TRACES_ENDPOINT") != "" { | ||
| traceExporter, err := otlptracehttp.New(ctx) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| | ||
| tp := sdktrace.NewTracerProvider( | ||
| sdktrace.WithBatcher(traceExporter), | ||
| sdktrace.WithResource(res), | ||
| ) | ||
| otel.SetTracerProvider(tp) | ||
| otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( | ||
| propagation.TraceContext{}, | ||
| propagation.Baggage{}, | ||
| )) | ||
| shutdowns = append(shutdowns, tp.Shutdown) | ||
| } | ||
| | ||
| // Metrics: Prometheus exporter, served via /metrics HTTP endpoint. | ||
| promExporter, err := prometheus.New() | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| | ||
| mp := sdkmetric.NewMeterProvider( | ||
| sdkmetric.WithReader(promExporter), | ||
| sdkmetric.WithResource(res), | ||
| ) | ||
| otel.SetMeterProvider(mp) | ||
| shutdowns = append(shutdowns, mp.Shutdown) | ||
| | ||
| shutdown = func(ctx context.Context) error { | ||
| var errs []error | ||
| for _, fn := range shutdowns { | ||
| errs = append(errs, fn(ctx)) | ||
| } | ||
| return errors.Join(errs...) | ||
| } | ||
| return shutdown, nil | ||
| } |
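The combined `shutdown` function in `setupOTel` appends the result of every shutdown call to `errs`, including `nil` results. That is safe because `errors.Join` discards nil values and returns nil when nothing remains. A minimal standalone illustration of the same pattern follows; the names `joinShutdowns`, `okShutdown`, `failingShutdown`, and `errFlush` are hypothetical and do not appear in the PR.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// errFlush stands in for a real exporter failure.
var errFlush = errors.New("flush failed")

func okShutdown(context.Context) error      { return nil }
func failingShutdown(context.Context) error { return errFlush }

// joinShutdowns returns one function that runs every shutdown hook in order
// and reports all failures together. Nil results are dropped by errors.Join.
func joinShutdowns(fns ...func(context.Context) error) func(context.Context) error {
	return func(ctx context.Context) error {
		var errs []error
		for _, fn := range fns {
			errs = append(errs, fn(ctx))
		}
		return errors.Join(errs...)
	}
}

func main() {
	ctx := context.Background()

	clean := joinShutdowns(okShutdown, okShutdown)(ctx)
	fmt.Println("all hooks succeeded, err is nil:", clean == nil)

	failed := joinShutdowns(okShutdown, failingShutdown)(ctx)
	fmt.Println("failure is preserved:", errors.Is(failed, errFlush))
}
```

Every hook runs even if an earlier one fails, which matters when both a tracer provider and a meter provider need to flush on exit.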
6 files renamed without changes.
the db tests are run under both pg and sqlite, and also with the race detector