Skip to content

Commit

Permalink
rollback: fix capability usage at gossip
Browse files Browse the repository at this point in the history
Currently, we pass a static capability to the gossip which uses it
to check whether to store the pvtData of invalid tx. With a channel
update, the capability does not get updated. Hence, we need to use
the channel with bundle listener so that the capability gets
updated after a commit of a config update transaction.

FAB-16114 #done

Change-Id: I19d769432ce5783a1afdded5e7d4be9d682c2672
Signed-off-by: senthil <[email protected]>
Signed-off-by: manish <[email protected]>
  • Loading branch information
cendhu authored and manish-sethi committed Aug 1, 2019
1 parent ea4fb38 commit ad802f9
Show file tree
Hide file tree
Showing 9 changed files with 455 additions and 192 deletions.
27 changes: 26 additions & 1 deletion core/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,12 +414,15 @@ func createChain(cid string, ledger ledger.PeerLedger, cb *common.Block, ccp ccp
cs.Resources = bundle
}

cp := &capabilityProvider{}

cs.bundleSource = channelconfig.NewBundleSource(
bundle,
gossipCallbackWrapper,
trustedRootsCallbackWrapper,
mspCallback,
peerSingletonCallback,
cp.updateChannelConfig,
)

vcs := struct {
Expand Down Expand Up @@ -460,6 +463,7 @@ func createChain(cid string, ledger ledger.PeerLedger, cb *common.Block, ccp ccp
if err != nil {
return errors.Wrapf(err, "[channel %s] failed opening transient store", bundle.ConfigtxValidator().ChainID())
}

csStoreSupport := &CollectionSupport{
PeerLedger: ledger,
}
Expand All @@ -476,7 +480,7 @@ func createChain(cid string, ledger ledger.PeerLedger, cb *common.Block, ccp ccp
Store: store,
Cs: simpleCollectionStore,
IdDeserializeFactory: csStoreSupport,
Capabilities: cs.Application.Capabilities(),
CapabilityProvider: cp,
})

chains.Lock()
Expand Down Expand Up @@ -843,3 +847,24 @@ func (*configSupport) GetChannelConfig(channel string) cc.Config {
}
return chain.cs.bundleSource.ConfigtxValidator()
}

type capabilityProvider struct {
lock sync.Mutex
bundle *channelconfig.Bundle
}

func (cp *capabilityProvider) Capabilities() channelconfig.ApplicationCapabilities {
cp.lock.Lock()
defer cp.lock.Unlock()
ac, ok := cp.bundle.ApplicationConfig()
if !ok {
return nil
}
return ac.Capabilities()
}

func (cp *capabilityProvider) updateChannelConfig(bundle *channelconfig.Bundle) {
cp.lock.Lock()
defer cp.lock.Unlock()
cp.bundle = bundle
}
24 changes: 0 additions & 24 deletions gossip/mocks/app_capabilities.go

This file was deleted.

21 changes: 12 additions & 9 deletions gossip/privdata/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"time"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/channelconfig"
vsccErrors "github.com/hyperledger/fabric/common/errors"
util2 "github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/committer"
Expand All @@ -39,7 +40,6 @@ var logger = util.GetLogger(util.PrivateDataLogger, "")

//go:generate mockery -dir ../../core/common/privdata/ -name CollectionStore -case underscore -output mocks/
//go:generate mockery -dir ../../core/committer/ -name Committer -case underscore -output mocks/
//go:generate mockery -dir ./ -name AppCapabilities -case underscore -output ../mocks/

// TransientStore holds private data that the corresponding blocks haven't been committed yet into the ledger
type TransientStore interface {
Expand Down Expand Up @@ -106,11 +106,12 @@ type Fetcher interface {
fetch(dig2src dig2sources) (*privdatacommon.FetchedPvtDataContainer, error)
}

// AppCapabilities defines the capabilities for the application portion of a channel
type AppCapabilities interface {
// StorePvtDataOfInvalidTx() returns true if the peer needs to store the pvtData of
// invalid transactions.
StorePvtDataOfInvalidTx() bool
//go:generate mockery -dir ./ -name CapabilityProvider -case underscore -output mocks/

// CapabilityProvider contains functions to retrieve capability information for a channel
type CapabilityProvider interface {
// Capabilities defines the capabilities for the application portion of this channel
Capabilities() channelconfig.ApplicationCapabilities
}

// Support encapsulates set of interfaces to
Expand All @@ -122,7 +123,7 @@ type Support struct {
committer.Committer
TransientStore
Fetcher
AppCapabilities
CapabilityProvider
}

type coordinator struct {
Expand Down Expand Up @@ -705,7 +706,8 @@ func (c *coordinator) listMissingPrivateData(block *common.Block, ownedRWsets ma
privateRWsetsInBlock: privateRWsetsInBlock,
coordinator: c,
}
txList, err := data.forEachTxn(c.Support.StorePvtDataOfInvalidTx(), txsFilter, bi.inspectTransaction)
storePvtDataOfInvalidTx := c.Support.CapabilityProvider.Capabilities().StorePvtDataOfInvalidTx()
txList, err := data.forEachTxn(storePvtDataOfInvalidTx, txsFilter, bi.inspectTransaction)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -882,7 +884,8 @@ func (c *coordinator) GetPvtDataAndBlockByNum(seqNum uint64, peerAuthInfo common

seqs2Namespaces := aggregatedCollections(make(map[seqAndDataModel]map[string][]*rwset.CollectionPvtReadWriteSet))
data := blockData(blockAndPvtData.Block.Data.Data)
data.forEachTxn(c.Support.StorePvtDataOfInvalidTx(), make(txValidationFlags, len(data)),
storePvtDataOfInvalidTx := c.Support.CapabilityProvider.Capabilities().StorePvtDataOfInvalidTx()
data.forEachTxn(storePvtDataOfInvalidTx, make(txValidationFlags, len(data)),
func(seqInBlock uint64, chdr *common.ChannelHeader, txRWSet *rwsetutil.TxRwSet, _ []*peer.Endorsement) error {
item, exists := blockAndPvtData.PvtData[seqInBlock]
if !exists {
Expand Down
Loading

0 comments on commit ad802f9

Please sign in to comment.