gossip_impl.go 38.7 KB
Newer Older
YACOVM's avatar
YACOVM committed
1
/*
2
Copyright IBM Corp. All Rights Reserved.
YACOVM's avatar
YACOVM committed
3

4
SPDX-License-Identifier: Apache-2.0
YACOVM's avatar
YACOVM committed
5
6
7
8
9
10
*/

package gossip

import (
	"bytes"
11
	"crypto/tls"
12
	"errors"
13
	"fmt"
14
	"reflect"
YACOVM's avatar
YACOVM committed
15
16
17
18
	"sync"
	"sync/atomic"
	"time"

19
	"github.com/hyperledger/fabric/gossip/api"
YACOVM's avatar
YACOVM committed
20
	"github.com/hyperledger/fabric/gossip/comm"
Artem Barger's avatar
Artem Barger committed
21
	"github.com/hyperledger/fabric/gossip/common"
YACOVM's avatar
YACOVM committed
22
	"github.com/hyperledger/fabric/gossip/discovery"
YACOVM's avatar
YACOVM committed
23
24
	"github.com/hyperledger/fabric/gossip/filter"
	"github.com/hyperledger/fabric/gossip/gossip/channel"
25
	"github.com/hyperledger/fabric/gossip/gossip/msgstore"
26
	"github.com/hyperledger/fabric/gossip/gossip/pull"
YACOVM's avatar
YACOVM committed
27
	"github.com/hyperledger/fabric/gossip/identity"
YACOVM's avatar
YACOVM committed
28
	"github.com/hyperledger/fabric/gossip/util"
29
	proto "github.com/hyperledger/fabric/protos/gossip"
30
	"github.com/op/go-logging"
YACOVM's avatar
YACOVM committed
31
	"google.golang.org/grpc"
YACOVM's avatar
YACOVM committed
32
33
)

34
35
36
37
38
const (
	presumedDeadChanSize = 100
	acceptChanSize       = 100
)

YACOVM's avatar
YACOVM committed
39
40
41
42
43
var (
	identityExpirationCheckInterval = time.Hour * 24
	identityInactivityCheckInterval = time.Minute * 10
)

YACOVM's avatar
YACOVM committed
44
45
type channelRoutingFilterFactory func(channel.GossipChannel) filter.RoutingFilter

YACOVM's avatar
YACOVM committed
46
type gossipServiceImpl struct {
YACOVM's avatar
YACOVM committed
47
48
49
50
51
52
53
	selfIdentity          api.PeerIdentityType
	includeIdentityPeriod time.Time
	certStore             *certStore
	idMapper              identity.Mapper
	presumedDead          chan common.PKIidType
	disc                  discovery.Discovery
	comm                  comm.Comm
YACOVM's avatar
YACOVM committed
54
55
	incTime               time.Time
	selfOrg               api.OrgIdentityType
YACOVM's avatar
YACOVM committed
56
	*comm.ChannelDeMultiplexer
57
	logger            *logging.Logger
YACOVM's avatar
YACOVM committed
58
59
60
61
62
63
64
65
66
67
68
	stopSignal        *sync.WaitGroup
	conf              *Config
	toDieChan         chan struct{}
	stopFlag          int32
	emitter           batchingEmitter
	discAdapter       *discoveryAdapter
	secAdvisor        api.SecurityAdvisor
	chanState         *channelState
	disSecAdap        *discoverySecurityAdapter
	mcs               api.MessageCryptoService
	stateInfoMsgStore msgstore.MessageStore
YACOVM's avatar
YACOVM committed
69
70
}

YACOVM's avatar
YACOVM committed
71
// NewGossipService creates a gossip instance attached to a gRPC server
72
73
74
75
func NewGossipService(conf *Config, s *grpc.Server, secAdvisor api.SecurityAdvisor,
	mcs api.MessageCryptoService, idMapper identity.Mapper, selfIdentity api.PeerIdentityType,
	secureDialOpts api.PeerSecureDialOpts) Gossip {

YACOVM's avatar
YACOVM committed
76
77
	var c comm.Comm
	var err error
78

YACOVM's avatar
YACOVM committed
79
	lgr := util.GetLogger(util.LoggingGossipModule, conf.ID)
YACOVM's avatar
YACOVM committed
80
	if s == nil {
81
		c, err = createCommWithServer(conf.BindPort, idMapper, selfIdentity, secureDialOpts)
YACOVM's avatar
YACOVM committed
82
	} else {
83
		c, err = createCommWithoutServer(s, conf.TLSServerCert, idMapper, selfIdentity, secureDialOpts)
YACOVM's avatar
YACOVM committed
84
85
86
87
88
89
90
	}

	if err != nil {
		lgr.Error("Failed instntiating communication layer:", err)
		return nil
	}

YACOVM's avatar
YACOVM committed
91
	g := &gossipServiceImpl{
YACOVM's avatar
YACOVM committed
92
93
		selfOrg:               secAdvisor.OrgByPeerIdentity(selfIdentity),
		secAdvisor:            secAdvisor,
YACOVM's avatar
YACOVM committed
94
95
96
97
98
99
100
101
102
103
104
105
106
		selfIdentity:          selfIdentity,
		presumedDead:          make(chan common.PKIidType, presumedDeadChanSize),
		idMapper:              idMapper,
		disc:                  nil,
		mcs:                   mcs,
		comm:                  c,
		conf:                  conf,
		ChannelDeMultiplexer:  comm.NewChannelDemultiplexer(),
		logger:                lgr,
		toDieChan:             make(chan struct{}, 1),
		stopFlag:              int32(0),
		stopSignal:            &sync.WaitGroup{},
		includeIdentityPeriod: time.Now().Add(conf.PublishCertPeriod),
YACOVM's avatar
YACOVM committed
107
	}
108
	g.stateInfoMsgStore = g.newStateInfoMsgStore()
YACOVM's avatar
YACOVM committed
109

YACOVM's avatar
YACOVM committed
110
	g.chanState = newChannelState(g)
YACOVM's avatar
YACOVM committed
111
112
113
114
	g.emitter = newBatchingEmitter(conf.PropagateIterations,
		conf.MaxPropagationBurstSize, conf.MaxPropagationBurstLatency,
		g.sendGossipBatch)

115
116
	g.discAdapter = g.newDiscoveryAdapter()
	g.disSecAdap = g.newDiscoverySecurityAdapter()
117
	g.disc = discovery.NewDiscoveryService(g.selfNetworkMember(), g.discAdapter, g.disSecAdap, g.disclosurePolicy)
118
	g.logger.Info("Creating gossip service with self membership of", g.selfNetworkMember())
YACOVM's avatar
YACOVM committed
119

YACOVM's avatar
YACOVM committed
120
	g.certStore = newCertStore(g.createCertStorePuller(), idMapper, selfIdentity, mcs)
121

122
123
124
125
	if g.conf.ExternalEndpoint == "" {
		g.logger.Warning("External endpoint is empty, peer will not be accessible outside of its organization")
	}

YACOVM's avatar
YACOVM committed
126
	go g.start()
YACOVM's avatar
YACOVM committed
127
	go g.periodicalIdentityValidationAndExpiration()
128
	go g.connect2BootstrapPeers()
YACOVM's avatar
YACOVM committed
129
130
131
132

	return g
}

133
134
135
136
137
138
139
140
141
142
func (g *gossipServiceImpl) newStateInfoMsgStore() msgstore.MessageStore {
	pol := proto.NewGossipMessageComparator(0)
	return msgstore.NewMessageStoreExpirable(pol,
		msgstore.Noop,
		g.conf.PublishStateInfoInterval*100,
		nil,
		nil,
		msgstore.Noop)
}

143
144
145
146
147
func (g *gossipServiceImpl) selfNetworkMember() discovery.NetworkMember {
	self := discovery.NetworkMember{
		Endpoint:         g.conf.ExternalEndpoint,
		PKIid:            g.comm.GetPKIid(),
		Metadata:         []byte{},
148
		InternalEndpoint: g.conf.InternalEndpoint,
149
	}
150
151
152
	if g.disc != nil {
		self.Metadata = g.disc.Self().Metadata
	}
153
154
155
	return self
}

YACOVM's avatar
YACOVM committed
156
157
158
159
160
func newChannelState(g *gossipServiceImpl) *channelState {
	return &channelState{
		stopping: int32(0),
		channels: make(map[string]channel.GossipChannel),
		g:        g,
161
	}
YACOVM's avatar
YACOVM committed
162
163
}

164
165
166
func createCommWithoutServer(s *grpc.Server, cert *tls.Certificate, idStore identity.Mapper,
	identity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts) (comm.Comm, error) {
	return comm.NewCommInstance(s, cert, idStore, identity, secureDialOpts)
167
168
}

YACOVM's avatar
YACOVM committed
169
// NewGossipServiceWithServer creates a new gossip instance with a gRPC server
170
171
172
func NewGossipServiceWithServer(conf *Config, secAdvisor api.SecurityAdvisor, mcs api.MessageCryptoService,
	mapper identity.Mapper, identity api.PeerIdentityType, secureDialOpts api.PeerSecureDialOpts) Gossip {
	return NewGossipService(conf, nil, secAdvisor, mcs, mapper, identity, secureDialOpts)
YACOVM's avatar
YACOVM committed
173
174
}

175
176
177
func createCommWithServer(port int, idStore identity.Mapper, identity api.PeerIdentityType,
	secureDialOpts api.PeerSecureDialOpts) (comm.Comm, error) {
	return comm.NewCommInstanceWithServer(port, idStore, identity, secureDialOpts)
YACOVM's avatar
YACOVM committed
178
179
}

YACOVM's avatar
YACOVM committed
180
181
182
183
func (g *gossipServiceImpl) toDie() bool {
	return atomic.LoadInt32(&g.stopFlag) == int32(1)
}

YACOVM's avatar
YACOVM committed
184
func (g *gossipServiceImpl) JoinChan(joinMsg api.JoinChannelMessage, chainID common.ChainID) {
185
	// joinMsg is supposed to have been already verified
YACOVM's avatar
YACOVM committed
186
	g.chanState.joinChannel(joinMsg, chainID)
187

188
189
190
191
192
	for _, org := range joinMsg.Members() {
		g.learnAnchorPeers(org, joinMsg.AnchorPeersOf(org))
	}
}

193
194
195
196
197
198
199
200
// SuspectPeers makes the gossip instance validate identities of suspected peers, and close
// any connections to peers with identities that are found invalid
func (g *gossipServiceImpl) SuspectPeers(isSuspected api.PeerSuspector) {
	for _, pkiID := range g.certStore.listRevokedPeers(isSuspected) {
		g.comm.CloseConn(&comm.RemotePeer{PKIID: pkiID})
	}
}

YACOVM's avatar
YACOVM committed
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
func (g *gossipServiceImpl) periodicalIdentityValidationAndExpiration() {
	// We check once every identityExpirationCheckInterval for identities that have been expired
	go g.periodicalIdentityValidation(func(identity api.PeerIdentityType) bool {
		// We need to validate every identity to check if it has been expired
		return true
	}, identityExpirationCheckInterval)

	// We check once every identityInactivityCheckInterval for identities that have not been used for a long time
	go g.periodicalIdentityValidation(func(identity api.PeerIdentityType) bool {
		// We don't validate any identity, because we just want to know whether
		// it has not been used for a long time
		return false
	}, identityInactivityCheckInterval)
}

func (g *gossipServiceImpl) periodicalIdentityValidation(suspectFunc api.PeerSuspector, interval time.Duration) {
	for {
		select {
		case s := <-g.toDieChan:
			g.toDieChan <- s
			return
		case <-time.After(interval):
			g.SuspectPeers(suspectFunc)
		}
	}
}

228
229
func (g *gossipServiceImpl) learnAnchorPeers(orgOfAnchorPeers api.OrgIdentityType, anchorPeers []api.AnchorPeer) {
	for _, ap := range anchorPeers {
230
231
		if ap.Host == "" {
			g.logger.Warning("Got empty hostname, skipping connecting to anchor peer", ap)
232
			continue
233
234
235
		}
		if ap.Port == 0 {
			g.logger.Warning("Got invalid port (0), skipping connecting to anchor peer", ap)
236
			continue
237
		}
YACOVM's avatar
YACOVM committed
238
		endpoint := fmt.Sprintf("%s:%d", ap.Host, ap.Port)
239
		// Skip connecting to self
240
		if g.selfNetworkMember().Endpoint == endpoint || g.selfNetworkMember().InternalEndpoint == endpoint {
YACOVM's avatar
YACOVM committed
241
			g.logger.Info("Anchor peer with same endpoint, skipping connecting to myself")
242
243
			continue
		}
YACOVM's avatar
YACOVM committed
244

245
		inOurOrg := bytes.Equal(g.selfOrg, orgOfAnchorPeers)
246
		if !inOurOrg && g.selfNetworkMember().Endpoint == "" {
247
			g.logger.Infof("Anchor peer %s:%d isn't in our org(%v) and we have no external endpoint, skipping", ap.Host, ap.Port, string(orgOfAnchorPeers))
248
249
			continue
		}
250
		identifier := func() (*discovery.PeerIdentification, error) {
251
			remotePeerIdentity, err := g.comm.Handshake(&comm.RemotePeer{Endpoint: endpoint})
252
253
			if err != nil {
				g.logger.Warning("Deep probe of", endpoint, "failed:", err)
254
				return nil, err
255
			}
256
257
			isAnchorPeerInMyOrg := bytes.Equal(g.selfOrg, g.secAdvisor.OrgByPeerIdentity(remotePeerIdentity))
			if bytes.Equal(orgOfAnchorPeers, g.selfOrg) && !isAnchorPeerInMyOrg {
258
259
260
				err := fmt.Sprintf("Anchor peer %s isn't in our org, but is claimed to be", endpoint)
				g.logger.Warning(err)
				return nil, errors.New(err)
261
			}
262
263
264
265
266
267
268
269
			pkiID := g.mcs.GetPKIidOfCert(remotePeerIdentity)
			if len(pkiID) == 0 {
				return nil, fmt.Errorf("Wasn't able to extract PKI-ID of remote peer with identity of %v", remotePeerIdentity)
			}
			return &discovery.PeerIdentification{
				ID:      pkiID,
				SelfOrg: isAnchorPeerInMyOrg,
			}, nil
270
271
		}

272
		g.disc.Connect(discovery.NetworkMember{
273
			InternalEndpoint: endpoint, Endpoint: endpoint}, identifier)
274
	}
YACOVM's avatar
YACOVM committed
275
276
}

YACOVM's avatar
YACOVM committed
277
278
279
280
281
282
283
284
285
286
func (g *gossipServiceImpl) handlePresumedDead() {
	defer g.logger.Debug("Exiting")
	g.stopSignal.Add(1)
	defer g.stopSignal.Done()
	for {
		select {
		case s := <-g.toDieChan:
			g.toDieChan <- s
			return
		case deadEndpoint := <-g.comm.PresumedDead():
Artem Barger's avatar
Artem Barger committed
287
			g.presumedDead <- deadEndpoint
YACOVM's avatar
YACOVM committed
288
289
290
291
292
		}
	}
}

func (g *gossipServiceImpl) syncDiscovery() {
293
	g.logger.Debug("Entering discovery sync with interval", g.conf.PullInterval)
YACOVM's avatar
YACOVM committed
294
295
296
297
298
299
300
301
302
303
304
305
	defer g.logger.Debug("Exiting discovery sync loop")
	for !g.toDie() {
		g.disc.InitiateSync(g.conf.PullPeerNum)
		time.Sleep(g.conf.PullInterval)
	}
}

func (g *gossipServiceImpl) start() {
	go g.syncDiscovery()
	go g.handlePresumedDead()

	msgSelector := func(msg interface{}) bool {
306
		gMsg, isGossipMsg := msg.(proto.ReceivedMessage)
YACOVM's avatar
YACOVM committed
307
308
309
310
311
312
313
		if !isGossipMsg {
			return false
		}

		isConn := gMsg.GetGossipMessage().GetConn() != nil
		isEmpty := gMsg.GetGossipMessage().GetEmpty() != nil

314
		return !(isConn || isEmpty)
YACOVM's avatar
YACOVM committed
315
316
317
318
319
320
321
322
323
	}

	incMsgs := g.comm.Accept(msgSelector)

	go g.acceptMessages(incMsgs)

	g.logger.Info("Gossip instance", g.conf.ID, "started")
}

324
func (g *gossipServiceImpl) acceptMessages(incMsgs <-chan proto.ReceivedMessage) {
YACOVM's avatar
YACOVM committed
325
326
327
328
329
330
331
332
333
334
335
336
337
338
	defer g.logger.Debug("Exiting")
	g.stopSignal.Add(1)
	defer g.stopSignal.Done()
	for {
		select {
		case s := <-g.toDieChan:
			g.toDieChan <- s
			return
		case msg := <-incMsgs:
			g.handleMessage(msg)
		}
	}
}

339
func (g *gossipServiceImpl) handleMessage(m proto.ReceivedMessage) {
YACOVM's avatar
YACOVM committed
340
341
342
	if g.toDie() {
		return
	}
YACOVM's avatar
YACOVM committed
343

YACOVM's avatar
YACOVM committed
344
	if m == nil || m.GetGossipMessage() == nil {
YACOVM's avatar
YACOVM committed
345
346
347
		return
	}

YACOVM's avatar
YACOVM committed
348
	msg := m.GetGossipMessage()
YACOVM's avatar
YACOVM committed
349

350
	g.logger.Debug("Entering,", m.GetConnectionInfo(), "sent us", msg)
YACOVM's avatar
YACOVM committed
351
352
353
354
355
356
357
358
	defer g.logger.Debug("Exiting")

	if !g.validateMsg(m) {
		g.logger.Warning("Message", msg, "isn't valid")
		return
	}

	if msg.IsChannelRestricted() {
359
360
361
		if gc := g.chanState.lookupChannelForMsg(m); gc == nil {
			// If we're not in the channel, we should still forward to peers of our org
			// in case it's a StateInfo message
362
			if g.isInMyorg(discovery.NetworkMember{PKIid: m.GetConnectionInfo().ID}) && msg.IsStateInfoMsg() {
YACOVM's avatar
YACOVM committed
363
364
365
				if g.stateInfoMsgStore.Add(msg) {
					g.emitter.Add(msg)
				}
YACOVM's avatar
YACOVM committed
366
			}
YACOVM's avatar
YACOVM committed
367
			if !g.toDie() {
YACOVM's avatar
YACOVM committed
368
				g.logger.Debug("No such channel", msg.Channel, "discarding message", msg)
YACOVM's avatar
YACOVM committed
369
			}
YACOVM's avatar
YACOVM committed
370
		} else {
371
372
373
374
375
376
			if m.GetGossipMessage().IsLeadershipMsg() {
				if err := g.validateLeadershipMessage(m.GetGossipMessage()); err != nil {
					g.logger.Warning("Failed validating LeaderElection message:", err)
					return
				}
			}
YACOVM's avatar
YACOVM committed
377
			gc.HandleMessage(m)
YACOVM's avatar
YACOVM committed
378
		}
YACOVM's avatar
YACOVM committed
379
380
		return
	}
YACOVM's avatar
YACOVM committed
381

YACOVM's avatar
YACOVM committed
382
	if selectOnlyDiscoveryMessages(m) {
383
384
385
386
387
388
389
390
391
392
393
394
		// It's a membership request, check its self information
		// matches the sender
		if m.GetGossipMessage().GetMemReq() != nil {
			sMsg, err := m.GetGossipMessage().GetMemReq().SelfInformation.ToGossipMessage()
			if err != nil {
				g.logger.Warning("Got membership request with invalid selfInfo:", err)
				return
			}
			if !sMsg.IsAliveMsg() {
				g.logger.Warning("Got membership request with selfInfo that isn't an AliveMessage")
				return
			}
395
			if !bytes.Equal(sMsg.GetAliveMsg().Membership.PkiId, m.GetConnectionInfo().ID) {
396
397
398
399
				g.logger.Warning("Got membership request with selfInfo that doesn't match the handshake")
				return
			}
		}
YACOVM's avatar
YACOVM committed
400
401
402
		g.forwardDiscoveryMsg(m)
	}

403
	if msg.IsPullMsg() && msg.GetPullMsgType() == proto.PullMsgType_IDENTITY_MSG {
YACOVM's avatar
YACOVM committed
404
405
406
407
		g.certStore.handleMessage(m)
	}
}

