Commit d8e438dd authored by yacovm, committed by Yacov Manevich

[FAB-14796] Warn about cert expiration - Part I



This change set adds warnings to the logs in case the
server certificates of the remote node(s) are about to expire.

The threshold is 1 week before expiration, and the warning is
logged at most once every 5 minutes.

Change-Id: I8523a86d7b65db2722d3c7dbf8ce640557302d82
Signed-off-by: yacovm <yacovm@il.ibm.com>
parent 96fa4f3a
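
In essence, the change parses the remote node's server TLS certificate when a connection is created, and on every send it warns if the certificate is within the warning threshold of its NotAfter time, rate-limited by the minimum warning interval. A minimal standalone sketch of that idea in Go (warnIfExpiringSoon and its wiring are illustrative, not names introduced by this diff):

package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"crypto/x509"
	"crypto/x509/pkix"
	"log"
	"math/big"
	"time"
)

// warnIfExpiringSoon is a hypothetical helper mirroring the logic this
// commit introduces: warn once the certificate is within `threshold` of
// its NotAfter time, but at most once per `minInterval`.
func warnIfExpiringSoon(cert *x509.Certificate, lastWarning *time.Time, threshold, minInterval time.Duration) {
	now := time.Now()
	timeLeft := cert.NotAfter.Sub(now)
	if timeLeft > threshold {
		return // not close enough to expiry yet
	}
	if now.Sub(*lastWarning) < minInterval {
		return // suppressed: warned too recently
	}
	log.Printf("certificate expires in less than %v", timeLeft)
	*lastWarning = now
}

func main() {
	// Self-sign a certificate that expires in an hour, i.e. well inside
	// a one-week warning threshold.
	key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	if err != nil {
		log.Fatal(err)
	}
	tmpl := &x509.Certificate{
		SerialNumber: big.NewInt(1),
		Subject:      pkix.Name{CommonName: "node2"},
		NotBefore:    time.Now(),
		NotAfter:     time.Now().Add(time.Hour),
	}
	der, err := x509.CreateCertificate(rand.Reader, tmpl, tmpl, &key.PublicKey, key)
	if err != nil {
		log.Fatal(err)
	}
	cert, err := x509.ParseCertificate(der)
	if err != nil {
		log.Fatal(err)
	}
	var lastWarning time.Time
	warnIfExpiringSoon(cert, &lastWarning, 7*24*time.Hour, 5*time.Minute) // warns
	warnIfExpiringSoon(cert, &lastWarning, 7*24*time.Hour, 5*time.Minute) // suppressed
}
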
@@ -9,6 +9,8 @@ package cluster
import (
"bytes"
"context"
"crypto/x509"
"encoding/pem"
"fmt"
"sync"
"sync/atomic"
@@ -24,6 +26,12 @@ import (
"google.golang.org/grpc/connectivity"
)
const (
// MinimumExpirationWarningInterval is the default minimum time interval
// between consecutive warnings about certificate expiration.
MinimumExpirationWarningInterval = time.Minute * 5
)
var (
errOverflow = errors.New("send queue overflown")
errAborted = errors.New("aborted")
@@ -84,16 +92,18 @@ type MembersByChannel map[string]MemberMapping
// Comm implements Communicator
type Comm struct {
shutdownSignal chan struct{}
shutdown bool
SendBufferSize int
Lock sync.RWMutex
Logger *flogging.FabricLogger
ChanExt ChannelExtractor
H Handler
Connections *ConnectionStore
Chan2Members MembersByChannel
Metrics *Metrics
MinimumExpirationWarningInterval time.Duration
CertExpWarningThreshold time.Duration
shutdownSignal chan struct{}
shutdown bool
SendBufferSize int
Lock sync.RWMutex
Logger *flogging.FabricLogger
ChanExt ChannelExtractor
H Handler
Connections *ConnectionStore
Chan2Members MembersByChannel
Metrics *Metrics
}
type requestContext struct {
@@ -310,6 +320,13 @@ func (c *Comm) updateStubInMapping(channel string, mapping MemberMapping, node R
// a stub atomically.
func (c *Comm) createRemoteContext(stub *Stub, channel string) func() (*RemoteContext, error) {
return func() (*RemoteContext, error) {
cert, err := x509.ParseCertificate(stub.ServerTLSCert)
if err != nil {
pemString := string(pem.EncodeToMemory(&pem.Block{Bytes: stub.ServerTLSCert}))
c.Logger.Errorf("Invalid DER for channel %s, endpoint %s, ID %d: %v", channel, stub.Endpoint, stub.ID, pemString)
return nil, errors.Wrap(err, "invalid certificate DER")
}
c.Logger.Debug("Connecting to", stub.RemoteNode, "for channel", channel)
conn, err := c.Connections.Connection(stub.Endpoint, stub.ServerTLSCert)
@@ -333,16 +350,19 @@ func (c *Comm) createRemoteContext(stub *Stub, channel string) func() (*RemoteCo
}
rc := &RemoteContext{
workerCountReporter: workerCountReporter,
Channel: channel,
Metrics: c.Metrics,
SendBuffSize: c.SendBufferSize,
shutdownSignal: c.shutdownSignal,
endpoint: stub.Endpoint,
Logger: c.Logger,
ProbeConn: probeConnection,
conn: conn,
Client: clusterClient,
expiresAt: cert.NotAfter,
minimumExpirationWarningInterval: c.MinimumExpirationWarningInterval,
certExpWarningThreshold: c.CertExpWarningThreshold,
workerCountReporter: workerCountReporter,
Channel: channel,
Metrics: c.Metrics,
SendBuffSize: c.SendBufferSize,
shutdownSignal: c.shutdownSignal,
endpoint: stub.Endpoint,
Logger: c.Logger,
ProbeConn: probeConnection,
conn: conn,
Client: clusterClient,
}
return rc, nil
}
@@ -418,18 +438,21 @@ func (stub *Stub) Activate(createRemoteContext func() (*RemoteContext, error)) e
// RemoteContext interacts with remote cluster
// nodes. Every call can be aborted via call to Abort()
type RemoteContext struct {
Metrics *Metrics
Channel string
SendBuffSize int
shutdownSignal chan struct{}
Logger *flogging.FabricLogger
endpoint string
Client orderer.ClusterClient
ProbeConn func(conn *grpc.ClientConn) error
conn *grpc.ClientConn
nextStreamID uint64
streamsByID streamsMapperReporter
workerCountReporter workerCountReporter
expiresAt time.Time
minimumExpirationWarningInterval time.Duration
certExpWarningThreshold time.Duration
Metrics *Metrics
Channel string
SendBuffSize int
shutdownSignal chan struct{}
Logger *flogging.FabricLogger
endpoint string
Client orderer.ClusterClient
ProbeConn func(conn *grpc.ClientConn) error
conn *grpc.ClientConn
nextStreamID uint64
streamsByID streamsMapperReporter
workerCountReporter workerCountReporter
}
// Stream is used to send/receive messages to/from the remote cluster member.
@@ -448,6 +471,7 @@ type Stream struct {
orderer.Cluster_StepClient
Cancel func(error)
canceled *uint32
expCheck *certificateExpirationCheck
}
// StreamOperation denotes an operation done by a stream, such a Send or Receive.
@@ -517,6 +541,7 @@ func (stream *Stream) sendMessage(request *orderer.StepRequest) {
f := func() (*orderer.StepResponse, error) {
startSend := time.Now()
stream.expCheck.checkExpiration(startSend)
err := stream.Cluster_StepClient.Send(request)
stream.metrics.reportMsgSendTime(stream.Endpoint, stream.Channel, time.Since(startSend))
return nil, err
@@ -668,6 +693,18 @@ func (rc *RemoteContext) NewStream(timeout time.Duration) (*Stream, error) {
canceled: &canceled,
}
s.expCheck = &certificateExpirationCheck{
minimumExpirationWarningInterval: rc.minimumExpirationWarningInterval,
expirationWarningThreshold: rc.certExpWarningThreshold,
expiresAt: rc.expiresAt,
channel: s.Channel,
endpoint: s.Endpoint,
nodeName: s.NodeName,
alert: func(template string, args ...interface{}) {
s.Logger.Warningf(template, args...)
},
}
rc.Logger.Debugf("Created new stream to %s with ID of %d and buffer size of %d",
rc.endpoint, streamID, cap(s.sendBuff))
......
@@ -9,8 +9,10 @@ package cluster_test
import (
"context"
"crypto/rand"
"crypto/x509"
"fmt"
"net"
"strings"
"sync"
"sync/atomic"
"testing"
@@ -31,6 +33,8 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
)
@@ -242,13 +246,14 @@ func newTestNodeWithMetrics(t *testing.T, metrics cluster.MetricsProvider, tlsCo
}
tstSrv.c = &cluster.Comm{
SendBufferSize: 1,
Logger: flogging.MustGetLogger("test"),
Chan2Members: make(cluster.MembersByChannel),
H: handler,
ChanExt: channelExtractor,
Connections: cluster.NewConnectionStore(dialer, tlsConnGauge),
Metrics: cluster.NewMetrics(metrics),
CertExpWarningThreshold: time.Hour,
SendBufferSize: 1,
Logger: flogging.MustGetLogger("test"),
Chan2Members: make(cluster.MembersByChannel),
H: handler,
ChanExt: channelExtractor,
Connections: cluster.NewConnectionStore(dialer, tlsConnGauge),
Metrics: cluster.NewMetrics(metrics),
}
orderer.RegisterClusterServer(gRPCServer.Server(), tstSrv)
@@ -1258,6 +1263,81 @@ func TestMetrics(t *testing.T) {
}
}
func TestCertExpirationWarningEgress(t *testing.T) {
t.Parallel()
// Scenario: Ensures that when certificates are due to expire,
// a warning is logged.
node1 := newTestNode(t)
node2 := newTestNode(t)
cert, err := x509.ParseCertificate(node2.nodeInfo.ServerTLSCert)
assert.NoError(t, err)
assert.NotNil(t, cert)
// Let the NotAfter time of the certificate be T1, and the current time be T0.
// Then time.Until(cert.NotAfter) is (T1 - T0), the time left until expiration.
// We want to trigger a warning, so we set the warning threshold 20 seconds above
// the time left, so that the time left is smaller than the threshold.
node1.c.CertExpWarningThreshold = time.Until(cert.NotAfter) + time.Second*20
// We alert at most once every 3 seconds.
node1.c.MinimumExpirationWarningInterval = time.Second * 3
defer node1.stop()
defer node2.stop()
config := []cluster.RemoteNode{node1.nodeInfo, node2.nodeInfo}
node1.c.Configure(testChannel, config)
node2.c.Configure(testChannel, config)
stub, err := node1.c.Remote(testChannel, node2.nodeInfo.ID)
assert.NoError(t, err)
mockgRPC := &mocks.StepClient{}
mockgRPC.On("Send", mock.Anything).Return(nil)
mockgRPC.On("Context").Return(context.Background())
mockClient := &mocks.ClusterClient{}
mockClient.On("Step", mock.Anything).Return(mockgRPC, nil)
stub.Client = mockClient
stream := assertEventualEstablishStream(t, stub)
alerts := make(chan struct{}, 100)
stream.Logger = stream.Logger.WithOptions(zap.Hooks(func(entry zapcore.Entry) error {
if strings.Contains(entry.Message, "expires in less than") {
alerts <- struct{}{}
}
return nil
}))
// Send a message to the node and expect an alert to be logged.
stream.Send(wrapSubmitReq(testReq))
select {
case <-alerts:
case <-time.After(time.Second * 5):
t.Fatal("Should have logged an alert")
}
// Send another message, and ensure no alert is logged, because alerts
// are suppressed until the minimum warning interval elapses.
stream.Send(wrapSubmitReq(testReq))
select {
case <-alerts:
t.Fatal("Should not have logged an alert")
case <-time.After(time.Millisecond * 500):
}
// Wait enough time for the alert interval to clear.
time.Sleep(node1.c.MinimumExpirationWarningInterval + time.Second)
// Send a message again; this time an alert should be logged once more.
stream.Send(wrapSubmitReq(testReq))
select {
case <-alerts:
case <-time.After(time.Second * 5):
t.Fatal("Should have logged an alert")
}
}
func assertBiDiCommunicationForChannel(t *testing.T, node1, node2 *clusterNode, msgToSend *orderer.SubmitRequest, channel string) {
for _, tst := range []struct {
label string
......
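
The egress test above detects the warning by wrapping the stream's logger with a zap hook. A minimal standalone illustration of that interception technique, using zap directly (nothing here is Fabric-specific):

package main

import (
	"fmt"
	"strings"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

func main() {
	alerts := make(chan struct{}, 1)
	// zap.Hooks runs the callback for every entry written through the
	// logger, which lets a test observe specific messages.
	logger, _ := zap.NewDevelopment(zap.Hooks(func(entry zapcore.Entry) error {
		if strings.Contains(entry.Message, "expires in less than") {
			alerts <- struct{}{}
		}
		return nil
	}))
	logger.Warn("certificate of node2 expires in less than 10m")
	select {
	case <-alerts:
		fmt.Println("alert observed")
	default:
		fmt.Println("no alert")
	}
}
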
@@ -12,6 +12,7 @@ import (
"encoding/pem"
"reflect"
"sync/atomic"
"time"
"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/configtx"
@@ -608,3 +609,30 @@ func (scr *StreamCountReporter) Decrement() {
count := atomic.AddUint32(&scr.count, ^uint32(0))
scr.Metrics.reportStreamCount(count)
}
type certificateExpirationCheck struct {
minimumExpirationWarningInterval time.Duration
expiresAt time.Time
expirationWarningThreshold time.Duration
lastWarning time.Time
nodeName string
endpoint string
channel string
alert func(string, ...interface{})
}
func (exp *certificateExpirationCheck) checkExpiration(currentTime time.Time) {
timeLeft := exp.expiresAt.Sub(currentTime)
if timeLeft > exp.expirationWarningThreshold {
return
}
timeSinceLastWarning := currentTime.Sub(exp.lastWarning)
if timeSinceLastWarning < exp.minimumExpirationWarningInterval {
return
}
exp.alert("Certificate of %s from %s for channel %s expires in less than %v",
exp.nodeName, exp.endpoint, exp.channel, timeLeft)
exp.lastWarning = currentTime
}
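
Since certificateExpirationCheck and checkExpiration are unexported, they can only be exercised from inside the cluster package. A hypothetical white-box test sketch (not part of this commit) of the suppression behavior:

package cluster

import (
	"testing"
	"time"
)

// TestCheckExpirationSuppression is a hypothetical white-box sketch
// showing the rate limiting of checkExpiration.
func TestCheckExpirationSuppression(t *testing.T) {
	var warnings int
	check := &certificateExpirationCheck{
		expiresAt:                        time.Now().Add(time.Hour), // expires soon
		expirationWarningThreshold:       24 * time.Hour,            // already inside the threshold
		minimumExpirationWarningInterval: 5 * time.Minute,
		nodeName:                         "node2",
		endpoint:                         "node2:7050",
		channel:                          "mychannel",
		alert: func(template string, args ...interface{}) {
			warnings++
		},
	}

	now := time.Now()
	check.checkExpiration(now)                      // first call: warns
	check.checkExpiration(now.Add(time.Minute))     // within 5m: suppressed
	check.checkExpiration(now.Add(6 * time.Minute)) // interval elapsed: warns again
	if warnings != 2 {
		t.Fatalf("expected 2 warnings, got %d", warnings)
	}
}
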
@@ -73,6 +73,7 @@ type Cluster struct {
ReplicationBackgroundRefreshInterval time.Duration
ReplicationMaxRetries int
SendBufferSize int
CertExpirationWarningThreshold time.Duration
}
// Keepalive contains configuration for gRPC servers.
@@ -229,6 +230,7 @@ var Defaults = TopLevel{
ReplicationBackgroundRefreshInterval: time.Minute * 5,
ReplicationRetryTimeout: time.Second * 5,
ReplicationPullTimeout: time.Second * 5,
CertExpirationWarningThreshold: time.Hour * 24 * 7,
},
LocalMSPDir: "msp",
LocalMSPID: "SampleOrg",
@@ -367,6 +369,8 @@ func (c *TopLevel) completeInitialization(configDir string) {
c.General.Cluster.ReplicationRetryTimeout = Defaults.General.Cluster.ReplicationRetryTimeout
case c.General.Cluster.ReplicationBackgroundRefreshInterval == 0:
c.General.Cluster.ReplicationBackgroundRefreshInterval = Defaults.General.Cluster.ReplicationBackgroundRefreshInterval
case c.General.Cluster.CertExpirationWarningThreshold == 0:
c.General.Cluster.CertExpirationWarningThreshold = Defaults.General.Cluster.CertExpirationWarningThreshold
case c.Kafka.TLS.Enabled && c.Kafka.TLS.Certificate == "":
logger.Panicf("General.Kafka.TLS.Certificate must be set if General.Kafka.TLS.Enabled is set to true.")
case c.Kafka.TLS.Enabled && c.Kafka.TLS.PrivateKey == "":
......
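
The case added above follows the file's zero-value defaulting pattern. In Fabric this switch sits inside a loop that re-evaluates until the default case returns, so each iteration fills one unset field; that loop is outside the hunk shown, so treat it as an assumption here. A rough sketch of the pattern:

package main

import (
	"fmt"
	"time"
)

type clusterDefaults struct {
	SendBufferSize                 int
	CertExpirationWarningThreshold time.Duration
}

// fillDefaults sketches the pattern: the switch fixes one zero-valued
// field per iteration, and the loop re-runs it until nothing matches.
func fillDefaults(c *clusterDefaults) {
	for {
		switch {
		case c.SendBufferSize == 0:
			c.SendBufferSize = 10
		case c.CertExpirationWarningThreshold == 0:
			c.CertExpirationWarningThreshold = 7 * 24 * time.Hour
		default:
			return
		}
	}
}

func main() {
	c := &clusterDefaults{}
	fillDefaults(c)
	fmt.Println(c.SendBufferSize, c.CertExpirationWarningThreshold) // 10 168h0m0s
}

Note that time.Hour * 24 * 7 stringifies as 168h0m0s, which is exactly what the orderer startup log assertion below expects.
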
@@ -79,6 +79,7 @@ func testEtcdRaftOSNSuccess(gt *GomegaWithT, tempDir, configtxgen, cwd, orderer,
gt.Eventually(ordererProcess.Err, time.Minute).Should(gbytes.Say("General.Cluster.ReplicationBackgroundRefreshInterval = 5m0s"))
gt.Eventually(ordererProcess.Err, time.Minute).Should(gbytes.Say("General.Cluster.ReplicationMaxRetries = 12"))
gt.Eventually(ordererProcess.Err, time.Minute).Should(gbytes.Say("General.Cluster.SendBufferSize = 10"))
gt.Eventually(ordererProcess.Err, time.Minute).Should(gbytes.Say("General.Cluster.CertExpirationWarningThreshold = 168h0m0s"))
// Consensus.EvictionSuspicion is not specified in orderer.yaml, so let's ensure
// it is really configured autonomously via the etcdraft chain itself.
......
@@ -282,7 +282,7 @@ func New(
ChainSelector: consenter,
}
comm := createComm(clusterDialer, consenter, conf.General.Cluster.SendBufferSize, metricsProvider)
comm := createComm(clusterDialer, consenter, conf.General.Cluster, metricsProvider)
consenter.Communication = comm
svc := &cluster.Service{
StreamCountReporter: &cluster.StreamCountReporter{
@@ -296,16 +296,18 @@
return consenter
}
func createComm(clusterDialer *cluster.PredicateDialer, c *Consenter, sendBuffSize int, p metrics.Provider) *cluster.Comm {
func createComm(clusterDialer *cluster.PredicateDialer, c *Consenter, config localconfig.Cluster, p metrics.Provider) *cluster.Comm {
metrics := cluster.NewMetrics(p)
comm := &cluster.Comm{
SendBufferSize: sendBuffSize,
Logger: flogging.MustGetLogger("orderer.common.cluster"),
Chan2Members: make(map[string]cluster.MemberMapping),
Connections: cluster.NewConnectionStore(clusterDialer, metrics.EgressTLSConnectionCount),
Metrics: metrics,
ChanExt: c,
H: c,
MinimumExpirationWarningInterval: cluster.MinimumExpirationWarningInterval,
CertExpWarningThreshold: config.CertExpirationWarningThreshold,
SendBufferSize: config.SendBufferSize,
Logger: flogging.MustGetLogger("orderer.common.cluster"),
Chan2Members: make(map[string]cluster.MemberMapping),
Connections: cluster.NewConnectionStore(clusterDialer, metrics.EgressTLSConnectionCount),
Metrics: metrics,
ChanExt: c,
H: c,
}
c.Communication = comm
return comm
......