gossip_test.go 44.1 KB
Newer Older
YACOVM's avatar
YACOVM committed
1
/*
2
Copyright IBM Corp. All Rights Reserved.
YACOVM's avatar
YACOVM committed
3

4
SPDX-License-Identifier: Apache-2.0
YACOVM's avatar
YACOVM committed
5
6
7
8
9
*/

package gossip

import (
YACOVM's avatar
YACOVM committed
10
	"bytes"
11
	"errors"
YACOVM's avatar
YACOVM committed
12
	"fmt"
13
	"math/rand"
YACOVM's avatar
YACOVM committed
14
	"runtime"
YACOVM's avatar
YACOVM committed
15
16
	"strconv"
	"strings"
YACOVM's avatar
YACOVM committed
17
18
19
20
21
	"sync"
	"sync/atomic"
	"testing"
	"time"

22
23
	"reflect"

24
	"github.com/hyperledger/fabric/bccsp/factory"
25
	"github.com/hyperledger/fabric/gossip/api"
26
	"github.com/hyperledger/fabric/gossip/comm"
27
	"github.com/hyperledger/fabric/gossip/common"
YACOVM's avatar
YACOVM committed
28
29
30
	"github.com/hyperledger/fabric/gossip/discovery"
	"github.com/hyperledger/fabric/gossip/gossip/algo"
	"github.com/hyperledger/fabric/gossip/util"
31
	proto "github.com/hyperledger/fabric/protos/gossip"
YACOVM's avatar
YACOVM committed
32
33
34
	"github.com/stretchr/testify/assert"
)

YACOVM's avatar
YACOVM committed
35
36
37
var timeout = time.Second * time.Duration(180)

var testWG = sync.WaitGroup{}
YACOVM's avatar
YACOVM committed
38

39
40
41
42
43
44
45
46
var tests = []func(t *testing.T){
	TestPull,
	TestConnectToAnchorPeers,
	TestMembership,
	TestDissemination,
	TestMembershipConvergence,
	TestMembershipRequestSpoofing,
	TestDataLeakage,
47
	TestLeaveChannel,
48
49
	//TestDisseminateAll2All: {},
	TestIdentityExpiration,
50
	TestSendByCriteria,
51
52
53
54
	TestMultipleOrgEndpointLeakage,
	TestConfidentiality,
	TestAnchorPeer,
	TestBootstrapPeerMisConfiguration,
55
	TestNoMessagesSelfLoop,
56
57
}

YACOVM's avatar
YACOVM committed
58
func init() {
59
	util.SetupTestLogging()
60
	rand.Seed(int64(time.Now().Second()))
YACOVM's avatar
YACOVM committed
61
	aliveTimeInterval := time.Duration(1000) * time.Millisecond
62
	discovery.SetAliveTimeInterval(aliveTimeInterval)
YACOVM's avatar
YACOVM committed
63
	discovery.SetAliveExpirationCheckInterval(aliveTimeInterval)
64
	discovery.SetAliveExpirationTimeout(aliveTimeInterval * 10)
65
	discovery.SetReconnectInterval(aliveTimeInterval)
66
	discovery.SetMaxConnAttempts(5)
67
68
69
	for range tests {
		testWG.Add(1)
	}
70
	factory.InitFactories(nil)
YACOVM's avatar
YACOVM committed
71
72
}

73
74
var expirationTimes map[string]time.Time = map[string]time.Time{}

YACOVM's avatar
YACOVM committed
75
var orgInChannelA = api.OrgIdentityType("ORG1")
YACOVM's avatar
YACOVM committed
76
77
78
79
80
81
82
83

func acceptData(m interface{}) bool {
	if dataMsg := m.(*proto.GossipMessage).GetDataMsg(); dataMsg != nil {
		return true
	}
	return false
}

84
85
86
87
88
89
90
func acceptLeadershp(message interface{}) bool {
	validMsg := message.(*proto.GossipMessage).Tag == proto.GossipMessage_CHAN_AND_ORG &&
		message.(*proto.GossipMessage).IsLeadershipMsg()

	return validMsg
}

YACOVM's avatar
YACOVM committed
91
type joinChanMsg struct {
92
	members2AnchorPeers map[string][]api.AnchorPeer
YACOVM's avatar
YACOVM committed
93
94
}

YACOVM's avatar
YACOVM committed
95
96
97
98
// SequenceNumber returns the sequence number of the block this joinChanMsg
// is derived from
func (*joinChanMsg) SequenceNumber() uint64 {
	return uint64(time.Now().UnixNano())
YACOVM's avatar
YACOVM committed
99
100
}

101
102
103
104
// Members returns the organizations of the channel
func (jcm *joinChanMsg) Members() []api.OrgIdentityType {
	if jcm.members2AnchorPeers == nil {
		return []api.OrgIdentityType{orgInChannelA}
105
	}
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
	members := make([]api.OrgIdentityType, len(jcm.members2AnchorPeers))
	i := 0
	for org := range jcm.members2AnchorPeers {
		members[i] = api.OrgIdentityType(org)
		i++
	}
	return members
}

// AnchorPeersOf returns the anchor peers of the given organization
func (jcm *joinChanMsg) AnchorPeersOf(org api.OrgIdentityType) []api.AnchorPeer {
	if jcm.members2AnchorPeers == nil {
		return []api.AnchorPeer{}
	}
	return jcm.members2AnchorPeers[string(org)]
YACOVM's avatar
YACOVM committed
121
122
}

YACOVM's avatar
YACOVM committed
123
type naiveCryptoService struct {
124
	sync.RWMutex
125
	allowedPkiIDS map[string]struct{}
126
	revokedPkiIDS map[string]struct{}
YACOVM's avatar
YACOVM committed
127
128
}

129
130
131
132
func (cs *naiveCryptoService) OrgByPeerIdentity(api.PeerIdentityType) api.OrgIdentityType {
	return nil
}

133
134
135
136
137
138
139
func (*naiveCryptoService) Expiration(peerIdentity api.PeerIdentityType) (time.Time, error) {
	if exp, exists := expirationTimes[string(peerIdentity)]; exists {
		return exp, nil
	}
	return time.Now().Add(time.Hour), nil
}

YACOVM's avatar
YACOVM committed
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
type orgCryptoService struct {
}

