Commit 95eebc1c authored by manish's avatar manish
Browse files

FAB-11755 Implement rollback in pvtdatastore



This CR implements rollback in pvtdata store. In the implementation,
it deletes the existing pvtdata and eligible missingdata that had
been created by the last 'prepare' call immediately preceding the
rollback call. See Jira for more details

FAB-11755 #done

Change-Id: Icc47c79f9c55bb3590ecea82a138acb8165ec02c
Signed-off-by: default avatarmanish <manish.sethi@gmail.com>
parent 73d19177
......@@ -197,3 +197,15 @@ func createRangeScanKeysForCollElg() (startKey, endKey []byte) {
return encodeCollElgKey(math.MaxUint64),
encodeCollElgKey(0)
}
func datakeyRange(blockNum uint64) (startKey, endKey []byte) {
startKey = append(pvtDataKeyPrefix, version.NewHeight(blockNum, 0).ToBytes()...)
endKey = append(pvtDataKeyPrefix, version.NewHeight(blockNum, math.MaxUint64).ToBytes()...)
return
}
func eligibleMissingdatakeyRange(blkNum uint64) (startKey, endKey []byte) {
startKey = append(eligibleMissingDataKeyPrefix, util.EncodeReverseOrderVarUint64(blkNum)...)
endKey = append(eligibleMissingDataKeyPrefix, util.EncodeReverseOrderVarUint64(blkNum-1)...)
return
}
......@@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package pvtdatastorage
import (
"bytes"
"testing"
"github.com/stretchr/testify/assert"
......@@ -17,3 +18,63 @@ func TestDataKeyEncoding(t *testing.T) {
datakey2 := decodeDatakey(encodeDataKey(dataKey1))
assert.Equal(t, dataKey1, datakey2)
}
func TestDatakeyRange(t *testing.T) {
blockNum := uint64(20)
startKey, endKey := datakeyRange(blockNum)
var txNum uint64
for txNum = 0; txNum < 100; txNum++ {
keyOfBlock := encodeDataKey(
&dataKey{
nsCollBlk: nsCollBlk{ns: "ns", coll: "coll", blkNum: blockNum},
txNum: txNum,
},
)
keyOfPreviousBlock := encodeDataKey(
&dataKey{
nsCollBlk: nsCollBlk{ns: "ns", coll: "coll", blkNum: blockNum - 1},
txNum: txNum,
},
)
keyOfNextBlock := encodeDataKey(
&dataKey{
nsCollBlk: nsCollBlk{ns: "ns", coll: "coll", blkNum: blockNum + 1},
txNum: txNum,
},
)
assert.Equal(t, bytes.Compare(keyOfPreviousBlock, startKey), -1)
assert.Equal(t, bytes.Compare(keyOfBlock, startKey), 1)
assert.Equal(t, bytes.Compare(keyOfBlock, endKey), -1)
assert.Equal(t, bytes.Compare(keyOfNextBlock, endKey), 1)
}
}
func TestEligibleMissingdataRange(t *testing.T) {
blockNum := uint64(20)
startKey, endKey := eligibleMissingdatakeyRange(blockNum)
var txNum uint64
for txNum = 0; txNum < 100; txNum++ {
keyOfBlock := encodeMissingDataKey(
&missingDataKey{
nsCollBlk: nsCollBlk{ns: "ns", coll: "coll", blkNum: blockNum},
isEligible: true,
},
)
keyOfPreviousBlock := encodeMissingDataKey(
&missingDataKey{
nsCollBlk: nsCollBlk{ns: "ns", coll: "coll", blkNum: blockNum - 1},
isEligible: true,
},
)
keyOfNextBlock := encodeMissingDataKey(
&missingDataKey{
nsCollBlk: nsCollBlk{ns: "ns", coll: "coll", blkNum: blockNum + 1},
isEligible: true,
},
)
assert.Equal(t, bytes.Compare(keyOfNextBlock, startKey), -1)
assert.Equal(t, bytes.Compare(keyOfBlock, startKey), 1)
assert.Equal(t, bytes.Compare(keyOfBlock, endKey), -1)
assert.Equal(t, bytes.Compare(keyOfPreviousBlock, endKey), 1)
}
}
......@@ -236,12 +236,33 @@ func (s *store) Commit() error {
}
// Rollback implements the function in the interface `Store`
// Not deleting the existing data entries and expiry entries for now
// Because the next try would have exact same entries and will overwrite those
// This deletes the existing data entries and eligible missing data entries.
// However, this does not delete ineligible missing data entires as the next try
// would have exact same entries and will overwrite those. This also leaves the
// existing expiry entires as is because, most likely they will also get overwritten
// per new data entries. Even if some of the expiry entries does not get overwritten,
// (beacuse of some data may be missing next time), the additional expiry entries are just
// a Noop
func (s *store) Rollback() error {
if !s.batchPending {
return &ErrIllegalCall{"No pending batch to rollback"}
}
blkNum := s.nextBlockNum()
batch := leveldbhelper.NewUpdateBatch()
itr := s.db.GetIterator(datakeyRange(blkNum))
for itr.Next() {
batch.Delete(itr.Key())
}
itr.Release()
itr = s.db.GetIterator(eligibleMissingdatakeyRange(blkNum))
for itr.Next() {
batch.Delete(itr.Key())
}
itr.Release()
batch.Delete(pendingCommitKey)
if err := s.db.WriteBatch(batch, true); err != nil {
return err
}
s.batchPending = false
return nil
}
......
......@@ -773,7 +773,83 @@ func testCollElgEnabled(t *testing.T) {
assert.Equal(expectedMissingPvtDataInfo, missingPvtDataInfo)
}
// TODO Add tests for simulating a crash between calls `Prepare` and `Commit`/`Rollback`
func TestRollBack(t *testing.T) {
btlPolicy := btltestutil.SampleBTLPolicy(
map[[2]string]uint64{
{"ns-1", "coll-1"}: 0,
{"ns-1", "coll-2"}: 0,
},
)
env := NewTestStoreEnv(t, "TestRollBack", btlPolicy)
defer env.Cleanup()
assert := assert.New(t)
store := env.TestStore
assert.NoError(store.Prepare(0, nil, nil))
assert.NoError(store.Commit())
pvtdata := []*ledger.TxPvtData{
produceSamplePvtdata(t, 0, []string{"ns-1:coll-1", "ns-1:coll-2"}),
produceSamplePvtdata(t, 5, []string{"ns-1:coll-1", "ns-1:coll-2"}),
}
missingData := make(ledger.TxMissingPvtDataMap)
missingData.Add(1, "ns-1", "coll-1", true)
missingData.Add(5, "ns-1", "coll-1", true)
missingData.Add(5, "ns-2", "coll-2", false)
for i := 1; i <= 9; i++ {
assert.NoError(store.Prepare(uint64(i), pvtdata, missingData))
assert.NoError(store.Commit())
}
datakeyTx0 := &dataKey{
nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-1"},
txNum: 0,
}
datakeyTx5 := &dataKey{
nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-1"},
txNum: 5,
}
eligibleMissingdatakey := &missingDataKey{
nsCollBlk: nsCollBlk{ns: "ns-1", coll: "coll-1"},
isEligible: true,
}
// test store state before preparing for block 10
testPendingBatch(false, assert, store)
testLastCommittedBlockHeight(10, assert, store)
// prepare for block 10 and test store for presence of datakeys and eligibile missingdatakeys
assert.NoError(store.Prepare(10, pvtdata, missingData))
testPendingBatch(true, assert, store)
testLastCommittedBlockHeight(10, assert, store)
datakeyTx0.blkNum = 10
datakeyTx5.blkNum = 10
eligibleMissingdatakey.blkNum = 10
assert.True(testDataKeyExists(t, store, datakeyTx0))
assert.True(testDataKeyExists(t, store, datakeyTx5))
assert.True(testMissingDataKeyExists(t, store, eligibleMissingdatakey))
// rollback last prepared block and test store for absence of datakeys and eligibile missingdatakeys
store.Rollback()
testPendingBatch(false, assert, store)
testLastCommittedBlockHeight(10, assert, store)
assert.False(testDataKeyExists(t, store, datakeyTx0))
assert.False(testDataKeyExists(t, store, datakeyTx5))
assert.False(testMissingDataKeyExists(t, store, eligibleMissingdatakey))
// For previously committed blocks the datakeys and eligibile missingdatakeys should still be present
for i := 1; i <= 9; i++ {
datakeyTx0.blkNum = uint64(i)
datakeyTx5.blkNum = uint64(i)
eligibleMissingdatakey.blkNum = uint64(i)
assert.True(testDataKeyExists(t, store, datakeyTx0))
assert.True(testDataKeyExists(t, store, datakeyTx5))
assert.True(testMissingDataKeyExists(t, store, eligibleMissingdatakey))
}
}
// TODO Add tests for simulating a crash between calls `Prepare` and `Commit`/`Rollback` - [FAB-13099]
func testEmpty(expectedEmpty bool, assert *assert.Assertions, store Store) {
isEmpty, err := store.IsEmpty()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment