Commit 91e0b0b9 authored by Matthew Sykes's avatar Matthew Sykes
Browse files

[FAB-12843] wire logspec handler



This introduces the concept of an "operations" endpoint for the orderer
and peer. The endpoint will support health checks, logspec management,
and, when enabled, prometheus metrics.

The endpoint uses TLS mutual auth for authentication and authorization.
Fabric roles are not used so a separate client CA is recommended.

Change-Id: Ifeb0e63faf0e3868845efad981cd1bbd16941535
Signed-off-by: default avatarMatthew Sykes <sykesmat@us.ibm.com>
parent 7c6241cf
......@@ -81,6 +81,10 @@ func (dr *DatagramReader) Address() string {
return dr.sock.LocalAddr().String()
}
func (dr *DatagramReader) String() string {
return string(dr.buffer.Contents())
}
func (dr *DatagramReader) Start() {
buf := make([]byte, 1024*1024)
for {
......
......@@ -15,6 +15,7 @@ import (
"net/http"
"os"
"path/filepath"
"strings"
"syscall"
"time"
......@@ -111,14 +112,19 @@ var _ = Describe("EndToEnd", func() {
RunQueryInvokeQuery(network, orderer, peer, "testchannel")
RunRespondWith(network, orderer, peer, "testchannel")
CheckForStatsdMetrics(datagramReader)
By("waiting for DeliverFiltered stats to be emitted")
metricsWriteInterval := 5 * time.Second
Eventually(datagramReader, 2*metricsWriteInterval).Should(gbytes.Say("stream_request_duration.protos_Deliver.DeliverFiltered."))
CheckPeerStatsdStreamMetrics(datagramReader.String())
CheckPeerStatsdMetrics(datagramReader.String(), "org1_peer0")
CheckPeerStatsdMetrics(datagramReader.String(), "org2_peer1")
CheckOrdererStatsdMetrics(datagramReader.String(), "ordererorg_orderer")
})
})
Describe("basic kafka network with 2 orgs", func() {
var metricsClient *http.Client
var metricsURL string
BeforeEach(func() {
network = nwo.New(nwo.BasicKafka(), testDir, client, BasePort(), components)
network.MetricsProvider = "prometheus"
......@@ -128,27 +134,6 @@ var _ = Describe("EndToEnd", func() {
networkRunner := network.NetworkGroupRunner()
process = ifrit.Invoke(networkRunner)
Eventually(process.Ready(), network.EventuallyTimeout).Should(BeClosed())
org1Peer0 := network.Peer("Org1", "peer0")
clientCert, err := tls.LoadX509KeyPair(
filepath.Join(network.PeerLocalTLSDir(org1Peer0), "server.crt"),
filepath.Join(network.PeerLocalTLSDir(org1Peer0), "server.key"),
)
Expect(err).NotTo(HaveOccurred())
clientCertPool := x509.NewCertPool()
caCert, err := ioutil.ReadFile(filepath.Join(network.PeerLocalTLSDir(org1Peer0), "ca.crt"))
Expect(err).NotTo(HaveOccurred())
clientCertPool.AppendCertsFromPEM(caCert)
metricsURL = fmt.Sprintf("https://127.0.0.1:%d/metrics", network.PeerPort(org1Peer0, nwo.MetricsPort))
metricsClient = &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
Certificates: []tls.Certificate{clientCert},
RootCAs: clientCertPool,
},
},
}
})
It("executes a basic kafka network with 2 orgs", func() {
......@@ -158,7 +143,9 @@ var _ = Describe("EndToEnd", func() {
network.CreateAndJoinChannel(orderer, "testchannel")
nwo.DeployChaincode(network, "testchannel", orderer, chaincode)
RunQueryInvokeQuery(network, orderer, peer, "testchannel")
CheckForPometheusMetrics(metricsClient, metricsURL)
CheckPeerOperationEndpoints(network, network.Peer("Org2", "peer1"))
CheckOrdererOperationEndpoints(network, orderer)
})
})
......@@ -405,37 +392,114 @@ func RunRespondWith(n *nwo.Network, orderer *nwo.Orderer, peer *nwo.Peer, channe
Expect(sess.Err).To(gbytes.Say(`Error: endorsement failure during invoke.`))
}
func CheckForStatsdMetrics(datagramReader *DatagramReader) {
By("waiting for DeliverFiltered stats to be emitted")
Eventually(datagramReader, 10*time.Second /* 2 * metrics.statsd.writeInterval */).Should(gbytes.Say("stream_request_duration.protos_Deliver.DeliverFiltered."))
buf := string(datagramReader.Buffer().Contents())
By("checking for statsd metrics")
Expect(buf).To(ContainSubstring("org1_peer0.go.mem.gc_completed_count:"))
Expect(buf).To(ContainSubstring("org1_peer1.go.mem.gc_completed_count:"))
Expect(buf).To(ContainSubstring("org2_peer0.go.mem.gc_completed_count:"))
Expect(buf).To(ContainSubstring("org2_peer1.go.mem.gc_completed_count:"))
Expect(buf).To(ContainSubstring(".grpc.server.unary_requests_received.protos_Endorser.ProcessProposal:"))
Expect(buf).To(ContainSubstring(".grpc.server.unary_requests_completed.protos_Endorser.ProcessProposal.OK:"))
Expect(buf).To(ContainSubstring(".grpc.server.unary_request_duration.protos_Endorser.ProcessProposal.OK:"))
Expect(buf).To(ContainSubstring(".grpc.server.stream_requests_received.protos_Deliver.DeliverFiltered:"))
Expect(buf).To(ContainSubstring(".grpc.server.stream_requests_completed.protos_Deliver.DeliverFiltered.Unknown:"))
Expect(buf).To(ContainSubstring(".grpc.server.stream_request_duration.protos_Deliver.DeliverFiltered.Unknown:"))
Expect(buf).To(ContainSubstring(".grpc.server.stream_messages_received.protos_Deliver.DeliverFiltered"))
Expect(buf).To(ContainSubstring(".grpc.server.stream_messages_sent.protos_Deliver.DeliverFiltered"))
func CheckPeerStatsdMetrics(contents, prefix string) {
By("checking for peer statsd metrics")
Expect(contents).To(ContainSubstring(prefix + ".go.mem.gc_completed_count:"))
Expect(contents).To(ContainSubstring(prefix + ".grpc.server.unary_requests_received.protos_Endorser.ProcessProposal:"))
Expect(contents).To(ContainSubstring(prefix + ".grpc.server.unary_requests_completed.protos_Endorser.ProcessProposal.OK:"))
Expect(contents).To(ContainSubstring(prefix + ".grpc.server.unary_request_duration.protos_Endorser.ProcessProposal.OK:"))
}
func CheckForPometheusMetrics(client *http.Client, url string) {
func CheckPeerStatsdStreamMetrics(contents string) {
By("checking for stream metrics")
Expect(contents).To(ContainSubstring(".grpc.server.stream_requests_received.protos_Deliver.DeliverFiltered:"))
Expect(contents).To(ContainSubstring(".grpc.server.stream_requests_completed.protos_Deliver.DeliverFiltered.Unknown:"))
Expect(contents).To(ContainSubstring(".grpc.server.stream_request_duration.protos_Deliver.DeliverFiltered.Unknown:"))
Expect(contents).To(ContainSubstring(".grpc.server.stream_messages_received.protos_Deliver.DeliverFiltered"))
Expect(contents).To(ContainSubstring(".grpc.server.stream_messages_sent.protos_Deliver.DeliverFiltered"))
}
func CheckOrdererStatsdMetrics(contents, prefix string) {
By("checking for AtomicBroadcast")
Expect(contents).To(ContainSubstring(prefix + ".grpc.server.stream_request_duration.orderer_AtomicBroadcast.Broadcast.OK"))
Expect(contents).To(ContainSubstring(prefix + ".grpc.server.stream_request_duration.orderer_AtomicBroadcast.Deliver."))
By("checking for orderer metrics")
Expect(contents).To(ContainSubstring(prefix + ".go.mem.gc_completed_count:"))
Expect(contents).To(ContainSubstring(prefix + ".grpc.server.stream_requests_received.orderer_AtomicBroadcast.Deliver:"))
Expect(contents).To(ContainSubstring(prefix + ".grpc.server.stream_requests_completed.orderer_AtomicBroadcast.Deliver."))
Expect(contents).To(ContainSubstring(prefix + ".grpc.server.stream_messages_received.orderer_AtomicBroadcast.Deliver"))
Expect(contents).To(ContainSubstring(prefix + ".grpc.server.stream_messages_sent.orderer_AtomicBroadcast.Deliver"))
}
func OrdererOperationalClients(network *nwo.Network, orderer *nwo.Orderer) (authClient, unauthClient *http.Client) {
return operationalClients(network.OrdererLocalTLSDir(orderer))
}
func PeerOperationalClients(network *nwo.Network, peer *nwo.Peer) (authClient, unauthClient *http.Client) {
return operationalClients(network.PeerLocalTLSDir(peer))
}
func operationalClients(tlsDir string) (authClient, unauthClient *http.Client) {
clientCert, err := tls.LoadX509KeyPair(
filepath.Join(tlsDir, "server.crt"),
filepath.Join(tlsDir, "server.key"),
)
Expect(err).NotTo(HaveOccurred())
clientCertPool := x509.NewCertPool()
caCert, err := ioutil.ReadFile(filepath.Join(tlsDir, "ca.crt"))
Expect(err).NotTo(HaveOccurred())
clientCertPool.AppendCertsFromPEM(caCert)
authenticatedClient := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
Certificates: []tls.Certificate{clientCert},
RootCAs: clientCertPool,
},
},
}
unauthenticatedClient := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{RootCAs: clientCertPool},
},
}
return authenticatedClient, unauthenticatedClient
}
func CheckPeerOperationEndpoints(network *nwo.Network, peer *nwo.Peer) {
metricsURL := fmt.Sprintf("https://127.0.0.1:%d/metrics", network.PeerPort(peer, nwo.OperationsPort))
logspecURL := fmt.Sprintf("https://127.0.0.1:%d/logspec", network.PeerPort(peer, nwo.OperationsPort))
authClient, unauthClient := PeerOperationalClients(network, peer)
CheckPeerPometheusMetrics(authClient, metricsURL)
CheckLogspecOperations(authClient, logspecURL)
By("getting interacting without a cert")
_, err := unauthClient.Get(logspecURL)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("remote error: tls: bad certificate"))
}
func CheckOrdererOperationEndpoints(network *nwo.Network, orderer *nwo.Orderer) {
metricsURL := fmt.Sprintf("https://127.0.0.1:%d/metrics", network.OrdererPort(orderer, nwo.OperationsPort))
logspecURL := fmt.Sprintf("https://127.0.0.1:%d/logspec", network.OrdererPort(orderer, nwo.OperationsPort))
authClient, unauthClient := OrdererOperationalClients(network, orderer)
CheckOrdererPometheusMetrics(authClient, metricsURL)
CheckLogspecOperations(authClient, logspecURL)
By("getting interacting without a cert")
_, err := unauthClient.Get(logspecURL)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("remote error: tls: bad certificate"))
}
func CheckPeerPometheusMetrics(client *http.Client, url string) {
By("hitting the prometheus metrics endpoint")
resp, err := client.Get(url)
Expect(err).NotTo(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusOK))
bodyBytes, err := ioutil.ReadAll(resp.Body)
Expect(err).NotTo(HaveOccurred())
resp.Body.Close()
body := string(bodyBytes)
Eventually(getBody(client, url)).Should(ContainSubstring(`# TYPE grpc_server_stream_request_duration histogram`))
By("checking for some expected metrics")
body := getBody(client, url)()
Expect(body).To(ContainSubstring(`# TYPE go_gc_duration_seconds summary`))
Expect(body).To(ContainSubstring(`# TYPE grpc_server_stream_request_duration histogram`))
Expect(body).To(ContainSubstring(`grpc_server_stream_request_duration_count{code="Unknown",method="DeliverFiltered",service="protos_Deliver"}`))
......@@ -444,3 +508,61 @@ func CheckForPometheusMetrics(client *http.Client, url string) {
Expect(body).To(ContainSubstring(`# TYPE grpc_comm_conn_closed counter`))
Expect(body).To(ContainSubstring(`# TYPE grpc_comm_conn_opened counter`))
}
func CheckOrdererPometheusMetrics(client *http.Client, url string) {
By("hitting the prometheus metrics endpoint")
resp, err := client.Get(url)
Expect(err).NotTo(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusOK))
resp.Body.Close()
Eventually(getBody(client, url)).Should(ContainSubstring(`# TYPE grpc_server_stream_request_duration histogram`))
By("checking for some expected metrics")
body := getBody(client, url)()
Expect(body).To(ContainSubstring(`# TYPE go_gc_duration_seconds summary`))
Expect(body).To(ContainSubstring(`# TYPE grpc_server_stream_request_duration histogram`))
Expect(body).To(ContainSubstring(`grpc_server_stream_request_duration_sum{code="OK",method="Deliver",service="orderer_AtomicBroadcast"`))
Expect(body).To(ContainSubstring(`grpc_server_stream_request_duration_sum{code="OK",method="Broadcast",service="orderer_AtomicBroadcast"`))
Expect(body).To(ContainSubstring(`# TYPE grpc_comm_conn_closed counter`))
Expect(body).To(ContainSubstring(`# TYPE grpc_comm_conn_opened counter`))
}
func CheckLogspecOperations(client *http.Client, logspecURL string) {
By("getting the logspec")
resp, err := client.Get(logspecURL)
Expect(err).NotTo(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusOK))
bodyBytes, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
Expect(err).NotTo(HaveOccurred())
Expect(string(bodyBytes)).To(MatchJSON(`{"spec":"info"}`))
updateReq, err := http.NewRequest(http.MethodPut, logspecURL, strings.NewReader(`{"spec":"debug"}`))
Expect(err).NotTo(HaveOccurred())
By("setting the logspec")
resp, err = client.Do(updateReq)
Expect(err).NotTo(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusNoContent))
resp, err = client.Get(logspecURL)
Expect(err).NotTo(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusOK))
bodyBytes, err = ioutil.ReadAll(resp.Body)
resp.Body.Close()
Expect(err).NotTo(HaveOccurred())
Expect(string(bodyBytes)).To(MatchJSON(`{"spec":"debug"}`))
}
func getBody(client *http.Client, url string) func() string {
return func() string {
resp, err := client.Get(url)
Expect(err).NotTo(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusOK))
bodyBytes, err := ioutil.ReadAll(resp.Body)
Expect(err).NotTo(HaveOccurred())
resp.Body.Close()
return string(bodyBytes)
}
}
......@@ -184,24 +184,25 @@ ledger:
history:
enableHistoryDatabase: true
metrics:
provider: {{ .MetricsProvider }}
statsd:
network: udp
address: {{ if .StatsdEndpoint }}{{ .StatsdEndpoint }}{{ else }}127.0.0.1:8125{{ end }}
writeInterval: 5s
prefix: {{ ReplaceAll (ToLower Peer.ID) "." "_" }}
prometheus:
listenAddress: 127.0.0.1:{{ .PeerPort Peer "Metrics" }}
handlerPath: /metrics
tls:
enabled: true
cert:
file: {{ .PeerLocalTLSDir Peer }}/server.crt
key:
file: {{ .PeerLocalTLSDir Peer }}/server.key
clientAuthRequired: true
clientRootCAs:
files:
- {{ .PeerLocalTLSDir Peer }}/ca.crt
operations:
listenAddress: 127.0.0.1:{{ .PeerPort Peer "Operations" }}
tls:
enabled: true
cert:
file: {{ .PeerLocalTLSDir Peer }}/server.crt
key:
file: {{ .PeerLocalTLSDir Peer }}/server.key
clientAuthRequired: true
clientRootCAs:
files:
- {{ .PeerLocalTLSDir Peer }}/ca.crt
metrics:
provider: {{ .MetricsProvider }}
statsd:
network: udp
address: {{ if .StatsdEndpoint }}{{ .StatsdEndpoint }}{{ else }}127.0.0.1:8125{{ end }}
writeInterval: 5s
prefix: {{ ReplaceAll (ToLower Peer.ID) "." "_" }}
prometheus:
handlerPath: /metrics
`
......@@ -13,12 +13,12 @@ import (
)
type Core struct {
Logging *Logging `yaml:"logging,omitempty"`
Peer *Peer `yaml:"peer,omitempty"`
VM *VM `yaml:"vm,omitempty"`
Chaincode *Chaincode `yaml:"chaincode,omitempty"`
Ledger *Ledger `yaml:"ledger,omitempty"`
Metrics *Metrics `yaml:"metrics,omitempty"`
Logging *Logging `yaml:"logging,omitempty"`
Peer *Peer `yaml:"peer,omitempty"`
VM *VM `yaml:"vm,omitempty"`
Chaincode *Chaincode `yaml:"chaincode,omitempty"`
Ledger *Ledger `yaml:"ledger,omitempty"`
Operations *Operations `yaml:"operations,omitempty"`
}
type Logging struct {
......@@ -264,6 +264,11 @@ type HistoryConfig struct {
EnableHistoryDatabase bool `yaml:"enableHistoryDatabase"`
}
type Operations struct {
ListenAddress string `yaml:"listenAddress,omitempty"`
TLS *TLS `yaml:"tls"`
}
type Metrics struct {
Provider string `yaml:"provider"`
Statsd *Statsd `yaml:"statsd,omitempty"`
......@@ -278,7 +283,5 @@ type Statsd struct {
}
type Prometheus struct {
ListenAddress string `yaml:"listenAddress,omitempty"`
HandlerPath string `yaml:"handlerPath"`
TLS *TLS `yaml:"tls"`
HandlerPath string `yaml:"handlerPath"`
}
......@@ -9,10 +9,11 @@ package fabricconfig
import "time"
type Orderer struct {
General *General `yaml:"General,omitempty"`
FileLedger *FileLedger `yaml:"FileLedger,omitempty"`
RAMLedger *RAMLedger `yaml:"RAMLedger,omitempty"`
Kafka *Kafka `yaml:"Kafka,omitempty"`
General *General `yaml:"General,omitempty"`
FileLedger *FileLedger `yaml:"FileLedger,omitempty"`
RAMLedger *RAMLedger `yaml:"RAMLedger,omitempty"`
Kafka *Kafka `yaml:"Kafka,omitempty"`
Operations *OrdererOperations `yaml:"Operations,omitempty"`
ExtraProperties map[string]interface{} `yaml:",inline,omitempty"`
}
......@@ -107,3 +108,26 @@ type Backoff struct {
RetryBackoff time.Duration `yaml:"RetryBackoff,omitempty"`
RetryMax int `yaml:"RetryMax,omitempty"`
}
type OrdererOperations struct {
ListenAddress string `yaml:"ListenAddress,omitempty"`
Metrics *OrdererMetrics `yaml:"Metrics,omitempty"`
TLS *OrdererTLS `yaml:"TLS"`
}
type OrdererMetrics struct {
Provider string `yaml:"Provider"`
Statsd *OrdererStatsd `yaml:"Statsd,omitempty"`
Prometheus *OrdererPrometheus `yaml:"Prometheus,omitempty"`
}
type OrdererStatsd struct {
Network string `yaml:"Network,omitempty"`
Address string `yaml:"Address,omitempty"`
WriteInterval time.Duration `yaml:"WriteInterval,omitempty"`
Prefix string `yaml:"Prefix,omitempty"`
}
type OrdererPrometheus struct {
HandlerPath string `yaml:"handlerPath"`
}
......@@ -1160,23 +1160,23 @@ type PortName string
type Ports map[PortName]uint16
const (
ChaincodePort PortName = "Chaincode"
EventsPort PortName = "Events"
HostPort PortName = "HostPort"
ListenPort PortName = "Listen"
ProfilePort PortName = "Profile"
MetricsPort PortName = "Metrics"
ChaincodePort PortName = "Chaincode"
EventsPort PortName = "Events"
HostPort PortName = "HostPort"
ListenPort PortName = "Listen"
ProfilePort PortName = "Profile"
OperationsPort PortName = "Operations"
)
// PeerPortNames returns the list of ports that need to be reserved for a Peer.
func PeerPortNames() []PortName {
return []PortName{ListenPort, ChaincodePort, EventsPort, ProfilePort, MetricsPort}
return []PortName{ListenPort, ChaincodePort, EventsPort, ProfilePort, OperationsPort}
}
// OrdererPortNames returns the list of ports that need to be reserved for an
// Orderer.
func OrdererPortNames() []PortName {
return []PortName{ListenPort, ProfilePort}
return []PortName{ListenPort, ProfilePort, OperationsPort}
}
// BrokerPortNames returns the list of ports that need to be reserved for a
......
......@@ -87,9 +87,29 @@ Kafka:
Password:
Version:{{ end }}
Debug:
BroadcastTraceDir:
DeliverTraceDir:
BroadcastTraceDir:
DeliverTraceDir:
Consensus:
WALDir: {{ .OrdererDir Orderer }}/etcdraft/wal
Operations:
ListenAddress: 127.0.0.1:{{ .OrdererPort Orderer "Operations" }}
TLS:
Enabled: true
PrivateKey: {{ $w.OrdererLocalTLSDir Orderer }}/server.key
Certificate: {{ $w.OrdererLocalTLSDir Orderer }}/server.crt
RootCAs:
- {{ $w.OrdererLocalTLSDir Orderer }}/ca.crt
ClientAuthRequired: true
ClientRootCAs:
- {{ $w.OrdererLocalTLSDir Orderer }}/ca.crt
Metrics:
Provider: {{ .MetricsProvider }}
Statsd:
Network: udp
Address: {{ if .StatsdEndpoint }}{{ .StatsdEndpoint }}{{ else }}127.0.0.1:8125{{ end }}
WriteInterval: 5s
Prefix: {{ ReplaceAll (ToLower Orderer.ID) "." "_" }}
Prometheus:
HandlerPath: /metrics
{{- end }}
`
......@@ -34,6 +34,7 @@ type TopLevel struct {
Kafka Kafka
Debug Debug
Consensus interface{}
Operations Operations
}
// General contains config which should be common among all orderer types.
......@@ -177,6 +178,33 @@ type Debug struct {
DeliverTraceDir string
}
// Operations configures the operations endpont for the orderer.
type Operations struct {
ListenAddress string
Metrics Metrics
TLS TLS
}
// Operations confiures the metrics provider for the orderer.
type Metrics struct {
Provider string
Statsd Statsd
Prometheus Prometheus
}
// Statsd provides the configuration required to emit statsd metrics from the orderer.
type Statsd struct {
Network string
Address string
WriteInterval time.Duration
Prefix string
}
// Prometheus provides the configuration required to host prometheus.
type Prometheus struct {
HandlerPath string
}
// Defaults carries the default orderer configuration values.
var Defaults = TopLevel{
General: General{
......@@ -241,6 +269,12 @@ var Defaults = TopLevel{
BroadcastTraceDir: "",
DeliverTraceDir: "",
},
Operations: Operations{
ListenAddress: "127.0.0.1:0",
Metrics: Metrics{
Provider: "disabled",
},
},
}
// Load parses the orderer YAML file and environment, producing
......
......@@ -8,6 +8,9 @@ package server
import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"net"
......@@ -16,16 +19,26 @@ import (
"os"
"time"
kitstatsd "github.com/go-kit/kit/metrics/statsd"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/crypto"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/flogging/httpadmin"
"github.com/hyperledger/fabric/common/grpclogging"
"github.com/hyperledger/fabric/common/grpcmetrics"
"github.com/hyperledger/fabric/common/ledger/blockledger"
"github.com/hyperledger/fabric/common/localmsp"
"github.com/hyperledger/fabric/common/metrics"
"github.com/hyperledger/fabric/common/metrics/disabled"
"github.com/hyperledger/fabric/common/metrics/prometheus"
"github.com/hyperledger/fabric/common/metrics/statsd"
"github.com/hyperledger/fabric/common/metrics/statsd/goruntime"
"github.com/hyperledger/fabric/common/tools/configtxgen/encoder"
genesisconfig "github.com/hyperledger/fabric/common/tools/configtxgen/localconfig"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/core/middleware"
"github.com/hyperledger/fabric/msp"
mspmgmt "github.com/hyperledger/fabric/msp/mgmt"
"github.com/hyperledger/fabric/orderer/common/bootstrap/file"
......@@ -41,6 +54,8 @@ import (
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/utils"
prom "github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
"gopkg.in/alecthomas/kingpin.v2"
)
......@@ -84,7 +99,14 @@ func Start(cmd string, conf *localconfig.TopLevel) {
genesisBlock := extractGenesisBlock(conf)
clusterType := isClusterType(genesisBlock)
signer := localmsp.NewSigner()
serverConfig := initializeServerConfig(conf)
metricsProvider, metricsShutdown, err := initializeOperations(conf.Operations)
if err != nil {
panic(err)
}
defer metricsShutdown()
serverConfig := initializeServerConfig(conf, metricsProvider)
grpcServer := initializeGrpcServer(conf, serverConfig)
caSupport := &comm.CASupport{
AppRootCAsByChain: make(map[string][][]byte),
......@@ -193,7 +215,7 @@ func initializeClusterConfig(conf *localconfig.TopLevel) comm.ClientConfig {
return cc
}
func initializeServerConfig(conf *localconfig.TopLevel) comm.ServerConfig {
func initializeServerConfig(conf *localconfig.TopLevel, metricsProvider metrics.Provider) comm.ServerConfig {
// secure server config
secureOpts := &comm.SecureOptions{
UseTLS: conf.General.TLS.Enabled,
......@@ -247,7 +269,25 @@ func initializeServerConfig(conf *localconfig.TopLevel) comm.ServerConfig {
kaOpts.ServerInterval = conf.General.Keepalive.ServerInterval
kaOpts.ServerTimeout = conf.General.Keepalive.ServerTimeout
return comm.ServerConfig{SecOpts: secureOpts, KaOpts: kaOpts}
commLogger := flogging.MustGetLogger("core.comm").With("server", "Orderer")
if metricsProvider == nil {
metricsProvider = &disabled.Provider{}