diff --git a/pkg/sync/README.md b/pkg/sync/README.md
index be8878b7b..b067e4da9 100644
--- a/pkg/sync/README.md
+++ b/pkg/sync/README.md
@@ -11,18 +11,27 @@ The sync mechanism consists of two main components:
 1. **Header Sync Service** - responsible for synchronizing block headers
 2. **Data Sync Service** - responsible for synchronizing block data (transactions and metadata)
 
+Both services are instances of the same generic `SyncService` and differ only by the header type they specialize (signed headers vs. block data).
+
 ## Architecture
 
 ```mermaid
 graph TD
     subgraph "Node"
-        BM[Block Manager]
+        Exec["Block Executor (aggregator nodes)"]
+        Sync["Block Syncer (follower nodes)"]
         HSS[Header Sync Service]
        DSS[Data Sync Service]
         P2P[P2P Client]
-
-        BM -->|Headers| HSS
-        BM -->|Data| DSS
+        Store[(Shared Store)]
+
+        Exec -->|WriteToStoreAndBroadcast| HSS
+        Exec -->|WriteToStoreAndBroadcast| DSS
+        HSS -->|Persist| Store
+        DSS -->|Persist| Store
+        Store -->|Load next block| Sync
+        Sync -->|Republish DA data| HSS
+        Sync -->|Republish DA data| DSS
         HSS <-->|P2P| P2P
         DSS <-->|P2P| P2P
     end
@@ -31,7 +40,8 @@ graph TD
         DAL[Data Availability Layer]
     end
 
-    BM <-->|Submit/Retrieve| DAL
+    Exec -.->|Submit headers/data| DAL
+    Sync -->|Retrieve blobs| DAL
 
     subgraph "Other Nodes"
         ON[Other Nodes]
@@ -70,76 +80,83 @@ classDiagram
     HeaderSyncService --|> SyncService : H = *types.SignedHeader
 ```
 
-### 2. Block Manager (`block/manager.go`)
+#### Lifecycle and responsibilities
 
-The Block Manager orchestrates the synchronization process through several key goroutines:
+- `node/full.go` wires the header and data services into both aggregator and follower nodes (see `initHeaderSyncService` and `initDataSyncService`).
+- Both services wrap a go-header `Store` instance that is prefixed per sync type, allowing them to share disk state while keeping namespaces separate.
+- `WriteToStoreAndBroadcast` (also used by the block executor) ensures the store is initialized with genesis data, starts the go-header syncer once via `SyncerStatus`, and gossips new items through libp2p.
+- When the node runs in follower mode, the go-header syncer fills the store from peers; when running as an aggregator, locally produced blocks flow through the same method.
 
-#### a. SyncLoop
+### 2. Block Syncer (`block/internal/syncing/syncer.go`)
 
-```mermaid
-flowchart TD
-    SL[SyncLoop] --> |periodic| NBRCH[Send Signal to Retrieve Channel]
-    SL --> |periodic| NBHCH[Send Signal to Header Store Channel]
-    SL --> |periodic| NBDCH[Send Signal to Data Store Channel]
-    SL --> |on header event| HC[Process Header]
-    SL --> |on data event| DC[Process Data]
-
-    HC --> |cache header| TSYNC[Try Sync Next Block]
-    DC --> |cache data| TSYNC
-
-    TSYNC --> |if header & data available| AB[Apply Block]
-    AB --> |if successful| SB[Store Block]
-    SB --> |if successful| UH[Update Height]
-```
+Follower nodes construct the block syncer to hydrate local state from the shared go-header stores and the DA layer. The syncer owns two long-lived goroutines that coordinate incoming events and outbound fetches.
 
-#### b. HeaderStoreRetrieveLoop
+#### a. `processLoop`
+
+- Listens on `heightInCh` for new `DAHeightEvent` values sourced from P2P or DA.
+- Uses the in-memory cache to de-duplicate and park out-of-order heights.
+- Calls `trySyncNextBlock` to execute the next block when header and data are available.
 
 ```mermaid
 flowchart TD
-    HSRL[HeaderStoreRetrieveLoop] --> |on signal| CH[Check Height]
-    CH --> |if new headers| GH[Get Headers]
-    GH --> |for each header| VH[Validate Header]
-    VH --> |if valid| SH[Send to headerInCh]
+    Start[Start] --> Select{select}
+    Select -->|"ctx.Done()"| Stop[Stop]
+    Select -->|heightInCh| Handle[processHeightEvent]
+    Handle --> Start
 ```
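+
+The shape of this loop can be shown with a short Go sketch. The `DAHeightEvent` type, handler, and wiring below are simplified stand-ins for the syncer's internals, not the real implementation:
+
+```go
+package main
+
+import (
+    "context"
+    "fmt"
+    "time"
+)
+
+// DAHeightEvent is a simplified stand-in for the event type carried on heightInCh.
+type DAHeightEvent struct {
+    Height uint64
+}
+
+// processLoop mirrors the select-driven structure in the diagram above: it exits
+// when the context is cancelled and otherwise handles one event at a time.
+func processLoop(ctx context.Context, heightInCh <-chan DAHeightEvent, handle func(DAHeightEvent)) {
+    for {
+        select {
+        case <-ctx.Done():
+            return
+        case evt := <-heightInCh:
+            handle(evt)
+        }
+    }
+}
+
+func main() {
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    events := make(chan DAHeightEvent, 16)
+    go processLoop(ctx, events, func(e DAHeightEvent) {
+        fmt.Println("processing height", e.Height)
+    })
+
+    events <- DAHeightEvent{Height: 1}
+    time.Sleep(10 * time.Millisecond) // give the sketch loop a moment to drain the event
+}
+```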
 
-#### c. DataStoreRetrieveLoop
+#### b. `syncLoop`
+
+- Sleeps until genesis if necessary, then runs at the cadence of the configured block time.
+- Drains cached events, pulls new ranges from the go-header stores (`tryFetchFromP2P`), and queries the DA layer with backoff (`tryFetchFromDA`).
+- Pushes results into `heightInCh`, falling back to the cache when the queue is full.
 
 ```mermaid
 flowchart TD
-    DSRL[DataStoreRetrieveLoop] --> |on signal| CD[Check Height]
-    CD --> |if new data| GD[Get Data]
-    GD --> |for each data| SD[Send to dataInCh]
+    Begin[Start loop] --> Pending[processPendingEvents]
+    Pending --> P2P[tryFetchFromP2P]
+    P2P --> DA[tryFetchFromDA]
+    DA --> Wait{"ctx.Done()?"}
+    Wait -->|yes| End[Stop]
+    Wait -->|no| Sleep[time.After/backoff]
+    Sleep --> Begin
 ```
 
-#### d. RetrieveLoop
+#### c. Supporting helpers
 
-```mermaid
-flowchart TD
-    RL[RetrieveLoop] --> |on signal| PDA[Process Next DA Header]
-    PDA --> |if successful| IH[Increment Height]
-    IH --> RL
-```
+- `processPendingEvents` replays cached events once the next height becomes available.
+- `trySyncNextBlock` validates, executes, and persists the block via the execution client and shared store.
+  - Persists the block through a `store.Store` batch, bumps height/state, and marks headers/data as seen to enforce sequential progress and metrics updates.
+- `tryFetchFromDA` manages DA backoff windows and advances `daHeight` on success.
+- `tryFetchFromP2P` reads the latest height from both header and data go-header stores, enqueueing any ranges the node has not yet processed.
 
-## Communication Channels
+## Communication Paths
 
-The Block Manager uses several channels for communication between its components:
+The block syncer relies on a handful of queues and shared stores to keep the node in sync:
 
-1. `headerInCh` - Receives headers from both P2P and DA layer
-2. `dataInCh` - Receives data from both P2P and DA layer
-3. `headerStoreCh` - Signals to check for new headers in the store
-4. `dataStoreCh` - Signals to check for new data in the store
-5. `retrieveCh` - Signals to retrieve data from the DA layer
-6. `HeaderCh` - Sends headers to the HeaderSyncService for broadcasting
-7. `DataCh` - Sends data to the DataSyncService for broadcasting
+1. `heightInCh` – Buffered queue that carries `common.DAHeightEvent` values from both the P2P handler and DA retriever into `processLoop`.
+2. `cache.Manager` – In-memory structure that tracks pending events and deduplicates headers/data that arrive out of order.
+3. `headerStore` / `dataStore` – go-header stores exposed by the sync services. Aggregators append to them when producing blocks; followers poll them in `tryFetchFromP2P` to learn about new ranges.
+4. `errorCh` – Channel surfaced to the higher-level block components so critical execution failures inside the syncer can halt the node cleanly.
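+
+To make paths 1 and 2 concrete, the sketch below shows a non-blocking enqueue that parks an event when the channel buffer is full. The types are simplified stand-ins; the real syncer uses `common.DAHeightEvent` and `cache.Manager` rather than the toy map shown here:
+
+```go
+package main
+
+import "fmt"
+
+// DAHeightEvent is a simplified stand-in for common.DAHeightEvent.
+type DAHeightEvent struct {
+    Height uint64
+}
+
+// pendingCache is a toy stand-in for the cache manager, keyed by height.
+type pendingCache map[uint64]DAHeightEvent
+
+// enqueueOrPark attempts a non-blocking send into heightInCh and falls back to
+// the cache when the buffer is full, mirroring the behavior described for syncLoop.
+func enqueueOrPark(heightInCh chan<- DAHeightEvent, cache pendingCache, evt DAHeightEvent) {
+    select {
+    case heightInCh <- evt:
+    default:
+        cache[evt.Height] = evt
+    }
+}
+
+func main() {
+    heightInCh := make(chan DAHeightEvent, 1)
+    cache := pendingCache{}
+
+    enqueueOrPark(heightInCh, cache, DAHeightEvent{Height: 10}) // fills the buffer
+    enqueueOrPark(heightInCh, cache, DAHeightEvent{Height: 11}) // parked in the cache
+
+    fmt.Println("queued:", len(heightInCh), "cached:", len(cache))
+}
+```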
 
 ## Synchronization Process
 
-1. Headers and data are received through P2P gossip or retrieved from the DA layer
-2. They are stored in the respective stores and cached in memory
-3. When both a header and its corresponding data are available, the block is applied
-4. The state is updated and the next block is processed
-5. New blocks created by the node are broadcast to peers via the P2P network
-6. Headers are submitted to the DA layer for finality
+1. Aggregator executors call `WriteToStoreAndBroadcast`, or remote peers gossip new headers and block data through the sync services.
+2. `SyncService` instances persist the payload in the prefixed go-header stores and broadcast it over libp2p.
+3. Follower syncers observe the updated store heights, fetch any missing data via P2P or the DA layer, and enqueue events on `heightInCh`.
+4. The syncer executes the block via the execution client, writes it to `store.Store` using a batch, updates in-memory state, and records metrics.
+5. For DA-sourced events, the syncer republishes the block by calling `WriteToStoreAndBroadcast` on the header and data services (`block/internal/syncing/syncer.go:389-392`) so gossip peers stay updated.
+6. Successfully applied blocks are now visible to both the local node and the sync services, keeping aggregator and follower paths in sync.
+7. Aggregator nodes additionally submit headers/data to the DA layer for finality.
+
+## Integration with Block Components
+
+The sync package is consumed by both the block executor (aggregator mode) and the block syncer (follower mode):
+
+- **Aggregator nodes** – `node/full.go:101` constructs `block.NewAggregatorComponents`, which in turn creates `block/internal/executing.Executor`. After the executor commits a block, it calls `WriteToStoreAndBroadcast` on the header and data services (`block/internal/executing/executor.go:415`). This persists the block in the shared store and gossips it to peers.
+- **Follower nodes** – `node/full.go:116` builds `block.NewSyncComponents`, wiring the same sync services into `block/internal/syncing.Syncer`. The syncer consumes updates written by the services (`block/internal/syncing/syncer.go:77`) and merges them with DA retrieval to hydrate local state.
+- **Common broadcaster contract** – Both block paths depend only on the slim `block/internal/common.Broadcaster` interface, so alternate sync implementations can be plugged in as long as they expose `WriteToStoreAndBroadcast` and `Store`.
+- **Execution engine boundary** – Because the sync services operate on generic header types, swapping execution engines only requires satisfying the `core/execution.Executor` interface; the sync plumbing remains unchanged.
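+
+The sketch below illustrates the kind of slim contract this implies. The interface and types are hypothetical stand-ins, not the actual `block/internal/common.Broadcaster` definition, and the `Store` accessor mentioned above is omitted for brevity:
+
+```go
+package main
+
+import (
+    "context"
+    "fmt"
+)
+
+// broadcaster captures the shape of the contract described above: callers only
+// need a way to persist an item and gossip it to peers. It is an illustrative
+// sketch, not the actual block/internal/common.Broadcaster definition.
+type broadcaster[H any] interface {
+    WriteToStoreAndBroadcast(ctx context.Context, payload H) error
+}
+
+// loggingBroadcaster is a toy implementation used to show that the block
+// executor and syncer can depend on the interface alone.
+type loggingBroadcaster[H any] struct {
+    name string
+}
+
+func (b loggingBroadcaster[H]) WriteToStoreAndBroadcast(_ context.Context, payload H) error {
+    fmt.Printf("%s: persisted and gossiped %v\n", b.name, payload)
+    return nil
+}
+
+// publishBlock stands in for an executor- or syncer-style caller that is
+// unaware of the concrete sync service behind the interface.
+func publishBlock[H any](ctx context.Context, b broadcaster[H], payload H) error {
+    return b.WriteToStoreAndBroadcast(ctx, payload)
+}
+
+func main() {
+    ctx := context.Background()
+    headers := loggingBroadcaster[string]{name: "header sync service"}
+    if err := publishBlock[string](ctx, headers, "signed header"); err != nil {
+        fmt.Println("broadcast failed:", err)
+    }
+}
+```
 
 ## Dependencies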