handler.go 43.1 KB
Newer Older
1
/*
2
Copyright IBM Corp. All Rights Reserved.
3

4
SPDX-License-Identifier: Apache-2.0
5
6
7
8
9
10
11
*/

package chaincode

import (
	"fmt"
	"io"
12
	"strconv"
13
	"strings"
14
15
16
17
	"sync"
	"time"

	"github.com/golang/protobuf/proto"
18
	"github.com/hyperledger/fabric/common/channelconfig"
19
	"github.com/hyperledger/fabric/common/flogging"
20
	commonledger "github.com/hyperledger/fabric/common/ledger"
21
	"github.com/hyperledger/fabric/core/aclmgmt/resources"
22
	"github.com/hyperledger/fabric/core/common/ccprovider"
23
	"github.com/hyperledger/fabric/core/common/privdata"
24
	"github.com/hyperledger/fabric/core/common/sysccprovider"
25
	"github.com/hyperledger/fabric/core/container/ccintf"
26
	"github.com/hyperledger/fabric/core/ledger"
27
	"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
28
	"github.com/hyperledger/fabric/core/peer"
29
	"github.com/hyperledger/fabric/protos/common"
30
	pb "github.com/hyperledger/fabric/protos/peer"
31
	"github.com/pkg/errors"
32
33
)

34
var chaincodeLogger = flogging.MustGetLogger("chaincode")
35

36
// An ACLProvider performs access control checks when invoking
37
38
39
40
41
// chaincode.
type ACLProvider interface {
	// CheckACL checks access to the resource resName on channel channelID for
	// the identity information carried in idinfo. A nil error means the check
	// passed.
	CheckACL(resName, channelID string, idinfo interface{}) error
}

42
// A Registry is responsible for tracking handlers.
43
44
45
type Registry interface {
	Register(*Handler) error
	Ready(cname string)
46
	Failed(cname string, err error)
47
48
49
	Deregister(cname string) error
}

50
51
// An Invoker invokes chaincode.
type Invoker interface {
52
	Invoke(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, spec *pb.ChaincodeInput) (*pb.ChaincodeMessage, error)
53
54
}

55
56
57
58
59
60
// SystemCCProvider provides system chaincode metadata.
type SystemCCProvider interface {
	// IsSysCC reports whether name refers to a system chaincode.
	IsSysCC(name string) bool
	// IsSysCCAndNotInvokableCC2CC reports whether name refers to a system
	// chaincode that must not be invoked through a chaincode-to-chaincode call.
	IsSysCCAndNotInvokableCC2CC(name string) bool
}

61
62
63
64
65
66
// TransactionRegistry tracks active transactions for each channel.
type TransactionRegistry interface {
	// Add records txID as active on channelID. The handler only processes a
	// request when Add returns true.
	Add(channelID, txID string) bool
	// Remove clears the active marker for txID on channelID.
	Remove(channelID, txID string)
}

67
// A ContextRegistry is responsible for managing transaction contexts.
68
type ContextRegistry interface {
69
	Create(txParams *ccprovider.TransactionParams) (*TransactionContext, error)
70
71
72
73
74
	Get(chainID, txID string) *TransactionContext
	Delete(chainID, txID string)
	Close()
}

75
76
// InstantiationPolicyChecker is used to evaluate instantiation policies.
type InstantiationPolicyChecker interface {
77
78
79
	CheckInstantiationPolicy(name, version string, cd *ccprovider.ChaincodeData) error
}

80
// Adapter from function to InstantiationPolicyChecker interface.
81
82
83
84
85
86
type CheckInstantiationPolicyFunc func(name, version string, cd *ccprovider.ChaincodeData) error

func (c CheckInstantiationPolicyFunc) CheckInstantiationPolicy(name, version string, cd *ccprovider.ChaincodeData) error {
	return c(name, version, cd)
}

87
// QueryResponseBuilder is responsible for building QueryResponse messages for query
88
// transactions initiated by chaincode.
89
type QueryResponseBuilder interface {
90
91
	BuildQueryResponse(txContext *TransactionContext, iter commonledger.ResultsIterator,
		iterID string, isPaginated bool, totalReturnLimit int32) (*pb.QueryResponse, error)
92
93
94
95
96
}

// ChaincodeDefinitionGetter is responsible for retrieving a chaincode definition
// from the system. The definition is used by the InstantiationPolicyChecker.
type ChaincodeDefinitionGetter interface {
97
	ChaincodeDefinition(chaincodeName string, txSim ledger.QueryExecutor) (ccprovider.ChaincodeDefinition, error)
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
}

// LedgerGetter is used to get ledgers for chaincode.
type LedgerGetter interface {
	// GetLedger returns the peer ledger for the channel ID cid.
	GetLedger(cid string) ledger.PeerLedger
}

// UUIDGenerator is responsible for creating unique query identifiers.
type UUIDGenerator interface {
	// New returns a newly generated identifier.
	New() string
}

// UUIDGeneratorFunc adapts a plain function to the UUIDGenerator interface.
type UUIDGeneratorFunc func() string

// New calls the wrapped function and returns its result.
func (u UUIDGeneratorFunc) New() string {
	id := u()
	return id
}

113
114
115
116
117
118
119
// ApplicationConfigRetriever retrieves the application configuration for a
// channel.
type ApplicationConfigRetriever interface {
	// GetApplicationConfig returns the channelconfig.Application for the
	// channel cid together with a flag telling whether that config exists.
	GetApplicationConfig(cid string) (channelconfig.Application, bool)
}

120
// Handler implements the peer side of the chaincode stream.
121
type Handler struct {
122
123
124
125
	// Keepalive specifies the interval at which keep-alive messages are sent.
	Keepalive time.Duration
	// SystemCCVersion specifies the current system chaincode version
	SystemCCVersion string
126
127
128
	// DefinitionGetter is used to retrieve the chaincode definition from the
	// Lifecycle System Chaincode.
	DefinitionGetter ChaincodeDefinitionGetter
129
130
	// Invoker is used to invoke chaincode.
	Invoker Invoker
131
132
133
134
135
136
	// Registry is used to track active handlers.
	Registry Registry
	// ACLProvider is used to check if a chaincode invocation should be allowed.
	ACLProvider ACLProvider
	// TXContexts is a collection of TransactionContext instances
	// that are accessed by channel name and transaction ID.
137
	TXContexts ContextRegistry
138
	// activeTransactions holds active transaction identifiers.
139
	ActiveTransactions TransactionRegistry
140
141
	// SystemCCProvider provides access to system chaincode metadata
	SystemCCProvider SystemCCProvider
142
143
	// InstantiationPolicyChecker is used to evaluate the chaincode instantiation policies.
	InstantiationPolicyChecker InstantiationPolicyChecker
144
145
146
147
148
149
	// QueryResponeBuilder is used to build query responses
	QueryResponseBuilder QueryResponseBuilder
	// LedgerGetter is used to get the ledger associated with a channel
	LedgerGetter LedgerGetter
	// UUIDGenerator is used to generate UUIDs
	UUIDGenerator UUIDGenerator
150
151
	// AppConfig is used to retrieve the application config for a channel
	AppConfig ApplicationConfigRetriever
152
153
154

	// state holds the current handler state. It will be created, established, or
	// ready.
155
	state State
156
	// chaincodeID holds the ID of the chaincode that registered with the peer.
157
	chaincodeID *pb.ChaincodeID
158
159
160
161
162
163
164
165
166
167
168
	// ccInstances holds information about the chaincode instance associated with
	// the peer.
	ccInstance *sysccprovider.ChaincodeInstance

	// serialLock is used to serialize sends across the grpc chat stream.
	serialLock sync.Mutex
	// chatStream is the bidirectional grpc stream used to communicate with the
	// chaincode instance.
	chatStream ccintf.ChaincodeStream
	// errChan is used to communicate errors from the async send to the receive loop
	errChan chan error
169
	// Metrics holds chaincode handler metrics
Will Lahti's avatar
Will Lahti committed
170
	Metrics *HandlerMetrics
171
172
}

173
174
// handleMessage is called by ProcessStream to dispatch messages.
func (h *Handler) handleMessage(msg *pb.ChaincodeMessage) error {
175
	chaincodeLogger.Debugf("[%s] Fabric side handling ChaincodeMessage of type: %s in state %s", shorttxid(msg.Txid), msg.Type, h.state)
176

177
178
179
180
	if msg.Type == pb.ChaincodeMessage_KEEPALIVE {
		return nil
	}

181
	switch h.state {
182
	case Created:
183
		return h.handleMessageCreatedState(msg)
184
	case Ready:
185
186
		return h.handleMessageReadyState(msg)
	default:
187
		return errors.Errorf("handle message: invalid state %s for transaction %s", h.state, msg.Txid)
188
189
190
191
192
193
	}
}

// handleMessageCreatedState handles messages received while the handler is in
// the Created state. Only REGISTER is accepted; anything else is an error.
func (h *Handler) handleMessageCreatedState(msg *pb.ChaincodeMessage) error {
	if msg.Type != pb.ChaincodeMessage_REGISTER {
		return fmt.Errorf("[%s] Fabric side handler cannot handle message (%s) while in created state", msg.Txid, msg.Type)
	}

	h.HandleRegister(msg)
	return nil
}

// handleMessageReadyState handles messages received while the handler is in
// the Ready state. COMPLETED and ERROR are delivered synchronously through
// Notify; every other supported message type is processed by HandleTransaction
// on its own goroutine. Unsupported message types produce an error.
func (h *Handler) handleMessageReadyState(msg *pb.ChaincodeMessage) error {
	var delegate handleFunc

	switch msg.Type {
	case pb.ChaincodeMessage_COMPLETED, pb.ChaincodeMessage_ERROR:
		h.Notify(msg)
		return nil

	case pb.ChaincodeMessage_PUT_STATE:
		delegate = h.HandlePutState
	case pb.ChaincodeMessage_DEL_STATE:
		delegate = h.HandleDelState
	case pb.ChaincodeMessage_INVOKE_CHAINCODE:
		delegate = h.HandleInvokeChaincode

	case pb.ChaincodeMessage_GET_STATE:
		delegate = h.HandleGetState
	case pb.ChaincodeMessage_GET_STATE_BY_RANGE:
		delegate = h.HandleGetStateByRange
	case pb.ChaincodeMessage_GET_QUERY_RESULT:
		delegate = h.HandleGetQueryResult
	case pb.ChaincodeMessage_GET_HISTORY_FOR_KEY:
		delegate = h.HandleGetHistoryForKey
	case pb.ChaincodeMessage_QUERY_STATE_NEXT:
		delegate = h.HandleQueryStateNext
	case pb.ChaincodeMessage_QUERY_STATE_CLOSE:
		delegate = h.HandleQueryStateClose

	case pb.ChaincodeMessage_GET_STATE_METADATA:
		delegate = h.HandleGetStateMetadata
	case pb.ChaincodeMessage_PUT_STATE_METADATA:
		delegate = h.HandlePutStateMetadata

	default:
		return fmt.Errorf("[%s] Fabric side handler cannot handle message (%s) while in ready state", msg.Txid, msg.Type)
	}

	go h.HandleTransaction(msg, delegate)
	return nil
}

