Commit ed9517ea authored by Jason Yellick's avatar Jason Yellick
Browse files

[FAB-5265] Rm blockcutter message validation



The primary goal of the series in FAB-5258 is to prevent all OSNs from
having to validate all messages for all channels.  Since all messages
pass through the block cutter, the block cutter cannot be involved in
message validation as it currently is.

This CR pulls the message validation out of the blockcutter and pushes
it into the msgprocessor definitions.  Ultimately, the msgprocessor
interfaces will only be called if necessary, eliminating the performance
bottleneck.

Change-Id: I3c0d41e47873aa6e764c70fd176722306f00655c
Signed-off-by: default avatarJason Yellick <jyellick@us.ibm.com>
parent 258b25c8
......@@ -51,17 +51,15 @@ type Receiver interface {
type receiver struct {
sharedConfigManager config.Orderer
filters *filter.RuleSet
pendingBatch []*cb.Envelope
pendingBatchSizeBytes uint32
pendingCommitters []filter.Committer
}
// NewReceiverImpl creates a Receiver implementation based on the given configtxorderer manager and filters
func NewReceiverImpl(sharedConfigManager config.Orderer, filters *filter.RuleSet) Receiver {
func NewReceiverImpl(sharedConfigManager config.Orderer) Receiver {
return &receiver{
sharedConfigManager: sharedConfigManager,
filters: filters,
}
}
......@@ -83,17 +81,7 @@ func NewReceiverImpl(sharedConfigManager config.Orderer, filters *filter.RuleSet
// - The current message will cause the pending batch size in bytes to exceed BatchSize.PreferredMaxBytes.
// - After adding the current message to the pending batch, the message count has reached BatchSize.MaxMessageCount.
func (r *receiver) Ordered(msg *cb.Envelope) ([][]*cb.Envelope, bool) {
// The messages must be filtered a second time in case configuration has changed since the message was received
committer, err := r.filters.Apply(msg)
if err != nil {
logger.Debugf("Rejecting message: %s", err)
return nil, false
}
if committer.Isolated() {
logger.Panicf("The use of isolated committers has been deprecated and should no longer appear in this path")
}
// The messages are not filtered a second time, this is pushed onto the Consenter
messageSizeBytes := messageSizeBytes(msg)
if messageSizeBytes > r.sharedConfigManager.BatchSize().PreferredMaxBytes {
......
......@@ -17,11 +17,9 @@ limitations under the License.
package blockcutter
import (
"bytes"
"testing"
mockconfig "github.com/hyperledger/fabric/common/mocks/config"
"github.com/hyperledger/fabric/orderer/common/filter"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
......@@ -33,59 +31,14 @@ func init() {
logging.SetLevel(logging.DEBUG, "")
}
type isolatedCommitter struct{}
func (ic isolatedCommitter) Isolated() bool { return true }
func (ic isolatedCommitter) Commit() {}
type mockIsolatedFilter struct{}
func (mif *mockIsolatedFilter) Apply(msg *cb.Envelope) (filter.Action, filter.Committer) {
if bytes.Equal(msg.Payload, isolatedTx.Payload) {
return filter.Accept, isolatedCommitter{}
}
return filter.Forward, nil
}
type mockRejectFilter struct{}
func (mrf mockRejectFilter) Apply(message *cb.Envelope) (filter.Action, filter.Committer) {
if bytes.Equal(message.Payload, badTx.Payload) {
return filter.Reject, nil
}
return filter.Forward, nil
}
type mockAcceptFilter struct{}
func (mrf mockAcceptFilter) Apply(message *cb.Envelope) (filter.Action, filter.Committer) {
if bytes.Equal(message.Payload, goodTx.Payload) {
return filter.Accept, filter.NoopCommitter
}
return filter.Forward, nil
}
func getFilters() *filter.RuleSet {
return filter.NewRuleSet([]filter.Rule{
&mockIsolatedFilter{},
&mockRejectFilter{},
&mockAcceptFilter{},
})
}
var badTx = &cb.Envelope{Payload: []byte("BAD")}
var goodTx = &cb.Envelope{Payload: []byte("GOOD")}
var goodTxLarge = &cb.Envelope{Payload: []byte("GOOD"), Signature: make([]byte, 1000)}
var isolatedTx = &cb.Envelope{Payload: []byte("ISOLATED")}
var unmatchedTx = &cb.Envelope{Payload: []byte("UNMATCHED")}
func TestNormalBatch(t *testing.T) {
filters := getFilters()
maxMessageCount := uint32(2)
absoluteMaxBytes := uint32(1000)
preferredMaxBytes := uint32(100)
r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}}, filters)
r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}})
batches, ok := r.Ordered(goodTx)
assert.Nil(t, batches, "Should not have created batch")
......@@ -96,73 +49,7 @@ func TestNormalBatch(t *testing.T) {
assert.True(t, ok, "Should have enqueued second message into batch")
}
func TestBadMessageInBatch(t *testing.T) {
filters := getFilters()
maxMessageCount := uint32(2)
absoluteMaxBytes := uint32(1000)
preferredMaxBytes := uint32(100)
r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}}, filters)
batches, ok := r.Ordered(badTx)
assert.Nil(t, batches, "Should not have created batch")
assert.False(t, ok, "Should not have enqueued bad message into batch")
batches, ok = r.Ordered(goodTx)
assert.Nil(t, batches, "Should not have created batch")
assert.True(t, ok, "Should have enqueued good message into batch")
batches, ok = r.Ordered(badTx)
assert.Nil(t, batches, "Should not have created batch")
assert.False(t, ok, "Should not have enqueued second bad message into batch")
}
func TestUnmatchedMessageInBatch(t *testing.T) {
filters := getFilters()
maxMessageCount := uint32(2)
absoluteMaxBytes := uint32(1000)
preferredMaxBytes := uint32(100)
r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}}, filters)
batches, ok := r.Ordered(unmatchedTx)
assert.Nil(t, batches, "Should not have created batch")
assert.False(t, ok, "Should not have enqueued unmatched message into batch")
batches, ok = r.Ordered(goodTx)
assert.Nil(t, batches, "Should not have created batch")
assert.True(t, ok, "Should have enqueued good message into batch")
batches, ok = r.Ordered(unmatchedTx)
assert.Nil(t, batches, "Should not have created batch from unmatched message")
assert.False(t, ok, "Should not have enqueued second bad message into batch")
}
func TestIsolatedEmptyBatch(t *testing.T) {
filters := getFilters()
maxMessageCount := uint32(2)
absoluteMaxBytes := uint32(1000)
preferredMaxBytes := uint32(100)
r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}}, filters)
assert.Panics(t, func() { r.Ordered(isolatedTx) }, "Should not have handled an isolated by committer message")
}
func TestIsolatedPartialBatch(t *testing.T) {
filters := getFilters()
maxMessageCount := uint32(2)
absoluteMaxBytes := uint32(1000)
preferredMaxBytes := uint32(100)
r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}}, filters)
batches, ok := r.Ordered(goodTx)
assert.Nil(t, batches, "Should not have created batch")
assert.True(t, ok, "Should have enqueued good message into batch")
assert.Panics(t, func() { r.Ordered(isolatedTx) }, "Should not have handled an isolated by committer message")
}
func TestBatchSizePreferredMaxBytesOverflow(t *testing.T) {
filters := getFilters()
goodTxBytes := messageSizeBytes(goodTx)
// set preferred max bytes such that 10 goodTx will not fit
......@@ -171,7 +58,7 @@ func TestBatchSizePreferredMaxBytesOverflow(t *testing.T) {
// set message count > 9
maxMessageCount := uint32(20)
r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: preferredMaxBytes * 2, PreferredMaxBytes: preferredMaxBytes}}, filters)
r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: preferredMaxBytes * 2, PreferredMaxBytes: preferredMaxBytes}})
// enqueue 9 messages
for i := 0; i < 9; i++ {
......@@ -194,8 +81,6 @@ func TestBatchSizePreferredMaxBytesOverflow(t *testing.T) {
}
func TestBatchSizePreferredMaxBytesOverflowNoPending(t *testing.T) {
filters := getFilters()
goodTxLargeBytes := messageSizeBytes(goodTxLarge)
// set preferred max bytes such that 1 goodTxLarge will not fit
......@@ -204,7 +89,7 @@ func TestBatchSizePreferredMaxBytesOverflowNoPending(t *testing.T) {
// set message count > 1
maxMessageCount := uint32(20)
r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: preferredMaxBytes * 3, PreferredMaxBytes: preferredMaxBytes}}, filters)
r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: preferredMaxBytes * 3, PreferredMaxBytes: preferredMaxBytes}})
// submit large message
batches, ok := r.Ordered(goodTxLarge)
......
......@@ -52,7 +52,7 @@ func newChainSupport(
signer crypto.LocalSigner,
) *ChainSupport {
cutter := blockcutter.NewReceiverImpl(ledgerResources.SharedConfig(), filters)
cutter := blockcutter.NewReceiverImpl(ledgerResources.SharedConfig())
consenterType := ledgerResources.SharedConfig().ConsensusType()
consenter, ok := consenters[consenterType]
if !ok {
......
......@@ -379,21 +379,27 @@ func processRegular(regularMessage *ab.KafkaMessageRegular, support consensus.Co
}
switch class {
case msgprocessor.ConfigUpdateMsg:
batch := support.BlockCutter().Cut()
if batch != nil {
block := support.CreateNextBlock(batch)
support.WriteBlock(block, nil)
}
_, err := support.ProcessNormalMsg(env)
if err != nil {
logger.Warningf("[channel: %s] Discarding bad config message: %s", support.ChainID(), err)
break
}
batch := support.BlockCutter().Cut()
if batch != nil {
block := support.CreateNextBlock(batch)
support.WriteBlock(block, nil)
}
block := support.CreateNextBlock([]*cb.Envelope{env})
support.WriteConfigBlock(block, nil)
*timer = nil
case msgprocessor.NormalMsg:
_, err := support.ProcessNormalMsg(env)
if err != nil {
logger.Warningf("Discarding bad normal message: %s", err)
break
}
batches, ok := support.BlockCutter().Ordered(env)
logger.Debugf("[channel: %s] Ordering results: items in batch = %d, ok = %v", support.ChainID(), len(batches), ok)
if ok && len(batches) == 0 && *timer == nil {
......
......@@ -95,21 +95,28 @@ func (ch *chain) main() {
}
switch class {
case msgprocessor.ConfigUpdateMsg:
_, err := ch.support.ProcessNormalMsg(msg)
if err != nil {
logger.Warningf("Discarding bad config message: %s", err)
continue
}
batch := ch.support.BlockCutter().Cut()
if batch != nil {
block := ch.support.CreateNextBlock(batch)
ch.support.WriteBlock(block, nil)
}
_, err := ch.support.ProcessNormalMsg(msg)
if err != nil {
logger.Warningf("Discarding bad config message: %s", err)
continue
}
block := ch.support.CreateNextBlock([]*cb.Envelope{msg})
ch.support.WriteConfigBlock(block, nil)
timer = nil
case msgprocessor.NormalMsg:
_, err := ch.support.ProcessNormalMsg(msg)
if err != nil {
logger.Warningf("Discarding bad normal message: %s", err)
continue
}
batches, ok := ch.support.BlockCutter().Ordered(msg)
if ok && len(batches) == 0 && timer == nil {
timer = time.After(ch.support.SharedConfig().BatchTimeout())
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment