Unverified Commit ae55c51e authored by Jay Guo's avatar Jay Guo Committed by Artem Barger
Browse files

[FAB-13178] Move raft logic to its own file



This CR makes raft-facing logic more contained by encapsulating
it in a separate object, in a new file. This CR also simplifies
the implementation by removing some unnecessary locking and channels

Change-Id: Ibefe74a35f0645955cbb3885efc8a3f8aaa0a8ca
Signed-off-by: default avatarJay Guo <guojiannan1101@gmail.com>
parent 06ce7476
......@@ -101,7 +101,6 @@ type Chain struct {
observeC chan<- uint64 // Notifies external observer on leader change (passed in optionally as an argument for tests)
haltC chan struct{} // Signals to goroutines that the chain is halting
doneC chan struct{} // Closes when the chain halts
resignC chan struct{} // Notifies node that it is no longer the leader
startC chan struct{} // Closes when the node is started
snapC chan *raftpb.Snapshot // Signal to catch up with snapshot
......@@ -118,16 +117,13 @@ type Chain struct {
// needed by snapshotting
lastSnapBlockNum uint64
syncLock sync.Mutex // Protects the manipulation of syncC
syncC chan struct{} // Indicate sync in progress
confState raftpb.ConfState // Etcdraft requires ConfState to be persisted within snapshot
puller BlockPuller // Deliver client to pull blocks from other OSNs
fresh bool // indicate if this is a fresh raft node
node raft.Node
storage *RaftStorage
opts Options
node *node
opts Options
logger *flogging.FabricLogger
}
......@@ -164,7 +160,7 @@ func NewChain(
lastBlock := support.Block(support.Height() - 1)
return &Chain{
c := &Chain{
configurator: conf,
rpc: rpc,
channelID: support.ChainID(),
......@@ -173,9 +169,7 @@ func NewChain(
applyC: make(chan apply),
haltC: make(chan struct{}),
doneC: make(chan struct{}),
resignC: make(chan struct{}),
startC: make(chan struct{}),
syncC: make(chan struct{}),
snapC: make(chan *raftpb.Snapshot),
observeC: observeC,
support: support,
......@@ -186,14 +180,8 @@ func NewChain(
puller: puller,
clock: opts.Clock,
logger: lg,
storage: storage,
opts: opts,
}, nil
}
// Start instructs the orderer to begin serving the chain and keep it current.
func (c *Chain) Start() {
c.logger.Infof("Starting Raft node")
}
// DO NOT use Applied option in config, see https://github.com/etcd-io/etcd/issues/10217
// We guard against replay of written blocks in `entriesToApply` instead.
......@@ -211,30 +199,34 @@ func (c *Chain) Start() {
DisableProposalForwarding: true, // This prevents blocks from being accidentally proposed by followers
}
c.node = &node{
chainID: c.channelID,
chain: c,
logger: c.logger,
storage: storage,
rpc: c.rpc,
config: config,
tickInterval: c.opts.TickInterval,
clock: c.clock,
metadata: c.opts.RaftMetadata,
}
return c, nil
}
// Start instructs the orderer to begin serving the chain and keep it current.
func (c *Chain) Start() {
c.logger.Infof("Starting Raft node")
if err := c.configureComm(); err != nil {
c.logger.Errorf("Failed to start chain, aborting: +%v", err)
close(c.doneC)
return
}
raftPeers := RaftPeers(c.opts.RaftMetadata.Consenters)
if c.fresh {
if c.support.Height() > 1 {
raftPeers = nil
c.logger.Info("Starting raft node to join an existing channel")
} else {
c.logger.Info("Starting raft node as part of a new channel")
}
c.node = raft.StartNode(config, raftPeers)
} else {
c.logger.Info("Restarting raft node")
c.node = raft.RestartNode(config)
}
c.node.start(c.fresh, c.support.Height() > 1)
close(c.startC)
go c.serveRaft()
go c.serveRequest()
}
......@@ -294,12 +286,8 @@ func (c *Chain) WaitReady() error {
return err
}
c.syncLock.Lock()
ch := c.syncC
c.syncLock.Unlock()
select {
case <-ch:
case c.submitC <- nil:
case <-c.doneC:
return errors.Errorf("chain is stopped")
}
......@@ -394,8 +382,8 @@ func (c *Chain) Submit(req *orderer.SubmitRequest, sender uint64) error {
}
type apply struct {
entries []raftpb.Entry
justElected bool
entries []raftpb.Entry
soft *raft.SoftState
}
func (c *Chain) serveRequest() {
......@@ -423,20 +411,17 @@ func (c *Chain) serveRequest() {
ticking = false
}
if s := c.storage.Snapshot(); !raft.IsEmptySnap(s) {
if err := c.catchUp(&s); err != nil {
c.logger.Errorf("Failed to recover from snapshot taken at Term %d and Index %d: %s",
s.Metadata.Term, s.Metadata.Index, err)
}
} else {
close(c.syncC)
}
var submitC chan *orderer.SubmitRequest
var leader uint64
submitC := c.submitC
for {
select {
case msg := <-submitC:
if msg == nil {
// polled by `WaitReady`
continue
}
batches, pending, err := c.ordered(msg)
if err != nil {
c.logger.Errorf("Failed to order message: %s", err)
......@@ -453,8 +438,38 @@ func (c *Chain) serveRequest() {
}
case app := <-c.applyC:
var ready bool
if app.justElected {
var elected, ready bool
if app.soft != nil {
newLeader := atomic.LoadUint64(&app.soft.Lead) // etcdraft requires atomic access
oldLeader := atomic.SwapUint64(&c.leader, newLeader)
if newLeader != oldLeader {
c.logger.Infof("Raft leader changed: %d -> %d", oldLeader, newLeader)
// follower -> leader
if newLeader == c.raftID {
elected = true
}
// leader -> follower
if oldLeader == c.raftID {
_ = c.support.BlockCutter().Cut()
c.BlockCreator.resetCreatedBlocks()
stop()
submitC = c.submitC
}
leader = newLeader
// notify external observer
select {
case c.observeC <- leader:
default:
}
}
}
if elected {
// if there is unfinished ConfChange, we should resume the effort to propose it as
// new leader, and wait for it to be committed before start serving new requests.
if cc := c.getInFlightConfChange(); cc != nil {
......@@ -469,16 +484,11 @@ func (c *Chain) serveRequest() {
}
appliedConfig := c.apply(app.entries)
if appliedConfig || ready {
submitC = c.submitC // start accepting new envelopes
if (appliedConfig || ready) && c.raftID == leader {
c.logger.Infof("Start accepting requests as Raft leader")
submitC = c.submitC
}
case <-c.resignC:
_ = c.support.BlockCutter().Cut()
c.BlockCreator.resetCreatedBlocks()
stop()
submitC = nil
case <-timer.C():
ticking = false
......@@ -492,6 +502,11 @@ func (c *Chain) serveRequest() {
c.propose(batch) // we are certain this is normal block, no need to block
case sn := <-c.snapC:
if sn.Metadata.Index <= c.appliedIndex {
c.logger.Debugf("Skip snapshot taken at index %d, because it is behind current applied index %d", sn.Metadata.Index, c.appliedIndex)
break
}
b := utils.UnmarshalBlockOrPanic(sn.Data)
c.lastSnapBlockNum = b.Header.Number
c.confState = sn.Metadata.ConfState
......@@ -596,11 +611,7 @@ func (c *Chain) catchUp(snap *raftpb.Snapshot) error {
return nil
}
c.syncLock.Lock()
c.syncC = make(chan struct{})
c.syncLock.Unlock()
defer func() {
close(c.syncC)
c.puller.Close()
}()
......@@ -623,67 +634,6 @@ func (c *Chain) catchUp(snap *raftpb.Snapshot) error {
return nil
}
func (c *Chain) serveRaft() {
ticker := c.clock.NewTicker(c.opts.TickInterval)
for {
select {
case <-ticker.C():
c.node.Tick()
case rd := <-c.node.Ready():
if err := c.storage.Store(rd.Entries, rd.HardState, rd.Snapshot); err != nil {
c.logger.Panicf("Failed to persist etcd/raft data: %s", err)
}
if !raft.IsEmptySnap(rd.Snapshot) {
c.snapC <- &rd.Snapshot
}
var justElected bool
if rd.SoftState != nil {
newLead := atomic.LoadUint64(&rd.SoftState.Lead)
lead := atomic.LoadUint64(&c.leader)
if newLead != lead {
c.logger.Infof("Raft leader changed: %d -> %d", lead, newLead)
atomic.StoreUint64(&c.leader, newLead)
if lead == c.raftID {
c.resignC <- struct{}{}
}
if newLead == c.raftID {
justElected = true
}
// notify external observer
select {
case c.observeC <- newLead:
default:
}
}
}
c.applyC <- apply{rd.CommittedEntries, justElected}
c.node.Advance()
// TODO(jay_guo) leader can write to disk in parallel with replicating
// to the followers and them writing to their disks. Check 10.2.1 in thesis
c.send(rd.Messages)
case <-c.haltC:
ticker.Stop()
c.node.Stop()
c.storage.Close()
c.logger.Infof("Raft node stopped")
close(c.doneC) // close after all the artifacts are closed
return
}
}
}
// apply applies data to ledger, and return true when:
// - comitting a config block that does not add/remove nodes, or
// - applying a raft ConfChange that was introduced by previous config block
......@@ -757,39 +707,13 @@ func (c *Chain) apply(ents []raftpb.Entry) (unblocking bool) {
if appliedb-c.lastSnapBlockNum >= c.opts.SnapInterval {
c.logger.Infof("Taking snapshot at block %d, last snapshotted block number is %d", appliedb, c.lastSnapBlockNum)
if err := c.storage.TakeSnapshot(c.appliedIndex, &c.confState, ents[position].Data); err != nil {
c.logger.Fatalf("Failed to create snapshot at index %d", c.appliedIndex)
}
c.node.takeSnapshot(c.appliedIndex, &c.confState, ents[position].Data)
c.lastSnapBlockNum = appliedb
}
return
}
func (c *Chain) send(msgs []raftpb.Message) {
for _, msg := range msgs {
if msg.To == 0 {
continue
}
status := raft.SnapshotFinish
msgBytes := utils.MarshalOrPanic(&msg)
_, err := c.rpc.Step(msg.To, &orderer.StepRequest{Channel: c.support.ChainID(), Payload: msgBytes})
if err != nil {
// TODO We should call ReportUnreachable if message delivery fails
c.logger.Errorf("Failed to send StepRequest to %d, because: %s", msg.To, err)
status = raft.SnapshotFailure
}
if msg.Type == raftpb.MsgSnap {
c.node.ReportSnapshot(msg.To, status)
}
}
}
func (c *Chain) isConfig(env *common.Envelope) bool {
h, err := utils.ChannelHeader(env)
if err != nil {
......
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package etcdraft
import (
"time"
"code.cloudfoundry.org/clock"
"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/orderer/etcdraft"
"github.com/hyperledger/fabric/protos/utils"
)
// node wraps an etcd/raft Node together with the auxiliary pieces the
// Chain needs to drive it: persistent storage, the RPC transport used to
// reach other consenters, and the clock that paces raft ticks. It owns
// the goroutine (run) that services the raft Ready channel.
type node struct {
	chainID string                 // channel identifier stamped on every outgoing StepRequest
	logger  *flogging.FabricLogger // chain-scoped logger
	storage *RaftStorage           // persists raft entries, hard state, and snapshots
	config  *raft.Config           // config handed to raft.StartNode/RestartNode
	rpc     RPC                    // transport used by send() to step messages to peers
	chain   *Chain                 // parent chain; receives snapshots (snapC) and committed entries (applyC)
	tickInterval time.Duration     // period of the ticker that drives Tick() in run()
	clock        clock.Clock       // injectable clock source (eases testing with a fake clock)
	metadata *etcdraft.RaftMetadata // consenter set; start() derives the initial raft peers from it
	raft.Node // embedded etcd/raft node, assigned by start()
}
// start brings up the underlying etcd/raft node and launches the run loop
// in a background goroutine. A fresh node either bootstraps a brand-new
// channel with the full peer set, or joins an existing channel with nil
// peers (membership is learned from the leader); a non-fresh node is
// restarted from its persisted raft state.
func (n *node) start(fresh, join bool) {
	peers := RaftPeers(n.metadata.Consenters)

	switch {
	case !fresh:
		n.logger.Info("Restarting raft node")
		n.Node = raft.RestartNode(n.config)
	case join:
		n.logger.Info("Starting raft node to join an existing channel")
		n.Node = raft.StartNode(n.config, nil)
	default:
		n.logger.Info("Starting raft node as part of a new channel")
		n.Node = raft.StartNode(n.config, peers)
	}

	go n.run()
}
// run is the raft event loop. It forwards any snapshot found in storage
// at startup, then repeatedly: drives raft's logical clock, services the
// Ready channel (persist, hand off snapshots and committed entries to the
// chain, acknowledge with Advance, and replicate messages), and shuts the
// node down when the chain halts. The ordering within the Ready case
// matters: entries/state MUST be persisted before Advance and before
// messages are sent.
func (n *node) run() {
	ticker := n.clock.NewTicker(n.tickInterval)

	// If a snapshot was persisted before a restart, let the chain catch up
	// from it before processing new entries.
	if s := n.storage.Snapshot(); !raft.IsEmptySnap(s) {
		n.chain.snapC <- &s
	}

	for {
		select {
		case <-ticker.C():
			n.Tick()

		case rd := <-n.Ready():
			// Persist before acting on the Ready batch; losing this data
			// would violate raft's durability assumptions.
			if err := n.storage.Store(rd.Entries, rd.HardState, rd.Snapshot); err != nil {
				n.logger.Panicf("Failed to persist etcd/raft data: %s", err)
			}

			if !raft.IsEmptySnap(rd.Snapshot) {
				n.chain.snapC <- &rd.Snapshot
			}

			// Hand committed entries (and any leadership change in SoftState)
			// to the chain's serveRequest loop.
			n.chain.applyC <- apply{rd.CommittedEntries, rd.SoftState}
			n.Advance()

			// TODO(jay_guo) leader can write to disk in parallel with replicating
			// to the followers and them writing to their disks. Check 10.2.1 in thesis
			n.send(rd.Messages)

		case <-n.chain.haltC:
			ticker.Stop()
			n.Stop()
			n.storage.Close()
			n.logger.Infof("Raft node stopped")
			close(n.chain.doneC) // close after all the artifacts are closed
			return
		}
	}
}
// send marshals each outgoing raft message and steps it to its destination
// consenter over the RPC transport. Messages addressed to node 0 (raft's
// "none" id) are dropped. For snapshot messages the delivery outcome is
// reported back to raft so it can track snapshot progress.
func (n *node) send(msgs []raftpb.Message) {
	for i := range msgs {
		m := msgs[i]
		if m.To == 0 {
			continue
		}

		payload := utils.MarshalOrPanic(&m)
		_, err := n.rpc.Step(m.To, &orderer.StepRequest{Channel: n.chainID, Payload: payload})

		status := raft.SnapshotFinish
		if err != nil {
			// TODO We should call ReportUnreachable if message delivery fails
			n.logger.Errorf("Failed to send StepRequest to %d, because: %s", m.To, err)
			status = raft.SnapshotFailure
		}

		if m.Type == raftpb.MsgSnap {
			n.ReportSnapshot(m.To, status)
		}
	}
}
// takeSnapshot persists a snapshot of the given data (a marshaled block)
// at the given raft index with the supplied ConfState. Failure to snapshot
// is unrecoverable, so it panics rather than returning an error.
func (n *node) takeSnapshot(index uint64, cs *raftpb.ConfState, data []byte) {
	if err := n.storage.TakeSnapshot(index, cs, data); err != nil {
		n.logger.Panicf("Failed to create snapshot at index %d: %s", index, err)
	}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment