/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

package historyleveldb

import (
	"github.com/hyperledger/fabric/common/flogging"
	"github.com/hyperledger/fabric/common/ledger/blkstorage"
	"github.com/hyperledger/fabric/common/ledger/util/leveldbhelper"
	"github.com/hyperledger/fabric/core/ledger"
	"github.com/hyperledger/fabric/core/ledger/kvledger/history/historydb"
	"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil"
	"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
	"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
	"github.com/hyperledger/fabric/core/ledger/util"
	"github.com/hyperledger/fabric/protos/common"
	putils "github.com/hyperledger/fabric/protos/utils"
)

23
var logger = flogging.MustGetLogger("historyleveldb")
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81

var savePointKey = []byte{0x00}
var emptyValue = []byte{}

// HistoryDBProvider implements interface HistoryDBProvider.
// It holds the leveldbhelper.Provider from which GetDBHandle obtains the
// handle for each named history database.
type HistoryDBProvider struct {
	dbProvider *leveldbhelper.Provider
}

// NewHistoryDBProvider instantiates HistoryDBProvider. The underlying
// leveldbhelper provider is created for the path reported by
// ledgerconfig.GetHistoryLevelDBPath.
func NewHistoryDBProvider() *HistoryDBProvider {
	path := ledgerconfig.GetHistoryLevelDBPath()
	logger.Debugf("constructing HistoryDBProvider dbPath=%s", path)
	conf := &leveldbhelper.Conf{DBPath: path}
	return &HistoryDBProvider{dbProvider: leveldbhelper.NewProvider(conf)}
}

// GetDBHandle gets the handle to a named database. The handle is obtained
// from the shared provider and wrapped in a historyDB; the returned error is
// always nil.
func (provider *HistoryDBProvider) GetDBHandle(dbName string) (historydb.HistoryDB, error) {
	handle := provider.dbProvider.GetDBHandle(dbName)
	db := newHistoryDB(handle, dbName)
	return db, nil
}

// Close closes the underlying db by delegating to the Close method of the
// leveldbhelper provider held by this HistoryDBProvider.
func (provider *HistoryDBProvider) Close() {
	provider.dbProvider.Close()
}

// historyDB implements HistoryDB interface
type historyDB struct {
	// db is the handle through which history records and the savepoint are
	// read and written.
	db *leveldbhelper.DBHandle
	// dbName is the database name passed to GetDBHandle; it is used as the
	// channel identifier in log messages.
	dbName string
}

// newHistoryDB constructs an instance of HistoryDB that operates on the
// given database handle and is identified by dbName.
func newHistoryDB(db *leveldbhelper.DBHandle, dbName string) *historyDB {
	return &historyDB{
		db:     db,
		dbName: dbName,
	}
}

// Open implements method in HistoryDB interface.
// It is a no-op that always returns nil.
func (historyDB *historyDB) Open() error {
	// do nothing because shared db is used
	return nil
}

// Close implements method in HistoryDB interface.
// It is a no-op; the shared db is closed through HistoryDBProvider.Close.
func (historyDB *historyDB) Close() {
	// do nothing because shared db is used
}

// Commit implements method in HistoryDB interface
func (historyDB *historyDB) Commit(block *common.Block) error {

	blockNo := block.Header.Number
	//Set the starting tranNo to 0
	var tranNo uint64

	dbBatch := leveldbhelper.NewUpdateBatch()

82
83
	logger.Debugf("Channel [%s]: Updating history database for blockNo [%v] with [%d] transactions",
		historyDB.dbName, blockNo, len(block.Data.Data))
84

85
86
87
88
	// Get the invalidation byte array for the block
	txsFilter := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])

	// write each tran's write set to history db
89
90
	for _, envBytes := range block.Data.Data {

91
		// If the tran is marked as invalid, skip it
92
		if txsFilter.IsInvalid(int(tranNo)) {
93
94
			logger.Debugf("Channel [%s]: Skipping history write for invalid transaction number %d",
				historyDB.dbName, tranNo)
95
			tranNo++
96
97
98
			continue
		}

denyeart's avatar
denyeart committed
99
		env, err := putils.GetEnvelopeFromBlock(envBytes)
100
101
102
103
		if err != nil {
			return err
		}

denyeart's avatar
denyeart committed
104
105
		payload, err := putils.GetPayload(env)
		if err != nil {
106
107
108
			return err
		}

109
110
111
112
113
114
		chdr, err := putils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
		if err != nil {
			return err
		}

		if common.HeaderType(chdr.Type) == common.HeaderType_ENDORSER_TRANSACTION {
115

denyeart's avatar
denyeart committed
116
117
118
119
120
			// extract actions from the envelope message
			respPayload, err := putils.GetActionFromEnvelope(envBytes)
			if err != nil {
				return err
			}
121

denyeart's avatar
denyeart committed
122
			//preparation for extracting RWSet from transaction
manish's avatar
manish committed
123
			txRWSet := &rwsetutil.TxRwSet{}
denyeart's avatar
denyeart committed
124
125
126

			// Get the Result from the Action and then Unmarshal
			// it into a TxReadWriteSet using custom unmarshalling
manish's avatar
manish committed
127
			if err = txRWSet.FromProtoBytes(respPayload.Results); err != nil {
denyeart's avatar
denyeart committed
128
129
130
131
				return err
			}
			// for each transaction, loop through the namespaces and writesets
			// and add a history record for each write
manish's avatar
manish committed
132
			for _, nsRWSet := range txRWSet.NsRwSets {
denyeart's avatar
denyeart committed
133
				ns := nsRWSet.NameSpace
134

manish's avatar
manish committed
135
				for _, kvWrite := range nsRWSet.KvRwSet.Writes {
denyeart's avatar
denyeart committed
136
					writeKey := kvWrite.Key
137

denyeart's avatar
denyeart committed
138
139
140
141
142
143
					//composite key for history records is in the form ns~key~blockNo~tranNo
					compositeHistoryKey := historydb.ConstructCompositeHistoryKey(ns, writeKey, blockNo, tranNo)

					// No value is required, write an empty byte array (emptyValue) since Put() of nil is not allowed
					dbBatch.Put(compositeHistoryKey, emptyValue)
				}
144
			}
denyeart's avatar
denyeart committed
145
146

		} else {
147
			logger.Debugf("Skipping transaction [%d] since it is not an endorsement transaction\n", tranNo)
148
		}
149
		tranNo++
150
151
152
153
154
155
156
	}

	// add savepoint for recovery purpose
	height := version.NewHeight(blockNo, tranNo)
	dbBatch.Put(savePointKey, height.ToBytes())

	// write the block's history records and savepoint to LevelDB
157
158
	// Setting snyc to true as a precaution, false may be an ok optimization after further testing.
	if err := historyDB.db.WriteBatch(dbBatch, true); err != nil {
159
160
161
		return err
	}

162
	logger.Debugf("Channel [%s]: Updates committed to history database for blockNo [%v]", historyDB.dbName, blockNo)
163
164
165
166
	return nil
}

// NewHistoryQueryExecutor implements method in HistoryDB interface
167
168
func (historyDB *historyDB) NewHistoryQueryExecutor(blockStore blkstorage.BlockStore) (ledger.HistoryQueryExecutor, error) {
	return &LevelHistoryDBQueryExecutor{historyDB, blockStore}, nil
169
170
171
}

// GetBlockNumFromSavepoint implements method in HistoryDB interface
172
func (historyDB *historyDB) GetLastSavepoint() (*version.Height, error) {
173
174
	versionBytes, err := historyDB.db.Get(savePointKey)
	if err != nil || versionBytes == nil {
175
		return nil, err
176
177
	}
	height, _ := version.NewHeightFromBytes(versionBytes)
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
	return height, nil
}

// ShouldRecover implements method in interface kvledger.Recoverer.
//
// It reports whether the history database needs recovery and, if so, the
// block number from which recovery should start:
//   - history db disabled: (false, 0, nil)
//   - savepoint lookup fails: (false, 0, err)
//   - no savepoint stored: (true, 0, nil)
//   - otherwise: whether the savepoint block differs from lastAvailableBlock,
//     and the block number following the savepoint block.
func (historyDB *historyDB) ShouldRecover(lastAvailableBlock uint64) (bool, uint64, error) {
	if !ledgerconfig.IsHistoryDBEnabled() {
		return false, 0, nil
	}

	savepoint, err := historyDB.GetLastSavepoint()
	switch {
	case err != nil:
		return false, 0, err
	case savepoint == nil:
		return true, 0, nil
	}

	needsRecovery := savepoint.BlockNum != lastAvailableBlock
	firstBlockToRecover := savepoint.BlockNum + 1
	return needsRecovery, firstBlockToRecover, nil
}

// CommitLostBlock implements method in interface kvledger.Recoverer
197
198
func (historyDB *historyDB) CommitLostBlock(blockAndPvtdata *ledger.BlockAndPvtData) error {
	block := blockAndPvtdata.Block
199
200
201
202
	if err := historyDB.Commit(block); err != nil {
		return err
	}
	return nil
203
}