408
func (g *gossipServiceImpl) forwardDiscoveryMsg(msg proto.ReceivedMessage) {
YACOVM's avatar
YACOVM committed
409
410
411
	defer func() { // can be closed while shutting down
		recover()
	}()
412

YACOVM's avatar
YACOVM committed
413
414
	g.discAdapter.incChan <- msg.GetGossipMessage()
}
YACOVM's avatar
YACOVM committed
415

YACOVM's avatar
YACOVM committed
416
417
// validateMsg checks the signature of the message if exists,
// and also checks that the tag matches the message type
418
func (g *gossipServiceImpl) validateMsg(msg proto.ReceivedMessage) bool {
YACOVM's avatar
YACOVM committed
419
420
421
422
423
424
	if err := msg.GetGossipMessage().IsTagLegal(); err != nil {
		g.logger.Warning("Tag of", msg.GetGossipMessage(), "isn't legal:", err)
		return false
	}

	if msg.GetGossipMessage().IsAliveMsg() {
425
		if !g.disSecAdap.ValidateAliveMsg(msg.GetGossipMessage()) {
YACOVM's avatar
YACOVM committed
426
			return false
YACOVM's avatar
YACOVM committed
427
		}
YACOVM's avatar
YACOVM committed
428
	}
YACOVM's avatar
YACOVM committed
429

YACOVM's avatar
YACOVM committed
430
	if msg.GetGossipMessage().IsStateInfoMsg() {
431
		if err := g.validateStateInfoMsg(msg.GetGossipMessage()); err != nil {
YACOVM's avatar
YACOVM committed
432
433
			g.logger.Warning("StateInfo message", msg, "is found invalid:", err)
			return false
YACOVM's avatar
YACOVM committed
434
		}
YACOVM's avatar
YACOVM committed
435
	}
YACOVM's avatar
YACOVM committed
436
	return true
YACOVM's avatar
YACOVM committed
437
438
439
}

func (g *gossipServiceImpl) sendGossipBatch(a []interface{}) {
440
	msgs2Gossip := make([]*proto.SignedGossipMessage, len(a))
YACOVM's avatar
YACOVM committed
441
	for i, e := range a {
442
		msgs2Gossip[i] = e.(*proto.SignedGossipMessage)
YACOVM's avatar
YACOVM committed
443
	}
YACOVM's avatar
YACOVM committed
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
	g.gossipBatch(msgs2Gossip)
}

// gossipBatch - This is the method that actually decides to which peers to gossip the message
// batch we possess.
// For efficiency, we first isolate all the messages that have the same routing policy
// and send them together, and only after that move to the next group of messages.
// i.e: we send all blocks of channel C to the same group of peers,
// and send all StateInfo messages to the same group of peers, etc. etc.
// When we send blocks, we send only to peers that advertised themselves in the channel.
// When we send StateInfo messages, we send to peers in the channel.
// When we send messages that are marked to be sent only within the org, we send all of these messages
// to the same set of peers.
// The rest of the messages that have no restrictions on their destinations can be sent
// to any group of peers.
459
func (g *gossipServiceImpl) gossipBatch(msgs []*proto.SignedGossipMessage) {
YACOVM's avatar
YACOVM committed
460
461
462
463
	if g.disc == nil {
		g.logger.Error("Discovery has not been initialized yet, aborting!")
		return
	}
YACOVM's avatar
YACOVM committed
464

465
466
467
468
	var blocks []*proto.SignedGossipMessage
	var stateInfoMsgs []*proto.SignedGossipMessage
	var orgMsgs []*proto.SignedGossipMessage
	var leadershipMsgs []*proto.SignedGossipMessage
YACOVM's avatar
YACOVM committed
469
470

	isABlock := func(o interface{}) bool {
471
		return o.(*proto.SignedGossipMessage).IsDataMsg()
YACOVM's avatar
YACOVM committed
472
473
	}
	isAStateInfoMsg := func(o interface{}) bool {
474
		return o.(*proto.SignedGossipMessage).IsStateInfoMsg()
YACOVM's avatar
YACOVM committed
475
	}
476
477
478
479
480
481
482
483
	aliveMsgsWithNoEndpointAndInOurOrg := func(o interface{}) bool {
		msg := o.(*proto.SignedGossipMessage)
		if !msg.IsAliveMsg() {
			return false
		}
		member := msg.GetAliveMsg().Membership
		return member.Endpoint == "" && g.isInMyorg(discovery.NetworkMember{PKIid: member.PkiId})
	}
YACOVM's avatar
YACOVM committed
484
	isOrgRestricted := func(o interface{}) bool {
485
		return aliveMsgsWithNoEndpointAndInOurOrg(o) || o.(*proto.SignedGossipMessage).IsOrgRestricted()
YACOVM's avatar
YACOVM committed
486
	}
487
	isLeadershipMsg := func(o interface{}) bool {
488
		return o.(*proto.SignedGossipMessage).IsLeadershipMsg()
489
	}
YACOVM's avatar
YACOVM committed
490
491
492
493

	// Gossip blocks
	blocks, msgs = partitionMessages(isABlock, msgs)
	g.gossipInChan(blocks, func(gc channel.GossipChannel) filter.RoutingFilter {
494
		return filter.CombineRoutingFilters(gc.EligibleForChannel, gc.IsMemberInChan, g.isInMyorg)
YACOVM's avatar
YACOVM committed
495
496
	})

497
	// Gossip Leadership messages
498
499
	leadershipMsgs, msgs = partitionMessages(isLeadershipMsg, msgs)
	g.gossipInChan(leadershipMsgs, func(gc channel.GossipChannel) filter.RoutingFilter {
500
		return filter.CombineRoutingFilters(gc.EligibleForChannel, gc.IsMemberInChan, g.isInMyorg)
501
502
	})

503
504
505
506
507
	// Gossip StateInfo messages
	stateInfoMsgs, msgs = partitionMessages(isAStateInfoMsg, msgs)
	for _, stateInfMsg := range stateInfoMsgs {
		peerSelector := g.isInMyorg
		gc := g.chanState.lookupChannelForGossipMsg(stateInfMsg.GossipMessage)
508
		if gc != nil && g.hasExternalEndpoint(stateInfMsg.GossipMessage.GetStateInfo().PkiId) {
509
510
			peerSelector = gc.IsMemberInChan
		}
511

512
513
514
515
		peers2Send := filter.SelectPeers(g.conf.PropagatePeerNum, g.disc.GetMembership(), peerSelector)
		g.comm.Send(stateInfMsg, peers2Send...)
	}

YACOVM's avatar
YACOVM committed
516
517
518
519
520
521
522
523
	// Gossip messages restricted to our org
	orgMsgs, msgs = partitionMessages(isOrgRestricted, msgs)
	peers2Send := filter.SelectPeers(g.conf.PropagatePeerNum, g.disc.GetMembership(), g.isInMyorg)
	for _, msg := range orgMsgs {
		g.comm.Send(msg, peers2Send...)
	}

	// Finally, gossip the remaining messages
YACOVM's avatar
YACOVM committed
524
	for _, msg := range msgs {
525
526
527
528
529
530
		if !msg.IsAliveMsg() {
			g.logger.Error("Unknown message type", msg)
			continue
		}
		selectByOriginOrg := g.peersByOriginOrgPolicy(discovery.NetworkMember{PKIid: msg.GetAliveMsg().Membership.PkiId})
		peers2Send := filter.SelectPeers(g.conf.PropagatePeerNum, g.disc.GetMembership(), selectByOriginOrg)
531
532
533
534
535
536
537
538
539
		g.sendAndFilterSecrets(msg, peers2Send...)
	}
}

func (g *gossipServiceImpl) sendAndFilterSecrets(msg *proto.SignedGossipMessage, peers ...*comm.RemotePeer) {
	for _, peer := range peers {
		// Prevent forwarding alive messages of external organizations
		// to peers that have no external endpoints
		aliveMsgFromDiffOrg := msg.IsAliveMsg() && !g.isInMyorg(discovery.NetworkMember{PKIid: msg.GetAliveMsg().Membership.PkiId})
540
		if aliveMsgFromDiffOrg && !g.hasExternalEndpoint(peer.PKIID) {
541
542
543
544
545
546
547
548
			continue
		}
		// Don't gossip secrets
		if !g.isInMyorg(discovery.NetworkMember{PKIid: peer.PKIID}) {
			msg.Envelope.SecretEnvelope = nil
		}

		g.comm.Send(msg, peer)
YACOVM's avatar
YACOVM committed
549
550
551
	}
}

YACOVM's avatar
YACOVM committed
552
// gossipInChan gossips a given GossipMessage slice according to a channel's routing policy.
553
func (g *gossipServiceImpl) gossipInChan(messages []*proto.SignedGossipMessage, chanRoutingFactory channelRoutingFilterFactory) {
YACOVM's avatar
YACOVM committed
554
555
556
557
558
	if len(messages) == 0 {
		return
	}
	totalChannels := extractChannels(messages)
	var channel common.ChainID
559
	var messagesOfChannel []*proto.SignedGossipMessage
YACOVM's avatar
YACOVM committed
560
561
562
563
564
	for len(totalChannels) > 0 {
		// Take first channel
		channel, totalChannels = totalChannels[0], totalChannels[1:]
		// Extract all messages of that channel
		grabMsgs := func(o interface{}) bool {
565
			return bytes.Equal(o.(*proto.SignedGossipMessage).Channel, channel)
YACOVM's avatar
YACOVM committed
566
567
		}
		messagesOfChannel, messages = partitionMessages(grabMsgs, messages)
568
569
570
		if len(messagesOfChannel) == 0 {
			continue
		}
YACOVM's avatar
YACOVM committed
571
572
573
574
575
576
577
		// Grab channel object for that channel
		gc := g.chanState.getGossipChannelByChainID(channel)
		if gc == nil {
			g.logger.Warning("Channel", channel, "wasn't found")
			continue
		}
		// Select the peers to send the messages to
578
579
		// For leadership messages we will select all peers that pass routing factory - e.g. all peers in channel and org
		membership := g.disc.GetMembership()
580
581
582
583
584
585
586
		var peers2Send []*comm.RemotePeer
		if messagesOfChannel[0].IsLeadershipMsg() {
			peers2Send = filter.SelectPeers(len(membership), membership, chanRoutingFactory(gc))
		} else {
			peers2Send = filter.SelectPeers(g.conf.PropagatePeerNum, membership, chanRoutingFactory(gc))
		}

YACOVM's avatar
YACOVM committed
587
588
		// Send the messages to the remote peers
		for _, msg := range messagesOfChannel {
589
			g.comm.Send(msg, peers2Send...)
YACOVM's avatar
YACOVM committed
590
591
592
593
		}
	}
}

YACOVM's avatar
YACOVM committed
594
// Gossip sends a message to other peers to the network
YACOVM's avatar
YACOVM committed
595
func (g *gossipServiceImpl) Gossip(msg *proto.GossipMessage) {
YACOVM's avatar
YACOVM committed
596
597
598
599
600
601
	// Educate developers to Gossip messages with the right tags.
	// See IsTagLegal() for wanted behavior.
	if err := msg.IsTagLegal(); err != nil {
		panic(err)
	}

602
603
604
	sMsg := &proto.SignedGossipMessage{
		GossipMessage: msg,
	}
605

606
607
608
609
610
611
612
613
614
	var err error
	if sMsg.IsDataMsg() {
		sMsg, err = sMsg.NoopSign()
	} else {
		_, err = sMsg.Sign(func(msg []byte) ([]byte, error) {
			return g.mcs.Sign(msg)
		})
	}

615
616
617
618
	if err != nil {
		g.logger.Warning("Failed signing message:", err)
		return
	}
619

YACOVM's avatar
YACOVM committed
620
621
622
623
624
625
626
	if msg.IsChannelRestricted() {
		gc := g.chanState.getGossipChannelByChainID(msg.Channel)
		if gc == nil {
			g.logger.Warning("Failed obtaining gossipChannel of", msg.Channel, "aborting")
			return
		}
		if msg.IsDataMsg() {
627
			gc.AddToMsgStore(sMsg)
YACOVM's avatar
YACOVM committed
628
629
630
631
632
		}
	}

	if g.conf.PropagateIterations == 0 {
		return
YACOVM's avatar
YACOVM committed
633
	}
634
	g.emitter.Add(sMsg)
YACOVM's avatar
YACOVM committed
635
636
}

YACOVM's avatar
YACOVM committed
637
638
// Send sends a message to remote peers
func (g *gossipServiceImpl) Send(msg *proto.GossipMessage, peers ...*comm.RemotePeer) {
639
640
641
642
643
644
	m, err := msg.NoopSign()
	if err != nil {
		g.logger.Warning("Failed creating SignedGossipMessage:", err)
		return
	}
	g.comm.Send(m, peers...)
YACOVM's avatar
YACOVM committed
645
646
647
}

// GetPeers returns a mapping of endpoint --> []discovery.NetworkMember
YACOVM's avatar
YACOVM committed
648
func (g *gossipServiceImpl) Peers() []discovery.NetworkMember {
649
	return g.disc.GetMembership()
YACOVM's avatar
YACOVM committed
650
651
652
653
654
655
656
}

// PeersOfChannel returns the NetworkMembers considered alive
// and also subscribed to the channel given
func (g *gossipServiceImpl) PeersOfChannel(channel common.ChainID) []discovery.NetworkMember {
	gc := g.chanState.getGossipChannelByChainID(channel)
	if gc == nil {
YACOVM's avatar
YACOVM committed
657
		g.logger.Debug("No such channel", channel)
YACOVM's avatar
YACOVM committed
658
659
660
661
		return nil
	}

	return gc.GetPeers()
YACOVM's avatar
YACOVM committed
662
663
}

YACOVM's avatar
YACOVM committed
664
// Stop stops the gossip component
YACOVM's avatar
YACOVM committed
665
666
667
668
func (g *gossipServiceImpl) Stop() {
	if g.toDie() {
		return
	}
669
	atomic.StoreInt32(&g.stopFlag, int32(1))
YACOVM's avatar
YACOVM committed
670
	g.logger.Info("Stopping gossip")
671
672
673
674
675
676
	comWG := sync.WaitGroup{}
	comWG.Add(1)
	go func() {
		defer comWG.Done()
		g.comm.Stop()
	}()
YACOVM's avatar
YACOVM committed
677
	g.chanState.stop()
YACOVM's avatar
YACOVM committed
678
	g.discAdapter.close()
679
	g.disc.Stop()
680
	g.certStore.stop()
YACOVM's avatar
YACOVM committed
681
682
683
	g.toDieChan <- struct{}{}
	g.emitter.Stop()
	g.ChannelDeMultiplexer.Close()
684
	g.stateInfoMsgStore.Stop()
YACOVM's avatar
YACOVM committed
685
	g.stopSignal.Wait()
686
	comWG.Wait()
YACOVM's avatar
YACOVM committed
687
688
689
690
691
692
}

func (g *gossipServiceImpl) UpdateMetadata(md []byte) {
	g.disc.UpdateMetadata(md)
}

YACOVM's avatar
YACOVM committed
693
694
695
696
697
// UpdateChannelMetadata updates the self metadata the peer
// publishes to other peers about its channel-related state
func (g *gossipServiceImpl) UpdateChannelMetadata(md []byte, chainID common.ChainID) {
	gc := g.chanState.getGossipChannelByChainID(chainID)
	if gc == nil {
YACOVM's avatar
YACOVM committed
698
		g.logger.Debug("No such channel", chainID)
YACOVM's avatar
YACOVM committed
699
700
701
702
703
704
705
706
707
708
		return
	}
	stateInfMsg, err := g.createStateInfoMsg(md, chainID)
	if err != nil {
		g.logger.Error("Failed creating StateInfo message")
		return
	}
	gc.UpdateStateInfo(stateInfMsg)
}

YACOVM's avatar
YACOVM committed
709
710
711
712
// Accept returns a dedicated read-only channel for messages sent by other nodes that match a certain predicate.
// If passThrough is false, the messages are processed by the gossip layer beforehand.
// If passThrough is true, the gossip layer doesn't intervene and the messages
// can be used to send a reply back to the sender
713
func (g *gossipServiceImpl) Accept(acceptor common.MessageAcceptor, passThrough bool) (<-chan *proto.GossipMessage, <-chan proto.ReceivedMessage) {
YACOVM's avatar
YACOVM committed
714
715
716
	if passThrough {
		return nil, g.comm.Accept(acceptor)
	}
717
718
719
720
721
722
723
724
725
726
727
728
	acceptByType := func(o interface{}) bool {
		if o, isGossipMsg := o.(*proto.GossipMessage); isGossipMsg {
			return acceptor(o)
		}
		if o, isSignedMsg := o.(*proto.SignedGossipMessage); isSignedMsg {
			sMsg := o
			return acceptor(sMsg.GossipMessage)
		}
		g.logger.Warning("Message type:", reflect.TypeOf(o), "cannot be evaluated")
		return false
	}
	inCh := g.AddChannel(acceptByType)
729
	outCh := make(chan *proto.GossipMessage, acceptChanSize)
YACOVM's avatar
YACOVM committed
730
731
732
733
734
735
	go func() {
		for {
			select {
			case s := <-g.toDieChan:
				g.toDieChan <- s
				return
736
			case m := <-inCh:
YACOVM's avatar
YACOVM committed
737
738
739
				if m == nil {
					return
				}
740
				outCh <- m.(*proto.SignedGossipMessage).GossipMessage
YACOVM's avatar
YACOVM committed
741
742
743
			}
		}
	}()
YACOVM's avatar
YACOVM committed
744
	return outCh, nil
YACOVM's avatar
YACOVM committed
745
746
747
}

func selectOnlyDiscoveryMessages(m interface{}) bool {
748
	msg, isGossipMsg := m.(proto.ReceivedMessage)
YACOVM's avatar
YACOVM committed
749
750
751
752
753
754
755
756
757
758
759
760
	if !isGossipMsg {
		return false
	}
	alive := msg.GetGossipMessage().GetAliveMsg()
	memRes := msg.GetGossipMessage().GetMemRes()
	memReq := msg.GetGossipMessage().GetMemReq()

	selected := alive != nil || memReq != nil || memRes != nil

	return selected
}

761
func (g *gossipServiceImpl) newDiscoveryAdapter() *discoveryAdapter {
YACOVM's avatar
YACOVM committed
762
763
764
	return &discoveryAdapter{
		c:        g.comm,
		stopping: int32(0),
765
766
767
768
769
		gossipFunc: func(msg *proto.SignedGossipMessage) {
			if g.conf.PropagateIterations == 0 {
				return
			}
			g.emitter.Add(msg)
YACOVM's avatar
YACOVM committed
770
		},
771
772
773
		incChan:          make(chan *proto.SignedGossipMessage),
		presumedDead:     g.presumedDead,
		disclosurePolicy: g.disclosurePolicy,
YACOVM's avatar
YACOVM committed
774
775
776
777
778
779
	}
}

// discoveryAdapter is used to supply the discovery module with needed abilities
// that the comm interface in the discovery module declares
type discoveryAdapter struct {
780
781
782
783
784
785
	stopping         int32
	c                comm.Comm
	presumedDead     chan common.PKIidType
	incChan          chan *proto.SignedGossipMessage
	gossipFunc       func(message *proto.SignedGossipMessage)
	disclosurePolicy discovery.DisclosurePolicy
YACOVM's avatar
YACOVM committed
786
787
788
}

func (da *discoveryAdapter) close() {
789
	atomic.StoreInt32(&da.stopping, int32(1))
YACOVM's avatar
YACOVM committed
790
791
792
793
794
795
796
	close(da.incChan)
}

func (da *discoveryAdapter) toDie() bool {
	return atomic.LoadInt32(&da.stopping) == int32(1)
}

797
func (da *discoveryAdapter) Gossip(msg *proto.SignedGossipMessage) {
YACOVM's avatar
YACOVM committed
798
799
800
	if da.toDie() {
		return
	}
801

YACOVM's avatar
YACOVM committed
802
803
804
	da.gossipFunc(msg)
}

805
func (da *discoveryAdapter) SendToPeer(peer *discovery.NetworkMember, msg *proto.SignedGossipMessage) {
YACOVM's avatar
YACOVM committed
806
807
808
	if da.toDie() {
		return
	}
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
	// Check membership requests for peers that we know of their PKI-ID.
	// The only peers we don't know about their PKI-IDs are bootstrap peers.
	if memReq := msg.GetMemReq(); memReq != nil && len(peer.PKIid) != 0 {
		selfMsg, err := memReq.SelfInformation.ToGossipMessage()
		if err != nil {
			// Shouldn't happen
			panic("Tried to send a membership request with a malformed AliveMessage")
		}
		// Apply the EnvelopeFilter of the disclosure policy
		// on the alive message of the selfInfo field of the membership request
		_, omitConcealedFields := da.disclosurePolicy(peer)
		selfMsg.Envelope = omitConcealedFields(selfMsg)
		// Backup old known field
		oldKnown := memReq.Known
		// Override new SelfInfo message with updated envelope
		memReq = &proto.MembershipRequest{
			SelfInformation: selfMsg.Envelope,
			Known:           oldKnown,
		}
		// Update original message
		msg.Content = &proto.GossipMessage_MemReq{
			MemReq: memReq,
		}
		// Update the envelope of the outer message, no need to sign (point2point)
833
834
835
836
		msg, err = msg.NoopSign()
		if err != nil {
			return
		}
837
	}
838
	da.c.Send(msg, &comm.RemotePeer{PKIID: peer.PKIid, Endpoint: peer.PreferredEndpoint()})
YACOVM's avatar
YACOVM committed
839
840
841
}

func (da *discoveryAdapter) Ping(peer *discovery.NetworkMember) bool {
YACOVM's avatar
YACOVM committed
842
843
	err := da.c.Probe(&comm.RemotePeer{Endpoint: peer.PreferredEndpoint(), PKIID: peer.PKIid})
	return err == nil
YACOVM's avatar
YACOVM committed
844
845
}

846
func (da *discoveryAdapter) Accept() <-chan *proto.SignedGossipMessage {
YACOVM's avatar
YACOVM committed
847
848
849
	return da.incChan
}

Artem Barger's avatar
Artem Barger committed
850
func (da *discoveryAdapter) PresumedDead() <-chan common.PKIidType {
YACOVM's avatar
YACOVM committed
851
852
853
854
	return da.presumedDead
}

func (da *discoveryAdapter) CloseConn(peer *discovery.NetworkMember) {
855
	da.c.CloseConn(&comm.RemotePeer{PKIID: peer.PKIid})
YACOVM's avatar
YACOVM committed
856
857
}

YACOVM's avatar
YACOVM committed
858
type discoverySecurityAdapter struct {
859
860
861
862
863
864
865
	identity              api.PeerIdentityType
	includeIdentityPeriod time.Time
	idMapper              identity.Mapper
	sa                    api.SecurityAdvisor
	mcs                   api.MessageCryptoService
	c                     comm.Comm
	logger                *logging.Logger
YACOVM's avatar
YACOVM committed
866
867
}

868
func (g *gossipServiceImpl) newDiscoverySecurityAdapter() *discoverySecurityAdapter {
YACOVM's avatar
YACOVM committed
869
	return &discoverySecurityAdapter{
870
871
872
873
874
875
876
		sa:                    g.secAdvisor,
		idMapper:              g.idMapper,
		mcs:                   g.mcs,
		c:                     g.comm,
		logger:                g.logger,
		includeIdentityPeriod: g.includeIdentityPeriod,
		identity:              g.selfIdentity,
YACOVM's avatar
YACOVM committed
877
878
879
880
	}
}