// OrgByPeerIdentity returns the OrgIdentityType
// of a given peer identity
func (*orgCryptoService) OrgByPeerIdentity(identity api.PeerIdentityType) api.OrgIdentityType {
	return orgInChannelA
}

// Verify verifies a JoinChanMessage, returns nil on success,
// and an error on failure
func (*orgCryptoService) Verify(joinChanMsg api.JoinChannelMessage) error {
	return nil
}

155
156
// VerifyByChannel verifies a peer's signature on a message in the context
// of a specific channel
157
158
159
160
161
162
163
164
func (cs *naiveCryptoService) VerifyByChannel(_ common.ChainID, identity api.PeerIdentityType, _, _ []byte) error {
	if cs.allowedPkiIDS == nil {
		return nil
	}
	if _, allowed := cs.allowedPkiIDS[string(identity)]; allowed {
		return nil
	}
	return errors.New("Forbidden")
165
166
}

167
168
169
170
171
172
173
174
175
func (cs *naiveCryptoService) ValidateIdentity(peerIdentity api.PeerIdentityType) error {
	cs.RLock()
	defer cs.RUnlock()
	if cs.revokedPkiIDS == nil {
		return nil
	}
	if _, revoked := cs.revokedPkiIDS[string(cs.GetPKIidOfCert(peerIdentity))]; revoked {
		return errors.New("revoked")
	}
176
177
178
179
180
181
182
183
184
185
	return nil
}

// GetPKIidOfCert returns the PKI-ID of a peer's identity
func (*naiveCryptoService) GetPKIidOfCert(peerIdentity api.PeerIdentityType) common.PKIidType {
	return common.PKIidType(peerIdentity)
}

// VerifyBlock returns nil if the block is properly signed,
// else returns error
186
func (*naiveCryptoService) VerifyBlock(chainID common.ChainID, seqNum uint64, signedBlock []byte) error {
187
188
189
190
191
	return nil
}

// Sign signs msg with this peer's signing key and outputs
// the signature if no error occurred.
YACOVM's avatar
YACOVM committed
192
func (*naiveCryptoService) Sign(msg []byte) ([]byte, error) {
193
194
195
	sig := make([]byte, len(msg))
	copy(sig, msg)
	return sig, nil
YACOVM's avatar
YACOVM committed
196
197
}

198
199
200
201
202
203
204
// Verify checks that signature is a valid signature of message under a peer's verification key.
// If the verification succeeded, Verify returns nil meaning no error occurred.
// If peerCert is nil, then the signature is verified against this peer's verification key.
func (*naiveCryptoService) Verify(peerIdentity api.PeerIdentityType, signature, message []byte) error {
	equal := bytes.Equal(signature, message)
	if !equal {
		return fmt.Errorf("Wrong signature:%v, %v", signature, message)
YACOVM's avatar
YACOVM committed
205
	}
206
	return nil
YACOVM's avatar
YACOVM committed
207
208
}

209
210
211
212
213
214
215
216
217
func (cs *naiveCryptoService) revoke(pkiID common.PKIidType) {
	cs.Lock()
	defer cs.Unlock()
	if cs.revokedPkiIDS == nil {
		cs.revokedPkiIDS = map[string]struct{}{}
	}
	cs.revokedPkiIDS[string(pkiID)] = struct{}{}
}

YACOVM's avatar
YACOVM committed
218
func bootPeers(portPrefix int, ids ...int) []string {
YACOVM's avatar
YACOVM committed
219
220
	peers := []string{}
	for _, id := range ids {
221
		peers = append(peers, fmt.Sprintf("localhost:%d", id+portPrefix))
YACOVM's avatar
YACOVM committed
222
223
224
225
	}
	return peers
}

226
func newGossipInstanceWithCustomMCS(portPrefix int, id int, maxMsgCount int, mcs api.MessageCryptoService, boot ...int) Gossip {
YACOVM's avatar
YACOVM committed
227
228
	port := id + portPrefix
	conf := &Config{
YACOVM's avatar
YACOVM committed
229
230
231
232
233
234
		BindPort:                   port,
		BootstrapPeers:             bootPeers(portPrefix, boot...),
		ID:                         fmt.Sprintf("p%d", id),
		MaxBlockCountToStore:       maxMsgCount,
		MaxPropagationBurstLatency: time.Duration(500) * time.Millisecond,
		MaxPropagationBurstSize:    20,
YACOVM's avatar
YACOVM committed
235
236
		PropagateIterations:        1,
		PropagatePeerNum:           3,
YACOVM's avatar
YACOVM committed
237
		PullInterval:               time.Duration(2) * time.Second,
YACOVM's avatar
YACOVM committed
238
		PullPeerNum:                5,
239
240
		InternalEndpoint:           fmt.Sprintf("localhost:%d", port),
		ExternalEndpoint:           fmt.Sprintf("1.2.3.4:%d", port),
YACOVM's avatar
YACOVM committed
241
		PublishCertPeriod:          time.Duration(4) * time.Second,
YACOVM's avatar
YACOVM committed
242
243
		PublishStateInfoInterval:   time.Duration(1) * time.Second,
		RequestStateInfoInterval:   time.Duration(1) * time.Second,
YACOVM's avatar
YACOVM committed
244
	}
245
	selfID := api.PeerIdentityType(conf.InternalEndpoint)
246
	g := NewGossipServiceWithServer(conf, &orgCryptoService{}, mcs,
247
		selfID, nil)
248

YACOVM's avatar
YACOVM committed
249
	return g
YACOVM's avatar
YACOVM committed
250
251
}

252
253
254
255
func newGossipInstance(portPrefix int, id int, maxMsgCount int, boot ...int) Gossip {
	return newGossipInstanceWithCustomMCS(portPrefix, id, maxMsgCount, &naiveCryptoService{}, boot...)
}

YACOVM's avatar
YACOVM committed
256
func newGossipInstanceWithOnlyPull(portPrefix int, id int, maxMsgCount int, boot ...int) Gossip {
YACOVM's avatar
YACOVM committed
257
258
	port := id + portPrefix
	conf := &Config{
YACOVM's avatar
YACOVM committed
259
260
261
262
263
		BindPort:                   port,
		BootstrapPeers:             bootPeers(portPrefix, boot...),
		ID:                         fmt.Sprintf("p%d", id),
		MaxBlockCountToStore:       maxMsgCount,
		MaxPropagationBurstLatency: time.Duration(1000) * time.Millisecond,
YACOVM's avatar
YACOVM committed
264
265
266
267
268
		MaxPropagationBurstSize:    10,
		PropagateIterations:        0,
		PropagatePeerNum:           0,
		PullInterval:               time.Duration(1000) * time.Millisecond,
		PullPeerNum:                20,
269
270
		InternalEndpoint:           fmt.Sprintf("localhost:%d", port),
		ExternalEndpoint:           fmt.Sprintf("1.2.3.4:%d", port),
YACOVM's avatar
YACOVM committed
271
		PublishCertPeriod:          time.Duration(0) * time.Second,
YACOVM's avatar
YACOVM committed
272
273
		PublishStateInfoInterval:   time.Duration(1) * time.Second,
		RequestStateInfoInterval:   time.Duration(1) * time.Second,
YACOVM's avatar
YACOVM committed
274
	}
275
276

	cryptoService := &naiveCryptoService{}
277
	selfID := api.PeerIdentityType(conf.InternalEndpoint)
278
	g := NewGossipServiceWithServer(conf, &orgCryptoService{}, cryptoService,
279
		selfID, nil)
YACOVM's avatar
YACOVM committed
280
	return g
YACOVM's avatar
YACOVM committed
281
282
}

283
284
285
286
287
288
289
290
291
func TestLeaveChannel(t *testing.T) {
	t.Parallel()
	defer testWG.Done()
	portPrefix := 4500
	// Scenario: Have 3 peers in a channel and make one of them leave it.
	// Ensure the peers don't recognize the other peer when it left the channel

	p0 := newGossipInstance(portPrefix, 0, 100, 2)
	p0.JoinChan(&joinChanMsg{}, common.ChainID("A"))
292
	p0.UpdateLedgerHeight(1, common.ChainID("A"))
293
294
295
296
	defer p0.Stop()

	p1 := newGossipInstance(portPrefix, 1, 100, 0)
	p1.JoinChan(&joinChanMsg{}, common.ChainID("A"))
297
	p1.UpdateLedgerHeight(1, common.ChainID("A"))
298
299
300
301
	defer p1.Stop()

	p2 := newGossipInstance(portPrefix, 2, 100, 1)
	p2.JoinChan(&joinChanMsg{}, common.ChainID("A"))
302
	p2.UpdateLedgerHeight(1, common.ChainID("A"))
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
	defer p2.Stop()

	countMembership := func(g Gossip, expected int) func() bool {
		return func() bool {
			peers := g.PeersOfChannel(common.ChainID("A"))
			return len(peers) == expected
		}
	}

	// Wait until everyone sees each other in the channel
	waitUntilOrFail(t, countMembership(p0, 2))
	waitUntilOrFail(t, countMembership(p1, 2))
	waitUntilOrFail(t, countMembership(p2, 2))

	// Now p2 leaves the channel
	p2.LeaveChan(common.ChainID("A"))

	// Ensure channel membership is adjusted accordingly
	waitUntilOrFail(t, countMembership(p0, 1))
	waitUntilOrFail(t, countMembership(p1, 1))
	waitUntilOrFail(t, countMembership(p2, 0))

}

YACOVM's avatar
YACOVM committed
327
func TestPull(t *testing.T) {
YACOVM's avatar
YACOVM committed
328
	t.Parallel()
329
	defer testWG.Done()
330
	portPrefix := 5610
YACOVM's avatar
YACOVM committed
331
332
333
334
335
	t1 := time.Now()
	// Scenario: Turn off forwarding and use only pull-based gossip.
	// First phase: Ensure full membership view for all nodes
	// Second phase: Disseminate 10 messages and ensure all nodes got them

YACOVM's avatar
YACOVM committed
336
337
	shortenedWaitTime := time.Duration(200) * time.Millisecond
	algo.SetDigestWaitTime(shortenedWaitTime)
YACOVM's avatar
YACOVM committed
338
339
340
341
342
343
344
345
346
347
	algo.SetRequestWaitTime(shortenedWaitTime)
	algo.SetResponseWaitTime(shortenedWaitTime)

	defer func() {
		algo.SetDigestWaitTime(time.Duration(1) * time.Second)
		algo.SetRequestWaitTime(time.Duration(1) * time.Second)
		algo.SetResponseWaitTime(time.Duration(2) * time.Second)
	}()

	stopped := int32(0)
YACOVM's avatar
YACOVM committed
348
	go waitForTestCompletion(&stopped, t)
YACOVM's avatar
YACOVM committed
349
350
351

	n := 5
	msgsCount2Send := 10
352

YACOVM's avatar
YACOVM committed
353
354
	peers := make([]Gossip, n)
	wg := sync.WaitGroup{}
YACOVM's avatar
YACOVM committed
355
	wg.Add(n)
YACOVM's avatar
YACOVM committed
356
357
	for i := 1; i <= n; i++ {
		go func(i int) {
YACOVM's avatar
YACOVM committed
358
359
360
			defer wg.Done()
			pI := newGossipInstanceWithOnlyPull(portPrefix, i, 100, 0)
			pI.JoinChan(&joinChanMsg{}, common.ChainID("A"))
361
			pI.UpdateLedgerHeight(1, common.ChainID("A"))
362
			peers[i-1] = pI
YACOVM's avatar
YACOVM committed
363
364
365
366
		}(i)
	}
	wg.Wait()

367
368
369
370
	time.Sleep(time.Second)

	boot := newGossipInstanceWithOnlyPull(portPrefix, 0, 100)
	boot.JoinChan(&joinChanMsg{}, common.ChainID("A"))
371
	boot.UpdateLedgerHeight(1, common.ChainID("A"))
372

YACOVM's avatar
YACOVM committed
373
374
	knowAll := func() bool {
		for i := 1; i <= n; i++ {
YACOVM's avatar
YACOVM committed
375
			neighborCount := len(peers[i-1].Peers())
YACOVM's avatar
YACOVM committed
376
377
378
379
380
381
382
383
384
			if n != neighborCount {
				return false
			}
		}
		return true
	}

	receivedMessages := make([]int, n)
	wg = sync.WaitGroup{}
YACOVM's avatar
YACOVM committed
385
	wg.Add(n)
YACOVM's avatar
YACOVM committed
386
387
	for i := 1; i <= n; i++ {
		go func(i int) {
YACOVM's avatar
YACOVM committed
388
			acceptChan, _ := peers[i-1].Accept(acceptData, false)
YACOVM's avatar
YACOVM committed
389
390
391
392
393
394
			go func(index int, ch <-chan *proto.GossipMessage) {
				defer wg.Done()
				for j := 0; j < msgsCount2Send; j++ {
					<-ch
					receivedMessages[index]++
				}
YACOVM's avatar
YACOVM committed
395
			}(i-1, acceptChan)
YACOVM's avatar
YACOVM committed
396
397
398
399
		}(i)
	}

	for i := 1; i <= msgsCount2Send; i++ {
400
		boot.Gossip(createDataMsg(uint64(i), []byte{}, common.ChainID("A")))
YACOVM's avatar
YACOVM committed
401
402
	}

YACOVM's avatar
YACOVM committed
403
	waitUntilOrFail(t, knowAll)
YACOVM's avatar
YACOVM committed
404
	waitUntilOrFailBlocking(t, wg.Wait)
YACOVM's avatar
YACOVM committed
405
406
407
408
409
410
411
412
413

	receivedAll := func() bool {
		for i := 0; i < n; i++ {
			if msgsCount2Send != receivedMessages[i] {
				return false
			}
		}
		return true
	}
YACOVM's avatar
YACOVM committed
414
	waitUntilOrFail(t, receivedAll)
YACOVM's avatar
YACOVM committed
415
416
417
418
419

	stop := func() {
		stopPeers(append(peers, boot))
	}

YACOVM's avatar
YACOVM committed
420
	waitUntilOrFailBlocking(t, stop)
YACOVM's avatar
YACOVM committed
421

YACOVM's avatar
YACOVM committed
422
	t.Log("Took", time.Since(t1))
YACOVM's avatar
YACOVM committed
423
	atomic.StoreInt32(&stopped, int32(1))
YACOVM's avatar
YACOVM committed
424
	fmt.Println("<<<TestPull>>>")
YACOVM's avatar
YACOVM committed
425
426
}

