Commit 98b9b0a5 authored by yacovm's avatar yacovm Committed by Yacov Manevich
Browse files

[FAB-14802] Warn about cert expiration - Part II



This change set adds warnings about certificate expiration
from ingress connections.

Change-Id: I5e5a87f7e34ef93738066960e6c8c810f128d030
Signed-off-by: default avataryacovm <yacovm@il.ibm.com>
parent d8e438dd
......@@ -541,7 +541,7 @@ func (stream *Stream) sendMessage(request *orderer.StepRequest) {
f := func() (*orderer.StepResponse, error) {
startSend := time.Now()
stream.expCheck.checkExpiration(startSend)
stream.expCheck.checkExpiration(startSend, stream.Channel)
err := stream.Cluster_StepClient.Send(request)
stream.metrics.reportMsgSendTime(stream.Endpoint, stream.Channel, time.Since(startSend))
return nil, err
......@@ -697,11 +697,10 @@ func (rc *RemoteContext) NewStream(timeout time.Duration) (*Stream, error) {
minimumExpirationWarningInterval: rc.minimumExpirationWarningInterval,
expirationWarningThreshold: rc.certExpWarningThreshold,
expiresAt: rc.expiresAt,
channel: s.Channel,
endpoint: s.Endpoint,
nodeName: s.NodeName,
alert: func(template string, args ...interface{}) {
s.Logger.Warningf(template, args)
s.Logger.Warningf(template, args...)
},
}
......
......@@ -9,9 +9,11 @@ package cluster
import (
"context"
"io"
"time"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/protos/orderer"
"go.uber.org/zap"
"google.golang.org/grpc"
......@@ -37,10 +39,12 @@ type StepStream interface {
// Service defines the raft Service
type Service struct {
StreamCountReporter *StreamCountReporter
Dispatcher Dispatcher
Logger *flogging.FabricLogger
StepLogger *flogging.FabricLogger
StreamCountReporter *StreamCountReporter
Dispatcher Dispatcher
Logger *flogging.FabricLogger
StepLogger *flogging.FabricLogger
MinimumExpirationWarningInterval time.Duration
CertExpWarningThreshold time.Duration
}
// Step passes an implementation-specific message to another cluster member.
......@@ -50,10 +54,11 @@ func (s *Service) Step(stream orderer.Cluster_StepServer) error {
addr := util.ExtractRemoteAddress(stream.Context())
commonName := commonNameFromContext(stream.Context())
exp := s.initializeExpirationCheck(stream, addr, commonName)
s.Logger.Debugf("Connection from %s(%s)", commonName, addr)
defer s.Logger.Debugf("Closing connection from %s(%s)", commonName, addr)
for {
err := s.handleMessage(stream, addr)
err := s.handleMessage(stream, addr, exp)
if err == io.EOF {
s.Logger.Debugf("%s(%s) disconnected", commonName, addr)
return nil
......@@ -65,7 +70,7 @@ func (s *Service) Step(stream orderer.Cluster_StepServer) error {
}
}
func (s *Service) handleMessage(stream StepStream, addr string) error {
func (s *Service) handleMessage(stream StepStream, addr string, exp *certificateExpirationCheck) error {
request, err := stream.Recv()
if err == io.EOF {
return err
......@@ -75,6 +80,8 @@ func (s *Service) handleMessage(stream StepStream, addr string) error {
return err
}
exp.checkExpiration(time.Now(), extractChannel(request))
if s.StepLogger.IsEnabledFor(zap.DebugLevel) {
nodeName := commonNameFromContext(stream.Context())
s.StepLogger.Debugf("Received message from %s(%s): %v", nodeName, addr, requestAsString(request))
......@@ -98,3 +105,36 @@ func (s *Service) handleSubmit(request *orderer.SubmitRequest, stream StepStream
}
return err
}
func (s *Service) initializeExpirationCheck(stream orderer.Cluster_StepServer, endpoint, nodeName string) *certificateExpirationCheck {
return &certificateExpirationCheck{
minimumExpirationWarningInterval: s.MinimumExpirationWarningInterval,
expirationWarningThreshold: s.CertExpWarningThreshold,
expiresAt: expiresAt(stream),
endpoint: endpoint,
nodeName: nodeName,
alert: func(template string, args ...interface{}) {
s.Logger.Warningf(template, args...)
},
}
}
func expiresAt(stream orderer.Cluster_StepServer) time.Time {
cert := comm.ExtractCertificateFromContext(stream.Context())
if cert == nil {
return time.Time{}
}
return cert.NotAfter
}
func extractChannel(msg *orderer.StepRequest) string {
if consReq := msg.GetConsensusRequest(); consReq != nil {
return consReq.Channel
}
if submitReq := msg.GetSubmitRequest(); submitReq != nil {
return submitReq.Channel
}
return ""
}
......@@ -9,8 +9,11 @@ package cluster_test
import (
"context"
"io"
"strings"
"testing"
"time"
"github.com/hyperledger/fabric/common/crypto/tlsgen"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/metrics/disabled"
"github.com/hyperledger/fabric/core/comm"
......@@ -20,6 +23,8 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
var (
......@@ -237,3 +242,108 @@ func TestServiceGRPC(t *testing.T) {
StepLogger: flogging.MustGetLogger("test"),
})
}
func TestExpirationWarningIngress(t *testing.T) {
t.Parallel()
ca, err := tlsgen.NewCA()
assert.NoError(t, err)
serverCert, err := ca.NewServerCertKeyPair("127.0.0.1")
assert.NoError(t, err)
clientCert, err := ca.NewClientCertKeyPair()
assert.NoError(t, err)
dispatcher := &mocks.Dispatcher{}
dispatcher.On("DispatchConsensus", mock.Anything, mock.Anything).Return(nil)
svc := &cluster.Service{
CertExpWarningThreshold: time.Until(clientCert.TLSCert.NotAfter),
MinimumExpirationWarningInterval: time.Second * 2,
StreamCountReporter: &cluster.StreamCountReporter{
Metrics: cluster.NewMetrics(&disabled.Provider{}),
},
Logger: flogging.MustGetLogger("test"),
StepLogger: flogging.MustGetLogger("test"),
Dispatcher: dispatcher,
}
alerts := make(chan struct{}, 10)
svc.Logger = svc.Logger.WithOptions(zap.Hooks(func(entry zapcore.Entry) error {
if strings.Contains(entry.Message, "expires in less than 23h59m") {
alerts <- struct{}{}
}
return nil
}))
srvConf := comm.ServerConfig{
SecOpts: &comm.SecureOptions{
Certificate: serverCert.Cert,
Key: serverCert.Key,
UseTLS: true,
ClientRootCAs: [][]byte{ca.CertBytes()},
RequireClientCert: true,
},
}
srv, err := comm.NewGRPCServer("127.0.0.1:0", srvConf)
assert.NoError(t, err)
orderer.RegisterClusterServer(srv.Server(), svc)
go srv.Start()
defer srv.Stop()
clientConf := comm.ClientConfig{
Timeout: time.Second * 3,
SecOpts: &comm.SecureOptions{
ServerRootCAs: [][]byte{ca.CertBytes()},
UseTLS: true,
Key: clientCert.Key,
Certificate: clientCert.Cert,
RequireClientCert: true,
},
}
client, err := comm.NewGRPCClient(clientConf)
assert.NoError(t, err)
conn, err := client.NewConnection(srv.Address(), "")
assert.NoError(t, err)
cl := orderer.NewClusterClient(conn)
stream, err := cl.Step(context.Background())
assert.NoError(t, err)
err = stream.Send(consensusRequest)
assert.NoError(t, err)
// An alert is logged at the first time.
select {
case <-alerts:
case <-time.After(time.Second * 5):
t.Fatal("Should have received an alert")
}
err = stream.Send(consensusRequest)
assert.NoError(t, err)
// No alerts in a consecutive time.
select {
case <-alerts:
t.Fatal("Should have not received an alert")
case <-time.After(time.Millisecond * 500):
}
// Wait for alert expiration interval to expire.
time.Sleep(svc.MinimumExpirationWarningInterval + time.Second)
err = stream.Send(consensusRequest)
assert.NoError(t, err)
// An alert should be logged now after the timeout expired.
select {
case <-alerts:
case <-time.After(time.Second * 5):
t.Fatal("Should have received an alert")
}
}
......@@ -617,11 +617,10 @@ type certificateExpirationCheck struct {
lastWarning time.Time
nodeName string
endpoint string
channel string
alert func(string, ...interface{})
}
func (exp *certificateExpirationCheck) checkExpiration(currentTime time.Time) {
func (exp *certificateExpirationCheck) checkExpiration(currentTime time.Time, channel string) {
timeLeft := exp.expiresAt.Sub(currentTime)
if timeLeft > exp.expirationWarningThreshold {
return
......@@ -633,6 +632,6 @@ func (exp *certificateExpirationCheck) checkExpiration(currentTime time.Time) {
}
exp.alert("Certificate of %s from %s for channel %s expires in less than %v",
exp.nodeName, exp.endpoint, exp.channel, timeLeft)
exp.nodeName, exp.endpoint, channel, timeLeft)
exp.lastWarning = currentTime
}
......@@ -285,6 +285,8 @@ func New(
comm := createComm(clusterDialer, consenter, conf.General.Cluster, metricsProvider)
consenter.Communication = comm
svc := &cluster.Service{
CertExpWarningThreshold: conf.General.Cluster.CertExpirationWarningThreshold,
MinimumExpirationWarningInterval: cluster.MinimumExpirationWarningInterval,
StreamCountReporter: &cluster.StreamCountReporter{
Metrics: comm.Metrics,
},
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment