chain.go 26 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package etcdraft

import (
	"context"
yacovm's avatar
yacovm committed
11
12
	"encoding/pem"
	"fmt"
13
	"sync"
14
15
16
	"sync/atomic"
	"time"

yacovm's avatar
yacovm committed
17
18
19
	"code.cloudfoundry.org/clock"
	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
20
	"github.com/coreos/etcd/wal"
yacovm's avatar
yacovm committed
21
	"github.com/golang/protobuf/proto"
22
	"github.com/hyperledger/fabric/common/configtx"
23
	"github.com/hyperledger/fabric/common/flogging"
yacovm's avatar
yacovm committed
24
	"github.com/hyperledger/fabric/orderer/common/cluster"
25
26
27
	"github.com/hyperledger/fabric/orderer/consensus"
	"github.com/hyperledger/fabric/protos/common"
	"github.com/hyperledger/fabric/protos/orderer"
yacovm's avatar
yacovm committed
28
	"github.com/hyperledger/fabric/protos/orderer/etcdraft"
29
30
31
32
	"github.com/hyperledger/fabric/protos/utils"
	"github.com/pkg/errors"
)

33
34
35
36
37
// DefaultSnapshotCatchUpEntries is the default number of entries
// to preserve in memory when a snapshot is taken. This is for
// slow followers to catch up.
const DefaultSnapshotCatchUpEntries = uint64(500)

38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
//go:generate mockery -dir . -name Configurator -case underscore -output ./mocks/

// Configurator is used to configure the communication layer
// when the chain starts. Implementations receive the channel name and
// the full set of remote cluster nodes to connect to.
type Configurator interface {
	Configure(channel string, newNodes []cluster.RemoteNode)
}

//go:generate counterfeiter -o mocks/mock_rpc.go . RPC

// RPC is used to mock the transport layer in tests.
// Step sends a raft protocol message to the destination node;
// SendSubmit forwards a transaction submission to the destination node.
type RPC interface {
	Step(dest uint64, msg *orderer.StepRequest) (*orderer.StepResponse, error)
	SendSubmit(dest uint64, request *orderer.SubmitRequest) error
}

54
55
56
57
58
59
60
61
//go:generate counterfeiter -o mocks/mock_blockpuller.go . BlockPuller

// BlockPuller is used to pull blocks from other OSN.
// PullBlock fetches the block with the given sequence number; it is
// used by catchUp when this node lags behind a snapshot.
type BlockPuller interface {
	PullBlock(seq uint64) *common.Block
	Close()
}

62
// Options contains all the configurations relevant to the chain.
type Options struct {
	// RaftID is this node's ID within the raft cluster.
	RaftID uint64

	// Clock is injectable so tests can use a fake clock.
	Clock clock.Clock

	// WAL/snapshot persistence locations and snapshotting cadence
	// (SnapInterval is in blocks; 0 disables snapshotting — see apply()).
	WALDir       string
	SnapDir      string
	SnapInterval uint64

	// This is configurable mainly for testing purpose. Users are not
	// expected to alter this. Instead, DefaultSnapshotCatchUpEntries is used.
	SnapshotCatchUpEntries uint64

	MemoryStorage MemoryStorage
	Logger        *flogging.FabricLogger

	// Raft tuning parameters, passed through to raft.Config in NewChain.
	TickInterval    time.Duration
	ElectionTick    int
	HeartbeatTick   int
	MaxSizePerMsg   uint64
	MaxInflightMsgs int

	// RaftMetadata carries the consenter set and last applied raft index.
	RaftMetadata *etcdraft.RaftMetadata
}

88
// submit pairs an incoming SubmitRequest with a channel on which
// serveRequest reports the current raft leader ID back to Submit.
type submit struct {
	req    *orderer.SubmitRequest
	leader chan uint64
}

93
// Chain implements consensus.Chain interface.
type Chain struct {
	configurator Configurator

	rpc RPC

	raftID    uint64
	channelID string

	submitC  chan *submit
	applyC   chan apply
	observeC chan<- raft.SoftState // Notifies external observer on leader change (passed in optionally as an argument for tests)

	haltC    chan struct{}         // Signals to goroutines that the chain is halting
	doneC    chan struct{}         // Closes when the chain halts
	startC   chan struct{}         // Closes when the node is started
	snapC    chan *raftpb.Snapshot // Signal to catch up with snapshot

	// errorC is replaced (not just reopened) when a leader is found,
	// hence the lock guarding reads in Errored().
	errorCLock sync.RWMutex
	errorC     chan struct{} // returned by Errored()

	raftMetadataLock     sync.RWMutex
	confChangeInProgress *raftpb.ConfChange
	justElected          bool // this is true when node has just been elected
	configInflight       bool // this is true when there is config block or ConfChange in flight
	blockInflight        int  // number of in flight blocks

	clock clock.Clock // Tests can inject a fake clock

	support consensus.ConsenterSupport

	// appliedIndex is the highest raft entry index applied to the ledger;
	// used to suppress replay of already-written blocks.
	appliedIndex uint64

	// needed by snapshotting
	lastSnapBlockNum uint64
	confState        raftpb.ConfState // Etcdraft requires ConfState to be persisted within snapshot
	puller           BlockPuller      // Deliver client to pull blocks from other OSNs

	fresh bool // indicate if this is a fresh raft node

	node *node
	opts Options

	logger *flogging.FabricLogger
}

138
139
140
141
142
143
// NewChain constructs a chain object. It restores persisted WAL/snapshot
// state (if any), configures the underlying etcd/raft node, and wires the
// channels used by serveRequest. The chain is not serving until Start is
// called.
func NewChain(
	support consensus.ConsenterSupport,
	opts Options,
	conf Configurator,
	rpc RPC,
	puller BlockPuller,
	observeC chan<- raft.SoftState) (*Chain, error) {

	lg := opts.Logger.With("channel", support.ChainID(), "node", opts.RaftID)

	// A missing WAL directory means this node has never run before.
	fresh := !wal.Exist(opts.WALDir)
	storage, err := CreateStorage(lg, opts.WALDir, opts.SnapDir, opts.MemoryStorage)
	if err != nil {
		return nil, errors.Errorf("failed to restore persisted raft data: %s", err)
	}

	if opts.SnapshotCatchUpEntries == 0 {
		storage.SnapshotCatchUpEntries = DefaultSnapshotCatchUpEntries
	} else {
		storage.SnapshotCatchUpEntries = opts.SnapshotCatchUpEntries
	}

	// get block number in last snapshot, if exists
	var snapBlkNum uint64
	if s := storage.Snapshot(); !raft.IsEmptySnap(s) {
		b := utils.UnmarshalBlockOrPanic(s.Data)
		snapBlkNum = b.Header.Number
	}

	c := &Chain{
		configurator:     conf,
		rpc:              rpc,
		channelID:        support.ChainID(),
		raftID:           opts.RaftID,
		submitC:          make(chan *submit),
		applyC:           make(chan apply),
		haltC:            make(chan struct{}),
		doneC:            make(chan struct{}),
		startC:           make(chan struct{}),
		snapC:            make(chan *raftpb.Snapshot),
		errorC:           make(chan struct{}),
		observeC:         observeC,
		support:          support,
		fresh:            fresh,
		appliedIndex:     opts.RaftMetadata.RaftIndex,
		lastSnapBlockNum: snapBlkNum,
		puller:           puller,
		clock:            opts.Clock,
		logger:           lg,
		opts:             opts,
	}

	// DO NOT use Applied option in config, see https://github.com/etcd-io/etcd/issues/10217
	// We guard against replay of written blocks in `entriesToApply` instead.
	config := &raft.Config{
		ID:              c.raftID,
		ElectionTick:    c.opts.ElectionTick,
		HeartbeatTick:   c.opts.HeartbeatTick,
		MaxSizePerMsg:   c.opts.MaxSizePerMsg,
		MaxInflightMsgs: c.opts.MaxInflightMsgs,
		Logger:          c.logger,
		Storage:         c.opts.MemoryStorage,
		// PreVote prevents reconnected node from disturbing network.
		// See etcd/raft doc for more details.
		PreVote:                   true,
		CheckQuorum:               true,
		DisableProposalForwarding: true, // This prevents blocks from being accidentally proposed by followers
	}

	c.node = &node{
		chainID:      c.channelID,
		chain:        c,
		logger:       c.logger,
		storage:      storage,
		rpc:          c.rpc,
		config:       config,
		tickInterval: c.opts.TickInterval,
		clock:        c.clock,
		metadata:     c.opts.RaftMetadata,
	}

	return c, nil
}

// Start instructs the orderer to begin serving the chain and keep it current.
func (c *Chain) Start() {
	c.logger.Infof("Starting Raft node")

yacovm's avatar
yacovm committed
227
	if err := c.configureComm(); err != nil {
228
		c.logger.Errorf("Failed to start chain, aborting: +%v", err)
yacovm's avatar
yacovm committed
229
230
231
232
		close(c.doneC)
		return
	}

233
	c.node.start(c.fresh, c.support.Height() > 1)
234
	close(c.startC)
235
	close(c.errorC)
236
237
238
239

	go c.serveRequest()
}

240
// Order submits normal type transactions for ordering.
241
func (c *Chain) Order(env *common.Envelope, configSeq uint64) error {
242
	return c.Submit(&orderer.SubmitRequest{LastValidationSeq: configSeq, Content: env, Channel: c.channelID}, 0)
243
244
}

245
// Configure submits config type transactions for ordering.
246
func (c *Chain) Configure(env *common.Envelope, configSeq uint64) error {
247
248
249
	if err := c.checkConfigUpdateValidity(env); err != nil {
		return err
	}
250
	return c.Submit(&orderer.SubmitRequest{LastValidationSeq: configSeq, Content: env, Channel: c.channelID}, 0)
251
252
}

253
// Validate the config update for being of Type A or Type B as described in the design doc.
254
255
256
257
258
259
260
261
262
263
264
265
266
func (c *Chain) checkConfigUpdateValidity(ctx *common.Envelope) error {
	var err error
	payload, err := utils.UnmarshalPayload(ctx.Payload)
	if err != nil {
		return err
	}
	chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
	if err != nil {
		return err
	}

	switch chdr.Type {
	case int32(common.HeaderType_ORDERER_TRANSACTION):
267
		return nil
268
	case int32(common.HeaderType_CONFIG):
269
		configUpdate, err := configtx.UnmarshalConfigUpdateFromPayload(payload)
270
271
272
273
		if err != nil {
			return err
		}

274
		// Check that only the ConsensusType is updated in the write-set
275
		if ordererConfigGroup, ok := configUpdate.WriteSet.Groups["Orderer"]; ok {
276
277
			if val, ok := ordererConfigGroup.Values["ConsensusType"]; ok {
				return c.checkConsentersSet(val)
278
279
280
281
282
283
284
			}
		}
		return nil

	default:
		return errors.Errorf("config transaction has unknown header type")
	}
285
286
}

287
288
289
290
// WaitReady blocks when the chain:
// - is catching up with other nodes using snapshot
//
// In any other case, it returns right away.
291
func (c *Chain) WaitReady() error {
292
293
294
295
296
	if err := c.isRunning(); err != nil {
		return err
	}

	select {
297
	case c.submitC <- nil:
298
299
300
301
	case <-c.doneC:
		return errors.Errorf("chain is stopped")
	}

302
303
304
	return nil
}

305
// Errored returns a channel that closes when the chain stops.
// The channel is replaced by serveRequest when a leader is (re)elected,
// so access is guarded by errorCLock.
func (c *Chain) Errored() <-chan struct{} {
	c.errorCLock.RLock()
	defer c.errorCLock.RUnlock()
	return c.errorC
}

312
// Halt stops the chain. It is a no-op (with a warning) if the chain was
// never started. Otherwise it signals the run loop via haltC and blocks
// until doneC is closed, i.e. until shutdown has completed.
func (c *Chain) Halt() {
	select {
	case <-c.startC:
	default:
		c.logger.Warnf("Attempted to halt a chain that has not started")
		return
	}

	select {
	case c.haltC <- struct{}{}:
	case <-c.doneC:
		// already halting/halted; fall through to wait below
		return
	}
	<-c.doneC
}

329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
// isRunning reports an error unless the chain has been started and has not
// yet been halted.
func (c *Chain) isRunning() error {
	select {
	case <-c.startC:
		// started — now verify it has not been stopped
	default:
		return errors.Errorf("chain is not started")
	}

	select {
	case <-c.doneC:
		return errors.Errorf("chain is stopped")
	default:
		return nil
	}
}

345
346
// Step passes the given StepRequest message to the raft.Node instance.
// The request payload is expected to be a marshaled raftpb.Message.
func (c *Chain) Step(req *orderer.StepRequest, sender uint64) error {
	if err := c.isRunning(); err != nil {
		return err
	}

	var stepMsg raftpb.Message
	if err := proto.Unmarshal(req.Payload, &stepMsg); err != nil {
		return fmt.Errorf("failed to unmarshal StepRequest payload to Raft Message: %s", err)
	}

	if err := c.node.Step(context.TODO(), stepMsg); err != nil {
		return fmt.Errorf("failed to process Raft Step message: %s", err)
	}

	return nil
}

