handler.go 61.3 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/*
Copyright IBM Corp. 2016 All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

		 http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package chaincode

import (
	"fmt"
	"io"
	"sync"
	"time"

	"github.com/golang/protobuf/proto"
	ccintf "github.com/hyperledger/fabric/core/container/ccintf"
	"github.com/hyperledger/fabric/core/crypto"
	"github.com/hyperledger/fabric/core/ledger/statemgmt"
29
	ledgernext "github.com/hyperledger/fabric/core/ledgernext"
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
	"github.com/hyperledger/fabric/core/util"
	pb "github.com/hyperledger/fabric/protos"
	"github.com/looplab/fsm"
	"github.com/op/go-logging"
	"golang.org/x/net/context"

	"github.com/hyperledger/fabric/core/ledger"
)

// Names of the states of the peer-side chaincode handler state machine. The
// trailing comment on each state lists the states it is entered from (in),
// the message received (rcv) and the message sent (send).
const (
	createdstate     = "created"     //start state
	establishedstate = "established" //in: CREATED, rcv:  REGISTER, send: REGISTERED, INIT
	initstate        = "init"        //in:ESTABLISHED, rcv:-, send: INIT
	readystate       = "ready"       //in:ESTABLISHED,TRANSACTION, rcv:COMPLETED
	transactionstate = "transaction" //in:READY, rcv: xact from consensus, send: TRANSACTION
	busyinitstate    = "busyinit"    //in:INIT, rcv: PUT_STATE, DEL_STATE, INVOKE_CHAINCODE
	busyxactstate    = "busyxact"    //in:TRANSACTION, rcv: PUT_STATE, DEL_STATE, INVOKE_CHAINCODE
	endstate         = "end"         //in:INIT,ESTABLISHED, rcv: error, terminate container

)

// chaincodeLogger is the logger used throughout this file, registered under the "chaincode" module name.
var chaincodeLogger = logging.MustGetLogger("chaincode")

// MessageHandler interface for handling chaincode messages (common between Peer chaincode support and chaincode)
type MessageHandler interface {
	// HandleMessage processes one chaincode message and reports any failure.
	HandleMessage(msg *pb.ChaincodeMessage) error
	// SendMessage sends one chaincode message and reports any failure.
	// NOTE(review): presumably sends to the other end of the stream — confirm against implementers.
	SendMessage(msg *pb.ChaincodeMessage) error
}

type transactionContext struct {
	transactionSecContext *pb.Transaction
	responseNotifier      chan *pb.ChaincodeMessage

	// tracks open iterators used for range queries
	rangeQueryIteratorMap map[string]statemgmt.RangeScanIterator
65
66

	txsimulator ledgernext.TxSimulator
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
}

// nextStateInfo is the item passed over Handler.nextState: the message to feed
// into the state machine and whether processStream should also send that
// message to the chaincode once it has been handled.
type nextStateInfo struct {
	msg      *pb.ChaincodeMessage
	sendToCC bool
}

// Handler responsbile for management of Peer's side of chaincode stream
type Handler struct {
	sync.RWMutex
	//peer to shim grpc serializer. User only in serialSend
	serialLock  sync.Mutex
	ChatStream  ccintf.ChaincodeStream
	FSM         *fsm.FSM
	ChaincodeID *pb.ChaincodeID

	// A copy of decrypted deploy tx this handler manages, no code
	deployTXSecContext *pb.Transaction

	chaincodeSupport *ChaincodeSupport
	registered       bool
	readyNotify      chan bool
Gabor Hosszu's avatar
Gabor Hosszu committed
89
	// Map of tx txid to either invoke or query tx (decrypted). Each tx will be
90
91
92
	// added prior to execute and remove when done execute
	txCtxs map[string]*transactionContext

Gabor Hosszu's avatar
Gabor Hosszu committed
93
	txidMap map[string]bool
94

Gabor Hosszu's avatar
Gabor Hosszu committed
95
	// Track which TXIDs are queries; Although the shim maintains this, it cannot be trusted.
96
97
98
99
100
101
	isTransaction map[string]bool

	// used to do Send after making sure the state transition is complete
	nextState chan *nextStateInfo
}

Gabor Hosszu's avatar
Gabor Hosszu committed
102
103
104
// shorttxid returns the first 8 bytes of txid, or txid unchanged when it is
// shorter than that. It is used to keep log prefixes compact.
func shorttxid(txid string) string {
	if len(txid) < 8 {
		return txid
	}
	return txid[0:8]
}

// serialSend sends msg on the chat stream while holding serialLock, so that
// concurrent senders never interleave on the stream. A send failure is logged
// and returned as a new error that names the message type.
func (handler *Handler) serialSend(msg *pb.ChaincodeMessage) error {
	handler.serialLock.Lock()
	defer handler.serialLock.Unlock()

	sendErr := handler.ChatStream.Send(msg)
	if sendErr == nil {
		return nil
	}
	msgType := msg.Type.String()
	chaincodeLogger.Errorf("Error sending %s: %s", msgType, sendErr)
	return fmt.Errorf("Error sending %s: %s", msgType, sendErr)
}

119
func (handler *Handler) createTxContext(ctxt context.Context, txid string, tx *pb.Transaction) (*transactionContext, error) {
120
	if handler.txCtxs == nil {
Gabor Hosszu's avatar
Gabor Hosszu committed
121
		return nil, fmt.Errorf("cannot create notifier for txid:%s", txid)
122
123
124
	}
	handler.Lock()
	defer handler.Unlock()
Gabor Hosszu's avatar
Gabor Hosszu committed
125
126
	if handler.txCtxs[txid] != nil {
		return nil, fmt.Errorf("txid:%s exists", txid)
127
128
129
	}
	txctx := &transactionContext{transactionSecContext: tx, responseNotifier: make(chan *pb.ChaincodeMessage, 1),
		rangeQueryIteratorMap: make(map[string]statemgmt.RangeScanIterator)}
Gabor Hosszu's avatar
Gabor Hosszu committed
130
	handler.txCtxs[txid] = txctx
131
132
133
134
	if txsim, ok := ctxt.Value(TXSimulatorKey).(ledgernext.TxSimulator); ok {
		txctx.txsimulator = txsim
	}

135
136
137
	return txctx, nil
}

Gabor Hosszu's avatar
Gabor Hosszu committed
138
func (handler *Handler) getTxContext(txid string) *transactionContext {
139
140
	handler.Lock()
	defer handler.Unlock()
Gabor Hosszu's avatar
Gabor Hosszu committed
141
	return handler.txCtxs[txid]
142
143
}

Gabor Hosszu's avatar
Gabor Hosszu committed
144
func (handler *Handler) deleteTxContext(txid string) {
145
146
147
	handler.Lock()
	defer handler.Unlock()
	if handler.txCtxs != nil {
Gabor Hosszu's avatar
Gabor Hosszu committed
148
		delete(handler.txCtxs, txid)
149
150
151
	}
}

Gabor Hosszu's avatar
Gabor Hosszu committed
152
func (handler *Handler) putRangeQueryIterator(txContext *transactionContext, txid string,
153
154
155
	rangeScanIterator statemgmt.RangeScanIterator) {
	handler.Lock()
	defer handler.Unlock()
Gabor Hosszu's avatar
Gabor Hosszu committed
156
	txContext.rangeQueryIteratorMap[txid] = rangeScanIterator
157
158
}

Gabor Hosszu's avatar
Gabor Hosszu committed
159
func (handler *Handler) getRangeQueryIterator(txContext *transactionContext, txid string) statemgmt.RangeScanIterator {
160
161
	handler.Lock()
	defer handler.Unlock()
Gabor Hosszu's avatar
Gabor Hosszu committed
162
	return txContext.rangeQueryIteratorMap[txid]
163
164
}

Gabor Hosszu's avatar
Gabor Hosszu committed
165
func (handler *Handler) deleteRangeQueryIterator(txContext *transactionContext, txid string) {
166
167
	handler.Lock()
	defer handler.Unlock()
Gabor Hosszu's avatar
Gabor Hosszu committed
168
	delete(txContext.rangeQueryIteratorMap, txid)
169
170
}

171
172
173
//THIS CAN BE REMOVED ONCE WE FULL SUPPORT (Invoke and Query) CONFIDENTIALITY WITH CC-CALLING-CC
//Only invocation are allowed, not queries
func (handler *Handler) canCallChaincode(txid string, isQuery bool) *pb.ChaincodeMessage {
174
175
176
177
178
179
	secHelper := handler.chaincodeSupport.getSecHelper()
	if secHelper == nil {
		return nil
	}

	var errMsg string
Gabor Hosszu's avatar
Gabor Hosszu committed
180
	txctx := handler.getTxContext(txid)
181
	if txctx == nil {
Gabor Hosszu's avatar
Gabor Hosszu committed
182
		errMsg = fmt.Sprintf("[%s]Error no context while checking for confidentiality. Sending %s", shorttxid(txid), pb.ChaincodeMessage_ERROR)
183
	} else if txctx.transactionSecContext == nil {
Gabor Hosszu's avatar
Gabor Hosszu committed
184
		errMsg = fmt.Sprintf("[%s]Error transaction context is nil while checking for confidentiality. Sending %s", shorttxid(txid), pb.ChaincodeMessage_ERROR)
185
	} else if txctx.transactionSecContext.ConfidentialityLevel != pb.ConfidentialityLevel_PUBLIC {
186
187
188
		if isQuery {
			errMsg = fmt.Sprintf("[%s]Error chaincode-chaincode interactions not supported for with privacy enabled. Sending %s", shorttxid(txid), pb.ChaincodeMessage_ERROR)
		}
189
190
191
	}

	if errMsg != "" {
Gabor Hosszu's avatar
Gabor Hosszu committed
192
		return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: []byte(errMsg), Txid: txid}
193
194
195
196
197
198
	}

	//not CONFIDENTIAL transaction, OK to call CC
	return nil
}

Gabor Hosszu's avatar
Gabor Hosszu committed
199
func (handler *Handler) encryptOrDecrypt(encrypt bool, txid string, payload []byte) ([]byte, error) {
200
201
202
203
204
	secHelper := handler.chaincodeSupport.getSecHelper()
	if secHelper == nil {
		return payload, nil
	}

Gabor Hosszu's avatar
Gabor Hosszu committed
205
	txctx := handler.getTxContext(txid)
206
	if txctx == nil {
Gabor Hosszu's avatar
Gabor Hosszu committed
207
		return nil, fmt.Errorf("[%s]No context for txid %s", shorttxid(txid), txid)
208
209
	}
	if txctx.transactionSecContext == nil {
Gabor Hosszu's avatar
Gabor Hosszu committed
210
		return nil, fmt.Errorf("[%s]transaction context is nil for txid %s", shorttxid(txid), txid)
211
212
213
214
215
216
217
218
219
220
	}
	// TODO: this must be removed
	if txctx.transactionSecContext.ConfidentialityLevel == pb.ConfidentialityLevel_PUBLIC {
		return payload, nil
	}

	var enc crypto.StateEncryptor
	var err error
	if txctx.transactionSecContext.Type == pb.Transaction_CHAINCODE_DEPLOY {
		if enc, err = secHelper.GetStateEncryptor(handler.deployTXSecContext, handler.deployTXSecContext); err != nil {
221
			chaincodeLogger.Errorf("error getting crypto encryptor for deploy tx :%s", err)
222
223
224
225
			return nil, fmt.Errorf("error getting crypto encryptor for deploy tx :%s", err)
		}
	} else if txctx.transactionSecContext.Type == pb.Transaction_CHAINCODE_INVOKE || txctx.transactionSecContext.Type == pb.Transaction_CHAINCODE_QUERY {
		if enc, err = secHelper.GetStateEncryptor(handler.deployTXSecContext, txctx.transactionSecContext); err != nil {
226
			chaincodeLogger.Errorf("error getting crypto encryptor %s", err)
227
228
229
230
231
232
			return nil, fmt.Errorf("error getting crypto encryptor %s", err)
		}
	} else {
		return nil, fmt.Errorf("invalid transaction type %s", txctx.transactionSecContext.Type.String())
	}
	if enc == nil {
Gabor Hosszu's avatar
Gabor Hosszu committed
233
		return nil, fmt.Errorf("secure context returns nil encryptor for tx %s", txid)
234
235
	}
	if chaincodeLogger.IsEnabledFor(logging.DEBUG) {
Gabor Hosszu's avatar
Gabor Hosszu committed
236
		chaincodeLogger.Debugf("[%s]Payload before encrypt/decrypt: %v", shorttxid(txid), payload)
237
238
239
240
241
242
243
	}
	if encrypt {
		payload, err = enc.Encrypt(payload)
	} else {
		payload, err = enc.Decrypt(payload)
	}
	if chaincodeLogger.IsEnabledFor(logging.DEBUG) {
Gabor Hosszu's avatar
Gabor Hosszu committed
244
		chaincodeLogger.Debugf("[%s]Payload after encrypt/decrypt: %v", shorttxid(txid), payload)
245
246
247
248
249
	}

	return payload, err
}

Gabor Hosszu's avatar
Gabor Hosszu committed
250
251
// decrypt decrypts payload for transaction txid; see encryptOrDecrypt for the
// cases in which the payload is passed through unchanged.
func (handler *Handler) decrypt(txid string, payload []byte) ([]byte, error) {
	return handler.encryptOrDecrypt(false, txid, payload)
}

Gabor Hosszu's avatar
Gabor Hosszu committed
254
255
// encrypt encrypts payload for transaction txid; see encryptOrDecrypt for the
// cases in which the payload is passed through unchanged.
func (handler *Handler) encrypt(txid string, payload []byte) ([]byte, error) {
	return handler.encryptOrDecrypt(true, txid, payload)
}

// getSecurityBinding asks the security helper for the binding of tx. When no
// security helper is configured it returns (nil, nil).
func (handler *Handler) getSecurityBinding(tx *pb.Transaction) ([]byte, error) {
	if helper := handler.chaincodeSupport.getSecHelper(); helper != nil {
		return helper.GetTransactionBinding(tx)
	}
	return nil, nil
}

// deregister removes this handler from the chaincode support if it has been
// registered. It always returns nil.
func (handler *Handler) deregister() error {
	if !handler.registered {
		return nil
	}
	handler.chaincodeSupport.deregisterHandler(handler)
	return nil
}

// triggerNextState hands msg to the processStream loop through the nextState
// channel; send tells the loop whether to forward msg to the chaincode after
// handling it. The call blocks until the loop receives the item.
func (handler *Handler) triggerNextState(msg *pb.ChaincodeMessage, send bool) {
	next := &nextStateInfo{msg: msg, sendToCC: send}
	handler.nextState <- next
}

// waitForKeepaliveTimer returns a channel that fires once after the configured
// keepalive interval. When keepalive is not positive it returns a fresh channel
// that nobody ever writes to, so a receiver on it blocks forever.
func (handler *Handler) waitForKeepaliveTimer() <-chan time.Time {
	interval := handler.chaincodeSupport.keepalive
	if interval <= 0 {
		//no one will signal this channel, listener blocks forever
		return make(chan time.Time, 1)
	}
	return time.After(interval)
}

func (handler *Handler) processStream() error {
	defer handler.deregister()
	msgAvail := make(chan *pb.ChaincodeMessage)
	var nsInfo *nextStateInfo
	var in *pb.ChaincodeMessage
	var err error

	//recv is used to spin Recv routine after previous received msg
	//has been processed
	recv := true
	for {
		in = nil
		err = nil
		nsInfo = nil
		if recv {
			recv = false
			go func() {
				var in2 *pb.ChaincodeMessage
				in2, err = handler.ChatStream.Recv()
				msgAvail <- in2
			}()
		}
		select {
		case in = <-msgAvail:
			// Defer the deregistering of the this handler.
			if err == io.EOF {
				chaincodeLogger.Debugf("Received EOF, ending chaincode support stream, %s", err)
				return err
			} else if err != nil {
				chaincodeLogger.Errorf("Error handling chaincode support stream: %s", err)
				return err
			} else if in == nil {
				err = fmt.Errorf("Received nil message, ending chaincode support stream")
				chaincodeLogger.Debug("Received nil message, ending chaincode support stream")
				return err
			}
Gabor Hosszu's avatar
Gabor Hosszu committed
324
			chaincodeLogger.Debugf("[%s]Received message %s from shim", shorttxid(in.Txid), in.Type.String())
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
			if in.Type.String() == pb.ChaincodeMessage_ERROR.String() {
				chaincodeLogger.Errorf("Got error: %s", string(in.Payload))
			}

			// we can spin off another Recv again
			recv = true

			if in.Type == pb.ChaincodeMessage_KEEPALIVE {
				chaincodeLogger.Debug("Received KEEPALIVE Response")
				// Received a keep alive message, we don't do anything with it for now
				// and it does not touch the state machine
				continue
			}
		case nsInfo = <-handler.nextState:
			in = nsInfo.msg
			if in == nil {
				err = fmt.Errorf("Next state nil message, ending chaincode support stream")
				chaincodeLogger.Debug("Next state nil message, ending chaincode support stream")
				return err
			}
Gabor Hosszu's avatar
Gabor Hosszu committed
345
			chaincodeLogger.Debugf("[%s]Move state message %s", shorttxid(in.Txid), in.Type.String())
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
		case <-handler.waitForKeepaliveTimer():
			if handler.chaincodeSupport.keepalive <= 0 {
				chaincodeLogger.Errorf("Invalid select: keepalive not on (keepalive=%d)", handler.chaincodeSupport.keepalive)
				continue
			}

			//TODO we could use this to hook into container lifecycle (kill the chaincode if not in use, etc)
			kaerr := handler.serialSend(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_KEEPALIVE})
			if kaerr != nil {
				chaincodeLogger.Errorf("Error sending keepalive, err=%s", kaerr)
			} else {
				chaincodeLogger.Debug("Sent KEEPALIVE request")
			}
			//keepalive message kicked in. just continue
			continue
		}

		err = handler.HandleMessage(in)
		if err != nil {
Gabor Hosszu's avatar
Gabor Hosszu committed
365
			chaincodeLogger.Errorf("[%s]Error handling message, ending stream: %s", shorttxid(in.Txid), err)
366
367
368
369
			return fmt.Errorf("Error handling message, ending stream: %s", err)
		}

		if nsInfo != nil && nsInfo.sendToCC {
Gabor Hosszu's avatar
Gabor Hosszu committed
370
			chaincodeLogger.Debugf("[%s]sending state message %s", shorttxid(in.Txid), in.Type.String())
371
			if err = handler.serialSend(in); err != nil {
Gabor Hosszu's avatar
Gabor Hosszu committed
372
373
				chaincodeLogger.Debugf("[%s]serial sending received error %s", shorttxid(in.Txid), err)
				return fmt.Errorf("[%s]serial sending received error %s", shorttxid(in.Txid), err)
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
			}
		}
	}
}

// HandleChaincodeStream Main loop for handling the associated Chaincode stream.
// It logs the deadline of ctxt, builds a new handler for stream and runs that
// handler's processStream loop, returning its error.
func HandleChaincodeStream(chaincodeSupport *ChaincodeSupport, ctxt context.Context, stream ccintf.ChaincodeStream) error {
	deadline, ok := ctxt.Deadline()
	chaincodeLogger.Debugf("Current context deadline = %s, ok = %v", deadline, ok)
	return newChaincodeSupportHandler(chaincodeSupport, stream).processStream()
}

// newChaincodeSupportHandler builds the peer-side Handler for one chaincode
// stream. It sets the stream, the owning ChaincodeSupport, an unbuffered
// nextState channel and the state machine, which starts in createdstate.
// The handler's maps (txCtxs, txidMap, isTransaction) and readyNotify are not
// set here.
//
// Events are named after the chaincode message types. Each callback passes the
// event and the state machine's current state to the matching Handler method.
func newChaincodeSupportHandler(chaincodeSupport *ChaincodeSupport, peerChatStream ccintf.ChaincodeStream) *Handler {
	v := &Handler{
		ChatStream: peerChatStream,
	}
	v.chaincodeSupport = chaincodeSupport
	//we want this to block
	v.nextState = make(chan *nextStateInfo)

	v.FSM = fsm.NewFSM(
		createdstate,
		fsm.Events{
			//Send REGISTERED, then, if deploy { trigger INIT(via INIT) } else { trigger READY(via COMPLETED) }
			{Name: pb.ChaincodeMessage_REGISTER.String(), Src: []string{createdstate}, Dst: establishedstate},
			{Name: pb.ChaincodeMessage_INIT.String(), Src: []string{establishedstate}, Dst: initstate},
			{Name: pb.ChaincodeMessage_READY.String(), Src: []string{establishedstate}, Dst: readystate},
			{Name: pb.ChaincodeMessage_TRANSACTION.String(), Src: []string{readystate}, Dst: transactionstate},
			// state-changing requests move init/transaction into the matching busy state
			{Name: pb.ChaincodeMessage_PUT_STATE.String(), Src: []string{transactionstate}, Dst: busyxactstate},
			{Name: pb.ChaincodeMessage_DEL_STATE.String(), Src: []string{transactionstate}, Dst: busyxactstate},
			{Name: pb.ChaincodeMessage_INVOKE_CHAINCODE.String(), Src: []string{transactionstate}, Dst: busyxactstate},
			{Name: pb.ChaincodeMessage_PUT_STATE.String(), Src: []string{initstate}, Dst: busyinitstate},
			{Name: pb.ChaincodeMessage_DEL_STATE.String(), Src: []string{initstate}, Dst: busyinitstate},
			{Name: pb.ChaincodeMessage_INVOKE_CHAINCODE.String(), Src: []string{initstate}, Dst: busyinitstate},
			{Name: pb.ChaincodeMessage_COMPLETED.String(), Src: []string{initstate, readystate, transactionstate}, Dst: readystate},
			// read-only requests leave the state unchanged
			{Name: pb.ChaincodeMessage_GET_STATE.String(), Src: []string{readystate}, Dst: readystate},
			{Name: pb.ChaincodeMessage_GET_STATE.String(), Src: []string{initstate}, Dst: initstate},
			{Name: pb.ChaincodeMessage_GET_STATE.String(), Src: []string{busyinitstate}, Dst: busyinitstate},
			{Name: pb.ChaincodeMessage_GET_STATE.String(), Src: []string{transactionstate}, Dst: transactionstate},
			{Name: pb.ChaincodeMessage_GET_STATE.String(), Src: []string{busyxactstate}, Dst: busyxactstate},
			{Name: pb.ChaincodeMessage_RANGE_QUERY_STATE.String(), Src: []string{readystate}, Dst: readystate},
			{Name: pb.ChaincodeMessage_RANGE_QUERY_STATE.String(), Src: []string{initstate}, Dst: initstate},
			{Name: pb.ChaincodeMessage_RANGE_QUERY_STATE.String(), Src: []string{busyinitstate}, Dst: busyinitstate},
			{Name: pb.ChaincodeMessage_RANGE_QUERY_STATE.String(), Src: []string{transactionstate}, Dst: transactionstate},
			{Name: pb.ChaincodeMessage_RANGE_QUERY_STATE.String(), Src: []string{busyxactstate}, Dst: busyxactstate},
			{Name: pb.ChaincodeMessage_RANGE_QUERY_STATE_NEXT.String(), Src: []string{readystate}, Dst: readystate},
			{Name: pb.ChaincodeMessage_RANGE_QUERY_STATE_NEXT.String(), Src: []string{initstate}, Dst: initstate},
			{Name: pb.ChaincodeMessage_RANGE_QUERY_STATE_NEXT.String(), Src: []string{busyinitstate}, Dst: busyinitstate},
			{Name: pb.ChaincodeMessage_RANGE_QUERY_STATE_NEXT.String(), Src: []string{transactionstate}, Dst: transactionstate},
			{Name: pb.ChaincodeMessage_RANGE_QUERY_STATE_NEXT.String(), Src: []string{busyxactstate}, Dst: busyxactstate},
			{Name: pb.ChaincodeMessage_RANGE_QUERY_STATE_CLOSE.String(), Src: []string{readystate}, Dst: readystate},
			{Name: pb.ChaincodeMessage_RANGE_QUERY_STATE_CLOSE.String(), Src: []string{initstate}, Dst: initstate},
			{Name: pb.ChaincodeMessage_RANGE_QUERY_STATE_CLOSE.String(), Src: []string{busyinitstate}, Dst: busyinitstate},
			{Name: pb.ChaincodeMessage_RANGE_QUERY_STATE_CLOSE.String(), Src: []string{transactionstate}, Dst: transactionstate},
			{Name: pb.ChaincodeMessage_RANGE_QUERY_STATE_CLOSE.String(), Src: []string{busyxactstate}, Dst: busyxactstate},
			// ERROR ends an init, returns a transaction to ready, and un-busies a busy state
			{Name: pb.ChaincodeMessage_ERROR.String(), Src: []string{initstate}, Dst: endstate},
			{Name: pb.ChaincodeMessage_ERROR.String(), Src: []string{transactionstate}, Dst: readystate},
			{Name: pb.ChaincodeMessage_ERROR.String(), Src: []string{busyinitstate}, Dst: initstate},
			{Name: pb.ChaincodeMessage_ERROR.String(), Src: []string{busyxactstate}, Dst: transactionstate},
			{Name: pb.ChaincodeMessage_RESPONSE.String(), Src: []string{busyinitstate}, Dst: initstate},
			{Name: pb.ChaincodeMessage_RESPONSE.String(), Src: []string{busyxactstate}, Dst: transactionstate},
		},
		fsm.Callbacks{
			"before_" + pb.ChaincodeMessage_REGISTER.String():               func(e *fsm.Event) { v.beforeRegisterEvent(e, v.FSM.Current()) },
			"before_" + pb.ChaincodeMessage_COMPLETED.String():              func(e *fsm.Event) { v.beforeCompletedEvent(e, v.FSM.Current()) },
			"before_" + pb.ChaincodeMessage_INIT.String():                   func(e *fsm.Event) { v.beforeInitState(e, v.FSM.Current()) },
			"after_" + pb.ChaincodeMessage_GET_STATE.String():               func(e *fsm.Event) { v.afterGetState(e, v.FSM.Current()) },
			"after_" + pb.ChaincodeMessage_RANGE_QUERY_STATE.String():       func(e *fsm.Event) { v.afterRangeQueryState(e, v.FSM.Current()) },
			"after_" + pb.ChaincodeMessage_RANGE_QUERY_STATE_NEXT.String():  func(e *fsm.Event) { v.afterRangeQueryStateNext(e, v.FSM.Current()) },
			"after_" + pb.ChaincodeMessage_RANGE_QUERY_STATE_CLOSE.String(): func(e *fsm.Event) { v.afterRangeQueryStateClose(e, v.FSM.Current()) },
			"after_" + pb.ChaincodeMessage_PUT_STATE.String():               func(e *fsm.Event) { v.afterPutState(e, v.FSM.Current()) },
			"after_" + pb.ChaincodeMessage_DEL_STATE.String():               func(e *fsm.Event) { v.afterDelState(e, v.FSM.Current()) },
			"after_" + pb.ChaincodeMessage_INVOKE_CHAINCODE.String():        func(e *fsm.Event) { v.afterInvokeChaincode(e, v.FSM.Current()) },
			"enter_" + establishedstate:                                     func(e *fsm.Event) { v.enterEstablishedState(e, v.FSM.Current()) },
			"enter_" + initstate:                                            func(e *fsm.Event) { v.enterInitState(e, v.FSM.Current()) },
			"enter_" + readystate:                                           func(e *fsm.Event) { v.enterReadyState(e, v.FSM.Current()) },
			"enter_" + busyinitstate:                                        func(e *fsm.Event) { v.enterBusyState(e, v.FSM.Current()) },
			"enter_" + busyxactstate:                                        func(e *fsm.Event) { v.enterBusyState(e, v.FSM.Current()) },
			"enter_" + endstate:                                             func(e *fsm.Event) { v.enterEndState(e, v.FSM.Current()) },
		},
	)

	return v
}

Gabor Hosszu's avatar
Gabor Hosszu committed
460
461
func (handler *Handler) createTXIDEntry(txid string) bool {
	if handler.txidMap == nil {
462
463
464
465
		return false
	}
	handler.Lock()
	defer handler.Unlock()
Gabor Hosszu's avatar
Gabor Hosszu committed
466
	if handler.txidMap[txid] {
467
468
		return false
	}
Gabor Hosszu's avatar
Gabor Hosszu committed
469
470
	handler.txidMap[txid] = true
	return handler.txidMap[txid]
471
472
}

Gabor Hosszu's avatar
Gabor Hosszu committed
473
func (handler *Handler) deleteTXIDEntry(txid string) {
474
475
	handler.Lock()
	defer handler.Unlock()
Gabor Hosszu's avatar
Gabor Hosszu committed
476
477
	if handler.txidMap != nil {
		delete(handler.txidMap, txid)
478
	} else {
Gabor Hosszu's avatar
Gabor Hosszu committed
479
		chaincodeLogger.Warningf("TXID %s not found!", txid)
480
481
482
	}
}

Gabor Hosszu's avatar
Gabor Hosszu committed
483
484
// markIsTransaction marks a TXID as a transaction or a query; true = transaction, false = query
func (handler *Handler) markIsTransaction(txid string, isTrans bool) bool {
485
486
487
488
489
	handler.Lock()
	defer handler.Unlock()
	if handler.isTransaction == nil {
		return false
	}
Gabor Hosszu's avatar
Gabor Hosszu committed
490
	handler.isTransaction[txid] = isTrans
491
492
493
	return true
}

Gabor Hosszu's avatar
Gabor Hosszu committed
494
func (handler *Handler) getIsTransaction(txid string) bool {
495
496
497
498
499
	handler.Lock()
	defer handler.Unlock()
	if handler.isTransaction == nil {
		return false
	}
Gabor Hosszu's avatar
Gabor Hosszu committed
500
	return handler.isTransaction[txid]
501
502
}

Gabor Hosszu's avatar
Gabor Hosszu committed
503
func (handler *Handler) deleteIsTransaction(txid string) {
504
505
506
	handler.Lock()
	defer handler.Unlock()
	if handler.isTransaction != nil {
Gabor Hosszu's avatar
Gabor Hosszu committed
507
		delete(handler.isTransaction, txid)
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
	}
}

// notifyDuringStartup reports the startup outcome val on readyNotify. When
// readyNotify is nil (the USER_RUNS_CC case) there is nobody to tell and the
// call only logs.
func (handler *Handler) notifyDuringStartup(val bool) {
	if handler.readyNotify == nil {
		chaincodeLogger.Debug("nothing to notify (dev mode ?)")
		return
	}
	chaincodeLogger.Debug("Notifying during startup")
	handler.readyNotify <- val
}

// beforeRegisterEvent is invoked when chaincode tries to register.
// It decodes the chaincode ID from the message payload, records it on the
// handler, registers the handler with the chaincode support and replies with
// REGISTERED. Any failure cancels the event; failures from registration
// onwards additionally report false to the startup waiter.
func (handler *Handler) beforeRegisterEvent(e *fsm.Event, state string) {
	chaincodeLogger.Debugf("Received %s in state %s", e.Event, state)
	msg, isMsg := e.Args[0].(*pb.ChaincodeMessage)
	if !isMsg {
		e.Cancel(fmt.Errorf("Received unexpected message type"))
		return
	}

	registration := &pb.ChaincodeID{}
	err := proto.Unmarshal(msg.Payload, registration)
	if err != nil {
		e.Cancel(fmt.Errorf("Error in received %s, could NOT unmarshal registration info: %s", pb.ChaincodeMessage_REGISTER, err))
		return
	}

	// Now register with the chaincodeSupport
	handler.ChaincodeID = registration
	if err = handler.chaincodeSupport.registerHandler(handler); err != nil {
		e.Cancel(err)
		handler.notifyDuringStartup(false)
		return
	}

	chaincodeLogger.Debugf("Got %s for chaincodeID = %s, sending back %s", e.Event, registration, pb.ChaincodeMessage_REGISTERED)
	sendErr := handler.serialSend(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_REGISTERED})
	if sendErr == nil {
		return
	}
	e.Cancel(fmt.Errorf("Error sending %s: %s", pb.ChaincodeMessage_REGISTERED, sendErr))
	handler.notifyDuringStartup(false)
}

func (handler *Handler) notify(msg *pb.ChaincodeMessage) {
	handler.Lock()
	defer handler.Unlock()
Gabor Hosszu's avatar
Gabor Hosszu committed
556
	tctx := handler.txCtxs[msg.Txid]
557
	if tctx == nil {
Gabor Hosszu's avatar
Gabor Hosszu committed
558
		chaincodeLogger.Debugf("notifier Txid:%s does not exist", msg.Txid)
559
	} else {
Gabor Hosszu's avatar
Gabor Hosszu committed
560
		chaincodeLogger.Debugf("notifying Txid:%s", msg.Txid)
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
		tctx.responseNotifier <- msg

		// clean up rangeQueryIteratorMap
		for _, v := range tctx.rangeQueryIteratorMap {
			v.Close()
		}
	}
}

// beforeCompletedEvent is invoked when chaincode has completed execution of init, invoke or query.
func (handler *Handler) beforeCompletedEvent(e *fsm.Event, state string) {
	msg, ok := e.Args[0].(*pb.ChaincodeMessage)
	if !ok {
		e.Cancel(fmt.Errorf("Received unexpected message type"))
		return
	}
	// Notify on channel once into READY state
Gabor Hosszu's avatar
Gabor Hosszu committed
578
	chaincodeLogger.Debugf("[%s]beforeCompleted - not in ready state will notify when in readystate", shorttxid(msg.Txid))
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
	return
}

// beforeInitState is invoked before an init message is sent to the chaincode.
// It reports true through notifyDuringStartup; state is used for logging only.
func (handler *Handler) beforeInitState(e *fsm.Event, state string) {
	chaincodeLogger.Debugf("Before state %s.. notifying waiter that we are up", state)
	handler.notifyDuringStartup(true)
}

// afterGetState handles a GET_STATE request from the chaincode.
func (handler *Handler) afterGetState(e *fsm.Event, state string) {
	msg, ok := e.Args[0].(*pb.ChaincodeMessage)
	if !ok {
		e.Cancel(fmt.Errorf("Received unexpected message type"))
		return
	}
Gabor Hosszu's avatar
Gabor Hosszu committed
595
	chaincodeLogger.Debugf("[%s]Received %s, invoking get state from ledger", shorttxid(msg.Txid), pb.ChaincodeMessage_GET_STATE)
596
597
598
599
600
601
602
603
604
605
606

	// Query ledger for state
	handler.handleGetState(msg)
}

// Handles query to ledger to get state
func (handler *Handler) handleGetState(msg *pb.ChaincodeMessage) {
	// The defer followed by triggering a go routine dance is needed to ensure that the previous state transition
	// is completed before the next one is triggered. The previous state transition is deemed complete only when
	// the afterGetState function is exited. Interesting bug fix!!
	go func() {
Gabor Hosszu's avatar
Gabor Hosszu committed
607
608
		// Check if this is the unique state request from this chaincode txid
		uniqueReq := handler.createTXIDEntry(msg.Txid)
609
610
		if !uniqueReq {
			// Drop this request
Gabor Hosszu's avatar
Gabor Hosszu committed
611
			chaincodeLogger.Error("Another state request pending for this Txid. Cannot process.")
612
613
614
615
616
617
			return
		}

		var serialSendMsg *pb.ChaincodeMessage

		defer func() {
Gabor Hosszu's avatar
Gabor Hosszu committed
618
619
			handler.deleteTXIDEntry(msg.Txid)
			chaincodeLogger.Debugf("[%s]handleGetState serial send %s", shorttxid(serialSendMsg.Txid), serialSendMsg.Type)
620
621
622
623
624
625
626
627
628
			handler.serialSend(serialSendMsg)
		}()

		key := string(msg.Payload)
		ledgerObj, ledgerErr := ledger.GetLedger()
		if ledgerErr != nil {
			// Send error msg back to chaincode. GetState will not trigger event
			payload := []byte(ledgerErr.Error())
			chaincodeLogger.Errorf("Failed to get chaincode state(%s). Sending %s", ledgerErr, pb.ChaincodeMessage_ERROR)
Gabor Hosszu's avatar
Gabor Hosszu committed
629
630
			// Remove txid from current set
			serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
631
632
633
634
635
636
			return
		}

		// Invoke ledger to get state
		chaincodeID := handler.ChaincodeID.Name

Gabor Hosszu's avatar
Gabor Hosszu committed
637
		readCommittedState := !handler.getIsTransaction(msg.Txid)
638
639
640
641
642
643
644
645
646
647
		var res []byte
		var err error

		txContext := handler.getTxContext(msg.Txid)
		if txContext.txsimulator != nil {
			res, err = txContext.txsimulator.GetState(chaincodeID, key)
		} else {
			res, err = ledgerObj.GetState(chaincodeID, key, readCommittedState)
		}

648
649
650
		if err != nil {
			// Send error msg back to chaincode. GetState will not trigger event
			payload := []byte(err.Error())
Gabor Hosszu's avatar
Gabor Hosszu committed
651
652
			chaincodeLogger.Errorf("[%s]Failed to get chaincode state(%s). Sending %s", shorttxid(msg.Txid), err, pb.ChaincodeMessage_ERROR)
			serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
653
654
		} else if res == nil {
			//The state object being requested does not exist, so don't attempt to decrypt it
Gabor Hosszu's avatar
Gabor Hosszu committed
655
656
			chaincodeLogger.Debugf("[%s]No state associated with key: %s. Sending %s with an empty payload", shorttxid(msg.Txid), key, pb.ChaincodeMessage_RESPONSE)
			serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Payload: res, Txid: msg.Txid}
657
658
		} else {
			// Decrypt the data if the confidential is enabled
Gabor Hosszu's avatar
Gabor Hosszu committed
659
			if res, err = handler.decrypt(msg.Txid, res); err == nil {
660
				// Send response msg back to chaincode. GetState will not trigger event
Gabor Hosszu's avatar
Gabor Hosszu committed
661
662
				chaincodeLogger.Debugf("[%s]Got state. Sending %s", shorttxid(msg.Txid), pb.ChaincodeMessage_RESPONSE)
				serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Payload: res, Txid: msg.Txid}
663
664
			} else {
				// Send err msg back to chaincode.
Gabor Hosszu's avatar
Gabor Hosszu committed
665
				chaincodeLogger.Errorf("[%s]Got error (%s) while decrypting. Sending %s", shorttxid(msg.Txid), err, pb.ChaincodeMessage_ERROR)
666
				errBytes := []byte(err.Error())
Gabor Hosszu's avatar
Gabor Hosszu committed
667
				serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: errBytes, Txid: msg.Txid}
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
			}

		}

	}()
}

// maxRangeQueryStateLimit is the upper bound on the number of key/value pairs
// packed into a single range query response; the range query handlers stop
// collecting results once this many entries have been read from the iterator.
const maxRangeQueryStateLimit = 100

// afterRangeQueryState handles a RANGE_QUERY_STATE request from the chaincode.
// e.Args[0] is expected to hold the *pb.ChaincodeMessage carrying the request;
// if it holds anything else the event is cancelled. The actual ledger query is
// delegated to handleRangeQueryState, which does its work on its own goroutine.
func (handler *Handler) afterRangeQueryState(e *fsm.Event, state string) {
	msg, ok := e.Args[0].(*pb.ChaincodeMessage)
	if !ok {
		e.Cancel(fmt.Errorf("Received unexpected message type"))
		return
	}
	chaincodeLogger.Debugf("Received %s, invoking get state from ledger", pb.ChaincodeMessage_RANGE_QUERY_STATE)

	// Query ledger for state
	handler.handleRangeQueryState(msg)
	// Name the message this handler actually serves (previously logged GET_STATE).
	chaincodeLogger.Debug("Exiting RANGE_QUERY_STATE")
}

// Handles query to ledger to rage query state
func (handler *Handler) handleRangeQueryState(msg *pb.ChaincodeMessage) {
	// The defer followed by triggering a go routine dance is needed to ensure that the previous state transition
	// is completed before the next one is triggered. The previous state transition is deemed complete only when
	// the afterRangeQueryState function is exited. Interesting bug fix!!
	go func() {
Gabor Hosszu's avatar
Gabor Hosszu committed
697
698
		// Check if this is the unique state request from this chaincode txid
		uniqueReq := handler.createTXIDEntry(msg.Txid)
699
700
		if !uniqueReq {
			// Drop this request
Gabor Hosszu's avatar
Gabor Hosszu committed
701
			chaincodeLogger.Error("Another state request pending for this Txid. Cannot process.")
702
703
704
705
706
707
			return
		}

		var serialSendMsg *pb.ChaincodeMessage

		defer func() {
Gabor Hosszu's avatar
Gabor Hosszu committed
708
709
			handler.deleteTXIDEntry(msg.Txid)
			chaincodeLogger.Debugf("[%s]handleRangeQueryState serial send %s", shorttxid(serialSendMsg.Txid), serialSendMsg.Type)
710
711
712
713
714
715
716
717
			handler.serialSend(serialSendMsg)
		}()

		rangeQueryState := &pb.RangeQueryState{}
		unmarshalErr := proto.Unmarshal(msg.Payload, rangeQueryState)
		if unmarshalErr != nil {
			payload := []byte(unmarshalErr.Error())
			chaincodeLogger.Errorf("Failed to unmarshall range query request. Sending %s", pb.ChaincodeMessage_ERROR)
Gabor Hosszu's avatar
Gabor Hosszu committed
718
			serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
719
720
721
722
723
724
725
726
727
728
			return
		}

		hasNext := true

		ledger, ledgerErr := ledger.GetLedger()
		if ledgerErr != nil {
			// Send error msg back to chaincode. GetState will not trigger event
			payload := []byte(ledgerErr.Error())
			chaincodeLogger.Errorf("Failed to get ledger. Sending %s", pb.ChaincodeMessage_ERROR)
Gabor Hosszu's avatar
Gabor Hosszu committed
729
			serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
730
731
732
733
734
			return
		}

		chaincodeID := handler.ChaincodeID.Name

Gabor Hosszu's avatar
Gabor Hosszu committed
735
		readCommittedState := !handler.getIsTransaction(msg.Txid)
736
737
738
739
740
		rangeIter, err := ledger.GetStateRangeScanIterator(chaincodeID, rangeQueryState.StartKey, rangeQueryState.EndKey, readCommittedState)
		if err != nil {
			// Send error msg back to chaincode. GetState will not trigger event
			payload := []byte(err.Error())
			chaincodeLogger.Errorf("Failed to get ledger scan iterator. Sending %s", pb.ChaincodeMessage_ERROR)
Gabor Hosszu's avatar
Gabor Hosszu committed
741
			serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
742
743
744
745
			return
		}

		iterID := util.GenerateUUID()
Gabor Hosszu's avatar
Gabor Hosszu committed
746
		txContext := handler.getTxContext(msg.Txid)
747
748
749
750
751
752
753
754
755
		handler.putRangeQueryIterator(txContext, iterID, rangeIter)

		hasNext = rangeIter.Next()

		var keysAndValues []*pb.RangeQueryStateKeyValue
		var i = uint32(0)
		for ; hasNext && i < maxRangeQueryStateLimit; i++ {
			key, value := rangeIter.GetKeyValue()
			// Decrypt the data if the confidential is enabled
Gabor Hosszu's avatar
Gabor Hosszu committed
756
			decryptedValue, decryptErr := handler.decrypt(msg.Txid, value)
757
758
759
			if decryptErr != nil {
				payload := []byte(decryptErr.Error())
				chaincodeLogger.Errorf("Failed decrypt value. Sending %s", pb.ChaincodeMessage_ERROR)
Gabor Hosszu's avatar
Gabor Hosszu committed
760
				serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786

				rangeIter.Close()
				handler.deleteRangeQueryIterator(txContext, iterID)

				return
			}
			keyAndValue := pb.RangeQueryStateKeyValue{Key: key, Value: decryptedValue}
			keysAndValues = append(keysAndValues, &keyAndValue)

			hasNext = rangeIter.Next()
		}

		if !hasNext {
			rangeIter.Close()
			handler.deleteRangeQueryIterator(txContext, iterID)
		}

		payload := &pb.RangeQueryStateResponse{KeysAndValues: keysAndValues, HasMore: hasNext, ID: iterID}
		payloadBytes, err := proto.Marshal(payload)
		if err != nil {
			rangeIter.Close()
			handler.deleteRangeQueryIterator(txContext, iterID)

			// Send error msg back to chaincode. GetState will not trigger event
			payload := []byte(err.Error())
			chaincodeLogger.Errorf("Failed marshall resopnse. Sending %s", pb.ChaincodeMessage_ERROR)
Gabor Hosszu's avatar
Gabor Hosszu committed
787
			serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
788
789
790
791
			return
		}

		chaincodeLogger.Debugf("Got keys and values. Sending %s", pb.ChaincodeMessage_RESPONSE)
Gabor Hosszu's avatar
Gabor Hosszu committed
792
		serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Payload: payloadBytes, Txid: msg.Txid}
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816

	}()
}

// afterRangeQueryStateNext handles a RANGE_QUERY_STATE_NEXT request from the chaincode.
// e.Args[0] is expected to hold the *pb.ChaincodeMessage carrying the request;
// if it holds anything else the event is cancelled. The work itself is
// delegated to handleRangeQueryStateNext, which runs on its own goroutine.
func (handler *Handler) afterRangeQueryStateNext(e *fsm.Event, state string) {
	msg, ok := e.Args[0].(*pb.ChaincodeMessage)
	if !ok {
		e.Cancel(fmt.Errorf("Received unexpected message type"))
		return
	}
	// Log the message type this handler serves (previously logged RANGE_QUERY_STATE).
	chaincodeLogger.Debugf("Received %s, invoking get state from ledger", pb.ChaincodeMessage_RANGE_QUERY_STATE_NEXT)

	// Query ledger for state
	handler.handleRangeQueryStateNext(msg)
	chaincodeLogger.Debug("Exiting RANGE_QUERY_STATE_NEXT")
}

// Handles query to ledger to rage query state next
func (handler *Handler) handleRangeQueryStateNext(msg *pb.ChaincodeMessage) {
	// The defer followed by triggering a go routine dance is needed to ensure that the previous state transition
	// is completed before the next one is triggered. The previous state transition is deemed complete only when
	// the afterRangeQueryState function is exited. Interesting bug fix!!
	go func() {
Gabor Hosszu's avatar
Gabor Hosszu committed
817
818
		// Check if this is the unique state request from this chaincode txid
		uniqueReq := handler.createTXIDEntry(msg.Txid)
819
820
		if !uniqueReq {
			// Drop this request
Gabor Hosszu's avatar
Gabor Hosszu committed
821
			chaincodeLogger.Debug("Another state request pending for this Txid. Cannot process.")
822
823
824
825
826
827
			return
		}

		var serialSendMsg *pb.ChaincodeMessage

		defer func() {
Gabor Hosszu's avatar
Gabor Hosszu committed
828
829
			handler.deleteTXIDEntry(msg.Txid)
			chaincodeLogger.Debugf("[%s]handleRangeQueryState serial send %s", shorttxid(serialSendMsg.Txid), serialSendMsg.Type)
830
831
832
833
834
835
836
837
			handler.serialSend(serialSendMsg)
		}()

		rangeQueryStateNext := &pb.RangeQueryStateNext{}
		unmarshalErr := proto.Unmarshal(msg.Payload, rangeQueryStateNext)
		if unmarshalErr != nil {
			payload := []byte(unmarshalErr.Error())
			chaincodeLogger.Errorf("Failed to unmarshall state range next query request. Sending %s", pb.ChaincodeMessage_ERROR)
Gabor Hosszu's avatar
Gabor Hosszu committed
838
			serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
839
840
841
			return
		}

Gabor Hosszu's avatar
Gabor Hosszu committed
842
		txContext := handler.getTxContext(msg.Txid)
843
844
845
846
847
		rangeIter := handler.getRangeQueryIterator(txContext, rangeQueryStateNext.ID)

		if rangeIter == nil {
			payload := []byte("Range query iterator not found")
			chaincodeLogger.Errorf("Range query iterator not found. Sending %s", pb.ChaincodeMessage_ERROR)
Gabor Hosszu's avatar
Gabor Hosszu committed
848
			serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
849
850
851
852
853
854
855
856
857
			return
		}

		var keysAndValues []*pb.RangeQueryStateKeyValue
		var i = uint32(0)
		hasNext := true
		for ; hasNext && i < maxRangeQueryStateLimit; i++ {
			key, value := rangeIter.GetKeyValue()
			// Decrypt the data if the confidential is enabled
Gabor Hosszu's avatar
Gabor Hosszu committed
858
			decryptedValue, decryptErr := handler.decrypt(msg.Txid, value)
859
860
861
			if decryptErr != nil {
				payload := []byte(decryptErr.Error())
				chaincodeLogger.Errorf("Failed decrypt value. Sending %s", pb.ChaincodeMessage_ERROR)
Gabor Hosszu's avatar
Gabor Hosszu committed
862
				serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888

				rangeIter.Close()
				handler.deleteRangeQueryIterator(txContext, rangeQueryStateNext.ID)

				return
			}
			keyAndValue := pb.RangeQueryStateKeyValue{Key: key, Value: decryptedValue}
			keysAndValues = append(keysAndValues, &keyAndValue)

			hasNext = rangeIter.Next()
		}

		if !hasNext {
			rangeIter.Close()
			handler.deleteRangeQueryIterator(txContext, rangeQueryStateNext.ID)
		}

		payload := &pb.RangeQueryStateResponse{KeysAndValues: keysAndValues, HasMore: hasNext, ID: rangeQueryStateNext.ID}
		payloadBytes, err := proto.Marshal(payload)
		if err != nil {
			rangeIter.Close()
			handler.deleteRangeQueryIterator(txContext, rangeQueryStateNext.ID)

			// Send error msg back to chaincode. GetState will not trigger event
			payload := []byte(err.Error())
			chaincodeLogger.Errorf("Failed marshall resopnse. Sending %s", pb.ChaincodeMessage_ERROR)
Gabor Hosszu's avatar
Gabor Hosszu committed
889
			serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
890
891
892
893
			return
		}

		chaincodeLogger.Debugf("Got keys and values. Sending %s", pb.ChaincodeMessage_RESPONSE)
Gabor Hosszu's avatar
Gabor Hosszu committed
894
		serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Payload: payloadBytes, Txid: msg.Txid}
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918

	}()
}

// afterRangeQueryStateClose handles a RANGE_QUERY_STATE_CLOSE request from the chaincode.
// e.Args[0] is expected to hold the *pb.ChaincodeMessage carrying the request;
// if it holds anything else the event is cancelled. The work itself is
// delegated to handleRangeQueryStateClose, which runs on its own goroutine.
func (handler *Handler) afterRangeQueryStateClose(e *fsm.Event, state string) {
	msg, ok := e.Args[0].(*pb.ChaincodeMessage)
	if !ok {
		e.Cancel(fmt.Errorf("Received unexpected message type"))
		return
	}
	// Log the message type this handler serves (previously logged RANGE_QUERY_STATE).
	chaincodeLogger.Debugf("Received %s, invoking get state from ledger", pb.ChaincodeMessage_RANGE_QUERY_STATE_CLOSE)

	// Query ledger for state
	handler.handleRangeQueryStateClose(msg)
	chaincodeLogger.Debug("Exiting RANGE_QUERY_STATE_CLOSE")
}

// Handles the closing of a state iterator
func (handler *Handler) handleRangeQueryStateClose(msg *pb.ChaincodeMessage) {
	// The defer followed by triggering a go routine dance is needed to ensure that the previous state transition
	// is completed before the next one is triggered. The previous state transition is deemed complete only when
	// the afterRangeQueryState function is exited. Interesting bug fix!!
	go func() {
Gabor Hosszu's avatar
Gabor Hosszu committed
919
920
		// Check if this is the unique state request from this chaincode txid
		uniqueReq := handler.createTXIDEntry(msg.Txid)
921
922
		if !uniqueReq {
			// Drop this request
Gabor Hosszu's avatar
Gabor Hosszu committed
923
			chaincodeLogger.Error("Another state request pending for this Txid. Cannot process.")
924
925
926
927
928
929
			return
		}

		var serialSendMsg *pb.ChaincodeMessage

		defer func() {
Gabor Hosszu's avatar
Gabor Hosszu committed
930
931
			handler.deleteTXIDEntry(msg.Txid)
			chaincodeLogger.Debugf("[%s]handleRangeQueryState serial send %s", shorttxid(serialSendMsg.Txid), serialSendMsg.Type)
932
933
934
935
936
937
938
939
			handler.serialSend(serialSendMsg)
		}()

		rangeQueryStateClose := &pb.RangeQueryStateClose{}
		unmarshalErr := proto.Unmarshal(msg.Payload, rangeQueryStateClose)
		if unmarshalErr != nil {
			payload := []byte(unmarshalErr.Error())
			chaincodeLogger.Errorf("Failed to unmarshall state range query close request. Sending %s", pb.ChaincodeMessage_ERROR)
Gabor Hosszu's avatar
Gabor Hosszu committed
940
			serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
941
942
943
			return
		}

Gabor Hosszu's avatar
Gabor Hosszu committed
944
		txContext := handler.getTxContext(msg.Txid)
945
946
947
948
949
950
951
952
953
954
955
956
957
		iter := handler.getRangeQueryIterator(txContext, rangeQueryStateClose.ID)
		if iter != nil {
			iter.Close()
			handler.deleteRangeQueryIterator(txContext, rangeQueryStateClose.ID)
		}

		payload := &pb.RangeQueryStateResponse{HasMore: false, ID: rangeQueryStateClose.ID}
		payloadBytes, err := proto.Marshal(payload)
		if err != nil {

			// Send error msg back to chaincode. GetState will not trigger event
			payload := []byte(err.Error())
			chaincodeLogger.Errorf("Failed marshall resopnse. Sending %s", pb.ChaincodeMessage_ERROR)
Gabor Hosszu's avatar
Gabor Hosszu committed
958
			serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_ERROR, Payload: payload, Txid: msg.Txid}
959
960
961
962
			return
		}

		chaincodeLogger.Debugf("Closed. Sending %s", pb.ChaincodeMessage_RESPONSE)
Gabor Hosszu's avatar
Gabor Hosszu committed
963
		serialSendMsg = &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Payload: payloadBytes, Txid: msg.Txid}
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000

	}()
}

// afterPutState handles a PUT_STATE request from the chaincode.
// It only checks that e.Args[0] is a *pb.ChaincodeMessage (cancelling the
// event otherwise) and logs the request together with the given state.
func (handler *Handler) afterPutState(e *fsm.Event, state string) {
	if _, isChaincodeMsg := e.Args[0].(*pb.ChaincodeMessage); !isChaincodeMsg {
		e.Cancel(fmt.Errorf("Received unexpected message type"))
		return
	}

	// Nothing is written here: putting the state into the ledger is handled
	// within enterBusyState.
	chaincodeLogger.Debugf("Received %s in state %s, invoking put state to ledger", pb.ChaincodeMessage_PUT_STATE, state)
}

// afterDelState handles a DEL_STATE request from the chaincode.
// It only checks that e.Args[0] is a *pb.ChaincodeMessage (cancelling the
// event otherwise) and logs the request.
func (handler *Handler) afterDelState(e *fsm.Event, state string) {
	if _, isChaincodeMsg := e.Args[0].(*pb.ChaincodeMessage); !isChaincodeMsg {
		e.Cancel(fmt.Errorf("Received unexpected message type"))
		return
	}

	// Nothing is deleted here: removing the state from the ledger is handled
	// within enterBusyState.
	chaincodeLogger.Debugf("Received %s, invoking delete state from ledger", pb.ChaincodeMessage_DEL_STATE)
}

// afterInvokeChaincode handles an INVOKE_CHAINCODE request from the chaincode.
func (handler *Handler) afterInvokeChaincode(e *fsm.Event, state string) {
	_, ok := e.Args[0].(*pb.ChaincodeMessage)
	if !ok {
		e.Cancel(fmt.Errorf("Received unexpected message type"))
		return
	}
	chaincodeLogger.Debugf("Received %s in state %s, invoking another chaincode", pb.ChaincodeMessage_INVOKE_CHAINCODE, state)

For faster browsing, not all history is shown. View entire blame