Commit f6c97e0f authored by manish

[FAB-8557] Fix overwriting txid in block index



This CR fixes the current behavior of the block store index. In the
current implementation, the indexes that maintain different pieces of
information by txid are overwritten by a duplicate txid (if any). This
CR reverses this behavior and keeps the first appearance of a txid in
the index; future duplicate txids will not overwrite it.

Admittedly, neither behavior (keeping the first tx or keeping the last
tx) is fully justified, primarily because the problem itself is a
paradox: ids that are supposed to be unique by definition are
duplicated. However, the justification for moving from the current
behavior to the proposed one is that it is easy for someone to replay a
transaction, or simply reuse an existing txid in a bogus transaction,
just to exploit the current overwriting behavior and prevent the
legitimate user from querying the status of their transaction.

Change-Id: I3b81ae61c756ef78253b58a94644778716fb0e16
Signed-off-by: manish <manish.sethi@gmail.com>
parent 8a780138
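
The diff below implements this keep-first behavior in the block store index. As a standalone illustration of the semantics (not part of this commit; a minimal sketch using a plain map in place of the LevelDB-backed index):

// Illustrative sketch only: keep-first semantics for a txid index.
package main

import "fmt"

func main() {
	index := make(map[string]int) // txid -> position of its first appearance

	txids := []string{"txid-1", "txid-2", "txid-1"} // the third tx reuses txid-1
	for pos, txid := range txids {
		if _, exists := index[txid]; exists {
			continue // keep-first: a duplicate never overwrites the existing entry
		}
		index[txid] = pos
	}
	fmt.Println(index["txid-1"]) // prints 0 - still points at the first transaction
}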
@@ -31,8 +31,9 @@ type serializedBlockInfo struct {
 //The order of the transactions must be maintained for history
 type txindexInfo struct {
-	txID string
-	loc  *locPointer
+	txID        string
+	loc         *locPointer
+	isDuplicate bool
 }

 func serializeBlock(block *common.Block) ([]byte, *serializedBlockInfo, error) {
@@ -118,7 +119,7 @@ func addDataBytes(blockData *common.BlockData, buf *proto.Buffer) ([]*txindexInfo, error) {
 		if err := buf.EncodeRawBytes(txEnvelopeBytes); err != nil {
 			return nil, err
 		}
-		idxInfo := &txindexInfo{txid, &locPointer{offset, len(buf.Bytes()) - offset}}
+		idxInfo := &txindexInfo{txID: txid, loc: &locPointer{offset, len(buf.Bytes()) - offset}}
 		txOffsets = append(txOffsets, idxInfo)
 	}
 	return txOffsets, nil
@@ -178,7 +179,7 @@ func extractData(buf *ledgerutil.Buffer) (*common.BlockData, []*txindexInfo, error) {
 			return nil, nil, err
 		}
 		data.Data = append(data.Data, txEnvBytes)
-		idxInfo := &txindexInfo{txid, &locPointer{txOffset, buf.GetBytesConsumed() - txOffset}}
+		idxInfo := &txindexInfo{txID: txid, loc: &locPointer{txOffset, buf.GetBytesConsumed() - txOffset}}
 		txOffsets = append(txOffsets, idxInfo)
 	}
 	return data, txOffsets, nil
@@ -142,7 +142,9 @@ func newBlockfileMgr(id string, conf *Conf, indexConfig *blkstorage.IndexConfig,
 	}
 	// Create a new KeyValue store database handler for the blocks index in the keyvalue database
-	mgr.index = newBlockIndex(indexConfig, indexStore)
+	if mgr.index, err = newBlockIndex(indexConfig, indexStore); err != nil {
+		panic(fmt.Sprintf("error in block index: %s", err))
+	}
 	// Update the manager with the checkpoint info and the file writer
 	mgr.cpInfo = cpInfo
@@ -307,9 +309,11 @@ func (mgr *blockfileMgr) addBlock(block *common.Block) error {
 		txOffset.loc.offset += len(blockBytesEncodedLen)
 	}
 	//save the index in the database
-	mgr.index.indexBlock(&blockIdxInfo{
+	if err = mgr.index.indexBlock(&blockIdxInfo{
 		blockNum: block.Header.Number, blockHash: blockHash,
-		flp: blockFLP, txOffsets: txOffsets, metadata: block.Metadata})
+		flp: blockFLP, txOffsets: txOffsets, metadata: block.Metadata}); err != nil {
+		return err
+	}

 	//update the checkpoint info (for storage) and the blockchain info (for APIs) in the manager
 	mgr.updateCheckpoint(newCPInfo)
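
With the two changes above, index failures are no longer silently ignored: a bad index configuration panics at store-open time, and an indexing error aborts the block commit. A hedged caller sketch (a fragment within this package; commitBlock is a hypothetical name):

// Hypothetical fragment: an indexing failure inside addBlock (e.g., an error
// returned by markDuplicateTxids) now reaches the caller instead of being dropped.
func commitBlock(mgr *blockfileMgr, block *common.Block) error {
	if err := mgr.addBlock(block); err != nil {
		return fmt.Errorf("block commit failed: %s", err)
	}
	return nil
}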
@@ -22,8 +22,10 @@ import (
 	"github.com/golang/protobuf/proto"
 	"github.com/hyperledger/fabric/common/ledger/testutil"
 	ledgerutil "github.com/hyperledger/fabric/core/ledger/util"
+	"github.com/hyperledger/fabric/protos/common"
+	"github.com/hyperledger/fabric/protos/peer"
 	putil "github.com/hyperledger/fabric/protos/utils"
 )
@@ -169,6 +171,74 @@ func TestBlockfileMgrGetTxById(t *testing.T) {
 	}
 }

+// TestBlockfileMgrGetTxByIdDuplicateTxid tests that a transaction with an existing txid
+// (within same block or a different block) should not over-write the index by-txid (FAB-8557)
+func TestBlockfileMgrGetTxByIdDuplicateTxid(t *testing.T) {
+	env := newTestEnv(t, NewConf(testPath(), 0))
+	defer env.Cleanup()
+	blkStore, err := env.provider.OpenBlockStore("testLedger")
+	testutil.AssertNoError(env.t, err, "")
+	blkFileMgr := blkStore.(*fsBlockStore).fileMgr
+	bg, gb := testutil.NewBlockGenerator(t, "testLedger", false)
+	testutil.AssertNoError(t, blkFileMgr.addBlock(gb), "")
+
+	block1 := bg.NextBlockWithTxid(
+		[][]byte{
+			[]byte("tx with id=txid-1"),
+			[]byte("tx with id=txid-2"),
+			[]byte("another tx with existing id=txid-1"),
+		},
+		[]string{"txid-1", "txid-2", "txid-1"},
+	)
+	txValidationFlags := ledgerutil.NewTxValidationFlags(3)
+	txValidationFlags.SetFlag(0, peer.TxValidationCode_VALID)
+	txValidationFlags.SetFlag(1, peer.TxValidationCode_INVALID_OTHER_REASON)
+	txValidationFlags.SetFlag(2, peer.TxValidationCode_DUPLICATE_TXID)
+	block1.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txValidationFlags
+	testutil.AssertNoError(t, blkFileMgr.addBlock(block1), "")
+
+	block2 := bg.NextBlockWithTxid(
+		[][]byte{
+			[]byte("tx with id=txid-3"),
+			[]byte("yet another tx with existing id=txid-1"),
+		},
+		[]string{"txid-3", "txid-1"},
+	)
+	txValidationFlags = ledgerutil.NewTxValidationFlags(2)
+	txValidationFlags.SetFlag(0, peer.TxValidationCode_VALID)
+	txValidationFlags.SetFlag(1, peer.TxValidationCode_DUPLICATE_TXID)
+	block2.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER] = txValidationFlags
+	testutil.AssertNoError(t, blkFileMgr.addBlock(block2), "")
+
+	txenvp1, err := putil.GetEnvelopeFromBlock(block1.Data.Data[0])
+	testutil.AssertNoError(t, err, "")
+	txenvp2, err := putil.GetEnvelopeFromBlock(block1.Data.Data[1])
+	testutil.AssertNoError(t, err, "")
+	txenvp3, err := putil.GetEnvelopeFromBlock(block2.Data.Data[0])
+	testutil.AssertNoError(t, err, "")
+
+	indexedTxenvp, _ := blkFileMgr.retrieveTransactionByID("txid-1")
+	testutil.AssertEquals(t, indexedTxenvp, txenvp1)
+	indexedTxenvp, _ = blkFileMgr.retrieveTransactionByID("txid-2")
+	testutil.AssertEquals(t, indexedTxenvp, txenvp2)
+	indexedTxenvp, _ = blkFileMgr.retrieveTransactionByID("txid-3")
+	testutil.AssertEquals(t, indexedTxenvp, txenvp3)
+
+	blk, _ := blkFileMgr.retrieveBlockByTxID("txid-1")
+	testutil.AssertEquals(t, blk, block1)
+	blk, _ = blkFileMgr.retrieveBlockByTxID("txid-2")
+	testutil.AssertEquals(t, blk, block1)
+	blk, _ = blkFileMgr.retrieveBlockByTxID("txid-3")
+	testutil.AssertEquals(t, blk, block2)
+
+	validationCode, _ := blkFileMgr.retrieveTxValidationCodeByTxID("txid-1")
+	testutil.AssertEquals(t, validationCode, peer.TxValidationCode_VALID)
+	validationCode, _ = blkFileMgr.retrieveTxValidationCodeByTxID("txid-2")
+	testutil.AssertEquals(t, validationCode, peer.TxValidationCode_INVALID_OTHER_REASON)
+	validationCode, _ = blkFileMgr.retrieveTxValidationCodeByTxID("txid-3")
+	testutil.AssertEquals(t, validationCode, peer.TxValidationCode_VALID)
+}
+
 func TestBlockfileMgrGetTxByBlockNumTranNum(t *testing.T) {
 	env := newTestEnv(t, NewConf(testPath(), 0))
 	defer env.Cleanup()
@@ -67,14 +67,22 @@ type blockIndex struct {
 	db *leveldbhelper.DBHandle
 }

-func newBlockIndex(indexConfig *blkstorage.IndexConfig, db *leveldbhelper.DBHandle) *blockIndex {
+func newBlockIndex(indexConfig *blkstorage.IndexConfig, db *leveldbhelper.DBHandle) (*blockIndex, error) {
 	indexItems := indexConfig.AttrsToIndex
 	logger.Debugf("newBlockIndex() - indexItems:[%s]", indexItems)
 	indexItemsMap := make(map[blkstorage.IndexableAttr]bool)
 	for _, indexItem := range indexItems {
 		indexItemsMap[indexItem] = true
 	}
-	return &blockIndex{indexItemsMap, db}
+	// This dependency is needed because the index 'IndexableAttrTxID' is used for detecting duplicate txids
+	// and the results are reused in the other two indexes. Ideally, all three indexes should be merged into one
+	// for efficiency purposes - [FAB-10587]
+	if (indexItemsMap[blkstorage.IndexableAttrTxValidationCode] || indexItemsMap[blkstorage.IndexableAttrBlockTxID]) &&
+		!indexItemsMap[blkstorage.IndexableAttrTxID] {
+		return nil, fmt.Errorf("dependent index [%s] is not enabled for [%s] or [%s]",
+			blkstorage.IndexableAttrTxID, blkstorage.IndexableAttrTxValidationCode, blkstorage.IndexableAttrBlockTxID)
+	}
+	return &blockIndex{indexItemsMap, db}, nil
 }

 func (index *blockIndex) getLastBlockIndexed() (uint64, error) {
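
For example, under the dependency check above, a configuration that enables only the block-by-txid index now fails when the index is constructed (a hedged fragment assuming the blkstorage identifiers shown in this hunk; cfg and dbHandle are placeholders):

// Hypothetical fragment: IndexableAttrBlockTxID depends on IndexableAttrTxID
// for duplicate detection, so this configuration returns an error.
cfg := &blkstorage.IndexConfig{
	AttrsToIndex: []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockTxID},
}
if _, err := newBlockIndex(cfg, dbHandle); err != nil {
	panic(err) // surfaced at store-open time by newBlockfileMgr, as shown earlier
}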
@@ -117,9 +125,17 @@ func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error {
 	//Index3 Used to find a transaction by its transaction id
 	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrTxID]; ok {
+		if err = index.markDuplicateTxids(blockIdxInfo); err != nil {
+			logger.Errorf("error while detecting duplicate txids:%s", err)
+			return err
+		}
 		for _, txoffset := range txOffsets {
+			if txoffset.isDuplicate { // do not overwrite txid entry in the index - FAB-8557
+				logger.Debugf("txid [%s] is a duplicate of a previous tx. Not indexing in txid-index", txoffset.txID)
+				continue
+			}
 			txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, txoffset.loc)
-			logger.Debugf("Adding txLoc [%s] for tx ID: [%s] to index", txFlp, txoffset.txID)
+			logger.Debugf("Adding txLoc [%s] for tx ID: [%s] to txid-index", txFlp, txoffset.txID)
 			txFlpBytes, marshalErr := txFlp.marshal()
 			if marshalErr != nil {
 				return marshalErr
@@ -144,6 +160,9 @@ func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error {
 	// Index5 - Store BlockNumber will be used to find block by transaction id
 	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockTxID]; ok {
 		for _, txoffset := range txOffsets {
+			if txoffset.isDuplicate { // do not overwrite txid entry in the index - FAB-8557
+				continue
+			}
 			batch.Put(constructBlockTxIDKey(txoffset.txID), flpBytes)
 		}
 	}
@@ -151,6 +170,9 @@ func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error {
 	// Index6 - Store transaction validation result by transaction id
 	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrTxValidationCode]; ok {
 		for idx, txoffset := range txOffsets {
+			if txoffset.isDuplicate { // do not overwrite txid entry in the index - FAB-8557
+				continue
+			}
 			batch.Put(constructTxValidationCodeIDKey(txoffset.txID), []byte{byte(txsfltr.Flag(idx))})
 		}
 	}
@@ -163,6 +185,28 @@ func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error {
 	return nil
 }

+func (index *blockIndex) markDuplicateTxids(blockIdxInfo *blockIdxInfo) error {
+	uniqueTxids := make(map[string]bool)
+	for _, txIdxInfo := range blockIdxInfo.txOffsets {
+		txid := txIdxInfo.txID
+		if uniqueTxids[txid] { // txid is duplicate of a previous tx in the block
+			txIdxInfo.isDuplicate = true
+			continue
+		}
+
+		loc, err := index.getTxLoc(txid)
+		if loc != nil { // txid is duplicate of a previous tx in the index
+			txIdxInfo.isDuplicate = true
+			continue
+		}
+		if err != blkstorage.ErrNotFoundInIndex {
+			return err
+		}
+		uniqueTxids[txid] = true
+	}
+	return nil
+}
+
 func (index *blockIndex) getBlockLocByHash(blockHash []byte) (*fileLocPointer, error) {
 	if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockHash]; !ok {
 		return nil, blkstorage.ErrAttrNotIndexed
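
The markDuplicateTxids function above distinguishes two duplicate sources: an earlier transaction in the same block (tracked by the uniqueTxids map) and a transaction already committed in a previous block (detected via getTxLoc). A small self-contained sketch of the same logic (illustrative only, not the committed code):

// Illustrative sketch: two-level duplicate detection over txids.
package main

import "fmt"

func markDuplicates(txids []string, indexed map[string]bool) []bool {
	dup := make([]bool, len(txids))
	seen := make(map[string]bool) // txids seen earlier in this block
	for i, id := range txids {
		if seen[id] || indexed[id] { // same-block or already-indexed duplicate
			dup[i] = true
			continue
		}
		seen[id] = true
	}
	return dup
}

func main() {
	indexed := map[string]bool{"txid-1": true} // committed in a prior block
	fmt.Println(markDuplicates([]string{"txid-3", "txid-1", "txid-3"}, indexed))
	// Output: [false true true]
}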
@@ -118,6 +118,27 @@ func testBlockIndexSync(t *testing.T, numBlocks int, numBlocksToIndex int, syncB
 	})
 }

+func TestBlockIndexSelectiveIndexingWrongConfig(t *testing.T) {
+	testBlockIndexSelectiveIndexingWrongConfig(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockTxID})
+	testBlockIndexSelectiveIndexingWrongConfig(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrTxValidationCode})
+	testBlockIndexSelectiveIndexingWrongConfig(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockTxID, blkstorage.IndexableAttrBlockNum})
+	testBlockIndexSelectiveIndexingWrongConfig(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrTxValidationCode, blkstorage.IndexableAttrBlockNumTranNum})
+}
+
+func testBlockIndexSelectiveIndexingWrongConfig(t *testing.T, indexItems []blkstorage.IndexableAttr) {
+	var testName string
+	for _, s := range indexItems {
+		testName = testName + string(s)
+	}
+	t.Run(testName, func(t *testing.T) {
+		env := newTestEnvSelectiveIndexing(t, NewConf(testPath(), 0), indexItems)
+		defer env.Cleanup()
+		defer testutil.AssertPanic(t, "A panic is expected when index is opened with wrong configs")
+		env.provider.OpenBlockStore("test-ledger")
+	})
+}
+
 func TestBlockIndexSelectiveIndexing(t *testing.T) {
 	testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{})
 	testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockHash})
@@ -126,8 +147,8 @@ func TestBlockIndexSelectiveIndexing(t *testing.T) {
 	testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockNumTranNum})
 	testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockHash, blkstorage.IndexableAttrBlockNum})
 	testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrTxID, blkstorage.IndexableAttrBlockNumTranNum})
-	testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockTxID})
-	testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrTxValidationCode})
+	testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrTxID, blkstorage.IndexableAttrBlockTxID})
+	testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrTxID, blkstorage.IndexableAttrTxValidationCode})
 }
@@ -26,5 +26,24 @@ before referenced.
 Please refer to https://jira.hyperledger.org/browse/FAB-10151 for more
 details.

+There is a change in how some of the indexes maintained by the ledger are
+updated. Specifically, this includes the indexes that maintain information by
+txid. In the rare scenario where more than one transaction carries the same
+transaction id, previous releases would index the last such transaction. In
+this release, the behaviour is changed: the entry for the first transaction is
+kept and is not overwritten by later duplicates. See
+https://jira.hyperledger.org/browse/FAB-8557 for more details.
+
+This change may cause peers in a network to give different answers to queries
+that are served by these indexes. This can happen during an upgrade, when some
+of the peers are still on the previous release and hence index transactions
+based on the previous behavior. The queries that may show this effect include
+'GetTransactionByID' and 'GetBlockByTxID' via qscc. TODO - check whether the
+'GetTxValidationCodeByTxID' ledger api is also exposed by some other means.
+
+As noted above, duplicate transaction ids should be a rare situation in a
+permissioned network. However, if the differing query responses described
+above are observed, they can be resolved by rebuilding the block storage
+indexes.

 Change Log
 ----------
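
The release notes above suggest rebuilding the block storage indexes to resolve divergent query responses. One way to force a rebuild is to remove the index database before restarting the peer, which re-indexes blocks from the block files (the same sync path exercised by testBlockIndexSync above). A hedged sketch; the path assumes the default peer fileSystemPath and ledgersData layout, which may differ in your deployment:

// Hypothetical sketch: drop the block index so the peer rebuilds it on startup.
package main

import (
	"os"
	"path/filepath"
)

func main() {
	// Assumption: default layout <fileSystemPath>/ledgersData/chains/index
	ledgerRoot := "/var/hyperledger/production/ledgersData"
	if err := os.RemoveAll(filepath.Join(ledgerRoot, "chains", "index")); err != nil {
		panic(err)
	}
	// Restart the peer afterwards; blocks are re-indexed from the block files.
}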