// validateAliveMsg validates that an Alive message is authentic
881
func (sa *discoverySecurityAdapter) ValidateAliveMsg(m *proto.SignedGossipMessage) bool {
882
	am := m.GetAliveMsg()
883
	if am == nil || am.Membership == nil || am.Membership.PkiId == nil || !m.IsSigned() {
884
		sa.logger.Warning("Invalid alive message:", m)
YACOVM's avatar
YACOVM committed
885
886
887
888
889
		return false
	}

	var identity api.PeerIdentityType

YACOVM's avatar
YACOVM committed
890
	// If identity is included inside AliveMessage
YACOVM's avatar
YACOVM committed
891
892
	if am.Identity != nil {
		identity = api.PeerIdentityType(am.Identity)
893
		claimedPKIID := am.Membership.PkiId
894
		err := sa.idMapper.Put(claimedPKIID, identity)
YACOVM's avatar
YACOVM committed
895
896
897
898
899
		if err != nil {
			sa.logger.Warning("Failed validating identity of", am, "reason:", err)
			return false
		}
	} else {
900
		identity, _ = sa.idMapper.Get(am.Membership.PkiId)
YACOVM's avatar
YACOVM committed
901
		if identity != nil {
902
			sa.logger.Debug("Fetched identity of", am.Membership.PkiId, "from identity store")
YACOVM's avatar
YACOVM committed
903
904
905
906
		}
	}

	if identity == nil {
907
		sa.logger.Debug("Don't have certificate for", am)
YACOVM's avatar
YACOVM committed
908
909
910
		return false
	}

911
	return sa.validateAliveMsgSignature(m, identity)
YACOVM's avatar
YACOVM committed
912
913
914
}

// SignMessage signs an AliveMessage and updates its signature field
915
func (sa *discoverySecurityAdapter) SignMessage(m *proto.GossipMessage, internalEndpoint string) *proto.Envelope {
916
917
918
	signer := func(msg []byte) ([]byte, error) {
		return sa.mcs.Sign(msg)
	}
919
920
	if m.IsAliveMsg() && time.Now().Before(sa.includeIdentityPeriod) {
		m.GetAliveMsg().Identity = sa.identity
YACOVM's avatar
YACOVM committed
921
	}
922
923
924
	sMsg := &proto.SignedGossipMessage{
		GossipMessage: m,
	}
925
926
927
928
929
930
	e, err := sMsg.Sign(signer)
	if err != nil {
		sa.logger.Warning("Failed signing message:", err)
		return nil
	}

931
932
933
	if internalEndpoint == "" {
		return e
	}
934
935
936
937
938
	e.SignSecret(signer, &proto.Secret{
		Content: &proto.Secret_InternalEndpoint{
			InternalEndpoint: internalEndpoint,
		},
	})
939
	return e
YACOVM's avatar
YACOVM committed
940
941
}

942
func (sa *discoverySecurityAdapter) validateAliveMsgSignature(m *proto.SignedGossipMessage, identity api.PeerIdentityType) bool {
943
944
945
946
947
948
949
950
951
952
953
954
	am := m.GetAliveMsg()
	// At this point we got the certificate of the peer, proceed to verifying the AliveMessage
	verifier := func(peerIdentity []byte, signature, message []byte) error {
		return sa.mcs.Verify(api.PeerIdentityType(peerIdentity), signature, message)
	}

	// We verify the signature on the message
	err := m.Verify(identity, verifier)
	if err != nil {
		sa.logger.Warning("Failed verifying:", am, ":", err)
		return false
	}
955

956
	return true
YACOVM's avatar
YACOVM committed
957
}
YACOVM's avatar
YACOVM committed
958
959

func (g *gossipServiceImpl) createCertStorePuller() pull.Mediator {
960
	conf := pull.Config{
961
		MsgType:           proto.PullMsgType_IDENTITY_MSG,
YACOVM's avatar
YACOVM committed
962
		Channel:           []byte(""),
963
		ID:                g.conf.InternalEndpoint,
YACOVM's avatar
YACOVM committed
964
965
966
967
		PeerCountToSelect: g.conf.PullPeerNum,
		PullInterval:      g.conf.PullInterval,
		Tag:               proto.GossipMessage_EMPTY,
	}
968
	pkiIDFromMsg := func(msg *proto.SignedGossipMessage) string {
YACOVM's avatar
YACOVM committed
969
		identityMsg := msg.GetPeerIdentity()
970
		if identityMsg == nil || identityMsg.PkiId == nil {
YACOVM's avatar
YACOVM committed
971
972
			return ""
		}
973
		return fmt.Sprintf("%s", string(identityMsg.PkiId))
YACOVM's avatar
YACOVM committed
974
	}
975
	certConsumer := func(msg *proto.SignedGossipMessage) {
YACOVM's avatar
YACOVM committed
976
		idMsg := msg.GetPeerIdentity()
977
		if idMsg == nil || idMsg.Cert == nil || idMsg.PkiId == nil {
YACOVM's avatar
YACOVM committed
978
979
980
			g.logger.Warning("Invalid PeerIdentity:", idMsg)
			return
		}
981
		err := g.idMapper.Put(common.PKIidType(idMsg.PkiId), api.PeerIdentityType(idMsg.Cert))
YACOVM's avatar
YACOVM committed
982
983
984
		if err != nil {
			g.logger.Warning("Failed associating PKI-ID with certificate:", err)
		}
YACOVM's avatar
YACOVM committed
985
		g.logger.Info("Learned of a new certificate:", idMsg.Cert)
YACOVM's avatar
YACOVM committed
986
	}
987
	adapter := &pull.PullAdapter{
988
989
990
991
		Sndr:        g.comm,
		MemSvc:      g.disc,
		IdExtractor: pkiIDFromMsg,
		MsgCons:     certConsumer,
992
		DigFilter:   g.sameOrgOrOurOrgPullFilter,
993
994
	}
	return pull.NewPullMediator(conf, adapter)
YACOVM's avatar
YACOVM committed
995
}
YACOVM's avatar
YACOVM committed
996

997
998
999
1000
func (g *gossipServiceImpl) sameOrgOrOurOrgPullFilter(msg proto.ReceivedMessage) func(string) bool {
	peersOrg := g.secAdvisor.OrgByPeerIdentity(msg.GetConnectionInfo().Identity)
	if len(peersOrg) == 0 {
		g.logger.Warning("Failed determining organization of", msg.GetConnectionInfo())
For faster browsing, not all history is shown. View entire blame