Commit d632e74a authored by Srinivasan Muralidharan's avatar Srinivasan Muralidharan
Browse files

FAB-437 bare-minimum, end to end skeleton using solo



The main purpose of this checking
  . show commit followed by simulation in action
  . users can get a feel for the flow of the protocol across
    the different legs of the end-end path
  . identify key areas for attention (esp. grep for
    "!!IMPORTANT!!")

"deploy and "invoke" Chaincode commands from CLI will also
convert a successful proposal response into a transaction
and send it to the Orderer (if configured in core.yaml).

"query" is removed from CLI. Invoke can also return values
now in ProposalResponse.Response.Payload.

REST calls should not be affected and should work with
old ledger.

See core.yaml for default orderer setup.

This also introduces a stop-gap "committer" whose only
task is to commit blocks from the orderer.

To test :
1. Terminal 1 - run the "solo" orderer
   cd fabric/orderer
   go build
   ./orderer

2. Terminal 2 - run the peer
   peer node start 1>/tmp/peer.out 2>&1

3. Terminal 3 - deploy and invoke take usual params
   peer chaincode deploy ... 1>/tmp/out 2>&1
   peer chaincode invoke ... 1>/tmp/out 2>&1

/tmp/peer.out and /tmp/out will contain tell tale signs
of the round trip in action.

Change-Id: Ic1aa31993fc57ce145c39967d4d682fd2dc5704b
Signed-off-by: default avatarSrinivasan Muralidharan <muralisr@us.ibm.com>
parent 423334fc
......@@ -34,6 +34,7 @@ import (
"github.com/hyperledger/fabric/core/container/ccintf"
"github.com/hyperledger/fabric/core/crypto"
"github.com/hyperledger/fabric/core/ledger"
ledgernext "github.com/hyperledger/fabric/core/ledgernext"
pb "github.com/hyperledger/fabric/protos"
)
......@@ -490,23 +491,37 @@ func (chaincodeSupport *ChaincodeSupport) Launch(context context.Context, t *pb.
}
//hopefully we are restarting from existing image and the deployed transaction exists
depTx, ledgerErr = ledger.GetTransactionByID(chaincode)
if ledgerErr != nil {
return cID, cMsg, fmt.Errorf("Could not get deployment transaction for %s - %s", chaincode, ledgerErr)
}
if depTx == nil {
return cID, cMsg, fmt.Errorf("deployment transaction does not exist for %s", chaincode)
}
if nil != chaincodeSupport.secHelper {
var err error
depTx, err = chaincodeSupport.secHelper.TransactionPreExecution(depTx)
// Note that t is now decrypted and is a deep clone of the original input t
if nil != err {
return cID, cMsg, fmt.Errorf("failed tx preexecution%s - %s", chaincode, err)
var depPayload []byte
if _, ok := context.Value(TXSimulatorKey).(ledgernext.TxSimulator); ok {
depPayload, ledgerErr = getCDSFromLCCC(context, string(DefaultChain), chaincode)
if ledgerErr != nil {
return cID, cMsg, fmt.Errorf("Could not get deployment transaction from LCCC for %s - %s", chaincode, ledgerErr)
}
} else {
depTx, ledgerErr = ledger.GetTransactionByID(chaincode)
if ledgerErr != nil {
return cID, cMsg, fmt.Errorf("Could not get deployment transaction for %s - %s", chaincode, ledgerErr)
}
if depTx == nil {
return cID, cMsg, fmt.Errorf("deployment transaction does not exist for %s", chaincode)
}
if nil != chaincodeSupport.secHelper {
var err error
depTx, err = chaincodeSupport.secHelper.TransactionPreExecution(depTx)
// Note that t is now decrypted and is a deep clone of the original input t
if nil != err {
return cID, cMsg, fmt.Errorf("failed tx preexecution%s - %s", chaincode, err)
}
}
depPayload = depTx.Payload
}
if depPayload == nil {
return cID, cMsg, fmt.Errorf("failed to get deployment payload %s - %s", chaincode, ledgerErr)
}
//Get lang from original deployment
err := proto.Unmarshal(depTx.Payload, cds)
err = proto.Unmarshal(depPayload, cds)
if err != nil {
return cID, cMsg, fmt.Errorf("failed to unmarshal deployment transactions for %s - %s", chaincode, err)
}
......
......@@ -38,6 +38,10 @@ func createTx(typ pb.Transaction_Type, ccname string, args [][]byte) (*pb.Transa
return tx, nil
}
func getCDSFromLCCC(ctxt context.Context, chainID string, chaincodeID string) ([]byte, error) {
return ExecuteChaincode(ctxt, pb.Transaction_CHAINCODE_INVOKE, string(DefaultChain), "lccc", [][]byte{[]byte("getdepspec"), []byte(chainID), []byte(chaincodeID)})
}
// ExecuteChaincode executes a given chaincode given chaincode name and arguments
func ExecuteChaincode(ctxt context.Context, typ pb.Transaction_Type, chainname string, ccname string, args [][]byte) ([]byte, error) {
var tx *pb.Transaction
......
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package committer
// Committer is the interface supported by committers
// The only committer is noopssinglechain committer.
// The interface is intentionally sparse with the sole
// aim of "leave-everything-to-the-committer-for-now".
// As we solidify the bootstrap process and as we add
// more support (such as Gossip) this interface will
// change
type Committer interface {
//Start registers and opens communications
Start() error
}
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package noopssinglechain
import (
"fmt"
"time"
"github.com/op/go-logging"
"github.com/spf13/viper"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/core/chaincode"
"github.com/hyperledger/fabric/core/committer"
"github.com/hyperledger/fabric/core/ledgernext/kvledger"
ab "github.com/hyperledger/fabric/orderer/atomicbroadcast"
"golang.org/x/net/context"
"google.golang.org/grpc"
pb "github.com/hyperledger/fabric/protos"
)
//--------!!!IMPORTANT!!-!!IMPORTANT!!-!!IMPORTANT!!---------
// This Orderer is based off fabric/orderer/sample_clients/
// deliver_stdout/client.go. This is used merely to complete
// the loop for the "skeleton" path so we can reason about and
// modify committer component more effectively using code.
var logger *logging.Logger // package-level logger
func init() {
logger = logging.MustGetLogger("noopssinglechain")
}
type deliverClient struct {
client ab.AtomicBroadcast_DeliverClient
windowSize uint64
unAcknowledged uint64
solo *solo
}
func newDeliverClient(client ab.AtomicBroadcast_DeliverClient, windowSize uint64, solo *solo) *deliverClient {
return &deliverClient{client: client, windowSize: windowSize, solo: solo}
}
func (r *deliverClient) seekOldest() error {
return r.client.Send(&ab.DeliverUpdate{
Type: &ab.DeliverUpdate_Seek{
Seek: &ab.SeekInfo{
Start: ab.SeekInfo_OLDEST,
WindowSize: r.windowSize,
},
},
})
}
func (r *deliverClient) seekNewest() error {
return r.client.Send(&ab.DeliverUpdate{
Type: &ab.DeliverUpdate_Seek{
Seek: &ab.SeekInfo{
Start: ab.SeekInfo_NEWEST,
WindowSize: r.windowSize,
},
},
})
}
func (r *deliverClient) seek(blockNumber uint64) error {
return r.client.Send(&ab.DeliverUpdate{
Type: &ab.DeliverUpdate_Seek{
Seek: &ab.SeekInfo{
Start: ab.SeekInfo_SPECIFIED,
SpecifiedNumber: blockNumber,
WindowSize: r.windowSize,
},
},
})
}
// constructBlock constructs a block from a list of transactions
func (r *deliverClient) constructBlock(transactions []*pb.Transaction2) *pb.Block2 {
block := &pb.Block2{}
for _, tx := range transactions {
txBytes, _ := proto.Marshal(tx)
block.Transactions = append(block.Transactions, txBytes)
}
return block
}
// commit the received transaction
func (r *deliverClient) commit(txs []*pb.Transaction2) error {
rawblock := r.constructBlock(txs)
lgr := kvledger.GetLedger(r.solo.ledger)
var err error
if _, _, err = lgr.RemoveInvalidTransactionsAndPrepare(rawblock); err != nil {
return err
}
if err = lgr.Commit(); err != nil {
return err
}
return err
}
func (r *deliverClient) readUntilClose() {
for {
msg, err := r.client.Recv()
if err != nil {
return
}
switch t := msg.Type.(type) {
case *ab.DeliverResponse_Error:
if t.Error == ab.Status_SUCCESS {
fmt.Println("ERROR! Received success in error field")
return
}
fmt.Println("Got error ", t)
case *ab.DeliverResponse_Block:
txs := []*pb.Transaction2{}
for _, d := range t.Block.Messages {
if d != nil && d.Data != nil {
tx := &pb.Transaction2{}
if err = proto.Unmarshal(d.Data, tx); err != nil {
fmt.Printf("Error getting tx(%s)...dropping block\n", err)
continue
}
txs = append(txs, tx)
}
}
if err = r.commit(txs); err != nil {
fmt.Printf("Got error while committing(%s)\n", err)
} else {
fmt.Printf("Commit success, created a block!\n", err)
}
r.unAcknowledged++
if r.unAcknowledged >= r.windowSize/2 {
fmt.Println("Sending acknowledgement")
err = r.client.Send(&ab.DeliverUpdate{Type: &ab.DeliverUpdate_Acknowledgement{Acknowledgement: &ab.Acknowledgement{Number: t.Block.Number}}})
if err != nil {
return
}
r.unAcknowledged = 0
}
default:
fmt.Println("Received unknown: ", t)
return
}
}
}
type solo struct {
//ledger to commit to
ledger string
//orderer to connect to
orderer string
//client of the orderer
client *deliverClient
}
const defaultTimeout = time.Second * 3
//Start establishes communication with an orders
func (s *solo) Start() error {
if s.client != nil {
return fmt.Errorf("Client to (%s) exists", s.orderer)
}
var opts []grpc.DialOption
opts = append(opts, grpc.WithInsecure())
opts = append(opts, grpc.WithTimeout(defaultTimeout))
opts = append(opts, grpc.WithBlock())
conn, err := grpc.Dial(s.orderer, opts...)
if err != nil {
return err
}
var abc ab.AtomicBroadcast_DeliverClient
abc, err = ab.NewAtomicBroadcastClient(conn).Deliver(context.TODO())
if err != nil {
return err
}
s.client = newDeliverClient(abc, 10, s)
if err = s.client.seekOldest(); err != nil {
return err
}
s.client.readUntilClose()
return err
}
// NewCommitter constructs a committer object if not already present
func NewCommitter() committer.Committer {
if viper.GetBool("peer.committer.enabled") {
//TODO ledger needs to be configured, for now just the default
ledger := string(chaincode.DefaultChain)
orderer := viper.GetString("peer.committer.ledger.orderer")
logger.Infof("Creating committer for single noops endorser")
return &solo{ledger: ledger, orderer: orderer}
}
logger.Infof("Committer disabled")
return nil
}
......@@ -126,6 +126,9 @@ func (e *Endorser) callChaincode(ctxt context.Context, cis *pb.ChaincodeInvocati
if txsim, err = e.getTxSimulator(chainName); err != nil {
return nil, nil, err
}
defer txsim.Done()
ctxt = context.WithValue(ctxt, chaincode.TXSimulatorKey, txsim)
b, err = chaincode.ExecuteChaincode(ctxt, pb.Transaction_CHAINCODE_INVOKE, chainName, cis.ChaincodeSpec.ChaincodeID.Name, cis.ChaincodeSpec.CtorMsg.Args)
......@@ -215,7 +218,7 @@ func (e *Endorser) ProcessProposal(ctx context.Context, prop *pb.Proposal) (*pb.
//1 -- simulate
//TODO what do we do with response ? We need it for Invoke responses for sure
//Which field in PayloadResponse will carry return value ?
_, simulationResult, err := e.simulateProposal(ctx, prop)
payload, simulationResult, err := e.simulateProposal(ctx, prop)
if err != nil {
return &pb.ProposalResponse{Response: &pb.Response2{Status: 500, Message: err.Error()}}, err
}
......@@ -237,7 +240,7 @@ func (e *Endorser) ProcessProposal(ctx context.Context, prop *pb.Proposal) (*pb.
}
//TODO when we have additional field in response, use "resp" bytes from the simulation
resp := &pb.Response2{Status: 200, Message: "Proposal accepted"}
resp := &pb.Response2{Status: 200, Message: "Proposal accepted", Payload: payload}
return &pb.ProposalResponse{Response: resp, ActionBytes: actionBytes, Endorsement: endorsement}, nil
}
......@@ -51,6 +51,9 @@ const (
//GETCCINFO get chaincode
GETCCINFO = "getid"
//GETDEPSPEC get ChaincodeDeploymentSpec
GETDEPSPEC = "getdepspec"
)
//---------- the LCCC -----------------
......@@ -333,7 +336,7 @@ func (lccc *LifeCycleSysCC) executeDeploy(stub shim.ChaincodeStubInterface, chai
*}
**/
_, err = lccc.createChaincode(stub, chainname, cds.ChaincodeSpec.ChaincodeID.Name, cds.CodePackage)
_, err = lccc.createChaincode(stub, chainname, cds.ChaincodeSpec.ChaincodeID.Name, code)
return err
}
......@@ -383,7 +386,7 @@ func (lccc *LifeCycleSysCC) Invoke(stub shim.ChaincodeStubInterface) ([]byte, er
err := lccc.executeDeploy(stub, chainname, code)
return nil, err
case GETCCINFO:
case GETCCINFO, GETDEPSPEC:
if len(args) != 3 {
return nil, InvalidArgsLenErr(len(args))
}
......@@ -398,7 +401,10 @@ func (lccc *LifeCycleSysCC) Invoke(stub shim.ChaincodeStubInterface) ([]byte, er
return nil, TXNotFoundErr(chain + "/" + ccname)
}
return []byte(ccrow.Columns[1].GetString_()), nil
if function == GETCCINFO {
return []byte(ccrow.Columns[1].GetString_()), nil
}
return ccrow.Columns[2].GetBytes(), nil
}
return nil, InvalidFunctionErr(function)
......
......@@ -218,8 +218,9 @@ func TestRetryFailedDeploy(t *testing.T) {
t.FailNow()
}
args = [][]byte{[]byte(GETCCINFO), []byte("test"), []byte(cds.ChaincodeSpec.ChaincodeID.Name)}
if _, err := stub.MockInvoke("1", args); err != nil {
//get the deploymentspec
args = [][]byte{[]byte(GETDEPSPEC), []byte("test"), []byte(cds.ChaincodeSpec.ChaincodeID.Name)}
if depspec, err := stub.MockInvoke("1", args); err != nil || depspec == nil {
t.FailNow()
}
}
......@@ -178,6 +178,12 @@ func chaincodeInvokeOrQuery(cmd *cobra.Command, args []string, invoke bool) (err
return fmt.Errorf("Error endorsing %s: %s\n", chainFuncName, err)
}
if proposalResp != nil {
if err = sendTransaction(proposalResp); err != nil {
return fmt.Errorf("Error sending transaction %s: %s\n", chainFuncName, err)
}
}
logger.Infof("Invoke result: %v", proposalResp)
} else {
//for now let's continue to use Query with devops
......@@ -259,3 +265,39 @@ func checkChaincodeCmdParams(cmd *cobra.Command) error {
return nil
}
//sendTransactions converts a ProposalResponse and sends it as
//a Transaction to the orderer
func sendTransaction(presp *pb.ProposalResponse) error {
var orderer string
if viper.GetBool("peer.committer.enabled") {
orderer = viper.GetString("peer.committer.ledger.orderer")
}
if orderer == "" {
return nil
}
var err error
if presp != nil {
if presp.Response.Status != 200 {
return fmt.Errorf("Proposal response erred with status %d", presp.Response.Status)
}
//create a tx with ActionBytes and Endorsements and send it out
if presp.ActionBytes != nil {
var b []byte
tx := &pb.Transaction2{}
tx.EndorsedActions = []*pb.EndorsedAction{
&pb.EndorsedAction{ActionBytes: presp.ActionBytes, Endorsements: []*pb.Endorsement{presp.Endorsement}, ProposalBytes: []byte{}}}
b, err = proto.Marshal(tx)
if err != nil {
return err
}
if b != nil {
err = Send(orderer, b)
}
}
}
return err
}
......@@ -23,6 +23,7 @@ import (
"github.com/hyperledger/fabric/core"
"github.com/hyperledger/fabric/peer/common"
pb "github.com/hyperledger/fabric/protos"
"github.com/spf13/cobra"
)
......@@ -42,43 +43,50 @@ var chaincodeDeployCmd = &cobra.Command{
}
//deploy the command via Endorser
func deploy(cmd *cobra.Command) error {
func deploy(cmd *cobra.Command) (*pb.ProposalResponse, error) {
spec, err := getChaincodeSpecification(cmd)
if err != nil {
return err
return nil, err
}
ctxt := context.Background()
cds, err := core.GetChaincodeBytes(ctxt, spec)
if err != nil {
return fmt.Errorf("Error getting chaincode code %s: %s", chainFuncName, err)
return nil, fmt.Errorf("Error getting chaincode code %s: %s", chainFuncName, err)
}
endorserClient, err := common.GetEndorserClient(cmd)
if err != nil {
return fmt.Errorf("Error getting endorser client %s: %s", chainFuncName, err)
return nil, fmt.Errorf("Error getting endorser client %s: %s", chainFuncName, err)
}
prop, err := getDeployProposal(cds)
if err != nil {
return fmt.Errorf("Error creating proposal %s: %s\n", chainFuncName, err)
return nil, fmt.Errorf("Error creating proposal %s: %s\n", chainFuncName, err)
}
proposalResult, err := endorserClient.ProcessProposal(ctxt, prop)
proposalResponse, err := endorserClient.ProcessProposal(ctxt, prop)
if err != nil {
return fmt.Errorf("Error endorsing %s: %s\n", chainFuncName, err)
return nil, fmt.Errorf("Error endorsing %s: %s\n", chainFuncName, err)
}
logger.Infof("Deploy(endorser) result: %v", proposalResult)
return nil
logger.Infof("Deploy(endorser) result: %v", proposalResponse)
return proposalResponse, nil
}
// chaincodeDeploy deploys the chaincode. On success, the chaincode name
// (hash) is printed to STDOUT for use by subsequent chaincode-related CLI
// commands.
func chaincodeDeploy(cmd *cobra.Command, args []string) error {
err := deploy(cmd)
presult, err := deploy(cmd)
if err != nil {
return err
}
if presult != nil {
err = sendTransaction(presult)
}
return err
}
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package chaincode
//--------!!!IMPORTANT!!-!!IMPORTANT!!-!!IMPORTANT!!---------
// This Orderer client is based off fabric/orderer/sample_clients/
// broadcast_timestamp/client.go
// It is temporary and will go away from CLI when SDK implements
// interactions for the V1 architecture
//-------------------------------------------------------------
import (
"fmt"
ab "github.com/hyperledger/fabric/orderer/atomicbroadcast"
"golang.org/x/net/context"
"google.golang.org/grpc"
)
type broadcastClient struct {
client ab.AtomicBroadcast_BroadcastClient
}
// newBroadcastClient creates a simple instance of the broadcastClient interface
func newBroadcastClient(client ab.AtomicBroadcast_BroadcastClient) *broadcastClient {
return &broadcastClient{client: client}
}
func (s *broadcastClient) broadcast(transaction []byte) error {
return s.client.Send(&ab.BroadcastMessage{transaction})
}
//Send data to solo orderer
func Send(serverAddr string, data []byte) error {
conn, err := grpc.Dial(serverAddr, grpc.WithInsecure())
defer conn.Close()
if err != nil {
return fmt.Errorf("Error connecting: %s", err)
}
client, err := ab.NewAtomicBroadcastClient(conn).Broadcast(context.TODO())
if err != nil {
return fmt.Errorf("Error connecting: %s", err)
}
s := newBroadcastClient(client)
s.broadcast(data)
return nil
}
......@@ -42,5 +42,5 @@ var chaincodeQueryCmd = &cobra.Command{
}
func chaincodeQuery(cmd *cobra.Command, args []string) error {
return chaincodeInvokeOrQuery(cmd, args, false)
return fmt.Errorf("chaincode query is deprecated and should not be used")
}