Commit 9e65f859 authored by Yacov Manevich's avatar Yacov Manevich Committed by Gerrit Code Review
Browse files

Merge "[FAB-10742] Batch retrive of pvt data from ledger"

parents b5ee20f7 24078dff
......@@ -16,6 +16,7 @@ import (
"time"
pb "github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/bccsp/factory"
util2 "github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/common/privdata"
"github.com/hyperledger/fabric/core/ledger"
......@@ -35,6 +36,7 @@ import (
func init() {
viper.Set("peer.gossip.pvtData.pullRetryThreshold", time.Second*3)
factory.InitFactories(nil)
}
type persistCall struct {
......
......@@ -22,9 +22,13 @@ import (
type StorageDataRetriever interface {
// CollectionRWSet retrieves for give digest relevant private data if
// available otherwise returns nil
CollectionRWSet(dig *gossip2.PvtDataDigest) (*util.PrivateRWSetWithConfig, error)
CollectionRWSet(dig []*gossip2.PvtDataDigest, blockNum uint64) (Dig2PvtRWSetWithConfig, error)
}
//go:generate mockery -dir . -name DataStore -case underscore -output mocks/
//go:generate mockery -dir ../../core/transientstore/ -name RWSetScanner -case underscore -output mocks/
//go:generate mockery -dir ../../core/ledger/ -name ConfigHistoryRetriever -case underscore -output mocks/
// DataStore defines set of APIs need to get private data
// from underlined data store
type DataStore interface {
......@@ -56,70 +60,110 @@ func NewDataRetriever(store DataStore) StorageDataRetriever {
// CollectionRWSet retrieves for give digest relevant private data if
// available otherwise returns nil
func (dr *dataRetriever) CollectionRWSet(dig *gossip2.PvtDataDigest) (*util.PrivateRWSetWithConfig, error) {
filter := map[string]ledger.PvtCollFilter{
dig.Namespace: map[string]bool{
dig.Collection: true,
},
}
func (dr *dataRetriever) CollectionRWSet(digests []*gossip2.PvtDataDigest, blockNum uint64) (Dig2PvtRWSetWithConfig, error) {
height, err := dr.store.LedgerHeight()
if err != nil {
// if there is an error getting info from the ledger, we need to try to read from transient store
return nil, errors.New(fmt.Sprint("Wasn't able to read ledger height, due to", err, "trying to lookup "+
"private data from transient store, namespace", dig.Namespace, "collection name", dig.Collection, "txID", dig.TxId))
return nil, fmt.Errorf("wasn't able to read ledger height, due to %s", err)
}
if height <= dig.BlockSeq {
if height <= blockNum {
logger.Debug("Current ledger height ", height, "is below requested block sequence number",
dig.BlockSeq, "retrieving private data from transient store, namespace", dig.Namespace, "collection name",
dig.Collection, "txID", dig.TxId)
blockNum, "retrieving private data from transient store")
}
if height <= dig.BlockSeq { // Check whenever current ledger height is equal or above block sequence num.
return dr.fromTransientStore(dig, filter)
results := make(Dig2PvtRWSetWithConfig)
if height <= blockNum { // Check whenever current ledger height is equal or below block sequence num.
for _, dig := range digests {
filter := map[string]ledger.PvtCollFilter{
dig.Namespace: map[string]bool{
dig.Collection: true,
},
}
pvtRWSet, err := dr.fromTransientStore(dig, filter)
if err != nil {
logger.Errorf("couldn't read from transient store private read-write set, "+
"digest %+v, because of %s", dig, err)
continue
}
results[DigKey{
Namespace: dig.Namespace,
Collection: dig.Collection,
TxId: dig.TxId,
BlockSeq: dig.BlockSeq,
SeqInBlock: dig.SeqInBlock,
}] = pvtRWSet
}
return results, nil
}
// Since ledger height is above block sequence number private data is might be available in the ledger
return dr.fromLedger(dig, filter)
return dr.fromLedger(digests, blockNum)
}
func (dr *dataRetriever) fromLedger(dig *gossip2.PvtDataDigest, filter map[string]ledger.PvtCollFilter) (*util.PrivateRWSetWithConfig, error) {
results := &util.PrivateRWSetWithConfig{}
pvtData, err := dr.store.GetPvtDataByNum(dig.BlockSeq, filter)
if err != nil {
return nil, errors.New(fmt.Sprint("wasn't able to obtain private data for collection", dig.Collection,
"txID", dig.TxId, "block sequence number", dig.BlockSeq, "due to", err))
}
for _, data := range pvtData {
if data.WriteSet == nil {
logger.Warning("Received nil write set for collection", dig.Collection, "namespace", dig.Namespace)
continue
func (dr *dataRetriever) fromLedger(digests []*gossip2.PvtDataDigest, blockNum uint64) (Dig2PvtRWSetWithConfig, error) {
filter := make(map[string]ledger.PvtCollFilter)
for _, dig := range digests {
if _, ok := filter[dig.Namespace]; !ok {
filter[dig.Namespace] = make(ledger.PvtCollFilter)
}
pvtRWSet := dr.extractPvtRWsets(data.WriteSet.NsPvtRwset, dig.Namespace, dig.Collection)
results.RWSet = append(results.RWSet, pvtRWSet...)
filter[dig.Namespace][dig.Collection] = true
}
confHistoryRetriever, err := dr.store.GetConfigHistoryRetriever()
pvtData, err := dr.store.GetPvtDataByNum(blockNum, filter)
if err != nil {
return nil, errors.New(fmt.Sprint("cannot obtain configuration history retriever, for collection,", dig.Collection,
"txID", dig.TxId, "block sequence number", dig.BlockSeq, "due to", err))
return nil, errors.New(fmt.Sprint("wasn't able to obtain private data, block sequence number", blockNum, "due to", err))
}
configInfo, err := confHistoryRetriever.MostRecentCollectionConfigBelow(dig.BlockSeq, dig.Namespace)
if err != nil {
return nil, errors.New(fmt.Sprint("cannot find recent collection config update below block sequence = ", dig.BlockSeq,
"collection name =", dig.Collection, "for chaincode", dig.Namespace))
}
results := make(Dig2PvtRWSetWithConfig)
for _, dig := range digests {
dig := dig
pvtRWSetWithConfig := &util.PrivateRWSetWithConfig{}
for _, data := range pvtData {
if data.WriteSet == nil {
logger.Warning("Received nil write set for collection tx in block", data.SeqInBlock, "block number", blockNum)
continue
}
if configInfo == nil {
return nil, errors.New(fmt.Sprint("no collection config update below block sequence = ", dig.BlockSeq,
"collection name =", dig.Collection, "for chaincode", dig.Namespace, "is available"))
}
configs := dr.extractCollectionConfigs(configInfo.CollectionConfig, dig)
if configs == nil {
return nil, errors.New(fmt.Sprint("no collection config was found for collection", dig.Collection,
"namespace", dig.Namespace, "txID", dig.TxId))
// private data doesn't hold rwsets for namespace and collection or
// belongs to different transaction
if !data.Has(dig.Namespace, dig.Collection) || data.SeqInBlock != dig.SeqInBlock {
continue
}
pvtRWSet := dr.extractPvtRWsets(data.WriteSet.NsPvtRwset, dig.Namespace, dig.Collection)
pvtRWSetWithConfig.RWSet = append(pvtRWSetWithConfig.RWSet, pvtRWSet...)
}
confHistoryRetriever, err := dr.store.GetConfigHistoryRetriever()
if err != nil {
return nil, errors.New(fmt.Sprint("cannot obtain configuration history retriever, for collection,", dig.Collection,
"txID", dig.TxId, "block sequence number", dig.BlockSeq, "due to", err))
}
configInfo, err := confHistoryRetriever.MostRecentCollectionConfigBelow(dig.BlockSeq, dig.Namespace)
if err != nil {
return nil, errors.New(fmt.Sprint("cannot find recent collection config update below block sequence = ", dig.BlockSeq,
"collection name =", dig.Collection, "for chaincode", dig.Namespace))
}
if configInfo == nil {
return nil, errors.New(fmt.Sprint("no collection config update below block sequence = ", dig.BlockSeq,
"collection name =", dig.Collection, "for chaincode", dig.Namespace, "is available"))
}
configs := dr.extractCollectionConfigs(configInfo.CollectionConfig, dig)
if configs == nil {
return nil, errors.New(fmt.Sprint("no collection config was found for collection ", dig.Collection,
" namespace ", dig.Namespace, " txID ", dig.TxId))
}
pvtRWSetWithConfig.CollectionConfig = configs
results[DigKey{
Namespace: dig.Namespace,
Collection: dig.Collection,
TxId: dig.TxId,
BlockSeq: dig.BlockSeq,
SeqInBlock: dig.SeqInBlock,
}] = pvtRWSetWithConfig
}
results.CollectionConfig = configs
return results, nil
}
......@@ -182,13 +226,13 @@ func (dr *dataRetriever) extractPvtRWsets(pvtRWSets []*rwset.NsPvtReadWriteSet,
for _, nsws := range pvtRWSets {
// and in each namespace - iterate over all collections
if nsws.Namespace != namespace {
logger.Warning("Received private data namespace", nsws.Namespace, "instead of", namespace, "skipping...")
logger.Warning("Received private data namespace ", nsws.Namespace, " instead of ", namespace, " skipping...")
continue
}
for _, col := range nsws.CollectionPvtRwset {
// This isn't the collection we're looking for
if col.CollectionName != collectionName {
logger.Warning("Received private data collection", col.CollectionName, "instead of", collectionName, "skipping...")
logger.Warning("Received private data collection ", col.CollectionName, " instead of ", collectionName, " skipping...")
continue
}
// Add the collection pRWset to the accumulated set
......
......@@ -12,6 +12,7 @@ import (
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/transientstore"
"github.com/hyperledger/fabric/gossip/privdata/mocks"
"github.com/hyperledger/fabric/protos/common"
gossip2 "github.com/hyperledger/fabric/protos/gossip"
"github.com/hyperledger/fabric/protos/ledger/rwset"
......@@ -20,71 +21,6 @@ import (
"github.com/stretchr/testify/mock"
)
type mockedHistoryRetreiver struct {
mock.Mock
}
func (mock *mockedHistoryRetreiver) CollectionConfigAt(blockNum uint64, chaincodeName string) (*ledger.CollectionConfigInfo, error) {
args := mock.Called(blockNum, chaincodeName)
return args.Get(0).(*ledger.CollectionConfigInfo), args.Error(1)
}
func (mock *mockedHistoryRetreiver) MostRecentCollectionConfigBelow(blockNum uint64, chaincodeName string) (*ledger.CollectionConfigInfo, error) {
args := mock.Called(blockNum, chaincodeName)
return args.Get(0).(*ledger.CollectionConfigInfo), args.Error(1)
}
type mockedDataStore struct {
mock.Mock
}
func (ds *mockedDataStore) GetConfigHistoryRetriever() (ledger.ConfigHistoryRetriever, error) {
args := ds.Called()
return args.Get(0).(ledger.ConfigHistoryRetriever), args.Error(1)
}
func (ds *mockedDataStore) GetTxPvtRWSetByTxid(txid string, filter ledger.PvtNsCollFilter) (transientstore.RWSetScanner, error) {
args := ds.Called(txid, filter)
return args.Get(0).(transientstore.RWSetScanner), args.Error(1)
}
func (ds *mockedDataStore) GetPvtDataByNum(blockNum uint64, filter ledger.PvtNsCollFilter) ([]*ledger.TxPvtData, error) {
args := ds.Called(blockNum, filter)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).([]*ledger.TxPvtData), args.Error(1)
}
func (ds *mockedDataStore) LedgerHeight() (uint64, error) {
args := ds.Called()
return args.Get(0).(uint64), args.Error(1)
}
type mockedRWSetScanner struct {
mock.Mock
}
func (mock *mockedRWSetScanner) Close() {
}
func (mock *mockedRWSetScanner) Next() (*transientstore.EndorserPvtSimulationResults, error) {
args := mock.Called()
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).(*transientstore.EndorserPvtSimulationResults), args.Error(1)
}
func (mock *mockedRWSetScanner) NextWithConfig() (*transientstore.EndorserPvtSimulationResultsWithConfig, error) {
args := mock.Called()
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).(*transientstore.EndorserPvtSimulationResultsWithConfig), args.Error(1)
}
/*
Test checks following scenario, it tries to obtain private data for
given block sequence which is greater than available ledger height,
......@@ -92,12 +28,13 @@ func (mock *mockedRWSetScanner) NextWithConfig() (*transientstore.EndorserPvtSim
*/
func TestNewDataRetriever_GetDataFromTransientStore(t *testing.T) {
t.Parallel()
dataStore := &mockedDataStore{}
dataStore := &mocks.DataStore{}
rwSetScanner := &mockedRWSetScanner{}
rwSetScanner := &mocks.RWSetScanner{}
namespace := "testChaincodeName1"
collectionName := "testCollectionName"
rwSetScanner.On("Close")
rwSetScanner.On("NextWithConfig").Return(&transientstore.EndorserPvtSimulationResultsWithConfig{
ReceivedAtBlockHeight: 2,
PvtSimulationResultsWithConfig: nil,
......@@ -107,24 +44,8 @@ func TestNewDataRetriever_GetDataFromTransientStore(t *testing.T) {
PvtRwset: &rwset.TxPvtReadWriteSet{
DataModel: rwset.TxReadWriteSet_KV,
NsPvtRwset: []*rwset.NsPvtReadWriteSet{
{
Namespace: namespace,
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
{
CollectionName: collectionName,
Rwset: []byte{1, 2},
},
},
},
{
Namespace: namespace,
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
{
CollectionName: collectionName,
Rwset: []byte{3, 4},
},
},
},
pvtReadWriteSet(namespace, collectionName, []byte{1, 2}),
pvtReadWriteSet(namespace, collectionName, []byte{3, 4}),
},
},
CollectionConfigs: map[string]*common.CollectionConfigPackage{
......@@ -152,22 +73,30 @@ func TestNewDataRetriever_GetDataFromTransientStore(t *testing.T) {
// Request digest for private data which is greater than current ledger height
// to make it query transient store for missed private data
rwSets, err := retriever.CollectionRWSet(&gossip2.PvtDataDigest{
rwSets, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
Namespace: namespace,
Collection: collectionName,
BlockSeq: 2,
TxId: "testTxID",
SeqInBlock: 1,
})
}}, 2)
assertion := assert.New(t)
assertion.NoError(err)
assertion.NotNil(rwSets)
assertion.NotEmpty(rwSets.RWSet)
assertion.Equal(2, len(rwSets.RWSet))
assertion.NotEmpty(rwSets)
dig2pvtRWSet := rwSets[DigKey{
Namespace: namespace,
Collection: collectionName,
BlockSeq: 2,
TxId: "testTxID",
SeqInBlock: 1,
}]
assertion.NotNil(dig2pvtRWSet)
pvtRWSets := dig2pvtRWSet.RWSet
assertion.Equal(2, len(pvtRWSets))
var mergedRWSet []byte
for _, rws := range rwSets.RWSet {
for _, rws := range pvtRWSets {
mergedRWSet = append(mergedRWSet, rws...)
}
......@@ -181,7 +110,7 @@ func TestNewDataRetriever_GetDataFromTransientStore(t *testing.T) {
*/
func TestNewDataRetriever_GetDataFromLedger(t *testing.T) {
t.Parallel()
dataStore := &mockedDataStore{}
dataStore := &mocks.DataStore{}
namespace := "testChaincodeName1"
collectionName := "testCollectionName"
......@@ -190,20 +119,8 @@ func TestNewDataRetriever_GetDataFromLedger(t *testing.T) {
WriteSet: &rwset.TxPvtReadWriteSet{
DataModel: rwset.TxReadWriteSet_KV,
NsPvtRwset: []*rwset.NsPvtReadWriteSet{
{
Namespace: namespace,
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{{
CollectionName: collectionName,
Rwset: []byte{1, 2},
}},
},
{
Namespace: namespace,
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{{
CollectionName: collectionName,
Rwset: []byte{3, 4},
}},
},
pvtReadWriteSet(namespace, collectionName, []byte{1, 2}),
pvtReadWriteSet(namespace, collectionName, []byte{3, 4}),
},
},
SeqInBlock: 1,
......@@ -212,42 +129,37 @@ func TestNewDataRetriever_GetDataFromLedger(t *testing.T) {
dataStore.On("LedgerHeight").Return(uint64(10), nil)
dataStore.On("GetPvtDataByNum", uint64(5), mock.Anything).Return(result, nil)
historyRetreiver := &mockedHistoryRetreiver{}
historyRetreiver.On("MostRecentCollectionConfigBelow", mock.Anything, namespace).Return(&ledger.CollectionConfigInfo{
CollectionConfig: &common.CollectionConfigPackage{
Config: []*common.CollectionConfig{
{
Payload: &common.CollectionConfig_StaticCollectionConfig{
StaticCollectionConfig: &common.StaticCollectionConfig{
Name: collectionName,
},
},
},
},
},
}, nil)
historyRetreiver := &mocks.ConfigHistoryRetriever{}
historyRetreiver.On("MostRecentCollectionConfigBelow", mock.Anything, namespace).Return(newCollectionConfig(collectionName), nil)
dataStore.On("GetConfigHistoryRetriever").Return(historyRetreiver, nil)
retriever := NewDataRetriever(dataStore)
// Request digest for private data which is greater than current ledger height
// to make it query ledger for missed private data
rwSets, err := retriever.CollectionRWSet(&gossip2.PvtDataDigest{
rwSets, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
Namespace: namespace,
Collection: collectionName,
BlockSeq: uint64(5),
TxId: "testTxID",
SeqInBlock: 1,
})
}}, uint64(5))
assertion := assert.New(t)
assertion.NoError(err)
assertion.NotNil(rwSets)
assertion.NotEmpty(rwSets)
assertion.Equal(2, len(rwSets.RWSet))
pvtRWSet := rwSets[DigKey{
Namespace: namespace,
Collection: collectionName,
BlockSeq: 5,
TxId: "testTxID",
SeqInBlock: 1,
}]
assertion.NotEmpty(pvtRWSet)
assertion.Equal(2, len(pvtRWSet.RWSet))
var mergedRWSet []byte
for _, rws := range rwSets.RWSet {
for _, rws := range pvtRWSet.RWSet {
mergedRWSet = append(mergedRWSet, rws...)
}
......@@ -256,7 +168,7 @@ func TestNewDataRetriever_GetDataFromLedger(t *testing.T) {
func TestNewDataRetriever_FailGetPvtDataFromLedger(t *testing.T) {
t.Parallel()
dataStore := &mockedDataStore{}
dataStore := &mocks.DataStore{}
namespace := "testChaincodeName1"
collectionName := "testCollectionName"
......@@ -269,22 +181,22 @@ func TestNewDataRetriever_FailGetPvtDataFromLedger(t *testing.T) {
// Request digest for private data which is greater than current ledger height
// to make it query transient store for missed private data
rwSets, err := retriever.CollectionRWSet(&gossip2.PvtDataDigest{
rwSets, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
Namespace: namespace,
Collection: collectionName,
BlockSeq: uint64(5),
TxId: "testTxID",
SeqInBlock: 1,
})
}}, uint64(5))
assertion := assert.New(t)
assertion.Error(err)
assertion.Nil(rwSets)
assertion.Empty(rwSets)
}
func TestNewDataRetriever_GetOnlyRelevantPvtData(t *testing.T) {
t.Parallel()
dataStore := &mockedDataStore{}
dataStore := &mocks.DataStore{}
namespace := "testChaincodeName1"
collectionName := "testCollectionName"
......@@ -293,34 +205,10 @@ func TestNewDataRetriever_GetOnlyRelevantPvtData(t *testing.T) {
WriteSet: &rwset.TxPvtReadWriteSet{
DataModel: rwset.TxReadWriteSet_KV,
NsPvtRwset: []*rwset.NsPvtReadWriteSet{
{
Namespace: namespace,
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{{
CollectionName: collectionName,
Rwset: []byte{1},
}},
},
{
Namespace: namespace,
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{{
CollectionName: collectionName,
Rwset: []byte{2},
}},
},
{
Namespace: "invalidNamespace",
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{{
CollectionName: collectionName,
Rwset: []byte{0, 0},
}},
},
{
Namespace: namespace,
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{{
CollectionName: "invalidCollectionName",
Rwset: []byte{0, 0},
}},
},
pvtReadWriteSet(namespace, collectionName, []byte{1}),
pvtReadWriteSet(namespace, collectionName, []byte{2}),
pvtReadWriteSet("invalidNamespace", collectionName, []byte{0, 0}),
pvtReadWriteSet(namespace, "invalidCollectionName", []byte{0, 0}),
},
},
SeqInBlock: 1,
......@@ -328,45 +216,172 @@ func TestNewDataRetriever_GetOnlyRelevantPvtData(t *testing.T) {
dataStore.On("LedgerHeight").Return(uint64(10), nil)
dataStore.On("GetPvtDataByNum", uint64(5), mock.Anything).Return(result, nil)
historyRetreiver := &mockedHistoryRetreiver{}
historyRetreiver.On("MostRecentCollectionConfigBelow", mock.Anything, namespace).Return(&ledger.CollectionConfigInfo{
CollectionConfig: &common.CollectionConfigPackage{
Config: []*common.CollectionConfig{
{
Payload: &common.CollectionConfig_StaticCollectionConfig{
StaticCollectionConfig: &common.StaticCollectionConfig{
Name: collectionName,
},
},
},
},
},
}, nil)
historyRetreiver := &mocks.ConfigHistoryRetriever{}
historyRetreiver.On("MostRecentCollectionConfigBelow", mock.Anything, namespace).Return(newCollectionConfig(collectionName), nil)
dataStore.On("GetConfigHistoryRetriever").Return(historyRetreiver, nil)
retriever := NewDataRetriever(dataStore)
// Request digest for private data which is greater than current ledger height
// to make it query transient store for missed private data
rwSets, err := retriever.CollectionRWSet(&gossip2.PvtDataDigest{
rwSets, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
Namespace: namespace,
Collection: collectionName,
BlockSeq: uint64(5),
TxId: "testTxID",
SeqInBlock: 1,
})
}}, 5)
assertion := assert.New(t)
assertion.NoError(err)
assertion.NotNil(rwSets)
assertion.NotEmpty(rwSets)
assertion.Equal(2, len(rwSets.RWSet))
pvtRWSet := rwSets[DigKey{
Namespace: namespace,
Collection: collectionName,
BlockSeq: 5,
TxId: "testTxID",
SeqInBlock: 1,
}]
assertion.NotEmpty(pvtRWSet)
assertion.Equal(2, len(pvtRWSet.RWSet))
var mergedRWSet []byte
for _, rws := range rwSets.RWSet {
for _, rws := range pvtRWSet.RWSet {
mergedRWSet = append(mergedRWSet, rws...)
}
assertion.Equal([]byte{1, 2}, mergedRWSet)
}
func TestNewDataRetriever_GetMultipleDigests(t *testing.T) {
t.Parallel()
dataStore := &mocks.DataStore{}
ns1, ns2 := "testChaincodeName1", "testChaincodeName2"
col1, col2 := "testCollectionName1", "testCollectionName2"
result := []*ledger.TxPvtData{
{
WriteSet: &rwset.TxPvtReadWriteSet{
DataModel: rwset.TxReadWriteSet_KV,
NsPvtRwset: []*rwset.NsPvtReadWriteSet{
pvtReadWriteSet(ns1, col1, []byte{1}),