363
364
365
366
// Submit forwards the incoming request to:
// - the local serveRequest goroutine if this is leader
// - the actual leader via the transport mechanism
// The call fails if there's no leader elected yet.
func (c *Chain) Submit(req *orderer.SubmitRequest, sender uint64) error {
	if err := c.isRunning(); err != nil {
		return err
	}

	// Buffered so serveRequest's reply cannot block even if we bail out.
	leadC := make(chan uint64, 1)
	select {
	case c.submitC <- &submit{req, leadC}:
		// serveRequest answers with the current leader ID (raft.None if
		// there is no leader, e.g. during an election).
		lead := <-leadC
		if lead == raft.None {
			return errors.Errorf("no Raft leader")
		}

		if lead != c.raftID {
			// Not the leader: relay the submission to the leader node.
			return c.rpc.SendSubmit(lead, req)
		}

	case <-c.doneC:
		return errors.Errorf("chain is stopped")
	}

	return nil
}

391
// apply carries committed raft entries, plus an optional SoftState
// update, from the raft node goroutine to serveRequest.
type apply struct {
	entries []raftpb.Entry
	soft    *raft.SoftState
}

396
397
398
399
// isCandidate reports whether the node is campaigning for leadership,
// either in the pre-vote phase or as a full candidate.
func isCandidate(state raft.StateType) bool {
	switch state {
	case raft.StatePreCandidate, raft.StateCandidate:
		return true
	default:
		return false
	}
}

400
// serveRequest is the chain's single event loop. It serializes all state
// transitions: envelope submission, application of committed raft entries,
// batch-timeout block cutting, snapshot catch-up, and shutdown. All the
// mutable fields it touches (blockInflight, configInflight, justElected,
// appliedIndex, errorC, ...) are owned by this goroutine.
func (c *Chain) serveRequest() {
	ticking := false
	timer := c.clock.NewTimer(time.Second)
	// we need a stopped timer rather than nil,
	// because we will be select waiting on timer.C()
	if !timer.Stop() {
		<-timer.C()
	}

	// if timer is already started, this is a no-op
	start := func() {
		if !ticking {
			ticking = true
			timer.Reset(c.support.SharedConfig().BatchTimeout())
		}
	}

	stop := func() {
		if !timer.Stop() && ticking {
			// we only need to drain the channel if the timer expired (not explicitly stopped)
			<-timer.C()
		}
		ticking = false
	}

	var soft raft.SoftState
	submitC := c.submitC // set to nil to pause intake of new envelopes
	var bc *blockCreator

	becomeLeader := func() {
		c.blockInflight = 0
		c.justElected = true
		// pause intake until any in-flight entries from the previous term
		// have been applied (checked in the applyC branch below)
		submitC = nil

		// if there is unfinished ConfChange, we should resume the effort to propose it as
		// new leader, and wait for it to be committed before start serving new requests.
		if cc := c.getInFlightConfChange(); cc != nil {
			if err := c.node.ProposeConfChange(context.TODO(), *cc); err != nil {
				c.logger.Warnf("Failed to propose configuration update to Raft node: %s", err)
			}

			c.confChangeInProgress = cc
			c.configInflight = true
		}
	}

	becomeFollower := func() {
		c.blockInflight = 0
		// discard any pending batch — the new leader will re-cut it
		_ = c.support.BlockCutter().Cut()
		stop()
		submitC = c.submitC
		bc = nil
	}

	for {
		select {
		case s := <-submitC:
			if s == nil {
				// polled by `WaitReady`
				continue
			}

			if soft.RaftState == raft.StatePreCandidate || soft.RaftState == raft.StateCandidate {
				// election in progress — report "no leader" to Submit
				s.leader <- raft.None
				continue
			}

			s.leader <- soft.Lead
			if soft.Lead != c.raftID {
				// Submit relays to the actual leader; nothing to do here
				continue
			}

			batches, pending, err := c.ordered(s.req)
			if err != nil {
				c.logger.Errorf("Failed to order message: %s", err)
			}
			if pending {
				start() // no-op if timer is already started
			} else {
				stop()
			}

			c.propose(bc, batches...)
			if c.configInflight || c.blockInflight >= c.opts.MaxInflightMsgs {
				submitC = nil // stop accepting new envelopes
			}

		case app := <-c.applyC:
			if app.soft != nil {
				newLeader := atomic.LoadUint64(&app.soft.Lead) // etcdraft requires atomic access
				if newLeader != soft.Lead {
					c.logger.Infof("Raft leader changed: %d -> %d", soft.Lead, newLeader)

					if newLeader == c.raftID {
						becomeLeader()
					}

					if soft.Lead == c.raftID {
						becomeFollower()
					}
				}

				foundLeader := soft.Lead == raft.None && newLeader != raft.None
				quitCandidate := isCandidate(soft.RaftState) && !isCandidate(app.soft.RaftState)

				// leader (re)appeared: reset errorC so Errored() stops firing
				if foundLeader || quitCandidate {
					c.errorCLock.Lock()
					c.errorC = make(chan struct{})
					c.errorCLock.Unlock()
				}

				// no leader / mid-election: signal unavailability via errorC
				if isCandidate(app.soft.RaftState) || newLeader == raft.None {
					select {
					case <-c.errorC:
					default:
						close(c.errorC)
					}
				}

				soft = raft.SoftState{Lead: newLeader, RaftState: app.soft.RaftState}

				// notify external observer
				select {
				case c.observeC <- soft:
				default:
				}
			}

			c.apply(app.entries)

			if c.justElected {
				msgInflight := c.node.lastIndex() > c.appliedIndex
				if msgInflight || c.configInflight {
					c.logger.Debugf("There are in flight blocks, new leader should not serve requests")
					continue
				}

				c.logger.Infof("Start accepting requests as Raft leader")
				lastBlock := c.support.Block(c.support.Height() - 1)
				bc = &blockCreator{
					hash:   lastBlock.Header.Hash(),
					number: lastBlock.Header.Number,
					logger: c.logger,
				}
				submitC = c.submitC
				c.justElected = false
			} else if c.configInflight {
				c.logger.Debugf("Config block or ConfChange in flight, pause accepting transaction")
				submitC = nil
			} else if c.blockInflight < c.opts.MaxInflightMsgs {
				submitC = c.submitC
			}

		case <-timer.C():
			ticking = false

			batch := c.support.BlockCutter().Cut()
			if len(batch) == 0 {
				c.logger.Warningf("Batch timer expired with no pending requests, this might indicate a bug")
				continue
			}

			c.logger.Debugf("Batch timer expired, creating block")
			c.propose(bc, batch) // we are certain this is normal block, no need to block

		case sn := <-c.snapC:
			if sn.Metadata.Index <= c.appliedIndex {
				c.logger.Debugf("Skip snapshot taken at index %d, because it is behind current applied index %d", sn.Metadata.Index, c.appliedIndex)
				break
			}

			b := utils.UnmarshalBlockOrPanic(sn.Data)
			c.lastSnapBlockNum = b.Header.Number
			c.confState = sn.Metadata.ConfState
			c.appliedIndex = sn.Metadata.Index

			if err := c.catchUp(sn); err != nil {
				c.logger.Errorf("Failed to recover from snapshot taken at Term %d and Index %d: %s",
					sn.Metadata.Term, sn.Metadata.Index, err)
			}

		case <-c.doneC:
			select {
			case <-c.errorC: // avoid closing closed channel
			default:
				close(c.errorC)
			}

			c.logger.Infof("Stop serving requests")
			return
		}
	}
}

