Commit eba7e3fe authored by Will Lahti's avatar Will Lahti Committed by Matthew Sykes
Browse files

Instrument metrics into chaincode handler



Adds metrics for transactions received, completed,
and transaction duration.

FAB-12797 #done

Change-Id: I4bd48ed57e37e2be8ac95319a996889d61fa09e2
Signed-off-by: default avatarWill Lahti <wtlahti@us.ibm.com>
Signed-off-by: default avatarMatthew Sykes <sykesmat@us.ibm.com>
parent 61656ce8
......@@ -11,6 +11,7 @@ import (
"time"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/metrics"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/chaincode/platforms"
"github.com/hyperledger/fabric/core/common/ccprovider"
......@@ -54,6 +55,7 @@ type ChaincodeSupport struct {
SystemCCProvider sysccprovider.SystemChaincodeProvider
Lifecycle Lifecycle
appConfig ApplicationConfigRetriever
Metrics *Metrics
}
// NewChaincodeSupport creates a new ChaincodeSupport instance.
......@@ -70,6 +72,7 @@ func NewChaincodeSupport(
SystemCCProvider sysccprovider.SystemChaincodeProvider,
platformRegistry *platforms.Registry,
appConfig ApplicationConfigRetriever,
metricsProvider metrics.Provider,
) *ChaincodeSupport {
cs := &ChaincodeSupport{
UserRunsCC: userRunsCC,
......@@ -80,6 +83,7 @@ func NewChaincodeSupport(
SystemCCProvider: SystemCCProvider,
Lifecycle: lifecycle,
appConfig: appConfig,
Metrics: NewMetrics(metricsProvider),
}
// Keep TestQueries working
......@@ -177,6 +181,7 @@ func (cs *ChaincodeSupport) HandleChaincodeStream(stream ccintf.ChaincodeStream)
UUIDGenerator: UUIDGeneratorFunc(util.GenerateUUID),
LedgerGetter: peer.Default,
AppConfig: cs.appConfig,
Metrics: cs.Metrics,
}
return handler.ProcessStream(stream)
......
......@@ -23,6 +23,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/crypto/tlsgen"
commonledger "github.com/hyperledger/fabric/common/ledger"
"github.com/hyperledger/fabric/common/metrics/disabled"
mc "github.com/hyperledger/fabric/common/mocks/config"
mocklgr "github.com/hyperledger/fabric/common/mocks/ledger"
mockpeer "github.com/hyperledger/fabric/common/mocks/peer"
......@@ -192,6 +193,7 @@ func initMockPeer(chainIDs ...string) (*ChaincodeSupport, error) {
sccp,
pr,
peer.DefaultSupport,
&disabled.Provider{},
)
ipRegistry.ChaincodeSupport = chaincodeSupport
......
......@@ -30,6 +30,7 @@ import (
"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/crypto/tlsgen"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/metrics/disabled"
mc "github.com/hyperledger/fabric/common/mocks/config"
mockpolicies "github.com/hyperledger/fabric/common/mocks/policies"
"github.com/hyperledger/fabric/common/policies"
......@@ -140,6 +141,7 @@ func initPeer(chainIDs ...string) (net.Listener, *ChaincodeSupport, func(), erro
sccp,
pr,
peer.DefaultSupport,
&disabled.Provider{},
)
ipRegistry.ChaincodeSupport = chaincodeSupport
pb.RegisterChaincodeSupportServer(grpcServer, chaincodeSupport)
......
......@@ -9,6 +9,7 @@ package chaincode
import (
"fmt"
"io"
"strconv"
"strings"
"sync"
"time"
......@@ -162,6 +163,8 @@ type Handler struct {
chatStream ccintf.ChaincodeStream
// errChan is used to communicate errors from the async send to the receive loop
errChan chan error
// Metrics holds chaincode metrics
Metrics *Metrics
}
// handleMessage is called by ProcessStream to dispatch messages.
......@@ -244,6 +247,7 @@ func (h *Handler) HandleTransaction(msg *pb.ChaincodeMessage, delegate handleFun
return
}
startTime := time.Now()
var txContext *TransactionContext
var err error
if msg.Type == pb.ChaincodeMessage_INVOKE_CHAINCODE {
......@@ -252,6 +256,13 @@ func (h *Handler) HandleTransaction(msg *pb.ChaincodeMessage, delegate handleFun
txContext, err = h.isValidTxSim(msg.ChannelId, msg.Txid, "no ledger context")
}
chaincodeName := h.chaincodeID.Name + ":" + h.chaincodeID.Version
h.Metrics.ShimRequestsReceived.With(
"type", msg.Type.String(),
"channel", msg.ChannelId,
"chaincode", chaincodeName,
).Add(1)
var resp *pb.ChaincodeMessage
if err == nil {
resp, err = delegate(msg, txContext)
......@@ -266,6 +277,19 @@ func (h *Handler) HandleTransaction(msg *pb.ChaincodeMessage, delegate handleFun
chaincodeLogger.Debugf("[%s] Completed %s. Sending %s", shorttxid(msg.Txid), msg.Type, resp.Type)
h.ActiveTransactions.Remove(msg.ChannelId, msg.Txid)
h.serialSendAsync(resp)
duration := time.Since(startTime)
h.Metrics.ShimRequestDuration.With(
"type", msg.Type.String(),
"channel", msg.ChannelId,
"chaincode", chaincodeName,
"success", strconv.FormatBool(resp.Type != pb.ChaincodeMessage_ERROR),
).Observe(float64(duration) / float64(time.Second))
h.Metrics.ShimRequestsCompleted.With(
"type", msg.Type.String(),
"channel", msg.ChannelId,
"chaincode", chaincodeName,
"success", strconv.FormatBool(resp.Type != pb.ChaincodeMessage_ERROR),
).Add(1)
}
func shorttxid(txid string) string {
......
......@@ -11,6 +11,7 @@ import (
"time"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/metrics/metricsfakes"
"github.com/hyperledger/fabric/common/mocks/config"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/aclmgmt/resources"
......@@ -42,6 +43,9 @@ var _ = Describe("Handler", func() {
fakeLedgerGetter *mock.LedgerGetter
fakeHandlerRegistry *fake.Registry
fakeApplicationConfigRetriever *fake.ApplicationConfigRetriever
fakeShimRequestsReceived *metricsfakes.Counter
fakeShimRequestsCompleted *metricsfakes.Counter
fakeShimRequestDuration *metricsfakes.Histogram
responseNotifier chan *pb.ChaincodeMessage
txContext *chaincode.TransactionContext
......@@ -84,6 +88,19 @@ var _ = Describe("Handler", func() {
}
fakeApplicationConfigRetriever.GetApplicationConfigReturns(applicationCapability, true)
fakeShimRequestsReceived = &metricsfakes.Counter{}
fakeShimRequestsReceived.WithReturns(fakeShimRequestsReceived)
fakeShimRequestsCompleted = &metricsfakes.Counter{}
fakeShimRequestsCompleted.WithReturns(fakeShimRequestsCompleted)
fakeShimRequestDuration = &metricsfakes.Histogram{}
fakeShimRequestDuration.WithReturns(fakeShimRequestDuration)
chaincodeMetrics := &chaincode.Metrics{
ShimRequestsReceived: fakeShimRequestsReceived,
ShimRequestsCompleted: fakeShimRequestsCompleted,
ShimRequestDuration: fakeShimRequestDuration,
}
handler = &chaincode.Handler{
ACLProvider: fakeACLProvider,
ActiveTransactions: fakeTransactionRegistry,
......@@ -100,9 +117,10 @@ var _ = Describe("Handler", func() {
return "generated-query-id"
}),
AppConfig: fakeApplicationConfigRetriever,
Metrics: chaincodeMetrics,
}
chaincode.SetHandlerChatStream(handler, fakeChatStream)
chaincode.SetHandlerChaincodeID(handler, &pb.ChaincodeID{Name: "test-handler-name"})
chaincode.SetHandlerChaincodeID(handler, &pb.ChaincodeID{Name: "test-handler-name", Version: "1.0"})
chaincode.SetHandlerCCInstance(handler, &sysccprovider.ChaincodeInstance{ChaincodeName: "cc-instance-name"})
})
......@@ -178,7 +196,99 @@ var _ = Describe("Handler", func() {
Expect(transactionID).To(Equal("tx-id"))
})
Context("wwhen the transaction ID has already been regustered", func() {
It("records shim requests received before requests completed", func() {
fakeShimRequestsReceived.AddStub = func(delta float64) {
defer GinkgoRecover()
Expect(fakeShimRequestsCompleted.AddCallCount()).To(Equal(0))
}
handler.HandleTransaction(incomingMessage, fakeMessageHandler.Handle)
Eventually(fakeChatStream.SendCallCount).Should(Equal(1))
msg := fakeChatStream.SendArgsForCall(0)
Expect(msg).To(Equal(expectedResponse))
Expect(fakeShimRequestsReceived.WithCallCount()).To(Equal(1))
labelValues := fakeShimRequestsReceived.WithArgsForCall(0)
Expect(labelValues).To(Equal([]string{
"type", "GET_STATE",
"channel", "channel-id",
"chaincode", "test-handler-name:1.0",
}))
Expect(fakeShimRequestsReceived.AddCallCount()).To(Equal(1))
Expect(fakeShimRequestsReceived.AddArgsForCall(0)).To(BeNumerically("~", 1.0))
})
It("records transactions completed after transactions received", func() {
fakeShimRequestsCompleted.AddStub = func(delta float64) {
defer GinkgoRecover()
Expect(fakeShimRequestsReceived.AddCallCount()).To(Equal(1))
}
handler.HandleTransaction(incomingMessage, fakeMessageHandler.Handle)
Eventually(fakeChatStream.SendCallCount).Should(Equal(1))
Expect(fakeShimRequestsCompleted.WithCallCount()).To(Equal(1))
labelValues := fakeShimRequestsCompleted.WithArgsForCall(0)
Expect(labelValues).To(Equal([]string{
"type", "GET_STATE",
"channel", "channel-id",
"chaincode", "test-handler-name:1.0",
"success", "true",
}))
Expect(fakeShimRequestsCompleted.AddCallCount()).To(Equal(1))
Expect(fakeShimRequestsCompleted.AddArgsForCall(0)).To(BeNumerically("~", 1.0))
})
It("records transactions duration", func() {
handler.HandleTransaction(incomingMessage, fakeMessageHandler.Handle)
Eventually(fakeChatStream.SendCallCount).Should(Equal(1))
Expect(fakeShimRequestDuration.WithCallCount()).To(Equal(1))
labelValues := fakeShimRequestDuration.WithArgsForCall(0)
Expect(labelValues).To(Equal([]string{
"type", "GET_STATE",
"channel", "channel-id",
"chaincode", "test-handler-name:1.0",
"success", "true",
}))
Expect(fakeShimRequestDuration.ObserveArgsForCall(0)).NotTo(BeZero())
Expect(fakeShimRequestDuration.ObserveArgsForCall(0)).To(BeNumerically("<", 1.0))
})
Context("when the transaction returns an error", func() {
BeforeEach(func() {
fakeMessageHandler.HandleReturns(nil, errors.New("I am a total failure"))
})
It("records metrics with success=false", func() {
handler.HandleTransaction(incomingMessage, fakeMessageHandler.Handle)
Eventually(fakeChatStream.SendCallCount).Should(Equal(1))
Expect(fakeShimRequestsCompleted.WithCallCount()).To(Equal(1))
labelValues := fakeShimRequestsCompleted.WithArgsForCall(0)
Expect(labelValues).To(Equal([]string{
"type", "GET_STATE",
"channel", "channel-id",
"chaincode", "test-handler-name:1.0",
"success", "false",
}))
Expect(fakeShimRequestsCompleted.AddCallCount()).To(Equal(1))
Expect(fakeShimRequestsCompleted.AddArgsForCall(0)).To(BeNumerically("~", 1.0))
Expect(fakeShimRequestDuration.WithCallCount()).To(Equal(1))
labelValues = fakeShimRequestDuration.WithArgsForCall(0)
Expect(labelValues).To(Equal([]string{
"type", "GET_STATE",
"channel", "channel-id",
"chaincode", "test-handler-name:1.0",
"success", "false",
}))
Expect(fakeShimRequestDuration.ObserveArgsForCall(0)).NotTo(BeZero())
Expect(fakeShimRequestDuration.ObserveArgsForCall(0)).To(BeNumerically("<", 1.0))
})
})
Context("when the transaction ID has already been registered", func() {
BeforeEach(func() {
fakeTransactionRegistry.AddReturns(false)
})
......
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package chaincode
import "github.com/hyperledger/fabric/common/metrics"
var (
shimRequestsReceived = metrics.CounterOpts{
Namespace: "chaincode",
Name: "shim_requests_received",
Help: "The number of chaincode shim requests received.",
LabelNames: []string{"type", "channel", "chaincode"},
StatsdFormat: "%{#fqname}.%{type}.%{channel}.%{chaincode}",
}
shimRequestsCompleted = metrics.CounterOpts{
Namespace: "chaincode",
Name: "shim_requests_completed",
Help: "The number of chaincode shim requests completed.",
LabelNames: []string{"type", "channel", "chaincode", "success"},
StatsdFormat: "%{#fqname}.%{type}.%{channel}.%{chaincode}.%{success}",
}
shimRequestDuration = metrics.HistogramOpts{
Namespace: "chaincode",
Name: "shim_request_duration",
Help: "The time to complete chaincode shim requests.",
LabelNames: []string{"type", "channel", "chaincode", "success"},
StatsdFormat: "%{#fqname}.%{type}.%{channel}.%{chaincode}.%{success}",
}
)
type Metrics struct {
ShimRequestsReceived metrics.Counter
ShimRequestsCompleted metrics.Counter
ShimRequestDuration metrics.Histogram
}
func NewMetrics(p metrics.Provider) *Metrics {
return &Metrics{
ShimRequestsReceived: p.NewCounter(shimRequestsReceived),
ShimRequestsCompleted: p.NewCounter(shimRequestsCompleted),
ShimRequestDuration: p.NewHistogram(shimRequestDuration),
}
}
......@@ -19,6 +19,7 @@ import (
"github.com/hyperledger/fabric/common/crypto/tlsgen"
"github.com/hyperledger/fabric/common/genesis"
"github.com/hyperledger/fabric/common/localmsp"
"github.com/hyperledger/fabric/common/metrics/disabled"
"github.com/hyperledger/fabric/common/mocks/scc"
"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/common/tools/configtxgen/configtxgentest"
......@@ -234,6 +235,7 @@ func TestConfigerInvokeJoinChainCorrectParams(t *testing.T) {
mp,
platforms.NewRegistry(&golang.Platform{}),
peer.DefaultSupport,
&disabled.Provider{},
)
// Init the policy checker
......
......@@ -251,7 +251,7 @@ func serve(args []string) error {
pb.RegisterDeliverServer(peerServer.Server(), abServer)
// Initialize chaincode service
chaincodeSupport, ccp, sccp, packageProvider := startChaincodeServer(peerHost, aclProvider, pr)
chaincodeSupport, ccp, sccp, packageProvider := startChaincodeServer(peerHost, aclProvider, pr, metricsProvider)
logger.Debugf("Running peer")
......@@ -606,7 +606,7 @@ func computeChaincodeEndpoint(peerHostname string) (ccEndpoint string, err error
//NOTE - when we implement JOIN we will no longer pass the chainID as param
//The chaincode support will come up without registering system chaincodes
//which will be registered only during join phase.
func registerChaincodeSupport(grpcServer *comm.GRPCServer, ccEndpoint string, ca tlsgen.CA, packageProvider *persistence.PackageProvider, aclProvider aclmgmt.ACLProvider, pr *platforms.Registry, lifecycleSCC *lifecycle.SCC) (*chaincode.ChaincodeSupport, ccprovider.ChaincodeProvider, *scc.Provider) {
func registerChaincodeSupport(grpcServer *comm.GRPCServer, ccEndpoint string, ca tlsgen.CA, packageProvider *persistence.PackageProvider, aclProvider aclmgmt.ACLProvider, pr *platforms.Registry, lifecycleSCC *lifecycle.SCC, metricsProvider metrics.Provider) (*chaincode.ChaincodeSupport, ccprovider.ChaincodeProvider, *scc.Provider) {
//get user mode
userRunsCC := chaincode.IsDevMode()
tlsEnabled := viper.GetBool("peer.tls.enabled")
......@@ -636,6 +636,7 @@ func registerChaincodeSupport(grpcServer *comm.GRPCServer, ccEndpoint string, ca
sccp,
pr,
peer.DefaultSupport,
metricsProvider,
)
ipRegistry.ChaincodeSupport = chaincodeSupport
ccp := chaincode.NewProvider(chaincodeSupport)
......@@ -662,7 +663,7 @@ func registerChaincodeSupport(grpcServer *comm.GRPCServer, ccEndpoint string, ca
// 1) setup local chaincode install path
// 2) create chaincode specific tls CA
// 3) start the chaincode specific gRPC listening service
func startChaincodeServer(peerHost string, aclProvider aclmgmt.ACLProvider, pr *platforms.Registry) (*chaincode.ChaincodeSupport, ccprovider.ChaincodeProvider, *scc.Provider, *persistence.PackageProvider) {
func startChaincodeServer(peerHost string, aclProvider aclmgmt.ACLProvider, pr *platforms.Registry, metricsProvider metrics.Provider) (*chaincode.ChaincodeSupport, ccprovider.ChaincodeProvider, *scc.Provider, *persistence.PackageProvider) {
// Setup chaincode path
chaincodeInstallPath := ccprovider.GetChaincodeInstallPathFromViper()
ccprovider.SetChaincodesPath(chaincodeInstallPath)
......@@ -703,6 +704,7 @@ func startChaincodeServer(peerHost string, aclProvider aclmgmt.ACLProvider, pr *
aclProvider,
pr,
lifecycleSCC,
metricsProvider,
)
go ccSrv.Start()
return chaincodeSupport, ccp, sccp, packageProvider
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment