chain.go 40 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package etcdraft

import (
	"context"
yacovm's avatar
yacovm committed
11
12
	"encoding/pem"
	"fmt"
13
	"sync"
14
15
16
	"sync/atomic"
	"time"

yacovm's avatar
yacovm committed
17
18
	"code.cloudfoundry.org/clock"
	"github.com/golang/protobuf/proto"
19
	"github.com/hyperledger/fabric/common/configtx"
20
	"github.com/hyperledger/fabric/common/flogging"
yacovm's avatar
yacovm committed
21
	"github.com/hyperledger/fabric/orderer/common/cluster"
22
	"github.com/hyperledger/fabric/orderer/consensus"
23
	"github.com/hyperledger/fabric/orderer/consensus/migration"
24
25
	"github.com/hyperledger/fabric/protos/common"
	"github.com/hyperledger/fabric/protos/orderer"
yacovm's avatar
yacovm committed
26
	"github.com/hyperledger/fabric/protos/orderer/etcdraft"
27
28
	"github.com/hyperledger/fabric/protos/utils"
	"github.com/pkg/errors"
29
30
31
	"go.etcd.io/etcd/raft"
	"go.etcd.io/etcd/raft/raftpb"
	"go.etcd.io/etcd/wal"
32
33
)

Jay Guo's avatar
Jay Guo committed
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// Byte-size units, each 2^10 times the previous:
// BYTE = 1, KILOBYTE = 1<<10, MEGABYTE = 1<<20, GIGABYTE = 1<<30, TERABYTE = 1<<40.
const (
	BYTE = 1 << (10 * iota)
	KILOBYTE
	MEGABYTE
	GIGABYTE
	TERABYTE
)

// Tunable defaults used when the channel configuration does not supply a value.
const (
	// DefaultSnapshotCatchUpEntries is the default number of entries
	// to preserve in memory when a snapshot is taken. This is for
	// slow followers to catch up.
	DefaultSnapshotCatchUpEntries = uint64(20)

	// DefaultSnapshotIntervalSize is the default snapshot interval. It is
	// used if SnapshotIntervalSize is not provided in channel config options.
	// It is needed to enforce snapshot being set.
	DefaultSnapshotIntervalSize = 20 * MEGABYTE // 20 MB

	// DefaultEvictionSuspicion is the threshold that a node will start
	// suspecting its own eviction if it has been leaderless for this
	// period of time.
	DefaultEvictionSuspicion = time.Minute * 10

	// DefaultLeaderlessCheckInterval is the interval that a chain checks
	// its own leadership status.
	DefaultLeaderlessCheckInterval = time.Second * 10
)
62

63
64
65
66
67
68
69
70
71
72
73
74
//go:generate mockery -dir . -name Configurator -case underscore -output ./mocks/

// Configurator is used to configure the communication layer
// when the chain starts.
type Configurator interface {
	// Configure (re)configures the communication layer of the given channel
	// with the supplied set of remote cluster nodes.
	Configure(channel string, newNodes []cluster.RemoteNode)
}

//go:generate counterfeiter -o mocks/mock_rpc.go . RPC

// RPC is used to mock the transport layer in tests.
type RPC interface {
	// SendConsensus sends a raft consensus message to the destination node.
	SendConsensus(dest uint64, msg *orderer.ConsensusRequest) error
	// SendSubmit forwards a transaction submission to the destination node
	// (used by followers to relay requests to the leader).
	SendSubmit(dest uint64, request *orderer.SubmitRequest) error
}

79
80
81
82
83
//go:generate counterfeiter -o mocks/mock_blockpuller.go . BlockPuller

// BlockPuller is used to pull blocks from other OSN
type BlockPuller interface {
	// PullBlock fetches the block with the given sequence number from the cluster.
	PullBlock(seq uint64) *common.Block
	// HeightsByEndpoints returns the ledger heights reported by each known endpoint.
	HeightsByEndpoints() (map[string]uint64, error)
	// Close releases the resources held by the puller.
	Close()
}

88
89
90
91
// CreateBlockPuller is a function to create BlockPuller on demand.
// It is passed into chain initializer so that tests could mock this.
type CreateBlockPuller func() (BlockPuller, error)

92
// Options contains all the configurations relevant to the chain.
type Options struct {
	// RaftID is this node's identifier within the raft cluster.
	RaftID uint64

	// Clock abstracts time so tests can inject a fake clock.
	Clock clock.Clock

	WALDir               string // directory of the raft write-ahead log
	SnapDir              string // directory of raft snapshots
	SnapshotIntervalSize uint32 // bytes of block data between snapshots

	// This is configurable mainly for testing purpose. Users are not
	// expected to alter this. Instead, DefaultSnapshotCatchUpEntries is used.
	SnapshotCatchUpEntries uint64

	MemoryStorage MemoryStorage
	Logger        *flogging.FabricLogger

	TickInterval      time.Duration // raft tick period
	ElectionTick      int           // ticks without heartbeat before starting an election
	HeartbeatTick     int           // ticks between leader heartbeats
	MaxSizePerMsg     uint64        // max byte size of a single raft message
	MaxInflightBlocks int           // max blocks proposed but not yet committed

	// BlockMetadata and Consenters should only be modified while under lock
	// of raftMetadataLock
	BlockMetadata *etcdraft.BlockMetadata
	Consenters    map[uint64]*etcdraft.Consenter

	Metrics *Metrics
	Cert    []byte // this node's TLS certificate, used to locate itself in the consenter set

	EvictionSuspicion   time.Duration // leaderless period before self-eviction is suspected
	LeaderCheckInterval time.Duration // period of the leadership check; default used when zero
}

127
// submit couples an incoming transaction submission with a channel on which
// serveRequest reports the raft leader it observed when taking the request.
type submit struct {
	req    *orderer.SubmitRequest
	leader chan uint64
}

132
133
134
135
136
137
// gc carries everything needed to take a snapshot at a given raft index:
// the conf state to persist alongside it and the serialized block data.
type gc struct {
	index uint64
	state raftpb.ConfState
	data  []byte
}

138
// Chain implements consensus.Chain interface.
type Chain struct {
	configurator Configurator

	rpc RPC

	raftID    uint64
	channelID string

	// lastKnownLeader is accessed atomically (written by serveRequest,
	// read elsewhere).
	lastKnownLeader uint64

	submitC  chan *submit          // Incoming transaction submissions
	applyC   chan apply            // Entries/soft-state handed over from the raft node
	observeC chan<- raft.SoftState // Notifies external observer on leader change (passed in optionally as an argument for tests)
	haltC    chan struct{}         // Signals to goroutines that the chain is halting
	doneC    chan struct{}         // Closes when the chain halts
	startC   chan struct{}         // Closes when the node is started
	snapC    chan *raftpb.Snapshot // Signal to catch up with snapshot
	gcC      chan *gc              // Signal to take snapshot

	errorCLock sync.RWMutex
	errorC     chan struct{} // returned by Errored()

	raftMetadataLock     sync.RWMutex
	confChangeInProgress *raftpb.ConfChange
	justElected          bool // this is true when node has just been elected
	configInflight       bool // this is true when there is config block or ConfChange in flight
	blockInflight        int  // number of in flight blocks

	clock clock.Clock // Tests can inject a fake clock

	support consensus.ConsenterSupport

	lastBlock    *common.Block // most recently written (or known) block
	appliedIndex uint64        // highest raft index applied to the ledger

	// needed by snapshotting
	sizeLimit        uint32 // SnapshotIntervalSize in bytes
	accDataSize      uint32 // accumulative data size since last snapshot
	lastSnapBlockNum uint64
	confState        raftpb.ConfState // Etcdraft requires ConfState to be persisted within snapshot

	createPuller CreateBlockPuller // func used to create BlockPuller on demand

	fresh bool // indicate if this is a fresh raft node

	// this is exported so that test can use `Node.Status()` to get raft node status.
	Node *node
	opts Options

	Metrics *Metrics
	logger  *flogging.FabricLogger

	migrationStatus migration.Status // The consensus-type migration status

	periodicChecker *PeriodicCheck // leaderless-ness checker driving eviction suspicion
}

