|
9 | 9 |
|
10 | 10 | # Uncategorized helper functions from the spec |
11 | 11 | import |
12 | | - chronicles, results, taskpools, |
| 12 | + chronos, chronicles, results, taskpools, |
13 | 13 | eth/p2p/discoveryv5/node, |
14 | 14 | kzg4844/kzg, |
15 | 15 | ssz_serialization/[ |
@@ -163,30 +163,51 @@ proc recover_cells_and_proofs_parallel*( |
163 | 163 |
|
164 | 164 | for column in dataColumns: |
165 | 165 | if not (blobCount == column.column.len): |
166 | | - return err ("DataColumns do not have the same length") |
| 166 | + return err("DataColumns do not have the same length") |
167 | 167 |
|
168 | | - # spawn threads for recovery |
169 | 168 | var |
170 | | - pendingFuts = newSeq[Flowvar[Result[CellsAndProofs, void]]](blobCount) |
| 169 | + pendingFuts: seq[Flowvar[Result[CellsAndProofs, void]]] |
171 | 170 | res = newSeq[CellsAndProofs](blobCount) |
172 | | - for blobIdx in 0..<blobCount: |
| 171 | + |
| 172 | + let startTime = Moment.now() |
| 173 | + const reconstructionTimeout = 2.seconds |
| 174 | + |
| 175 | + # ---- Spawn phase with time limit ---- |
| 176 | + for blobIdx in 0 ..< blobCount: |
| 177 | + let now = Moment.now() |
| 178 | + if (now - startTime) > reconstructionTimeout: |
| 179 | + debug "PeerDAS reconstruction timed out while preparing columns", |
| 180 | + spawned = pendingFuts.len, total = blobCount |
| 181 | + break # Stop spawning new tasks |
| 182 | + |
173 | 183 | var |
174 | 184 | cellIndices = newSeq[CellIndex](columnCount) |
175 | 185 | cells = newSeq[Cell](columnCount) |
176 | 186 | for i in 0 ..< dataColumns.len: |
177 | 187 | cellIndices[i] = dataColumns[i][].index |
178 | 188 | cells[i] = dataColumns[i][].column[blobIdx] |
179 | | - pendingFuts[blobIdx] = |
180 | | - tp.spawn recoverCellsAndKzgProofsTask(cellIndices, cells) |
| 189 | + pendingFuts.add(tp.spawn recoverCellsAndKzgProofsTask(cellIndices, cells)) |
| 190 | + |
| 191 | + # ---- Sync phase ---- |
| 192 | + for i in 0 ..< pendingFuts.len: |
| 193 | + let now = Moment.now() |
| 194 | + if (now - startTime) > reconstructionTimeout: |
| 195 | + debug "PeerDAS reconstruction timed out", |
| 196 | + completed = i, totalSpawned = pendingFuts.len |
| 197 | + return err("Data column reconstruction timed out") |
181 | 198 |
|
182 | | - # sync threads |
183 | | - for i in 0..<blobCount: |
184 | 199 | let futRes = sync pendingFuts[i] |
185 | 200 | if futRes.isErr: |
186 | 201 | return err("KZG cells and proofs recovery failed") |
| 202 | + |
187 | 203 | res[i] = futRes.get |
| 204 | + |
| 205 | + if pendingFuts.len < blobCount: |
| 206 | + return err("Data column reconstruction timed out") |
| 207 | + |
188 | 208 | ok(res) |
189 | 209 |
|
| 210 | + |
190 | 211 | proc assemble_data_column_sidecars*( |
191 | 212 | signed_beacon_block: fulu.SignedBeaconBlock | gloas.SignedBeaconBlock, |
192 | 213 | blobs: seq[KzgBlob], cell_proofs: seq[KzgProof]): seq[fulu.DataColumnSidecar] = |
|
0 commit comments