Commit f0c43f79 authored by manish's avatar manish
Browse files

Use a single leveldb for state maintainance

https://jira.hyperledger.org/browse/FAB-1543



Change-Id: Ib11d1a2fc14cf81dd5380eadf93e73e1c56113e3
Signed-off-by: default avatarmanish <manish.sethi@gmail.com>
parent 3e534dee
......@@ -28,6 +28,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/core/chaincode/shim"
"github.com/hyperledger/fabric/core/ledger/ledgermgmt"
"github.com/hyperledger/fabric/core/peer"
"github.com/hyperledger/fabric/gossip/service"
"github.com/hyperledger/fabric/protos/common"
......@@ -98,6 +99,8 @@ func TestConfigerInvokeJoinChainWrongParams(t *testing.T) {
func TestConfigerInvokeJoinChainCorrectParams(t *testing.T) {
//t.Skip("Test CI build")
viper.Set("peer.fileSystemPath", "/var/hyperledger/test/")
ledgermgmt.InitializeTestEnv()
defer ledgermgmt.CleanupTestEnv()
defer os.RemoveAll("/var/hyperledger/test/")
e := new(PeerConfiger)
......
......@@ -38,6 +38,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/core/crypto/primitives"
"github.com/hyperledger/fabric/core/ledger/ledgermgmt"
"github.com/hyperledger/fabric/core/peer/msp"
"github.com/hyperledger/fabric/msp"
"github.com/hyperledger/fabric/protos/common"
......@@ -114,7 +115,7 @@ func finitPeer(lis net.Listener, chainIDs ...string) {
}
closeListenerAndSleep(lis)
}
ledgermgmt.CleanupTestEnv()
ledgerPath := viper.GetString("peer.fileSystemPath")
os.RemoveAll(ledgerPath)
os.RemoveAll(filepath.Join(os.TempDir(), "hyperledger"))
......
......@@ -17,25 +17,25 @@ limitations under the License.
package committer
import (
"os"
"testing"
"github.com/hyperledger/fabric/core/ledger/kvledger"
"github.com/hyperledger/fabric/core/ledger/testutil"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/hyperledger/fabric/core/ledger/ledgermgmt"
pb "github.com/hyperledger/fabric/protos/peer"
)
func TestKVLedgerBlockStorage(t *testing.T) {
conf := kvledger.NewConf("/tmp/tests/ledger/", 0)
defer os.RemoveAll("/tmp/tests/ledger/")
ledger, _ := kvledger.NewKVLedger(conf)
viper.Set("peer.fileSystemPath", "/tmp/fabric/committertest")
ledgermgmt.InitializeTestEnv()
defer ledgermgmt.CleanupTestEnv()
ledger, err := ledgermgmt.CreateLedger("TestLedger")
assert.NoError(t, err, "Error while creating ledger: %s", err)
defer ledger.Close()
committer := NewLedgerCommitter(ledger)
height, err := committer.LedgerHeight()
assert.Equal(t, uint64(0), height)
assert.NoError(t, err)
......
......@@ -17,14 +17,14 @@ limitations under the License.
package txvalidator
import (
"os"
"testing"
"github.com/hyperledger/fabric/core/ledger/kvledger"
"github.com/hyperledger/fabric/core/ledger/ledgermgmt"
"github.com/hyperledger/fabric/core/ledger/testutil"
"github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/protos/common"
pb "github.com/hyperledger/fabric/protos/peer"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
)
......@@ -36,10 +36,10 @@ func (v *mockVsccValidator) VSCCValidateTx(payload *common.Payload, envBytes []b
}
func TestKVLedgerBlockStorage(t *testing.T) {
conf := kvledger.NewConf("/tmp/tests/ledger/", 0)
defer os.RemoveAll("/tmp/tests/ledger/")
ledger, _ := kvledger.NewKVLedger(conf)
viper.Set("peer.fileSystemPath", "/tmp/fabric/txvalidatortest")
ledgermgmt.InitializeTestEnv()
defer ledgermgmt.CleanupTestEnv()
ledger, _ := ledgermgmt.CreateLedger("TestLedger")
defer ledger.Close()
validator := &txValidator{ledger, &mockVsccValidator{}}
......
......@@ -21,15 +21,16 @@ import (
"os"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/example"
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
"github.com/hyperledger/fabric/core/ledger/ledgermgmt"
"github.com/hyperledger/fabric/core/ledger/testutil"
"github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/protos/common"
)
const (
ledgerPath = "/tmp/test/ledger/kvledger/example"
ledgerID = "Default"
)
var finalLedger ledger.ValidatedLedger
......@@ -47,10 +48,10 @@ func init() {
// Initialization will get a handle to the ledger at the specified path
// Note, if subledgers are supported in the future,
// the various ledgers could be created/managed at this level
os.RemoveAll(ledgerPath)
ledgerConf := kvledger.NewConf(ledgerPath, 0)
cleanup()
ledgermgmt.Initialize()
var err error
finalLedger, err = kvledger.NewKVLedger(ledgerConf)
finalLedger, err = ledgermgmt.CreateLedger(ledgerID)
if err != nil {
panic(fmt.Errorf("Error in NewKVLedger(): %s", err))
}
......@@ -60,7 +61,7 @@ func init() {
}
func main() {
defer finalLedger.Close()
defer ledgermgmt.Close()
// Each of the functions here will emulate endorser, orderer,
// and committer by calling ledger APIs to similate the proposal,
......@@ -164,3 +165,8 @@ func handleError(err error, quit bool) {
}
}
}
func cleanup() {
ledgerRootPath := ledgerconfig.GetRootPath()
os.RemoveAll(ledgerRootPath)
}
......@@ -19,14 +19,13 @@ package kvledger
import (
"errors"
"fmt"
"strings"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/blkstorage"
"github.com/hyperledger/fabric/core/ledger/blkstorage/fsblkstorage"
"github.com/hyperledger/fabric/core/ledger/history"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/couchdbtxmgmt"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/stateleveldb"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr"
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
......@@ -39,37 +38,18 @@ import (
var logger = logging.MustGetLogger("kvledger")
// Conf captures `KVLedger` configurations
type Conf struct {
blockStorageDir string
maxBlockfileSize int
txMgrDBPath string
}
// NewConf constructs new `Conf`.
// filesystemPath is the top level directory under which `KVLedger` manages its data
func NewConf(filesystemPath string, maxBlockfileSize int) *Conf {
if !strings.HasSuffix(filesystemPath, "/") {
filesystemPath = filesystemPath + "/"
}
blocksStorageDir := filesystemPath + "blocks"
txMgrDBPath := filesystemPath + "txMgmgt/db"
return &Conf{blocksStorageDir, maxBlockfileSize, txMgrDBPath}
}
// KVLedger provides an implementation of `ledger.ValidatedLedger`.
// This implementation provides a key-value based data model
type KVLedger struct {
ledgerID string
blockStore blkstorage.BlockStore
txtmgmt txmgr.TxMgr
historymgmt history.HistMgr
}
// NewKVLedger constructs new `KVLedger`
func NewKVLedger(conf *Conf) (*KVLedger, error) {
logger.Debugf("Creating KVLedger using config: ", conf)
func NewKVLedger(versionedDBProvider statedb.VersionedDBProvider, ledgerID string) (*KVLedger, error) {
logger.Debugf("Creating KVLedger ledgerID=%s: ", ledgerID)
attrsToIndex := []blkstorage.IndexableAttr{
blkstorage.IndexableAttrBlockHash,
blkstorage.IndexableAttrBlockNum,
......@@ -77,7 +57,9 @@ func NewKVLedger(conf *Conf) (*KVLedger, error) {
blkstorage.IndexableAttrBlockNumTranNum,
}
indexConfig := &blkstorage.IndexConfig{AttrsToIndex: attrsToIndex}
blockStorageConf := fsblkstorage.NewConf(conf.blockStorageDir, conf.maxBlockfileSize)
blockStorageDir := ledgerconfig.GetBlockStoragePath(ledgerID)
blockStorageConf := fsblkstorage.NewConf(blockStorageDir, ledgerconfig.GetMaxBlockfileSize())
blockStore := fsblkstorage.NewFsBlockStore(blockStorageConf, indexConfig)
//State and History database managers
......@@ -91,14 +73,14 @@ func NewKVLedger(conf *Conf) (*KVLedger, error) {
couchDBDef := ledgerconfig.GetCouchDBDefinition()
//create new transaction manager based on couchDB
txmgmt = couchdbtxmgmt.NewCouchDBTxMgr(&couchdbtxmgmt.Conf{DBPath: conf.txMgrDBPath},
txmgmt = couchdbtxmgmt.NewCouchDBTxMgr(&couchdbtxmgmt.Conf{DBPath: ""},
couchDBDef.URL, //couchDB connection URL
"system", //couchDB db name matches ledger name, TODO for now use system ledger, eventually allow passing in subledger name
couchDBDef.Username, //enter couchDB id here
couchDBDef.Password) //enter couchDB pw here
} else {
// Fall back to using goleveldb lockbased transaction manager
db := stateleveldb.NewVersionedDBProvider(&stateleveldb.Conf{DBPath: conf.txMgrDBPath}).GetDBHandle("Default")
db := versionedDBProvider.GetDBHandle(ledgerID)
txmgmt = lockbasedtxmgr.NewLockBasedTxMgr(db)
}
......@@ -114,7 +96,7 @@ func NewKVLedger(conf *Conf) (*KVLedger, error) {
couchDBDef.Password) //enter couchDB pw here
}
l := &KVLedger{blockStore, txmgmt, historymgmt}
l := &KVLedger{ledgerID, blockStore, txmgmt, historymgmt}
if err := recoverStateDB(l); err != nil {
panic(fmt.Errorf(`Error during state DB recovery:%s`, err))
......
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kvledger
import (
"errors"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/stateleveldb"
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
"github.com/hyperledger/fabric/core/ledger/util/db"
)
var (
// ErrLedgerIDExists is thrown by a CreateLedger call if a ledger with the given id already exists
ErrLedgerIDExists = errors.New("LedgerID already exists")
// ErrNonExistingLedgerID is thrown by a OpenLedger call if a ledger with the given id does not exist
ErrNonExistingLedgerID = errors.New("LedgerID does not exist")
// ErrLedgerNotOpened is thrown by a CloseLedger call if a ledger with the given id has not been opened
ErrLedgerNotOpened = errors.New("Ledger is not opened yet")
)
// Provider implements interface ledger.ValidatedLedgerProvider
type Provider struct {
idStore *idStore
vdbProvider statedb.VersionedDBProvider
}
// NewProvider instantiates a new Provider.
// This is not thread-safe and assumed to be synchronized be the caller
func NewProvider() (ledger.ValidatedLedgerProvider, error) {
logger.Info("Initializing ledger provider")
var vdbProvider statedb.VersionedDBProvider
if !ledgerconfig.IsCouchDBEnabled() {
logger.Debugf("Constructing leveldb VersionedDBProvider")
vdbProvider = stateleveldb.NewVersionedDBProvider()
} else {
//TODO same for couchDB after refactoring of couchdb code
}
ledgerMgmtPath := ledgerconfig.GetLedgerProviderPath()
idStore := openIDStore(ledgerMgmtPath)
logger.Info("ledger provider Initialized")
return &Provider{idStore, vdbProvider}, nil
}
// Create implements the corresponding method from interface ledger.ValidatedLedgerProvider
func (provider *Provider) Create(ledgerID string) (ledger.ValidatedLedger, error) {
exists, err := provider.idStore.ledgerIDExists(ledgerID)
if err != nil {
return nil, err
}
if exists {
return nil, ErrLedgerIDExists
}
provider.idStore.createLedgerID(ledgerID)
l, err := NewKVLedger(provider.vdbProvider, ledgerID)
if err != nil {
return nil, err
}
return l, nil
}
// Open implements the corresponding method from interface ledger.ValidatedLedgerProvider
func (provider *Provider) Open(ledgerID string) (ledger.ValidatedLedger, error) {
exists, err := provider.idStore.ledgerIDExists(ledgerID)
if err != nil {
return nil, err
}
if !exists {
return nil, ErrNonExistingLedgerID
}
l, err := NewKVLedger(provider.vdbProvider, ledgerID)
if err != nil {
return nil, err
}
return l, nil
}
// Exists implements the corresponding method from interface ledger.ValidatedLedgerProvider
func (provider *Provider) Exists(ledgerID string) (bool, error) {
return provider.idStore.ledgerIDExists(ledgerID)
}
// List implements the corresponding method from interface ledger.ValidatedLedgerProvider
func (provider *Provider) List() ([]string, error) {
return provider.idStore.getAllLedgerIds()
}
// Close implements the corresponding method from interface ledger.ValidatedLedgerProvider
func (provider *Provider) Close() {
provider.vdbProvider.Close()
provider.idStore.close()
}
type idStore struct {
db *db.DB
}
func openIDStore(path string) *idStore {
db := db.CreateDB(&db.Conf{DBPath: path})
db.Open()
return &idStore{db}
}
func (s *idStore) createLedgerID(ledgerID string) error {
key := []byte(ledgerID)
val := []byte{}
err := error(nil)
if val, err = s.db.Get(key); err != nil {
return err
}
if val != nil {
return ErrLedgerIDExists
}
return s.db.Put(key, val, true)
}
func (s *idStore) ledgerIDExists(ledgerID string) (bool, error) {
key := []byte(ledgerID)
val := []byte{}
err := error(nil)
if val, err = s.db.Get(key); err != nil {
return false, err
}
return val != nil, nil
}
func (s *idStore) getAllLedgerIds() ([]string, error) {
var ids []string
itr := s.db.GetIterator(nil, nil)
itr.First()
for itr.Valid() {
key := string(itr.Key())
ids = append(ids, key)
itr.Next()
}
return ids, nil
}
func (s *idStore) close() {
s.db.Close()
}
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kvledger
import (
"fmt"
"testing"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/testutil"
)
func TestLedgerProvider(t *testing.T) {
env := newTestEnv(t)
defer env.cleanup()
numLedgers := 10
provider, _ := NewProvider()
existingLedgerIDs, err := provider.List()
testutil.AssertNoError(t, err, "")
testutil.AssertEquals(t, len(existingLedgerIDs), 0)
for i := 0; i < numLedgers; i++ {
provider.Create(constructTestLedgerID(i))
}
existingLedgerIDs, err = provider.List()
testutil.AssertNoError(t, err, "")
testutil.AssertEquals(t, len(existingLedgerIDs), numLedgers)
provider.Close()
provider, _ = NewProvider()
defer provider.Close()
ledgerIds, _ := provider.List()
testutil.AssertEquals(t, len(ledgerIds), numLedgers)
t.Logf("ledgerIDs=%#v", ledgerIds)
for i := 0; i < numLedgers; i++ {
testutil.AssertEquals(t, ledgerIds[i], constructTestLedgerID(i))
}
_, err = provider.Create(constructTestLedgerID(2))
testutil.AssertEquals(t, err, ErrLedgerIDExists)
_, err = provider.Open(constructTestLedgerID(numLedgers))
testutil.AssertEquals(t, err, ErrNonExistingLedgerID)
}
func TestMultipleLedgerBasicRW(t *testing.T) {
env := newTestEnv(t)
defer env.cleanup()
numLedgers := 10
provider, _ := NewProvider()
ledgers := make([]ledger.ValidatedLedger, numLedgers)
for i := 0; i < numLedgers; i++ {
l, err := provider.Create(constructTestLedgerID(i))
testutil.AssertNoError(t, err, "")
ledgers[i] = l
}
for i, l := range ledgers {
s, _ := l.NewTxSimulator()
err := s.SetState("ns", "testKey", []byte(fmt.Sprintf("testValue_%d", i)))
s.Done()
testutil.AssertNoError(t, err, "")
res, err := s.GetTxSimulationResults()
testutil.AssertNoError(t, err, "")
b := testutil.ConstructBlock(t, [][]byte{res}, false)
err = l.Commit(b)
l.Close()
testutil.AssertNoError(t, err, "")
}
provider.Close()
provider, _ = NewProvider()
defer provider.Close()
ledgers = make([]ledger.ValidatedLedger, numLedgers)
for i := 0; i < numLedgers; i++ {
l, err := provider.Open(constructTestLedgerID(i))
testutil.AssertNoError(t, err, "")
ledgers[i] = l
}
for i, l := range ledgers {
q, _ := l.NewQueryExecutor()
val, err := q.GetState("ns", "testKey")
q.Done()
testutil.AssertNoError(t, err, "")
testutil.AssertEquals(t, val, []byte(fmt.Sprintf("testValue_%d", i)))
l.Close()
}
}
func constructTestLedgerID(i int) string {
return fmt.Sprintf("ledger_%06d", i)
}
......@@ -29,8 +29,11 @@ import (
func TestKVLedgerBlockStorage(t *testing.T) {
env := newTestEnv(t)
defer env.cleanup()
ledger, _ := NewKVLedger(env.conf)
provider, _ := NewProvider()
defer provider.Close()
ledger, _ := provider.Create("testLedger")
defer ledger.Close()
bcInfo, _ := ledger.GetBlockchainInfo()
testutil.AssertEquals(t, bcInfo, &pb.BlockchainInfo{
Height: 0, CurrentBlockHash: nil, PreviousBlockHash: nil})
......@@ -80,7 +83,9 @@ func TestKVLedgerBlockStorage(t *testing.T) {
func TestKVLedgerStateDBRecovery(t *testing.T) {
env := newTestEnv(t)
defer env.cleanup()
ledger, _ := NewKVLedger(env.conf)
provider, _ := NewProvider()
defer provider.Close()
ledger, _ := provider.Create("testLedger")
defer ledger.Close()
bcInfo, _ := ledger.GetBlockchainInfo()
......@@ -117,9 +122,9 @@ func TestKVLedgerStateDBRecovery(t *testing.T) {
//generating a block based on the simulation result
block2 := bg.NextBlock([][]byte{simRes}, false)
//performing validation of read and write set to find valid transactions
ledger.txtmgmt.ValidateAndPrepare(block2, true)
ledger.(*KVLedger).txtmgmt.ValidateAndPrepare(block2, true)
//writing the validated block to block storage but not committing the transaction to state DB
err := ledger.blockStore.AddBlock(block2)
err := ledger.(*KVLedger).blockStore.AddBlock(block2)
//assume that peer fails here before committing the transaction
assert.NoError(t, err)
......@@ -140,10 +145,12 @@ func TestKVLedgerStateDBRecovery(t *testing.T) {
testutil.AssertEquals(t, value, []byte("value3"))
simulator.Done()
ledger.Close()
provider.Close()
//we assume here that the peer comes online and calls NewKVLedger to get a handler for the ledger
//State DB should be recovered before returning from NewKVLedger call
ledger, _ = NewKVLedger(env.conf)
provider, _ = NewProvider()
ledger, _ = provider.Open("testLedger")
simulator, _ = ledger.NewTxSimulator()
value, _ = simulator.GetState("ns1", "key1")
//value for 'key1' should be 'value4' after recovery
......@@ -155,7 +162,6 @@ func TestKVLedgerStateDBRecovery(t *testing.T) {
//value for 'key3' should be 'value6' after recovery
testutil.AssertEquals(t, value, []byte("value6"))
simulator.Done()
ledger.Close()
}
func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) {
......@@ -168,7 +174,9 @@ func TestLedgerWithCouchDbEnabledWithBinaryAndJSONData(t *testing.T) {
env := newTestEnv(t)
defer env.cleanup()
ledger, _ := NewKVLedger(env.conf)
provider, _ := NewProvider()
defer provider.Close()
ledger, _ := provider.Create("testLedger")
defer ledger.Close()
//testNs := "ns1"
......
......@@ -18,23 +18,25 @@ package main
import (
"fmt"
"os"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger"
"github.com/hyperledger/fabric/core/ledger/kvledger/example"
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
"github.com/hyperledger/fabric/core/ledger/ledgermgmt"
"github.com/hyperledger/fabric/core/ledger/testutil"
"github.com/hyperledger/fabric/core/ledger/util"
"github.com/hyperledger/fabric/protos/common"
logging "github.com/op/go-logging"
)
var logger = logging.MustGetLogger("main")
const (
ledgerPath = "/tmp/test/ledgernext/kvledger/example"
ledgerID = "Default"
)