From 7eaead1c5fcbe9fc5e1fd75883f6a7c571efa811 Mon Sep 17 00:00:00 2001 From: senthil Date: Wed, 12 Aug 2020 17:36:57 +0530 Subject: [PATCH] refactor pvtdatastore Signed-off-by: senthil --- core/ledger/pvtdatastorage/store.go | 229 ++++++++++++---------------- 1 file changed, 97 insertions(+), 132 deletions(-) diff --git a/core/ledger/pvtdatastorage/store.go b/core/ledger/pvtdatastorage/store.go index d0b6e1efc1f..ce1ef35b0e3 100644 --- a/core/ledger/pvtdatastorage/store.go +++ b/core/ledger/pvtdatastorage/store.go @@ -108,11 +108,8 @@ type storeEntries struct { type lastUpdatedOldBlocksList []uint64 type entriesForPvtDataOfOldBlocks struct { - // for each , store the dataEntry, i.e., pvtData - dataEntries map[dataKey]*rwset.CollectionPvtReadWriteSet - // store the retrieved (& updated) expiryData in expiryEntries - expiryEntries map[expiryKey]*ExpiryData - // for each , store the retrieved (& updated) bitmap in the missingDataEntries + dataEntries map[dataKey]*rwset.CollectionPvtReadWriteSet + expiryEntries map[expiryKey]*ExpiryData missingDataEntries map[nsCollBlk]*bitset.BitSet } @@ -263,13 +260,12 @@ func (s *Store) Commit(blockNum uint64, pvtData []*ledger.TxPvtData, missingPvtD // CommitPvtDataOfOldBlocks commits the pvtData (i.e., previously missing data) of old blocks. // The parameter `blocksPvtData` refers a list of old block's pvtdata which are missing in the pvtstore. -// Given a list of old block's pvtData, `CommitPvtDataOfOldBlocks` performs the following four +// Given a list of old block's pvtData, `CommitPvtDataOfOldBlocks` performs the following three // operations -// (1) construct dataEntries for all pvtData -// (2) construct update entries (i.e., dataEntries, expiryEntries, missingDataEntries) +// (1) construct update entries (i.e., dataEntries, expiryEntries, missingDataEntries) // from the above created data entries -// (3) create a db update batch from the update entries -// (4) commit the update batch to the pvtStore +// (2) create a db update batch from the update entries +// (3) commit the update batch to the pvtStore func (s *Store) CommitPvtDataOfOldBlocks(blocksPvtData map[uint64][]*ledger.TxPvtData) error { if s.isLastUpdatedOldBlocksSet { return &ErrIllegalCall{`The lastUpdatedOldBlocksList is set. It means that the @@ -277,63 +273,56 @@ func (s *Store) CommitPvtDataOfOldBlocks(blocksPvtData map[uint64][]*ledger.TxPv } logger.Debugf("Constructing pvtdatastore entries for pvtData of [%d] old blocks", len(blocksPvtData)) - updateEntries, err := s.constructUpdateEntries(blocksPvtData) + entries, err := s.constructEntries(blocksPvtData) if err != nil { return err } logger.Debug("Constructing update batch from pvtdatastore entries") - batch, err := s.constructUpdateBatch(updateEntries) - if err != nil { + batch := s.db.NewUpdateBatch() + if err := entries.addToUpdateBatch(batch); err != nil { return err } logger.Debug("Committing the update batch to pvtdatastore") - if err := s.commitBatch(batch); err != nil { - return err - } - - return nil + return s.db.WriteBatch(batch, true) } -func (s *Store) constructUpdateEntries(blocksPvtData map[uint64][]*ledger.TxPvtData) (*entriesForPvtDataOfOldBlocks, error) { +func (s *Store) constructEntries(blocksPvtData map[uint64][]*ledger.TxPvtData) (*entriesForPvtDataOfOldBlocks, error) { var dataEntries []*dataEntry for blkNum, pvtData := range blocksPvtData { dataEntries = append(dataEntries, prepareDataEntries(blkNum, pvtData)...) } - updateEntries := &entriesForPvtDataOfOldBlocks{ + entries := &entriesForPvtDataOfOldBlocks{ dataEntries: make(map[dataKey]*rwset.CollectionPvtReadWriteSet), expiryEntries: make(map[expiryKey]*ExpiryData), - missingDataEntries: make(map[nsCollBlk]*bitset.BitSet)} + missingDataEntries: make(map[nsCollBlk]*bitset.BitSet), + } - // for each data entry, first, get the expiryData and missingData from the pvtStore. - // Second, update the expiryData and missingData as per the data entry. Finally, add - // the data entry along with the updated expiryData and missingData to the update entries for _, dataEntry := range dataEntries { - // get the expiryBlk number to construct the expiryKey - expiryKey, err := s.constructExpiryKeyFromDataEntry(dataEntry) + var expData *ExpiryData + nsCollBlk := dataEntry.key.nsCollBlk + txNum := dataEntry.key.txNum + + expKey, err := s.constructExpiryKeyFromDataEntry(dataEntry) if err != nil { return nil, err } - - // get the existing expiryData entry - var expiryData *ExpiryData - if !neverExpires(expiryKey.expiringBlk) { - if expiryData, err = s.getExpiryDataFromUpdateEntriesOrStore(updateEntries, expiryKey); err != nil { + if !neverExpires(expKey.expiringBlk) { + if expData, err = s.getExpiryDataFromEntriesOrStore(entries, expKey); err != nil { return nil, err } - if expiryData == nil { + if expData == nil { // data entry is already expired // and purged (a rare scenario) continue } + expData.addPresentData(nsCollBlk.ns, nsCollBlk.coll, txNum) } - // get the existing missingData entry var missingData *bitset.BitSet - nsCollBlk := dataEntry.key.nsCollBlk - if missingData, err = s.getMissingDataFromUpdateEntriesOrStore(updateEntries, nsCollBlk); err != nil { + if missingData, err = s.getMissingDataFromEntriesOrStore(entries, nsCollBlk); err != nil { return nil, err } if missingData == nil { @@ -341,15 +330,11 @@ func (s *Store) constructUpdateEntries(blocksPvtData map[uint64][]*ledger.TxPvtD // and purged (a rare scenario) continue } + missingData.Clear(uint(txNum)) - updateEntries.addDataEntry(dataEntry) - if expiryData != nil { // would be nil for the never expiring entry - expiryEntry := &expiryEntry{&expiryKey, expiryData} - updateEntries.updateAndAddExpiryEntry(expiryEntry, dataEntry.key) - } - updateEntries.updateAndAddMissingDataEntry(missingData, dataEntry.key) + entries.add(dataEntry, expKey, expData, missingData) } - return updateEntries, nil + return entries, nil } func (s *Store) constructExpiryKeyFromDataEntry(dataEntry *dataEntry) (expiryKey, error) { @@ -359,140 +344,120 @@ func (s *Store) constructExpiryKeyFromDataEntry(dataEntry *dataEntry) (expiryKey if err != nil { return expiryKey{}, err } - return expiryKey{expiringBlk, nsCollBlk.blkNum}, nil + + return expiryKey{ + expiringBlk: expiringBlk, + committingBlk: nsCollBlk.blkNum, + }, nil } -func (s *Store) getExpiryDataFromUpdateEntriesOrStore(updateEntries *entriesForPvtDataOfOldBlocks, expiryKey expiryKey) (*ExpiryData, error) { - expiryData, ok := updateEntries.expiryEntries[expiryKey] - if !ok { - var err error - expiryData, err = s.getExpiryDataOfExpiryKey(&expiryKey) - if err != nil { - return nil, err - } +func (s *Store) getExpiryDataFromEntriesOrStore(updateEntries *entriesForPvtDataOfOldBlocks, expiryKey expiryKey) (*ExpiryData, error) { + if expiryData, ok := updateEntries.expiryEntries[expiryKey]; ok { + return expiryData, nil + } + + expiryData, err := s.getExpiryDataOfExpiryKey(&expiryKey) + if err != nil { + return nil, err } return expiryData, nil } -func (s *Store) getMissingDataFromUpdateEntriesOrStore(updateEntries *entriesForPvtDataOfOldBlocks, nsCollBlk nsCollBlk) (*bitset.BitSet, error) { - missingData, ok := updateEntries.missingDataEntries[nsCollBlk] - if !ok { - var err error - missingDataKey := &missingDataKey{nsCollBlk, true} - missingData, err = s.getBitmapOfMissingDataKey(missingDataKey) - if err != nil { - return nil, err - } +func (s *Store) getMissingDataFromEntriesOrStore(updateEntries *entriesForPvtDataOfOldBlocks, nsCollBlk nsCollBlk) (*bitset.BitSet, error) { + if missingData, ok := updateEntries.missingDataEntries[nsCollBlk]; ok { + return missingData, nil } - return missingData, nil -} -func (updateEntries *entriesForPvtDataOfOldBlocks) addDataEntry(dataEntry *dataEntry) { - dataKey := dataKey{dataEntry.key.nsCollBlk, dataEntry.key.txNum} - updateEntries.dataEntries[dataKey] = dataEntry.value + missingDataKey := &missingDataKey{ + nsCollBlk: nsCollBlk, + isEligible: true, + } + missingData, err := s.getBitmapOfMissingDataKey(missingDataKey) + if err != nil { + return nil, err + } + return missingData, nil } -func (updateEntries *entriesForPvtDataOfOldBlocks) updateAndAddExpiryEntry(expiryEntry *expiryEntry, dataKey *dataKey) { - txNum := dataKey.txNum - nsCollBlk := dataKey.nsCollBlk - // update - expiryEntry.value.addPresentData(nsCollBlk.ns, nsCollBlk.coll, txNum) - // we cannot delete entries from MissingDataMap as - // we keep only one entry per missing - // irrespective of the number of txNum. +func (e *entriesForPvtDataOfOldBlocks) add(datEntry *dataEntry, expKey expiryKey, expData *ExpiryData, missingData *bitset.BitSet) { + dataKey := dataKey{ + nsCollBlk: datEntry.key.nsCollBlk, + txNum: datEntry.key.txNum, + } + e.dataEntries[dataKey] = datEntry.value - // add - expiryKey := expiryKey{expiryEntry.key.expiringBlk, expiryEntry.key.committingBlk} - updateEntries.expiryEntries[expiryKey] = expiryEntry.value -} + if expData != nil { + e.expiryEntries[expKey] = expData + } -func (updateEntries *entriesForPvtDataOfOldBlocks) updateAndAddMissingDataEntry(missingData *bitset.BitSet, dataKey *dataKey) { - txNum := dataKey.txNum - nsCollBlk := dataKey.nsCollBlk - // update - missingData.Clear(uint(txNum)) - // add - updateEntries.missingDataEntries[nsCollBlk] = missingData + e.missingDataEntries[dataKey.nsCollBlk] = missingData } -func (s *Store) constructUpdateBatch(updateEntries *entriesForPvtDataOfOldBlocks) (*leveldbhelper.UpdateBatch, error) { - batch := s.db.NewUpdateBatch() - - // add the following four types of entries to the update batch: (1) new data entries - // (i.e., pvtData), (2) updated expiry entries, (3) updated missing data entries, and - // (4) updated block list - - // (1) add new data entries to the batch - if err := addNewDataEntriesToUpdateBatch(batch, updateEntries); err != nil { - return nil, err - } - - // (2) add updated expiryEntry to the batch - if err := addUpdatedExpiryEntriesToUpdateBatch(batch, updateEntries); err != nil { - return nil, err +func (e *entriesForPvtDataOfOldBlocks) addToUpdateBatch(batch *leveldbhelper.UpdateBatch) error { + if err := e.addDataEntriesToUpdateBatch(batch); err != nil { + return err } - // (3) add updated missingData to the batch - if err := addUpdatedMissingDataEntriesToUpdateBatch(batch, updateEntries); err != nil { - return nil, err + if err := e.addExpiryEntriesToUpdateBatch(batch); err != nil { + return err } - return batch, nil + return e.addMissingDataEntriesToUpdateBatch(batch) } -func addNewDataEntriesToUpdateBatch(batch *leveldbhelper.UpdateBatch, entries *entriesForPvtDataOfOldBlocks) error { - var keyBytes, valBytes []byte +func (e *entriesForPvtDataOfOldBlocks) addDataEntriesToUpdateBatch(batch *leveldbhelper.UpdateBatch) error { + var key, val []byte var err error - for dataKey, pvtData := range entries.dataEntries { - keyBytes = encodeDataKey(&dataKey) - if valBytes, err = encodeDataValue(pvtData); err != nil { + + for dataKey, pvtData := range e.dataEntries { + key = encodeDataKey(&dataKey) + if val, err = encodeDataValue(pvtData); err != nil { return err } - batch.Put(keyBytes, valBytes) + batch.Put(key, val) } return nil } -func addUpdatedExpiryEntriesToUpdateBatch(batch *leveldbhelper.UpdateBatch, entries *entriesForPvtDataOfOldBlocks) error { - var keyBytes, valBytes []byte +func (e *entriesForPvtDataOfOldBlocks) addExpiryEntriesToUpdateBatch(batch *leveldbhelper.UpdateBatch) error { + var key, val []byte var err error - for expiryKey, expiryData := range entries.expiryEntries { - keyBytes = encodeExpiryKey(&expiryKey) - if valBytes, err = encodeExpiryValue(expiryData); err != nil { + + for expiryKey, expiryData := range e.expiryEntries { + key = encodeExpiryKey(&expiryKey) + if val, err = encodeExpiryValue(expiryData); err != nil { return err } - batch.Put(keyBytes, valBytes) + batch.Put(key, val) } return nil } -func addUpdatedMissingDataEntriesToUpdateBatch(batch *leveldbhelper.UpdateBatch, entries *entriesForPvtDataOfOldBlocks) error { - var keyBytes, valBytes []byte +func (e *entriesForPvtDataOfOldBlocks) addMissingDataEntriesToUpdateBatch(batch *leveldbhelper.UpdateBatch) error { + var key, val []byte var err error - for nsCollBlk, missingData := range entries.missingDataEntries { - keyBytes = encodeMissingDataKey(&missingDataKey{nsCollBlk, true}) - // if the missingData is empty, we need to delete the missingDataKey + + for nsCollBlk, missingData := range e.missingDataEntries { + key = encodeMissingDataKey( + &missingDataKey{ + nsCollBlk: nsCollBlk, + isEligible: true, + }, + ) + if missingData.None() { - batch.Delete(keyBytes) + batch.Delete(key) continue } - if valBytes, err = encodeMissingDataValue(missingData); err != nil { + + if val, err = encodeMissingDataValue(missingData); err != nil { return err } - batch.Put(keyBytes, valBytes) + batch.Put(key, val) } return nil } -func (s *Store) commitBatch(batch *leveldbhelper.UpdateBatch) error { - // commit the batch to the store - if err := s.db.WriteBatch(batch, true); err != nil { - return err - } - - return nil -} - // GetLastUpdatedOldBlocksPvtData returns the pvtdata of blocks listed in `lastUpdatedOldBlocksList` // TODO FAB-16293 -- GetLastUpdatedOldBlocksPvtData() can be removed either in v2.0 or in v2.1. // If we decide to rebuild stateDB in v2.0, by default, the rebuild logic would take