Jay Guo's avatar
Jay Guo committed
594
// writeBlock commits a raft-ordered block to the ledger, recording the raft
// entry index in the block metadata so that replay after restart can be
// suppressed. Config blocks are delegated to writeConfigBlock.
func (c *Chain) writeBlock(block *common.Block, index uint64) {
	if c.blockInflight > 0 {
		c.blockInflight-- // only reduce on leader
	}

	if utils.IsConfigBlock(block) {
		c.writeConfigBlock(block, index)
		return
	}

	c.raftMetadataLock.Lock()
	c.opts.RaftMetadata.RaftIndex = index
	m := utils.MarshalOrPanic(c.opts.RaftMetadata)
	c.raftMetadataLock.Unlock()

	c.support.WriteBlock(block, m)
}

612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
// Orders the envelope in the `msg` content. SubmitRequest.
// Returns
//   -- batches [][]*common.Envelope; the batches cut,
//   -- pending bool; if there are envelopes pending to be ordered,
//   -- err error; the error encountered, if any.
// It takes care of config messages as well as the revalidation of messages if the config sequence has advanced.
func (c *Chain) ordered(msg *orderer.SubmitRequest) (batches [][]*common.Envelope, pending bool, err error) {
	seq := c.support.Sequence()

	if c.isConfig(msg.Content) {
		// ConfigMsg
		if msg.LastValidationSeq < seq {
			// config sequence advanced since validation — re-process
			msg.Content, _, err = c.support.ProcessConfigMsg(msg.Content)
			if err != nil {
				return nil, true, errors.Errorf("bad config message: %s", err)
			}
		}
		// cut any pending normal batch first, then put the config message
		// in a batch of its own
		batch := c.support.BlockCutter().Cut()
		batches = [][]*common.Envelope{}
		if len(batch) != 0 {
			batches = append(batches, batch)
		}
		batches = append(batches, []*common.Envelope{msg.Content})
		return batches, false, nil
	}
	// it is a normal message
	if msg.LastValidationSeq < seq {
		if _, err := c.support.ProcessNormalMsg(msg.Content); err != nil {
			return nil, true, errors.Errorf("bad normal message: %s", err)
		}
	}
	batches, pending = c.support.BlockCutter().Ordered(msg.Content)
	return batches, pending, nil

}

