Commit ade6dc48 authored by Jay Guo

FAB-14656 Respect snapshot interval when node restarts



When a node is restarted, it reloads the WAL data written since the
last snapshot into memory. However, this chunk of data is currently
not counted toward the accumulated data size that is compared against
the snapshot interval, so a restarted node may defer its next snapshot
far beyond SnapshotIntervalSize.

This CR fixes that. It also adjusts some log levels to improve
serviceability.

FAB-14656 #done

Change-Id: If152071e64fd8268d20362c593d24af4ab2be355
Signed-off-by: Jay Guo <guojiannan1101@gmail.com>
parent 7e440c73
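
The heart of the visible fix is an ordering change in apply(): entry sizes are now accumulated before the replay guard, so entries re-delivered from the WAL after a restart still count toward accDataSize even though their blocks are not rewritten. A minimal sketch of that ordering, using the field names from this diff (applyNormal is a hypothetical helper name, not a real Fabric function):

func (c *Chain) applyNormal(ents []raftpb.Entry) {
	for i := range ents {
		if ents[i].Type != raftpb.EntryNormal || len(ents[i].Data) == 0 {
			continue
		}

		// Count every entry toward the snapshot interval first, so data
		// replayed from WAL after a restart is included in accDataSize.
		c.accDataSize += uint32(len(ents[i].Data))

		// Replay guard: entries at or below appliedIndex were already
		// written as blocks before the restart; skip the write but keep
		// the size accounted above.
		if ents[i].Index <= c.appliedIndex {
			continue
		}

		block := utils.UnmarshalBlockOrPanic(ents[i].Data)
		c.writeBlock(block, ents[i].Index)
	}
}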
@@ -274,8 +274,14 @@ func NewChain(
 		migrationStatus: migration.NewStatusStepper(support.IsSystemChannel(), support.ChainID()), // Needed by consensus-type migration
 	}
 
+	// Sets initial values for metrics
+	c.Metrics.ClusterSize.Set(float64(len(c.opts.BlockMetadata.ConsenterIds)))
+	c.Metrics.IsLeader.Set(float64(0)) // all nodes start out as followers
+	c.Metrics.CommittedBlockNumber.Set(float64(c.lastBlock.Header.Number))
+	c.Metrics.SnapshotBlockNumber.Set(float64(c.lastSnapBlockNum))
+
 	// DO NOT use Applied option in config, see https://github.com/etcd-io/etcd/issues/10217
-	// We guard against replay of written blocks in `entriesToApply` instead.
+	// We guard against replay of written blocks with `appliedIndex` instead.
 	config := &raft.Config{
 		ID:           c.raftID,
 		ElectionTick: c.opts.ElectionTick,
@@ -317,9 +323,6 @@ func (c *Chain) MigrationStatus() migration.Status {
 func (c *Chain) Start() {
 	c.logger.Infof("Starting Raft node")
 
-	c.Metrics.ClusterSize.Set(float64(len(c.opts.BlockMetadata.ConsenterIds)))
-	// all nodes start out as followers
-	c.Metrics.IsLeader.Set(float64(0))
 	if err := c.configureComm(); err != nil {
 		c.logger.Errorf("Failed to start chain, aborting: +%v", err)
 		close(c.doneC)
@@ -831,7 +834,7 @@ func (c *Chain) writeBlock(block *common.Block, index uint64) {
 	}
 
 	c.lastBlock = block
-	c.logger.Debugf("Writing block %d to ledger", block.Header.Number)
+	c.logger.Infof("Writing block %d (Raft index: %d) to ledger", block.Header.Number, index)
 
 	if utils.IsConfigBlock(block) {
 		c.writeConfigBlock(block, index)
@@ -894,7 +897,7 @@ func (c *Chain) ordered(msg *orderer.SubmitRequest) (batches [][]*common.Envelope, pending bool, err error) {
 func (c *Chain) propose(ch chan<- *common.Block, bc *blockCreator, batches ...[]*common.Envelope) {
 	for _, batch := range batches {
 		b := bc.createNextBlock(batch)
-		c.logger.Debugf("Created block %d, there are %d blocks in flight", b.Header.Number, c.blockInflight)
+		c.logger.Infof("Created block %d, there are %d blocks in flight", b.Header.Number, c.blockInflight)
 
 		select {
 		case ch <- b:
@@ -1006,7 +1009,6 @@ func (c *Chain) apply(ents []raftpb.Entry) {
 		c.logger.Panicf("first index of committed entry[%d] should <= appliedIndex[%d]+1", ents[0].Index, c.appliedIndex)
 	}
 
-	var appliedb uint64
 	var position int
 	for i := range ents {
 		switch ents[i].Type {
@@ -1015,6 +1017,9 @@ func (c *Chain) apply(ents []raftpb.Entry) {
 				break
 			}
 
+			position = i
+			c.accDataSize += uint32(len(ents[i].Data))
+
 			// We need to strictly avoid re-applying normal entries,
 			// otherwise we are writing the same block twice.
 			if ents[i].Index <= c.appliedIndex {
@@ -1024,11 +1029,7 @@ func (c *Chain) apply(ents []raftpb.Entry) {
 
 			block := utils.UnmarshalBlockOrPanic(ents[i].Data)
 			c.writeBlock(block, ents[i].Index)
-
-			appliedb = block.Header.Number
-			c.Metrics.CommittedBlockNumber.Set(float64(appliedb))
-			position = i
-			c.accDataSize += uint32(len(ents[i].Data))
+			c.Metrics.CommittedBlockNumber.Set(float64(block.Header.Number))
 
 		case raftpb.EntryConfChange:
 			var cc raftpb.ConfChange
@@ -1077,20 +1078,17 @@ func (c *Chain) apply(ents []raftpb.Entry) {
 		}
 	}
 
-	if appliedb == 0 {
-		// no block has been written (appliedb == 0) in this round
-		return
-	}
-
 	if c.accDataSize >= c.sizeLimit {
+		b := utils.UnmarshalBlockOrPanic(ents[position].Data)
+
 		select {
 		case c.gcC <- &gc{index: c.appliedIndex, state: c.confState, data: ents[position].Data}:
 			c.logger.Infof("Accumulated %d bytes since last snapshot, exceeding size limit (%d bytes), "+
-				"taking snapshot at block %d, last snapshotted block number is %d, nodes: %+v",
-				c.accDataSize, c.sizeLimit, appliedb, c.lastSnapBlockNum, c.confState.Nodes)
+				"taking snapshot at block %d (index: %d), last snapshotted block number is %d, current nodes: %+v",
+				c.accDataSize, c.sizeLimit, b.Header.Number, c.appliedIndex, c.lastSnapBlockNum, c.confState.Nodes)
 			c.accDataSize = 0
-			c.lastSnapBlockNum = appliedb
-			c.Metrics.SnapshotBlockNumber.Set(float64(appliedb))
+			c.lastSnapBlockNum = b.Header.Number
+			c.Metrics.SnapshotBlockNumber.Set(float64(b.Header.Number))
 		default:
 			c.logger.Warnf("Snapshotting is in progress, it is very likely that SnapshotIntervalSize is too small")
 		}
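
This accounting matters on restart because etcd/raft re-delivers committed entries since the last snapshot through its Ready channel, and they pass through apply() again. A hedged sketch of that pattern (Node.Ready and Node.Advance are the standard etcd/raft API; the run function itself is illustrative, not Fabric's actual serve loop):

func run(n raft.Node, c *Chain, done <-chan struct{}) {
	for {
		select {
		case rd := <-n.Ready():
			// Persisting rd.Entries and rd.HardState to WAL is elided.
			// After a restart, rd.CommittedEntries includes entries that
			// were already written as blocks; apply() skips rewriting
			// them but still counts their sizes toward accDataSize.
			c.apply(rd.CommittedEntries)
			n.Advance()
		case <-done:
			return
		}
	}
}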
@@ -296,8 +296,8 @@ var _ = Describe("Chain", func() {
 				Expect(fakeFields.fakeNormalProposalsReceived.AddCallCount()).To(Equal(1))
 				Expect(fakeFields.fakeNormalProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1)))
 				Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
-				Expect(fakeFields.fakeCommittedBlockNumber.SetCallCount()).Should(Equal(1))
-				Expect(fakeFields.fakeCommittedBlockNumber.SetArgsForCall(0)).Should(Equal(float64(1)))
+				Expect(fakeFields.fakeCommittedBlockNumber.SetCallCount()).Should(Equal(2)) // incl. initial call
+				Expect(fakeFields.fakeCommittedBlockNumber.SetArgsForCall(1)).Should(Equal(float64(1)))
 
 				// There are three calls to DataPersistDuration by now corresponding to the following three
 				// arriving on the Ready channel:
@@ -321,8 +321,8 @@ var _ = Describe("Chain", func() {
 					clock.WaitForNWatchersAndIncrement(timeout, 2)
 					Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
-					Expect(fakeFields.fakeCommittedBlockNumber.SetCallCount()).Should(Equal(2))
-					Expect(fakeFields.fakeCommittedBlockNumber.SetArgsForCall(1)).Should(Equal(float64(2)))
+					Expect(fakeFields.fakeCommittedBlockNumber.SetCallCount()).Should(Equal(3)) // incl. initial call
+					Expect(fakeFields.fakeCommittedBlockNumber.SetArgsForCall(2)).Should(Equal(float64(2)))
 					Expect(fakeFields.fakeDataPersistDuration.ObserveCallCount()).Should(Equal(4))
 					Expect(fakeFields.fakeDataPersistDuration.ObserveArgsForCall(3)).Should(Equal(float64(0)))
 				})
@@ -511,8 +511,8 @@ var _ = Describe("Chain", func() {
 					Expect(fakeFields.fakeConfigProposalsReceived.AddArgsForCall(0)).To(Equal(float64(1)))
 					Eventually(support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
 					Consistently(support.WriteBlockCallCount).Should(Equal(0))
-					Expect(fakeFields.fakeCommittedBlockNumber.SetCallCount()).Should(Equal(1))
-					Expect(fakeFields.fakeCommittedBlockNumber.SetArgsForCall(0)).Should(Equal(float64(1)))
+					Expect(fakeFields.fakeCommittedBlockNumber.SetCallCount()).Should(Equal(2)) // incl. initial call
+					Expect(fakeFields.fakeCommittedBlockNumber.SetArgsForCall(1)).Should(Equal(float64(1)))
 				})
 			})
@@ -538,8 +538,8 @@ var _ = Describe("Chain", func() {
 					Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
 					Eventually(support.WriteConfigBlockCallCount, LongEventualTimeout).Should(Equal(1))
-					Expect(fakeFields.fakeCommittedBlockNumber.SetCallCount()).Should(Equal(2))
-					Expect(fakeFields.fakeCommittedBlockNumber.SetArgsForCall(1)).Should(Equal(float64(2)))
+					Expect(fakeFields.fakeCommittedBlockNumber.SetCallCount()).Should(Equal(3)) // incl. initial call
+					Expect(fakeFields.fakeCommittedBlockNumber.SetArgsForCall(2)).Should(Equal(float64(2)))
 				})
 			})
 		})
@@ -908,10 +908,10 @@ var _ = Describe("Chain", func() {
 				Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
 				Eventually(countFiles, LongEventualTimeout).Should(Equal(1))
 				Eventually(opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(BeNumerically(">", i))
-				Expect(fakeFields.fakeSnapshotBlockNumber.SetCallCount()).To(Equal(1))
+				Expect(fakeFields.fakeSnapshotBlockNumber.SetCallCount()).To(Equal(2)) // incl. initial call
 				s, _ := opts.MemoryStorage.Snapshot()
 				b := utils.UnmarshalBlockOrPanic(s.Data)
-				Expect(fakeFields.fakeSnapshotBlockNumber.SetArgsForCall(0)).To(Equal(float64(b.Header.Number)))
+				Expect(fakeFields.fakeSnapshotBlockNumber.SetArgsForCall(1)).To(Equal(float64(b.Header.Number)))
 
 				i, _ = opts.MemoryStorage.FirstIndex()
@@ -920,10 +920,10 @@ var _ = Describe("Chain", func() {
 				Eventually(countFiles, LongEventualTimeout).Should(Equal(2))
 				Eventually(opts.MemoryStorage.FirstIndex, LongEventualTimeout).Should(BeNumerically(">", i))
-				Expect(fakeFields.fakeSnapshotBlockNumber.SetCallCount()).To(Equal(2))
+				Expect(fakeFields.fakeSnapshotBlockNumber.SetCallCount()).To(Equal(3)) // incl. initial call
 				s, _ = opts.MemoryStorage.Snapshot()
 				b = utils.UnmarshalBlockOrPanic(s.Data)
-				Expect(fakeFields.fakeSnapshotBlockNumber.SetArgsForCall(1)).To(Equal(float64(b.Header.Number)))
+				Expect(fakeFields.fakeSnapshotBlockNumber.SetArgsForCall(2)).To(Equal(float64(b.Header.Number)))
 			})
 
 			It("pauses chain if sync is in progress", func() {
@@ -1188,6 +1188,46 @@ var _ = Describe("Chain", func() {
 					Eventually(countFiles, LongEventualTimeout).Should(Equal(2))
 				})
 			})
+
+			It("respects snapshot interval after reboot", func() {
+				largeEnv := &common.Envelope{
+					Payload: marshalOrPanic(&common.Payload{
+						Header: &common.Header{ChannelHeader: marshalOrPanic(&common.ChannelHeader{Type: int32(common.HeaderType_MESSAGE), ChannelId: channelID})},
+						Data:   make([]byte, 500),
+					}),
+				}
+
+				Expect(chain.Order(largeEnv, uint64(0))).To(Succeed())
+				Eventually(support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(1))
+				// check no snapshot is taken
+				Consistently(countFiles).Should(Equal(0))
+
+				_, metadata := support.WriteBlockArgsForCall(0)
+				m := &raftprotos.BlockMetadata{}
+				proto.Unmarshal(metadata, m)
+
+				chain.Halt()
+				raftMetadata.RaftIndex = m.RaftIndex
+				c1 := newChain(10*time.Second, channelID, dataDir, 1, raftMetadata, consenters)
+				cnt := support.WriteBlockCallCount()
+				for i := 0; i < cnt; i++ {
+					c1.support.WriteBlock(support.WriteBlockArgsForCall(i))
+				}
+
+				c1.cutter.CutNext = true
+				c1.opts.SnapshotIntervalSize = 1024
+
+				By("Restarting chain")
+				c1.init()
+				c1.Start()
+
+				// chain keeps functioning
+				campaign(c1.Chain, c1.observe)
+				Expect(c1.Order(largeEnv, uint64(0))).To(Succeed())
+				Eventually(c1.support.WriteBlockCallCount, LongEventualTimeout).Should(Equal(2))
+				// check snapshot does exist
+				Eventually(countFiles, LongEventualTimeout).Should(Equal(1))
+			})
 		})
 	})
 })
@@ -269,6 +269,8 @@ func (rs *RaftStorage) Store(entries []raftpb.Entry, hardstate raftpb.HardState, snapshot raftpb.Snapshot) error {
 }
 
 func (rs *RaftStorage) saveSnap(snap raftpb.Snapshot) error {
+	rs.lg.Infof("Persisting snapshot (term: %d, index: %d) to WAL and disk", snap.Metadata.Term, snap.Metadata.Index)
+
 	// must save the snapshot index to the WAL before saving the
 	// snapshot to maintain the invariant that we only Open the
 	// wal at previously-saved snapshot indexes.
@@ -277,12 +279,10 @@ func (rs *RaftStorage) saveSnap(snap raftpb.Snapshot) error {
 		Term:  snap.Metadata.Term,
 	}
 
-	rs.lg.Debugf("Saving snapshot to WAL")
 	if err := rs.wal.SaveSnapshot(walsnap); err != nil {
 		return errors.Errorf("failed to save snapshot to WAL: %s", err)
 	}
 
-	rs.lg.Debugf("Saving snapshot to disk")
 	if err := rs.snap.SaveSnap(snap); err != nil {
 		return errors.Errorf("failed to save snapshot to disk: %s", err)
 	}
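
The ordering inside saveSnap is deliberate: the snapshot index is recorded in the WAL before the snapshot file is written, so any snapshot file found on disk is guaranteed to have a matching WAL record, and the WAL can always be reopened at that index after a crash. Condensed from the code above (same calls, explanatory comments added):

// Step 1: record the snapshot index in the WAL. If we crash right after
// this, the extra WAL record is harmless because no snapshot file
// references it yet.
if err := rs.wal.SaveSnapshot(walsnap); err != nil {
	return errors.Errorf("failed to save snapshot to WAL: %s", err)
}

// Step 2: write the snapshot file to disk. Reversing the two steps could
// leave a snapshot file whose index the WAL has never recorded, so opening
// the WAL at that snapshot's index would fail on restart.
if err := rs.snap.SaveSnap(snap); err != nil {
	return errors.Errorf("failed to save snapshot to disk: %s", err)
}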