Skip to content

Commit 47c08af

Browse files
authored
market: Real-world fixes (#319)
* market: Improve indexing logging * Add missing ipni peerid constraint * cuhttp: allow external tls termination * better errors in indexing * don't error on nil errors * indexing: pass correct errors * indexing: More aggresive retry * indexing: More conservative batch size * ipni: log index rate * indexing: Schedule ipni first * indexing: Bump cql params * indexing: ab test sorted inserts * nope * indexing: Less consistency * ipni: Fix prev parse in serve * ipni: Set ad addresses when serving * ipni: Bump some log levels * ipni: debugging chunk reader * Export dagstore metrics * remotebstore: Fix metrics * export metrics properly this time * tune min read size * indexing: Try a completely different schema * use bufio for faster indexing readers * debug get piecehashrange * more ipni debug * ipni from-car also needs next node * check numblocks in indexstore * flag for ipni chunk debug * indexing: Fix offset checks * fastparamfetch: Improve aria opts * http is bad * ipni: Announce correct address * make gen * ipni/dealfilter fixes * more dealfilter fixes * fix http header marshal * fix max concurrent deal size backpressure * fix null error on deal status check * fix dealstatus/sector insert * fix panic in sector alloc * make gen * fix seal start duration * deal ui fixes * improve piece state webui * fix how indexing tasks are created * webui: move task page under pages * webui: task run page * webui: make sector page load fast * webui: Better snap page * webui: Add snap data to the sector page * webui: make piece page readable * webui: show multiple deals on the piece page * webui: Show related pipelines in task page * webui: nicer deal list * market: Fix sector to deal assignment * webui: Fix deal page pefore publish * webui: Improve deal status display * market: Fix sector offset finding * webui: show piecepark on the piece page * urlpiecereader: Improve errors * webui: Advanced task display on piece page * webui: Small deal page improvements * webui: Fix deal page on sealed deals * market: Correctly mark deals as sealed * indexing: Fix non-imported deal indexing * ipni: Limit max from-car entry size * ipni: Small entry cache to support retry after read timeout * ipni: Bump write timeout * ipni: Speculatively precompute next entry * ipni: More informative logs * market: Set deal sector regprooftype in ingest * make gen * fix snap deal ingest * webui: Show failed snap info on the sector page * park piece: better retry backoff * webui: Improve the Storage Deals page * Reindexing * fix ipni retrievals * cleanup indexing tasks more correctly * ipni debug * ipni: Detailed ad output, entry scan * webui: better scanned entry select * More ipni debugging * libp2p: Handle boost transports proto * make gen * http: Add missing info endpoint * ipni: Make db reads really work this time * storage: Show reservations in expvar * storage debug; proposal cid in deals * webui: Show proposal CID * webui: Nicer time format on tasks pages * ipni: wip always index from db * ipni: Auto-repair missing Ads * webui: Fix IPNI status * webui: typo in ipni * fix bronkn storage release * webui: Improve the piece page more * webui: Message Cid component * webui: Epoch component * ipni: fix bad entry panic * ipni: search by entry cid * ipni: Ad skip option * webui: Pipeline remove button * webui: pieceref lookup on the piece page * gen, mod tidy * snap ingest: fix sector size check * storage: Fix max storage pct math * webui: Fix ipni status page on fresh providers * ipni: Space provider announcments * webui: Pipeline waterfalls * webui: market waterfall * webui: sdr/snap pipeline waterfalls * market-specific backpressure knobs * cli: Drop testDebugMigPcid * webui: market bulk fail management * tasks: Put sealing before piecepark * webui: market bulk fail remove * gen, don't hardcode f0 * fix indexstore test * Add missing retry in movestorage * Appease the linter * fix indexstore test * make gen * Fix indexstore test for real * fix localpath test
1 parent 15f0d33 commit 47c08af

File tree

101 files changed

+7076
-1346
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

101 files changed

+7076
-1346
lines changed

cmd/curio/debug-ipni.go

+55
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package main
2+
3+
import (
4+
"bufio"
5+
"errors"
6+
"fmt"
7+
"io"
8+
"os"
9+
10+
"github.com/ipfs/go-cid"
11+
carv2 "github.com/ipld/go-car/v2"
12+
"github.com/urfave/cli/v2"
13+
"golang.org/x/xerrors"
14+
15+
"github.com/filecoin-project/curio/market/ipni/chunker"
16+
)
17+
18+
var testDebugIpniChunks = &cli.Command{
19+
Name: "ipni-piece-chunks",
20+
Usage: "generate ipni chunks from a file",
21+
Action: func(c *cli.Context) error {
22+
ck := chunker.NewInitialChunker()
23+
24+
f, err := os.Open(c.Args().First())
25+
if err != nil {
26+
return xerrors.Errorf("opening file: %w", err)
27+
}
28+
defer f.Close()
29+
30+
opts := []carv2.Option{carv2.ZeroLengthSectionAsEOF(true)}
31+
blockReader, err := carv2.NewBlockReader(bufio.NewReaderSize(f, 4<<20), opts...)
32+
if err != nil {
33+
return fmt.Errorf("getting block reader over piece: %w", err)
34+
}
35+
36+
blockMetadata, err := blockReader.SkipNext()
37+
for err == nil {
38+
if err := ck.Accept(blockMetadata.Cid.Hash(), int64(blockMetadata.Offset), blockMetadata.Size+40); err != nil {
39+
return xerrors.Errorf("accepting block: %w", err)
40+
}
41+
42+
blockMetadata, err = blockReader.SkipNext()
43+
}
44+
if !errors.Is(err, io.EOF) {
45+
return xerrors.Errorf("reading block: %w", err)
46+
}
47+
48+
_, err = ck.Finish(c.Context, nil, cid.Undef)
49+
if err != nil {
50+
return xerrors.Errorf("chunking CAR multihash iterator: %w", err)
51+
}
52+
53+
return nil
54+
},
55+
}

cmd/curio/seal.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ var sealStartCmd = &cli.Command{
152152
}
153153

154154
for _, n := range num {
155-
_, err := tx.Exec("insert into sectors_sdr_pipeline (sp_id, sector_number, reg_seal_proof) values ($1, $2, $3)", mid, n, spt)
155+
_, err := tx.Exec("insert into sectors_sdr_pipeline (sp_id, sector_number, reg_seal_proof, user_sector_duration_epochs) values ($1, $2, $3, $4)", mid, n, spt, userDuration)
156156
if err != nil {
157157
return false, xerrors.Errorf("inserting into sectors_sdr_pipeline: %w", err)
158158
}

cmd/curio/tasks/tasks.go

+21-15
Original file line numberDiff line numberDiff line change
@@ -178,18 +178,6 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
178178
return ffi.NewSealCalls(stor, lstor, si), nil
179179
})
180180

181-
{
182-
// Piece handling
183-
if cfg.Subsystems.EnableParkPiece {
184-
parkPieceTask, err := piece2.NewParkPieceTask(db, must.One(slrLazy.Val()), cfg.Subsystems.ParkPieceMaxTasks)
185-
if err != nil {
186-
return nil, err
187-
}
188-
cleanupPieceTask := piece2.NewCleanupPieceTask(db, must.One(slrLazy.Val()), 0)
189-
activeTasks = append(activeTasks, parkPieceTask, cleanupPieceTask)
190-
}
191-
}
192-
193181
hasAnySealingTask := cfg.Subsystems.EnableSealSDR ||
194182
cfg.Subsystems.EnableSealSDRTrees ||
195183
cfg.Subsystems.EnableSendPrecommitMsg ||
@@ -199,7 +187,8 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
199187
cfg.Subsystems.EnableBatchSeal ||
200188
cfg.Subsystems.EnableUpdateEncode ||
201189
cfg.Subsystems.EnableUpdateProve ||
202-
cfg.Subsystems.EnableUpdateSubmit
190+
cfg.Subsystems.EnableUpdateSubmit ||
191+
cfg.Subsystems.EnableCommP
203192

204193
if hasAnySealingTask {
205194
sealingTasks, err := addSealingTasks(ctx, hasAnySealingTask, db, full, sender, as, cfg, slrLazy, asyncParams, si, stor, bstore, machine)
@@ -209,6 +198,18 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
209198
activeTasks = append(activeTasks, sealingTasks...)
210199
}
211200

201+
{
202+
// Piece handling
203+
if cfg.Subsystems.EnableParkPiece {
204+
parkPieceTask, err := piece2.NewParkPieceTask(db, must.One(slrLazy.Val()), cfg.Subsystems.ParkPieceMaxTasks)
205+
if err != nil {
206+
return nil, err
207+
}
208+
cleanupPieceTask := piece2.NewCleanupPieceTask(db, must.One(slrLazy.Val()), 0)
209+
activeTasks = append(activeTasks, parkPieceTask, cleanupPieceTask)
210+
}
211+
}
212+
212213
miners := make([]address.Address, 0, len(maddrs))
213214
for k := range maddrs {
214215
miners = append(miners, address.Address(k))
@@ -237,7 +238,10 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
237238
// PSD and Deal find task do not require many resources. They can run on all machines
238239
psdTask := storage_market.NewPSDTask(dm, db, sender, as, &cfg.Market.StorageMarketConfig.MK12, full)
239240
dealFindTask := storage_market.NewFindDealTask(dm, db, full, &cfg.Market.StorageMarketConfig.MK12)
240-
activeTasks = append(activeTasks, psdTask, dealFindTask)
241+
242+
checkIndexesTask := indexing.NewCheckIndexesTask(db, iStore)
243+
244+
activeTasks = append(activeTasks, psdTask, dealFindTask, checkIndexesTask)
241245

242246
// Start libp2p hosts and handle streams
243247
err = libp2p.NewDealProvider(ctx, db, cfg, dm.MK12Handler, full, sender, miners, machine)
@@ -250,7 +254,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
250254

251255
indexingTask := indexing.NewIndexingTask(db, sc, iStore, pp, cfg, idxMax)
252256
ipniTask := indexing.NewIPNITask(db, sc, iStore, pp, cfg, idxMax)
253-
activeTasks = append(activeTasks, indexingTask, ipniTask)
257+
activeTasks = append(activeTasks, ipniTask, indexingTask)
254258

255259
if cfg.HTTP.Enable {
256260
err = cuhttp.StartHTTPServer(ctx, dependencies)
@@ -277,6 +281,8 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
277281
}
278282
go machineDetails(dependencies, activeTasks, ht.ResourcesAvailable().MachineID, dependencies.Name)
279283

284+
*dependencies.MachineID = int64(ht.ResourcesAvailable().MachineID)
285+
280286
if hasAnySealingTask {
281287
watcher, err := message.NewMessageWatcher(db, ht, chainSched, full)
282288
if err != nil {

cmd/curio/test-cli.go

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ var testCmd = &cli.Command{
2828
Subcommands: []*cli.Command{
2929
//provingInfoCmd,
3030
wdPostCmd,
31+
testDebugCmd,
3132
},
3233
Before: func(cctx *cli.Context) error {
3334
return nil

cmd/curio/test-debug.go

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package main
2+
3+
import (
4+
"github.com/urfave/cli/v2"
5+
)
6+
7+
var testDebugCmd = &cli.Command{
8+
Name: "debug",
9+
Usage: "Collection of debugging utilities",
10+
Subcommands: []*cli.Command{
11+
testDebugIpniChunks,
12+
},
13+
}

cuhttp/server.go

+23-15
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"crypto/tls"
66
"fmt"
77
"net/http"
8+
"strings"
89
"time"
910

1011
"github.com/CAFxX/httpcompression"
@@ -73,7 +74,7 @@ func compressionMiddleware(config *config.CompressionConfig) (func(http.Handler)
7374
func libp2pConnMiddleware(next http.Handler) http.Handler {
7475
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
7576
// Check if the request path is "/"
76-
if r.URL.Path == "/" {
77+
if r.URL.Path == "/" || r.URL.Path == "" {
7778
// Check if the request is a WebSocket upgrade request
7879
if isWebSocketUpgrade(r) {
7980
// Rewrite the path to "/libp2p"
@@ -92,26 +93,18 @@ func isWebSocketUpgrade(r *http.Request) bool {
9293
if r.Method != http.MethodGet {
9394
return false
9495
}
95-
if r.Header.Get("Upgrade") != "websocket" {
96+
if strings.ToLower(r.Header.Get("Upgrade")) != "websocket" {
9697
return false
9798
}
98-
if r.Header.Get("Connection") != "Upgrade" {
99+
if strings.ToLower(r.Header.Get("Connection")) != "upgrade" {
99100
return false
100101
}
101102
return true
102103
}
103104

104105
func StartHTTPServer(ctx context.Context, d *deps.Deps) error {
105-
ch := cache{db: d.DB}
106106
cfg := d.Cfg.HTTP
107107

108-
// Set up the autocert manager for Let's Encrypt
109-
certManager := autocert.Manager{
110-
Cache: ch,
111-
Prompt: autocert.AcceptTOS, // Automatically accept the Terms of Service
112-
HostPolicy: autocert.HostWhitelist(cfg.DomainName),
113-
}
114-
115108
// Setup the Chi router for more complex routing (if needed in the future)
116109
chiRouter := chi.NewRouter()
117110

@@ -160,20 +153,35 @@ func StartHTTPServer(ctx context.Context, d *deps.Deps) error {
160153
Addr: cfg.ListenAddress,
161154
Handler: libp2pConnMiddleware(loggingMiddleware(compressionMw(chiRouter))), // Attach middlewares
162155
ReadTimeout: cfg.ReadTimeout,
163-
WriteTimeout: cfg.WriteTimeout,
156+
WriteTimeout: time.Hour * 2,
164157
IdleTimeout: cfg.IdleTimeout,
165158
ReadHeaderTimeout: cfg.ReadHeaderTimeout,
166-
TLSConfig: &tls.Config{
159+
}
160+
161+
if !cfg.DelegateTLS {
162+
// Set up the autocert manager for Let's Encrypt
163+
certManager := autocert.Manager{
164+
Cache: cache{db: d.DB},
165+
Prompt: autocert.AcceptTOS, // Automatically accept the Terms of Service
166+
HostPolicy: autocert.HostWhitelist(cfg.DomainName),
167+
}
168+
169+
server.TLSConfig = &tls.Config{
167170
GetCertificate: certManager.GetCertificate,
168-
},
171+
}
169172
}
170173

171174
// We don't need to run an HTTP server. Any HTTP request should simply be handled as HTTPS.
172175

173176
// Start the server with TLS
174177
go func() {
175178
log.Infof("Starting HTTPS server for https://%s on %s", cfg.DomainName, cfg.ListenAddress)
176-
serr := server.ListenAndServeTLS("", "")
179+
var serr error
180+
if !cfg.DelegateTLS {
181+
serr = server.ListenAndServeTLS("", "")
182+
} else {
183+
serr = server.ListenAndServe()
184+
}
177185
if serr != nil {
178186
log.Errorf("Failed to start HTTPS server: %s", serr)
179187
panic(serr)

deps/config/doc_gen.go

+44-23
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)