196
197
198
199
200
201
// NewChain constructs a chain object.
// It restores persisted raft state (WAL + snapshots) from disk, determines the
// last written block and snapshot boundary, wires up metrics labelled with the
// channel name, and prepares (but does not start) the underlying raft node.
func NewChain(
	support consensus.ConsenterSupport,
	opts Options,
	conf Configurator,
	rpc RPC,
	f CreateBlockPuller,
	observeC chan<- raft.SoftState) (*Chain, error) {

	lg := opts.Logger.With("channel", support.ChainID(), "node", opts.RaftID)

	// A missing WAL directory means this node has never run raft for this
	// channel before.
	fresh := !wal.Exist(opts.WALDir)
	storage, err := CreateStorage(lg, opts.WALDir, opts.SnapDir, opts.MemoryStorage)
	if err != nil {
		return nil, errors.Errorf("failed to restore persisted raft data: %s", err)
	}

	if opts.SnapshotCatchUpEntries == 0 {
		storage.SnapshotCatchUpEntries = DefaultSnapshotCatchUpEntries
	} else {
		storage.SnapshotCatchUpEntries = opts.SnapshotCatchUpEntries
	}

	sizeLimit := opts.SnapshotIntervalSize
	if sizeLimit == 0 {
		sizeLimit = DefaultSnapshotIntervalSize
	}

	// get block number in last snapshot, if exists
	var snapBlkNum uint64
	var cc raftpb.ConfState
	if s := storage.Snapshot(); !raft.IsEmptySnap(s) {
		b := utils.UnmarshalBlockOrPanic(s.Data)
		snapBlkNum = b.Header.Number
		cc = s.Metadata.ConfState
	}

	b := support.Block(support.Height() - 1)
	if b == nil {
		return nil, errors.Errorf("failed to get last block")
	}

	c := &Chain{
		configurator:     conf,
		rpc:              rpc,
		channelID:        support.ChainID(),
		raftID:           opts.RaftID,
		submitC:          make(chan *submit),
		applyC:           make(chan apply),
		haltC:            make(chan struct{}),
		doneC:            make(chan struct{}),
		startC:           make(chan struct{}),
		snapC:            make(chan *raftpb.Snapshot),
		errorC:           make(chan struct{}),
		gcC:              make(chan *gc),
		observeC:         observeC,
		support:          support,
		fresh:            fresh,
		appliedIndex:     opts.BlockMetadata.RaftIndex,
		lastBlock:        b,
		sizeLimit:        sizeLimit,
		lastSnapBlockNum: snapBlkNum,
		confState:        cc,
		createPuller:     f,
		clock:            opts.Clock,
		Metrics: &Metrics{
			ClusterSize:             opts.Metrics.ClusterSize.With("channel", support.ChainID()),
			IsLeader:                opts.Metrics.IsLeader.With("channel", support.ChainID()),
			CommittedBlockNumber:    opts.Metrics.CommittedBlockNumber.With("channel", support.ChainID()),
			SnapshotBlockNumber:     opts.Metrics.SnapshotBlockNumber.With("channel", support.ChainID()),
			LeaderChanges:           opts.Metrics.LeaderChanges.With("channel", support.ChainID()),
			ProposalFailures:        opts.Metrics.ProposalFailures.With("channel", support.ChainID()),
			DataPersistDuration:     opts.Metrics.DataPersistDuration.With("channel", support.ChainID()),
			NormalProposalsReceived: opts.Metrics.NormalProposalsReceived.With("channel", support.ChainID()),
			ConfigProposalsReceived: opts.Metrics.ConfigProposalsReceived.With("channel", support.ChainID()),
		},
		logger:          lg,
		opts:            opts,
		migrationStatus: migration.NewStatusStepper(support.IsSystemChannel(), support.ChainID()), // Needed by consensus-type migration
	}

	// DO NOT use Applied option in config, see https://github.com/etcd-io/etcd/issues/10217
	// We guard against replay of written blocks in `entriesToApply` instead.
	config := &raft.Config{
		ID:              c.raftID,
		ElectionTick:    c.opts.ElectionTick,
		HeartbeatTick:   c.opts.HeartbeatTick,
		MaxSizePerMsg:   c.opts.MaxSizePerMsg,
		MaxInflightMsgs: c.opts.MaxInflightBlocks,
		Logger:          c.logger,
		Storage:         c.opts.MemoryStorage,
		// PreVote prevents reconnected node from disturbing network.
		// See etcd/raft doc for more details.
		PreVote:                   true,
		CheckQuorum:               true,
		DisableProposalForwarding: true, // This prevents blocks from being accidentally proposed by followers
	}

	c.Node = &node{
		chainID:      c.channelID,
		chain:        c,
		logger:       c.logger,
		metrics:      c.Metrics,
		storage:      storage,
		rpc:          c.rpc,
		config:       config,
		tickInterval: c.opts.TickInterval,
		clock:        c.clock,
		metadata:     c.opts.BlockMetadata,
	}

	return c, nil
}

310
311
312
313
314
315
// MigrationStatus provides access to the consensus-type migration status of the chain.
// (Added to the Chain interface mainly for the Kafka chains)
func (c *Chain) MigrationStatus() migration.Status {
	return c.migrationStatus
}

316
317
318
319
// Start instructs the orderer to begin serving the chain and keep it current.
func (c *Chain) Start() {
	c.logger.Infof("Starting Raft node")

320
	c.Metrics.ClusterSize.Set(float64(len(c.opts.BlockMetadata.ConsenterIds)))
321
322
	// all nodes start out as followers
	c.Metrics.IsLeader.Set(float64(0))
yacovm's avatar
yacovm committed
323
	if err := c.configureComm(); err != nil {
324
		c.logger.Errorf("Failed to start chain, aborting: +%v", err)
yacovm's avatar
yacovm committed
325
326
327
328
		close(c.doneC)
		return
	}

329
330
331
332
333
	isJoin := c.support.Height() > 1
	isMigration := false
	if isJoin {
		isMigration = c.detectMigration()
	}
334
335
	c.Node.start(c.fresh, isJoin, isMigration)

336
	close(c.startC)
337
	close(c.errorC)
338

339
	go c.gc()
340
	go c.serveRequest()
341
342
343

	es := c.newEvictionSuspector()

344
345
346
347
348
	interval := DefaultLeaderlessCheckInterval
	if c.opts.LeaderCheckInterval != 0 {
		interval = c.opts.LeaderCheckInterval
	}

349
350
	c.periodicChecker = &PeriodicCheck{
		Logger:        c.logger,
351
		Report:        es.confirmSuspicion,
352
		CheckInterval: interval,
353
354
		Condition:     c.suspectEviction,
	}
355
	c.periodicChecker.Run()
356
357
}

