Commit 73f3e405 authored by Yacov Manevich, committed by Gerrit Code Review

Merge "FAB-14656 Respect snapshot interval when node restarts" into release-1.4

parents 21e02f31 ade6dc48
@@ -274,8 +274,14 @@ func NewChain(
migrationStatus: migration.NewStatusStepper(support.IsSystemChannel(), support.ChainID()), // Needed by consensus-type migration
}
// Sets initial values for metrics
c.Metrics.ClusterSize.Set(float64(len(c.opts.BlockMetadata.ConsenterIds)))
c.Metrics.IsLeader.Set(float64(0)) // all nodes start out as followers
c.Metrics.CommittedBlockNumber.Set(float64(c.lastBlock.Header.Number))
c.Metrics.SnapshotBlockNumber.Set(float64(c.lastSnapBlockNum))
// DO NOT use Applied option in config, see https://github.com/etcd-io/etcd/issues/10217
// We guard against replay of written blocks in `entriesToApply` instead.
// We guard against replay of written blocks with `appliedIndex` instead.
config := &raft.Config{
ID: c.raftID,
ElectionTick: c.opts.ElectionTick,
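The comment added in this hunk points at the replay guard used instead of etcd's `Applied` option. A minimal sketch of that guard (illustrative names, not the commit's exact code; the import path is the one Fabric 1.4 vendors):

```go
package sketch // illustrative only, not part of this commit

import "github.com/coreos/etcd/raft/raftpb"

// applyNormal shows the appliedIndex guard the new comment refers to: a normal
// entry whose Raft index is at or below the last applied index was already
// written before the restart, so it is skipped rather than re-applied.
// writeBlock is a stand-in for unmarshalling and committing the block.
func applyNormal(appliedIndex uint64, ents []raftpb.Entry, writeBlock func(data []byte, index uint64)) uint64 {
	for _, e := range ents {
		if e.Type != raftpb.EntryNormal || len(e.Data) == 0 {
			continue // skip conf changes and the empty entry appended after election
		}
		if e.Index <= appliedIndex {
			continue // already applied; writing it again would duplicate the block
		}
		writeBlock(e.Data, e.Index)
		appliedIndex = e.Index
	}
	return appliedIndex
}
```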
@@ -317,9 +323,6 @@ func (c *Chain) MigrationStatus() migration.Status {
func (c *Chain) Start() {
c.logger.Infof("Starting Raft node")
c.Metrics.ClusterSize.Set(float64(len(c.opts.BlockMetadata.ConsenterIds)))
// all nodes start out as followers
c.Metrics.IsLeader.Set(float64(0))
if err := c.configureComm(); err != nil {
c.logger.Errorf("Failed to start chain, aborting: +%v", err)
close(c.doneC)
@@ -831,7 +834,7 @@ func (c *Chain) writeBlock(block *common.Block, index uint64) {
}
c.lastBlock = block
c.logger.Debugf("Writing block %d to ledger", block.Header.Number)
c.logger.Infof("Writing block %d (Raft index: %d) to ledger", block.Header.Number, index)
if utils.IsConfigBlock(block) {
c.writeConfigBlock(block, index)
@@ -894,7 +897,7 @@ func (c *Chain) ordered(msg *orderer.SubmitRequest) (batches [][]*common.Envelop
func (c *Chain) propose(ch chan<- *common.Block, bc *blockCreator, batches ...[]*common.Envelope) {
for _, batch := range batches {
b := bc.createNextBlock(batch)
c.logger.Debugf("Created block %d, there are %d blocks in flight", b.Header.Number, c.blockInflight)
c.logger.Infof("Created block %d, there are %d blocks in flight", b.Header.Number, c.blockInflight)
select {
case ch <- b:
@@ -1006,7 +1009,6 @@ func (c *Chain) apply(ents []raftpb.Entry) {
c.logger.Panicf("first index of committed entry[%d] should <= appliedIndex[%d]+1", ents[0].Index, c.appliedIndex)
}
var appliedb uint64
var position int
for i := range ents {
switch ents[i].Type {
@@ -1015,6 +1017,9 @@ func (c *Chain) apply(ents []raftpb.Entry) {
break
}
position = i
c.accDataSize += uint32(len(ents[i].Data))
// We need to strictly avoid re-applying normal entries,
// otherwise we are writing the same block twice.
if ents[i].Index <= c.appliedIndex {
@@ -1024,11 +1029,7 @@ func (c *Chain) apply(ents []raftpb.Entry) {
block := utils.UnmarshalBlockOrPanic(ents[i].Data)
c.writeBlock(block, ents[i].Index)
appliedb = block.Header.Number
c.Metrics.CommittedBlockNumber.Set(float64(appliedb))
position = i
c.accDataSize += uint32(len(ents[i].Data))
c.Metrics.CommittedBlockNumber.Set(float64(block.Header.Number))
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
@@ -1077,20 +1078,17 @@ func (c *Chain) apply(ents []raftpb.Entry) {
}
}
if appliedb == 0 {
// no block has been written (appliedb == 0) in this round
return
}
if c.accDataSize >= c.sizeLimit {
b := utils.UnmarshalBlockOrPanic(ents[position].Data)
select {
case c.gcC <- &gc{index: c.appliedIndex, state: c.confState, data: ents[position].Data}:
c.logger.Infof("Accumulated %d bytes since last snapshot, exceeding size limit (%d bytes), "+
"taking snapshot at block %d, last snapshotted block number is %d, nodes: %+v",
c.accDataSize, c.sizeLimit, appliedb, c.lastSnapBlockNum, c.confState.Nodes)
"taking snapshot at block %d (index: %d), last snapshotted block number is %d, current nodes: %+v",
c.accDataSize, c.sizeLimit, b.Header.Number, c.appliedIndex, c.lastSnapBlockNum, c.confState.Nodes)
c.accDataSize = 0
c.lastSnapBlockNum = appliedb
c.Metrics.SnapshotBlockNumber.Set(float64(appliedb))
c.lastSnapBlockNum = b.Header.Number
c.Metrics.SnapshotBlockNumber.Set(float64(b.Header.Number))
default:
c.logger.Warnf("Snapshotting is in progress, it is very likely that SnapshotIntervalSize is too small")
}
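This hunk replaces the removed `appliedb` bookkeeping with the block unmarshalled from the entry at `position`. A simplified sketch of the size-based trigger it implements (field and channel names are stand-ins, not the real `Chain` struct):

```go
// Simplified sketch of the size-based snapshot trigger shown above: once the
// bytes accumulated since the last snapshot reach SnapshotIntervalSize, a
// non-blocking send hands the work to the snapshotting goroutine.
type snapTrigger struct {
	accDataSize      uint32
	sizeLimit        uint32 // SnapshotIntervalSize
	lastSnapBlockNum uint64
	gcC              chan uint64 // carries the block number to snapshot at
}

func (s *snapTrigger) maybeSnapshot(blockNum uint64, entryData []byte) {
	s.accDataSize += uint32(len(entryData))
	if s.accDataSize < s.sizeLimit {
		return
	}
	select {
	case s.gcC <- blockNum:
		s.accDataSize = 0
		s.lastSnapBlockNum = blockNum
	default:
		// a snapshot is already in progress; keep accumulating and retry later
	}
}
```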
@@ -296,8 +296,8 @@ var _ = Describe("Chain", func() {
Expect(fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(1))
Expect(fakeFields.fakeNormalProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1)))
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
Expect(fakeFields.fakeCommittedBlockNumber.SetCallCount()).Should(Equal(1))
Expect(fakeFields.fakeCommittedBlockNumber.SetArgsForCall(0)).Should(Equal(float64(1)))
Expect(fakeFields.fakeCommittedBlockNumber.SetCallCount()).Should(Equal(2)) // incl. initial call
Expect(fakeFields.fakeCommittedBlockNumber.SetArgsForCall(1)).Should(Equal(float64(1)))
// There are three calls to DataPersistDuration by now corresponding to the following three
// arriving on the Ready channel:
@@ -321,8 +321,8 @@ var _ = Describe("Chain", func() {
clock.WaitForNWatchersAndIncrement(timeout, 2)
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
Expect(fakeFields.fakeCommittedBlockNumber.SetCallCount()).Should(Equal(2))
Expect(fakeFields.fakeCommittedBlockNumber.SetArgsForCall(1)).Should(Equal(float64(2)))
Expect(fakeFields.fakeCommittedBlockNumber.SetCallCount()).Should(Equal(3)) // incl. initial call
Expect(fakeFields.fakeCommittedBlockNumber.SetArgsForCall(2)).Should(Equal(float64(2)))
Expect(fakeFields.fakeDataPersistDuration.ObserveCallCount()).Should(Equal(4))
Expect(fakeFields.fakeDataPersistDuration.ObserveArgsForCall(3)).Should(Equal(float64(0)))
})
@@ -511,8 +511,8 @@ var _ = Describe("Chain", func() {
Expect(fakeFields.fakeConfigProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1)))
Eventually(support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
Consistently(support.WriteBlockCallCount).Should(Equal(0))
Expect(fakeFields.fakeCommittedBlockNumber.SetCallCount()).Should(Equal(1))
Expect(fakeFields.fakeCommittedBlockNumber.SetArgsForCall(0)).Should(Equal(float64(1)))
Expect(fakeFields.fakeCommittedBlockNumber.SetCallCount()).Should(Equal(2)) // incl. initial call
Expect(fakeFields.fakeCommittedBlockNumber.SetArgsForCall(1)).Should(Equal(float64(1)))
})
})
@@ -538,8 +538,8 @@ var _ = Describe("Chain", func() {
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
Eventually(support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
Expect(fakeFields.fakeCommittedBlockNumber.SetCallCount()).Should(Equal(2))
Expect(fakeFields.fakeCommittedBlockNumber.SetArgsForCall(1)).Should(Equal(float64(2)))
Expect(fakeFields.fakeCommittedBlockNumber.SetCallCount()).Should(Equal(3)) // incl. initial call
Expect(fakeFields.fakeCommittedBlockNumber.SetArgsForCall(2)).Should(Equal(float64(2)))
})
})
})
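The `// incl. initial call` adjustments in these expectations follow from the first hunk: CommittedBlockNumber and SnapshotBlockNumber are now set once in NewChain, so every later Set lands one call index higher. A minimal fake gauge (illustrative, not the generated counterfeiter fake these tests use) makes the shift easy to see:

```go
// fakeGauge records every Set call, mirroring how the test fakes count calls.
// After NewChain's initial Set of the last block number, the first block
// written by the test shows up at SetArgsForCall(1) instead of (0).
type fakeGauge struct {
	values []float64
}

func (g *fakeGauge) Set(v float64)                { g.values = append(g.values, v) }
func (g *fakeGauge) SetCallCount() int            { return len(g.values) }
func (g *fakeGauge) SetArgsForCall(i int) float64 { return g.values[i] }
```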
@@ -908,10 +908,10 @@ var _ = Describe("Chain", func() {
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
Eventually(countFiles, LongEventualTimeout).Should(Equal(1))
Eventually(opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(BeNumerically(">", i))
Expect(fakeFields.fakeSnapshotBlockNumber.SetCallCount()).To(Equal(1))
Expect(fakeFields.fakeSnapshotBlockNumber.SetCallCount()).To(Equal(2)) // incl. initial call
s, _ := opts.MemoryStorage.Snapshot()
b := utils.UnmarshalBlockOrPanic(s.Data)
Expect(fakeFields.fakeSnapshotBlockNumber.SetArgsForCall(0)).To(Equal(float64(b.Header.Number)))
Expect(fakeFields.fakeSnapshotBlockNumber.SetArgsForCall(1)).To(Equal(float64(b.Header.Number)))
i, _ = opts.MemoryStorage.FirstIndex()
@@ -920,10 +920,10 @@ var _ = Describe("Chain", func() {
Eventually(countFiles, LongEventualTimeout).Should(Equal(2))
Eventually(opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(BeNumerically(">", i))
Expect(fakeFields.fakeSnapshotBlockNumber.SetCallCount()).To(Equal(2))
Expect(fakeFields.fakeSnapshotBlockNumber.SetCallCount()).To(Equal(3)) // incl. initial call
s, _ = opts.MemoryStorage.Snapshot()
b = utils.UnmarshalBlockOrPanic(s.Data)
Expect(fakeFields.fakeSnapshotBlockNumber.SetArgsForCall(1)).To(Equal(float64(b.Header.Number)))
Expect(fakeFields.fakeSnapshotBlockNumber.SetArgsForCall(2)).To(Equal(float64(b.Header.Number)))
})
It("pauses chain if sync is in progress", func() {
@@ -1188,6 +1188,46 @@ var _ = Describe("Chain", func() {
Eventually(countFiles, LongEventualTimeout).Should(Equal(2))
})
})
It("respects snapshot interval after reboot", func() {
largeEnv := &common.Envelope{
Payload: marshalOrPanic(&common.Payload{
Header: &common.Header{ChannelHeader: marshalOrPanic(&common.ChannelHeader{Type: int32(common.HeaderType_MESSAGE), ChannelId: channelID})},
Data: make([]byte, 500),
}),
}
Expect(chain.Order(largeEnv, uint64(0))).To(Succeed())
Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
// check no snapshot is taken
Consistently(countFiles).Should(Equal(0))
_, metadata := support.WriteBlockArgsForCall(0)
m := &raftprotos.BlockMetadata{}
proto.Unmarshal(metadata, m)
chain.Halt()
raftMetadata.RaftIndex = m.RaftIndex
c1 := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters)
cnt := support.WriteBlockCallCount()
for i := 0; i < cnt; i++ {
c1.support.WriteBlock(support.WriteBlockArgsForCall(i))
}
c1.cutter.CutNext = true
c1.opts.SnapshotIntervalSize = 1024
By("Restarting chain")
c1.init()
c1.Start()
// chain keeps functioning
campaign(c1.Chain, c1.observe)
Expect(c1.Order(largeEnv, uint64(0))).To(Succeed())
Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
// check snapshot does exist
Eventually(countFiles, LongEventualTimeout).Should(Equal(1))
})
})
})
})
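The new test above exercises the behaviour named in the commit title: bytes ordered before a restart must still count toward the snapshot interval once the chain comes back. The restart-side accounting itself is not part of the hunks shown here; a purely hypothetical sketch of the idea is:

```go
package sketch // hypothetical, not code from this commit

import "github.com/coreos/etcd/raft/raftpb"

// restoreAccumulatedSize re-counts the bytes of every Raft entry written since
// the last snapshot, so that SnapshotIntervalSize is measured across a restart
// rather than silently reset to zero when the process comes back up.
func restoreAccumulatedSize(entriesSinceSnapshot []raftpb.Entry) uint32 {
	var acc uint32
	for _, e := range entriesSinceSnapshot {
		acc += uint32(len(e.Data))
	}
	return acc
}
```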
@@ -269,6 +269,8 @@ func (rs *RaftStorage) Store(entries []raftpb.Entry, hardstate raftpb.HardState,
}
func (rs *RaftStorage) saveSnap(snap raftpb.Snapshot) error {
rs.lg.Infof("Persisting snapshot (term: %d, index: %d) to WAL and disk", snap.Metadata.Term, snap.Metadata.Index)
// must save the snapshot index to the WAL before saving the
// snapshot to maintain the invariant that we only Open the
// wal at previously-saved snapshot indexes.
@@ -277,12 +279,10 @@ func (rs *RaftStorage) saveSnap(snap raftpb.Snapshot) error {
Term: snap.Metadata.Term,
}
rs.lg.Debugf("Saving snapshot to WAL")
if err := rs.wal.SaveSnapshot(walsnap); err != nil {
return errors.Errorf("failed to save snapshot to WAL: %s", err)
}
rs.lg.Debugf("Saving snapshot to disk")
if err := rs.snap.SaveSnap(snap); err != nil {
return errors.Errorf("failed to save snapshot to disk: %s", err)
}
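The comment in this hunk states the ordering invariant: the snapshot's term and index go into the WAL before the snapshot itself is persisted, so the WAL is only ever opened at an index for which a snapshot was actually saved. A condensed sketch of that ordering, assuming the etcd packages as vendored by Fabric 1.4 (import paths differ under go.etcd.io):

```go
package sketch // condensed illustration, not the file's actual contents

import (
	"github.com/coreos/etcd/raft/raftpb"
	"github.com/coreos/etcd/snap"
	"github.com/coreos/etcd/wal"
	"github.com/coreos/etcd/wal/walpb"
)

// saveSnapSketch keeps the ordering described above: record the snapshot's
// term and index in the WAL first, then persist the snapshot itself. If the
// WAL write fails, no snapshot file is written, so the WAL is never opened at
// an index that was not previously saved.
func saveSnapSketch(w *wal.WAL, ss *snap.Snapshotter, s raftpb.Snapshot) error {
	walsnap := walpb.Snapshot{
		Index: s.Metadata.Index,
		Term:  s.Metadata.Term,
	}
	if err := w.SaveSnapshot(walsnap); err != nil {
		return err
	}
	return ss.SaveSnap(s)
}
```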