Commit 9b8827b8 authored by Jay Guo, committed by yacovm

[FAB-11919] Towards etcdraft snapshotting 2/4



This CR enables the etcd/raft-based chain to take a snapshot every
`SnapInterval` blocks. Snapshots are stored in the WAL and on disk.
When a node restarts, it loads the latest snapshot and uses the Term
and Index stored in it to replay the WAL into memory storage.

Change-Id: Ib8c22173cd5b05b7a1b27b16e2003bdabbecf82a
Signed-off-by: Jay Guo <guojiannan1101@gmail.com>
parent 068b0e96
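In short, the snapshot trigger added to apply() further down reduces to the following condition. shouldSnapshot is a hypothetical helper written for illustration; it mirrors the diff's logic but is not code from the diff:

// shouldSnapshot mirrors the condition added to apply(): take a snapshot
// once at least snapInterval blocks have been written since the last one.
// A zero interval disables snapshotting; appliedb == 0 means no block was
// written in this round.
func shouldSnapshot(appliedb, lastSnapBlockNum, snapInterval uint64) bool {
	if snapInterval == 0 || appliedb == 0 {
		return false
	}
	return appliedb-lastSnapBlockNum >= snapInterval
}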
@@ -102,7 +102,7 @@
revision = "be9bd761db19d4fc551d40be908f02e00487511d"
[[projects]]
digest = "1:19fe0160ba5281f25062c4704df82bac66ac11fe6780f2752443405a79c462d0"
digest = "1:cffadb7be778996825cde9028332fa925f07f6c487467f7f8ae41e7472bdbfc4"
name = "github.com/coreos/etcd"
packages = [
"pkg/crc",
@@ -111,6 +111,8 @@
"pkg/pbutil",
"raft",
"raft/raftpb",
"snap",
"snap/snappb",
"wal",
"wal/walpb",
]
@@ -932,6 +934,7 @@
"github.com/Shopify/sarama/mocks",
"github.com/coreos/etcd/raft",
"github.com/coreos/etcd/raft/raftpb",
"github.com/coreos/etcd/snap",
"github.com/coreos/etcd/wal",
"github.com/coreos/etcd/wal/walpb",
"github.com/davecgh/go-spew/spew",
@@ -11,12 +11,14 @@ import (
"encoding/pem"
"fmt"
"reflect"
"sync"
"sync/atomic"
"time"
"code.cloudfoundry.org/clock"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/coreos/etcd/wal"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/configtx"
"github.com/hyperledger/fabric/common/flogging"
@@ -29,6 +31,11 @@ import (
"github.com/pkg/errors"
)
// DefaultSnapshotCatchUpEntries is the default number of entries
// to preserve in memory when a snapshot is taken. This is for
// slow followers to catch up.
const DefaultSnapshotCatchUpEntries = uint64(500)
//go:generate mockery -dir . -name Configurator -case underscore -output ./mocks/
// Configurator is used to configure the communication layer
@@ -45,6 +52,14 @@ type RPC interface {
SendSubmit(dest uint64, request *orderer.SubmitRequest) error
}
//go:generate counterfeiter -o mocks/mock_blockpuller.go . BlockPuller
// BlockPuller is used to pull blocks from other OSNs
type BlockPuller interface {
PullBlock(seq uint64) *common.Block
Close()
}
type block struct {
b *common.Block
@@ -60,7 +75,14 @@ type Options struct {
Clock clock.Clock
WALDir string
WALDir string
SnapDir string
SnapInterval uint64
// This is configurable mainly for testing purposes. Users are not
// expected to alter this. Instead, DefaultSnapshotCatchUpEntries is used.
SnapshotCatchUpEntries uint64
MemoryStorage MemoryStorage
Logger *flogging.FabricLogger
@@ -83,11 +105,12 @@ type Chain struct {
submitC chan *orderer.SubmitRequest
commitC chan block
observeC chan<- uint64 // Notifies external observer on leader change (passed in optionally as an argument for tests)
haltC chan struct{} // Signals to goroutines that the chain is halting
doneC chan struct{} // Closes when the chain halts
resignC chan struct{} // Notifies node that it is no longer the leader
startC chan struct{} // Closes when the node is started
observeC chan<- uint64 // Notifies external observer on leader change (passed in optionally as an argument for tests)
haltC chan struct{} // Signals to goroutines that the chain is halting
doneC chan struct{} // Closes when the chain halts
resignC chan struct{} // Notifies node that it is no longer the leader
startC chan struct{} // Closes when the node is started
snapC chan *raftpb.Snapshot // Signal to catch up with snapshot
clock clock.Clock // Tests can inject a fake clock
@@ -96,7 +119,14 @@ type Chain struct {
leader uint64
appliedIndex uint64
hasWAL bool // indicate if this is a fresh raft node
// needed by snapshotting
lastSnapBlockNum uint64
syncLock sync.Mutex // Protects the manipulation of syncC
syncC chan struct{} // Indicate sync in progress
confState raftpb.ConfState // Etcdraft requires ConfState to be persisted within snapshot
puller BlockPuller // Deliver client to pull blocks from other OSNs
fresh bool // indicate if this is a fresh raft node
node raft.Node
storage *RaftStorage
@@ -111,35 +141,55 @@ func NewChain(
opts Options,
conf Configurator,
rpc RPC,
puller BlockPuller,
observeC chan<- uint64) (*Chain, error) {
lg := opts.Logger.With("channel", support.ChainID(), "node", opts.RaftID)
applied := opts.RaftMetadata.RaftIndex
storage, hasWAL, err := Restore(lg, applied, opts.WALDir, opts.MemoryStorage)
fresh := !wal.Exist(opts.WALDir)
appliedi := opts.RaftMetadata.RaftIndex
storage, err := CreateStorage(lg, appliedi, opts.WALDir, opts.SnapDir, opts.MemoryStorage)
if err != nil {
return nil, errors.Errorf("failed to restore persisted raft data: %s", err)
}
if opts.SnapshotCatchUpEntries == 0 {
storage.SnapshotCatchUpEntries = DefaultSnapshotCatchUpEntries
} else {
storage.SnapshotCatchUpEntries = opts.SnapshotCatchUpEntries
}
// get block number in last snapshot, if it exists
var snapBlkNum uint64
if s := storage.Snapshot(); !raft.IsEmptySnap(s) {
b := utils.UnmarshalBlockOrPanic(s.Data)
snapBlkNum = b.Header.Number
}
return &Chain{
configurator: conf,
rpc: rpc,
channelID: support.ChainID(),
raftID: opts.RaftID,
submitC: make(chan *orderer.SubmitRequest),
commitC: make(chan block),
haltC: make(chan struct{}),
doneC: make(chan struct{}),
resignC: make(chan struct{}),
startC: make(chan struct{}),
observeC: observeC,
support: support,
hasWAL: hasWAL,
appliedIndex: applied,
clock: opts.Clock,
logger: lg,
storage: storage,
opts: opts,
configurator: conf,
rpc: rpc,
channelID: support.ChainID(),
raftID: opts.RaftID,
submitC: make(chan *orderer.SubmitRequest),
commitC: make(chan block),
haltC: make(chan struct{}),
doneC: make(chan struct{}),
resignC: make(chan struct{}),
startC: make(chan struct{}),
syncC: make(chan struct{}),
snapC: make(chan *raftpb.Snapshot),
observeC: observeC,
support: support,
fresh: fresh,
appliedIndex: appliedi,
lastSnapBlockNum: snapBlkNum,
puller: puller,
clock: opts.Clock,
logger: lg,
storage: storage,
opts: opts,
}, nil
}
@@ -168,7 +218,7 @@ func (c *Chain) Start() {
raftPeers := RaftPeers(c.opts.RaftMetadata.Consenters)
if !c.hasWAL {
if c.fresh {
c.logger.Infof("starting new raft node %d", c.raftID)
c.node = raft.StartNode(config, raftPeers)
} else {
@@ -237,8 +287,25 @@ func (c *Chain) checkConfigUpdateValidity(ctx *common.Envelope) error {
}
}
// WaitReady is currently a no-op.
// WaitReady blocks when the chain:
// - is catching up with other nodes using a snapshot
//
// In any other case, it returns right away.
func (c *Chain) WaitReady() error {
if err := c.isRunning(); err != nil {
return err
}
c.syncLock.Lock()
ch := c.syncC
c.syncLock.Unlock()
select {
case <-ch:
case <-c.doneC:
return errors.Errorf("chain is stopped")
}
return nil
}
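For context, a hypothetical caller sketch of the new WaitReady semantics: the broadcast phase would block here until any snapshot catch-up finishes before handing an envelope to the chain. Order is the consensus.Chain method invoked next; neither it nor this caller is part of the diff:

// Block until snapshot catch-up (if any) completes; WaitReady returns
// an error once the chain has been stopped.
if err := chain.WaitReady(); err != nil {
	return err
}
return chain.Order(env, configSeq)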
@@ -351,8 +418,16 @@ func (c *Chain) serveRequest() {
ticking = false
}
for {
if s := c.storage.Snapshot(); !raft.IsEmptySnap(s) {
if err := c.catchUp(&s); err != nil {
c.logger.Errorf("Failed to recover from snapshot taken at Term %d and Index %d: %s",
s.Metadata.Term, s.Metadata.Index, err)
}
} else {
close(c.syncC)
}
for {
select {
case msg := <-c.submitC:
batches, pending, err := c.ordered(msg)
@@ -390,6 +465,12 @@
c.logger.Errorf("Failed to commit block: %s", err)
}
case sn := <-c.snapC:
if err := c.catchUp(sn); err != nil {
c.logger.Errorf("Failed to recover from snapshot taken at Term %d and Index %d: %s",
sn.Metadata.Term, sn.Metadata.Index, err)
}
case <-c.doneC:
c.logger.Infof("Stop serving requests")
return
@@ -467,6 +548,47 @@ func (c *Chain) commitBatches(batches ...[]*common.Envelope) error {
return nil
}
func (c *Chain) catchUp(snap *raftpb.Snapshot) error {
b, err := utils.UnmarshalBlock(snap.Data)
if err != nil {
return errors.Errorf("failed to unmarshal snapshot data to block: %s", err)
}
c.logger.Infof("Catching up with snapshot taken at block %d", b.Header.Number)
next := c.support.Height()
if next > b.Header.Number {
c.logger.Warnf("Snapshot is at block %d, local block number is %d, no sync needed", b.Header.Number, next-1)
return nil
}
c.syncLock.Lock()
c.syncC = make(chan struct{})
c.syncLock.Unlock()
defer func() {
close(c.syncC)
c.puller.Close()
}()
for next <= b.Header.Number {
block := c.puller.PullBlock(next)
if block == nil {
return errors.Errorf("failed to fetch block %d from cluster", next)
}
if utils.IsConfigBlock(block) {
c.support.WriteConfigBlock(block, nil)
} else {
c.support.WriteBlock(block, nil)
}
next++
}
c.logger.Infof("Finished syncing with cluster up to block %d (incl.)", b.Header.Number)
return nil
}
func (c *Chain) serveRaft() {
ticker := c.clock.NewTicker(c.opts.TickInterval)
@@ -476,11 +598,24 @@ func (c *Chain) serveRaft() {
c.node.Tick()
case rd := <-c.node.Ready():
if err := c.storage.Store(rd.Entries, rd.HardState); err != nil {
if err := c.storage.Store(rd.Entries, rd.HardState, rd.Snapshot); err != nil {
c.logger.Panicf("Failed to persist etcd/raft data: %s", err)
}
if !raft.IsEmptySnap(rd.Snapshot) {
c.snapC <- &rd.Snapshot
b := utils.UnmarshalBlockOrPanic(rd.Snapshot.Data)
c.lastSnapBlockNum = b.Header.Number
c.confState = rd.Snapshot.Metadata.ConfState
c.appliedIndex = rd.Snapshot.Metadata.Index
}
c.apply(rd.CommittedEntries)
c.node.Advance()
// TODO(jay_guo) the leader can write to disk in parallel with replicating
// to the followers, who then write to their own disks. See section 10.2.1 of the raft thesis.
c.send(rd.Messages)
if rd.SoftState != nil {
@@ -522,6 +657,8 @@ func (c *Chain) apply(ents []raftpb.Entry) {
c.logger.Panicf("first index of committed entry[%d] should <= appliedIndex[%d]+1", ents[0].Index, c.appliedIndex)
}
var appliedb uint64
var position int
for i := range ents {
switch ents[i].Type {
case raftpb.EntryNormal:
@@ -531,7 +668,10 @@ func (c *Chain) apply(ents []raftpb.Entry) {
break
}
c.commitC <- block{utils.UnmarshalBlockOrPanic(ents[i].Data), ents[i].Index}
b := block{utils.UnmarshalBlockOrPanic(ents[i].Data), ents[i].Index}
c.commitC <- b
appliedb = b.b.Header.Number
position = i
case raftpb.EntryConfChange:
var cc raftpb.ConfChange
@@ -540,13 +680,28 @@ func (c *Chain) apply(ents []raftpb.Entry) {
continue
}
c.node.ApplyConfChange(cc)
c.confState = *c.node.ApplyConfChange(cc)
}
if ents[i].Index > c.appliedIndex {
c.appliedIndex = ents[i].Index
}
}
if c.opts.SnapInterval == 0 || appliedb == 0 {
// snapshot is not enabled (SnapInterval == 0) or
// no block has been written (appliedb == 0) in this round
return
}
if appliedb-c.lastSnapBlockNum >= c.opts.SnapInterval {
c.logger.Infof("Taking snapshot at block %d, last snapshotted block number is %d", appliedb, c.lastSnapBlockNum)
if err := c.storage.TakeSnapshot(c.appliedIndex, &c.confState, ents[position].Data); err != nil {
c.logger.Fatalf("Failed to create snapshot at index %d", c.appliedIndex)
}
c.lastSnapBlockNum = appliedb
}
}
func (c *Chain) send(msgs []raftpb.Message) {
@@ -555,11 +710,19 @@ func (c *Chain) send(msgs []raftpb.Message) {
continue
}
status := raft.SnapshotFinish
msgBytes := utils.MarshalOrPanic(&msg)
_, err := c.rpc.Step(msg.To, &orderer.StepRequest{Channel: c.support.ChainID(), Payload: msgBytes})
if err != nil {
// TODO We should call ReportUnreachable if message delivery fails
c.logger.Errorf("Failed to send StepRequest to %d, because: %s", msg.To, err)
status = raft.SnapshotFailure
}
if msg.Type == raftpb.MsgSnap {
c.node.ReportSnapshot(msg.To, status)
}
}
}
This diff is collapsed.
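The RaftStorage changes themselves live in the collapsed file above. As a rough sketch of how SnapshotCatchUpEntries is presumably honored there (an assumption about the collapsed code, not a quote of it): when a snapshot is taken at appliedIndex, the in-memory log is compacted, but the newest catchUpEntries entries are retained so slow followers can be served from memory rather than forced into a snapshot transfer.

// compactionIndex is an assumed helper: it returns the index up to which
// the in-memory raft log may be compacted after a snapshot, keeping the
// last catchUpEntries entries around for slow followers.
func compactionIndex(appliedIndex, catchUpEntries uint64) (uint64, bool) {
	if appliedIndex <= catchUpEntries {
		return 0, false // too few applied entries; skip compaction for now
	}
	return appliedIndex - catchUpEntries, true
}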
@@ -40,7 +40,8 @@ type ChainGetter interface {
// Config contains etcdraft configurations
type Config struct {
WALDir string // WAL data of <my-channel> is stored in WALDir/<my-channel>
WALDir string // WAL data of <my-channel> is stored in WALDir/<my-channel>
SnapDir string // Snapshots of <my-channel> are stored in SnapDir/<my-channel>
}
// Consenter implements etcdraft consenter
@@ -133,11 +134,13 @@ func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *co
MaxSizePerMsg: m.Options.MaxSizePerMsg,
RaftMetadata: raftMetadata,
WALDir: path.Join(c.Config.WALDir, support.ChainID()),
WALDir: path.Join(c.Config.WALDir, support.ChainID()),
SnapDir: path.Join(c.Config.SnapDir, support.ChainID()),
}
rpc := &cluster.RPC{Channel: support.ChainID(), Comm: c.Communication}
return NewChain(support, opts, c.Communication, rpc, nil)
return NewChain(support, opts, c.Communication, rpc, nil, nil)
}
func raftMetadata(blockMetadata *common.Metadata, configMetadata *etcdraft.Metadata) (*etcdraft.RaftMetadata, error) {
@@ -9,6 +9,7 @@ package etcdraft_test
import (
"io/ioutil"
"os"
"path"
"github.com/hyperledger/fabric/common/flogging"
mockconfig "github.com/hyperledger/fabric/common/mocks/config"
@@ -30,11 +31,23 @@ var _ = Describe("Consenter", func() {
var (
chainGetter *mocks.ChainGetter
support *consensusmocks.FakeConsenterSupport
dataDir string
snapDir string
walDir string
err error
)
BeforeEach(func() {
chainGetter = &mocks.ChainGetter{}
support = &consensusmocks.FakeConsenterSupport{}
dataDir, err = ioutil.TempDir("", "snap-")
Expect(err).NotTo(HaveOccurred())
walDir = path.Join(dataDir, "wal-")
snapDir = path.Join(dataDir, "snap-")
})
AfterEach(func() {
os.RemoveAll(dataDir)
})
When("the consenter is extracting the channel", func() {
@@ -117,12 +130,9 @@ var _ = Describe("Consenter", func() {
metadata := utils.MarshalOrPanic(m)
support.SharedConfigReturns(&mockconfig.Orderer{ConsensusMetadataVal: metadata})
dir, err := ioutil.TempDir("", "wal-")
Expect(err).NotTo(HaveOccurred())
defer os.RemoveAll(dir)
consenter := newConsenter(chainGetter)
consenter.Config.WALDir = dir
consenter.Config.WALDir = walDir
consenter.Config.SnapDir = snapDir
chain, err := consenter.HandleChain(support, nil)
Expect(err).NotTo(HaveOccurred())
@@ -174,7 +184,8 @@ var _ = Describe("Consenter", func() {
defer os.RemoveAll(dir)
consenter := newConsenter(chainGetter)
consenter.Config.WALDir = dir
consenter.Config.WALDir = walDir
consenter.Config.SnapDir = snapDir
d := &etcdraftproto.RaftMetadata{
Consenters: map[uint64]*etcdraftproto.Consenter{1: c},
// Code generated by counterfeiter. DO NOT EDIT.
package mocks
import (
"sync"
"github.com/hyperledger/fabric/orderer/consensus/etcdraft"
"github.com/hyperledger/fabric/protos/common"
)
type FakeBlockPuller struct {
PullBlockStub func(seq uint64) *common.Block
pullBlockMutex sync.RWMutex
pullBlockArgsForCall []struct {
seq uint64
}
pullBlockReturns struct {
result1 *common.Block
}
pullBlockReturnsOnCall map[int]struct {
result1 *common.Block
}
CloseStub func()
closeMutex sync.RWMutex
closeArgsForCall []struct{}
invocations map[string][][]interface{}
invocationsMutex sync.RWMutex
}
func (fake *FakeBlockPuller) PullBlock(seq uint64) *common.Block {
fake.pullBlockMutex.Lock()
ret, specificReturn := fake.pullBlockReturnsOnCall[len(fake.pullBlockArgsForCall)]
fake.pullBlockArgsForCall = append(fake.pullBlockArgsForCall, struct {
seq uint64
}{seq})
fake.recordInvocation("PullBlock", []interface{}{seq})
fake.pullBlockMutex.Unlock()
if fake.PullBlockStub != nil {
return fake.PullBlockStub(seq)
}
if specificReturn {
return ret.result1
}
return fake.pullBlockReturns.result1
}
func (fake *FakeBlockPuller) PullBlockCallCount() int {
fake.pullBlockMutex.RLock()
defer fake.pullBlockMutex.RUnlock()
return len(fake.pullBlockArgsForCall)
}
func (fake *FakeBlockPuller) PullBlockArgsForCall(i int) uint64 {
fake.pullBlockMutex.RLock()
defer fake.pullBlockMutex.RUnlock()
return fake.pullBlockArgsForCall[i].seq
}
func (fake *FakeBlockPuller) PullBlockReturns(result1 *common.Block) {
fake.PullBlockStub = nil
fake.pullBlockReturns = struct {
result1 *common.Block
}{result1}
}
func (fake *FakeBlockPuller) PullBlockReturnsOnCall(i int, result1 *common.Block) {
fake.PullBlockStub = nil
if fake.pullBlockReturnsOnCall == nil {
fake.pullBlockReturnsOnCall = make(map[int]struct {
result1 *common.Block
})
}
fake.pullBlockReturnsOnCall[i] = struct {
result1 *common.Block
}{result1}
}
func (fake *FakeBlockPuller) Close() {
fake.closeMutex.Lock()
fake.closeArgsForCall = append(fake.closeArgsForCall, struct{}{})
fake.recordInvocation("Close", []interface{}{})
fake.closeMutex.Unlock()
if fake.CloseStub != nil {
fake.CloseStub()
}
}
func (fake *FakeBlockPuller) CloseCallCount() int {
fake.closeMutex.RLock()
defer fake.closeMutex.RUnlock()
return len(fake.closeArgsForCall)
}
func (fake *FakeBlockPuller) Invocations() map[string][][]interface{} {
fake.invocationsMutex.RLock()
defer fake.invocationsMutex.RUnlock()
fake.pullBlockMutex.RLock()
defer fake.pullBlockMutex.RUnlock()
fake.closeMutex.RLock()
defer fake.closeMutex.RUnlock()
copiedInvocations := map[string][][]interface{}{}
for key, value := range fake.invocations {
copiedInvocations[key] = value
}
return copiedInvocations
}
func (fake *FakeBlockPuller) recordInvocation(key string, args []interface{}) {
fake.invocationsMutex.Lock()
defer fake.invocationsMutex.Unlock()
if fake.invocations == nil {
fake.invocations = map[string][][]interface{}{}
}
if fake.invocations[key] == nil {
fake.invocations[key] = [][]interface{}{}
}
fake.invocations[key] = append(fake.invocations[key], args)
}
var _ etcdraft.BlockPuller = new(FakeBlockPuller)
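A minimal usage sketch for this generated fake in a test; the block construction and the NewChain wiring are illustrative assumptions, and only the fake's own fields and methods come from this file:

// Serve whatever block number the chain requests during catch-up.
puller := &mocks.FakeBlockPuller{}
puller.PullBlockStub = func(seq uint64) *common.Block {
	return &common.Block{Header: &common.BlockHeader{Number: seq}}
}
// Hand the fake to NewChain so catchUp() pulls blocks from it.
c, err := etcdraft.NewChain(support, opts, conf, rpc, puller, observeC)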
@@ -11,6 +11,7 @@ import (
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"