Commit 65856efe authored by Artem Barger's avatar Artem Barger Committed by Gerrit Code Review
Browse files

Merge "[FAB-10302] Don't use protobuf as map keys"

parents bcf14bca 9c77fe5a
......@@ -94,10 +94,10 @@ type Coordinator interface {
Close()
}
type dig2sources map[*gossip2.PvtDataDigest][]*peer.Endorsement
type dig2sources map[DigKey][]*peer.Endorsement
func (d2s dig2sources) keys() []*gossip2.PvtDataDigest {
var res []*gossip2.PvtDataDigest
func (d2s dig2sources) keys() []DigKey {
var res []DigKey
for dig := range d2s {
res = append(res, dig)
}
......@@ -257,10 +257,10 @@ func (c *coordinator) StoreBlock(block *common.Block, privateDataSets util.PvtDa
}
func (c *coordinator) fetchFromPeers(blockSeq uint64, ownedRWsets map[rwSetKey][]byte, privateInfo *privateDataInfo) {
dig2src := make(map[*gossip2.PvtDataDigest][]*peer.Endorsement)
dig2src := make(map[DigKey][]*peer.Endorsement)
privateInfo.missingKeys.foreach(func(k rwSetKey) {
logger.Debug("Fetching", k, "from peers")
dig := &gossip2.PvtDataDigest{
dig := DigKey{
TxId: k.txID,
SeqInBlock: k.seqInBlock,
Collection: k.collection,
......
......@@ -39,6 +39,24 @@ func init() {
factory.InitFactories(nil)
}
// CollectionCriteria aggregates criteria of
// a collection
type CollectionCriteria struct {
Channel string
TxId string
Collection string
Namespace string
}
func fromCollectionCriteria(criteria common.CollectionCriteria) CollectionCriteria {
return CollectionCriteria{
TxId: criteria.TxId,
Collection: criteria.Collection,
Namespace: criteria.Namespace,
Channel: criteria.Channel,
}
}
type persistCall struct {
*mock.Call
store *mockTransientStore
......@@ -240,13 +258,13 @@ func (v *validatorMock) Validate(block *common.Block) error {
return nil
}
type digests []*proto.PvtDataDigest
type digests []DigKey
func (d digests) Equal(other digests) bool {
flatten := func(d digests) map[proto.PvtDataDigest]struct{} {
m := map[proto.PvtDataDigest]struct{}{}
flatten := func(d digests) map[DigKey]struct{} {
m := map[DigKey]struct{}{}
for _, dig := range d {
m[*dig] = struct{}{}
m[dig] = struct{}{}
}
return m
}
......@@ -271,8 +289,8 @@ func (fc *fetchCall) expectingEndorsers(orgs ...string) *fetchCall {
return fc
}
func (fc *fetchCall) expectingDigests(dig []*proto.PvtDataDigest) *fetchCall {
fc.fetcher.expectedDigests = dig
func (fc *fetchCall) expectingDigests(digests []DigKey) *fetchCall {
fc.fetcher.expectedDigests = digests
return fc
}
......@@ -284,7 +302,7 @@ func (fc *fetchCall) Return(returnArguments ...interface{}) *mock.Call {
type fetcherMock struct {
t *testing.T
mock.Mock
expectedDigests []*proto.PvtDataDigest
expectedDigests []DigKey
expectedEndorsers map[string]struct{}
}
......@@ -318,8 +336,8 @@ func (f *fetcherMock) fetch(dig2src dig2sources, _ uint64) (*FetchedPvtDataConta
func createcollectionStore(expectedSignedData common.SignedData) *collectionStore {
return &collectionStore{
expectedSignedData: expectedSignedData,
policies: make(map[collectionAccessPolicy]common.CollectionCriteria),
store: make(map[common.CollectionCriteria]collectionAccessPolicy),
policies: make(map[collectionAccessPolicy]CollectionCriteria),
store: make(map[CollectionCriteria]collectionAccessPolicy),
}
}
......@@ -327,8 +345,8 @@ type collectionStore struct {
expectedSignedData common.SignedData
acceptsAll bool
lenient bool
store map[common.CollectionCriteria]collectionAccessPolicy
policies map[collectionAccessPolicy]common.CollectionCriteria
store map[CollectionCriteria]collectionAccessPolicy
policies map[collectionAccessPolicy]CollectionCriteria
}
func (cs *collectionStore) thatAcceptsAll() *collectionStore {
......@@ -341,7 +359,7 @@ func (cs *collectionStore) andIsLenient() *collectionStore {
return cs
}
func (cs *collectionStore) thatAccepts(cc common.CollectionCriteria) *collectionStore {
func (cs *collectionStore) thatAccepts(cc CollectionCriteria) *collectionStore {
sp := collectionAccessPolicy{
cs: cs,
n: util.RandomUInt64(),
......@@ -352,7 +370,7 @@ func (cs *collectionStore) thatAccepts(cc common.CollectionCriteria) *collection
}
func (cs *collectionStore) RetrieveCollectionAccessPolicy(cc common.CollectionCriteria) (privdata.CollectionAccessPolicy, error) {
if sp, exists := cs.store[cc]; exists {
if sp, exists := cs.store[fromCollectionCriteria(cc)]; exists {
return &sp, nil
}
if cs.acceptsAll || cs.lenient {
......@@ -818,7 +836,7 @@ func TestCoordinatorToFilterOutPvtRWSetsWithWrongHash(t *testing.T) {
Validator: &validatorMock{},
}, peerSelfSignedData)
fetcher.On("fetch", mock.Anything).expectingDigests([]*proto.PvtDataDigest{
fetcher.On("fetch", mock.Anything).expectingDigests([]DigKey{
{
TxId: "tx1", Namespace: "ns1", Collection: "c1", BlockSeq: 1,
},
......@@ -941,7 +959,7 @@ func TestCoordinatorStoreBlock(t *testing.T) {
// but it is also missing ns2: c1, and that data doesn't exist in the transient store - but in a peer.
// Additionally, the coordinator should pass an endorser identity of org1, but not of org2, since
// the MemberOrgs() call doesn't return org2 but only org0 and org1.
fetcher.On("fetch", mock.Anything).expectingDigests([]*proto.PvtDataDigest{
fetcher.On("fetch", mock.Anything).expectingDigests([]DigKey{
{
TxId: "tx1", Namespace: "ns1", Collection: "c2", BlockSeq: 1,
},
......@@ -995,7 +1013,7 @@ func TestCoordinatorStoreBlock(t *testing.T) {
// In this case, we should try to fetch data from peers.
block = bf.AddTxn("tx3", "ns3", hash, "c3").create()
fetcher = &fetcherMock{t: t}
fetcher.On("fetch", mock.Anything).expectingDigests([]*proto.PvtDataDigest{
fetcher.On("fetch", mock.Anything).expectingDigests([]DigKey{
{
TxId: "tx3", Namespace: "ns3", Collection: "c3", BlockSeq: 1,
},
......@@ -1044,7 +1062,7 @@ func TestCoordinatorStoreBlock(t *testing.T) {
// private data from the transient store or peers, and in fact- if it attempts to fetch the data it's not eligible
// for from the transient store or from peers - the test would fail because the Mock wasn't initialized.
block = bf.AddTxn("tx3", "ns3", hash, "c3", "c2", "c1").AddTxn("tx1", "ns1", hash, "c1").create()
cs = createcollectionStore(peerSelfSignedData).thatAccepts(common.CollectionCriteria{
cs = createcollectionStore(peerSelfSignedData).thatAccepts(CollectionCriteria{
TxId: "tx3",
Collection: "c3",
Namespace: "ns3",
......@@ -1126,7 +1144,7 @@ func TestProceedWithoutPrivateData(t *testing.T) {
fetcher := &fetcherMock{t: t}
// Have the peer return in response to the pull, a private data with a non matching hash
fetcher.On("fetch", mock.Anything).expectingDigests([]*proto.PvtDataDigest{
fetcher.On("fetch", mock.Anything).expectingDigests([]DigKey{
{
TxId: "tx1", Namespace: "ns3", Collection: "c2", BlockSeq: 1,
},
......@@ -1191,7 +1209,7 @@ func TestCoordinatorGetBlocks(t *testing.T) {
// Green path - block and private data is returned, but the requester isn't eligible for all the private data,
// but only to a subset of it.
cs = createcollectionStore(sd).thatAccepts(common.CollectionCriteria{
cs = createcollectionStore(sd).thatAccepts(CollectionCriteria{
Namespace: "ns1",
Collection: "c2",
TxId: "tx1",
......
......@@ -35,7 +35,8 @@ const (
btlPullMarginDefault = 10
)
// DigKey
// DigKey defines a digest that
// specifies a specific hashed RWSet
type DigKey struct {
TxId string
Namespace string
......@@ -244,9 +245,15 @@ func (p *puller) fetch(dig2src dig2sources, blockSeq uint64) (*FetchedPvtDataCon
purgedPvt := p.getPurgedCollections(members, dig2Filter, blockSeq)
// Need to remove purged digest from mapping
for _, dig := range purgedPvt {
res.PurgedElements = append(res.PurgedElements, dig)
res.PurgedElements = append(res.PurgedElements, &proto.PvtDataDigest{
TxId: dig.TxId,
BlockSeq: dig.BlockSeq,
SeqInBlock: dig.SeqInBlock,
Namespace: dig.Namespace,
Collection: dig.Collection,
})
// remove digest so we won't even try to pull purged data
delete(dig2Filter, *dig)
delete(dig2Filter, dig)
itemsLeftToCollect--
}
......@@ -271,7 +278,13 @@ func (p *puller) fetch(dig2src dig2sources, blockSeq uint64) (*FetchedPvtDataCon
logger.Debug("Got empty response for", resp.Digest)
continue
}
delete(dig2Filter, *resp.Digest)
delete(dig2Filter, DigKey{
TxId: resp.Digest.TxId,
BlockSeq: resp.Digest.BlockSeq,
SeqInBlock: resp.Digest.SeqInBlock,
Namespace: resp.Digest.Namespace,
Collection: resp.Digest.Collection,
})
itemsLeftToCollect--
}
res.AvailableElemenets = append(res.AvailableElemenets, responses...)
......@@ -360,7 +373,13 @@ func (p *puller) assignDigestsToPeers(members []discovery.NetworkMember, dig2Fil
}
// Add the peer to the mapping from peer to digest slice
peer := remotePeer{pkiID: string(selectedPeer.PKIID), endpoint: selectedPeer.Endpoint}
res[peer] = append(res[peer], dig)
res[peer] = append(res[peer], proto.PvtDataDigest{
TxId: dig.TxId,
BlockSeq: dig.BlockSeq,
SeqInBlock: dig.SeqInBlock,
Namespace: dig.Namespace,
Collection: dig.Collection,
})
}
var noneSelectedPeers []discovery.NetworkMember
......@@ -379,7 +398,7 @@ type collectionRoutingFilter struct {
endorser filter.RoutingFilter
}
type digestToFilterMapping map[proto.PvtDataDigest]collectionRoutingFilter
type digestToFilterMapping map[DigKey]collectionRoutingFilter
func (dig2f digestToFilterMapping) flattenFilterValues() []filter.RoutingFilter {
var filters []filter.RoutingFilter
......@@ -393,7 +412,13 @@ func (dig2f digestToFilterMapping) flattenFilterValues() []filter.RoutingFilter
func (dig2f digestToFilterMapping) digests() []proto.PvtDataDigest {
var digs []proto.PvtDataDigest
for d := range dig2f {
digs = append(digs, d)
digs = append(digs, proto.PvtDataDigest{
TxId: d.TxId,
BlockSeq: d.BlockSeq,
SeqInBlock: d.SeqInBlock,
Namespace: d.Namespace,
Collection: d.Collection,
})
}
return digs
}
......@@ -412,8 +437,15 @@ func (dig2f digestToFilterMapping) String() string {
}
func (p *puller) computeFilters(dig2src dig2sources) (digestToFilterMapping, error) {
filters := make(map[proto.PvtDataDigest]collectionRoutingFilter)
filters := make(map[DigKey]collectionRoutingFilter)
for digest, sources := range dig2src {
digKey := DigKey{
TxId: digest.TxId,
BlockSeq: digest.BlockSeq,
SeqInBlock: digest.SeqInBlock,
Namespace: digest.Namespace,
Collection: digest.Collection,
}
cc := fcommon.CollectionCriteria{
Channel: p.channel,
TxId: digest.TxId,
......@@ -453,7 +485,7 @@ func (p *puller) computeFilters(dig2src dig2sources) (digestToFilterMapping, err
return nil, errors.WithStack(err)
}
filters[*digest] = collectionRoutingFilter{
filters[digKey] = collectionRoutingFilter{
anyPeer: anyPeerInCollection,
endorser: endorserPeer,
}
......@@ -462,9 +494,9 @@ func (p *puller) computeFilters(dig2src dig2sources) (digestToFilterMapping, err
}
func (p *puller) getPurgedCollections(members []discovery.NetworkMember,
dig2Filter digestToFilterMapping, blockSeq uint64) []*proto.PvtDataDigest {
dig2Filter digestToFilterMapping, blockSeq uint64) []DigKey {
var res []*proto.PvtDataDigest
var res []DigKey
for dig := range dig2Filter {
dig := dig
......@@ -480,13 +512,13 @@ func (p *puller) getPurgedCollections(members []discovery.NetworkMember,
logger.Debugf("Private data on channel [%s], chaincode [%s], collection name [%s] for txID = [%s],"+
"has been purged at peers [%v]", p.channel, dig.Namespace,
dig.Collection, dig.TxId, membersWithPurgedData)
res = append(res, &dig)
res = append(res, dig)
}
}
return res
}
func (p *puller) purgedFilter(dig proto.PvtDataDigest, blockSeq uint64) (filter.RoutingFilter, error) {
func (p *puller) purgedFilter(dig DigKey, blockSeq uint64) (filter.RoutingFilter, error) {
cc := fcommon.CollectionCriteria{
Channel: p.channel,
TxId: dig.TxId,
......
......@@ -311,7 +311,7 @@ func TestPullerFromOnly1Peer(t *testing.T) {
dasf := &digestsAndSourceFactory{}
fetchedMessages, err := p1.fetch(dasf.mapDigest(dig).toSources().create(), uint64(1))
fetchedMessages, err := p1.fetch(dasf.mapDigest(toDigKey(dig)).toSources().create(), uint64(1))
rws1 := util.PrivateRWSet(fetchedMessages.AvailableElemenets[0].Payload[0])
rws2 := util.PrivateRWSet(fetchedMessages.AvailableElemenets[0].Payload[1])
fetched := []util.PrivateRWSet{rws1, rws2}
......@@ -356,7 +356,7 @@ func TestPullerDataNotAvailable(t *testing.T) {
})
dasf := &digestsAndSourceFactory{}
fetchedMessages, err := p1.fetch(dasf.mapDigest(dig).toSources().create(), uint64(1))
fetchedMessages, err := p1.fetch(dasf.mapDigest(toDigKey(dig)).toSources().create(), uint64(1))
assert.Empty(t, fetchedMessages.AvailableElemenets)
assert.NoError(t, err)
}
......@@ -371,7 +371,7 @@ func TestPullerNoPeersKnown(t *testing.T) {
p1 := gn.newPuller("p1", policyStore, factoryMock)
dasf := &digestsAndSourceFactory{}
d2s := dasf.mapDigest(&proto.PvtDataDigest{Collection: "col1", TxId: "txID1", Namespace: "ns1"}).toSources().create()
d2s := dasf.mapDigest(&DigKey{Collection: "col1", TxId: "txID1", Namespace: "ns1"}).toSources().create()
fetchedMessages, err := p1.fetch(d2s, uint64(1))
assert.Empty(t, fetchedMessages)
assert.Error(t, err)
......@@ -389,7 +389,7 @@ func TestPullPeerFilterError(t *testing.T) {
p1 := gn.newPuller("p1", policyStore, factoryMock)
gn.peers[0].On("PeerFilter", mock.Anything, mock.Anything).Return(nil, errors.New("Failed obtaining filter"))
dasf := &digestsAndSourceFactory{}
d2s := dasf.mapDigest(&proto.PvtDataDigest{Collection: "col1", TxId: "txID1", Namespace: "ns1"}).toSources().create()
d2s := dasf.mapDigest(&DigKey{Collection: "col1", TxId: "txID1", Namespace: "ns1"}).toSources().create()
fetchedMessages, err := p1.fetch(d2s, uint64(1))
assert.Error(t, err)
assert.Contains(t, err.Error(), "Failed obtaining filter")
......@@ -457,7 +457,7 @@ func TestPullerPeerNotEligible(t *testing.T) {
p3 := gn.newPuller("p3", policyStore, factoryMock3)
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", []*proto.PvtDataDigest{dig}, mock.Anything).Return(store, nil)
dasf := &digestsAndSourceFactory{}
d2s := dasf.mapDigest(&proto.PvtDataDigest{Collection: "col1", TxId: "txID1", Namespace: "ns1"}).toSources().create()
d2s := dasf.mapDigest(&DigKey{Collection: "col1", TxId: "txID1", Namespace: "ns1"}).toSources().create()
fetchedMessages, err := p1.fetch(d2s, uint64(1))
assert.Empty(t, fetchedMessages.AvailableElemenets)
assert.NoError(t, err)
......@@ -550,7 +550,7 @@ func TestPullerDifferentPeersDifferentCollections(t *testing.T) {
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", []*proto.PvtDataDigest{dig2}, mock.Anything).Return(store2, nil)
dasf := &digestsAndSourceFactory{}
fetchedMessages, err := p1.fetch(dasf.mapDigest(dig1).toSources().mapDigest(dig2).toSources().create(), uint64(1))
fetchedMessages, err := p1.fetch(dasf.mapDigest(toDigKey(dig1)).toSources().mapDigest(toDigKey(dig2)).toSources().create(), uint64(1))
assert.NoError(t, err)
rws1 := util.PrivateRWSet(fetchedMessages.AvailableElemenets[0].Payload[0])
rws2 := util.PrivateRWSet(fetchedMessages.AvailableElemenets[0].Payload[1])
......@@ -659,7 +659,7 @@ func TestPullerRetries(t *testing.T) {
// Fetch from someone
dasf := &digestsAndSourceFactory{}
fetchedMessages, err := p1.fetch(dasf.mapDigest(dig).toSources().create(), uint64(1))
fetchedMessages, err := p1.fetch(dasf.mapDigest(toDigKey(dig)).toSources().create(), uint64(1))
assert.NoError(t, err)
rws1 := util.PrivateRWSet(fetchedMessages.AvailableElemenets[0].Payload[0])
rws2 := util.PrivateRWSet(fetchedMessages.AvailableElemenets[0].Payload[1])
......@@ -751,7 +751,7 @@ func TestPullerPreferEndorsers(t *testing.T) {
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", []*proto.PvtDataDigest{dig1}, uint64(0)).Return(store, nil)
dasf := &digestsAndSourceFactory{}
d2s := dasf.mapDigest(dig1).toSources("p3").mapDigest(dig2).toSources().create()
d2s := dasf.mapDigest(toDigKey(dig1)).toSources("p3").mapDigest(toDigKey(dig2)).toSources().create()
fetchedMessages, err := p1.fetch(d2s, uint64(1))
assert.NoError(t, err)
rws1 := util.PrivateRWSet(fetchedMessages.AvailableElemenets[0].Payload[0])
......@@ -854,7 +854,7 @@ func TestPullerAvoidPullingPurgedData(t *testing.T) {
)
dasf := &digestsAndSourceFactory{}
d2s := dasf.mapDigest(dig1).toSources("p3", "p2").mapDigest(dig2).toSources("p3").create()
d2s := dasf.mapDigest(toDigKey(dig1)).toSources("p3", "p2").mapDigest(toDigKey(dig2)).toSources("p3").create()
// trying to fetch missing pvt data for block seq 1
fetchedMessages, err := p1.fetch(d2s, uint64(1))
......@@ -934,7 +934,7 @@ func TestPullerIntegratedWithDataRetreiver(t *testing.T) {
dataRetreiver := &counterDataRetreiver{PrivateDataRetriever: NewDataRetriever(dataStore), numberOfCalls: 0}
p2.PrivateDataRetriever = dataRetreiver
dig1 := &proto.PvtDataDigest{
dig1 := &DigKey{
TxId: "txID1",
Collection: col1,
Namespace: ns1,
......@@ -942,7 +942,7 @@ func TestPullerIntegratedWithDataRetreiver(t *testing.T) {
SeqInBlock: 1,
}
dig2 := &proto.PvtDataDigest{
dig2 := &DigKey{
TxId: "txID1",
Collection: col2,
Namespace: ns2,
......@@ -959,3 +959,13 @@ func TestPullerIntegratedWithDataRetreiver(t *testing.T) {
assert.Equal(t, 2, len(fetchedMessages.AvailableElemenets[0].Payload))
assert.Equal(t, 2, len(fetchedMessages.AvailableElemenets[1].Payload))
}
func toDigKey(dig *proto.PvtDataDigest) *DigKey {
return &DigKey{
TxId: dig.TxId,
BlockSeq: dig.BlockSeq,
SeqInBlock: dig.SeqInBlock,
Namespace: dig.Namespace,
Collection: dig.Collection,
}
}
......@@ -13,7 +13,6 @@ import (
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil"
"github.com/hyperledger/fabric/protos/common"
gossip2 "github.com/hyperledger/fabric/protos/gossip"
"github.com/hyperledger/fabric/protos/ledger/rwset"
"github.com/hyperledger/fabric/protos/ledger/rwset/kvrwset"
"github.com/hyperledger/fabric/protos/msp"
......@@ -262,10 +261,10 @@ func (df *pvtDataFactory) create() []*ledger.TxPvtData {
type digestsAndSourceFactory struct {
d2s dig2sources
lastDig *gossip2.PvtDataDigest
lastDig *DigKey
}
func (f *digestsAndSourceFactory) mapDigest(dig *gossip2.PvtDataDigest) *digestsAndSourceFactory {
func (f *digestsAndSourceFactory) mapDigest(dig *DigKey) *digestsAndSourceFactory {
f.lastDig = dig
return f
}
......@@ -280,7 +279,7 @@ func (f *digestsAndSourceFactory) toSources(peers ...string) *digestsAndSourceFa
Endorser: []byte(p),
})
}
f.d2s[f.lastDig] = endorsements
f.d2s[*f.lastDig] = endorsements
return f
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment