kv_ledger.go 10.8 KB
Newer Older
manish's avatar
manish committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/*
Copyright IBM Corp. 2016 All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

		 http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kvledger

import (
	"errors"
	"fmt"

23
24
	commonledger "github.com/hyperledger/fabric/common/ledger"
	"github.com/hyperledger/fabric/common/ledger/blkstorage"
25
	"github.com/hyperledger/fabric/core/ledger"
26
	"github.com/hyperledger/fabric/core/ledger/kvledger/history/historydb"
27
	"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
28
	"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr"
29
	"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr"
30
	"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
31

32
	logging "github.com/op/go-logging"
33

34
	"github.com/hyperledger/fabric/protos/common"
manish's avatar
manish committed
35
36
)

37
38
// logger is the package-level logger, obtained from go-logging under the name "kvledger".
var logger = logging.MustGetLogger("kvledger")

39
// KVLedger provides an implementation of `ledger.PeerLedger`.
manish's avatar
manish committed
40
// This implementation provides a key-value based data model
manish's avatar
manish committed
41
type kvLedger struct {
42
43
44
45
	ledgerID   string
	blockStore blkstorage.BlockStore
	txtmgmt    txmgr.TxMgr
	historyDB  historydb.HistoryDB
manish's avatar
manish committed
46
47
48
}

// NewKVLedger constructs new `KVLedger`
49
50
51
func newKVLedger(ledgerID string, blockStore blkstorage.BlockStore,
	versionedDB statedb.VersionedDB, historyDB historydb.HistoryDB) (*kvLedger, error) {

52
	logger.Debugf("Creating KVLedger ledgerID=%s: ", ledgerID)
53

54
55
	//Initialize transaction manager using state database
	var txmgmt txmgr.TxMgr
manish's avatar
manish committed
56
	txmgmt = lockbasedtxmgr.NewLockBasedTxMgr(versionedDB)
57

58
59
60
	// Create a kvLedger for this chain/ledger, which encasulates the underlying
	// id store, blockstore, txmgr (state database), history database
	l := &kvLedger{ledgerID, blockStore, txmgmt, historyDB}
61

62
63
	//Recover both state DB and history DB if they are out of sync with block storage
	if err := recoverDB(l); err != nil {
senthil's avatar
senthil committed
64
65
		panic(fmt.Errorf(`Error during state DB recovery:%s`, err))
	}
66

senthil's avatar
senthil committed
67
68
69
	return l, nil
}

70
71
//Recover the state database and history database (if exist)
//by recommitting last valid blocks
manish's avatar
manish committed
72
func recoverDB(l *kvLedger) error {
senthil's avatar
senthil committed
73
74
75
	//If there is no block in blockstorage, nothing to recover.
	info, _ := l.blockStore.GetBlockchainInfo()
	if info.Height == 0 {
76
		logger.Debugf("Block storage is empty.")
senthil's avatar
senthil committed
77
78
79
80
		return nil
	}

	var err error
81
82
83
84
85
86
	var stateDBSavepoint, historyDBSavepoint uint64
	//Default value for bool is false
	var recoverStateDB, recoverHistoryDB bool

	//Getting savepointValue stored in the state DB
	if stateDBSavepoint, err = l.txtmgmt.GetBlockNumFromSavepoint(); err != nil {
senthil's avatar
senthil committed
87
88
		return err
	}
89

90
91
92
93
94
95
96
	//Check whether the state DB is in sync with block storage
	if recoverStateDB, err = isRecoveryNeeded(stateDBSavepoint, info.Height); err != nil {
		return err
	}

	if ledgerconfig.IsHistoryDBEnabled() == true {
		//Getting savepointValue stored in the history DB
97
		if historyDBSavepoint, err = l.historyDB.GetBlockNumFromSavepoint(); err != nil {
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
			return err
		}
		//Check whether the history DB is in sync with block storage
		if recoverHistoryDB, err = isRecoveryNeeded(historyDBSavepoint, info.Height); err != nil {
			return err
		}
	}

	if recoverHistoryDB == false && recoverStateDB == false {
		//If nothing needs recovery, return
		if ledgerconfig.IsHistoryDBEnabled() == true {
			logger.Debugf("Both state database and history database are in sync with the block storage. No need to perform recovery operation.")
		} else {
			logger.Debugf("State database is in sync with the block storage.")
		}
senthil's avatar
senthil committed
113
		return nil
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
	} else if recoverHistoryDB == false && recoverStateDB == true {
		logger.Debugf("State database is behind block storage by %d blocks. Recovering state database.", info.Height-stateDBSavepoint)
		if err = recommitLostBlocks(l, stateDBSavepoint, info.Height, true, false); err != nil {
			return err
		}
	} else if recoverHistoryDB == true && recoverStateDB == false {
		logger.Debugf("History database is behind block storage by %d blocks. Recovering history database.", info.Height-historyDBSavepoint)
		if err = recommitLostBlocks(l, historyDBSavepoint, info.Height, false, true); err != nil {
			return err
		}
	} else if recoverHistoryDB == true && recoverStateDB == true {
		logger.Debugf("State database is behind block storage by %d blocks, and history database is behind block storage by %d blocks. Recovering both state and history database.", info.Height-stateDBSavepoint, info.Height-historyDBSavepoint)
		//If both state DB and history DB need to be recovered, first
		//we need to ensure that the state DB and history DB are in same state
		//before recommitting lost blocks.
		if stateDBSavepoint > historyDBSavepoint {
			logger.Debugf("History database is behind the state database by %d blocks", stateDBSavepoint-historyDBSavepoint)
			logger.Debugf("Making the history DB in sync with state DB")
			if err = recommitLostBlocks(l, historyDBSavepoint, stateDBSavepoint, false, true); err != nil {
				return err
			}
			logger.Debugf("Making both history DB and state DB in sync with the block storage")
			if err = recommitLostBlocks(l, stateDBSavepoint, info.Height, true, true); err != nil {
				return err
			}
		} else if stateDBSavepoint < historyDBSavepoint {
			logger.Debugf("State database is behind the history database by %d blocks", historyDBSavepoint-stateDBSavepoint)
			logger.Debugf("Making the state DB in sync with history DB")
			if err = recommitLostBlocks(l, stateDBSavepoint, historyDBSavepoint, true, false); err != nil {
				return err
			}
			logger.Debugf("Making both state DB and history DB in sync with the block storage")
			if err = recommitLostBlocks(l, historyDBSavepoint, info.Height, true, true); err != nil {
				return err
			}
		} else {
			logger.Debugf("State and history database are in same state but behind block storage")
			logger.Debugf("Making both state DB and history DB in sync with the block storage")
			if err = recommitLostBlocks(l, stateDBSavepoint, info.Height, true, true); err != nil {
				return err
			}
		}
senthil's avatar
senthil committed
156
	}
157
158
	return nil
}
senthil's avatar
senthil committed
159

160
161
162
163
164
165
166
167
168
169
170
171
172
173
// isRecoveryNeeded compares a database savepoint with the current block
// storage height to decide whether a recovery pass must be started.
//
// It returns:
//   - (true, nil) when savepoint is lower than blockHeight (the database is behind);
//   - (false, nil) when both are equal (the database is in sync);
//   - (false, error) when savepoint is higher than blockHeight, i.e. block
//     storage itself is behind; the error reports by how many blocks.
func isRecoveryNeeded(savepoint uint64, blockHeight uint64) (bool, error) {
	if savepoint > blockHeight {
		// fmt.Errorf (not errors.New) so that the %d verb is actually filled in.
		return false, fmt.Errorf("BlockStorage height is behind savepoint by %d blocks. Recovery the BlockStore first", savepoint-blockHeight)
	}
	return savepoint < blockHeight, nil
}

//recommitLostBlocks retrieves blocks in specified range and commit the write set to either
//state DB or history DB or both
manish's avatar
manish committed
174
func recommitLostBlocks(l *kvLedger, savepoint uint64, blockHeight uint64, recoverStateDB bool, recoverHistoryDB bool) error {
senthil's avatar
senthil committed
175
	//Compute updateSet for each missing savepoint and commit to state DB
176
177
178
	var err error
	var block *common.Block
	for blockNumber := savepoint + 1; blockNumber <= blockHeight; blockNumber++ {
179
		if block, err = l.GetBlockByNumber(blockNumber); err != nil {
senthil's avatar
senthil committed
180
181
			return err
		}
182
183
184
185
186
187
188
189
190
		if recoverStateDB == true {
			logger.Debugf("Constructing updateSet for the block %d", blockNumber)
			if err = l.txtmgmt.ValidateAndPrepare(block, false); err != nil {
				return err
			}
			logger.Debugf("Committing block %d to state database", blockNumber)
			if err = l.txtmgmt.Commit(); err != nil {
				return err
			}
senthil's avatar
senthil committed
191
		}
192
		if ledgerconfig.IsHistoryDBEnabled() == true && recoverHistoryDB == true {
193
			if err = l.historyDB.Commit(block); err != nil {
194
195
				return err
			}
senthil's avatar
senthil committed
196
197
		}
	}
198

senthil's avatar
senthil committed
199
	return nil
manish's avatar
manish committed
200
201
202
}

// GetTransactionByID retrieves a transaction by id
203
func (l *kvLedger) GetTransactionByID(txID string) (*common.Envelope, error) {
manish's avatar
manish committed
204
205
206
207
	return l.blockStore.RetrieveTxByID(txID)
}

// GetBlockchainInfo returns basic info about blockchain
208
func (l *kvLedger) GetBlockchainInfo() (*common.BlockchainInfo, error) {
manish's avatar
manish committed
209
210
211
212
	return l.blockStore.GetBlockchainInfo()
}

// GetBlockByNumber returns block at a given height
denyeart's avatar
denyeart committed
213
// blockNumber of  math.MaxUint64 will return last block
manish's avatar
manish committed
214
func (l *kvLedger) GetBlockByNumber(blockNumber uint64) (*common.Block, error) {
manish's avatar
manish committed
215
216
217
218
	return l.blockStore.RetrieveBlockByNumber(blockNumber)

}

manish's avatar
manish committed
219
220
221
// GetBlocksIterator returns an iterator that starts from `startBlockNumber`(inclusive).
// The iterator is a blocking iterator i.e., it blocks till the next block gets available in the ledger
// ResultsIterator contains type BlockHolder
222
func (l *kvLedger) GetBlocksIterator(startBlockNumber uint64) (commonledger.ResultsIterator, error) {
manish's avatar
manish committed
223
	return l.blockStore.RetrieveBlocks(startBlockNumber)
manish's avatar
manish committed
224
225
226
227

}

// GetBlockByHash returns a block given it's hash
manish's avatar
manish committed
228
func (l *kvLedger) GetBlockByHash(blockHash []byte) (*common.Block, error) {
manish's avatar
manish committed
229
230
231
232
	return l.blockStore.RetrieveBlockByHash(blockHash)
}

//Prune prunes the blocks/transactions that satisfy the given policy
233
func (l *kvLedger) Prune(policy commonledger.PrunePolicy) error {
manish's avatar
manish committed
234
235
236
237
	return errors.New("Not yet implemented")
}

// NewTxSimulator returns new `ledger.TxSimulator`
manish's avatar
manish committed
238
func (l *kvLedger) NewTxSimulator() (ledger.TxSimulator, error) {
manish's avatar
manish committed
239
240
241
	return l.txtmgmt.NewTxSimulator()
}

242
// NewQueryExecutor gives handle to a query executor.
manish's avatar
manish committed
243
244
// A client can obtain more than one 'QueryExecutor's for parallel execution.
// Any synchronization should be performed at the implementation level if required
manish's avatar
manish committed
245
func (l *kvLedger) NewQueryExecutor() (ledger.QueryExecutor, error) {
manish's avatar
manish committed
246
247
248
	return l.txtmgmt.NewQueryExecutor()
}

249
250
251
// NewHistoryQueryExecutor gives handle to a history query executor.
// A client can obtain more than one 'HistoryQueryExecutor's for parallel execution.
// Any synchronization should be performed at the implementation level if required
manish's avatar
manish committed
252
func (l *kvLedger) NewHistoryQueryExecutor() (ledger.HistoryQueryExecutor, error) {
253
	return l.historyDB.NewHistoryQueryExecutor()
254
255
}

256
// Commit commits the valid block (returned in the method RemoveInvalidTransactionsAndPrepare) and related state changes
manish's avatar
manish committed
257
func (l *kvLedger) Commit(block *common.Block) error {
manish's avatar
manish committed
258
259
	var err error

260
261
262
263
	logger.Debugf("Validating block")
	err = l.txtmgmt.ValidateAndPrepare(block, true)
	if err != nil {
		return err
manish's avatar
manish committed
264
	}
265
266

	logger.Debugf("Committing block to storage")
267
	if err = l.blockStore.AddBlock(block); err != nil {
manish's avatar
manish committed
268
269
		return err
	}
270

271
	logger.Debugf("Committing block transactions to state database")
272
	if err = l.txtmgmt.Commit(); err != nil {
manish's avatar
manish committed
273
274
		panic(fmt.Errorf(`Error during commit to txmgr:%s`, err))
	}
275

276
	// History database could be written in parallel with state and/or async as a future optimization
277
	if ledgerconfig.IsHistoryDBEnabled() == true {
278
279
280
		logger.Debugf("Committing block transactions to history database")
		if err := l.historyDB.Commit(block); err != nil {
			panic(fmt.Errorf(`Error during commit to history db:%s`, err))
281
282
283
		}
	}

manish's avatar
manish committed
284
285
286
287
	return nil
}

// Close closes `KVLedger`
manish's avatar
manish committed
288
func (l *kvLedger) Close() {
manish's avatar
manish committed
289
290
291
	l.blockStore.Shutdown()
	l.txtmgmt.Shutdown()
}