chain_test.go
/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package etcdraft_test

import (
	"encoding/pem"
	"fmt"
	"io/ioutil"
	"os"
	"os/user"
	"path"
	"sync"
	"time"

	"code.cloudfoundry.org/clock/fakeclock"
	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
	"github.com/golang/protobuf/proto"
	"github.com/hyperledger/fabric/bccsp/factory"
	"github.com/hyperledger/fabric/common/crypto/tlsgen"
	"github.com/hyperledger/fabric/common/flogging"
	mockconfig "github.com/hyperledger/fabric/common/mocks/config"
	"github.com/hyperledger/fabric/orderer/common/cluster"
	"github.com/hyperledger/fabric/orderer/consensus/etcdraft"
	"github.com/hyperledger/fabric/orderer/consensus/etcdraft/mocks"
	consensusmocks "github.com/hyperledger/fabric/orderer/consensus/mocks"
	mockblockcutter "github.com/hyperledger/fabric/orderer/mocks/common/blockcutter"
	"github.com/hyperledger/fabric/protos/common"
	"github.com/hyperledger/fabric/protos/orderer"
	raftprotos "github.com/hyperledger/fabric/protos/orderer/etcdraft"
	"github.com/hyperledger/fabric/protos/utils"
	. "github.com/onsi/ginkgo"
	. "github.com/onsi/gomega"
	"github.com/onsi/gomega/types"
	"github.com/pkg/errors"
	"github.com/stretchr/testify/mock"
	"go.uber.org/zap"
)

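// Timing knobs shared by the tests below: interval is the TickInterval fed to
// the Raft FSM through the fake clock, while ELECTION_TICK and HEARTBEAT_TICK
// are passed through as etcd/raft's ElectionTick and HeartbeatTick options.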
const (
	interval            = time.Second
	LongEventualTimeout = 5 * time.Second
	ELECTION_TICK       = 2
	HEARTBEAT_TICK      = 1
)

func init() {
	factory.InitFactories(nil)
}

// for some test cases we chmod file/dir to test failures caused by exotic permissions.
// however this does not work if tests are running as root, i.e. in a container.
func skipIfRoot() {
	u, err := user.Current()
	Expect(err).NotTo(HaveOccurred())
	if u.Uid == "0" {
		Skip("you are running test as root, there's no way to make files unreadable")
	}
}

var _ = Describe("Chain", func() {
	var (
		env       *common.Envelope
		channelID string
		tlsCA     tlsgen.CA
		logger    *flogging.FabricLogger
	)

	BeforeEach(func() {
		tlsCA, _ = tlsgen.NewCA()
		channelID = "test-channel"
		logger = flogging.NewFabricLogger(zap.NewNop())
		env = &common.Envelope{
			Payload: marshalOrPanic(&common.Payload{
				Header: &common.Header{ChannelHeader: marshalOrPanic(&common.ChannelHeader{Type: int32(common.HeaderType_MESSAGE), ChannelId: channelID})},
				Data:   []byte("TEST_MESSAGE"),
			}),
		}
	})

	Describe("Single Raft node", func() {
		var (
			configurator      *mocks.Configurator
			consenterMetadata *raftprotos.Metadata
			clock             *fakeclock.FakeClock
			opts              etcdraft.Options
			support           *consensusmocks.FakeConsenterSupport
			cutter            *mockblockcutter.Receiver
			storage           *raft.MemoryStorage
			observeC          chan raft.SoftState
			chain             *etcdraft.Chain
			dataDir           string
			walDir            string
			snapDir           string
			err               error
		)

		BeforeEach(func() {
			configurator = &mocks.Configurator{}
			configurator.On("Configure", mock.Anything, mock.Anything)
			clock = fakeclock.NewFakeClock(time.Now())
			storage = raft.NewMemoryStorage()

			dataDir, err = ioutil.TempDir("", "wal-")
			Expect(err).NotTo(HaveOccurred())
			walDir = path.Join(dataDir, "wal")
			snapDir = path.Join(dataDir, "snapshot")

			observeC = make(chan raft.SoftState, 1)

			support = &consensusmocks.FakeConsenterSupport{}
			support.ChainIDReturns(channelID)
			consenterMetadata = createMetadata(1, tlsCA)
			support.SharedConfigReturns(&mockconfig.Orderer{
				BatchTimeoutVal:      time.Hour,
				ConsensusMetadataVal: marshalOrPanic(consenterMetadata),
			})
			cutter = mockblockcutter.NewReceiver()
			support.BlockCutterReturns(cutter)

			// for block creator initialization
			support.HeightReturns(1)
			support.BlockReturns(getSeedBlock())

			meta := &raftprotos.RaftMetadata{
				Consenters:      map[uint64]*raftprotos.Consenter{},
				NextConsenterId: 1,
			}

			for _, c := range consenterMetadata.Consenters {
				meta.Consenters[meta.NextConsenterId] = c
				meta.NextConsenterId++
			}

			opts = etcdraft.Options{
				RaftID:          1,
				Clock:           clock,
				TickInterval:    interval,
				ElectionTick:    ELECTION_TICK,
				HeartbeatTick:   HEARTBEAT_TICK,
				MaxSizePerMsg:   1024 * 1024,
				MaxInflightMsgs: 256,
				RaftMetadata:    meta,
				Logger:          logger,
				MemoryStorage:   storage,
				WALDir:          walDir,
				SnapDir:         snapDir,
			}
		})

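		// campaign drives the fake clock until the node reports itself as the
		// Raft leader on the observation channel.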
		campaign := func(clock *fakeclock.FakeClock, observeC <-chan raft.SoftState) {
			Eventually(func() bool {
				clock.Increment(interval)
				select {
				case s := <-observeC:
					return s.RaftState == raft.StateLeader
				default:
					return false
				}
			}, LongEventualTimeout).Should(BeTrue())
		}

		JustBeforeEach(func() {
			chain, err = etcdraft.NewChain(support, opts, configurator, nil, nil, observeC)
			Expect(err).NotTo(HaveOccurred())

			chain.Start()

			// When the Raft node bootstraps, it produces a ConfChange
			// to add itself, which needs to be consumed with Ready().
			// If there are pending configuration changes in raft,
			// it refuses to campaign, no matter how many ticks elapse.
			// This is not a problem in the production code because raft.Ready
			// will be consumed eventually, as the wall clock advances.
			//
			// However, this is problematic when using the fake clock and
			// artificial ticks. Instead of ticking raft indefinitely until
			// raft.Ready is consumed, this check is added to indirectly guarantee
			// that the first ConfChange is actually consumed and we can safely
			// proceed to tick the Raft FSM.
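			//
			// Entries(1, 1, 1) returns ErrUnavailable while the MemoryStorage only
			// holds its dummy entry, so a nil error below implies the bootstrap
			// entry has been appended to the storage.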
			Eventually(func() error {
				_, err := storage.Entries(1, 1, 1)
				return err
			}, LongEventualTimeout).ShouldNot(HaveOccurred())
		})

		AfterEach(func() {
			chain.Halt()
			os.RemoveAll(dataDir)
		})

		Context("when a node starts up", func() {
			It("properly configures the communication layer", func() {
				expectedNodeConfig := nodeConfigFromMetadata(consenterMetadata)
				configurator.AssertCalled(testingInstance, "Configure", channelID, expectedNodeConfig)
			})
		})

		Context("when no Raft leader is elected", func() {
			It("fails to order envelope", func() {
				err := chain.Order(env, 0)
				Expect(err).To(MatchError("no Raft leader"))
			})
		})

		Context("when Raft leader is elected", func() {
			JustBeforeEach(func() {
				campaign(clock, observeC)
			})

			It("fails to order envelope if chain is halted", func() {
				chain.Halt()
				err := chain.Order(env, 0)
				Expect(err).To(MatchError("chain is stopped"))
			})

			It("produces blocks following batch rules", func() {
				close(cutter.Block)

				By("cutting next batch directly")
				cutter.CutNext = true
				err := chain.Order(env, 0)
				Expect(err).NotTo(HaveOccurred())
				Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))

				By("respecting batch timeout")
				cutter.CutNext = false
				timeout := time.Second
				support.SharedConfigReturns(&mockconfig.Orderer{BatchTimeoutVal: timeout})
				err = chain.Order(env, 0)
				Expect(err).NotTo(HaveOccurred())

				clock.WaitForNWatchersAndIncrement(timeout, 2)
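				// (the two clock watchers are the Raft ticker and the batch timeout timer)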
				Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
			})

			It("does not reset timer for every envelope", func() {
				close(cutter.Block)

				timeout := time.Second
				support.SharedConfigReturns(&mockconfig.Orderer{BatchTimeoutVal: timeout})

				err := chain.Order(env, 0)
				Expect(err).NotTo(HaveOccurred())
				Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1))

				clock.WaitForNWatchersAndIncrement(timeout/2, 2)

				err = chain.Order(env, 0)
				Expect(err).NotTo(HaveOccurred())
				Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(2))

				// the second envelope should not reset the timer; it should
				// therefore expire if we increment it by just timeout/2
				clock.Increment(timeout / 2)
				Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
			})

			It("does not write a block if halted before timeout", func() {
				close(cutter.Block)
				timeout := time.Second
				support.SharedConfigReturns(&mockconfig.Orderer{BatchTimeoutVal: timeout})

				err := chain.Order(env, 0)
				Expect(err).NotTo(HaveOccurred())
				Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1))

				// wait for timer to start
				Eventually(clock.WatcherCount, LongEventualTimeout).Should(Equal(2))

				chain.Halt()
				Consistently(support.WriteBlockCallCount).Should(Equal(0))
			})

			It("stops the timer if a batch is cut", func() {
				close(cutter.Block)

				timeout := time.Second
				support.SharedConfigReturns(&mockconfig.Orderer{BatchTimeoutVal: timeout})

				err := chain.Order(env, 0)
				Expect(err).NotTo(HaveOccurred())
				Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1))

				clock.WaitForNWatchersAndIncrement(timeout/2, 2)

				By("force a batch to be cut before timer expires")
				cutter.CutNext = true
				err = chain.Order(env, 0)
				Expect(err).NotTo(HaveOccurred())

				Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
				b, _ := support.WriteBlockArgsForCall(0)
				Expect(b.Data.Data).To(HaveLen(2))
				Expect(cutter.CurBatch()).To(HaveLen(0))

				// this should start a fresh timer
				cutter.CutNext = false
				err = chain.Order(env, 0)
				Expect(err).NotTo(HaveOccurred())
				Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1))

				clock.WaitForNWatchersAndIncrement(timeout/2, 2)
				Consistently(support.WriteBlockCallCount).Should(Equal(1))

				clock.Increment(timeout / 2)

				Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
				b, _ = support.WriteBlockArgsForCall(1)
				Expect(b.Data.Data).To(HaveLen(1))
			})

			It("cuts two batches if incoming envelope does not fit into the first batch", func() {
				close(cutter.Block)

				timeout := time.Second
				support.SharedConfigReturns(&mockconfig.Orderer{BatchTimeoutVal: timeout})

				err := chain.Order(env, 0)
				Expect(err).NotTo(HaveOccurred())
				Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1))

				cutter.IsolatedTx = true
				err = chain.Order(env, 0)
				Expect(err).NotTo(HaveOccurred())

				Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
			})

			Context("revalidation", func() {
				BeforeEach(func() {
					close(cutter.Block)

					timeout := time.Hour
					support.SharedConfigReturns(&mockconfig.Orderer{BatchTimeoutVal: timeout})
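					// a Sequence() ahead of the envelope's configSeq forces the chain
					// to revalidate the message before enqueueing it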
					support.SequenceReturns(1)
				})

				It("enqueues if envelope is still valid", func() {
					support.ProcessNormalMsgReturns(1, nil)

					err := chain.Order(env, 0)
					Expect(err).NotTo(HaveOccurred())
					Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1))
				})

				It("does not enqueue if envelope is not valid", func() {
					support.ProcessNormalMsgReturns(1, errors.Errorf("Envelope is invalid"))

					err := chain.Order(env, 0)
					Expect(err).NotTo(HaveOccurred())
					Consistently(cutter.CurBatch).Should(HaveLen(0))
				})
			})

			It("unblocks Errored if chain is halted", func() {
				errorC := chain.Errored()
				Expect(errorC).NotTo(BeClosed())
				chain.Halt()
				Eventually(errorC).Should(BeClosed())
			})

			Describe("Config updates", func() {
				var (
					configEnv *common.Envelope
					configSeq uint64
				)

				Context("when a config update with invalid header comes", func() {

					BeforeEach(func() {
						configEnv = newConfigEnv(channelID,
							common.HeaderType_CONFIG_UPDATE, // invalid header; envelopes with CONFIG_UPDATE header never reach chain
							&common.ConfigUpdateEnvelope{ConfigUpdate: []byte("test invalid envelope")})
						configSeq = 0
					})

					It("should throw an error", func() {
						err := chain.Configure(configEnv, configSeq)
						Expect(err).To(MatchError("config transaction has unknown header type"))
					})
				})

				Context("when a type A config update comes", func() {

					Context("for existing channel", func() {

						// used to prepare the Orderer values
						BeforeEach(func() {
							values := map[string]*common.ConfigValue{
								"BatchTimeout": {
									Version: 1,
									Value: marshalOrPanic(&orderer.BatchTimeout{
										Timeout: "3ms",
									}),
								},
							}
							configEnv = newConfigEnv(channelID,
								common.HeaderType_CONFIG,
								newConfigUpdateEnv(channelID, values),
							)
							configSeq = 0
						}) // BeforeEach block

						Context("without revalidation (i.e. correct config sequence)", func() {

							Context("without pending normal envelope", func() {
								It("should create a config block and no normal block", func() {
									err := chain.Configure(configEnv, configSeq)
									Expect(err).NotTo(HaveOccurred())
									Eventually(support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
									Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(0))
								})
							})

							Context("with pending normal envelope", func() {
								It("should create a normal block and a config block", func() {
									// We do not need to block the cutter from ordering in our test case and therefore close this channel.
									close(cutter.Block)

									By("adding a normal envelope")
									err := chain.Order(env, 0)
									Expect(err).NotTo(HaveOccurred())
									Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1))

									// // clock.WaitForNWatchersAndIncrement(timeout, 2)

									By("adding a config envelope")
									err = chain.Configure(configEnv, configSeq)
									Expect(err).NotTo(HaveOccurred())

									Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
									Eventually(support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
								})
							})
						})

						Context("with revalidation (i.e. incorrect config sequence)", func() {

							BeforeEach(func() {
								support.SequenceReturns(1) // this causes the revalidation
							})

							It("should create config block upon correct revalidation", func() {
								support.ProcessConfigMsgReturns(configEnv, 1, nil) // nil implies correct revalidation

								err := chain.Configure(configEnv, configSeq)
								Expect(err).NotTo(HaveOccurred())
								Eventually(support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
							})

							It("should not create config block upon incorrect revalidation", func() {
								support.ProcessConfigMsgReturns(configEnv, 1, errors.Errorf("Invalid config envelope at changed config sequence"))

								err := chain.Configure(configEnv, configSeq)
								Expect(err).NotTo(HaveOccurred())
								Consistently(support.WriteConfigBlockCallCount).Should(Equal(0)) // no call to WriteConfigBlock
							})
						})
					})

					Context("for creating a new channel", func() {

						// used to prepare the Orderer values
						BeforeEach(func() {
							chainID := "mychannel"
							values := make(map[string]*common.ConfigValue)
							configEnv = newConfigEnv(chainID,
								common.HeaderType_CONFIG,
								newConfigUpdateEnv(chainID, values),
							)
							configSeq = 0
						}) // BeforeEach block

						It("should be able to create a channel", func() {
							err := chain.Configure(configEnv, configSeq)
							Expect(err).NotTo(HaveOccurred())
							Eventually(support.WriteConfigBlockCallCount).Should(Equal(1))
						})
					})
				}) // Context block for type A config
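				// The "type B" updates below change the ConsensusType value (the Raft
				// metadata, i.e. the consenter set), in contrast to the "type A"
				// updates above, which touch ordinary orderer values or create channels.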

				Context("when a type B config update comes", func() {
					Context("updating protocol values", func() {
						// used to prepare the Orderer values
						BeforeEach(func() {
							values := map[string]*common.ConfigValue{
								"ConsensusType": {
									Version: 1,
									Value: marshalOrPanic(&orderer.ConsensusType{
										Metadata: marshalOrPanic(consenterMetadata),
									}),
								},
							}
							configEnv = newConfigEnv(channelID,
								common.HeaderType_CONFIG,
								newConfigUpdateEnv(channelID, values))
							configSeq = 0

						}) // BeforeEach block

						It("should be able to process config update of type B", func() {
							err := chain.Configure(configEnv, configSeq)
							Expect(err).NotTo(HaveOccurred())
						})
					})

					Context("updating consenters set by more than one node", func() {
						// used to prepare the Orderer values
						BeforeEach(func() {
							values := map[string]*common.ConfigValue{
								"ConsensusType": {
									Version: 1,
									Value: marshalOrPanic(&orderer.ConsensusType{
										Metadata: marshalOrPanic(createMetadata(3, tlsCA)),
									}),
								},
							}
							configEnv = newConfigEnv(channelID,
								common.HeaderType_CONFIG,
								newConfigUpdateEnv(channelID, values))
							configSeq = 0

						}) // BeforeEach block

						It("should fail, since changing more than one consenter at a time is not supported", func() {
							err := chain.Configure(configEnv, configSeq)
							Expect(err).To(MatchError("update of more than one consenters at a time is not supported"))
						})
					})

					Context("updating consenters set by exactly one node", func() {
						It("should be able to process config update adding single node", func() {
							metadata := proto.Clone(consenterMetadata).(*raftprotos.Metadata)
							metadata.Consenters = append(metadata.Consenters, &raftprotos.Consenter{
								Host:          "localhost",
								Port:          7050,
								ServerTlsCert: serverTLSCert(tlsCA),
								ClientTlsCert: clientTLSCert(tlsCA),
							})

							values := map[string]*common.ConfigValue{
								"ConsensusType": {
									Version: 1,
									Value: marshalOrPanic(&orderer.ConsensusType{
										Metadata: marshalOrPanic(metadata),
									}),
								},
							}
							configEnv = newConfigEnv(channelID,
								common.HeaderType_CONFIG,
								newConfigUpdateEnv(channelID, values))
							configSeq = 0

							err := chain.Configure(configEnv, configSeq)
							Expect(err).NotTo(HaveOccurred())
						})

						It("should be able to process config update removing single node", func() {
							metadata := proto.Clone(consenterMetadata).(*raftprotos.Metadata)
							// Remove one of the consenters
							metadata.Consenters = metadata.Consenters[1:]
							values := map[string]*common.ConfigValue{
								"ConsensusType": {
									Version: 1,
									Value: marshalOrPanic(&orderer.ConsensusType{
										Metadata: marshalOrPanic(metadata),
									}),
								},
							}
							configEnv = newConfigEnv(channelID,
								common.HeaderType_CONFIG,
								newConfigUpdateEnv(channelID, values))
							configSeq = 0

							err := chain.Configure(configEnv, configSeq)
							Expect(err).NotTo(HaveOccurred())
						})

						It("fails since adding and removing a node in the same change is not allowed", func() {
							metadata := proto.Clone(consenterMetadata).(*raftprotos.Metadata)
							// Remove one of the consenters
							metadata.Consenters = metadata.Consenters[1:]
							metadata.Consenters = append(metadata.Consenters, &raftprotos.Consenter{
								Host:          "localhost",
								Port:          7050,
								ServerTlsCert: serverTLSCert(tlsCA),
								ClientTlsCert: clientTLSCert(tlsCA),
							})
							values := map[string]*common.ConfigValue{
								"ConsensusType": {
									Version: 1,
									Value: marshalOrPanic(&orderer.ConsensusType{
										Metadata: marshalOrPanic(metadata),
									}),
								},
							}
							configEnv = newConfigEnv(channelID,
								common.HeaderType_CONFIG,
								newConfigUpdateEnv(channelID, values))
							configSeq = 0

							err := chain.Configure(configEnv, configSeq)
							Expect(err).To(MatchError("update of more than one consenters at a time is not supported"))
						})
					})
				})
			})

			Describe("Crash Fault Tolerance", func() {
				var (
					raftMetadata *raftprotos.RaftMetadata
				)

				BeforeEach(func() {
					tlsCA, _ := tlsgen.NewCA()

					raftMetadata = &raftprotos.RaftMetadata{
						Consenters: map[uint64]*raftprotos.Consenter{
							1: {
								Host:          "localhost",
								Port:          7051,
								ClientTlsCert: clientTLSCert(tlsCA),
								ServerTlsCert: serverTLSCert(tlsCA),
							},
						},
						NextConsenterId: 2,
					}
				})

				Describe("when a chain is started with existing WAL", func() {
					var (
						m1 *raftprotos.RaftMetadata
						m2 *raftprotos.RaftMetadata
					)
					JustBeforeEach(func() {
						// to generate WAL data, we start a chain,
						// order several envelopes and then halt the chain.
						close(cutter.Block)
						cutter.CutNext = true

						// enqueue some data to be persisted on disk by raft
						err := chain.Order(env, uint64(0))
						Expect(err).NotTo(HaveOccurred())
						Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))

						_, metadata := support.WriteBlockArgsForCall(0)
						m1 = &raftprotos.RaftMetadata{}
						proto.Unmarshal(metadata, m1)

						err = chain.Order(env, uint64(0))
						Expect(err).NotTo(HaveOccurred())
						Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))

						_, metadata = support.WriteBlockArgsForCall(1)
						m2 = &raftprotos.RaftMetadata{}
						proto.Unmarshal(metadata, m2)

						chain.Halt()
					})

					It("replays blocks from committed entries", func() {
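						// newChain points at the same dataDir, so it restarts from the
						// WAL just written by the chain halted above.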
						c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata)
						c.init()
						c.Start()
						defer c.Halt()

						Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))

						_, metadata := c.support.WriteBlockArgsForCall(0)
						m := &raftprotos.RaftMetadata{}
						proto.Unmarshal(metadata, m)
						Expect(m.RaftIndex).To(Equal(m1.RaftIndex))

						_, metadata = c.support.WriteBlockArgsForCall(1)
						m = &raftprotos.RaftMetadata{}
						proto.Unmarshal(metadata, m)
						Expect(m.RaftIndex).To(Equal(m2.RaftIndex))

						// chain should keep functioning
						campaign(c.clock, c.observe)

						c.cutter.CutNext = true

						err := c.Order(env, uint64(0))
						Expect(err).NotTo(HaveOccurred())
						Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(3))

					})

					It("only replays blocks after Applied index", func() {
						raftMetadata.RaftIndex = m1.RaftIndex
						c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata)
						c.init()
						c.Start()
						defer c.Halt()

						Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))

						_, metadata := c.support.WriteBlockArgsForCall(0)
						m := &raftprotos.RaftMetadata{}
						proto.Unmarshal(metadata, m)
						Expect(m.RaftIndex).To(Equal(m2.RaftIndex))

						// chain should keep functioning
						campaign(c.clock, c.observe)

						c.cutter.CutNext = true

						err := c.Order(env, uint64(0))
						Expect(err).NotTo(HaveOccurred())
						Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
					})

					It("does not replay any block if already in sync", func() {
						raftMetadata.RaftIndex = m2.RaftIndex
						c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata)
						c.init()
						c.Start()
						defer c.Halt()

						Consistently(c.support.WriteBlockCallCount).Should(Equal(0))

						// chain should keep functioning
						campaign(c.clock, c.observe)

						c.cutter.CutNext = true

						err := c.Order(env, uint64(0))
						Expect(err).NotTo(HaveOccurred())
						Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
					})

					Context("WAL file is not readable", func() {
						It("fails to load wal", func() {
							skipIfRoot()

							files, err := ioutil.ReadDir(walDir)
							Expect(err).NotTo(HaveOccurred())
							for _, f := range files {
								os.Chmod(path.Join(walDir, f.Name()), 0300)
							}

							c, err := etcdraft.NewChain(support, opts, configurator, nil, nil, observeC)
							Expect(c).To(BeNil())
							Expect(err).To(MatchError(ContainSubstring("failed to open existing WAL")))
						})
					})
				})

				Describe("when snapshotting is enabled (snapshot interval is not zero)", func() {
					var (
						m *raftprotos.RaftMetadata

						ledgerLock sync.Mutex
						ledger     map[uint64]*common.Block
					)

					countFiles := func() int {
						files, err := ioutil.ReadDir(snapDir)
						Expect(err).NotTo(HaveOccurred())
						return len(files)
					}

					BeforeEach(func() {
						opts.SnapInterval = 2
						opts.SnapshotCatchUpEntries = 2

						close(cutter.Block)
						cutter.CutNext = true

						ledgerLock.Lock()
						ledger = map[uint64]*common.Block{
							0: getSeedBlock(), // genesis block
						}
						ledgerLock.Unlock()

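						// WriteBlockStub and HeightStub emulate a ledger backed by the map
						// above: the height reported to the chain always reflects the
						// blocks that have actually been written.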
						support.WriteBlockStub = func(b *common.Block, meta []byte) {
							bytes, err := proto.Marshal(&common.Metadata{Value: meta})
							Expect(err).NotTo(HaveOccurred())
							b.Metadata.Metadata[common.BlockMetadataIndex_ORDERER] = bytes

							ledgerLock.Lock()
							defer ledgerLock.Unlock()
							ledger[b.Header.Number] = b
						}

						support.HeightStub = func() uint64 {
							ledgerLock.Lock()
							defer ledgerLock.Unlock()
							return uint64(len(ledger))
						}
					})

					JustBeforeEach(func() {
						err = chain.Order(env, uint64(0))
						Expect(err).NotTo(HaveOccurred())
						Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))

						err = chain.Order(env, uint64(0))
						Expect(err).NotTo(HaveOccurred())
						Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))

						_, metadata := support.WriteBlockArgsForCall(1)
						m = &raftprotos.RaftMetadata{}
						proto.Unmarshal(metadata, m)
					})

					It("writes snapshot file to snapDir", func() {
						// Scenario: start a chain with SnapInterval = 2, expect it to take
						// one snapshot after ordering 3 blocks.
						//
						// block numbers start from 0, and a snapshot is taken once:
						//        appliedBlockNum - snapBlockNum >= SnapInterval
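						//
						// With SnapInterval = 2, the two blocks ordered in JustBeforeEach
						// (blocks 1 and 2 on top of the genesis block 0) already satisfy
						// 2 - 0 >= 2, so exactly one snapshot file is expected here; the
						// third block ordered below (3 - 2 < 2) does not trigger another.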

						Eventually(countFiles, LongEventualTimeout).Should(Equal(1))
						Eventually(opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(BeNumerically(">", 1))

						// chain should still be functioning
						err = chain.Order(env, uint64(0))
						Expect(err).NotTo(HaveOccurred())
						Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(3))
					})

					It("pauses chain if sync is in progress", func() {
						// Scenario:
						// after a snapshot is taken, reboot the chain with raftIndex = 0.
						// The chain should attempt to sync upon reboot and block on the
						// `WaitReady` API while catch-up is in progress.

						// check snapshot does exist
						Eventually(countFiles, LongEventualTimeout).Should(Equal(1))

						chain.Halt()

						c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata)
						c.init()

						signal := make(chan struct{})
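						// signal gates the stubbed block puller below, so the test decides
						// when catch-up is allowed to make progress.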

						c.puller.PullBlockStub = func(i uint64) *common.Block {
							<-signal // blocking for assertions
							ledgerLock.Lock()
							defer ledgerLock.Unlock()
							if i >= uint64(len(ledger)) {
								return nil
							}

							return ledger[i]
						}

						err := c.WaitReady()
						Expect(err).To(MatchError("chain is not started"))

						c.Start()
						defer c.Halt()

						// pull block is called, so chain should be catching up now, WaitReady should block
						signal <- struct{}{}

						done := make(chan error)
						go func() {
							done <- c.WaitReady()
						}()

						Consistently(done).ShouldNot(Receive())
						close(signal) // unblock block puller

						Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
					})

					It("restores snapshot w/o extra entries", func() {
						// Scenario:
						// after a snapshot is taken, no more entries are appended.
						// The node is then restarted; it loads the snapshot and finds its
						// term and index. While replaying the WAL into memory storage, it
						// should not append any entries, because none were appended after
						// the snapshot was taken.

						Eventually(countFiles, LongEventualTimeout).Should(Equal(1))
						Eventually(opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(BeNumerically(">", 1))
						snapshot, err := opts.MemoryStorage.Snapshot() // get the snapshot just created
						Expect(err).NotTo(HaveOccurred())
						i, err := opts.MemoryStorage.FirstIndex() // get the first index in memory
						Expect(err).NotTo(HaveOccurred())

						// expect storage to preserve SnapshotCatchUpEntries entries before snapshot
						Expect(i).To(Equal(snapshot.Metadata.Index - opts.SnapshotCatchUpEntries + 1))
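						// e.g. with a snapshot at index N and SnapshotCatchUpEntries = 2,
						// the log is compacted up to N-2 and the first retained index is N-1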

						chain.Halt()

						raftMetadata.RaftIndex = m.RaftIndex
						c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata)

						c.init()
						c.Start()
						defer c.Halt()

						// The following arithmetic reflects how the etcdraft MemoryStorage
						// behaves when no entry is appended after the snapshot is loaded.
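						// (Applying a snapshot leaves the storage with a single dummy entry
						// at snapshot.Metadata.Index, hence FirstIndex = Index+1 and
						// LastIndex = Index until new entries arrive.)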
						Eventually(c.opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(Equal(snapshot.Metadata.Index + 1))
						Eventually(c.opts.MemoryStorage.LastIndex, LongEventualTimeout).Should(Equal(snapshot.Metadata.Index))

						// chain keeps functioning
						Eventually(func() bool {
							c.clock.Increment(interval)
							select {
							case <-c.observe:
								return true
							default:
								return false
							}
						}, LongEventualTimeout).Should(BeTrue())

						c.cutter.CutNext = true
						err = c.Order(env, uint64(0))
						Expect(err).NotTo(HaveOccurred())
						Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
					})

					It("restores snapshot w/ extra entries", func() {
						// Scenario:
						// after a snapshot is taken, more entries are appended.
						// The node is then restarted; it loads the snapshot and finds its
						// term and index. While replaying the WAL into memory storage, it
						// should append the extra entries.

						// check snapshot does exist
						Eventually(countFiles, LongEventualTimeout).Should(Equal(1))
						Eventually(opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(BeNumerically(">", 1))
						snapshot, err := opts.MemoryStorage.Snapshot() // get the snapshot just created
						Expect(err).NotTo(HaveOccurred())
						i, err := opts.MemoryStorage.FirstIndex() // get the first index in memory
						Expect(err).NotTo(HaveOccurred())

						// expect storage to preserve SnapshotCatchUpEntries entries before snapshot
						Expect(i).To(Equal(snapshot.Metadata.Index - opts.SnapshotCatchUpEntries + 1))

						err = chain.Order(env, uint64(0))
						Expect(err).NotTo(HaveOccurred())
						Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(3))

						lasti, _ := opts.MemoryStorage.LastIndex()

						chain.Halt()

						raftMetadata.RaftIndex = m.RaftIndex
						c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata)
						c.support.HeightReturns(5)
						c.support.BlockReturns(&common.Block{
							Header:   &common.BlockHeader{},
							Data:     &common.BlockData{Data: [][]byte{[]byte("foo")}},
							Metadata: &common.BlockMetadata{Metadata: make([][]byte, 4)},
						})

						c.init()
						c.Start()
						defer c.Halt()

						Eventually(c.opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(Equal(snapshot.Metadata.Index + 1))
						Eventually(c.opts.MemoryStorage.LastIndex, LongEventualTimeout).Should(Equal(lasti))

						// chain keeps functioning
						Eventually(func() bool {
							c.clock.Increment(interval)
							select {
							case <-c.observe:
								return true
							default:
								return false
							}
						}, LongEventualTimeout).Should(BeTrue())

						c.cutter.CutNext = true
						err = c.Order(env, uint64(0))
						Expect(err).NotTo(HaveOccurred())
						Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
					})

					When("local ledger is in sync with snapshot", func() {
						It("does not pull blocks and still respects snapshot interval", func() {
							// Scenario:
							// - snapshot is taken at block 2
							// - order one more envelope (block 3)
							// - reboot chain at block 2
							// - block 3 should be replayed from wal
							// - order another envelope to trigger snapshot, containing block 3 & 4
							// Assertions:
							// - block puller should NOT be called
							// - chain should keep functioning after reboot
							// - chain should respect snapshot interval to trigger next snapshot

							// check snapshot does exist
							Eventually(countFiles, LongEventualTimeout).Should(Equal(1))

							// order another envelope. this should not trigger snapshot