Skip to content

Commit

Permalink
Merge pull request #6115 from filecoin-project/feat/incoming-block
Browse files Browse the repository at this point in the history
feat: api: add SyncIncomingBlocks
  • Loading branch information
LinZexiao authored Aug 22, 2023
2 parents 89dc82b + 55bf6e5 commit eb6fe63
Show file tree
Hide file tree
Showing 12 changed files with 217 additions and 2 deletions.
4 changes: 4 additions & 0 deletions app/submodule/syncer/syncer_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,3 +207,7 @@ func (sa *syncerAPI) SyncState(ctx context.Context) (*types.SyncState, error) {

return syncState, nil
}

func (sa *syncerAPI) SyncIncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error) {
return sa.syncer.ChainSyncManager.BlockProposer().IncomingBlocks(ctx)
}
1 change: 1 addition & 0 deletions pkg/chainsync/chainsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type BlockProposer interface {
SendHello(ci *types2.ChainInfo) error
SendOwnBlock(ci *types2.ChainInfo) error
SendGossipBlock(ci *types2.ChainInfo) error
IncomingBlocks(ctx context.Context) (<-chan *types2.BlockHeader, error)
}

var _ = (BlockProposer)((*dispatcher.Dispatcher)(nil))
Expand Down
42 changes: 42 additions & 0 deletions pkg/chainsync/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
atmoic2 "sync/atomic"
"time"

"github.com/filecoin-project/pubsub"
"github.com/filecoin-project/venus/pkg/chainsync/types"
types2 "github.com/filecoin-project/venus/venus-shared/types"
"github.com/streadway/handy/atomic"
Expand All @@ -23,6 +24,8 @@ const DefaultInQueueSize = 5
// DefaultWorkQueueSize is the bucketSize of the work queue
const DefaultWorkQueueSize = 15

const LocalIncoming = "incoming"

// dispatchSyncer is the interface of the logic syncing incoming chains
type dispatchSyncer interface {
Head() *types2.TipSet
Expand All @@ -44,6 +47,7 @@ func NewDispatcherWithSizes(syncer dispatchSyncer, workQueueSize, inQueueSize in
registeredCb: func(t *types.Target, err error) {},
cancelControler: list.New(),
maxCount: 1,
incomingPubsub: pubsub.New(50),
}
}

Expand Down Expand Up @@ -84,6 +88,8 @@ type Dispatcher struct {
lk sync.Mutex
conCurrent atomic.Int
maxCount int64

incomingPubsub *pubsub.PubSub
}

// SyncTracker returns the target tracker of syncing
Expand All @@ -107,6 +113,7 @@ func (d *Dispatcher) SendGossipBlock(ci *types2.ChainInfo) error {
}

func (d *Dispatcher) addTracker(ci *types2.ChainInfo) error {
d.incomingPubsub.Pub(ci.Head.Blocks(), LocalIncoming)
d.incoming <- &types.Target{
ChainInfo: *ci,
Base: d.syncer.Head(),
Expand Down Expand Up @@ -176,6 +183,41 @@ func (d *Dispatcher) Concurrent() int64 {
return d.maxCount
}

func (d *Dispatcher) IncomingBlocks(ctx context.Context) (<-chan *types2.BlockHeader, error) {
sub := d.incomingPubsub.Sub(LocalIncoming)
out := make(chan *types2.BlockHeader, 32)

go func() {
defer func() {
close(out)

d.incomingPubsub.Unsub(sub)
}()

for {
select {
case val, ok := <-sub:
if !ok {
return
}
for _, blk := range val.([]*types2.BlockHeader) {
select {
case out <- blk:
case <-ctx.Done():
return
default:
log.Infof("incoming blocks subscription due to slow reader")
}
}
case <-ctx.Done():
return
}
}
}()

return out, nil
}

func (d *Dispatcher) selectTarget(lastTarget *types.Target, ch <-chan struct{}) (*types.Target, bool) {
exitFor:
for { // we are purpose to consume all notifies in channel
Expand Down
63 changes: 63 additions & 0 deletions venus-shared/api/chain/v0/method.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ curl http://<ip>:<port>/rpc/v0 -X POST -H "Content-Type: application/json" -H "
* [ChainTipSetWeight](#chaintipsetweight)
* [Concurrent](#concurrent)
* [SetConcurrent](#setconcurrent)
* [SyncIncomingBlocks](#syncincomingblocks)
* [SyncState](#syncstate)
* [SyncSubmitBlock](#syncsubmitblock)
* [SyncerTracker](#syncertracker)
Expand Down Expand Up @@ -5633,6 +5634,68 @@ Inputs:

Response: `{}`

### SyncIncomingBlocks
SyncIncomingBlocks returns a channel streaming incoming, potentially not
yet synced block headers.


Perms: read

Inputs: `[]`

Response:
```json
{
"Miner": "f01234",
"Ticket": {
"VRFProof": "Bw=="
},
"ElectionProof": {
"WinCount": 9,
"VRFProof": "Bw=="
},
"BeaconEntries": [
{
"Round": 42,
"Data": "Ynl0ZSBhcnJheQ=="
}
],
"WinPoStProof": [
{
"PoStProof": 8,
"ProofBytes": "Ynl0ZSBhcnJheQ=="
}
],
"Parents": [
{
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
}
],
"ParentWeight": "0",
"Height": 10101,
"ParentStateRoot": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
"ParentMessageReceipts": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
"Messages": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
"BLSAggregate": {
"Type": 2,
"Data": "Ynl0ZSBhcnJheQ=="
},
"Timestamp": 42,
"BlockSig": {
"Type": 2,
"Data": "Ynl0ZSBhcnJheQ=="
},
"ForkSignaling": 42,
"ParentBaseFee": "0"
}
```

### SyncState


Expand Down
15 changes: 15 additions & 0 deletions venus-shared/api/chain/v0/mock/mock_fullnode.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions venus-shared/api/chain/v0/proxy_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions venus-shared/api/chain/v0/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,7 @@ type ISyncer interface {
ChainTipSetWeight(ctx context.Context, tsk types.TipSetKey) (big.Int, error) //perm:read
SyncSubmitBlock(ctx context.Context, blk *types.BlockMsg) error //perm:write
SyncState(ctx context.Context) (*types.SyncState, error) //perm:read
// SyncIncomingBlocks returns a channel streaming incoming, potentially not
// yet synced block headers.
SyncIncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error) //perm:read
}
63 changes: 63 additions & 0 deletions venus-shared/api/chain/v1/method.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ curl http://<ip>:<port>/rpc/v1 -X POST -H "Content-Type: application/json" -H "
* [ChainTipSetWeight](#chaintipsetweight)
* [Concurrent](#concurrent)
* [SetConcurrent](#setconcurrent)
* [SyncIncomingBlocks](#syncincomingblocks)
* [SyncState](#syncstate)
* [SyncSubmitBlock](#syncsubmitblock)
* [SyncerTracker](#syncertracker)
Expand Down Expand Up @@ -6768,6 +6769,68 @@ Inputs:

Response: `{}`

### SyncIncomingBlocks
SyncIncomingBlocks returns a channel streaming incoming, potentially not
yet synced block headers.


Perms: read

Inputs: `[]`

Response:
```json
{
"Miner": "f01234",
"Ticket": {
"VRFProof": "Bw=="
},
"ElectionProof": {
"WinCount": 9,
"VRFProof": "Bw=="
},
"BeaconEntries": [
{
"Round": 42,
"Data": "Ynl0ZSBhcnJheQ=="
}
],
"WinPoStProof": [
{
"PoStProof": 8,
"ProofBytes": "Ynl0ZSBhcnJheQ=="
}
],
"Parents": [
{
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
}
],
"ParentWeight": "0",
"Height": 10101,
"ParentStateRoot": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
"ParentMessageReceipts": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
"Messages": {
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
},
"BLSAggregate": {
"Type": 2,
"Data": "Ynl0ZSBhcnJheQ=="
},
"Timestamp": 42,
"BlockSig": {
"Type": 2,
"Data": "Ynl0ZSBhcnJheQ=="
},
"ForkSignaling": 42,
"ParentBaseFee": "0"
}
```

### SyncState


Expand Down
15 changes: 15 additions & 0 deletions venus-shared/api/chain/v1/mock/mock_fullnode.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions venus-shared/api/chain/v1/proxy_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions venus-shared/api/chain/v1/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,7 @@ type ISyncer interface {
ChainTipSetWeight(ctx context.Context, tsk types.TipSetKey) (big.Int, error) //perm:read
SyncSubmitBlock(ctx context.Context, blk *types.BlockMsg) error //perm:write
SyncState(ctx context.Context) (*types.SyncState, error) //perm:read
// SyncIncomingBlocks returns a channel streaming incoming, potentially not
// yet synced block headers.
SyncIncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error) //perm:read
}
2 changes: 0 additions & 2 deletions venus-shared/compatible-checks/api-diff.txt
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ github.com/filecoin-project/venus/venus-shared/api/chain/v0.FullNode <> github.c
> StateReplay {[func(context.Context, types.TipSetKey, cid.Cid) (*types.InvocResult, error) <> func(context.Context, types.TipSetKey, cid.Cid) (*api.InvocResult, error)] base=func out type: #0 input; nested={[*types.InvocResult <> *api.InvocResult] base=pointed type; nested={[types.InvocResult <> api.InvocResult] base=struct field; nested={[types.InvocResult <> api.InvocResult] base=exported field type: #4 field named ExecutionTrace; nested={[types.ExecutionTrace <> types.ExecutionTrace] base=struct field; nested={[types.ExecutionTrace <> types.ExecutionTrace] base=exported fields count: 6 != 4; nested=nil}}}}}}
- SyncCheckBad
- SyncCheckpoint
- SyncIncomingBlocks
- SyncMarkBad
- SyncUnmarkAllBad
- SyncUnmarkBad
Expand Down Expand Up @@ -233,7 +232,6 @@ github.com/filecoin-project/venus/venus-shared/api/chain/v1.FullNode <> github.c
> StateReplay {[func(context.Context, types.TipSetKey, cid.Cid) (*types.InvocResult, error) <> func(context.Context, types.TipSetKey, cid.Cid) (*api.InvocResult, error)] base=func out type: #0 input; nested={[*types.InvocResult <> *api.InvocResult] base=pointed type; nested={[types.InvocResult <> api.InvocResult] base=struct field; nested={[types.InvocResult <> api.InvocResult] base=exported field type: #4 field named ExecutionTrace; nested={[types.ExecutionTrace <> types.ExecutionTrace] base=struct field; nested={[types.ExecutionTrace <> types.ExecutionTrace] base=exported fields count: 6 != 4; nested=nil}}}}}}
- SyncCheckBad
- SyncCheckpoint
- SyncIncomingBlocks
- SyncMarkBad
- SyncUnmarkAllBad
- SyncUnmarkBad
Expand Down

0 comments on commit eb6fe63

Please sign in to comment.