Commit 4d304de4 authored by Lucas Kuhring

Incorporate original streamchain changes

parent 6bb0b6ec
......@@ -60,6 +60,15 @@ func (bh *MSPConfigHandler) ProposeMSP(mspConfig *mspprotos.MSPConfig) (msp.MSP,
if err != nil {
return nil, errors.WithMessage(err, "creating the MSP manager failed")
}
case int32(msp.NOOP):
theMsp, err = msp.New(&msp.NoopNewOpts{NewBaseOpts: msp.NewBaseOpts{Version: bh.version}})
if err != nil {
return nil, errors.WithMessage(err, "creating the MSP manager failed")
}
default:
return nil, errors.New(fmt.Sprintf("Setup error: unsupported msp type %d", mspConfig.Type))
}
......
......@@ -9,6 +9,7 @@ package txvalidator
import (
"context"
"fmt"
"os"
"time"
"github.com/golang/protobuf/proto"
......@@ -367,6 +368,35 @@ func (v *TxValidator) validateTx(req *blockValidationRequest, results chan<- *bl
}
}
// STREAMCHAIN_VSCC gates VSCC/endorsement-policy validation; unset defaults to enabled
strVSCC := os.Getenv("STREAMCHAIN_VSCC")
if strVSCC == "" || strVSCC == "true" {
// Validate tx with vscc and policy
logger.Debug("Validating transaction vscc tx validate")
err, cde := v.Vscc.VSCCValidateTx(tIdx, payload, d, block)
if err != nil {
logger.Errorf("VSCCValidateTx for transaction txId = %s returned error: %s", txID, err)
}
switch err.(type) {
case *commonerrors.VSCCExecutionFailureError:
results <- &blockValidationResult{
tIdx: tIdx,
err: err,
}
return
case *commonerrors.VSCCInfoLookupFailureError:
results <- &blockValidationResult{
tIdx: tIdx,
err: err,
}
return
default:
results <- &blockValidationResult{
tIdx: tIdx,
validationCode: cde,
}
return
}
}
invokeCC, upgradeCC, err := v.getTxCCInstance(payload)
if err != nil {
logger.Errorf("Get chaincode instance from transaction txId = %s returned error: %+v", txID, err)
......
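The STREAMCHAIN_VSCC gate above treats an unset variable as enabled, so VSCC validation stays on unless it is explicitly switched off. A minimal standalone sketch of that default-on convention (the envEnabled helper is ours, not part of the patch):

package main

import (
	"fmt"
	"os"
)

// envEnabled reports whether a STREAMCHAIN_* toggle is on; an unset
// variable counts as enabled, matching the check in validateTx above.
func envEnabled(name string) bool {
	v := os.Getenv(name)
	return v == "" || v == "true"
}

func main() {
	fmt.Println(envEnabled("STREAMCHAIN_VSCC")) // true unless explicitly disabled
}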
......@@ -18,7 +18,7 @@ import (
coreUtil "github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/common/ccprovider"
"github.com/hyperledger/fabric/core/common/sysccprovider"
"github.com/hyperledger/fabric/core/handlers/validation/api"
validation "github.com/hyperledger/fabric/core/handlers/validation/api"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/peer"
......@@ -63,10 +63,10 @@ func (v *VsccValidatorImpl) VSCCValidateTx(seq int, payload *common.Payload, env
}
/* obtain the list of namespaces we're writing stuff to;
at first, we establish a few facts about this invocation:
1) which namespaces does it write to?
2) does it write to LSCC's namespace?
3) does it write to any cc that cannot be invoked? */
writesToLSCC := false
writesToNonInvokableSCC := false
respPayload, err := utils.GetActionFromEnvelope(envBytes)
......@@ -269,6 +269,7 @@ func (v *VsccValidatorImpl) VSCCValidateTxForCC(ctx *Context) error {
if e, isExecutionError := err.(*validation.ExecutionFailureError); isExecutionError {
return &commonerrors.VSCCExecutionFailureError{Err: e}
}
// Else, treat it as an endorsement error.
return &commonerrors.VSCCEndorsementPolicyError{Err: err}
}
......
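The hunk above keeps VSCC execution failures distinct from endorsement failures: an execution failure means the validation plugin itself could not run, while everything else is treated as an endorsement-policy violation that simply invalidates the transaction. A self-contained sketch of the same classification pattern, using errors.As in place of the direct type assertion and stand-in error types of our own:

package main

import (
	"errors"
	"fmt"
)

// Stand-ins for commonerrors.VSCCExecutionFailureError and
// VSCCEndorsementPolicyError; only the wrapping pattern is the point.
type executionError struct{ err error }
type policyError struct{ err error }

func (e *executionError) Error() string { return "execution failure: " + e.err.Error() }
func (e *policyError) Error() string    { return "endorsement policy failure: " + e.err.Error() }

// classify mirrors the tail of VSCCValidateTxForCC: execution failures
// pass through unchanged, anything else becomes a policy error.
func classify(err error) error {
	var ee *executionError
	if errors.As(err, &ee) {
		return ee
	}
	return &policyError{err}
}

func main() {
	fmt.Println(classify(errors.New("signature set did not satisfy policy")))
}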
......@@ -7,6 +7,9 @@ package stateleveldb
import (
"bytes"
"hash/fnv"
"os"
"sync"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
......@@ -23,6 +26,14 @@ var compositeKeySep = []byte{0x00}
var lastKeyIndicator = byte(0x01)
var savePointKey = []byte{0x00}
// LockCount is the number of lock stripes guarding state reads and writes
const LockCount = 64

// hash returns the FNV-1a hash of s, used to map a composite key to a lock stripe
func hash(s []byte) uint32 {
h := fnv.New32a()
h.Write(s)
return h.Sum32()
}
// VersionedDBProvider implements interface VersionedDBProvider
type VersionedDBProvider struct {
dbProvider *leveldbhelper.Provider
......@@ -48,13 +59,14 @@ func (provider *VersionedDBProvider) Close() {
// VersionedDB implements VersionedDB interface
type versionedDB struct {
db *leveldbhelper.DBHandle
dbName string
db *leveldbhelper.DBHandle
dbName string
dataLock []sync.RWMutex
}
// newVersionedDB constructs an instance of VersionedDB
func newVersionedDB(db *leveldbhelper.DBHandle, dbName string) *versionedDB {
return &versionedDB{db, dbName}
return &versionedDB{db, dbName, make([]sync.RWMutex, LockCount)}
}
// Open implements method in VersionedDB interface
......@@ -82,6 +94,11 @@ func (vdb *versionedDB) BytesKeySupported() bool {
func (vdb *versionedDB) GetState(namespace string, key string) (*statedb.VersionedValue, error) {
logger.Debugf("GetState(). ns=%s, key=%s", namespace, key)
compositeKey := constructCompositeKey(namespace, key)
lockID := hash(compositeKey) % LockCount
vdb.dataLock[lockID].RLock()
defer vdb.dataLock[lockID].RUnlock()
dbVal, err := vdb.db.Get(compositeKey)
if err != nil {
return nil, err
......@@ -94,6 +111,12 @@ func (vdb *versionedDB) GetState(namespace string, key string) (*statedb.Version
// GetVersion implements method in VersionedDB interface
func (vdb *versionedDB) GetVersion(namespace string, key string) (*version.Height, error) {
// GetState below already takes the stripe read lock for this key; taking it
// again here could deadlock with a queued writer, so no lock is acquired.
versionedValue, err := vdb.GetState(namespace, key)
if err != nil {
return nil, err
......@@ -174,6 +197,9 @@ func (vdb *versionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version
compositeKey := constructCompositeKey(ns, k)
logger.Debugf("Channel [%s]: Applying key(string)=[%s] key(bytes)=[%#v]", vdb.dbName, string(compositeKey), compositeKey)
lockID := hash(compositeKey) % LockCount
vdb.dataLock[lockID].Lock()
if vv.Value == nil {
dbBatch.Delete(compositeKey)
} else {
......@@ -183,6 +209,7 @@ func (vdb *versionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version
}
dbBatch.Put(compositeKey, encodedVal)
}
vdb.dataLock[lockID].Unlock()
}
}
// Record a savepoint at a given height
......@@ -192,10 +219,18 @@ func (vdb *versionedDB) ApplyUpdates(batch *statedb.UpdateBatch, height *version
if height != nil {
dbBatch.Put(savePointKey, height.ToBytes())
}
// Setting sync to true as a precaution, false may be an ok optimization after further testing.
if err := vdb.db.WriteBatch(dbBatch, true); err != nil {
sync := true
if os.Getenv("STREAMCHAIN_SYNCDB") == "false" {
sync = false
}
// Sync defaults to true as a precaution; STREAMCHAIN_SYNCDB=false opts into the unsynced-write optimization.
if err := vdb.db.WriteBatch(dbBatch, sync); err != nil {
return err
}
return nil
}
......
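Two changes run through the stateleveldb hunks above: every read and write of a composite key now goes through one of LockCount reader/writer locks chosen by FNV-1a hash (lock striping, so operations on different keys rarely contend), and STREAMCHAIN_SYNCDB=false downgrades the batch commit to an unsynced write. A standalone sketch of the striping idea, with names of our own:

package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

const lockCount = 64 // mirrors LockCount above

// stripedStore guards a plain map with lockCount reader/writer locks;
// each key hashes to one stripe, so a writer only blocks readers whose
// keys land on the same stripe.
type stripedStore struct {
	locks [lockCount]sync.RWMutex
	data  map[string][]byte
}

func (s *stripedStore) stripe(key []byte) *sync.RWMutex {
	h := fnv.New32a()
	h.Write(key)
	return &s.locks[h.Sum32()%lockCount]
}

func (s *stripedStore) get(key []byte) []byte {
	l := s.stripe(key)
	l.RLock()
	defer l.RUnlock()
	return s.data[string(key)]
}

func (s *stripedStore) put(key, val []byte) {
	l := s.stripe(key)
	l.Lock()
	defer l.Unlock()
	s.data[string(key)] = val
}

func main() {
	s := &stripedStore{data: map[string][]byte{}}
	s.put([]byte("ns\x00k1"), []byte("v1"))
	fmt.Printf("%s\n", s.get([]byte("ns\x00k1")))
}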
......@@ -145,7 +145,7 @@ func (h *queryHelper) getPrivateData(ns, coll, key string) ([]byte, error) {
}
if !version.AreSame(hashVersion, ver) {
return nil, &txmgr.ErrPvtdataNotAvailable{Msg: fmt.Sprintf(
"private data matching public hash version is not available. Public hash version = %s, Private data version = %s",
"private data matching public hash version is not available. Public hash version = %#v, Private data version = %#v",
hashVersion, ver)}
}
if h.rwsetBuilder != nil {
......@@ -285,7 +285,7 @@ func (h *queryHelper) done() {
}
defer func() {
h.txmgr.commitRWLock.RUnlock()
//h.txmgr.commitRWLock.RUnlock()
h.doneInvoked = true
for _, itr := range h.itrs {
itr.Close()
......
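The format-verb change above helps because version.Height carries plain uint64 fields and, as the switch away from %s suggests, has no useful string rendering: %s on such a value yields fmt error placeholders, while %#v prints a readable Go-syntax form. A quick illustration with a stand-in struct:

package main

import "fmt"

// height stands in for version.Height; no String() method is defined.
type height struct{ BlockNum, TxNum uint64 }

func main() {
	h := &height{5, 2}
	fmt.Printf("%s\n", h)  // error placeholders: &{%!s(uint64=5) %!s(uint64=2)}
	fmt.Printf("%#v\n", h) // Go syntax: &main.height{BlockNum:0x5, TxNum:0x2}
}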
......@@ -85,7 +85,7 @@ func (txmgr *LockBasedTxMgr) GetLastSavepoint() (*version.Height, error) {
// NewQueryExecutor implements method in interface `txmgmt.TxMgr`
func (txmgr *LockBasedTxMgr) NewQueryExecutor(txid string) (ledger.QueryExecutor, error) {
qe := newQueryExecutor(txmgr, txid)
txmgr.commitRWLock.RLock()
//txmgr.commitRWLock.RLock()
return qe, nil
}
......@@ -96,7 +96,7 @@ func (txmgr *LockBasedTxMgr) NewTxSimulator(txid string) (ledger.TxSimulator, er
if err != nil {
return nil, err
}
txmgr.commitRWLock.RLock()
//txmgr.commitRWLock.RLock()
return s, nil
}
......@@ -475,13 +475,13 @@ func (txmgr *LockBasedTxMgr) Commit() error {
}
commitHeight := version.NewHeight(txmgr.current.blockNum(), txmgr.current.maxTxNumber())
txmgr.commitRWLock.Lock()
//txmgr.commitRWLock.Lock()
logger.Debugf("Write lock acquired for committing updates to state database")
if err := txmgr.db.ApplyPrivacyAwareUpdates(txmgr.current.batch, commitHeight); err != nil {
txmgr.commitRWLock.Unlock()
return err
}
txmgr.commitRWLock.Unlock()
//txmgr.commitRWLock.Unlock()
// only while holding a lock on oldBlockCommit, we should clear the cache as the
// cache is being used by the old pvtData committer to load the version of
// hashedKeys. Also, note that the PrepareForExpiringKeys uses the cache.
......
......@@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package ledgerstorage
import (
"os"
"sync"
"github.com/hyperledger/fabric/common/flogging"
......@@ -129,9 +130,13 @@ func (s *Store) CommitWithPvtData(blockAndPvtdata *ledger.BlockAndPvtData) error
logger.Debugf("Skipping writing block [%d] to pvt block store as the store height is [%d]", blockNum, pvtBlkStoreHt)
}
if err := s.AddBlock(blockAndPvtdata.Block); err != nil {
s.pvtdataStore.Rollback()
return err
strAddBlock := os.Getenv("STREAMCHAIN_ADDBLOCK")
// Always persist the first blocks; afterwards STREAMCHAIN_ADDBLOCK=false skips the block-store write
if blockAndPvtdata.Block.Header.Number < 10 || strAddBlock == "" || strAddBlock == "true" {
if err := s.AddBlock(blockAndPvtdata.Block); err != nil { //ZSOLT: move this call up to the parallel part...
s.pvtdataStore.Rollback()
return err
}
}
if writtenToPvtStore {
......
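CommitWithPvtData above now consults STREAMCHAIN_ADDBLOCK before writing a block to the block store, but unconditionally persists the first ten blocks, presumably so the genesis and early configuration blocks survive regardless of the toggle. The gate, extracted as a standalone predicate (the helper name is ours):

package main

import (
	"fmt"
	"os"
)

// shouldAddBlock mirrors the condition in CommitWithPvtData: early blocks
// are always written, later ones only when STREAMCHAIN_ADDBLOCK is unset
// or "true". The cutoff of 10 is hard-coded in the patch.
func shouldAddBlock(blockNum uint64) bool {
	flag := os.Getenv("STREAMCHAIN_ADDBLOCK")
	return blockNum < 10 || flag == "" || flag == "true"
}

func main() {
	fmt.Println(shouldAddBlock(3))  // always true
	fmt.Println(shouldAddBlock(42)) // depends on STREAMCHAIN_ADDBLOCK
}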
......@@ -77,6 +77,8 @@ type Coordinator interface {
// returns missing transaction ids
StoreBlock(block *common.Block, data util.PvtDataCollections) error
// VerifyBlock validates a block and its private data without committing it
VerifyBlock(block *common.Block, data util.PvtDataCollections) (*common.Block, error)
// StorePvtData used to persist private data into transient store
StorePvtData(txid string, privData *transientstore2.TxPvtReadWriteSetWithConfigInfo, blckHeight uint64) error
......@@ -141,13 +143,12 @@ func (c *coordinator) StorePvtData(txID string, privData *transientstore2.TxPvtR
return c.TransientStore.PersistWithConfig(txID, blkHeight, privData)
}
// StoreBlock stores block with private data into the ledger
func (c *coordinator) StoreBlock(block *common.Block, privateDataSets util.PvtDataCollections) error {
func (c *coordinator) VerifyBlock(block *common.Block, privateDataSets util.PvtDataCollections) (*common.Block, error) {
if block.Data == nil {
return errors.New("Block data is empty")
return block, errors.New("Block data is empty")
}
if block.Header == nil {
return errors.New("Block header is nil")
return block, errors.New("Block header is nil")
}
logger.Infof("[%s] Received block [%d] from buffer", c.ChainID, block.Header.Number)
......@@ -155,10 +156,17 @@ func (c *coordinator) StoreBlock(block *common.Block, privateDataSets util.PvtDa
logger.Debugf("[%s] Validating block [%d]", c.ChainID, block.Header.Number)
err := c.Validator.Validate(block)
if err != nil {
logger.Errorf("Validation failed: %+v", err)
return err
return block, errors.WithMessage(err, "Validation failed")
}
return block, nil
}
// StoreBlock stores block with private data into the ledger
func (c *coordinator) StoreBlock(block *common.Block, privateDataSets util.PvtDataCollections) error {
//logs.WriteString("{\"ts\":" + strconv.FormatInt(time.Now().UnixNano(), 10) + ",\"msg\":\"FABRIC PERF Validation\",\"block\":" + strconv.Itoa(int(block.Header.Number)) + ",\"STEP\":1}\n")
blockAndPvtData := &ledger.BlockAndPvtData{
Block: block,
PvtData: make(ledger.TxPvtDataMap),
......
......@@ -8,6 +8,8 @@ package state
import (
"bytes"
"os"
"strconv"
"sync"
"sync/atomic"
"time"
......@@ -53,17 +55,6 @@ const (
enqueueRetryInterval = time.Millisecond * 100
)
// Configuration keeps state transfer configuration parameters
type Configuration struct {
AntiEntropyInterval time.Duration
AntiEntropyStateResponseTimeout time.Duration
AntiEntropyBatchSize uint64
MaxBlockDistance int
AntiEntropyMaxRetries int
ChannelBufferSize int
EnableStateTransfer bool
}
// GossipAdapter defines gossip/communication required interface for state provider
type GossipAdapter interface {
// Send sends a message to remote peers
......@@ -105,6 +96,8 @@ type ledgerResources interface {
// returns missing transaction ids
StoreBlock(block *common.Block, data util.PvtDataCollections) error
VerifyBlock(block *common.Block, data util.PvtDataCollections) (*common.Block, error)
// StorePvtData used to persist private data into transient store
StorePvtData(txid string, privData *transientstore.TxPvtReadWriteSetWithConfigInfo, blckHeight uint64) error
......@@ -158,76 +151,10 @@ type GossipStateProviderImpl struct {
once sync.Once
stateTransferActive int32
requestValidator *stateRequestValidator
blockingMode bool
config *Configuration
}
var logger = util.GetLogger(util.StateLogger, "")
// stateRequestValidator facilitates validation of the state request messages
type stateRequestValidator struct {
}
// validate checks for RemoteStateRequest message validity
func (v *stateRequestValidator) validate(request *proto.RemoteStateRequest, batchSize uint64) error {
if request.StartSeqNum > request.EndSeqNum {
return errors.Errorf("Invalid sequence interval [%d...%d).", request.StartSeqNum, request.EndSeqNum)
}
if request.EndSeqNum > batchSize+request.StartSeqNum {
return errors.Errorf("Requesting blocks range [%d-%d) greater than configured allowed"+
" (%d) batching size for anti-entropy.", request.StartSeqNum, request.EndSeqNum, batchSize)
}
return nil
}
// readConfiguration reading state configuration
func readConfiguration() *Configuration {
config := &Configuration{
AntiEntropyInterval: defAntiEntropyInterval,
AntiEntropyStateResponseTimeout: defAntiEntropyStateResponseTimeout,
AntiEntropyBatchSize: defAntiEntropyBatchSize,
MaxBlockDistance: defMaxBlockDistance,
AntiEntropyMaxRetries: defAntiEntropyMaxRetries,
ChannelBufferSize: defChannelBufferSize,
EnableStateTransfer: true,
}
if viper.IsSet("peer.gossip.state.checkInterval") {
config.AntiEntropyInterval = viper.GetDuration("peer.gossip.state.checkInterval")
}
if viper.IsSet("peer.gossip.state.responseTimeout") {
config.AntiEntropyStateResponseTimeout = viper.GetDuration("peer.gossip.state.responseTimeout")
}
if viper.IsSet("peer.gossip.state.batchSize") {
config.AntiEntropyBatchSize = uint64(viper.GetInt("peer.gossip.state.batchSize"))
}
if viper.IsSet("peer.gossip.state.blockBufferSize") {
config.MaxBlockDistance = viper.GetInt("peer.gossip.state.blockBufferSize")
}
if viper.IsSet("peer.gossip.state.maxRetries") {
config.AntiEntropyMaxRetries = viper.GetInt("peer.gossip.state.maxRetries")
}
if viper.IsSet("peer.gossip.state.channelSize") {
config.ChannelBufferSize = viper.GetInt("peer.gossip.state.channelSize")
}
if viper.IsSet("peer.gossip.state.enabled") {
config.EnableStateTransfer = viper.GetBool("peer.gossip.state.enabled")
}
return config
}
// NewGossipStateProvider creates state provider with coordinator instance
// to orchestrate arrival of private rwsets and blocks before committing them into the ledger.
func NewGossipStateProvider(chainID string, services *ServicesMediator, ledger ledgerResources) GossipStateProvider {
......@@ -274,9 +201,6 @@ func NewGossipStateProvider(chainID string, services *ServicesMediator, ledger l
return nil
}
// Reading state configuration
config := readConfiguration()
s := &GossipStateProviderImpl{
// MessageCryptoService
mediator: services,
......@@ -295,19 +219,15 @@ func NewGossipStateProvider(chainID string, services *ServicesMediator, ledger l
ledger: ledger,
stateResponseCh: make(chan proto.ReceivedMessage, config.ChannelBufferSize),
stateResponseCh: make(chan proto.ReceivedMessage, defChannelBufferSize),
stateRequestCh: make(chan proto.ReceivedMessage, config.ChannelBufferSize),
stateRequestCh: make(chan proto.ReceivedMessage, defChannelBufferSize),
stopCh: make(chan struct{}, 1),
stateTransferActive: 0,
once: sync.Once{},
requestValidator: &stateRequestValidator{},
config: config,
}
logger.Infof("Updating metadata information, "+
......@@ -321,10 +241,8 @@ func NewGossipStateProvider(chainID string, services *ServicesMediator, ledger l
go s.listen()
// Deliver in order messages into the incoming channel
go s.deliverPayloads()
if s.config.EnableStateTransfer {
// Execute anti entropy to fill missing gaps
go s.antiEntropy()
}
// Execute anti entropy to fill missing gaps
go s.antiEntropy()
// Taking care of state request messages
go s.processStateRequests()
......@@ -431,7 +349,7 @@ func (s *GossipStateProviderImpl) directMessage(msg proto.ReceivedMessage) {
incoming := msg.GetGossipMessage()
if incoming.GetStateRequest() != nil {
if len(s.stateRequestCh) < s.config.ChannelBufferSize {
if len(s.stateRequestCh) < defChannelBufferSize {
// Forward state request to the channel, if there are too
// many message of state request ignore to avoid flooding.
s.stateRequestCh <- msg
......@@ -468,8 +386,15 @@ func (s *GossipStateProviderImpl) handleStateRequest(msg proto.ReceivedMessage)
}
request := msg.GetGossipMessage().GetStateRequest()
if err := s.requestValidator.validate(request, s.config.AntiEntropyBatchSize); err != nil {
logger.Errorf("State request validation failed, %s. Ignoring request...", err)
batchSize := request.EndSeqNum - request.StartSeqNum
if batchSize > defAntiEntropyBatchSize {
logger.Errorf("Requesting blocks batchSize size (%d) greater than configured allowed"+
" (%d) batching for anti-entropy. Ignoring request...", batchSize, defAntiEntropyBatchSize)
return
}
if request.StartSeqNum > request.EndSeqNum {
logger.Errorf("Invalid sequence interval [%d...%d], ignoring request...", request.StartSeqNum, request.EndSeqNum)
return
}
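With stateRequestValidator gone, handleStateRequest inlines both checks above. Note the ordering subtlety: the sequence numbers are unsigned, so an inverted interval makes EndSeqNum-StartSeqNum underflow to a huge value and trip the batch-size check with a misleading message before the interval check is reached. The same logic as a standalone function that tests the interval first (names ours):

package main

import "fmt"

// validateRange performs the two checks from handleStateRequest, but
// rejects an inverted interval before computing the batch size, so the
// unsigned subtraction can never underflow.
func validateRange(start, end, maxBatch uint64) error {
	if start > end {
		return fmt.Errorf("invalid sequence interval [%d...%d]", start, end)
	}
	if end-start > maxBatch {
		return fmt.Errorf("requested batch size %d exceeds allowed %d", end-start, maxBatch)
	}
	return nil
}

func main() {
	fmt.Println(validateRange(10, 5, 100)) // invalid interval, reported as such
	fmt.Println(validateRange(5, 10, 100)) // <nil>
}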
......@@ -603,9 +528,74 @@ func (s *GossipStateProviderImpl) queueNewMessage(msg *proto.GossipMessage) {
}
}
// PipelineData is the unit of work passed between the verify and commit stages of the block pipeline
type PipelineData struct {
block *common.Block
privateDataSets util.PvtDataCollections
seq uint64
}
// commitPayloads drains the verifiers' output channels in round-robin
// order, committing blocks to the ledger in sequence order.
func (s *GossipStateProviderImpl) commitPayloads(chans []chan *PipelineData, maxOutstanding int) {
seq := 0
for {
job := <-chans[seq]
seq++
if seq == maxOutstanding {
seq = 0
}
//logger.Errorf("SC: store ", job.seq)
if err := s.ledger.StoreBlock(job.block, job.privateDataSets); err != nil {
logger.Errorf("SC: Error storing block %d: %v", job.seq, err)
}
}
}
// verifyPayloads validates each block received on cIn and forwards it to cOut.
func (s *GossipStateProviderImpl) verifyPayloads(cIn chan *PipelineData, cOut chan *PipelineData) {
for {
job := <-cIn
//logger.Errorf("SC: verify ", job.seq)
block, err := s.ledger.VerifyBlock(job.block, job.privateDataSets)
if err != nil {
logger.Errorf("SC: Error in verify: %v", err)
}
job.block = block
cOut <- job
}
}
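deliverPayloads below fans incoming blocks out to MaxNumOutstanding verifier goroutines in round-robin order, while the single committer above drains the output channels in the same order, so block n+1 can be verified while block n is still being committed, without ever reordering commits. A self-contained sketch of that two-stage, order-preserving pipeline (all names ours):

package main

import "fmt"

type job struct{ seq int }

// verifier is stage one; it stands in for VerifyBlock.
func verifier(in, out chan job) {
	for j := range in {
		out <- j // validation work would happen here
	}
}

func main() {
	const width = 2 // plays the role of MaxNumOutstanding
	ins := make([]chan job, width)
	outs := make([]chan job, width)
	for i := range ins {
		ins[i], outs[i] = make(chan job), make(chan job)
		go verifier(ins[i], outs[i])
	}
	done := make(chan struct{})
	go func() { // stage two: a single committer preserves sequence order
		for i := 0; i < 6; i++ {
			j := <-outs[i%width]
			fmt.Println("commit", j.seq)
		}
		close(done)
	}()
	for i := 0; i < 6; i++ { // producer, playing deliverPayloads' role
		ins[i%width] <- job{seq: i}
	}
	<-done
}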
func (s *GossipStateProviderImpl) deliverPayloads() {
defer s.done.Done()
reqSeq := 0
MaxNumOutstanding := 2
pipeline := true
strPipeline := os.Getenv("STREAMCHAIN_PIPELINE")
if strPipeline == "" || strPipeline == "0" {
pipeline = false
} else if n, err := strconv.Atoi(strPipeline); err == nil && n > 0 {
MaxNumOutstanding = n
} else {
logger.Errorf("Invalid STREAMCHAIN_PIPELINE value %q, disabling pipelining", strPipeline)
pipeline = false
}

logger.Errorf("STREAMCHAIN pipelining: enabled=%t maxOutstanding=%d", pipeline, MaxNumOutstanding)
var inChans []chan *PipelineData
var outChans []chan *PipelineData
inChans = make([]chan *PipelineData, MaxNumOutstanding)
outChans = make([]chan *PipelineData, MaxNumOutstanding)
if pipeline {
for seq := 0; seq < MaxNumOutstanding; seq++ {
inChans[seq] = make(chan *PipelineData)
outChans[seq] = make(chan *PipelineData)
go s.verifyPayloads(inChans[seq], outChans[seq])
}
go s.commitPayloads(outChans, MaxNumOutstanding)
}
for {
select {
// Wait for notification that next seq has arrived
......@@ -641,6 +631,20 @@ func (s *GossipStateProviderImpl) deliverPayloads() {
}
logger.Panicf("Cannot commit block to the ledger due to %+v", errors.WithStack(err))
}
job := &PipelineData{rawBlock, p, payload.SeqNum}
if pipeline {
inChans[reqSeq] <- job
reqSeq++
if reqSeq == MaxNumOutstanding {
reqSeq = 0
}
} else {
newBlock, err := s.ledger.VerifyBlock(job.block, job.privateDataSets)
if err != nil {
logger.Errorf("SC: Error in verify: %v", err)
}
job.block = newBlock
s.ledger.StoreBlock(job.block, job.privateDataSets)
}
}
case <-s.stopCh:
s.stopCh <- struct{}{}
......@@ -659,7 +663,7 @@ func (s *GossipStateProviderImpl) antiEntropy() {
case <-s.stopCh:
s.stopCh <- struct{}{}
return
case <-time.After(s.config.AntiEntropyInterval):
case <-time.After(defAntiEntropyInterval):
ourHeight, err := s.ledger.LedgerHeight()
if err != nil {
// Unable to read from ledger continue to the next round
......@@ -704,7 +708,7 @@ func (s *GossipStateProviderImpl) requestBlocksInRange(start uint64, end uint64)
defer atomic.StoreInt32(&s.stateTransferActive, 0)
for prev := start; prev <= end; {
next := min(end, prev+s.config.AntiEntropyBatchSize)
next := min(end, prev+defAntiEntropyBatchSize)
gossipMsg := s.stateRequestMessage(prev, next)
......@@ -712,7 +716,7 @@ func (s *GossipStateProviderImpl) requestBlocksInRange(start uint64, end uint64)
tryCounts := 0