Unverified Commit e4060ed3 authored by Jay Guo's avatar Jay Guo Committed by Artem Barger
Browse files

[FAB-13455] Initialize BlockPuller on demand.



The creation of BlockPuller takes latest certificates, therefore
should be done on-demand to guarantee its validity.

Change-Id: I327275da495a85126feb58c84b460bed98f7b860
Signed-off-by: default avatarJay Guo <guojiannan1101@gmail.com>
parent 082a9102
......@@ -59,6 +59,10 @@ type BlockPuller interface {
Close()
}
// CreateBlockPuller is a function to create BlockPuller on demand.
// It is passed into chain initializer so that tests could mock this.
type CreateBlockPuller func() (BlockPuller, error)
// Options contains all the configurations relevant to the chain.
type Options struct {
RaftID uint64
......@@ -132,7 +136,8 @@ type Chain struct {
// needed by snapshotting
lastSnapBlockNum uint64
confState raftpb.ConfState // Etcdraft requires ConfState to be persisted within snapshot
puller BlockPuller // Deliver client to pull blocks from other OSNs
createPuller CreateBlockPuller // func used to create BlockPuller on demand
fresh bool // indicate if this is a fresh raft node
......@@ -148,7 +153,7 @@ func NewChain(
opts Options,
conf Configurator,
rpc RPC,
puller BlockPuller,
f CreateBlockPuller,
observeC chan<- raft.SoftState) (*Chain, error) {
lg := opts.Logger.With("channel", support.ChainID(), "node", opts.RaftID)
......@@ -190,7 +195,7 @@ func NewChain(
fresh: fresh,
appliedIndex: opts.RaftMetadata.RaftIndex,
lastSnapBlockNum: snapBlkNum,
puller: puller,
createPuller: f,
clock: opts.Clock,
logger: lg,
opts: opts,
......@@ -688,12 +693,14 @@ func (c *Chain) catchUp(snap *raftpb.Snapshot) error {
return nil
}
defer func() {
c.puller.Close()
}()
puller, err := c.createPuller()
if err != nil {
return errors.Errorf("failed to create block puller: %s", err)
}
defer puller.Close()
for next <= b.Header.Number {
block := c.puller.PullBlock(next)
block := puller.PullBlock(next)
if block == nil {
return errors.Errorf("failed to fetch block %d from cluster", next)
}
......
......@@ -2607,7 +2607,14 @@ func newChain(timeout time.Duration, channel string, dataDir string, id uint64,
}
func (c *chain) init() {
ch, err := etcdraft.NewChain(c.support, c.opts, c.configurator, c.rpc, c.puller, c.observe)
ch, err := etcdraft.NewChain(
c.support,
c.opts,
c.configurator,
c.rpc,
func() (etcdraft.BlockPuller, error) { return c.puller, nil },
c.observe,
)
Expect(err).NotTo(HaveOccurred())
c.Chain = ch
}
......
......@@ -144,11 +144,6 @@ func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *co
return &inactive.Chain{Err: errors.Errorf("channel %s is not serviced by me", support.ChainID())}, nil
}
bp, err := newBlockPuller(support, c.Dialer, c.OrdererConfig.General.Cluster)
if err != nil {
return nil, errors.WithStack(err)
}
opts := Options{
RaftID: id,
Clock: clock.NewClock(),
......@@ -173,7 +168,14 @@ func (c *Consenter) HandleChain(support consensus.ConsenterSupport, metadata *co
Comm: c.Communication,
DestinationToStream: make(map[uint64]orderer.Cluster_SubmitClient),
}
return NewChain(support, opts, c.Communication, rpc, bp, nil)
return NewChain(
support,
opts,
c.Communication,
rpc,
func() (BlockPuller, error) { return newBlockPuller(support, c.Dialer, c.OrdererConfig.General.Cluster) },
nil,
)
}
// ReadRaftMetadata attempts to read raft metadata from block metadata, if available.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment