kv_ledger_provider.go 6.6 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/*
Copyright IBM Corp. 2016 All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

		 http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kvledger

import (
	"errors"

22
23
24
	"github.com/hyperledger/fabric/common/ledger/blkstorage"
	"github.com/hyperledger/fabric/common/ledger/blkstorage/fsblkstorage"
	"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
25
	"github.com/hyperledger/fabric/core/ledger"
26
27
	"github.com/hyperledger/fabric/core/ledger/kvledger/history/historydb"
	"github.com/hyperledger/fabric/core/ledger/kvledger/history/historydb/historyleveldb"
28
	"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
29
	"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/statecouchdb"
30
31
32
33
34
35
36
37
38
39
40
41
42
	"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/stateleveldb"
	"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
)

var (
	// ErrLedgerIDExists is returned by Provider.Create (and by the id store's
	// createLedgerID) when a ledger with the given id already exists.
	ErrLedgerIDExists = errors.New("LedgerID already exists")
	// ErrNonExistingLedgerID is returned by Provider.Open when no ledger with
	// the given id is registered in the id store.
	ErrNonExistingLedgerID = errors.New("LedgerID does not exist")
	// ErrLedgerNotOpened signals that a ledger with the given id has not been opened.
	// NOTE(review): nothing in the visible part of this file returns it — confirm
	// it is still used elsewhere.
	ErrLedgerNotOpened = errors.New("Ledger is not opened yet")
)

43
// Provider implements interface ledger.PeerLedgerProvider
44
type Provider struct {
manish's avatar
manish committed
45
46
	idStore            *idStore
	blockStoreProvider blkstorage.BlockStoreProvider
47
48
	vdbProvider        statedb.VersionedDBProvider
	historydbProvider  historydb.HistoryDBProvider
49
50
51
52
}

// NewProvider instantiates a new Provider.
// This is not thread-safe and assumed to be synchronized be the caller
53
func NewProvider() (ledger.PeerLedgerProvider, error) {
54

55
	logger.Info("Initializing ledger provider")
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72

	// Initialize the ID store (inventory of chainIds/ledgerIds)
	idStore := openIDStore(ledgerconfig.GetLedgerProviderPath())

	// Initialize the block storage
	attrsToIndex := []blkstorage.IndexableAttr{
		blkstorage.IndexableAttrBlockHash,
		blkstorage.IndexableAttrBlockNum,
		blkstorage.IndexableAttrTxID,
		blkstorage.IndexableAttrBlockNumTranNum,
	}
	indexConfig := &blkstorage.IndexConfig{AttrsToIndex: attrsToIndex}
	blockStoreProvider := fsblkstorage.NewProvider(
		fsblkstorage.NewConf(ledgerconfig.GetBlockStorePath(), ledgerconfig.GetMaxBlockfileSize()),
		indexConfig)

	// Initialize the versioned database (state database)
73
74
75
76
77
	var vdbProvider statedb.VersionedDBProvider
	if !ledgerconfig.IsCouchDBEnabled() {
		logger.Debugf("Constructing leveldb VersionedDBProvider")
		vdbProvider = stateleveldb.NewVersionedDBProvider()
	} else {
78
79
80
81
82
83
		logger.Debugf("Constructing CouchDB VersionedDBProvider")
		var err error
		vdbProvider, err = statecouchdb.NewVersionedDBProvider()
		if err != nil {
			return nil, err
		}
84
	}
manish's avatar
manish committed
85

86
87
88
89
	// Initialize the history database (index for history of values by key)
	var historydbProvider historydb.HistoryDBProvider
	historydbProvider = historyleveldb.NewHistoryDBProvider()

90
	logger.Info("ledger provider Initialized")
91
	return &Provider{idStore, blockStoreProvider, vdbProvider, historydbProvider}, nil
92
93
}

94
95
// Create implements the corresponding method from interface ledger.PeerLedgerProvider
func (provider *Provider) Create(ledgerID string) (ledger.PeerLedger, error) {
96
97
98
99
100
101
102
103
	exists, err := provider.idStore.ledgerIDExists(ledgerID)
	if err != nil {
		return nil, err
	}
	if exists {
		return nil, ErrLedgerIDExists
	}
	provider.idStore.createLedgerID(ledgerID)
manish's avatar
manish committed
104
	return provider.Open(ledgerID)
105
106
}

107
108
// Open implements the corresponding method from interface ledger.PeerLedgerProvider
func (provider *Provider) Open(ledgerID string) (ledger.PeerLedger, error) {
109
110

	// Check the ID store to ensure that the chainId/ledgerId exists
111
112
113
114
115
116
117
	exists, err := provider.idStore.ledgerIDExists(ledgerID)
	if err != nil {
		return nil, err
	}
	if !exists {
		return nil, ErrNonExistingLedgerID
	}
118
119
120
121
122
123
124
125

	// Get the block store for a chain/ledger
	blockStore, err := provider.blockStoreProvider.OpenBlockStore(ledgerID)
	if err != nil {
		return nil, err
	}

	// Get the versioned database (state database) for a chain/ledger
manish's avatar
manish committed
126
127
128
129
	vDB, err := provider.vdbProvider.GetDBHandle(ledgerID)
	if err != nil {
		return nil, err
	}
130
131
132

	// Get the history database (index for history of values by key) for a chain/ledger
	historyDB, err := provider.historydbProvider.GetDBHandle(ledgerID)
manish's avatar
manish committed
133
134
135
	if err != nil {
		return nil, err
	}
136
137
138
139

	// Create a kvLedger for this chain/ledger, which encasulates the underlying data stores
	// (id store, blockstore, state database, history database)
	l, err := newKVLedger(ledgerID, blockStore, vDB, historyDB)
140
141
142
143
144
145
	if err != nil {
		return nil, err
	}
	return l, nil
}

146
// Exists implements the corresponding method from interface ledger.PeerLedgerProvider
147
148
149
150
func (provider *Provider) Exists(ledgerID string) (bool, error) {
	return provider.idStore.ledgerIDExists(ledgerID)
}

151
// List implements the corresponding method from interface ledger.PeerLedgerProvider
152
153
154
155
func (provider *Provider) List() ([]string, error) {
	return provider.idStore.getAllLedgerIds()
}

156
// Close implements the corresponding method from interface ledger.PeerLedgerProvider
157
158
func (provider *Provider) Close() {
	provider.idStore.close()
manish's avatar
manish committed
159
	provider.blockStoreProvider.Close()
160
161
	provider.vdbProvider.Close()
	provider.historydbProvider.Close()
162
163
164
}

type idStore struct {
manish's avatar
manish committed
165
	db *leveldbhelper.DB
166
167
168
}

func openIDStore(path string) *idStore {
manish's avatar
manish committed
169
	db := leveldbhelper.CreateDB(&leveldbhelper.Conf{DBPath: path})
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
	db.Open()
	return &idStore{db}
}

// createLedgerID registers ledgerID in the store by writing a key for it.
// It returns ErrLedgerIDExists if a value is already stored under that key,
// or the underlying database error if the lookup or the write fails.
func (s *idStore) createLedgerID(ledgerID string) error {
	key := []byte(ledgerID)
	existing, err := s.db.Get(key)
	if err != nil {
		return err
	}
	if existing != nil {
		return ErrLedgerIDExists
	}
	// existing is nil at this point, so the key is written with a nil value;
	// only the presence of the key matters.
	// NOTE(review): the trailing true is presumably the sync flag — confirm
	// against leveldbhelper.DB.Put.
	return s.db.Put(key, existing, true)
}

// ledgerIDExists reports whether ledgerID has an entry in the store, i.e.
// whether the database returns a non-nil value for it. A database error is
// returned as (false, err).
func (s *idStore) ledgerIDExists(ledgerID string) (bool, error) {
	stored, err := s.db.Get([]byte(ledgerID))
	if err != nil {
		return false, err
	}
	found := stored != nil
	return found, nil
}

// getAllLedgerIds returns every key in the id store as a ledger id, in the
// order the database iterator yields them. The result is nil when the store
// is empty. An error accumulated by the iterator during the scan is returned
// instead of a possibly truncated list.
func (s *idStore) getAllLedgerIds() ([]string, error) {
	var ids []string
	itr := s.db.GetIterator(nil, nil)
	// The iterator holds database resources until it is released.
	defer itr.Release()
	for itr.First(); itr.Valid(); itr.Next() {
		// string(...) copies the key, which is required because the iterator
		// may reuse its key buffer on the next step.
		ids = append(ids, string(itr.Key()))
	}
	if err := itr.Error(); err != nil {
		return nil, err
	}
	return ids, nil
}

// close closes the underlying database handle of the id store.
func (s *idStore) close() {
	s.db.Close()
}