Commit c00203f8 authored by yacovm's avatar yacovm Committed by Yacov Manevich
Browse files

[FAB-14691] Add to msgStore and puller atomically

When adding a block to the message store, if it is added and
not rejected - it is then added to the block puller.

When the block is removed from the message store, a callback
is triggered to remove the block from the block puller.

However, these 2 operations are not atomic.

Since we can add a block to the message store from both AddToMsgStore
(which is invoked by Gossip() ) and from HandleMessage,
we can have the following schedule:

  1) A block with sequence of 100 is gossiped by the upper
    layer of the peer, and AddToMsgStore is called, which
    adds the block to the message store, and the CPU is preempted.
  2) A block message 210 is received via HandleMessage and it causes
     the block 100 to be evicted from the message store, and the
    callback to remove the message from the block puller is called,
    but it is not removed because it is not there yet.
  3) The block 210 is added to the block puller, since it was added
     to the message store.
  4) The CPU is back to perform AddToMsgStore, and adds block 100
     to the block puller.

Now the block puller has block 100, and the message store doesn't
have block 100 anymore - which means it will never be evicted from
the block puller.

To prevent this we need to make these 2 operations be atomic.

Change-Id: I3b7d0d013ce8da5d9a0e40f8b0cdbc3edaed22c9
Signed-off-by: default avataryacovm <>
parent ce25550a
......@@ -502,6 +502,8 @@ func (gc *gossipChannel) EligibleForChannel(member discovery.NetworkMember) bool
// AddToMsgStore adds a given GossipMessage to the message store
func (gc *gossipChannel) AddToMsgStore(msg *proto.SignedGossipMessage) {
if msg.IsDataMsg() {
defer gc.Unlock()
added := gc.blockMsgStore.Add(msg)
if added {
gc.logger.Debugf("Adding %v to the block puller", msg)
......@@ -587,7 +589,13 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
gc.logger.Warning("Failed verifying block", m.GetDataMsg().Payload.SeqNum)
added = gc.blockMsgStore.Add(msg.GetGossipMessage())
if added {
gc.logger.Debugf("Adding %v to the block puller", msg.GetGossipMessage())
} else { // StateInfoMsg verification should be handled in a layer above
// since we don't have access to the id mapper here
added = gc.stateInfoMsgStore.Add(msg.GetGossipMessage())
......@@ -598,11 +606,6 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
// DeMultiplex to local subscribers
if m.IsDataMsg() {
gc.logger.Debugf("Adding %v to the block puller", msg.GetGossipMessage())
......@@ -626,6 +629,8 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
// Iterate over the envelopes, and filter out blocks
// that we already have in the blockMsgStore, or blocks that
// are too far in the past.
var msgs []*proto.SignedGossipMessage
var items []*proto.Envelope
filteredEnvelopes := []*proto.Envelope{}
for _, item := range m.GetDataUpdate().Data {
gMsg, err := item.ToGossipMessage()
......@@ -644,6 +649,15 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
if !gc.verifyBlock(gMsg.GossipMessage, msg.GetConnectionInfo().ID) {
msgs = append(msgs, gMsg)
items = append(items, item)
defer gc.Unlock()
for i, gMsg := range msgs {
item := items[i]
added := gc.blockMsgStore.Add(gMsg)
if !added {
// If this block doesn't need to be added, it means it either already
......@@ -652,6 +666,7 @@ func (gc *gossipChannel) HandleMessage(msg proto.ReceivedMessage) {
filteredEnvelopes = append(filteredEnvelopes, item)
// Replace the update message with just the blocks that should be processed
m.GetDataUpdate().Data = filteredEnvelopes
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment