chain_test.go 118 KB
Newer Older
1
2
3
4
5
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/
yacovm's avatar
yacovm committed
6

7
8
9
package etcdraft_test

import (
yacovm's avatar
yacovm committed
10
	"encoding/pem"
11
12
13
14
15
	"fmt"
	"io/ioutil"
	"os"
	"os/user"
	"path"
16
	"sync"
17
18
	"time"

19
	"code.cloudfoundry.org/clock/fakeclock"
20
	"github.com/golang/protobuf/proto"
21
	"github.com/hyperledger/fabric/bccsp/factory"
yacovm's avatar
yacovm committed
22
	"github.com/hyperledger/fabric/common/crypto/tlsgen"
23
24
	"github.com/hyperledger/fabric/common/flogging"
	mockconfig "github.com/hyperledger/fabric/common/mocks/config"
yacovm's avatar
yacovm committed
25
	"github.com/hyperledger/fabric/orderer/common/cluster"
26
	"github.com/hyperledger/fabric/orderer/consensus/etcdraft"
27
	"github.com/hyperledger/fabric/orderer/consensus/etcdraft/mocks"
28
29
30
	consensusmocks "github.com/hyperledger/fabric/orderer/consensus/mocks"
	mockblockcutter "github.com/hyperledger/fabric/orderer/mocks/common/blockcutter"
	"github.com/hyperledger/fabric/protos/common"
31
	"github.com/hyperledger/fabric/protos/orderer"
yacovm's avatar
yacovm committed
32
	raftprotos "github.com/hyperledger/fabric/protos/orderer/etcdraft"
33
	"github.com/hyperledger/fabric/protos/utils"
34
35
	. "github.com/onsi/ginkgo"
	. "github.com/onsi/gomega"
36
	"github.com/onsi/gomega/types"
37
	"github.com/pkg/errors"
yacovm's avatar
yacovm committed
38
	"github.com/stretchr/testify/mock"
39
40
	"go.etcd.io/etcd/raft"
	"go.etcd.io/etcd/raft/raftpb"
41
42
43
	"go.uber.org/zap"
)

44
// Package-wide test timing constants.
const (
	// interval is the duration of one logical Raft tick; it is passed to
	// etcdraft.Options.TickInterval and advanced via the fake clock.
	interval = time.Second

	// LongEventualTimeout bounds Eventually() polling throughout these tests.
	LongEventualTimeout = 10 * time.Second

	// 10 is the default setting of ELECTION_TICK.
	// We used to have a small number here (2) to reduce the time for test - we don't
	// need to tick node 10 times to trigger election - however, we are using another
	// mechanism to trigger it now which does not depend on time: send an artificial
	// MsgTimeoutNow to node.
	ELECTION_TICK  = 10
	HEARTBEAT_TICK = 1
)

57
58
59
60
// init initializes the BCCSP factories with the default (nil) configuration
// so crypto providers are available to every test in this package.
//
// The original code discarded the error returned by InitFactories; a failed
// initialization would then surface only as confusing downstream test
// failures. Panicking here fails fast with the root cause instead.
func init() {
	if err := factory.InitFactories(nil); err != nil {
		panic(err)
	}
}

61
62
63
64
65
66
67
68
69
70
// skipIfRoot skips the current ginkgo test when it is executed as root
// (uid 0), e.g. inside a container. Some test cases chmod files/dirs to
// provoke permission failures, which root is immune to.
func skipIfRoot() {
	current, err := user.Current()
	Expect(err).NotTo(HaveOccurred())
	if current.Uid != "0" {
		return
	}
	Skip("you are running test as root, there's no way to make files unreadable")
}

