Commit 8c966365 authored by Senthil Nathan N

recon: convert missingData slice to a map



For any internal usage, it is better to have a map so that lookup and
comparison code can be written in fewer lines. If the order of the
stored entries does not matter, a slice is not needed and only adds
complexity during lookups.

Given the above, this CR changes the slice used for missingData to a map.
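
As an illustration, here is a minimal, self-contained sketch (mirroring
the TxMissingPvtDataMap and MissingPvtData types introduced later in
this CR; the stand-in type definitions are local to the example) of how
a map keyed by txNum turns a slice scan into a single index expression:

package main

import "fmt"

// Stand-ins for the types defined in core/ledger/ledger_interface.go.
type MissingPvtData struct {
	Namespace  string
	Collection string
	IsEligible bool
}

// TxMissingPvtDataMap is keyed by txNum, so all missing entries of a
// transaction can be fetched directly.
type TxMissingPvtDataMap map[uint64][]*MissingPvtData

// Add appends a missing [namespace, collection] entry for the given txNum.
func (m TxMissingPvtDataMap) Add(txNum uint64, ns, coll string, isEligible bool) {
	m[txNum] = append(m[txNum], &MissingPvtData{ns, coll, isEligible})
}

func main() {
	missing := make(TxMissingPvtDataMap)
	missing.Add(3, "ns-1", "coll-1", true)
	missing.Add(3, "ns-1", "coll-2", true)

	// With the old slice, finding tx 3 meant scanning the whole list and
	// matching on SeqInBlock; with the map it is a direct lookup.
	for _, entry := range missing[3] {
		fmt.Printf("tx 3 is missing [%s, %s] eligible=%v\n",
			entry.Namespace, entry.Collection, entry.IsEligible)
	}
}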

In BlockAndPvtData, we have the fields Block, BlockPvtData, and Missing.
These names are not very informative, and BlockPvtData in particular is
easily confused with the separate struct named BlockPvtData. Hence, we
rename BlockAndPvtData.BlockPvtData to BlockAndPvtData.PvtData and
BlockAndPvtData.Missing to BlockAndPvtData.MissingPvtData.

FAB-12895 #done

Change-Id: I60af0520fc028392703a38e46c5c15ac2707a139
Signed-off-by: senthil <cendhu@gmail.com>
parent e061a2d2
......@@ -321,8 +321,8 @@ func endTxSimulation(chainID string, ccid *pb.ChaincodeID, txsim ledger.TxSimula
defer _commitLock_.Unlock()
blockAndPvtData := &ledger.BlockAndPvtData{
Block: block,
BlockPvtData: make(map[uint64]*ledger.TxPvtData),
Block: block,
PvtData: make(ledger.TxPvtDataMap),
}
// All tests are performed with just one transaction in a block.
......@@ -336,7 +336,7 @@ func endTxSimulation(chainID string, ccid *pb.ChaincodeID, txsim ledger.TxSimula
if txSimulationResults.PvtSimulationResults != nil {
blockAndPvtData.BlockPvtData[seqInBlock] = &ledger.TxPvtData{
blockAndPvtData.PvtData[seqInBlock] = &ledger.TxPvtData{
SeqInBlock: seqInBlock,
WriteSet: txSimulationResults.PvtSimulationResults,
}
......
......@@ -70,17 +70,17 @@ func TestConstructValidInvalidBlocksPvtData(t *testing.T) {
}
// construct a missingData list for block1
missingData := &ledger.MissingPrivateDataList{}
missingData.Add("", 3, "ns-1", "coll-1", true)
missingData.Add("", 3, "ns-1", "coll-2", true)
missingData.Add("", 6, "ns-6", "coll-2", true)
missingData.Add("", 7, "ns-1", "coll-2", true)
missingData := make(ledger.TxMissingPvtDataMap)
missingData.Add(3, "ns-1", "coll-1", true)
missingData.Add(3, "ns-1", "coll-2", true)
missingData.Add(6, "ns-6", "coll-2", true)
missingData.Add(7, "ns-1", "coll-2", true)
// commit block1
blockAndPvtData1 := &ledger.BlockAndPvtData{
Block: blk1,
BlockPvtData: pvtDataBlk1,
Missing: missingData}
Block: blk1,
PvtData: pvtDataBlk1,
MissingPvtData: missingData}
assert.NoError(t, lg.(*kvLedger).blockStore.CommitWithPvtData(blockAndPvtData1))
// construct pvtData from missing data in tx3, tx6, and tx7
......
......@@ -171,17 +171,17 @@ func TestKVLedgerBlockStorageWithPvtdata(t *testing.T) {
pvtdataAndBlock, _ := ledger.GetPvtDataAndBlockByNum(0, nil)
assert.Equal(t, gb, pvtdataAndBlock.Block)
assert.Nil(t, pvtdataAndBlock.BlockPvtData)
assert.Nil(t, pvtdataAndBlock.PvtData)
pvtdataAndBlock, _ = ledger.GetPvtDataAndBlockByNum(1, nil)
assert.Equal(t, block1, pvtdataAndBlock.Block)
assert.NotNil(t, pvtdataAndBlock.BlockPvtData)
assert.True(t, pvtdataAndBlock.BlockPvtData[0].Has("ns1", "coll1"))
assert.True(t, pvtdataAndBlock.BlockPvtData[0].Has("ns1", "coll2"))
assert.NotNil(t, pvtdataAndBlock.PvtData)
assert.True(t, pvtdataAndBlock.PvtData[0].Has("ns1", "coll1"))
assert.True(t, pvtdataAndBlock.PvtData[0].Has("ns1", "coll2"))
pvtdataAndBlock, _ = ledger.GetPvtDataAndBlockByNum(2, nil)
assert.Equal(t, block2, pvtdataAndBlock.Block)
assert.Nil(t, pvtdataAndBlock.BlockPvtData)
assert.Nil(t, pvtdataAndBlock.PvtData)
}
func TestKVLedgerDBRecovery(t *testing.T) {
......@@ -497,7 +497,7 @@ func prepareNextBlockForTest(t *testing.T, l lgr.PeerLedger, bg *testutil.BlockG
pubSimBytes, _ := simRes.GetPubSimulationBytes()
block := bg.NextBlock([][]byte{pubSimBytes})
return &lgr.BlockAndPvtData{Block: block,
BlockPvtData: map[uint64]*lgr.TxPvtData{0: {SeqInBlock: 0, WriteSet: simRes.PvtSimulationResults}},
PvtData: lgr.TxPvtDataMap{0: {SeqInBlock: 0, WriteSet: simRes.PvtSimulationResults}},
}
}
......
......@@ -53,7 +53,8 @@ func (c *committer) copyOfBlockAndPvtdata(blk *ledger.BlockAndPvtData) *ledger.B
c.assert.NoError(err)
blkCopy := &common.Block{}
c.assert.NoError(proto.Unmarshal(blkBytes, blkCopy))
return &ledger.BlockAndPvtData{Block: blkCopy, BlockPvtData: blk.BlockPvtData}
return &ledger.BlockAndPvtData{Block: blkCopy, PvtData: blk.PvtData,
MissingPvtData: blk.MissingPvtData}
}
///////////////// block generation code ///////////////////////////////////////////
......@@ -90,5 +91,6 @@ func (g *blkGenerator) nextBlockAndPvtdata(trans ...*txAndPvtdata) *ledger.Block
g.lastNum++
g.lastHash = block.Header.Hash()
setBlockFlagsToValid(block)
return &ledger.BlockAndPvtData{Block: block, BlockPvtData: blockPvtdata}
return &ledger.BlockAndPvtData{Block: block, PvtData: blockPvtdata,
MissingPvtData: make(ledger.TxMissingPvtDataMap)}
}
......@@ -85,7 +85,7 @@ func (v *verifier) verifyMostRecentCollectionConfigBelow(blockNum uint64, chainc
func (v *verifier) verifyBlockAndPvtData(blockNum uint64, filter ledger.PvtNsCollFilter, verifyLogic func(r *retrievedBlockAndPvtdata)) {
out, err := v.lgr.GetPvtDataAndBlockByNum(blockNum, filter)
v.assert.NoError(err)
v.t.Logf("Retrieved Block = %s, pvtdata = %s", spew.Sdump(out.Block), spew.Sdump(out.BlockPvtData))
v.t.Logf("Retrieved Block = %s, pvtdata = %s", spew.Sdump(out.Block), spew.Sdump(out.PvtData))
verifyLogic(&retrievedBlockAndPvtdata{out, v.assert})
}
......@@ -121,7 +121,7 @@ type retrievedBlockAndPvtdata struct {
}
func (r *retrievedBlockAndPvtdata) sameAs(expectedBlockAndPvtdata *ledger.BlockAndPvtData) {
r.samePvtdata(expectedBlockAndPvtdata.BlockPvtData)
r.samePvtdata(expectedBlockAndPvtdata.PvtData)
r.sameBlockHeaderAndData(expectedBlockAndPvtdata.Block)
r.sameMetadata(expectedBlockAndPvtdata.Block)
}
......@@ -131,11 +131,11 @@ func (r *retrievedBlockAndPvtdata) hasNumTx(numTx int) {
}
func (r *retrievedBlockAndPvtdata) hasNoPvtdata() {
r.assert.Len(r.BlockPvtData, 0)
r.assert.Len(r.PvtData, 0)
}
func (r *retrievedBlockAndPvtdata) pvtdataShouldContain(txSeq int, ns, coll, key, value string) {
txPvtData := r.BlockAndPvtData.BlockPvtData[uint64(txSeq)]
txPvtData := r.BlockAndPvtData.PvtData[uint64(txSeq)]
for _, nsdata := range txPvtData.WriteSet.NsPvtRwset {
if nsdata.Namespace == ns {
for _, colldata := range nsdata.CollectionPvtRwset {
......@@ -156,7 +156,7 @@ func (r *retrievedBlockAndPvtdata) pvtdataShouldContain(txSeq int, ns, coll, key
}
func (r *retrievedBlockAndPvtdata) pvtdataShouldNotContain(ns, coll string) {
allTxPvtData := r.BlockAndPvtData.BlockPvtData
allTxPvtData := r.BlockAndPvtData.PvtData
for _, txPvtData := range allTxPvtData {
r.assert.False(txPvtData.Has(ns, coll))
}
......@@ -187,9 +187,9 @@ func (r *retrievedBlockAndPvtdata) containsValidationCode(txSeq int, validationC
}
func (r *retrievedBlockAndPvtdata) samePvtdata(expectedPvtdata map[uint64]*ledger.TxPvtData) {
r.assert.Equal(len(expectedPvtdata), len(r.BlockAndPvtData.BlockPvtData))
r.assert.Equal(len(expectedPvtdata), len(r.BlockAndPvtData.PvtData))
for txNum, pvtData := range expectedPvtdata {
actualPvtData := r.BlockAndPvtData.BlockPvtData[txNum]
actualPvtData := r.BlockAndPvtData.PvtData[txNum]
r.assert.Equal(pvtData.SeqInBlock, actualPvtData.SeqInBlock)
r.assert.True(proto.Equal(pvtData.WriteSet, actualPvtData.WriteSet))
}
......
......@@ -125,7 +125,7 @@ func newTxMgrTestHelper(t *testing.T, txMgr txmgr.TxMgr) *txMgrTestHelper {
func (h *txMgrTestHelper) validateAndCommitRWSet(txRWSet *rwset.TxReadWriteSet) {
rwSetBytes, _ := proto.Marshal(txRWSet)
block := h.bg.NextBlock([][]byte{rwSetBytes})
err := h.txMgr.ValidateAndPrepare(&ledger.BlockAndPvtData{Block: block, BlockPvtData: nil}, true)
err := h.txMgr.ValidateAndPrepare(&ledger.BlockAndPvtData{Block: block, PvtData: nil}, true)
assert.NoError(h.t, err)
txsFltr := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
invalidTxNum := 0
......@@ -142,7 +142,7 @@ func (h *txMgrTestHelper) validateAndCommitRWSet(txRWSet *rwset.TxReadWriteSet)
func (h *txMgrTestHelper) checkRWsetInvalid(txRWSet *rwset.TxReadWriteSet) {
rwSetBytes, _ := proto.Marshal(txRWSet)
block := h.bg.NextBlock([][]byte{rwSetBytes})
err := h.txMgr.ValidateAndPrepare(&ledger.BlockAndPvtData{Block: block, BlockPvtData: nil}, true)
err := h.txMgr.ValidateAndPrepare(&ledger.BlockAndPvtData{Block: block, PvtData: nil}, true)
assert.NoError(h.t, err)
txsFltr := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
invalidTxNum := 0
......
......@@ -1232,7 +1232,7 @@ func prepareNextBlockForTestFromSimulator(t *testing.T, bg *testutil.BlockGenera
pubSimBytes, _ := simRes.GetPubSimulationBytes()
block := bg.NextBlock([][]byte{pubSimBytes})
return &ledger.BlockAndPvtData{Block: block,
BlockPvtData: map[uint64]*ledger.TxPvtData{0: {SeqInBlock: 0, WriteSet: simRes.PvtSimulationResults}},
PvtData: ledger.TxPvtDataMap{0: {SeqInBlock: 0, WriteSet: simRes.PvtSimulationResults}},
}
}
......
......@@ -53,7 +53,7 @@ func (impl *DefaultImpl) ValidateAndPrepareBatch(blockAndPvtdata *ledger.BlockAn
return nil, err
}
logger.Debug("validating rwset...")
if pvtUpdates, err = validateAndPreparePvtBatch(internalBlock, impl.db, pubAndHashUpdates, blockAndPvtdata.BlockPvtData); err != nil {
if pvtUpdates, err = validateAndPreparePvtBatch(internalBlock, impl.db, pubAndHashUpdates, blockAndPvtdata.PvtData); err != nil {
return nil, err
}
logger.Debug("postprocessing ProtoBlock...")
......
......@@ -225,29 +225,29 @@ type TxPvtData struct {
WriteSet *rwset.TxPvtReadWriteSet
}
// MissingPrivateData represents a private RWSet
// that isn't present among the private data passed
// to the ledger at the commit of the corresponding block
type MissingPrivateData struct {
TxId string
SeqInBlock uint64
// TxPvtDataMap is a map from txNum to the pvtData
type TxPvtDataMap map[uint64]*TxPvtData
// MissingPvtData contains a namespace and collection for
// which the pvtData is not present. It also denotes
// whether the missing pvtData is eligible (i.e., whether
// the peer is a member of the [namespace, collection])
type MissingPvtData struct {
Namespace string
Collection string
IsEligible bool
}
// MissingPrivateDataList encapsulates a list of
// MissingPrivateData
type MissingPrivateDataList struct {
List []*MissingPrivateData
}
// TxMissingPvtDataMap is a map from txNum to the list of
// missing pvtData
type TxMissingPvtDataMap map[uint64][]*MissingPvtData
// BlockAndPvtData encapsulates the block and a map that contains the tuples <seqInBlock, *TxPvtData>
// The map is expected to contain the entries only for the transactions that have associated pvt data
type BlockAndPvtData struct {
Block *common.Block
BlockPvtData map[uint64]*TxPvtData
Missing *MissingPrivateDataList
Block *common.Block
PvtData TxPvtDataMap
MissingPvtData TxMissingPvtDataMap
}
// BlockPvtData contains the private data for a block
......@@ -257,8 +257,8 @@ type BlockPvtData struct {
}
// Add adds the given missing private data to the TxMissingPvtDataMap
func (missing *MissingPrivateDataList) Add(txId string, txNum uint64, ns, coll string, isEligible bool) {
missing.List = append(missing.List, &MissingPrivateData{txId, txNum, ns, coll, isEligible})
func (txMissingPvtData TxMissingPvtDataMap) Add(txNum uint64, ns, coll string, isEligible bool) {
txMissingPvtData[txNum] = append(txMissingPvtData[txNum], &MissingPvtData{ns, coll, isEligible})
}
// PvtCollFilter represents the set of the collection names (as keys of the map with value 'true')
......
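
For reference, a short sketch of how a caller would populate the renamed
fields, assuming the import paths used elsewhere in this tree; the helper
function itself is hypothetical and not part of this CR:

package example

import (
	"github.com/hyperledger/fabric/core/ledger"
	"github.com/hyperledger/fabric/protos/common"
)

// newBlockAndPvtData is a hypothetical helper showing the renamed fields.
func newBlockAndPvtData(blk *common.Block, pvt ledger.TxPvtDataMap) *ledger.BlockAndPvtData {
	missing := make(ledger.TxMissingPvtDataMap)
	missing.Add(3, "ns-1", "coll-1", true) // txNum 3 lacks eligible [ns-1, coll-1]

	return &ledger.BlockAndPvtData{
		Block:          blk,
		PvtData:        pvt,
		MissingPvtData: missing,
	}
}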
......@@ -21,7 +21,6 @@ import (
)
var logger = flogging.MustGetLogger("ledgerstorage")
var isMissingDataReconEnabled = false
// Provider encapsulates two providers: 1) block store provider and 2) pvt data store provider
type Provider struct {
......@@ -89,12 +88,7 @@ func (s *Store) Init(btlPolicy pvtdatapolicy.BTLPolicy) {
// CommitWithPvtData commits the block and the corresponding pvt data in an atomic operation
func (s *Store) CommitWithPvtData(blockAndPvtdata *ledger.BlockAndPvtData) error {
blockNum := blockAndPvtdata.Block.Header.Number
missingDataList := blockAndPvtdata.Missing
if !isMissingDataReconEnabled {
// should not store any entries for missing data
missingDataList = nil
}
missingPvtData := blockAndPvtdata.MissingPvtData
s.rwlock.Lock()
defer s.rwlock.Unlock()
......@@ -110,10 +104,10 @@ func (s *Store) CommitWithPvtData(blockAndPvtdata *ledger.BlockAndPvtData) error
// skip the pvt data commit to the pvtdata blockstore
logger.Debugf("Writing block [%d] to pvt block store", blockNum)
var pvtdata []*ledger.TxPvtData
for _, v := range blockAndPvtdata.BlockPvtData {
for _, v := range blockAndPvtdata.PvtData {
pvtdata = append(pvtdata, v)
}
if err := s.pvtdataStore.Prepare(blockAndPvtdata.Block.Header.Number, pvtdata, missingDataList); err != nil {
if err := s.pvtdataStore.Prepare(blockAndPvtdata.Block.Header.Number, pvtdata, missingPvtData); err != nil {
return err
}
writtenToPvtStore = true
......@@ -156,7 +150,7 @@ func (s *Store) GetPvtDataAndBlockByNum(blockNum uint64, filter ledger.PvtNsColl
if pvtdata, err = s.getPvtDataByNumWithoutLock(blockNum, filter); err != nil {
return nil, err
}
return &ledger.BlockAndPvtData{Block: block, BlockPvtData: constructPvtdataMap(pvtdata)}, nil
return &ledger.BlockAndPvtData{Block: block, PvtData: constructPvtdataMap(pvtdata)}, nil
}
// GetPvtDataByNum returns only the pvt data corresponding to the given block number
......
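
A small standalone sketch (with stand-in types) of the map-to-slice
flattening that CommitWithPvtData performs above; the txNum keys can
presumably be dropped because each TxPvtData already carries its
SeqInBlock, so the slice order does not matter:

package main

import "fmt"

// Stand-in for ledger.TxPvtData (WriteSet omitted).
type TxPvtData struct {
	SeqInBlock uint64
}

type TxPvtDataMap map[uint64]*TxPvtData

func main() {
	pvtDataMap := TxPvtDataMap{
		4: {SeqInBlock: 4},
		6: {SeqInBlock: 6},
	}

	// Flatten the map values into the slice expected by the pvtdata store.
	var pvtdata []*TxPvtData
	for _, v := range pvtDataMap {
		pvtdata = append(pvtdata, v)
	}
	fmt.Println(len(pvtdata)) // 2
}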
......@@ -72,12 +72,20 @@ func TestStore(t *testing.T) {
blockAndPvtdata, err := store.GetPvtDataAndBlockByNum(2, nil)
assert.NoError(t, err)
assert.Equal(t, sampleData[2].Missing, blockAndPvtdata.Missing)
assert.Equal(t, len(sampleData[2].MissingPvtData), len(blockAndPvtdata.MissingPvtData))
for txNum := range blockAndPvtdata.MissingPvtData {
assert.ElementsMatch(t, sampleData[2].MissingPvtData[txNum],
blockAndPvtdata.MissingPvtData[txNum])
}
assert.True(t, proto.Equal(sampleData[2].Block, blockAndPvtdata.Block))
blockAndPvtdata, err = store.GetPvtDataAndBlockByNum(3, nil)
assert.NoError(t, err)
assert.Equal(t, sampleData[3].Missing, blockAndPvtdata.Missing)
assert.Equal(t, len(sampleData[3].MissingPvtData), len(blockAndPvtdata.MissingPvtData))
for txNum := range blockAndPvtdata.MissingPvtData {
assert.ElementsMatch(t, sampleData[3].MissingPvtData[txNum],
blockAndPvtdata.MissingPvtData[txNum])
}
assert.True(t, proto.Equal(sampleData[3].Block, blockAndPvtdata.Block))
// pvt data retrieval for block 3 with filter should return filtered pvtdata
......@@ -87,12 +95,12 @@ func TestStore(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, sampleData[3].Block, blockAndPvtdata.Block)
// two transactions should be present
assert.Equal(t, 2, len(blockAndPvtdata.BlockPvtData))
assert.Equal(t, 2, len(blockAndPvtdata.PvtData))
// both tran number 4 and 6 should have only one collection because of filter
assert.Equal(t, 1, len(blockAndPvtdata.BlockPvtData[4].WriteSet.NsPvtRwset))
assert.Equal(t, 1, len(blockAndPvtdata.BlockPvtData[6].WriteSet.NsPvtRwset))
assert.Equal(t, 1, len(blockAndPvtdata.PvtData[4].WriteSet.NsPvtRwset))
assert.Equal(t, 1, len(blockAndPvtdata.PvtData[6].WriteSet.NsPvtRwset))
// any other transaction entry should be nil
assert.Nil(t, blockAndPvtdata.BlockPvtData[2])
assert.Nil(t, blockAndPvtdata.PvtData[2])
}
func TestStoreWithExistingBlockchain(t *testing.T) {
......@@ -142,7 +150,7 @@ func TestStoreWithExistingBlockchain(t *testing.T) {
// Add one more block with pvtdata associated with one of the transactions and commit in the normal course
pvtdata := samplePvtData(t, []uint64{0})
assert.NoError(t, store.CommitWithPvtData(&ledger.BlockAndPvtData{Block: blockToAdd, BlockPvtData: pvtdata}))
assert.NoError(t, store.CommitWithPvtData(&ledger.BlockAndPvtData{Block: blockToAdd, PvtData: pvtdata}))
pvtdataBlockHt, err = store.pvtdataStore.LastCommittedBlockHeight()
assert.NoError(t, err)
assert.Equal(t, uint64(10), pvtdataBlockHt)
......@@ -167,7 +175,7 @@ func TestCrashAfterPvtdataStorePreparation(t *testing.T) {
}
blokNumAtCrash := dataAtCrash.Block.Header.Number
var pvtdataAtCrash []*ledger.TxPvtData
for _, p := range dataAtCrash.BlockPvtData {
for _, p := range dataAtCrash.PvtData {
pvtdataAtCrash = append(pvtdataAtCrash, p)
}
// Only call Prepare on pvt data store and mimic a crash
......@@ -189,14 +197,14 @@ func TestCrashAfterPvtdataStorePreparation(t *testing.T) {
pvtdata, err := store.GetPvtDataByNum(blokNumAtCrash, nil)
assert.NoError(t, err)
constructed := constructPvtdataMap(pvtdata)
for k, v := range dataAtCrash.BlockPvtData {
for k, v := range dataAtCrash.PvtData {
ov, ok := constructed[k]
assert.True(t, ok)
assert.Equal(t, v.SeqInBlock, ov.SeqInBlock)
assert.True(t, proto.Equal(v.WriteSet, ov.WriteSet))
}
for k, v := range constructed {
ov, ok := dataAtCrash.BlockPvtData[k]
ov, ok := dataAtCrash.PvtData[k]
assert.True(t, ok)
assert.Equal(t, v.SeqInBlock, ov.SeqInBlock)
assert.True(t, proto.Equal(v.WriteSet, ov.WriteSet))
......@@ -222,7 +230,7 @@ func TestCrashBeforePvtdataStoreCommit(t *testing.T) {
}
blokNumAtCrash := dataAtCrash.Block.Header.Number
var pvtdataAtCrash []*ledger.TxPvtData
for _, p := range dataAtCrash.BlockPvtData {
for _, p := range dataAtCrash.PvtData {
pvtdataAtCrash = append(pvtdataAtCrash, p)
}
......@@ -238,7 +246,7 @@ func TestCrashBeforePvtdataStoreCommit(t *testing.T) {
store.Init(btlPolicyForSampleData())
blkAndPvtdata, err := store.GetPvtDataAndBlockByNum(blokNumAtCrash, nil)
assert.NoError(t, err)
assert.Equal(t, dataAtCrash.Missing, blkAndPvtdata.Missing)
assert.Equal(t, dataAtCrash.MissingPvtData, blkAndPvtdata.MissingPvtData)
assert.True(t, proto.Equal(dataAtCrash.Block, blkAndPvtdata.Block))
}
......@@ -319,9 +327,9 @@ func sampleDataWithPvtdataForSelectiveTx(t *testing.T) []*ledger.BlockAndPvtData
blockAndpvtdata = append(blockAndpvtdata, &ledger.BlockAndPvtData{Block: blocks[i]})
}
// txNum 3, 5 in block 2 has pvtdata
blockAndpvtdata[2].BlockPvtData = samplePvtData(t, []uint64{3, 5})
blockAndpvtdata[2].PvtData = samplePvtData(t, []uint64{3, 5})
// txNum 4, 6 in block 3 has pvtdata
blockAndpvtdata[3].BlockPvtData = samplePvtData(t, []uint64{4, 6})
blockAndpvtdata[3].PvtData = samplePvtData(t, []uint64{4, 6})
return blockAndpvtdata
}
......@@ -331,8 +339,8 @@ func sampleDataWithPvtdataForAllTxs(t *testing.T) []*ledger.BlockAndPvtData {
for i := 0; i < 10; i++ {
blockAndpvtdata = append(blockAndpvtdata,
&ledger.BlockAndPvtData{
Block: blocks[i],
BlockPvtData: samplePvtData(t, []uint64{uint64(i), uint64(i + 1)}),
Block: blocks[i],
PvtData: samplePvtData(t, []uint64{uint64(i), uint64(i + 1)}),
},
)
}
......
......@@ -15,10 +15,11 @@ import (
"github.com/willf/bitset"
)
func prepareStoreEntries(blockNum uint64, pvtdata []*ledger.TxPvtData, btlPolicy pvtdatapolicy.BTLPolicy, missingData *ledger.MissingPrivateDataList) (*storeEntries, error) {
dataEntries := prepareDataEntries(blockNum, pvtdata)
func prepareStoreEntries(blockNum uint64, pvtData []*ledger.TxPvtData, btlPolicy pvtdatapolicy.BTLPolicy,
missingPvtData ledger.TxMissingPvtDataMap) (*storeEntries, error) {
dataEntries := prepareDataEntries(blockNum, pvtData)
missingDataEntries := prepareMissingDataEntries(blockNum, missingData)
missingDataEntries := prepareMissingDataEntries(blockNum, missingPvtData)
expiryEntries, err := prepareExpiryEntries(blockNum, dataEntries, missingDataEntries, btlPolicy)
if err != nil {
......@@ -47,23 +48,21 @@ func prepareDataEntries(blockNum uint64, pvtData []*ledger.TxPvtData) []*dataEnt
return dataEntries
}
func prepareMissingDataEntries(committingBlk uint64, missingData *ledger.MissingPrivateDataList) map[missingDataKey]*bitset.BitSet {
if missingData == nil {
return nil
}
func prepareMissingDataEntries(committingBlk uint64, missingPvtData ledger.TxMissingPvtDataMap) map[missingDataKey]*bitset.BitSet {
missingDataEntries := make(map[missingDataKey]*bitset.BitSet)
for _, missing := range missingData.List {
key := missingDataKey{nsCollBlk{missing.Namespace, missing.Collection, committingBlk},
missing.IsEligible}
for txNum, missingData := range missingPvtData {
for _, nsColl := range missingData {
key := missingDataKey{nsCollBlk{nsColl.Namespace, nsColl.Collection, committingBlk},
nsColl.IsEligible}
if _, ok := missingDataEntries[key]; !ok {
missingDataEntries[key] = &bitset.BitSet{}
}
bitmap := missingDataEntries[key]
if _, ok := missingDataEntries[key]; !ok {
missingDataEntries[key] = &bitset.BitSet{}
}
bitmap := missingDataEntries[key]
bitmap.Set(uint(missing.SeqInBlock))
bitmap.Set(uint(txNum))
}
}
return missingDataEntries
......
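
A self-contained sketch (with a simplified stand-in key type) of the
grouping that prepareMissingDataEntries performs above: one bitset of tx
numbers per (namespace, collection, eligibility) key of the committing
block, using the same github.com/willf/bitset package imported in this
file:

package main

import (
	"fmt"

	"github.com/willf/bitset"
)

// nsColl is a simplified stand-in for both the MissingPvtData entry and
// the missingDataKey used in the real code.
type nsColl struct {
	ns, coll   string
	isEligible bool
}

func main() {
	// txNum -> missing [namespace, collection] entries, as in TxMissingPvtDataMap.
	missing := map[uint64][]nsColl{
		1: {{"ns-1", "coll-1", true}},
		3: {{"ns-1", "coll-1", true}, {"ns-2", "coll-1", false}},
	}

	entries := make(map[nsColl]*bitset.BitSet)
	for txNum, nsColls := range missing {
		for _, key := range nsColls {
			if _, ok := entries[key]; !ok {
				entries[key] = &bitset.BitSet{}
			}
			// Mark txNum as missing this collection's pvt data.
			entries[key].Set(uint(txNum))
		}
	}

	for key, bitmap := range entries {
		fmt.Printf("%+v -> %s\n", key, bitmap.String())
	}
}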
......@@ -55,7 +55,7 @@ type Store interface {
// that enough preparation is done such that `Commit` function invoked afterwards can commit the
// data and the store is capable of surviving a crash between this function call and the next
// invoke to the `Commit`
Prepare(blockNum uint64, pvtData []*ledger.TxPvtData, missing *ledger.MissingPrivateDataList) error
Prepare(blockNum uint64, pvtData []*ledger.TxPvtData, missingPvtData ledger.TxMissingPvtDataMap) error
// Commit commits the pvt data passed in the previous invoke to the `Prepare` function
Commit() error
// Rollback rolls back the pvt data passed in the previous invoke to the `Prepare` function
......
......@@ -162,7 +162,7 @@ func (s *store) Init(btlPolicy pvtdatapolicy.BTLPolicy) {
}
// Prepare implements the function in the interface `Store`
func (s *store) Prepare(blockNum uint64, pvtData []*ledger.TxPvtData, missingData *ledger.MissingPrivateDataList) error {
func (s *store) Prepare(blockNum uint64, pvtData []*ledger.TxPvtData, missingPvtData ledger.TxMissingPvtDataMap) error {
if s.batchPending {
return &ErrIllegalCall{`A pending batch exists as a result of the last invoke to "Prepare" call.
Invoke "Commit" or "Rollback" on the pending batch before invoking "Prepare" function`}
......@@ -176,7 +176,7 @@ func (s *store) Prepare(blockNum uint64, pvtData []*ledger.TxPvtData, missingDat
var err error
var keyBytes, valBytes []byte
storeEntries, err := prepareStoreEntries(blockNum, pvtData, s.btlPolicy, missingData)
storeEntries, err := prepareStoreEntries(blockNum, pvtData, s.btlPolicy, missingPvtData)
if err != nil {
return err
}
......
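
A hedged sketch of the caller-side sequence implied by the updated
interface and implementation above, assuming the core/ledger/pvtdatastorage
package path used in this tree; the wrapper function is hypothetical and
error handling is trimmed for brevity:

package example

import (
	"github.com/hyperledger/fabric/core/ledger"
	"github.com/hyperledger/fabric/core/ledger/pvtdatastorage"
)

// commitBlockPvtData is a hypothetical helper illustrating the updated
// Prepare signature: missing pvt data now travels as a TxMissingPvtDataMap.
func commitBlockPvtData(store pvtdatastorage.Store, blockNum uint64, pvtData []*ledger.TxPvtData) error {
	missingPvtData := make(ledger.TxMissingPvtDataMap)
	missingPvtData.Add(1, "ns-1", "coll-1", true) // txNum 1 lacks eligible [ns-1, coll-1]

	// Prepare stages the batch; Commit makes it durable (Rollback would discard it).
	if err := store.Prepare(blockNum, pvtData, missingPvtData); err != nil {
		return err
	}
	return store.Commit()
}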
......@@ -61,26 +61,26 @@ func TestStoreBasicCommitAndRetrieval(t *testing.T) {
}
// construct missing data for block 1
blk1MissingData := &ledger.MissingPrivateDataList{}
blk1MissingData := make(ledger.TxMissingPvtDataMap)
// eligible missing data in tx1
blk1MissingData.Add("tx1", 1, "ns-1", "coll-1", true)
blk1MissingData.Add("tx1", 1, "ns-1", "coll-2", true)
blk1MissingData.Add("tx1", 1, "ns-2", "coll-1", true)
blk1MissingData.Add("tx1", 1, "ns-2", "coll-2", true)
blk1MissingData.Add(1, "ns-1", "coll-1", true)
blk1MissingData.Add(1, "ns-1", "coll-2", true)
blk1MissingData.Add(1, "ns-2", "coll-1", true)
blk1MissingData.Add(1, "ns-2", "coll-2", true)
// eligible missing data in tx2
blk1MissingData.Add("tx2", 2, "ns-3", "coll-1", true)
blk1MissingData.Add(2, "ns-3", "coll-1", true)
// ineligible missing data in tx4
blk1MissingData.Add("tx4", 4, "ns-4", "coll-1", false)
blk1MissingData.Add("tx4", 4, "ns-4", "coll-2", false)
blk1MissingData.Add(4, "ns-4", "coll-1", false)
blk1MissingData.Add(4, "ns-4", "coll-2", false)
// construct missing data for block 2
blk2MissingData := &ledger.MissingPrivateDataList{}
blk2MissingData := make(ledger.TxMissingPvtDataMap)
// eligible missing data in tx1
blk2MissingData.Add("tx1", 1, "ns-1", "coll-1", true)
blk2MissingData.Add("tx1", 1, "ns-1", "coll-2", true)
blk2MissingData.Add(1, "ns-1", "coll-1", true)
blk2MissingData.Add(1, "ns-1", "coll-2", true)
// eligible missing data in tx3
blk2MissingData.Add("tx3", 3, "ns-1", "coll-1", true)
blk2MissingData.Add(3, "ns-1", "coll-1", true)
// no pvt data with block 0
assert.NoError(store.Prepare(0, nil, nil))
......@@ -189,26 +189,26 @@ func TestCommitPvtDataOfOldBlocks(t *testing.T) {
}
// CONSTRUCT MISSING DATA FOR BLOCK 1
blk1MissingData := &ledger.MissingPrivateDataList{}
blk1MissingData := make(ledger.TxMissingPvtDataMap)
// eligible missing data in tx1
blk1MissingData.Add("tx1", 1, "ns-1", "coll-1", true)
blk1MissingData.Add("tx1", 1, "ns-1", "coll-2", true)
blk1MissingData.Add("tx1", 1, "ns-2", "coll-1", true)
blk1MissingData.Add("tx1", 1, "ns-2", "coll-2", true)
blk1MissingData.Add(1, "ns-1", "coll-1", true)
blk1MissingData.Add(1, "ns-1", "coll-2", true)
blk1MissingData.Add(1, "ns-2", "coll-1", true)
blk1MissingData.Add(1, "ns-2", "coll-2", true)
// eligible missing data in tx2
blk1MissingData.Add("tx2", 2, "ns-1", "coll-1", true)
blk1MissingData.Add("tx2", 2, "ns-1", "coll-2", true)
blk1MissingData.Add("tx2", 2, "ns-3", "coll-1", true)
blk1MissingData.Add("tx2", 2, "ns-3", "coll-2", true)
blk1MissingData.Add(2, "ns-1", "coll-1", true)
blk1MissingData.Add(2, "ns-1", "coll-2", true)
blk1MissingData.Add(2, "ns-3", "coll-1", true)
blk1MissingData.Add(2, "ns-3", "coll-2", true)
// CONSTRUCT MISSING DATA FOR BLOCK 2
blk2MissingData := &ledger.MissingPrivateDataList{}
blk2MissingData := make(ledger.TxMissingPvtDataMap)
// eligible missing data in tx1
blk2MissingData.Add("tx1", 1, "ns-1", "coll-1", true)
blk2MissingData.Add("tx1", 1, "ns-1", "coll-2", true)
blk2MissingData.Add(1, "ns-1", "coll-1", true)
blk2MissingData.Add(1, "ns-1", "coll-2", true)
// eligible missing data in tx3
blk2MissingData.Add("tx3", 3, "ns-1", "coll-1", true)
blk2MissingData.Add(3, "ns-1", "coll-1", true)
// COMMIT BLOCK 0 WITH NO DATA
assert.NoError(store.Prepare(0, nil, nil))
......@@ -391,19 +391,19 @@ func TestExpiryDataNotIncluded(t *testing.T) {
store := env.TestStore
// construct missing data for block 1
blk1MissingData := &ledger.MissingPrivateDataList{}
blk1MissingData := make(ledger.TxMissingPvtDataMap)
// eligible missing data in tx1
blk1MissingData.Add("tx1", 1, "ns-1", "coll-1", true)
blk1MissingData.Add("tx1", 1, "ns-1", "coll-2", true)
blk1MissingData.Add(1, "ns-1", "coll-1", true)
blk1MissingData.Add(1, "ns-1", "coll-2", true)
// ineligible missing data in tx4
blk1MissingData.Add("tx4", 4, "ns-3", "coll-1", false)
blk1MissingData.Add("tx4", 4, "ns-3", "coll-2", false)
blk1MissingData.