Commit 9544025c authored by grapebaba's avatar grapebaba
Browse files

Refactor db package



Currently db will open when GetDBHandle method is invoked first time,
so GetDBHandle method needs to use mutex.
This patch makes db open and close in its lifecycle methods Start and
Stop, invokes Start only when peer starts and invokes Stop when peer
stops.
After that, methods in db do not need to support concurrency.

Change-Id: Id8612f2a846c5d626bd42c5cd4ae482076f6a975
Signed-off-by: default avatargrapebaba <281165273@qq.com>
parent fb7da0de
......@@ -32,6 +32,7 @@ import (
"github.com/hyperledger/fabric/core/container"
"github.com/hyperledger/fabric/core/container/ccintf"
"github.com/hyperledger/fabric/core/crypto"
"github.com/hyperledger/fabric/core/db"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/util"
"github.com/hyperledger/fabric/membersrvc/ca"
......@@ -46,6 +47,8 @@ import (
// attributes to request in the batch of tcerts while deploying, invoking or querying
var attributes = []string{"company", "position"}
var testDBWrapper = db.NewTestDBWrapper()
func getNowMillis() int64 {
nanos := time.Now().UnixNano()
return nanos / 1000000
......@@ -355,6 +358,7 @@ func executeDeployTransaction(t *testing.T, url string) {
// Test deploy of a transaction
func TestExecuteDeployTransaction(t *testing.T) {
testDBWrapper.CleanDB(t)
executeDeployTransaction(t, "github.com/hyperledger/fabric/examples/chaincode/go/chaincode_example01")
}
......@@ -364,6 +368,7 @@ func TestGopathExecuteDeployTransaction(t *testing.T) {
// and a couple of elements - it doesn't matter what they are
os.Setenv("GOPATH", os.Getenv("GOPATH")+string(os.PathSeparator)+string(os.PathListSeparator)+"/tmp/foo"+string(os.PathListSeparator)+"/tmp/bar")
fmt.Printf("set GOPATH to: \"%s\"\n", os.Getenv("GOPATH"))
testDBWrapper.CleanDB(t)
executeDeployTransaction(t, "github.com/hyperledger/fabric/examples/chaincode/go/chaincode_example01")
}
......@@ -372,6 +377,7 @@ func TestHTTPExecuteDeployTransaction(t *testing.T) {
// The chaincode used here cannot be from the fabric repo
// itself or it won't be downloaded because it will be found
// in GOPATH, which would defeat the test
testDBWrapper.CleanDB(t)
executeDeployTransaction(t, "http://github.com/hyperledger/fabric-test-resources/examples/chaincode/go/chaincode_example01")
}
......@@ -465,6 +471,7 @@ func invokeExample02Transaction(ctxt context.Context, cID *pb.ChaincodeID, args
}
func TestExecuteInvokeTransaction(t *testing.T) {
testDBWrapper.CleanDB(t)
var opts []grpc.ServerOption
//TLS is on by default. This is the ONLY test that does NOT use TLS
......@@ -570,6 +577,7 @@ func exec(ctxt context.Context, chaincodeID string, numTrans int, numQueries int
// Test the execution of a query.
func TestExecuteQuery(t *testing.T) {
testDBWrapper.CleanDB(t)
var opts []grpc.ServerOption
if viper.GetBool("peer.tls.enabled") {
creds, err := credentials.NewServerTLSFromFile(viper.GetString("peer.tls.cert.file"), viper.GetString("peer.tls.key.file"))
......@@ -653,6 +661,7 @@ func TestExecuteQuery(t *testing.T) {
// Test the execution of an invalid transaction.
func TestExecuteInvokeInvalidTransaction(t *testing.T) {
testDBWrapper.CleanDB(t)
var opts []grpc.ServerOption
if viper.GetBool("peer.tls.enabled") {
creds, err := credentials.NewServerTLSFromFile(viper.GetString("peer.tls.cert.file"), viper.GetString("peer.tls.key.file"))
......@@ -714,6 +723,7 @@ func TestExecuteInvokeInvalidTransaction(t *testing.T) {
// Test the execution of an invalid query.
func TestExecuteInvalidQuery(t *testing.T) {
testDBWrapper.CleanDB(t)
var opts []grpc.ServerOption
if viper.GetBool("peer.tls.enabled") {
creds, err := credentials.NewServerTLSFromFile(viper.GetString("peer.tls.cert.file"), viper.GetString("peer.tls.key.file"))
......@@ -785,6 +795,7 @@ func TestExecuteInvalidQuery(t *testing.T) {
// Test the execution of a chaincode that invokes another chaincode.
func TestChaincodeInvokeChaincode(t *testing.T) {
testDBWrapper.CleanDB(t)
var opts []grpc.ServerOption
if viper.GetBool("peer.tls.enabled") {
creds, err := credentials.NewServerTLSFromFile(viper.GetString("peer.tls.cert.file"), viper.GetString("peer.tls.key.file"))
......@@ -898,6 +909,7 @@ func TestChaincodeInvokeChaincode(t *testing.T) {
// Test the execution of a chaincode that invokes another chaincode with wrong parameters. Should receive error from
// from the called chaincode
func TestChaincodeInvokeChaincodeErrorCase(t *testing.T) {
testDBWrapper.CleanDB(t)
var opts []grpc.ServerOption
if viper.GetBool("peer.tls.enabled") {
creds, err := credentials.NewServerTLSFromFile(viper.GetString("peer.tls.cert.file"), viper.GetString("peer.tls.key.file"))
......@@ -1098,6 +1110,7 @@ func chaincodeQueryChaincode(user string) error {
// Test the execution of a chaincode query that queries another chaincode without security enabled
func TestChaincodeQueryChaincode(t *testing.T) {
testDBWrapper.CleanDB(t)
var peerLis net.Listener
var err error
if peerLis, err = initPeer(); err != nil {
......@@ -1119,6 +1132,7 @@ func TestChaincodeQueryChaincode(t *testing.T) {
// Test the execution of a chaincode that queries another chaincode with invalid parameter. Should receive error from
// from the called chaincode
func TestChaincodeQueryChaincodeErrorCase(t *testing.T) {
testDBWrapper.CleanDB(t)
var opts []grpc.ServerOption
if viper.GetBool("peer.tls.enabled") {
creds, err := credentials.NewServerTLSFromFile(viper.GetString("peer.tls.cert.file"), viper.GetString("peer.tls.key.file"))
......@@ -1229,6 +1243,7 @@ func TestChaincodeQueryChaincodeErrorCase(t *testing.T) {
// Test the execution of a chaincode query that queries another chaincode with security enabled
// NOTE: this really needs to be a behave test. Remove when we have support in behave for multiple chaincodes
func TestChaincodeQueryChaincodeWithSec(t *testing.T) {
testDBWrapper.CleanDB(t)
viper.Set("security.enabled", "true")
//Initialize crypto
......@@ -1282,6 +1297,7 @@ func TestChaincodeQueryChaincodeWithSec(t *testing.T) {
// Test the invocation of a transaction.
func TestRangeQuery(t *testing.T) {
testDBWrapper.CleanDB(t)
var opts []grpc.ServerOption
if viper.GetBool("peer.tls.enabled") {
creds, err := credentials.NewServerTLSFromFile(viper.GetString("peer.tls.cert.file"), viper.GetString("peer.tls.key.file"))
......@@ -1352,6 +1368,7 @@ func TestRangeQuery(t *testing.T) {
}
func TestGetEvent(t *testing.T) {
testDBWrapper.CleanDB(t)
var opts []grpc.ServerOption
if viper.GetBool("peer.tls.enabled") {
creds, err := credentials.NewServerTLSFromFile(viper.GetString("peer.tls.cert.file"), viper.GetString("peer.tls.key.file"))
......
......@@ -22,7 +22,6 @@ import (
"os"
"path"
"strings"
"sync"
"github.com/op/go-logging"
"github.com/spf13/viper"
......@@ -45,13 +44,6 @@ var columnfamilies = []string{
persistCF, // persistent per-peer state (consensus)
}
type dbState int32
const (
closed dbState = iota
opened
)
// OpenchainDB encapsulates rocksdb's structures
type OpenchainDB struct {
DB *gorocksdb.DB
......@@ -60,23 +52,30 @@ type OpenchainDB struct {
StateDeltaCF *gorocksdb.ColumnFamilyHandle
IndexesCF *gorocksdb.ColumnFamilyHandle
PersistCF *gorocksdb.ColumnFamilyHandle
dbState dbState
mux sync.Mutex
}
var openchainDB = Create()
var openchainDB = create()
// Create create an openchainDB instance
func Create() *OpenchainDB {
return &OpenchainDB{dbState: closed}
func create() *OpenchainDB {
return &OpenchainDB{}
}
// GetDBHandle get an opened openchainDB singleton
// GetDBHandle gets an opened openchainDB singleton. Note that method Start must always be invoked before this method.
func GetDBHandle() *OpenchainDB {
openchainDB.Open()
return openchainDB
}
// Start the db, init the openchainDB instance and open the db. Note this method has no guarantee correct behavior concurrent invocation.
func Start() {
openchainDB.open()
}
// Stop the db. Note this method has no guarantee correct behavior concurrent invocation.
func Stop() {
openchainDB.close()
}
// GetFromBlockchainCF get value for given key from column family - blockchainCF
func (openchainDB *OpenchainDB) GetFromBlockchainCF(key []byte) ([]byte, error) {
return openchainDB.Get(openchainDB.BlockchainCF, key)
......@@ -142,15 +141,7 @@ func getDBPath() string {
}
// Open open underlying rocksdb
func (openchainDB *OpenchainDB) Open() {
openchainDB.mux.Lock()
if openchainDB.dbState == opened {
openchainDB.mux.Unlock()
return
}
defer openchainDB.mux.Unlock()
func (openchainDB *OpenchainDB) open() {
dbPath := getDBPath()
missing, err := dirMissingOrEmpty(dbPath)
if err != nil {
......@@ -190,25 +181,16 @@ func (openchainDB *OpenchainDB) Open() {
openchainDB.StateDeltaCF = cfHandlers[3]
openchainDB.IndexesCF = cfHandlers[4]
openchainDB.PersistCF = cfHandlers[5]
openchainDB.dbState = opened
}
// Close releases all column family handles and closes rocksdb
func (openchainDB *OpenchainDB) Close() {
openchainDB.mux.Lock()
if openchainDB.dbState == closed {
openchainDB.mux.Unlock()
return
}
defer openchainDB.mux.Unlock()
func (openchainDB *OpenchainDB) close() {
openchainDB.BlockchainCF.Destroy()
openchainDB.StateCF.Destroy()
openchainDB.StateDeltaCF.Destroy()
openchainDB.IndexesCF.Destroy()
openchainDB.PersistCF.Destroy()
openchainDB.DB.Close()
openchainDB.dbState = closed
}
// DeleteState delets ALL state keys/values from the DB. This is generally
......
......@@ -41,72 +41,64 @@ func TestGetDBPathEmptyPath(t *testing.T) {
}
}()
defer viper.Set("peer.fileSystemPath", originalSetting)
Start()
GetDBHandle()
}
func TestCreateDB(t *testing.T) {
openchainDB := Create()
openchainDB.Open()
defer deleteTestDBPath()
defer openchainDB.Close()
}
func TestOpenDB_DirDoesNotExist(t *testing.T) {
openchainDB := Create()
func TestStartDB_DirDoesNotExist(t *testing.T) {
deleteTestDBPath()
defer deleteTestDBPath()
defer openchainDB.Close()
defer Stop()
defer func() {
if r := recover(); r != nil {
t.Fatalf("Failed to open DB: %s", r)
}
}()
openchainDB.Open()
Start()
}
func TestOpenDB_NonEmptyDirExists(t *testing.T) {
openchainDB := Create()
func TestStartDB_NonEmptyDirExists(t *testing.T) {
deleteTestDBPath()
createNonEmptyTestDBPath()
defer deleteTestDBPath()
defer openchainDB.Close()
defer func() {
if r := recover(); r == nil {
t.Fatalf("dbPath is already exists. DB open should throw error")
}
}()
openchainDB.Open()
Start()
}
func TestWriteAndRead(t *testing.T) {
openchainDB := GetDBHandle()
deleteTestDBPath()
Start()
defer deleteTestDBPath()
defer openchainDB.Close()
defer Stop()
performBasicReadWrite(openchainDB, t)
}
// This test verifies that when a new column family is added to the DB
// users at an older level of the DB will still be able to open it with new code
func TestDBColumnUpgrade(t *testing.T) {
openchainDB := GetDBHandle()
openchainDB.Close()
deleteTestDBPath()
Start()
Stop()
oldcfs := columnfamilies
columnfamilies = append([]string{"Testing"}, columnfamilies...)
defer func() {
columnfamilies = oldcfs
}()
openchainDB = GetDBHandle()
defer deleteTestDBPath()
defer openchainDB.Close()
defer Stop()
defer func() {
if r := recover(); r != nil {
t.Fatalf("Error re-opening DB with upgraded columnFamilies")
}
}()
Start()
}
func TestDeleteState(t *testing.T) {
......
......@@ -46,6 +46,7 @@ func (testDB *TestDBWrapper) CleanDB(t testing.TB) {
testDB.removeDBPath()
t.Logf("Creating testDB")
Start()
testDB.performCleanup = true
}
......@@ -55,12 +56,13 @@ func (testDB *TestDBWrapper) CreateFreshDBGinkgo() {
// at the end of the test
testDB.cleanup()
testDB.removeDBPath()
Start()
testDB.performCleanup = true
}
func (testDB *TestDBWrapper) cleanup() {
if testDB.performCleanup {
GetDBHandle().Close()
Stop()
testDB.performCleanup = false
}
}
......@@ -116,8 +118,12 @@ func (testDB *TestDBWrapper) GetFromStateDeltaCF(t testing.TB, key []byte) []byt
// CloseDB closes the db
func (testDB *TestDBWrapper) CloseDB(t testing.TB) {
openchainDB := GetDBHandle()
openchainDB.Close()
Stop()
}
// OpenDB opens the db
func (testDB *TestDBWrapper) OpenDB(t testing.TB) {
Start()
}
// GetEstimatedNumKeys returns estimated number of key-values in db. This is not accurate in all the cases
......
......@@ -187,8 +187,11 @@ func TestIndexesAsync_IndexPendingBlocks(t *testing.T) {
t.Fatalf("Error populating block chain with sample data: %s", err)
}
// close the db and create new instance of blockchain (and the associated async indexer) - the indexer should index the pending blocks
// close the db
testDBWrapper.CloseDB(t)
// open the db again and create new instance of blockchain (and the associated async indexer)
// the indexer should index the pending blocks
testDBWrapper.OpenDB(t)
testBlockchainWrapper = newTestBlockchainWrapper(t)
defer chain.indexer.stop()
......
......@@ -24,6 +24,7 @@ import (
"time"
"github.com/hyperledger/fabric/core/chaincode"
"github.com/hyperledger/fabric/core/db"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/system_chaincode/api"
"github.com/hyperledger/fabric/core/system_chaincode/samplesyscc"
......@@ -34,6 +35,8 @@ import (
"google.golang.org/grpc"
)
var testDBWrapper = db.NewTestDBWrapper()
// Invoke or query a chaincode.
func invoke(ctx context.Context, spec *pb.ChaincodeSpec, typ pb.Transaction_Type) (*pb.ChaincodeEvent, string, []byte, error) {
chaincodeInvocationSpec := &pb.ChaincodeInvocationSpec{ChaincodeSpec: spec}
......@@ -75,6 +78,7 @@ func closeListenerAndSleep(l net.Listener) {
// Test deploy of a transaction.
func TestExecuteDeploySysChaincode(t *testing.T) {
testDBWrapper.CleanDB(t)
var opts []grpc.ServerOption
grpcServer := grpc.NewServer(opts...)
viper.Set("peer.fileSystemPath", "/var/hyperledger/test/tmpdb")
......
......@@ -52,6 +52,7 @@ import (
"github.com/hyperledger/fabric/core/chaincode"
"github.com/hyperledger/fabric/core/comm"
"github.com/hyperledger/fabric/core/crypto"
"github.com/hyperledger/fabric/core/db"
"github.com/hyperledger/fabric/core/ledger/genesis"
"github.com/hyperledger/fabric/core/peer"
"github.com/hyperledger/fabric/core/rest"
......@@ -470,6 +471,8 @@ func serve(args []string) error {
logger.Infof("Privacy enabled status: false")
}
db.Start()
var opts []grpc.ServerOption
if comm.TLSEnabled() {
creds, err := credentials.NewServerTLSFromFile(viper.GetString("peer.tls.cert.file"), viper.GetString("peer.tls.key.file"))
......@@ -632,6 +635,7 @@ func stop() (err error) {
serverClient := pb.NewAdminClient(clientConn)
status, err := serverClient.StopServer(context.Background(), &google_protobuf.Empty{})
db.Stop()
if err != nil {
fmt.Println(&pb.ServerStatus{Status: pb.ServerStatus_STOPPED})
return nil
......
......@@ -63,8 +63,9 @@ func main() {
os.Exit(5)
}
db.Start()
openchainDB := db.GetDBHandle()
defer openchainDB.Close()
defer db.Stop()
fmt.Println()
scan(openchainDB, "blockchainCF", openchainDB.BlockchainCF, blockDetailPrinter)
fmt.Println()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment