Commit 458c5212 authored by Mari Wade's avatar Mari Wade
Browse files

FAB-1336 Add new ledger blockstorage index.



Add a new ledger blockstorage index for History
that will map (blocknum,trannum) to the file storage
location for this block transaction

This index will be used for the API  GetTransactionsForKey()
for (chaincode1,key1).  It will do a key range query on chaincode1~key1
to pick up all chaincode1~key1 records.  Results will indicate
the set of (blocknum,trannum) transactions that updated this key.

Change-Id: I81da09e5526d7e2966634c78a03d34011d514442
Signed-off-by: default avatarMari Wade <mariwade@us.ibm.com>
parent d18aa985
......@@ -30,9 +30,10 @@ type IndexableAttr string
// constants for indexable attributes
const (
IndexableAttrBlockNum = IndexableAttr("BlockNum")
IndexableAttrBlockHash = IndexableAttr("BlockHash")
IndexableAttrTxID = IndexableAttr("TxID")
IndexableAttrBlockNum = IndexableAttr("BlockNum")
IndexableAttrBlockHash = IndexableAttr("BlockHash")
IndexableAttrTxID = IndexableAttr("TxID")
IndexableAttrBlockNumTranNum = IndexableAttr("BlockNumTranNum")
)
// IndexConfig - a configuration that includes a list of attributes that should be indexed
......
......@@ -27,7 +27,13 @@ import (
type serializedBlockInfo struct {
blockHeader *common.BlockHeader
txOffsets map[string]*locPointer
txOffsets []*txindexInfo
}
//The order of the transactions must be maintained for history
type txindexInfo struct {
txID string
loc *locPointer
}
func serializeBlock(block *common.Block) ([]byte, *serializedBlockInfo, error) {
......@@ -94,8 +100,9 @@ func addHeaderBytes(blockHeader *common.BlockHeader, buf *proto.Buffer) error {
return nil
}
func addDataBytes(blockData *common.BlockData, buf *proto.Buffer) (map[string]*locPointer, error) {
txOffsets := make(map[string]*locPointer)
func addDataBytes(blockData *common.BlockData, buf *proto.Buffer) ([]*txindexInfo, error) {
var txOffsets []*txindexInfo
if err := buf.EncodeVarint(uint64(len(blockData.Data))); err != nil {
return nil, err
}
......@@ -108,7 +115,8 @@ func addDataBytes(blockData *common.BlockData, buf *proto.Buffer) (map[string]*l
if err := buf.EncodeRawBytes(txEnvelopeBytes); err != nil {
return nil, err
}
txOffsets[txid] = &locPointer{offset, len(buf.Bytes()) - offset}
idxInfo := &txindexInfo{txid, &locPointer{offset, len(buf.Bytes()) - offset}}
txOffsets = append(txOffsets, idxInfo)
}
return txOffsets, nil
}
......@@ -147,9 +155,9 @@ func extractHeader(buf *ledgerutil.Buffer) (*common.BlockHeader, error) {
return header, nil
}
func extractData(buf *ledgerutil.Buffer) (*common.BlockData, map[string]*locPointer, error) {
func extractData(buf *ledgerutil.Buffer) (*common.BlockData, []*txindexInfo, error) {
data := &common.BlockData{}
txOffsets := make(map[string]*locPointer)
var txOffsets []*txindexInfo
var numItems uint64
var err error
......@@ -167,7 +175,8 @@ func extractData(buf *ledgerutil.Buffer) (*common.BlockData, map[string]*locPoin
return nil, nil, err
}
data.Data = append(data.Data, txEnvBytes)
txOffsets[txid] = &locPointer{txOffset, buf.GetBytesConsumed() - txOffset}
idxInfo := &txindexInfo{txid, &locPointer{txOffset, buf.GetBytesConsumed() - txOffset}}
txOffsets = append(txOffsets, idxInfo)
}
return data, txOffsets, nil
}
......
......@@ -50,12 +50,16 @@ func TestSerializedBlockInfo(t *testing.T) {
testutil.AssertNoError(t, err, "")
testutil.AssertEquals(t, infoFromBB, info)
testutil.AssertEquals(t, len(info.txOffsets), len(block.Data.Data))
for _, txEnvBytes := range block.Data.Data {
for txIndex, txEnvBytes := range block.Data.Data {
txid, err := extractTxID(txEnvBytes)
testutil.AssertNoError(t, err, "")
offset, ok := info.txOffsets[txid]
testutil.AssertEquals(t, ok, true)
b := bb[offset.offset:]
indexInfo := info.txOffsets[txIndex]
indexTxID := indexInfo.txID
indexOffset := indexInfo.loc
testutil.AssertEquals(t, txid, indexTxID)
b := bb[indexOffset.offset:]
len, num := proto.DecodeVarint(b)
txEnvBytesFromBB := b[num : num+int(len)]
testutil.AssertEquals(t, txEnvBytesFromBB, txEnvBytes)
......
......@@ -304,7 +304,7 @@ func (mgr *blockfileMgr) addBlock(block *common.Block) error {
blockFLP.offset = currentOffset
// shift the txoffset because we prepend length of bytes before block bytes
for _, txOffset := range txOffsets {
txOffset.offset += len(blockBytesEncodedLen)
txOffset.loc.offset += len(blockBytesEncodedLen)
}
//save the index in the database
mgr.index.indexBlock(&blockIdxInfo{
......@@ -363,7 +363,7 @@ func (mgr *blockfileMgr) syncIndex() error {
return err
}
for _, offset := range info.txOffsets {
offset.offset += int(blockPlacementInfo.blockBytesOffset)
offset.loc.offset += int(blockPlacementInfo.blockBytesOffset)
}
//Update the blockIndexInfo with what was actually stored in file system
blockIdxInfo := &blockIdxInfo{}
......@@ -456,6 +456,15 @@ func (mgr *blockfileMgr) retrieveTransactionByID(txID string) (*pb.Transaction,
return mgr.fetchTransaction(loc)
}
func (mgr *blockfileMgr) retrieveTransactionForBlockNumTranNum(blockNum uint64, tranNum uint64) (*pb.Transaction, error) {
logger.Debugf("retrieveTransactionForBlockNumTranNum() - blockNum = [%d], tranNum = [%d]", blockNum, tranNum)
loc, err := mgr.index.getTXLocForBlockNumTranNum(blockNum, tranNum)
if err != nil {
return nil, err
}
return mgr.fetchTransaction(loc)
}
func (mgr *blockfileMgr) fetchBlock(lp *fileLocPointer) (*common.Block, error) {
blockBytes, err := mgr.fetchBlockBytes(lp)
if err != nil {
......
......@@ -27,10 +27,11 @@ import (
)
const (
blockNumIdxKeyPrefix = 'n'
blockHashIdxKeyPrefix = 'h'
txIDIdxKeyPrefix = 't'
indexCheckpointKeyStr = "indexCheckpointKey"
blockNumIdxKeyPrefix = 'n'
blockHashIdxKeyPrefix = 'h'
txIDIdxKeyPrefix = 't'
blockNumTranNumIdxKeyPrefix = 'a'
indexCheckpointKeyStr = "indexCheckpointKey"
)
var indexCheckpointKey = []byte(indexCheckpointKeyStr)
......@@ -41,13 +42,14 @@ type index interface {
getBlockLocByHash(blockHash []byte) (*fileLocPointer, error)
getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer, error)
getTxLoc(txID string) (*fileLocPointer, error)
getTXLocForBlockNumTranNum(blockNum uint64, tranNum uint64) (*fileLocPointer, error)
}
type blockIdxInfo struct {
blockNum uint64
blockHash []byte
flp *fileLocPointer
txOffsets map[string]*locPointer
txOffsets []*txindexInfo
}
type blockIndex struct {
......@@ -89,25 +91,42 @@ func (index *blockIndex) indexBlock(blockIdxInfo *blockIdxInfo) error {
return err
}
//Index1
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockHash]; ok {
batch.Put(constructBlockHashKey(blockIdxInfo.blockHash), flpBytes)
}
//Index2
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNum]; ok {
batch.Put(constructBlockNumKey(blockIdxInfo.blockNum), flpBytes)
}
//Index3 Used to find a transactin by it's transaction id
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrTxID]; ok {
for txid, txoffset := range txOffsets {
txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, txoffset)
logger.Debugf("Adding txLoc [%s] for tx [%s] to index", txFlp, txid)
for _, txoffset := range txOffsets {
txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, txoffset.loc)
logger.Debugf("Adding txLoc [%s] for tx ID: [%s] to index", txFlp, txoffset.txID)
txFlpBytes, marshalErr := txFlp.marshal()
if marshalErr != nil {
return marshalErr
}
batch.Put(constructTxIDKey(txid), txFlpBytes)
batch.Put(constructTxIDKey(txoffset.txID), txFlpBytes)
}
}
//Index4 - Store BlockNumTranNum will be used to query history data
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNumTranNum]; ok {
for txIterator, txoffset := range txOffsets {
txFlp := newFileLocationPointer(flp.fileSuffixNum, flp.offset, txoffset.loc)
logger.Debugf("Adding txLoc [%s] for tx number:[%d] ID: [%s] to blockNumTranNum index", txFlp, txIterator+1, txoffset.txID)
txFlpBytes, marshalErr := txFlp.marshal()
if marshalErr != nil {
return marshalErr
}
batch.Put(constructBlockNumTranNumKey(blockIdxInfo.blockNum, uint64(txIterator+1)), txFlpBytes)
}
}
batch.Put(indexCheckpointKey, encodeBlockNum(blockIdxInfo.blockNum))
if err := index.db.WriteBatch(batch, false); err != nil {
return err
......@@ -163,6 +182,22 @@ func (index *blockIndex) getTxLoc(txID string) (*fileLocPointer, error) {
return txFLP, nil
}
func (index *blockIndex) getTXLocForBlockNumTranNum(blockNum uint64, tranNum uint64) (*fileLocPointer, error) {
if _, ok := index.indexItemsMap[blkstorage.IndexableAttrBlockNumTranNum]; !ok {
return nil, blkstorage.ErrAttrNotIndexed
}
b, err := index.db.Get(constructBlockNumTranNumKey(blockNum, tranNum))
if err != nil {
return nil, err
}
if b == nil {
return nil, blkstorage.ErrNotFoundInIndex
}
txFLP := &fileLocPointer{}
txFLP.unmarshal(b)
return txFLP, nil
}
func constructBlockNumKey(blockNum uint64) []byte {
blkNumBytes := util.EncodeOrderPreservingVarUint64(blockNum)
return append([]byte{blockNumIdxKeyPrefix}, blkNumBytes...)
......@@ -176,6 +211,13 @@ func constructTxIDKey(txID string) []byte {
return append([]byte{txIDIdxKeyPrefix}, []byte(txID)...)
}
func constructBlockNumTranNumKey(blockNum uint64, txNum uint64) []byte {
blkNumBytes := util.EncodeOrderPreservingVarUint64(blockNum)
tranNumBytes := util.EncodeOrderPreservingVarUint64(txNum)
key := append(blkNumBytes, tranNumBytes...)
return append([]byte{blockNumTranNumIdxKeyPrefix}, key...)
}
func constructTxID(blockNum uint64, txNum int) string {
return fmt.Sprintf("%d:%d", blockNum, txNum)
}
......
......@@ -42,6 +42,9 @@ func (i *noopIndex) getBlockLocByBlockNum(blockNum uint64) (*fileLocPointer, err
func (i *noopIndex) getTxLoc(txID string) (*fileLocPointer, error) {
return nil, nil
}
func (i *noopIndex) getTXLocForBlockNumTranNum(blockNum uint64, tranNum uint64) (*fileLocPointer, error) {
return nil, nil
}
func TestBlockIndexSync(t *testing.T) {
testBlockIndexSync(t, 10, 5, false)
......@@ -103,7 +106,9 @@ func TestBlockIndexSelectiveIndexing(t *testing.T) {
testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockHash})
testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockNum})
testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrTxID})
testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockNumTranNum})
testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrBlockHash, blkstorage.IndexableAttrBlockNum})
testBlockIndexSelectiveIndexing(t, []blkstorage.IndexableAttr{blkstorage.IndexableAttrTxID, blkstorage.IndexableAttrBlockNumTranNum})
}
func testBlockIndexSelectiveIndexing(t *testing.T, indexItems []blkstorage.IndexableAttr) {
......@@ -149,4 +154,15 @@ func testBlockIndexSelectiveIndexing(t *testing.T, indexItems []blkstorage.Index
} else {
testutil.AssertSame(t, err, blkstorage.ErrAttrNotIndexed)
}
//test 'retrieveTrasnactionsByBlockNumTranNum
tx2, err := blockfileMgr.retrieveTransactionForBlockNumTranNum(1, 1)
if testutil.Contains(indexItems, blkstorage.IndexableAttrBlockNumTranNum) {
testutil.AssertNoError(t, err, "Error while retrieving tx by blockNum and tranNum")
txOrig2, err2 := extractTransaction(blocks[0].Data.Data[0])
testutil.AssertNoError(t, err2, "")
testutil.AssertEquals(t, tx2, txOrig2)
} else {
testutil.AssertSame(t, err, blkstorage.ErrAttrNotIndexed)
}
}
......@@ -39,6 +39,7 @@ func newTestEnv(t testing.TB) *testEnv {
blkstorage.IndexableAttrBlockHash,
blkstorage.IndexableAttrBlockNum,
blkstorage.IndexableAttrTxID,
blkstorage.IndexableAttrBlockNumTranNum,
}
os.RemoveAll(conf.dbPath)
os.RemoveAll(conf.blockfilesDir)
......
......@@ -74,6 +74,7 @@ func NewKVLedger(conf *Conf) (*KVLedger, error) {
blkstorage.IndexableAttrBlockHash,
blkstorage.IndexableAttrBlockNum,
blkstorage.IndexableAttrTxID,
blkstorage.IndexableAttrBlockNumTranNum,
}
indexConfig := &blkstorage.IndexConfig{AttrsToIndex: attrsToIndex}
blockStorageConf := fsblkstorage.NewConf(conf.blockStorageDir, conf.maxBlockfileSize)
......
......@@ -179,7 +179,7 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) {
simulator, _ := ledger.NewTxSimulator()
simulator.SetState("ns1", "key4", []byte("value1"))
simulator.SetState("ns1", "key5", []byte("value2"))
simulator.SetState("ns1", "key6", []byte("{\"shipmentID\":\"161003PKC7300\",\"customsInvoice\":{\"methodOfTransport\":\"GROUND\",\"invoiceNumber\":\"00091624\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}"))
simulator.SetState("ns1", "key6", []byte("{\"shipmentID\":\"161003PKC7300\",\"customsInvoice\":{\"methodOfTransport\":\"GROUND\",\"invoiceNumber\":\"00091622\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}"))
simulator.SetState("ns1", "key7", []byte("{\"shipmentID\":\"161003PKC7600\",\"customsInvoice\":{\"methodOfTransport\":\"AIR MAYBE\",\"invoiceNumber\":\"00091624\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}"))
simulator.Done()
simRes, _ := simulator.GetTxSimulationResults()
......@@ -195,6 +195,7 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) {
Height: 1, CurrentBlockHash: block1Hash, PreviousBlockHash: []byte{}})
//Note key 4 and 6 are updates but key 7 is new. I.E. should see history for key 4 and 6 if history is enabled
simulationResults := [][]byte{}
simulator, _ = ledger.NewTxSimulator()
simulator.SetState("ns1", "key4", []byte("value3"))
simulator.SetState("ns1", "key5", []byte("{\"shipmentID\":\"161003PKC7500\",\"customsInvoice\":{\"methodOfTransport\":\"AIR FREIGHT\",\"invoiceNumber\":\"00091623\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}"))
......@@ -203,7 +204,16 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) {
simulator.SetState("ns1", "key8", []byte("{\"shipmentID\":\"161003PKC7700\",\"customsInvoice\":{\"methodOfTransport\":\"SHIP\",\"invoiceNumber\":\"00091625\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}"))
simulator.Done()
simRes, _ = simulator.GetTxSimulationResults()
block2 := bg.NextBlock([][]byte{simRes}, false)
simulationResults = append(simulationResults, simRes)
//add a 2nd transaction
simulator2, _ := ledger.NewTxSimulator()
simulator2.SetState("ns1", "key9", []byte("value5"))
simulator2.SetState("ns1", "key10", []byte("{\"shipmentID\":\"261003PKC8000\",\"customsInvoice\":{\"methodOfTransport\":\"DONKEY\",\"invoiceNumber\":\"00091626\"},\"weightUnitOfMeasure\":\"KGM\",\"volumeUnitOfMeasure\": \"CO\",\"dimensionUnitOfMeasure\":\"CM\",\"currency\":\"USD\"}"))
simulator2.Done()
simRes2, _ := simulator2.GetTxSimulationResults()
simulationResults = append(simulationResults, simRes2)
block2 := bg.NextBlock(simulationResults, false)
ledger.RemoveInvalidTransactionsAndPrepare(block2)
ledger.Commit()
......@@ -225,6 +235,6 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) {
testutil.AssertEquals(t, b2, block2)
if ledgerconfig.IsHistoryDBEnabled() == true {
//TODO history specific test
//TODO history specific test once the query api's are in and we can validate content
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment