Commit 9eb99b3f authored by Srinivasan Muralidharan's avatar Srinivasan Muralidharan
Browse files

FAB-631 WIP - pared down peer for next arch work



Following skeletal end to end flow work, this submit
takes the next steps for Endorser/Committer
  . converts chaincode and endorser to ledgernext
  . removes consensus package
  . chaincode unit tests use ledgernext
  . panics if ledger.GetLedger is called so we
    can catch codepaths that still use that. These
    are mainly core/api and core/peer
  . removes unit tests from core/api and core/ledger
    (to avoid GetLedger calls there)
  . created a minimal core/peernext. core/peer is
    still there for comparison but is not used

Change-Id: I2627e0000e960e1aa66d27ff5ec60a38c4bb7eea
Signed-off-by: default avatarSrinivasan Muralidharan <muralisr@us.ibm.com>
parent ec26cd86
......@@ -113,7 +113,6 @@ behave: behave-deps
linter: gotools
@echo "LINT: Running code checks.."
@echo "Running go vet"
go vet ./consensus/...
go vet ./core/...
go vet ./events/...
go vet ./examples/...
......
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package consensus
import (
pb "github.com/hyperledger/fabric/protos"
)
// ExecutionConsumer allows callbacks from asycnhronous execution and statetransfer
type ExecutionConsumer interface {
Executed(tag interface{}) // Called whenever Execute completes
Committed(tag interface{}, target *pb.BlockchainInfo) // Called whenever Commit completes
RolledBack(tag interface{}) // Called whenever a Rollback completes
StateUpdated(tag interface{}, target *pb.BlockchainInfo) // Called when state transfer completes, if target is nil, this indicates a failure and a new target should be supplied
}
// Consenter is used to receive messages from the network
// Every consensus plugin needs to implement this interface
type Consenter interface {
RecvMsg(msg *pb.Message, senderHandle *pb.PeerID) error // Called serially with incoming messages from gRPC
ExecutionConsumer
}
// Inquirer is used to retrieve info about the validating network
type Inquirer interface {
GetNetworkInfo() (self *pb.PeerEndpoint, network []*pb.PeerEndpoint, err error)
GetNetworkHandles() (self *pb.PeerID, network []*pb.PeerID, err error)
}
// Communicator is used to send messages to other validators
type Communicator interface {
Broadcast(msg *pb.Message, peerType pb.PeerEndpoint_Type) error
Unicast(msg *pb.Message, receiverHandle *pb.PeerID) error
}
// NetworkStack is used to retrieve network info and send messages
type NetworkStack interface {
Communicator
Inquirer
}
// SecurityUtils is used to access the sign/verify methods from the crypto package
type SecurityUtils interface {
Sign(msg []byte) ([]byte, error)
Verify(peerID *pb.PeerID, signature []byte, message []byte) error
}
// ReadOnlyLedger is used for interrogating the blockchain
type ReadOnlyLedger interface {
GetBlock(id uint64) (block *pb.Block, err error)
GetBlockchainSize() uint64
GetBlockchainInfo() *pb.BlockchainInfo
GetBlockchainInfoBlob() []byte
GetBlockHeadMetadata() ([]byte, error)
}
// LegacyExecutor is used to invoke transactions, potentially modifying the backing ledger
type LegacyExecutor interface {
BeginTxBatch(id interface{}) error
ExecTxs(id interface{}, txs []*pb.Transaction) ([]byte, error)
CommitTxBatch(id interface{}, metadata []byte) (*pb.Block, error)
RollbackTxBatch(id interface{}) error
PreviewCommitTxBatch(id interface{}, metadata []byte) ([]byte, error)
}
// Executor is intended to eventually supplant the old Executor interface
// The problem with invoking the calls directly above, is that they must be coordinated
// with state transfer, to eliminate possible races and ledger corruption
type Executor interface {
Start() // Bring up the resources needed to use this interface
Halt() // Tear down the resources needed to use this interface
Execute(tag interface{}, txs []*pb.Transaction) // Executes a set of transactions, this may be called in succession
Commit(tag interface{}, metadata []byte) // Commits whatever transactions have been executed
Rollback(tag interface{}) // Rolls back whatever transactions have been executed
UpdateState(tag interface{}, target *pb.BlockchainInfo, peers []*pb.PeerID) // Attempts to synchronize state to a particular target, implicitly calls rollback if needed
}
// LedgerManager is used to manipulate the state of the ledger
type LedgerManager interface {
InvalidateState() // Invalidate informs the ledger that it is out of date and should reject queries
ValidateState() // Validate informs the ledger that it is back up to date and should resume replying to queries
}
// StatePersistor is used to store consensus state which should survive a process crash
type StatePersistor interface {
StoreState(key string, value []byte) error
ReadState(key string) ([]byte, error)
ReadStateSet(prefix string) (map[string][]byte, error)
DelState(key string)
}
// Stack is the set of stack-facing methods available to the consensus plugin
type Stack interface {
NetworkStack
SecurityUtils
Executor
LegacyExecutor
LedgerManager
ReadOnlyLedger
StatePersistor
}
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"strings"
"github.com/op/go-logging"
"github.com/spf13/viper"
"github.com/hyperledger/fabric/consensus"
"github.com/hyperledger/fabric/consensus/noops"
"github.com/hyperledger/fabric/consensus/pbft"
)
var logger *logging.Logger // package-level logger
var consenter consensus.Consenter
func init() {
logger = logging.MustGetLogger("consensus/controller")
}
// NewConsenter constructs a Consenter object if not already present
func NewConsenter(stack consensus.Stack) consensus.Consenter {
plugin := strings.ToLower(viper.GetString("peer.validator.consensus.plugin"))
if plugin == "pbft" {
logger.Infof("Creating consensus plugin %s", plugin)
return pbft.GetPlugin(stack)
}
logger.Info("Creating default consensus plugin (noops)")
return noops.GetNoops(stack)
}
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package executor
import (
"github.com/hyperledger/fabric/consensus"
"github.com/hyperledger/fabric/consensus/util/events"
"github.com/hyperledger/fabric/core/peer/statetransfer"
pb "github.com/hyperledger/fabric/protos"
"github.com/op/go-logging"
)
var logger *logging.Logger // package-level logger
func init() {
logger = logging.MustGetLogger("consensus/executor")
}
// PartialStack contains the ledger features required by the executor.Coordinator
type PartialStack interface {
consensus.LegacyExecutor
GetBlockchainInfo() *pb.BlockchainInfo
}
type coordinatorImpl struct {
manager events.Manager // Maintains event thread and sends events to the coordinator
rawExecutor PartialStack // Does the real interaction with the ledger
consumer consensus.ExecutionConsumer // The consumer of this coordinator which receives the callbacks
stc statetransfer.Coordinator // State transfer instance
batchInProgress bool // Are we mid execution batch
skipInProgress bool // Are we mid state transfer
}
// NewCoordinatorImpl creates a new executor.Coordinator
func NewImpl(consumer consensus.ExecutionConsumer, rawExecutor PartialStack, stps statetransfer.PartialStack) consensus.Executor {
co := &coordinatorImpl{
rawExecutor: rawExecutor,
consumer: consumer,
stc: statetransfer.NewCoordinatorImpl(stps),
manager: events.NewManagerImpl(),
}
co.manager.SetReceiver(co)
return co
}
// ProcessEvent is the main event loop for the executor.Coordinator
func (co *coordinatorImpl) ProcessEvent(event events.Event) events.Event {
switch et := event.(type) {
case executeEvent:
logger.Debug("Executor is processing an executeEvent")
if co.skipInProgress {
logger.Error("FATAL programming error, attempted to execute a transaction during state transfer")
return nil
}
if !co.batchInProgress {
logger.Debug("Starting new transaction batch")
co.batchInProgress = true
err := co.rawExecutor.BeginTxBatch(co)
_ = err // TODO This should probably panic, see issue 752
}
co.rawExecutor.ExecTxs(co, et.txs)
co.consumer.Executed(et.tag)
case commitEvent:
logger.Debug("Executor is processing an commitEvent")
if co.skipInProgress {
logger.Error("Likely FATAL programming error, attempted to commit a transaction batch during state transfer")
return nil
}
if !co.batchInProgress {
logger.Error("Likely FATAL programming error, attemted to commit a transaction batch when one does not exist")
return nil
}
_, err := co.rawExecutor.CommitTxBatch(co, et.metadata)
_ = err // TODO This should probably panic, see issue 752
co.batchInProgress = false
info := co.rawExecutor.GetBlockchainInfo()
logger.Debugf("Committed block %d with hash %x to chain", info.Height-1, info.CurrentBlockHash)
co.consumer.Committed(et.tag, info)
case rollbackEvent:
logger.Debug("Executor is processing an rollbackEvent")
if co.skipInProgress {
logger.Error("Programming error, attempted to rollback a transaction batch during state transfer")
return nil
}
if !co.batchInProgress {
logger.Error("Programming error, attempted to rollback a transaction batch which had not started")
return nil
}
err := co.rawExecutor.RollbackTxBatch(co)
_ = err // TODO This should probably panic, see issue 752
co.batchInProgress = false
co.consumer.RolledBack(et.tag)
case stateUpdateEvent:
logger.Debug("Executor is processing a stateUpdateEvent")
if co.batchInProgress {
err := co.rawExecutor.RollbackTxBatch(co)
_ = err // TODO This should probably panic, see issue 752
}
co.skipInProgress = true
info := et.blockchainInfo
for {
err, recoverable := co.stc.SyncToTarget(info.Height-1, info.CurrentBlockHash, et.peers)
if err == nil {
logger.Debug("State transfer sync completed, returning")
co.skipInProgress = false
co.consumer.StateUpdated(et.tag, info)
return nil
}
if !recoverable {
logger.Warningf("State transfer failed irrecoverably, calling back to consumer: %s", err)
co.consumer.StateUpdated(et.tag, nil)
return nil
}
logger.Warningf("State transfer did not complete successfully but is recoverable, trying again: %s", err)
et.peers = nil // Broaden the peers included in recover to all connected
}
default:
logger.Errorf("Unknown event type %s", et)
}
return nil
}
// Commit commits whatever outstanding requests have been executed, it is an error to call this without pending executions
func (co *coordinatorImpl) Commit(tag interface{}, metadata []byte) {
co.manager.Queue() <- commitEvent{tag, metadata}
}
// Execute adds additional executions to the current batch
func (co *coordinatorImpl) Execute(tag interface{}, txs []*pb.Transaction) {
co.manager.Queue() <- executeEvent{tag, txs}
}
// Rollback rolls back the executions from the current batch
func (co *coordinatorImpl) Rollback(tag interface{}) {
co.manager.Queue() <- rollbackEvent{tag}
}
// UpdateState uses the state transfer subsystem to attempt to progress to a target
func (co *coordinatorImpl) UpdateState(tag interface{}, info *pb.BlockchainInfo, peers []*pb.PeerID) {
co.manager.Queue() <- stateUpdateEvent{tag, info, peers}
}
// Start must be called before utilizing the Coordinator
func (co *coordinatorImpl) Start() {
co.stc.Start()
co.manager.Start()
}
// Halt should be called to clean up resources allocated by the Coordinator
func (co *coordinatorImpl) Halt() {
co.stc.Stop()
co.manager.Halt()
}
// Event types
type executeEvent struct {
tag interface{}
txs []*pb.Transaction
}
// Note, this cannot be a simple type alias, in case tag is nil
type rollbackEvent struct {
tag interface{}
}
type commitEvent struct {
tag interface{}
metadata []byte
}
type stateUpdateEvent struct {
tag interface{}
blockchainInfo *pb.BlockchainInfo
peers []*pb.PeerID
}
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package executor
import (
"bytes"
"fmt"
"testing"
"github.com/hyperledger/fabric/consensus/util/events"
"github.com/op/go-logging"
pb "github.com/hyperledger/fabric/protos"
)
func init() {
logging.SetLevel(logging.DEBUG, "")
}
// -------------------------
//
// Mock consumer
//
// -------------------------
type mockConsumer struct {
ExecutedImpl func(tag interface{}) // Called whenever Execute completes
CommittedImpl func(tag interface{}, target *pb.BlockchainInfo) // Called whenever Commit completes
RolledBackImpl func(tag interface{}) // Called whenever a Rollback completes
StateUpdatedImpl func(tag interface{}, target *pb.BlockchainInfo) // Called when state transfer completes, if target is nil, this indicates a failure and a new target should be supplied
}
func (mock *mockConsumer) Executed(tag interface{}) {
if mock.ExecutedImpl != nil {
mock.ExecutedImpl(tag)
}
}
func (mock *mockConsumer) Committed(tag interface{}, target *pb.BlockchainInfo) {
if mock.CommittedImpl != nil {
mock.CommittedImpl(tag, target)
}
}
func (mock *mockConsumer) RolledBack(tag interface{}) {
if mock.RolledBackImpl != nil {
mock.RolledBackImpl(tag)
}
}
func (mock *mockConsumer) StateUpdated(tag interface{}, target *pb.BlockchainInfo) {
if mock.StateUpdatedImpl != nil {
mock.StateUpdatedImpl(tag, target)
}
}
// -------------------------
//
// Mock rawExecutor
//
// -------------------------
type mockRawExecutor struct {
t *testing.T
curBatch interface{}
curTxs []*pb.Transaction
commitCount uint64
}
func (mock *mockRawExecutor) BeginTxBatch(id interface{}) error {
if mock.curBatch != nil {
e := fmt.Errorf("Attempted to start a new batch without stopping the other")
mock.t.Fatal(e)
return e
}
mock.curBatch = id
return nil
}
func (mock *mockRawExecutor) ExecTxs(id interface{}, txs []*pb.Transaction) ([]byte, error) {
if mock.curBatch != id {
e := fmt.Errorf("Attempted to exec on a different batch")
mock.t.Fatal(e)
return nil, e
}
mock.curTxs = append(mock.curTxs, txs...)
return nil, nil
}
func (mock *mockRawExecutor) CommitTxBatch(id interface{}, meta []byte) (*pb.Block, error) {
if mock.curBatch != id {
e := fmt.Errorf("Attempted to commit a batch which doesn't exist")
mock.t.Fatal(e)
return nil, e
}
mock.commitCount++
return nil, nil
}
func (mock *mockRawExecutor) RollbackTxBatch(id interface{}) error {
if mock.curBatch == nil {
e := fmt.Errorf("Attempted to rollback a batch which doesn't exist")
mock.t.Fatal(e)
return e
}
mock.curTxs = nil
mock.curBatch = nil
return nil
}
func (mock *mockRawExecutor) PreviewCommitTxBatch(id interface{}, meta []byte) ([]byte, error) {
if mock.curBatch != nil {
e := fmt.Errorf("Attempted to preview a batch which doesn't exist")
mock.t.Fatal(e)
return nil, e
}
return nil, nil
}
func (mock *mockRawExecutor) GetBlockchainInfo() *pb.BlockchainInfo {
return &pb.BlockchainInfo{
Height: mock.commitCount,
CurrentBlockHash: []byte(fmt.Sprintf("%d", mock.commitCount)),
PreviousBlockHash: []byte(fmt.Sprintf("%d", mock.commitCount-1)),
}
}
// -------------------------
//
// Mock stateTransfer
//
// -------------------------
type mockStateTransfer struct {
StartImpl func()
StopImpl func()
SyncToTargetImpl func(blockNumber uint64, blockHash []byte, peerIDs []*pb.PeerID) (error, bool)
}
func (mock *mockStateTransfer) Start() {}
func (mock *mockStateTransfer) Stop() {}
func (mock *mockStateTransfer) SyncToTarget(blockNumber uint64, blockHash []byte, peerIDs []*pb.PeerID) (error, bool) {
if mock.SyncToTargetImpl != nil {
return mock.SyncToTargetImpl(blockNumber, blockHash, peerIDs)
}
return nil, false
}
// -------------------------
//
// Mock event manager
//
// -------------------------
type mockEventManager struct {
target events.Receiver
bufferedChannel chan events.Event // This is buffered so that queueing never blocks
}
func (mock *mockEventManager) Start() {}
func (mock *mockEventManager) Halt() {}
func (mock *mockEventManager) Inject(event events.Event) {}
func (mock *mockEventManager) SetReceiver(receiver events.Receiver) {
mock.target = receiver
}
func (mock *mockEventManager) Queue() chan<- events.Event {
return mock.bufferedChannel
}
func (mock *mockEventManager) process() {
for {
select {
case ev := <-mock.bufferedChannel:
events.SendEvent(mock.target, ev)
default:
return
}
}
}
// -------------------------
//
// Util functions
//
// -------------------------
func newMocks(t *testing.T) (*coordinatorImpl, *mockConsumer, *mockRawExecutor, *mockStateTransfer, *mockEventManager) {
mc := &mockConsumer{}
mre := &mockRawExecutor{t: t}
mst := &mockStateTransfer{}
mev := &mockEventManager{bufferedChannel: make(chan events.Event, 100)}
co := &coordinatorImpl{
consumer: mc,
rawExecutor: mre,
stc: mst,
manager: mev,
}
mev.target = co
return co, mc, mre, mst, mev
}
// -------------------------
//
// Actual Tests
//
// -------------------------
// TestNormalExecutes executes 50 transactions, then commits, ensuring that the callbacks are called appropriately