Commit 193d8844 authored by yacovm's avatar yacovm Committed by Yacov Manevich
Browse files

[FAB-14858] Address flake in TestSendBigMessage



This change set addresses a flake in TestSendBigMessage that happens
due to creating the stream too early and freezing too late.

Now the code first freezes, and only then creates the stream.

Also switched the waitGroup to a condition variable which is more
idiomatic for this use case.

Change-Id: I673e8d6cc38caf68fd298dc1a3acfc0af1302961
Signed-off-by: default avataryacovm <yacovm@il.ibm.com>
parent da4ddd05
......@@ -128,7 +128,9 @@ func (*mockChannelExtractor) TargetChannel(msg proto.Message) string {
}
type clusterNode struct {
freezeWG sync.WaitGroup
lock sync.Mutex
frozen bool
freezeCond sync.Cond
dialer *cluster.PredicateDialer
handler *mocks.Handler
nodeInfo cluster.RemoteNode
......@@ -140,7 +142,7 @@ type clusterNode struct {
}
func (cn *clusterNode) Step(stream orderer.Cluster_StepServer) error {
cn.freezeWG.Wait()
cn.waitIfFrozen()
req, err := stream.Recv()
if err != nil {
return err
......@@ -154,12 +156,28 @@ func (cn *clusterNode) Step(stream orderer.Cluster_StepServer) error {
return stream.Send(&orderer.StepResponse{})
}
func (cn *clusterNode) waitIfFrozen() {
cn.lock.Lock()
// There is no freeze after an unfreeze so no need
// for a for loop.
if cn.frozen {
cn.freezeCond.Wait()
return
}
cn.lock.Unlock()
}
func (cn *clusterNode) freeze() {
cn.freezeWG.Add(1)
cn.lock.Lock()
defer cn.lock.Unlock()
cn.frozen = true
}
func (cn *clusterNode) unfreeze() {
cn.freezeWG.Done()
cn.lock.Lock()
cn.frozen = false
cn.lock.Unlock()
cn.freezeCond.Broadcast()
}
func (cn *clusterNode) resurrect() {
......@@ -245,6 +263,8 @@ func newTestNodeWithMetrics(t *testing.T, metrics cluster.MetricsProvider, tlsCo
srv: gRPCServer,
}
tstSrv.freezeCond.L = &tstSrv.lock
tstSrv.c = &cluster.Comm{
CertExpWarningThreshold: time.Hour,
SendBufferSize: 1,
......@@ -329,6 +349,11 @@ func TestSendBigMessage(t *testing.T) {
streams := map[uint64]*cluster.Stream{}
for _, node := range []*clusterNode{node2, node3, node4, node5} {
// Freeze the node, in order to block its Recv
node.freeze()
}
for _, node := range []*clusterNode{node2, node3, node4, node5} {
rm, err := node1.c.Remote(testChannel, node.nodeInfo.ID)
assert.NoError(t, err)
......@@ -340,8 +365,6 @@ func TestSendBigMessage(t *testing.T) {
t0 := time.Now()
for _, node := range []*clusterNode{node2, node3, node4, node5} {
stream := streams[node.nodeInfo.ID]
// Freeze the node, in order to block its Recv
node.freeze()
t1 := time.Now()
err = stream.Send(wrappedMsg)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment