db.go 5.09 KB
Newer Older
manish's avatar
manish committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/*
Copyright IBM Corp. 2016 All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

		 http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package db

import (
	"fmt"
	"sync"

	"github.com/hyperledger/fabric/core/ledgernext/util"
	"github.com/op/go-logging"
	"github.com/tecbot/gorocksdb"
)

// logger is the package-level logger, obtained from go-logging under the
// module name "kvledger.db".
var logger = logging.MustGetLogger("kvledger.db")

// dbState records whether the underlying rocksdb instance is open or closed.
type dbState int32

// defaultCFName is the column family name that CreateDB adds to Conf.CFNames
// and that GetDefaultCFHandle looks up.
const defaultCFName = "default"

// The states live in their own const group so that iota starts at zero:
// closed is the zero value of dbState, and a DB that was never opened
// reports closed rather than an undefined state.
const (
	closed dbState = iota
	opened
)

// Conf configuration for `DB`
type Conf struct {
	// DBPath is the filesystem path handed to rocksdb when the DB is opened.
	DBPath string
	// CFNames lists the column families to open. CreateDB adds the
	// "default" column family name to this slice.
	CFNames []string
	// DisableWAL is forwarded to the rocksdb write options that are used
	// for every Put, Delete and WriteBatch call.
	DisableWAL bool
}

// DB - a rocksDB instance
type DB struct {
	conf         *Conf
	rocksDB      *gorocksdb.DB
	cfHandlesMap map[string]*gorocksdb.ColumnFamilyHandle
	dbState      dbState
	mux          sync.Mutex
manish's avatar
manish committed
52
53
54

	readOpts  *gorocksdb.ReadOptions
	writeOpts *gorocksdb.WriteOptions
manish's avatar
manish committed
55
56
57
58
59
}

// CreateDB constructs a `DB`
func CreateDB(conf *Conf) *DB {
	conf.CFNames = append(conf.CFNames, defaultCFName)
manish's avatar
manish committed
60
61
62
63
64
65
66
67
68
	readOpts := gorocksdb.NewDefaultReadOptions()
	writeOpts := gorocksdb.NewDefaultWriteOptions()
	writeOpts.DisableWAL(conf.DisableWAL)
	return &DB{
		conf:         conf,
		cfHandlesMap: make(map[string]*gorocksdb.ColumnFamilyHandle),
		dbState:      closed,
		readOpts:     readOpts,
		writeOpts:    writeOpts}
manish's avatar
manish committed
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
}

// Open opens the underlying rocksdb together with every column family named
// in conf.CFNames and records the returned handles by name. It does nothing
// when the DB is already open. The whole sequence runs while holding mux.
//
// Open panics if the DB directory cannot be prepared or if rocksdb fails to
// open.
func (dbInst *DB) Open() {
	dbInst.mux.Lock()
	defer dbInst.mux.Unlock()
	if dbInst.dbState == opened {
		return
	}

	path := dbInst.conf.DBPath
	cfNames := dbInst.conf.CFNames

	// The flag returned here decides whether rocksdb may create a new DB
	// (presumably it reports that the directory is empty - confirm in util).
	isEmpty, err := util.CreateDirIfMissing(path)
	if err != nil {
		panic(fmt.Sprintf("Error while trying to open DB: %s", err))
	}

	options := gorocksdb.NewDefaultOptions()
	defer options.Destroy()
	options.SetCreateIfMissing(isEmpty)
	options.SetCreateIfMissingColumnFamilies(true)

	// Every column family is opened with the same options object.
	perCFOptions := make([]*gorocksdb.Options, len(cfNames))
	for i := range perCFOptions {
		perCFOptions[i] = options
	}

	rocks, handles, err := gorocksdb.OpenDbColumnFamilies(options, path, cfNames, perCFOptions)
	if err != nil {
		panic(fmt.Sprintf("Error opening DB: %s", err))
	}

	dbInst.rocksDB = rocks
	for i, name := range cfNames {
		dbInst.cfHandlesMap[name] = handles[i]
	}
	dbInst.dbState = opened
}

// Close destroys every column family handle held in cfHandlesMap and then
// closes rocksdb. It does nothing when the DB is already closed. The map
// entries themselves are left in place.
func (dbInst *DB) Close() {
	dbInst.mux.Lock()
	defer dbInst.mux.Unlock()
	if dbInst.dbState == closed {
		return
	}

	for name := range dbInst.cfHandlesMap {
		dbInst.cfHandlesMap[name].Destroy()
	}
	dbInst.rocksDB.Close()
	dbInst.dbState = closed
}

// isOpen reports whether the DB is currently in the opened state. The state
// is read while holding mux.
func (dbInst *DB) isOpen() bool {
	dbInst.mux.Lock()
	state := dbInst.dbState
	dbInst.mux.Unlock()
	return state == opened
}

// Get returns the value for the given column family and key
func (dbInst *DB) Get(cfHandle *gorocksdb.ColumnFamilyHandle, key []byte) ([]byte, error) {
manish's avatar
manish committed
130
	slice, err := dbInst.rocksDB.GetCF(dbInst.readOpts, cfHandle, key)
manish's avatar
manish committed
131
132
133
134
135
136
137
138
139
140
141
142
143
144
	if err != nil {
		fmt.Println("Error while trying to retrieve key:", key)
		return nil, err
	}
	defer slice.Free()
	if slice.Data() == nil {
		return nil, nil
	}
	data := makeCopy(slice.Data())
	return data, nil
}

// Put saves the key/value in the given column family
func (dbInst *DB) Put(cfHandle *gorocksdb.ColumnFamilyHandle, key []byte, value []byte) error {
manish's avatar
manish committed
145
	err := dbInst.rocksDB.PutCF(dbInst.writeOpts, cfHandle, key, value)
manish's avatar
manish committed
146
147
148
149
150
151
152
153
154
	if err != nil {
		fmt.Println("Error while trying to write key:", key)
		return err
	}
	return nil
}

// Delete delets the given key in the specified column family
func (dbInst *DB) Delete(cfHandle *gorocksdb.ColumnFamilyHandle, key []byte) error {
manish's avatar
manish committed
155
	err := dbInst.rocksDB.DeleteCF(dbInst.writeOpts, cfHandle, key)
manish's avatar
manish committed
156
157
158
159
160
161
162
163
164
	if err != nil {
		fmt.Println("Error while trying to delete key:", key)
		return err
	}
	return nil
}

// WriteBatch writes a batch
func (dbInst *DB) WriteBatch(batch *gorocksdb.WriteBatch) error {
manish's avatar
manish committed
165
	if err := dbInst.rocksDB.Write(dbInst.writeOpts, batch); err != nil {
manish's avatar
manish committed
166
167
168
169
170
171
172
		return err
	}
	return nil
}

// GetIterator returns an iterator for the given column family
func (dbInst *DB) GetIterator(cfName string) *gorocksdb.Iterator {
manish's avatar
manish committed
173
	return dbInst.rocksDB.NewIteratorCF(dbInst.readOpts, dbInst.GetCFHandle(cfName))
manish's avatar
manish committed
174
175
176
177
178
179
180
181
182
183
184
185
}

// GetCFHandle returns the handle stored for the named column family, or nil
// when no handle is stored under that name.
func (dbInst *DB) GetCFHandle(cfName string) *gorocksdb.ColumnFamilyHandle {
	if handle, found := dbInst.cfHandlesMap[cfName]; found {
		return handle
	}
	return nil
}

// GetDefaultCFHandle returns the handle stored for the default column
// family, or nil when none is stored.
func (dbInst *DB) GetDefaultCFHandle() *gorocksdb.ColumnFamilyHandle {
	return dbInst.cfHandlesMap[defaultCFName]
}

manish's avatar
manish committed
186
187
188
189
190
191
192
193
// Flush asks rocksdb to flush its in-memory data to sst files. The wait
// argument is forwarded to the flush options; the options object is
// destroyed before Flush returns.
func (dbInst *DB) Flush(wait bool) error {
	options := gorocksdb.NewDefaultFlushOptions()
	options.SetWait(wait)
	defer options.Destroy()
	return dbInst.rocksDB.Flush(options)
}

manish's avatar
manish committed
194
195
196
197
198
// makeCopy returns a newly allocated slice holding the same bytes as src.
// The result never shares storage with src and is non-nil even when src is
// empty or nil.
func makeCopy(src []byte) []byte {
	return append(make([]byte, 0, len(src)), src...)
}