237
238
239
240
// MessageHandler handles a chaincode message within a transaction context and
// returns the response message or an error.
type MessageHandler interface {
	Handle(msg *pb.ChaincodeMessage, txContext *TransactionContext) (*pb.ChaincodeMessage, error)
}

241
// handleFunc is the signature of the per-message delegates that
// HandleTransaction forwards to.
type handleFunc func(msg *pb.ChaincodeMessage, txContext *TransactionContext) (*pb.ChaincodeMessage, error)
242

243
244
// HandleTransaction is a middleware function that obtains and verifies a transaction
// context prior to forwarding the message to the provided delegate. Response messages
245
// returned by the delegate are sent to the chat stream. Any errors returned by the
246
247
// delegate are packaged as chaincode error messages.
func (h *Handler) HandleTransaction(msg *pb.ChaincodeMessage, delegate handleFunc) {
248
	chaincodeLogger.Debugf("[%s] handling %s from chaincode", shorttxid(msg.Txid), msg.Type.String())
249
250
251
252
	if !h.registerTxid(msg) {
		return
	}

253
	startTime := time.Now()
254
	var txContext *TransactionContext
255
	var err error
256
	if msg.Type == pb.ChaincodeMessage_INVOKE_CHAINCODE {
257
		txContext, err = h.getTxContextForInvoke(msg.ChannelId, msg.Txid, msg.Payload, "")
258
	} else {
259
		txContext, err = h.isValidTxSim(msg.ChannelId, msg.Txid, "no ledger context")
260
261
	}

262
	chaincodeName := h.chaincodeID.Name + ":" + h.chaincodeID.Version
Will Lahti's avatar
Will Lahti committed
263
	meterLabels := []string{
264
265
266
		"type", msg.Type.String(),
		"channel", msg.ChannelId,
		"chaincode", chaincodeName,
Will Lahti's avatar
Will Lahti committed
267
268
	}
	h.Metrics.ShimRequestsReceived.With(meterLabels...).Add(1)
269

270
271
272
	var resp *pb.ChaincodeMessage
	if err == nil {
		resp, err = delegate(msg, txContext)
273
274
	}

275
276
	if err != nil {
		err = errors.Wrapf(err, "%s failed: transaction ID: %s", msg.Type, msg.Txid)
277
		chaincodeLogger.Errorf("[%s] Failed to handle %s. error: %+v", shorttxid(msg.Txid), msg.Type, err)
278
		resp = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: []byte(err.Error()), Txid: msg.Txid, ChannelId: msg.ChannelId}
279
280
	}

281
	chaincodeLogger.Debugf("[%s] Completed %s. Sending %s", shorttxid(msg.Txid), msg.Type, resp.Type)
282
	h.ActiveTransactions.Remove(msg.ChannelId, msg.Txid)
283
	h.serialSendAsync(resp)
Will Lahti's avatar
Will Lahti committed
284
285
286
287

	meterLabels = append(meterLabels, "success", strconv.FormatBool(resp.Type != pb.ChaincodeMessage_ERROR))
	h.Metrics.ShimRequestDuration.With(meterLabels...).Observe(time.Since(startTime).Seconds())
	h.Metrics.ShimRequestsCompleted.With(meterLabels...).Add(1)
288
289
}

Gabor Hosszu's avatar
Gabor Hosszu committed
290
291
292
// shorttxid returns the first 8 bytes of txid for use as a compact log prefix.
// Transaction IDs shorter than 8 bytes are returned unchanged.
func shorttxid(txid string) string {
	const shortLen = 8

	if len(txid) < shortLen {
		return txid
	}
	return txid[:shortLen]
}

297
298
299
// ParseName parses a chaincode name into a ChaincodeInstance. The name should
// be of the form "chaincode-name:version/channel-name" with optional elements.
// Everything after the first "/" becomes the channel ID; within the remainder,
// everything after the first ":" becomes the version.
func ParseName(ccName string) *sysccprovider.ChaincodeInstance {
	ci := &sysccprovider.ChaincodeInstance{}

	rest := ccName
	if i := strings.Index(rest, "/"); i >= 0 {
		ci.ChainID = rest[i+1:]
		rest = rest[:i]
	}
	if i := strings.Index(rest, ":"); i >= 0 {
		ci.ChaincodeVersion = rest[i+1:]
		rest = rest[:i]
	}
	ci.ChaincodeName = rest

	return ci
}

315
316
317
318
func (h *Handler) ChaincodeName() string {
	if h.ccInstance == nil {
		return ""
	}
Matthew Sykes's avatar
Matthew Sykes committed
319
	return h.ccInstance.ChaincodeName
320
321
}

322
// serialSend serializes msgs so gRPC will be happy
Matthew Sykes's avatar
Matthew Sykes committed
323
324
325
func (h *Handler) serialSend(msg *pb.ChaincodeMessage) error {
	h.serialLock.Lock()
	defer h.serialLock.Unlock()
326

327
	if err := h.chatStream.Send(msg); err != nil {
328
		err = errors.WithMessage(err, fmt.Sprintf("[%s] error sending %s", shorttxid(msg.Txid), msg.Type))
329
		chaincodeLogger.Errorf("%+v", err)
330
		return err
331
	}
332
333

	return nil
334
335
}

336
337
338
339
340
// serialSendAsync serves the same purpose as serialSend (serialize msgs so gRPC will
// be happy). In addition, it is also asynchronous so send-remoterecv--localrecv loop
// can be nonblocking. Only errors need to be handled and these are handled by
// communication on supplied error channel. A typical use will be a non-blocking or
// nil channel
341
func (h *Handler) serialSendAsync(msg *pb.ChaincodeMessage) {
342
	go func() {
Matthew Sykes's avatar
Matthew Sykes committed
343
		if err := h.serialSend(msg); err != nil {
344
345
346
347
348
349
			// provide an error response to the caller
			resp := &pb.ChaincodeMessage{
				Type:      pb.ChaincodeMessage_ERROR,
				Payload:   []byte(err.Error()),
				Txid:      msg.Txid,
				ChannelId: msg.ChannelId,
350
			}
351
352
353
354
			h.Notify(resp)

			// surface send error to stream processing
			h.errChan <- err
355
356
		}
	}()
357
358
}

359
// Check if the transactor is allow to call this chaincode on this channel
Matthew Sykes's avatar
Matthew Sykes committed
360
func (h *Handler) checkACL(signedProp *pb.SignedProposal, proposal *pb.Proposal, ccIns *sysccprovider.ChaincodeInstance) error {
361
362
	// ensure that we don't invoke a system chaincode
	// that is not invokable through a cc2cc invocation
363
	if h.SystemCCProvider.IsSysCCAndNotInvokableCC2CC(ccIns.ChaincodeName) {
364
		return errors.Errorf("system chaincode %s cannot be invoked with a cc2cc invocation", ccIns.ChaincodeName)
365
366
367
368
	}

	// if we are here, all we know is that the invoked chaincode is either
	// - a system chaincode that *is* invokable through a cc2cc
369
370
371
	//   (but we may still have to determine whether the invoker can perform this invocation)
	// - an application chaincode
	//   (and we still need to determine whether the invoker can invoke it)
372

373
	if h.SystemCCProvider.IsSysCC(ccIns.ChaincodeName) {
374
375
376
377
378
379
		// Allow this call
		return nil
	}

	// A Nil signedProp will be rejected for non-system chaincodes
	if signedProp == nil {
380
		return errors.Errorf("signed proposal must not be nil from caller [%s]", ccIns.String())
381
382
	}

383
	return h.ACLProvider.CheckACL(resources.Peer_ChaincodeToChaincode, ccIns.ChainID, signedProp)
384
385
}

386
387
388
389
390
391
392
// deregister removes the handler from the registry under the name of the
// chaincode that registered on this stream. It does nothing when no chaincode
// ID has been recorded. A deregistration failure is logged; there is no caller
// that could act on it.
func (h *Handler) deregister() {
	if h.chaincodeID == nil {
		return
	}
	if err := h.Registry.Deregister(h.chaincodeID.Name); err != nil {
		chaincodeLogger.Errorf("failed to deregister handler for chaincode %s: %s", h.chaincodeID.Name, err)
	}
}

func (h *Handler) ProcessStream(stream ccintf.ChaincodeStream) error {
Matthew Sykes's avatar
Matthew Sykes committed
393
	defer h.deregister()
394

395
396
397
	h.chatStream = stream
	h.errChan = make(chan error, 1)

398
399
400
401
402
403
404
	var keepaliveCh <-chan time.Time
	if h.Keepalive != 0 {
		ticker := time.NewTicker(h.Keepalive)
		defer ticker.Stop()
		keepaliveCh = ticker.C
	}

405
	// holds return values from gRPC Recv below
406
407
408
409
	type recvMsg struct {
		msg *pb.ChaincodeMessage
		err error
	}
410
	msgAvail := make(chan *recvMsg, 1)
411

412
413
414
415
	receiveMessage := func() {
		in, err := h.chatStream.Recv()
		msgAvail <- &recvMsg{in, err}
	}
416

417
	go receiveMessage()
418
419
	for {
		select {
420
421
		case rmsg := <-msgAvail:
			switch {
422
			// Defer the deregistering of the this handler.
423
424
425
426
427
			case rmsg.err == io.EOF:
				chaincodeLogger.Debugf("received EOF, ending chaincode support stream: %s", rmsg.err)
				return rmsg.err
			case rmsg.err != nil:
				err := errors.Wrap(rmsg.err, "receive failed")
428
				chaincodeLogger.Errorf("handling chaincode support stream: %+v", err)
429
				return err
430
			case rmsg.msg == nil:
431
				err := errors.New("received nil message, ending chaincode support stream")
432
				chaincodeLogger.Debugf("%+v", err)
433
				return err
434
435
436
437
438
439
440
441
442
			default:
				err := h.handleMessage(rmsg.msg)
				if err != nil {
					err = errors.WithMessage(err, "error handling message, ending stream")
					chaincodeLogger.Errorf("[%s] %+v", shorttxid(rmsg.msg.Txid), err)
					return err
				}

				go receiveMessage()
443
			}
444

Matthew Sykes's avatar
Matthew Sykes committed
445
		case sendErr := <-h.errChan:
446
			err := errors.Wrapf(sendErr, "received error while sending message, ending chaincode support stream")
447
448
			chaincodeLogger.Errorf("%s", err)
			return err
449
		case <-keepaliveCh:
450
451
			// if no error message from serialSend, KEEPALIVE happy, and don't care about error
			// (maybe it'll work later)
452
			h.serialSendAsync(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_KEEPALIVE})
453
454
455
456
457
			continue
		}
	}
}

458
// sendReady sends READY to chaincode serially (just like REGISTER)
Matthew Sykes's avatar
Matthew Sykes committed
459
func (h *Handler) sendReady() error {
460
	chaincodeLogger.Debugf("sending READY for chaincode %+v", h.chaincodeID)
461
462
	ccMsg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_READY}

463
	// if error in sending tear down the h
Matthew Sykes's avatar
Matthew Sykes committed
464
	if err := h.serialSend(ccMsg); err != nil {
465
		chaincodeLogger.Errorf("error sending READY (%s) for chaincode %+v", err, h.chaincodeID)
466
467
468
		return err
	}

469
	h.state = Ready
470

471
	chaincodeLogger.Debugf("Changed to state ready for chaincode %+v", h.chaincodeID)
472
473
474
475

	return nil
}

476
477
478
479
480
// notifyRegistry will send ready on registration success and update the launch
// state of the chaincode in the handler registry.
//
// When err is nil, READY is sent to the chaincode first; a failure of that
// send is handled the same way as a non-nil err passed in by the caller: the
// registry is told the chaincode failed and the failure, including its cause,
// is logged. Otherwise the registry is told the chaincode is ready.
func (h *Handler) notifyRegistry(err error) {
	if err == nil {
		err = h.sendReady()
	}

	if err != nil {
		h.Registry.Failed(h.chaincodeID.Name, err)
		chaincodeLogger.Errorf("failed to start %s: %s", h.chaincodeID, err)
		return
	}

	h.Registry.Ready(h.chaincodeID.Name)
}

492
// handleRegister is invoked when chaincode tries to register.
493
func (h *Handler) HandleRegister(msg *pb.ChaincodeMessage) {
Matthew Sykes's avatar
Matthew Sykes committed
494
	chaincodeLogger.Debugf("Received %s in state %s", msg.Type, h.state)
495
496
497
	chaincodeID := &pb.ChaincodeID{}
	err := proto.Unmarshal(msg.Payload, chaincodeID)
	if err != nil {
498
		chaincodeLogger.Errorf("Error in received %s, could NOT unmarshal registration info: %s", pb.ChaincodeMessage_REGISTER, err)
499
500
501
502
		return
	}

	// Now register with the chaincodeSupport
503
504
	h.chaincodeID = chaincodeID
	err = h.Registry.Register(h)
505
	if err != nil {
506
		h.notifyRegistry(err)
507
508
509
		return
	}

510
511
	// get the component parts so we can use the root chaincode
	// name in keys
512
	h.ccInstance = ParseName(h.chaincodeID.Name)
513

514
	chaincodeLogger.Debugf("Got %s for chaincodeID = %s, sending back %s", pb.ChaincodeMessage_REGISTER, chaincodeID, pb.ChaincodeMessage_REGISTERED)
Matthew Sykes's avatar
Matthew Sykes committed
515
	if err := h.serialSend(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_REGISTERED}); err != nil {
516
517
		chaincodeLogger.Errorf("error sending %s: %s", pb.ChaincodeMessage_REGISTERED, err)
		h.notifyRegistry(err)
518
519
		return
	}
520

521
	h.state = Established
522

523
	chaincodeLogger.Debugf("Changed state to established for %+v", h.chaincodeID)
524

525
	// for dev mode this will also move to ready automatically
526
	h.notifyRegistry(nil)
527
528
}

529
// Notify delivers msg to the transaction context identified by the message's
// channel ID and transaction ID by sending it on the context's
// ResponseNotifier, and then closes the context's query iterators. When no
// such context exists the message is dropped with a debug log.
func (h *Handler) Notify(msg *pb.ChaincodeMessage) {
	txContext := h.TXContexts.Get(msg.ChannelId, msg.Txid)
	if txContext != nil {
		chaincodeLogger.Debugf("[%s] notifying Txid:%s, channelID:%s", shorttxid(msg.Txid), msg.Txid, msg.ChannelId)
		txContext.ResponseNotifier <- msg
		txContext.CloseQueryIterators()
		return
	}

	chaincodeLogger.Debugf("notifier Txid:%s, channelID:%s does not exist for handling message %s", msg.Txid, msg.ChannelId, msg.Type)
}

541
// is this a txid for which there is a valid txsim
542
func (h *Handler) isValidTxSim(channelID string, txid string, fmtStr string, args ...interface{}) (*TransactionContext, error) {
543
	txContext := h.TXContexts.Get(channelID, txid)
544
	if txContext == nil || txContext.TXSimulator == nil {
545
		err := errors.Errorf(fmtStr, args...)
546
		chaincodeLogger.Errorf("no ledger context: %s %s\n\n %+v", channelID, txid, err)
547
		return nil, err
548
549
550
551
	}
	return txContext, nil
}

552
// register Txid to prevent overlapping handle messages from chaincode
Matthew Sykes's avatar
Matthew Sykes committed
553
func (h *Handler) registerTxid(msg *pb.ChaincodeMessage) bool {
554
	// Check if this is the unique state request from this chaincode txid
555
556
557
558
559
560
561
562
	if h.ActiveTransactions.Add(msg.ChannelId, msg.Txid) {
		return true
	}

	// Log the issue and drop the request
	chaincodeName := "unknown"
	if h.chaincodeID != nil {
		chaincodeName = h.chaincodeID.Name
563
	}
564
	chaincodeLogger.Errorf("[%s] Another request pending for this CC: %s, Txid: %s, ChannelID: %s. Cannot process.", shorttxid(msg.Txid), chaincodeName, msg.Txid, msg.ChannelId)
565
	return false
566
567
}

568
// checkMetadataCap verifies that the message's channel has an application
// config and that the config's capabilities enable key level endorsement. It
// returns an error when either condition does not hold.
func (h *Handler) checkMetadataCap(msg *pb.ChaincodeMessage) error {
	appConfig, found := h.AppConfig.GetApplicationConfig(msg.ChannelId)

	switch {
	case !found:
		return errors.Errorf("application config does not exist for %s", msg.ChannelId)
	case !appConfig.Capabilities().KeyLevelEndorsement():
		return errors.New("key level endorsement is not enabled, channel application capability of V1_3 or later is required")
	default:
		return nil
	}
}

580
581
582
583
584
585
586
587
588
589
590
591
592
// errorIfCreatorHasNoReadAccess returns an error when the read-access check
// for the collection fails, or when it reports that access is not allowed. It
// returns nil when access is allowed.
func errorIfCreatorHasNoReadAccess(chaincodeName, collection string, txContext *TransactionContext) error {
	allowed, err := hasReadAccess(chaincodeName, collection, txContext)
	if err == nil && !allowed {
		err = errors.Errorf("tx creator does not have read access permission on privatedata in chaincodeName:%s collectionName: %s",
			chaincodeName, collection)
	}
	return err
}

// hasReadAccess reports whether the collection may be read in this
// transaction context. A positive answer is cached in the context's
// AllowedCollectionAccess map so that the collection store is asked at most
// once per collection with a positive outcome; negative answers are not
// cached.
func hasReadAccess(chaincodeName, collection string, txContext *TransactionContext) (bool, error) {
	// read access may already have been checked in the scope of this
	// chaincode simulation
	if txContext.AllowedCollectionAccess[collection] {
		return true, nil
	}

	criteria := common.CollectionCriteria{
		Channel:    txContext.ChainID,
		Namespace:  chaincodeName,
		Collection: collection,
	}

	allowed, err := txContext.CollectionStore.HasReadAccess(criteria, txContext.SignedProp, txContext.TXSimulator)
	if err != nil {
		return false, err
	}
	if !allowed {
		return false, nil
	}

	txContext.AllowedCollectionAccess[collection] = true
	return true, nil
}

615
// Handles query to ledger to get state
616
func (h *Handler) HandleGetState(msg *pb.ChaincodeMessage, txContext *TransactionContext) (*pb.ChaincodeMessage, error) {
617
	getState := &pb.GetState{}
618
619
	err := proto.Unmarshal(msg.Payload, getState)
	if err != nil {
620
		return nil, errors.Wrap(err, "unmarshal failed")
621
	}
622

623
	var res []byte
624
	chaincodeName := h.ChaincodeName()
625
	collection := getState.Collection
626
	chaincodeLogger.Debugf("[%s] getting state for chaincode %s, key %s, channel %s", shorttxid(msg.Txid), chaincodeName, getState.Key, txContext.ChainID)
627

628
	if isCollectionSet(collection) {
629
630
631
		if txContext.IsInitTransaction {
			return nil, errors.New("private data APIs are not allowed in chaincode Init()")
		}
632
633
634
635
		if err := errorIfCreatorHasNoReadAccess(chaincodeName, collection, txContext); err != nil {
			return nil, err
		}
		res, err = txContext.TXSimulator.GetPrivateData(chaincodeName, collection, getState.Key)
636
	} else {
637
		res, err = txContext.TXSimulator.GetState(chaincodeName, getState.Key)
638
639
	}
	if err != nil {
640
		return nil, errors.WithStack(err)
641
642
	}
	if res == nil {
643
		chaincodeLogger.Debugf("[%s] No state associated with key: %s. Sending %s with an empty payload", shorttxid(msg.Txid), getState.Key, pb.ChaincodeMessage_RESPONSE)
644
	}
645
646

	// Send response msg back to chaincode. GetState will not trigger event
647
	return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Payload: res, Txid: msg.Txid, ChannelId: msg.ChannelId}, nil
648
649
}

650
651
// HandleGetStateMetadata handles a GET_STATE_METADATA request. It first
// checks that the channel supports key level endorsement, then reads the
// metadata of a key through the transaction simulator. When a collection is
// set, private data metadata is read; this is refused for Init transactions
// and requires read access to the collection. The metadata entries are
// returned, marshaled as a StateMetadataResult, in a RESPONSE message.
func (h *Handler) HandleGetStateMetadata(msg *pb.ChaincodeMessage, txContext *TransactionContext) (*pb.ChaincodeMessage, error) {
	if err := h.checkMetadataCap(msg); err != nil {
		return nil, err
	}

	request := &pb.GetStateMetadata{}
	if err := proto.Unmarshal(msg.Payload, request); err != nil {
		return nil, errors.Wrap(err, "unmarshal failed")
	}

	chaincodeName := h.ChaincodeName()
	collection := request.Collection
	chaincodeLogger.Debugf("[%s] getting state metadata for chaincode %s, key %s, channel %s", shorttxid(msg.Txid), chaincodeName, request.Key, txContext.ChainID)

	var (
		metadata map[string][]byte
		err      error
	)
	if !isCollectionSet(collection) {
		metadata, err = txContext.TXSimulator.GetStateMetadata(chaincodeName, request.Key)
	} else {
		if txContext.IsInitTransaction {
			return nil, errors.New("private data APIs are not allowed in chaincode Init()")
		}
		if accessErr := errorIfCreatorHasNoReadAccess(chaincodeName, collection, txContext); accessErr != nil {
			return nil, accessErr
		}
		metadata, err = txContext.TXSimulator.GetPrivateDataMetadata(chaincodeName, collection, request.Key)
	}
	if err != nil {
		return nil, errors.WithStack(err)
	}

	var result pb.StateMetadataResult
	for metakey, value := range metadata {
		entry := &pb.StateMetadata{Metakey: metakey, Value: value}
		result.Entries = append(result.Entries, entry)
	}
	payload, err := proto.Marshal(&result)
	if err != nil {
		return nil, errors.WithStack(err)
	}

	// Send response msg back to chaincode. GetStateMetadata will not trigger event
	return &pb.ChaincodeMessage{
		Type:      pb.ChaincodeMessage_RESPONSE,
		Payload:   payload,
		Txid:      msg.Txid,
		ChannelId: msg.ChannelId,
	}, nil
}

696
// Handles query to ledger to rage query state
697
func (h *Handler) HandleGetStateByRange(msg *pb.ChaincodeMessage, txContext *TransactionContext) (*pb.ChaincodeMessage, error) {
698
	getStateByRange := &pb.GetStateByRange{}
699
700
	err := proto.Unmarshal(msg.Payload, getStateByRange)
	if err != nil {
701
		return nil, errors.Wrap(err, "unmarshal failed")
702
	}
703

704
705
706
707
708
709
710
	metadata, err := getQueryMetadataFromBytes(getStateByRange.Metadata)
	if err != nil {
		return nil, err
	}

	totalReturnLimit := calculateTotalReturnLimit(metadata)

711
	iterID := h.UUIDGenerator.New()
712

713
	var rangeIter commonledger.ResultsIterator
714
715
716
717
	var paginationInfo map[string]interface{}

	isPaginated := false

718
719
720
	chaincodeName := h.ChaincodeName()
	collection := getStateByRange.Collection
	if isCollectionSet(collection) {
721
722
723
		if txContext.IsInitTransaction {
			return nil, errors.New("private data APIs are not allowed in chaincode Init()")
		}
724
725
726
727
		if err := errorIfCreatorHasNoReadAccess(chaincodeName, collection, txContext); err != nil {
			return nil, err
		}
		rangeIter, err = txContext.TXSimulator.GetPrivateDataRangeScanIterator(chaincodeName, collection,
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
			getStateByRange.StartKey, getStateByRange.EndKey)
	} else if isMetadataSetForPagination(metadata) {
		paginationInfo, err = createPaginationInfoFromMetadata(metadata, totalReturnLimit, pb.ChaincodeMessage_GET_STATE_BY_RANGE)
		if err != nil {
			return nil, err
		}
		isPaginated = true

		startKey := getStateByRange.StartKey

		if isMetadataSetForPagination(metadata) {
			if metadata.Bookmark != "" {
				startKey = metadata.Bookmark
			}
		}
		rangeIter, err = txContext.TXSimulator.GetStateRangeScanIteratorWithMetadata(chaincodeName,
			startKey, getStateByRange.EndKey, paginationInfo)
745
	} else {
746
		rangeIter, err = txContext.TXSimulator.GetStateRangeScanIterator(chaincodeName, getStateByRange.StartKey, getStateByRange.EndKey)
747
748
	}
	if err != nil {
749
		return nil, errors.WithStack(err)
750
751
	}
	txContext.InitializeQueryContext(iterID, rangeIter)
752
753

	payload, err := h.QueryResponseBuilder.BuildQueryResponse(txContext, rangeIter, iterID, isPaginated, totalReturnLimit)
754
	if err != nil {
755
		txContext.CleanupQueryContext(iterID)
756
		return nil, errors.WithStack(err)
757
	}
758

759
	payloadBytes, err := proto.Marshal(payload)
760
	if err != nil {
761
		txContext.CleanupQueryContext(iterID)
762
		return nil, errors.Wrap(err, "marshal failed")
763
	}
764

765
	chaincodeLogger.Debugf("Got keys and values. Sending %s", pb.ChaincodeMessage_RESPONSE)
766
	return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Payload: payloadBytes, Txid: msg.Txid, ChannelId: msg.ChannelId}, nil
767
768
}

denyeart's avatar
denyeart committed
769
// Handles query to ledger for query state next
770
func (h *Handler) HandleQueryStateNext(msg *pb.ChaincodeMessage, txContext *TransactionContext) (*pb.ChaincodeMessage, error) {
771
772
773
	queryStateNext := &pb.QueryStateNext{}
	err := proto.Unmarshal(msg.Payload, queryStateNext)
	if err != nil {
774
		return nil, errors.Wrap(err, "unmarshal failed")
775
	}
776

777
778
	queryIter := txContext.GetQueryIterator(queryStateNext.Id)
	if queryIter == nil {
779
		return nil, errors.New("query iterator not found")
780
	}
781

782
783
784
	totalReturnLimit := calculateTotalReturnLimit(nil)

	payload, err := h.QueryResponseBuilder.BuildQueryResponse(txContext, queryIter, queryStateNext.Id, false, totalReturnLimit)
785
	if err != nil {
786
		txContext.CleanupQueryContext(queryStateNext.Id)
787
		return nil, errors.WithStack(err)
788
	}
789

790
791
	payloadBytes, err := proto.Marshal(payload)
	if err != nil {
792
		txContext.CleanupQueryContext(queryStateNext.Id)
793
		return nil, errors.Wrap(err, "marshal failed")
794
	}
795

796
	return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Payload: payloadBytes, Txid: msg.Txid, ChannelId: msg.ChannelId}, nil
797
798
799
}

// Handles the closing of a state iterator
800
func (h *Handler) HandleQueryStateClose(msg *pb.ChaincodeMessage, txContext *TransactionContext) (*pb.ChaincodeMessage, error) {
801
	queryStateClose := &pb.QueryStateClose{}
802
803
	err := proto.Unmarshal(msg.Payload, queryStateClose)
	if err != nil {
804
		return nil, errors.Wrap(err, "unmarshal failed")
805
	}
806

807
808
809
810
	iter := txContext.GetQueryIterator(queryStateClose.Id)
	if iter != nil {
		txContext.CleanupQueryContext(queryStateClose.Id)
	}
811

812
813
814
	payload := &pb.QueryResponse{HasMore: false, Id: queryStateClose.Id}
	payloadBytes, err := proto.Marshal(payload)
	if err != nil {
815
		return nil, errors.Wrap(err, "marshal failed")
816
	}
817

818
	return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Payload: payloadBytes, Txid: msg.Txid, ChannelId: msg.ChannelId}, nil
819
820
}

821
// Handles query to ledger to execute query state
822
823
func (h *Handler) HandleGetQueryResult(msg *pb.ChaincodeMessage, txContext *TransactionContext) (*pb.ChaincodeMessage, error) {
	iterID := h.UUIDGenerator.New()
824

825
	getQueryResult := &pb.GetQueryResult{}
826
827
	err := proto.Unmarshal(msg.Payload, getQueryResult)
	if err != nil {
828
		return nil, errors.Wrap(err, "unmarshal failed")
829
	}
830

831
832
833
834
835
836
837
838
	metadata, err := getQueryMetadataFromBytes(getQueryResult.Metadata)
	if err != nil {
		return nil, err
	}

	totalReturnLimit := calculateTotalReturnLimit(metadata)
	isPaginated := false

839
	var executeIter commonledger.ResultsIterator
840
841
	var paginationInfo map[string]interface{}

842
843
844
	chaincodeName := h.ChaincodeName()
	collection := getQueryResult.Collection
	if isCollectionSet(collection) {
845
846
847
		if txContext.IsInitTransaction {
			return nil, errors.New("private data APIs are not allowed in chaincode Init()")
		}
848
849
850
851
		if err := errorIfCreatorHasNoReadAccess(chaincodeName, collection, txContext); err != nil {
			return nil, err
		}
		executeIter, err = txContext.TXSimulator.ExecuteQueryOnPrivateData(chaincodeName, collection, getQueryResult.Query)
852
853
854
855
856
857
858
859
860
	} else if isMetadataSetForPagination(metadata) {
		paginationInfo, err = createPaginationInfoFromMetadata(metadata, totalReturnLimit, pb.ChaincodeMessage_GET_QUERY_RESULT)
		if err != nil {
			return nil, err
		}
		isPaginated = true
		executeIter, err = txContext.TXSimulator.ExecuteQueryWithMetadata(chaincodeName,
			getQueryResult.Query, paginationInfo)

861
	} else {
862
		executeIter, err = txContext.TXSimulator.ExecuteQuery(chaincodeName, getQueryResult.Query)
863
864
	}
	if err != nil {
865
		return nil, errors.WithStack(err)
866
	}
867

868
	txContext.InitializeQueryContext(iterID, executeIter)
869

870
	payload, err := h.QueryResponseBuilder.BuildQueryResponse