Commit 7a5265eb authored by Lucas Kuhring's avatar Lucas Kuhring
Browse files

Adjust logging positions

parent 82acef52
......@@ -8,6 +8,7 @@ package deliver
import (
"context"
"fmt"
"io"
"math"
"strconv"
......@@ -19,6 +20,7 @@ import (
"github.com/hyperledger/fabric/common/ledger/blockledger"
"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/config"
"github.com/hyperledger/fabric/core/comm"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
......@@ -320,6 +322,10 @@ func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.E
logger.Debugf("[channel: %s] Delivering block for (%p) for %s", chdr.ChannelId, seekInfo, addr)
if config.Log.Ordering {
fmt.Printf("ord1,%d,%d\n", time.Now().UnixNano(), block.Header.Number)
}
if err := srv.SendBlockResponse(block); err != nil {
logger.Warningf("[channel: %s] Error sending to %s: %s", chdr.ChannelId, addr, err)
return cb.Status_INTERNAL_SERVER_ERROR, err
......
......@@ -3,17 +3,15 @@ package config
import (
"encoding/json"
"errors"
"fmt"
"os"
)
var Log LogConfig
// LogConfig selects which performance-logging categories the peer/orderer
// emit to stdout. It is populated from a JSON file by ReadConfig and read
// via the package-level Log variable.
type LogConfig struct {
	Endorsement bool // endorsement timing lines ("end,...")
	Validation  bool // validation pipeline timing lines ("val0".."val3")
	Ordering    bool // ordering service timing lines ("ord0"/"ord1")
}
func ReadConfig(path string) error {
......@@ -31,13 +29,11 @@ func ReadConfig(path string) error {
lc := LogConfig{}
if err := dec.Decode(&lc); err != nil {
Log = LogConfig{false, false, false, false}
Log = LogConfig{false, false, false}
return err
}
Log = lc
fmt.Printf("Log config loaded: %t\n", Log)
return nil
}
......@@ -2,5 +2,4 @@
"Validation": true,
"Endorsement": false,
"Ordering": false,
"FullCommit": false
}
\ No newline at end of file
......@@ -8,14 +8,13 @@ package committer
import (
"fmt"
"strconv"
"time"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/config"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/utils"
"github.com/hyperledger/fabric_org/config"
"github.com/pkg/errors"
)
......@@ -99,22 +98,8 @@ func (lc *LedgerCommitter) CommitWithPvtData(blockAndPvtData *ledger.BlockAndPvt
return err
}
if config.Log.FullCommit {
/*
txData := blockAndPvtData.Block.Data.Data[0]
msg, _ := utils.UnmarshalEnvelope(txData)
payload, _ := utils.UnmarshalPayload(msg.Payload)
hdr := payload.GetHeader()
chdr, _ := utils.UnmarshalChannelHeader(hdr.ChannelHeader)
firstTransaction := chdr.TxId
*/
//fmt.Printf("{\"ts\":" + strconv.FormatInt(time.Now().UnixNano(), 10) + ",\"msg\":\"FABRIC PERF Validation\",\"block\":\"" + strconv.Itoa(int(blockAndPvtData.Block.GetHeader().GetNumber())) + "\",\"STEP\":3}\n")
fmt.Printf(strconv.FormatInt(time.Now().UnixNano(), 10) + "," + strconv.Itoa(int(blockAndPvtData.Block.GetHeader().GetNumber())) + " (Step 3)\n")
}
if config.Log.Validation {
fmt.Println("val1," + strconv.Itoa(int(blockAndPvtData.Block.GetHeader().GetNumber())) + "," + strconv.FormatInt(time.Now().UnixNano(), 10))
fmt.Printf("val3,%d,%d\n", time.Now().UnixNano(), blockAndPvtData.Block.GetHeader().GetNumber())
}
return nil
......
......@@ -341,59 +341,34 @@ func (v *TxValidator) validateTx(req *blockValidationRequest, results chan<- *bl
return
}
// Validate tx with vscc and policy
logger.Debug("Validating transaction vscc tx validate")
err, cde := v.Vscc.VSCCValidateTx(tIdx, payload, d, block)
if err != nil {
logger.Errorf("VSCCValidateTx for transaction txId = %s returned error: %s", txID, err)
switch err.(type) {
case *commonerrors.VSCCExecutionFailureError:
results <- &blockValidationResult{
tIdx: tIdx,
err: err,
}
return
case *commonerrors.VSCCInfoLookupFailureError:
results <- &blockValidationResult{
tIdx: tIdx,
err: err,
}
return
default:
results <- &blockValidationResult{
tIdx: tIdx,
validationCode: cde,
}
return
}
}
strVSCC := os.Getenv("STREAMCHAIN_VSCC")
var strVSCC = os.Getenv("STREAMCHAIN_VSCC")
if strVSCC == "" || strVSCC == "true" {
// Validate tx with vscc and policy
logger.Debug("Validating transaction vscc tx validate")
err, cde := v.Vscc.VSCCValidateTx(tIdx, payload, d, block)
logger.Errorf("VSCCValidateTx for transaction txId = %s returned error: %s", txID, err)
switch err.(type) {
case *commonerrors.VSCCExecutionFailureError:
results <- &blockValidationResult{
tIdx: tIdx,
err: err,
}
return
case *commonerrors.VSCCInfoLookupFailureError:
results <- &blockValidationResult{
tIdx: tIdx,
err: err,
}
return
default:
results <- &blockValidationResult{
tIdx: tIdx,
validationCode: cde,
if err != nil {
logger.Errorf("VSCCValidateTx for transaction txId = %s returned error: %s", txID, err)
switch err.(type) {
case *commonerrors.VSCCExecutionFailureError:
results <- &blockValidationResult{
tIdx: tIdx,
err: err,
}
return
case *commonerrors.VSCCInfoLookupFailureError:
results <- &blockValidationResult{
tIdx: tIdx,
err: err,
}
return
default:
results <- &blockValidationResult{
tIdx: tIdx,
validationCode: cde,
}
return
}
return
}
}
......
......@@ -18,6 +18,7 @@ import (
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/metrics"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/config"
"github.com/hyperledger/fabric/core/chaincode/platforms"
"github.com/hyperledger/fabric/core/chaincode/shim"
"github.com/hyperledger/fabric/core/common/ccprovider"
......@@ -27,7 +28,6 @@ import (
pb "github.com/hyperledger/fabric/protos/peer"
"github.com/hyperledger/fabric/protos/transientstore"
putils "github.com/hyperledger/fabric/protos/utils"
"github.com/hyperledger/fabric_org/config"
"github.com/pkg/errors"
"go.uber.org/zap"
)
......@@ -447,7 +447,7 @@ func (e *Endorser) ProcessProposal(ctx context.Context, signedProp *pb.SignedPro
}
if config.Log.Endorsement {
fmt.Printf("%d,%f\n", time.Now().UnixNano()/1000000, time.Since(startTime).Seconds()*1000)
fmt.Printf("end,%d,%d\n", time.Now().UnixNano()/1000000, time.Since(startTime).Nanoseconds())
}
endorserLogger.Debug("Exit: request from", addr)
......
......@@ -8,13 +8,13 @@ package kvledger
import (
"fmt"
"strconv"
"sync"
"time"
"github.com/hyperledger/fabric/common/flogging"
commonledger "github.com/hyperledger/fabric/common/ledger"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/config"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/cceventmgmt"
"github.com/hyperledger/fabric/core/ledger/confighistory"
......@@ -28,7 +28,6 @@ import (
"github.com/hyperledger/fabric/core/ledger/pvtdatapolicy"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/peer"
"github.com/hyperledger/fabric_org/config"
"github.com/pkg/errors"
)
......@@ -307,9 +306,9 @@ func (l *kvLedger) CommitWithPvtData(pvtdataAndBlock *ledger.BlockAndPvtData) er
return err
}
if config.Log.FullCommit {
//fmt.Printf("{\"ts\":" + strconv.FormatInt(time.Now().UnixNano(), 10) + ",\"msg\":\"FABRIC PERF Validation\",\"block\":" + strconv.Itoa(int(blockNo)) + ",\"STEP\":2}\n")
fmt.Printf(strconv.FormatInt(time.Now().UnixNano(), 10) + "," + strconv.Itoa(int(blockNo)) + " (Step 2)\n")
if config.Log.Validation {
fmt.Printf("val2,%d,%d\n", time.Now().UnixNano(), blockNo)
}
elapsedBlockProcessing := time.Since(startBlockProcessing)
......
......@@ -10,12 +10,12 @@ import (
"bytes"
"encoding/hex"
"fmt"
"strconv"
"time"
"github.com/golang/protobuf/proto"
vsccErrors "github.com/hyperledger/fabric/common/errors"
util2 "github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/config"
"github.com/hyperledger/fabric/core/committer"
"github.com/hyperledger/fabric/core/committer/txvalidator"
"github.com/hyperledger/fabric/core/common/privdata"
......@@ -30,7 +30,6 @@ import (
"github.com/hyperledger/fabric/protos/peer"
transientstore2 "github.com/hyperledger/fabric/protos/transientstore"
"github.com/hyperledger/fabric/protos/utils"
"github.com/hyperledger/fabric_org/config"
"github.com/pkg/errors"
"github.com/spf13/viper"
)
......@@ -155,36 +154,8 @@ func (c *coordinator) VerifyBlock(block *common.Block, privateDataSets util.PvtD
logger.Infof("[%s] Received block [%d] from buffer", c.ChainID, block.Header.Number)
if config.Log.FullCommit {
//fmt.Printf("{\"ts\":" + strconv.FormatInt(time.Now().UnixNano(), 10) + ",\"msg\":\"FABRIC PERF Validation\",\"block\":" + strconv.Itoa(int(block.Header.Number)) + ",\"STEP\":0}\n")
fmt.Printf(strconv.FormatInt(time.Now().UnixNano(), 10) + "," + strconv.Itoa(int(block.Header.Number)) + " (Step 0)\n")
}
if config.Log.Validation {
/*
txcount := -1
if block != nil && block.Metadata != nil && block.Metadata.Metadata != nil && block.Data != nil && block.Data.Data != nil {
txcount = 0
txsFilter := ha.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
if len(txsFilter) == 0 {
txsFilter = ha.NewTxValidationFlags(len(block.Data.Data))
block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txsFilter
}
for txIndex := range block.Data.Data {
if txsFilter.IsInvalid(txIndex) {
continue
}
txcount++
}
}
*/
fmt.Println("val0," + strconv.Itoa(int(block.Header.Number)) + "," + strconv.FormatInt(time.Now().UnixNano(), 10))
//logs.WriteString("{\"ts\":" + strconv.FormatInt(time.Now().UnixNano(), 10) + ",\"msg\":\"FABRIC PERF Validation\",\"block\":" + strconv.Itoa(int(block.Header.Number)) + ",\"transactions\":" + strconv.Itoa(txcount) + ",\"STEP\":0}\n")
fmt.Printf("val0,%d,%d\n", time.Now().UnixNano(), block.Header.Number)
}
logger.Debugf("[%s] Validating block [%d]", c.ChainID, block.Header.Number)
......@@ -199,9 +170,8 @@ func (c *coordinator) VerifyBlock(block *common.Block, privateDataSets util.PvtD
// StoreBlock stores block with private data into the ledger
func (c *coordinator) StoreBlock(block *common.Block, privateDataSets util.PvtDataCollections) error {
if config.Log.FullCommit {
//fmt.Printf("{\"ts\":" + strconv.FormatInt(time.Now().UnixNano(), 10) + ",\"msg\":\"FABRIC PERF Validation\",\"block\":" + strconv.Itoa(int(block.Header.Number)) + ",\"STEP\":1}\n")
fmt.Printf(strconv.FormatInt(time.Now().UnixNano(), 10) + "," + strconv.Itoa(int(block.Header.Number)) + " (Step 1)\n")
if config.Log.Validation {
fmt.Printf("val1,%d,%d\n", time.Now().UnixNano(), block.Header.Number)
}
blockAndPvtData := &ledger.BlockAndPvtData{
......
......@@ -15,7 +15,6 @@ import (
"time"
pb "github.com/golang/protobuf/proto"
vsccErrors "github.com/hyperledger/fabric/common/errors"
"github.com/hyperledger/fabric/gossip/api"
"github.com/hyperledger/fabric/gossip/comm"
common2 "github.com/hyperledger/fabric/gossip/common"
......@@ -55,6 +54,17 @@ const (
enqueueRetryInterval = time.Millisecond * 100
)
// Configuration keeps state transfer configuration parameters
// (defaults come from the def* constants above; overrides are read
// from viper keys under "peer.gossip.state." by readConfiguration).
type Configuration struct {
AntiEntropyInterval time.Duration // pause between anti-entropy rounds
AntiEntropyStateResponseTimeout time.Duration // how long to wait for a state response before retrying
AntiEntropyBatchSize uint64 // max number of blocks requested in a single state request
MaxBlockDistance int // max gap between ledger height and an enqueued payload's seq num
AntiEntropyMaxRetries int // retries per block range before giving up
ChannelBufferSize int // capacity of the state request/response channels
EnableStateTransfer bool // when false, the antiEntropy goroutine is not started
}
// GossipAdapter defines gossip/communication required interface for state provider
type GossipAdapter interface {
// Send sends a message to remote peers
......@@ -151,10 +161,76 @@ type GossipStateProviderImpl struct {
once sync.Once
stateTransferActive int32
requestValidator *stateRequestValidator
blockingMode bool
config *Configuration
}
var logger = util.GetLogger(util.StateLogger, "")
// stateRequestValidator facilitates validation of the state request messages
type stateRequestValidator struct {
}
// validate checks for RemoteStateRequest message validity
// It rejects a request whose sequence interval is inverted, or whose
// span exceeds the configured anti-entropy batch size.
func (v *stateRequestValidator) validate(request *proto.RemoteStateRequest, batchSize uint64) error {
	start, end := request.StartSeqNum, request.EndSeqNum
	if start > end {
		return errors.Errorf("Invalid sequence interval [%d...%d).", start, end)
	}
	if end > batchSize+start {
		return errors.Errorf("Requesting blocks range [%d-%d) greater than configured allowed"+
			" (%d) batching size for anti-entropy.", start, end, batchSize)
	}
	return nil
}
// readConfiguration reading state configuration
// Starts from the compile-time def* defaults and overrides each field
// from viper only when the corresponding "peer.gossip.state.*" key is set.
func readConfiguration() *Configuration {
	cfg := &Configuration{
		AntiEntropyInterval:             defAntiEntropyInterval,
		AntiEntropyStateResponseTimeout: defAntiEntropyStateResponseTimeout,
		AntiEntropyBatchSize:            defAntiEntropyBatchSize,
		MaxBlockDistance:                defMaxBlockDistance,
		AntiEntropyMaxRetries:           defAntiEntropyMaxRetries,
		ChannelBufferSize:               defChannelBufferSize,
		EnableStateTransfer:             true,
	}
	if key := "peer.gossip.state.checkInterval"; viper.IsSet(key) {
		cfg.AntiEntropyInterval = viper.GetDuration(key)
	}
	if key := "peer.gossip.state.responseTimeout"; viper.IsSet(key) {
		cfg.AntiEntropyStateResponseTimeout = viper.GetDuration(key)
	}
	if key := "peer.gossip.state.batchSize"; viper.IsSet(key) {
		cfg.AntiEntropyBatchSize = uint64(viper.GetInt(key))
	}
	if key := "peer.gossip.state.blockBufferSize"; viper.IsSet(key) {
		cfg.MaxBlockDistance = viper.GetInt(key)
	}
	if key := "peer.gossip.state.maxRetries"; viper.IsSet(key) {
		cfg.AntiEntropyMaxRetries = viper.GetInt(key)
	}
	if key := "peer.gossip.state.channelSize"; viper.IsSet(key) {
		cfg.ChannelBufferSize = viper.GetInt(key)
	}
	if key := "peer.gossip.state.enabled"; viper.IsSet(key) {
		cfg.EnableStateTransfer = viper.GetBool(key)
	}
	return cfg
}
// NewGossipStateProvider creates state provider with coordinator instance
// to orchestrate arrival of private rwsets and blocks before committing them into the ledger.
func NewGossipStateProvider(chainID string, services *ServicesMediator, ledger ledgerResources) GossipStateProvider {
......@@ -201,6 +277,9 @@ func NewGossipStateProvider(chainID string, services *ServicesMediator, ledger l
return nil
}
// Reading state configuration
config := readConfiguration()
s := &GossipStateProviderImpl{
// MessageCryptoService
mediator: services,
......@@ -219,15 +298,19 @@ func NewGossipStateProvider(chainID string, services *ServicesMediator, ledger l
ledger: ledger,
stateResponseCh: make(chan proto.ReceivedMessage, defChannelBufferSize),
stateResponseCh: make(chan proto.ReceivedMessage, config.ChannelBufferSize),
stateRequestCh: make(chan proto.ReceivedMessage, defChannelBufferSize),
stateRequestCh: make(chan proto.ReceivedMessage, config.ChannelBufferSize),
stopCh: make(chan struct{}, 1),
stateTransferActive: 0,
once: sync.Once{},
requestValidator: &stateRequestValidator{},
config: config,
}
logger.Infof("Updating metadata information, "+
......@@ -241,8 +324,10 @@ func NewGossipStateProvider(chainID string, services *ServicesMediator, ledger l
go s.listen()
// Deliver in order messages into the incoming channel
go s.deliverPayloads()
// Execute anti entropy to fill missing gaps
go s.antiEntropy()
if s.config.EnableStateTransfer {
// Execute anti entropy to fill missing gaps
go s.antiEntropy()
}
// Taking care of state request messages
go s.processStateRequests()
......@@ -349,7 +434,7 @@ func (s *GossipStateProviderImpl) directMessage(msg proto.ReceivedMessage) {
incoming := msg.GetGossipMessage()
if incoming.GetStateRequest() != nil {
if len(s.stateRequestCh) < defChannelBufferSize {
if len(s.stateRequestCh) < s.config.ChannelBufferSize {
// Forward state request to the channel, if there are too
// many message of state request ignore to avoid flooding.
s.stateRequestCh <- msg
......@@ -386,15 +471,8 @@ func (s *GossipStateProviderImpl) handleStateRequest(msg proto.ReceivedMessage)
}
request := msg.GetGossipMessage().GetStateRequest()
batchSize := request.EndSeqNum - request.StartSeqNum
if batchSize > defAntiEntropyBatchSize {
logger.Errorf("Requesting blocks batchSize size (%d) greater than configured allowed"+
" (%d) batching for anti-entropy. Ignoring request...", batchSize, defAntiEntropyBatchSize)
return
}
if request.StartSeqNum > request.EndSeqNum {
logger.Errorf("Invalid sequence interval [%d...%d], ignoring request...", request.StartSeqNum, request.EndSeqNum)
if err := s.requestValidator.validate(request, s.config.AntiEntropyBatchSize); err != nil {
logger.Errorf("State request validation failed, %s. Ignoring request...", err)
return
}
......@@ -580,8 +658,6 @@ func (s *GossipStateProviderImpl) deliverPayloads() {
MaxNumOutstanding, _ = strconv.Atoi(strPipeline)
}
logger.Errorf("STREAM CHAIN Pipelining ", pipeline, MaxNumOutstanding)
var inChans []chan *PipelineData
var outChans []chan *PipelineData
inChans = make([]chan *PipelineData, MaxNumOutstanding)
......@@ -624,13 +700,6 @@ func (s *GossipStateProviderImpl) deliverPayloads() {
continue
}
}
if err := s.commitBlock(rawBlock, p); err != nil {
if executionErr, isExecutionErr := err.(*vsccErrors.VSCCExecutionFailureError); isExecutionErr {
logger.Errorf("Failed executing VSCC due to %v. Aborting chain processing", executionErr)
return
}
logger.Panicf("Cannot commit block to the ledger due to %+v", errors.WithStack(err))
}
job := &PipelineData{rawBlock, p, payload.SeqNum}
......@@ -663,7 +732,7 @@ func (s *GossipStateProviderImpl) antiEntropy() {
case <-s.stopCh:
s.stopCh <- struct{}{}
return
case <-time.After(defAntiEntropyInterval):
case <-time.After(s.config.AntiEntropyInterval):
ourHeight, err := s.ledger.LedgerHeight()
if err != nil {
// Unable to read from ledger continue to the next round
......@@ -708,7 +777,7 @@ func (s *GossipStateProviderImpl) requestBlocksInRange(start uint64, end uint64)
defer atomic.StoreInt32(&s.stateTransferActive, 0)
for prev := start; prev <= end; {
next := min(end, prev+defAntiEntropyBatchSize)
next := min(end, prev+s.config.AntiEntropyBatchSize)
gossipMsg := s.stateRequestMessage(prev, next)
......@@ -716,7 +785,7 @@ func (s *GossipStateProviderImpl) requestBlocksInRange(start uint64, end uint64)
tryCounts := 0
for !responseReceived {
if tryCounts > defAntiEntropyMaxRetries {
if tryCounts > s.config.AntiEntropyMaxRetries {
logger.Warningf("Wasn't able to get blocks in range [%d...%d), after %d retries",
prev, next, tryCounts)
return
......@@ -750,7 +819,7 @@ func (s *GossipStateProviderImpl) requestBlocksInRange(start uint64, end uint64)
}
prev = index + 1
responseReceived = true
case <-time.After(defAntiEntropyStateResponseTimeout):
case <-time.After(s.config.AntiEntropyStateResponseTimeout):
case <-s.stopCh:
s.stopCh <- struct{}{}
return
......@@ -836,15 +905,16 @@ func (s *GossipStateProviderImpl) addPayload(payload *proto.Payload, blockingMod
return errors.Wrap(err, "Failed obtaining ledger height")
}
if !blockingMode && payload.SeqNum-height >= defMaxBlockDistance {
if !blockingMode && payload.SeqNum-height >= uint64(s.config.MaxBlockDistance) {
return errors.Errorf("Ledger height is at %d, cannot enqueue block with sequence of %d", height, payload.SeqNum)
}
for blockingMode && s.payloads.Size() > defMaxBlockDistance*2 {
for blockingMode && s.payloads.Size() > s.config.MaxBlockDistance*2 {
time.Sleep(enqueueRetryInterval)
}
s.payloads.Push(payload)
logger.Debugf("Blocks payloads buffer size for channel [%s] is %d blocks", s.chainID, s.payloads.Size())
return nil
}
......
......@@ -7,11 +7,13 @@ SPDX-License-Identifier: Apache-2.0
package broadcast
import (
"fmt"
"io"
"time"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/config"
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
......@@ -134,6 +136,11 @@ func (mt *MetricsTracker) BeginEnqueue() {
// ProcessMessage validates and enqueues a single message
func (bh *Handler) ProcessMessage(msg *cb.Envelope, addr string) (resp *ab.BroadcastResponse) {
if config.Log.Ordering {
fmt.Printf("ord0,%d,%s\n", time.Now().UnixNano(), addr)
}
tracker := &MetricsTracker{
ChannelID: "unknown",
TxType: "unknown",
......
......@@ -9,8 +9,19 @@ SPDX-License-Identifier: Apache-2.0
// function should be included in this package.
package main
import "github.com/hyperledger/fabric/orderer/common/server"
import (
"fmt"
"github.com/hyperledger/fabric/config"
"github.com/hyperledger/fabric/orderer/common/server"
)
// main loads the optional performance-logging configuration and then
// hands control to the orderer server. A missing or malformed log.json
// is reported on stdout but does not prevent startup.
func main() {
	// Read in config
	if err := config.ReadConfig("log.json"); err != nil {
		fmt.Println(err)
	}
	server.Main()
}
......@@ -43,6 +43,10 @@ func addFlags(cmd *cobra.Command) {
common.AddOrdererFlags(cmd)
flags := cmd.PersistentFlags()
flags.StringVarP(&transient, "transient", "", "", "Transient map of arguments in JSON encoding")
flags.IntVar(&parallelism, "parallelism", 1, "Number of invocations per thread")
flags.IntVar(&repetitions, "repetitions", 1, "Number of parallel invocations")