reconcile.go 11.4 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package privdata

import (
	"encoding/hex"
	"fmt"
12
	"math"
13
14
15
16
17
18
19
20
21
22
23
24
25
26
	"sync"
	"time"

	util2 "github.com/hyperledger/fabric/common/util"
	"github.com/hyperledger/fabric/core/committer"
	"github.com/hyperledger/fabric/core/ledger"
	privdatacommon "github.com/hyperledger/fabric/gossip/privdata/common"
	"github.com/hyperledger/fabric/protos/common"
	gossip2 "github.com/hyperledger/fabric/protos/gossip"
	"github.com/pkg/errors"
	"github.com/spf13/viper"
)

const (
27
	reconcileSleepIntervalConfigKey = "peer.gossip.pvtData.reconcileSleepInterval"
28
	reconcileSleepIntervalDefault   = time.Minute * 5
29
	reconcileBatchSizeConfigKey     = "peer.gossip.pvtData.reconcileBatchSize"
30
	reconcileBatchSizeDefault       = 10
31
	reconciliationEnabledConfigKey  = "peer.gossip.pvtData.reconciliationEnabled"
32
33
34
35
36
37
38
39
40
41
42
43
44
45
)

// ReconciliationFetcher interface which defines API to fetch
// private data elements that have to be reconciled
type ReconciliationFetcher interface {
	FetchReconciledItems(dig2collectionConfig privdatacommon.Dig2CollectionConfig) (*privdatacommon.FetchedPvtDataContainer, error)
}

//go:generate mockery -dir . -name ReconciliationFetcher -case underscore -output mocks/
//go:generate mockery -dir ../../core/ledger/ -name MissingPvtDataTracker -case underscore -output mocks/
//go:generate mockery -dir ../../core/ledger/ -name ConfigHistoryRetriever -case underscore -output mocks/

// Reconciler completes missing parts of private data that weren't available during commit time.
// this is done by getting from the ledger a list of missing private data and pulling it from the other peers.
46
type PvtDataReconciler interface {
47
48
49
50
51
52
	// Start function start the reconciler based on a scheduler, as was configured in reconciler creation
	Start()
	// Stop function stops reconciler
	Stop()
}

53
type Reconciler struct {
54
	config *ReconcilerConfig
55
56
57
58
59
60
61
	ReconciliationFetcher
	committer.Committer
	stopChan  chan struct{}
	startOnce sync.Once
	stopOnce  sync.Once
}

62
63
64
65
66
67
68
69
70
71
72
73
74
75
// NoOpReconciler non functional reconciler to be used
// in case reconciliation has been disabled
type NoOpReconciler struct {
}

func (*NoOpReconciler) Start() {
	// do nothing
	logger.Debug("Private data reconciliation has been disabled")
}

func (*NoOpReconciler) Stop() {
	// do nothing
}

76
77
78
79
// ReconcilerConfig holds config flags that are read from core.yaml
type ReconcilerConfig struct {
	sleepInterval time.Duration
	batchSize     int
80
	IsEnabled     bool
81
82
83
}

// this func reads reconciler configuration values from core.yaml and returns ReconcilerConfig
84
func GetReconcilerConfig() *ReconcilerConfig {
85
86
87
88
89
90
91
92
93
94
	reconcileSleepInterval := viper.GetDuration(reconcileSleepIntervalConfigKey)
	if reconcileSleepInterval == 0 {
		logger.Warning("Configuration key", reconcileSleepIntervalConfigKey, "isn't set, defaulting to", reconcileSleepIntervalDefault)
		reconcileSleepInterval = reconcileSleepIntervalDefault
	}
	reconcileBatchSize := viper.GetInt(reconcileBatchSizeConfigKey)
	if reconcileBatchSize == 0 {
		logger.Warning("Configuration key", reconcileBatchSizeConfigKey, "isn't set, defaulting to", reconcileBatchSizeDefault)
		reconcileBatchSize = reconcileBatchSizeDefault
	}
95
	isEnabled := viper.GetBool(reconciliationEnabledConfigKey)
96
	return &ReconcilerConfig{sleepInterval: reconcileSleepInterval, batchSize: reconcileBatchSize, IsEnabled: isEnabled}
97
98
99
}

// NewReconciler creates a new instance of reconciler
100
func NewReconciler(c committer.Committer, fetcher ReconciliationFetcher, config *ReconcilerConfig) *Reconciler {
101
	logger.Debug("Private data reconciliation is enabled")
102
	return &Reconciler{
103
104
105
106
107
108
109
		config:                config,
		Committer:             c,
		ReconciliationFetcher: fetcher,
		stopChan:              make(chan struct{}),
	}
}

110
func (r *Reconciler) Stop() {
111
112
113
114
115
	r.stopOnce.Do(func() {
		close(r.stopChan)
	})
}

116
func (r *Reconciler) Start() {
117
118
119
120
121
	r.startOnce.Do(func() {
		go r.run()
	})
}

122
func (r *Reconciler) run() {
123
124
125
126
127
128
	for {
		select {
		case <-r.stopChan:
			return
		case <-time.After(r.config.sleepInterval):
			logger.Debug("Start reconcile missing private info")
129
			numOfItems, minBlock, maxBlock, err := r.reconcile()
130
131
			if err != nil {
				logger.Error("Failed to reconcile missing private info, error: ", err.Error())
132
133
134
135
136
				break
			}
			if numOfItems > 0 {
				logger.Infof("Reconciliation cycle finished successfully. reconciled %d private data keys from blocks range [%d - %d]", numOfItems, minBlock, maxBlock)
				break
137
			}
138
			logger.Debug("Reconciliation cycle finished successfully. no items to reconcile")
139
140
141
142
		}
	}
}

143
144
// returns the number of items that were reconciled , minBlock, maxBlock (blocks range) and an error
func (r *Reconciler) reconcile() (int, uint64, uint64, error) {
145
146
	missingPvtDataTracker, err := r.GetMissingPvtDataTracker()
	if err != nil {
147
		logger.Error("reconciliation error when trying to get missingPvtDataTrcker:", err)
148
		return 0, 0, 0, err
149
150
	}
	if missingPvtDataTracker == nil {
151
		logger.Error("got nil as MissingPvtDataTracker, exiting...")
152
		return 0, 0, 0, errors.New("got nil as MissingPvtDataTracker, exiting...")
153
154
155
	}
	missingPvtDataInfo, err := missingPvtDataTracker.GetMissingPvtDataInfoForMostRecentBlocks(r.config.batchSize)
	if err != nil {
156
		logger.Error("reconciliation error when trying to get missing pvt data info recent blocks:", err)
157
		return 0, 0, 0, err
158
159
160
161
	}
	// if missingPvtDataInfo is nil, len will return 0
	if len(missingPvtDataInfo) == 0 {
		logger.Debug("No missing private data to reconcile, exiting...")
162
		return 0, 0, 0, nil
163
	}
164
	logger.Debug("got from ledger", len(missingPvtDataInfo), "blocks with missing private data, trying to reconcile...")
165
	dig2collectionCfg, minBlock, maxBlock := r.getDig2CollectionConfig(missingPvtDataInfo)
166

167
168
	fetchedData, err := r.FetchReconciledItems(dig2collectionCfg)
	if err != nil {
169
		logger.Error("reconciliation error when trying to fetch missing items from different peers:", err)
170
		return 0, 0, 0, err
171
	}
172
173
	if len(fetchedData.AvailableElements) == 0 {
		logger.Warning("failed to reconcile missing private data from the other peers")
174
		return 0, 0, 0, nil
175
	}
176
177
178

	pvtDataToCommit := r.preparePvtDataToCommit(fetchedData.AvailableElements)
	// commit missing private data that was reconciled and log mismatched
179
	pvtdataHashMismatch, err := r.CommitPvtDataOfOldBlocks(pvtDataToCommit)
180
181
	r.logMismatched(pvtdataHashMismatch)
	if err != nil {
182
		return 0, 0, 0, errors.Wrap(err, "failed to commit private data")
183
184
	}

185
	return len(fetchedData.AvailableElements), minBlock, maxBlock, nil
186
187
188
189
190
191
192
}

type collectionConfigKey struct {
	chaincodeName, collectionName string
	blockNum                      uint64
}

193
194
195
196
func (r *Reconciler) getDig2CollectionConfig(missingPvtDataInfo ledger.MissingPvtDataInfo) (privdatacommon.Dig2CollectionConfig, uint64, uint64) {
	var minBlock, maxBlock uint64
	minBlock = math.MaxUint64
	maxBlock = 0
197
198
199
	collectionConfigCache := make(map[collectionConfigKey]*common.StaticCollectionConfig)
	dig2collectionCfg := make(map[privdatacommon.DigKey]*common.StaticCollectionConfig)
	for blockNum, blockPvtDataInfo := range missingPvtDataInfo {
200
201
202
203
204
205
		if blockNum < minBlock {
			minBlock = blockNum
		}
		if blockNum > maxBlock {
			maxBlock = blockNum
		}
206
207
208
		for seqInBlock, collectionPvtDataInfo := range blockPvtDataInfo {
			for _, pvtDataInfo := range collectionPvtDataInfo {
				collConfigKey := collectionConfigKey{
manish's avatar
manish committed
209
210
					chaincodeName:  pvtDataInfo.Namespace,
					collectionName: pvtDataInfo.Collection,
211
212
213
					blockNum:       blockNum,
				}
				if _, exists := collectionConfigCache[collConfigKey]; !exists {
manish's avatar
manish committed
214
					collectionConfig, err := r.getMostRecentCollectionConfig(pvtDataInfo.Namespace, pvtDataInfo.Collection, blockNum)
215
216
217
218
219
220
221
222
					if err != nil {
						logger.Debug(err)
						continue
					}
					collectionConfigCache[collConfigKey] = collectionConfig
				}
				digKey := privdatacommon.DigKey{
					SeqInBlock: seqInBlock,
manish's avatar
manish committed
223
224
					Collection: pvtDataInfo.Collection,
					Namespace:  pvtDataInfo.Namespace,
225
226
227
228
229
230
					BlockSeq:   blockNum,
				}
				dig2collectionCfg[digKey] = collectionConfigCache[collConfigKey]
			}
		}
	}
231
	return dig2collectionCfg, minBlock, maxBlock
232
233
}

234
func (r *Reconciler) getMostRecentCollectionConfig(chaincodeName string, collectionName string, blockNum uint64) (*common.StaticCollectionConfig, error) {
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
	configHistoryRetriever, err := r.GetConfigHistoryRetriever()
	if err != nil {
		return nil, errors.Wrap(err, "configHistoryRetriever is not available")
	}

	configInfo, err := configHistoryRetriever.MostRecentCollectionConfigBelow(blockNum, chaincodeName)
	if err != nil {
		return nil, errors.New(fmt.Sprintf("cannot find recent collection config update below block sequence = %d for chaincode %s", blockNum, chaincodeName))
	}
	if configInfo == nil {
		return nil, errors.New(fmt.Sprintf("no collection config update below block sequence = %d for chaincode %s is available", blockNum, chaincodeName))
	}

	collectionConfig := extractCollectionConfig(configInfo.CollectionConfig, collectionName)
	if collectionConfig == nil {
		return nil, errors.New(fmt.Sprintf("no collection config was found for collection %s for chaincode %s", collectionName, chaincodeName))
	}

	staticCollectionConfig, wasCastingSuccessful := collectionConfig.Payload.(*common.CollectionConfig_StaticCollectionConfig)
	if !wasCastingSuccessful {
		return nil, errors.New(fmt.Sprintf("expected collection config of type CollectionConfig_StaticCollectionConfig for collection %s for chaincode %s, while got different config type...", collectionName, chaincodeName))
	}
	return staticCollectionConfig.StaticCollectionConfig, nil
}

260
func (r *Reconciler) preparePvtDataToCommit(elements []*gossip2.PvtDataElement) []*ledger.BlockPvtData {
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
	rwSetByBlockByKeys := r.groupRwsetByBlock(elements)

	// populate the private RWSets passed to the ledger
	var pvtDataToCommit []*ledger.BlockPvtData

	for blockNum, rwSetKeys := range rwSetByBlockByKeys {
		blockPvtData := &ledger.BlockPvtData{
			BlockNum:  blockNum,
			WriteSets: make(map[uint64]*ledger.TxPvtData),
		}
		for seqInBlock, nsRWS := range rwSetKeys.bySeqsInBlock() {
			rwsets := nsRWS.toRWSet()
			logger.Debugf("Preparing to commit [%d] private write set, missed from transaction index [%d] of block number [%d]", len(rwsets.NsPvtRwset), seqInBlock, blockNum)
			blockPvtData.WriteSets[seqInBlock] = &ledger.TxPvtData{
				SeqInBlock: seqInBlock,
				WriteSet:   rwsets,
			}
		}
		pvtDataToCommit = append(pvtDataToCommit, blockPvtData)
	}
	return pvtDataToCommit
}

284
func (r *Reconciler) logMismatched(pvtdataMismatched []*ledger.PvtdataHashMismatch) {
285
286
	if len(pvtdataMismatched) > 0 {
		for _, hashMismatch := range pvtdataMismatched {
287
			logger.Warningf("failed to reconcile pvtdata chaincode %s, collection %s, block num %d, tx num %d due to hash mismatch",
288
				hashMismatch.Namespace, hashMismatch.Collection, hashMismatch.BlockNum, hashMismatch.TxNum)
289
290
291
292
293
		}
	}
}

// return a mapping from block num to rwsetByKeys
294
func (r *Reconciler) groupRwsetByBlock(elements []*gossip2.PvtDataElement) map[uint64]rwsetByKeys {
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
	rwSetByBlockByKeys := make(map[uint64]rwsetByKeys) // map from block num to rwsetByKeys

	// Iterate over data fetched from peers
	for _, element := range elements {
		dig := element.Digest
		if _, exists := rwSetByBlockByKeys[dig.BlockSeq]; !exists {
			rwSetByBlockByKeys[dig.BlockSeq] = make(map[rwSetKey][]byte)
		}
		for _, rws := range element.Payload {
			hash := hex.EncodeToString(util2.ComputeSHA256(rws))
			key := rwSetKey{
				txID:       dig.TxId,
				namespace:  dig.Namespace,
				collection: dig.Collection,
				seqInBlock: dig.SeqInBlock,
				hash:       hash,
			}
			rwSetByBlockByKeys[dig.BlockSeq][key] = rws
		}
	}
	return rwSetByBlockByKeys
}