Commit 2b63ca49 authored by Gari Singh's avatar Gari Singh Committed by Gerrit Code Review
Browse files

Merge "[FAB-5265] Rm blockcutter message validation"

parents 19a71f4c ed9517ea
......@@ -51,17 +51,15 @@ type Receiver interface {
type receiver struct {
sharedConfigManager config.Orderer
filters *filter.RuleSet
pendingBatch []*cb.Envelope
pendingBatchSizeBytes uint32
pendingCommitters []filter.Committer
}
// NewReceiverImpl creates a Receiver implementation based on the given configtxorderer manager and filters
func NewReceiverImpl(sharedConfigManager config.Orderer, filters *filter.RuleSet) Receiver {
func NewReceiverImpl(sharedConfigManager config.Orderer) Receiver {
return &receiver{
sharedConfigManager: sharedConfigManager,
filters: filters,
}
}
......@@ -83,17 +81,7 @@ func NewReceiverImpl(sharedConfigManager config.Orderer, filters *filter.RuleSet
// - The current message will cause the pending batch size in bytes to exceed BatchSize.PreferredMaxBytes.
// - After adding the current message to the pending batch, the message count has reached BatchSize.MaxMessageCount.
func (r *receiver) Ordered(msg *cb.Envelope) ([][]*cb.Envelope, bool) {
// The messages must be filtered a second time in case configuration has changed since the message was received
committer, err := r.filters.Apply(msg)
if err != nil {
logger.Debugf("Rejecting message: %s", err)
return nil, false
}
if committer.Isolated() {
logger.Panicf("The use of isolated committers has been deprecated and should no longer appear in this path")
}
// The messages are not filtered a second time, this is pushed onto the Consenter
messageSizeBytes := messageSizeBytes(msg)
if messageSizeBytes > r.sharedConfigManager.BatchSize().PreferredMaxBytes {
......
......@@ -17,11 +17,9 @@ limitations under the License.
package blockcutter
import (
"bytes"
"testing"
mockconfig "github.com/hyperledger/fabric/common/mocks/config"
"github.com/hyperledger/fabric/orderer/common/filter"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
......@@ -33,59 +31,14 @@ func init() {
logging.SetLevel(logging.DEBUG, "")
}
type isolatedCommitter struct{}
func (ic isolatedCommitter) Isolated() bool { return true }
func (ic isolatedCommitter) Commit() {}
type mockIsolatedFilter struct{}
func (mif *mockIsolatedFilter) Apply(msg *cb.Envelope) (filter.Action, filter.Committer) {
if bytes.Equal(msg.Payload, isolatedTx.Payload) {
return filter.Accept, isolatedCommitter{}
}
return filter.Forward, nil
}
type mockRejectFilter struct{}
func (mrf mockRejectFilter) Apply(message *cb.Envelope) (filter.Action, filter.Committer) {
if bytes.Equal(message.Payload, badTx.Payload) {
return filter.Reject, nil
}
return filter.Forward, nil
}
type mockAcceptFilter struct{}
func (mrf mockAcceptFilter) Apply(message *cb.Envelope) (filter.Action, filter.Committer) {
if bytes.Equal(message.Payload, goodTx.Payload) {
return filter.Accept, filter.NoopCommitter
}
return filter.Forward, nil
}
func getFilters() *filter.RuleSet {
return filter.NewRuleSet([]filter.Rule{
&mockIsolatedFilter{},
&mockRejectFilter{},
&mockAcceptFilter{},
})
}
var badTx = &cb.Envelope{Payload: []byte("BAD")}
var goodTx = &cb.Envelope{Payload: []byte("GOOD")}
var goodTxLarge = &cb.Envelope{Payload: []byte("GOOD"), Signature: make([]byte, 1000)}
var isolatedTx = &cb.Envelope{Payload: []byte("ISOLATED")}
var unmatchedTx = &cb.Envelope{Payload: []byte("UNMATCHED")}
func TestNormalBatch(t *testing.T) {
filters := getFilters()
maxMessageCount := uint32(2)
absoluteMaxBytes := uint32(1000)
preferredMaxBytes := uint32(100)
r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}}, filters)
r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}})
batches, ok := r.Ordered(goodTx)
assert.Nil(t, batches, "Should not have created batch")
......@@ -96,73 +49,7 @@ func TestNormalBatch(t *testing.T) {
assert.True(t, ok, "Should have enqueued second message into batch")
}
func TestBadMessageInBatch(t *testing.T) {
filters := getFilters()
maxMessageCount := uint32(2)
absoluteMaxBytes := uint32(1000)
preferredMaxBytes := uint32(100)
r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}}, filters)
batches, ok := r.Ordered(badTx)
assert.Nil(t, batches, "Should not have created batch")
assert.False(t, ok, "Should not have enqueued bad message into batch")
batches, ok = r.Ordered(goodTx)
assert.Nil(t, batches, "Should not have created batch")
assert.True(t, ok, "Should have enqueued good message into batch")
batches, ok = r.Ordered(badTx)
assert.Nil(t, batches, "Should not have created batch")
assert.False(t, ok, "Should not have enqueued second bad message into batch")
}
func TestUnmatchedMessageInBatch(t *testing.T) {
filters := getFilters()
maxMessageCount := uint32(2)
absoluteMaxBytes := uint32(1000)
preferredMaxBytes := uint32(100)
r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}}, filters)
batches, ok := r.Ordered(unmatchedTx)
assert.Nil(t, batches, "Should not have created batch")
assert.False(t, ok, "Should not have enqueued unmatched message into batch")
batches, ok = r.Ordered(goodTx)
assert.Nil(t, batches, "Should not have created batch")
assert.True(t, ok, "Should have enqueued good message into batch")
batches, ok = r.Ordered(unmatchedTx)
assert.Nil(t, batches, "Should not have created batch from unmatched message")
assert.False(t, ok, "Should not have enqueued second bad message into batch")
}
func TestIsolatedEmptyBatch(t *testing.T) {
filters := getFilters()
maxMessageCount := uint32(2)
absoluteMaxBytes := uint32(1000)
preferredMaxBytes := uint32(100)
r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}}, filters)
assert.Panics(t, func() { r.Ordered(isolatedTx) }, "Should not have handled an isolated by committer message")
}
func TestIsolatedPartialBatch(t *testing.T) {
filters := getFilters()
maxMessageCount := uint32(2)
absoluteMaxBytes := uint32(1000)
preferredMaxBytes := uint32(100)
r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: absoluteMaxBytes, PreferredMaxBytes: preferredMaxBytes}}, filters)
batches, ok := r.Ordered(goodTx)
assert.Nil(t, batches, "Should not have created batch")
assert.True(t, ok, "Should have enqueued good message into batch")
assert.Panics(t, func() { r.Ordered(isolatedTx) }, "Should not have handled an isolated by committer message")
}
func TestBatchSizePreferredMaxBytesOverflow(t *testing.T) {
filters := getFilters()
goodTxBytes := messageSizeBytes(goodTx)
// set preferred max bytes such that 10 goodTx will not fit
......@@ -171,7 +58,7 @@ func TestBatchSizePreferredMaxBytesOverflow(t *testing.T) {
// set message count > 9
maxMessageCount := uint32(20)
r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: preferredMaxBytes * 2, PreferredMaxBytes: preferredMaxBytes}}, filters)
r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: preferredMaxBytes * 2, PreferredMaxBytes: preferredMaxBytes}})
// enqueue 9 messages
for i := 0; i < 9; i++ {
......@@ -194,8 +81,6 @@ func TestBatchSizePreferredMaxBytesOverflow(t *testing.T) {
}
func TestBatchSizePreferredMaxBytesOverflowNoPending(t *testing.T) {
filters := getFilters()
goodTxLargeBytes := messageSizeBytes(goodTxLarge)
// set preferred max bytes such that 1 goodTxLarge will not fit
......@@ -204,7 +89,7 @@ func TestBatchSizePreferredMaxBytesOverflowNoPending(t *testing.T) {
// set message count > 1
maxMessageCount := uint32(20)
r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: preferredMaxBytes * 3, PreferredMaxBytes: preferredMaxBytes}}, filters)
r := NewReceiverImpl(&mockconfig.Orderer{BatchSizeVal: &ab.BatchSize{MaxMessageCount: maxMessageCount, AbsoluteMaxBytes: preferredMaxBytes * 3, PreferredMaxBytes: preferredMaxBytes}})
// submit large message
batches, ok := r.Ordered(goodTxLarge)
......
......@@ -52,7 +52,7 @@ func newChainSupport(
signer crypto.LocalSigner,
) *ChainSupport {
cutter := blockcutter.NewReceiverImpl(ledgerResources.SharedConfig(), filters)
cutter := blockcutter.NewReceiverImpl(ledgerResources.SharedConfig())
consenterType := ledgerResources.SharedConfig().ConsensusType()
consenter, ok := consenters[consenterType]
if !ok {
......
......@@ -379,21 +379,27 @@ func processRegular(regularMessage *ab.KafkaMessageRegular, support consensus.Co
}
switch class {
case msgprocessor.ConfigUpdateMsg:
batch := support.BlockCutter().Cut()
if batch != nil {
block := support.CreateNextBlock(batch)
support.WriteBlock(block, nil)
}
_, err := support.ProcessNormalMsg(env)
if err != nil {
logger.Warningf("[channel: %s] Discarding bad config message: %s", support.ChainID(), err)
break
}
batch := support.BlockCutter().Cut()
if batch != nil {
block := support.CreateNextBlock(batch)
support.WriteBlock(block, nil)
}
block := support.CreateNextBlock([]*cb.Envelope{env})
support.WriteConfigBlock(block, nil)
*timer = nil
case msgprocessor.NormalMsg:
_, err := support.ProcessNormalMsg(env)
if err != nil {
logger.Warningf("Discarding bad normal message: %s", err)
break
}
batches, ok := support.BlockCutter().Ordered(env)
logger.Debugf("[channel: %s] Ordering results: items in batch = %d, ok = %v", support.ChainID(), len(batches), ok)
if ok && len(batches) == 0 && *timer == nil {
......
......@@ -95,21 +95,28 @@ func (ch *chain) main() {
}
switch class {
case msgprocessor.ConfigUpdateMsg:
_, err := ch.support.ProcessNormalMsg(msg)
if err != nil {
logger.Warningf("Discarding bad config message: %s", err)
continue
}
batch := ch.support.BlockCutter().Cut()
if batch != nil {
block := ch.support.CreateNextBlock(batch)
ch.support.WriteBlock(block, nil)
}
_, err := ch.support.ProcessNormalMsg(msg)
if err != nil {
logger.Warningf("Discarding bad config message: %s", err)
continue
}
block := ch.support.CreateNextBlock([]*cb.Envelope{msg})
ch.support.WriteConfigBlock(block, nil)
timer = nil
case msgprocessor.NormalMsg:
_, err := ch.support.ProcessNormalMsg(msg)
if err != nil {
logger.Warningf("Discarding bad normal message: %s", err)
continue
}
batches, ok := ch.support.BlockCutter().Ordered(msg)
if ok && len(batches) == 0 && timer == nil {
timer = time.After(ch.support.SharedConfig().BatchTimeout())
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment