Unverified Commit 532b5382 authored by yacovm's avatar yacovm Committed by Artem Barger
Browse files

[FAB-13481] Make onboarding code more idiomatic

This change set addresses code review comments from
https://gerrit.hyperledger.org/r/#/c/28391/ and from
https://gerrit.hyperledger.org/r/#/c/28385/

,
in an attempt to make the orderer code more idiomatic.

Change-Id: I04ac7bc21ee8fc1ccda4e76d8afa53fe527f7f5e
Signed-off-by: default avataryacovm <yacovm@il.ibm.com>
Signed-off-by: default avatarArtem Barger <bartem@il.ibm.com>
parent 235bb3ac
......@@ -21,14 +21,17 @@ import (
// type bugs.
type BundleSource struct {
bundle atomic.Value
callbacks []func(*Bundle)
callbacks []BundleActor
}
// BundleActor performs an operation based on the given bundle
type BundleActor func(bundle *Bundle)
// NewBundleSource creates a new BundleSource with an initial Bundle value
// The callbacks will be invoked whenever the Update method is called for the
// BundleSource. Note, these callbacks are called immediately before this function
// returns.
func NewBundleSource(bundle *Bundle, callbacks ...func(*Bundle)) *BundleSource {
func NewBundleSource(bundle *Bundle, callbacks ...BundleActor) *BundleSource {
bs := &BundleSource{
callbacks: callbacks,
}
......
......@@ -499,7 +499,7 @@ func waitForBlockReception(o *nwo.Orderer, submitter *nwo.Peer, network *nwo.Net
return ""
}
return sessErr
}, time.Minute, time.Second).Should(BeEmpty())
}, network.EventuallyTimeout, time.Second).Should(BeEmpty())
}
func assertNoErrorsAreLogged(ordererRunners []*ginkgomon.Runner) {
......
......@@ -23,10 +23,14 @@ import (
)
const (
// RetryTimeout is the time the block puller retries
// RetryTimeout is the time the block puller retries.
RetryTimeout = time.Second * 10
)
// ChannelPredicate accepts channels according to their names.
type ChannelPredicate func(channelName string) bool
// AnyChannel accepts all channels.
func AnyChannel(_ string) bool {
return true
}
......@@ -78,7 +82,7 @@ type ChannelLister interface {
// Replicator replicates chains
type Replicator struct {
DoNotPanicIfClusterNotReachable bool
Filter func(string) bool
Filter ChannelPredicate
SystemChannel string
ChannelLister ChannelLister
Logger *flogging.FabricLogger
......@@ -167,7 +171,7 @@ func (r *Replicator) discoverChannels() []ChannelGenesisBlock {
// and commits it to the ledger.
func (r *Replicator) PullChannel(channel string) error {
if !r.Filter(channel) {
r.Logger.Info("Channel", channel, "shouldn't be pulled. Skipping it")
r.Logger.Infof("Channel %s shouldn't be pulled. Skipping it", channel)
return ErrSkipped
}
r.Logger.Info("Pulling channel", channel)
......@@ -257,7 +261,7 @@ type channelPullHints struct {
}
func (r *Replicator) channelsToPull(channels GenesisBlocks) channelPullHints {
r.Logger.Info("Will now attempt to pull channels:", channels.Names())
r.Logger.Info("Evaluating channels to pull:", channels.Names())
var channelsNotToPull []ChannelGenesisBlock
var channelsToPull []ChannelGenesisBlock
for _, channel := range channels {
......
......@@ -11,7 +11,6 @@ package multichannel
import (
"fmt"
"reflect"
"sync"
"github.com/hyperledger/fabric/common/channelconfig"
......@@ -102,7 +101,7 @@ type Registrar struct {
systemChannelID string
systemChannel *ChainSupport
templator msgprocessor.ChannelConfigTemplator
callbacks []func(bundle *channelconfig.Bundle)
callbacks []channelconfig.BundleActor
}
// ConfigBlock retrieves the last configuration block from the given ledger.
......@@ -127,7 +126,7 @@ func configTx(reader blockledger.Reader) *cb.Envelope {
// NewRegistrar produces an instance of a *Registrar.
func NewRegistrar(ledgerFactory blockledger.Factory,
signer crypto.LocalSigner, metricsProvider metrics.Provider, callbacks ...func(bundle *channelconfig.Bundle)) *Registrar {
signer crypto.LocalSigner, metricsProvider metrics.Provider, callbacks ...channelconfig.BundleActor) *Registrar {
r := &Registrar{
chains: make(map[string]*ChainSupport),
ledgerFactory: ledgerFactory,
......@@ -290,8 +289,8 @@ func (r *Registrar) CreateChain(chainName string) {
}
chain := r.GetChain(chainName)
if chain != nil {
logger.Infof("A chain of type %v for channel %s already exists. "+
"Halting it.", reflect.TypeOf(chain.Chain), chainName)
logger.Infof("A chain of type %T for channel %s already exists. "+
"Halting it.", chain.Chain, chainName)
chain.Halt()
}
r.newChain(configTx(lf))
......
......@@ -201,7 +201,6 @@ func TestCreateChain(t *testing.T) {
assert.False(t, ok)
// The new chain is not halted: Close the channel to prove that.
close(chain2.Chain.(*mockChain).queue)
}
// This test brings up the entire system, with the mock consenter, including the broadcasters etc. and creates a new chain
......
......@@ -510,7 +510,8 @@ func initializeMultichannelRegistrar(
metricsProvider metrics.Provider,
healthChecker healthChecker,
lf blockledger.Factory,
callbacks ...func(bundle *channelconfig.Bundle)) *multichannel.Registrar {
callbacks ...channelconfig.BundleActor,
) *multichannel.Registrar {
genesisBlock := extractBootstrapBlock(conf)
// Are we bootstrapping?
if len(lf.ChainIDs()) == 0 {
......@@ -536,7 +537,8 @@ func initializeMultichannelRegistrar(
return registrar
}
func initializeEtcdraftConsenter(consenters map[string]consensus.Consenter,
func initializeEtcdraftConsenter(
consenters map[string]consensus.Consenter,
conf *localconfig.TopLevel,
lf blockledger.Factory,
clusterDialer *cluster.PredicateDialer,
......@@ -544,7 +546,8 @@ func initializeEtcdraftConsenter(consenters map[string]consensus.Consenter,
ri *replicationInitiator,
srvConf comm.ServerConfig,
srv *comm.GRPCServer,
registrar *multichannel.Registrar) {
registrar *multichannel.Registrar,
) {
replicationRefreshInterval := conf.General.Cluster.ReplicationBackgroundRefreshInterval
if replicationRefreshInterval == 0 {
replicationRefreshInterval = defaultReplicationBackgroundRefreshInterval
......@@ -563,10 +566,11 @@ func initializeEtcdraftConsenter(consenters map[string]consensus.Consenter,
}
exponentialSleep := exponentialDurationSeries(replicationBackgroundInitialRefreshInterval, replicationRefreshInterval)
ticker := newTicker(exponentialSleep)
icr := &inactiveChainReplicator{
logger: logger,
scheduleChan: makeTickChannel(exponentialSleep, time.Sleep),
scheduleChan: ticker.C,
quitChan: make(chan struct{}),
replicator: ri,
chains2CreationCallbacks: make(map[string]chainCreation),
......
......@@ -166,7 +166,7 @@ type chainCreation struct {
// TrackChain tracks a chain with the given name, and calls the given callback
// when this chain should be activated.
func (dc *inactiveChainReplicator) TrackChain(chain string, genesisBlock *common.Block, createChainCallback func()) {
func (dc *inactiveChainReplicator) TrackChain(chain string, genesisBlock *common.Block, createChain etcdraft.CreateChainCallback) {
if genesisBlock == nil {
dc.logger.Panicf("Called with a nil genesis block")
}
......@@ -175,7 +175,7 @@ func (dc *inactiveChainReplicator) TrackChain(chain string, genesisBlock *common
dc.logger.Infof("Adding %s to the set of chains to track", chain)
dc.chains2CreationCallbacks[chain] = chainCreation{
genesisBlock: genesisBlock,
create: createChainCallback,
create: createChain,
}
}
......@@ -222,37 +222,3 @@ func (dc *inactiveChainReplicator) listInactiveChains() []string {
}
return chains
}
func exponentialDurationSeries(initialDuration, maxDuration time.Duration) func() time.Duration {
exp := &exponentialDuration{
n: initialDuration,
max: maxDuration,
}
return exp.next
}
type exponentialDuration struct {
n time.Duration
max time.Duration
}
func (exp *exponentialDuration) next() time.Duration {
n := exp.n
exp.n *= 2
if exp.n > exp.max {
exp.n = exp.max
}
return n
}
func makeTickChannel(computeSleepDuration func() time.Duration, sleep func(time.Duration)) <-chan time.Time {
c := make(chan time.Time)
go func() {
for {
sleep(computeSleepDuration())
c <- time.Now()
}
}()
return c
}
......@@ -24,6 +24,7 @@ import (
ramledger "github.com/hyperledger/fabric/common/ledger/blockledger/ram"
"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/core/config/configtest"
"github.com/hyperledger/fabric/orderer/common/cluster"
"github.com/hyperledger/fabric/orderer/common/cluster/mocks"
"github.com/hyperledger/fabric/orderer/common/localconfig"
server_mocks "github.com/hyperledger/fabric/orderer/common/server/mocks"
......@@ -702,7 +703,7 @@ func TestReplicate(t *testing.T) {
func TestInactiveChainReplicator(t *testing.T) {
for _, testCase := range []struct {
name string
description string
chainsTracked []string
ReplicateChainsExpectedInput1 []string
ReplicateChainsExpectedInput1Reverse []string
......@@ -714,10 +715,10 @@ func TestInactiveChainReplicator(t *testing.T) {
genesisBlock *common.Block
}{
{
name: "no chains tracked",
description: "no chains tracked",
},
{
name: "some chains tracked, but not all succeed replication",
description: "some chains tracked, but not all succeed replication",
chainsTracked: []string{"foo", "bar"},
ReplicateChainsExpectedInput1: []string{"foo", "bar"},
ReplicateChainsExpectedInput1Reverse: []string{"bar", "foo"},
......@@ -729,7 +730,7 @@ func TestInactiveChainReplicator(t *testing.T) {
genesisBlock: &common.Block{},
},
{
name: "some chains tracked, and all succeed replication but on 2nd pass",
description: "some chains tracked, and all succeed replication but on 2nd pass",
chainsTracked: []string{"foo", "bar"},
ReplicateChainsExpectedInput1: []string{"foo", "bar"},
ReplicateChainsExpectedInput1Reverse: []string{"bar", "foo"},
......@@ -741,7 +742,7 @@ func TestInactiveChainReplicator(t *testing.T) {
genesisBlock: &common.Block{},
},
} {
t.Run(testCase.name, func(t *testing.T) {
t.Run(testCase.description, func(t *testing.T) {
scheduler := make(chan time.Time)
replicator := &server_mocks.ChainReplicator{}
icr := &inactiveChainReplicator{
......@@ -801,6 +802,17 @@ func TestInactiveChainReplicator(t *testing.T) {
}
}
func TestInactiveChainReplicatorChannels(t *testing.T) {
icr := &inactiveChainReplicator{
logger: flogging.MustGetLogger("test"),
chains2CreationCallbacks: make(map[string]chainCreation),
}
icr.TrackChain("foo", &common.Block{}, func() {})
assert.Equal(t, []cluster.ChannelGenesisBlock{{ChannelName: "foo", GenesisBlock: &common.Block{}}}, icr.Channels())
icr.Close()
}
func TestTrackChainNilGenesisBlock(t *testing.T) {
icr := &inactiveChainReplicator{
logger: flogging.MustGetLogger("test"),
......@@ -860,42 +872,3 @@ func injectOrdererEndpoint(t *testing.T, block *common.Block, endpoint string) {
block.Data.Data[0] = utils.MarshalOrPanic(env)
block.Header.DataHash = block.Data.Hash()
}
func TestExponentialDuration(t *testing.T) {
exp := exponentialDurationSeries(time.Millisecond*100, time.Second)
prev := exp()
for i := 0; i < 3; i++ {
n := exp()
assert.Equal(t, prev*2, n)
prev = n
assert.True(t, n < time.Second)
}
for i := 0; i < 10; i++ {
assert.Equal(t, time.Second, exp())
}
}
func TestMakeTickChannel(t *testing.T) {
sleepDurations := make(chan time.Duration, 100)
fakeSleep := func(d time.Duration) {
if d == time.Millisecond*16 {
return
}
// Fake a sleep by putting the duration we sleep
// into the waitTimes channel
sleepDurations <- d
}
exp := exponentialDurationSeries(time.Millisecond, time.Millisecond*16)
c := makeTickChannel(exp, fakeSleep)
for expectedSleepTime := time.Millisecond; expectedSleepTime <= time.Millisecond*8; expectedSleepTime *= 2 {
// Wait for tick
<-c
// See how much time we slept
sleptTime := <-sleepDurations
assert.Equal(t, expectedSleepTime, sleptTime)
}
}
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package server
import "time"
type durationSeries func() time.Duration
// ticker has a channel that will send the
// time with intervals computed by the durationSeries.
type ticker struct {
stopped bool
C <-chan time.Time
nextInterval durationSeries
stopChan chan struct{}
}
// newTicker returns a channel that sends the time at periods
// specified by the given durationSeries.
func newTicker(nextInterval durationSeries) *ticker {
c := make(chan time.Time)
ticker := &ticker{
stopChan: make(chan struct{}),
C: c,
nextInterval: nextInterval,
}
go func() {
defer close(c)
ticker.run(c)
}()
return ticker
}
func (t *ticker) run(c chan<- time.Time) {
for {
if t.stopped {
return
}
select {
case <-time.After(t.nextInterval()):
t.tick(c)
case <-t.stopChan:
return
}
}
}
func (t *ticker) tick(c chan<- time.Time) {
select {
case c <- time.Now():
case <-t.stopChan:
t.stopped = true
}
}
func (t *ticker) stop() {
close(t.stopChan)
}
func exponentialDurationSeries(initialDuration, maxDuration time.Duration) func() time.Duration {
exp := &exponentialDuration{
n: initialDuration,
max: maxDuration,
}
return exp.next
}
type exponentialDuration struct {
n time.Duration
max time.Duration
}
func (exp *exponentialDuration) next() time.Duration {
n := exp.n
exp.n *= 2
if exp.n > exp.max {
exp.n = exp.max
}
return n
}
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package server
import (
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestExponentialDuration(t *testing.T) {
t.Parallel()
exp := exponentialDurationSeries(time.Millisecond*100, time.Second)
prev := exp()
for i := 0; i < 3; i++ {
n := exp()
assert.Equal(t, prev*2, n)
prev = n
assert.True(t, n < time.Second)
}
for i := 0; i < 10; i++ {
assert.Equal(t, time.Second, exp())
}
}
func TestTicker(t *testing.T) {
t.Parallel()
everyMillis := func() time.Duration {
return time.Millisecond
}
t.Run("Stop ticker serially", func(t *testing.T) {
ticker := newTicker(everyMillis)
for i := 0; i < 10; i++ {
<-ticker.C
}
ticker.stop()
// Ensure no more ticks are received
_, ok := <-ticker.C
assert.False(t, ok)
})
t.Run("Stop ticker concurrently", func(t *testing.T) {
ticker := newTicker(func() time.Duration {
return time.Millisecond
})
var tickerStopped sync.WaitGroup
tickerStopped.Add(1)
go func() {
defer tickerStopped.Done()
time.Sleep(time.Millisecond * 50)
ticker.stop()
<-ticker.C
}()
tickerStopped.Wait()
_, ok := <-ticker.C
assert.False(t, ok)
})
}
......@@ -29,13 +29,16 @@ import (
"github.com/pkg/errors"
)
// CreateChainCallback creates a new chain
type CreateChainCallback func()
//go:generate mockery -dir . -name InactiveChainRegistry -case underscore -output mocks
// InactiveChainRegistry registers chains that are inactive
type InactiveChainRegistry interface {
// TrackChain tracks a chain with the given name, and calls the given callback
// when this chain should be activated.
TrackChain(chainName string, genesisBlock *common.Block, createChainCallback func())
// when this chain should be created.
TrackChain(chainName string, genesisBlock *common.Block, createChain CreateChainCallback)
}
//go:generate mockery -dir . -name ChainGetter -case underscore -output mocks
......@@ -197,8 +200,14 @@ func ReadRaftMetadata(blockMetadata *common.Metadata, configMetadata *etcdraft.M
}
// New creates a etcdraft Consenter
func New(clusterDialer *cluster.PredicateDialer, conf *localconfig.TopLevel,
srvConf comm.ServerConfig, srv *comm.GRPCServer, r *multichannel.Registrar, icr InactiveChainRegistry) *Consenter {
func New(
clusterDialer *cluster.PredicateDialer,
conf *localconfig.TopLevel,
srvConf comm.ServerConfig,
srv *comm.GRPCServer,
r *multichannel.Registrar,
icr InactiveChainRegistry,
) *Consenter {
logger := flogging.MustGetLogger("orderer.consensus.etcdraft")
var cfg Config
......
......@@ -3,7 +3,7 @@
package mocks
import common "github.com/hyperledger/fabric/protos/common"
import etcdraft "github.com/hyperledger/fabric/orderer/consensus/etcdraft"
import mock "github.com/stretchr/testify/mock"
// InactiveChainRegistry is an autogenerated mock type for the InactiveChainRegistry type
......@@ -11,7 +11,7 @@ type InactiveChainRegistry struct {
mock.Mock
}
// TrackChain provides a mock function with given fields: chainName, genesisBlock, createChainCallback
func (_m *InactiveChainRegistry) TrackChain(chainName string, genesisBlock *common.Block, createChainCallback func()) {
_m.Called(chainName, genesisBlock, createChainCallback)
// TrackChain provides a mock function with given fields: chainName, genesisBlock, createChain
func (_m *InactiveChainRegistry) TrackChain(chainName string, genesisBlock *common.Block, createChain etcdraft.CreateChainCallback) {
_m.Called(chainName, genesisBlock, createChain)
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment