Commit 9d90fdf6 authored by denyeart's avatar denyeart
Browse files

[FAB-2052] Finish ledger API GetHistoryForKey()



Finish the implementation for ledger API GetHistoryForKey().

Lookup the history of key updates by blockNum:tranNum.

For each history record, retrieve the txID and key value
from block storage. This requires passing a block storage handle
into the history query executor, and exposing the block storage
test environment to other packages. Historic txID and key value
is returned to client to enable simple provenance scenarios.

Added tests for GetHistoryForKey() including recovery scenarios.
One test required a fix to block storage syncIndex().
Also tested end-to-end with peer.

Change-Id: I988130e9682f5d8d707c4ec37753bc0e7e297269
Signed-off-by: default avatardenyeart <enyeart@us.ibm.com>
parent 4e051edc
......@@ -99,6 +99,7 @@ At start up a new manager:
*) Updates blockchain info used by the APIs
*/
func newBlockfileMgr(id string, conf *Conf, indexConfig *blkstorage.IndexConfig, indexStore *leveldbhelper.DBHandle) *blockfileMgr {
logger.Debugf("newBlockfileMgr() initializing file-based block storage for ledger: %s ", id)
//Determine the root directory for the blockfile storage, if it does not exist create it
rootDir := conf.getLedgerBlockDir(id)
_, err := util.CreateDirIfMissing(rootDir)
......@@ -343,6 +344,7 @@ func (mgr *blockfileMgr) syncIndex() error {
//Should be at the last block, but go ahead and loop looking for next blockBytes
//If there is another block, add it to the index
//TODO Currently this re-indexes the lastBlockIndexed every time. May be better to skip it.
for {
if blockBytes, blockPlacementInfo, err = stream.nextBlockBytesAndPlacementInfo(); err != nil {
return err
......@@ -354,9 +356,14 @@ func (mgr *blockfileMgr) syncIndex() error {
if err != nil {
return err
}
//The blockStartOffset will get applied to the txOffsets prior to indexing within indexBlock(),
//therefore just shift by the difference between blockBytesOffset and blockStartOffset
numBytesToShift := int(blockPlacementInfo.blockBytesOffset - blockPlacementInfo.blockStartOffset)
for _, offset := range info.txOffsets {
offset.loc.offset += int(blockPlacementInfo.blockBytesOffset)
offset.loc.offset += numBytesToShift
}
//Update the blockIndexInfo with what was actually stored in file system
blockIdxInfo := &blockIdxInfo{}
blockIdxInfo.blockHash = info.blockHeader.Hash()
......@@ -364,6 +371,7 @@ func (mgr *blockfileMgr) syncIndex() error {
blockIdxInfo.flp = &fileLocPointer{fileSuffixNum: blockPlacementInfo.fileNum,
locPointer: locPointer{offset: int(blockPlacementInfo.blockStartOffset)}}
blockIdxInfo.txOffsets = info.txOffsets
logger.Debugf("syncIndex() indexing block [%d]", blockIdxInfo.blockNum)
if err = mgr.index.indexBlock(blockIdxInfo); err != nil {
return err
}
......@@ -470,6 +478,7 @@ func (mgr *blockfileMgr) fetchBlock(lp *fileLocPointer) (*common.Block, error) {
}
func (mgr *blockfileMgr) fetchTransactionEnvelope(lp *fileLocPointer) (*common.Envelope, error) {
logger.Debugf("Entering fetchTransactionEnvelope() %v\n", lp)
var err error
var txEnvelopeBytes []byte
if txEnvelopeBytes, err = mgr.fetchRawBytes(lp); err != nil {
......
......@@ -17,6 +17,7 @@ limitations under the License.
package fsblkstorage
import (
"bytes"
"fmt"
"github.com/golang/protobuf/proto"
......@@ -292,5 +293,16 @@ func (flp *fileLocPointer) String() string {
}
func (blockIdxInfo *blockIdxInfo) String() string {
return fmt.Sprintf("blockNum=%d, blockHash=%#v", blockIdxInfo.blockNum, blockIdxInfo.blockHash)
var buffer bytes.Buffer
for _, txOffset := range blockIdxInfo.txOffsets {
buffer.WriteString("txId=")
buffer.WriteString(txOffset.txID)
buffer.WriteString(" locPointer=")
buffer.WriteString(txOffset.loc.String())
buffer.WriteString("\n")
}
txOffsetsString := buffer.String()
return fmt.Sprintf("blockNum=%d, blockHash=%#v txOffsets=\n%s", blockIdxInfo.blockNum, blockIdxInfo.blockHash, txOffsetsString)
}
......@@ -16,8 +16,11 @@ limitations under the License.
package historydb
import "github.com/hyperledger/fabric/protos/common"
import "github.com/hyperledger/fabric/core/ledger"
import (
"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/protos/common"
)
// HistoryDBProvider provides an instance of a history DB
type HistoryDBProvider interface {
......@@ -29,7 +32,7 @@ type HistoryDBProvider interface {
// HistoryDB - an interface that a history database should implement
type HistoryDB interface {
NewHistoryQueryExecutor() (ledger.HistoryQueryExecutor, error)
NewHistoryQueryExecutor(blockStore blkstorage.BlockStore) (ledger.HistoryQueryExecutor, error)
Commit(block *common.Block) error
GetBlockNumFromSavepoint() (uint64, error)
}
......@@ -17,6 +17,7 @@ limitations under the License.
package historyleveldb
import (
"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/history/historydb"
......@@ -82,16 +83,13 @@ func (historyDB *historyDB) Close() {
// Commit implements method in HistoryDB interface
func (historyDB *historyDB) Commit(block *common.Block) error {
logger.Debugf("Entering HistoryLevelDB.Commit()")
//Get the blocknumber off of the header
blockNo := block.Header.Number
//Set the starting tranNo to 0
var tranNo uint64
dbBatch := leveldbhelper.NewUpdateBatch()
logger.Debugf("Updating history for blockNo: %v with [%d] transactions",
logger.Debugf("Updating history database for blockNo [%v] with [%d] transactions",
blockNo, len(block.Data.Data))
//TODO add check for invalid trans in bit array
......@@ -110,7 +108,6 @@ func (historyDB *historyDB) Commit(block *common.Block) error {
if common.HeaderType(payload.Header.ChainHeader.Type) == common.HeaderType_ENDORSER_TRANSACTION {
logger.Debugf("Updating history for tranNo: %d", tranNo)
// extract actions from the envelope message
respPayload, err := putils.GetActionFromEnvelope(envBytes)
if err != nil {
......@@ -133,9 +130,6 @@ func (historyDB *historyDB) Commit(block *common.Block) error {
for _, kvWrite := range nsRWSet.Writes {
writeKey := kvWrite.Key
logger.Debugf("Writing history record for: ns=%s, key=%s, blockNo=%d tranNo=%d",
ns, writeKey, blockNo, tranNo)
//composite key for history records is in the form ns~key~blockNo~tranNo
compositeHistoryKey := historydb.ConstructCompositeHistoryKey(ns, writeKey, blockNo, tranNo)
......@@ -145,9 +139,8 @@ func (historyDB *historyDB) Commit(block *common.Block) error {
}
} else {
logger.Debugf("Skipping transaction %d since it is not an endorsement transaction\n", tranNo)
logger.Debugf("Skipping transaction [%d] since it is not an endorsement transaction\n", tranNo)
}
}
// add savepoint for recovery purpose
......@@ -159,12 +152,13 @@ func (historyDB *historyDB) Commit(block *common.Block) error {
return err
}
logger.Debugf("Updates committed to history database for blockNo [%v]", blockNo)
return nil
}
// NewHistoryQueryExecutor implements method in HistoryDB interface
func (historyDB *historyDB) NewHistoryQueryExecutor() (ledger.HistoryQueryExecutor, error) {
return &LevelHistoryDBQueryExecutor{historyDB}, nil
func (historyDB *historyDB) NewHistoryQueryExecutor(blockStore blkstorage.BlockStore) (ledger.HistoryQueryExecutor, error) {
return &LevelHistoryDBQueryExecutor{historyDB, blockStore}, nil
}
// GetBlockNumFromSavepoint implements method in HistoryDB interface
......
......@@ -18,19 +18,23 @@ package historyleveldb
import (
"errors"
"fmt"
commonledger "github.com/hyperledger/fabric/common/ledger"
"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/common/ledger/util"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/history/historydb"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwset"
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
"github.com/hyperledger/fabric/protos/common"
putils "github.com/hyperledger/fabric/protos/utils"
"github.com/syndtr/goleveldb/leveldb/iterator"
)
// LevelHistoryDBQueryExecutor is a query executor against the LevelDB history DB
type LevelHistoryDBQueryExecutor struct {
historyDB *historyDB
historyDB *historyDB
blockStore blkstorage.BlockStore
}
// GetHistoryForKey implements method in interface `ledger.HistoryQueryExecutor`
......@@ -47,17 +51,21 @@ func (q *LevelHistoryDBQueryExecutor) GetHistoryForKey(namespace string, key str
// range scan to find any history records starting with namespace~key
dbItr := q.historyDB.db.GetIterator(compositeStartKey, compositeEndKey)
return newHistoryScanner(compositeStartKey, dbItr), nil
return newHistoryScanner(compositeStartKey, namespace, key, dbItr, q.blockStore), nil
}
//historyScanner implements ResultsIterator for iterating through history results
type historyScanner struct {
compositePartialKey []byte //compositePartialKey includes namespace~key
namespace string
key string
dbItr iterator.Iterator
blockStore blkstorage.BlockStore
}
func newHistoryScanner(compositePartialKey []byte, dbItr iterator.Iterator) *historyScanner {
return &historyScanner{compositePartialKey, dbItr}
func newHistoryScanner(compositePartialKey []byte, namespace string, key string,
dbItr iterator.Iterator, blockStore blkstorage.BlockStore) *historyScanner {
return &historyScanner{compositePartialKey, namespace, key, dbItr, blockStore}
}
func (scanner *historyScanner) Next() (commonledger.QueryResult, error) {
......@@ -70,15 +78,72 @@ func (scanner *historyScanner) Next() (commonledger.QueryResult, error) {
_, blockNumTranNumBytes := historydb.SplitCompositeHistoryKey(historyKey, scanner.compositePartialKey)
blockNum, bytesConsumed := util.DecodeOrderPreservingVarUint64(blockNumTranNumBytes[0:])
tranNum, _ := util.DecodeOrderPreservingVarUint64(blockNumTranNumBytes[bytesConsumed:])
logger.Debugf("Found history record for namespace:%s key:%s at blockNumTranNum %v:%v\n",
scanner.namespace, scanner.key, blockNum, tranNum)
blockNumTranNum := fmt.Sprintf("%v:%v", blockNum, tranNum)
logger.Debugf("Got history record for key %s: %s\n", scanner.compositePartialKey, blockNumTranNum)
// Get the transaction from block storage that is associated with this history record
tranEnvelope, err := scanner.blockStore.RetrieveTxByBlockNumTranNum(blockNum, tranNum)
if err != nil {
return nil, err
}
// For initial test return the blockNumTranNum as TxID.
// TODO query block storage to get and return the TxID and value
return &ledger.KeyModification{TxID: blockNumTranNum}, nil
// Get the txid and key write value associated with this transaction
txID, keyValue, err := getTxIDandKeyWriteValueFromTran(tranEnvelope, scanner.namespace, scanner.key)
if err != nil {
return nil, err
}
logger.Debugf("Found historic key value for namespace:%s key:%s from transaction %s\n",
scanner.namespace, scanner.key, txID)
return &ledger.KeyModification{TxID: txID, Value: keyValue}, nil
}
func (scanner *historyScanner) Close() {
scanner.dbItr.Release()
}
// getTxIDandKeyWriteValueFromTran inspects a transaction for writes to a given key
func getTxIDandKeyWriteValueFromTran(
tranEnvelope *common.Envelope, namespace string, key string) (string, []byte, error) {
logger.Debugf("Entering getTxIDandKeyWriteValueFromTran()\n", namespace, key)
// extract action from the envelope
payload, err := putils.GetPayload(tranEnvelope)
if err != nil {
return "", nil, err
}
tx, err := putils.GetTransaction(payload.Data)
if err != nil {
return "", nil, err
}
_, respPayload, err := putils.GetPayloads(tx.Actions[0])
if err != nil {
return "", nil, err
}
txID := payload.Header.ChainHeader.TxID
txRWSet := &rwset.TxReadWriteSet{}
// Get the Result from the Action and then Unmarshal
// it into a TxReadWriteSet using custom unmarshalling
if err = txRWSet.Unmarshal(respPayload.Results); err != nil {
return txID, nil, err
}
// look for the namespace and key by looping through the transaction's ReadWriteSets
for _, nsRWSet := range txRWSet.NsRWs {
if nsRWSet.NameSpace == namespace {
// got the correct namespace, now find the key write
for _, kvWrite := range nsRWSet.Writes {
if kvWrite.Key == key {
return txID, kvWrite.Value, nil
}
} // end keys loop
return txID, nil, errors.New("Key not found in namespace's writeset")
} // end if
} //end namespaces loop
return txID, nil, errors.New("Namespace not found in transaction's ReadWriteSets")
}
......@@ -18,6 +18,7 @@ package historyleveldb
import (
"os"
"strconv"
"testing"
configtxtest "github.com/hyperledger/fabric/common/configtx/test"
......@@ -38,22 +39,22 @@ func TestSavepoint(t *testing.T) {
defer env.cleanup()
// read the savepoint, it should not exist and should return 0
blockNum, err := env.TestHistoryDB.GetBlockNumFromSavepoint()
blockNum, err := env.testHistoryDB.GetBlockNumFromSavepoint()
testutil.AssertNoError(t, err, "Error upon historyDatabase.GetBlockNumFromSavepoint()")
testutil.AssertEquals(t, blockNum, uint64(0))
// create a block
simulator, _ := env.Txmgr.NewTxSimulator()
simulator, _ := env.txmgr.NewTxSimulator()
simulator.SetState("ns1", "key1", []byte("value1"))
simulator.Done()
simRes, _ := simulator.GetTxSimulationResults()
bg := testutil.NewBlockGenerator(t)
block1 := bg.NextBlock([][]byte{simRes}, false)
err = env.TestHistoryDB.Commit(block1)
err = env.testHistoryDB.Commit(block1)
testutil.AssertNoError(t, err, "")
// read the savepoint, it should now exist and return 1
blockNum, err = env.TestHistoryDB.GetBlockNumFromSavepoint()
blockNum, err = env.testHistoryDB.GetBlockNumFromSavepoint()
testutil.AssertNoError(t, err, "Error upon historyDatabase.GetBlockNumFromSavepoint()")
testutil.AssertEquals(t, blockNum, uint64(1))
}
......@@ -62,35 +63,46 @@ func TestHistory(t *testing.T) {
env := NewTestHistoryEnv(t)
defer env.cleanup()
provider := env.testBlockStorageEnv.provider
store1, err := provider.OpenBlockStore("ledger1")
testutil.AssertNoError(t, err, "Error upon provider.OpenBlockStore()")
defer store1.Shutdown()
//block1
simulator, _ := env.Txmgr.NewTxSimulator()
simulator.SetState("ns1", "key7", []byte("value1"))
simulator, _ := env.txmgr.NewTxSimulator()
value1 := []byte("value1")
simulator.SetState("ns1", "key7", value1)
simulator.Done()
simRes, _ := simulator.GetTxSimulationResults()
bg := testutil.NewBlockGenerator(t)
block1 := bg.NextBlock([][]byte{simRes}, false)
err := env.TestHistoryDB.Commit(block1)
err = store1.AddBlock(block1)
testutil.AssertNoError(t, err, "")
err = env.testHistoryDB.Commit(block1)
testutil.AssertNoError(t, err, "")
//block2 tran1
simulationResults := [][]byte{}
simulator, _ = env.Txmgr.NewTxSimulator()
simulator.SetState("ns1", "key7", []byte("value2"))
simulator, _ = env.txmgr.NewTxSimulator()
value2 := []byte("value2")
simulator.SetState("ns1", "key7", value2)
simulator.Done()
simRes, _ = simulator.GetTxSimulationResults()
simulationResults = append(simulationResults, simRes)
//block2 tran2
simulator2, _ := env.Txmgr.NewTxSimulator()
simulator2.SetState("ns1", "key7", []byte("value3"))
simulator2, _ := env.txmgr.NewTxSimulator()
value3 := []byte("value3")
simulator2.SetState("ns1", "key7", value3)
simulator2.Done()
simRes2, _ := simulator2.GetTxSimulationResults()
simulationResults = append(simulationResults, simRes2)
block2 := bg.NextBlock(simulationResults, false)
err = env.TestHistoryDB.Commit(block2)
err = store1.AddBlock(block2)
testutil.AssertNoError(t, err, "")
err = env.testHistoryDB.Commit(block2)
testutil.AssertNoError(t, err, "")
qhistory, err := env.TestHistoryDB.NewHistoryQueryExecutor()
qhistory, err := env.testHistoryDB.NewHistoryQueryExecutor(store1)
testutil.AssertNoError(t, err, "Error upon NewHistoryQueryExecutor")
itr, err2 := qhistory.GetHistoryForKey("ns1", "key7")
......@@ -103,13 +115,13 @@ func TestHistory(t *testing.T) {
break
}
txid := kmod.(*ledger.KeyModification).TxID
//v := kmod.(*ledger.KeyModification).Value TODO value not populated yet
t.Logf("Retrieved history record for key=key7 at TxId=%s", txid)
retrievedValue := kmod.(*ledger.KeyModification).Value
t.Logf("Retrieved history record for key=key7 at TxId=%s with value %v", txid, retrievedValue)
count++
expectedValue := []byte("value" + strconv.Itoa(count))
testutil.AssertEquals(t, retrievedValue, expectedValue)
}
testutil.AssertEquals(t, count, 3)
// TODO add assertions for exact history values once it is populated
}
//TestSavepoint tests that save points get written after each block and get returned via GetBlockNumfromSavepoint
......@@ -120,7 +132,8 @@ func TestHistoryDisabled(t *testing.T) {
viper.Set("ledger.state.historyDatabase", "false")
qhistory, err := env.TestHistoryDB.NewHistoryQueryExecutor()
//no need to pass blockstore into history executore, it won't be used in this test
qhistory, err := env.testHistoryDB.NewHistoryQueryExecutor(nil)
testutil.AssertNoError(t, err, "Error upon NewHistoryQueryExecutor")
_, err2 := qhistory.GetHistoryForKey("ns1", "key7")
......@@ -137,6 +150,6 @@ func TestGenesisBlockNoError(t *testing.T) {
block, err := configtxtest.MakeGenesisBlock("test_chainid")
testutil.AssertNoError(t, err, "")
err = env.TestHistoryDB.Commit(block)
err = env.testHistoryDB.Commit(block)
testutil.AssertNoError(t, err, "")
}
......@@ -20,9 +20,10 @@ import (
"os"
"testing"
"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/common/ledger/blkstorage/fsblkstorage"
"github.com/hyperledger/fabric/common/ledger/testutil"
"github.com/hyperledger/fabric/core/ledger/kvledger/history/historydb"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/stateleveldb"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr"
......@@ -30,21 +31,23 @@ import (
"github.com/spf13/viper"
)
/////// levelDBLockBasedHistoryEnv //////
type levelDBLockBasedHistoryEnv struct {
t testing.TB
TestDBEnv *stateleveldb.TestVDBEnv
TestDB statedb.VersionedDB
Txmgr txmgr.TxMgr
TestHistoryDBProvider historydb.HistoryDBProvider
TestHistoryDB historydb.HistoryDB
testBlockStorageEnv *testBlockStoreEnv
testDBEnv *stateleveldb.TestVDBEnv
txmgr txmgr.TxMgr
testHistoryDBProvider historydb.HistoryDBProvider
testHistoryDB historydb.HistoryDB
}
func NewTestHistoryEnv(t *testing.T) *levelDBLockBasedHistoryEnv {
//testutil.SetupCoreYAMLConfig("./../../../../../../peer")
viper.Set("ledger.state.historyDatabase", "true")
viper.Set("peer.fileSystemPath", "/tmp/fabric/ledgerhistorytests")
blockStorageTestEnv := newBlockStorageTestEnv(t)
testDBEnv := stateleveldb.NewTestVDBEnv(t)
testDB, err := testDBEnv.DBProvider.GetDBHandle("TestDB")
testutil.AssertNoError(t, err, "")
......@@ -55,16 +58,16 @@ func NewTestHistoryEnv(t *testing.T) *levelDBLockBasedHistoryEnv {
testHistoryDB, err := testHistoryDBProvider.GetDBHandle("TestHistoryDB")
testutil.AssertNoError(t, err, "")
return &levelDBLockBasedHistoryEnv{t, testDBEnv, testDB, txMgr, testHistoryDBProvider, testHistoryDB}
return &levelDBLockBasedHistoryEnv{t, blockStorageTestEnv, testDBEnv, txMgr, testHistoryDBProvider, testHistoryDB}
}
func (env *levelDBLockBasedHistoryEnv) cleanup() {
defer env.Txmgr.Shutdown()
defer env.TestDBEnv.Cleanup()
defer env.txmgr.Shutdown()
defer env.testDBEnv.Cleanup()
defer env.testBlockStorageEnv.cleanup()
// clean up history
env.TestHistoryDBProvider.Close()
env.testHistoryDBProvider.Close()
removeDBPath(env.t, "Cleanup")
}
......@@ -76,3 +79,39 @@ func removeDBPath(t testing.TB, caller string) {
}
logger.Debugf("Removed folder [%s] for history test environment for %s", dbPath, caller)
}
/////// testBlockStoreEnv//////
type testBlockStoreEnv struct {
t testing.TB
provider *fsblkstorage.FsBlockstoreProvider
blockStorageDir string
}
func newBlockStorageTestEnv(t testing.TB) *testBlockStoreEnv {
var testPath = "/tmp/fabric/ledgertests/kvledger/history/historydb/historyleveldb"
conf := fsblkstorage.NewConf(testPath, 0)
attrsToIndex := []blkstorage.IndexableAttr{
blkstorage.IndexableAttrBlockHash,
blkstorage.IndexableAttrBlockNum,
blkstorage.IndexableAttrTxID,
blkstorage.IndexableAttrBlockNumTranNum,
}
indexConfig := &blkstorage.IndexConfig{AttrsToIndex: attrsToIndex}
blockStorageProvider := fsblkstorage.NewProvider(conf, indexConfig).(*fsblkstorage.FsBlockstoreProvider)
return &testBlockStoreEnv{t, blockStorageProvider, testPath}
}
func (env *testBlockStoreEnv) cleanup() {
env.provider.Close()
env.removeFSPath()
}
func (env *testBlockStoreEnv) removeFSPath() {
fsPath := env.blockStorageDir
os.RemoveAll(fsPath)
}
......@@ -68,6 +68,7 @@ func newKVLedger(ledgerID string, blockStore blkstorage.BlockStore,
//Recover the state database and history database (if exist)
//by recommitting last valid blocks
func recoverDB(l *kvLedger) error {
logger.Debugf("Entering recoverDB()")
//If there is no block in blockstorage, nothing to recover.
info, _ := l.blockStore.GetBlockchainInfo()
if info.Height == 0 {
......@@ -247,33 +248,35 @@ func (l *kvLedger) NewQueryExecutor() (ledger.QueryExecutor, error) {
// NewHistoryQueryExecutor gives handle to a history query executor.
// A client can obtain more than one 'HistoryQueryExecutor's for parallel execution.
// Any synchronization should be performed at the implementation level if required
// Pass the ledger blockstore so that historical values can be looked up from the chain
func (l *kvLedger) NewHistoryQueryExecutor() (ledger.HistoryQueryExecutor, error) {
return l.historyDB.NewHistoryQueryExecutor()
return l.historyDB.NewHistoryQueryExecutor(l.blockStore)
}
// Commit commits the valid block (returned in the method RemoveInvalidTransactionsAndPrepare) and related state changes
func (l *kvLedger) Commit(block *common.Block) error {
var err error
blockNo := block.Header.Number
logger.Debug("Validating block")
logger.Debugf("Validating block [%d]", blockNo)
err = l.txtmgmt.ValidateAndPrepare(block, true)
if err != nil {
return err
}
logger.Debug("Committing block to storage")
logger.Debugf("Committing block [%d] to storage", blockNo)
if err = l.blockStore.AddBlock(block); err != nil {
return err
}
logger.Debug("Committing block transactions to state database")
logger.Debugf("Committing block [%d] transactions to state database", blockNo)
if err = l.txtmgmt.Commit(); err != nil {
panic(fmt.Errorf(`Error during commit to txmgr:%s`, err))
}
// History database could be written in parallel with state and/or async as a future optimization
if ledgerconfig.IsHistoryDBEnabled() {
logger.Debugf("Committing block transactions to history database")
logger.Debugf("Committing block [%d] transactions to history database", blockNo)
if err := l.historyDB.Commit(block); err != nil {
panic(fmt.Errorf(`Error during commit to history db:%s`, err))
}
......
......@@ -107,6 +107,8 @@ func (provider *Provider) Create(ledgerID string) (ledger.PeerLedger, error) {
// Open implements the corresponding method from interface ledger.PeerLedgerProvider
func (provider *Provider) Open(ledgerID string) (ledger.PeerLedger, error) {
logger.Debugf("Open() opening kvledger: %s", ledgerID)
// Check the ID store to ensure that the chainId/ledgerId exists
exists, err := provider.idStore.ledgerIDExists(ledgerID)