Commit e061a2d2 authored by Jonathan Levi (HACERA)'s avatar Jonathan Levi (HACERA) Committed by Gerrit Code Review
Browse files

Merge "[FAB-12843] wire logspec handler"

parents d5eb2ab9 91e0b0b9
......@@ -81,6 +81,10 @@ func (dr *DatagramReader) Address() string {
return dr.sock.LocalAddr().String()
}
func (dr *DatagramReader) String() string {
return string(dr.buffer.Contents())
}
func (dr *DatagramReader) Start() {
buf := make([]byte, 1024*1024)
for {
......
......@@ -15,6 +15,7 @@ import (
"net/http"
"os"
"path/filepath"
"strings"
"syscall"
"time"
......@@ -111,14 +112,19 @@ var _ = Describe("EndToEnd", func() {
RunQueryInvokeQuery(network, orderer, peer, "testchannel")
RunRespondWith(network, orderer, peer, "testchannel")
CheckForStatsdMetrics(datagramReader)
By("waiting for DeliverFiltered stats to be emitted")
metricsWriteInterval := 5 * time.Second
Eventually(datagramReader, 2*metricsWriteInterval).Should(gbytes.Say("stream_request_duration.protos_Deliver.DeliverFiltered."))
CheckPeerStatsdStreamMetrics(datagramReader.String())
CheckPeerStatsdMetrics(datagramReader.String(), "org1_peer0")
CheckPeerStatsdMetrics(datagramReader.String(), "org2_peer1")
CheckOrdererStatsdMetrics(datagramReader.String(), "ordererorg_orderer")
})
})
Describe("basic kafka network with 2 orgs", func() {
var metricsClient *http.Client
var metricsURL string
BeforeEach(func() {
network = nwo.New(nwo.BasicKafka(), testDir, client, BasePort(), components)
network.MetricsProvider = "prometheus"
......@@ -128,27 +134,6 @@ var _ = Describe("EndToEnd", func() {
networkRunner := network.NetworkGroupRunner()
process = ifrit.Invoke(networkRunner)
Eventually(process.Ready(), network.EventuallyTimeout).Should(BeClosed())
org1Peer0 := network.Peer("Org1", "peer0")
clientCert, err := tls.LoadX509KeyPair(
filepath.Join(network.PeerLocalTLSDir(org1Peer0), "server.crt"),
filepath.Join(network.PeerLocalTLSDir(org1Peer0), "server.key"),
)
Expect(err).NotTo(HaveOccurred())
clientCertPool := x509.NewCertPool()
caCert, err := ioutil.ReadFile(filepath.Join(network.PeerLocalTLSDir(org1Peer0), "ca.crt"))
Expect(err).NotTo(HaveOccurred())
clientCertPool.AppendCertsFromPEM(caCert)
metricsURL = fmt.Sprintf("https://127.0.0.1:%d/metrics", network.PeerPort(org1Peer0, nwo.MetricsPort))
metricsClient = &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
Certificates: []tls.Certificate{clientCert},
RootCAs: clientCertPool,
},
},
}
})
It("executes a basic kafka network with 2 orgs", func() {
......@@ -158,7 +143,9 @@ var _ = Describe("EndToEnd", func() {
network.CreateAndJoinChannel(orderer, "testchannel")
nwo.DeployChaincode(network, "testchannel", orderer, chaincode)
RunQueryInvokeQuery(network, orderer, peer, "testchannel")
CheckForPometheusMetrics(metricsClient, metricsURL)
CheckPeerOperationEndpoints(network, network.Peer("Org2", "peer1"))
CheckOrdererOperationEndpoints(network, orderer)
})
})
......@@ -405,37 +392,114 @@ func RunRespondWith(n *nwo.Network, orderer *nwo.Orderer, peer *nwo.Peer, channe
Expect(sess.Err).To(gbytes.Say(`Error: endorsement failure during invoke.`))
}
func CheckForStatsdMetrics(datagramReader *DatagramReader) {
By("waiting for DeliverFiltered stats to be emitted")
Eventually(datagramReader, 10*time.Second /* 2 * metrics.statsd.writeInterval */).Should(gbytes.Say("stream_request_duration.protos_Deliver.DeliverFiltered."))
buf := string(datagramReader.Buffer().Contents())
By("checking for statsd metrics")
Expect(buf).To(ContainSubstring("org1_peer0.go.mem.gc_completed_count:"))
Expect(buf).To(ContainSubstring("org1_peer1.go.mem.gc_completed_count:"))
Expect(buf).To(ContainSubstring("org2_peer0.go.mem.gc_completed_count:"))
Expect(buf).To(ContainSubstring("org2_peer1.go.mem.gc_completed_count:"))
Expect(buf).To(ContainSubstring(".grpc.server.unary_requests_received.protos_Endorser.ProcessProposal:"))
Expect(buf).To(ContainSubstring(".grpc.server.unary_requests_completed.protos_Endorser.ProcessProposal.OK:"))
Expect(buf).To(ContainSubstring(".grpc.server.unary_request_duration.protos_Endorser.ProcessProposal.OK:"))
Expect(buf).To(ContainSubstring(".grpc.server.stream_requests_received.protos_Deliver.DeliverFiltered:"))
Expect(buf).To(ContainSubstring(".grpc.server.stream_requests_completed.protos_Deliver.DeliverFiltered.Unknown:"))
Expect(buf).To(ContainSubstring(".grpc.server.stream_request_duration.protos_Deliver.DeliverFiltered.Unknown:"))
Expect(buf).To(ContainSubstring(".grpc.server.stream_messages_received.protos_Deliver.DeliverFiltered"))
Expect(buf).To(ContainSubstring(".grpc.server.stream_messages_sent.protos_Deliver.DeliverFiltered"))
func CheckPeerStatsdMetrics(contents, prefix string) {
By("checking for peer statsd metrics")
Expect(contents).To(ContainSubstring(prefix + ".go.mem.gc_completed_count:"))
Expect(contents).To(ContainSubstring(prefix + ".grpc.server.unary_requests_received.protos_Endorser.ProcessProposal:"))
Expect(contents).To(ContainSubstring(prefix + ".grpc.server.unary_requests_completed.protos_Endorser.ProcessProposal.OK:"))
Expect(contents).To(ContainSubstring(prefix + ".grpc.server.unary_request_duration.protos_Endorser.ProcessProposal.OK:"))
}
func CheckForPometheusMetrics(client *http.Client, url string) {
func CheckPeerStatsdStreamMetrics(contents string) {
By("checking for stream metrics")
Expect(contents).To(ContainSubstring(".grpc.server.stream_requests_received.protos_Deliver.DeliverFiltered:"))
Expect(contents).To(ContainSubstring(".grpc.server.stream_requests_completed.protos_Deliver.DeliverFiltered.Unknown:"))
Expect(contents).To(ContainSubstring(".grpc.server.stream_request_duration.protos_Deliver.DeliverFiltered.Unknown:"))
Expect(contents).To(ContainSubstring(".grpc.server.stream_messages_received.protos_Deliver.DeliverFiltered"))
Expect(contents).To(ContainSubstring(".grpc.server.stream_messages_sent.protos_Deliver.DeliverFiltered"))
}
func CheckOrdererStatsdMetrics(contents, prefix string) {
By("checking for AtomicBroadcast")
Expect(contents).To(ContainSubstring(prefix + ".grpc.server.stream_request_duration.orderer_AtomicBroadcast.Broadcast.OK"))
Expect(contents).To(ContainSubstring(prefix + ".grpc.server.stream_request_duration.orderer_AtomicBroadcast.Deliver."))
By("checking for orderer metrics")
Expect(contents).To(ContainSubstring(prefix + ".go.mem.gc_completed_count:"))
Expect(contents).To(ContainSubstring(prefix + ".grpc.server.stream_requests_received.orderer_AtomicBroadcast.Deliver:"))
Expect(contents).To(ContainSubstring(prefix + ".grpc.server.stream_requests_completed.orderer_AtomicBroadcast.Deliver."))
Expect(contents).To(ContainSubstring(prefix + ".grpc.server.stream_messages_received.orderer_AtomicBroadcast.Deliver"))
Expect(contents).To(ContainSubstring(prefix + ".grpc.server.stream_messages_sent.orderer_AtomicBroadcast.Deliver"))
}
func OrdererOperationalClients(network *nwo.Network, orderer *nwo.Orderer) (authClient, unauthClient *http.Client) {
return operationalClients(network.OrdererLocalTLSDir(orderer))
}
func PeerOperationalClients(network *nwo.Network, peer *nwo.Peer) (authClient, unauthClient *http.Client) {
return operationalClients(network.PeerLocalTLSDir(peer))
}
func operationalClients(tlsDir string) (authClient, unauthClient *http.Client) {
clientCert, err := tls.LoadX509KeyPair(
filepath.Join(tlsDir, "server.crt"),
filepath.Join(tlsDir, "server.key"),
)
Expect(err).NotTo(HaveOccurred())
clientCertPool := x509.NewCertPool()
caCert, err := ioutil.ReadFile(filepath.Join(tlsDir, "ca.crt"))
Expect(err).NotTo(HaveOccurred())
clientCertPool.AppendCertsFromPEM(caCert)
authenticatedClient := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
Certificates: []tls.Certificate{clientCert},
RootCAs: clientCertPool,
},
},
}
unauthenticatedClient := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{RootCAs: clientCertPool},
},
}
return authenticatedClient, unauthenticatedClient
}
func CheckPeerOperationEndpoints(network *nwo.Network, peer *nwo.Peer) {
metricsURL := fmt.Sprintf("https://127.0.0.1:%d/metrics", network.PeerPort(peer, nwo.OperationsPort))
logspecURL := fmt.Sprintf("https://127.0.0.1:%d/logspec", network.PeerPort(peer, nwo.OperationsPort))
authClient, unauthClient := PeerOperationalClients(network, peer)
CheckPeerPometheusMetrics(authClient, metricsURL)
CheckLogspecOperations(authClient, logspecURL)
By("getting interacting without a cert")
_, err := unauthClient.Get(logspecURL)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("remote error: tls: bad certificate"))
}
func CheckOrdererOperationEndpoints(network *nwo.Network, orderer *nwo.Orderer) {
metricsURL := fmt.Sprintf("https://127.0.0.1:%d/metrics", network.OrdererPort(orderer, nwo.OperationsPort))
logspecURL := fmt.Sprintf("https://127.0.0.1:%d/logspec", network.OrdererPort(orderer, nwo.OperationsPort))
authClient, unauthClient := OrdererOperationalClients(network, orderer)
CheckOrdererPometheusMetrics(authClient, metricsURL)
CheckLogspecOperations(authClient, logspecURL)
By("getting interacting without a cert")
_, err := unauthClient.Get(logspecURL)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("remote error: tls: bad certificate"))
}
func CheckPeerPometheusMetrics(client *http.Client, url string) {
By("hitting the prometheus metrics endpoint")
resp, err := client.Get(url)
Expect(err).NotTo(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusOK))
bodyBytes, err := ioutil.ReadAll(resp.Body)
Expect(err).NotTo(HaveOccurred())
resp.Body.Close()
body := string(bodyBytes)
Eventually(getBody(client, url)).Should(ContainSubstring(`# TYPE grpc_server_stream_request_duration histogram`))
By("checking for some expected metrics")
body := getBody(client, url)()
Expect(body).To(ContainSubstring(`# TYPE go_gc_duration_seconds summary`))
Expect(body).To(ContainSubstring(`# TYPE grpc_server_stream_request_duration histogram`))
Expect(body).To(ContainSubstring(`grpc_server_stream_request_duration_count{code="Unknown",method="DeliverFiltered",service="protos_Deliver"}`))
......@@ -444,3 +508,61 @@ func CheckForPometheusMetrics(client *http.Client, url string) {
Expect(body).To(ContainSubstring(`# TYPE grpc_comm_conn_closed counter`))
Expect(body).To(ContainSubstring(`# TYPE grpc_comm_conn_opened counter`))
}
func CheckOrdererPometheusMetrics(client *http.Client, url string) {
By("hitting the prometheus metrics endpoint")
resp, err := client.Get(url)
Expect(err).NotTo(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusOK))
resp.Body.Close()
Eventually(getBody(client, url)).Should(ContainSubstring(`# TYPE grpc_server_stream_request_duration histogram`))
By("checking for some expected metrics")
body := getBody(client, url)()
Expect(body).To(ContainSubstring(`# TYPE go_gc_duration_seconds summary`))
Expect(body).To(ContainSubstring(`# TYPE grpc_server_stream_request_duration histogram`))
Expect(body).To(ContainSubstring(`grpc_server_stream_request_duration_sum{code="OK",method="Deliver",service="orderer_AtomicBroadcast"`))
Expect(body).To(ContainSubstring(`grpc_server_stream_request_duration_sum{code="OK",method="Broadcast",service="orderer_AtomicBroadcast"`))
Expect(body).To(ContainSubstring(`# TYPE grpc_comm_conn_closed counter`))
Expect(body).To(ContainSubstring(`# TYPE grpc_comm_conn_opened counter`))
}
func CheckLogspecOperations(client *http.Client, logspecURL string) {
By("getting the logspec")
resp, err := client.Get(logspecURL)
Expect(err).NotTo(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusOK))
bodyBytes, err := ioutil.ReadAll(resp.Body)
resp.Body.Close()
Expect(err).NotTo(HaveOccurred())
Expect(string(bodyBytes)).To(MatchJSON(`{"spec":"info"}`))
updateReq, err := http.NewRequest(http.MethodPut, logspecURL, strings.NewReader(`{"spec":"debug"}`))
Expect(err).NotTo(HaveOccurred())
By("setting the logspec")
resp, err = client.Do(updateReq)
Expect(err).NotTo(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusNoContent))
resp, err = client.Get(logspecURL)
Expect(err).NotTo(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusOK))
bodyBytes, err = ioutil.ReadAll(resp.Body)
resp.Body.Close()
Expect(err).NotTo(HaveOccurred())
Expect(string(bodyBytes)).To(MatchJSON(`{"spec":"debug"}`))
}
func getBody(client *http.Client, url string) func() string {
return func() string {
resp, err := client.Get(url)
Expect(err).NotTo(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusOK))
bodyBytes, err := ioutil.ReadAll(resp.Body)
Expect(err).NotTo(HaveOccurred())
resp.Body.Close()
return string(bodyBytes)
}
}
......@@ -184,24 +184,25 @@ ledger:
history:
enableHistoryDatabase: true
metrics:
provider: {{ .MetricsProvider }}
statsd:
network: udp
address: {{ if .StatsdEndpoint }}{{ .StatsdEndpoint }}{{ else }}127.0.0.1:8125{{ end }}
writeInterval: 5s
prefix: {{ ReplaceAll (ToLower Peer.ID) "." "_" }}
prometheus:
listenAddress: 127.0.0.1:{{ .PeerPort Peer "Metrics" }}
handlerPath: /metrics
tls:
enabled: true
cert:
file: {{ .PeerLocalTLSDir Peer }}/server.crt
key:
file: {{ .PeerLocalTLSDir Peer }}/server.key
clientAuthRequired: true
clientRootCAs:
files:
- {{ .PeerLocalTLSDir Peer }}/ca.crt
operations:
listenAddress: 127.0.0.1:{{ .PeerPort Peer "Operations" }}
tls:
enabled: true
cert:
file: {{ .PeerLocalTLSDir Peer }}/server.crt
key:
file: {{ .PeerLocalTLSDir Peer }}/server.key
clientAuthRequired: true
clientRootCAs:
files:
- {{ .PeerLocalTLSDir Peer }}/ca.crt
metrics:
provider: {{ .MetricsProvider }}
statsd:
network: udp
address: {{ if .StatsdEndpoint }}{{ .StatsdEndpoint }}{{ else }}127.0.0.1:8125{{ end }}
writeInterval: 5s
prefix: {{ ReplaceAll (ToLower Peer.ID) "." "_" }}
prometheus:
handlerPath: /metrics
`
......@@ -13,12 +13,12 @@ import (
)
type Core struct {
Logging *Logging `yaml:"logging,omitempty"`
Peer *Peer `yaml:"peer,omitempty"`
VM *VM `yaml:"vm,omitempty"`
Chaincode *Chaincode `yaml:"chaincode,omitempty"`
Ledger *Ledger `yaml:"ledger,omitempty"`
Metrics *Metrics `yaml:"metrics,omitempty"`
Logging *Logging `yaml:"logging,omitempty"`
Peer *Peer `yaml:"peer,omitempty"`
VM *VM `yaml:"vm,omitempty"`
Chaincode *Chaincode `yaml:"chaincode,omitempty"`
Ledger *Ledger `yaml:"ledger,omitempty"`
Operations *Operations `yaml:"operations,omitempty"`
}
type Logging struct {
......@@ -264,6 +264,11 @@ type HistoryConfig struct {
EnableHistoryDatabase bool `yaml:"enableHistoryDatabase"`
}
type Operations struct {
ListenAddress string `yaml:"listenAddress,omitempty"`
TLS *TLS `yaml:"tls"`
}
type Metrics struct {
Provider string `yaml:"provider"`
Statsd *Statsd `yaml:"statsd,omitempty"`
......@@ -278,7 +283,5 @@ type Statsd struct {
}
type Prometheus struct {
ListenAddress string `yaml:"listenAddress,omitempty"`
HandlerPath string `yaml:"handlerPath"`
TLS *TLS `yaml:"tls"`
HandlerPath string `yaml:"handlerPath"`
}
......@@ -9,10 +9,11 @@ package fabricconfig
import "time"
type Orderer struct {
General *General `yaml:"General,omitempty"`
FileLedger *FileLedger `yaml:"FileLedger,omitempty"`
RAMLedger *RAMLedger `yaml:"RAMLedger,omitempty"`
Kafka *Kafka `yaml:"Kafka,omitempty"`
General *General `yaml:"General,omitempty"`
FileLedger *FileLedger `yaml:"FileLedger,omitempty"`
RAMLedger *RAMLedger `yaml:"RAMLedger,omitempty"`
Kafka *Kafka `yaml:"Kafka,omitempty"`
Operations *OrdererOperations `yaml:"Operations,omitempty"`
ExtraProperties map[string]interface{} `yaml:",inline,omitempty"`
}
......@@ -107,3 +108,26 @@ type Backoff struct {
RetryBackoff time.Duration `yaml:"RetryBackoff,omitempty"`
RetryMax int `yaml:"RetryMax,omitempty"`
}
type OrdererOperations struct {
ListenAddress string `yaml:"ListenAddress,omitempty"`
Metrics *OrdererMetrics `yaml:"Metrics,omitempty"`
TLS *OrdererTLS `yaml:"TLS"`
}
type OrdererMetrics struct {
Provider string `yaml:"Provider"`
Statsd *OrdererStatsd `yaml:"Statsd,omitempty"`
Prometheus *OrdererPrometheus `yaml:"Prometheus,omitempty"`
}
type OrdererStatsd struct {
Network string `yaml:"Network,omitempty"`
Address string `yaml:"Address,omitempty"`
WriteInterval time.Duration `yaml:"WriteInterval,omitempty"`
Prefix string `yaml:"Prefix,omitempty"`
}
type OrdererPrometheus struct {
HandlerPath string `yaml:"handlerPath"`
}
......@@ -1160,23 +1160,23 @@ type PortName string
type Ports map[PortName]uint16
const (
ChaincodePort PortName = "Chaincode"
EventsPort PortName = "Events"
HostPort PortName = "HostPort"
ListenPort PortName = "Listen"
ProfilePort PortName = "Profile"
MetricsPort PortName = "Metrics"
ChaincodePort PortName = "Chaincode"
EventsPort PortName = "Events"
HostPort PortName = "HostPort"
ListenPort PortName = "Listen"
ProfilePort PortName = "Profile"
OperationsPort PortName = "Operations"
)
// PeerPortNames returns the list of ports that need to be reserved for a Peer.
func PeerPortNames() []PortName {
return []PortName{ListenPort, ChaincodePort, EventsPort, ProfilePort, MetricsPort}
return []PortName{ListenPort, ChaincodePort, EventsPort, ProfilePort, OperationsPort}
}
// OrdererPortNames returns the list of ports that need to be reserved for an
// Orderer.
func OrdererPortNames() []PortName {
return []PortName{ListenPort, ProfilePort}
return []PortName{ListenPort, ProfilePort, OperationsPort}
}
// BrokerPortNames returns the list of ports that need to be reserved for a
......
......@@ -87,9 +87,29 @@ Kafka:
Password:
Version:{{ end }}
Debug:
BroadcastTraceDir:
DeliverTraceDir:
BroadcastTraceDir:
DeliverTraceDir:
Consensus:
WALDir: {{ .OrdererDir Orderer }}/etcdraft/wal
Operations:
ListenAddress: 127.0.0.1:{{ .OrdererPort Orderer "Operations" }}
TLS:
Enabled: true
PrivateKey: {{ $w.OrdererLocalTLSDir Orderer }}/server.key
Certificate: {{ $w.OrdererLocalTLSDir Orderer }}/server.crt
RootCAs:
- {{ $w.OrdererLocalTLSDir Orderer }}/ca.crt
ClientAuthRequired: true
ClientRootCAs:
- {{ $w.OrdererLocalTLSDir Orderer }}/ca.crt
Metrics:
Provider: {{ .MetricsProvider }}
Statsd:
Network: udp
Address: {{ if .StatsdEndpoint }}{{ .StatsdEndpoint }}{{ else }}127.0.0.1:8125{{ end }}
WriteInterval: 5s
Prefix: {{ ReplaceAll (ToLower Orderer.ID) "." "_" }}
Prometheus:
HandlerPath: /metrics
{{- end }}
`
......@@ -34,6 +34,7 @@ type TopLevel struct {
Kafka Kafka
Debug Debug
Consensus interface{}
Operations Operations
}
// General contains config which should be common among all orderer types.
......@@ -177,6 +178,33 @@ type Debug struct {
DeliverTraceDir string
}
// Operations configures the operations endpont for the orderer.
type Operations struct {
ListenAddress string
Metrics Metrics
TLS TLS
}
// Operations confiures the metrics provider for the orderer.
type Metrics struct {
Provider string
Statsd Statsd
Prometheus Prometheus
}
// Statsd provides the configuration required to emit statsd metrics from the orderer.
type Statsd struct {
Network string
Address string
WriteInterval time.Duration
Prefix string
}
// Prometheus provides the configuration required to host prometheus.
type Prometheus struct {
HandlerPath string
}
// Defaults carries the default orderer configuration values.
var Defaults = TopLevel{
General: General{
......@@ -241,6 +269,12 @@ var Defaults = TopLevel{
BroadcastTraceDir: "",
DeliverTraceDir: "",
},
Operations: Operations{
ListenAddress: "127.0.0.1:0",
Metrics: Metrics{
Provider: "disabled",
},
},
}
// Load parses the orderer YAML file and environment, producing
......
......@@ -8,6 +8,9 @@ package server
import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"net"
......@@ -16,16 +19,26 @@ import (
"os"
"time"
kitstatsd "github.com/go-kit/kit/metrics/statsd"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/crypto"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/flogging/httpadmin"
"github.com/hyperledger/fabric/common/grpclogging"
"github.com/hyperledger/fabric/common/grpcmetrics"
"github.com/hyperledger/fabric/common/ledger/blockledger"
"github.com/hyperledger/fabric/common/localmsp"
"github.com/hyperledger/fabric/common/metrics"
"github.com/hyperledger/fabric/common/metrics/disabled"
"github.com/hyperledger/fabric/common/metrics/prometheus"
"github.com/hyperledger/fabric/common/metrics/statsd"
"github.com/hyperledger/fabric/common/metrics/statsd/goruntime"
"github.com/hyperledger/fabric/common/tools/configtxgen/encoder"
genesisconfig "github.com/hyperledger/fabric/common/tools/configtxgen/localconfig"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/core/middleware"
"github.com/hyperledger/fabric/msp"
mspmgmt "github.com/hyperledger/fabric/msp/mgmt"
"github.com/hyperledger/fabric/orderer/common/bootstrap/file"
......@@ -41,6 +54,8 @@ import (
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
"github.com/hyperledger/fabric/protos/utils"
prom "github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
"gopkg.in/alecthomas/kingpin.v2"
)
......@@ -84,7 +99,14 @@ func Start(cmd string, conf *localconfig.TopLevel) {
genesisBlock := extractGenesisBlock(conf)
clusterType := isClusterType(genesisBlock)
signer := localmsp.NewSigner()
serverConfig := initializeServerConfig(conf)
metricsProvider, metricsShutdown, err := initializeOperations(conf.Operations)
if err != nil {
panic(err)
}
defer metricsShutdown()
serverConfig := initializeServerConfig(conf, metricsProvider)
grpcServer := initializeGrpcServer(conf, serverConfig)
caSupport := &comm.CASupport{
AppRootCAsByChain: make(map[string][][]byte),
......@@ -193,7 +215,7 @@ func initializeClusterConfig(conf *localconfig.TopLevel) comm.ClientConfig {
return cc
}
func initializeServerConfig(conf *localconfig.TopLevel) comm.ServerConfig {
func initializeServerConfig(conf *localconfig.TopLevel, metricsProvider metrics.Provider) comm.ServerConfig {
// secure server config
secureOpts := &comm.SecureOptions{
UseTLS: conf.General.TLS.Enabled,
......@@ -247,7 +269,25 @@ func initializeServerConfig(conf *localconfig.TopLevel) comm.ServerConfig {
kaOpts.ServerInterval = conf.General.Keepalive.ServerInterval
kaOpts.ServerTimeout = conf.General.Keepalive.ServerTimeout
return comm.ServerConfig{SecOpts: secureOpts, KaOpts: kaOpts}
commLogger := flogging.MustGetLogger("core.comm").With("server", "Orderer")
if metricsProvider == nil {
metricsProvider = &disabled.Provider{}