Skip to content

Commit 977917d

Browse files
committed
Rewrite fetcher logic to be more robust
1 parent 1055433 commit 977917d

15 files changed

+675
-101
lines changed

.gitignore

+3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
# MacOS Leftovers
22
.DS_Store
33

4+
# Data Leftovers
5+
indexer-db/*
6+
47
# Editor Leftovers
58
.vscode
69
.idea

client/http.go

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package client
2+
3+
import (
4+
"fmt"
5+
6+
rpcClient "github.com/gnolang/gno/tm2/pkg/bft/rpc/client"
7+
core_types "github.com/gnolang/gno/tm2/pkg/bft/rpc/core/types"
8+
)
9+
10+
// Client is the TM2 HTTP client
11+
type Client struct {
12+
client rpcClient.Client
13+
}
14+
15+
// NewClient creates a new TM2 HTTP client
16+
func NewClient(remote string) *Client {
17+
return &Client{
18+
client: rpcClient.NewHTTP(remote, ""),
19+
}
20+
}
21+
22+
func (c *Client) GetLatestBlockNumber() (int64, error) {
23+
status, err := c.client.Status()
24+
if err != nil {
25+
return 0, fmt.Errorf("unable to get chain status, %w", err)
26+
}
27+
28+
return status.SyncInfo.LatestBlockHeight, nil
29+
}
30+
31+
func (c *Client) GetBlock(blockNum int64) (*core_types.ResultBlock, error) {
32+
block, err := c.client.Block(&blockNum)
33+
if err != nil {
34+
return nil, fmt.Errorf("unable to get block, %w", err)
35+
}
36+
37+
return block, nil
38+
}
39+
40+
func (c *Client) GetBlockResults(blockNum int64) (*core_types.ResultBlockResults, error) {
41+
results, err := c.client.BlockResults(&blockNum)
42+
if err != nil {
43+
return nil, fmt.Errorf("unable to get block results, %w", err)
44+
}
45+
46+
return results, nil
47+
}

cmd/start.go

+91-8
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,26 @@ package main
33
import (
44
"context"
55
"flag"
6+
"fmt"
67

8+
"github.com/gnolang/tx-indexer/client"
9+
"github.com/gnolang/tx-indexer/fetch"
10+
"github.com/gnolang/tx-indexer/serve"
11+
"github.com/gnolang/tx-indexer/storage"
712
"github.com/peterbourgon/ff/v3/ffcli"
13+
"go.uber.org/zap"
814
)
915

10-
type startCfg struct{}
16+
const (
17+
defaultRemote = "http://127.0.0.1:26657"
18+
defaultDBPath = "indexer-db"
19+
)
20+
21+
type startCfg struct {
22+
listenAddress string
23+
remote string
24+
dbPath string
25+
}
1126

1227
// newStartCmd creates the indexer start command
1328
func newStartCmd() *ffcli.Command {
@@ -19,19 +34,87 @@ func newStartCmd() *ffcli.Command {
1934
return &ffcli.Command{
2035
Name: "start",
2136
ShortUsage: "start [flags]",
22-
LongHelp: "Starts the indexer",
37+
ShortHelp: "Starts the indexer service",
38+
LongHelp: "Starts the indexer service, which includes the fetcher and JSON-RPC server",
2339
FlagSet: fs,
24-
Exec: cfg.exec,
40+
Exec: func(ctx context.Context, _ []string) error {
41+
return cfg.exec(ctx)
42+
},
2543
}
2644
}
2745

2846
// registerFlags registers the indexer start command flags
29-
func (c *startCfg) registerFlags(_ *flag.FlagSet) {
30-
// TODO define flags
47+
func (c *startCfg) registerFlags(fs *flag.FlagSet) {
48+
fs.StringVar(
49+
&c.listenAddress,
50+
"listen-address",
51+
serve.DefaultListenAddress,
52+
"the IP:PORT URL for the indexer JSON-RPC server",
53+
)
54+
55+
fs.StringVar(
56+
&c.remote,
57+
"remote",
58+
defaultRemote,
59+
"the JSON-RPC URL of the Gno chain",
60+
)
61+
62+
fs.StringVar(
63+
&c.dbPath,
64+
"db-path",
65+
defaultDBPath,
66+
"the absolute path for the indexer DB (embedded)",
67+
)
3168
}
3269

3370
// exec executes the indexer start command
34-
func (c *startCfg) exec(_ context.Context, _ []string) error {
35-
// TODO add implementation
36-
return nil
71+
func (c *startCfg) exec(ctx context.Context) error {
72+
// Create a new logger
73+
logger, err := zap.NewDevelopment()
74+
if err != nil {
75+
return fmt.Errorf("unable to create logger, %w", err)
76+
}
77+
78+
// Create a DB instance
79+
db, err := storage.New(c.dbPath)
80+
if err != nil {
81+
return fmt.Errorf("unable to open storage DB, %w", err)
82+
}
83+
84+
defer func() {
85+
if err := db.Close(); err != nil {
86+
logger.Error("unable to gracefully close DB", zap.Error(err))
87+
}
88+
}()
89+
90+
// Create the fetcher instance
91+
f := fetch.New(
92+
db,
93+
client.NewClient(c.remote),
94+
fetch.WithLogger(
95+
logger.Named("fetcher"),
96+
),
97+
)
98+
99+
// Create the JSON-RPC service
100+
j := serve.NewJSONRPC(
101+
serve.WithLogger(
102+
logger.Named("json-rpc"),
103+
),
104+
serve.WithListenAddress(
105+
c.listenAddress,
106+
),
107+
)
108+
109+
// Create a new waiter
110+
w := newWaiter(ctx)
111+
112+
// Add the fetcher service
113+
w.add(f.FetchTransactions)
114+
115+
// Add the JSON-RPC service
116+
w.add(j.Serve)
117+
118+
// Wait for the services to stop
119+
return w.wait()
37120
}

cmd/waiter.go

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"os"
6+
"os/signal"
7+
"syscall"
8+
9+
"golang.org/x/sync/errgroup"
10+
)
11+
12+
type waitFunc func(ctx context.Context) error
13+
14+
// waiter is a concept used for waiting on running services
15+
type waiter struct {
16+
ctx context.Context
17+
cancel context.CancelFunc
18+
19+
waitFns []waitFunc
20+
}
21+
22+
// newWaiter creates a new waiter instance
23+
func newWaiter(ctx context.Context) *waiter {
24+
w := &waiter{
25+
waitFns: []waitFunc{},
26+
}
27+
28+
w.ctx, w.cancel = signal.NotifyContext(
29+
ctx,
30+
os.Interrupt,
31+
syscall.SIGINT,
32+
syscall.SIGTERM,
33+
syscall.SIGQUIT,
34+
)
35+
36+
return w
37+
}
38+
39+
// add adds a new wait service
40+
func (w *waiter) add(fns ...waitFunc) {
41+
w.waitFns = append(w.waitFns, fns...)
42+
}
43+
44+
// wait blocks until all added wait services finish
45+
func (w *waiter) wait() error {
46+
g, ctx := errgroup.WithContext(w.ctx)
47+
48+
g.Go(func() error {
49+
<-ctx.Done()
50+
w.cancel()
51+
52+
return nil
53+
})
54+
55+
for _, fn := range w.waitFns {
56+
fn := fn
57+
58+
g.Go(
59+
func() error {
60+
return fn(ctx)
61+
},
62+
)
63+
}
64+
65+
return g.Wait()
66+
}

0 commit comments

Comments
 (0)