Commit 8a15ac6a authored by Matthew Sykes's avatar Matthew Sykes Committed by Gerrit Code Review
Browse files

Merge "FAB-13067 Add blockcutter metrics"

parents a60ed5a6 73faa7c5
......@@ -7,6 +7,8 @@ SPDX-License-Identifier: Apache-2.0
package blockcutter
import (
"time"
"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/flogging"
cb "github.com/hyperledger/fabric/protos/common"
......@@ -33,12 +35,18 @@ type receiver struct {
sharedConfigFetcher OrdererConfigFetcher
pendingBatch []*cb.Envelope
pendingBatchSizeBytes uint32
PendingBatchStartTime time.Time
ChannelID string
Metrics *Metrics
}
// NewReceiverImpl creates a Receiver implementation based on the given configtxorderer manager
func NewReceiverImpl(sharedConfigFetcher OrdererConfigFetcher) Receiver {
func NewReceiverImpl(channelID string, sharedConfigFetcher OrdererConfigFetcher, metrics *Metrics) Receiver {
return &receiver{
sharedConfigFetcher: sharedConfigFetcher,
Metrics: metrics,
ChannelID: channelID,
}
}
......@@ -59,10 +67,16 @@ func NewReceiverImpl(sharedConfigFetcher OrdererConfigFetcher) Receiver {
//
// Note that messageBatches can not be greater than 2.
func (r *receiver) Ordered(msg *cb.Envelope) (messageBatches [][]*cb.Envelope, pending bool) {
if len(r.pendingBatch) == 0 {
// We are beginning a new batch, mark the time
r.PendingBatchStartTime = time.Now()
}
ordererConfig, ok := r.sharedConfigFetcher.OrdererConfig()
if !ok {
logger.Panicf("Could not retrieve orderer config to query batch parameters, block cutting is not possible")
}
batchSize := ordererConfig.BatchSize()
messageSizeBytes := messageSizeBytes(msg)
......@@ -78,6 +92,9 @@ func (r *receiver) Ordered(msg *cb.Envelope) (messageBatches [][]*cb.Envelope, p
// create new batch with single message
messageBatches = append(messageBatches, []*cb.Envelope{msg})
// Record that this batch took no time to fill
r.Metrics.BlockFillDuration.With("channel", r.ChannelID).Observe(0)
return
}
......@@ -87,6 +104,7 @@ func (r *receiver) Ordered(msg *cb.Envelope) (messageBatches [][]*cb.Envelope, p
logger.Debugf("The current message, with %v bytes, will overflow the pending batch of %v bytes.", messageSizeBytes, r.pendingBatchSizeBytes)
logger.Debugf("Pending batch would overflow if current message is added, cutting batch now.")
messageBatch := r.Cut()
r.PendingBatchStartTime = time.Now()
messageBatches = append(messageBatches, messageBatch)
}
......@@ -107,6 +125,8 @@ func (r *receiver) Ordered(msg *cb.Envelope) (messageBatches [][]*cb.Envelope, p
// Cut returns the current batch and starts a new one
func (r *receiver) Cut() []*cb.Envelope {
r.Metrics.BlockFillDuration.With("channel", r.ChannelID).Observe(time.Since(r.PendingBatchStartTime).Seconds())
r.PendingBatchStartTime = time.Time{}
batch := r.pendingBatch
r.pendingBatch = nil
r.pendingBatchSizeBytes = 0
......
......@@ -10,12 +10,23 @@ import (
"testing"
"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/metrics"
"github.com/hyperledger/fabric/orderer/common/blockcutter"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
//go:generate counterfeiter -o mock/metrics_histogram.go --fake-name MetricsHistogram . metricsHistogram
type metricsHistogram interface {
metrics.Histogram
}
//go:generate counterfeiter -o mock/metrics_provider.go --fake-name MetricsProvider . metricsProvider
type metricsProvider interface {
metrics.Provider
}
//go:generate counterfeiter -o mock/config_fetcher.go --fake-name OrdererConfigFetcher . ordererConfigFetcher
type ordererConfigFetcher interface {
blockcutter.OrdererConfigFetcher
......
......@@ -21,6 +21,9 @@ var _ = Describe("Blockcutter", func() {
bc blockcutter.Receiver
fakeConfig *mock.OrdererConfig
fakeConfigFetcher *mock.OrdererConfigFetcher
metrics *blockcutter.Metrics
fakeBlockFillDuration *mock.MetricsHistogram
)
BeforeEach(func() {
......@@ -28,7 +31,13 @@ var _ = Describe("Blockcutter", func() {
fakeConfigFetcher = &mock.OrdererConfigFetcher{}
fakeConfigFetcher.OrdererConfigReturns(fakeConfig, true)
bc = blockcutter.NewReceiverImpl(fakeConfigFetcher)
fakeBlockFillDuration = &mock.MetricsHistogram{}
fakeBlockFillDuration.WithReturns(fakeBlockFillDuration)
metrics = &blockcutter.Metrics{
BlockFillDuration: fakeBlockFillDuration,
}
bc = blockcutter.NewReceiverImpl("mychannel", fakeConfigFetcher, metrics)
})
Describe("Ordered", func() {
......@@ -49,6 +58,7 @@ var _ = Describe("Blockcutter", func() {
batches, pending := bc.Ordered(message)
Expect(batches).To(BeEmpty())
Expect(pending).To(BeTrue())
Expect(fakeBlockFillDuration.ObserveCallCount()).To(Equal(0))
})
Context("when enough batches to fill the max message count are enqueued", func() {
......@@ -60,6 +70,12 @@ var _ = Describe("Blockcutter", func() {
Expect(len(batches)).To(Equal(1))
Expect(len(batches[0])).To(Equal(2))
Expect(pending).To(BeFalse())
Expect(fakeBlockFillDuration.ObserveCallCount()).To(Equal(1))
Expect(fakeBlockFillDuration.ObserveArgsForCall(0)).To(BeNumerically(">", 0))
Expect(fakeBlockFillDuration.ObserveArgsForCall(0)).To(BeNumerically("<", 1))
Expect(fakeBlockFillDuration.WithCallCount()).To(Equal(1))
Expect(fakeBlockFillDuration.WithArgsForCall(0)).To(Equal([]string{"channel", "mychannel"}))
})
})
......@@ -78,6 +94,7 @@ var _ = Describe("Blockcutter", func() {
batches, pending = bc.Ordered(message)
Expect(batches).To(BeEmpty())
Expect(pending).To(BeTrue())
Expect(fakeBlockFillDuration.ObserveCallCount()).To(Equal(0))
})
})
......@@ -93,6 +110,10 @@ var _ = Describe("Blockcutter", func() {
batches, pending := bc.Ordered(message)
Expect(len(batches)).To(Equal(1))
Expect(pending).To(BeFalse())
Expect(fakeBlockFillDuration.ObserveCallCount()).To(Equal(1))
Expect(fakeBlockFillDuration.ObserveArgsForCall(0)).To(Equal(float64(0)))
Expect(fakeBlockFillDuration.WithCallCount()).To(Equal(1))
Expect(fakeBlockFillDuration.WithArgsForCall(0)).To(Equal([]string{"channel", "mychannel"}))
})
})
......@@ -113,6 +134,12 @@ var _ = Describe("Blockcutter", func() {
Expect(len(batches)).To(Equal(1))
Expect(len(batches[0])).To(Equal(1))
Expect(pending).To(BeTrue())
Expect(fakeBlockFillDuration.ObserveCallCount()).To(Equal(1))
Expect(fakeBlockFillDuration.ObserveArgsForCall(0)).To(BeNumerically(">", 0))
Expect(fakeBlockFillDuration.ObserveArgsForCall(0)).To(BeNumerically("<", 1))
Expect(fakeBlockFillDuration.WithCallCount()).To(Equal(1))
Expect(fakeBlockFillDuration.WithArgsForCall(0)).To(Equal([]string{"channel", "mychannel"}))
})
Context("when the new message is larger than the preferred max bytes", func() {
......@@ -134,6 +161,14 @@ var _ = Describe("Blockcutter", func() {
Expect(len(batches[0])).To(Equal(1))
Expect(len(batches[1])).To(Equal(1))
Expect(pending).To(BeFalse())
Expect(fakeBlockFillDuration.ObserveCallCount()).To(Equal(2))
Expect(fakeBlockFillDuration.ObserveArgsForCall(0)).To(BeNumerically(">", 0))
Expect(fakeBlockFillDuration.ObserveArgsForCall(0)).To(BeNumerically("<", 1))
Expect(fakeBlockFillDuration.ObserveArgsForCall(1)).To(Equal(float64(0)))
Expect(fakeBlockFillDuration.WithCallCount()).To(Equal(2))
Expect(fakeBlockFillDuration.WithArgsForCall(0)).To(Equal([]string{"channel", "mychannel"}))
Expect(fakeBlockFillDuration.WithArgsForCall(1)).To(Equal([]string{"channel", "mychannel"}))
})
})
})
......
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package blockcutter
import "github.com/hyperledger/fabric/common/metrics"
var (
blockFillDuration = metrics.HistogramOpts{
Namespace: "blockcutter",
Name: "block_fill_duration",
Help: "The time from first transaction enqueing to the block being cut in seconds.",
LabelNames: []string{"channel"},
StatsdFormat: "%{#fqname}.%{channel}",
}
)
type Metrics struct {
BlockFillDuration metrics.Histogram
}
func NewMetrics(p metrics.Provider) *Metrics {
return &Metrics{
BlockFillDuration: p.NewHistogram(blockFillDuration),
}
}
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package blockcutter_test
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/hyperledger/fabric/orderer/common/blockcutter"
"github.com/hyperledger/fabric/orderer/common/blockcutter/mock"
)
var _ = Describe("Metrics", func() {
Describe("NewMetrics", func() {
var (
fakeProvider *mock.MetricsProvider
)
BeforeEach(func() {
fakeProvider = &mock.MetricsProvider{}
fakeProvider.NewHistogramReturns(&mock.MetricsHistogram{})
})
It("uses the provider to initialize its field", func() {
metrics := blockcutter.NewMetrics(fakeProvider)
Expect(metrics).NotTo(BeNil())
Expect(metrics.BlockFillDuration).To(Equal(&mock.MetricsHistogram{}))
Expect(fakeProvider.NewHistogramCallCount()).To(Equal(1))
})
})
})
// Code generated by counterfeiter. DO NOT EDIT.
package mock
import (
"sync"
"github.com/hyperledger/fabric/common/metrics"
)
type MetricsHistogram struct {
WithStub func(labelValues ...string) metrics.Histogram
withMutex sync.RWMutex
withArgsForCall []struct {
labelValues []string
}
withReturns struct {
result1 metrics.Histogram
}
withReturnsOnCall map[int]struct {
result1 metrics.Histogram
}
ObserveStub func(value float64)
observeMutex sync.RWMutex
observeArgsForCall []struct {
value float64
}
invocations map[string][][]interface{}
invocationsMutex sync.RWMutex
}
func (fake *MetricsHistogram) With(labelValues ...string) metrics.Histogram {
fake.withMutex.Lock()
ret, specificReturn := fake.withReturnsOnCall[len(fake.withArgsForCall)]
fake.withArgsForCall = append(fake.withArgsForCall, struct {
labelValues []string
}{labelValues})
fake.recordInvocation("With", []interface{}{labelValues})
fake.withMutex.Unlock()
if fake.WithStub != nil {
return fake.WithStub(labelValues...)
}
if specificReturn {
return ret.result1
}
return fake.withReturns.result1
}
func (fake *MetricsHistogram) WithCallCount() int {
fake.withMutex.RLock()
defer fake.withMutex.RUnlock()
return len(fake.withArgsForCall)
}
func (fake *MetricsHistogram) WithArgsForCall(i int) []string {
fake.withMutex.RLock()
defer fake.withMutex.RUnlock()
return fake.withArgsForCall[i].labelValues
}
func (fake *MetricsHistogram) WithReturns(result1 metrics.Histogram) {
fake.WithStub = nil
fake.withReturns = struct {
result1 metrics.Histogram
}{result1}
}
func (fake *MetricsHistogram) WithReturnsOnCall(i int, result1 metrics.Histogram) {
fake.WithStub = nil
if fake.withReturnsOnCall == nil {
fake.withReturnsOnCall = make(map[int]struct {
result1 metrics.Histogram
})
}
fake.withReturnsOnCall[i] = struct {
result1 metrics.Histogram
}{result1}
}
func (fake *MetricsHistogram) Observe(value float64) {
fake.observeMutex.Lock()
fake.observeArgsForCall = append(fake.observeArgsForCall, struct {
value float64
}{value})
fake.recordInvocation("Observe", []interface{}{value})
fake.observeMutex.Unlock()
if fake.ObserveStub != nil {
fake.ObserveStub(value)
}
}
func (fake *MetricsHistogram) ObserveCallCount() int {
fake.observeMutex.RLock()
defer fake.observeMutex.RUnlock()
return len(fake.observeArgsForCall)
}
func (fake *MetricsHistogram) ObserveArgsForCall(i int) float64 {
fake.observeMutex.RLock()
defer fake.observeMutex.RUnlock()
return fake.observeArgsForCall[i].value
}
func (fake *MetricsHistogram) Invocations() map[string][][]interface{} {
fake.invocationsMutex.RLock()
defer fake.invocationsMutex.RUnlock()
fake.withMutex.RLock()
defer fake.withMutex.RUnlock()
fake.observeMutex.RLock()
defer fake.observeMutex.RUnlock()
copiedInvocations := map[string][][]interface{}{}
for key, value := range fake.invocations {
copiedInvocations[key] = value
}
return copiedInvocations
}
func (fake *MetricsHistogram) recordInvocation(key string, args []interface{}) {
fake.invocationsMutex.Lock()
defer fake.invocationsMutex.Unlock()
if fake.invocations == nil {
fake.invocations = map[string][][]interface{}{}
}
if fake.invocations[key] == nil {
fake.invocations[key] = [][]interface{}{}
}
fake.invocations[key] = append(fake.invocations[key], args)
}
// Code generated by counterfeiter. DO NOT EDIT.
package mock
import (
"sync"
"github.com/hyperledger/fabric/common/metrics"
)
type MetricsProvider struct {
NewCounterStub func(metrics.CounterOpts) metrics.Counter
newCounterMutex sync.RWMutex
newCounterArgsForCall []struct {
arg1 metrics.CounterOpts
}
newCounterReturns struct {
result1 metrics.Counter
}
newCounterReturnsOnCall map[int]struct {
result1 metrics.Counter
}
NewGaugeStub func(metrics.GaugeOpts) metrics.Gauge
newGaugeMutex sync.RWMutex
newGaugeArgsForCall []struct {
arg1 metrics.GaugeOpts
}
newGaugeReturns struct {
result1 metrics.Gauge
}
newGaugeReturnsOnCall map[int]struct {
result1 metrics.Gauge
}
NewHistogramStub func(metrics.HistogramOpts) metrics.Histogram
newHistogramMutex sync.RWMutex
newHistogramArgsForCall []struct {
arg1 metrics.HistogramOpts
}
newHistogramReturns struct {
result1 metrics.Histogram
}
newHistogramReturnsOnCall map[int]struct {
result1 metrics.Histogram
}
invocations map[string][][]interface{}
invocationsMutex sync.RWMutex
}
func (fake *MetricsProvider) NewCounter(arg1 metrics.CounterOpts) metrics.Counter {
fake.newCounterMutex.Lock()
ret, specificReturn := fake.newCounterReturnsOnCall[len(fake.newCounterArgsForCall)]
fake.newCounterArgsForCall = append(fake.newCounterArgsForCall, struct {
arg1 metrics.CounterOpts
}{arg1})
fake.recordInvocation("NewCounter", []interface{}{arg1})
fake.newCounterMutex.Unlock()
if fake.NewCounterStub != nil {
return fake.NewCounterStub(arg1)
}
if specificReturn {
return ret.result1
}
return fake.newCounterReturns.result1
}
func (fake *MetricsProvider) NewCounterCallCount() int {
fake.newCounterMutex.RLock()
defer fake.newCounterMutex.RUnlock()
return len(fake.newCounterArgsForCall)
}
func (fake *MetricsProvider) NewCounterArgsForCall(i int) metrics.CounterOpts {
fake.newCounterMutex.RLock()
defer fake.newCounterMutex.RUnlock()
return fake.newCounterArgsForCall[i].arg1
}
func (fake *MetricsProvider) NewCounterReturns(result1 metrics.Counter) {
fake.NewCounterStub = nil
fake.newCounterReturns = struct {
result1 metrics.Counter
}{result1}
}
func (fake *MetricsProvider) NewCounterReturnsOnCall(i int, result1 metrics.Counter) {
fake.NewCounterStub = nil
if fake.newCounterReturnsOnCall == nil {
fake.newCounterReturnsOnCall = make(map[int]struct {
result1 metrics.Counter
})
}
fake.newCounterReturnsOnCall[i] = struct {
result1 metrics.Counter
}{result1}
}
func (fake *MetricsProvider) NewGauge(arg1 metrics.GaugeOpts) metrics.Gauge {
fake.newGaugeMutex.Lock()
ret, specificReturn := fake.newGaugeReturnsOnCall[len(fake.newGaugeArgsForCall)]
fake.newGaugeArgsForCall = append(fake.newGaugeArgsForCall, struct {
arg1 metrics.GaugeOpts
}{arg1})
fake.recordInvocation("NewGauge", []interface{}{arg1})
fake.newGaugeMutex.Unlock()
if fake.NewGaugeStub != nil {
return fake.NewGaugeStub(arg1)
}
if specificReturn {
return ret.result1
}
return fake.newGaugeReturns.result1
}
func (fake *MetricsProvider) NewGaugeCallCount() int {
fake.newGaugeMutex.RLock()
defer fake.newGaugeMutex.RUnlock()
return len(fake.newGaugeArgsForCall)
}
func (fake *MetricsProvider) NewGaugeArgsForCall(i int) metrics.GaugeOpts {
fake.newGaugeMutex.RLock()
defer fake.newGaugeMutex.RUnlock()
return fake.newGaugeArgsForCall[i].arg1
}
func (fake *MetricsProvider) NewGaugeReturns(result1 metrics.Gauge) {
fake.NewGaugeStub = nil
fake.newGaugeReturns = struct {
result1 metrics.Gauge
}{result1}
}
func (fake *MetricsProvider) NewGaugeReturnsOnCall(i int, result1 metrics.Gauge) {
fake.NewGaugeStub = nil
if fake.newGaugeReturnsOnCall == nil {
fake.newGaugeReturnsOnCall = make(map[int]struct {
result1 metrics.Gauge
})
}
fake.newGaugeReturnsOnCall[i] = struct {
result1 metrics.Gauge
}{result1}
}
func (fake *MetricsProvider) NewHistogram(arg1 metrics.HistogramOpts) metrics.Histogram {
fake.newHistogramMutex.Lock()
ret, specificReturn := fake.newHistogramReturnsOnCall[len(fake.newHistogramArgsForCall)]
fake.newHistogramArgsForCall = append(fake.newHistogramArgsForCall, struct {
arg1 metrics.HistogramOpts
}{arg1})
fake.recordInvocation("NewHistogram", []interface{}{arg1})
fake.newHistogramMutex.Unlock()
if fake.NewHistogramStub != nil {
return fake.NewHistogramStub(arg1)
}
if specificReturn {
return ret.result1
}
return fake.newHistogramReturns.result1
}
func (fake *MetricsProvider) NewHistogramCallCount() int {
fake.newHistogramMutex.RLock()
defer fake.newHistogramMutex.RUnlock()
return len(fake.newHistogramArgsForCall)
}
func (fake *MetricsProvider) NewHistogramArgsForCall(i int) metrics.HistogramOpts {
fake.newHistogramMutex.RLock()
defer fake.newHistogramMutex.RUnlock()
return fake.newHistogramArgsForCall[i].arg1
}
func (fake *MetricsProvider) NewHistogramReturns(result1 metrics.Histogram) {
fake.NewHistogramStub = nil
fake.newHistogramReturns = struct {
result1 metrics.Histogram
}{result1}
}
func (fake *MetricsProvider) NewHistogramReturnsOnCall(i int, result1 metrics.Histogram) {
fake.NewHistogramStub = nil
if fake.newHistogramReturnsOnCall == nil {
fake.newHistogramReturnsOnCall = make(map[int]struct {
result1 metrics.Histogram
})
}
fake.newHistogramReturnsOnCall[i] = struct {
result1 metrics.Histogram
}{result1}
}
func (fake *MetricsProvider) Invocations() map[string][][]interface{} {
fake.invocationsMutex.RLock()
defer fake.invocationsMutex.RUnlock()
fake.newCounterMutex.RLock()
defer fake.newCounterMutex.RUnlock()
fake.newGaugeMutex.RLock()
defer fake.newGaugeMutex.RUnlock()
fake.newHistogramMutex.RLock()
defer fake.newHistogramMutex.RUnlock()
copiedInvocations := map[string][][]interface{}{}
for key, value := range fake.invocations {
copiedInvocations[key] = value
}
return copiedInvocations
}
func (fake *MetricsProvider) recordInvocation(key string, args []interface{}) {
fake.invocationsMutex.Lock()
defer fake.invocationsMutex.Unlock()
if fake.invocations == nil {
fake.invocations = map[string][][]interface{}{}