validator.go 18.2 KB
Newer Older
1
2
3
/*
Copyright IBM Corp. 2016 All Rights Reserved.

4
SPDX-License-Identifier: Apache-2.0
5
6
7
8
9
10
11
12
*/

package txvalidator

import (
	"fmt"

	"github.com/golang/protobuf/proto"
13
	"github.com/hyperledger/fabric/common/channelconfig"
14
	"github.com/hyperledger/fabric/common/configtx"
15
	commonerrors "github.com/hyperledger/fabric/common/errors"
16
	"github.com/hyperledger/fabric/common/flogging"
17
	"github.com/hyperledger/fabric/core/common/sysccprovider"
18
	"github.com/hyperledger/fabric/core/common/validation"
19
20
	"github.com/hyperledger/fabric/core/ledger"
	ledgerUtil "github.com/hyperledger/fabric/core/ledger/util"
21
	"github.com/hyperledger/fabric/msp"
22
	"github.com/hyperledger/fabric/protos/common"
23
	"github.com/hyperledger/fabric/protos/peer"
24
25
	"github.com/hyperledger/fabric/protos/utils"
	"github.com/op/go-logging"
26
	"github.com/pkg/errors"
27
	"golang.org/x/net/context"
28
29
)

30
31
// Support provides all of the needed to evaluate the VSCC
type Support interface {
32
33
34
35
36
37
	// Acquire implements semaphore-like acquire semantics
	Acquire(ctx context.Context, n int64) error

	// Release implements semaphore-like release semantics
	Release(n int64)

38
39
40
	// Ledger returns the ledger associated with this validator
	Ledger() ledger.PeerLedger

41
	// MSPManager returns the MSP manager for this channel
42
43
	MSPManager() msp.MSPManager

44
	// Apply attempts to apply a configtx to become the new config
45
	Apply(configtx *common.ConfigEnvelope) error
46
47
48
49

	// GetMSPIDs returns the IDs for the application MSPs
	// that have been defined in the channel
	GetMSPIDs(cid string) []string
50
51
52

	// Capabilities defines the capabilities for the application portion of this channel
	Capabilities() channelconfig.ApplicationCapabilities
53
54
}

55
56
57
58
//Validator interface which defines API to validate block transactions
// and return the bit array mask indicating invalid transactions which
// didn't pass validation.
type Validator interface {
59
	Validate(block *common.Block) error
60
61
62
63
64
65
}

// implementation of Validator interface, keeps
// reference to the ledger to enable tx simulation
// and execution of vscc
type txValidator struct {
66
67
	support Support
	vscc    vsccValidator
68
69
70
71
72
73
}

var logger *logging.Logger // package-level logger

func init() {
	// Init logger with module name
74
	logger = flogging.MustGetLogger("committer/txvalidator")
75
76
}

77
78
79
80
81
82
83
84
85
86
87
88
type blockValidationRequest struct {
	block *common.Block
	d     []byte
	tIdx  int
}

type blockValidationResult struct {
	tIdx                 int
	validationCode       peer.TxValidationCode
	txsChaincodeName     *sysccprovider.ChaincodeInstance
	txsUpgradedChaincode *sysccprovider.ChaincodeInstance
	err                  error
89
	txid                 string
90
91
}

92
// NewTxValidator creates new transactions validator
93
func NewTxValidator(support Support, sccp sysccprovider.SystemChaincodeProvider) Validator {
94
	// Encapsulates interface implementation
95
96
	return &txValidator{
		support: support,
97
		vscc:    newVSCCValidator(support, sccp)}
98
99
100
101
102
}

func (v *txValidator) chainExists(chain string) bool {
	// TODO: implement this function!
	return true
103
104
}

