Commit 3e5c3e44 authored by Jason Yellick's avatar Jason Yellick
Browse files

[FAB-5266] Replace Enqueue with Order/Configure



The current consenter interface only allows for one sort of message
ingress.  All messages are received via 'Enqueue', and treated
identically.

In order for the consenter to be able to differentiate and handle config
vs non-config messages differently, the consenter needs two diferent
ingress points for messages.

This CR replaces the Enqueue method with two methods: Order and
Configure.

For the time being, these methods behave exactly as Enqueue, but will be
leveraged in future CRs.

Change-Id: I3701e5e3c0de4833a455c49acebbad70c6ed763c
Signed-off-by: default avatarJason Yellick <jyellick@us.ibm.com>
parent ed9517ea
......@@ -53,8 +53,13 @@ type SupportManager interface {
// Support provides the backing resources needed to support broadcast on a chain
type Support interface {
// Enqueue accepts a message and returns true on acceptance, or false on shutdown
Enqueue(env *cb.Envelope) bool
// Order accepts a message or returns an error indicating the cause of failure
// It ultimately passes through to the consensus.Chain interface
Order(env *cb.Envelope, configSeq uint64) error
// Configure accepts a reconfiguration or returns an error indicating the cause of failure
// It ultimately passes through to the consensus.Chain interface
Configure(configUpdateMsg *cb.Envelope, config *cb.Envelope, configSeq uint64) error
// Filters returns the set of broadcast filters for this chain
Filters() *filter.RuleSet
......@@ -102,6 +107,8 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}
isConfig := false
configUpdateMsg := msg
if chdr.Type == int32(cb.HeaderType_CONFIG_UPDATE) {
logger.Debugf("Preprocessing CONFIG_UPDATE")
msg, err = bh.sm.Process(msg)
......@@ -126,6 +133,8 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing (empty channel ID)")
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR})
}
isConfig = true
}
support, ok := bh.sm.GetChain(chdr.ChannelId)
......@@ -144,7 +153,14 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}
if !support.Enqueue(msg) {
// XXX temporary hack to mesh interface definitions, will remove.
if isConfig {
err = support.Configure(configUpdateMsg, msg, 0)
} else {
err = support.Order(msg, 0)
}
if err != nil {
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE})
}
......
......@@ -127,9 +127,17 @@ func (ms *mockSupport) Filters() *filter.RuleSet {
return ms.filters
}
// Enqueue sends a message for ordering
func (ms *mockSupport) Enqueue(env *cb.Envelope) bool {
return !ms.rejectEnqueue
// Order sends a message for ordering
func (ms *mockSupport) Order(env *cb.Envelope, configSeq uint64) error {
if ms.rejectEnqueue {
return fmt.Errorf("Reject")
}
return nil
}
// Configure sends a reconfiguration message for ordering
func (ms *mockSupport) Configure(configUpdate *cb.Envelope, config *cb.Envelope, configSeq uint64) error {
return ms.Order(config, configSeq)
}
func makeConfigMessage(chainID string) *cb.Envelope {
......@@ -264,9 +272,9 @@ func TestGoodConfigUpdate(t *testing.T) {
m := newMockB()
defer close(m.recvChan)
go bh.Handle(m)
newChannelId := "New Chain"
newChannelID := "New Chain"
m.recvChan <- makeConfigMessage(newChannelId)
m.recvChan <- makeConfigMessage(newChannelID)
reply := <-m.sendChan
assert.Equal(t, cb.Status_SUCCESS, reply.Status, "Should have allowed a good CONFIG_UPDATE")
}
......@@ -301,9 +309,9 @@ func TestRejected(t *testing.T) {
defer close(m.recvChan)
go bh.Handle(m)
newChannelId := "New Chain"
newChannelID := "New Chain"
m.recvChan <- makeConfigMessage(newChannelId)
m.recvChan <- makeConfigMessage(newChannelID)
reply := <-m.sendChan
assert.Equal(t, cb.Status_BAD_REQUEST, reply.Status, "Should have rejected CONFIG_UPDATE")
}
......
......@@ -197,9 +197,14 @@ func (cs *ChainSupport) Reader() ledger.Reader {
return cs.ledger
}
// Enqueue takes a message and sends it to the consenter for ordering.
func (cs *ChainSupport) Enqueue(env *cb.Envelope) bool {
return cs.chain.Enqueue(env)
// Order passes through to the Consenter implementation.
func (cs *ChainSupport) Order(env *cb.Envelope, configSeq uint64) error {
return cs.chain.Order(env, configSeq)
}
// Configure passes through to the Consenter implementation.
func (cs *ChainSupport) Configure(configUpdate *cb.Envelope, config *cb.Envelope, configSeq uint64) error {
return cs.chain.Configure(configUpdate, config, configSeq)
}
// Errored returns whether the backing consenter has errored
......
......@@ -178,7 +178,7 @@ func TestManagerImpl(t *testing.T) {
}
for _, message := range messages {
chainSupport.Enqueue(message)
chainSupport.Order(message, 0)
}
it, _ := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1}}})
......@@ -489,7 +489,7 @@ func TestNewChain(t *testing.T) {
chainSupport, ok := manager.GetChain(manager.SystemChannelID())
assert.True(t, ok, "Could not find system channel")
chainSupport.Enqueue(wrapped)
chainSupport.Configure(wrapped, wrapped, 0)
func() {
it, _ := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1}}})
defer it.Close()
......@@ -521,7 +521,7 @@ func TestNewChain(t *testing.T) {
}
for _, message := range messages {
chainSupport.Enqueue(message)
chainSupport.Order(message, 0)
}
it, _ := chainSupport.Reader().Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 0}}})
......
......@@ -54,9 +54,14 @@ func (mch *mockChain) Errored() <-chan struct{} {
return nil
}
func (mch *mockChain) Enqueue(env *cb.Envelope) bool {
func (mch *mockChain) Order(env *cb.Envelope, configSeq uint64) error {
mch.queue <- env
return true
return nil
}
func (mch *mockChain) Configure(configUpdate, config *cb.Envelope, configSeq uint64) error {
mch.queue <- config
return nil
}
func (mch *mockChain) Start() {
......
......@@ -32,8 +32,27 @@ type Consenter interface {
// 1. Messages are ordered into a stream, the stream is cut into blocks, the blocks are committed (solo, kafka)
// 2. Messages are cut into blocks, the blocks are ordered, then the blocks are committed (sbft)
type Chain interface {
// Enqueue accepts a message and returns true on acceptance, or false on failure.
Enqueue(env *cb.Envelope) bool
// NOTE: The solo/kafka consenters have not been updated to perform the revalidation
// checks conditionally. For now, Order/Configure are essentially Enqueue as before.
// This does not cause data inconsistency, but it wastes cycles and will be required
// to properly support the ConfigUpdate concept once introduced
// Order accepts a message which has been processed at a given configSeq.
// If the configSeq advances, it is the responsibility of the consenter
// to revalidate and potentially discard the message
// The consenter may return an error, indicating the message was not accepted
Order(env *cb.Envelope, configSeq uint64) error
// Configure accepts a message which reconfigures the channel and will
// trigger an update to the configSeq if committed. The configuration must have
// been triggered by a ConfigUpdate message, which is included. If the config
// sequence advances, it is the responsibility of the consenter to recompute the
// resulting config, discarding the message if the reconfiguration is no longer
// valid. While a configure message is in flight, the consenter should lock
// and block additional calls to Order/Configure, any messages received will
// need to be revalidated before ordering.
// The consenter may return an error, indicating the message was not accepted
Configure(configUpdate *cb.Envelope, config *cb.Envelope, configSeq uint64) error
// Errored returns a channel which will close when an error has occurred.
// This is especially useful for the Deliver client, who must terminate waiting
......
......@@ -112,9 +112,21 @@ func (chain *chainImpl) Halt() {
}
}
// Enqueue accepts a message and returns true on acceptance, or false otheriwse.
// Implements the consensus.Chain interface. Called by Broadcast().
func (chain *chainImpl) Enqueue(env *cb.Envelope) bool {
func (chain *chainImpl) Order(env *cb.Envelope, configSeq uint64) error {
if !chain.enqueue(env) {
return fmt.Errorf("Could not enqueue")
}
return nil
}
// Implements the consensus.Chain interface. Called by Broadcast().
func (chain *chainImpl) Configure(configUpdate *cb.Envelope, config *cb.Envelope, configSeq uint64) error {
return chain.Order(config, configSeq)
}
// enqueue accepts a message and returns true on acceptance, or false otheriwse.
func (chain *chainImpl) enqueue(env *cb.Envelope) bool {
logger.Debugf("[channel: %s] Enqueueing envelope...", chain.support.ChainID())
select {
case <-chain.startChan: // The Start phase has completed
......
......@@ -195,7 +195,7 @@ func TestChain(t *testing.T) {
assert.Panics(t, func() { startThread(chain) }, "Expected the Start() call to panic")
})
t.Run("EnqueueIfNotStarted", func(t *testing.T) {
t.Run("enqueueIfNotStarted", func(t *testing.T) {
mockChannel, mockBroker, mockSupport := newMocks(t)
defer func() { mockBroker.Close() }()
chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1)
......@@ -216,7 +216,7 @@ func TestChain(t *testing.T) {
SetMessage(mockChannel.topic(), mockChannel.partition(), newestOffset, message),
})
assert.False(t, chain.Enqueue(newMockEnvelope("fooMessage")), "Expected Enqueue call to return false")
assert.False(t, chain.enqueue(newMockEnvelope("fooMessage")), "Expected enqueue call to return false")
})
t.Run("StartWithConsumerForChannelError", func(t *testing.T) {
......@@ -247,7 +247,7 @@ func TestChain(t *testing.T) {
assert.Panics(t, func() { startThread(chain) }, "Expected the Start() call to panic")
})
t.Run("EnqueueProper", func(t *testing.T) {
t.Run("enqueueProper", func(t *testing.T) {
mockChannel, mockBroker, mockSupport := newMocks(t)
defer func() { mockBroker.Close() }()
chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1)
......@@ -273,14 +273,14 @@ func TestChain(t *testing.T) {
t.Fatal("startChan should have been closed by now")
}
// Enqueue should have access to the post path, and its ProduceRequest
// enqueue should have access to the post path, and its ProduceRequest
// should go by without error
assert.True(t, chain.Enqueue(newMockEnvelope("fooMessage")), "Expected Enqueue call to return true")
assert.True(t, chain.enqueue(newMockEnvelope("fooMessage")), "Expected enqueue call to return true")
chain.Halt()
})
t.Run("EnqueueIfHalted", func(t *testing.T) {
t.Run("enqueueIfHalted", func(t *testing.T) {
mockChannel, mockBroker, mockSupport := newMocks(t)
defer func() { mockBroker.Close() }()
chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1)
......@@ -308,10 +308,10 @@ func TestChain(t *testing.T) {
chain.Halt()
// haltChan should close access to the post path
assert.False(t, chain.Enqueue(newMockEnvelope("fooMessage")), "Expected Enqueue call to return false")
assert.False(t, chain.enqueue(newMockEnvelope("fooMessage")), "Expected enqueue call to return false")
})
t.Run("EnqueueError", func(t *testing.T) {
t.Run("enqueueError", func(t *testing.T) {
mockChannel, mockBroker, mockSupport := newMocks(t)
defer func() { mockBroker.Close() }()
chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1)
......@@ -345,7 +345,7 @@ func TestChain(t *testing.T) {
SetError(mockChannel.topic(), mockChannel.partition(), sarama.ErrNotLeaderForPartition),
})
assert.False(t, chain.Enqueue(newMockEnvelope("fooMessage")), "Expected Enqueue call to return false")
assert.False(t, chain.enqueue(newMockEnvelope("fooMessage")), "Expected enqueue call to return false")
})
}
......
......@@ -165,7 +165,7 @@ func setupTestLogging(logLevel string, verbose bool) {
// Taken from orderer/solo/consensus_test.go
func syncQueueMessage(message *cb.Envelope, chain *chainImpl, mockBlockcutter *mockblockcutter.Receiver) {
chain.Enqueue(message)
chain.enqueue(message)
mockBlockcutter.Block <- struct{}{} // We'll move past this line (and the function will return) only when the mock blockcutter is about to return
}
......
......@@ -17,6 +17,7 @@ limitations under the License.
package solo
import (
"fmt"
"time"
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
......@@ -37,7 +38,7 @@ type chain struct {
// New creates a new consenter for the solo consensus scheme.
// The solo consensus scheme is very simple, and allows only one consenter for a given chain (this process).
// It accepts messages being delivered via Enqueue, orders them, and then uses the blockcutter to form the messages
// It accepts messages being delivered via Order/Configure, orders them, and then uses the blockcutter to form the messages
// into blocks before writing to the given ledger
func New() consensus.Consenter {
return &consenter{}
......@@ -68,16 +69,22 @@ func (ch *chain) Halt() {
}
}
// Enqueue accepts a message and returns true on acceptance, or false on shutdown
func (ch *chain) Enqueue(env *cb.Envelope) bool {
// Order accepts normal messages for ordering
func (ch *chain) Order(env *cb.Envelope, configSeq uint64) error {
select {
case ch.sendChan <- env:
return true
return nil
case <-ch.exitChan:
return false
return fmt.Errorf("Exiting")
}
}
// Order accepts normal messages for ordering
func (ch *chain) Configure(configUpdate *cb.Envelope, config *cb.Envelope, configSeq uint64) error {
// TODO, handle this specially
return ch.Order(config, configSeq)
}
// Errored only closes on exit
func (ch *chain) Errored() <-chan struct{} {
return ch.exitChan
......
......@@ -37,7 +37,7 @@ func init() {
var testMessage = &cb.Envelope{Payload: []byte("TEST_MESSAGE")}
func syncQueueMessage(msg *cb.Envelope, chain *chain, bc *mockblockcutter.Receiver) {
chain.Enqueue(msg)
chain.Order(msg, 0)
bc.Block <- struct{}{}
}
......@@ -91,7 +91,7 @@ func TestStart(t *testing.T) {
defer bs.Halt()
support.BlockCutterVal.CutNext = true
bs.Enqueue(testMessage)
assert.Nil(t, bs.Order(testMessage, 0))
select {
case <-support.Blocks:
case <-bs.Errored():
......@@ -99,7 +99,7 @@ func TestStart(t *testing.T) {
}
}
func TestEnqueueAfterHalt(t *testing.T) {
func TestOrderAfterHalt(t *testing.T) {
batchTimeout, _ := time.ParseDuration("1ms")
support := &mockmultichannel.ConsenterSupport{
Blocks: make(chan *cb.Block),
......@@ -109,7 +109,7 @@ func TestEnqueueAfterHalt(t *testing.T) {
defer close(support.BlockCutterVal.Block)
bs := newChain(support)
bs.Halt()
assert.False(t, bs.Enqueue(testMessage), "Enqueue should not be accepted after halt")
assert.NotNil(t, bs.Order(testMessage, 0), "Order should not be accepted after halt")
select {
case <-bs.Errored():
default:
......@@ -253,7 +253,7 @@ func TestConfigMsg(t *testing.T) {
syncQueueMessage(testMessage, bs, support.BlockCutterVal)
support.ClassifyMsgVal = msgprocessor.ConfigUpdateMsg
bs.Enqueue(testMessage)
assert.Nil(t, bs.Order(testMessage, 0))
select {
case <-support.Blocks:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment