Skip to content

Commit

Permalink
Merge "filelock to allow one of peer start, reset, rollback" into rel…
Browse files Browse the repository at this point in the history
…ease-1.4
  • Loading branch information
manish-sethi authored and Gerrit Code Review committed Jul 16, 2019
2 parents 61199ed + b137f5b commit d339219
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 10 deletions.
56 changes: 56 additions & 0 deletions common/ledger/util/leveldbhelper/leveldb_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package leveldbhelper
import (
"fmt"
"sync"
"syscall"

"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/util"
Expand Down Expand Up @@ -154,3 +155,58 @@ func (dbInst *DB) WriteBatch(batch *leveldb.Batch, sync bool) error {
}
return nil
}

// FileLock encapsulate the DB that holds the file lock.
// As the FileLock to be used by a single process/goroutine,
// there is no need for the semaphore to synchronize the
// FileLock usage.
type FileLock struct {
db *leveldb.DB
filePath string
}

// NewFileLock returns a new file based lock manager.
func NewFileLock(filePath string) *FileLock {
return &FileLock{
filePath: filePath,
}
}

// Lock acquire a file lock. We achieve this by opening
// a db for the given filePath. Internally, leveldb acquires a
// file lock while opening a db. If the db is opened again by the same or
// another process, error would be returned. When the db is closed
// or the owner process dies, the lock would be released and hence
// the other process can open the db. We exploit this leveldb
// functionality to acquire and release file lock as the leveldb
// supports this for Windows, Solaris, and Unix.
func (f *FileLock) Lock() error {
dbOpts := &opt.Options{}
var err error
var dirEmpty bool
if dirEmpty, err = util.CreateDirIfMissing(f.filePath); err != nil {
panic(fmt.Sprintf("Error creating dir if missing: %s", err))
}
dbOpts.ErrorIfMissing = !dirEmpty
f.db, err = leveldb.OpenFile(f.filePath, dbOpts)
if err != nil && err == syscall.EAGAIN {
return errors.New(fmt.Sprintf("lock is already acquired on file %s", f.filePath))
}
if err != nil {
panic(fmt.Sprintf("Error acquiring lock on file %s: %s", f.filePath, err))
}
return nil
}

// Unlock releases a previously acquired lock. We achieve this by closing
// the previously opened db. FileUnlock can be called multiple times.
func (f *FileLock) Unlock() {
if f.db == nil {
return
}
if err := f.db.Close(); err != nil {
logger.Warningf("unable to release the lock on file %s: %s", f.filePath, err)
return
}
f.db = nil
}
47 changes: 47 additions & 0 deletions common/ledger/util/leveldbhelper/leveldb_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package leveldbhelper

import (
"fmt"
"os"
"path/filepath"
"testing"
Expand Down Expand Up @@ -109,6 +110,52 @@ func TestLevelDBHelper(t *testing.T) {
assert.Equal(t, []string{"key1", "key2"}, keys)
}

func TestFileLock(t *testing.T) {
// create 1st fileLock manager
fileLockPath := testDBPath + "/fileLock"
fileLock1 := NewFileLock(fileLockPath)
assert.Nil(t, fileLock1.db)
assert.Equal(t, fileLock1.filePath, fileLockPath)

// acquire the file lock using the fileLock manager 1
err := fileLock1.Lock()
assert.NoError(t, err)
assert.NotNil(t, fileLock1.db)

// create 2nd fileLock manager
fileLock2 := NewFileLock(fileLockPath)
assert.Nil(t, fileLock2.db)
assert.Equal(t, fileLock2.filePath, fileLockPath)

// try to acquire the file lock again using the fileLock2
// would result in an error
err = fileLock2.Lock()
expectedErr := fmt.Sprintf("lock is already acquired on file %s", fileLockPath)
assert.EqualError(t, err, expectedErr)
assert.Nil(t, fileLock2.db)

// release the file lock acquired using fileLock1
fileLock1.Unlock()
assert.Nil(t, fileLock1.db)

// As the fileLock1 has released the lock,
// the fileLock2 can acquire the lock.
err = fileLock2.Lock()
assert.NoError(t, err)
assert.NotNil(t, fileLock2.db)

// release the file lock acquired using fileLock 2
fileLock2.Unlock()
assert.Nil(t, fileLock1.db)

// unlock can be called multiple times and it is safe
fileLock2.Unlock()
assert.Nil(t, fileLock1.db)

// cleanup
assert.NoError(t, os.RemoveAll(fileLockPath))
}

func TestCreateDBInEmptyDir(t *testing.T) {
assert.NoError(t, os.RemoveAll(testDBPath), "")
assert.NoError(t, os.MkdirAll(testDBPath, 0775), "")
Expand Down
11 changes: 10 additions & 1 deletion core/ledger/kvledger/kv_ledger_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type Provider struct {
initializer *ledger.Initializer
collElgNotifier *collElgNotifier
stats *stats
fileLock *leveldbhelper.FileLock
}

// NewProvider instantiates a new Provider.
Expand All @@ -60,9 +61,16 @@ func NewProvider() (ledger.PeerLedgerProvider, error) {
idStore := openIDStore(ledgerconfig.GetLedgerProviderPath())
// Initialize the history database (index for history of values by key)
historydbProvider := historyleveldb.NewHistoryDBProvider()

fileLock := leveldbhelper.NewFileLock(ledgerconfig.GetFileLockPath())
if err := fileLock.Lock(); err != nil {
return nil, errors.Wrap(err, "as another peer node command is executing,"+
" wait for that command to complete its execution or terminate it before retrying")
}

logger.Info("ledger provider Initialized")
provider := &Provider{idStore, nil,
nil, historydbProvider, nil, nil, nil, nil, nil, nil}
nil, historydbProvider, nil, nil, nil, nil, nil, nil, fileLock}
return provider, nil
}

Expand Down Expand Up @@ -195,6 +203,7 @@ func (provider *Provider) Close() {
provider.historydbProvider.Close()
provider.bookkeepingProvider.Close()
provider.configHistoryMgr.Close()
provider.fileLock.Unlock()
}

// recoverUnderConstructionLedger checks whether the under construction flag is set - this would be the case
Expand Down
11 changes: 10 additions & 1 deletion core/ledger/kvledger/reset.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,21 @@ package kvledger

import (
"github.com/hyperledger/fabric/common/ledger/blkstorage/fsblkstorage"
"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
"github.com/pkg/errors"
)

// ResetAllKVLedgers resets all ledger to the genesis block.
func ResetAllKVLedgers() error {
logger.Info("Resetting all channel ledgers to genesis block")
fileLock := leveldbhelper.NewFileLock(ledgerconfig.GetFileLockPath())
if err := fileLock.Lock(); err != nil {
return errors.Wrap(err, "as another peer node command is executing,"+
" wait for that command to complete its execution or terminate it before retrying")
}
defer fileLock.Unlock()

logger.Info("Resetting all ledgers to genesis block")
ledgerDataFolder := ledgerconfig.GetRootPath()
logger.Infof("Ledger data folder from config = [%s]", ledgerDataFolder)

Expand Down
9 changes: 9 additions & 0 deletions core/ledger/kvledger/rollback.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,21 @@ SPDX-License-Identifier: Apache-2.0
package kvledger

import (
"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
"github.com/hyperledger/fabric/core/ledger/ledgerstorage"
"github.com/pkg/errors"
)

// RollbackKVLedger rollbacks a ledger to a specified block number
func RollbackKVLedger(ledgerID string, blockNum uint64) error {
fileLock := leveldbhelper.NewFileLock(ledgerconfig.GetFileLockPath())
if err := fileLock.Lock(); err != nil {
return errors.Wrap(err, "as another peer node command is executing,"+
" wait for that command to complete its execution or terminate it before retrying")
}
defer fileLock.Unlock()

blockstorePath := ledgerconfig.GetBlockStorePath()
if err := ledgerstorage.ValidateRollbackParams(blockstorePath, ledgerID, blockNum); err != nil {
return err
Expand Down
6 changes: 6 additions & 0 deletions core/ledger/ledgerconfig/ledger_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const confBookkeeper = "bookkeeper"
const confConfigHistory = "configHistory"
const confChains = "chains"
const confPvtdataStore = "pvtdataStore"
const fileLockPath = "fileLock"
const confTotalQueryLimit = "ledger.state.totalQueryLimit"
const confInternalQueryLimit = "ledger.state.couchDBConfig.internalQueryLimit"
const confEnableHistoryDatabase = "ledger.history.enableHistoryDatabase"
Expand Down Expand Up @@ -78,6 +79,11 @@ func GetInternalBookkeeperPath() string {
return filepath.Join(GetRootPath(), confBookkeeper)
}

// GetFileLockPath returns the filesystem path that is used to create a file lock
func GetFileLockPath() string {
return filepath.Join(GetRootPath(), fileLockPath)
}

// GetConfigHistoryPath returns the filesystem path that is used for maintaining history of chaincodes collection configurations
func GetConfigHistoryPath() string {
return filepath.Join(GetRootPath(), confConfigHistory)
Expand Down
2 changes: 2 additions & 0 deletions core/ledger/ledgerconfig/ledger_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func TestLedgerConfigPathDefault(t *testing.T) {
assert.Equal(t, "/var/hyperledger/production/ledgersData/chains", GetBlockStorePath())
assert.Equal(t, "/var/hyperledger/production/ledgersData/pvtdataStore", GetPvtdataStorePath())
assert.Equal(t, "/var/hyperledger/production/ledgersData/bookkeeper", GetInternalBookkeeperPath())
assert.Equal(t, "/var/hyperledger/production/ledgersData/fileLock", GetFileLockPath())
}

func TestLedgerConfigPath(t *testing.T) {
Expand All @@ -68,6 +69,7 @@ func TestLedgerConfigPath(t *testing.T) {
assert.Equal(t, "/tmp/hyperledger/production/ledgersData/chains", GetBlockStorePath())
assert.Equal(t, "/tmp/hyperledger/production/ledgersData/pvtdataStore", GetPvtdataStorePath())
assert.Equal(t, "/tmp/hyperledger/production/ledgersData/bookkeeper", GetInternalBookkeeperPath())
assert.Equal(t, "/tmp/hyperledger/production/ledgersData/fileLock", GetFileLockPath())
}

func TestGetTotalLimitDefault(t *testing.T) {
Expand Down
38 changes: 30 additions & 8 deletions integration/ledger/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,20 +122,32 @@ var _ bool = Describe("Rollback & Reset Ledger", func() {
org2peer0 := setup.network.Peer("org2", "peer0")
// block 0: genesis, block 1: org1Anchor, block 2: org2Anchor, block 3: org3Anchor
// block 4: chaincode instantiation, block 5 to 9: chaincode invoke to add marbles.
By("Rolling back peer0.org2 to block 3 from block 9")
Expect(helper.getLedgerHeight(org2peer0)).Should(Equal(10))

By("Rolling back peer0.org2 to block 3 from block 9 while the peer node is online")
expectedErrMessage := "as another peer node command is executing," +
" wait for that command to complete its execution or terminate it before retrying"
helper.rollback(org2peer0, 3, expectedErrMessage, false)

By("Rolling back peer0.org2 to block 3 from block 9 while the peer node is offline")
setup.terminateAllProcess()
helper.rollback(org2peer0, 3)
helper.rollback(org2peer0, 3, "", true)

assertPostRollbackOrReset()
})

It("resets the ledger to the genesis block", func() {
org2peer0 := setup.network.Peer("org2", "peer0")
By("Resetting peer0.org2 to the genesis block")
Expect(helper.getLedgerHeight(org2peer0)).Should(Equal(10))

By("Resetting peer0.org2 to the genesis block while the peer node is online")
expectedErrMessage := "as another peer node command is executing," +
" wait for that command to complete its execution or terminate it before retrying"
helper.reset(org2peer0, expectedErrMessage, false)

By("Resetting peer0.org2 to the genesis block while the peer node is offline")
setup.terminateAllProcess()
helper.reset(org2peer0)
helper.reset(org2peer0, "", true)

assertPostRollbackOrReset()
})
Expand Down Expand Up @@ -307,18 +319,28 @@ func (nh *networkHelper) invokeChaincode(peer *nwo.Peer, command commands.Chainc
Expect(sess.Err).To(gbytes.Say("Chaincode invoke successful."))
}

func (nh *networkHelper) rollback(peer *nwo.Peer, blockNumber int) {
func (nh *networkHelper) rollback(peer *nwo.Peer, blockNumber int, expectedErrMessage string, expectSuccess bool) {
rollbackCmd := commands.NodeRollback{ChannelID: nh.channelID, BlockNumber: blockNumber}
sess, err := nh.PeerUserSession(peer, "User1", rollbackCmd)
Expect(err).NotTo(HaveOccurred())
Eventually(sess, nh.EventuallyTimeout).Should(gexec.Exit(0))
if expectSuccess {
Eventually(sess, nh.EventuallyTimeout).Should(gexec.Exit(0))
} else {
Eventually(sess, nh.EventuallyTimeout).Should(gexec.Exit())
Expect(sess.Err).To(gbytes.Say(expectedErrMessage))
}
}

func (nh *networkHelper) reset(peer *nwo.Peer) {
func (nh *networkHelper) reset(peer *nwo.Peer, expectedErrMessage string, expectSuccess bool) {
resetCmd := commands.NodeReset{}
sess, err := nh.PeerUserSession(peer, "User1", resetCmd)
Expect(err).NotTo(HaveOccurred())
Eventually(sess, nh.EventuallyTimeout).Should(gexec.Exit(0))
if expectSuccess {
Eventually(sess, nh.EventuallyTimeout).Should(gexec.Exit(0))
} else {
Eventually(sess, nh.EventuallyTimeout).Should(gexec.Exit())
Expect(sess.Err).To(gbytes.Say(expectedErrMessage))
}
}

func (nh *networkHelper) waitUntilEndorserEnabled(peer *nwo.Peer) {
Expand Down

0 comments on commit d339219

Please sign in to comment.