Commit f3a84171 authored by Manish Sethi's avatar Manish Sethi Committed by Gerrit Code Review
Browse files

Merge "recon: commit only validTx's pvtData in pvtStore"

parents 61656ce8 7105f8bb
......@@ -44,7 +44,7 @@ func (bg *BlockGenerator) NextBlock(simulationResults [][]byte) *common.Block {
return block
}
// NextBlock constructs next block in sequence that includes a number of transactions - one per simulationResults
// NextBlockWithTxid constructs next block in sequence that includes a number of transactions - one per simulationResults
func (bg *BlockGenerator) NextBlockWithTxid(simulationResults [][]byte, txids []string) *common.Block {
// Length of simulationResults should be same as the length of txids.
if len(simulationResults) != len(txids) {
......@@ -68,8 +68,11 @@ func (bg *BlockGenerator) NextTestBlock(numTx int, txSize int) *common.Block {
// NextTestBlocks constructs 'numBlocks' number of blocks for testing
func (bg *BlockGenerator) NextTestBlocks(numBlocks int) []*common.Block {
blocks := []*common.Block{}
numTx := 10
for i := 0; i < numBlocks; i++ {
blocks = append(blocks, bg.NextTestBlock(10, 100))
block := bg.NextTestBlock(numTx, 100)
block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = lutils.NewTxValidationFlagsSetValue(numTx, pb.TxValidationCode_VALID)
blocks = append(blocks, block)
}
return blocks
}
......
......@@ -16,6 +16,7 @@ import (
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
"github.com/hyperledger/fabric/core/ledger/pvtdatapolicy"
"github.com/hyperledger/fabric/core/ledger/pvtdatastorage"
lutil "github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/protos/common"
"github.com/pkg/errors"
)
......@@ -88,8 +89,6 @@ func (s *Store) Init(btlPolicy pvtdatapolicy.BTLPolicy) {
// CommitWithPvtData commits the block and the corresponding pvt data in an atomic operation
func (s *Store) CommitWithPvtData(blockAndPvtdata *ledger.BlockAndPvtData) error {
blockNum := blockAndPvtdata.Block.Header.Number
missingPvtData := blockAndPvtdata.MissingPvtData
s.rwlock.Lock()
defer s.rwlock.Unlock()
......@@ -103,11 +102,10 @@ func (s *Store) CommitWithPvtData(blockAndPvtdata *ledger.BlockAndPvtData) error
// when re-processing blocks (rejoin the channel or re-fetching last few block),
// skip the pvt data commit to the pvtdata blockstore
logger.Debugf("Writing block [%d] to pvt block store", blockNum)
var pvtdata []*ledger.TxPvtData
for _, v := range blockAndPvtdata.PvtData {
pvtdata = append(pvtdata, v)
}
if err := s.pvtdataStore.Prepare(blockAndPvtdata.Block.Header.Number, pvtdata, missingPvtData); err != nil {
// as the ledger has already validated all txs in this block, we need to
// use the validated info to commit only the pvtData of valid tx
validTxPvtData, validTxMissingPvtData := constructValidTxPvtDataAndMissingData(blockAndPvtdata)
if err := s.pvtdataStore.Prepare(blockAndPvtdata.Block.Header.Number, validTxPvtData, validTxMissingPvtData); err != nil {
return err
}
writtenToPvtStore = true
......@@ -126,6 +124,35 @@ func (s *Store) CommitWithPvtData(blockAndPvtdata *ledger.BlockAndPvtData) error
return nil
}
func constructValidTxPvtDataAndMissingData(blockAndPvtData *ledger.BlockAndPvtData) ([]*ledger.TxPvtData,
ledger.TxMissingPvtDataMap) {
var validTxPvtData []*ledger.TxPvtData
validTxMissingPvtData := make(ledger.TxMissingPvtDataMap)
txsFilter := lutil.TxValidationFlags(blockAndPvtData.Block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
numTxs := uint64(len(blockAndPvtData.Block.Data.Data))
// for all valid tx, construct pvtdata and missing pvtdata list
for txNum := uint64(0); txNum < numTxs; txNum++ {
if txsFilter.IsInvalid(int(txNum)) {
continue
}
if pvtdata, ok := blockAndPvtData.PvtData[txNum]; ok {
validTxPvtData = append(validTxPvtData, pvtdata)
}
if missingPvtData, ok := blockAndPvtData.MissingPvtData[txNum]; ok {
for _, missing := range missingPvtData {
validTxMissingPvtData.Add(txNum, missing.Namespace,
missing.Collection, missing.IsEligible)
}
}
}
return validTxPvtData, validTxMissingPvtData
}
// CommitPvtDataOfOldBlocks commits the pvtData of old blocks
func (s *Store) CommitPvtDataOfOldBlocks(blocksPvtData map[uint64][]*ledger.TxPvtData) error {
err := s.pvtdataStore.CommitPvtDataOfOldBlocks(blocksPvtData)
......
......@@ -20,7 +20,10 @@ import (
"github.com/hyperledger/fabric/core/ledger/pvtdatapolicy"
btltestutil "github.com/hyperledger/fabric/core/ledger/pvtdatapolicy/testutil"
"github.com/hyperledger/fabric/core/ledger/pvtdatastorage"
lutil "github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/ledger/rwset"
pb "github.com/hyperledger/fabric/protos/peer"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
)
......@@ -56,7 +59,9 @@ func TestStore(t *testing.T) {
assert.NoError(t, err)
assert.Nil(t, pvtdata)
// block 2 has pvt data for tx 3 and 5 only
// block 2 has pvt data for tx 3, 5 and 6. However, tx 6
// is marked as invalid in the block and hence should not
// have been stored
pvtdata, err = store.GetPvtDataByNum(2, nil)
assert.NoError(t, err)
assert.Equal(t, 2, len(pvtdata))
......@@ -72,20 +77,10 @@ func TestStore(t *testing.T) {
blockAndPvtdata, err := store.GetPvtDataAndBlockByNum(2, nil)
assert.NoError(t, err)
assert.Equal(t, len(sampleData[2].MissingPvtData), len(blockAndPvtdata.MissingPvtData))
for txNum := range blockAndPvtdata.MissingPvtData {
assert.ElementsMatch(t, sampleData[2].MissingPvtData[txNum],
blockAndPvtdata.MissingPvtData)
}
assert.True(t, proto.Equal(sampleData[2].Block, blockAndPvtdata.Block))
blockAndPvtdata, err = store.GetPvtDataAndBlockByNum(3, nil)
assert.NoError(t, err)
assert.Equal(t, len(sampleData[2].MissingPvtData), len(blockAndPvtdata.MissingPvtData))
for txNum := range blockAndPvtdata.MissingPvtData {
assert.ElementsMatch(t, sampleData[2].MissingPvtData[txNum],
blockAndPvtdata.MissingPvtData)
}
assert.True(t, proto.Equal(sampleData[3].Block, blockAndPvtdata.Block))
// pvt data retrieval for block 3 with filter should return filtered pvtdata
......@@ -101,6 +96,15 @@ func TestStore(t *testing.T) {
assert.Equal(t, 1, len(blockAndPvtdata.PvtData[6].WriteSet.NsPvtRwset))
// any other transaction entry should be nil
assert.Nil(t, blockAndPvtdata.PvtData[2])
// test missing data retrieval in the presence of invalid tx. Block 5 had
// missing data (for tx4 and tx5). However, tx5 was marked as invalid tx.
// Hence, only tx4's missing data should be returned
expectedMissingDataInfo := make(ledger.MissingPvtDataInfo)
expectedMissingDataInfo.Add(5, 4, "ns-4", "coll-4")
missingDataInfo, err := store.GetMissingPvtDataInfoForMostRecentBlocks(1)
assert.NoError(t, err)
assert.Equal(t, expectedMissingDataInfo, missingDataInfo)
}
func TestStoreWithExistingBlockchain(t *testing.T) {
......@@ -326,10 +330,25 @@ func sampleDataWithPvtdataForSelectiveTx(t *testing.T) []*ledger.BlockAndPvtData
for i := 0; i < 10; i++ {
blockAndpvtdata = append(blockAndpvtdata, &ledger.BlockAndPvtData{Block: blocks[i]})
}
// txNum 3, 5 in block 2 has pvtdata
blockAndpvtdata[2].PvtData = samplePvtData(t, []uint64{3, 5})
// txNum 3, 5, 6 in block 2 has pvtdata but txNum 6 is invalid
blockAndpvtdata[2].PvtData = samplePvtData(t, []uint64{3, 5, 6})
txFilter := lutil.TxValidationFlags(blockAndpvtdata[2].Block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
txFilter.SetFlag(6, pb.TxValidationCode_INVALID_WRITESET)
blockAndpvtdata[2].Block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txFilter
// txNum 4, 6 in block 3 has pvtdata
blockAndpvtdata[3].PvtData = samplePvtData(t, []uint64{4, 6})
// txNum 4, 5 in block 5 has missing pvt data but txNum 5 is invalid
missingData := make(ledger.TxMissingPvtDataMap)
missingData.Add(4, "ns-4", "coll-4", true)
missingData.Add(5, "ns-5", "coll-5", true)
blockAndpvtdata[5].MissingPvtData = missingData
txFilter = lutil.TxValidationFlags(blockAndpvtdata[5].Block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
txFilter.SetFlag(5, pb.TxValidationCode_INVALID_WRITESET)
blockAndpvtdata[5].Block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txFilter
return blockAndpvtdata
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment