Commit ac63a6f5 authored by Matthew Sykes's avatar Matthew Sykes
Browse files

[FAB-12854] DRY up peer/orderer operations code



Change-Id: I79a199f192370c8153b3e32485aa11215332edb8
Signed-off-by: default avatarMatthew Sykes <sykesmat@us.ibm.com>
parent ed088b17
// Code generated by counterfeiter. DO NOT EDIT.
package fakes
import (
sync "sync"
operations "github.com/hyperledger/fabric/core/operations"
)
type Logger struct {
WarnStub func(...interface{})
warnMutex sync.RWMutex
warnArgsForCall []struct {
arg1 []interface{}
}
WarnfStub func(string, ...interface{})
warnfMutex sync.RWMutex
warnfArgsForCall []struct {
arg1 string
arg2 []interface{}
}
invocations map[string][][]interface{}
invocationsMutex sync.RWMutex
}
func (fake *Logger) Warn(arg1 ...interface{}) {
fake.warnMutex.Lock()
fake.warnArgsForCall = append(fake.warnArgsForCall, struct {
arg1 []interface{}
}{arg1})
fake.recordInvocation("Warn", []interface{}{arg1})
fake.warnMutex.Unlock()
if fake.WarnStub != nil {
fake.WarnStub(arg1...)
}
}
func (fake *Logger) WarnCallCount() int {
fake.warnMutex.RLock()
defer fake.warnMutex.RUnlock()
return len(fake.warnArgsForCall)
}
func (fake *Logger) WarnCalls(stub func(...interface{})) {
fake.warnMutex.Lock()
defer fake.warnMutex.Unlock()
fake.WarnStub = stub
}
func (fake *Logger) WarnArgsForCall(i int) []interface{} {
fake.warnMutex.RLock()
defer fake.warnMutex.RUnlock()
argsForCall := fake.warnArgsForCall[i]
return argsForCall.arg1
}
func (fake *Logger) Warnf(arg1 string, arg2 ...interface{}) {
fake.warnfMutex.Lock()
fake.warnfArgsForCall = append(fake.warnfArgsForCall, struct {
arg1 string
arg2 []interface{}
}{arg1, arg2})
fake.recordInvocation("Warnf", []interface{}{arg1, arg2})
fake.warnfMutex.Unlock()
if fake.WarnfStub != nil {
fake.WarnfStub(arg1, arg2...)
}
}
func (fake *Logger) WarnfCallCount() int {
fake.warnfMutex.RLock()
defer fake.warnfMutex.RUnlock()
return len(fake.warnfArgsForCall)
}
func (fake *Logger) WarnfCalls(stub func(string, ...interface{})) {
fake.warnfMutex.Lock()
defer fake.warnfMutex.Unlock()
fake.WarnfStub = stub
}
func (fake *Logger) WarnfArgsForCall(i int) (string, []interface{}) {
fake.warnfMutex.RLock()
defer fake.warnfMutex.RUnlock()
argsForCall := fake.warnfArgsForCall[i]
return argsForCall.arg1, argsForCall.arg2
}
func (fake *Logger) Invocations() map[string][][]interface{} {
fake.invocationsMutex.RLock()
defer fake.invocationsMutex.RUnlock()
fake.warnMutex.RLock()
defer fake.warnMutex.RUnlock()
fake.warnfMutex.RLock()
defer fake.warnfMutex.RUnlock()
copiedInvocations := map[string][][]interface{}{}
for key, value := range fake.invocations {
copiedInvocations[key] = value
}
return copiedInvocations
}
func (fake *Logger) recordInvocation(key string, args []interface{}) {
fake.invocationsMutex.Lock()
defer fake.invocationsMutex.Unlock()
if fake.invocations == nil {
fake.invocations = map[string][][]interface{}{}
}
if fake.invocations[key] == nil {
fake.invocations[key] = [][]interface{}{}
}
fake.invocations[key] = append(fake.invocations[key], args)
}
var _ operations.Logger = new(Logger)
/*
Copyright IBM Corp All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package operations_test
import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"net/http"
"path/filepath"
"testing"
"github.com/hyperledger/fabric/common/crypto/tlsgen"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
func TestOperations(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Operations Suite")
}
func generateCertificates(tempDir string) {
serverCA, err := tlsgen.NewCA()
Expect(err).NotTo(HaveOccurred())
err = ioutil.WriteFile(filepath.Join(tempDir, "server-ca.pem"), serverCA.CertBytes(), 0640)
Expect(err).NotTo(HaveOccurred())
serverKeyPair, err := serverCA.NewServerCertKeyPair("127.0.0.1")
Expect(err).NotTo(HaveOccurred())
err = ioutil.WriteFile(filepath.Join(tempDir, "server-cert.pem"), serverKeyPair.Cert, 0640)
Expect(err).NotTo(HaveOccurred())
err = ioutil.WriteFile(filepath.Join(tempDir, "server-key.pem"), serverKeyPair.Key, 0640)
Expect(err).NotTo(HaveOccurred())
clientCA, err := tlsgen.NewCA()
Expect(err).NotTo(HaveOccurred())
err = ioutil.WriteFile(filepath.Join(tempDir, "client-ca.pem"), clientCA.CertBytes(), 0640)
Expect(err).NotTo(HaveOccurred())
clientKeyPair, err := clientCA.NewClientCertKeyPair()
Expect(err).NotTo(HaveOccurred())
err = ioutil.WriteFile(filepath.Join(tempDir, "client-cert.pem"), clientKeyPair.Cert, 0640)
Expect(err).NotTo(HaveOccurred())
err = ioutil.WriteFile(filepath.Join(tempDir, "client-key.pem"), clientKeyPair.Key, 0640)
Expect(err).NotTo(HaveOccurred())
}
func newHTTPClient(tlsDir string) *http.Client {
clientCert, err := tls.LoadX509KeyPair(
filepath.Join(tlsDir, "client-cert.pem"),
filepath.Join(tlsDir, "client-key.pem"),
)
Expect(err).NotTo(HaveOccurred())
clientCertPool := x509.NewCertPool()
caCert, err := ioutil.ReadFile(filepath.Join(tlsDir, "server-ca.pem"))
Expect(err).NotTo(HaveOccurred())
clientCertPool.AppendCertsFromPEM(caCert)
return &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
Certificates: []tls.Certificate{clientCert},
RootCAs: clientCertPool,
},
},
}
}
/*
Copyright IBM Corp All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package operations
import (
"context"
"crypto/tls"
"net"
"net/http"
"os"
"strings"
"time"
kitstatsd "github.com/go-kit/kit/metrics/statsd"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/flogging/httpadmin"
"github.com/hyperledger/fabric/common/metrics"
"github.com/hyperledger/fabric/common/metrics/disabled"
"github.com/hyperledger/fabric/common/metrics/prometheus"
"github.com/hyperledger/fabric/common/metrics/statsd"
"github.com/hyperledger/fabric/common/metrics/statsd/goruntime"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/middleware"
prom "github.com/prometheus/client_golang/prometheus"
)
//go:generate counterfeiter -o fakes/logger.go -fake-name Logger . Logger
type Logger interface {
Warn(args ...interface{})
Warnf(template string, args ...interface{})
}
type Statsd struct {
Network string
Address string
WriteInterval time.Duration
Prefix string
}
type Prometheus struct {
HandlerPath string
}
type MetricsOptions struct {
Provider string
Statsd *Statsd
Prometheus *Prometheus
}
type Options struct {
Logger Logger
ListenAddress string
Metrics MetricsOptions
TLS TLS
}
type System struct {
metrics.Provider
logger Logger
options Options
statsd *kitstatsd.Statsd
collectorTicker *time.Ticker
sendTicker *time.Ticker
httpServer *http.Server
mux *http.ServeMux
addr string
}
func NewSystem(o Options) *System {
logger := o.Logger
if logger == nil {
logger = flogging.MustGetLogger("operations.runner")
}
system := &System{
logger: logger,
options: o,
}
system.initializeServer()
system.initializeLoggingHandler()
system.initializeMetricsProvider()
return system
}
func (s *System) Run(signals <-chan os.Signal, ready chan<- struct{}) error {
err := s.Start()
if err != nil {
return err
}
close(ready)
select {
case <-signals:
return s.Stop()
}
}
func (s *System) Start() error {
err := s.startMetricsTickers()
if err != nil {
return err
}
listener, err := s.listen()
if err != nil {
return err
}
s.addr = listener.Addr().String()
go s.httpServer.Serve(listener)
return nil
}
func (s *System) Stop() error {
if s.collectorTicker != nil {
s.collectorTicker.Stop()
s.collectorTicker = nil
}
if s.sendTicker != nil {
s.sendTicker.Stop()
s.sendTicker = nil
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
return s.httpServer.Shutdown(ctx)
}
func (s *System) initializeServer() {
s.mux = http.NewServeMux()
s.httpServer = &http.Server{
Addr: s.options.ListenAddress,
Handler: s.mux,
ReadTimeout: 10 * time.Second,
WriteTimeout: 2 * time.Minute,
}
}
func (s *System) handlerChain(h http.Handler, secure bool) http.Handler {
if secure {
return middleware.NewChain(middleware.RequireCert(), middleware.WithRequestID(util.GenerateUUID)).Handler(h)
}
return middleware.NewChain(middleware.WithRequestID(util.GenerateUUID)).Handler(h)
}
func (s *System) initializeMetricsProvider() error {
m := s.options.Metrics
providerType := m.Provider
switch providerType {
case "statsd":
prefix := m.Statsd.Prefix
if prefix != "" && !strings.HasSuffix(prefix, ".") {
prefix = prefix + "."
}
ks := kitstatsd.New(prefix, s)
s.Provider = &statsd.Provider{Statsd: ks}
s.statsd = ks
return nil
case "prometheus":
s.Provider = &prometheus.Provider{}
secure := s.options.TLS.Enabled && s.options.TLS.ClientCertRequired
s.mux.Handle(m.Prometheus.HandlerPath, s.handlerChain(prom.Handler(), secure))
return nil
default:
if providerType != "disabled" {
s.logger.Warnf("Unknown provider type: %s; metrics disabled", providerType)
}
s.Provider = &disabled.Provider{}
return nil
}
}
func (s *System) initializeLoggingHandler() {
secure := s.options.TLS.Enabled && s.options.TLS.ClientCertRequired
s.mux.Handle("/logspec", s.handlerChain(httpadmin.NewSpecHandler(), secure))
}
func (s *System) startMetricsTickers() error {
m := s.options.Metrics
if s.statsd != nil {
network := m.Statsd.Network
address := m.Statsd.Address
c, err := net.Dial(network, address)
if err != nil {
return err
}
c.Close()
opts := s.options.Metrics.Statsd
writeInterval := opts.WriteInterval
s.collectorTicker = time.NewTicker(writeInterval / 2)
goCollector := goruntime.NewCollector(s.Provider)
go goCollector.CollectAndPublish(s.collectorTicker.C)
s.sendTicker = time.NewTicker(writeInterval)
go s.statsd.SendLoop(s.sendTicker.C, network, address)
}
return nil
}
func (s *System) listen() (net.Listener, error) {
listener, err := net.Listen("tcp", s.options.ListenAddress)
if err != nil {
return nil, err
}
tlsConfig, err := s.options.TLS.Config()
if err != nil {
return nil, err
}
if tlsConfig != nil {
listener = tls.NewListener(listener, tlsConfig)
}
return listener, nil
}
func (s *System) Addr() string {
return s.addr
}
func (s *System) Log(keyvals ...interface{}) error {
s.logger.Warn(keyvals...)
return nil
}
/*
Copyright IBM Corp All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package operations_test
import (
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"os"
"path/filepath"
"syscall"
"time"
"github.com/hyperledger/fabric/common/metrics/disabled"
"github.com/hyperledger/fabric/common/metrics/prometheus"
"github.com/hyperledger/fabric/common/metrics/statsd"
"github.com/hyperledger/fabric/core/operations"
"github.com/hyperledger/fabric/core/operations/fakes"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gbytes"
"github.com/tedsuo/ifrit"
)
var _ = Describe("System", func() {
var (
fakeLogger *fakes.Logger
tempDir string
client *http.Client
options operations.Options
system *operations.System
)
BeforeEach(func() {
var err error
tempDir, err := ioutil.TempDir("", "opssys")
Expect(err).NotTo(HaveOccurred())
generateCertificates(tempDir)
client = newHTTPClient(tempDir)
fakeLogger = &fakes.Logger{}
options = operations.Options{
Logger: fakeLogger,
ListenAddress: "127.0.0.1:0",
Metrics: operations.MetricsOptions{
Provider: "disabled",
},
TLS: operations.TLS{
Enabled: true,
CertFile: filepath.Join(tempDir, "server-cert.pem"),
KeyFile: filepath.Join(tempDir, "server-key.pem"),
ClientCertRequired: true,
ClientCACertFiles: []string{filepath.Join(tempDir, "client-ca.pem")},
},
}
system = operations.NewSystem(options)
})
AfterEach(func() {
os.RemoveAll(tempDir)
if system != nil {
system.Stop()
}
})
It("hosts a secure endpoint for logging", func() {
err := system.Start()
Expect(err).NotTo(HaveOccurred())
resp, err := client.Get(fmt.Sprintf("https://%s/logspec", system.Addr()))
Expect(err).NotTo(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusOK))
resp.Body.Close()
})
Context("when TLS is disabled", func() {
BeforeEach(func() {
options.TLS.Enabled = false
system = operations.NewSystem(options)
})
It("hosts an insecure endpoint for logging", func() {
err := system.Start()
Expect(err).NotTo(HaveOccurred())
resp, err := client.Get(fmt.Sprintf("http://%s/logspec", system.Addr()))
Expect(err).NotTo(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusOK))
resp.Body.Close()
})
})
Context("when listen fails", func() {
var listener net.Listener
BeforeEach(func() {
var err error
listener, err = net.Listen("tcp", "127.0.0.1:0")
Expect(err).NotTo(HaveOccurred())
options.ListenAddress = listener.Addr().String()
system = operations.NewSystem(options)
})
AfterEach(func() {
listener.Close()
})
It("returns an error", func() {
err := system.Start()
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("bind: address already in use"))
})
})
Context("when a bad TLS configuration is provided", func() {
BeforeEach(func() {
options.TLS.CertFile = "cert-file-does-not-exist"
system = operations.NewSystem(options)
})
It("returns an error", func() {
err := system.Start()
Expect(err).To(MatchError("open cert-file-does-not-exist: no such file or directory"))
})
})
It("proxies Log to the provided logger", func() {
err := system.Log("key", "value")
Expect(err).NotTo(HaveOccurred())
Expect(fakeLogger.WarnCallCount()).To(Equal(1))
Expect(fakeLogger.WarnArgsForCall(0)).To(Equal([]interface{}{"key", "value"}))
})
Context("when a logger is not provided", func() {
BeforeEach(func() {
options.Logger = nil
system = operations.NewSystem(options)
})
It("does not panic when logging", func() {
Expect(func() { system.Log("key", "value") }).NotTo(Panic())
})
It("returns nil from Log", func() {
err := system.Log("key", "value")
Expect(err).NotTo(HaveOccurred())
})
})
Context("when the metrics provider is disabled", func() {
BeforeEach(func() {
options.Metrics = operations.MetricsOptions{
Provider: "disabled",
}
system = operations.NewSystem(options)
Expect(system).NotTo(BeNil())
})
It("sets up a disabled provider", func() {
Expect(system.Provider).To(Equal(&disabled.Provider{}))
})
})
Context("when the metrics provider is prometheus", func() {
BeforeEach(func() {
options.Metrics = operations.MetricsOptions{
Provider: "prometheus",
Prometheus: &operations.Prometheus{
HandlerPath: "/metrics",
},
}
system = operations.NewSystem(options)