Commit 7105f8bb authored by senthil's avatar senthil Committed by Senthil Nathan N
Browse files

recon: commit only validTx's pvtData in pvtStore



Currently, gossip constructs pvtData only for validTx
as per VSCC. Hence, we are not storing all pvtData of
a block.

In the ledger, currently, we are storing pvtData of
invalidTx (i.e., failed MVCC checks) too. To be
consistent, this CR can stores the pvtData of only
validTx (as per MVCC check).

FAB-11805 #done

Change-Id: Ic4b228cade0456bc1ddc4ab80595e9e82d7aaa25
Signed-off-by: default avatarsenthil <cendhu@gmail.com>
parent 8c966365
...@@ -44,7 +44,7 @@ func (bg *BlockGenerator) NextBlock(simulationResults [][]byte) *common.Block { ...@@ -44,7 +44,7 @@ func (bg *BlockGenerator) NextBlock(simulationResults [][]byte) *common.Block {
return block return block
} }
// NextBlock constructs next block in sequence that includes a number of transactions - one per simulationResults // NextBlockWithTxid constructs next block in sequence that includes a number of transactions - one per simulationResults
func (bg *BlockGenerator) NextBlockWithTxid(simulationResults [][]byte, txids []string) *common.Block { func (bg *BlockGenerator) NextBlockWithTxid(simulationResults [][]byte, txids []string) *common.Block {
// Length of simulationResults should be same as the length of txids. // Length of simulationResults should be same as the length of txids.
if len(simulationResults) != len(txids) { if len(simulationResults) != len(txids) {
...@@ -68,8 +68,11 @@ func (bg *BlockGenerator) NextTestBlock(numTx int, txSize int) *common.Block { ...@@ -68,8 +68,11 @@ func (bg *BlockGenerator) NextTestBlock(numTx int, txSize int) *common.Block {
// NextTestBlocks constructs 'numBlocks' number of blocks for testing // NextTestBlocks constructs 'numBlocks' number of blocks for testing
func (bg *BlockGenerator) NextTestBlocks(numBlocks int) []*common.Block { func (bg *BlockGenerator) NextTestBlocks(numBlocks int) []*common.Block {
blocks := []*common.Block{} blocks := []*common.Block{}
numTx := 10
for i := 0; i < numBlocks; i++ { for i := 0; i < numBlocks; i++ {
blocks = append(blocks, bg.NextTestBlock(10, 100)) block := bg.NextTestBlock(numTx, 100)
block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = lutils.NewTxValidationFlagsSetValue(numTx, pb.TxValidationCode_VALID)
blocks = append(blocks, block)
} }
return blocks return blocks
} }
......
...@@ -16,6 +16,7 @@ import ( ...@@ -16,6 +16,7 @@ import (
"github.com/hyperledger/fabric/core/ledger/ledgerconfig" "github.com/hyperledger/fabric/core/ledger/ledgerconfig"
"github.com/hyperledger/fabric/core/ledger/pvtdatapolicy" "github.com/hyperledger/fabric/core/ledger/pvtdatapolicy"
"github.com/hyperledger/fabric/core/ledger/pvtdatastorage" "github.com/hyperledger/fabric/core/ledger/pvtdatastorage"
lutil "github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/protos/common" "github.com/hyperledger/fabric/protos/common"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
...@@ -88,8 +89,6 @@ func (s *Store) Init(btlPolicy pvtdatapolicy.BTLPolicy) { ...@@ -88,8 +89,6 @@ func (s *Store) Init(btlPolicy pvtdatapolicy.BTLPolicy) {
// CommitWithPvtData commits the block and the corresponding pvt data in an atomic operation // CommitWithPvtData commits the block and the corresponding pvt data in an atomic operation
func (s *Store) CommitWithPvtData(blockAndPvtdata *ledger.BlockAndPvtData) error { func (s *Store) CommitWithPvtData(blockAndPvtdata *ledger.BlockAndPvtData) error {
blockNum := blockAndPvtdata.Block.Header.Number blockNum := blockAndPvtdata.Block.Header.Number
missingPvtData := blockAndPvtdata.MissingPvtData
s.rwlock.Lock() s.rwlock.Lock()
defer s.rwlock.Unlock() defer s.rwlock.Unlock()
...@@ -103,11 +102,10 @@ func (s *Store) CommitWithPvtData(blockAndPvtdata *ledger.BlockAndPvtData) error ...@@ -103,11 +102,10 @@ func (s *Store) CommitWithPvtData(blockAndPvtdata *ledger.BlockAndPvtData) error
// when re-processing blocks (rejoin the channel or re-fetching last few block), // when re-processing blocks (rejoin the channel or re-fetching last few block),
// skip the pvt data commit to the pvtdata blockstore // skip the pvt data commit to the pvtdata blockstore
logger.Debugf("Writing block [%d] to pvt block store", blockNum) logger.Debugf("Writing block [%d] to pvt block store", blockNum)
var pvtdata []*ledger.TxPvtData // as the ledger has already validated all txs in this block, we need to
for _, v := range blockAndPvtdata.PvtData { // use the validated info to commit only the pvtData of valid tx
pvtdata = append(pvtdata, v) validTxPvtData, validTxMissingPvtData := constructValidTxPvtDataAndMissingData(blockAndPvtdata)
} if err := s.pvtdataStore.Prepare(blockAndPvtdata.Block.Header.Number, validTxPvtData, validTxMissingPvtData); err != nil {
if err := s.pvtdataStore.Prepare(blockAndPvtdata.Block.Header.Number, pvtdata, missingPvtData); err != nil {
return err return err
} }
writtenToPvtStore = true writtenToPvtStore = true
...@@ -126,6 +124,35 @@ func (s *Store) CommitWithPvtData(blockAndPvtdata *ledger.BlockAndPvtData) error ...@@ -126,6 +124,35 @@ func (s *Store) CommitWithPvtData(blockAndPvtdata *ledger.BlockAndPvtData) error
return nil return nil
} }
func constructValidTxPvtDataAndMissingData(blockAndPvtData *ledger.BlockAndPvtData) ([]*ledger.TxPvtData,
ledger.TxMissingPvtDataMap) {
var validTxPvtData []*ledger.TxPvtData
validTxMissingPvtData := make(ledger.TxMissingPvtDataMap)
txsFilter := lutil.TxValidationFlags(blockAndPvtData.Block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
numTxs := uint64(len(blockAndPvtData.Block.Data.Data))
// for all valid tx, construct pvtdata and missing pvtdata list
for txNum := uint64(0); txNum < numTxs; txNum++ {
if txsFilter.IsInvalid(int(txNum)) {
continue
}
if pvtdata, ok := blockAndPvtData.PvtData[txNum]; ok {
validTxPvtData = append(validTxPvtData, pvtdata)
}
if missingPvtData, ok := blockAndPvtData.MissingPvtData[txNum]; ok {
for _, missing := range missingPvtData {
validTxMissingPvtData.Add(txNum, missing.Namespace,
missing.Collection, missing.IsEligible)
}
}
}
return validTxPvtData, validTxMissingPvtData
}
// CommitPvtDataOfOldBlocks commits the pvtData of old blocks // CommitPvtDataOfOldBlocks commits the pvtData of old blocks
func (s *Store) CommitPvtDataOfOldBlocks(blocksPvtData map[uint64][]*ledger.TxPvtData) error { func (s *Store) CommitPvtDataOfOldBlocks(blocksPvtData map[uint64][]*ledger.TxPvtData) error {
err := s.pvtdataStore.CommitPvtDataOfOldBlocks(blocksPvtData) err := s.pvtdataStore.CommitPvtDataOfOldBlocks(blocksPvtData)
......
...@@ -20,7 +20,10 @@ import ( ...@@ -20,7 +20,10 @@ import (
"github.com/hyperledger/fabric/core/ledger/pvtdatapolicy" "github.com/hyperledger/fabric/core/ledger/pvtdatapolicy"
btltestutil "github.com/hyperledger/fabric/core/ledger/pvtdatapolicy/testutil" btltestutil "github.com/hyperledger/fabric/core/ledger/pvtdatapolicy/testutil"
"github.com/hyperledger/fabric/core/ledger/pvtdatastorage" "github.com/hyperledger/fabric/core/ledger/pvtdatastorage"
lutil "github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/ledger/rwset" "github.com/hyperledger/fabric/protos/ledger/rwset"
pb "github.com/hyperledger/fabric/protos/peer"
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
...@@ -56,7 +59,9 @@ func TestStore(t *testing.T) { ...@@ -56,7 +59,9 @@ func TestStore(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
assert.Nil(t, pvtdata) assert.Nil(t, pvtdata)
// block 2 has pvt data for tx 3 and 5 only // block 2 has pvt data for tx 3, 5 and 6. However, tx 6
// is marked as invalid in the block and hence should not
// have been stored
pvtdata, err = store.GetPvtDataByNum(2, nil) pvtdata, err = store.GetPvtDataByNum(2, nil)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, 2, len(pvtdata)) assert.Equal(t, 2, len(pvtdata))
...@@ -72,20 +77,10 @@ func TestStore(t *testing.T) { ...@@ -72,20 +77,10 @@ func TestStore(t *testing.T) {
blockAndPvtdata, err := store.GetPvtDataAndBlockByNum(2, nil) blockAndPvtdata, err := store.GetPvtDataAndBlockByNum(2, nil)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, len(sampleData[2].MissingPvtData), len(blockAndPvtdata.MissingPvtData))
for txNum := range blockAndPvtdata.MissingPvtData {
assert.ElementsMatch(t, sampleData[2].MissingPvtData[txNum],
blockAndPvtdata.MissingPvtData)
}
assert.True(t, proto.Equal(sampleData[2].Block, blockAndPvtdata.Block)) assert.True(t, proto.Equal(sampleData[2].Block, blockAndPvtdata.Block))
blockAndPvtdata, err = store.GetPvtDataAndBlockByNum(3, nil) blockAndPvtdata, err = store.GetPvtDataAndBlockByNum(3, nil)
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, len(sampleData[2].MissingPvtData), len(blockAndPvtdata.MissingPvtData))
for txNum := range blockAndPvtdata.MissingPvtData {
assert.ElementsMatch(t, sampleData[2].MissingPvtData[txNum],
blockAndPvtdata.MissingPvtData)
}
assert.True(t, proto.Equal(sampleData[3].Block, blockAndPvtdata.Block)) assert.True(t, proto.Equal(sampleData[3].Block, blockAndPvtdata.Block))
// pvt data retrieval for block 3 with filter should return filtered pvtdata // pvt data retrieval for block 3 with filter should return filtered pvtdata
...@@ -101,6 +96,15 @@ func TestStore(t *testing.T) { ...@@ -101,6 +96,15 @@ func TestStore(t *testing.T) {
assert.Equal(t, 1, len(blockAndPvtdata.PvtData[6].WriteSet.NsPvtRwset)) assert.Equal(t, 1, len(blockAndPvtdata.PvtData[6].WriteSet.NsPvtRwset))
// any other transaction entry should be nil // any other transaction entry should be nil
assert.Nil(t, blockAndPvtdata.PvtData[2]) assert.Nil(t, blockAndPvtdata.PvtData[2])
// test missing data retrieval in the presence of invalid tx. Block 5 had
// missing data (for tx4 and tx5). However, tx5 was marked as invalid tx.
// Hence, only tx4's missing data should be returned
expectedMissingDataInfo := make(ledger.MissingPvtDataInfo)
expectedMissingDataInfo.Add(5, 4, "ns-4", "coll-4")
missingDataInfo, err := store.GetMissingPvtDataInfoForMostRecentBlocks(1)
assert.NoError(t, err)
assert.Equal(t, expectedMissingDataInfo, missingDataInfo)
} }
func TestStoreWithExistingBlockchain(t *testing.T) { func TestStoreWithExistingBlockchain(t *testing.T) {
...@@ -326,10 +330,25 @@ func sampleDataWithPvtdataForSelectiveTx(t *testing.T) []*ledger.BlockAndPvtData ...@@ -326,10 +330,25 @@ func sampleDataWithPvtdataForSelectiveTx(t *testing.T) []*ledger.BlockAndPvtData
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
blockAndpvtdata = append(blockAndpvtdata, &ledger.BlockAndPvtData{Block: blocks[i]}) blockAndpvtdata = append(blockAndpvtdata, &ledger.BlockAndPvtData{Block: blocks[i]})
} }
// txNum 3, 5 in block 2 has pvtdata
blockAndpvtdata[2].PvtData = samplePvtData(t, []uint64{3, 5}) // txNum 3, 5, 6 in block 2 has pvtdata but txNum 6 is invalid
blockAndpvtdata[2].PvtData = samplePvtData(t, []uint64{3, 5, 6})
txFilter := lutil.TxValidationFlags(blockAndpvtdata[2].Block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
txFilter.SetFlag(6, pb.TxValidationCode_INVALID_WRITESET)
blockAndpvtdata[2].Block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txFilter
// txNum 4, 6 in block 3 has pvtdata // txNum 4, 6 in block 3 has pvtdata
blockAndpvtdata[3].PvtData = samplePvtData(t, []uint64{4, 6}) blockAndpvtdata[3].PvtData = samplePvtData(t, []uint64{4, 6})
// txNum 4, 5 in block 5 has missing pvt data but txNum 5 is invalid
missingData := make(ledger.TxMissingPvtDataMap)
missingData.Add(4, "ns-4", "coll-4", true)
missingData.Add(5, "ns-5", "coll-5", true)
blockAndpvtdata[5].MissingPvtData = missingData
txFilter = lutil.TxValidationFlags(blockAndpvtdata[5].Block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
txFilter.SetFlag(5, pb.TxValidationCode_INVALID_WRITESET)
blockAndpvtdata[5].Block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txFilter
return blockAndpvtdata return blockAndpvtdata
} }
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment