Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
zistvan-public
StreamChain Prototype
Commits
cddd278f
Commit
cddd278f
authored
May 14, 2019
by
Lucas Kuhring
Browse files
[FastFabric] Integrate block caching
parent
fa3b2d0f
Changes
37
Hide whitespace changes
Inline
Side-by-side
common/ledger/blkstorage/blockstorage.go
View file @
cddd278f
...
...
@@ -9,6 +9,7 @@ package blkstorage
import
(
"github.com/hyperledger/fabric/common/ledger"
l
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/fastfabric-extensions/cached"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/peer"
"github.com/pkg/errors"
...
...
@@ -53,7 +54,7 @@ type BlockStoreProvider interface {
// An implementation of this interface is expected to take an argument
// of type `IndexConfig` which configures the block store on what items should be indexed
type
BlockStore
interface
{
AddBlock
(
block
*
c
ommon
.
Block
)
error
AddBlock
(
block
*
c
ached
.
Block
)
error
GetBlockchainInfo
()
(
*
common
.
BlockchainInfo
,
error
)
RetrieveBlocks
(
startNum
uint64
)
(
ledger
.
ResultsIterator
,
error
)
RetrieveBlockByHash
(
blockHash
[]
byte
)
(
*
common
.
Block
,
error
)
...
...
common/ledger/blkstorage/fsblkstorage/blockfile_mgr.go
View file @
cddd278f
...
...
@@ -21,6 +21,7 @@ import (
"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/common/ledger/util"
"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
"github.com/hyperledger/fabric/fastfabric-extensions/cached"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/peer"
putil
"github.com/hyperledger/fabric/protos/utils"
...
...
@@ -250,7 +251,7 @@ func (mgr *blockfileMgr) moveToNextFile() {
mgr
.
updateCheckpoint
(
cpInfo
)
}
func
(
mgr
*
blockfileMgr
)
addBlock
(
block
*
c
ommon
.
Block
)
error
{
func
(
mgr
*
blockfileMgr
)
addBlock
(
block
*
c
ached
.
Block
)
error
{
bcInfo
:=
mgr
.
getBlockchainInfo
()
if
block
.
Header
.
Number
!=
bcInfo
.
Height
{
return
errors
.
Errorf
(
...
...
@@ -269,7 +270,7 @@ func (mgr *blockfileMgr) addBlock(block *common.Block) error {
bcInfo
.
CurrentBlockHash
,
block
.
Header
.
PreviousHash
,
)
}
blockBytes
,
info
,
err
:=
serializeBlock
(
block
)
blockBytes
,
info
,
err
:=
serializeBlock
(
block
.
Block
)
if
err
!=
nil
{
return
errors
.
WithMessage
(
err
,
"error serializing block"
)
}
...
...
@@ -457,7 +458,7 @@ func (mgr *blockfileMgr) updateCheckpoint(cpInfo *checkpointInfo) {
mgr
.
cpInfoCond
.
Broadcast
()
}
func
(
mgr
*
blockfileMgr
)
updateBlockchainInfo
(
latestBlockHash
[]
byte
,
latestBlock
*
c
ommon
.
Block
)
{
func
(
mgr
*
blockfileMgr
)
updateBlockchainInfo
(
latestBlockHash
[]
byte
,
latestBlock
*
c
ached
.
Block
)
{
currentBCInfo
:=
mgr
.
getBlockchainInfo
()
newBCInfo
:=
&
common
.
BlockchainInfo
{
Height
:
currentBCInfo
.
Height
+
1
,
...
...
common/ledger/blkstorage/fsblkstorage/fs_blockstore.go
View file @
cddd278f
...
...
@@ -20,6 +20,7 @@ import (
"github.com/hyperledger/fabric/common/ledger"
"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
"github.com/hyperledger/fabric/fastfabric-extensions/cached"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/peer"
)
...
...
@@ -38,7 +39,7 @@ func newFsBlockStore(id string, conf *Conf, indexConfig *blkstorage.IndexConfig,
}
// AddBlock adds a new block
func
(
store
*
fsBlockStore
)
AddBlock
(
block
*
c
ommon
.
Block
)
error
{
func
(
store
*
fsBlockStore
)
AddBlock
(
block
*
c
ached
.
Block
)
error
{
return
store
.
fileMgr
.
addBlock
(
block
)
}
...
...
common/ledger/blockledger/file/impl.go
View file @
cddd278f
...
...
@@ -10,6 +10,7 @@ import (
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger"
"github.com/hyperledger/fabric/common/ledger/blockledger"
"github.com/hyperledger/fabric/fastfabric-extensions/cached"
cb
"github.com/hyperledger/fabric/protos/common"
ab
"github.com/hyperledger/fabric/protos/orderer"
)
...
...
@@ -25,7 +26,7 @@ type FileLedger struct {
// FileLedgerBlockStore defines the interface to interact with deliver when using a
// file ledger
type
FileLedgerBlockStore
interface
{
AddBlock
(
block
*
c
b
.
Block
)
error
AddBlock
(
block
*
c
ached
.
Block
)
error
GetBlockchainInfo
()
(
*
cb
.
BlockchainInfo
,
error
)
RetrieveBlocks
(
startBlockNumber
uint64
)
(
ledger
.
ResultsIterator
,
error
)
}
...
...
@@ -104,7 +105,7 @@ func (fl *FileLedger) Height() uint64 {
// Append a new block to the ledger
func
(
fl
*
FileLedger
)
Append
(
block
*
cb
.
Block
)
error
{
err
:=
fl
.
blockStore
.
AddBlock
(
block
)
err
:=
fl
.
blockStore
.
AddBlock
(
cached
.
GetBlock
(
block
)
)
if
err
==
nil
{
close
(
fl
.
signal
)
fl
.
signal
=
make
(
chan
struct
{})
...
...
common/ledger/testutil/test_helper.go
View file @
cddd278f
...
...
@@ -13,6 +13,7 @@ import (
"github.com/hyperledger/fabric/common/configtx/test"
"github.com/hyperledger/fabric/common/util"
lutils
"github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/fastfabric-extensions/cached"
"github.com/hyperledger/fabric/protos/common"
pb
"github.com/hyperledger/fabric/protos/peer"
ptestutils
"github.com/hyperledger/fabric/protos/testutils"
...
...
@@ -69,7 +70,7 @@ func (bg *BlockGenerator) NextBlockWithTxid(simulationResults [][]byte, txids []
}
// NextTestBlock constructs next block in sequence block with 'numTx' number of transactions for testing
func
(
bg
*
BlockGenerator
)
NextTestBlock
(
numTx
int
,
txSize
int
)
*
c
ommon
.
Block
{
func
(
bg
*
BlockGenerator
)
NextTestBlock
(
numTx
int
,
txSize
int
)
*
c
ached
.
Block
{
simulationResults
:=
[][]
byte
{}
for
i
:=
0
;
i
<
numTx
;
i
++
{
simulationResults
=
append
(
simulationResults
,
ConstructRandomBytes
(
bg
.
t
,
txSize
))
...
...
@@ -78,8 +79,8 @@ func (bg *BlockGenerator) NextTestBlock(numTx int, txSize int) *common.Block {
}
// NextTestBlocks constructs 'numBlocks' number of blocks for testing
func
(
bg
*
BlockGenerator
)
NextTestBlocks
(
numBlocks
int
)
[]
*
c
ommon
.
Block
{
blocks
:=
[]
*
c
ommon
.
Block
{}
func
(
bg
*
BlockGenerator
)
NextTestBlocks
(
numBlocks
int
)
[]
*
c
ached
.
Block
{
blocks
:=
[]
*
c
ached
.
Block
{}
numTx
:=
10
for
i
:=
0
;
i
<
numBlocks
;
i
++
{
block
:=
bg
.
NextTestBlock
(
numTx
,
100
)
...
...
@@ -118,7 +119,7 @@ func ConstructTransactionFromTxDetails(txDetails *TxDetails, sign bool) (*common
return
txEnv
,
txID
,
err
}
func
ConstructBlockFromBlockDetails
(
t
*
testing
.
T
,
blockDetails
*
BlockDetails
,
sign
bool
)
*
c
ommon
.
Block
{
func
ConstructBlockFromBlockDetails
(
t
*
testing
.
T
,
blockDetails
*
BlockDetails
,
sign
bool
)
*
c
ached
.
Block
{
var
envs
[]
*
common
.
Envelope
for
_
,
txDetails
:=
range
blockDetails
.
Txs
{
env
,
_
,
err
:=
ConstructTransactionFromTxDetails
(
txDetails
,
sign
)
...
...
@@ -156,7 +157,7 @@ func ConstructBlock(t *testing.T, blockNum uint64, previousHash []byte, simulati
}
//ConstructTestBlock constructs a single block with random contents
func
ConstructTestBlock
(
t
*
testing
.
T
,
blockNum
uint64
,
numTx
int
,
txSize
int
)
*
c
ommon
.
Block
{
func
ConstructTestBlock
(
t
*
testing
.
T
,
blockNum
uint64
,
numTx
int
,
txSize
int
)
*
c
ached
.
Block
{
simulationResults
:=
[][]
byte
{}
for
i
:=
0
;
i
<
numTx
;
i
++
{
simulationResults
=
append
(
simulationResults
,
ConstructRandomBytes
(
t
,
txSize
))
...
...
@@ -166,11 +167,11 @@ func ConstructTestBlock(t *testing.T, blockNum uint64, numTx int, txSize int) *c
// ConstructTestBlocks returns a series of blocks starting with blockNum=0.
// The first block in the returned array is a config tx block that represents a genesis block
func
ConstructTestBlocks
(
t
*
testing
.
T
,
numBlocks
int
)
[]
*
c
ommon
.
Block
{
func
ConstructTestBlocks
(
t
*
testing
.
T
,
numBlocks
int
)
[]
*
c
ached
.
Block
{
bg
,
gb
:=
NewBlockGenerator
(
t
,
util
.
GetTestChainID
(),
false
)
blocks
:=
[]
*
c
ommon
.
Block
{}
blocks
:=
[]
*
c
ached
.
Block
{}
if
numBlocks
!=
0
{
blocks
=
append
(
blocks
,
gb
)
blocks
=
append
(
blocks
,
cached
.
GetBlock
(
gb
)
)
}
return
append
(
blocks
,
bg
.
NextTestBlocks
(
numBlocks
-
1
)
...
)
}
...
...
@@ -184,7 +185,7 @@ func ConstructBytesProposalResponsePayload(version string, simulationResults []b
return
ptestutils
.
ConstructBytesProposalResponsePayload
(
util
.
GetTestChainID
(),
ccid
,
nil
,
simulationResults
)
}
func
NewBlock
(
env
[]
*
common
.
Envelope
,
blockNum
uint64
,
previousHash
[]
byte
)
*
c
ommon
.
Block
{
func
NewBlock
(
env
[]
*
common
.
Envelope
,
blockNum
uint64
,
previousHash
[]
byte
)
*
c
ached
.
Block
{
block
:=
common
.
NewBlock
(
blockNum
,
previousHash
)
for
i
:=
0
;
i
<
len
(
env
);
i
++
{
txEnvBytes
,
_
:=
proto
.
Marshal
(
env
[
i
])
...
...
@@ -195,5 +196,5 @@ func NewBlock(env []*common.Envelope, blockNum uint64, previousHash []byte) *com
block
.
Metadata
.
Metadata
[
common
.
BlockMetadataIndex_TRANSACTIONS_FILTER
]
=
lutils
.
NewTxValidationFlagsSetValue
(
len
(
env
),
pb
.
TxValidationCode_VALID
)
return
block
return
cached
.
GetBlock
(
block
)
}
core/chaincode/mock/peer_ledger.go
View file @
cddd278f
...
...
@@ -2,12 +2,14 @@
package
mock
import
(
sync
"sync"
"sync"
ledger
"github.com/hyperledger/fabric/common/ledger"
fastfabric_extensions
"github.com/hyperledger/fabric/fastfabric-extensions"
"github.com/hyperledger/fabric/common/ledger"
ledgera
"github.com/hyperledger/fabric/core/ledger"
common
"github.com/hyperledger/fabric/protos/common"
peer
"github.com/hyperledger/fabric/protos/peer"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/peer"
)
type
PeerLedger
struct
{
...
...
@@ -256,6 +258,10 @@ type PeerLedger struct {
invocationsMutex
sync
.
RWMutex
}
func
(
fake
*
PeerLedger
)
GetBlockstore
()
(
store
fastfabric_extensions
.
BlockStore
)
{
panic
(
"implement me"
)
}
func
(
fake
*
PeerLedger
)
Close
()
{
fake
.
closeMutex
.
Lock
()
fake
.
closeArgsForCall
=
append
(
fake
.
closeArgsForCall
,
struct
{
...
...
core/committer/committer_impl.go
View file @
cddd278f
...
...
@@ -13,8 +13,8 @@ import (
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/config"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/fastfabric-extensions/cached"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/utils"
"github.com/pkg/errors"
)
...
...
@@ -57,12 +57,12 @@ type LedgerCommitter struct {
// ConfigBlockEventer callback function proto type to define action
// upon arrival on new configuaration update block
type
ConfigBlockEventer
func
(
block
*
c
ommon
.
Block
)
error
type
ConfigBlockEventer
func
(
block
*
c
ached
.
Block
)
error
// NewLedgerCommitter is a factory function to create an instance of the committer
// which passes incoming blocks via validation and commits them into the ledger.
func
NewLedgerCommitter
(
ledger
PeerLedgerSupport
)
*
LedgerCommitter
{
return
NewLedgerCommitterReactive
(
ledger
,
func
(
_
*
c
ommon
.
Block
)
error
{
return
nil
})
return
NewLedgerCommitterReactive
(
ledger
,
func
(
_
*
c
ached
.
Block
)
error
{
return
nil
})
}
// NewLedgerCommitterReactive is a factory function to create an instance of the committer
...
...
@@ -74,9 +74,9 @@ func NewLedgerCommitterReactive(ledger PeerLedgerSupport, eventer ConfigBlockEve
// preCommit takes care to validate the block and update based on its
// content
func
(
lc
*
LedgerCommitter
)
preCommit
(
block
*
c
ommon
.
Block
)
error
{
func
(
lc
*
LedgerCommitter
)
preCommit
(
block
*
c
ached
.
Block
)
error
{
// Updating CSCC with new configuration block
if
utils
.
IsConfigBlock
(
block
)
{
if
block
.
IsConfigBlock
()
{
logger
.
Debug
(
"Received configuration update, calling CSCC ConfigUpdate"
)
if
err
:=
lc
.
eventer
(
block
);
err
!=
nil
{
return
errors
.
WithMessage
(
err
,
"could not update CSCC with new configuration update"
)
...
...
core/committer/txvalidator/validator.go
View file @
cddd278f
...
...
@@ -23,6 +23,7 @@ import (
"github.com/hyperledger/fabric/core/common/validation"
"github.com/hyperledger/fabric/core/ledger"
ledgerUtil
"github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/fastfabric-extensions/cached"
"github.com/hyperledger/fabric/msp"
"github.com/hyperledger/fabric/protos/common"
mspprotos
"github.com/hyperledger/fabric/protos/msp"
...
...
@@ -60,14 +61,14 @@ type Support interface {
// and return the bit array mask indicating invalid transactions which
// didn't pass validation.
type
Validator
interface
{
Validate
(
block
*
c
ommon
.
Block
)
error
Validate
(
block
*
c
ached
.
Block
)
error
}
// private interface to decouple tx validator
// and vscc execution, in order to increase
// testability of TxValidator
type
vsccValidator
interface
{
VSCCValidateTx
(
seq
int
,
payload
*
c
ommon
.
Payload
,
envBytes
[]
byte
,
block
*
c
ommon
.
Block
)
(
error
,
peer
.
TxValidationCode
)
VSCCValidateTx
(
seq
int
,
payload
*
c
ached
.
Payload
,
envBytes
[]
byte
,
block
*
c
ached
.
Block
)
(
error
,
peer
.
TxValidationCode
)
}
// implementation of Validator interface, keeps
...
...
@@ -82,7 +83,7 @@ type TxValidator struct {
var
logger
=
flogging
.
MustGetLogger
(
"committer.txvalidator"
)
type
blockValidationRequest
struct
{
block
*
c
ommon
.
Block
block
*
c
ached
.
Block
d
[]
byte
tIdx
int
}
...
...
@@ -131,7 +132,7 @@ func (v *TxValidator) chainExists(chain string) bool {
// state is when a config transaction is received, but they are
// guaranteed to be alone in the block. If/when this assumption
// is violated, this code must be changed.
func
(
v
*
TxValidator
)
Validate
(
block
*
c
ommon
.
Block
)
error
{
func
(
v
*
TxValidator
)
Validate
(
block
*
c
ached
.
Block
)
error
{
var
err
error
var
errPos
int
...
...
@@ -225,7 +226,7 @@ func (v *TxValidator) Validate(block *common.Block) error {
}
// Initialize metadata structure
utils
.
InitBlockMetadata
(
block
)
utils
.
InitBlockMetadata
(
block
.
Block
)
block
.
Metadata
.
Metadata
[
common
.
BlockMetadataIndex_TRANSACTIONS_FILTER
]
=
txsfltr
...
...
@@ -237,7 +238,7 @@ func (v *TxValidator) Validate(block *common.Block) error {
// allValidated returns error if some of the validation flags have not been set
// during validation
func
(
v
*
TxValidator
)
allValidated
(
txsfltr
ledgerUtil
.
TxValidationFlags
,
block
*
c
ommon
.
Block
)
error
{
func
(
v
*
TxValidator
)
allValidated
(
txsfltr
ledgerUtil
.
TxValidationFlags
,
block
*
c
ached
.
Block
)
error
{
for
id
,
f
:=
range
txsfltr
{
if
peer
.
TxValidationCode
(
f
)
==
peer
.
TxValidationCode_NOT_VALIDATED
{
return
errors
.
Errorf
(
"transaction %d in block %d has skipped validation"
,
id
,
block
.
Header
.
Number
)
...
...
@@ -278,7 +279,7 @@ func (v *TxValidator) validateTx(req *blockValidationRequest, results chan<- *bl
return
}
if
env
,
err
:=
utils
.
GetEnvelopeFromBlock
(
d
);
err
!=
nil
{
if
env
,
err
:=
block
.
UnmarshalSpecificEnvelope
(
tIdx
);
err
!=
nil
{
logger
.
Warningf
(
"Error getting tx from block: %+v"
,
err
)
results
<-
&
blockValidationResult
{
tIdx
:
tIdx
,
...
...
@@ -293,7 +294,7 @@ func (v *TxValidator) validateTx(req *blockValidationRequest, results chan<- *bl
// job for VSCC below
logger
.
Debugf
(
"[%s] validateTx starts for block %p env %p txn %d"
,
v
.
ChainID
,
block
,
env
,
tIdx
)
defer
logger
.
Debugf
(
"[%s] validateTx completes for block %p env %p txn %d"
,
v
.
ChainID
,
block
,
env
,
tIdx
)
var
payload
*
c
ommon
.
Payload
var
payload
*
c
ached
.
Payload
var
err
error
var
txResult
peer
.
TxValidationCode
var
txsChaincodeName
*
sysccprovider
.
ChaincodeInstance
...
...
@@ -308,7 +309,7 @@ func (v *TxValidator) validateTx(req *blockValidationRequest, results chan<- *bl
return
}
chdr
,
err
:=
utils
.
UnmarshalChannelHeader
(
payload
.
Header
.
ChannelHeader
)
chdr
,
err
:=
payload
.
Header
.
Unmarshal
ChannelHeader
(
)
if
err
!=
nil
{
logger
.
Warningf
(
"Could not unmarshal channel header, err %s, skipping"
,
err
)
results
<-
&
blockValidationResult
{
...
...
@@ -479,7 +480,7 @@ func (v *TxValidator) validateTx(req *blockValidationRequest, results chan<- *bl
// in the ledger or no decision can be made for whether such transaction exists;
// the function returns nil if it has ensured that there is no such duplicate, such
// that its consumer can proceed with the transaction processing
func
(
v
*
TxValidator
)
checkTxIdDupsLedger
(
tIdx
int
,
chdr
*
c
ommon
.
ChannelHeader
,
ldgr
ledger
.
PeerLedger
)
(
errorTuple
*
blockValidationResult
)
{
func
(
v
*
TxValidator
)
checkTxIdDupsLedger
(
tIdx
int
,
chdr
*
c
ached
.
ChannelHeader
,
ldgr
ledger
.
PeerLedger
)
(
errorTuple
*
blockValidationResult
)
{
// Retrieve the transaction identifier of the input header
txID
:=
chdr
.
TxId
...
...
@@ -564,9 +565,9 @@ func (v *TxValidator) invalidTXsForUpgradeCC(txsChaincodeNames map[int]*sysccpro
}
}
func
(
v
*
TxValidator
)
getTxCCInstance
(
payload
*
c
ommon
.
Payload
)
(
invokeCCIns
,
upgradeCCIns
*
sysccprovider
.
ChaincodeInstance
,
err
error
)
{
func
(
v
*
TxValidator
)
getTxCCInstance
(
payload
*
c
ached
.
Payload
)
(
invokeCCIns
,
upgradeCCIns
*
sysccprovider
.
ChaincodeInstance
,
err
error
)
{
// This is duplicated unpacking work, but make test easier.
chdr
,
err
:=
utils
.
UnmarshalChannelHeader
(
payload
.
Header
.
ChannelHeader
)
chdr
,
err
:=
payload
.
Header
.
Unmarshal
ChannelHeader
(
)
if
err
!=
nil
{
return
nil
,
nil
,
err
}
...
...
@@ -575,7 +576,7 @@ func (v *TxValidator) getTxCCInstance(payload *common.Payload) (invokeCCIns, upg
chainID
:=
chdr
.
ChannelId
// it is guaranteed to be an existing channel by now
// ChaincodeID
hdrExt
,
err
:=
utils
.
GetChaincodeHeaderExtension
(
payload
.
Header
)
hdrExt
,
err
:=
chdr
.
UnmarshalExtension
(
)
if
err
!=
nil
{
return
nil
,
nil
,
err
}
...
...
@@ -583,29 +584,28 @@ func (v *TxValidator) getTxCCInstance(payload *common.Payload) (invokeCCIns, upg
invokeIns
:=
&
sysccprovider
.
ChaincodeInstance
{
ChainID
:
chainID
,
ChaincodeName
:
invokeCC
.
Name
,
ChaincodeVersion
:
invokeCC
.
Version
}
// Transaction
tx
,
err
:=
utils
.
GetTransaction
(
payload
.
Data
)
tx
,
err
:=
payload
.
UnmarshalTransaction
(
)
if
err
!=
nil
{
logger
.
Errorf
(
"GetTransaction failed: %+v"
,
err
)
return
invokeIns
,
nil
,
nil
}
// ChaincodeActionPayload
cap
,
err
:=
utils
.
Get
ChaincodeActionPayload
(
tx
.
Actions
[
0
]
.
Payload
)
cap
,
err
:=
tx
.
Actions
[
0
]
.
Unmarshal
ChaincodeActionPayload
()
if
err
!=
nil
{
logger
.
Errorf
(
"GetChaincodeActionPayload failed: %+v"
,
err
)
return
invokeIns
,
nil
,
nil
}
// ChaincodeProposalPayload
cpp
,
err
:=
utils
.
GetChaincodeProposalPayload
(
cap
.
Chaincode
ProposalPayload
)
cpp
,
err
:=
cap
.
Unmarshal
ProposalPayload
(
)
if
err
!=
nil
{
logger
.
Errorf
(
"GetChaincodeProposalPayload failed: %+v"
,
err
)
return
invokeIns
,
nil
,
nil
}
// ChaincodeInvocationSpec
cis
:=
&
peer
.
ChaincodeInvocationSpec
{}
err
=
proto
.
Unmarshal
(
cpp
.
Input
,
cis
)
cis
,
err
:=
cpp
.
UnmarshalInput
()
if
err
!=
nil
{
logger
.
Errorf
(
"GetChaincodeInvokeSpec failed: %+v"
,
err
)
return
invokeIns
,
nil
,
nil
...
...
core/committer/txvalidator/vscc_validator.go
View file @
cddd278f
...
...
@@ -12,6 +12,8 @@ package txvalidator
import
(
"fmt"
"github.com/hyperledger/fabric/fastfabric-extensions/cached"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/cauthdsl"
commonerrors
"github.com/hyperledger/fabric/common/errors"
...
...
@@ -19,7 +21,6 @@ import (
"github.com/hyperledger/fabric/core/common/ccprovider"
"github.com/hyperledger/fabric/core/common/sysccprovider"
validation
"github.com/hyperledger/fabric/core/handlers/validation/api"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/peer"
"github.com/hyperledger/fabric/protos/utils"
...
...
@@ -46,20 +47,20 @@ func newVSCCValidator(chainID string, support Support, sccp sysccprovider.System
}
// VSCCValidateTx executes vscc validation for transaction
func
(
v
*
VsccValidatorImpl
)
VSCCValidateTx
(
seq
int
,
payload
*
c
ommon
.
Payload
,
envBytes
[]
byte
,
block
*
c
ommon
.
Block
)
(
error
,
peer
.
TxValidationCode
)
{
func
(
v
*
VsccValidatorImpl
)
VSCCValidateTx
(
seq
int
,
payload
*
c
ached
.
Payload
,
envBytes
[]
byte
,
block
*
c
ached
.
Block
)
(
error
,
peer
.
TxValidationCode
)
{
chainID
:=
v
.
chainID
logger
.
Debugf
(
"[%s] VSCCValidateTx starts for bytes %p"
,
chainID
,
envBytes
)
// get
header extensions so we have the chaincode ID
hdr
Ext
,
err
:=
utils
.
GetChaincodeHeaderExtension
(
payload
.
Header
)
// get
channel header
c
hdr
,
err
:=
payload
.
Header
.
UnmarshalChannel
Header
(
)
if
err
!=
nil
{
return
err
,
peer
.
TxValidationCode_BAD_
HEADER_EXTENSION
return
err
,
peer
.
TxValidationCode_BAD_
CHANNEL_HEADER
}
// get
channel header
c
hdr
,
err
:=
utils
.
Unmarshal
ChannelHeader
(
payload
.
Header
.
ChannelHeader
)
// get
header extensions so we have the chaincode ID
hdr
Ext
,
err
:=
chdr
.
Unmarshal
Extension
(
)
if
err
!=
nil
{
return
err
,
peer
.
TxValidationCode_BAD_
CHANNEL_HEADER
return
err
,
peer
.
TxValidationCode_BAD_
HEADER_EXTENSION
}
/* obtain the list of namespaces we're writing stuff to;
...
...
@@ -69,12 +70,31 @@ func (v *VsccValidatorImpl) VSCCValidateTx(seq int, payload *common.Payload, env
3) does it write to any cc that cannot be invoked? */
writesToLSCC
:=
false
writesToNonInvokableSCC
:=
false
respPayload
,
err
:=
utils
.
GetActionFromEnvelope
(
envBytes
)
var
ca
*
cached
.
ChaincodeAction
tx
,
err
:=
payload
.
UnmarshalTransaction
()
if
err
==
nil
{
if
len
(
tx
.
Actions
)
==
0
{
err
=
errors
.
New
(
"at least one TransactionAction required"
)
}
if
err
==
nil
{
cap
,
err
:=
tx
.
Actions
[
0
]
.
UnmarshalChaincodeActionPayload
()
if
err
==
nil
{
respPayload
,
err
:=
cap
.
Action
.
UnmarshalProposalResponsePayload
()
if
err
==
nil
{
ca
,
err
=
respPayload
.
UnmarshalChaincodeAction
()
}
}
}
}
if
err
!=
nil
{
return
errors
.
WithMessage
(
err
,
"GetActionFromEnvelope failed"
),
peer
.
TxValidationCode_BAD_RESPONSE_PAYLOAD
}
txRWSet
:=
&
rwsetutil
.
TxRwSet
{}
if
err
=
txRWSet
.
FromProtoBytes
(
respPayload
.
Results
);
err
!=
nil
{
var
txRWSet
*
cached
.
TxRwSet
if
err
==
nil
{
txRWSet
,
err
=
ca
.
UnmarshalRwSet
()
}
if
err
!=
nil
{
return
errors
.
WithMessage
(
err
,
"txRWSet.FromProtoBytes failed"
),
peer
.
TxValidationCode_BAD_RWSET
}
...
...
@@ -83,13 +103,13 @@ func (v *VsccValidatorImpl) VSCCValidateTx(seq int, payload *common.Payload, env
return
errors
.
New
(
"nil ChaincodeId in header extension"
),
peer
.
TxValidationCode_INVALID_OTHER_REASON
}
if
respPayload
.
ChaincodeId
==
nil
{
if
ca
.
ChaincodeId
==
nil
{
return
errors
.
New
(
"nil ChaincodeId in ChaincodeAction"
),
peer
.
TxValidationCode_INVALID_OTHER_REASON
}
// get name and version of the cc we invoked
ccID
:=
hdrExt
.
ChaincodeId
.
Name
ccVer
:=
respPayload
.
ChaincodeId
.
Version
ccVer
:=
ca
.
ChaincodeId
.
Version
// sanity check on ccID
if
ccID
==
""
{
...
...
@@ -97,8 +117,8 @@ func (v *VsccValidatorImpl) VSCCValidateTx(seq int, payload *common.Payload, env
logger
.
Errorf
(
"%+v"
,
err
)
return
err
,
peer
.
TxValidationCode_INVALID_OTHER_REASON
}
if
ccID
!=
respPayload
.
ChaincodeId
.
Name
{
err
=
errors
.
Errorf
(
"inconsistent ccid info (%s/%s)"
,
ccID
,
respPayload
.
ChaincodeId
.
Name
)
if
ccID
!=
ca
.
ChaincodeId
.
Name
{
err
=
errors
.
Errorf
(
"inconsistent ccid info (%s/%s)"
,
ccID
,
ca
.
ChaincodeId
.
Name
)
logger
.
Errorf
(
"%+v"
,
err
)
return
err
,
peer
.
TxValidationCode_INVALID_OTHER_REASON
}
...
...
@@ -113,9 +133,9 @@ func (v *VsccValidatorImpl) VSCCValidateTx(seq int, payload *common.Payload, env
alwaysEnforceOriginalNamespace
:=
v
.
support
.
Capabilities
()
.
V1_2Validation
()
if
alwaysEnforceOriginalNamespace
{
wrNamespace
=
append
(
wrNamespace
,
ccID
)
if
respPayload
.
Events
!=
nil
{
ccEvent
:=
&
peer
.
Chaincode
Event
{}
if
err
=
proto
.
Unmarshal
(
respPayload
.
Events
,
ccEvent
);
err
!=
nil
{
if
ca
.
Events
!=
nil
{
ccEvent
,
err
:=
ca
.
Unmarshal
Event
s
()
if
err
!=
nil
{
return
errors
.
Wrapf
(
err
,
"invalid chaincode event"
),
peer
.
TxValidationCode_INVALID_OTHER_REASON
}
if
ccEvent
.
ChaincodeId
!=
ccID
{
...
...
@@ -179,7 +199,7 @@ func (v *VsccValidatorImpl) VSCCValidateTx(seq int, payload *common.Payload, env
// validate *EACH* read write set according to its chaincode's endorsement policy
for
_
,
ns
:=
range
wrNamespace
{
// Get latest chaincode version, vscc and validate policy
txcc
,
vscc
,
policy
,
err
:=
v
.
GetInfoForValidate
(
chdr
,
ns
)
txcc
,
vscc
,
policy
,
err
:=
v
.
GetInfoForValidate
(
chdr
.
ChannelHeader
,
ns
)
if
err
!=
nil
{
logger
.
Errorf
(
"GetInfoForValidate for txId = %s returned error: %+v"
,
chdr
.
TxId
,
err
)
return
err
,
peer
.
TxValidationCode_INVALID_OTHER_REASON
...
...
@@ -198,7 +218,7 @@ func (v *VsccValidatorImpl) VSCCValidateTx(seq int, payload *common.Payload, env
ctx
:=
&
Context
{
Seq
:
seq
,
Envelope
:
envBytes
,
Block
:
block
,
Block
:
block
.
Block
,
TxID
:
chdr
.
TxId
,
Channel
:
chdr
.
ChannelId
,
Namespace
:
ns
,
...
...
@@ -225,7 +245,7 @@ func (v *VsccValidatorImpl) VSCCValidateTx(seq int, payload *common.Payload, env
}
// Get latest chaincode version, vscc and validate policy
_
,
vscc
,
policy
,
err
:=
v
.
GetInfoForValidate
(
chdr
,
ccID
)
_
,
vscc
,
policy
,
err
:=
v
.
GetInfoForValidate
(
chdr
.
ChannelHeader
,
ccID
)
if
err
!=
nil
{
logger
.
Errorf
(
"GetInfoForValidate for txId = %s returned error: %+v"
,
chdr
.
TxId
,
err
)
return
err
,
peer
.
TxValidationCode_INVALID_OTHER_REASON
...
...
@@ -239,7 +259,7 @@ func (v *VsccValidatorImpl) VSCCValidateTx(seq int, payload *common.Payload, env
ctx
:=
&
Context
{
Seq
:
seq
,
Envelope
:
envBytes
,
Block
:
block
,
Block
:
block
.
Block
,
TxID
:
chdr
.
TxId
,
Channel
:
chdr
.
ChannelId
,
Namespace
:
ccID
,
...
...
@@ -269,7 +289,6 @@ func (v *VsccValidatorImpl) VSCCValidateTxForCC(ctx *Context) error {
if
e
,
isExecutionError
:=
err
.
(
*
validation
.
ExecutionFailureError
);
isExecutionError
{
return
&
commonerrors
.
VSCCExecutionFailureError
{
Err
:
e
}
}
// Else, treat it as an endorsement error.
return
&
commonerrors
.
VSCCEndorsementPolicyError
{
Err
:
err
}
}
...
...
@@ -359,7 +378,7 @@ func (v *VsccValidatorImpl) GetInfoForValidate(chdr *common.ChannelHeader, ccID
// txWritesToNamespace returns true if the supplied NsRwSet
// performs a ledger write
func
(
v
*
VsccValidatorImpl
)
txWritesToNamespace
(
ns
*
rwsetutil
.
NsRwSet
)
bool
{
func
(
v
*
VsccValidatorImpl
)
txWritesToNamespace
(
ns
*
cached
.
NsRwSet
)
bool
{
// check for public writes first
if
ns
.
KvRwSet
!=
nil
&&
len
(
ns
.
KvRwSet
.
Writes
)
>
0
{
return
true
...
...
core/common/validation/msgvalidation.go
View file @
cddd278f
...
...
@@ -22,6 +22,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/fastfabric-extensions/cached"
mspmgmt
"github.com/hyperledger/fabric/msp/mgmt"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/msp"
...
...
@@ -91,9 +92,9 @@ func ValidateProposalMessage(signedProp *pb.SignedProposal) (*pb.Proposal, *comm
if
err
!=
nil
{
return
nil
,
nil
,
nil
,
err
}
uhdr
:=
&
cached
.
Header
{
Header
:
hdr
}
// validate the header
chdr
,
shdr
,
err
:=
validateCommonHeader
(
hdr
)
chdr
,
shdr
,
err
:=
validateCommonHeader
(
u
hdr
)
if
err
!=
nil
{
return
nil
,
nil
,
nil
,
err
}
...
...
@@ -211,9 +212,9 @@ func validateSignatureHeader(sHdr *common.SignatureHeader) error {
}
// checks for a valid ChannelHeader
func
validateChannelHeader
(
cHdr
*
c
ommon
.
ChannelHeader
)
error
{
func
validateChannelHeader
(
cHdr
*
c
ached
.
ChannelHeader
)
error
{
// check for nil argument
if
cHdr
==
nil
{
if
cHdr
.
ChannelHeader
==
nil
{
return
errors
.
New
(
"nil ChannelHeader provided"
)
}
...
...
@@ -243,17 +244,17 @@ func validateChannelHeader(cHdr *common.ChannelHeader) error {
}
// checks for a valid Header
func
validateCommonHeader
(
hdr
*
c
ommon
.
Header
)
(
*
c
ommon
.
ChannelHeader
,
*
common
.
SignatureHeader
,
error
)
{
if
hdr
==
nil
{
func
validateCommonHeader
(
hdr
*
c
ached
.
Header
)
(
*
c
ached
.
ChannelHeader
,
*
common
.
SignatureHeader
,
error
)
{
if
hdr
.
Header
==
nil
{
return
nil
,
nil
,
errors
.
New
(
"nil header"
)
}
chdr
,
err
:=
utils
.
UnmarshalChannelHeader
(
hdr
.
ChannelHeader
)
chdr
,
err
:=
hdr
.
UnmarshalChannelHeader
()
if
err
!=
nil
{
return
nil
,
nil
,
err
}