427
428
func TestConnectToAnchorPeers(t *testing.T) {
	t.Parallel()
429
	defer testWG.Done()
430
431
432
433
434
	// Scenario: spawn 10 peers, and have them join a channel
	// of 3 anchor peers that don't exist yet.
	// Wait 5 seconds, and then spawn a random anchor peer out of the 3.
	// Ensure that all peers successfully see each other in the channel

435
	portPrefix := 8610
436
437
438
439
	// Scenario: Spawn 5 peers, and make each of them connect to
	// the other 2 using join channel.
	stopped := int32(0)
	go waitForTestCompletion(&stopped, t)
440
441
	n := 10
	anchorPeercount := 3
442

443
	jcm := &joinChanMsg{members2AnchorPeers: map[string][]api.AnchorPeer{string(orgInChannelA): {}}}
444
	for i := 0; i < anchorPeercount; i++ {
445
		ap := api.AnchorPeer{
446
447
			Port: portPrefix + i,
			Host: "localhost",
448
		}
449
		jcm.members2AnchorPeers[string(orgInChannelA)] = append(jcm.members2AnchorPeers[string(orgInChannelA)], ap)
450
451
452
453
454
455
456
	}

	peers := make([]Gossip, n)
	wg := sync.WaitGroup{}
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
457
			peers[i] = newGossipInstance(portPrefix, i+anchorPeercount, 100)
458
			peers[i].JoinChan(jcm, common.ChainID("A"))
459
			peers[i].UpdateLedgerHeight(1, common.ChainID("A"))
460
461
462
			wg.Done()
		}(i)
	}
463

464
	waitUntilOrFailBlocking(t, wg.Wait)
465
466
467
468
469
470

	time.Sleep(time.Second * 5)

	// Now start a random anchor peer
	anchorPeer := newGossipInstance(portPrefix, rand.Intn(anchorPeercount), 100)
	anchorPeer.JoinChan(jcm, common.ChainID("A"))
471
	anchorPeer.UpdateLedgerHeight(1, common.ChainID("A"))
472
473
474

	defer anchorPeer.Stop()
	waitUntilOrFail(t, checkPeersMembership(t, peers, n))
475
476
477

	channelMembership := func() bool {
		for _, peer := range peers {
478
			if len(peer.PeersOfChannel(common.ChainID("A"))) != n {
479
480
481
482
483
484
485
486
487
488
489
490
491
492
				return false
			}
		}
		return true
	}
	waitUntilOrFail(t, channelMembership)

	stop := func() {
		stopPeers(peers)
	}
	waitUntilOrFailBlocking(t, stop)

	fmt.Println("<<<TestConnectToAnchorPeers>>>")
	atomic.StoreInt32(&stopped, int32(1))
493

494
495
}

YACOVM's avatar
YACOVM committed
496
func TestMembership(t *testing.T) {
YACOVM's avatar
YACOVM committed
497
	t.Parallel()
498
	defer testWG.Done()
499
	portPrefix := 4610
YACOVM's avatar
YACOVM committed
500
501
502
503
504
505
	t1 := time.Now()
	// Scenario: spawn 20 nodes and a single bootstrap node and then:
	// 1) Check full membership views for all nodes but the bootstrap node.
	// 2) Update metadata of last peer and ensure it propagates to all peers

	stopped := int32(0)
YACOVM's avatar
YACOVM committed
506
	go waitForTestCompletion(&stopped, t)
YACOVM's avatar
YACOVM committed
507

YACOVM's avatar
YACOVM committed
508
	n := 10
509
	var lastPeer = fmt.Sprintf("localhost:%d", n+portPrefix)
YACOVM's avatar
YACOVM committed
510
511
	boot := newGossipInstance(portPrefix, 0, 100)
	boot.JoinChan(&joinChanMsg{}, common.ChainID("A"))
512
	boot.UpdateLedgerHeight(1, common.ChainID("A"))
YACOVM's avatar
YACOVM committed
513
514
515

	peers := make([]Gossip, n)
	wg := sync.WaitGroup{}
YACOVM's avatar
YACOVM committed
516
	wg.Add(n)
YACOVM's avatar
YACOVM committed
517
518
	for i := 1; i <= n; i++ {
		go func(i int) {
YACOVM's avatar
YACOVM committed
519
520
			defer wg.Done()
			pI := newGossipInstance(portPrefix, i, 100, 0)
521
			peers[i-1] = pI
YACOVM's avatar
YACOVM committed
522
			pI.JoinChan(&joinChanMsg{}, common.ChainID("A"))
523
			pI.UpdateLedgerHeight(1, common.ChainID("A"))
YACOVM's avatar
YACOVM committed
524
525
526
		}(i)
	}

YACOVM's avatar
YACOVM committed
527
	waitUntilOrFailBlocking(t, wg.Wait)
YACOVM's avatar
YACOVM committed
528
	t.Log("Peers started")
YACOVM's avatar
YACOVM committed
529
530
531

	seeAllNeighbors := func() bool {
		for i := 1; i <= n; i++ {
YACOVM's avatar
YACOVM committed
532
			neighborCount := len(peers[i-1].Peers())
YACOVM's avatar
YACOVM committed
533
534
535
536
537
538
539
			if neighborCount != n {
				return false
			}
		}
		return true
	}

YACOVM's avatar
YACOVM committed
540
	membershipEstablishTime := time.Now()
YACOVM's avatar
YACOVM committed
541
	waitUntilOrFail(t, seeAllNeighbors)
YACOVM's avatar
YACOVM committed
542
	t.Log("membership established in", time.Since(membershipEstablishTime))
YACOVM's avatar
YACOVM committed
543

YACOVM's avatar
YACOVM committed
544
	t.Log("Updating metadata...")
YACOVM's avatar
YACOVM committed
545
	// Change metadata in last node
546
	peers[len(peers)-1].UpdateMetadata([]byte("bla bla"))
YACOVM's avatar
YACOVM committed
547
548

	metaDataUpdated := func() bool {
549
		if !bytes.Equal([]byte("bla bla"), metadataOfPeer(boot.Peers(), lastPeer)) {
YACOVM's avatar
YACOVM committed
550
551
			return false
		}
552
		for i := 0; i < n-1; i++ {
553
			if !bytes.Equal([]byte("bla bla"), metadataOfPeer(peers[i].Peers(), lastPeer)) {
YACOVM's avatar
YACOVM committed
554
555
556
557
558
				return false
			}
		}
		return true
	}
YACOVM's avatar
YACOVM committed
559
	metadataDisseminationTime := time.Now()
YACOVM's avatar
YACOVM committed
560
	waitUntilOrFail(t, metaDataUpdated)
561
	fmt.Println("Metadata updated")
YACOVM's avatar
YACOVM committed
562
	t.Log("Metadata dissemination took", time.Since(metadataDisseminationTime))
YACOVM's avatar
YACOVM committed
563
564
565
566
567

	stop := func() {
		stopPeers(append(peers, boot))
	}

YACOVM's avatar
YACOVM committed
568
	stopTime := time.Now()
YACOVM's avatar
YACOVM committed
569
	waitUntilOrFailBlocking(t, stop)
YACOVM's avatar
YACOVM committed
570
	t.Log("Stop took", time.Since(stopTime))
YACOVM's avatar
YACOVM committed
571

YACOVM's avatar
YACOVM committed
572
	t.Log("Took", time.Since(t1))
YACOVM's avatar
YACOVM committed
573
	atomic.StoreInt32(&stopped, int32(1))
YACOVM's avatar
YACOVM committed
574
	fmt.Println("<<<TestMembership>>>")
575

YACOVM's avatar
YACOVM committed
576
577
}

578
579
580
func TestNoMessagesSelfLoop(t *testing.T) {
	t.Parallel()
	defer testWG.Done()
581
	portPrefix := 17610
582
583
584

	boot := newGossipInstance(portPrefix, 0, 100)
	boot.JoinChan(&joinChanMsg{}, common.ChainID("A"))
585
	boot.UpdateLedgerHeight(1, common.ChainID("A"))
586
587
588

	peer := newGossipInstance(portPrefix, 1, 100, 0)
	peer.JoinChan(&joinChanMsg{}, common.ChainID("A"))
589
	peer.UpdateLedgerHeight(1, common.ChainID("A"))
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639

	// Wait until both peers get connected
	waitUntilOrFail(t, checkPeersMembership(t, []Gossip{peer}, 1))
	_, commCh := boot.Accept(func(msg interface{}) bool {
		return msg.(proto.ReceivedMessage).GetGossipMessage().IsDataMsg()
	}, true)

	wg := sync.WaitGroup{}
	wg.Add(2)

	// Make sure sending peer is not getting his own
	// message back
	go func(ch <-chan proto.ReceivedMessage) {
		defer wg.Done()
		for {
			select {
			case msg := <-ch:
				{
					if msg.GetGossipMessage().IsDataMsg() {
						t.Fatal("Should not receive data message back, got", msg)
					}
				}
				// Waiting for 2 seconds to make sure we won't
				// get message back w.h.p.
			case <-time.After(2 * time.Second):
				{
					return
				}
			}
		}
	}(commCh)

	peerCh, _ := peer.Accept(acceptData, false)

	// Ensure recipient gets his message
	go func(ch <-chan *proto.GossipMessage) {
		defer wg.Done()
		<-ch
	}(peerCh)

	boot.Gossip(createDataMsg(uint64(2), []byte{}, common.ChainID("A")))
	waitUntilOrFailBlocking(t, wg.Wait)

	stop := func() {
		stopPeers([]Gossip{peer, boot})
	}

	waitUntilOrFailBlocking(t, stop)
}

YACOVM's avatar
YACOVM committed
640
func TestDissemination(t *testing.T) {
YACOVM's avatar
YACOVM committed
641
	t.Parallel()
642
	defer testWG.Done()
643
	portPrefix := 3610
YACOVM's avatar
YACOVM committed
644
645
646
647
648
649
	t1 := time.Now()
	// Scenario: 20 nodes and a bootstrap node.
	// The bootstrap node sends 10 messages and we count
	// that each node got 10 messages after a few seconds

	stopped := int32(0)
YACOVM's avatar
YACOVM committed
650
	go waitForTestCompletion(&stopped, t)
YACOVM's avatar
YACOVM committed
651

YACOVM's avatar
YACOVM committed
652
	n := 10
YACOVM's avatar
YACOVM committed
653
	msgsCount2Send := 10
YACOVM's avatar
YACOVM committed
654
655
	boot := newGossipInstance(portPrefix, 0, 100)
	boot.JoinChan(&joinChanMsg{}, common.ChainID("A"))
656
657
	boot.UpdateLedgerHeight(1, common.ChainID("A"))
	boot.UpdateChaincodes([]*proto.Chaincode{{Name: "exampleCC", Version: "1.2"}}, common.ChainID("A"))
658

YACOVM's avatar
YACOVM committed
659
660
661
	peers := make([]Gossip, n)
	receivedMessages := make([]int, n)
	wg := sync.WaitGroup{}
YACOVM's avatar
YACOVM committed
662
	wg.Add(n)
YACOVM's avatar
YACOVM committed
663
	for i := 1; i <= n; i++ {
YACOVM's avatar
YACOVM committed
664
		pI := newGossipInstance(portPrefix, i, 100, 0)
665
		peers[i-1] = pI
YACOVM's avatar
YACOVM committed
666
		pI.JoinChan(&joinChanMsg{}, common.ChainID("A"))
667
668
		pI.UpdateLedgerHeight(1, common.ChainID("A"))
		pI.UpdateChaincodes([]*proto.Chaincode{{Name: "exampleCC", Version: "1.2"}}, common.ChainID("A"))
YACOVM's avatar
YACOVM committed
669
		acceptChan, _ := pI.Accept(acceptData, false)
YACOVM's avatar
YACOVM committed
670
671
672
673
674
675
		go func(index int, ch <-chan *proto.GossipMessage) {
			defer wg.Done()
			for j := 0; j < msgsCount2Send; j++ {
				<-ch
				receivedMessages[index]++
			}
YACOVM's avatar
YACOVM committed
676
		}(i-1, acceptChan)
YACOVM's avatar
YACOVM committed
677
678
		// Change metadata in last node
		if i == n {
679
			pI.UpdateLedgerHeight(2, common.ChainID("A"))
YACOVM's avatar
YACOVM committed
680
681
		}
	}
682
	var lastPeer = fmt.Sprintf("localhost:%d", n+portPrefix)
YACOVM's avatar
YACOVM committed
683
	metaDataUpdated := func() bool {
684
		if 2 != heightOfPeer(boot.PeersOfChannel(common.ChainID("A")), lastPeer) {
YACOVM's avatar
YACOVM committed
685
686
687
			return false
		}
		for i := 0; i < n-1; i++ {
688
			if 2 != heightOfPeer(peers[i].PeersOfChannel(common.ChainID("A")), lastPeer) {
YACOVM's avatar
YACOVM committed
689
690
				return false
			}
691
692
693
694
695
696
697
698
699
			for _, p := range peers[i].PeersOfChannel(common.ChainID("A")) {
				if len(p.Properties.Chaincodes) != 1 {
					return false
				}

				if !reflect.DeepEqual(p.Properties.Chaincodes, []*proto.Chaincode{{Name: "exampleCC", Version: "1.2"}}) {
					return false
				}
			}
YACOVM's avatar
YACOVM committed
700
701
		}
		return true
YACOVM's avatar
YACOVM committed
702
703
	}

YACOVM's avatar
YACOVM committed
704
	membershipTime := time.Now()
705
	waitUntilOrFail(t, checkPeersMembership(t, peers, n))
YACOVM's avatar
YACOVM committed
706
	t.Log("Membership establishment took", time.Since(membershipTime))
YACOVM's avatar
YACOVM committed
707

708
	for i := 2; i <= msgsCount2Send+1; i++ {
709
		boot.Gossip(createDataMsg(uint64(i), []byte{}, common.ChainID("A")))
YACOVM's avatar
YACOVM committed
710
711
	}

YACOVM's avatar
YACOVM committed
712
	t2 := time.Now()
YACOVM's avatar
YACOVM committed
713
	waitUntilOrFailBlocking(t, wg.Wait)
YACOVM's avatar
YACOVM committed
714
715
716
717
	t.Log("Block dissemination took", time.Since(t2))
	t2 = time.Now()
	waitUntilOrFail(t, metaDataUpdated)
	t.Log("Metadata dissemination took", time.Since(t2))
YACOVM's avatar
YACOVM committed
718
719
720
721

	for i := 0; i < n; i++ {
		assert.Equal(t, msgsCount2Send, receivedMessages[i])
	}
722
723
724
725
726
727
728
729
730

	//Sending leadership messages
	receivedLeadershipMessages := make([]int, n)
	wgLeadership := sync.WaitGroup{}
	wgLeadership.Add(n)
	for i := 1; i <= n; i++ {
		leadershipChan, _ := peers[i-1].Accept(acceptLeadershp, false)
		go func(index int, ch <-chan *proto.GossipMessage) {
			defer wgLeadership.Done()
731
732
733
734
			msg := <-ch
			if bytes.Equal(msg.Channel, common.ChainID("A")) {
				receivedLeadershipMessages[index]++
			}
735
736
737
738
739
740
741
		}(i-1, leadershipChan)
	}

	seqNum := 0
	incTime := uint64(time.Now().UnixNano())
	t3 := time.Now()

742
	leadershipMsg := createLeadershipMsg(true, common.ChainID("A"), incTime, uint64(seqNum), boot.(*gossipServiceImpl).conf.InternalEndpoint, boot.(*gossipServiceImpl).comm.GetPKIid())
743
744
745
746
747
748
749
750
751
	boot.Gossip(leadershipMsg)

	waitUntilOrFailBlocking(t, wgLeadership.Wait)
	t.Log("Leadership message dissemination took", time.Since(t3))

	for i := 0; i < n; i++ {
		assert.Equal(t, 1, receivedLeadershipMessages[i])
	}

YACOVM's avatar
YACOVM committed
752
	t.Log("Stopping peers")
YACOVM's avatar
YACOVM committed
753
754
755
756
757

	stop := func() {
		stopPeers(append(peers, boot))
	}

YACOVM's avatar
YACOVM committed
758
	stopTime := time.Now()
YACOVM's avatar
YACOVM committed
759
	waitUntilOrFailBlocking(t, stop)
YACOVM's avatar
YACOVM committed
760
761
	t.Log("Stop took", time.Since(stopTime))
	t.Log("Took", time.Since(t1))
YACOVM's avatar
YACOVM committed
762
	atomic.StoreInt32(&stopped, int32(1))
YACOVM's avatar
YACOVM committed
763
	fmt.Println("<<<TestDissemination>>>")
YACOVM's avatar
YACOVM committed
764
765
766
}

func TestMembershipConvergence(t *testing.T) {
YACOVM's avatar
YACOVM committed
767
	t.Parallel()
768
	defer testWG.Done()
769
	portPrefix := 2610
YACOVM's avatar
YACOVM committed
770
771
772
773
774
775
776
777
778
779
780
781
782
	// Scenario: Spawn 12 nodes and 3 bootstrap peers
	// but assign each node to its bootstrap peer group modulo 3.
	// Then:
	// 1) Check all groups know only themselves in the view and not others.
	// 2) Bring up a node that will connect to all bootstrap peers.
	// 3) Wait a few seconds and check that all views converged to a single one
	// 4) Kill that last node, wait a while and:
	// 4)a) Ensure all nodes consider it as dead
	// 4)b) Ensure all node still know each other

	t1 := time.Now()

	stopped := int32(0)
YACOVM's avatar
YACOVM committed
783
	go waitForTestCompletion(&stopped, t)
YACOVM's avatar
YACOVM committed
784
785
786
	boot0 := newGossipInstance(portPrefix, 0, 100)
	boot1 := newGossipInstance(portPrefix, 1, 100)
	boot2 := newGossipInstance(portPrefix, 2, 100)
YACOVM's avatar
YACOVM committed
787
788
789
790
791
792

	peers := []Gossip{boot0, boot1, boot2}
	// 0: {3, 6, 9, 12}
	// 1: {4, 7, 10, 13}
	// 2: {5, 8, 11, 14}
	for i := 3; i < 15; i++ {
YACOVM's avatar
YACOVM committed
793
		pI := newGossipInstance(portPrefix, i, 100, i%3)
YACOVM's avatar
YACOVM committed
794
795
796
		peers = append(peers, pI)
	}

797
	waitUntilOrFail(t, checkPeersMembership(t, peers, 4))
YACOVM's avatar
YACOVM committed
798
	t.Log("Sets of peers connected successfully")
YACOVM's avatar
YACOVM committed
799

YACOVM's avatar
YACOVM committed
800
	connectorPeer := newGossipInstance(portPrefix, 15, 100, 0, 1, 2)
YACOVM's avatar
YACOVM committed
801
802
803
804
	connectorPeer.UpdateMetadata([]byte("Connector"))

	fullKnowledge := func() bool {
		for i := 0; i < 15; i++ {
YACOVM's avatar
YACOVM committed
805
			if 15 != len(peers[i].Peers()) {
YACOVM's avatar
YACOVM committed
806
807
				return false
			}
808
			if "Connector" != string(metadataOfPeer(peers[i].Peers(), "localhost:2625")) {
YACOVM's avatar
YACOVM committed
809
810
811
812
813
814
				return false
			}
		}
		return true
	}

YACOVM's avatar
YACOVM committed
815
	waitUntilOrFail(t, fullKnowledge)
YACOVM's avatar
YACOVM committed
816

YACOVM's avatar
YACOVM committed
817
	t.Log("Stopping connector...")
YACOVM's avatar
YACOVM committed
818
	waitUntilOrFailBlocking(t, connectorPeer.Stop)
YACOVM's avatar
YACOVM committed
819
	t.Log("Stopped")
YACOVM's avatar
YACOVM committed
820
821
822
823
	time.Sleep(time.Duration(15) * time.Second)

	ensureForget := func() bool {
		for i := 0; i < 15; i++ {
YACOVM's avatar
YACOVM committed
824
			if 14 != len(peers[i].Peers()) {
YACOVM's avatar
YACOVM committed
825
826
827
828
829
830
				return false
			}
		}
		return true
	}

YACOVM's avatar
YACOVM committed
831
	waitUntilOrFail(t, ensureForget)
YACOVM's avatar
YACOVM committed
832

YACOVM's avatar
YACOVM committed
833
834
835
	connectorPeer = newGossipInstance(portPrefix, 15, 100)
	connectorPeer.UpdateMetadata([]byte("Connector2"))
	t.Log("Started connector")
YACOVM's avatar
YACOVM committed
836
837
838

	ensureResync := func() bool {
		for i := 0; i < 15; i++ {
YACOVM's avatar
YACOVM committed
839
			if 15 != len(peers[i].Peers()) {
YACOVM's avatar
YACOVM committed
840
841
				return false
			}
842
			if "Connector2" != string(metadataOfPeer(peers[i].Peers(), "localhost:2625")) {
YACOVM's avatar
YACOVM committed
843
844
845
846
847
848
				return false
			}
		}
		return true
	}

YACOVM's avatar
YACOVM committed
849
	waitUntilOrFail(t, ensureResync)
YACOVM's avatar
YACOVM committed
850

YACOVM's avatar
YACOVM committed
851
	waitUntilOrFailBlocking(t, connectorPeer.Stop)
YACOVM's avatar
YACOVM committed
852

YACOVM's avatar
YACOVM committed
853
	t.Log("Stopping peers")
YACOVM's avatar
YACOVM committed
854
855
856
857
	stop := func() {
		stopPeers(peers)
	}

YACOVM's avatar
YACOVM committed
858
	waitUntilOrFailBlocking(t, stop)
YACOVM's avatar
YACOVM committed
859
	atomic.StoreInt32(&stopped, int32(1))
YACOVM's avatar
YACOVM committed
860
861
862
863
	t.Log("Took", time.Since(t1))
	fmt.Println("<<<TestMembershipConvergence>>>")
}

864
865
func TestMembershipRequestSpoofing(t *testing.T) {
	t.Parallel()
866
	defer testWG.Done()
867
868
869
870
871
	// Scenario: g1, g2, g3 are peers, and g2 is malicious, and wants
	// to impersonate g3 when sending a membership request to g1.
	// Expected output: g1 should *NOT* respond to g2,
	// However, g1 should respond to g3 when it sends the message itself.

872
	portPrefix := 2000
873
874
875
876
877
878
879
880
881
882
883
884
885
	g1 := newGossipInstance(portPrefix, 0, 100)
	g2 := newGossipInstance(portPrefix, 1, 100, 2)
	g3 := newGossipInstance(portPrefix, 2, 100, 1)
	defer g1.Stop()
	defer g2.Stop()
	defer g3.Stop()

	// Wait for g2 and g3 to know about each other
	waitUntilOrFail(t, checkPeersMembership(t, []Gossip{g2, g3}, 1))
	// Obtain an alive message from p3
	_, aliveMsgChan := g2.Accept(func(o interface{}) bool {
		msg := o.(proto.ReceivedMessage).GetGossipMessage()
		// Make sure we get an AliveMessage and it's about g3
886
		return msg.IsAliveMsg() && bytes.Equal(msg.GetAliveMsg().Membership.PkiId, []byte("localhost:2002"))
887
888
889
890
891
892
	}, true)
	aliveMsg := <-aliveMsgChan

	// Obtain channel for messages from g1 to g2
	_, g1ToG2 := g2.Accept(func(o interface{}) bool {
		connInfo := o.(proto.ReceivedMessage).GetConnectionInfo()
893
		return bytes.Equal([]byte("localhost:2000"), connInfo.ID)
894
895
896
897
898
	}, true)

	// Obtain channel for messages from g1 to g3
	_, g1ToG3 := g3.Accept(func(o interface{}) bool {
		connInfo := o.(proto.ReceivedMessage).GetConnectionInfo()
899
		return bytes.Equal([]byte("localhost:2000"), connInfo.ID)
900
901
902
903
	}, true)

	// Now, create a membership request message
	memRequestSpoofFactory := func(aliveMsgEnv *proto.Envelope) *proto.SignedGossipMessage {
904
		sMsg, _ := (&proto.GossipMessage{
905
906
907
908
909
910
911
912
913
			Tag:   proto.GossipMessage_EMPTY,
			Nonce: uint64(0),
			Content: &proto.GossipMessage_MemReq{
				MemReq: &proto.MembershipRequest{
					SelfInformation: aliveMsgEnv,
					Known:           [][]byte{},
				},
			},
		}).NoopSign()
914
		return sMsg
915
916
	}
	spoofedMemReq := memRequestSpoofFactory(aliveMsg.GetSourceEnvelope())
917
	g2.Send(spoofedMemReq.GossipMessage, &comm.RemotePeer{Endpoint: "localhost:2000", PKIID: common.PKIidType("localhost:2000")})
918
919
920
921
922
923
924
925
	select {
	case <-time.After(time.Second):
		break
	case <-g1ToG2:
		assert.Fail(t, "Received response from g1 but shouldn't have")
	}

	// Now send the same message from g3 to g1
926
	g3.Send(spoofedMemReq.GossipMessage, &comm.RemotePeer{Endpoint: "localhost:2000", PKIID: common.PKIidType("localhost:2000")})
927
928
929
930
931
932
933
934
	select {
	case <-time.After(time.Second):
		assert.Fail(t, "Didn't receive a message back from g1 on time")
	case <-g1ToG3:
		break
	}
}

YACOVM's avatar
YACOVM committed
935
936
func TestDataLeakage(t *testing.T) {
	t.Parallel()
937
	defer testWG.Done()
938
	portPrefix := 1610
YACOVM's avatar
YACOVM committed
939
940
941
	// Scenario: spawn some nodes and let them all
	// establish full membership.
	// Then, have half be in channel A and half be in channel B.
942
943
944
945
	// However, make it so that only the first 3 from each channel
	// are eligible to obtain blocks from the channels they're in.
	// Ensure nodes only get messages of their channels and in case they
	// are eligible for the channels.
YACOVM's avatar
YACOVM committed
946
947

	totalPeers := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} // THIS MUST BE EVEN AND NOT ODD
948
949
950
951
952
953
	// Peer0 and Peer5 disseminate blocks
	// only 1,2 and 6,7 should get blocks.

	mcs := &naiveCryptoService{
		allowedPkiIDS: map[string]struct{}{
			// Channel A
954
955
956
			"localhost:1610": {},
			"localhost:1611": {},
			"localhost:1612": {},
957
			// Channel B
958
959
960
			"localhost:1615": {},
			"localhost:1616": {},
			"localhost:1617": {},
961
962
		},
	}
YACOVM's avatar
YACOVM committed
963
964
965
966
967
968
969
970
971
972
973
974

	stopped := int32(0)
	go waitForTestCompletion(&stopped, t)

	n := len(totalPeers)
	peers := make([]Gossip, n)
	wg := sync.WaitGroup{}
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			totPeers := append([]int(nil), totalPeers[:i]...)
			bootPeers := append(totPeers, totalPeers[i+1:]...)
975
			peers[i] = newGossipInstanceWithCustomMCS(portPrefix, i, 100, mcs, bootPeers...)
YACOVM's avatar
YACOVM committed
976
977
978
979
980
			wg.Done()
		}(i)
	}

	waitUntilOrFailBlocking(t, wg.Wait)
981
	waitUntilOrFail(t, checkPeersMembership(t, peers, n-1))
YACOVM's avatar
YACOVM committed
982
983
984

	channels := []common.ChainID{common.ChainID("A"), common.ChainID("B")}

985
	height := uint64(1)
986

YACOVM's avatar
YACOVM committed
987
988
989
990
	for i, channel := range channels {
		for j := 0; j < (n / 2); j++ {
			instanceIndex := (n/2)*i + j
			peers[instanceIndex].JoinChan(&joinChanMsg{}, channel)
991
992
			if i != 0 {
				height = uint64(2)
993
			}
994
			peers[instanceIndex].UpdateLedgerHeight(height, channel)
YACOVM's avatar
YACOVM committed
995
996
997
998
			t.Log(instanceIndex, "joined", string(channel))
		}
	}

999
	// Wait until all peers have other peers in the per-channel view
YACOVM's avatar
YACOVM committed
1000
	seeChannelMetadata := func() bool {
For faster browsing, not all history is shown. View entire blame