71
72
var _ = Describe("Chain", func() {
	var (
73
74
75
76
		env       *common.Envelope
		channelID string
		tlsCA     tlsgen.CA
		logger    *flogging.FabricLogger
77
78
79
	)

	BeforeEach(func() {
yacovm's avatar
yacovm committed
80
		tlsCA, _ = tlsgen.NewCA()
81
		channelID = "test-channel"
82
		logger = flogging.NewFabricLogger(zap.NewExample())
83
84
85
		env = &common.Envelope{
			Payload: marshalOrPanic(&common.Payload{
				Header: &common.Header{ChannelHeader: marshalOrPanic(&common.ChannelHeader{Type: int32(common.HeaderType_MESSAGE), ChannelId: channelID})},
86
87
88
89
90
				Data:   []byte("TEST_MESSAGE"),
			}),
		}
	})

91
	Describe("Single Raft node", func() {
92
		var (
93
			configurator      *mocks.Configurator
94
			consenterMetadata *raftprotos.ConfigMetadata
95
			consenters        map[uint64]*raftprotos.Consenter
yacovm's avatar
yacovm committed
96
97
98
99
100
			clock             *fakeclock.FakeClock
			opts              etcdraft.Options
			support           *consensusmocks.FakeConsenterSupport
			cutter            *mockblockcutter.Receiver
			storage           *raft.MemoryStorage
101
			observeC          chan raft.SoftState
yacovm's avatar
yacovm committed
102
			chain             *etcdraft.Chain
103
			dataDir           string
104
			walDir            string
105
106
			snapDir           string
			err               error
107
			fakeFields        *fakeMetricsFields
108
109
110
		)

		BeforeEach(func() {
111
112
			configurator = &mocks.Configurator{}
			configurator.On("Configure", mock.Anything, mock.Anything)
113
114
			clock = fakeclock.NewFakeClock(time.Now())
			storage = raft.NewMemoryStorage()
115
116

			dataDir, err = ioutil.TempDir("", "wal-")
117
			Expect(err).NotTo(HaveOccurred())
118
119
120
			walDir = path.Join(dataDir, "wal")
			snapDir = path.Join(dataDir, "snapshot")

121
			observeC = make(chan raft.SoftState, 1)
122
123
124
125
126
127
128
129
130
131
132

			support = &consensusmocks.FakeConsenterSupport{}
			support.ChainIDReturns(channelID)
			consenterMetadata = createMetadata(1, tlsCA)
			support.SharedConfigReturns(&mockconfig.Orderer{
				BatchTimeoutVal:      time.Hour,
				ConsensusMetadataVal: marshalOrPanic(consenterMetadata),
			})
			cutter = mockblockcutter.NewReceiver()
			support.BlockCutterReturns(cutter)

133
134
135
136
			// for block creator initialization
			support.HeightReturns(1)
			support.BlockReturns(getSeedBlock())

137
			meta := &raftprotos.BlockMetadata{
138
				ConsenterIds:    make([]uint64, len(consenterMetadata.Consenters)),
139
				NextConsenterId: 1,
140
141
			}

142
143
			for i := range meta.ConsenterIds {
				meta.ConsenterIds[i] = meta.NextConsenterId
144
				meta.NextConsenterId++
145
146
			}

147
148
149
150
151
			consenters = map[uint64]*raftprotos.Consenter{}
			for i, c := range consenterMetadata.Consenters {
				consenters[meta.ConsenterIds[i]] = c
			}

152
153
			fakeFields = newFakeMetricsFields()

154
			opts = etcdraft.Options{
Jay Guo's avatar
Jay Guo committed
155
156
157
158
159
160
161
162
163
164
165
166
167
168
				RaftID:            1,
				Clock:             clock,
				TickInterval:      interval,
				ElectionTick:      ELECTION_TICK,
				HeartbeatTick:     HEARTBEAT_TICK,
				MaxSizePerMsg:     1024 * 1024,
				MaxInflightBlocks: 256,
				BlockMetadata:     meta,
				Consenters:        consenters,
				Logger:            logger,
				MemoryStorage:     storage,
				WALDir:            walDir,
				SnapDir:           snapDir,
				Metrics:           newFakeMetrics(fakeFields),
169
			}
170
171
		})

172
173
174
175
176
		campaign := func(c *etcdraft.Chain, observeC <-chan raft.SoftState) {
			Eventually(func() <-chan raft.SoftState {
				c.Consensus(&orderer.ConsensusRequest{Payload: utils.MarshalOrPanic(&raftpb.Message{Type: raftpb.MsgTimeoutNow})}, 0)
				return observeC
			}, LongEventualTimeout).Should(Receive(StateEqual(1, raft.StateLeader)))
177
178
		}

179
		JustBeforeEach(func() {
180
			chain, err = etcdraft.NewChain(support, opts, configurator, nil, noOpBlockPuller, observeC)
181
182
			Expect(err).NotTo(HaveOccurred())

183
184
			chain.Start()

185
			// When the Raft node bootstraps, it produces a ConfChange
186
187
			// to add itself, which needs to be consumed with Ready().
			// If there are pending configuration changes in raft,
188
189
190
			// it refuses to campaign, no matter how many ticks elapse.
			// This is not a problem in the production code because raft.Ready
			// will be consumed eventually, as the wall clock advances.
191
			//
192
193
194
195
196
			// However, this is problematic when using the fake clock and
			// artificial ticks. Instead of ticking raft indefinitely until
			// raft.Ready is consumed, this check is added to indirectly guarantee
			// that the first ConfChange is actually consumed and we can safely
			// proceed to tick the Raft FSM.
197
198
199
			Eventually(func() error {
				_, err := storage.Entries(1, 1, 1)
				return err
200
			}, LongEventualTimeout).ShouldNot(HaveOccurred())
201
202
203
204
		})

		AfterEach(func() {
			chain.Halt()
205
			Eventually(chain.Errored, LongEventualTimeout).Should(BeClosed())
206
			os.RemoveAll(dataDir)
207
208
		})

yacovm's avatar
yacovm committed
209
210
211
		Context("when a node starts up", func() {
			It("properly configures the communication layer", func() {
				expectedNodeConfig := nodeConfigFromMetadata(consenterMetadata)
212
				configurator.AssertCalled(testingInstance, "Configure", channelID, expectedNodeConfig)
yacovm's avatar
yacovm committed
213
			})
214
215
216
217
218
219
220
221
222
223
224
225
226

			It("correctly sets the metrics labels and publishes requisite metrics", func() {
				type withImplementers interface {
					WithCallCount() int
					WithArgsForCall(int) []string
				}
				metricsList := []withImplementers{
					fakeFields.fakeClusterSize,
					fakeFields.fakeIsLeader,
					fakeFields.fakeCommittedBlockNumber,
					fakeFields.fakeSnapshotBlockNumber,
					fakeFields.fakeLeaderChanges,
					fakeFields.fakeProposalFailures,
227
228
229
					fakeFields.fakeDataPersistDuration,
					fakeFields.fakeNormalProposalsReceived,
					fakeFields.fakeConfigProposalsReceived,
230
231
232
233
234
235
236
237
238
239
240
241
242
				}
				for _, m := range metricsList {
					Expect(m.WithCallCount()).To(Equal(1))
					Expect(func() string {
						return m.WithArgsForCall(0)[1]
					}()).To(Equal(channelID))
				}

				Expect(fakeFields.fakeClusterSize.SetCallCount()).To(Equal(1))
				Expect(fakeFields.fakeClusterSize.SetArgsForCall(0)).To(Equal(float64(1)))
				Expect(fakeFields.fakeIsLeader.SetCallCount()).To(Equal(1))
				Expect(fakeFields.fakeIsLeader.SetArgsForCall(0)).To(Equal(float64(0)))
			})
yacovm's avatar
yacovm committed
243
244
		})

245
		Context("when no Raft leader is elected", func() {
246
			It("fails to order envelope", func() {
247
				err := chain.Order(env, 0)
248
				Expect(err).To(MatchError("no Raft leader"))
249
250
251
				Expect(fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(1))
				Expect(fakeFields.fakeNormalProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1)))
				Expect(fakeFields.fakeConfigProposalsReceived.AddCallCount()).To(Equal(0))
252
253
				Expect(fakeFields.fakeProposalFailures.AddCallCount()).To(Equal(1))
				Expect(fakeFields.fakeProposalFailures.AddArgsForCall(0)).To(Equal(float64(1)))
254
			})
Jay Guo's avatar
Jay Guo committed
255
256
257
258
259
260
261
262
263
264

			It("starts proactive campaign", func() {
				// assert that even tick supplied are less than ELECTION_TIMEOUT,
				// a leader can still be successfully elected.
				for i := 0; i < ELECTION_TICK; i++ {
					clock.Increment(interval)
					time.Sleep(10 * time.Millisecond)
				}
				Eventually(observeC, LongEventualTimeout).Should(Receive(StateEqual(1, raft.StateLeader)))
			})
265
266
		})

267
		Context("when Raft leader is elected", func() {
268
			JustBeforeEach(func() {
269
				campaign(chain, observeC)
270
271
			})

272
273
274
275
276
277
278
			It("updates metrics upon leader election)", func() {
				Expect(fakeFields.fakeIsLeader.SetCallCount()).To(Equal(2))
				Expect(fakeFields.fakeIsLeader.SetArgsForCall(1)).To(Equal(float64(1)))
				Expect(fakeFields.fakeLeaderChanges.AddCallCount()).To(Equal(1))
				Expect(fakeFields.fakeLeaderChanges.AddArgsForCall(0)).To(Equal(float64(1)))
			})

279
280
			It("fails to order envelope if chain is halted", func() {
				chain.Halt()
281
				err := chain.Order(env, 0)
282
				Expect(err).To(MatchError("chain is stopped"))
283
284
				Expect(fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(1))
				Expect(fakeFields.fakeNormalProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1)))
285
286
				Expect(fakeFields.fakeProposalFailures.AddCallCount()).To(Equal(1))
				Expect(fakeFields.fakeProposalFailures.AddArgsForCall(0)).To(Equal(float64(1)))
287
288
289
290
291
292
293
			})

			It("produces blocks following batch rules", func() {
				close(cutter.Block)

				By("cutting next batch directly")
				cutter.CutNext = true
294
				err := chain.Order(env, 0)
295
				Expect(err).NotTo(HaveOccurred())
296
297
				Expect(fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(1))
				Expect(fakeFields.fakeNormalProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1)))
298
				Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
299
300
				Expect(fakeFields.fakeCommittedBlockNumber.SetCallCount()).Should(Equal(1))
				Expect(fakeFields.fakeCommittedBlockNumber.SetArgsForCall(0)).Should(Equal(float64(1)))
301

302
303
304
305
306
307
308
309
310
311
312
				// There are three calls to DataPersistDuration by now corresponding to the following three
				// arriving on the Ready channel:
				// 1. an EntryConfChange to let this node join the Raft cluster
				// 2. a SoftState and an associated increase of term in the HardState due to the node being elected leader
				// 3. a block being committed
				// The duration being emitted is zero since we don't tick the fake clock during this time
				Expect(fakeFields.fakeDataPersistDuration.ObserveCallCount()).Should(Equal(3))
				Expect(fakeFields.fakeDataPersistDuration.ObserveArgsForCall(0)).Should(Equal(float64(0)))
				Expect(fakeFields.fakeDataPersistDuration.ObserveArgsForCall(1)).Should(Equal(float64(0)))
				Expect(fakeFields.fakeDataPersistDuration.ObserveArgsForCall(2)).Should(Equal(float64(0)))

313
314
315
316
				By("respecting batch timeout")
				cutter.CutNext = false
				timeout := time.Second
				support.SharedConfigReturns(&mockconfig.Orderer{BatchTimeoutVal: timeout})
317
				err = chain.Order(env, 0)
318
				Expect(err).NotTo(HaveOccurred())
319
320
				Expect(fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(2))
				Expect(fakeFields.fakeNormalProposalsReceived.AddArgsForCall(1)).To(Equal(float64(1)))
321
322

				clock.WaitForNWatchersAndIncrement(timeout, 2)
323
				Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
324
325
				Expect(fakeFields.fakeCommittedBlockNumber.SetCallCount()).Should(Equal(2))
				Expect(fakeFields.fakeCommittedBlockNumber.SetArgsForCall(1)).Should(Equal(float64(2)))
326
327
				Expect(fakeFields.fakeDataPersistDuration.ObserveCallCount()).Should(Equal(4))
				Expect(fakeFields.fakeDataPersistDuration.ObserveArgsForCall(3)).Should(Equal(float64(0)))
328
			})
329
330
331
332
333
334
335

			It("does not reset timer for every envelope", func() {
				close(cutter.Block)

				timeout := time.Second
				support.SharedConfigReturns(&mockconfig.Orderer{BatchTimeoutVal: timeout})

336
				err := chain.Order(env, 0)
337
				Expect(err).NotTo(HaveOccurred())
338
				Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1))
339
340
341

				clock.WaitForNWatchersAndIncrement(timeout/2, 2)

342
				err = chain.Order(env, 0)
343
				Expect(err).NotTo(HaveOccurred())
344
				Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(2))
345
346
347
348

				// the second envelope should not reset the timer; it should
				// therefore expire if we increment it by just timeout/2
				clock.Increment(timeout / 2)
349
				Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
350
351
352
353
354
355
356
			})

			It("does not write a block if halted before timeout", func() {
				close(cutter.Block)
				timeout := time.Second
				support.SharedConfigReturns(&mockconfig.Orderer{BatchTimeoutVal: timeout})

357
				err := chain.Order(env, 0)
358
				Expect(err).NotTo(HaveOccurred())
359
				Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1))
360
361

				// wait for timer to start
362
				Eventually(clock.WatcherCount, LongEventualTimeout).Should(Equal(2))
363
364
365
366
367
368
369
370
371
372
373

				chain.Halt()
				Consistently(support.WriteBlockCallCount).Should(Equal(0))
			})

			It("stops the timer if a batch is cut", func() {
				close(cutter.Block)

				timeout := time.Second
				support.SharedConfigReturns(&mockconfig.Orderer{BatchTimeoutVal: timeout})

374
				err := chain.Order(env, 0)
375
				Expect(err).NotTo(HaveOccurred())
376
				Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1))
377
378
379
380
381

				clock.WaitForNWatchersAndIncrement(timeout/2, 2)

				By("force a batch to be cut before timer expires")
				cutter.CutNext = true
382
				err = chain.Order(env, 0)
383
				Expect(err).NotTo(HaveOccurred())
384

385
				Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
386
387
				b, _ := support.WriteBlockArgsForCall(0)
				Expect(b.Data.Data).To(HaveLen(2))
388
				Expect(cutter.CurBatch()).To(HaveLen(0))
389
390
391

				// this should start a fresh timer
				cutter.CutNext = false
392
				err = chain.Order(env, 0)
393
				Expect(err).NotTo(HaveOccurred())
394
				Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1))
395
396
397
398
399

				clock.WaitForNWatchersAndIncrement(timeout/2, 2)
				Consistently(support.WriteBlockCallCount).Should(Equal(1))

				clock.Increment(timeout / 2)
400

401
				Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
402
403
				b, _ = support.WriteBlockArgsForCall(1)
				Expect(b.Data.Data).To(HaveLen(1))
404
405
406
407
408
409
410
411
			})

			It("cut two batches if incoming envelope does not fit into first batch", func() {
				close(cutter.Block)

				timeout := time.Second
				support.SharedConfigReturns(&mockconfig.Orderer{BatchTimeoutVal: timeout})

412
				err := chain.Order(env, 0)
413
				Expect(err).NotTo(HaveOccurred())
414
				Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1))
415
416

				cutter.IsolatedTx = true
417
				err = chain.Order(env, 0)
418
419
				Expect(err).NotTo(HaveOccurred())

420
				Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
421
422
423
424
425
426
427
428
429
430
431
			})

			Context("revalidation", func() {
				BeforeEach(func() {
					close(cutter.Block)

					timeout := time.Hour
					support.SharedConfigReturns(&mockconfig.Orderer{BatchTimeoutVal: timeout})
					support.SequenceReturns(1)
				})

432
				It("enqueue if envelope is still valid", func() {
433
434
					support.ProcessNormalMsgReturns(1, nil)

435
					err := chain.Order(env, 0)
436
					Expect(err).NotTo(HaveOccurred())
437
					Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1))
438
					Eventually(clock.WatcherCount, LongEventualTimeout).Should(Equal(2))
439
440
				})

441
				It("does not enqueue if envelope is not valid", func() {
442
443
					support.ProcessNormalMsgReturns(1, errors.Errorf("Envelope is invalid"))

444
					err := chain.Order(env, 0)
445
					Expect(err).NotTo(HaveOccurred())
446
					Consistently(cutter.CurBatch).Should(HaveLen(0))
447
					Consistently(clock.WatcherCount).Should(Equal(1))
448
449
450
451
				})
			})

			It("unblocks Errored if chain is halted", func() {
452
453
				errorC := chain.Errored()
				Expect(errorC).NotTo(BeClosed())
454
				chain.Halt()
455
				Eventually(errorC, LongEventualTimeout).Should(BeClosed())
456
457
			})

