Commit 7851d469 authored by Manish Sethi's avatar Manish Sethi Committed by Gerrit Code Review
Browse files

Merge "[FAB-13013] fix code review comments of CR#26293"

parents e5745e6b fefb5a77
......@@ -21,8 +21,8 @@ import (
// StorageDataRetriever defines an API to retrieve private date from the storage
type StorageDataRetriever interface {
// CollectionRWSet retrieves for give digest relevant private data if
// available otherwise returns nil
CollectionRWSet(dig []*gossip2.PvtDataDigest, blockNum uint64) (Dig2PvtRWSetWithConfig, error)
// available otherwise returns nil, bool which is true if data fetched from ledger and false if was fetched from transient store, and an error
CollectionRWSet(dig []*gossip2.PvtDataDigest, blockNum uint64) (Dig2PvtRWSetWithConfig, bool, error)
}
//go:generate mockery -dir . -name DataStore -case underscore -output mocks/
......@@ -59,12 +59,12 @@ func NewDataRetriever(store DataStore) StorageDataRetriever {
}
// CollectionRWSet retrieves for give digest relevant private data if
// available otherwise returns nil
func (dr *dataRetriever) CollectionRWSet(digests []*gossip2.PvtDataDigest, blockNum uint64) (Dig2PvtRWSetWithConfig, error) {
// available otherwise returns nil, bool which is true if data fetched from ledger and false if was fetched from transient store, and an error
func (dr *dataRetriever) CollectionRWSet(digests []*gossip2.PvtDataDigest, blockNum uint64) (Dig2PvtRWSetWithConfig, bool, error) {
height, err := dr.store.LedgerHeight()
if err != nil {
// if there is an error getting info from the ledger, we need to try to read from transient store
return nil, fmt.Errorf("wasn't able to read ledger height, due to %s", err)
return nil, false, fmt.Errorf("wasn't able to read ledger height, due to %s", err)
}
if height <= blockNum {
logger.Debug("Current ledger height ", height, "is below requested block sequence number",
......@@ -94,10 +94,11 @@ func (dr *dataRetriever) CollectionRWSet(digests []*gossip2.PvtDataDigest, block
}] = pvtRWSet
}
return results, nil
return results, false, nil
}
// Since ledger height is above block sequence number private data is might be available in the ledger
return dr.fromLedger(digests, blockNum)
results, err := dr.fromLedger(digests, blockNum)
return results, true, err
}
func (dr *dataRetriever) fromLedger(digests []*gossip2.PvtDataDigest, blockNum uint64) (Dig2PvtRWSetWithConfig, error) {
......
......@@ -74,7 +74,7 @@ func TestNewDataRetriever_GetDataFromTransientStore(t *testing.T) {
// Request digest for private data which is greater than current ledger height
// to make it query transient store for missed private data
rwSets, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
rwSets, _, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
Namespace: namespace,
Collection: collectionName,
BlockSeq: 2,
......@@ -138,7 +138,7 @@ func TestNewDataRetriever_GetDataFromLedger(t *testing.T) {
// Request digest for private data which is greater than current ledger height
// to make it query ledger for missed private data
rwSets, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
rwSets, _, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
Namespace: namespace,
Collection: collectionName,
BlockSeq: uint64(5),
......@@ -182,7 +182,7 @@ func TestNewDataRetriever_FailGetPvtDataFromLedger(t *testing.T) {
// Request digest for private data which is greater than current ledger height
// to make it query transient store for missed private data
rwSets, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
rwSets, _, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
Namespace: namespace,
Collection: collectionName,
BlockSeq: uint64(5),
......@@ -225,7 +225,7 @@ func TestNewDataRetriever_GetOnlyRelevantPvtData(t *testing.T) {
// Request digest for private data which is greater than current ledger height
// to make it query transient store for missed private data
rwSets, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
rwSets, _, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
Namespace: namespace,
Collection: collectionName,
BlockSeq: uint64(5),
......@@ -311,7 +311,7 @@ func TestNewDataRetriever_GetMultipleDigests(t *testing.T) {
// Request digest for private data which is greater than current ledger height
// to make it query transient store for missed private data
rwSets, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
rwSets, _, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
Namespace: ns1,
Collection: col1,
BlockSeq: uint64(5),
......@@ -382,7 +382,7 @@ func TestNewDataRetriever_EmptyWriteSet(t *testing.T) {
retriever := NewDataRetriever(dataStore)
rwSets, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
rwSets, _, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
Namespace: ns1,
Collection: col1,
BlockSeq: uint64(5),
......@@ -432,7 +432,7 @@ func TestNewDataRetriever_FailedObtainConfigHistoryRetriever(t *testing.T) {
retriever := NewDataRetriever(dataStore)
_, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
_, _, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
Namespace: ns1,
Collection: col1,
BlockSeq: uint64(5),
......@@ -486,7 +486,7 @@ func TestNewDataRetriever_NoCollectionConfig(t *testing.T) {
retriever := NewDataRetriever(dataStore)
assertion := assert.New(t)
_, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
_, _, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
Namespace: ns1,
Collection: col1,
BlockSeq: uint64(5),
......@@ -496,7 +496,7 @@ func TestNewDataRetriever_NoCollectionConfig(t *testing.T) {
assertion.Error(err)
assertion.Contains(err.Error(), "cannot find recent collection config update below block sequence")
_, err = retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
_, _, err = retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
Namespace: ns2,
Collection: col2,
BlockSeq: uint64(5),
......@@ -517,7 +517,7 @@ func TestNewDataRetriever_FailedGetLedgerHeight(t *testing.T) {
dataStore.On("LedgerHeight").Return(uint64(0), errors.New("failed to read ledger height"))
retriever := NewDataRetriever(dataStore)
_, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
_, _, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
Namespace: ns1,
Collection: col1,
BlockSeq: uint64(5),
......@@ -543,7 +543,7 @@ func TestNewDataRetriever_FailToReadFromTransientStore(t *testing.T) {
retriever := NewDataRetriever(dataStore)
rwset, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
rwset, _, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
Namespace: namespace,
Collection: collectionName,
BlockSeq: 2,
......@@ -574,7 +574,7 @@ func TestNewDataRetriever_FailedToReadNext(t *testing.T) {
// Request digest for private data which is greater than current ledger height
// to make it query transient store for missed private data
rwSets, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
rwSets, _, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
Namespace: namespace,
Collection: collectionName,
BlockSeq: 2,
......@@ -622,7 +622,7 @@ func TestNewDataRetriever_EmptyPvtRWSetInTransientStore(t *testing.T) {
retriever := NewDataRetriever(dataStore)
rwSets, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
rwSets, _, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
Namespace: namespace,
Collection: collectionName,
BlockSeq: 2,
......
......@@ -43,7 +43,7 @@ type Dig2PvtRWSetWithConfig map[privdatacommon.DigKey]*util.PrivateRWSetWithConf
// of retrieving required private data
type PrivateDataRetriever interface {
// CollectionRWSet returns the bytes of CollectionPvtReadWriteSet for a given txID and collection from the transient store
CollectionRWSet(dig []*proto.PvtDataDigest, blockNum uint64) (Dig2PvtRWSetWithConfig, error)
CollectionRWSet(dig []*proto.PvtDataDigest, blockNum uint64) (Dig2PvtRWSetWithConfig, bool, error)
}
// gossip defines capabilities that the gossip module gives the Coordinator
......@@ -150,12 +150,12 @@ func (p *puller) createResponse(message proto.ReceivedMessage) []*proto.PvtDataE
block2dig := groupDigestsByBlockNum(msg.GetPrivateReq().Digests)
for blockNum, digests := range block2dig {
dig2rwSets, err := p.CollectionRWSet(digests, blockNum)
dig2rwSets, wasFetchedFromLedger, err := p.CollectionRWSet(digests, blockNum)
if err != nil {
logger.Warningf("could not obtain private collection rwset for block %d, because of %s, continue...", blockNum, err)
continue
}
returned = append(returned, p.filterNotEligible(dig2rwSets, fcommon.SignedData{
returned = append(returned, p.filterNotEligible(dig2rwSets, wasFetchedFromLedger, fcommon.SignedData{
Identity: message.GetConnectionInfo().Identity,
Data: authInfo.SignedData,
Signature: authInfo.Signature,
......@@ -594,7 +594,7 @@ func (p *puller) purgedFilter(dig privdatacommon.DigKey) (filter.RoutingFilter,
}, nil
}
func (p *puller) filterNotEligible(dig2rwSets Dig2PvtRWSetWithConfig, signedData fcommon.SignedData, endpoint string) []*proto.PvtDataElement {
func (p *puller) filterNotEligible(dig2rwSets Dig2PvtRWSetWithConfig, shouldCheckLatestConfig bool, signedData fcommon.SignedData, endpoint string) []*proto.PvtDataElement {
var returned []*proto.PvtDataElement
for d, rwSets := range dig2rwSets {
if rwSets == nil {
......@@ -607,7 +607,7 @@ func (p *puller) filterNotEligible(dig2rwSets Dig2PvtRWSetWithConfig, signedData
continue
}
eligibleForCollection := p.isEligibleByLatestConfig(p.channel, d.Collection, d.Namespace, signedData)
eligibleForCollection := shouldCheckLatestConfig && p.isEligibleByLatestConfig(p.channel, d.Collection, d.Namespace, signedData)
if !eligibleForCollection {
colAP, err := p.AccessPolicy(rwSets.CollectionConfig, p.channel)
......
......@@ -156,9 +156,9 @@ type dataRetrieverMock struct {
mock.Mock
}
func (dr *dataRetrieverMock) CollectionRWSet(dig []*proto.PvtDataDigest, blockNum uint64) (Dig2PvtRWSetWithConfig, error) {
func (dr *dataRetrieverMock) CollectionRWSet(dig []*proto.PvtDataDigest, blockNum uint64) (Dig2PvtRWSetWithConfig, bool, error) {
args := dr.Called(dig, blockNum)
return args.Get(0).(Dig2PvtRWSetWithConfig), args.Error(1)
return args.Get(0).(Dig2PvtRWSetWithConfig), args.Bool(1), args.Error(2)
}
type receivedMsg struct {
......@@ -336,7 +336,7 @@ func TestPullerFromOnly1Peer(t *testing.T) {
}: p2TransientStore,
}
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig)), uint64(0)).Return(store, nil)
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig)), uint64(0)).Return(store, true, nil)
factoryMock3 := &collectionAccessFactoryMock{}
policyMock3 := &collectionAccessPolicyMock{}
......@@ -389,7 +389,7 @@ func TestPullerDataNotAvailable(t *testing.T) {
},
}
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig)), mock.Anything).Return(store, nil)
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig)), mock.Anything).Return(store, true, nil)
p3 := gn.newPuller("p3", newCollectionStore(), factoryMock)
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig)), mock.Anything).Run(func(_ mock.Arguments) {
......@@ -485,7 +485,7 @@ func TestPullerPeerNotEligible(t *testing.T) {
},
}
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig)), mock.Anything).Return(store, nil)
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig)), mock.Anything).Return(store, true, nil)
policyStore = newCollectionStore().withPolicy("col1", uint64(100)).thatMapsTo("p3")
factoryMock3 := &collectionAccessFactoryMock{}
......@@ -496,7 +496,7 @@ func TestPullerPeerNotEligible(t *testing.T) {
factoryMock3.On("AccessPolicy", mock.Anything, mock.Anything).Return(accessPolicyMock1, nil)
p3 := gn.newPuller("p3", policyStore, factoryMock3)
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig)), mock.Anything).Return(store, nil)
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig)), mock.Anything).Return(store, true, nil)
dasf := &digestsAndSourceFactory{}
d2s := dasf.mapDigest(&privdatacommon.DigKey{Collection: "col1", TxId: "txID1", Namespace: "ns1"}).toSources().create()
fetchedMessages, err := p1.fetch(d2s)
......@@ -553,7 +553,7 @@ func TestPullerDifferentPeersDifferentCollections(t *testing.T) {
}: p2TransientStore,
}
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig1)), mock.Anything).Return(store1, nil)
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig1)), mock.Anything).Return(store1, true, nil)
p3TransientStore := &util.PrivateRWSetWithConfig{
RWSet: newPRWSet(),
......@@ -588,7 +588,7 @@ func TestPullerDifferentPeersDifferentCollections(t *testing.T) {
Namespace: "ns1",
}
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig2)), mock.Anything).Return(store2, nil)
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig2)), mock.Anything).Return(store2, true, nil)
dasf := &digestsAndSourceFactory{}
fetchedMessages, err := p1.fetch(dasf.mapDigest(toDigKey(dig1)).toSources().mapDigest(toDigKey(dig2)).toSources().create())
......@@ -660,7 +660,7 @@ func TestPullerRetries(t *testing.T) {
factoryMock2.On("AccessPolicy", mock.Anything, mock.Anything).Return(accessPolicyMock2, nil)
p2 := gn.newPuller("p2", policyStore, factoryMock2)
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig)), uint64(0)).Return(store, nil)
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig)), uint64(0)).Return(store, true, nil)
// p3
policyStore = newCollectionStore().withPolicy("col1", uint64(100)).thatMapsTo("p1")
......@@ -672,7 +672,7 @@ func TestPullerRetries(t *testing.T) {
factoryMock3.On("AccessPolicy", mock.Anything, mock.Anything).Return(accessPolicyMock3, nil)
p3 := gn.newPuller("p3", policyStore, factoryMock3)
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig)), uint64(0)).Return(store, nil)
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig)), uint64(0)).Return(store, true, nil)
// p4
policyStore = newCollectionStore().withPolicy("col1", uint64(100)).thatMapsTo("p4")
......@@ -684,7 +684,7 @@ func TestPullerRetries(t *testing.T) {
factoryMock4.On("AccessPolicy", mock.Anything, mock.Anything).Return(accessPolicyMock4, nil)
p4 := gn.newPuller("p4", policyStore, factoryMock4)
p4.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig)), uint64(0)).Return(store, nil)
p4.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig)), uint64(0)).Return(store, true, nil)
// p5
policyStore = newCollectionStore().withPolicy("col1", uint64(100)).thatMapsTo("p5")
......@@ -696,7 +696,7 @@ func TestPullerRetries(t *testing.T) {
factoryMock5.On("AccessPolicy", mock.Anything, mock.Anything).Return(accessPolicyMock5, nil)
p5 := gn.newPuller("p5", policyStore, factoryMock5)
p5.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig)), uint64(0)).Return(store, nil)
p5.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig)), uint64(0)).Return(store, true, nil)
// Fetch from someone
dasf := &digestsAndSourceFactory{}
......@@ -785,11 +785,11 @@ func TestPullerPreferEndorsers(t *testing.T) {
// We only define an action for dig2 on p2, and the test would fail with panic if any other peer is asked for
// a private RWSet on dig2
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig2)), uint64(0)).Return(store, nil)
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig2)), uint64(0)).Return(store, true, nil)
// We only define an action for dig1 on p3, and the test would fail with panic if any other peer is asked for
// a private RWSet on dig1
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig1)), uint64(0)).Return(store, nil)
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig1)), uint64(0)).Return(store, true, nil)
dasf := &digestsAndSourceFactory{}
d2s := dasf.mapDigest(toDigKey(dig1)).toSources("p3").mapDigest(toDigKey(dig2)).toSources().create()
......@@ -887,11 +887,11 @@ func TestPullerFetchReconciledItemsPreferPeersFromOriginalConfig(t *testing.T) {
// We only define an action for dig2 on p2, and the test would fail with panic if any other peer is asked for
// a private RWSet on dig2
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig2)), uint64(0)).Return(store, nil)
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig2)), uint64(0)).Return(store, true, nil)
// We only define an action for dig1 on p3, and the test would fail with panic if any other peer is asked for
// a private RWSet on dig1
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig1)), uint64(0)).Return(store, nil)
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig1)), uint64(0)).Return(store, true, nil)
d2cc := privdatacommon.Dig2CollectionConfig{
privdatacommon.DigKey{
......@@ -994,16 +994,16 @@ func TestPullerAvoidPullingPurgedData(t *testing.T) {
}: privateData2,
}
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig1)), 0).Return(store, nil)
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig1)), 0).Return(store, nil).
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig1)), 0).Return(store, true, nil)
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig1)), 0).Return(store, true, nil).
Run(
func(arg mock.Arguments) {
assert.Fail(t, "we should not fetch private data from peers where it was purged")
},
)
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig2)), uint64(0)).Return(store, nil)
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig2)), uint64(0)).Return(store, nil).
p3.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig2)), uint64(0)).Return(store, true, nil)
p2.PrivateDataRetriever.(*dataRetrieverMock).On("CollectionRWSet", mock.MatchedBy(protoMatcher(dig2)), uint64(0)).Return(store, true, nil).
Run(
func(mock.Arguments) {
assert.Fail(t, "we should not fetch private data of collection2 from peer 2")
......@@ -1028,7 +1028,7 @@ type counterDataRetreiver struct {
PrivateDataRetriever
}
func (c *counterDataRetreiver) CollectionRWSet(dig []*proto.PvtDataDigest, blockNum uint64) (Dig2PvtRWSetWithConfig, error) {
func (c *counterDataRetreiver) CollectionRWSet(dig []*proto.PvtDataDigest, blockNum uint64) (Dig2PvtRWSetWithConfig, bool, error) {
c.numberOfCalls += 1
return c.PrivateDataRetriever.CollectionRWSet(dig, blockNum)
}
......
......@@ -9,6 +9,7 @@ package privdata
import (
"encoding/hex"
"fmt"
"math"
"sync"
"time"
......@@ -42,14 +43,14 @@ type ReconciliationFetcher interface {
// Reconciler completes missing parts of private data that weren't available during commit time.
// this is done by getting from the ledger a list of missing private data and pulling it from the other peers.
type Reconciler interface {
type PvtDataReconciler interface {
// Start function start the reconciler based on a scheduler, as was configured in reconciler creation
Start()
// Stop function stops reconciler
Stop()
}
type reconciler struct {
type Reconciler struct {
config *ReconcilerConfig
ReconciliationFetcher
committer.Committer
......@@ -76,7 +77,7 @@ func (*NoOpReconciler) Stop() {
type ReconcilerConfig struct {
sleepInterval time.Duration
batchSize int
isEnabled bool
IsEnabled bool
}
// this func reads reconciler configuration values from core.yaml and returns ReconcilerConfig
......@@ -92,16 +93,13 @@ func GetReconcilerConfig() *ReconcilerConfig {
reconcileBatchSize = reconcileBatchSizeDefault
}
isEnabled := viper.GetBool(reconciliationEnabledConfigKey)
return &ReconcilerConfig{sleepInterval: reconcileSleepInterval, batchSize: reconcileBatchSize, isEnabled: isEnabled}
return &ReconcilerConfig{sleepInterval: reconcileSleepInterval, batchSize: reconcileBatchSize, IsEnabled: isEnabled}
}
// NewReconciler creates a new instance of reconciler
func NewReconciler(c committer.Committer, fetcher ReconciliationFetcher, config *ReconcilerConfig) Reconciler {
if !config.isEnabled {
return &NoOpReconciler{}
}
func NewReconciler(c committer.Committer, fetcher ReconciliationFetcher, config *ReconcilerConfig) *Reconciler {
logger.Debug("Private data reconciliation is enabled")
return &reconciler{
return &Reconciler{
config: config,
Committer: c,
ReconciliationFetcher: fetcher,
......@@ -109,66 +107,71 @@ func NewReconciler(c committer.Committer, fetcher ReconciliationFetcher, config
}
}
func (r *reconciler) Stop() {
func (r *Reconciler) Stop() {
r.stopOnce.Do(func() {
close(r.stopChan)
})
}
func (r *reconciler) Start() {
func (r *Reconciler) Start() {
r.startOnce.Do(func() {
go r.run()
})
}
func (r *reconciler) run() {
func (r *Reconciler) run() {
for {
select {
case <-r.stopChan:
return
case <-time.After(r.config.sleepInterval):
logger.Debug("Start reconcile missing private info")
err := r.reconcile()
numOfItems, minBlock, maxBlock, err := r.reconcile()
if err != nil {
logger.Error("Failed to reconcile missing private info, error: ", err.Error())
} else {
logger.Debug("Reconciliation cycle finished successfully")
break
}
if numOfItems > 0 {
logger.Infof("Reconciliation cycle finished successfully. reconciled %d private data keys from blocks range [%d - %d]", numOfItems, minBlock, maxBlock)
break
}
logger.Debug("Reconciliation cycle finished successfully. no items to reconcile")
}
}
}
func (r *reconciler) reconcile() error {
// returns the number of items that were reconciled , minBlock, maxBlock (blocks range) and an error
func (r *Reconciler) reconcile() (int, uint64, uint64, error) {
missingPvtDataTracker, err := r.GetMissingPvtDataTracker()
if err != nil {
logger.Error("reconciliation error when trying to get missingPvtDataTrcker:", err)
return err
return 0, 0, 0, err
}
if missingPvtDataTracker == nil {
logger.Error("got nil as MissingPvtDataTracker, exiting...")
return errors.New("got nil as MissingPvtDataTracker, exiting...")
return 0, 0, 0, errors.New("got nil as MissingPvtDataTracker, exiting...")
}
missingPvtDataInfo, err := missingPvtDataTracker.GetMissingPvtDataInfoForMostRecentBlocks(r.config.batchSize)
if err != nil {
logger.Error("reconciliation error when trying to get missing pvt data info recent blocks:", err)
return err
return 0, 0, 0, err
}
// if missingPvtDataInfo is nil, len will return 0
if len(missingPvtDataInfo) == 0 {
logger.Debug("No missing private data to reconcile, exiting...")
return nil
return 0, 0, 0, nil
}
logger.Debug("got from ledger", len(missingPvtDataInfo), "blocks with missing private data, trying to reconcile...")
dig2collectionCfg := r.getDig2CollectionConfig(missingPvtDataInfo)
dig2collectionCfg, minBlock, maxBlock := r.getDig2CollectionConfig(missingPvtDataInfo)
fetchedData, err := r.FetchReconciledItems(dig2collectionCfg)
if err != nil {
logger.Error("reconciliation error when trying to fetch missing items from different peers:", err)
return err
return 0, 0, 0, err
}
if len(fetchedData.AvailableElements) == 0 {
logger.Warning("failed to reconcile missing private data from the other peers")
return nil
return 0, 0, 0, nil
}
pvtDataToCommit := r.preparePvtDataToCommit(fetchedData.AvailableElements)
......@@ -176,10 +179,10 @@ func (r *reconciler) reconcile() error {
pvtdataHashMismatch, err := r.CommitPvtData(pvtDataToCommit)
r.logMismatched(pvtdataHashMismatch)
if err != nil {
return errors.Wrap(err, "failed to commit private data")
return 0, 0, 0, errors.Wrap(err, "failed to commit private data")
}
return nil
return len(fetchedData.AvailableElements), minBlock, maxBlock, nil
}
type collectionConfigKey struct {
......@@ -187,10 +190,19 @@ type collectionConfigKey struct {
blockNum uint64
}
func (r *reconciler) getDig2CollectionConfig(missingPvtDataInfo ledger.MissingPvtDataInfo) privdatacommon.Dig2CollectionConfig {
func (r *Reconciler) getDig2CollectionConfig(missingPvtDataInfo ledger.MissingPvtDataInfo) (privdatacommon.Dig2CollectionConfig, uint64, uint64) {
var minBlock, maxBlock uint64
minBlock = math.MaxUint64
maxBlock = 0
collectionConfigCache := make(map[collectionConfigKey]*common.StaticCollectionConfig)
dig2collectionCfg := make(map[privdatacommon.DigKey]*common.StaticCollectionConfig)
for blockNum, blockPvtDataInfo := range missingPvtDataInfo {
if blockNum < minBlock {
minBlock = blockNum
}
if blockNum > maxBlock {
maxBlock = blockNum
}
for seqInBlock, collectionPvtDataInfo := range blockPvtDataInfo {
for _, pvtDataInfo := range collectionPvtDataInfo {
collConfigKey := collectionConfigKey{
......@@ -216,10 +228,10 @@ func (r *reconciler) getDig2CollectionConfig(missingPvtDataInfo ledger.MissingPv
}
}
}
return dig2collectionCfg
return dig2collectionCfg, minBlock, maxBlock
}
func (r *reconciler) getMostRecentCollectionConfig(chaincodeName string, collectionName string, blockNum uint64) (*common.StaticCollectionConfig, error) {
func (r *Reconciler) getMostRecentCollectionConfig(chaincodeName string, collectionName string, blockNum uint64) (*common.StaticCollectionConfig, error) {
configHistoryRetriever, err := r.GetConfigHistoryRetriever()
if err != nil {
return nil, errors.Wrap(err, "configHistoryRetriever is not available")
......@@ -245,7 +257,7 @@ func (r *reconciler) getMostRecentCollectionConfig(chaincodeName string, collect
return staticCollectionConfig.StaticCollectionConfig, nil
}
func (r *reconciler) preparePvtDataToCommit(elements []*gossip2.PvtDataElement) []*ledger.BlockPvtData {
func (r *Reconciler) preparePvtDataToCommit(elements []*gossip2.PvtDataElement) []*ledger.BlockPvtData {
rwSetByBlockByKeys := r.groupRwsetByBlock(elements)
// populate the private RWSets passed to the ledger
......@@ -269,17 +281,17 @@ func (r *reconciler) preparePvtDataToCommit(elements []*gossip2.PvtDataElement)
return pvtDataToCommit
}
func (r *reconciler) logMismatched(pvtdataMismatched []*ledger.PvtdataHashMismatch) {
func (r *Reconciler) logMismatched(pvtdataMismatched []*ledger.PvtdataHashMismatch) {
if len(pvtdataMismatched) > 0 {
for _, hashMismatch := range pvtdataMismatched {
logger.Warningf("failed to reconciliation pvtdata chaincode %s, collection %s, block num %d, tx num %d due to hash mismatch",
logger.Warningf("failed to reconcile pvtdata chaincode %s, collection %s, block num %d, tx num %d due to hash mismatch",
hashMismatch.Namespace, hashMismatch.Collection, hashMismatch.BlockNum, hashMismatch.TxNum)
}
}
}
// return a mapping from block num to rwsetByKeys
func (r *reconciler) groupRwsetByBlock(elements []*gossip2.PvtDataElement) map[uint64]rwsetByKeys {
func (r *Reconciler) groupRwsetByBlock(elements []*gossip2.PvtDataElement) map[uint64]rwsetByKeys {
rwSetByBlockByKeys := make(map[uint64]rwsetByKeys) // map from block num to rwsetByKeys
// Iterate over data fetched from peers
......
......@@ -37,8 +37,8 @@ func TestNoItemsToReconcile(t *testing.T) {
committer.On("GetMissingPvtDataTracker").Return(missingPvtDataTracker, nil)
fetcher.On("FetchReconciledItems", mock.Anything).Return(nil, errors.New("this function shouldn't be called"))
r := &reconciler{config: &ReconcilerConfig{sleepInterval: time.Minute, batchSize: 1, isEnabled: true}, ReconciliationFetcher: fetcher, Committer: committer}
err := r.reconcile()
r := &Reconciler{config: &ReconcilerConfig{sleepInterval: time.Minute, batchSize: 1, IsEnabled: true}, ReconciliationFetcher: fetcher, Committer: committer}
_, _, _, err := r.reconcile()
assert.NoError(t, err)
}
......@@ -73,8 +73,8 @@ func TestNotReconcilingWhenCollectionConfigNotAvailable(t *testing.T) {
fetchCalled = true
}).Return(nil, errors.New("called with no digests"))
r := &reconciler{config: &ReconcilerConfig{sleepInterval: time.Minute, batchSize: 1, isEnabled: true}, ReconciliationFetcher: fetcher, Committer: committer}
err := r.reconcile()
r := &Reconciler{config: &ReconcilerConfig{sleepInterval: time.Minute, batchSize: 1, IsEnabled: true}, ReconciliationFetcher: fetcher, Committer: committer}
_, _, _, err := r.reconcile()
assert.Error(t, err)
assert.Equal(t, "called with no digests", err.Error())
......@@ -147,8 +147,8 @@ func TestReconciliationHappyPathWithoutScheduler(t *testing.T) {
commitPvtDataHappened = true
}).Return([]*ledger.PvtdataHashMismatch{}, nil)
r := &reconciler{config