Commit 40d9afb0 authored by Srinivasan Muralidharan's avatar Srinivasan Muralidharan Committed by Gerrit Code Review
Browse files

Merge "FAB-854 Removed old peer pkg, moved peernext->peer"

parents 021b3c48 eefe40b9
......@@ -34,7 +34,7 @@ import (
"github.com/hyperledger/fabric/core/chaincode/platforms"
"github.com/hyperledger/fabric/core/container"
crypto "github.com/hyperledger/fabric/core/crypto"
"github.com/hyperledger/fabric/core/peernext"
"github.com/hyperledger/fabric/core/peer"
"github.com/hyperledger/fabric/core/util"
pb "github.com/hyperledger/fabric/protos"
)
......
......@@ -26,7 +26,7 @@ import (
"github.com/hyperledger/fabric/core/chaincode"
ledger "github.com/hyperledger/fabric/core/ledgernext"
"github.com/hyperledger/fabric/core/ledgernext/kvledger"
"github.com/hyperledger/fabric/core/peernext"
"github.com/hyperledger/fabric/core/peer"
"github.com/hyperledger/fabric/core/util"
pb "github.com/hyperledger/fabric/protos"
)
......
This diff is collapsed.
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package peer
import (
"sync"
pb "github.com/hyperledger/fabric/protos"
)
//-----------------------------------------------------------------------------
//
// Sync Handler
//
//-----------------------------------------------------------------------------
type syncHandler struct {
sync.Mutex
correlationID uint64
}
func (sh *syncHandler) shouldHandle(correlationID uint64) bool {
return correlationID == sh.correlationID
}
//-----------------------------------------------------------------------------
//
// Sync Blocks Handler
//
//-----------------------------------------------------------------------------
type syncBlocksRequestHandler struct {
syncHandler
channel chan *pb.SyncBlocks
}
func (sbh *syncBlocksRequestHandler) reset() {
if sbh.channel != nil {
close(sbh.channel)
}
sbh.channel = make(chan *pb.SyncBlocks, SyncBlocksChannelSize())
sbh.correlationID++
}
func newSyncBlocksRequestHandler() *syncBlocksRequestHandler {
sbh := &syncBlocksRequestHandler{}
sbh.reset()
return sbh
}
//-----------------------------------------------------------------------------
//
// Sync State Snapshot Handler
//
//-----------------------------------------------------------------------------
type syncStateSnapshotRequestHandler struct {
syncHandler
channel chan *pb.SyncStateSnapshot
}
func (srh *syncStateSnapshotRequestHandler) reset() {
if srh.channel != nil {
close(srh.channel)
}
srh.channel = make(chan *pb.SyncStateSnapshot, SyncStateSnapshotChannelSize())
srh.correlationID++
}
func (srh *syncStateSnapshotRequestHandler) createRequest() *pb.SyncStateSnapshotRequest {
return &pb.SyncStateSnapshotRequest{CorrelationId: srh.correlationID}
}
func newSyncStateSnapshotRequestHandler() *syncStateSnapshotRequestHandler {
srh := &syncStateSnapshotRequestHandler{}
srh.reset()
return srh
}
//-----------------------------------------------------------------------------
//
// Sync State Deltas Handler
//
//-----------------------------------------------------------------------------
type syncStateDeltasHandler struct {
syncHandler
channel chan *pb.SyncStateDeltas
}
func (ssdh *syncStateDeltasHandler) reset() {
if ssdh.channel != nil {
close(ssdh.channel)
}
ssdh.channel = make(chan *pb.SyncStateDeltas, SyncStateDeltasChannelSize())
ssdh.correlationID++
}
func (ssdh *syncStateDeltasHandler) createRequest(syncBlockRange *pb.SyncBlockRange) *pb.SyncStateDeltasRequest {
return &pb.SyncStateDeltasRequest{Range: syncBlockRange}
}
func newSyncStateDeltasHandler() *syncStateDeltasHandler {
ssdh := &syncStateDeltasHandler{}
ssdh.reset()
return ssdh
}
......@@ -50,23 +50,6 @@ type Peer interface {
NewOpenchainDiscoveryHello() (*pb.Message, error)
}
// BlocksRetriever interface for retrieving blocks .
type BlocksRetriever interface {
RequestBlocks(*pb.SyncBlockRange) (<-chan *pb.SyncBlocks, error)
}
// StateRetriever interface for retrieving state deltas, etc.
type StateRetriever interface {
RequestStateSnapshot() (<-chan *pb.SyncStateSnapshot, error)
RequestStateDeltas(syncBlockRange *pb.SyncBlockRange) (<-chan *pb.SyncStateDeltas, error)
}
// RemoteLedger interface for retrieving remote ledger data.
type RemoteLedger interface {
BlocksRetriever
StateRetriever
}
// BlockChainAccessor interface for retreiving blocks by block number
type BlockChainAccessor interface {
GetBlockByNumber(blockNumber uint64) (*pb.Block, error)
......@@ -97,7 +80,6 @@ type StateAccessor interface {
// MessageHandler standard interface for handling Openchain messages.
type MessageHandler interface {
RemoteLedger
HandleMessage(msg *pb.Message) error
SendMessage(msg *pb.Message) error
To() (pb.PeerEndpoint, error)
......@@ -117,7 +99,6 @@ type MessageHandlerCoordinator interface {
Broadcast(*pb.Message, pb.PeerEndpoint_Type) []error
Unicast(*pb.Message, *pb.PeerID) error
GetPeers() (*pb.PeersMessage, error)
GetRemoteLedger(receiver *pb.PeerID) (RemoteLedger, error)
PeersDiscovered(*pb.PeersMessage) error
ExecuteTransaction(transaction *pb.Transaction) *pb.Response
Discoverer
......@@ -177,7 +158,7 @@ type handlerMap struct {
}
// HandlerFactory for creating new MessageHandlers
type HandlerFactory func(MessageHandlerCoordinator, ChatStream, bool) (MessageHandler, error)
type HandlerFactory func(MessageHandlerCoordinator, ChatStream, bool, MessageHandler) (MessageHandler, error)
// EngineFactory for creating new engines
type EngineFactory func(MessageHandlerCoordinator) (Engine, error)
......@@ -321,17 +302,6 @@ func getPeerAddresses(peersMsg *pb.PeersMessage) []string {
return addresses
}
// GetRemoteLedger returns the RemoteLedger interface for the remote Peer Endpoint
func (p *Impl) GetRemoteLedger(receiverHandle *pb.PeerID) (RemoteLedger, error) {
p.handlerMap.RLock()
defer p.handlerMap.RUnlock()
remoteLedger, ok := p.handlerMap.m[*receiverHandle]
if !ok {
return nil, fmt.Errorf("Remote ledger not found for receiver %s", receiverHandle.Name)
}
return remoteLedger, nil
}
// PeersDiscovered used by MessageHandlers for notifying this coordinator of discovered PeerEndoints. May include this Peer's PeerEndpoint.
func (p *Impl) PeersDiscovered(peersMessage *pb.PeersMessage) error {
thisPeersEndpoint, err := GetPeerEndpoint()
......@@ -593,7 +563,7 @@ func (p *Impl) chatWithPeer(address string) error {
func (p *Impl) handleChat(ctx context.Context, stream ChatStream, initiatedStream bool) error {
deadline, ok := ctx.Deadline()
peerLogger.Debugf("Current context deadline = %s, ok = %v", deadline, ok)
handler, err := p.handlerFactory(p, stream, initiatedStream)
handler, err := p.handlerFactory(p, stream, initiatedStream, nil)
if err != nil {
return fmt.Errorf("Error creating handler during handleChat initiation: %s", err)
}
......@@ -643,14 +613,15 @@ func (p *Impl) newHelloMessage() (*pb.HelloMessage, error) {
if err != nil {
return nil, fmt.Errorf("Error creating hello message: %s", err)
}
p.ledgerWrapper.RLock()
defer p.ledgerWrapper.RUnlock()
//size := p.ledgerWrapper.ledger.GetBlockchainSize()
blockChainInfo, err := p.ledgerWrapper.ledger.GetBlockchainInfo()
if err != nil {
return nil, fmt.Errorf("Error creating hello message, error getting block chain info: %s", err)
}
return &pb.HelloMessage{PeerEndpoint: endpoint, BlockchainInfo: blockChainInfo}, nil
//p.ledgerWrapper.RLock()
//defer p.ledgerWrapper.RUnlock()
////size := p.ledgerWrapper.ledger.GetBlockchainSize()
//blockChainInfo, err := p.ledgerWrapper.ledger.GetBlockchainInfo()
//if err != nil {
// return nil, fmt.Errorf("Error creating hello message, error getting block chain info: %s", err)
//}
//return &pb.HelloMessage{PeerEndpoint: endpoint, BlockchainInfo: blockChainInfo}, nil
return &pb.HelloMessage{PeerEndpoint: endpoint}, nil
}
// GetBlockByNumber return a block by block number
......
This diff is collapsed.
This diff is collapsed.
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package statetransfer
import (
"bytes"
"fmt"
"os"
"sort"
"sync"
"testing"
"time"
configSetup "github.com/hyperledger/fabric/core/config"
"github.com/hyperledger/fabric/core/peer"
"github.com/hyperledger/fabric/protos"
"github.com/op/go-logging"
)
func init() {
logging.SetLevel(logging.DEBUG, "")
}
var AllFailures = [...]mockResponse{Timeout, Corrupt, OutOfOrder}
type testPartialStack struct {
*MockRemoteHashLedgerDirectory
*MockLedger
}
func TestMain(m *testing.M) {
configSetup.SetupTestConfig("./../../../peer")
os.Exit(m.Run())
}
func newPartialStack(ml *MockLedger, rld *MockRemoteHashLedgerDirectory) PartialStack {
return &testPartialStack{
MockLedger: ml,
MockRemoteHashLedgerDirectory: rld,
}
}
func newTestStateTransfer(ml *MockLedger, rld *MockRemoteHashLedgerDirectory) *coordinatorImpl {
ci := NewCoordinatorImpl(newPartialStack(ml, rld)).(*coordinatorImpl)
ci.Start()
return ci
}
func newTestThreadlessStateTransfer(ml *MockLedger, rld *MockRemoteHashLedgerDirectory) *coordinatorImpl {
return NewCoordinatorImpl(newPartialStack(ml, rld)).(*coordinatorImpl)
}
type MockRemoteHashLedgerDirectory struct {
*HashLedgerDirectory
}
func (mrls *MockRemoteHashLedgerDirectory) GetMockRemoteLedgerByPeerID(peerID *protos.PeerID) *MockRemoteLedger {
ml, _ := mrls.GetLedgerByPeerID(peerID)
return ml.(*MockRemoteLedger)
}
func createRemoteLedgers(low, high uint64) *MockRemoteHashLedgerDirectory {
rols := make(map[protos.PeerID]peer.BlockChainAccessor)
for i := low; i <= high; i++ {
peerID := &protos.PeerID{
Name: fmt.Sprintf("Peer %d", i),
}
l := &MockRemoteLedger{}
rols[*peerID] = l
}
return &MockRemoteHashLedgerDirectory{&HashLedgerDirectory{rols}}
}
func executeStateTransfer(sts *coordinatorImpl, ml *MockLedger, blockNumber, sequenceNumber uint64, mrls *MockRemoteHashLedgerDirectory) error {
for peerID := range mrls.remoteLedgers {
mrls.GetMockRemoteLedgerByPeerID(&peerID).blockHeight = blockNumber + 1
}
var err error
blockHash := SimpleGetBlockHash(blockNumber)
for i := 0; i < 100; i++ {
var recoverable bool
err, recoverable = sts.SyncToTarget(blockNumber, blockHash, nil)
if err == nil || !recoverable {
break
}
time.Sleep(10 * time.Millisecond)
// Try to sync for up to 10 seconds
}
if err != nil {
return err
}
if size := ml.GetBlockchainSize(); size != blockNumber+1 {
return fmt.Errorf("Blockchain should be caught up to block %d, but is only %d tall", blockNumber, size)
}
block, err := ml.GetBlock(blockNumber)
if nil != err {
return fmt.Errorf("Error retrieving last block in the mock chain.")
}
if stateHash, _ := ml.GetCurrentStateHash(); !bytes.Equal(stateHash, block.StateHash) {
return fmt.Errorf("Current state does not validate against the latest block.")
}
return nil
}
type filterResult struct {
triggered bool
peerID *protos.PeerID
mutex *sync.Mutex
}
func (res filterResult) wasTriggered() bool {
res.mutex.Lock()
defer res.mutex.Unlock()
return res.triggered
}
func makeSimpleFilter(failureTrigger mockRequest, failureType mockResponse) (func(mockRequest, *protos.PeerID) mockResponse, *filterResult) {
res := &filterResult{triggered: false, mutex: &sync.Mutex{}}
return func(request mockRequest, peerID *protos.PeerID) mockResponse {
//fmt.Println("Received a request", request, "for replicaId", replicaId)
if request != failureTrigger {
return Normal
}
res.mutex.Lock()
defer res.mutex.Unlock()
if !res.triggered {
res.triggered = true
res.peerID = peerID
}
if *peerID == *res.peerID {
fmt.Println("Failing it with", failureType)
return failureType
}
return Normal
}, res
}
func TestStartupValidStateGenesis(t *testing.T) {
mrls := createRemoteLedgers(2, 1) // No remote targets available
// Test from blockheight of 1, with valid genesis block
ml := NewMockLedger(mrls, nil, t)
ml.PutBlock(0, SimpleGetBlock(0))
sts := newTestStateTransfer(ml, mrls)
defer sts.Stop()
if err := executeStateTransfer(sts, ml, 0, 0, mrls); nil != err {
t.Fatalf("Startup failure: %s", err)
}
}
func TestStartupValidStateExisting(t *testing.T) {
mrls := createRemoteLedgers(2, 1) // No remote targets available
// Test from blockheight of 1, with valid genesis block
ml := NewMockLedger(mrls, nil, t)
height := uint64(50)
for i := uint64(0); i < height; i++ {
ml.PutBlock(i, SimpleGetBlock(i))
}
ml.state = SimpleGetState(height - 1)
sts := newTestStateTransfer(ml, mrls)
defer sts.Stop()
if err := executeStateTransfer(sts, ml, height-1, height-1, mrls); nil != err {
t.Fatalf("Startup failure: %s", err)
}
}
func TestStartupInvalidStateGenesis(t *testing.T) {
mrls := createRemoteLedgers(1, 3)
// Test from blockheight of 1, with valid genesis block
ml := NewMockLedger(mrls, nil, t)
ml.PutBlock(0, SimpleGetBlock(0))
ml.state = ^ml.state // Ensure the state is wrong
sts := newTestStateTransfer(ml, mrls)
defer sts.Stop()
if err := executeStateTransfer(sts, ml, 0, 0, mrls); nil != err {
t.Fatalf("Startup failure: %s", err)
}
}
func TestStartupInvalidStateExisting(t *testing.T) {
mrls := createRemoteLedgers(1, 3)
// Test from blockheight of 1, with valid genesis block
ml := NewMockLedger(mrls, nil, t)
height := uint64(50)
for i := uint64(0); i < height; i++ {
ml.PutBlock(i, SimpleGetBlock(i))
}
ml.state = ^SimpleGetState(height - 1) // Ensure the state is wrong
sts := newTestStateTransfer(ml, mrls)
defer sts.Stop()
if err := executeStateTransfer(sts, ml, height-1, height-1, mrls); nil != err {
t.Fatalf("Startup failure: %s", err)
}
}
func TestCatchupSimple(t *testing.T) {
mrls := createRemoteLedgers(1, 3)
// Test from blockheight of 1, with valid genesis block
ml := NewMockLedger(mrls, nil, t)
ml.PutBlock(0, SimpleGetBlock(0))
sts := newTestStateTransfer(ml, mrls)
defer sts.Stop()
if err := executeStateTransfer(sts, ml, 7, 10, mrls); nil != err {
t.Fatalf("Simplest case: %s", err)
}
}
func TestCatchupWithLowMaxDeltas(t *testing.T) {
mrls := createRemoteLedgers(1, 3)
// Test from blockheight of 1, with valid genesis block
deltasTransferred := uint64(0)
blocksTransferred := uint64(0)
ml := NewMockLedger(mrls, func(request mockRequest, peerID *protos.PeerID) mockResponse {
if request == SyncDeltas {
deltasTransferred++
}
if request == SyncBlocks {
blocksTransferred++
}
return Normal
}, t)
ml.PutBlock(0, SimpleGetBlock(0))
sts := newTestStateTransfer(ml, mrls)
maxRange := uint64(3)
sts.maxStateDeltaRange = maxRange
sts.maxBlockRange = maxRange
defer sts.Stop()
targetBlock := uint64(7)
if err := executeStateTransfer(sts, ml, targetBlock, 10, mrls); nil != err {
t.Fatalf("Without deltas case: %s", err)
}
existingBlocks := uint64(1)
targetTransferred := (targetBlock - existingBlocks) / maxRange
if (targetBlock-existingBlocks)%maxRange != 0 {
targetTransferred++
}
if deltasTransferred != targetTransferred {
t.Errorf("Expected %d state deltas transferred, got %d", targetTransferred, deltasTransferred)
}
if blocksTransferred != targetTransferred {
t.Errorf("Expected %d state blocks transferred, got %d", targetTransferred, blocksTransferred)
}
}
func TestCatchupWithoutDeltas(t *testing.T) {
mrls := createRemoteLedgers(1, 3)
deltasTransferred := false
// Test from blockheight of 1, with valid genesis block
ml := NewMockLedger(mrls, func(request mockRequest, peerID *protos.PeerID) mockResponse {
if request == SyncDeltas {
deltasTransferred = true
}
return Normal
}, t)
ml.PutBlock(0, SimpleGetBlock(0))
sts := NewCoordinatorImpl(newPartialStack(ml, mrls)).(*coordinatorImpl)
sts.maxStateDeltas = 0
done := make(chan struct{})
go func() {
sts.blockThread()
close(done)
}()
if err := executeStateTransfer(sts, ml, 7, 10, mrls); nil != err {
t.Fatalf("Without deltas case: %s", err)
}
if deltasTransferred {
t.Fatalf("State delta retrieval should not occur during this test")
}
sts.Stop()
select {
case <-done:
case <-time.After(2 * time.Second):
t.Fatalf("Timed out waiting for block sync to complete")
}
for i := uint64(0); i <= 7; i++ {
if _, err := ml.GetBlockByNumber(i); err != nil {
t.Errorf("Expected block %d but got error %s", i, err)
}
}
}
func TestCatchupSyncBlocksErrors(t *testing.T) {
for _, failureType := range AllFailures {
mrls := createRemoteLedgers(1, 3)
// Test from blockheight of 1 with valid genesis block
// Timeouts of 10 milliseconds
filter, result := makeSimpleFilter(SyncBlocks, failureType)
ml := NewMockLedger(mrls, filter, t)
ml.PutBlock(0, SimpleGetBlock(0))
sts := newTestStateTransfer(ml, mrls)
defer sts.Stop()
sts.BlockRequestTimeout = 10 * time.Millisecond
if err := executeStateTransfer(sts, ml, 7, 10, mrls); nil != err {
t.Fatalf("SyncBlocksErrors %s case: %s", failureType, err)