Commit b95fda38 authored by Gari Singh's avatar Gari Singh Committed by Gerrit Code Review
Browse files

Merge "[FAB-5266] Replace Enqueue with Order/Configure"

parents 2b63ca49 3e5c3e44
......@@ -53,8 +53,13 @@ type SupportManager interface {
// Support provides the backing resources needed to support broadcast on a chain
type Support interface {
// Enqueue accepts a message and returns true on acceptance, or false on shutdown
Enqueue(env *cb.Envelope) bool
// Order accepts a message or returns an error indicating the cause of failure
// It ultimately passes through to the consensus.Chain interface
Order(env *cb.Envelope, configSeq uint64) error
// Configure accepts a reconfiguration or returns an error indicating the cause of failure
// It ultimately passes through to the consensus.Chain interface
Configure(configUpdateMsg *cb.Envelope, config *cb.Envelope, configSeq uint64) error
// Filters returns the set of broadcast filters for this chain
Filters() *filter.RuleSet
......@@ -102,6 +107,8 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}
isConfig := false
configUpdateMsg := msg
if chdr.Type == int32(cb.HeaderType_CONFIG_UPDATE) {
logger.Debugf("Preprocessing CONFIG_UPDATE")
msg, err = bh.sm.Process(msg)
......@@ -126,6 +133,8 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing (empty channel ID)")
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR})
}
isConfig = true
}
support, ok := bh.sm.GetChain(chdr.ChannelId)
......@@ -144,7 +153,14 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}
if !support.Enqueue(msg) {
// XXX temporary hack to mesh interface definitions, will remove.
if isConfig {
err = support.Configure(configUpdateMsg, msg, 0)
} else {
err = support.Order(msg, 0)
}
if err != nil {
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE})
}
......
......@@ -127,9 +127,17 @@ func (ms *mockSupport) Filters() *filter.RuleSet {
return ms.filters
}
// Enqueue sends a message for ordering
func (ms *mockSupport) Enqueue(env *cb.Envelope) bool {
return !ms.rejectEnqueue
// Order sends a message for ordering
func (ms *mockSupport) Order(env *cb.Envelope, configSeq uint64) error {
if ms.rejectEnqueue {
return fmt.Errorf("Reject")
}
return nil
}
// Configure sends a reconfiguration message for ordering
func (ms *mockSupport) Configure(configUpdate *cb.Envelope, config *cb.Envelope, configSeq uint64) error {
return ms.Order(config, configSeq)
}
func makeConfigMessage(chainID string) *cb.Envelope {
......@@ -264,9 +272,9 @@ func TestGoodConfigUpdate(t *testing.T) {
m := newMockB()
defer close(m.recvChan)
go bh.Handle(m)
newChannelId := "New Chain"
newChannelID := "New Chain"
m.recvChan <- makeConfigMessage(newChannelId)
m.recvChan <- makeConfigMessage(newChannelID)
reply := <-m.sendChan
assert.Equal(t, cb.Status_SUCCESS, reply.Status, "Should have allowed a good CONFIG_UPDATE")
}
......@@ -301,9 +309,9 @@ func TestRejected(t *testing.T) {
defer close(m.recvChan)
go bh.Handle(m)
newChannelId := "New Chain"
newChannelID := "New Chain"
m.recvChan <- makeConfigMessage(newChannelId)
m.recvChan <- makeConfigMessage(newChannelID)
reply := <-m.sendChan
assert.Equal(t, cb.Status_BAD_REQUEST, reply.Status, "Should have rejected CONFIG_UPDATE")
}
......
......@@ -197,9 +197,14 @@ func (cs *ChainSupport) Reader() ledger.Reader {
return cs.ledger
}
// Enqueue takes a message and sends it to the consenter for ordering.
func (cs *ChainSupport) Enqueue(env *cb.Envelope) bool {
return cs.chain.Enqueue(env)
// Order passes through to the Consenter implementation.
func (cs *ChainSupport) Order(env *cb.Envelope, configSeq uint64) error {
return cs.chain.Order(env, configSeq)
}
// Configure passes through to the Consenter implementation.
func (cs *ChainSupport) Configure(configUpdate *cb.Envelope, config *cb.Envelope, configSeq uint64) error {
return cs.chain.Configure(configUpdate, config, configSeq)
}
// Errored returns whether the backing consenter has errored
......
......@@ -178,7 +178,7 @@ func TestManagerImpl(t *testing.T) {
}
for _, message := range messages {
chainSupport.Enqueue(message)
chainSupport.Order(message, 0)
}
it, _ := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1}}})
......@@ -489,7 +489,7 @@ func TestNewChain(t *testing.T) {
chainSupport, ok := manager.GetChain(manager.SystemChannelID())
assert.True(t, ok, "Could not find system channel")
chainSupport.Enqueue(wrapped)
chainSupport.Configure(wrapped, wrapped, 0)
func() {
it, _ := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1}}})
defer it.Close()
......@@ -521,7 +521,7 @@ func TestNewChain(t *testing.T) {
}
for _, message := range messages {
chainSupport.Enqueue(message)
chainSupport.Order(message, 0)
}
it, _ := chainSupport.Reader().Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 0}}})
......
......@@ -54,9 +54,14 @@ func (mch *mockChain) Errored() <-chan struct{} {
return nil
}
func (mch *mockChain) Enqueue(env *cb.Envelope) bool {
func (mch *mockChain) Order(env *cb.Envelope, configSeq uint64) error {
mch.queue <- env
return true
return nil
}
func (mch *mockChain) Configure(configUpdate, config *cb.Envelope, configSeq uint64) error {
mch.queue <- config
return nil
}
func (mch *mockChain) Start() {
......
......@@ -32,8 +32,27 @@ type Consenter interface {
// 1. Messages are ordered into a stream, the stream is cut into blocks, the blocks are committed (solo, kafka)
// 2. Messages are cut into blocks, the blocks are ordered, then the blocks are committed (sbft)
type Chain interface {
// Enqueue accepts a message and returns true on acceptance, or false on failure.
Enqueue(env *cb.Envelope) bool
// NOTE: The solo/kafka consenters have not been updated to perform the revalidation
// checks conditionally. For now, Order/Configure are essentially Enqueue as before.
// This does not cause data inconsistency, but it wastes cycles and will be required
// to properly support the ConfigUpdate concept once introduced
// Order accepts a message which has been processed at a given configSeq.
// If the configSeq advances, it is the responsibility of the consenter
// to revalidate and potentially discard the message
// The consenter may return an error, indicating the message was not accepted
Order(env *cb.Envelope, configSeq uint64) error
// Configure accepts a message which reconfigures the channel and will
// trigger an update to the configSeq if committed. The configuration must have
// been triggered by a ConfigUpdate message, which is included. If the config
// sequence advances, it is the responsibility of the consenter to recompute the
// resulting config, discarding the message if the reconfiguration is no longer
// valid. While a configure message is in flight, the consenter should lock
// and block additional calls to Order/Configure, any messages received will
// need to be revalidated before ordering.
// The consenter may return an error, indicating the message was not accepted
Configure(configUpdate *cb.Envelope, config *cb.Envelope, configSeq uint64) error
// Errored returns a channel which will close when an error has occurred.
// This is especially useful for the Deliver client, who must terminate waiting
......
......@@ -112,9 +112,21 @@ func (chain *chainImpl) Halt() {
}
}
// Enqueue accepts a message and returns true on acceptance, or false otheriwse.
// Implements the consensus.Chain interface. Called by Broadcast().
func (chain *chainImpl) Enqueue(env *cb.Envelope) bool {
func (chain *chainImpl) Order(env *cb.Envelope, configSeq uint64) error {
if !chain.enqueue(env) {
return fmt.Errorf("Could not enqueue")
}
return nil
}
// Implements the consensus.Chain interface. Called by Broadcast().
func (chain *chainImpl) Configure(configUpdate *cb.Envelope, config *cb.Envelope, configSeq uint64) error {
return chain.Order(config, configSeq)
}
// enqueue accepts a message and returns true on acceptance, or false otheriwse.
func (chain *chainImpl) enqueue(env *cb.Envelope) bool {
logger.Debugf("[channel: %s] Enqueueing envelope...", chain.support.ChainID())
select {
case <-chain.startChan: // The Start phase has completed
......
......@@ -195,7 +195,7 @@ func TestChain(t *testing.T) {
assert.Panics(t, func() { startThread(chain) }, "Expected the Start() call to panic")
})
t.Run("EnqueueIfNotStarted", func(t *testing.T) {
t.Run("enqueueIfNotStarted", func(t *testing.T) {
mockChannel, mockBroker, mockSupport := newMocks(t)
defer func() { mockBroker.Close() }()
chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1)
......@@ -216,7 +216,7 @@ func TestChain(t *testing.T) {
SetMessage(mockChannel.topic(), mockChannel.partition(), newestOffset, message),
})
assert.False(t, chain.Enqueue(newMockEnvelope("fooMessage")), "Expected Enqueue call to return false")
assert.False(t, chain.enqueue(newMockEnvelope("fooMessage")), "Expected enqueue call to return false")
})
t.Run("StartWithConsumerForChannelError", func(t *testing.T) {
......@@ -247,7 +247,7 @@ func TestChain(t *testing.T) {
assert.Panics(t, func() { startThread(chain) }, "Expected the Start() call to panic")
})
t.Run("EnqueueProper", func(t *testing.T) {
t.Run("enqueueProper", func(t *testing.T) {
mockChannel, mockBroker, mockSupport := newMocks(t)
defer func() { mockBroker.Close() }()
chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1)
......@@ -273,14 +273,14 @@ func TestChain(t *testing.T) {
t.Fatal("startChan should have been closed by now")
}
// Enqueue should have access to the post path, and its ProduceRequest
// enqueue should have access to the post path, and its ProduceRequest
// should go by without error
assert.True(t, chain.Enqueue(newMockEnvelope("fooMessage")), "Expected Enqueue call to return true")
assert.True(t, chain.enqueue(newMockEnvelope("fooMessage")), "Expected enqueue call to return true")
chain.Halt()
})
t.Run("EnqueueIfHalted", func(t *testing.T) {
t.Run("enqueueIfHalted", func(t *testing.T) {
mockChannel, mockBroker, mockSupport := newMocks(t)
defer func() { mockBroker.Close() }()
chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1)
......@@ -308,10 +308,10 @@ func TestChain(t *testing.T) {
chain.Halt()
// haltChan should close access to the post path
assert.False(t, chain.Enqueue(newMockEnvelope("fooMessage")), "Expected Enqueue call to return false")
assert.False(t, chain.enqueue(newMockEnvelope("fooMessage")), "Expected enqueue call to return false")
})
t.Run("EnqueueError", func(t *testing.T) {
t.Run("enqueueError", func(t *testing.T) {
mockChannel, mockBroker, mockSupport := newMocks(t)
defer func() { mockBroker.Close() }()
chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1)
......@@ -345,7 +345,7 @@ func TestChain(t *testing.T) {
SetError(mockChannel.topic(), mockChannel.partition(), sarama.ErrNotLeaderForPartition),
})
assert.False(t, chain.Enqueue(newMockEnvelope("fooMessage")), "Expected Enqueue call to return false")
assert.False(t, chain.enqueue(newMockEnvelope("fooMessage")), "Expected enqueue call to return false")
})
}
......
......@@ -165,7 +165,7 @@ func setupTestLogging(logLevel string, verbose bool) {
// Taken from orderer/solo/consensus_test.go
func syncQueueMessage(message *cb.Envelope, chain *chainImpl, mockBlockcutter *mockblockcutter.Receiver) {
chain.Enqueue(message)
chain.enqueue(message)
mockBlockcutter.Block <- struct{}{} // We'll move past this line (and the function will return) only when the mock blockcutter is about to return
}
......
......@@ -17,6 +17,7 @@ limitations under the License.
package solo
import (
"fmt"
"time"
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
......@@ -37,7 +38,7 @@ type chain struct {
// New creates a new consenter for the solo consensus scheme.
// The solo consensus scheme is very simple, and allows only one consenter for a given chain (this process).
// It accepts messages being delivered via Enqueue, orders them, and then uses the blockcutter to form the messages
// It accepts messages being delivered via Order/Configure, orders them, and then uses the blockcutter to form the messages
// into blocks before writing to the given ledger
func New() consensus.Consenter {
return &consenter{}
......@@ -68,16 +69,22 @@ func (ch *chain) Halt() {
}
}
// Enqueue accepts a message and returns true on acceptance, or false on shutdown
func (ch *chain) Enqueue(env *cb.Envelope) bool {
// Order accepts normal messages for ordering
func (ch *chain) Order(env *cb.Envelope, configSeq uint64) error {
select {
case ch.sendChan <- env:
return true
return nil
case <-ch.exitChan:
return false
return fmt.Errorf("Exiting")
}
}
// Order accepts normal messages for ordering
func (ch *chain) Configure(configUpdate *cb.Envelope, config *cb.Envelope, configSeq uint64) error {
// TODO, handle this specially
return ch.Order(config, configSeq)
}
// Errored only closes on exit
func (ch *chain) Errored() <-chan struct{} {
return ch.exitChan
......
......@@ -37,7 +37,7 @@ func init() {
var testMessage = &cb.Envelope{Payload: []byte("TEST_MESSAGE")}
func syncQueueMessage(msg *cb.Envelope, chain *chain, bc *mockblockcutter.Receiver) {
chain.Enqueue(msg)
chain.Order(msg, 0)
bc.Block <- struct{}{}
}
......@@ -91,7 +91,7 @@ func TestStart(t *testing.T) {
defer bs.Halt()
support.BlockCutterVal.CutNext = true
bs.Enqueue(testMessage)
assert.Nil(t, bs.Order(testMessage, 0))
select {
case <-support.Blocks:
case <-bs.Errored():
......@@ -99,7 +99,7 @@ func TestStart(t *testing.T) {
}
}
func TestEnqueueAfterHalt(t *testing.T) {
func TestOrderAfterHalt(t *testing.T) {
batchTimeout, _ := time.ParseDuration("1ms")
support := &mockmultichannel.ConsenterSupport{
Blocks: make(chan *cb.Block),
......@@ -109,7 +109,7 @@ func TestEnqueueAfterHalt(t *testing.T) {
defer close(support.BlockCutterVal.Block)
bs := newChain(support)
bs.Halt()
assert.False(t, bs.Enqueue(testMessage), "Enqueue should not be accepted after halt")
assert.NotNil(t, bs.Order(testMessage, 0), "Order should not be accepted after halt")
select {
case <-bs.Errored():
default:
......@@ -253,7 +253,7 @@ func TestConfigMsg(t *testing.T) {
syncQueueMessage(testMessage, bs, support.BlockCutterVal)
support.ClassifyMsgVal = msgprocessor.ConfigUpdateMsg
bs.Enqueue(testMessage)
assert.Nil(t, bs.Order(testMessage, 0))
select {
case <-support.Blocks:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment