Unverified Commit 5c2e2122 authored by yacovm, committed by Artem Barger

[FAB-14045] Send messages asynchronously in clusters



This change set sends consensus messages asynchronously, over
a buffered channel of size 10.

Consensus messages are dropped when the buffer overflows,
while Submit messages block on the buffer.

Without this change set, sending a large message takes milliseconds;
with it, the Send call returns within microseconds, since the actual
transmission happens asynchronously.
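
In rough outline, the queueing rule behaves like the sketch below (a simplified
illustration; asyncSender and enqueue are made-up names, not the actual Stream
API in this patch): a droppable (consensus) message is rejected when the buffer
is already full, while any other message waits for room.

package main

import (
	"errors"
	"fmt"
)

// errOverflow mirrors the overflow error string introduced by this change set.
var errOverflow = errors.New("send queue overflown")

// asyncSender is an illustrative stand-in for the Stream type: messages are
// queued on a buffered channel and transmitted by a background goroutine.
type asyncSender struct {
	sendBuff chan string
}

// enqueue drops the message when dropping is allowed (consensus) and the
// buffer is full; otherwise it blocks until there is room (submit).
func (s *asyncSender) enqueue(msg string, allowDrop bool) error {
	if allowDrop && len(s.sendBuff) == cap(s.sendBuff) {
		return errOverflow
	}
	s.sendBuff <- msg
	return nil
}

func main() {
	s := &asyncSender{sendBuff: make(chan string, 1)}
	fmt.Println(s.enqueue("consensus-1", true)) // <nil>: buffer has room, queued
	fmt.Println(s.enqueue("consensus-2", true)) // send queue overflown: buffer full, dropped
	// A submit message (allowDrop=false) would block here instead of being
	// dropped, until the background flush goroutine (omitted) drains the buffer.
}

The patch applies the same rule in sendOrDrop below, with a per-stream
goroutine draining sendBuff.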

Change-Id: Id60b05b96eed6d9d04f89b8967945b18ddfbef94
Signed-off-by: yacovm <yacovm@il.ibm.com>
parent fc87b4ff
......@@ -24,6 +24,12 @@ import (
"google.golang.org/grpc/connectivity"
)
var (
errOverflow = errors.New("send queue overflown")
errAborted = errors.New("aborted")
errTimeout = errors.New("rpc timeout expired")
)
// ChannelExtractor extracts the channel of a given message,
// or returns an empty string if that's not possible
type ChannelExtractor interface {
......@@ -78,13 +84,15 @@ type MembersByChannel map[string]MemberMapping
// Comm implements Communicator
type Comm struct {
shutdown bool
Lock sync.RWMutex
Logger *flogging.FabricLogger
ChanExt ChannelExtractor
H Handler
Connections *ConnectionStore
Chan2Members MembersByChannel
shutdownSignal chan struct{}
shutdown bool
SendBufferSize int
Lock sync.RWMutex
Logger *flogging.FabricLogger
ChanExt ChannelExtractor
H Handler
Connections *ConnectionStore
Chan2Members MembersByChannel
}
type requestContext struct {
......@@ -181,6 +189,8 @@ func (c *Comm) Configure(channel string, newNodes []RemoteNode) {
c.Lock.Lock()
defer c.Lock.Unlock()
c.createShutdownSignalIfNeeded()
if c.shutdown {
return
}
......@@ -192,11 +202,22 @@ func (c *Comm) Configure(channel string, newNodes []RemoteNode) {
c.cleanUnusedConnections(beforeConfigChange)
}
func (c *Comm) createShutdownSignalIfNeeded() {
if c.shutdownSignal == nil {
c.shutdownSignal = make(chan struct{})
}
}
// Shutdown shuts down the instance
func (c *Comm) Shutdown() {
c.Lock.Lock()
defer c.Lock.Unlock()
c.createShutdownSignalIfNeeded()
if !c.shutdown {
close(c.shutdownSignal)
}
c.shutdown = true
for _, members := range c.Chan2Members {
for _, member := range members {
......@@ -307,11 +328,13 @@ func (c *Comm) createRemoteContext(stub *Stub, channel string) func() (*RemoteCo
clusterClient := orderer.NewClusterClient(conn)
rc := &RemoteContext{
endpoint: stub.Endpoint,
Logger: c.Logger,
ProbeConn: probeConnection,
conn: conn,
Client: clusterClient,
SendBuffSize: c.SendBufferSize,
shutdownSignal: c.shutdownSignal,
endpoint: stub.Endpoint,
Logger: c.Logger,
ProbeConn: probeConnection,
conn: conn,
Client: clusterClient,
}
return rc, nil
}
......@@ -387,24 +410,30 @@ func (stub *Stub) Activate(createRemoteContext func() (*RemoteContext, error)) e
// RemoteContext interacts with remote cluster
// nodes. Every call can be aborted via call to Abort()
type RemoteContext struct {
Logger *flogging.FabricLogger
endpoint string
Client orderer.ClusterClient
ProbeConn func(conn *grpc.ClientConn) error
conn *grpc.ClientConn
nextStreamID uint64
streamsByID sync.Map
SendBuffSize int
shutdownSignal chan struct{}
Logger *flogging.FabricLogger
endpoint string
Client orderer.ClusterClient
ProbeConn func(conn *grpc.ClientConn) error
conn *grpc.ClientConn
nextStreamID uint64
streamsByID sync.Map
}
// Stream is used to send/receive messages to/from the remote cluster member.
type Stream struct {
ID uint64
NodeName string
Endpoint string
Logger *flogging.FabricLogger
Timeout time.Duration
abortChan <-chan struct{}
sendBuff chan *orderer.StepRequest
commShutdown chan struct{}
abortReason *atomic.Value
ID uint64
NodeName string
Endpoint string
Logger *flogging.FabricLogger
Timeout time.Duration
orderer.Cluster_StepClient
Cancel func()
Cancel func(error)
canceled *uint32
}
......@@ -418,12 +447,51 @@ func (stream *Stream) Canceled() bool {
// Send sends the given request to the remote cluster member.
func (stream *Stream) Send(request *orderer.StepRequest) error {
if stream.Canceled() {
return errors.New(stream.abortReason.Load().(string))
}
var allowDrop bool
// We want to drop consensus transactions if the remote node cannot keep up
// with us; otherwise we'll slow down the entire FSM.
if request.GetConsensusRequest() != nil {
allowDrop = true
}
return stream.sendOrDrop(request, allowDrop)
}
// sendOrDrop sends the given request to the remote cluster member, or drops it
// if it is a consensus request and the queue is full.
func (stream *Stream) sendOrDrop(request *orderer.StepRequest, allowDrop bool) error {
if allowDrop && len(stream.sendBuff) == cap(stream.sendBuff) {
stream.Cancel(errOverflow)
return errOverflow
}
select {
case <-stream.abortChan:
return errors.New("stream aborted")
case stream.sendBuff <- request:
return nil
case <-stream.commShutdown:
return nil
}
}
// sendMessage sends the request down the stream
func (stream *Stream) sendMessage(request *orderer.StepRequest) {
start := time.Now()
var err error
defer func() {
if !stream.Logger.IsEnabledFor(zap.DebugLevel) {
return
}
stream.Logger.Debugf("Send of %s to %s(%s) took %v", requestAsString(request), stream.NodeName, stream.Endpoint, time.Since(start))
var result string
if err != nil {
result = fmt.Sprintf("but failed due to %s", err.Error())
}
stream.Logger.Debugf("Send of %s to %s(%s) took %v %s", requestAsString(request),
stream.NodeName, stream.Endpoint, time.Since(start), result)
}()
f := func() (*orderer.StepResponse, error) {
......@@ -431,8 +499,22 @@ func (stream *Stream) Send(request *orderer.StepRequest) error {
return nil, err
}
_, err := stream.operateWithTimeout(f)
return err
_, err = stream.operateWithTimeout(f)
}
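// periodicFlushStream drains the send buffer in a loop and transmits the
// queued requests, until the stream is aborted or the communication layer
// shuts down.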
func (stream *Stream) periodicFlushStream() {
defer stream.Cancel(errAborted)
for {
select {
case msg := <-stream.sendBuff:
stream.sendMessage(msg)
case <-stream.abortChan:
return
case <-stream.commShutdown:
return
}
}
}
// Recv receives a message from a remote cluster member.
......@@ -477,16 +559,16 @@ func (stream *Stream) operateWithTimeout(invoke StreamOperation) (*orderer.StepR
select {
case r := <-responseChan:
if r.err != nil {
stream.Cancel()
stream.Cancel(r.err)
}
return r.res, r.err
case <-timer.C:
stream.Cancel()
stream.Cancel(errTimeout)
// Wait for the operation goroutine to end
operationEnded.Wait()
stream.Logger.Warningf("Stream %d to %s(%s) was forcibly terminated because timeout (%v) expired",
stream.ID, stream.NodeName, stream.Endpoint, stream.Timeout)
return nil, errors.New("rpc timeout expired")
return nil, errTimeout
}
}
......@@ -525,31 +607,48 @@ func (rc *RemoteContext) NewStream(timeout time.Duration) (*Stream, error) {
var canceled uint32
abortChan := make(chan struct{})
abort := func() {
cancel()
rc.streamsByID.Delete(streamID)
rc.Logger.Debugf("Stream %d to %s(%s) is aborted", streamID, nodeName, rc.endpoint)
atomic.StoreUint32(&canceled, 1)
close(abortChan)
}
once := &sync.Once{}
abortReason := &atomic.Value{}
cancelWithReason := func(err error) {
abortReason.Store(err.Error())
once.Do(abort)
}
logger := flogging.MustGetLogger("orderer.common.cluster.step")
stepLogger := logger.WithOptions(zap.AddCallerSkip(1))
s := &Stream{
abortReason: abortReason,
abortChan: abortChan,
sendBuff: make(chan *orderer.StepRequest, rc.SendBuffSize),
commShutdown: rc.shutdownSignal,
NodeName: nodeName,
Logger: stepLogger,
ID: streamID,
Endpoint: rc.endpoint,
Timeout: timeout,
Cluster_StepClient: stream,
Cancel: abort,
Cancel: cancelWithReason,
canceled: &canceled,
}
rc.Logger.Debugf("Created new stream to %s with ID of %d", rc.endpoint, streamID)
rc.Logger.Debugf("Created new stream to %s with ID of %d and buffer size of %d",
rc.endpoint, streamID, cap(s.sendBuff))
rc.streamsByID.Store(streamID, s)
go s.periodicFlushStream()
return s, nil
}
......@@ -557,7 +656,7 @@ func (rc *RemoteContext) NewStream(timeout time.Duration) (*Stream, error) {
// causes all operations that use this RemoteContext to terminate.
func (rc *RemoteContext) Abort() {
rc.streamsByID.Range(func(_, value interface{}) bool {
value.(*Stream).Cancel()
value.(*Stream).Cancel(errAborted)
return false
})
}
......
......@@ -8,6 +8,7 @@ package cluster_test
import (
"context"
"crypto/rand"
"fmt"
"net"
"sync"
......@@ -113,6 +114,7 @@ func (*mockChannelExtractor) TargetChannel(msg proto.Message) string {
}
type clusterNode struct {
freezeWG sync.WaitGroup
dialer *cluster.PredicateDialer
handler *mocks.Handler
nodeInfo cluster.RemoteNode
......@@ -124,6 +126,7 @@ type clusterNode struct {
}
func (cn *clusterNode) Step(stream orderer.Cluster_StepServer) error {
cn.freezeWG.Wait()
req, err := stream.Recv()
if err != nil {
return err
......@@ -137,6 +140,14 @@ func (cn *clusterNode) Step(stream orderer.Cluster_StepServer) error {
return stream.Send(&orderer.StepResponse{})
}
func (cn *clusterNode) freeze() {
cn.freezeWG.Add(1)
}
func (cn *clusterNode) unfreeze() {
cn.freezeWG.Done()
}
func (cn *clusterNode) resurrect() {
gRPCServer, err := comm_utils.NewGRPCServer(cn.bindAddress, cn.serverConfig)
if err != nil {
......@@ -221,11 +232,12 @@ func newTestNode(t *testing.T) *clusterNode {
}
tstSrv.c = &cluster.Comm{
Logger: flogging.MustGetLogger("test"),
Chan2Members: make(cluster.MembersByChannel),
H: handler,
ChanExt: channelExtractor,
Connections: cluster.NewConnectionStore(dialer),
SendBufferSize: 1,
Logger: flogging.MustGetLogger("test"),
Chan2Members: make(cluster.MembersByChannel),
H: handler,
ChanExt: channelExtractor,
Connections: cluster.NewConnectionStore(dialer),
}
orderer.RegisterClusterServer(gRPCServer.Server(), tstSrv)
......@@ -233,6 +245,195 @@ func newTestNode(t *testing.T) *clusterNode {
return tstSrv
}
func TestSendBigMessage(t *testing.T) {
t.Parallel()
// Scenario: Basic test that spawns 5 nodes and sends a big message
// from one of the nodes to the others.
// A receiver node's Step() server-side method (which calls Recv)
// is frozen until the sender node's Send method returns;
// hence the sender node finishes calling Send
// before a receiver node starts calling Recv.
// This ensures that Send is non-blocking even with big messages.
// In the test, we send a total of 8MB of random data (2MB to each node).
// The randomness is used so gRPC compression won't compress it to a lower size.
node1 := newTestNode(t)
node2 := newTestNode(t)
node3 := newTestNode(t)
node4 := newTestNode(t)
node5 := newTestNode(t)
for _, node := range []*clusterNode{node2, node3, node4, node5} {
node.c.SendBufferSize = 1
}
defer node1.stop()
defer node2.stop()
defer node3.stop()
defer node4.stop()
defer node5.stop()
config := []cluster.RemoteNode{node1.nodeInfo, node2.nodeInfo, node3.nodeInfo, node4.nodeInfo, node5.nodeInfo}
node1.c.Configure(testChannel, config)
node2.c.Configure(testChannel, config)
node3.c.Configure(testChannel, config)
node4.c.Configure(testChannel, config)
node5.c.Configure(testChannel, config)
var messageReceived sync.WaitGroup
messageReceived.Add(4)
msgSize := 1024 * 1024 * 2
bigMsg := &orderer.ConsensusRequest{
Channel: testChannel,
Payload: make([]byte, msgSize),
}
_, err := rand.Read(bigMsg.Payload)
assert.NoError(t, err)
wrappedMsg := &orderer.StepRequest{
Payload: &orderer.StepRequest_ConsensusRequest{
ConsensusRequest: bigMsg,
},
}
for _, node := range []*clusterNode{node2, node3, node4, node5} {
node.handler.On("OnConsensus", testChannel, node1.nodeInfo.ID, mock.Anything).Run(func(args mock.Arguments) {
msg := args.Get(2).(*orderer.ConsensusRequest)
assert.Len(t, msg.Payload, msgSize)
messageReceived.Done()
}).Return(nil)
}
streams := map[uint64]*cluster.Stream{}
for _, node := range []*clusterNode{node2, node3, node4, node5} {
rm, err := node1.c.Remote(testChannel, node.nodeInfo.ID)
assert.NoError(t, err)
stream := assertEventualEstablishStream(t, rm)
streams[node.nodeInfo.ID] = stream
}
t0 := time.Now()
for _, node := range []*clusterNode{node2, node3, node4, node5} {
stream := streams[node.nodeInfo.ID]
// Freeze the node, in order to block its Recv
node.freeze()
t1 := time.Now()
err = stream.Send(wrappedMsg)
assert.NoError(t, err)
t.Log("Sending took", time.Since(t1))
t1 = time.Now()
// Unfreeze the node. It can now call Recv, and signal the messageReceived waitGroup.
node.unfreeze()
}
t.Log("Total sending time to all 4 nodes took:", time.Since(t0))
messageReceived.Wait()
}
func TestBlockingSend(t *testing.T) {
t.Parallel()
// Scenario: Basic test that spawns 2 nodes and sends three SubmitRequests
// or three ConsensusRequests from the first node to the second node.
// SubmitRequests should block, but ConsensusRequests should not.
for _, testCase := range []struct {
description string
messageToSend *orderer.StepRequest
streamUnblocks bool
elapsedGreaterThan time.Duration
overflowErr string
}{
{
description: "SubmitRequest",
messageToSend: wrapSubmitReq(testReq),
streamUnblocks: true,
elapsedGreaterThan: time.Second / 2,
},
{
description: "ConsensusRequest",
messageToSend: testConsensusReq,
overflowErr: "send queue overflown",
},
} {
t.Run(testCase.description, func(t *testing.T) {
node1 := newTestNode(t)
node2 := newTestNode(t)
node1.c.SendBufferSize = 1
node2.c.SendBufferSize = 1
defer node1.stop()
defer node2.stop()
config := []cluster.RemoteNode{node1.nodeInfo, node2.nodeInfo}
node1.c.Configure(testChannel, config)
node2.c.Configure(testChannel, config)
rm, err := node1.c.Remote(testChannel, node2.nodeInfo.ID)
assert.NoError(t, err)
client := &mocks.ClusterClient{}
fakeStream := &mocks.StepClient{}
// Replace real client with a mock client
rm.Client = client
rm.ProbeConn = func(_ *grpc.ClientConn) error {
return nil
}
// Configure client to return the mock stream
fakeStream.On("Context", mock.Anything).Return(context.Background())
client.On("Step", mock.Anything).Return(fakeStream, nil).Once()
var unBlock sync.WaitGroup
unBlock.Add(1)
fakeStream.On("Send", mock.Anything).Run(func(_ mock.Arguments) {
unBlock.Wait()
}).Return(errors.New("oops"))
stream, err := rm.NewStream(time.Hour)
assert.NoError(t, err)
// The first send doesn't block, even though the Send operation blocks.
err = stream.Send(testCase.messageToSend)
assert.NoError(t, err)
// The second one doesn't either.
// At this point, we have 1 goroutine which is blocked on Send(),
// and one message in the buffer.
err = stream.Send(testCase.messageToSend)
if testCase.overflowErr == "" {
assert.NoError(t, err)
} else {
assert.EqualError(t, err, testCase.overflowErr)
}
// The third send blocks, so we need to unblock it ourselves
// in order for it to go through, unless the operation
// is non-blocking.
go func() {
time.Sleep(time.Second)
if testCase.streamUnblocks {
unBlock.Done()
}
}()
t1 := time.Now()
stream.Send(testCase.messageToSend)
elapsed := time.Since(t1)
t.Log("Elapsed time:", elapsed)
assert.True(t, elapsed > testCase.elapsedGreaterThan)
})
}
}
func TestBasic(t *testing.T) {
t.Parallel()
// Scenario: Basic test that spawns 2 nodes and sends each other
......@@ -513,7 +714,7 @@ func testAbort(t *testing.T, abortFunc func(*cluster.RemoteContext), rpcTimeout
gt.Eventually(func() error {
stream, err = rm.NewStream(rpcTimeout)
return err
}).Should(gomega.Succeed())
}, time.Second*10, time.Millisecond*10).Should(gomega.Succeed())
stream.Send(wrapSubmitReq(testSubReq))
_, err = stream.Recv()
......
......@@ -14,6 +14,7 @@ import (
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/protos/orderer"
"github.com/pkg/errors"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
)
......@@ -65,6 +66,10 @@ const (
// Consensus passes the given ConsensusRequest message to the raft.Node instance.
func (s *RPC) SendConsensus(destination uint64, msg *orderer.ConsensusRequest) error {
if s.Logger.IsEnabledFor(zapcore.DebugLevel) {
defer s.consensusSent(time.Now(), destination, msg)
}
stream, err := s.getOrCreateStream(destination, ConsensusOperation)
if err != nil {
return err
......@@ -89,6 +94,10 @@ func (s *RPC) SendConsensus(destination uint64, msg *orderer.ConsensusRequest) e
// SendSubmit sends a SubmitRequest to the given destination node.
func (s *RPC) SendSubmit(destination uint64, request *orderer.SubmitRequest) error {
if s.Logger.IsEnabledFor(zapcore.DebugLevel) {
defer s.submitSent(time.Now(), destination, request)
}
stream, err := s.getOrCreateStream(destination, SubmitOperation)
if err != nil {
return err
......@@ -110,6 +119,14 @@ func (s *RPC) SendSubmit(destination uint64, request *orderer.SubmitRequest) err
return err
}
func (s *RPC) submitSent(start time.Time, to uint64, msg *orderer.SubmitRequest) {
s.Logger.Debugf("Sending msg of %d bytes to %d on channel %s took %v", submitMsgLength(msg), to, s.Channel, time.Since(start))
}
func (s *RPC) consensusSent(start time.Time, to uint64, msg *orderer.ConsensusRequest) {
s.Logger.Debugf("Sending msg of %d bytes to %d on channel %s took %v", len(msg.Payload), to, s.Channel, time.Since(start))
}
// getOrCreateStream obtains a stream of the given operation type for the given destination node
func (s *RPC) getOrCreateStream(destination uint64, operationType OperationType) (orderer.Cluster_StepClient, error) {
stream := s.getStream(destination, operationType)
......@@ -122,7 +139,7 @@ func (s *RPC) getOrCreateStream(destination uint64, operationType OperationType)
}
stream, err = stub.NewStream(s.Timeout)
if err != nil {
return nil, errors.WithStack(err)
return nil, err
}
s.mapStream(destination, stream, operationType)
return stream, nil
......@@ -156,3 +173,10 @@ func (s *RPC) cleanCanceledStreams(operationType OperationType) {
delete(s.StreamsByType[operationType], destination)
}
}
func submitMsgLength(request *orderer.SubmitRequest) int {
if request.Payload == nil {
return 0
}
return len(request.Payload.Payload)
}
......@@ -9,6 +9,7 @@ package cluster_test
import (
"context"
"io"