Commit 73faa7c5 authored by Jason Yellick

FAB-13067 Add blockcutter metrics



The only real tunables for the orderer, as far as latency and throughput
go, are the batch parameters. Therefore, it's critical for network
operators to be able to gain insight into how long filling a block
takes, so they know whether to relax or tighten their batch
parameters.

Change-Id: I9f64116f16b3138c1bb1cce11b00787f630a7141
Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
parent cdb1d3a2
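
For context, a minimal sketch (not part of this change) of how the new pieces fit together: the caller supplies a metrics.Provider, NewMetrics builds the block_fill_duration histogram, and the resulting *Metrics is passed to NewReceiverImpl together with the channel ID. The helper name newBlockCutter below is hypothetical.

package example

import (
	"github.com/hyperledger/fabric/common/metrics"
	"github.com/hyperledger/fabric/orderer/common/blockcutter"
)

// newBlockCutter is a hypothetical helper illustrating the wiring introduced
// by this commit: the provider creates the histogram, and the receiver records
// block fill durations into it under the given channel label.
func newBlockCutter(channelID string, fetcher blockcutter.OrdererConfigFetcher, p metrics.Provider) blockcutter.Receiver {
	m := blockcutter.NewMetrics(p) // creates the blockcutter block_fill_duration histogram
	return blockcutter.NewReceiverImpl(channelID, fetcher, m)
}
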
@@ -7,6 +7,8 @@ SPDX-License-Identifier: Apache-2.0
package blockcutter
import (
"time"
"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/flogging"
cb "github.com/hyperledger/fabric/protos/common"
@@ -33,12 +35,18 @@ type receiver struct {
sharedConfigFetcher OrdererConfigFetcher
pendingBatch []*cb.Envelope
pendingBatchSizeBytes uint32
PendingBatchStartTime time.Time
ChannelID string
Metrics *Metrics
}
// NewReceiverImpl creates a Receiver implementation based on the given configtxorderer manager
func NewReceiverImpl(sharedConfigFetcher OrdererConfigFetcher) Receiver {
func NewReceiverImpl(channelID string, sharedConfigFetcher OrdererConfigFetcher, metrics *Metrics) Receiver {
return &receiver{
sharedConfigFetcher: sharedConfigFetcher,
Metrics: metrics,
ChannelID: channelID,
}
}
@@ -59,10 +67,16 @@ func NewReceiverImpl(sharedConfigFetcher OrdererConfigFetcher) Receiver {
//
// Note that messageBatches cannot contain more than 2 batches.
func (r *receiver) Ordered(msg *cb.Envelope) (messageBatches [][]*cb.Envelope, pending bool) {
if len(r.pendingBatch) == 0 {
// We are beginning a new batch, mark the time
r.PendingBatchStartTime = time.Now()
}
ordererConfig, ok := r.sharedConfigFetcher.OrdererConfig()
if !ok {
logger.Panicf("Could not retrieve orderer config to query batch parameters, block cutting is not possible")
}
batchSize := ordererConfig.BatchSize()
messageSizeBytes := messageSizeBytes(msg)
@@ -78,6 +92,9 @@ func (r *receiver) Ordered(msg *cb.Envelope) (messageBatches [][]*cb.Envelope, p
// create new batch with single message
messageBatches = append(messageBatches, []*cb.Envelope{msg})
// Record that this batch took no time to fill
r.Metrics.BlockFillDuration.With("channel", r.ChannelID).Observe(0)
return
}
@@ -87,6 +104,7 @@ func (r *receiver) Ordered(msg *cb.Envelope) (messageBatches [][]*cb.Envelope, p
logger.Debugf("The current message, with %v bytes, will overflow the pending batch of %v bytes.", messageSizeBytes, r.pendingBatchSizeBytes)
logger.Debugf("Pending batch would overflow if current message is added, cutting batch now.")
messageBatch := r.Cut()
r.PendingBatchStartTime = time.Now()
messageBatches = append(messageBatches, messageBatch)
}
@@ -107,6 +125,8 @@ func (r *receiver) Ordered(msg *cb.Envelope) (messageBatches [][]*cb.Envelope, p
// Cut returns the current batch and starts a new one
func (r *receiver) Cut() []*cb.Envelope {
r.Metrics.BlockFillDuration.With("channel", r.ChannelID).Observe(time.Since(r.PendingBatchStartTime).Seconds())
r.PendingBatchStartTime = time.Time{}
batch := r.pendingBatch
r.pendingBatch = nil
r.pendingBatchSizeBytes = 0
......
@@ -10,12 +10,23 @@ import (
"testing"
"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/metrics"
"github.com/hyperledger/fabric/orderer/common/blockcutter"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
//go:generate counterfeiter -o mock/metrics_histogram.go --fake-name MetricsHistogram . metricsHistogram
type metricsHistogram interface {
metrics.Histogram
}
//go:generate counterfeiter -o mock/metrics_provider.go --fake-name MetricsProvider . metricsProvider
type metricsProvider interface {
metrics.Provider
}
//go:generate counterfeiter -o mock/config_fetcher.go --fake-name OrdererConfigFetcher . ordererConfigFetcher
type ordererConfigFetcher interface {
blockcutter.OrdererConfigFetcher
......
@@ -21,6 +21,9 @@ var _ = Describe("Blockcutter", func() {
bc blockcutter.Receiver
fakeConfig *mock.OrdererConfig
fakeConfigFetcher *mock.OrdererConfigFetcher
metrics *blockcutter.Metrics
fakeBlockFillDuration *mock.MetricsHistogram
)
BeforeEach(func() {
@@ -28,7 +31,13 @@ var _ = Describe("Blockcutter", func() {
fakeConfigFetcher = &mock.OrdererConfigFetcher{}
fakeConfigFetcher.OrdererConfigReturns(fakeConfig, true)
bc = blockcutter.NewReceiverImpl(fakeConfigFetcher)
fakeBlockFillDuration = &mock.MetricsHistogram{}
fakeBlockFillDuration.WithReturns(fakeBlockFillDuration)
metrics = &blockcutter.Metrics{
BlockFillDuration: fakeBlockFillDuration,
}
bc = blockcutter.NewReceiverImpl("mychannel", fakeConfigFetcher, metrics)
})
Describe("Ordered", func() {
@@ -49,6 +58,7 @@ var _ = Describe("Blockcutter", func() {
batches, pending := bc.Ordered(message)
Expect(batches).To(BeEmpty())
Expect(pending).To(BeTrue())
Expect(fakeBlockFillDuration.ObserveCallCount()).To(Equal(0))
})
Context("when enough batches to fill the max message count are enqueued", func() {
@@ -60,6 +70,12 @@ var _ = Describe("Blockcutter", func() {
Expect(len(batches)).To(Equal(1))
Expect(len(batches[0])).To(Equal(2))
Expect(pending).To(BeFalse())
Expect(fakeBlockFillDuration.ObserveCallCount()).To(Equal(1))
Expect(fakeBlockFillDuration.ObserveArgsForCall(0)).To(BeNumerically(">", 0))
Expect(fakeBlockFillDuration.ObserveArgsForCall(0)).To(BeNumerically("<", 1))
Expect(fakeBlockFillDuration.WithCallCount()).To(Equal(1))
Expect(fakeBlockFillDuration.WithArgsForCall(0)).To(Equal([]string{"channel", "mychannel"}))
})
})
@@ -78,6 +94,7 @@ var _ = Describe("Blockcutter", func() {
batches, pending = bc.Ordered(message)
Expect(batches).To(BeEmpty())
Expect(pending).To(BeTrue())
Expect(fakeBlockFillDuration.ObserveCallCount()).To(Equal(0))
})
})
@@ -93,6 +110,10 @@ var _ = Describe("Blockcutter", func() {
batches, pending := bc.Ordered(message)
Expect(len(batches)).To(Equal(1))
Expect(pending).To(BeFalse())
Expect(fakeBlockFillDuration.ObserveCallCount()).To(Equal(1))
Expect(fakeBlockFillDuration.ObserveArgsForCall(0)).To(Equal(float64(0)))
Expect(fakeBlockFillDuration.WithCallCount()).To(Equal(1))
Expect(fakeBlockFillDuration.WithArgsForCall(0)).To(Equal([]string{"channel", "mychannel"}))
})
})
@@ -113,6 +134,12 @@ var _ = Describe("Blockcutter", func() {
Expect(len(batches)).To(Equal(1))
Expect(len(batches[0])).To(Equal(1))
Expect(pending).To(BeTrue())
Expect(fakeBlockFillDuration.ObserveCallCount()).To(Equal(1))
Expect(fakeBlockFillDuration.ObserveArgsForCall(0)).To(BeNumerically(">", 0))
Expect(fakeBlockFillDuration.ObserveArgsForCall(0)).To(BeNumerically("<", 1))
Expect(fakeBlockFillDuration.WithCallCount()).To(Equal(1))
Expect(fakeBlockFillDuration.WithArgsForCall(0)).To(Equal([]string{"channel", "mychannel"}))
})
Context("when the new message is larger than the preferred max bytes", func() {
@@ -134,6 +161,14 @@ var _ = Describe("Blockcutter", func() {
Expect(len(batches[0])).To(Equal(1))
Expect(len(batches[1])).To(Equal(1))
Expect(pending).To(BeFalse())
Expect(fakeBlockFillDuration.ObserveCallCount()).To(Equal(2))
Expect(fakeBlockFillDuration.ObserveArgsForCall(0)).To(BeNumerically(">", 0))
Expect(fakeBlockFillDuration.ObserveArgsForCall(0)).To(BeNumerically("<", 1))
Expect(fakeBlockFillDuration.ObserveArgsForCall(1)).To(Equal(float64(0)))
Expect(fakeBlockFillDuration.WithCallCount()).To(Equal(2))
Expect(fakeBlockFillDuration.WithArgsForCall(0)).To(Equal([]string{"channel", "mychannel"}))
Expect(fakeBlockFillDuration.WithArgsForCall(1)).To(Equal([]string{"channel", "mychannel"}))
})
})
})
......
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package blockcutter
import "github.com/hyperledger/fabric/common/metrics"
var (
blockFillDuration = metrics.HistogramOpts{
Namespace: "blockcutter",
Name: "block_fill_duration",
Help: "The time from first transaction enqueueing to the block being cut in seconds.",
LabelNames: []string{"channel"},
StatsdFormat: "%{#fqname}.%{channel}",
}
)
type Metrics struct {
BlockFillDuration metrics.Histogram
}
func NewMetrics(p metrics.Provider) *Metrics {
return &Metrics{
BlockFillDuration: p.NewHistogram(blockFillDuration),
}
}
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package blockcutter_test
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/hyperledger/fabric/orderer/common/blockcutter"
"github.com/hyperledger/fabric/orderer/common/blockcutter/mock"
)
var _ = Describe("Metrics", func() {
Describe("NewMetrics", func() {
var (
fakeProvider *mock.MetricsProvider
)
BeforeEach(func() {
fakeProvider = &mock.MetricsProvider{}
fakeProvider.NewHistogramReturns(&mock.MetricsHistogram{})
})
It("uses the provider to initialize its field", func() {
metrics := blockcutter.NewMetrics(fakeProvider)
Expect(metrics).NotTo(BeNil())
Expect(metrics.BlockFillDuration).To(Equal(&mock.MetricsHistogram{}))
Expect(fakeProvider.NewHistogramCallCount()).To(Equal(1))
})
})
})
// Code generated by counterfeiter. DO NOT EDIT.
package mock
import (
"sync"
"github.com/hyperledger/fabric/common/metrics"
)
type MetricsHistogram struct {
WithStub func(labelValues ...string) metrics.Histogram
withMutex sync.RWMutex
withArgsForCall []struct {
labelValues []string
}
withReturns struct {
result1 metrics.Histogram
}
withReturnsOnCall map[int]struct {
result1 metrics.Histogram
}
ObserveStub func(value float64)
observeMutex sync.RWMutex
observeArgsForCall []struct {
value float64
}
invocations map[string][][]interface{}
invocationsMutex sync.RWMutex
}
func (fake *MetricsHistogram) With(labelValues ...string) metrics.Histogram {
fake.withMutex.Lock()
ret, specificReturn := fake.withReturnsOnCall[len(fake.withArgsForCall)]
fake.withArgsForCall = append(fake.withArgsForCall, struct {
labelValues []string
}{labelValues})
fake.recordInvocation("With", []interface{}{labelValues})
fake.withMutex.Unlock()
if fake.WithStub != nil {
return fake.WithStub(labelValues...)
}
if specificReturn {
return ret.result1
}
return fake.withReturns.result1
}
func (fake *MetricsHistogram) WithCallCount() int {
fake.withMutex.RLock()
defer fake.withMutex.RUnlock()
return len(fake.withArgsForCall)
}
func (fake *MetricsHistogram) WithArgsForCall(i int) []string {
fake.withMutex.RLock()
defer fake.withMutex.RUnlock()
return fake.withArgsForCall[i].labelValues
}
func (fake *MetricsHistogram) WithReturns(result1 metrics.Histogram) {
fake.WithStub = nil
fake.withReturns = struct {
result1 metrics.Histogram
}{result1}
}
func (fake *MetricsHistogram) WithReturnsOnCall(i int, result1 metrics.Histogram) {
fake.WithStub = nil
if fake.withReturnsOnCall == nil {
fake.withReturnsOnCall = make(map[int]struct {
result1 metrics.Histogram
})
}
fake.withReturnsOnCall[i] = struct {
result1 metrics.Histogram
}{result1}
}
func (fake *MetricsHistogram) Observe(value float64) {
fake.observeMutex.Lock()
fake.observeArgsForCall = append(fake.observeArgsForCall, struct {
value float64
}{value})
fake.recordInvocation("Observe", []interface{}{value})
fake.observeMutex.Unlock()
if fake.ObserveStub != nil {
fake.ObserveStub(value)
}
}
func (fake *MetricsHistogram) ObserveCallCount() int {
fake.observeMutex.RLock()
defer fake.observeMutex.RUnlock()
return len(fake.observeArgsForCall)
}
func (fake *MetricsHistogram) ObserveArgsForCall(i int) float64 {
fake.observeMutex.RLock()
defer fake.observeMutex.RUnlock()
return fake.observeArgsForCall[i].value
}
func (fake *MetricsHistogram) Invocations() map[string][][]interface{} {
fake.invocationsMutex.RLock()
defer fake.invocationsMutex.RUnlock()
fake.withMutex.RLock()
defer fake.withMutex.RUnlock()
fake.observeMutex.RLock()
defer fake.observeMutex.RUnlock()
copiedInvocations := map[string][][]interface{}{}
for key, value := range fake.invocations {
copiedInvocations[key] = value
}
return copiedInvocations
}
func (fake *MetricsHistogram) recordInvocation(key string, args []interface{}) {
fake.invocationsMutex.Lock()
defer fake.invocationsMutex.Unlock()
if fake.invocations == nil {
fake.invocations = map[string][][]interface{}{}
}
if fake.invocations[key] == nil {
fake.invocations[key] = [][]interface{}{}
}
fake.invocations[key] = append(fake.invocations[key], args)
}
// Code generated by counterfeiter. DO NOT EDIT.
package mock
import (
"sync"
"github.com/hyperledger/fabric/common/metrics"
)
type MetricsProvider struct {
NewCounterStub func(metrics.CounterOpts) metrics.Counter
newCounterMutex sync.RWMutex
newCounterArgsForCall []struct {
arg1 metrics.CounterOpts
}
newCounterReturns struct {
result1 metrics.Counter
}
newCounterReturnsOnCall map[int]struct {
result1 metrics.Counter
}
NewGaugeStub func(metrics.GaugeOpts) metrics.Gauge
newGaugeMutex sync.RWMutex
newGaugeArgsForCall []struct {
arg1 metrics.GaugeOpts
}
newGaugeReturns struct {
result1 metrics.Gauge
}
newGaugeReturnsOnCall map[int]struct {
result1 metrics.Gauge
}
NewHistogramStub func(metrics.HistogramOpts) metrics.Histogram
newHistogramMutex sync.RWMutex
newHistogramArgsForCall []struct {
arg1 metrics.HistogramOpts
}
newHistogramReturns struct {
result1 metrics.Histogram
}
newHistogramReturnsOnCall map[int]struct {
result1 metrics.Histogram
}
invocations map[string][][]interface{}
invocationsMutex sync.RWMutex
}
func (fake *MetricsProvider) NewCounter(arg1 metrics.CounterOpts) metrics.Counter {
fake.newCounterMutex.Lock()
ret, specificReturn := fake.newCounterReturnsOnCall[len(fake.newCounterArgsForCall)]
fake.newCounterArgsForCall = append(fake.newCounterArgsForCall, struct {
arg1 metrics.CounterOpts
}{arg1})
fake.recordInvocation("NewCounter", []interface{}{arg1})
fake.newCounterMutex.Unlock()
if fake.NewCounterStub != nil {
return fake.NewCounterStub(arg1)
}
if specificReturn {
return ret.result1
}
return fake.newCounterReturns.result1
}
func (fake *MetricsProvider) NewCounterCallCount() int {
fake.newCounterMutex.RLock()
defer fake.newCounterMutex.RUnlock()
return len(fake.newCounterArgsForCall)
}
func (fake *MetricsProvider) NewCounterArgsForCall(i int) metrics.CounterOpts {
fake.newCounterMutex.RLock()
defer fake.newCounterMutex.RUnlock()
return fake.newCounterArgsForCall[i].arg1
}
func (fake *MetricsProvider) NewCounterReturns(result1 metrics.Counter) {
fake.NewCounterStub = nil
fake.newCounterReturns = struct {
result1 metrics.Counter
}{result1}
}
func (fake *MetricsProvider) NewCounterReturnsOnCall(i int, result1 metrics.Counter) {
fake.NewCounterStub = nil
if fake.newCounterReturnsOnCall == nil {
fake.newCounterReturnsOnCall = make(map[int]struct {
result1 metrics.Counter
})
}
fake.newCounterReturnsOnCall[i] = struct {
result1 metrics.Counter
}{result1}
}
func (fake *MetricsProvider) NewGauge(arg1 metrics.GaugeOpts) metrics.Gauge {
fake.newGaugeMutex.Lock()
ret, specificReturn := fake.newGaugeReturnsOnCall[len(fake.newGaugeArgsForCall)]
fake.newGaugeArgsForCall = append(fake.newGaugeArgsForCall, struct {
arg1 metrics.GaugeOpts
}{arg1})
fake.recordInvocation("NewGauge", []interface{}{arg1})
fake.newGaugeMutex.Unlock()
if fake.NewGaugeStub != nil {
return fake.NewGaugeStub(arg1)
}
if specificReturn {
return ret.result1
}
return fake.newGaugeReturns.result1
}
func (fake *MetricsProvider) NewGaugeCallCount() int {
fake.newGaugeMutex.RLock()
defer fake.newGaugeMutex.RUnlock()
return len(fake.newGaugeArgsForCall)
}
func (fake *MetricsProvider) NewGaugeArgsForCall(i int) metrics.GaugeOpts {
fake.newGaugeMutex.RLock()
defer fake.newGaugeMutex.RUnlock()
return fake.newGaugeArgsForCall[i].arg1
}
func (fake *MetricsProvider) NewGaugeReturns(result1 metrics.Gauge) {
fake.NewGaugeStub = nil
fake.newGaugeReturns = struct {
result1 metrics.Gauge
}{result1}
}
func (fake *MetricsProvider) NewGaugeReturnsOnCall(i int, result1 metrics.Gauge) {
fake.NewGaugeStub = nil
if fake.newGaugeReturnsOnCall == nil {
fake.newGaugeReturnsOnCall = make(map[int]struct {
result1 metrics.Gauge
})
}
fake.newGaugeReturnsOnCall[i] = struct {
result1 metrics.Gauge
}{result1}
}
func (fake *MetricsProvider) NewHistogram(arg1 metrics.HistogramOpts) metrics.Histogram {
fake.newHistogramMutex.Lock()
ret, specificReturn := fake.newHistogramReturnsOnCall[len(fake.newHistogramArgsForCall)]
fake.newHistogramArgsForCall = append(fake.newHistogramArgsForCall, struct {
arg1 metrics.HistogramOpts
}{arg1})
fake.recordInvocation("NewHistogram", []interface{}{arg1})
fake.newHistogramMutex.Unlock()
if fake.NewHistogramStub != nil {
return fake.NewHistogramStub(arg1)
}
if specificReturn {
return ret.result1
}
return fake.newHistogramReturns.result1
}
func (fake *MetricsProvider) NewHistogramCallCount() int {
fake.newHistogramMutex.RLock()
defer fake.newHistogramMutex.RUnlock()
return len(fake.newHistogramArgsForCall)
}
func (fake *MetricsProvider) NewHistogramArgsForCall(i int) metrics.HistogramOpts {
fake.newHistogramMutex.RLock()
defer fake.newHistogramMutex.RUnlock()
return fake.newHistogramArgsForCall[i].arg1
}
func (fake *MetricsProvider) NewHistogramReturns(result1 metrics.Histogram) {
fake.NewHistogramStub = nil
fake.newHistogramReturns = struct {
result1 metrics.Histogram
}{result1}
}
func (fake *MetricsProvider) NewHistogramReturnsOnCall(i int, result1 metrics.Histogram) {
fake.NewHistogramStub = nil
if fake.newHistogramReturnsOnCall == nil {
fake.newHistogramReturnsOnCall = make(map[int]struct {
result1 metrics.Histogram
})
}
fake.newHistogramReturnsOnCall[i] = struct {
result1 metrics.Histogram
}{result1}
}
func (fake *MetricsProvider) Invocations() map[string][][]interface{} {
fake.invocationsMutex.RLock()
defer fake.invocationsMutex.RUnlock()
fake.newCounterMutex.RLock()
defer fake.newCounterMutex.RUnlock()
fake.newGaugeMutex.RLock()
defer fake.newGaugeMutex.RUnlock()
fake.newHistogramMutex.RLock()
defer fake.newHistogramMutex.RUnlock()
copiedInvocations := map[string][][]interface{}{}
for key, value := range fake.invocations {
copiedInvocations[key] = value
}
return copiedInvocations
}
func (fake *MetricsProvider) recordInvocation(key string, args []interface{}) {
fake.invocationsMutex.Lock()
defer fake.invocationsMutex.Unlock()
if fake.invocations == nil {
fake.invocations = map[string][][]interface{}{}