105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
// Validate performs the validation of a block. The validation
// of each transaction in the block is performed in parallel.
// The approach is as follows: the committer thread starts the
// tx validation function in a goroutine (using a semaphore to cap
// the number of concurrent validating goroutines). The committer
// thread then reads results of validation (in orderer of completion
// of the goroutines) from the results channel. The goroutines
// perform the validation of the txs in the block and enqueue the
// validation result in the results channel. A few note-worthy facts:
// 1) to keep the approach simple, the committer thread enqueues
//    all transactions in the block and then moves on to reading the
//    results.
// 2) for parallel validation to work, it is important that the
//    validation function does not change the state of the system.
//    Otherwise the order in which validation is perform matters
//    and we have to resort to sequential validation (or some locking).
//    This is currently true, because the only function that affects
//    state is when a config transaction is received, but they are
//    guaranteed to be alone in the block. If/when this assumption
//    is violated, this code must be changed.
125
func (v *txValidator) Validate(block *common.Block) error {
126
127
128
	var err error
	var errPos int

129
130
	logger.Debug("START Block Validation")
	defer logger.Debug("END Block Validation")
131
132
	// Initialize trans as valid here, then set invalidation reason code upon invalidation below
	txsfltr := ledgerUtil.NewTxValidationFlags(len(block.Data.Data))
133
	// txsChaincodeNames records all the invoked chaincodes by tx in a block
134
	txsChaincodeNames := make(map[int]*sysccprovider.ChaincodeInstance)
135
	// upgradedChaincodes records all the chaincodes that are upgraded in a block
136
	txsUpgradedChaincodes := make(map[int]*sysccprovider.ChaincodeInstance)
137
138
	// array of txids
	txidArray := make([]string, len(block.Data.Data))
139

140
141
142
143
144
145
	results := make(chan *blockValidationResult)
	go func() {
		for tIdx, d := range block.Data.Data {
			// ensure that we don't have too many concurrent validation workers
			v.support.Acquire(context.Background(), 1)

146
			go func(index int, data []byte) {
147
148
				defer v.support.Release(1)

149
150
				v.validateTx(&blockValidationRequest{
					d:     data,
151
					block: block,
152
					tIdx:  index,
153
				}, results)
154
			}(tIdx, d)
155
156
		}
	}()
157

158
	logger.Debugf("expecting %d block validation responses", len(block.Data.Data))
159

160
161
162
	// now we read responses in the order in which they come back
	for i := 0; i < len(block.Data.Data); i++ {
		res := <-results
163

164
165
166
167
168
		if res.err != nil {
			// if there is an error, we buffer its value, wait for
			// all workers to complete validation and then return
			// the error from the first tx in this block that returned an error
			logger.Debugf("got terminal error %s for idx %d", res.err, res.tIdx)
169

170
171
172
173
174
175
176
177
			if err == nil || res.tIdx < errPos {
				err = res.err
				errPos = res.tIdx
			}
		} else {
			// if there was no error, we set the txsfltr and we set the
			// txsChaincodeNames and txsUpgradedChaincodes maps
			logger.Debugf("got result for idx %d, code %d", res.tIdx, res.validationCode)
178

179
			txsfltr.SetFlag(res.tIdx, res.validationCode)
180

181
182
183
			if res.validationCode == peer.TxValidationCode_VALID {
				if res.txsChaincodeName != nil {
					txsChaincodeNames[res.tIdx] = res.txsChaincodeName
184
				}
185
186
				if res.txsUpgradedChaincode != nil {
					txsUpgradedChaincodes[res.tIdx] = res.txsUpgradedChaincode
187
				}
188
				txidArray[res.tIdx] = res.txid
189
190
191
			}
		}
	}
192

193
194
195
196
197
198
199
	// if we're here, all workers have completed the validation.
	// If there was an error we return the error from the first
	// tx in this block that returned an error
	if err != nil {
		return err
	}

200
201
202
203
204
205
	// if we operate with this capability, we mark invalid any transaction that has a txid
	// which is equal to that of a previous tx in this block
	if v.support.Capabilities().ForbidDuplicateTXIdInBlock() {
		markTXIdDuplicates(txidArray, txsfltr)
	}

206
207
208
	// if we're here, all workers have completed validation and
	// no error was reported; we set the tx filter and return
	// success
209
	v.invalidTXsForUpgradeCC(txsChaincodeNames, txsUpgradedChaincodes, txsfltr)
210

211
212
213
214
215
216
	// make sure no transaction has skipped validation
	err = v.allValidated(txsfltr, block)
	if err != nil {
		return err
	}

217
218
	// Initialize metadata structure
	utils.InitBlockMetadata(block)
219
220

	block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsfltr
221
222

	return nil
223
224
}

225
226
227
228
229
230
231
232
233
234
235
236
// allValidated returns error if some of the validation flags have not been set
// during validation
func (v *txValidator) allValidated(txsfltr ledgerUtil.TxValidationFlags, block *common.Block) error {
	for id, f := range txsfltr {
		if peer.TxValidationCode(f) == peer.TxValidationCode_NOT_VALIDATED {
			return errors.Errorf("transaction %d in block %d has skipped validation", id, block.Header.Number)
		}
	}

	return nil
}

237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
func markTXIdDuplicates(txids []string, txsfltr ledgerUtil.TxValidationFlags) {
	txidMap := make(map[string]struct{})

	for id, txid := range txids {
		if txid == "" {
			continue
		}

		_, in := txidMap[txid]
		if in {
			logger.Error("Duplicate txid", txid, "found, skipping")
			txsfltr.SetFlag(id, peer.TxValidationCode_DUPLICATE_TXID)
		} else {
			txidMap[txid] = struct{}{}
		}
	}
}

255
func (v *txValidator) validateTx(req *blockValidationRequest, results chan<- *blockValidationResult) {
256
257
258
	block := req.block
	d := req.d
	tIdx := req.tIdx
259
	txID := ""
260
261
262
263
264
265
266
267
268

	if d == nil {
		results <- &blockValidationResult{
			tIdx: tIdx,
		}
		return
	}

	if env, err := utils.GetEnvelopeFromBlock(d); err != nil {
269
		logger.Warningf("Error getting tx from block: %+v", err)
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
		results <- &blockValidationResult{
			tIdx:           tIdx,
			validationCode: peer.TxValidationCode_INVALID_OTHER_REASON,
		}
		return
	} else if env != nil {
		// validate the transaction: here we check that the transaction
		// is properly formed, properly signed and that the security
		// chain binding proposal to endorsements to tx holds. We do
		// NOT check the validity of endorsements, though. That's a
		// job for VSCC below
		logger.Debugf("validateTx starts for block %p env %p txn %d", block, env, tIdx)
		defer logger.Debugf("validateTx completes for block %p env %p txn %d", block, env, tIdx)
		var payload *common.Payload
		var err error
		var txResult peer.TxValidationCode
		var txsChaincodeName *sysccprovider.ChaincodeInstance
		var txsUpgradedChaincode *sysccprovider.ChaincodeInstance

289
		if payload, txResult = validation.ValidateTransaction(env, v.support.Capabilities()); txResult != peer.TxValidationCode_VALID {
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
			logger.Errorf("Invalid transaction with index %d", tIdx)
			results <- &blockValidationResult{
				tIdx:           tIdx,
				validationCode: txResult,
			}
			return
		}

		chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
		if err != nil {
			logger.Warningf("Could not unmarshal channel header, err %s, skipping", err)
			results <- &blockValidationResult{
				tIdx:           tIdx,
				validationCode: peer.TxValidationCode_INVALID_OTHER_REASON,
			}
			return
		}

		channel := chdr.ChannelId
309
		logger.Debugf("Transaction is for channel %s", channel)
310
311

		if !v.chainExists(channel) {
312
			logger.Errorf("Dropping transaction for non-existent channel %s", channel)
313
314
315
316
317
318
319
320
321
			results <- &blockValidationResult{
				tIdx:           tIdx,
				validationCode: peer.TxValidationCode_TARGET_CHAIN_NOT_FOUND,
			}
			return
		}

		if common.HeaderType(chdr.Type) == common.HeaderType_ENDORSER_TRANSACTION {
			// Check duplicate transactions
322
			txID = chdr.TxId
323
324
325
326
327
328
329
330
331
332
333
			if _, err := v.support.Ledger().GetTransactionByID(txID); err == nil {
				logger.Error("Duplicate transaction found, ", txID, ", skipping")
				results <- &blockValidationResult{
					tIdx:           tIdx,
					validationCode: peer.TxValidationCode_DUPLICATE_TXID,
				}
				return
			}

			// Validate tx with vscc and policy
			logger.Debug("Validating transaction vscc tx validate")
334
			err, cde := v.vscc.VSCCValidateTx(payload, d)
335
			if err != nil {
336
				logger.Errorf("VSCCValidateTx for transaction txId = %s returned error: %s", txID, err)
337
				switch err.(type) {
338
				case *commonerrors.VSCCExecutionFailureError:
339
340
341
342
343
					results <- &blockValidationResult{
						tIdx: tIdx,
						err:  err,
					}
					return
344
				case *commonerrors.VSCCInfoLookupFailureError:
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
					results <- &blockValidationResult{
						tIdx: tIdx,
						err:  err,
					}
					return
				default:
					results <- &blockValidationResult{
						tIdx:           tIdx,
						validationCode: cde,
					}
					return
				}
			}

			invokeCC, upgradeCC, err := v.getTxCCInstance(payload)
			if err != nil {
361
				logger.Errorf("Get chaincode instance from transaction txId = %s returned error: %+v", txID, err)
362
363
364
365
366
367
368
369
				results <- &blockValidationResult{
					tIdx:           tIdx,
					validationCode: peer.TxValidationCode_INVALID_OTHER_REASON,
				}
				return
			}
			txsChaincodeName = invokeCC
			if upgradeCC != nil {
370
				logger.Infof("Find chaincode upgrade transaction for chaincode %s on channel %s with new version %s", upgradeCC.ChaincodeName, upgradeCC.ChainID, upgradeCC.ChaincodeVersion)
371
372
373
374
375
				txsUpgradedChaincode = upgradeCC
			}
		} else if common.HeaderType(chdr.Type) == common.HeaderType_CONFIG {
			configEnvelope, err := configtx.UnmarshalConfigEnvelope(payload.Data)
			if err != nil {
376
377
				err = errors.WithMessage(err, "error unmarshalling config which passed initial validity checks")
				logger.Criticalf("%+v", err)
378
379
380
381
382
383
384
385
				results <- &blockValidationResult{
					tIdx: tIdx,
					err:  err,
				}
				return
			}

			if err := v.support.Apply(configEnvelope); err != nil {
386
387
				err = errors.WithMessage(err, "error validating config which passed initial validity checks")
				logger.Criticalf("%+v", err)
388
389
390
391
392
393
394
				results <- &blockValidationResult{
					tIdx: tIdx,
					err:  err,
				}
				return
			}
			logger.Debugf("config transaction received for chain %s", channel)
395
396
397
398
399
400
401
402
403
404
		} else if common.HeaderType(chdr.Type) == common.HeaderType_PEER_RESOURCE_UPDATE {
			// FIXME: in the context of FAB-7341, we should introduce validation
			//        for this kind of transaction here. For now we just ignore this
			//        type of transaction and delegate its validation to other components

			results <- &blockValidationResult{
				tIdx: tIdx,
				err:  nil,
			}
			return
405
406
407
408
409
410
411
412
413
414
415
		} else {
			logger.Warningf("Unknown transaction type [%s] in block number [%d] transaction index [%d]",
				common.HeaderType(chdr.Type), block.Header.Number, tIdx)
			results <- &blockValidationResult{
				tIdx:           tIdx,
				validationCode: peer.TxValidationCode_UNKNOWN_TX_TYPE,
			}
			return
		}

		if _, err := proto.Marshal(env); err != nil {
416
			logger.Warningf("Cannot marshal transaction: %s", err)
417
418
419
420
421
422
423
424
425
426
427
428
			results <- &blockValidationResult{
				tIdx:           tIdx,
				validationCode: peer.TxValidationCode_MARSHAL_TX_ERROR,
			}
			return
		}
		// Succeeded to pass down here, transaction is valid
		results <- &blockValidationResult{
			tIdx:                 tIdx,
			txsChaincodeName:     txsChaincodeName,
			txsUpgradedChaincode: txsUpgradedChaincode,
			validationCode:       peer.TxValidationCode_VALID,
429
			txid:                 txID,
430
431
432
433
434
435
436
437
438
439
440
441
		}
		return
	} else {
		logger.Warning("Nil tx from block")
		results <- &blockValidationResult{
			tIdx:           tIdx,
			validationCode: peer.TxValidationCode_NIL_ENVELOPE,
		}
		return
	}
}

442
// generateCCKey generates a unique identifier for chaincode in specific channel
443
444
445
446
447
func (v *txValidator) generateCCKey(ccName, chainID string) string {
	return fmt.Sprintf("%s/%s", ccName, chainID)
}

// invalidTXsForUpgradeCC invalid all txs that should be invalided because of chaincode upgrade txs
448
func (v *txValidator) invalidTXsForUpgradeCC(txsChaincodeNames map[int]*sysccprovider.ChaincodeInstance, txsUpgradedChaincodes map[int]*sysccprovider.ChaincodeInstance, txsfltr ledgerUtil.TxValidationFlags) {
449
	if len(txsUpgradedChaincodes) == 0 {
450
		return
451
452
453
454
	}

	// Invalid former cc upgrade txs if there're two or more txs upgrade the same cc
	finalValidUpgradeTXs := make(map[string]int)
455
	upgradedChaincodes := make(map[string]*sysccprovider.ChaincodeInstance)
456
457
458
459
460
461
462
463
464
465
466
	for tIdx, cc := range txsUpgradedChaincodes {
		if cc == nil {
			continue
		}
		upgradedCCKey := v.generateCCKey(cc.ChaincodeName, cc.ChainID)

		if finalIdx, exist := finalValidUpgradeTXs[upgradedCCKey]; !exist {
			finalValidUpgradeTXs[upgradedCCKey] = tIdx
			upgradedChaincodes[upgradedCCKey] = cc
		} else if finalIdx < tIdx {
			logger.Infof("Invalid transaction with index %d: chaincode was upgraded by latter tx", finalIdx)
467
			txsfltr.SetFlag(finalIdx, peer.TxValidationCode_CHAINCODE_VERSION_CONFLICT)
468
469
470
471
472
473

			// record latter cc upgrade tx info
			finalValidUpgradeTXs[upgradedCCKey] = tIdx
			upgradedChaincodes[upgradedCCKey] = cc
		} else {
			logger.Infof("Invalid transaction with index %d: chaincode was upgraded by latter tx", tIdx)
474
			txsfltr.SetFlag(tIdx, peer.TxValidationCode_CHAINCODE_VERSION_CONFLICT)
475
476
477
478
479
480
481
482
483
484
485
486
		}
	}

	// invalid txs which invoke the upgraded chaincodes
	for tIdx, cc := range txsChaincodeNames {
		if cc == nil {
			continue
		}
		ccKey := v.generateCCKey(cc.ChaincodeName, cc.ChainID)
		if _, exist := upgradedChaincodes[ccKey]; exist {
			if txsfltr.IsValid(tIdx) {
				logger.Infof("Invalid transaction with index %d: chaincode was upgraded in the same block", tIdx)
487
				txsfltr.SetFlag(tIdx, peer.TxValidationCode_CHAINCODE_VERSION_CONFLICT)
488
489
490
491
492
			}
		}
	}
}

493
func (v *txValidator) getTxCCInstance(payload *common.Payload) (invokeCCIns, upgradeCCIns *sysccprovider.ChaincodeInstance, err error) {
494
495
496
497
498
499
500
	// This is duplicated unpacking work, but make test easier.
	chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
	if err != nil {
		return nil, nil, err
	}

	// Chain ID
501
	chainID := chdr.ChannelId // it is guaranteed to be an existing channel by now
502
503
504
505
506
507
508

	// ChaincodeID
	hdrExt, err := utils.GetChaincodeHeaderExtension(payload.Header)
	if err != nil {
		return nil, nil, err
	}
	invokeCC := hdrExt.ChaincodeId
509
	invokeIns := &sysccprovider.ChaincodeInstance{ChainID: chainID, ChaincodeName: invokeCC.Name, ChaincodeVersion: invokeCC.Version}
510
511
512
513

	// Transaction
	tx, err := utils.GetTransaction(payload.Data)
	if err != nil {
514
		logger.Errorf("GetTransaction failed: %+v", err)
515
516
517
518
519
520
		return invokeIns, nil, nil
	}

	// ChaincodeActionPayload
	cap, err := utils.GetChaincodeActionPayload(tx.Actions[0].Payload)
	if err != nil {
521
		logger.Errorf("GetChaincodeActionPayload failed: %+v", err)
522
523
524
525
526
527
		return invokeIns, nil, nil
	}

	// ChaincodeProposalPayload
	cpp, err := utils.GetChaincodeProposalPayload(cap.ChaincodeProposalPayload)
	if err != nil {
528
		logger.Errorf("GetChaincodeProposalPayload failed: %+v", err)
529
530
531
532
533
534
535
		return invokeIns, nil, nil
	}

	// ChaincodeInvocationSpec
	cis := &peer.ChaincodeInvocationSpec{}
	err = proto.Unmarshal(cpp.Input, cis)
	if err != nil {
536
		logger.Errorf("GetChaincodeInvokeSpec failed: %+v", err)
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
		return invokeIns, nil, nil
	}

	if invokeCC.Name == "lscc" {
		if string(cis.ChaincodeSpec.Input.Args[0]) == "upgrade" {
			upgradeIns, err := v.getUpgradeTxInstance(chainID, cis.ChaincodeSpec.Input.Args[2])
			if err != nil {
				return invokeIns, nil, nil
			}
			return invokeIns, upgradeIns, nil
		}
	}

	return invokeIns, nil, nil
}

553
func (v *txValidator) getUpgradeTxInstance(chainID string, cdsBytes []byte) (*sysccprovider.ChaincodeInstance, error) {
554
555
556
557
558
	cds, err := utils.GetChaincodeDeploymentSpec(cdsBytes)
	if err != nil {
		return nil, err
	}

559
	return &sysccprovider.ChaincodeInstance{
560
561
562
563
564
		ChainID:          chainID,
		ChaincodeName:    cds.ChaincodeSpec.ChaincodeId.Name,
		ChaincodeVersion: cds.ChaincodeSpec.ChaincodeId.Version,
	}, nil
}