Commit 0cc98742 authored by manish's avatar manish Committed by Gari Singh
Browse files

[FAB-9681] Ledger: Metadata - couch-statedb support



    This CR includes
     - Introducing a proto message for encapsulating the version
       and metadata for persisting both in the ~version field
     - Enhancement to the encoding/decoding so as to handle both
       the old and new format of encoding of the ~version field

Change-Id: Ica55f7af1206deeee4d93ae89d9a9045b30a8d26
Signed-off-by: default avatarmanish <manish.sethi@gmail.com>
Signed-off-by: default avatarMatthias Neugschwandtner <eug@zurich.ibm.com>
parent 843d9265
......@@ -730,3 +730,33 @@ func TestBatchWithIndividualRetry(t *testing.T, dbProvider statedb.VersionedDBPr
testutil.AssertNoError(t, err, "")
}
// TestValueAndMetadataWrites tests statedb for value and metadata read-writes
func TestValueAndMetadataWrites(t *testing.T, dbProvider statedb.VersionedDBProvider) {
db, err := dbProvider.GetDBHandle("testvalueandmetadata")
testutil.AssertNoError(t, err, "")
batch := statedb.NewUpdateBatch()
vv1 := statedb.VersionedValue{Value: []byte("value1"), Metadata: []byte("metadata1"), Version: version.NewHeight(1, 1)}
vv2 := statedb.VersionedValue{Value: []byte("value2"), Metadata: []byte("metadata2"), Version: version.NewHeight(1, 2)}
vv3 := statedb.VersionedValue{Value: []byte("value3"), Version: version.NewHeight(1, 3)}
vv4 := statedb.VersionedValue{Value: []byte{}, Metadata: []byte("metadata4"), Version: version.NewHeight(1, 4)}
batch.PutValAndMetadata("ns1", "key1", vv1.Value, vv1.Metadata, vv1.Version)
batch.PutValAndMetadata("ns1", "key2", vv2.Value, vv2.Metadata, vv2.Version)
batch.PutValAndMetadata("ns2", "key3", vv3.Value, vv3.Metadata, vv3.Version)
batch.PutValAndMetadata("ns2", "key4", vv4.Value, vv4.Metadata, vv4.Version)
db.ApplyUpdates(batch, version.NewHeight(2, 5))
vv, _ := db.GetState("ns1", "key1")
testutil.AssertEquals(t, vv, &vv1)
vv, _ = db.GetState("ns1", "key2")
testutil.AssertEquals(t, vv, &vv2)
vv, _ = db.GetState("ns2", "key3")
testutil.AssertEquals(t, vv, &vv3)
vv, _ = db.GetState("ns2", "key4")
testutil.AssertEquals(t, vv, &vv4)
}
......@@ -8,8 +8,6 @@ package statecouchdb
import (
"bytes"
"encoding/json"
"fmt"
"strconv"
"strings"
"unicode/utf8"
......@@ -83,7 +81,11 @@ func couchDocToKeyValue(doc *couchdb.CouchDoc) (*keyValue, error) {
}
key := jsonResult[idField].(string)
// create the return version from the version field in the JSON
returnVersion := createVersionHeightFromVersionString(jsonResult[versionField].(string))
returnVersion, returnMetadata, err := decodeVersionAndMetadata(jsonResult[versionField].(string))
if err != nil {
return nil, err
}
// remove the _id, _rev and version fields
delete(jsonResult, idField)
delete(jsonResult, revField)
......@@ -103,7 +105,11 @@ func couchDocToKeyValue(doc *couchdb.CouchDoc) (*keyValue, error) {
return nil, err
}
}
return &keyValue{key, &statedb.VersionedValue{Value: returnValue, Version: returnVersion}}, nil
return &keyValue{key, &statedb.VersionedValue{
Value: returnValue,
Metadata: returnMetadata,
Version: returnVersion},
}, nil
}
func keyValToCouchDoc(kv *keyValue, revision string) (*couchdb.CouchDoc, error) {
......@@ -113,7 +119,7 @@ func keyValToCouchDoc(kv *keyValue, revision string) (*couchdb.CouchDoc, error)
kvTypeJSON
kvTypeAttachment
)
key, value, version := kv.key, kv.VersionedValue.Value, kv.VersionedValue.Version
key, value, metadata, version := kv.key, kv.Value, kv.Metadata, kv.Version
jsonMap := make(jsonValue)
var kvtype kvType
......@@ -135,8 +141,12 @@ func keyValToCouchDoc(kv *keyValue, revision string) (*couchdb.CouchDoc, error)
kvtype = kvTypeAttachment
}
// add the version, id, revision, and delete marker (if needed)
jsonMap[versionField] = fmt.Sprintf("%v:%v", version.BlockNum, version.TxNum)
verAndMetadata, err := encodeVersionAndMetadata(version, metadata)
if err != nil {
return nil, err
}
// add the (version + metadata), id, revision, and delete marker (if needed)
jsonMap[versionField] = verAndMetadata
jsonMap[idField] = key
if revision != "" {
jsonMap[revField] = revision
......@@ -191,15 +201,6 @@ func decodeSavepoint(couchDoc *couchdb.CouchDoc) (*version.Height, error) {
return &version.Height{BlockNum: savepointDoc.BlockNum, TxNum: savepointDoc.TxNum}, nil
}
func createVersionHeightFromVersionString(encodedVersion string) *version.Height {
versionArray := strings.Split(fmt.Sprintf("%s", encodedVersion), ":")
// convert the blockNum from String to unsigned int
blockNum, _ := strconv.ParseUint(versionArray[0], 10, 64)
// convert the txNum from String to unsigned int
txNum, _ := strconv.ParseUint(versionArray[1], 10, 64)
return version.NewHeight(blockNum, txNum)
}
func validateValue(value []byte) error {
isJSON, jsonVal := tryCastingToJSON(value)
if !isJSON {
......
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: msgs.proto
package msgs // import "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/statecouchdb/msgs"
import proto "github.com/golang/protobuf/proto"
import fmt "fmt"
import math "math"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type VersionFieldProto struct {
VersionBytes []byte `protobuf:"bytes,1,opt,name=version_bytes,json=versionBytes,proto3" json:"version_bytes,omitempty"`
Metadata []byte `protobuf:"bytes,2,opt,name=metadata,proto3" json:"metadata,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *VersionFieldProto) Reset() { *m = VersionFieldProto{} }
func (m *VersionFieldProto) String() string { return proto.CompactTextString(m) }
func (*VersionFieldProto) ProtoMessage() {}
func (*VersionFieldProto) Descriptor() ([]byte, []int) {
return fileDescriptor_msgs_41c2b9a37861a33d, []int{0}
}
func (m *VersionFieldProto) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_VersionFieldProto.Unmarshal(m, b)
}
func (m *VersionFieldProto) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_VersionFieldProto.Marshal(b, m, deterministic)
}
func (dst *VersionFieldProto) XXX_Merge(src proto.Message) {
xxx_messageInfo_VersionFieldProto.Merge(dst, src)
}
func (m *VersionFieldProto) XXX_Size() int {
return xxx_messageInfo_VersionFieldProto.Size(m)
}
func (m *VersionFieldProto) XXX_DiscardUnknown() {
xxx_messageInfo_VersionFieldProto.DiscardUnknown(m)
}
var xxx_messageInfo_VersionFieldProto proto.InternalMessageInfo
func (m *VersionFieldProto) GetVersionBytes() []byte {
if m != nil {
return m.VersionBytes
}
return nil
}
func (m *VersionFieldProto) GetMetadata() []byte {
if m != nil {
return m.Metadata
}
return nil
}
func init() {
proto.RegisterType((*VersionFieldProto)(nil), "msgs.VersionFieldProto")
}
func init() { proto.RegisterFile("msgs.proto", fileDescriptor_msgs_41c2b9a37861a33d) }
var fileDescriptor_msgs_41c2b9a37861a33d = []byte{
// 177 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xca, 0x2d, 0x4e, 0x2f,
0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x01, 0xb1, 0x95, 0x42, 0xb8, 0x04, 0xc3, 0x52,
0x8b, 0x8a, 0x33, 0xf3, 0xf3, 0xdc, 0x32, 0x53, 0x73, 0x52, 0x02, 0xc0, 0x52, 0xca, 0x5c, 0xbc,
0x65, 0x10, 0xc1, 0xf8, 0xa4, 0xca, 0x92, 0xd4, 0x62, 0x09, 0x46, 0x05, 0x46, 0x0d, 0x9e, 0x20,
0x1e, 0xa8, 0xa0, 0x13, 0x48, 0x4c, 0x48, 0x8a, 0x8b, 0x23, 0x37, 0xb5, 0x24, 0x31, 0x25, 0xb1,
0x24, 0x51, 0x82, 0x09, 0x2c, 0x0f, 0xe7, 0x3b, 0x85, 0x46, 0x05, 0xa7, 0x67, 0x96, 0x64, 0x94,
0x26, 0xe9, 0x25, 0xe7, 0xe7, 0xea, 0x67, 0x54, 0x16, 0xa4, 0x16, 0xe5, 0xa4, 0xa6, 0xa4, 0xa7,
0x16, 0xe9, 0xa7, 0x25, 0x26, 0x15, 0x65, 0x26, 0xeb, 0x27, 0xe7, 0x17, 0xa5, 0xea, 0x43, 0x85,
0xb2, 0xcb, 0xa0, 0x8c, 0x92, 0x8a, 0xdc, 0xf4, 0xdc, 0x12, 0xfd, 0xe2, 0x92, 0xc4, 0x92, 0xd4,
0x94, 0x24, 0x08, 0x9d, 0x9c, 0x5f, 0x9a, 0x9c, 0x91, 0x92, 0xa4, 0x0f, 0x72, 0x6c, 0x12, 0x1b,
0xd8, 0xe5, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x19, 0xd6, 0xc4, 0x1f, 0xc7, 0x00, 0x00,
0x00,
}
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
syntax = "proto3";
option go_package = "github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/statecouchdb/msgs";
package msgs;
message VersionFieldProto {
bytes version_bytes = 1;
bytes metadata = 2;
}
\ No newline at end of file
......@@ -165,7 +165,11 @@ func (vdb *VersionedDB) LoadCommittedVersions(keys []*statedb.CompositeKey) erro
for _, keyMetadata := range nsMetadata {
// TODO - why would version be ever zero if loaded from db?
if len(keyMetadata.Version) != 0 {
committedDataCache.setVerAndRev(ns, keyMetadata.ID, createVersionHeightFromVersionString(keyMetadata.Version), keyMetadata.Rev)
version, _, err := decodeVersionAndMetadata(keyMetadata.Version)
if err != nil {
return err
}
committedDataCache.setVerAndRev(ns, keyMetadata.ID, version, keyMetadata.Rev)
}
}
}
......
......@@ -178,6 +178,17 @@ func TestBatchRetry(t *testing.T) {
commontests.TestBatchWithIndividualRetry(t, env.DBProvider)
}
func TestValueAndMetadataWrites(t *testing.T) {
env := NewTestVDBEnv(t)
env.Cleanup("testvalueandmetadata_")
env.Cleanup("testvalueandmetadata_ns1")
env.Cleanup("testvalueandmetadata_ns2")
defer env.Cleanup("testvalueandmetadata_")
defer env.Cleanup("testvalueandmetadata_ns1")
defer env.Cleanup("testvalueandmetadata_ns2")
commontests.TestValueAndMetadataWrites(t, env.DBProvider)
}
// TestUtilityFunctions tests utility functions
func TestUtilityFunctions(t *testing.T) {
......
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package statecouchdb
import (
"encoding/base64"
"fmt"
"strconv"
"strings"
proto "github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb/statecouchdb/msgs"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
)
func encodeVersionAndMetadata(version *version.Height, metadata []byte) (string, error) {
msg := &msgs.VersionFieldProto{
VersionBytes: version.ToBytes(),
Metadata: metadata,
}
msgBytes, err := proto.Marshal(msg)
if err != nil {
return "", err
}
msgBase64 := base64.StdEncoding.EncodeToString(msgBytes)
encodedVersionField := append([]byte{byte(0)}, []byte(msgBase64)...)
return string(encodedVersionField), nil
}
func decodeVersionAndMetadata(encodedstr string) (*version.Height, []byte, error) {
if oldFormatEncoding(encodedstr) {
return decodeVersionOldFormat(encodedstr), nil, nil
}
versionFieldBytes, err := base64.StdEncoding.DecodeString(encodedstr[1:])
if err != nil {
return nil, nil, err
}
versionFieldMsg := &msgs.VersionFieldProto{}
if err = proto.Unmarshal(versionFieldBytes, versionFieldMsg); err != nil {
return nil, nil, err
}
ver, _ := version.NewHeightFromBytes(versionFieldMsg.VersionBytes)
return ver, versionFieldMsg.Metadata, nil
}
// encodeVersionOldFormat return string representation of version
// With the intorduction of metadata feature, we change the encoding (see function below). However, we retain
// this funtion for test so as to make sure that we can decode old format and support mixed formats present
// in a statedb. This function should be used only in tests to generate the encoding in old format
func encodeVersionOldFormat(version *version.Height) string {
return fmt.Sprintf("%v:%v", version.BlockNum, version.TxNum)
}
// decodeVersionOldFormat separates the version and value from encoded string
// See comments in the function `encodeVersionOldFormat`. We retain this function as is
// to use this for decoding the old format data present in the statedb. This function
// should not be used directly or in a tests. The function 'decodeVersionAndMetadata' should be used
// for all decodings - which is expected to detect the encoded format and direct the call
// to this function for decoding the versions encoded in the old format
func decodeVersionOldFormat(encodedVersion string) *version.Height {
versionArray := strings.Split(fmt.Sprintf("%s", encodedVersion), ":")
// convert the blockNum from String to unsigned int
blockNum, _ := strconv.ParseUint(versionArray[0], 10, 64)
// convert the txNum from String to unsigned int
txNum, _ := strconv.ParseUint(versionArray[1], 10, 64)
return version.NewHeight(blockNum, txNum)
}
func oldFormatEncoding(encodedstr string) bool {
return []byte(encodedstr)[0] != byte(0)
}
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package statecouchdb
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/version"
)
func TestEncodeDecodeOldAndNewFormat(t *testing.T) {
testdata := []*statedb.VersionedValue{
{
Version: version.NewHeight(1, 2),
},
{
Version: version.NewHeight(50, 50),
},
{
Version: version.NewHeight(50, 50),
Metadata: []byte("sample-metadata"),
},
}
for i, testdatum := range testdata {
t.Run(fmt.Sprintf("testcase-newfmt-%d", i),
func(t *testing.T) { testEncodeDecodeNewFormat(t, testdatum) },
)
}
for i, testdatum := range testdata {
t.Run(fmt.Sprintf("testcase-oldfmt-%d", i),
func(t *testing.T) { testEncodeDecodeOldFormat(t, testdatum) },
)
}
}
func testEncodeDecodeNewFormat(t *testing.T, v *statedb.VersionedValue) {
encodedVerField, err := encodeVersionAndMetadata(v.Version, v.Metadata)
assert.NoError(t, err)
ver, metadata, err := decodeVersionAndMetadata(encodedVerField)
assert.NoError(t, err)
assert.Equal(t, v.Version, ver)
assert.Equal(t, v.Metadata, metadata)
}
func testEncodeDecodeOldFormat(t *testing.T, v *statedb.VersionedValue) {
encodedVerField := encodeVersionOldFormat(v.Version)
// function 'decodeVersionAndMetadata' should be able to handle the old format
ver, metadata, err := decodeVersionAndMetadata(encodedVerField)
assert.NoError(t, err)
assert.Equal(t, v.Version, ver)
assert.Nil(t, metadata)
}
......@@ -118,3 +118,9 @@ func TestUtilityFunctions(t *testing.T) {
// ValidateKeyValue should return nil for a valid key and value
testutil.AssertNoError(t, db.ValidateKeyValue("testKey", []byte("testValue")), "leveldb should accept all key-values")
}
func TestValueAndMetadataWrites(t *testing.T) {
env := NewTestVDBEnv(t)
defer env.Cleanup()
commontests.TestValueAndMetadataWrites(t, env.DBProvider)
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment