Commit f37c8da2 authored by Srinivasan Muralidharan's avatar Srinivasan Muralidharan Committed by Gerrit Code Review
Browse files

Merge "[FAB-5267] Switch Broadcast filter w/ msgprocessor"

parents 74f1a17e f5e25a3d
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Copyright IBM Corp. All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
SPDX-License-Identifier: Apache-2.0
*/
package broadcast
import (
"github.com/hyperledger/fabric/orderer/common/filter"
"io"
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
"github.com/op/go-logging"
"io"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/protos/utils"
)
var logger = logging.MustGetLogger("orderer/common/broadcast")
// ConfigUpdateProcessor is used to transform CONFIG_UPDATE transactions which are used to generate other envelope
// message types with preprocessing by the orderer
type ConfigUpdateProcessor interface {
// Process transforms an envelope of type CONFIG_UPDATE to another type
Process(envConfigUpdate *cb.Envelope) (*cb.Envelope, error)
}
// Handler defines an interface which handles broadcasts
type Handler interface {
// Handle starts a service thread for a given gRPC connection and services the broadcast connection
Handle(srv ab.AtomicBroadcast_BroadcastServer) error
}
// SupportManager provides a way for the Handler to look up the Support for a chain
type SupportManager interface {
ConfigUpdateProcessor
// ChannelSupportRegistrar provides a way for the Handler to look up the Support for a chain
type ChannelSupportRegistrar interface {
// BroadcastChannelSupport returns the message channel header, whether the message is a config update
// and the channel resources for a message or an error if the message is not a message which can
// be processed directly (like CONFIG and ORDERER_TRANSACTION messages)
BroadcastChannelSupport(msg *cb.Envelope) (*cb.ChannelHeader, bool, ChannelSupport, error)
}
// GetChain gets the chain support for a given ChannelId
GetChain(chainID string) (Support, bool)
// ChannelSupport provides the backing resources needed to support broadcast on a channel
type ChannelSupport interface {
msgprocessor.Processor
Consenter
}
// Support provides the backing resources needed to support broadcast on a chain
type Support interface {
// Consenter provides methods to send messages through consensus
type Consenter interface {
// Order accepts a message or returns an error indicating the cause of failure
// It ultimately passes through to the consensus.Chain interface
Order(env *cb.Envelope, configSeq uint64) error
......@@ -60,17 +46,14 @@ type Support interface {
// Configure accepts a reconfiguration or returns an error indicating the cause of failure
// It ultimately passes through to the consensus.Chain interface
Configure(configUpdateMsg *cb.Envelope, config *cb.Envelope, configSeq uint64) error
// Filters returns the set of broadcast filters for this chain
Filters() *filter.RuleSet
}
type handlerImpl struct {
sm SupportManager
sm ChannelSupportRegistrar
}
// NewHandlerImpl constructs a new implementation of the Handler interface
func NewHandlerImpl(sm SupportManager) Handler {
func NewHandlerImpl(sm ChannelSupportRegistrar) Handler {
return &handlerImpl{
sm: sm,
}
......@@ -90,78 +73,40 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
return err
}
payload, err := utils.UnmarshalPayload(msg.Payload)
chdr, isConfig, processor, err := bh.sm.BroadcastChannelSupport(msg)
if err != nil {
logger.Warningf("Received malformed message, dropping connection: %s", err)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}
if payload.Header == nil {
logger.Warningf("Received malformed message, with missing header, dropping connection")
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
logger.Warningf("[channel: %s] Could not get message processor: %s", chdr.ChannelId, err)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR})
}
chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
logger.Warningf("Received malformed message (bad channel header), dropping connection: %s", err)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}
if !isConfig {
logger.Debugf("[channel: %s] Broadcast is processing normal message of type %s", chdr.ChannelId, cb.HeaderType_name[chdr.Type])
isConfig := false
configUpdateMsg := msg
if chdr.Type == int32(cb.HeaderType_CONFIG_UPDATE) {
logger.Debugf("Preprocessing CONFIG_UPDATE")
msg, err = bh.sm.Process(msg)
configSeq, err := processor.ProcessNormalMsg(msg)
if err != nil {
logger.Warningf("Rejecting CONFIG_UPDATE because: %s", err)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
logger.Warningf("[channel: %s] Rejecting broadcast of normal message because of error: %s", chdr.ChannelId, err)
return srv.Send(&ab.BroadcastResponse{Status: ClassifyError(err)})
}
err = proto.Unmarshal(msg.Payload, payload)
if err != nil || payload.Header == nil {
logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing")
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR})
err = processor.Order(msg, configSeq)
if err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of normal message with SERVICE_UNAVAILABLE: reject by Order: %s", chdr.ChannelId, err)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE})
}
} else { // isConfig
logger.Debugf("[channel: %s] Broadcast is processing config update message", chdr.ChannelId)
chdr, err = utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
config, configSeq, err := processor.ProcessConfigUpdateMsg(msg)
if err != nil {
logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing (bad channel header): %s", err)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR})
logger.Warningf("[channel: %s] Rejecting broadcast of config message because of error: %s", chdr.ChannelId, err)
return srv.Send(&ab.BroadcastResponse{Status: ClassifyError(err)})
}
if chdr.ChannelId == "" {
logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing (empty channel ID)")
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR})
err = processor.Configure(msg, config, configSeq)
if err != nil {
logger.Warningf("[channel: %s] Rejecting broadcast of config message with SERVICE_UNAVAILABLE: rejected by Configure: %s", chdr.ChannelId, err)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE})
}
isConfig = true
}
support, ok := bh.sm.GetChain(chdr.ChannelId)
if !ok {
logger.Warningf("Rejecting broadcast because channel %s was not found", chdr.ChannelId)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_NOT_FOUND})
}
logger.Debugf("[channel: %s] Broadcast is filtering message of type %s", chdr.ChannelId, cb.HeaderType_name[chdr.Type])
// Normal transaction for existing chain
_, filterErr := support.Filters().Apply(msg)
if filterErr != nil {
logger.Warningf("[channel: %s] Rejecting broadcast message because of filter error: %s", chdr.ChannelId, filterErr)
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}
// XXX temporary hack to mesh interface definitions, will remove.
if isConfig {
err = support.Configure(configUpdateMsg, msg, 0)
} else {
err = support.Order(msg, 0)
}
if err != nil {
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE})
}
if logger.IsEnabledFor(logging.DEBUG) {
......@@ -175,3 +120,13 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
}
}
}
// ClassifyError converts an error type into a status code.
func ClassifyError(err error) cb.Status {
switch err {
case msgprocessor.ErrChannelDoesNotExist:
return cb.Status_NOT_FOUND
default:
return cb.Status_BAD_REQUEST
}
}
......@@ -22,10 +22,9 @@ import (
"testing"
"time"
"github.com/hyperledger/fabric/orderer/common/filter"
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/utils"
logging "github.com/op/go-logging"
"github.com/stretchr/testify/assert"
......@@ -36,8 +35,6 @@ func init() {
logging.SetLevel(logging.DEBUG, "")
}
var systemChain = "systemChain"
type mockB struct {
grpc.ServerStream
recvChan chan *cb.Envelope
......@@ -93,38 +90,21 @@ func (m *erroneousSendMockB) Recv() (*cb.Envelope, error) {
return m.recvVal, nil
}
var RejectRule = filter.Rule(rejectRule{})
type rejectRule struct{}
func (r rejectRule) Apply(message *cb.Envelope) (filter.Action, filter.Committer) {
return filter.Reject, nil
}
type mockSupportManager struct {
chains map[string]*mockSupport
ProcessVal *cb.Envelope
}
func (mm *mockSupportManager) GetChain(chainID string) (Support, bool) {
chain, ok := mm.chains[chainID]
return chain, ok
MsgProcessorIsConfig bool
MsgProcessorVal *mockSupport
MsgProcessorErr error
}
func (mm *mockSupportManager) Process(configTx *cb.Envelope) (*cb.Envelope, error) {
if mm.ProcessVal == nil {
return nil, fmt.Errorf("Nil result implies error")
}
return mm.ProcessVal, nil
func (mm *mockSupportManager) BroadcastChannelSupport(msg *cb.Envelope) (*cb.ChannelHeader, bool, ChannelSupport, error) {
return &cb.ChannelHeader{}, mm.MsgProcessorIsConfig, mm.MsgProcessorVal, mm.MsgProcessorErr
}
type mockSupport struct {
filters *filter.RuleSet
rejectEnqueue bool
}
func (ms *mockSupport) Filters() *filter.RuleSet {
return ms.filters
ProcessConfigEnv *cb.Envelope
ProcessConfigSeq uint64
ProcessErr error
rejectEnqueue bool
}
// Order sends a message for ordering
......@@ -140,52 +120,26 @@ func (ms *mockSupport) Configure(configUpdate *cb.Envelope, config *cb.Envelope,
return ms.Order(config, configSeq)
}
func makeConfigMessage(chainID string) *cb.Envelope {
payload := &cb.Payload{
Data: utils.MarshalOrPanic(&cb.ConfigEnvelope{}),
Header: &cb.Header{
ChannelHeader: utils.MarshalOrPanic(&cb.ChannelHeader{
ChannelId: chainID,
Type: int32(cb.HeaderType_CONFIG_UPDATE),
}),
},
}
return &cb.Envelope{
Payload: utils.MarshalOrPanic(payload),
}
func (ms *mockSupport) ClassifyMsg(chdr *cb.ChannelHeader) (msgprocessor.Classification, error) {
panic("UNIMPLMENTED")
}
func makeMessage(chainID string, data []byte) *cb.Envelope {
payload := &cb.Payload{
Data: data,
Header: &cb.Header{
ChannelHeader: utils.MarshalOrPanic(&cb.ChannelHeader{
ChannelId: chainID,
}),
},
}
return &cb.Envelope{
Payload: utils.MarshalOrPanic(payload),
}
func (ms *mockSupport) ProcessNormalMsg(msg *cb.Envelope) (uint64, error) {
return ms.ProcessConfigSeq, ms.ProcessErr
}
func getMockSupportManager() (*mockSupportManager, *mockSupport) {
filters := filter.NewRuleSet([]filter.Rule{
filter.EmptyRejectRule,
filter.AcceptRule,
})
mm := &mockSupportManager{
chains: make(map[string]*mockSupport),
}
mSysChain := &mockSupport{
filters: filters,
func (ms *mockSupport) ProcessConfigUpdateMsg(msg *cb.Envelope) (*cb.Envelope, uint64, error) {
return ms.ProcessConfigEnv, ms.ProcessConfigSeq, ms.ProcessErr
}
func getMockSupportManager() *mockSupportManager {
return &mockSupportManager{
MsgProcessorVal: &mockSupport{},
}
mm.chains[string(systemChain)] = mSysChain
return mm, mSysChain
}
func TestEnqueueFailure(t *testing.T) {
mm, mSysChain := getMockSupportManager()
mm := getMockSupportManager()
bh := NewHandlerImpl(mm)
m := newMockB()
defer close(m.recvChan)
......@@ -196,15 +150,15 @@ func TestEnqueueFailure(t *testing.T) {
}()
for i := 0; i < 2; i++ {
m.recvChan <- makeMessage(systemChain, []byte("Some bytes"))
m.recvChan <- nil
reply := <-m.sendChan
if reply.Status != cb.Status_SUCCESS {
t.Fatalf("Should have successfully queued the message")
}
}
mSysChain.rejectEnqueue = true
m.recvChan <- makeMessage(systemChain, []byte("Some bytes"))
mm.MsgProcessorVal.rejectEnqueue = true
m.recvChan <- nil
reply := <-m.sendChan
if reply.Status != cb.Status_SERVICE_UNAVAILABLE {
t.Fatalf("Should not have successfully queued the message")
......@@ -217,32 +171,9 @@ func TestEnqueueFailure(t *testing.T) {
}
}
func TestEmptyEnvelope(t *testing.T) {
mm, _ := getMockSupportManager()
bh := NewHandlerImpl(mm)
m := newMockB()
defer close(m.recvChan)
done := make(chan struct{})
go func() {
bh.Handle(m)
close(done)
}()
m.recvChan <- &cb.Envelope{}
reply := <-m.sendChan
if reply.Status != cb.Status_BAD_REQUEST {
t.Fatalf("Should have rejected the null message")
}
select {
case <-done:
case <-time.After(time.Second):
t.Fatalf("Should have terminated the stream")
}
}
func TestBadChannelId(t *testing.T) {
mm, _ := getMockSupportManager()
mm := getMockSupportManager()
mm.MsgProcessorVal = &mockSupport{ProcessErr: msgprocessor.ErrChannelDoesNotExist}
bh := NewHandlerImpl(mm)
m := newMockB()
defer close(m.recvChan)
......@@ -252,7 +183,7 @@ func TestBadChannelId(t *testing.T) {
close(done)
}()
m.recvChan <- makeMessage("Wrong chain", []byte("Some bytes"))
m.recvChan <- nil
reply := <-m.sendChan
if reply.Status != cb.Status_NOT_FOUND {
t.Fatalf("Should have rejected message to a chain which does not exist")
......@@ -266,27 +197,28 @@ func TestBadChannelId(t *testing.T) {
}
func TestGoodConfigUpdate(t *testing.T) {
mm, _ := getMockSupportManager()
mm.ProcessVal = &cb.Envelope{Payload: utils.MarshalOrPanic(&cb.Payload{Header: &cb.Header{ChannelHeader: utils.MarshalOrPanic(&cb.ChannelHeader{ChannelId: systemChain})}})}
mm := getMockSupportManager()
mm.MsgProcessorIsConfig = true
bh := NewHandlerImpl(mm)
m := newMockB()
defer close(m.recvChan)
go bh.Handle(m)
newChannelID := "New Chain"
m.recvChan <- makeConfigMessage(newChannelID)
m.recvChan <- nil
reply := <-m.sendChan
assert.Equal(t, cb.Status_SUCCESS, reply.Status, "Should have allowed a good CONFIG_UPDATE")
}
func TestBadConfigUpdate(t *testing.T) {
mm, _ := getMockSupportManager()
mm := getMockSupportManager()
mm.MsgProcessorIsConfig = true
mm.MsgProcessorVal.ProcessErr = fmt.Errorf("Error")
bh := NewHandlerImpl(mm)
m := newMockB()
defer close(m.recvChan)
go bh.Handle(m)
m.recvChan <- makeConfigMessage(systemChain)
m.recvChan <- nil
reply := <-m.sendChan
assert.NotEqual(t, cb.Status_SUCCESS, reply.Status, "Should have rejected CONFIG_UPDATE")
}
......@@ -299,19 +231,15 @@ func TestGracefulShutdown(t *testing.T) {
}
func TestRejected(t *testing.T) {
filters := filter.NewRuleSet([]filter.Rule{RejectRule})
mm := &mockSupportManager{
chains: map[string]*mockSupport{string(systemChain): {filters: filters}},
MsgProcessorVal: &mockSupport{ProcessErr: fmt.Errorf("Reject")},
}
mm.ProcessVal = &cb.Envelope{Payload: utils.MarshalOrPanic(&cb.Payload{Header: &cb.Header{ChannelHeader: utils.MarshalOrPanic(&cb.ChannelHeader{ChannelId: systemChain})}})}
bh := NewHandlerImpl(mm)
m := newMockB()
defer close(m.recvChan)
go bh.Handle(m)
newChannelID := "New Chain"
m.recvChan <- makeConfigMessage(newChannelID)
m.recvChan <- nil
reply := <-m.sendChan
assert.Equal(t, cb.Status_BAD_REQUEST, reply.Status, "Should have rejected CONFIG_UPDATE")
}
......@@ -322,97 +250,8 @@ func TestBadStreamRecv(t *testing.T) {
}
func TestBadStreamSend(t *testing.T) {
mm, _ := getMockSupportManager()
mm.ProcessVal = &cb.Envelope{Payload: utils.MarshalOrPanic(&cb.Payload{Header: &cb.Header{ChannelHeader: utils.MarshalOrPanic(&cb.ChannelHeader{ChannelId: systemChain})}})}
mm := getMockSupportManager()
bh := NewHandlerImpl(mm)
m := &erroneousSendMockB{recvVal: makeConfigMessage("New Chain")}
m := &erroneousSendMockB{recvVal: nil}
assert.Error(t, bh.Handle(m), "Should catch unexpected stream error")
}
func TestMalformedEnvelope(t *testing.T) {
mm, _ := getMockSupportManager()
bh := NewHandlerImpl(mm)
m := newMockB()
defer close(m.recvChan)
go bh.Handle(m)
m.recvChan <- &cb.Envelope{Payload: []byte("foo")}
reply := <-m.sendChan
assert.Equal(t, cb.Status_BAD_REQUEST, reply.Status, "Should have rejected the malformed message")
}
func TestMissingHeader(t *testing.T) {
mm, _ := getMockSupportManager()
bh := NewHandlerImpl(mm)
m := newMockB()
defer close(m.recvChan)
go bh.Handle(m)
m.recvChan <- &cb.Envelope{Payload: utils.MarshalOrPanic(&cb.Payload{})}
reply := <-m.sendChan
assert.Equal(t, cb.Status_BAD_REQUEST, reply.Status, "Should have rejected the payload without header")
}
func TestBadChannelHeader(t *testing.T) {
mm, _ := getMockSupportManager()
bh := NewHandlerImpl(mm)
m := newMockB()
defer close(m.recvChan)
go bh.Handle(m)
m.recvChan <- &cb.Envelope{Payload: utils.MarshalOrPanic(&cb.Payload{Header: &cb.Header{ChannelHeader: []byte("foo")}})}
reply := <-m.sendChan
assert.Equal(t, cb.Status_BAD_REQUEST, reply.Status, "Should have rejected bad header")
}
func TestBadPayloadAfterProcessing(t *testing.T) {
mm, _ := getMockSupportManager()
mm.ProcessVal = &cb.Envelope{Payload: []byte("foo")}
bh := NewHandlerImpl(mm)
m := newMockB()
defer close(m.recvChan)
go bh.Handle(m)
m.recvChan <- makeConfigMessage("New Chain")
reply := <-m.sendChan
assert.Equal(t, cb.Status_INTERNAL_SERVER_ERROR, reply.Status, "Should respond with internal server error")
}
func TestNilHeaderAfterProcessing(t *testing.T) {
mm, _ := getMockSupportManager()
mm.ProcessVal = &cb.Envelope{Payload: utils.MarshalOrPanic(&cb.Payload{})}
bh := NewHandlerImpl(mm)
m := newMockB()
defer close(m.recvChan)
go bh.Handle(m)
m.recvChan <- makeConfigMessage("New Chain")
reply := <-m.sendChan
assert.Equal(t, cb.Status_INTERNAL_SERVER_ERROR, reply.Status, "Should respond with internal server error")
}
func TestBadChannelHeaderAfterProcessing(t *testing.T) {
mm, _ := getMockSupportManager()
mm.ProcessVal = &cb.Envelope{Payload: utils.MarshalOrPanic(&cb.Payload{Header: &cb.Header{ChannelHeader: []byte("foo")}})}
bh := NewHandlerImpl(mm)
m := newMockB()
defer close(m.recvChan)
go bh.Handle(m)
m.recvChan <- makeConfigMessage("New Chain")
reply := <-m.sendChan
assert.Equal(t, cb.Status_INTERNAL_SERVER_ERROR, reply.Status, "Should respond with internal server error")
}
func TestEmptyChannelIDAfterProcessing(t *testing.T) {
mm, _ := getMockSupportManager()
mm.ProcessVal = &cb.Envelope{Payload: utils.MarshalOrPanic(&cb.Payload{Header: &cb.Header{ChannelHeader: utils.MarshalOrPanic(&cb.ChannelHeader{})}})}
bh := NewHandlerImpl(mm)
m := newMockB()
defer close(m.recvChan)
go bh.Handle(m)
m.recvChan <- makeConfigMessage("New Chain")
reply := <-m.sendChan
assert.Equal(t, cb.Status_INTERNAL_SERVER_ERROR, reply.Status, "Should respond with internal server error")
}
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
// Package configupdate is an implementation of the broadcast.Proccessor interface
// It facilitates the preprocessing of CONFIG_UPDATE transactions which can
// generate either new CONFIG transactions or new channel creation
// ORDERER_TRANSACTION messages.
package configupdate
import (
"fmt"
configtxapi "github.com/hyperledger/fabric/common/configtx/api"
"github.com/hyperledger/fabric/common/crypto"
cb "github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/utils"
"github.com/op/go-logging"
)
var logger = logging.MustGetLogger("orderer/configupdate")
const (
// These should eventually be derived from the channel support once enabled