Commit 9c77fe5a authored by yacovm
Browse files

[FAB-10302] Don't use protobuf as map keys



PvtDataDigest is a protobuf message that is used in the gossip private data
implementation.

The latest version of the protobuf compiler generates function-valued fields on
generated structs, which makes them non-comparable and therefore prevents using
protobuf messages as map keys.

This change set moves the code to use an equivalent struct
that has the same fields but without functions.

Change-Id: Icdfdffb98625cb7ace37d6c0227906971e05fcc7
Signed-off-by: yacovm <yacovm@il.ibm.com>
parent 719efe2f
...@@ -94,10 +94,10 @@ type Coordinator interface { ...@@ -94,10 +94,10 @@ type Coordinator interface {
Close() Close()
} }
type dig2sources map[*gossip2.PvtDataDigest][]*peer.Endorsement type dig2sources map[DigKey][]*peer.Endorsement
func (d2s dig2sources) keys() []*gossip2.PvtDataDigest { func (d2s dig2sources) keys() []DigKey {
var res []*gossip2.PvtDataDigest var res []DigKey
for dig := range d2s { for dig := range d2s {
res = append(res, dig) res = append(res, dig)
} }
...@@ -257,10 +257,10 @@ func (c *coordinator) StoreBlock(block *common.Block, privateDataSets util.PvtDa ...@@ -257,10 +257,10 @@ func (c *coordinator) StoreBlock(block *common.Block, privateDataSets util.PvtDa
} }
func (c *coordinator) fetchFromPeers(blockSeq uint64, ownedRWsets map[rwSetKey][]byte, privateInfo *privateDataInfo) { func (c *coordinator) fetchFromPeers(blockSeq uint64, ownedRWsets map[rwSetKey][]byte, privateInfo *privateDataInfo) {
dig2src := make(map[*gossip2.PvtDataDigest][]*peer.Endorsement) dig2src := make(map[DigKey][]*peer.Endorsement)
privateInfo.missingKeys.foreach(func(k rwSetKey) { privateInfo.missingKeys.foreach(func(k rwSetKey) {
logger.Debug("Fetching", k, "from peers") logger.Debug("Fetching", k, "from peers")
dig := &gossip2.PvtDataDigest{ dig := DigKey{
TxId: k.txID, TxId: k.txID,
SeqInBlock: k.seqInBlock, SeqInBlock: k.seqInBlock,
Collection: k.collection, Collection: k.collection,
......
...@@ -39,6 +39,24 @@ func init() { ...@@ -39,6 +39,24 @@ func init() {
factory.InitFactories(nil) factory.InitFactories(nil)
} }
// CollectionCriteria aggregates criteria of
// a collection
type CollectionCriteria struct {
Channel string
TxId string
Collection string
Namespace string
}
func fromCollectionCriteria(criteria common.CollectionCriteria) CollectionCriteria {
return CollectionCriteria{
TxId: criteria.TxId,
Collection: criteria.Collection,
Namespace: criteria.Namespace,
Channel: criteria.Channel,
}
}
type persistCall struct { type persistCall struct {
*mock.Call *mock.Call
store *mockTransientStore store *mockTransientStore
...@@ -240,13 +258,13 @@ func (v *validatorMock) Validate(block *common.Block) error { ...@@ -240,13 +258,13 @@ func (v *validatorMock) Validate(block *common.Block) error {
return nil return nil
} }
type digests []*proto.PvtDataDigest type digests []DigKey
func (d digests) Equal(other digests) bool { func (d digests) Equal(other digests) bool {
flatten := func(d digests) map[proto.PvtDataDigest]struct{} { flatten := func(d digests) map[DigKey]struct{} {
m := map[proto.PvtDataDigest]struct{}{} m := map[DigKey]struct{}{}
for _, dig := range d { for _, dig := range d {
m[*dig] = struct{}{} m[dig] = struct{}{}
} }
return m return m
} }
...@@ -271,8 +289,8 @@ func (fc *fetchCall) expectingEndorsers(orgs ...string) *fetchCall { ...@@ -271,8 +289,8 @@ func (fc *fetchCall) expectingEndorsers(orgs ...string) *fetchCall {
return fc return fc
} }
func (fc *fetchCall) expectingDigests(dig []*proto.PvtDataDigest) *fetchCall { func (fc *fetchCall) expectingDigests(digests []DigKey) *fetchCall {
fc.fetcher.expectedDigests = dig fc.fetcher.expectedDigests = digests
return fc return fc
} }
...@@ -284,7 +302,7 @@ func (fc *fetchCall) Return(returnArguments ...interface{}) *mock.Call { ...@@ -284,7 +302,7 @@ func (fc *fetchCall) Return(returnArguments ...interface{}) *mock.Call {
type fetcherMock struct { type fetcherMock struct {
t *testing.T t *testing.T
mock.Mock mock.Mock
expectedDigests []*proto.PvtDataDigest expectedDigests []DigKey
expectedEndorsers map[string]struct{} expectedEndorsers map[string]struct{}
} }
...@@ -318,8 +336,8 @@ func (f *fetcherMock) fetch(dig2src dig2sources, _ uint64) (*FetchedPvtDataConta ...@@ -318,8 +336,8 @@ func (f *fetcherMock) fetch(dig2src dig2sources, _ uint64) (*FetchedPvtDataConta
func createcollectionStore(expectedSignedData common.SignedData) *collectionStore { func createcollectionStore(expectedSignedData common.SignedData) *collectionStore {
return &collectionStore{ return &collectionStore{
expectedSignedData: expectedSignedData, expectedSignedData: expectedSignedData,
policies: make(map[collectionAccessPolicy]common.CollectionCriteria), policies: make(map[collectionAccessPolicy]CollectionCriteria),
store: make(map[common.CollectionCriteria]collectionAccessPolicy), store: make(map[CollectionCriteria]collectionAccessPolicy),
} }
} }
...@@ -327,8 +345,8 @@ type collectionStore struct { ...@@ -327,8 +345,8 @@ type collectionStore struct {
expectedSignedData common.SignedData expectedSignedData common.SignedData
acceptsAll bool acceptsAll bool
lenient bool lenient bool
store map[common.CollectionCriteria]collectionAccessPolicy store map[CollectionCriteria]collectionAccessPolicy
policies map[collectionAccessPolicy]common.CollectionCriteria policies map[collectionAccessPolicy]CollectionCriteria
} }
func (cs *collectionStore) thatAcceptsAll() *collectionStore { func (cs *collectionStore) thatAcceptsAll() *collectionStore {
...@@ -341,7 +359,7 @@ func (cs *collectionStore) andIsLenient() *collectionStore { ...@@ -341,7 +359,7 @@ func (cs *collectionStore) andIsLenient() *collectionStore {
return cs return cs
} }
func (cs *collectionStore) thatAccepts(cc common.CollectionCriteria) *collectionStore { func (cs *collectionStore) thatAccepts(cc CollectionCriteria) *collectionStore {
sp := collectionAccessPolicy{ sp := collectionAccessPolicy{
cs: cs, cs: cs,
n: util.RandomUInt64(), n: util.RandomUInt64(),
...@@ -352,7 +370,7 @@ func (cs *collectionStore) thatAccepts(cc common.CollectionCriteria) *collection ...@@ -352,7 +370,7 @@ func (cs *collectionStore) thatAccepts(cc common.CollectionCriteria) *collection
} }
func (cs *collectionStore) RetrieveCollectionAccessPolicy(cc common.CollectionCriteria) (privdata.CollectionAccessPolicy, error) { func (cs *collectionStore) RetrieveCollectionAccessPolicy(cc common.CollectionCriteria) (privdata.CollectionAccessPolicy, error) {
if sp, exists := cs.store[cc]; exists { if sp, exists := cs.store[fromCollectionCriteria(cc)]; exists {
return &sp, nil return &sp, nil
} }
if cs.acceptsAll || cs.lenient { if cs.acceptsAll || cs.lenient {
...@@ -818,7 +836,7 @@ func TestCoordinatorToFilterOutPvtRWSetsWithWrongHash(t *testing.T) { ...@@ -818,7 +836,7 @@ func TestCoordinatorToFilterOutPvtRWSetsWithWrongHash(t *testing.T) {
Validator: &validatorMock{}, Validator: &validatorMock{},
}, peerSelfSignedData) }, peerSelfSignedData)
fetcher.On("fetch", mock.Anything).expectingDigests([]*proto.PvtDataDigest{ fetcher.On("fetch", mock.Anything).expectingDigests([]DigKey{
{ {
TxId: "tx1", Namespace: "ns1", Collection: "c1", BlockSeq: 1, TxId: "tx1", Namespace: "ns1", Collection: "c1", BlockSeq: 1,
}, },
...@@ -941,7 +959,7 @@ func TestCoordinatorStoreBlock(t *testing.T) { ...@@ -941,7 +959,7 @@ func TestCoordinatorStoreBlock(t *testing.T) {
// but it is also missing ns2: c1, and that data doesn't exist in the transient store - but in a peer. // but it is also missing ns2: c1, and that data doesn't exist in the transient store - but in a peer.
// Additionally, the coordinator should pass an endorser identity of org1, but not of org2, since // Additionally, the coordinator should pass an endorser identity of org1, but not of org2, since
// the MemberOrgs() call doesn't return org2 but only org0 and org1. // the MemberOrgs() call doesn't return org2 but only org0 and org1.
fetcher.On("fetch", mock.Anything).expectingDigests([]*proto.PvtDataDigest{ fetcher.On("fetch", mock.Anything).expectingDigests([]DigKey{
{ {
TxId: "tx1", Namespace: "ns1", Collection: "c2", BlockSeq: 1, TxId: "tx1", Namespace: "ns1", Collection: "c2", BlockSeq: 1,
}, },
...@@ -995,7 +1013,7 @@ func TestCoordinatorStoreBlock(t *testing.T) { ...@@ -995,7 +1013,7 @@ func TestCoordinatorStoreBlock(t *testing.T) {
// In this case, we should try to fetch data from peers. // In this case, we should try to fetch data from peers.
block = bf.AddTxn("tx3", "ns3", hash, "c3").create() block = bf.AddTxn("tx3", "ns3", hash, "c3").create()
fetcher = &fetcherMock{t: t} fetcher = &fetcherMock{t: t}
fetcher.On("fetch", mock.Anything).expectingDigests([]*proto.PvtDataDigest{ fetcher.On("fetch", mock.Anything).expectingDigests([]DigKey{
{ {
TxId: "tx3", Namespace: "ns3", Collection: "c3", BlockSeq: 1, TxId: "tx3", Namespace: "ns3", Collection: "c3", BlockSeq: 1,
}, },
...@@ -1044,7 +1062,7 @@ func TestCoordinatorStoreBlock(t *testing.T) { ...@@ -1044,7 +1062,7 @@ func TestCoordinatorStoreBlock(t *testing.T) {
// private data from the transient store or peers, and in fact- if it attempts to fetch the data it's not eligible // private data from the transient store or peers, and in fact- if it attempts to fetch the data it's not eligible
// for from the transient store or from peers - the test would fail because the Mock wasn't initialized. // for from the transient store or from peers - the test would fail because the Mock wasn't initialized.
block = bf.AddTxn("tx3", "ns3", hash, "c3", "c2", "c1").AddTxn("tx1", "ns1", hash, "c1").create() block = bf.AddTxn("tx3", "ns3", hash, "c3", "c2", "c1").AddTxn("tx1", "ns1", hash, "c1").create()
cs = createcollectionStore(peerSelfSignedData).thatAccepts(common.CollectionCriteria{ cs = createcollectionStore(peerSelfSignedData).thatAccepts(CollectionCriteria{
TxId: "tx3", TxId: "tx3",
Collection: "c3", Collection: "c3",
Namespace: "ns3", Namespace: "ns3",
...@@ -1126,7 +1144,7 @@ func TestProceedWithoutPrivateData(t *testing.T) { ...@@ -1126,7 +1144,7 @@ func TestProceedWithoutPrivateData(t *testing.T) {
fetcher := &fetcherMock{t: t} fetcher := &fetcherMock{t: t}
// Have the peer return in response to the pull, a private data with a non matching hash // Have the peer return in response to the pull, a private data with a non matching hash
fetcher.On("fetch", mock.Anything).expectingDigests([]*proto.PvtDataDigest{ fetcher.On("fetch", mock.Anything).expectingDigests([]DigKey{
{ {
TxId: "tx1", Namespace: "ns3", Collection: "c2", BlockSeq: 1, TxId: "tx1", Namespace: "ns3", Collection: "c2", BlockSeq: 1,
}, },
...@@ -1191,7 +1209,7 @@ func TestCoordinatorGetBlocks(t *testing.T) { ...@@ -1191,7 +1209,7 @@ func TestCoordinatorGetBlocks(t *testing.T) {
// Green path - block and private data is returned, but the requester isn't eligible for all the private data, // Green path - block and private data is returned, but the requester isn't eligible for all the private data,
// but only to a subset of it. // but only to a subset of it.
cs = createcollectionStore(sd).thatAccepts(common.CollectionCriteria{ cs = createcollectionStore(sd).thatAccepts(CollectionCriteria{
Namespace: "ns1", Namespace: "ns1",
Collection: "c2", Collection: "c2",
TxId: "tx1", TxId: "tx1",
......
...@@ -35,7 +35,8 @@ const ( ...@@ -35,7 +35,8 @@ const (
btlPullMarginDefault = 10 btlPullMarginDefault = 10
) )
// DigKey // DigKey defines a digest that
// specifies a specific hashed RWSet
type DigKey struct { type DigKey struct {
TxId string TxId string
Namespace string Namespace string
...@@ -244,9 +245,15 @@ func (p *puller) fetch(dig2src dig2sources, blockSeq uint64) (*FetchedPvtDataCon ...@@ -244,9 +245,15 @@ func (p *puller) fetch(dig2src dig2sources, blockSeq uint64) (*FetchedPvtDataCon
purgedPvt := p.getPurgedCollections(members, dig2Filter, blockSeq) purgedPvt := p.getPurgedCollections(members, dig2Filter, blockSeq)
// Need to remove purged digest from mapping // Need to remove purged digest from mapping
for _, dig := range purgedPvt { for _, dig := range purgedPvt {
res.PurgedElements = append(res.PurgedElements, dig) res.PurgedElements = append(res.PurgedElements, &proto.PvtDataDigest{
TxId: dig.TxId,
BlockSeq: dig.BlockSeq,
SeqInBlock: dig.SeqInBlock,
Namespace: dig.Namespace,
Collection: dig.Collection,
})
// remove digest so we won't even try to pull purged data // remove digest so we won't even try to pull purged data
delete(dig2Filter, *dig) delete(dig2Filter, dig)
itemsLeftToCollect-- itemsLeftToCollect--
} }
...@@ -271,7 +278,13 @@ func (p *puller) fetch(dig2src dig2sources, blockSeq uint64) (*FetchedPvtDataCon ...@@ -271,7 +278,13 @@ func (p *puller) fetch(dig2src dig2sources, blockSeq uint64) (*FetchedPvtDataCon
logger.Debug("Got empty response for", resp.Digest) logger.Debug("Got empty response for", resp.Digest)
continue continue
} }
delete(dig2Filter, *resp.Digest) delete(dig2Filter, DigKey{
TxId: resp.Digest.TxId,
BlockSeq: resp.Digest.BlockSeq,
SeqInBlock: resp.Digest.SeqInBlock,
Namespace: resp.Digest.Namespace,
Collection: resp.Digest.Collection,
})
itemsLeftToCollect-- itemsLeftToCollect--
} }
res.AvailableElemenets = append(res.AvailableElemenets, responses...) res.AvailableElemenets = append(res.AvailableElemenets, responses...)
...@@ -360,7 +373,13 @@ func (p *puller) assignDigestsToPeers(members []discovery.NetworkMember, dig2Fil ...@@ -360,7 +373,13 @@ func (p *puller) assignDigestsToPeers(members []discovery.NetworkMember, dig2Fil
} }
// Add the peer to the mapping from peer to digest slice // Add the peer to the mapping from peer to digest slice
peer := remotePeer{pkiID: string(selectedPeer.PKIID), endpoint: selectedPeer.Endpoint} peer := remotePeer{pkiID: string(selectedPeer.PKIID), endpoint: selectedPeer.Endpoint}
res[peer] = append(res[peer], dig) res[peer] = append(res[peer], proto.PvtDataDigest{
TxId: dig.TxId,
BlockSeq: dig.BlockSeq,
SeqInBlock: dig.SeqInBlock,
Namespace: dig.Namespace,
Collection: dig.Collection,
})
} }
var noneSelectedPeers []discovery.NetworkMember var noneSelectedPeers []discovery.NetworkMember
...@@ -379,7 +398,7 @@ type collectionRoutingFilter struct { ...@@ -379,7 +398,7 @@ type collectionRoutingFilter struct {
endorser filter.RoutingFilter endorser filter.RoutingFilter
} }
type digestToFilterMapping map[proto.PvtDataDigest]collectionRoutingFilter type digestToFilterMapping map[DigKey]collectionRoutingFilter
func (dig2f digestToFilterMapping) flattenFilterValues() []filter.RoutingFilter { func (dig2f digestToFilterMapping) flattenFilterValues() []filter.RoutingFilter {
var filters []filter.RoutingFilter var filters []filter.RoutingFilter
...@@ -393,7 +412,13 @@ func (dig2f digestToFilterMapping) flattenFilterValues() []filter.RoutingFilter ...@@ -393,7 +412,13 @@ func (dig2f digestToFilterMapping) flattenFilterValues() []filter.RoutingFilter
func (dig2f digestToFilterMapping) digests() []proto.PvtDataDigest { func (dig2f digestToFilterMapping) digests() []proto.PvtDataDigest {
var digs []proto.PvtDataDigest var digs []proto.PvtDataDigest
for d := range dig2f { for d := range dig2f {
digs = append(digs, d) digs = append(digs, proto.PvtDataDigest{
TxId: d.TxId,
BlockSeq: d.BlockSeq,
SeqInBlock: d.SeqInBlock,
Namespace: d.Namespace,
Collection: d.Collection,
})
} }
return digs return digs
} }
...@@ -412,8 +437,15 @@ func (dig2f digestToFilterMapping) String() string { ...@@ -412,8 +437,15 @@ func (dig2f digestToFilterMapping) String() string {
} }
func (p *puller) computeFilters(dig2src dig2sources) (digestToFilterMapping, error) { func (p *puller) computeFilters(dig2src dig2sources) (digestToFilterMapping, error) {
filters := make(map[proto.PvtDataDigest]collectionRoutingFilter) filters := make(map[DigKey]collectionRoutingFilter)
for digest, sources := range dig2src { for digest, sources := range dig2src {
digKey := DigKey{
TxId: digest.TxId,
BlockSeq: digest.BlockSeq,
SeqInBlock: digest.SeqInBlock,
Namespace: digest.Namespace,
Collection: digest.Collection,
}
cc := fcommon.CollectionCriteria{ cc := fcommon.CollectionCriteria{
Channel: p.channel, Channel: p.channel,
TxId: digest.TxId, TxId: digest.TxId,
...@@ -453,7 +485,7 @@ func (p *puller) computeFilters(dig2src dig2sources) (digestToFilterMapping, err ...@@ -453,7 +485,7 @@ func (p *puller) computeFilters(dig2src dig2sources) (digestToFilterMapping, err
return nil, errors.WithStack(err) return nil, errors.WithStack(err)
} }
filters[*digest] = collectionRoutingFilter{ filters[digKey] = collectionRoutingFilter{
anyPeer: anyPeerInCollection, anyPeer: anyPeerInCollection,
endorser: endorserPeer, endorser: endorserPeer,
} }
...@@ -462,9 +494,9 @@ func (p *puller) computeFilters(dig2src dig2sources) (digestToFilterMapping, err ...@@ -462,9 +494,9 @@ func (p *puller) computeFilters(dig2src dig2sources) (digestToFilterMapping, err
} }
func (p *puller) getPurgedCollections(members []discovery.NetworkMember, func (p *puller) getPurgedCollections(members []discovery.NetworkMember,
dig2Filter digestToFilterMapping, blockSeq uint64) []*proto.PvtDataDigest { dig2Filter digestToFilterMapping, blockSeq uint64) []DigKey {
var res []*proto.PvtDataDigest var res []DigKey
for dig := range dig2Filter { for dig := range dig2Filter {
dig := dig dig := dig
...@@ -480,13 +512,13 @@ func (p *puller) getPurgedCollections(members []discovery.NetworkMember, ...@@ -480,13 +512,13 @@ func (p *puller) getPurgedCollections(members []discovery.NetworkMember,
logger.Debugf("Private data on channel [%s], chaincode [%s], collection name [%s] for txID = [%s],"+ logger.Debugf("Private data on channel [%s], chaincode [%s], collection name [%s] for txID = [%s],"+
"has been purged at peers [%v]", p.channel, dig.Namespace, "has been purged at peers [%v]", p.channel, dig.Namespace,
dig.Collection, dig.TxId, membersWithPurgedData) dig.Collection, dig.TxId, membersWithPurgedData)
res = append(res, &dig) res = append(res, dig)
} }
} }
return res return res
} }
func (p *puller) purgedFilter(dig proto.PvtDataDigest, blockSeq uint64) (filter.RoutingFilter, error) { func (p *puller) purgedFilter(dig DigKey, blockSeq uint64) (filter.RoutingFilter, error) {
cc := fcommon.CollectionCriteria{ cc := fcommon.CollectionCriteria{
Channel: p.channel, Channel: p.channel,
TxId: dig.TxId, TxId: dig.TxId,
......
...@@ -311,7 +311,7 @@ func TestPullerFromOnly1Peer(t *testing.T) { ...@@ -311,7 +311,7 @@ func TestPullerFromOnly1Peer(t *testing.T) {
dasf := &digestsAndSourceFactory{} dasf := &digestsAndSourceFactory{}
fetchedMessages, err := p1.fetch(dasf.mapDigest(dig).toSources().create(), uint64(1)) fetchedMessages, err := p1.fetch(dasf.mapDigest(toDigKey(dig)).toSources().create(), uint64(1))
rws1 := util.PrivateRWSet(fetchedMessages.AvailableElemenets[0].Payload[0]) rws1 := util.PrivateRWSet(fetchedMessages.AvailableElemenets[0].Payload[0])
rws2 := util.PrivateRWSet(fetchedMessages.AvailableElemenets[0].Payload[1]) rws2 := util.PrivateRWSet(fetchedMessages.AvailableElemenets[0].Payload[1])
fetched := []util.PrivateRWSet{rws1, rws2} fetched := []util.PrivateRWSet{rws1, rws2}
...@@ -356,7 +356,7 @@ func TestPullerDataNotAvailable(t *testing.T) { ...@@ -356,7 +356,7 @@ func TestPullerDataNotAvailable(t *testing.T) {
}) })
dasf := &digestsAndSourceFactory{} dasf := &digestsAndSourceFactory{}
fetchedMessages, err := p1.fetch(dasf.mapDigest(dig).toSources().create(), uint64(1)) fetchedMessages, err := p1.fetch(dasf.mapDigest(toDigKey(dig)).toSources().create(), uint64(1))
assert.Empty(t, fetchedMessages.AvailableElemenets) assert.Empty(t, fetchedMessages.AvailableElemenets)
assert.NoError(t, err) assert.NoError(t, err)
} }
...@@ -371,7 +371,7 @@ func TestPullerNoPeersKnown(t *testing.T) { ...@@ -371,7 +371,7 @@ func TestPullerNoPeersKnown(t *testing.T) {
p1 := gn.newPuller("p1", policyStore, factoryMock) p1 := gn.newPuller("p1", policyStore, factoryMock)
dasf := &digestsAndSourceFactory{} dasf := &digestsAndSourceFactory{}
d2s := dasf.mapDigest(&proto.PvtDataDigest{Collection: "col1", TxId: "txID1", Namespace: "ns1"}).toSources().create() d2s := dasf.mapDigest(&DigKey{Collection: "col1", TxId: "txID1", Namespace: "ns1"}).toSources().create()
fetchedMessages, err := p1.fetch(d2s, uint64(1)) fetchedMessages, err := p1.fetch(d2s, uint64(1))
assert.Empty(t, fetchedMessages) assert.Empty(t, fetchedMessages)
assert.Error(t, err) assert.Error(t, err)
...@@ -389,7 +389,7 @@ func TestPullPeerFilterError(t *testing.T) { ...@@ -389,7 +389,7 @@ func TestPullPeerFilterError(t *testing.T) {
p1 := gn.newPuller("p1", policyStore, factoryMock) p1 := gn.newPuller("p1", policyStore, factoryMock)
gn.peers[0].On("PeerFilter", mock.Anything, mock.Anything).Return(nil, errors.New("Failed obtaining filter")) gn.peers[0].On("PeerFilter", mock.Anything, mock.Anything).Return(nil, errors.New("Failed obtaining filter"))
dasf := &digestsAndSourceFactory{} dasf := &digestsAndSourceFactory{}
d2s := dasf.mapDigest(&proto.PvtDataDigest{Collection: "col1", TxId: "txID1", Namespace: "ns1"}).toSources().create() d2s := dasf.mapDigest(&DigKey{Collection: "col1", TxId: "txID1", Namespace: "ns1"}).toSources().create()
fetchedMessages, err := p1.fetch(d2s, uint64(1)) fetchedMessages, err := p1.fetch(d2s, uint64(1))
assert.Error(t, err) assert.Error(t, err)
assert.Contains(t, err.Error(), "Failed obtaining filter") assert.Contains(t, err.Error(), "Failed obtaining filter")
...@@ -457,7 +457,7 @@ func TestPullerPeerNotEligible(t *testing.T) { ...@@ -457,7 +457,7 @@ func TestPullerPeerNotEligible(t *testing.T) {
p3 := gn.newPuller("p3", policyStore, factoryMock3) p3 := gn.newPuller("p3", policyStore, factoryMock3)
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", []*proto.PvtDataDigest{dig}, mock.Anything).Return(store, nil) p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", []*proto.PvtDataDigest{dig}, mock.Anything).Return(store, nil)
dasf := &digestsAndSourceFactory{} dasf := &digestsAndSourceFactory{}
d2s := dasf.mapDigest(&proto.PvtDataDigest{Collection: "col1", TxId: "txID1", Namespace: "ns1"}).toSources().create() d2s := dasf.mapDigest(&DigKey{Collection: "col1", TxId: "txID1", Namespace: "ns1"}).toSources().create()
fetchedMessages, err := p1.fetch(d2s, uint64(1)) fetchedMessages, err := p1.fetch(d2s, uint64(1))
assert.Empty(t, fetchedMessages.AvailableElemenets) assert.Empty(t, fetchedMessages.AvailableElemenets)
assert.NoError(t, err) assert.NoError(t, err)
...@@ -550,7 +550,7 @@ func TestPullerDifferentPeersDifferentCollections(t *testing.T) { ...@@ -550,7 +550,7 @@ func TestPullerDifferentPeersDifferentCollections(t *testing.T) {
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", []*proto.PvtDataDigest{dig2}, mock.Anything).Return(store2, nil) p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", []*proto.PvtDataDigest{dig2}, mock.Anything).Return(store2, nil)
dasf := &digestsAndSourceFactory{} dasf := &digestsAndSourceFactory{}
fetchedMessages, err := p1.fetch(dasf.mapDigest(dig1).toSources().mapDigest(dig2).toSources().create(), uint64(1)) fetchedMessages, err := p1.fetch(dasf.mapDigest(toDigKey(dig1)).toSources().mapDigest(toDigKey(dig2)).toSources().create(), uint64(1))
assert.NoError(t, err) assert.NoError(t, err)
rws1 := util.PrivateRWSet(fetchedMessages.AvailableElemenets[0].Payload[0]) rws1 := util.PrivateRWSet(fetchedMessages.AvailableElemenets[0].Payload[0])
rws2 := util.PrivateRWSet(fetchedMessages.AvailableElemenets[0].Payload[1]) rws2 := util.PrivateRWSet(fetchedMessages.AvailableElemenets[0].Payload[1])
...@@ -659,7 +659,7 @@ func TestPullerRetries(t *testing.T) { ...@@ -659,7 +659,7 @@ func TestPullerRetries(t *testing.T) {
// Fetch from someone // Fetch from someone
dasf := &digestsAndSourceFactory{} dasf := &digestsAndSourceFactory{}
fetchedMessages, err := p1.fetch(dasf.mapDigest(dig).toSources().create(), uint64(1)) fetchedMessages, err := p1.fetch(dasf.mapDigest(toDigKey(dig)).toSources().create(), uint64(1))
assert.NoError(t, err) assert.NoError(t, err)
rws1 := util.PrivateRWSet(fetchedMessages.AvailableElemenets[0].Payload[0]) rws1 := util.PrivateRWSet(fetchedMessages.AvailableElemenets[0].Payload[0])
rws2 := util.PrivateRWSet(fetchedMessages.AvailableElemenets[0].Payload[1]) rws2 := util.PrivateRWSet(fetchedMessages.AvailableElemenets[0].Payload[1])
...@@ -751,7 +751,7 @@ func TestPullerPreferEndorsers(t *testing.T) { ...@@ -751,7 +751,7 @@ func TestPullerPreferEndorsers(t *testing.T) {
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", []*proto.PvtDataDigest{dig1}, uint64(0)).Return(store, nil) p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", []*proto.PvtDataDigest{dig1}, uint64(0)).Return(store, nil)
dasf := &digestsAndSourceFactory{} dasf := &digestsAndSourceFactory{}
d2s := dasf.mapDigest(dig1).toSources("p3").mapDigest(dig2).toSources().create() d2s := dasf.mapDigest(toDigKey(dig1)).toSources("p3").mapDigest(toDigKey(dig2)).toSources().create()
fetchedMessages, err := p1.fetch(d2s, uint64(1)) fetchedMessages, err := p1.fetch(d2s, uint64(1))
assert.NoError(t, err) assert.NoError(t, err)
rws1 := util.PrivateRWSet(fetchedMessages.AvailableElemenets[0].Payload[0]) rws1 := util.PrivateRWSet(fetchedMessages.AvailableElemenets[0].Payload[0])
...@@ -854,7 +854,7 @@ func TestPullerAvoidPullingPurgedData(t *testing.T) { ...@@ -854,7 +854,7 @@ func TestPullerAvoidPullingPurgedData(t *testing.T) {
) )
dasf := &digestsAndSourceFactory{} dasf := &digestsAndSourceFactory{}
d2s := dasf.mapDigest(dig1).toSources("p3", "p2").mapDigest(dig2).toSources("p3").create() d2s := dasf.mapDigest(toDigKey(dig1)).toSources("p3", "p2").mapDigest(toDigKey(dig2)).toSources("p3").create()
// trying to fetch missing pvt data for block seq 1 // trying to fetch missing pvt data for block seq 1
fetchedMessages, err := p1.fetch(d2s, uint64(1)) fetchedMessages, err := p1.fetch(d2s, uint64(1))
...@@ -934,7 +934,7 @@ func TestPullerIntegratedWithDataRetreiver(t *testing.T) { ...@@ -934,7 +934,7 @@ func TestPullerIntegratedWithDataRetreiver(t *testing.T) {
dataRetreiver := &counterDataRetreiver{PrivateDataRetriever: NewDataRetriever(dataStore), numberOfCalls: 0} dataRetreiver := &counterDataRetreiver{PrivateDataRetriever: NewDataRetriever(dataStore), numberOfCalls: 0}
p2.PrivateDataRetriever = dataRetreiver p2.PrivateDataRetriever = dataRetreiver
dig1 := &proto.PvtDataDigest{ dig1 := &DigKey{
TxId: "txID1", TxId: "txID1",
Collection: col1, Collection: col1,
Namespace: ns1, Namespace: ns1,
...@@ -942,7 +942,7 @@ func TestPullerIntegratedWithDataRetreiver(t *testing.T) { ...@@ -942,7 +942,7 @@ func TestPullerIntegratedWithDataRetreiver(t *testing.T) {
SeqInBlock: 1, SeqInBlock: 1,
} }
dig2 := &proto.PvtDataDigest{ dig2 := &DigKey{
TxId: "txID1", TxId: "txID1",
Collection: col2, Collection: col2,
Namespace: ns2, Namespace: ns2,
...@@ -959,3 +959,13 @@ func TestPullerIntegratedWithDataRetreiver(t *testing.T) { ...@@ -959,3 +959,13 @@ func TestPullerIntegratedWithDataRetreiver(t *testing.T) {
assert.Equal(t, 2, len(fetchedMessages.AvailableElemenets[0].Payload)) assert.Equal(t, 2, len(fetchedMessages.AvailableElemenets[0].Payload))
assert.Equal(t, 2, len(fetchedMessages.AvailableElemenets[1].Payload))