Unverified Commit 38c1515c authored by yacovm, committed by Artem Barger

[FAB-13805] Unify Step and Submit into a stream



This change set removes the Step RPC from the cluster protobuf,
renames the Submit stream to a Step stream, and makes both
transaction forwarding and consensus messages use the
new Step stream.

It also gives both egress Send() and Recv() a maximum timeout
(the RPC timeout in the config).
A Send or Recv that is used to send a consensus message,
or to send a transaction (or receive its status), now aborts
prematurely once the timeout expires, both to protect against
liveness issues on the remote node and to return an answer
to clients in a timely manner.

Change-Id: Id942b248212f5c324e12af34fce48f96fdbb6aea
Signed-off-by: yacovm <yacovm@il.ibm.com>
parent 06671310
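For illustration only (not part of the diff): a minimal sketch of how both kinds of traffic are wrapped in the unified StepRequest oneof. The wrapper and field names are taken from requestAsString() in the diff below; wrapForStep itself is a hypothetical helper.

package example // hypothetical, for illustration only

import (
    "github.com/hyperledger/fabric/protos/common"
    "github.com/hyperledger/fabric/protos/orderer"
)

// wrapForStep builds the two kinds of messages that now travel over the
// same Step stream: a forwarded transaction (SubmitRequest) and a
// consensus message (ConsensusRequest).
func wrapForStep(channel string, tx *common.Envelope, consensusPayload []byte) (*orderer.StepRequest, *orderer.StepRequest) {
    submit := &orderer.StepRequest{
        Payload: &orderer.StepRequest_SubmitRequest{
            SubmitRequest: &orderer.SubmitRequest{Channel: channel, Payload: tx},
        },
    }
    consensus := &orderer.StepRequest{
        Payload: &orderer.StepRequest_ConsensusRequest{
            ConsensusRequest: &orderer.ConsensusRequest{Channel: channel, Payload: consensusPayload},
        },
    }
    return submit, consensus
}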
......@@ -115,7 +115,7 @@ func noopBinding(_ context.Context, _ []byte) error {
// ExtractCertificateHashFromContext extracts the hash of the certificate from the given context.
// If the certificate isn't present, nil is returned
func ExtractCertificateHashFromContext(ctx context.Context) []byte {
rawCert := ExtractCertificateFromContext(ctx)
rawCert := ExtractRawCertificateFromContext(ctx)
if len(rawCert) == 0 {
return nil
}
......@@ -126,7 +126,7 @@ func ExtractCertificateHashFromContext(ctx context.Context) []byte {
// ExtractCertificateFromContext returns the TLS certificate (if applicable)
// from the given context of a gRPC stream
func ExtractCertificateFromContext(ctx context.Context) []byte {
func ExtractCertificateFromContext(ctx context.Context) *x509.Certificate {
pr, extracted := peer.FromContext(ctx)
if !extracted {
return nil
......@@ -145,5 +145,15 @@ func ExtractCertificateFromContext(ctx context.Context) []byte {
if len(certs) == 0 {
return nil
}
return certs[0].Raw
return certs[0]
}
// ExtractRawCertificateFromContext returns the raw TLS certificate (if applicable)
// from the given context of a gRPC stream
func ExtractRawCertificateFromContext(ctx context.Context) []byte {
cert := ExtractCertificateFromContext(ctx)
if cert == nil {
return nil
}
return cert.Raw
}
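A hypothetical caller-side sketch of the split helpers (using the core/comm package as imported elsewhere in this change): the parsed certificate exposes x509 fields such as the Subject, while the new raw variant keeps returning the DER bytes used for hashing and for looking members up by client certificate. identifyCaller is not part of the commit.

package example // hypothetical, for illustration only

import (
    "context"

    "github.com/hyperledger/fabric/core/comm"
)

// identifyCaller extracts both views of the caller's TLS certificate from
// a gRPC stream context.
func identifyCaller(ctx context.Context) (commonName string, rawDER []byte) {
    if cert := comm.ExtractCertificateFromContext(ctx); cert != nil {
        commonName = cert.Subject.CommonName
    }
    rawDER = comm.ExtractRawCertificateFromContext(ctx)
    return commonName, rawDER
}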
......@@ -11,6 +11,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/golang/protobuf/proto"
......@@ -18,16 +19,11 @@ import (
"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/protos/orderer"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
)
const (
// DefaultRPCTimeout is the default RPC timeout
// that RPCs use
DefaultRPCTimeout = time.Second * 5
)
// ChannelExtractor extracts the channel of a given message,
// or returns an empty string if that's not possible
type ChannelExtractor interface {
......@@ -38,8 +34,8 @@ type ChannelExtractor interface {
// Handler handles Step() and Submit() requests and returns a corresponding response
type Handler interface {
OnStep(channel string, sender uint64, req *orderer.StepRequest) (*orderer.StepResponse, error)
OnSubmit(channel string, sender uint64, req *orderer.SubmitRequest) (*orderer.SubmitResponse, error)
OnConsensus(channel string, sender uint64, req *orderer.ConsensusRequest) error
OnSubmit(channel string, sender uint64, req *orderer.SubmitRequest) error
}
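As an illustration of the narrowed interface, a hypothetical no-op Handler (not part of this commit); a real consenter would relay the consensus payload to its protocol state machine and enqueue the forwarded transaction for ordering:

package example // hypothetical, for illustration only

import (
    "github.com/hyperledger/fabric/common/flogging"
    "github.com/hyperledger/fabric/protos/orderer"
)

// loggingHandler is a stand-in Handler implementation that only logs.
type loggingHandler struct {
    logger *flogging.FabricLogger
}

func (h *loggingHandler) OnConsensus(channel string, sender uint64, req *orderer.ConsensusRequest) error {
    h.logger.Debugf("consensus message of %d bytes from node %d on channel %s", len(req.Payload), sender, channel)
    return nil
}

func (h *loggingHandler) OnSubmit(channel string, sender uint64, req *orderer.SubmitRequest) error {
    h.logger.Debugf("transaction forwarded by node %d for channel %s", sender, channel)
    return nil
}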
// RemoteNode represents a cluster member
......@@ -89,7 +85,6 @@ type Comm struct {
H Handler
Connections *ConnectionStore
Chan2Members MembersByChannel
RPCTimeout time.Duration
}
type requestContext struct {
......@@ -99,23 +94,22 @@ type requestContext struct {
// DispatchSubmit identifies the channel and sender of the submit request and passes it
// to the underlying Handler
func (c *Comm) DispatchSubmit(ctx context.Context, request *orderer.SubmitRequest) (*orderer.SubmitResponse, error) {
c.Logger.Debug(request.Channel)
func (c *Comm) DispatchSubmit(ctx context.Context, request *orderer.SubmitRequest) error {
reqCtx, err := c.requestContext(ctx, request)
if err != nil {
return nil, errors.WithStack(err)
return err
}
return c.H.OnSubmit(reqCtx.channel, reqCtx.sender, request)
}
// DispatchStep identifies the channel and sender of the step request and passes it
// DispatchConsensus identifies the channel and sender of the consensus request and passes it
// to the underlying Handler
func (c *Comm) DispatchStep(ctx context.Context, request *orderer.StepRequest) (*orderer.StepResponse, error) {
func (c *Comm) DispatchConsensus(ctx context.Context, request *orderer.ConsensusRequest) error {
reqCtx, err := c.requestContext(ctx, request)
if err != nil {
return nil, errors.WithStack(err)
return err
}
return c.H.OnStep(reqCtx.channel, reqCtx.sender, request)
return c.H.OnConsensus(reqCtx.channel, reqCtx.sender, request)
}
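For context, a hypothetical sketch of how a server-side Step handler could drive these dispatch methods; the actual service implementation is not part of this hunk, and orderer.Cluster_StepServer is the assumed generated server-side stream type.

package example // hypothetical, for illustration only

import (
    "github.com/hyperledger/fabric/orderer/common/cluster"
    "github.com/hyperledger/fabric/protos/orderer"
    "github.com/pkg/errors"
)

// dispatchLoop unwraps each StepRequest received on the stream and routes
// it to DispatchConsensus or DispatchSubmit.
func dispatchLoop(c *cluster.Comm, stream orderer.Cluster_StepServer) error {
    for {
        req, err := stream.Recv()
        if err != nil {
            return err
        }
        switch payload := req.GetPayload().(type) {
        case *orderer.StepRequest_ConsensusRequest:
            if err := c.DispatchConsensus(stream.Context(), payload.ConsensusRequest); err != nil {
                return err
            }
        case *orderer.StepRequest_SubmitRequest:
            if err := c.DispatchSubmit(stream.Context(), payload.SubmitRequest); err != nil {
                return err
            }
        default:
            return errors.Errorf("unknown message type: %v", req)
        }
    }
}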
// requestContext identifies the sender and channel of the request and returns
......@@ -125,6 +119,7 @@ func (c *Comm) requestContext(ctx context.Context, msg proto.Message) (*requestC
if channel == "" {
return nil, errors.Errorf("badly formatted message, cannot extract channel")
}
c.Lock.RLock()
mapping, exists := c.Chan2Members[channel]
c.Lock.RUnlock()
......@@ -133,10 +128,11 @@ func (c *Comm) requestContext(ctx context.Context, msg proto.Message) (*requestC
return nil, errors.Errorf("channel %s doesn't exist", channel)
}
cert := comm.ExtractCertificateFromContext(ctx)
cert := comm.ExtractRawCertificateFromContext(ctx)
if len(cert) == 0 {
return nil, errors.Errorf("no TLS certificate sent")
}
stub := mapping.LookupByClientCert(cert)
if stub == nil {
return nil, errors.Errorf("certificate extracted from TLS connection isn't authorized")
......@@ -170,7 +166,7 @@ func (c *Comm) Remote(channel string, id uint64) (*RemoteContext, error) {
return stub.RemoteContext, nil
}
err := stub.Activate(c.createRemoteContext(stub))
err := stub.Activate(c.createRemoteContext(stub, channel))
if err != nil {
return nil, errors.WithStack(err)
}
......@@ -239,14 +235,14 @@ func (c *Comm) applyMembershipConfig(channel string, newNodes []RemoteNode) {
for _, node := range newNodes {
newNodeIDs[node.ID] = struct{}{}
c.updateStubInMapping(mapping, node)
c.updateStubInMapping(channel, mapping, node)
}
// Remove all stubs without a corresponding node
// in the new nodes
for id, stub := range mapping {
if _, exists := newNodeIDs[id]; exists {
c.Logger.Info(id, "exists in both old and new membership, skipping its deactivation")
c.Logger.Info(id, "exists in both old and new membership for channel", channel, ", skipping its deactivation")
continue
}
c.Logger.Info("Deactivated node", id, "who's endpoint is", stub.Endpoint, "as it's removed from membership")
......@@ -256,10 +252,10 @@ func (c *Comm) applyMembershipConfig(channel string, newNodes []RemoteNode) {
}
// updateStubInMapping updates the given RemoteNode and adds it to the MemberMapping
func (c *Comm) updateStubInMapping(mapping MemberMapping, node RemoteNode) {
func (c *Comm) updateStubInMapping(channel string, mapping MemberMapping, node RemoteNode) {
stub := mapping.ByID(node.ID)
if stub == nil {
c.Logger.Info("Allocating a new stub for node", node.ID, "with endpoint of", node.Endpoint)
c.Logger.Info("Allocating a new stub for node", node.ID, "with endpoint of", node.Endpoint, "for channel", channel)
stub = &Stub{}
}
......@@ -267,7 +263,8 @@ func (c *Comm) updateStubInMapping(mapping MemberMapping, node RemoteNode) {
// and if so - then deactivate the stub, to trigger
// a re-creation of its gRPC connection
if !bytes.Equal(stub.ServerTLSCert, node.ServerTLSCert) {
c.Logger.Info("Deactivating node", node.ID, "with endpoint of", node.Endpoint, "due to TLS certificate change")
c.Logger.Info("Deactivating node", node.ID, "in channel", channel,
"with endpoint of", node.Endpoint, "due to TLS certificate change")
stub.Deactivate()
}
......@@ -283,24 +280,19 @@ func (c *Comm) updateStubInMapping(mapping MemberMapping, node RemoteNode) {
}
// Activate the stub
stub.Activate(c.createRemoteContext(stub))
stub.Activate(c.createRemoteContext(stub, channel))
}
// createRemoteContext returns a function that creates a RemoteContext.
// It is used as a parameter to Stub.Activate() in order to activate
// a stub atomically.
func (c *Comm) createRemoteContext(stub *Stub) func() (*RemoteContext, error) {
func (c *Comm) createRemoteContext(stub *Stub, channel string) func() (*RemoteContext, error) {
return func() (*RemoteContext, error) {
timeout := c.RPCTimeout
if timeout == time.Duration(0) {
timeout = DefaultRPCTimeout
}
c.Logger.Debug("Connecting to", stub.RemoteNode, "with gRPC timeout of", timeout)
c.Logger.Debug("Connecting to", stub.RemoteNode, "for channel", channel)
conn, err := c.Connections.Connection(stub.Endpoint, stub.ServerTLSCert)
if err != nil {
c.Logger.Warningf("Unable to obtain connection to %d(%s): %v", stub.ID, stub.Endpoint, err)
c.Logger.Warningf("Unable to obtain connection to %d(%s) (channel %s): %v", stub.ID, stub.Endpoint, channel, err)
return nil, err
}
......@@ -315,14 +307,11 @@ func (c *Comm) createRemoteContext(stub *Stub) func() (*RemoteContext, error) {
clusterClient := orderer.NewClusterClient(conn)
rc := &RemoteContext{
ProbeConn: probeConnection,
conn: conn,
RPCTimeout: timeout,
Client: clusterClient,
onAbort: func() {
c.Logger.Info("Aborted connection to", stub.ID, stub.Endpoint)
stub.RemoteContext = nil
},
endpoint: stub.Endpoint,
Logger: c.Logger,
ProbeConn: probeConnection,
conn: conn,
Client: clusterClient,
}
return rc, nil
}
......@@ -398,81 +387,185 @@ func (stub *Stub) Activate(createRemoteContext func() (*RemoteContext, error)) e
// RemoteContext interacts with remote cluster
// nodes. Every call can be aborted via call to Abort()
type RemoteContext struct {
RPCTimeout time.Duration
onAbort func()
Client orderer.ClusterClient
stepLock sync.Mutex
cancelStep func()
submitLock sync.Mutex
cancelSubmitStream func()
submitStream orderer.Cluster_SubmitClient
ProbeConn func(conn *grpc.ClientConn) error
conn *grpc.ClientConn
}
// SubmitStream creates a new Submit stream
func (rc *RemoteContext) SubmitStream() (orderer.Cluster_SubmitClient, error) {
rc.submitLock.Lock()
defer rc.submitLock.Unlock()
// Close previous submit stream to prevent resource leak
rc.closeSubmitStream()
Logger *flogging.FabricLogger
endpoint string
Client orderer.ClusterClient
ProbeConn func(conn *grpc.ClientConn) error
conn *grpc.ClientConn
nextStreamID uint64
streamsByID sync.Map
}
ctx, cancel := context.WithCancel(context.TODO())
submitStream, err := rc.Client.Submit(ctx)
if err != nil {
cancel()
return nil, errors.WithStack(err)
// Stream is used to send/receive messages to/from the remote cluster member.
type Stream struct {
ID uint64
NodeName string
Endpoint string
Logger *flogging.FabricLogger
Timeout time.Duration
orderer.Cluster_StepClient
Cancel func()
canceled *uint32
}
// StreamOperation denotes an operation done by a stream, such as a Send or a Receive.
type StreamOperation func() (*orderer.StepResponse, error)
// Canceled returns whether the stream was canceled.
func (stream *Stream) Canceled() bool {
return atomic.LoadUint32(stream.canceled) == uint32(1)
}
// Send sends the given request to the remote cluster member.
func (stream *Stream) Send(request *orderer.StepRequest) error {
start := time.Now()
defer func() {
if !stream.Logger.IsEnabledFor(zap.DebugLevel) {
return
}
stream.Logger.Debugf("Send of %s to %s(%s) took %v", requestAsString(request), stream.NodeName, stream.Endpoint, time.Since(start))
}()
f := func() (*orderer.StepResponse, error) {
err := stream.Cluster_StepClient.Send(request)
return nil, err
}
_, err := stream.operateWithTimeout(f)
return err
}
// Recv receives a message from a remote cluster member.
func (stream *Stream) Recv() (*orderer.StepResponse, error) {
start := time.Now()
defer func() {
if !stream.Logger.IsEnabledFor(zap.DebugLevel) {
return
}
stream.Logger.Debugf("Receive from %s(%s) took %v", stream.NodeName, stream.Endpoint, time.Since(start))
}()
f := func() (*orderer.StepResponse, error) {
return stream.Cluster_StepClient.Recv()
}
return stream.operateWithTimeout(f)
}
// operateWithTimeout performs the given operation on the stream, and blocks until the timeout expires.
func (stream *Stream) operateWithTimeout(invoke StreamOperation) (*orderer.StepResponse, error) {
timer := time.NewTimer(stream.Timeout)
defer timer.Stop()
var operationEnded sync.WaitGroup
operationEnded.Add(1)
responseChan := make(chan struct {
res *orderer.StepResponse
err error
}, 1)
go func() {
defer operationEnded.Done()
res, err := invoke()
responseChan <- struct {
res *orderer.StepResponse
err error
}{res: res, err: err}
}()
select {
case r := <-responseChan:
if r.err != nil {
stream.Cancel()
}
return r.res, r.err
case <-timer.C:
stream.Cancel()
// Wait for the operation goroutine to end
operationEnded.Wait()
stream.Logger.Warningf("Stream %d to %s(%s) was forcibly terminated because timeout (%v) expired",
stream.ID, stream.NodeName, stream.Endpoint, stream.Timeout)
return nil, errors.New("rpc timeout expired")
}
rc.submitStream = submitStream
rc.cancelSubmitStream = cancel
return rc.submitStream, nil
}
// Step passes an implementation-specific message to another cluster member.
func (rc *RemoteContext) Step(req *orderer.StepRequest) (*orderer.StepResponse, error) {
func requestAsString(request *orderer.StepRequest) string {
switch t := request.GetPayload().(type) {
case *orderer.StepRequest_SubmitRequest:
if t.SubmitRequest == nil || t.SubmitRequest.Payload == nil {
return fmt.Sprintf("Empty SubmitRequest: %v", t.SubmitRequest)
}
return fmt.Sprintf("SubmitRequest for channel %s with payload of size %d",
t.SubmitRequest.Channel, len(t.SubmitRequest.Payload.Payload))
case *orderer.StepRequest_ConsensusRequest:
return fmt.Sprintf("ConsensusRequest for channel %s with payload of size %d",
t.ConsensusRequest.Channel, len(t.ConsensusRequest.Payload))
default:
return fmt.Sprintf("unknown type: %v", request)
}
}
// NewStream creates a new stream.
// It is not thread safe, and Send() or Recv() block only until the timeout expires.
func (rc *RemoteContext) NewStream(timeout time.Duration) (*Stream, error) {
if err := rc.ProbeConn(rc.conn); err != nil {
return nil, err
}
ctx, abort := context.WithCancel(context.TODO())
ctx, cancel := context.WithTimeout(ctx, rc.RPCTimeout)
defer cancel()
rc.stepLock.Lock()
rc.cancelStep = abort
rc.stepLock.Unlock()
ctx, cancel := context.WithCancel(context.TODO())
stream, err := rc.Client.Step(ctx)
if err != nil {
cancel()
return nil, errors.WithStack(err)
}
return rc.Client.Step(ctx, req)
}
streamID := atomic.AddUint64(&rc.nextStreamID, 1)
nodeName := commonNameFromContext(stream.Context())
// Abort aborts the contexts the RemoteContext uses,
// thus effectively causes all operations on the embedded
// ClusterClient to end.
func (rc *RemoteContext) Abort() {
rc.stepLock.Lock()
defer rc.stepLock.Unlock()
var canceled uint32
rc.submitLock.Lock()
defer rc.submitLock.Unlock()
abort := func() {
cancel()
rc.streamsByID.Delete(streamID)
rc.Logger.Debugf("Stream %d to %s(%s) is aborted", streamID, nodeName, rc.endpoint)
atomic.StoreUint32(&canceled, 1)
}
if rc.cancelStep != nil {
rc.cancelStep()
rc.cancelStep = nil
logger := flogging.MustGetLogger("orderer.common.cluster.step")
stepLogger := logger.WithOptions(zap.AddCallerSkip(1))
s := &Stream{
NodeName: nodeName,
Logger: stepLogger,
ID: streamID,
Endpoint: rc.endpoint,
Timeout: timeout,
Cluster_StepClient: stream,
Cancel: abort,
canceled: &canceled,
}
rc.closeSubmitStream()
rc.onAbort()
rc.Logger.Debugf("Created new stream to %s with ID of %d", rc.endpoint, streamID)
rc.streamsByID.Store(streamID, s)
return s, nil
}
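A hypothetical caller-side sketch tying the pieces together: obtain the RemoteContext of a channel member, open a Step stream with the configured RPC timeout, and send a request. sendWithTimeout is not part of this commit.

package example // hypothetical, for illustration only

import (
    "time"

    "github.com/hyperledger/fabric/orderer/common/cluster"
    "github.com/hyperledger/fabric/protos/orderer"
    "github.com/pkg/errors"
)

// sendWithTimeout sends a single StepRequest to the given cluster member.
// Send (like Recv) aborts once the timeout expires; a canceled stream must
// be re-created via NewStream before further use.
func sendWithTimeout(c *cluster.Comm, channel string, destination uint64, req *orderer.StepRequest, rpcTimeout time.Duration) error {
    rc, err := c.Remote(channel, destination)
    if err != nil {
        return err
    }
    stream, err := rc.NewStream(rpcTimeout)
    if err != nil {
        return err
    }
    if err := stream.Send(req); err != nil {
        if stream.Canceled() {
            return errors.Wrap(err, "stream to remote node was canceled")
        }
        return err
    }
    return nil
}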
// closeSubmitStream closes the Submit stream
// and invokes its cancellation function
func (rc *RemoteContext) closeSubmitStream() {
if rc.cancelSubmitStream != nil {
rc.cancelSubmitStream()
rc.cancelSubmitStream = nil
}
// Abort aborts the contexts the RemoteContext uses, thus effectively
// causing all operations that use this RemoteContext to terminate.
func (rc *RemoteContext) Abort() {
rc.streamsByID.Range(func(_, value interface{}) bool {
value.(*Stream).Cancel()
return false
})
}
if rc.submitStream != nil {
rc.submitStream.CloseSend()
rc.submitStream = nil
func commonNameFromContext(ctx context.Context) string {
cert := comm.ExtractCertificateFromContext(ctx)
if cert == nil {
return "unidentified node"
}
return cert.Subject.CommonName
}
......@@ -26,8 +26,6 @@ type SecureDialer interface {
Dial(address string, verifyFunc RemoteVerifier) (*grpc.ClientConn, error)
}
//go:generate mockery -dir . -name ConnectionMapper -case underscore -output ./mocks/
// ConnectionMapper maps certificates to connections
type ConnectionMapper interface {
Lookup(cert []byte) (*grpc.ClientConn, bool)
......
// Code generated by mockery v1.0.0. DO NOT EDIT.
package mocks
import common "github.com/hyperledger/fabric/protos/common"
......
// Code generated by mockery v1.0.0. DO NOT EDIT.
package mocks
import context "context"
......@@ -11,38 +12,8 @@ type ClusterClient struct {
mock.Mock
}
// Step provides a mock function with given fields: ctx, in, opts
func (_m *ClusterClient) Step(ctx context.Context, in *orderer.StepRequest, opts ...grpc.CallOption) (*orderer.StepResponse, error) {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
}
var _ca []interface{}
_ca = append(_ca, ctx, in)
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 *orderer.StepResponse
if rf, ok := ret.Get(0).(func(context.Context, *orderer.StepRequest, ...grpc.CallOption) *orderer.StepResponse); ok {
r0 = rf(ctx, in, opts...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*orderer.StepResponse)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *orderer.StepRequest, ...grpc.CallOption) error); ok {
r1 = rf(ctx, in, opts...)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Submit provides a mock function with given fields: ctx, opts
func (_m *ClusterClient) Submit(ctx context.Context, opts ...grpc.CallOption) (orderer.Cluster_SubmitClient, error) {
// Step provides a mock function with given fields: ctx, opts
func (_m *ClusterClient) Step(ctx context.Context, opts ...grpc.CallOption) (orderer.Cluster_StepClient, error) {
_va := make([]interface{}, len(opts))
for _i := range opts {
_va[_i] = opts[_i]
......@@ -52,12 +23,12 @@ func (_m *ClusterClient) Submit(ctx context.Context, opts ...grpc.CallOption) (o
_ca = append(_ca, _va...)
ret := _m.Called(_ca...)
var r0 orderer.Cluster_SubmitClient
if rf, ok := ret.Get(0).(func(context.Context, ...grpc.CallOption) orderer.Cluster_SubmitClient); ok {
var r0 orderer.Cluster_StepClient
if rf, ok := ret.Get(0).(func(context.Context, ...grpc.CallOption) orderer.Cluster_StepClient); ok {
r0 = rf(ctx, opts...)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(orderer.Cluster_SubmitClient)
r0 = ret.Get(0).(orderer.Cluster_StepClient)
}
}
......
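A hypothetical test-side usage of the regenerated mock, assuming the standard testify/mockery expectation API: program Step to hand back a fake stream, much as the old Submit expectation would have been set up.

package example // hypothetical, for illustration only

import (
    "github.com/hyperledger/fabric/orderer/common/cluster/mocks"
    "github.com/hyperledger/fabric/protos/orderer"
    "github.com/stretchr/testify/mock"
)

// newMockedClusterClient returns a ClusterClient mock whose Step call
// yields the provided fake stream.
func newMockedClusterClient(fakeStream orderer.Cluster_StepClient) *mocks.ClusterClient {
    client := &mocks.ClusterClient{}
    client.On("Step", mock.Anything).Return(fakeStream, nil)
    return client
}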
// Code generated by mockery v1.0.0. DO NOT EDIT.
package mocks
import cluster "github.com/hyperledger/fabric/orderer/common/cluster"
......
// Code generated by mockery v1.0.0. DO NOT EDIT.
package mocks
import context "context"
......@@ -10,48 +11,30 @@ type Dispatcher struct {
mock.Mock
}
// DispatchStep provides a mock function with given fields: ctx, request
func (_m *Dispatcher) DispatchStep(ctx context.Context, request *orderer.StepRequest) (*orderer.StepResponse, error) {
// DispatchConsensus provides a mock function with given fields: ctx, request
func (_m *Dispatcher) DispatchConsensus(ctx context.Context, request *orderer.ConsensusRequest) error {
ret := _m.Called(ctx, request)
var r0 *orderer.StepResponse
if rf, ok := ret.Get(0).(func(context.Context, *orderer.StepRequest) *orderer.StepResponse); ok {
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, *orderer.ConsensusRequest) error); ok {
r0 = rf(ctx, request)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*orderer.StepResponse)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, *orderer.StepRequest) error); ok {
r1 = rf(ctx, request)
} else {
r1 = ret.Error(1)
r0 = ret.Error(0)
}
return r0, r1
return r0
}
// DispatchSubmit provides a mock function with given fields: ctx, request
func (_m *