Commit 13952e9e authored by manish's avatar manish Committed by Matthew Sykes
Browse files

FAB-12803 Add transaction level metrics



This CR introduces a counter metric that is incremented
with each transaction processed. The labels attaches to
the this counter include channel_name, transaction_type,
chaincode_name, and validation_code.

Change-Id: Id30bf318be29ec3c4e99c64d14383fa62f136aef
Signed-off-by: default avatarmanish <manish.sethi@gmail.com>
parent 0f1207a4
......@@ -28,6 +28,18 @@ type BlockGenerator struct {
t *testing.T
}
type TxDetails struct {
TxID string
ChaincodeName, ChaincodeVersion string
SimulationResults []byte
}
type BlockDetails struct {
BlockNum uint64
PreviousHash []byte
Txs []*TxDetails
}
// NewBlockGenerator instantiates new BlockGenerator for testing
func NewBlockGenerator(t *testing.T, ledgerID string, signTxs bool) (*BlockGenerator, *common.Block) {
gb, err := test.MakeGenesisBlock(ledgerID)
......@@ -79,22 +91,45 @@ func (bg *BlockGenerator) NextTestBlocks(numBlocks int) []*common.Block {
// ConstructTransaction constructs a transaction for testing
func ConstructTransaction(_ *testing.T, simulationResults []byte, txid string, sign bool) (*common.Envelope, string, error) {
return ConstructTransactionFromTxDetails(
&TxDetails{
ChaincodeName: "foo",
ChaincodeVersion: "v1",
TxID: txid,
SimulationResults: simulationResults,
},
sign,
)
}
func ConstructTransactionFromTxDetails(txDetails *TxDetails, sign bool) (*common.Envelope, string, error) {
ccid := &pb.ChaincodeID{
Name: "foo",
Version: "v1",
Name: txDetails.ChaincodeName,
Version: txDetails.ChaincodeVersion,
}
//response := &pb.Response{Status: 200}
var txID string
var txEnv *common.Envelope
var err error
var txID string
if sign {
txEnv, txID, err = ptestutils.ConstructSignedTxEnvWithDefaultSigner(util.GetTestChainID(), ccid, nil, simulationResults, txid, nil, nil)
txEnv, txID, err = ptestutils.ConstructSignedTxEnvWithDefaultSigner(util.GetTestChainID(), ccid, nil, txDetails.SimulationResults, txDetails.TxID, nil, nil)
} else {
txEnv, txID, err = ptestutils.ConstructUnsignedTxEnv(util.GetTestChainID(), ccid, nil, simulationResults, txid, nil, nil)
txEnv, txID, err = ptestutils.ConstructUnsignedTxEnv(util.GetTestChainID(), ccid, nil, txDetails.SimulationResults, txDetails.TxID, nil, nil)
}
return txEnv, txID, err
}
func ConstructBlockFromBlockDetails(t *testing.T, blockDetails *BlockDetails, sign bool) *common.Block {
var envs []*common.Envelope
for _, txDetails := range blockDetails.Txs {
env, _, err := ConstructTransactionFromTxDetails(txDetails, sign)
if err != nil {
t.Fatalf("ConstructTestTransaction failed, err %s", err)
}
envs = append(envs, env)
}
return NewBlock(envs, blockDetails.BlockNum, blockDetails.PreviousHash)
}
func ConstructBlockWithTxid(t *testing.T, blockNum uint64, previousHash []byte, simulationResults [][]byte, txids []string, sign bool) *common.Block {
envs := []*common.Envelope{}
for i := 0; i < len(simulationResults); i++ {
......
......@@ -266,7 +266,7 @@ func (l *kvLedger) CommitWithPvtData(pvtdataAndBlock *ledger.BlockAndPvtData) er
startBlockProcessing := time.Now()
logger.Debugf("[%s] Validating state for block [%d]", l.ledgerID, blockNo)
err = l.txtmgmt.ValidateAndPrepare(pvtdataAndBlock, true)
txstatsInfo, err := l.txtmgmt.ValidateAndPrepare(pvtdataAndBlock, true)
if err != nil {
return err
}
......@@ -310,6 +310,7 @@ func (l *kvLedger) CommitWithPvtData(pvtdataAndBlock *ledger.BlockAndPvtData) er
elapsedBlockProcessing,
elapsedCommitBlockStorage,
elapsedCommitState,
txstatsInfo,
)
return nil
}
......@@ -319,11 +320,13 @@ func (l *kvLedger) updateBlockStats(
blockProcessingTime time.Duration,
blockstorageCommitTime time.Duration,
statedbCommitTime time.Duration,
txstatsInfo []*txmgr.TxStatInfo,
) {
l.stats.updateBlockchainHeight(blockNum + 1)
l.stats.updateBlockProcessingTime(blockProcessingTime)
l.stats.updateBlockstorageCommitTime(blockstorageCommitTime)
l.stats.updateStatedbCommitTime(statedbCommitTime)
l.stats.updateTransactionsStats(txstatsInfo)
}
// GetMissingPvtDataInfoForMostRecentBlocks returns the missing private data information for the
......
......@@ -223,7 +223,8 @@ func TestKVLedgerDBRecovery(t *testing.T) {
map[string]string{"key1": "value1.2", "key2": "value2.2", "key3": "value3.2"},
map[string]string{"key1": "pvtValue1.2", "key2": "pvtValue2.2", "key3": "pvtValue3.2"})
assert.NoError(t, ledger.(*kvLedger).txtmgmt.ValidateAndPrepare(blockAndPvtdata2, true))
_, err := ledger.(*kvLedger).txtmgmt.ValidateAndPrepare(blockAndPvtdata2, true)
assert.NoError(t, err)
assert.NoError(t, ledger.(*kvLedger).blockStore.CommitWithPvtData(blockAndPvtdata2))
// block storage should be as of block-2 but the state and history db should be as of block-1
......@@ -272,7 +273,8 @@ func TestKVLedgerDBRecovery(t *testing.T) {
map[string]string{"key1": "value1.3", "key2": "value2.3", "key3": "value3.3"},
map[string]string{"key1": "pvtValue1.3", "key2": "pvtValue2.3", "key3": "pvtValue3.3"},
)
assert.NoError(t, ledger.(*kvLedger).txtmgmt.ValidateAndPrepare(blockAndPvtdata3, true))
_, err = ledger.(*kvLedger).txtmgmt.ValidateAndPrepare(blockAndPvtdata3, true)
assert.NoError(t, err)
assert.NoError(t, ledger.(*kvLedger).blockStore.CommitWithPvtData(blockAndPvtdata3))
// committing the transaction to state DB
assert.NoError(t, ledger.(*kvLedger).txtmgmt.Commit())
......@@ -324,8 +326,8 @@ func TestKVLedgerDBRecovery(t *testing.T) {
map[string]string{"key1": "value1.4", "key2": "value2.4", "key3": "value3.4"},
map[string]string{"key1": "pvtValue1.4", "key2": "pvtValue2.4", "key3": "pvtValue3.4"},
)
assert.NoError(t, ledger.(*kvLedger).txtmgmt.ValidateAndPrepare(blockAndPvtdata4, true))
_, err = ledger.(*kvLedger).txtmgmt.ValidateAndPrepare(blockAndPvtdata4, true)
assert.NoError(t, err)
assert.NoError(t, ledger.(*kvLedger).blockStore.CommitWithPvtData(blockAndPvtdata4))
assert.NoError(t, ledger.(*kvLedger).historyDB.Commit(blockAndPvtdata4.Block))
......
......@@ -9,8 +9,7 @@ import (
"time"
"github.com/hyperledger/fabric/common/metrics"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/peer"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr"
)
type stats struct {
......@@ -60,18 +59,29 @@ func (s *ledgerStats) updateStatedbCommitTime(timeTaken time.Duration) {
s.stats.statedbCommitTime.With("channel_name", s.ledgerid).Observe(timeTaken.Seconds())
}
func (s *ledgerStats) updateTransactionCounts(
transactionType common.HeaderType,
chaincodeName string,
validatioCode peer.TxValidationCode,
func (s *ledgerStats) updateTransactionsStats(
txstatsInfo []*txmgr.TxStatInfo,
) {
s.stats.transactionsCount.
With(s.ledgerid,
transactionType.String(),
chaincodeName,
validatioCode.String(),
).
Add(1)
for _, txstat := range txstatsInfo {
transactionTypeStr := "CouldNotDetermine"
if txstat.TxType != -1 {
transactionTypeStr = txstat.TxType.String()
}
chaincodeName := ""
if txstat.ChaincodeID != nil {
chaincodeName = txstat.ChaincodeID.Name
}
s.stats.transactionsCount.
With(
"channel_name", s.ledgerid,
"transaction_type", transactionTypeStr,
"chaincode_name", chaincodeName,
"validation_code", txstat.ValidationCode.String(),
).
Add(1)
}
}
var (
......@@ -119,7 +129,7 @@ var (
Subsystem: "",
Name: "transaction_counts",
Help: "Number of transactions processed.",
LabelNames: []string{"channel_name", "transaction_type", "chaincode", "validation_code"},
StatsdFormat: "%{#fqname}.%{channel_name}.%{transaction_type}.%{chaincode}.%{validation_code}",
LabelNames: []string{"channel_name", "transaction_type", "chaincode_name", "validation_code"},
StatsdFormat: "%{#fqname}.%{channel_name}.%{transaction_type}.%{chaincode_name}.%{validation_code}",
}
)
......@@ -13,7 +13,10 @@ import (
"github.com/hyperledger/fabric/common/metrics"
"github.com/hyperledger/fabric/common/metrics/metricsfakes"
lgr "github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr"
"github.com/hyperledger/fabric/core/ledger/mock"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/peer"
"github.com/stretchr/testify/assert"
)
......@@ -54,13 +57,13 @@ func TestStatsBlockchainHeight(t *testing.T) {
// invoke updateBlockStats api explicitly and verify the calls with fake metrics
ledger.updateBlockStats(
10, 1*time.Second, 2*time.Second, 3*time.Second,
10, 1*time.Second, 2*time.Second, 3*time.Second, nil,
)
assert.Equal(t, []string{"channel_name", ledgerid}, fakeBlockchainHeightGauge.WithArgsForCall(3))
assert.Equal(t, float64(11), fakeBlockchainHeightGauge.SetArgsForCall(3))
}
func TestStatsBlockCommitTimings(t *testing.T) {
func TestStatsBlockCommit(t *testing.T) {
env := newTestEnv(t)
defer env.cleanup()
testMetricProvider := testutilConstructMetricProvider()
......@@ -81,31 +84,95 @@ func TestStatsBlockCommitTimings(t *testing.T) {
defer ledger.Close()
// calls during committing genesis block
assert.Equal(t, []string{"channel_name", ledgerid},
testMetricProvider.fakeBlockProcessingTimeHist.WithArgsForCall(0))
assert.Equal(t, []string{"channel_name", ledgerid},
testMetricProvider.fakeBlockstorageCommitTimeHist.WithArgsForCall(0))
assert.Equal(t, []string{"channel_name", ledgerid},
testMetricProvider.fakeStatedbCommitTimeHist.WithArgsForCall(0))
assert.Equal(t,
[]string{"channel_name", ledgerid},
testMetricProvider.fakeBlockProcessingTimeHist.WithArgsForCall(0),
)
assert.Equal(t,
[]string{"channel_name", ledgerid},
testMetricProvider.fakeBlockstorageCommitTimeHist.WithArgsForCall(0),
)
assert.Equal(t,
[]string{"channel_name", ledgerid},
testMetricProvider.fakeStatedbCommitTimeHist.WithArgsForCall(0),
)
assert.Equal(t,
[]string{
"channel_name", ledgerid,
"transaction_type", common.HeaderType_CONFIG.String(),
"chaincode_name", "",
"validation_code", peer.TxValidationCode_VALID.String(),
},
testMetricProvider.fakeTransactionsCount.WithArgsForCall(0),
)
// invoke updateBlockStats api explicitly and verify the calls with fake metrics
ledger.updateBlockStats(
10, 1*time.Second, 2*time.Second, 3*time.Second,
[]*txmgr.TxStatInfo{
{
ValidationCode: peer.TxValidationCode_VALID,
TxType: common.HeaderType_ENDORSER_TRANSACTION,
ChaincodeID: &peer.ChaincodeID{Name: "mycc"},
NumCollections: 2,
},
{
ValidationCode: peer.TxValidationCode_INVALID_OTHER_REASON,
TxType: -1,
},
},
)
assert.Equal(t,
[]string{"channel_name", ledgerid},
testMetricProvider.fakeBlockProcessingTimeHist.WithArgsForCall(1),
)
assert.Equal(t,
float64(1),
testMetricProvider.fakeBlockProcessingTimeHist.ObserveArgsForCall(1),
)
assert.Equal(t,
[]string{"channel_name", ledgerid},
testMetricProvider.fakeBlockstorageCommitTimeHist.WithArgsForCall(1),
)
assert.Equal(t,
float64(2),
testMetricProvider.fakeBlockstorageCommitTimeHist.ObserveArgsForCall(1),
)
assert.Equal(t,
[]string{"channel_name", ledgerid},
testMetricProvider.fakeStatedbCommitTimeHist.WithArgsForCall(1),
)
assert.Equal(t,
float64(3),
testMetricProvider.fakeStatedbCommitTimeHist.ObserveArgsForCall(1),
)
assert.Equal(t,
[]string{
"channel_name", ledgerid,
"transaction_type", common.HeaderType_ENDORSER_TRANSACTION.String(),
"chaincode_name", "mycc",
"validation_code", peer.TxValidationCode_VALID.String(),
},
testMetricProvider.fakeTransactionsCount.WithArgsForCall(1),
)
assert.Equal(t,
float64(1),
testMetricProvider.fakeTransactionsCount.AddArgsForCall(1),
)
assert.Equal(t, []string{"channel_name", ledgerid},
testMetricProvider.fakeBlockProcessingTimeHist.WithArgsForCall(1))
assert.Equal(t, float64(1),
testMetricProvider.fakeBlockProcessingTimeHist.ObserveArgsForCall(1))
assert.Equal(t, []string{"channel_name", ledgerid},
testMetricProvider.fakeBlockstorageCommitTimeHist.WithArgsForCall(1))
assert.Equal(t, float64(2),
testMetricProvider.fakeBlockstorageCommitTimeHist.ObserveArgsForCall(1))
assert.Equal(t, []string{"channel_name", ledgerid},
testMetricProvider.fakeStatedbCommitTimeHist.WithArgsForCall(1))
assert.Equal(t, float64(3),
testMetricProvider.fakeStatedbCommitTimeHist.ObserveArgsForCall(1))
assert.Equal(t,
[]string{
"channel_name", ledgerid,
"transaction_type", "CouldNotDetermine",
"chaincode_name", "",
"validation_code", peer.TxValidationCode_INVALID_OTHER_REASON.String(),
},
testMetricProvider.fakeTransactionsCount.WithArgsForCall(2),
)
assert.Equal(t,
float64(1),
testMetricProvider.fakeTransactionsCount.AddArgsForCall(2),
)
}
type testMetricProvider struct {
......
......@@ -232,6 +232,19 @@ func collHashedRwSetFromProtoMsg(protoMsg *rwset.CollectionHashedReadWriteSet) (
return colHashedRwSet, nil
}
func (txRwSet *TxRwSet) NumCollections() int {
if txRwSet == nil {
return 0
}
numColls := 0
for _, nsRwset := range txRwSet.NsRwSets {
for range nsRwset.CollHashedRwSets {
numColls++
}
}
return numColls
}
///////////////////////////////////////////////////////////////////////////////
// functions for private read-write set
///////////////////////////////////////////////////////////////////////////////
......
......@@ -125,6 +125,13 @@ func TestCollHashedRwSetConversion(t *testing.T) {
assert.Equal(t, collHashedRwSet.PvtRwSetHash, collHashedRwSet1.PvtRwSetHash)
}
func TestNumCollections(t *testing.T) {
var txRwSet *TxRwSet
assert.Equal(t, 0, txRwSet.NumCollections()) // nil TxRwSet
assert.Equal(t, 0, (&TxRwSet{}).NumCollections()) // empty TxRwSet
assert.Equal(t, 4, sampleTxRwSet().NumCollections()) // sample TxRwSet
}
func sampleTxRwSet() *TxRwSet {
txRwSet := &TxRwSet{}
txRwSet.NsRwSets = append(txRwSet.NsRwSets, sampleNsRwSet("ns-1"))
......
......@@ -99,7 +99,8 @@ func testPrivateDataMetadataRetrievalByHash(t *testing.T, env testEnv) {
s1.SetPrivateDataMetadata("ns", "coll", key1, metadata1)
s1.Done()
blkAndPvtdata1 := prepareNextBlockForTestFromSimulator(t, bg, s1)
assert.NoError(t, txMgr.ValidateAndPrepare(blkAndPvtdata1, true))
_, err := txMgr.ValidateAndPrepare(blkAndPvtdata1, true)
assert.NoError(t, err)
assert.NoError(t, txMgr.Commit())
t.Run("query-helper-for-queryexecutor", func(t *testing.T) {
......
......@@ -16,6 +16,7 @@ import (
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/privacyenabledstate"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/pvtstatepurgemgmt"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/queryutil"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/validator"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/validator/valimpl"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
......@@ -95,22 +96,24 @@ func (txmgr *LockBasedTxMgr) NewTxSimulator(txid string) (ledger.TxSimulator, er
}
// ValidateAndPrepare implements method in interface `txmgmt.TxMgr`
func (txmgr *LockBasedTxMgr) ValidateAndPrepare(blockAndPvtdata *ledger.BlockAndPvtData, doMVCCValidation bool) error {
func (txmgr *LockBasedTxMgr) ValidateAndPrepare(blockAndPvtdata *ledger.BlockAndPvtData, doMVCCValidation bool) (
[]*txmgr.TxStatInfo, error,
) {
block := blockAndPvtdata.Block
logger.Debugf("Waiting for purge mgr to finish the background job of computing expirying keys for the block")
txmgr.pvtdataPurgeMgr.WaitForPrepareToFinish()
logger.Debugf("Validating new block with num trans = [%d]", len(block.Data.Data))
batch, err := txmgr.validator.ValidateAndPrepareBatch(blockAndPvtdata, doMVCCValidation)
batch, txstatsInfo, err := txmgr.validator.ValidateAndPrepareBatch(blockAndPvtdata, doMVCCValidation)
if err != nil {
txmgr.reset()
return err
return nil, err
}
txmgr.current = &current{block: block, batch: batch}
if err := txmgr.invokeNamespaceListeners(); err != nil {
txmgr.reset()
return err
return nil, err
}
return nil
return txstatsInfo, nil
}
// RemoveStaleAndCommitPvtDataOfOldBlocks implements method in interface `txmgmt.TxMgr`
......@@ -503,7 +506,7 @@ func (txmgr *LockBasedTxMgr) ShouldRecover(lastAvailableBlock uint64) (bool, uin
func (txmgr *LockBasedTxMgr) CommitLostBlock(blockAndPvtdata *ledger.BlockAndPvtData) error {
block := blockAndPvtdata.Block
logger.Debugf("Constructing updateSet for the block %d", block.Header.Number)
if err := txmgr.ValidateAndPrepare(blockAndPvtdata, false); err != nil {
if _, err := txmgr.ValidateAndPrepare(blockAndPvtdata, false); err != nil {
return err
}
......
......@@ -125,7 +125,7 @@ func newTxMgrTestHelper(t *testing.T, txMgr txmgr.TxMgr) *txMgrTestHelper {
func (h *txMgrTestHelper) validateAndCommitRWSet(txRWSet *rwset.TxReadWriteSet) {
rwSetBytes, _ := proto.Marshal(txRWSet)
block := h.bg.NextBlock([][]byte{rwSetBytes})
err := h.txMgr.ValidateAndPrepare(&ledger.BlockAndPvtData{Block: block, PvtData: nil}, true)
_, err := h.txMgr.ValidateAndPrepare(&ledger.BlockAndPvtData{Block: block, PvtData: nil}, true)
assert.NoError(h.t, err)
txsFltr := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
invalidTxNum := 0
......@@ -142,7 +142,7 @@ func (h *txMgrTestHelper) validateAndCommitRWSet(txRWSet *rwset.TxReadWriteSet)
func (h *txMgrTestHelper) checkRWsetInvalid(txRWSet *rwset.TxReadWriteSet) {
rwSetBytes, _ := proto.Marshal(txRWSet)
block := h.bg.NextBlock([][]byte{rwSetBytes})
err := h.txMgr.ValidateAndPrepare(&ledger.BlockAndPvtData{Block: block, PvtData: nil}, true)
_, err := h.txMgr.ValidateAndPrepare(&ledger.BlockAndPvtData{Block: block, PvtData: nil}, true)
assert.NoError(h.t, err)
txsFltr := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
invalidTxNum := 0
......
......@@ -127,7 +127,7 @@ func TestStateListenerQueryExecutor(t *testing.T) {
block := testutil.ConstructBlock(t, 1, nil, [][]byte{simResBytes}, false)
// invoke ValidateAndPrepare function
err = txMgr.ValidateAndPrepare(&ledger.BlockAndPvtData{Block: block}, false)
_, err = txMgr.ValidateAndPrepare(&ledger.BlockAndPvtData{Block: block}, false)
assert.NoError(t, err)
// validate that the query executors passed to the state listener
......
......@@ -1265,7 +1265,8 @@ func TestRemoveStaleAndCommitPvtDataOfOldBlocksWithExpiry(t *testing.T) {
// stored pvt key would get expired and purged while committing block 3
blkAndPvtdata := prepareNextBlockForTest(t, txMgr, bg, "txid-1",
map[string]string{"pubkey1": "pub-value1"}, map[string]string{"pvtkey1": "pvt-value1"}, true)
assert.NoError(t, txMgr.ValidateAndPrepare(blkAndPvtdata, true))
_, err := txMgr.ValidateAndPrepare(blkAndPvtdata, true)
assert.NoError(t, err)
// committing block 1
assert.NoError(t, txMgr.Commit())
......@@ -1299,7 +1300,8 @@ func TestRemoveStaleAndCommitPvtDataOfOldBlocksWithExpiry(t *testing.T) {
// stored pvt key would get expired and purged while committing block 4
blkAndPvtdata = prepareNextBlockForTest(t, txMgr, bg, "txid-2",
map[string]string{"pubkey2": "pub-value2"}, map[string]string{"pvtkey2": "pvt-value2"}, true)
assert.NoError(t, txMgr.ValidateAndPrepare(blkAndPvtdata, true))
_, err = txMgr.ValidateAndPrepare(blkAndPvtdata, true)
assert.NoError(t, err)
// committing block 2
assert.NoError(t, txMgr.Commit())
......@@ -1313,7 +1315,8 @@ func TestRemoveStaleAndCommitPvtDataOfOldBlocksWithExpiry(t *testing.T) {
blkAndPvtdata = prepareNextBlockForTest(t, txMgr, bg, "txid-3",
map[string]string{"pubkey3": "pub-value3"}, nil, false)
assert.NoError(t, txMgr.ValidateAndPrepare(blkAndPvtdata, true))
_, err = txMgr.ValidateAndPrepare(blkAndPvtdata, true)
assert.NoError(t, err)
// committing block 3
assert.NoError(t, txMgr.Commit())
......@@ -1341,7 +1344,8 @@ func TestRemoveStaleAndCommitPvtDataOfOldBlocksWithExpiry(t *testing.T) {
blkAndPvtdata = prepareNextBlockForTest(t, txMgr, bg, "txid-4",
map[string]string{"pubkey4": "pub-value4"}, nil, false)
assert.NoError(t, txMgr.ValidateAndPrepare(blkAndPvtdata, true))
_, err = txMgr.ValidateAndPrepare(blkAndPvtdata, true)
assert.NoError(t, err)
// committing block 4 and should purge pvtkey2
assert.NoError(t, txMgr.Commit())
......@@ -1419,7 +1423,8 @@ func TestTxSimulatorMissingPvtdataExpiry(t *testing.T) {
blkAndPvtdata := prepareNextBlockForTest(t, txMgr, bg, "txid-1",
map[string]string{"pubkey1": "pub-value1"}, map[string]string{"pvtkey1": "pvt-value1"}, false)
assert.NoError(t, txMgr.ValidateAndPrepare(blkAndPvtdata, true))
_, err := txMgr.ValidateAndPrepare(blkAndPvtdata, true)
assert.NoError(t, err)
assert.NoError(t, txMgr.Commit())
simulator, _ := txMgr.NewTxSimulator("tx-tmp")
......@@ -1429,8 +1434,10 @@ func TestTxSimulatorMissingPvtdataExpiry(t *testing.T) {
simulator.Done()
blkAndPvtdata = prepareNextBlockForTest(t, txMgr, bg, "txid-2",
map[string]string{"pubkey1": "pub-value2"}, map[string]string{"pvtkey2": "pvt-value2"}, false)
assert.NoError(t, txMgr.ValidateAndPrepare(blkAndPvtdata, true))
_, err = txMgr.ValidateAndPrepare(blkAndPvtdata, true)
assert.NoError(t, err)
assert.NoError(t, txMgr.Commit())
simulator, _ = txMgr.NewTxSimulator("tx-tmp")
......@@ -1440,7 +1447,8 @@ func TestTxSimulatorMissingPvtdataExpiry(t *testing.T) {
blkAndPvtdata = prepareNextBlockForTest(t, txMgr, bg, "txid-2",
map[string]string{"pubkey1": "pub-value3"}, map[string]string{"pvtkey3": "pvt-value3"}, false)
assert.NoError(t, txMgr.ValidateAndPrepare(blkAndPvtdata, true))
_, err = txMgr.ValidateAndPrepare(blkAndPvtdata, true)
assert.NoError(t, err)
assert.NoError(t, txMgr.Commit())
simulator, _ = txMgr.NewTxSimulator("tx-tmp")
......@@ -1537,7 +1545,8 @@ func testTxWithPvtdataMetadata(t *testing.T, env testEnv, ns, coll string) {
s1.Done()
blkAndPvtdata1 := prepareNextBlockForTestFromSimulator(t, bg, s1)
assert.NoError(t, txMgr.ValidateAndPrepare(blkAndPvtdata1, true))
_, err := txMgr.ValidateAndPrepare(blkAndPvtdata1, true)
assert.NoError(t, err)
assert.NoError(t, txMgr.Commit())
// Run query - key1 and key2 should return both value and metadata. Key3 should still be non-exsting in db
......@@ -1555,7 +1564,8 @@ func testTxWithPvtdataMetadata(t *testing.T, env testEnv, ns, coll string) {
s2.Done()
blkAndPvtdata2 := prepareNextBlockForTestFromSimulator(t, bg, s2)
assert.NoError(t, txMgr.ValidateAndPrepare(blkAndPvtdata2, true))
_, err = txMgr.ValidateAndPrepare(blkAndPvtdata2, true)
assert.NoError(t, err)
assert.NoError(t, txMgr.Commit())
// Run query - key1 should return updated metadata. Key2 should return 'nil' metadata
......
......@@ -19,13 +19,15 @@ package txmgr
import (
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/peer"
)
// TxMgr - an interface that a transaction manager should implement
type TxMgr interface {
NewQueryExecutor(txid string) (ledger.QueryExecutor, error)
NewTxSimulator(txid string) (ledger.TxSimulator, error)
ValidateAndPrepare(blockAndPvtdata *ledger.BlockAndPvtData, doMVCCValidation bool) error
ValidateAndPrepare(blockAndPvtdata *ledger.BlockAndPvtData, doMVCCValidation bool) ([]*TxStatInfo, error)
RemoveStaleAndCommitPvtDataOfOldBlocks(blocksPvtData map[uint64][]*ledger.TxPvtData) error
GetLastSavepoint() (*version.Height, error)
ShouldRecover(lastAvailableBlock uint64) (bool, uint64, error)
......@@ -35,6 +37,14 @@ type TxMgr interface {
Shutdown()
}
// TxStatInfo encapsulates information about a transaction
type TxStatInfo struct {
ValidationCode peer.TxValidationCode
TxType common.HeaderType
ChaincodeID *peer.ChaincodeID
NumCollections int
}
// ErrUnsupportedTransaction is expected to be thrown if a unsupported query is performed in an update transaction
type ErrUnsupportedTransaction struct {
Msg string
......