Commit 9c2ecfc2 authored by YACOVM's avatar YACOVM Committed by Yacov Manevich
Browse files

WIP- Fabric gossip component



This is a commit that contains only APIs and protobuff
The API between the ledger and the gossip component is in gossip/api/api.go

Change-Id: I9f6aef85f3b03e2d3a6b9850148e9cf4d1a93ce3
Signed-off-by: default avatarYacov Manevich <yacovm@il.ibm.com>
parent 55593ac6
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package api
import (
"github.com/hyperledger/fabric/gossip/discovery"
"google.golang.org/grpc"
)
type GossipEmitterFactory interface {
NewGossipEmitter(id string, discSvc discovery.DiscoveryService) GossipService
}
// GossipService is used to publish new blocks to the gossip network
type GossipService interface {
// payload: Holds the block's content, hash and seqNum
Publish(payload Payload) error
}
type BindAddress struct {
Host string
Port int16
}
// Payload defines an object that contains a ledger block
type Payload struct {
Data []byte // The content of the message, possibly encrypted or signed
Hash string // The message hash
SeqNum uint64 // The message sequence number
}
type GossipMemberFactory interface {
NewGossipMember(discovery.DiscoveryService, ReplicationProvider, MessageCryptoService, MessagePolicyVerifier, *grpc.Server) GossipMember
NewGossipMemberWithRPCServer(discovery.DiscoveryService, ReplicationProvider, MessageCryptoService, MessagePolicyVerifier, BindAddress) (GossipMember, error)
}
// GossipMember is used to obtain new blocks from the gossip network
type GossipMember interface {
// RegisterCallback registers a callback that is invoked on messages
// from startSeq to endSeq and invokes the callback when they arrive
RegisterCallback(startSeq uint64, endSeq uint64, callback func([]Payload))
}
// ReplicationProvider used by the GossipMember in order to obtain Blocks of
// certain seqNum range to be sent to the requester
type ReplicationProvider interface {
// GetData used by the gossip component to obtain certain blocks from the ledger.
// Returns the blocks requested with the given sequence numbers, or an error if
// some block requested is not available.
GetData(startSeqNum uint64, endSeqNum uint64) ([]Payload, error)
// LastBlockSeq used by the gossip component to obtain the last sequence of a block the ledger has.
LastBlockSeq() uint64
}
// MessageCryptoVerifier verifies the message's authenticity,
// if messages are cryptographically signed
type MessageCryptoService interface {
// Verify returns nil whether the message and its identifier are authentic,
// otherwise returns an error
Verify(seqNum uint64, sender string, payload Payload) error
// Sign signs the payload
Sign(sender string, Payload Payload) Payload
// SignBlob signs a blob
SignBlob([]byte) []byte
// VerifyBlob verifies a blob, returns error on failure
// and nil if the blob is correctly signed
VerifyBlob(sender string, blob []byte) error
}
// MessagePolicyVerifier verifies whether the message conforms to all required policies,
// and can be safely delivered to the user.
type MessagePolicyVerifier interface {
Verify(seqNum uint64, sender string, payload Payload) error
}
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package comm
import (
"github.com/hyperledger/fabric/gossip/proto"
"sync"
)
type CommModule interface {
// Send sends a message to endpoints
Send(msg *proto.GossipMessage, endpoints ...string)
// Probe probes a remote node and returns nil if its responsive
Probe(endpoint string) error
// Accept returns a dedicated read-only channel for messages sent by other nodes that match a certain predicate.
// Each message from the channel can be used to send a reply back to the sender
Accept(MessageAcceptor) <-chan *ReceivedMessage
// PresumedDead returns a read-only channel for node endpoints that are suspected to be offline
PresumedDead() <-chan string
// CloseConn closes a connection to a certain endpoint
CloseConn(endpoint string)
// Stop stops the module
Stop()
}
type MessageAcceptor func(*proto.GossipMessage) bool
type ReceivedMessage struct {
*proto.GossipMessage
lock *sync.Mutex
srvStream proto.Gossip_GossipStreamServer
clStream proto.Gossip_GossipStreamClient
}
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package discovery
import "github.com/hyperledger/fabric/gossip/proto"
// CryptoService is an interface that the discovery expects to be implemented and passed on creation
type CryptoService interface {
// validateAliveMsg validates that an Alive message is authentic
ValidateAliveMsg(*proto.AliveMessage) bool
// SignMessage signs an AliveMessage and updates its signature field
SignMessage(*proto.AliveMessage) *proto.AliveMessage
}
// CommService is an interface that the discovery expects to be implemented and passed on creation
type CommService interface {
// Gossip gossips a message
Gossip(msg *proto.GossipMessage)
// SendToPeer sends to a given peer a message.
// The nonce can be anything since the communication module handles the nonce itself
SendToPeer(peer *NetworkMember, msg *proto.GossipMessage)
// Ping probes a remote peer and returns if it's responsive or not
Ping(peer *NetworkMember) bool
// Accept returns a read-only channel for membership messages sent from remote peers
Accept() <-chan GossipMsg
// PresumedDead returns a read-only channel for peers that are presumed to be dead
PresumedDead() <-chan string
// CloseConn orders to close the connection with a certain peer
CloseConn(id string)
}
type GossipMsg interface {
GetGossipMessage() *proto.GossipMessage
}
type NetworkMember struct {
Id string
Endpoint string
Metadata []byte
}
type DiscoveryService interface {
// Self returns this instance's membership information
Self() NetworkMember
// UpdateMetadata updates this instance's metadata
UpdateMetadata([]byte)
// UpdateEndpoint updates this instance's endpoint
UpdateEndpoint(string)
// Stops this instance
Stop()
// GetMembership returns the alive members in the view
GetMembership() []NetworkMember
}
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gossip
import (
"github.com/hyperledger/fabric/gossip/discovery"
"github.com/hyperledger/fabric/gossip/proto"
"time"
)
type GossipService interface {
// GetPeersMetadata returns a mapping of endpoint --> metadata
GetPeersMetadata() map[string][]byte
// UpdateMetadata updates the self metadata of the discovery layer
UpdateMetadata([]byte)
// Gossip sends a message to other peers to the network
Gossip(msg *proto.GossipMessage)
// Accept returns a channel that outputs messages from other peers
Accept(MessageAcceptor) <-chan *proto.GossipMessage
// Stop stops the gossip component
Stop()
}
type MessageAcceptor func(*proto.GossipMessage) bool
type GossipConfig struct {
BindPort int
Id string
SelfEndpoint string
BootstrapPeers []*discovery.NetworkMember
PropagateIterations int
PropagatePeerNum int
MaxMessageCountToStore int
MaxPropagationBurstSize int
MaxPropagationBurstLatency time.Duration
PullInterval time.Duration
PullPeerNum int
}
// Code generated by protoc-gen-go.
// source: message.proto
// DO NOT EDIT!
/*
Package proto is a generated protocol buffer package.
It is generated from these files:
message.proto
It has these top-level messages:
GossipMessage
DataRequest
GossipHello
DataUpdate
DataDigest
DataMessage
AckMessage
Payload
AliveMessage
PeerTime
MembershipRequest
MembershipResponse
Member
Empty
*/
package proto
import proto1 "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
import (
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto1.Marshal
var _ = fmt.Errorf
var _ = math.Inf
type GossipMessage struct {
Nonce uint64 `protobuf:"varint,1,opt,name=nonce" json:"nonce,omitempty"`
// Types that are valid to be assigned to Content:
// *GossipMessage_AliveMsg
// *GossipMessage_MemReq
// *GossipMessage_MemRes
// *GossipMessage_DataMsg
// *GossipMessage_Hello
// *GossipMessage_DataDig
// *GossipMessage_DataReq
// *GossipMessage_DataUpdate
// *GossipMessage_AckMsg
// *GossipMessage_Empty
Content isGossipMessage_Content `protobuf_oneof:"content"`
}
func (m *GossipMessage) Reset() { *m = GossipMessage{} }
func (m *GossipMessage) String() string { return proto1.CompactTextString(m) }
func (*GossipMessage) ProtoMessage() {}
type isGossipMessage_Content interface {
isGossipMessage_Content()
}
type GossipMessage_AliveMsg struct {
AliveMsg *AliveMessage `protobuf:"bytes,2,opt,name=aliveMsg,oneof"`
}
type GossipMessage_MemReq struct {
MemReq *MembershipRequest `protobuf:"bytes,3,opt,name=memReq,oneof"`
}
type GossipMessage_MemRes struct {
MemRes *MembershipResponse `protobuf:"bytes,4,opt,name=memRes,oneof"`
}
type GossipMessage_DataMsg struct {
DataMsg *DataMessage `protobuf:"bytes,5,opt,name=dataMsg,oneof"`
}
type GossipMessage_Hello struct {
Hello *GossipHello `protobuf:"bytes,6,opt,name=hello,oneof"`
}
type GossipMessage_DataDig struct {
DataDig *DataDigest `protobuf:"bytes,7,opt,name=dataDig,oneof"`
}
type GossipMessage_DataReq struct {
DataReq *DataRequest `protobuf:"bytes,8,opt,name=dataReq,oneof"`
}
type GossipMessage_DataUpdate struct {
DataUpdate *DataUpdate `protobuf:"bytes,9,opt,name=dataUpdate,oneof"`
}
type GossipMessage_AckMsg struct {
AckMsg *AckMessage `protobuf:"bytes,10,opt,name=ackMsg,oneof"`
}
type GossipMessage_Empty struct {
Empty *Empty `protobuf:"bytes,11,opt,name=empty,oneof"`
}
func (*GossipMessage_AliveMsg) isGossipMessage_Content() {}
func (*GossipMessage_MemReq) isGossipMessage_Content() {}
func (*GossipMessage_MemRes) isGossipMessage_Content() {}
func (*GossipMessage_DataMsg) isGossipMessage_Content() {}
func (*GossipMessage_Hello) isGossipMessage_Content() {}
func (*GossipMessage_DataDig) isGossipMessage_Content() {}
func (*GossipMessage_DataReq) isGossipMessage_Content() {}
func (*GossipMessage_DataUpdate) isGossipMessage_Content() {}
func (*GossipMessage_AckMsg) isGossipMessage_Content() {}
func (*GossipMessage_Empty) isGossipMessage_Content() {}
func (m *GossipMessage) GetContent() isGossipMessage_Content {
if m != nil {
return m.Content
}
return nil
}
func (m *GossipMessage) GetAliveMsg() *AliveMessage {
if x, ok := m.GetContent().(*GossipMessage_AliveMsg); ok {
return x.AliveMsg
}
return nil
}
func (m *GossipMessage) GetMemReq() *MembershipRequest {
if x, ok := m.GetContent().(*GossipMessage_MemReq); ok {
return x.MemReq
}
return nil
}
func (m *GossipMessage) GetMemRes() *MembershipResponse {
if x, ok := m.GetContent().(*GossipMessage_MemRes); ok {
return x.MemRes
}
return nil
}
func (m *GossipMessage) GetDataMsg() *DataMessage {
if x, ok := m.GetContent().(*GossipMessage_DataMsg); ok {
return x.DataMsg
}
return nil
}
func (m *GossipMessage) GetHello() *GossipHello {
if x, ok := m.GetContent().(*GossipMessage_Hello); ok {
return x.Hello
}
return nil
}
func (m *GossipMessage) GetDataDig() *DataDigest {
if x, ok := m.GetContent().(*GossipMessage_DataDig); ok {
return x.DataDig
}
return nil
}
func (m *GossipMessage) GetDataReq() *DataRequest {
if x, ok := m.GetContent().(*GossipMessage_DataReq); ok {
return x.DataReq
}
return nil
}
func (m *GossipMessage) GetDataUpdate() *DataUpdate {
if x, ok := m.GetContent().(*GossipMessage_DataUpdate); ok {
return x.DataUpdate
}
return nil
}
func (m *GossipMessage) GetAckMsg() *AckMessage {
if x, ok := m.GetContent().(*GossipMessage_AckMsg); ok {
return x.AckMsg
}
return nil
}
func (m *GossipMessage) GetEmpty() *Empty {
if x, ok := m.GetContent().(*GossipMessage_Empty); ok {
return x.Empty
}
return nil
}
// XXX_OneofFuncs is for the internal use of the proto package.
func (*GossipMessage) XXX_OneofFuncs() (func(msg proto1.Message, b *proto1.Buffer) error, func(msg proto1.Message, tag, wire int, b *proto1.Buffer) (bool, error), []interface{}) {
return _GossipMessage_OneofMarshaler, _GossipMessage_OneofUnmarshaler, []interface{}{
(*GossipMessage_AliveMsg)(nil),
(*GossipMessage_MemReq)(nil),
(*GossipMessage_MemRes)(nil),
(*GossipMessage_DataMsg)(nil),
(*GossipMessage_Hello)(nil),
(*GossipMessage_DataDig)(nil),
(*GossipMessage_DataReq)(nil),
(*GossipMessage_DataUpdate)(nil),
(*GossipMessage_AckMsg)(nil),
(*GossipMessage_Empty)(nil),
}
}
func _GossipMessage_OneofMarshaler(msg proto1.Message, b *proto1.Buffer) error {
m := msg.(*GossipMessage)
// content
switch x := m.Content.(type) {
case *GossipMessage_AliveMsg:
b.EncodeVarint(2<<3 | proto1.WireBytes)
if err := b.EncodeMessage(x.AliveMsg); err != nil {
return err
}
case *GossipMessage_MemReq:
b.EncodeVarint(3<<3 | proto1.WireBytes)
if err := b.EncodeMessage(x.MemReq); err != nil {
return err
}
case *GossipMessage_MemRes:
b.EncodeVarint(4<<3 | proto1.WireBytes)
if err := b.EncodeMessage(x.MemRes); err != nil {
return err
}
case *GossipMessage_DataMsg:
b.EncodeVarint(5<<3 | proto1.WireBytes)
if err := b.EncodeMessage(x.DataMsg); err != nil {
return err
}
case *GossipMessage_Hello:
b.EncodeVarint(6<<3 | proto1.WireBytes)
if err := b.EncodeMessage(x.Hello); err != nil {
return err
}
case *GossipMessage_DataDig:
b.EncodeVarint(7<<3 | proto1.WireBytes)
if err := b.EncodeMessage(x.DataDig); err != nil {
return err
}
case *GossipMessage_DataReq:
b.EncodeVarint(8<<3 | proto1.WireBytes)
if err := b.EncodeMessage(x.DataReq); err != nil {
return err
}
case *GossipMessage_DataUpdate:
b.EncodeVarint(9<<3 | proto1.WireBytes)
if err := b.EncodeMessage(x.DataUpdate); err != nil {
return err
}
case *GossipMessage_AckMsg:
b.EncodeVarint(10<<3 | proto1.WireBytes)
if err := b.EncodeMessage(x.AckMsg); err != nil {
return err
}
case *GossipMessage_Empty:
b.EncodeVarint(11<<3 | proto1.WireBytes)
if err := b.EncodeMessage(x.Empty); err != nil {
return err
}
case nil:
default:
return fmt.Errorf("GossipMessage.Content has unexpected type %T", x)
}
return nil
}
func _GossipMessage_OneofUnmarshaler(msg proto1.Message, tag, wire int, b *proto1.Buffer) (bool, error) {
m := msg.(*GossipMessage)
switch tag {
case 2: // content.aliveMsg
if wire != proto1.WireBytes {
return true, proto1.ErrInternalBadWireType
}
msg := new(AliveMessage)
err := b.DecodeMessage(msg)
m.Content = &GossipMessage_AliveMsg{msg}
return true, err
case 3: // content.memReq
if wire != proto1.WireBytes {
return true, proto1.ErrInternalBadWireType
}
msg := new(MembershipRequest)
err := b.DecodeMessage(msg)
m.Content = &GossipMessage_MemReq{msg}
return true, err
case 4: // content.memRes
if wire != proto1.WireBytes {
return true, proto1.ErrInternalBadWireType
}
msg := new(MembershipResponse)
err := b.DecodeMessage(msg)
m.Content = &GossipMessage_MemRes{msg}
return true, err
case 5: // content.dataMsg
if wire != proto1.WireBytes {
return true, proto1.ErrInternalBadWireType
}
msg := new(DataMessage)
err := b.DecodeMessage(msg)
m.Content = &GossipMessage_DataMsg{msg}
return true, err
case 6: // content.hello
if wire != proto1.WireBytes {
return true, proto1.ErrInternalBadWireType
}
msg := new(GossipHello)
err := b.DecodeMessage(msg)
m.Content = &GossipMessage_Hello{msg}
return true, err
case 7: // content.dataDig
if wire != proto1.WireBytes {
return true, proto1.ErrInternalBadWireType
}
msg := new(DataDigest)
err := b.DecodeMessage(msg)
m.Content = &GossipMessage_DataDig{msg}
return true, err
case 8: // content.dataReq
if wire != proto1.WireBytes {
return true, proto1.ErrInternalBadWireType
}
msg := new(DataRequest)
err := b.DecodeMessage(msg)
m.Content = &GossipMessage_DataReq{msg}
return true, err
case 9: // content.dataUpdate
if wire != proto1.WireBytes {
return true, proto1.ErrInternalBadWireType
}
msg := new(DataUpdate)
err := b.DecodeMessage(msg)
m.Content = &GossipMessage_DataUpdate{msg}
return true, err
case 10: // content.ackMsg
if wire != proto1.WireBytes {
return true, proto1.ErrInternalBadWireType
}
msg := new(AckMessage)