358
359
360
361
362
363
// detectMigration detects if the orderer restarts right after consensus-type migration,
// in which the Height>1 but previous blocks were created by Kafka.
// If this is the case, Raft should be started like it is joining a new channel.
func (c *Chain) detectMigration() bool {
	startOfChain := false
	if c.support.SharedConfig().Capabilities().Kafka2RaftMigration() {
		lastConfigIndex, err := utils.GetLastConfigIndexFromBlock(c.lastBlock)
		if err != nil {
			c.logger.Panicf("Chain did not have appropriately encoded last config in its latest block: %s", err)
		}

		c.logger.Debugf("Detecting if consensus-type migration, sysChan=%v, lastConfigIndex=%d, Height=%d, mig-state: %s",
			c.support.IsSystemChannel(), lastConfigIndex, c.lastBlock.Header.Number+1, c.support.SharedConfig().ConsensusMigrationState().String())

		if lastConfigIndex != c.lastBlock.Header.Number { // The last block is not a config-tx
			return startOfChain
		}

		// The last block was a config-tx
		// Migration completion is signalled by different states on the system
		// channel (COMMIT) vs. application channels (CONTEXT).
		if c.support.IsSystemChannel() {
			if c.support.SharedConfig().ConsensusMigrationState() == orderer.ConsensusType_MIG_STATE_COMMIT {
				startOfChain = true
			}
		} else {
			if c.support.SharedConfig().ConsensusMigrationState() == orderer.ConsensusType_MIG_STATE_CONTEXT {
				startOfChain = true
			}
		}

		if startOfChain {
			c.logger.Infof("[channel: %s], Restarting after consensus-type migration. Type: %s, just starting the channel.",
				c.support.ChainID(), c.support.SharedConfig().ConsensusType())
		}
	}
	return startOfChain
}

395
// Order submits normal type transactions for ordering.
396
func (c *Chain) Order(env *common.Envelope, configSeq uint64) error {
397
	c.Metrics.NormalProposalsReceived.Add(1)
398
	return c.Submit(&orderer.SubmitRequest{LastValidationSeq: configSeq, Payload: env, Channel: c.channelID}, 0)
399
400
}

401
// Configure submits config type transactions for ordering.
402
func (c *Chain) Configure(env *common.Envelope, configSeq uint64) error {
403
	c.Metrics.ConfigProposalsReceived.Add(1)
404
	if err := c.checkConfigUpdateValidity(env); err != nil {
405
		c.Metrics.ProposalFailures.Add(1)
406
407
		return err
	}
408
	return c.Submit(&orderer.SubmitRequest{LastValidationSeq: configSeq, Payload: env, Channel: c.channelID}, 0)
409
410
}

411
// Validate the config update for being of Type A or Type B as described in the design doc.
412
413
414
415
416
417
418
419
420
421
422
423
424
func (c *Chain) checkConfigUpdateValidity(ctx *common.Envelope) error {
	var err error
	payload, err := utils.UnmarshalPayload(ctx.Payload)
	if err != nil {
		return err
	}
	chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
	if err != nil {
		return err
	}

	switch chdr.Type {
	case int32(common.HeaderType_ORDERER_TRANSACTION):
425
		return nil
426
	case int32(common.HeaderType_CONFIG):
427
		configUpdate, err := configtx.UnmarshalConfigUpdateFromPayload(payload)
428
429
430
431
		if err != nil {
			return err
		}

432
		// Check that only the ConsensusType is updated in the write-set
433
		if ordererConfigGroup, ok := configUpdate.WriteSet.Groups["Orderer"]; ok {
434
435
			if val, ok := ordererConfigGroup.Values["ConsensusType"]; ok {
				return c.checkConsentersSet(val)
436
437
438
439
440
441
442
			}
		}
		return nil

	default:
		return errors.Errorf("config transaction has unknown header type")
	}
443
444
}

445
446
447
448
// WaitReady blocks when the chain:
// - is catching up with other nodes using snapshot
//
// In any other case, it returns right away.
449
func (c *Chain) WaitReady() error {
450
451
452
453
454
	if err := c.isRunning(); err != nil {
		return err
	}

	select {
455
	case c.submitC <- nil:
456
457
458
459
	case <-c.doneC:
		return errors.Errorf("chain is stopped")
	}

460
461
462
	return nil
}

463
// Errored returns a channel that closes when the chain stops.
464
func (c *Chain) Errored() <-chan struct{} {
465
466
467
	c.errorCLock.RLock()
	defer c.errorCLock.RUnlock()
	return c.errorC
468
469
}

470
// Halt stops the chain.
471
func (c *Chain) Halt() {
472
473
474
475
476
477
478
	select {
	case <-c.startC:
	default:
		c.logger.Warnf("Attempted to halt a chain that has not started")
		return
	}

479
480
481
482
483
484
485
486
	select {
	case c.haltC <- struct{}{}:
	case <-c.doneC:
		return
	}
	<-c.doneC
}

487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
// isRunning reports whether the chain has been started and has not yet been
// halted, returning a descriptive error otherwise.
func (c *Chain) isRunning() error {
	select {
	case <-c.startC:
		// started — fall through to the halt check
	default:
		return errors.Errorf("chain is not started")
	}

	select {
	case <-c.doneC:
		return errors.Errorf("chain is stopped")
	default:
		return nil
	}
}

503
504
// Consensus passes the given ConsensusRequest message to the raft.Node instance
func (c *Chain) Consensus(req *orderer.ConsensusRequest, sender uint64) error {
	if err := c.isRunning(); err != nil {
		return err
	}

	// The request payload is an opaque serialized raft message.
	var raftMsg raftpb.Message
	if err := proto.Unmarshal(req.Payload, &raftMsg); err != nil {
		return fmt.Errorf("failed to unmarshal StepRequest payload to Raft Message: %s", err)
	}

	if err := c.Node.Step(context.TODO(), raftMsg); err != nil {
		return fmt.Errorf("failed to process Raft Step message: %s", err)
	}
	return nil
}

521
522
523
524
// Submit forwards the incoming request to:
// - the local serveRequest goroutine if this is leader
// - the actual leader via the transport mechanism
// The call fails if there's no leader elected yet.
525
func (c *Chain) Submit(req *orderer.SubmitRequest, sender uint64) error {
526
	if err := c.isRunning(); err != nil {
527
		c.Metrics.ProposalFailures.Add(1)
528
529
		return err
	}
530

531
	leadC := make(chan uint64, 1)
532
	select {
533
534
535
	case c.submitC <- &submit{req, leadC}:
		lead := <-leadC
		if lead == raft.None {
536
			c.Metrics.ProposalFailures.Add(1)
537
538
539
540
			return errors.Errorf("no Raft leader")
		}

		if lead != c.raftID {
541
542
543
544
			if err := c.rpc.SendSubmit(lead, req); err != nil {
				c.Metrics.ProposalFailures.Add(1)
				return err
			}
545
546
		}

547
	case <-c.doneC:
548
		c.Metrics.ProposalFailures.Add(1)
549
		return errors.Errorf("chain is stopped")
550
	}
551
552

	return nil
553
554
}

555
// apply carries a batch of committed raft entries together with the raft
// soft state (leader / node role) observed at the same time, handed from
// the raft node to serveRequest via applyC.
type apply struct {
	entries []raftpb.Entry
	soft    *raft.SoftState
}

560
561
562
563
// isCandidate reports whether the given raft state is one of the two
// candidate phases (pre-candidate or candidate).
func isCandidate(state raft.StateType) bool {
	switch state {
	case raft.StatePreCandidate, raft.StateCandidate:
		return true
	default:
		return false
	}
}

564
// serveRequest is the main event loop of the chain. It multiplexes, on a
// single goroutine, incoming transaction submissions, committed raft entries
// and soft-state changes, batch-timeout expirations, snapshot catch-up
// signals, and shutdown. All leader/follower role transitions and the
// back-pressure logic (nil-ing out submitC) happen here, so chain state such
// as blockInflight and configInflight needs no locking.
func (c *Chain) serveRequest() {
	ticking := false
	timer := c.clock.NewTimer(time.Second)
	// we need a stopped timer rather than nil,
	// because we will be select waiting on timer.C()
	if !timer.Stop() {
		<-timer.C()
	}

	// if timer is already started, this is a no-op
	start := func() {
		if !ticking {
			ticking = true
			timer.Reset(c.support.SharedConfig().BatchTimeout())
		}
	}

	stop := func() {
		if !timer.Stop() && ticking {
			// we only need to drain the channel if the timer expired (not explicitly stopped)
			<-timer.C()
		}
		ticking = false
	}

	var soft raft.SoftState
	submitC := c.submitC
	var bc *blockCreator

	// propC/cancelProp belong to the current leadership term: propC feeds
	// created blocks to the proposing goroutine, cancelProp tears it down.
	var propC chan<- *common.Block
	var cancelProp context.CancelFunc
	cancelProp = func() {} // no-op as initial value

	becomeLeader := func() (chan<- *common.Block, context.CancelFunc) {
		c.Metrics.IsLeader.Set(1)

		c.blockInflight = 0
		c.justElected = true
		submitC = nil
		ch := make(chan *common.Block, c.opts.MaxInflightBlocks)

		// if there is unfinished ConfChange, we should resume the effort to propose it as
		// new leader, and wait for it to be committed before start serving new requests.
		if cc := c.getInFlightConfChange(); cc != nil {
			// The reason `ProposeConfChange` should be called in go routine is documented in `writeConfigBlock` method.
			go func() {
				if err := c.Node.ProposeConfChange(context.TODO(), *cc); err != nil {
					c.logger.Warnf("Failed to propose configuration update to Raft node: %s", err)
				}
			}()

			c.confChangeInProgress = cc
			c.configInflight = true
		}

		// Leader should call Propose in go routine, because this method may be blocked
		// if node is leaderless (this can happen when leader steps down in a heavily
		// loaded network). We need to make sure applyC can still be consumed properly.
		ctx, cancel := context.WithCancel(context.Background())
		go func(ctx context.Context, ch <-chan *common.Block) {
			for {
				select {
				case b := <-ch:
					data := utils.MarshalOrPanic(b)
					if err := c.Node.Propose(ctx, data); err != nil {
						c.logger.Errorf("Failed to propose block %d to raft and discard %d blocks in queue: %s", b.Header.Number, len(ch), err)
						return
					}
					c.logger.Debugf("Proposed block %d to raft consensus", b.Header.Number)

				case <-ctx.Done():
					c.logger.Debugf("Quit proposing blocks, discarded %d blocks in the queue", len(ch))
					return
				}
			}
		}(ctx, ch)

		return ch, cancel
	}

	becomeFollower := func() {
		cancelProp()
		c.blockInflight = 0
		// Discard any half-filled batch: a follower must not cut blocks.
		_ = c.support.BlockCutter().Cut()
		stop()
		submitC = c.submitC
		bc = nil
		c.Metrics.IsLeader.Set(0)
	}

	for {
		select {
		case s := <-submitC:
			if s == nil {
				// polled by `WaitReady`
				continue
			}

			if soft.RaftState == raft.StatePreCandidate || soft.RaftState == raft.StateCandidate {
				s.leader <- raft.None
				continue
			}

			s.leader <- soft.Lead
			if soft.Lead != c.raftID {
				// Submit() relays the request to the actual leader.
				continue
			}

			batches, pending, err := c.ordered(s.req)
			if err != nil {
				c.logger.Errorf("Failed to order message: %s", err)
				continue
			}
			if pending {
				start() // no-op if timer is already started
			} else {
				stop()
			}

			c.propose(propC, bc, batches...)

			if c.configInflight {
				c.logger.Info("Received config block, pause accepting transaction till it is committed")
				submitC = nil
			} else if c.blockInflight >= c.opts.MaxInflightBlocks {
				c.logger.Debugf("Number of in-flight blocks (%d) reaches limit (%d), pause accepting transaction",
					c.blockInflight, c.opts.MaxInflightBlocks)
				submitC = nil
			}

		case app := <-c.applyC:
			if app.soft != nil {
				newLeader := atomic.LoadUint64(&app.soft.Lead) // etcdraft requires atomic access
				if newLeader != soft.Lead {
					c.logger.Infof("Raft leader changed: %d -> %d", soft.Lead, newLeader)
					c.Metrics.LeaderChanges.Add(1)

					atomic.StoreUint64(&c.lastKnownLeader, newLeader)

					if newLeader == c.raftID {
						propC, cancelProp = becomeLeader()
					}

					if soft.Lead == c.raftID {
						becomeFollower()
					}
				}

				foundLeader := soft.Lead == raft.None && newLeader != raft.None
				quitCandidate := isCandidate(soft.RaftState) && !isCandidate(app.soft.RaftState)

				// Re-arm errorC so Errored() reflects a healthy backend again.
				if foundLeader || quitCandidate {
					c.errorCLock.Lock()
					c.errorC = make(chan struct{})
					c.errorCLock.Unlock()
				}

				if isCandidate(app.soft.RaftState) || newLeader == raft.None {
					atomic.StoreUint64(&c.lastKnownLeader, raft.None)
					select {
					case <-c.errorC:
					default:
						nodeCount := len(c.opts.BlockMetadata.ConsenterIds)
						// Only close the error channel (to signal the broadcast/deliver front-end a consensus backend error)
						// If we are a cluster of size 3 or more, otherwise we can't expand a cluster of size 1 to 2 nodes.
						if nodeCount > 2 {
							close(c.errorC)
						} else {
							c.logger.Warningf("No leader is present, cluster size is %d", nodeCount)
						}
					}
				}

				soft = raft.SoftState{Lead: newLeader, RaftState: app.soft.RaftState}

				// notify external observer
				select {
				case c.observeC <- soft:
				default:
				}
			}

			c.apply(app.entries)

			if c.justElected {
				msgInflight := c.Node.lastIndex() > c.appliedIndex
				if msgInflight {
					c.logger.Debugf("There are in flight blocks, new leader should not serve requests")
					continue
				}

				if c.configInflight {
					c.logger.Debugf("There is config block in flight, new leader should not serve requests")
					continue
				}

				c.logger.Infof("Start accepting requests as Raft leader at block %d", c.lastBlock.Header.Number)
				bc = &blockCreator{
					hash:   c.lastBlock.Header.Hash(),
					number: c.lastBlock.Header.Number,
					logger: c.logger,
				}
				submitC = c.submitC
				c.justElected = false
			} else if c.configInflight {
				c.logger.Info("Config block or ConfChange in flight, pause accepting transaction")
				submitC = nil
			} else if c.blockInflight < c.opts.MaxInflightBlocks {
				submitC = c.submitC
			}

		case <-timer.C():
			ticking = false

			batch := c.support.BlockCutter().Cut()
			if len(batch) == 0 {
				c.logger.Warningf("Batch timer expired with no pending requests, this might indicate a bug")
				continue
			}

			c.logger.Debugf("Batch timer expired, creating block")
			c.propose(propC, bc, batch) // we are certain this is normal block, no need to block

		case sn := <-c.snapC:
			if sn.Metadata.Index != 0 {
				if sn.Metadata.Index <= c.appliedIndex {
					c.logger.Debugf("Skip snapshot taken at index %d, because it is behind current applied index %d", sn.Metadata.Index, c.appliedIndex)
					break
				}

				c.confState = sn.Metadata.ConfState
				c.appliedIndex = sn.Metadata.Index
			} else {
				c.logger.Infof("Received artificial snapshot to trigger catchup")
			}

			if err := c.catchUp(sn); err != nil {
				c.logger.Panicf("Failed to recover from snapshot taken at Term %d and Index %d: %s",
					sn.Metadata.Term, sn.Metadata.Index, err)
			}

		case <-c.doneC:
			cancelProp()

			select {
			case <-c.errorC: // avoid closing closed channel
			default:
				close(c.errorC)
			}

			c.logger.Infof("Stop serving requests")
			c.periodicChecker.Stop()
			return
		}
	}
}

Jay Guo's avatar
Jay Guo committed
821
// writeBlock commits a block that raft has agreed upon (at raft index `index`)
// to the ledger, recording the raft index in the block metadata. Blocks older
// than the ledger tip are skipped (they were already written during catch-up);
// a gap ahead of the tip indicates a programming error and panics.
func (c *Chain) writeBlock(block *common.Block, index uint64) {
	if block.Header.Number > c.lastBlock.Header.Number+1 {
		c.logger.Panicf("Got block %d, expect block %d", block.Header.Number, c.lastBlock.Header.Number+1)
	} else if block.Header.Number < c.lastBlock.Header.Number+1 {
		c.logger.Infof("Got block %d, expect block %d, this node was forced to catch up", block.Header.Number, c.lastBlock.Header.Number+1)
		return
	}

	if c.blockInflight > 0 {
		c.blockInflight-- // only reduce on leader
	}
	c.lastBlock = block

	c.logger.Debugf("Writing block %d to ledger", block.Header.Number)

	// Config blocks take a different path: they may alter the consenter set.
	if utils.IsConfigBlock(block) {
		c.writeConfigBlock(block, index)
		return
	}

	c.raftMetadataLock.Lock()
	c.opts.BlockMetadata.RaftIndex = index
	m := utils.MarshalOrPanic(c.opts.BlockMetadata)
	c.raftMetadataLock.Unlock()

	c.support.WriteBlock(block, m)
}

849
850
851
852
853
854
855
856
857
// Orders the envelope in the `msg` content. SubmitRequest.
// Returns
//   -- batches [][]*common.Envelope; the batches cut,
//   -- pending bool; if there are envelopes pending to be ordered,
//   -- err error; the error encountered, if any.
// It takes care of config messages as well as the revalidation of messages if the config sequence has advanced.
func (c *Chain) ordered(msg *orderer.SubmitRequest) (batches [][]*common.Envelope, pending bool, err error) {
	seq := c.support.Sequence()

	if c.isConfig(msg.Payload) {
		// ConfigMsg
		if msg.LastValidationSeq < seq {
			// The config sequence advanced since the message was validated;
			// it must be re-processed against the current configuration.
			c.logger.Warnf("Config message was validated against %d, although current config seq has advanced (%d)", msg.LastValidationSeq, seq)
			msg.Payload, _, err = c.support.ProcessConfigMsg(msg.Payload)
			if err != nil {
				c.Metrics.ProposalFailures.Add(1)
				return nil, true, errors.Errorf("bad config message: %s", err)
			}

			if err = c.checkConfigUpdateValidity(msg.Payload); err != nil {
				c.Metrics.ProposalFailures.Add(1)
				return nil, true, errors.Errorf("bad config message: %s", err)
			}
		}
		// A config message goes into a batch of its own; cut whatever normal
		// transactions are pending first.
		batch := c.support.BlockCutter().Cut()
		batches = [][]*common.Envelope{}
		if len(batch) != 0 {
			batches = append(batches, batch)
		}
		batches = append(batches, []*common.Envelope{msg.Payload})
		return batches, false, nil
	}
	// it is a normal message
	if msg.LastValidationSeq < seq {
		c.logger.Warnf("Normal message was validated against %d, although current config seq has advanced (%d)", msg.LastValidationSeq, seq)
		if _, err := c.support.ProcessNormalMsg(msg.Payload); err != nil {
			c.Metrics.ProposalFailures.Add(1)
			return nil, true, errors.Errorf("bad normal message: %s", err)
		}
	}
	batches, pending = c.support.BlockCutter().Ordered(msg.Payload)
	return batches, pending, nil

}

894
func (c *Chain) propose(ch chan<- *common.Block, bc *blockCreator, batches ...[]*common.Envelope) {
895
	for _, batch := range batches {
896
		b := bc.createNextBlock(batch)
897
		c.logger.Debugf("Created block %d, there are %d blocks in flight", b.Header.Number, c.blockInflight)
898
899
900
901
902

		select {
		case ch <- b:
		default:
			c.logger.Panic("Programming error: limit of in-flight blocks does not properly take effect or block is proposed by follower")
903
904
		}

905
		// if it is config block, then we should wait for the commit of the block
906
		if utils.IsConfigBlock(b) {
Jay Guo's avatar
Jay Guo committed
907
			c.configInflight = true
908
		}
909
910

		c.blockInflight++
911
912
	}

913
	return
914
915
}

916
917
918
919
920
921
// catchUp brings the local ledger up to the block embedded in the given
// snapshot by pulling the missing blocks from other cluster members.
// Config blocks encountered along the way are written via WriteConfigBlock
// and may trigger a communication reconfiguration if the consenter set
// changed. Returns nil if the ledger is already at or past the snapshot.
func (c *Chain) catchUp(snap *raftpb.Snapshot) error {
	b, err := utils.UnmarshalBlock(snap.Data)
	if err != nil {
		return errors.Errorf("failed to unmarshal snapshot data to block: %s", err)
	}

	if c.lastBlock.Header.Number >= b.Header.Number {
		c.logger.Warnf("Snapshot is at block %d, local block number is %d, no sync needed", b.Header.Number, c.lastBlock.Header.Number)
		return nil
	}

	puller, err := c.createPuller()
	if err != nil {
		return errors.Errorf("failed to create block puller: %s", err)
	}
	defer puller.Close()

	next := c.lastBlock.Header.Number + 1

	c.logger.Infof("Catching up with snapshot taken at block %d, starting from block %d", b.Header.Number, next)

	for next <= b.Header.Number {
		block := puller.PullBlock(next)
		if block == nil {
			return errors.Errorf("failed to fetch block %d from cluster", next)
		}
		if utils.IsConfigBlock(block) {
			c.support.WriteConfigBlock(block, nil)

			configMembership := c.detectConfChange(block)

			if configMembership != nil && configMembership.Changed() {
				c.logger.Infof("Config block %d changes consenter set, communication should be reconfigured", block.Header.Number)

				// Swap in the new membership under the metadata lock before
				// reconfiguring the communication layer.
				c.raftMetadataLock.Lock()
				c.opts.BlockMetadata = configMembership.NewBlockMetadata
				c.opts.Consenters = configMembership.NewConsenters
				c.raftMetadataLock.Unlock()

				if err := c.configureComm(); err != nil {
					c.logger.Panicf("Failed to configure communication: %s", err)
				}
			}
		} else {
			c.support.WriteBlock(block, nil)
		}

		c.lastBlock = block
		next++
	}

	c.logger.Infof("Finished syncing with cluster up to block %d (incl.)", b.Header.Number)
	return nil
}

971
// detectConfChange inspects a config block for etcdraft-relevant changes.
// It returns the computed membership changes (nil if the config does not
// target this channel's consensus metadata), and as a side effect applies
// an updated SnapshotIntervalSize if the config carries one.
func (c *Chain) detectConfChange(block *common.Block) *MembershipChanges {
	// If config is targeting THIS channel, inspect consenter set and
	// propose raft ConfChange if it adds/removes node.
	configMetadata := c.newConfigMetadata(block)

	if configMetadata == nil {
		return nil
	}

	if configMetadata.Options != nil &&
		configMetadata.Options.SnapshotIntervalSize != 0 &&
		configMetadata.Options.SnapshotIntervalSize != c.sizeLimit {
		c.logger.Infof("Update snapshot interval size to %d bytes (was %d)",
			configMetadata.Options.SnapshotIntervalSize, c.sizeLimit)
		c.sizeLimit = configMetadata.Options.SnapshotIntervalSize
	}

	changes, err := ComputeMembershipChanges(c.opts.BlockMetadata, c.opts.Consenters, configMetadata.Consenters)
	if err != nil {
		// An invalid consenter-set transition should have been rejected at
		// proposal time; reaching here is fatal.
		c.logger.Panicf("illegal configuration change detected: %s", err)
	}

	if changes.Rotated() {
		c.logger.Infof("Config block %d rotates TLS certificate of node %d", block.Header.Number, changes.RotatedNode)
	}

	return changes
}

Jay Guo's avatar
Jay Guo committed
1000
func (c *Chain) apply(ents []raftpb.Entry) {
For faster browsing, not all history is shown. View entire blame