Jay Guo's avatar
Jay Guo committed
648
func (c *Chain) propose(bc *blockCreator, batches ...[]*common.Envelope) {
649
	for _, batch := range batches {
650
		b := bc.createNextBlock(batch)
651
652
		data := utils.MarshalOrPanic(b)
		if err := c.node.Propose(context.TODO(), data); err != nil {
653
654
			c.logger.Errorf("Failed to propose block to raft: %s", err)
			return // don't bother continue proposing next batch
655
656
		}

657
		// if it is config block, then we should wait for the commit of the block
658
		if utils.IsConfigBlock(b) {
Jay Guo's avatar
Jay Guo committed
659
			c.configInflight = true
660
		}
661
662

		c.blockInflight++
663
664
	}

665
	return
666
667
}

668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
// catchUp brings the local ledger up to the block recorded in the given
// snapshot by pulling the missing blocks from other cluster nodes. It is a
// no-op if the local ledger is already at or beyond the snapshot block.
// The block puller is always closed before returning.
func (c *Chain) catchUp(snap *raftpb.Snapshot) error {
	b, err := utils.UnmarshalBlock(snap.Data)
	if err != nil {
		return errors.Errorf("failed to unmarshal snapshot data to block: %s", err)
	}

	c.logger.Infof("Catching up with snapshot taken at block %d", b.Header.Number)

	next := c.support.Height()
	if next > b.Header.Number {
		c.logger.Warnf("Snapshot is at block %d, local block number is %d, no sync needed", b.Header.Number, next-1)
		return nil
	}

	// Idiomatic direct defer instead of wrapping the single call in a closure.
	defer c.puller.Close()

	for next <= b.Header.Number {
		block := c.puller.PullBlock(next)
		if block == nil {
			return errors.Errorf("failed to fetch block %d from cluster", next)
		}
		if utils.IsConfigBlock(block) {
			c.support.WriteConfigBlock(block, nil)
		} else {
			c.support.WriteBlock(block, nil)
		}

		next++
	}

	c.logger.Infof("Finished syncing with cluster up to block %d (incl.)", b.Header.Number)
	return nil
}

Jay Guo's avatar
Jay Guo committed
704
// apply processes a batch of committed raft entries: normal entries are
// unmarshaled into blocks and written to the ledger; conf-change entries are
// applied to the raft node and, when they complete an in-progress membership
// update, re-configure the communication layer. After the batch, a snapshot
// is taken if enough blocks have accumulated since the last one.
func (c *Chain) apply(ents []raftpb.Entry) {
	if len(ents) == 0 {
		return
	}

	// Sanity check: committed entries must be contiguous with what we have
	// already applied.
	if ents[0].Index > c.appliedIndex+1 {
		c.logger.Panicf("first index of committed entry[%d] should <= appliedIndex[%d]+1", ents[0].Index, c.appliedIndex)
	}

	var appliedb uint64 // block number of the last block written this round
	var position int    // index into ents of that block's entry
	for i := range ents {
		switch ents[i].Type {
		case raftpb.EntryNormal:
			// We need to strictly avoid re-applying normal entries,
			// otherwise we are writing the same block twice.
			if len(ents[i].Data) == 0 || ents[i].Index <= c.appliedIndex {
				break
			}

			block := utils.UnmarshalBlockOrPanic(ents[i].Data)
			c.writeBlock(block, ents[i].Index)

			appliedb = block.Header.Number
			position = i

		case raftpb.EntryConfChange:
			var cc raftpb.ConfChange
			if err := cc.Unmarshal(ents[i].Data); err != nil {
				c.logger.Warnf("Failed to unmarshal ConfChange data: %s", err)
				continue
			}

			c.confState = *c.node.ApplyConfChange(cc)

			// This ConfChange was introduced by a previously committed config block,
			// we can now unblock submitC to accept envelopes.
			if c.confChangeInProgress != nil &&
				c.confChangeInProgress.NodeID == cc.NodeID &&
				c.confChangeInProgress.Type == cc.Type {

				if err := c.configureComm(); err != nil {
					c.logger.Panicf("Failed to configure communication: %s", err)
				}

				c.confChangeInProgress = nil
				c.configInflight = false
			}

			if cc.Type == raftpb.ConfChangeRemoveNode && cc.NodeID == c.raftID {
				c.logger.Infof("Current node removed from replica set for channel %s", c.channelID)
				// calling goroutine, since otherwise it will be blocked
				// trying to write into haltC
				go c.Halt()
			}
		}

		if ents[i].Index > c.appliedIndex {
			c.appliedIndex = ents[i].Index
		}
	}

	if c.opts.SnapInterval == 0 || appliedb == 0 {
		// snapshot is not enabled (SnapInterval == 0) or
		// no block has been written (appliedb == 0) in this round
		return
	}

	if appliedb-c.lastSnapBlockNum >= c.opts.SnapInterval {
		c.logger.Infof("Taking snapshot at block %d, last snapshotted block number is %d", appliedb, c.lastSnapBlockNum)
		c.node.takeSnapshot(c.appliedIndex, &c.confState, ents[position].Data)
		c.lastSnapBlockNum = appliedb
	}

	return
}

// isConfig reports whether the envelope carries a config transaction
// (CONFIG or ORDERER_TRANSACTION header type). Panics if the channel
// header cannot be extracted.
func (c *Chain) isConfig(env *common.Envelope) bool {
	h, err := utils.ChannelHeader(env)
	if err != nil {
		c.logger.Panicf("failed to extract channel header from envelope")
	}

	switch h.Type {
	case int32(common.HeaderType_CONFIG), int32(common.HeaderType_ORDERER_TRANSACTION):
		return true
	default:
		return false
	}
}
yacovm's avatar
yacovm committed
789
790

// configureComm (re)configures the cluster communication layer with the
// current set of remote peers derived from the raft metadata. Called at
// Start and after a membership ConfChange is applied.
func (c *Chain) configureComm() error {
	nodes, err := c.remotePeers()
	if err != nil {
		return err
	}

	c.configurator.Configure(c.channelID, nodes)
	return nil
}

800
// remotePeers builds the list of remote cluster nodes (endpoint plus
// DER-encoded TLS certificates) from the consenter set in the raft
// metadata, excluding this node itself.
func (c *Chain) remotePeers() ([]cluster.RemoteNode, error) {
	var nodes []cluster.RemoteNode
	for raftID, consenter := range c.opts.RaftMetadata.Consenters {
		// No need to know yourself
		if raftID == c.raftID {
			continue
		}
		serverCertAsDER, err := c.pemToDER(consenter.ServerTlsCert, raftID, "server")
		if err != nil {
			return nil, errors.WithStack(err)
		}
		clientCertAsDER, err := c.pemToDER(consenter.ClientTlsCert, raftID, "client")
		if err != nil {
			return nil, errors.WithStack(err)
		}
		nodes = append(nodes, cluster.RemoteNode{
			ID:            raftID,
			Endpoint:      fmt.Sprintf("%s:%d", consenter.Host, consenter.Port),
			ServerTLSCert: serverCertAsDER,
			ClientTLSCert: clientCertAsDER,
		})
	}
	return nodes, nil
}

// pemToDER decodes the first PEM block in pemBytes and returns its raw DER
// bytes. id and certType ("server"/"client") are used only for the error log.
func (c *Chain) pemToDER(pemBytes []byte, id uint64, certType string) ([]byte, error) {
	block, _ := pem.Decode(pemBytes)
	if block == nil {
		c.logger.Errorf("Rejecting PEM block of %s TLS cert for node %d, offending PEM is: %s", certType, id, string(pemBytes))
		return nil, errors.Errorf("invalid PEM block")
	}

	return block.Bytes, nil
}
833

834
// checkConsentersSet validates correctness of the consenters set provided within configuration value.
// Only a single consenter addition or removal is permitted per config update.
func (c *Chain) checkConsentersSet(configValue *common.ConfigValue) error {
	// read metadata update from configuration
	updatedMetadata, err := MetadataFromConfigValue(configValue)
	if err != nil {
		return err
	}

	c.raftMetadataLock.RLock()
	changes := ComputeMembershipChanges(c.opts.RaftMetadata.Consenters, updatedMetadata.Consenters)
	c.raftMetadataLock.RUnlock()

	if changes.TotalChanges > 1 {
		return errors.New("update of more than one consenters at a time is not supported")
	}

	return nil
}
852

853
854
855
// writeConfigBlock writes configuration blocks into the ledger in
// addition extracts updates about raft replica set and if there
// are changes updates cluster membership as well
func (c *Chain) writeConfigBlock(block *common.Block, index uint64) {
	metadata, raftMetadata := c.newRaftMetadata(block)

	var changes *MembershipChanges
	if metadata != nil {
		changes = ComputeMembershipChanges(raftMetadata.Consenters, metadata.Consenters)
	}

	// NOTE(review): UpdateRaftMetadataAndConfChange is invoked on a possibly
	// nil *MembershipChanges receiver — presumably it tolerates nil; confirm
	// against its implementation.
	confChange := changes.UpdateRaftMetadataAndConfChange(raftMetadata)
	raftMetadata.RaftIndex = index

	raftMetadataBytes := utils.MarshalOrPanic(raftMetadata)
	// write block with metadata
	c.support.WriteConfigBlock(block, raftMetadataBytes)
	c.configInflight = false

	// update membership
	if confChange != nil {
		// ProposeConfChange returns error only if node being stopped.
		// This proposal is dropped by followers because DisableProposalForwarding is enabled.
		if err := c.node.ProposeConfChange(context.TODO(), *confChange); err != nil {
			c.logger.Warnf("Failed to propose configuration update to Raft node: %s", err)
		}

		c.confChangeInProgress = confChange

		c.raftMetadataLock.Lock()
		c.opts.RaftMetadata = raftMetadata
		c.raftMetadataLock.Unlock()

		// stay paused until the ConfChange is committed (see apply)
		c.configInflight = true
	}
}
889

890
891
892
893
894
895
896
897
// getInFlightConfChange returns ConfChange in-flight if any.
// It either returns confChangeInProgress if it is not nil, or
// attempts to read ConfChange from last committed block.
func (c *Chain) getInFlightConfChange() *raftpb.ConfChange {
	if c.confChangeInProgress != nil {
		return c.confChangeInProgress
	}

	if c.support.Height() <= 1 {
		return nil // nothing to failover just started the chain
	}
	lastBlock := c.support.Block(c.support.Height() - 1)
	if lastBlock == nil {
		c.logger.Panicf("nil block, failed to read last written block, blockNum = %d, ledger height = %d, raftID = %d", c.support.Height()-1, c.support.Height(), c.raftID)
	}
	if !utils.IsConfigBlock(lastBlock) {
		return nil
	}

	// extract membership mapping from configuration block metadata
	// and compare with Raft configuration
	metadata, err := utils.GetMetadataFromBlock(lastBlock, common.BlockMetadataIndex_ORDERER)
	if err != nil {
		c.logger.Panicf("Error extracting orderer metadata: %+v", err)
	}

	raftMetadata := &etcdraft.RaftMetadata{}
	if err := proto.Unmarshal(metadata.Value, raftMetadata); err != nil {
		c.logger.Panicf("Failed to unmarshal block's metadata: %+v", err)
	}

	// extracting current Raft configuration state
	// (ApplyConfChange with an empty ConfChange returns the current ConfState)
	confState := c.node.ApplyConfChange(raftpb.ConfChange{})

	if len(confState.Nodes) == len(raftMetadata.Consenters) {
		// since configuration change could only add one node or
		// remove one node at a time, if raft nodes state size
		// equal to membership stored in block metadata field,
		// that means everything is in sync and no need to propose
		// update
		return nil
	}

	return ConfChange(raftMetadata, confState)
}

// newRaftMetadata extract raft metadata from the configuration block.
// It returns the consensus metadata embedded in the block (may be nil) and
// a mutable clone of the chain's current RaftMetadata.
func (c *Chain) newRaftMetadata(block *common.Block) (*etcdraft.Metadata, *etcdraft.RaftMetadata) {
	metadata, err := ConsensusMetadataFromConfigBlock(block)
	if err != nil {
		c.logger.Panicf("error reading consensus metadata: %s", err)
	}
	raftMetadata := proto.Clone(c.opts.RaftMetadata).(*etcdraft.RaftMetadata)
	// proto.Clone doesn't copy an empty map, hence need to initialize it after
	// cloning
	if raftMetadata.Consenters == nil {
		raftMetadata.Consenters = map[uint64]*etcdraft.Consenter{}
	}
	return metadata, raftMetadata
}