Unverified Commit 2ad9d9da authored by Jay Guo's avatar Jay Guo Committed by Artem Barger
Browse files

[FAB-13059] put raft snapshotting in go routine

This CR puts Raft snapshotting into a go routine to avoid
excessive snapshotting due to extreme small SnapshotInterval.

This is also preperation for WAL files pruning.

Change-Id: Ib43a2197c533bdc224a4bc52ff6cb418b62a0c33
Signed-off-by: default avatarJay Guo <guojiannan1101@gmail.com>
parent 8c445197
......@@ -90,6 +90,12 @@ type submit struct {
leader chan uint64
type gc struct {
index uint64
state raftpb.ConfState
data []byte
// Chain implements consensus.Chain interface.
type Chain struct {
configurator Configurator
......@@ -106,6 +112,7 @@ type Chain struct {
doneC chan struct{} // Closes when the chain halts
startC chan struct{} // Closes when the node is started
snapC chan *raftpb.Snapshot // Signal to catch up with snapshot
gcC chan *gc // Signal to take snapshot
errorCLock sync.RWMutex
errorC chan struct{} // returned by Errored()
......@@ -177,6 +184,7 @@ func NewChain(
startC: make(chan struct{}),
snapC: make(chan *raftpb.Snapshot),
errorC: make(chan struct{}),
gcC: make(chan *gc),
observeC: observeC,
support: support,
fresh: fresh,
......@@ -234,6 +242,7 @@ func (c *Chain) Start() {
go c.gc()
go c.serveRequest()
......@@ -770,14 +779,30 @@ func (c *Chain) apply(ents []raftpb.Entry) {
if appliedb-c.lastSnapBlockNum >= c.opts.SnapInterval {
c.logger.Infof("Taking snapshot at block %d, last snapshotted block number is %d", appliedb, c.lastSnapBlockNum)
c.node.takeSnapshot(c.appliedIndex, &c.confState, ents[position].Data)
c.lastSnapBlockNum = appliedb
select {
case c.gcC <- &gc{index: c.appliedIndex, state: c.confState, data: ents[position].Data}:
c.logger.Infof("Taking snapshot at block %d, last snapshotted block number is %d", appliedb, c.lastSnapBlockNum)
c.lastSnapBlockNum = appliedb
c.logger.Warnf("Snapshotting is in progress, it is very likely that SnapInterval is too small")
func (c *Chain) gc() {
for {
select {
case g := <-c.gcC:
c.node.takeSnapshot(g.index, g.state, g.data)
case <-c.doneC:
c.logger.Infof("Stop garbage collecting")
func (c *Chain) isConfig(env *common.Envelope) bool {
h, err := utils.ChannelHeader(env)
if err != nil {
......@@ -118,9 +118,9 @@ func (n *node) send(msgs []raftpb.Message) {
func (n *node) takeSnapshot(index uint64, cs *raftpb.ConfState, data []byte) {
func (n *node) takeSnapshot(index uint64, cs raftpb.ConfState, data []byte) {
if err := n.storage.TakeSnapshot(index, cs, data); err != nil {
n.logger.Panicf("Failed to create snapshot at index %d: %s", index, err)
n.logger.Errorf("Failed to create snapshot at index %d: %s", index, err)
......@@ -200,9 +200,9 @@ func (rs *RaftStorage) saveSnap(snap raftpb.Snapshot) error {
// TakeSnapshot takes a snapshot at index i from MemoryStorage, and persists it to wal and disk.
func (rs *RaftStorage) TakeSnapshot(i uint64, cs *raftpb.ConfState, data []byte) error {
func (rs *RaftStorage) TakeSnapshot(i uint64, cs raftpb.ConfState, data []byte) error {
rs.lg.Debugf("Creating snapshot at index %d from MemoryStorage", i)
snap, err := rs.ram.CreateSnapshot(i, cs, data)
snap, err := rs.ram.CreateSnapshot(i, &cs, data)
if err != nil {
return errors.Errorf("failed to create snapshot from MemoryStorage: %s", err)
......@@ -219,7 +219,7 @@ func (rs *RaftStorage) TakeSnapshot(i uint64, cs *raftpb.ConfState, data []byte)
if err == raft.ErrCompacted {
rs.lg.Warnf("Raft entries prior to %d are already purged", compacti)
} else {
rs.lg.Fatalf("Failed to purg raft entries: %s", err)
rs.lg.Fatalf("Failed to purge raft entries: %s", err)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment