Commit f37beaa9 authored by manish's avatar manish
Missingdata-recon: Handle coll eligibility change

This CR handles the event when a peer becomes eligible
for receiving data for an existing collection. All the
missing data entries for the collection that were previously
 marked as 'ineligible' are converted to 'eligible' in a
background goroutine so that the query results for reporting
missing data also include these entries for previous blocks

FAB-11437 #done

Change-Id: I145a079b69e8bf02b4c97da23fbf08d7ce2ae268
Signed-off-by: default avatarmanish <>
parent a2bf9dc7
......@@ -136,6 +136,11 @@ func (batch *UpdateBatch) Delete(key []byte) {
batch.KVs[string(key)] = nil
// Len returns the number of entries in the batch
func (batch *UpdateBatch) Len() int {
return len(batch.KVs)
// Iterator extends actual leveldb iterator
type Iterator struct {
......@@ -38,6 +38,9 @@ const confMaxBatchSize = "ledger.state.couchDBConfig.maxBatchUpdateSize"
const confAutoWarmIndexes = "ledger.state.couchDBConfig.autoWarmIndexes"
const confWarmIndexesAfterNBlocks = "ledger.state.couchDBConfig.warmIndexesAfterNBlocks"
var confCollElgProcMaxDbBatchSize = &conf{"ledger.pvtdataStore.collElgProcMaxDbBatchSize", 5000}
var confCollElgProcDbBatchesInterval = &conf{"ledger.pvtdataStore.collElgProcDbBatchesInterval", 1000}
// GetRootPath returns the filesystem path.
// All ledger related contents are expected to be stored under this path
func GetRootPath() string {
......@@ -85,7 +88,7 @@ func GetMaxBlockfileSize() int {
return 64 * 1024 * 1024
//GetTotalLimit exposes the totalLimit variable
// GetTotalQueryLimit exposes the totalLimit variable
func GetTotalQueryLimit() int {
totalQueryLimit := viper.GetInt(confTotalQueryLimit)
// if queryLimit was unset, default to 10000
......@@ -95,7 +98,7 @@ func GetTotalQueryLimit() int {
return totalQueryLimit
//GetQueryLimit exposes the queryLimit variable
// GetInternalQueryLimit exposes the queryLimit variable
func GetInternalQueryLimit() int {
internalQueryLimit := viper.GetInt(confInternalQueryLimit)
// if queryLimit was unset, default to 1000
......@@ -125,6 +128,26 @@ func GetPvtdataStorePurgeInterval() uint64 {
return uint64(purgeInterval)
// GetPvtdataStoreCollElgProcMaxDbBatchSize returns the maximum db batch size for converting
// the ineligible missing data entries to eligible missing data entries
func GetPvtdataStoreCollElgProcMaxDbBatchSize() int {
collElgProcMaxDbBatchSize := viper.GetInt(confCollElgProcMaxDbBatchSize.Name)
if collElgProcMaxDbBatchSize <= 0 {
collElgProcMaxDbBatchSize = confCollElgProcMaxDbBatchSize.DefaultVal
return collElgProcMaxDbBatchSize
// GetPvtdataStoreCollElgProcDbBatchesInterval returns the minimum duration (in milliseconds) between writing
// two consecutive db batches for converting the ineligible missing data entries to eligible missing data entries
func GetPvtdataStoreCollElgProcDbBatchesInterval() int {
collElgProcDbBatchesInterval := viper.GetInt(confCollElgProcDbBatchesInterval.Name)
if collElgProcDbBatchesInterval <= 0 {
collElgProcDbBatchesInterval = confCollElgProcDbBatchesInterval.DefaultVal
return collElgProcDbBatchesInterval
//IsHistoryDBEnabled exposes the historyDatabase variable
func IsHistoryDBEnabled() bool {
return viper.GetBool(confEnableHistoryDatabase)
......@@ -162,3 +185,8 @@ func GetWarmIndexesAfterNBlocks() int {
return warmAfterNBlocks
type conf struct {
Name string
DefaultVal int
......@@ -158,6 +158,22 @@ func TestPvtdataStorePurgeInterval(t *testing.T) {
assert.Equal(t, uint64(1000), updatedValue) //test config returns 1000
func TestPvtdataStoreCollElgProcMaxDbBatchSize(t *testing.T) {
defaultVal := confCollElgProcMaxDbBatchSize.DefaultVal
testVal := defaultVal + 1
assert.Equal(t, defaultVal, GetPvtdataStoreCollElgProcMaxDbBatchSize())
viper.Set("ledger.pvtdataStore.collElgProcMaxDbBatchSize", testVal)
assert.Equal(t, testVal, GetPvtdataStoreCollElgProcMaxDbBatchSize())
func TestCollElgProcDbBatchesInterval(t *testing.T) {
defaultVal := confCollElgProcDbBatchesInterval.DefaultVal
testVal := defaultVal + 1
assert.Equal(t, defaultVal, GetPvtdataStoreCollElgProcDbBatchesInterval())
viper.Set("ledger.pvtdataStore.collElgProcDbBatchesInterval", testVal)
assert.Equal(t, testVal, GetPvtdataStoreCollElgProcDbBatchesInterval())
func TestIsHistoryDBEnabledDefault(t *testing.T) {
defaultValue := IsHistoryDBEnabled()
......@@ -8,11 +8,13 @@ package pvtdatastorage
import (
......@@ -23,6 +25,7 @@ var (
expiryKeyPrefix = []byte{3}
eligibleMissingDataKeyPrefix = []byte{4}
ineligibleMissingDataKeyPrefix = []byte{5}
collElgKeyPrefix = []byte{6}
nilByte = byte(0)
emptyValue = []byte{}
......@@ -145,9 +148,51 @@ func decodeMissingDataValue(bitmapBytes []byte) (*bitset.BitSet, error) {
return bitmap, nil
func encodeCollElgKey(blkNum uint64) []byte {
return append(collElgKeyPrefix, util.EncodeReverseOrderVarUint64(blkNum)...)
func decodeCollElgKey(b []byte) uint64 {
blkNum, _ := util.DecodeReverseOrderVarUint64(b[1:])
return blkNum
func encodeCollElgVal(m *CollElgInfo) ([]byte, error) {
return proto.Marshal(m)
func decodeCollElgVal(b []byte) (*CollElgInfo, error) {
m := &CollElgInfo{}
if err := proto.Unmarshal(b, m); err != nil {
return nil, errors.WithStack(err)
return m, nil
func createRangeScanKeysForEligibleMissingDataEntries(blkNum uint64) (startKey, endKey []byte) {
startKey = append(eligibleMissingDataKeyPrefix, util.EncodeReverseOrderVarUint64(blkNum)...)
endKey = append(eligibleMissingDataKeyPrefix, util.EncodeReverseOrderVarUint64(0)...)
return startKey, endKey
func createRangeScanKeysForIneligibleMissingData(maxBlkNum uint64, ns, coll string) (startKey, endKey []byte) {
startKey = encodeMissingDataKey(
nsCollBlk: nsCollBlk{ns: ns, coll: coll, blkNum: maxBlkNum},
isEligible: false,
endKey = encodeMissingDataKey(
nsCollBlk: nsCollBlk{ns: ns, coll: coll, blkNum: 0},
isEligible: false,
func createRangeScanKeysForCollElg() (startKey, endKey []byte) {
return encodeCollElgKey(math.MaxUint64),
......@@ -26,3 +26,11 @@ message Collections {
message TxNums {
repeated uint64 list = 1;
message CollElgInfo {
map<string, CollNames> nsCollMap = 1;
message CollNames {
repeated string entries = 1;
......@@ -37,3 +37,16 @@ func (e *ExpiryData) addMissingData(ns, coll string) {
collections := e.getOrCreateCollections(ns)
collections.MissingDataMap[coll] = true
func newCollElgInfo(nsCollMap map[string][]string) *CollElgInfo {
m := &CollElgInfo{NsCollMap: map[string]*CollNames{}}
for ns, colls := range nsCollMap {
collNames, ok := m.NsCollMap[ns]
if !ok {
collNames = &CollNames{}
m.NsCollMap[ns] = collNames
collNames.Entries = colls
return m
......@@ -60,6 +60,11 @@ type Store interface {
Commit() error
// Rollback rolls back the pvt data passed in the previous invoke to the `Prepare` function
Rollback() error
// ProcessCollsEligibilityEnabled notifies the store when the peer becomes eligible to recieve data for an
// existing collection. Parameter 'committingBlk' refers to the block number that contains the corresponding
// collection upgrade transaction and the parameter 'nsCollMap' contains the collections for which the peer
// is now eligible to recieve pvt data
ProcessCollsEligibilityEnabled(committingBlk uint64, nsCollMap map[string][]string) error
// IsEmpty returns true if the store does not have any block committed yet
IsEmpty() (bool, error)
// LastCommittedBlockHeight returns the height of the last committed block
......@@ -10,6 +10,7 @@ import (
......@@ -35,6 +36,7 @@ type store struct {
lastCommittedBlock uint64
batchPending bool
purgerLock sync.Mutex
collElgProcSync *collElgProcSync
type blkTranNumKey []byte
......@@ -88,10 +90,16 @@ func NewProvider() Provider {
// OpenStore returns a handle to a store
func (p *provider) OpenStore(ledgerid string) (Store, error) {
dbHandle := p.dbProvider.GetDBHandle(ledgerid)
s := &store{db: dbHandle, ledgerid: ledgerid}
s := &store{db: dbHandle, ledgerid: ledgerid,
collElgProcSync: &collElgProcSync{
notification: make(chan bool, 1),
procComplete: make(chan bool, 1),
if err := s.initState(); err != nil {
return nil, err
logger.Debugf("Pvtdata store opened. Initial state: isEmpty [%t], lastCommittedBlock [%d], batchPending [%t]",
s.isEmpty, s.lastCommittedBlock, s.batchPending)
return s, nil
......@@ -352,6 +360,22 @@ func (s *store) GetMissingPvtDataInfoForMostRecentBlocks(maxBlock int) (ledger.M
return missingPvtDataInfo, nil
func (s *store) ProcessCollsEligibilityEnabled(committingBlk uint64, nsCollMap map[string][]string) error {
key := encodeCollElgKey(committingBlk)
m := newCollElgInfo(nsCollMap)
val, err := encodeCollElgVal(m)
if err != nil {
return err
batch := leveldbhelper.NewUpdateBatch()
batch.Put(key, val)
if err = s.db.WriteBatch(batch, true); err != nil {
return err
return nil
func (s *store) performPurgeIfScheduled(latestCommittedBlk uint64) {
if latestCommittedBlk%ledgerconfig.GetPvtdataStorePurgeInterval() != 0 {
......@@ -411,6 +435,80 @@ func (s *store) retrieveExpiryEntries(minBlkNum, maxBlkNum uint64) ([]*expiryEnt
return expiryEntries, nil
func (s *store) launchCollElgProc() {
go func() {
s.processCollElgEvents() // process collection eligibility events when store is opened - in case there is an unprocessed events from previous run
for {
logger.Debugf("Waiting for collection eligibility event")
func (s *store) processCollElgEvents() {
logger.Debugf("Starting to process collection eligibility events")
maxBatchSize := ledgerconfig.GetPvtdataStoreCollElgProcMaxDbBatchSize()
batchesInterval := ledgerconfig.GetPvtdataStoreCollElgProcDbBatchesInterval()
defer s.purgerLock.Unlock()
collElgStartKey, collElgEndKey := createRangeScanKeysForCollElg()
eventItr := s.db.GetIterator(collElgStartKey, collElgEndKey)
defer eventItr.Release()
batch := leveldbhelper.NewUpdateBatch()
totalEntriesConverted := 0
for eventItr.Next() {
collElgKey, collElgVal := eventItr.Key(), eventItr.Value()
blkNum := decodeCollElgKey(collElgKey)
CollElgInfo, err := decodeCollElgVal(collElgVal)
logger.Debugf("Processing collection eligibility event [blkNum=%d], CollElgInfo=%s", blkNum, CollElgInfo)
if err != nil {
logger.Errorf("This error is not expected %s", err)
for ns, colls := range CollElgInfo.NsCollMap {
var coll string
for _, coll = range colls.Entries {
logger.Infof("Converting missing data entries from inelligible to eligible for [ns=%s, coll=%s]", ns, coll)
startKey, endKey := createRangeScanKeysForIneligibleMissingData(blkNum, ns, coll)
collItr := s.db.GetIterator(startKey, endKey)
collEntriesConverted := 0
for collItr.Next() { // each entry
originalKey, originalVal := collItr.Key(), collItr.Value()
modifiedKey := decodeMissingDataKey(originalKey)
modifiedKey.isEligible = true
copyVal := make([]byte, len(originalVal))
copy(copyVal, originalVal)
batch.Put(encodeMissingDataKey(modifiedKey), copyVal)
if batch.Len() > maxBatchSize {
s.db.WriteBatch(batch, true)
batch = leveldbhelper.NewUpdateBatch()
sleepTime := time.Duration(batchesInterval)
logger.Infof("Going to sleep for %d milliseconds between batches. Entries for [ns=%s, coll=%s] converted so far = %d",
sleepTime, ns, coll, collEntriesConverted)
time.Sleep(sleepTime * time.Millisecond)
} // entry loop
logger.Infof("Converted all [%d] entries for [ns=%s, coll=%s]", collEntriesConverted, ns, coll)
totalEntriesConverted += collEntriesConverted
} // coll loop
} // ns loop
batch.Delete(collElgKey) // delete the collection eligibility event key as well
} // event loop
s.db.WriteBatch(batch, true)
logger.Debugf("Converted [%d] inelligible mising data entries to elligible", totalEntriesConverted)
// LastCommittedBlockHeight implements the function in the interface `Store`
func (s *store) LastCommittedBlockHeight() (uint64, error) {
if s.isEmpty {
......@@ -458,3 +556,31 @@ func (s *store) getLastCommittedBlockNum() (bool, uint64, error) {
return false, decodeLastCommittedBlockVal(v), nil
type collElgProcSync struct {
notification, procComplete chan bool
func (sync *collElgProcSync) notify() {
select {
case sync.notification <- true:
logger.Debugf("Signaled to collection elgibility processing routine")
default: //noop
logger.Debugf("Previous signal still pending. Skipping new signal")
func (sync *collElgProcSync) waitForNotification() {
func (sync *collElgProcSync) done() {
select {
case sync.procComplete <- true:
func (sync *collElgProcSync) waitForDone() {
......@@ -13,12 +13,13 @@ import (
btltestutil ""