/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package etcdraft_test

import (
	"encoding/pem"
	"fmt"
	"io/ioutil"
	"os"
	"os/user"
	"path"
	"sync"
	"time"

	"code.cloudfoundry.org/clock/fakeclock"
	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
	"github.com/golang/protobuf/proto"
	"github.com/hyperledger/fabric/bccsp/factory"
	"github.com/hyperledger/fabric/common/crypto/tlsgen"
	"github.com/hyperledger/fabric/common/flogging"
	mockconfig "github.com/hyperledger/fabric/common/mocks/config"
	"github.com/hyperledger/fabric/orderer/common/cluster"
	"github.com/hyperledger/fabric/orderer/consensus/etcdraft"
	"github.com/hyperledger/fabric/orderer/consensus/etcdraft/mocks"
	consensusmocks "github.com/hyperledger/fabric/orderer/consensus/mocks"
	mockblockcutter "github.com/hyperledger/fabric/orderer/mocks/common/blockcutter"
	"github.com/hyperledger/fabric/protos/common"
	"github.com/hyperledger/fabric/protos/orderer"
	raftprotos "github.com/hyperledger/fabric/protos/orderer/etcdraft"
	. "github.com/onsi/ginkgo"
	. "github.com/onsi/gomega"
	"github.com/pkg/errors"
	"github.com/stretchr/testify/mock"
	"go.uber.org/zap"
)

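// interval is the raft tick interval advanced via the fake clock in these tests;
// ELECTION_TICK and HEARTBEAT_TICK are wired into etcdraft.Options below, so a
// leader election only needs a handful of artificial ticks.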
const (
	interval            = time.Second
	LongEventualTimeout = 5 * time.Second
	ELECTION_TICK       = 2
	HEARTBEAT_TICK      = 1
)

func init() {
	factory.InitFactories(nil)
}

// for some test cases we chmod file/dir to test failures caused by exotic permissions.
// however this does not work if tests are running as root, i.e. in a container.
func skipIfRoot() {
	u, err := user.Current()
	Expect(err).NotTo(HaveOccurred())
	if u.Uid == "0" {
		Skip("you are running test as root, there's no way to make files unreadable")
	}
}

var _ = Describe("Chain", func() {
	var (
		env       *common.Envelope
		channelID string
		tlsCA     tlsgen.CA
		logger    *flogging.FabricLogger
	)

	BeforeEach(func() {
		tlsCA, _ = tlsgen.NewCA()
		channelID = "test-channel"
		logger = flogging.NewFabricLogger(zap.NewNop())
		env = &common.Envelope{
			Payload: marshalOrPanic(&common.Payload{
				Header: &common.Header{ChannelHeader: marshalOrPanic(&common.ChannelHeader{Type: int32(common.HeaderType_MESSAGE), ChannelId: channelID})},
				Data:   []byte("TEST_MESSAGE"),
			}),
		}
	})

	Describe("Single Raft node", func() {
		var (
			configurator      *mocks.Configurator
			consenterMetadata *raftprotos.Metadata
			clock             *fakeclock.FakeClock
			opts              etcdraft.Options
			support           *consensusmocks.FakeConsenterSupport
			cutter            *mockblockcutter.Receiver
			storage           *raft.MemoryStorage
			observeC          chan raft.SoftState
			chain             *etcdraft.Chain
			dataDir           string
			walDir            string
			snapDir           string
			err               error
		)

		BeforeEach(func() {
			configurator = &mocks.Configurator{}
			configurator.On("Configure", mock.Anything, mock.Anything)
			clock = fakeclock.NewFakeClock(time.Now())
			storage = raft.NewMemoryStorage()

			dataDir, err = ioutil.TempDir("", "wal-")
			Expect(err).NotTo(HaveOccurred())
			walDir = path.Join(dataDir, "wal")
			snapDir = path.Join(dataDir, "snapshot")

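			// observeC receives raft SoftState updates from the chain so the tests can
			// observe leadership changes driven by the fake clock.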
			observeC = make(chan raft.SoftState, 1)

			support = &consensusmocks.FakeConsenterSupport{}
			support.ChainIDReturns(channelID)
			consenterMetadata = createMetadata(1, tlsCA)
			support.SharedConfigReturns(&mockconfig.Orderer{
				BatchTimeoutVal:      time.Hour,
				ConsensusMetadataVal: marshalOrPanic(consenterMetadata),
			})
			cutter = mockblockcutter.NewReceiver()
			support.BlockCutterReturns(cutter)

			// for block creator initialization
			support.HeightReturns(1)
			support.BlockReturns(getSeedBlock())

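			// mirror the consenters from the metadata into RaftMetadata, assigning
			// sequential consenter IDs starting at 1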
			meta := &raftprotos.RaftMetadata{
				Consenters:      map[uint64]*raftprotos.Consenter{},
				NextConsenterId: 1,
			}

			for _, c := range consenterMetadata.Consenters {
				meta.Consenters[meta.NextConsenterId] = c
				meta.NextConsenterId++
			}

			opts = etcdraft.Options{
				RaftID:          1,
				Clock:           clock,
				TickInterval:    interval,
				ElectionTick:    ELECTION_TICK,
				HeartbeatTick:   HEARTBEAT_TICK,
				MaxSizePerMsg:   1024 * 1024,
				MaxInflightMsgs: 256,
				RaftMetadata:    meta,
				Logger:          logger,
				MemoryStorage:   storage,
				WALDir:          walDir,
				SnapDir:         snapDir,
			}
		})

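		// campaign advances the fake clock one tick interval at a time until the
		// observer channel reports that this node has become the raft leader.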
		campaign := func(clock *fakeclock.FakeClock, observeC <-chan raft.SoftState) {
			Eventually(func() bool {
				clock.Increment(interval)
				select {
				case s := <-observeC:
					return s.RaftState == raft.StateLeader
				default:
					return false
				}
			}, LongEventualTimeout).Should(BeTrue())
		}

		JustBeforeEach(func() {
			chain, err = etcdraft.NewChain(support, opts, configurator, nil, nil, observeC)
			Expect(err).NotTo(HaveOccurred())

			chain.Start()

			// When the Raft node bootstraps, it produces a ConfChange
			// to add itself, which needs to be consumed with Ready().
			// If there are pending configuration changes in raft,
			// it refuses to campaign, no matter how many ticks elapse.
			// This is not a problem in the production code because raft.Ready
			// will be consumed eventually, as the wall clock advances.
			//
			// However, this is problematic when using the fake clock and
			// artificial ticks. Instead of ticking raft indefinitely until
			// raft.Ready is consumed, this check is added to indirectly guarantee
			// that the first ConfChange is actually consumed and we can safely
			// proceed to tick the Raft FSM.
			Eventually(func() error {
				_, err := storage.Entries(1, 1, 1)
				return err
			}, LongEventualTimeout).ShouldNot(HaveOccurred())
		})

		AfterEach(func() {
			chain.Halt()
			os.RemoveAll(dataDir)
		})

		Context("when a node starts up", func() {
			It("properly configures the communication layer", func() {
				expectedNodeConfig := nodeConfigFromMetadata(consenterMetadata)
				configurator.AssertCalled(testingInstance, "Configure", channelID, expectedNodeConfig)
			})
		})

		Context("when no Raft leader is elected", func() {
			It("fails to order envelope", func() {
				err := chain.Order(env, 0)
				Expect(err).To(MatchError("no Raft leader"))
			})
		})

		Context("when Raft leader is elected", func() {
			JustBeforeEach(func() {
				campaign(clock, observeC)
			})

			It("fails to order envelope if chain is halted", func() {
				chain.Halt()
				err := chain.Order(env, 0)
				Expect(err).To(MatchError("chain is stopped"))
			})

			It("produces blocks following batch rules", func() {
				close(cutter.Block)

				By("cutting next batch directly")
				cutter.CutNext = true
				err := chain.Order(env, 0)
				Expect(err).NotTo(HaveOccurred())
				Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))

				By("respecting batch timeout")
				cutter.CutNext = false
				timeout := time.Second
				support.SharedConfigReturns(&mockconfig.Orderer{BatchTimeoutVal: timeout})
				err = chain.Order(env, 0)
				Expect(err).NotTo(HaveOccurred())

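				// two clock watchers are expected here: the chain's raft tick ticker and
				// the batch timeout timer started for the pending envelope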
				clock.WaitForNWatchersAndIncrement(timeout, 2)
				Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
			})

			It("does not reset timer for every envelope", func() {
				close(cutter.Block)

				timeout := time.Second
				support.SharedConfigReturns(&mockconfig.Orderer{BatchTimeoutVal: timeout})

				err := chain.Order(env, 0)
				Expect(err).NotTo(HaveOccurred())
				Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1))

				clock.WaitForNWatchersAndIncrement(timeout/2, 2)

				err = chain.Order(env, 0)
				Expect(err).NotTo(HaveOccurred())
				Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(2))

				// the second envelope should not reset the timer; it should
				// therefore expire if we increment it by just timeout/2
				clock.Increment(timeout / 2)
				Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
			})

			It("does not write a block if halted before timeout", func() {
				close(cutter.Block)
				timeout := time.Second
				support.SharedConfigReturns(&mockconfig.Orderer{BatchTimeoutVal: timeout})

				err := chain.Order(env, 0)
				Expect(err).NotTo(HaveOccurred())
				Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1))

				// wait for timer to start
				Eventually(clock.WatcherCount, LongEventualTimeout).Should(Equal(2))

				chain.Halt()
				Consistently(support.WriteBlockCallCount).Should(Equal(0))
			})

			It("stops the timer if a batch is cut", func() {
				close(cutter.Block)

				timeout := time.Second
				support.SharedConfigReturns(&mockconfig.Orderer{BatchTimeoutVal: timeout})

				err := chain.Order(env, 0)
				Expect(err).NotTo(HaveOccurred())
				Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1))

				clock.WaitForNWatchersAndIncrement(timeout/2, 2)

				By("force a batch to be cut before timer expires")
				cutter.CutNext = true
				err = chain.Order(env, 0)
				Expect(err).NotTo(HaveOccurred())

				Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
				b, _ := support.WriteBlockArgsForCall(0)
				Expect(b.Data.Data).To(HaveLen(2))
				Expect(cutter.CurBatch()).To(HaveLen(0))

				// this should start a fresh timer
				cutter.CutNext = false
				err = chain.Order(env, 0)
				Expect(err).NotTo(HaveOccurred())
				Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1))

				clock.WaitForNWatchersAndIncrement(timeout/2, 2)
				Consistently(support.WriteBlockCallCount).Should(Equal(1))

				clock.Increment(timeout / 2)

				Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
				b, _ = support.WriteBlockArgsForCall(1)
				Expect(b.Data.Data).To(HaveLen(1))
			})

			It("cut two batches if incoming envelope does not fit into first batch", func() {
				close(cutter.Block)

				timeout := time.Second
				support.SharedConfigReturns(&mockconfig.Orderer{BatchTimeoutVal: timeout})

				err := chain.Order(env, 0)
				Expect(err).NotTo(HaveOccurred())
				Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1))

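				// IsolatedTx makes the mock block cutter return the pending batch and the
				// new envelope as two separate batches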
				cutter.IsolatedTx = true
				err = chain.Order(env, 0)
				Expect(err).NotTo(HaveOccurred())

				Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
			})

			Context("revalidation", func() {
				BeforeEach(func() {
					close(cutter.Block)

					timeout := time.Hour
					support.SharedConfigReturns(&mockconfig.Orderer{BatchTimeoutVal: timeout})
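					// the envelopes below are submitted with configSeq 0, so reporting
					// sequence 1 here forces the chain to revalidate them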
					support.SequenceReturns(1)
				})

				It("enqueues the envelope if it is still valid", func() {
					support.ProcessNormalMsgReturns(1, nil)

					err := chain.Order(env, 0)
					Expect(err).NotTo(HaveOccurred())
					Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1))
				})

				It("does not enqueue if the envelope is not valid", func() {
					support.ProcessNormalMsgReturns(1, errors.Errorf("Envelope is invalid"))

					err := chain.Order(env, 0)
					Expect(err).NotTo(HaveOccurred())
					Consistently(cutter.CurBatch).Should(HaveLen(0))
				})
			})

			It("unblocks Errored if chain is halted", func() {
				errorC := chain.Errored()
				Expect(errorC).NotTo(BeClosed())
				chain.Halt()
				Eventually(errorC).Should(BeClosed())
			})

			Describe("Config updates", func() {
				var (
					configEnv *common.Envelope
					configSeq uint64
				)

				Context("when a config update with invalid header comes", func() {

					BeforeEach(func() {
						configEnv = newConfigEnv(channelID,
							common.HeaderType_CONFIG_UPDATE, // invalid header; envelopes with CONFIG_UPDATE header never reach chain
							&common.ConfigUpdateEnvelope{ConfigUpdate: []byte("test invalid envelope")})
						configSeq = 0
					})

					It("should throw an error", func() {
						err := chain.Configure(configEnv, configSeq)
						Expect(err).To(MatchError("config transaction has unknown header type"))
					})
				})

				Context("when a type A config update comes", func() {

					Context("for existing channel", func() {

						// used to prepare the Orderer Values
						BeforeEach(func() {
							values := map[string]*common.ConfigValue{
								"BatchTimeout": {
									Version: 1,
									Value: marshalOrPanic(&orderer.BatchTimeout{
										Timeout: "3ms",
									}),
								},
							}
							configEnv = newConfigEnv(channelID,
								common.HeaderType_CONFIG,
								newConfigUpdateEnv(channelID, values),
							)
							configSeq = 0
						}) // BeforeEach block

						Context("without revalidation (i.e. correct config sequence)", func() {

							Context("without pending normal envelope", func() {
								It("should create a config block and no normal block", func() {
									err := chain.Configure(configEnv, configSeq)
									Expect(err).NotTo(HaveOccurred())
									Eventually(support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
									Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(0))
								})
							})

							Context("with pending normal envelope", func() {
								It("should create a normal block and a config block", func() {
									// We do not need to block the cutter from ordering in our test case and therefore close this channel.
									close(cutter.Block)

									By("adding a normal envelope")
									err := chain.Order(env, 0)
									Expect(err).NotTo(HaveOccurred())
									Eventually(cutter.CurBatch, LongEventualTimeout).Should(HaveLen(1))

									// // clock.WaitForNWatchersAndIncrement(timeout, 2)

									By("adding a config envelope")
									err = chain.Configure(configEnv, configSeq)
									Expect(err).NotTo(HaveOccurred())

									Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
									Eventually(support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
								})
							})
						})

						Context("with revalidation (i.e. incorrect config sequence)", func() {

							BeforeEach(func() {
								support.SequenceReturns(1) // this causes the revalidation
							})

							It("should create config block upon correct revalidation", func() {
								support.ProcessConfigMsgReturns(configEnv, 1, nil) // nil implies correct revalidation

								err := chain.Configure(configEnv, configSeq)
								Expect(err).NotTo(HaveOccurred())
								Eventually(support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
							})

							It("should not create config block upon incorrect revalidation", func() {
								support.ProcessConfigMsgReturns(configEnv, 1, errors.Errorf("Invalid config envelope at changed config sequence"))

								err := chain.Configure(configEnv, configSeq)
								Expect(err).NotTo(HaveOccurred())
								Consistently(support.WriteConfigBlockCallCount).Should(Equal(0)) // no call to WriteConfigBlock
							})
						})
					})

					Context("for creating a new channel", func() {

						// used to prepare the Orderer Values
						BeforeEach(func() {
							chainID := "mychannel"
							values := make(map[string]*common.ConfigValue)
							configEnv = newConfigEnv(chainID,
								common.HeaderType_CONFIG,
								newConfigUpdateEnv(chainID, values),
							)
							configSeq = 0
						}) // BeforeEach block

						It("should be able to create a channel", func() {
							err := chain.Configure(configEnv, configSeq)
							Expect(err).NotTo(HaveOccurred())
							Eventually(support.WriteConfigBlockCallCount).Should(Equal(1))
						})
					})
				}) // Context block for type A config

				Context("when a type B config update comes", func() {
					Context("updating protocol values", func() {
						// used to prepare the Orderer Values
						BeforeEach(func() {
							values := map[string]*common.ConfigValue{
								"ConsensusType": {
									Version: 1,
									Value: marshalOrPanic(&orderer.ConsensusType{
										Metadata: marshalOrPanic(consenterMetadata),
									}),
								},
							}
							configEnv = newConfigEnv(channelID,
								common.HeaderType_CONFIG,
								newConfigUpdateEnv(channelID, values))
							configSeq = 0

						}) // BeforeEach block

						It("should be able to process config update of type B", func() {
							err := chain.Configure(configEnv, configSeq)
							Expect(err).NotTo(HaveOccurred())
						})
					})

					Context("updating consenters set by more than one node", func() {
						// used to prepare the Orderer Values
						BeforeEach(func() {
							values := map[string]*common.ConfigValue{
								"ConsensusType": {
									Version: 1,
									Value: marshalOrPanic(&orderer.ConsensusType{
										Metadata: marshalOrPanic(createMetadata(3, tlsCA)),
									}),
								},
							}
							configEnv = newConfigEnv(channelID,
								common.HeaderType_CONFIG,
								newConfigUpdateEnv(channelID, values))
							configSeq = 0

						}) // BeforeEach block

						It("should fail, since consenters set change is not supported", func() {
							err := chain.Configure(configEnv, configSeq)
							Expect(err).To(MatchError("update of more than one consenters at a time is not supported"))
						})
					})

					Context("updating consenters set by exactly one node", func() {
						It("should be able to process config update adding single node", func() {
							metadata := proto.Clone(consenterMetadata).(*raftprotos.Metadata)
							metadata.Consenters = append(metadata.Consenters, &raftprotos.Consenter{
								Host:          "localhost",
								Port:          7050,
								ServerTlsCert: serverTLSCert(tlsCA),
								ClientTlsCert: clientTLSCert(tlsCA),
							})

							values := map[string]*common.ConfigValue{
								"ConsensusType": {
									Version: 1,
									Value: marshalOrPanic(&orderer.ConsensusType{
										Metadata: marshalOrPanic(metadata),
									}),
								},
							}
							configEnv = newConfigEnv(channelID,
								common.HeaderType_CONFIG,
								newConfigUpdateEnv(channelID, values))
							configSeq = 0

							err := chain.Configure(configEnv, configSeq)
							Expect(err).NotTo(HaveOccurred())
						})

						It("should be able to process config update removing single node", func() {
							metadata := proto.Clone(consenterMetadata).(*raftprotos.Metadata)
							// Remove one of the consenters
							metadata.Consenters = metadata.Consenters[1:]
							values := map[string]*common.ConfigValue{
								"ConsensusType": {
									Version: 1,
									Value: marshalOrPanic(&orderer.ConsensusType{
										Metadata: marshalOrPanic(metadata),
									}),
								},
							}
							configEnv = newConfigEnv(channelID,
								common.HeaderType_CONFIG,
								newConfigUpdateEnv(channelID, values))
							configSeq = 0

							err := chain.Configure(configEnv, configSeq)
							Expect(err).NotTo(HaveOccurred())
						})

						It("fail since not allowed to add and remove node at same change", func() {
							metadata := proto.Clone(consenterMetadata).(*raftprotos.Metadata)
							// Remove one of the consenters
							metadata.Consenters = metadata.Consenters[1:]
							metadata.Consenters = append(metadata.Consenters, &raftprotos.Consenter{
								Host:          "localhost",
								Port:          7050,
								ServerTlsCert: serverTLSCert(tlsCA),
								ClientTlsCert: clientTLSCert(tlsCA),
							})
							values := map[string]*common.ConfigValue{
								"ConsensusType": {
									Version: 1,
									Value: marshalOrPanic(&orderer.ConsensusType{
										Metadata: marshalOrPanic(metadata),
									}),
								},
							}
							configEnv = newConfigEnv(channelID,
								common.HeaderType_CONFIG,
								newConfigUpdateEnv(channelID, values))
							configSeq = 0

							err := chain.Configure(configEnv, configSeq)
							Expect(err).To(MatchError("update of more than one consenters at a time is not supported"))
						})
					})
				})
			})

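			// The crash fault tolerance tests restart a chain over the same data
			// directory to exercise WAL replay and snapshot restore.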
			Describe("Crash Fault Tolerance", func() {
				var (
					raftMetadata *raftprotos.RaftMetadata
				)

				BeforeEach(func() {
					tlsCA, _ := tlsgen.NewCA()

					raftMetadata = &raftprotos.RaftMetadata{
						Consenters: map[uint64]*raftprotos.Consenter{
							1: {
								Host:          "localhost",
								Port:          7051,
								ClientTlsCert: clientTLSCert(tlsCA),
								ServerTlsCert: serverTLSCert(tlsCA),
							},
						},
						NextConsenterId: 2,
					}
				})

				Describe("when a chain is started with existing WAL", func() {
					var (
						m1 *raftprotos.RaftMetadata
						m2 *raftprotos.RaftMetadata
					)
					JustBeforeEach(func() {
						// to generate WAL data, we start a chain,
						// order several envelopes and then halt the chain.
						close(cutter.Block)
						cutter.CutNext = true

						// enqueue some data to be persisted on disk by raft
						err := chain.Order(env, uint64(0))
						Expect(err).NotTo(HaveOccurred())
						Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))

						_, metadata := support.WriteBlockArgsForCall(0)
						m1 = &raftprotos.RaftMetadata{}
						proto.Unmarshal(metadata, m1)

						err = chain.Order(env, uint64(0))
						Expect(err).NotTo(HaveOccurred())
						Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))

						_, metadata = support.WriteBlockArgsForCall(1)
						m2 = &raftprotos.RaftMetadata{}
						proto.Unmarshal(metadata, m2)

						chain.Halt()
					})

					It("replays blocks from committed entries", func() {
						c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata)
						c.init()
						c.Start()
						defer c.Halt()

						Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))

						_, metadata := c.support.WriteBlockArgsForCall(0)
						m := &raftprotos.RaftMetadata{}
						proto.Unmarshal(metadata, m)
						Expect(m.RaftIndex).To(Equal(m1.RaftIndex))

						_, metadata = c.support.WriteBlockArgsForCall(1)
						m = &raftprotos.RaftMetadata{}
						proto.Unmarshal(metadata, m)
						Expect(m.RaftIndex).To(Equal(m2.RaftIndex))

						// chain should keep functioning
						campaign(c.clock, c.observe)

						c.cutter.CutNext = true

						err := c.Order(env, uint64(0))
						Expect(err).NotTo(HaveOccurred())
						Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(3))

					})

					It("only replays blocks after Applied index", func() {
						raftMetadata.RaftIndex = m1.RaftIndex
						c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata)
						c.init()
						c.Start()
						defer c.Halt()

						Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))

						_, metadata := c.support.WriteBlockArgsForCall(0)
						m := &raftprotos.RaftMetadata{}
						proto.Unmarshal(metadata, m)
						Expect(m.RaftIndex).To(Equal(m2.RaftIndex))

						// chain should keep functioning
						campaign(c.clock, c.observe)

						c.cutter.CutNext = true

						err := c.Order(env, uint64(0))
						Expect(err).NotTo(HaveOccurred())
						Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
					})

					It("does not replay any block if already in sync", func() {
						raftMetadata.RaftIndex = m2.RaftIndex
						c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata)
						c.init()
						c.Start()
						defer c.Halt()

						Consistently(c.support.WriteBlockCallCount).Should(Equal(0))

						// chain should keep functioning
						campaign(c.clock, c.observe)

						c.cutter.CutNext = true

						err := c.Order(env, uint64(0))
						Expect(err).NotTo(HaveOccurred())
						Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
					})

					Context("WAL file is not readable", func() {
						It("fails to load wal", func() {
							skipIfRoot()

							files, err := ioutil.ReadDir(walDir)
							Expect(err).NotTo(HaveOccurred())
							for _, f := range files {
								os.Chmod(path.Join(walDir, f.Name()), 0300)
							}

							c, err := etcdraft.NewChain(support, opts, configurator, nil, nil, observeC)
							Expect(c).To(BeNil())
							Expect(err).To(MatchError(ContainSubstring("failed to open existing WAL")))
						})
					})
				})

				Describe("when snapshotting is enabled (snapshot interval is not zero)", func() {
					var (
						m *raftprotos.RaftMetadata

						ledgerLock sync.Mutex
						ledger     map[uint64]*common.Block
					)

					countFiles := func() int {
						files, err := ioutil.ReadDir(snapDir)
						Expect(err).NotTo(HaveOccurred())
						return len(files)
					}

					BeforeEach(func() {
						opts.SnapInterval = 2
						opts.SnapshotCatchUpEntries = 2

						close(cutter.Block)
						cutter.CutNext = true

						ledgerLock.Lock()
						ledger = map[uint64]*common.Block{
							0: getSeedBlock(), // genesis block
						}
						ledgerLock.Unlock()

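						// WriteBlockStub and HeightStub maintain an in-memory ledger so that a
						// restarted chain can report its height and serve blocks for catch-up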
						support.WriteBlockStub = func(b *common.Block, meta []byte) {
							bytes, err := proto.Marshal(&common.Metadata{Value: meta})
							Expect(err).NotTo(HaveOccurred())
							b.Metadata.Metadata[common.BlockMetadataIndex_ORDERER] = bytes

							ledgerLock.Lock()
							defer ledgerLock.Unlock()
							ledger[b.Header.Number] = b
						}

						support.HeightStub = func() uint64 {
							ledgerLock.Lock()
							defer ledgerLock.Unlock()
							return uint64(len(ledger))
						}
					})

					JustBeforeEach(func() {
						err = chain.Order(env, uint64(0))
						Expect(err).NotTo(HaveOccurred())
						Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))

						err = chain.Order(env, uint64(0))
						Expect(err).NotTo(HaveOccurred())
						Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))

						_, metadata := support.WriteBlockArgsForCall(1)
						m = &raftprotos.RaftMetadata{}
						proto.Unmarshal(metadata, m)
					})

					It("writes snapshot file to snapDir", func() {
						// Scenario: start a chain with SnapInterval = 2, expect it to take
						// one snapshot after ordering 2 blocks.
						//
						// block numbers start from 0; a snapshot is taken once
						//        appliedBlockNum - snapBlockNum >= SnapInterval

						Eventually(countFiles, LongEventualTimeout).Should(Equal(1))
						Eventually(opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(BeNumerically(">", 1))

						// chain should still be functioning
						err = chain.Order(env, uint64(0))
						Expect(err).NotTo(HaveOccurred())
						Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(3))
					})

					It("pauses chain if sync is in progress", func() {
						// Scenario:
						// after a snapshot is taken, reboot chain with raftIndex = 0
						// chain should attempt to sync upon reboot, and block on
						// the `WaitReady` API

						// check that the snapshot does exist
						Eventually(countFiles, LongEventualTimeout).Should(Equal(1))

						chain.Halt()

						c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata)
						c.init()

						signal := make(chan struct{})

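						// the block puller is stubbed so catch-up blocks are only served once the
						// test signals it, letting us assert that WaitReady blocks during sync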
						c.puller.PullBlockStub = func(i uint64) *common.Block {
							<-signal // blocking for assertions
							ledgerLock.Lock()
							defer ledgerLock.Unlock()
							if i >= uint64(len(ledger)) {
								return nil
							}

							return ledger[i]
						}

						err := c.WaitReady()
						Expect(err).To(MatchError("chain is not started"))

						c.Start()
						defer c.Halt()

						// pull block is called, so chain should be catching up now, WaitReady should block
						signal <- struct{}{}

						done := make(chan error)
						go func() {
							done <- c.WaitReady()
						}()

						Consistently(done).ShouldNot(Receive())
						close(signal) // unblock block puller

						Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
					})

					It("restores snapshot w/o extra entries", func() {
						// Scenario:
						// after a snapshot is taken, no more entries are appended.
						// then node is restarted, it loads snapshot, finds its term
						// and index. While replaying WAL to memory storage, it should
						// not append any entry because no extra entry was appended
						// after snapshot was taken.

						Eventually(countFiles, LongEventualTimeout).Should(Equal(1))
						Eventually(opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(BeNumerically(">", 1))
						snapshot, err := opts.MemoryStorage.Snapshot() // get the snapshot just created
						Expect(err).NotTo(HaveOccurred())
						i, err := opts.MemoryStorage.FirstIndex() // get the first index in memory
						Expect(err).NotTo(HaveOccurred())

						// expect storage to preserve SnapshotCatchUpEntries entries before snapshot
						Expect(i).To(Equal(snapshot.Metadata.Index - opts.SnapshotCatchUpEntries + 1))

						chain.Halt()

						raftMetadata.RaftIndex = m.RaftIndex
						c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata)

						c.init()
						c.Start()
						defer c.Halt()

						// the following arithmetic reflects how the etcdraft MemoryStorage is implemented
						// when no entry is appended after the snapshot is loaded.
						Eventually(c.opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(Equal(snapshot.Metadata.Index + 1))
						Eventually(c.opts.MemoryStorage.LastIndex, LongEventualTimeout).Should(Equal(snapshot.Metadata.Index))

						// chain keeps functioning
						Eventually(func() bool {
							c.clock.Increment(interval)
							select {
							case <-c.observe:
								return true
							default:
								return false
							}
						}, LongEventualTimeout).Should(BeTrue())

						c.cutter.CutNext = true
						err = c.Order(env, uint64(0))
						Expect(err).NotTo(HaveOccurred())
						Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
					})

					It("restores snapshot w/ extra entries", func() {
						// Scenario:
						// after a snapshot is taken, more entries are appended.
						// then node is restarted, it loads snapshot, finds its term
						// and index. While replaying WAL to memory storage, it should
						// append some entries.

						// check that the snapshot does exist
						Eventually(countFiles, LongEventualTimeout).Should(Equal(1))
						Eventually(opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(BeNumerically(">", 1))
						snapshot, err := opts.MemoryStorage.Snapshot() // get the snapshot just created
						Expect(err).NotTo(HaveOccurred())
						i, err := opts.MemoryStorage.FirstIndex() // get the first index in memory
						Expect(err).NotTo(HaveOccurred())

						// expect storage to preserve SnapshotCatchUpEntries entries before snapshot
						Expect(i).To(Equal(snapshot.Metadata.Index - opts.SnapshotCatchUpEntries + 1))

						err = chain.Order(env, uint64(0))
						Expect(err).NotTo(HaveOccurred())
						Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(3))

						lasti, _ := opts.MemoryStorage.LastIndex()

						chain.Halt()

						raftMetadata.RaftIndex = m.RaftIndex
						c := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata)
						c.support.HeightReturns(5)
						c.support.BlockReturns(&common.Block{
							Header:   &common.BlockHeader{},
							Data:     &common.BlockData{Data: [][]byte{[]byte("foo")}},
							Metadata: &common.BlockMetadata{Metadata: make([][]byte, 4)},
						})

						c.init()
						c.Start()
						defer c.Halt()

						Eventually(c.opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(Equal(snapshot.Metadata.Index + 1))
						Eventually(c.opts.MemoryStorage.LastIndex, LongEventualTimeout).Should(Equal(lasti))

						// chain keeps functioning
						Eventually(func() bool {
							c.clock.Increment(interval)
							select {
							case <-c.observe:
								return true
							default:
								return false
							}
						}, LongEventualTimeout).Should(BeTrue())

						c.cutter.CutNext = true
						err = c.Order(env, uint64(0))
						Expect(err).NotTo(HaveOccurred())
						Eventually(c.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
					})

					When("local ledger is in sync with snapshot", func() {
						It("does not pull blocks and still respects snapshot interval", func() {
							// Scenario:
							// - snapshot is taken at block 2
							// - order one more envelope (block 3)
							// - reboot chain at block 2
							// - block 3 should be replayed from wal
							// - order another envelope to trigger snapshot, containing block 3 & 4
							// Assertions:
							// - block puller should NOT be called
							// - chain should keep functioning after reboot
							// - chain should respect snapshot interval to trigger next snapshot

							// check that the snapshot does exist
							Eventually(countFiles, LongEventualTimeout).Should(Equal(1))

							// order another envelope. this should not trigger snapshot
							err = chain.Order(env, uint64(0))
							Expect(err).NotTo(HaveOccurred())