Commit 7439cd35 authored by manish

WIP - Initial commit for ledger code.

This includes the structure of the new code conforming to the new
architecture and a basic working implementation.

Change-Id: I885e122c0001ff82301cd1448a7ddad80db931b0
Signed-off-by: manish <manish.sethi@gmail.com>
parent f6a60035
### Read-Write set semantics
This document discusses the details of the current implementation of read-write set semantics.
##### Transaction simulation and read-write set
During simulation of a transaction at an `endorser`, a read-write set is prepared for the transaction. The `read set` contains a list of unique keys and their committed versions that the transaction reads during simulation. The `write set` contains a list of unique keys (which may overlap with the keys present in the read set) and their new values that the transaction writes. A delete marker is set (in place of a new value) for a key if the update performed by the transaction is to delete the key.
Further, if the transaction writes a value multiple times for a key, only the last written value is retained. Also, if a transaction reads a value for a key that it has itself written earlier, the last written value is returned instead of the value present in the committed snapshot. One implication of this is that if a transaction writes a value for a key before ever reading it, the read is served from the transaction's own write set and the key does not appear in the read set of the transaction.
As noted earlier, the versions of the keys are recorded only in the read set; the write set just contains the list of unique keys and their latest values set by the transaction.
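To make these rules concrete, here is a minimal Go sketch of how a simulator could capture a read-write set. All names here (`rwSetSim`, `versionedValue`, and so on) are hypothetical illustrations of the rules above, not the ledger code in this commit; the snippets in this section are assumed to live in one hypothetical Go file.
```
// versionedValue is a hypothetical (version, value) pair stored in the
// committed world state for a key.
type versionedValue struct {
	ver uint64
	val string
}

// rwSetSim captures the read set and write set while a transaction is
// simulated against a snapshot of the committed world state.
type rwSetSim struct {
	snapshot map[string]versionedValue
	reads    map[string]uint64  // key -> committed version observed
	writes   map[string]*string // key -> last written value; nil is a delete marker
}

func newRWSetSim(snapshot map[string]versionedValue) *rwSetSim {
	return &rwSetSim{snapshot: snapshot, reads: map[string]uint64{}, writes: map[string]*string{}}
}

// Read returns the transaction's own last write for the key if one exists
// (recording nothing in the read set); otherwise it reads the committed
// snapshot and records the key's committed version in the read set.
func (s *rwSetSim) Read(key string) string {
	if v, ok := s.writes[key]; ok {
		if v == nil {
			return "" // deleted earlier by this same transaction
		}
		return *v
	}
	vv := s.snapshot[key]
	s.reads[key] = vv.ver
	return vv.val
}

// Write retains only the last value written for a key.
func (s *rwSetSim) Write(key, value string) { s.writes[key] = &value }

// Delete records a delete marker in place of a new value.
func (s *rwSetSim) Delete(key string) { s.writes[key] = nil }
```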
The following illustrates an example read-write set prepared by the simulation of a hypothetical transaction.
```
<TxReadWriteSet>
  <NsReadWriteSet name="chaincode1">
    <read-set>
      <read key="K1", version="1">
      <read key="K2", version="1">
    </read-set>
    <write-set>
      <write key="K1", value="V1">
      <write key="K3", value="V2">
      <write key="K4", isDelete="true">
    </write-set>
  </NsReadWriteSet>
</TxReadWriteSet>
```
##### Transaction validation and updating world state using read-write set
A `committer` uses the read set portion of the read-write set for checking the validity of a transaction and the write set portion of the read-write set for updating the versions and the values of the affected keys.
In the validation phase, a transaction is considered `valid` if and only if the version of each key present in the read set of the transaction matches the version for the same key in the world state, assuming all the preceding `valid` transactions (including the preceding transactions in the same block) are committed.
If a transaction passes the validity check, the committer uses the write set for updating the world state. In the update phase, for each key present in the write set, the value in the world state for the same key is set to the value as specified in the write set. Further, the version of the key in the world state is incremented by one.
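Continuing the hypothetical sketch from above, validation and commit might look like the following; again, this only restates the rules in code and is not the implementation in this commit.
```
// txRWSet pairs the read set (key -> version) and the write set
// (key -> last value, nil meaning delete) produced by simulation.
type txRWSet struct {
	reads  map[string]uint64
	writes map[string]*string
}

// validateAndCommit applies the rules above: the transaction is valid iff
// every key in its read set still has the recorded version in the world
// state; a valid transaction's writes then replace each key's value and
// increment its version by one (a delete marker is applied as plain
// removal here, one possible treatment).
func validateAndCommit(state map[string]versionedValue, tx txRWSet) bool {
	for key, ver := range tx.reads {
		if state[key].ver != ver {
			return false // stale read: a preceding valid transaction changed the key
		}
	}
	for key, val := range tx.writes {
		if val == nil {
			delete(state, key)
			continue
		}
		state[key] = versionedValue{ver: state[key].ver + 1, val: *val}
	}
	return true
}
```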
##### Example simulation and validation
This section illustrates the semantics with the help of an example scenario.
For the purpose of this example, the presence of a key `k` in the world state is represented by a tuple `(k,ver,val)` where `ver` is the latest version of the key `k` having `val` as its value.
Now, consider a set of five transactions `T1, T2, T3, T4, and T5`, all simulated on the same snapshot of the world state. The following snippet shows the snapshot of the world state against which the transactions are simulated and the sequence of read and write activities performed by each of these transactions.
```
World state: (k1,1,v1), (k2,1,v2), (k3,1,v3), (k4,1,v4), (k5,1,v5)
T1 -> Write(k1, v1'), Write(k2, v2')
T2 -> Read(k1), Write(k3, v3')
T3 -> Write(k2, v2'')
T4 -> Write(k2, v2'''), Read(k2)
T5 -> Write(k6, v6'), Read(k1)
```
Now, assume that these transactions are ordered in the sequence T1,..,T5 (they could be contained in a single block or in different blocks):
1. `T1` passes the validation because it does not perform any read. Further, the tuples for keys `k1` and `k2` in the world state are updated to `(k1,2,v1')` and `(k2,2,v2')`.
2. `T2` fails the validation because it reads key `k1`, which was modified by the preceding transaction `T1`.
3. `T3` passes the validation because it does not perform any read. Further, the tuple for key `k2` in the world state is updated to `(k2,3,v2'')`.
4. `T4` passes the validation because it reads key `k2` only after writing its new value, so `k2` does not appear in its read set (even though the key was modified by the preceding transactions `T1` and `T3`). Further, the tuple for key `k2` in the world state is updated to `(k2,4,v2''')`.
5. `T5` fails the validation because it reads key `k1`, which was modified by the preceding transaction `T1`.
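Replaying this scenario with the hypothetical `validateAndCommit` sketch above reproduces these outcomes (a fragment of the same hypothetical file, assuming `fmt` is imported):
```
func replayExample() {
	state := map[string]versionedValue{
		"k1": {1, "v1"}, "k2": {1, "v2"}, "k3": {1, "v3"},
		"k4": {1, "v4"}, "k5": {1, "v5"},
	}
	v := func(s string) *string { return &s } // take a string literal's address
	txs := []txRWSet{
		{writes: map[string]*string{"k1": v("v1'"), "k2": v("v2'")}},                    // T1: no reads
		{reads: map[string]uint64{"k1": 1}, writes: map[string]*string{"k3": v("v3'")}}, // T2
		{writes: map[string]*string{"k2": v("v2''")}},                                   // T3: no reads
		{writes: map[string]*string{"k2": v("v2'''")}},                                  // T4: read of k2 follows its own write, so the read set is empty
		{reads: map[string]uint64{"k1": 1}, writes: map[string]*string{"k6": v("v6'")}}, // T5
	}
	for i, tx := range txs {
		fmt.Printf("T%d valid=%v\n", i+1, validateAndCommit(state, tx))
	}
	// Prints valid=true for T1, T3, T4 and valid=false for T2, T5,
	// leaving k2 at (k2,4,v2''') as described above.
}
```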
#### Transactions with multiple read-write sets
If a transaction contains multiple read-write sets as a result of including different simulation results in a single transaction, the validation also checks for read conflicts between the read-write sets, in addition to the read-conflict check against preceding transactions.
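The exact form of that additional check is not spelled out here; one plausible reading, continuing the hypothetical sketch, is that a key read in one read-write set conflicts with a write of the same key in a different read-write set of the same transaction:
```
// intraTxReadConflict reports whether any read-write set of a transaction
// reads a key that a different read-write set of the same transaction
// writes. This is a guess at the check's shape, not code from this commit.
func intraTxReadConflict(rwsets []txRWSet) bool {
	for i, a := range rwsets {
		for j, b := range rwsets {
			if i == j {
				continue // read-your-writes within one set is not a conflict
			}
			for key := range a.reads {
				if _, written := b.writes[key]; written {
					return true
				}
			}
		}
	}
	return false
}
```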
#### Questions
1. In the final block, is there a benefit to persisting the read-set portion of the read-write set? Not storing the read set clearly reduces the storage space requirement. However, if we choose not to store the read set, the endorsers should sign only the write-set portion of the read-write set, which means that the `actionBytes` field in the `EndorsedAction` would contain only the write set, and a separate field would be required for the read set.
2. Is there a benefit of deciding the version for a key in the write set at simulation time instead of commit time? If we fix the version at simulation time, then we would have to discard the transactions that have only write conflicts (i.e., where some other transaction has already written the same version for a key, even without a read conflict).
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package blkstorage
import (
"github.com/hyperledger/fabric/core/ledgernext"
"github.com/hyperledger/fabric/protos"
)
// BlockStore - an interface for persisting and retrieving blocks
type BlockStore interface {
AddBlock(block *protos.Block2) error
GetBlockchainInfo() (*protos.BlockchainInfo, error)
RetrieveBlocks(startNum uint64, endNum uint64) (ledger.ResultsIterator, error)
RetrieveBlockByHash(blockHash []byte) (*protos.Block2, error)
RetrieveBlockByNumber(blockNum uint64) (*protos.Block2, error)
RetrieveTxByID(txID string) (*protos.Transaction2, error)
Shutdown()
}
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fsblkstorage
import (
"fmt"
"sync/atomic"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/core/ledgernext/util"
"github.com/hyperledger/fabric/core/ledgernext/util/db"
"github.com/hyperledger/fabric/protos"
"github.com/op/go-logging"
"github.com/tecbot/gorocksdb"
)
var logger = logging.MustGetLogger("kvledger")
const (
blockIndexCF = "blockIndexCF"
blockfilePrefix = "blockfile_"
)
var (
blkMgrInfoKey = []byte("blkMgrInfo")
)
type blockfileMgr struct {
rootDir string
conf *Conf
db *db.DB
defaultCF *gorocksdb.ColumnFamilyHandle
index *blockIndex
cpInfo *checkpointInfo
currentFileWriter *blockfileWriter
bcInfo atomic.Value
}
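// newBlockfileMgr opens (or creates) the block store under conf.blockfilesDir:
// it loads the persisted checkpointInfo (creating and saving a fresh one on
// first use), truncates the current block file back to the last known good
// size, and rebuilds the cached BlockchainInfo from the last committed block.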
func newBlockfileMgr(conf *Conf) *blockfileMgr {
rootDir := conf.blockfilesDir
_, err := util.CreateDirIfMissing(rootDir)
if err != nil {
panic(fmt.Sprintf("Error: %s", err))
}
db := initDB(conf)
mgr := &blockfileMgr{rootDir: rootDir, conf: conf, db: db, defaultCF: db.GetDefaultCFHandle()}
cpInfo, err := mgr.loadCurrentInfo()
if err != nil {
panic(fmt.Sprintf("Could not get block file info for current block file from db: %s", err))
}
if cpInfo == nil {
cpInfo = &checkpointInfo{latestFileChunkSuffixNum: 0, latestFileChunksize: 0}
err = mgr.saveCurrentInfo(cpInfo)
if err != nil {
panic(fmt.Sprintf("Could not save next block file info to db: %s", err))
}
}
currentFileWriter, err := newBlockfileWriter(deriveBlockfilePath(rootDir, cpInfo.latestFileChunkSuffixNum))
if err != nil {
panic(fmt.Sprintf("Could not open writer to current file: %s", err))
}
err = currentFileWriter.truncateFile(cpInfo.latestFileChunksize)
if err != nil {
panic(fmt.Sprintf("Could not truncate current file to known size in db: %s", err))
}
mgr.index = newBlockIndex(db)
mgr.cpInfo = cpInfo
mgr.currentFileWriter = currentFileWriter
// init BlockchainInfo
bcInfo := &protos.BlockchainInfo{Height: 0, CurrentBlockHash: nil, PreviousBlockHash: nil}
if cpInfo.lastBlockNumber > 0 {
lastBlock, err := mgr.retrieveSerBlockByNumber(cpInfo.lastBlockNumber)
if err != nil {
panic(fmt.Sprintf("Could not retrieve last block form file: %s", err))
}
lastBlockHash := lastBlock.ComputeHash()
previousBlockHash, err := lastBlock.GetPreviousBlockHash()
if err != nil {
panic(fmt.Sprintf("Error in decoding block: %s", err))
}
bcInfo = &protos.BlockchainInfo{
Height: cpInfo.lastBlockNumber,
CurrentBlockHash: lastBlockHash,
PreviousBlockHash: previousBlockHash}
}
mgr.bcInfo.Store(bcInfo)
return mgr
}
func initDB(conf *Conf) *db.DB {
dbInst := db.CreateDB(&db.Conf{DBPath: conf.dbPath, CFNames: []string{blockIndexCF}})
dbInst.Open()
return dbInst
}
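// deriveBlockfilePath returns the path of the block file with the given
// suffix number, e.g. <rootDir>/blockfile_000003.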
func deriveBlockfilePath(rootDir string, suffixNum int) string {
return rootDir + "/" + blockfilePrefix + fmt.Sprintf("%06d", suffixNum)
}
func (mgr *blockfileMgr) open() error {
return mgr.currentFileWriter.open()
}
func (mgr *blockfileMgr) close() {
mgr.currentFileWriter.close()
mgr.db.Close()
}
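// moveToNextFile starts a new block file with the next suffix number,
// persisting the new checkpointInfo before switching the current writer.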
func (mgr *blockfileMgr) moveToNextFile() {
nextFileInfo := &checkpointInfo{latestFileChunkSuffixNum: mgr.cpInfo.latestFileChunkSuffixNum + 1, latestFileChunksize: 0}
nextFileWriter, err := newBlockfileWriter(deriveBlockfilePath(mgr.rootDir, nextFileInfo.latestFileChunkSuffixNum))
if err != nil {
panic(fmt.Sprintf("Could not open writer to next file: %s", err))
}
mgr.currentFileWriter.close()
err = mgr.saveCurrentInfo(nextFileInfo)
if err != nil {
panic(fmt.Sprintf("Could not save next block file info to db: %s", err))
}
mgr.cpInfo = nextFileInfo
mgr.currentFileWriter = nextFileWriter
}
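// addBlock appends a varint length-prefixed serialized block to the current
// block file, rolling over to a new file if the configured maximum size would
// be exceeded. On any failure the file is truncated back to the checkpointed
// size so that a partially written block is never left behind; on success the
// checkpoint is persisted and the block and its transactions are indexed.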
func (mgr *blockfileMgr) addBlock(block *protos.Block2) error {
serBlock, err := protos.ConstructSerBlock2(block)
if err != nil {
return fmt.Errorf("Error while serializing block: %s", err)
}
blockBytes := serBlock.GetBytes()
blockHash := serBlock.ComputeHash()
txOffsets, err := serBlock.GetTxOffsets()
if err != nil {
return fmt.Errorf("Error while serializing block: %s", err)
}
currentOffset := mgr.cpInfo.latestFileChunksize
length := len(blockBytes)
encodedLen := proto.EncodeVarint(uint64(length))
totalLen := length + len(encodedLen)
if currentOffset+totalLen > mgr.conf.maxBlockfileSize {
mgr.moveToNextFile()
currentOffset = 0
}
err = mgr.currentFileWriter.append(encodedLen)
if err != nil {
err1 := mgr.currentFileWriter.truncateFile(mgr.cpInfo.latestFileChunksize)
if err1 != nil {
panic(fmt.Sprintf("Could not truncate current file to known size after an error while appending a block: %s", err))
}
return fmt.Errorf("Error while appending block to file: %s", err)
}
err = mgr.currentFileWriter.append(blockBytes)
if err != nil {
err1 := mgr.currentFileWriter.truncateFile(mgr.cpInfo.latestFileChunksize)
if err1 != nil {
panic(fmt.Sprintf("Could not truncate current file to known size after an error while appending a block: %s", err))
}
return fmt.Errorf("Error while appending block to file: %s", err)
}
mgr.cpInfo.latestFileChunksize += totalLen
mgr.cpInfo.lastBlockNumber++
err = mgr.saveCurrentInfo(mgr.cpInfo)
if err != nil {
mgr.cpInfo.latestFileChunksize -= totalLen
err1 := mgr.currentFileWriter.truncateFile(mgr.cpInfo.latestFileChunksize)
if err1 != nil {
panic(fmt.Sprintf("Could not truncate current file to known size after an error while appending a block: %s", err))
}
return fmt.Errorf("Error while saving current file info to db: %s", err)
}
blockFLP := &fileLocPointer{fileSuffixNum: mgr.cpInfo.latestFileChunkSuffixNum}
blockFLP.offset = currentOffset
mgr.index.indexBlock(mgr.cpInfo.lastBlockNumber, blockHash, blockFLP, length, len(encodedLen), txOffsets)
mgr.updateBlockchainInfo(blockHash, block)
return nil
}
func (mgr *blockfileMgr) getBlockchainInfo() *protos.BlockchainInfo {
return mgr.bcInfo.Load().(*protos.BlockchainInfo)
}
func (mgr *blockfileMgr) updateBlockchainInfo(latestBlockHash []byte, latestBlock *protos.Block2) {
currentBCInfo := mgr.getBlockchainInfo()
newBCInfo := &protos.BlockchainInfo{Height: currentBCInfo.Height + 1, CurrentBlockHash: latestBlockHash, PreviousBlockHash: latestBlock.PreviousBlockHash}
mgr.bcInfo.Store(newBCInfo)
}
func (mgr *blockfileMgr) retrieveBlockByHash(blockHash []byte) (*protos.Block2, error) {
logger.Debugf("retrieveBlockByHash() - blockHash = [%#v]", blockHash)
loc, err := mgr.index.getBlockLocByHash(blockHash)
if err != nil {
return nil, err
}
return mgr.fetchBlock(loc)
}
func (mgr *blockfileMgr) retrieveBlockByNumber(blockNum uint64) (*protos.Block2, error) {
logger.Debugf("retrieveBlockByNumber() - blockNum = [%d]", blockNum)
loc, err := mgr.index.getBlockLocByBlockNum(blockNum)
if err != nil {
return nil, err
}
return mgr.fetchBlock(loc)
}
func (mgr *blockfileMgr) retrieveSerBlockByNumber(blockNum uint64) (*protos.SerBlock2, error) {
logger.Debugf("retrieveSerBlockByNumber() - blockNum = [%d]", blockNum)
loc, err := mgr.index.getBlockLocByBlockNum(blockNum)
if err != nil {
return nil, err
}
return mgr.fetchSerBlock(loc)
}
func (mgr *blockfileMgr) retrieveBlocks(startNum uint64, endNum uint64) (*BlocksItr, error) {
var lp *fileLocPointer
var err error
if lp, err = mgr.index.getBlockLocByBlockNum(startNum); err != nil {
return nil, err
}
filePath := deriveBlockfilePath(mgr.rootDir, lp.fileSuffixNum)
var stream *blockStream
if stream, err = newBlockStream(filePath, int64(lp.offset)); err != nil {
return nil, err
}
return newBlockItr(stream, int(endNum-startNum)+1), nil
}
func (mgr *blockfileMgr) retrieveTransactionByID(txID string) (*protos.Transaction2, error) {
logger.Debugf("retrieveTransactionByID() - txId = [%s]", txID)
loc, err := mgr.index.getTxLoc(txID)
if err != nil {
return nil, err
}
return mgr.fetchTransaction(loc)
}
func (mgr *blockfileMgr) fetchBlock(lp *fileLocPointer) (*protos.Block2, error) {
serBlock, err := mgr.fetchSerBlock(lp)
if err != nil {
return nil, err
}
block, err := serBlock.ToBlock2()
if err != nil {
return nil, err
}
return block, nil
}
func (mgr *blockfileMgr) fetchSerBlock(lp *fileLocPointer) (*protos.SerBlock2, error) {
blockBytes, err := mgr.fetchBlockBytes(lp)
if err != nil {
return nil, err
}
return protos.NewSerBlock2(blockBytes), nil
}
func (mgr *blockfileMgr) fetchTransaction(lp *fileLocPointer) (*protos.Transaction2, error) {
txBytes, err := mgr.fetchRawBytes(lp)
if err != nil {
return nil, err
}
tx := &protos.Transaction2{}
err = proto.Unmarshal(txBytes, tx)
if err != nil {
return nil, err
}
return tx, nil
}
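// fetchBlockBytes reads one length-prefixed block starting at the given
// file location pointer.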
func (mgr *blockfileMgr) fetchBlockBytes(lp *fileLocPointer) ([]byte, error) {
filePath := deriveBlockfilePath(mgr.rootDir, lp.fileSuffixNum)
stream, err := newBlockStream(filePath, int64(lp.offset))
if err != nil {
return nil, err
}
defer stream.close()
b, err := stream.nextBlockBytes()
if err != nil {
return nil, err
}
return b, nil
}
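// fetchRawBytes reads an exact byte range (e.g. a single serialized
// transaction) directly from a block file, without length-prefix decoding.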
func (mgr *blockfileMgr) fetchRawBytes(lp *fileLocPointer) ([]byte, error) {
filePath := deriveBlockfilePath(mgr.rootDir, lp.fileSuffixNum)
reader, err := newBlockfileReader(filePath)
if err != nil {
return nil, err
}
defer reader.close()
b, err := reader.read(lp.offset, lp.bytesLength)
if err != nil {
return nil, err
}
return b, nil
}
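// loadCurrentInfo reads the persisted checkpointInfo from the db, returning
// nil (with no error) if nothing has been saved yet.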
func (mgr *blockfileMgr) loadCurrentInfo() (*checkpointInfo, error) {
b, err := mgr.db.Get(mgr.defaultCF, blkMgrInfoKey)
if err != nil {
return nil, err
}
if b == nil {
return nil, nil
}
i := &checkpointInfo{}
if err = i.unmarshal(b); err != nil {
return nil, err
}
return i, nil
}
func (mgr *blockfileMgr) saveCurrentInfo(i *checkpointInfo) error {
b, err := i.marshal()
if err != nil {
return err
}
err = mgr.db.Put(mgr.defaultCF, blkMgrInfoKey, b)
if err != nil {
return err
}
return nil
}
// checkpointInfo records the progress of the block file manager: the current
// block file suffix, the number of bytes written to it, and the last committed
// block number. It is persisted in the db under blkMgrInfoKey.
type checkpointInfo struct {
latestFileChunkSuffixNum int
latestFileChunksize int
lastBlockNumber uint64
}
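// marshal encodes the checkpointInfo fields as a sequence of protobuf
// varints; unmarshal below decodes them in the same order.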
func (i *checkpointInfo) marshal() ([]byte, error) {
buffer := proto.NewBuffer([]byte{})
var err error
if err = buffer.EncodeVarint(uint64(i.latestFileChunkSuffixNum)); err != nil {
return nil, err
}
if err = buffer.EncodeVarint(uint64(i.latestFileChunksize)); err != nil {
return nil, err
}
if err = buffer.EncodeVarint(i.lastBlockNumber); err != nil {
return nil, err
}
return buffer.Bytes(), nil
}
func (i *checkpointInfo) unmarshal(b []byte) error {
buffer := proto.NewBuffer(b)
var val uint64
var err error
if val, err = buffer.DecodeVarint(); err != nil {
return err
}
i.latestFileChunkSuffixNum = int(val)
if val, err = buffer.DecodeVarint(); err != nil {
return err
}
i.latestFileChunksize = int(val)
if val, err = buffer.DecodeVarint(); err != nil {
return err
}
i.lastBlockNumber = val
return nil
}
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fsblkstorage
import (
"fmt"
"testing"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/core/ledgernext/testutil"
"github.com/hyperledger/fabric/protos"
)
func TestBlockfileMgrBlockReadWrite(t *testing.T) {
env := newTestEnv(t)
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(t, env)
defer blkfileMgrWrapper.close()
blocks := testutil.ConstructTestBlocks(t, 10)
blkfileMgrWrapper.addBlocks(blocks)
blkfileMgrWrapper.testGetBlockByHash(blocks)
blkfileMgrWrapper.testGetBlockByNumber(blocks, 1)
}
func TestBlockfileMgrBlockIterator(t *testing.T) {
env := newTestEnv(t)
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(t, env)
defer blkfileMgrWrapper.close()
blocks := testutil.ConstructTestBlocks(t, 10)
blkfileMgrWrapper.addBlocks(blocks)
itr, err := blkfileMgrWrapper.blockfileMgr.retrieveBlocks(1, 8)
testutil.AssertNoError(t, err, "Error while getting blocks iterator")
defer itr.Close()
numBlocksIterated := 0
for ; itr.Next(); numBlocksIterated++ {
block, err := itr.Get()
testutil.AssertNoError(t, err, fmt.Sprintf("Error while getting block number [%d] from iterator", numBlocksIterated))
testutil.AssertEquals(t, block.(*BlockHolder).GetBlock(), blocks[numBlocksIterated])
}
testutil.AssertEquals(t, numBlocksIterated, 8)
}
func TestBlockfileMgrBlockchainInfo(t *testing.T) {
env := newTestEnv(t)
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(t, env)
defer blkfileMgrWrapper.close()
bcInfo := blkfileMgrWrapper.blockfileMgr.getBlockchainInfo()
testutil.AssertEquals(t, bcInfo, &protos.BlockchainInfo{Height: 0, CurrentBlockHash: nil, PreviousBlockHash: nil})
blocks := testutil.ConstructTestBlocks(t, 10)
blkfileMgrWrapper.addBlocks(blocks)
bcInfo = blkfileMgrWrapper.blockfileMgr.getBlockchainInfo()
testutil.AssertEquals(t, bcInfo.Height, uint64(10))
}
func TestBlockfileMgrGetTxById(t *testing.T) {
env := newTestEnv(t)
defer env.Cleanup()
blkfileMgrWrapper := newTestBlockfileWrapper(t, env)
defer blkfileMgrWrapper.close()
blocks := testutil.ConstructTestBlocks(t, 10)
blkfileMgrWrapper.addBlocks(blocks)
for i, blk := range blocks {
for j, txBytes := range blk.Transactions {
// blockNum starts with 1
txID := constructTxID(uint64(i+1), j)
txFromFileMgr, err := blkfileMgrWrapper.blockfileMgr.retrieveTransactionByID(txID)
testutil.AssertNoError(t, err, "Error while retrieving tx from blkfileMgr")
tx := &protos.Transaction2{}
err = proto.Unmarshal(txBytes, tx)
testutil.AssertNoError(t, err, "Error while unmarshalling tx")
testutil.AssertEquals(t, txFromFileMgr, tx)
}
}
}
func TestBlockfileMgrRestart(t *testing.T) {
env := newTestEnv(t)
defer env.Cleanup()