458
459
			Describe("Config updates", func() {
				var (
460
461
					configEnv *common.Envelope
					configSeq uint64
462
463
464
465
466
467
468
469
470
471
472
473
474
475
				)

				Context("when a config update with invalid header comes", func() {

					BeforeEach(func() {
						configEnv = newConfigEnv(channelID,
							common.HeaderType_CONFIG_UPDATE, // invalid header; envelopes with CONFIG_UPDATE header never reach chain
							&common.ConfigUpdateEnvelope{ConfigUpdate: []byte("test invalid envelope")})
						configSeq = 0
					})

					It("should throw an error", func() {
						err := chain.Configure(configEnv, configSeq)
						Expect(err).To(MatchError("config transaction has unknown header type"))
476
477
						Expect(fakeFields.fakeConfigProposalsReceived.AddCallCount()).To(Equal(1))
						Expect(fakeFields.fakeConfigProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1)))
478
479
						Expect(fakeFields.fakeProposalFailures.AddCallCount()).To(Equal(1))
						Expect(fakeFields.fakeProposalFailures.AddArgsForCall(0)).To(Equal(float64(1)))
480
481
482
483
484
485
486
487
488
					})
				})

				Context("when a type A config update comes", func() {

					Context("for existing channel", func() {

						// use to prepare the Orderer Values
						BeforeEach(func() {
489
							newValues := map[string]*common.ConfigValue{
490
491
492
493
494
495
								"BatchTimeout": {
									Version: 1,
									Value: marshalOrPanic(&orderer.BatchTimeout{
										Timeout: "3ms",
									}),
								},
496
497
498
499
500
501
502
503
								"ConsensusType": {
									Version: 4,
								},
							}
							oldValues := map[string]*common.ConfigValue{
								"ConsensusType": {
									Version: 4,
								},
504
505
506
							}
							configEnv = newConfigEnv(channelID,
								common.HeaderType_CONFIG,
507
								newConfigUpdateEnv(channelID, oldValues, newValues),
508
509
510
511
512
513
514
515
516
517
							)
							configSeq = 0
						}) // BeforeEach block

						Context("without revalidation (i.e. correct config sequence)", func() {

							Context("without pending normal envelope", func() {
								It("should create a config block and no normal block", func() {
									err := chain.Configure(configEnv, configSeq)
									Expect(err).NotTo(HaveOccurred())
518
519
									Expect(fakeFields.fakeConfigProposalsReceived.AddCallCount()).To(Equal(1))
									Expect(fakeFields.fakeConfigProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1)))
520
									Eventually(support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
521
522
523
									Consistently(support.WriteBlockCallCount).Should(Equal(0))
									Expect(fakeFields.fakeCommittedBlockNumber.SetCallCount()).Should(Equal(1))
									Expect(fakeFields.fakeCommittedBlockNumber.SetArgsForCall(0)).Should(Equal(float64(1)))
524
525
526
527
528
529
530
531
532
533
534
								})
							})

							Context("with pending normal envelope", func() {
								It("should create a normal block and a config block", func() {
									// We do not need to block the cutter from ordering in our test case and therefore close this channel.
									close(cutter.Block)

									By("adding a normal envelope")
									err := chain.Order(env, 0)
									Expect(err).NotTo(HaveOccurred())
535
536
									Expect(fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(1))
									Expect(fakeFields.fakeNormalProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1)))
537
									Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1))
538
539
540
541
542
543

									// // clock.WaitForNWatchersAndIncrement(timeout, 2)

									By("adding a config envelope")
									err = chain.Configure(configEnv, configSeq)
									Expect(err).NotTo(HaveOccurred())
544
545
									Expect(fakeFields.fakeConfigProposalsReceived.AddCallCount()).To(Equal(1))
									Expect(fakeFields.fakeConfigProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1)))
546

547
548
									Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
									Eventually(support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
549
550
									Expect(fakeFields.fakeCommittedBlockNumber.SetCallCount()).Should(Equal(2))
									Expect(fakeFields.fakeCommittedBlockNumber.SetArgsForCall(1)).Should(Equal(float64(2)))
551
552
553
554
555
556
557
								})
							})
						})

						Context("with revalidation (i.e. incorrect config sequence)", func() {

							BeforeEach(func() {
558
								close(cutter.Block)
559
560
561
562
563
564
								support.SequenceReturns(1) // this causes the revalidation
							})

							It("should create config block upon correct revalidation", func() {
								support.ProcessConfigMsgReturns(configEnv, 1, nil) // nil implies correct revalidation

565
566
								Expect(chain.Configure(configEnv, configSeq)).To(Succeed())
								Consistently(clock.WatcherCount).Should(Equal(1))
567
								Eventually(support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
568
569
570
571
572
							})

							It("should not create config block upon incorrect revalidation", func() {
								support.ProcessConfigMsgReturns(configEnv, 1, errors.Errorf("Invalid config envelope at changed config sequence"))

573
574
								Expect(chain.Configure(configEnv, configSeq)).To(Succeed())
								Consistently(clock.WatcherCount).Should(Equal(1))
575
576
								Consistently(support.WriteConfigBlockCallCount).Should(Equal(0)) // no call to WriteConfigBlock
							})
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596

							It("should not disturb current running timer upon incorrect revalidation", func() {
								support.ProcessNormalMsgReturns(1, nil)
								support.ProcessConfigMsgReturns(configEnv, 1, errors.Errorf("Invalid config envelope at changed config sequence"))

								Expect(chain.Order(env, configSeq)).To(Succeed())
								Eventually(clock.WatcherCount, LongEventualTimeout).Should(Equal(2))

								clock.Increment(30 * time.Minute)
								Consistently(support.WriteBlockCallCount).Should(Equal(0))

								Expect(chain.Configure(configEnv, configSeq)).To(Succeed())
								Consistently(clock.WatcherCount).Should(Equal(2))

								Consistently(support.WriteBlockCallCount).Should(Equal(0))
								Consistently(support.WriteConfigBlockCallCount).Should(Equal(0))

								clock.Increment(30 * time.Minute)
								Eventually(support.WriteBlockCallCount).Should(Equal(1))
							})
597
598
599
600
601
602
603
604
						})
					})

					Context("for creating a new channel", func() {

						// use to prepare the Orderer Values
						BeforeEach(func() {
							chainID := "mychannel"
Jay Guo's avatar
Jay Guo committed
605
							values := make(map[string]*common.ConfigValue)
606
							configEnv = newConfigEnv(chainID,
Jay Guo's avatar
Jay Guo committed
607
								common.HeaderType_CONFIG,
608
								newConfigUpdateEnv(chainID, nil, values),
Jay Guo's avatar
Jay Guo committed
609
							)
610
611
612
							configSeq = 0
						}) // BeforeEach block

613
						It("should be able to create a channel", func() {
614
							err := chain.Configure(configEnv, configSeq)
615
							Expect(err).NotTo(HaveOccurred())
616
							Eventually(support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
617
618
619
620
621
						})
					})
				}) // Context block for type A config

				Context("when a type B config update comes", func() {
622
					Context("updating protocol values", func() {
623
624
625
626
627
628
						// use to prepare the Orderer Values
						BeforeEach(func() {
							values := map[string]*common.ConfigValue{
								"ConsensusType": {
									Version: 1,
									Value: marshalOrPanic(&orderer.ConsensusType{
629
										Metadata: marshalOrPanic(consenterMetadata),
630
631
632
633
634
									}),
								},
							}
							configEnv = newConfigEnv(channelID,
								common.HeaderType_CONFIG,
635
								newConfigUpdateEnv(channelID, nil, values))
636
							configSeq = 0
637

638
639
						}) // BeforeEach block

640
						It("should be able to process config update of type B", func() {
641
							err := chain.Configure(configEnv, configSeq)
642
							Expect(err).NotTo(HaveOccurred())
643
644
							Expect(fakeFields.fakeConfigProposalsReceived.AddCallCount()).To(Equal(1))
							Expect(fakeFields.fakeConfigProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1)))
645
646
647
						})
					})

648
					Context("updating consenters set by more than one node", func() {
649
650
651
652
653
654
655
656
657
658
659
660
						// use to prepare the Orderer Values
						BeforeEach(func() {
							values := map[string]*common.ConfigValue{
								"ConsensusType": {
									Version: 1,
									Value: marshalOrPanic(&orderer.ConsensusType{
										Metadata: marshalOrPanic(createMetadata(3, tlsCA)),
									}),
								},
							}
							configEnv = newConfigEnv(channelID,
								common.HeaderType_CONFIG,
661
								newConfigUpdateEnv(channelID, nil, values))
662
							configSeq = 0
663

664
665
						}) // BeforeEach block

666
						It("should fail, since consenters set change is not supported", func() {
667
							err := chain.Configure(configEnv, configSeq)
668
							Expect(err).To(MatchError("update of more than one consenter at a time is not supported, requested changes: add 3 node(s), remove 1 node(s)"))
669
670
							Expect(fakeFields.fakeConfigProposalsReceived.AddCallCount()).To(Equal(1))
							Expect(fakeFields.fakeConfigProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1)))
671
672
							Expect(fakeFields.fakeProposalFailures.AddCallCount()).To(Equal(1))
							Expect(fakeFields.fakeProposalFailures.AddArgsForCall(0)).To(Equal(float64(1)))
673
674
675
676
677
						})
					})

					Context("updating consenters set by exactly one node", func() {
						It("should be able to process config update adding single node", func() {
678
							metadata := proto.Clone(consenterMetadata).(*raftprotos.ConfigMetadata)
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
							metadata.Consenters = append(metadata.Consenters, &raftprotos.Consenter{
								Host:          "localhost",
								Port:          7050,
								ServerTlsCert: serverTLSCert(tlsCA),
								ClientTlsCert: clientTLSCert(tlsCA),
							})

							values := map[string]*common.ConfigValue{
								"ConsensusType": {
									Version: 1,
									Value: marshalOrPanic(&orderer.ConsensusType{
										Metadata: marshalOrPanic(metadata),
									}),
								},
							}
							configEnv = newConfigEnv(channelID,
								common.HeaderType_CONFIG,
696
								newConfigUpdateEnv(channelID, nil, values))
697
698
699
700
701
702
703
							configSeq = 0

							err := chain.Configure(configEnv, configSeq)
							Expect(err).NotTo(HaveOccurred())
						})

						It("should be able to process config update removing single node", func() {
704
							metadata := proto.Clone(consenterMetadata).(*raftprotos.ConfigMetadata)
705
706
707
708
709
710
711
712
713
714
715
716
							// Remove one of the consenters
							metadata.Consenters = metadata.Consenters[1:]
							values := map[string]*common.ConfigValue{
								"ConsensusType": {
									Version: 1,
									Value: marshalOrPanic(&orderer.ConsensusType{
										Metadata: marshalOrPanic(metadata),
									}),
								},
							}
							configEnv = newConfigEnv(channelID,
								common.HeaderType_CONFIG,
717
								newConfigUpdateEnv(channelID, nil, values))
718
719
720
721
722
							configSeq = 0

							err := chain.Configure(configEnv, configSeq)
							Expect(err).NotTo(HaveOccurred())
						})
723
724
					})
				})
725
			})
726
727

			Describe("Crash Fault Tolerance", func() {
728
				var (
729
					raftMetadata *raftprotos.BlockMetadata
730
731
732
				)

				BeforeEach(func() {
733
					raftMetadata = &raftprotos.BlockMetadata{
734
						ConsenterIds:    []uint64{1},
735
						NextConsenterId: 2,
736
737
738
					}
				})

739
740
				Describe("when a chain is started with existing WAL", func() {
					var (
						// Orderer metadata recorded for the first and second
						// blocks written before the chain is halted; used to
						// verify replay results after restart.
						m1 *raftprotos.BlockMetadata
						m2 *raftprotos.BlockMetadata
					)

					JustBeforeEach(func() {
						// to generate WAL data, we start a chain,
						// order several envelopes and then halt the chain.
						close(cutter.Block)
						cutter.CutNext = true

						// enque some data to be persisted on disk by raft
						err := chain.Order(env, uint64(0))
						Expect(err).NotTo(HaveOccurred())
						Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))

						_, metadata := support.WriteBlockArgsForCall(0)
						m1 = &raftprotos.BlockMetadata{}
						Expect(proto.Unmarshal(metadata, m1)).To(Succeed())

						err = chain.Order(env, uint64(0))
						Expect(err).NotTo(HaveOccurred())
						Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))

						_, metadata = support.WriteBlockArgsForCall(1)
						m2 = &raftprotos.BlockMetadata{}
						Expect(proto.Unmarshal(metadata, m2)).To(Succeed())

						chain.Halt()
					})

					It("replays blocks from committed entries", func() {
						// Restart from scratch (RaftIndex 0): both blocks must be
						// re-committed from the WAL.
						c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters)
						c.init()
						c.Start()
						defer c.Halt()

						Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))

						_, metadata := c.support.WriteBlockArgsForCall(0)
						m := &raftprotos.BlockMetadata{}
						Expect(proto.Unmarshal(metadata, m)).To(Succeed())
						Expect(m.RaftIndex).To(Equal(m1.RaftIndex))

						_, metadata = c.support.WriteBlockArgsForCall(1)
						m = &raftprotos.BlockMetadata{}
						Expect(proto.Unmarshal(metadata, m)).To(Succeed())
						Expect(m.RaftIndex).To(Equal(m2.RaftIndex))

						// chain should keep functioning
						campaign(c.Chain, c.observe)

						c.cutter.CutNext = true

						err := c.Order(env, uint64(0))
						Expect(err).NotTo(HaveOccurred())
						Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(3))
					})

					It("only replays blocks after Applied index", func() {
						// Pretend the first block was already applied: only the
						// second block should be replayed on restart.
						raftMetadata.RaftIndex = m1.RaftIndex
						c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters)
						c.support.WriteBlock(support.WriteBlockArgsForCall(0))

						c.init()
						c.Start()
						defer c.Halt()

						Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))

						_, metadata := c.support.WriteBlockArgsForCall(1)
						m := &raftprotos.BlockMetadata{}
						Expect(proto.Unmarshal(metadata, m)).To(Succeed())
						Expect(m.RaftIndex).To(Equal(m2.RaftIndex))

						// chain should keep functioning
						campaign(c.Chain, c.observe)

						c.cutter.CutNext = true

						err := c.Order(env, uint64(0))
						Expect(err).NotTo(HaveOccurred())
						Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(3))
					})

					It("does not replay any block if already in sync", func() {
						// RaftIndex already at the latest block: no replay expected.
						raftMetadata.RaftIndex = m2.RaftIndex
						c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters)
						c.init()
						c.Start()
						defer c.Halt()

						Consistently(c.support.WriteBlockCallCount).Should(Equal(0))

						// chain should keep functioning
						campaign(c.Chain, c.observe)

						c.cutter.CutNext = true

						err := c.Order(env, uint64(0))
						Expect(err).NotTo(HaveOccurred())
						Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
					})

					Context("WAL file is not readable", func() {
						It("fails to load wal", func() {
							// chmod tricks don't work when running as root,
							// since root bypasses permission checks.
							skipIfRoot()

							files, err := ioutil.ReadDir(walDir)
							Expect(err).NotTo(HaveOccurred())
							for _, f := range files {
								// 0300: write/execute only — reads will fail.
								Expect(os.Chmod(path.Join(walDir, f.Name()), 0300)).To(Succeed())
							}

							c, err := etcdraft.NewChain(support, opts, configurator, nil, noOpBlockPuller, observeC)
							Expect(c).To(BeNil())
							Expect(err).To(MatchError(ContainSubstring("permission denied")))
						})
					})
				})
				Describe("when snapshotting is enabled (snapshot interval is not zero)", func() {
					var (
						ledgerLock sync.Mutex
864
						ledger     map[uint64]*common.Block
865
866
867
868
869
870
871
872
873
874
875
876
877
878
					)

					countFiles := func() int {
						files, err := ioutil.ReadDir(snapDir)
						Expect(err).NotTo(HaveOccurred())
						return len(files)
					}

					BeforeEach(func() {
						opts.SnapshotCatchUpEntries = 2

						close(cutter.Block)
						cutter.CutNext = true

879
880
881
882
883
884
						ledgerLock.Lock()
						ledger = map[uint64]*common.Block{
							0: getSeedBlock(), // genesis block
						}
						ledgerLock.Unlock()

885
886
887
						support.WriteBlockStub = func(block *common.Block, meta []byte) {
							b := proto.Clone(block).(*common.Block)

888
889
890
891
892
893
							bytes, err := proto.Marshal(&common.Metadata{Value: meta})
							Expect(err).NotTo(HaveOccurred())
							b.Metadata.Metadata[common.BlockMetadataIndex_ORDERER] = bytes

							ledgerLock.Lock()
							defer ledgerLock.Unlock()
894
							ledger[b.Header.Number] = b
895
896
897
						}

						support.HeightStub = func() uint64 {
898
899
							ledgerLock.Lock()
							defer ledgerLock.Unlock()
900
901
902
903
							return uint64(len(ledger))
						}
					})

Jay Guo's avatar
Jay Guo committed
904
905
					Context("Small SnapshotInterval", func() {
						BeforeEach(func() {
Jay Guo's avatar
Jay Guo committed
906
							opts.SnapshotIntervalSize = 1
Jay Guo's avatar
Jay Guo committed
907
						})
						It("writes snapshot file to snapDir", func() {
							// Scenario: start a chain with SnapInterval = 1 byte, expect it to take
							// one snapshot for each block
912

Jay Guo's avatar
Jay Guo committed
913
							i, _ := opts.MemoryStorage.FirstIndex()
914

Jay Guo's avatar
Jay Guo committed
915
916
917
918
							Expect(chain.Order(env, uint64(0))).To(Succeed())
							Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
							Eventually(countFiles, LongEventualTimeout).Should(Equal(1))
							Eventually(opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(BeNumerically(">", i))
919
920
921
922
							Expect(fakeFields.fakeSnapshotBlockNumber.SetCallCount()).To(Equal(1))
							s, _ := opts.MemoryStorage.Snapshot()
							b := utils.UnmarshalBlockOrPanic(s.Data)
							Expect(fakeFields.fakeSnapshotBlockNumber.SetArgsForCall(0)).To(Equal(float64(b.Header.Number)))
923

Jay Guo's avatar
Jay Guo committed
924
							i, _ = opts.MemoryStorage.FirstIndex()
925

Jay Guo's avatar
Jay Guo committed
926
927
							Expect(chain.Order(env, uint64(0))).To(Succeed())
							Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
928

Jay Guo's avatar
Jay Guo committed
929
930
							Eventually(countFiles, LongEventualTimeout).Should(Equal(2))
							Eventually(opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(BeNumerically(">", i))
931
932
933
934
							Expect(fakeFields.fakeSnapshotBlockNumber.SetCallCount()).To(Equal(2))
							s, _ = opts.MemoryStorage.Snapshot()
							b = utils.UnmarshalBlockOrPanic(s.Data)
							Expect(fakeFields.fakeSnapshotBlockNumber.SetArgsForCall(1)).To(Equal(float64(b.Header.Number)))
Jay Guo's avatar
Jay Guo committed
935
						})
						It("pauses chain if sync is in progress", func() {
							// Scenario:
							// after a snapshot is taken, reboot chain with raftIndex = 0
							// chain should attempt to sync upon reboot, and blocks on
							// `WaitReady` API
942

Jay Guo's avatar
Jay Guo committed
943
							i, _ := opts.MemoryStorage.FirstIndex()
944

Jay Guo's avatar
Jay Guo committed
945
946
947
948
							Expect(chain.Order(env, uint64(0))).To(Succeed())
							Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
							Eventually(countFiles, LongEventualTimeout).Should(Equal(1))
							Eventually(opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(BeNumerically(">", i))
949

Jay Guo's avatar
Jay Guo committed
950
							i, _ = opts.MemoryStorage.FirstIndex()
951

Jay Guo's avatar
Jay Guo committed
952
953
954
955
							Expect(chain.Order(env, uint64(0))).To(Succeed())
							Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
							Eventually(countFiles, LongEventualTimeout).Should(Equal(2))
							Eventually(opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(BeNumerically(">", i))
956

Jay Guo's avatar
Jay Guo committed
957
							chain.Halt()
958

959
							c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters)
Jay Guo's avatar
Jay Guo committed
960
							c.init()
961

Jay Guo's avatar
Jay Guo committed
962
							signal := make(chan struct{})
963

Jay Guo's avatar
Jay Guo committed
964
965
966
967
968
969
970
							c.puller.PullBlockStub = func(i uint64) *common.Block {
								<-signal // blocking for assertions
								ledgerLock.Lock()
								defer ledgerLock.Unlock()
								if i >= uint64(len(ledger)) {
									return nil
								}
971

Jay Guo's avatar
Jay Guo committed
972
973
								return ledger[i]
							}
974

Jay Guo's avatar
Jay Guo committed
975
976
							err := c.WaitReady()
							Expect(err).To(MatchError("chain is not started"))
977

Jay Guo's avatar
Jay Guo committed
978
979
							c.Start()
							defer c.Halt()
980

Jay Guo's avatar
Jay Guo committed
981
982
							// pull block is called, so chain should be catching up now, WaitReady should block
							signal <- struct{}{}
983

Jay Guo's avatar
Jay Guo committed
984
985
986
987
							done := make(chan error)
							go func() {
								done <- c.WaitReady()
							}()
988

Jay Guo's avatar
Jay Guo committed
989
990
991
992
993
							Consistently(done).ShouldNot(Receive())
							close(signal)                         // unblock block puller
							Eventually(done).Should(Receive(nil)) // WaitReady should be unblocked
							Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
						})
						It("restores snapshot w/o extra entries", func() {
							// Scenario:
							// after a snapshot is taken, no more entries are appended.
							// then node is restarted, it loads snapshot, finds its term
							// and index. While replaying WAL to memory storage, it should
							// not append any entry because no extra entry was appended
For faster browsing, not all history is shown. View entire blame