Commit aaddc8e8 authored by Claudiu Mihali's avatar Claudiu Mihali
Browse files

Added some optimizations to the client library

parent 974632c8
......@@ -168,8 +168,14 @@ func prepareRequest(o *Operation) ([]byte, error) {
lb /= 8
header := []byte{0xff, 0xff, 0x00, o.OpCode, byte(lb & 0xff), byte((lb >> 8) & 0xff), 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
header := make([]byte, 0, 16)
header = append(header, []byte{0xff, 0xff, 0x00, o.OpCode, byte(lb & 0xff), byte((lb >> 8) & 0xff), 0x00, 0x00}...)
if o.Checkpoint == nil {
header = append(header, []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}...)
} else {
header = append(header, []byte{0xBB, 0xBB, o.Checkpoint.TokenBucketIdx, o.Checkpoint.TokensEachTick,
o.Checkpoint.ClkCyclesBeforeTick, o.Checkpoint.MaxBurstSize[0], o.Checkpoint.MaxBurstSize[1], 0x00, 0x00}...)
}
copy(b, header)
......
......@@ -2,6 +2,7 @@ package internal
import (
"bufio"
"fmt"
"io"
)
......@@ -32,10 +33,18 @@ const (
// Result is set, depending on the kind of operation, in the
// response handling.
type Operation struct {
OpCode byte
Key []byte
Value []byte
Result []byte
OpCode byte
Key []byte
Value []byte
Result []byte
Checkpoint *CheckpointConfig
}
type CheckpointConfig struct {
TokenBucketIdx byte
TokensEachTick byte
ClkCyclesBeforeTick byte
MaxBurstSize [2]byte
}
// ResponseHandler handles the response of a K/V operation.
......@@ -114,6 +123,7 @@ type ValueResHandler struct {
}
// var errs = 0
var responseNo = 0
func (ValueResHandler) handleResponse(o *Operation, rd *bufio.Reader) error {
......@@ -180,6 +190,10 @@ func (ValueResHandler) handleResponse(o *Operation, rd *bufio.Reader) error {
}
o.Result = rpl
responseNo++
fmt.Printf("%d ", responseNo)
// } else if o.OpCode == OpGetCond {
// plb := make([]byte, 4)
......@@ -219,30 +233,34 @@ func (ValueResHandler) handleResponse(o *Operation, rd *bufio.Reader) error {
// NewGetOp initializes an operation type that performs a Get.
func NewGetOp(key []byte) *Operation {
return &Operation{OpGet, key, nil, nil}
return &Operation{OpGet, key, nil, nil, nil}
}
// NewSetOp initializes an operation type that performs a local Set.
func NewSetOp(key, value []byte) *Operation {
return &Operation{OpSetLoc, key, value, nil}
return &Operation{OpSetLoc, key, value, nil, nil}
}
// NewSetReplOp initializes an operation that performs a replicated Set.
func NewSetReplOp(key, value []byte) *Operation {
return &Operation{OpSetRep, key, value, nil}
return &Operation{OpSetRep, key, value, nil, nil}
}
// NewInitOp initializes an operation that performs a Flush.
func NewInitOp() *Operation {
return &Operation{OpFlush, []byte(initData), nil, nil}
return &Operation{OpFlush, []byte(initData), nil, nil, nil}
}
// NewDelOp initializes an operation that performs a Delete
func NewDelOp(key []byte) *Operation {
return &Operation{OpDelete, key, nil, nil}
return &Operation{OpDelete, key, nil, nil, nil}
}
// NewGetCondOp initializes an operation that performs a Conditional Get.
func NewGetCondOp(key, value []byte) *Operation {
return &Operation{OpGetCond, key, value, nil}
return &Operation{OpGetCond, key, value, nil, nil}
}
func NewGetWithCheckpoint(key []byte, c *CheckpointConfig) *Operation {
return &Operation{OpGet, key, nil, nil, c}
}
......@@ -108,6 +108,35 @@ func (c Client) Get(key []byte) ([]byte, error) {
return res[:curLen], nil
}
func (c Client) GetWithCheckpoint(key []byte, tokenBucketIdx int, tokensEachTick int, clkCyclesBeforeTick int, maxBurstSize int) ([]byte, error) {
if len(key) > maxKLen {
return nil, newKeyError(maxKLen)
}
initKey := make([]byte, len(key)+idxLen)
copy(initKey[idxLen:], key)
gOp := internal.NewGetWithCheckpoint(initKey, &internal.CheckpointConfig{TokenBucketIdx: byte(tokenBucketIdx),
TokensEachTick: byte(tokensEachTick), ClkCyclesBeforeTick: byte(clkCyclesBeforeTick),
MaxBurstSize: [2]byte{byte(maxBurstSize & 0xFF), byte((maxBurstSize >> 8) & 0xFF)}})
rh := internal.ValueResHandler{}
err := c.conn.Send(gOp, &rh)
if err != nil {
return nil, err
}
pRes := gOp.Result
if pRes == nil {
return nil, nil
}
return pRes, nil
}
func (c Client) GetPerturbed(key [perturbedGroupSize][]byte) ([][]byte, error) {
rqs := make([]*internal.Operation, perturbedGroupSize)
......@@ -142,6 +171,94 @@ func (c Client) GetPerturbed(key [perturbedGroupSize][]byte) ([][]byte, error) {
return results, nil
}
func (c Client) GetBulkN(keys [][]byte, getCondNo int, getNo int, n int) ([][]byte, error) {
if getCondNo%perturbedGroupSize != 0 {
return nil, fmt.Errorf("Error GetBulkN: getCondNo should be multiple of perturbedGroupSize.")
}
if n%perturbedGroupSize != 0 {
return nil, fmt.Errorf("Error GetBulkN: n should be multiple of perturbedGroupSize.")
}
results := make([][]byte, getCondNo+getNo)
rqs := make([]*internal.Operation, n)
rh := internal.ValueResHandler{}
i := 0
for ; i < getCondNo+getNo; i++ {
initKey := make([]byte, len(keys[i])+idxLen)
copy(initKey[idxLen:], keys[i])
if i < getCondNo {
value := []byte{0xFF}
rqs[i%n] = internal.NewGetCondOp(initKey, value)
} else {
rqs[i%n] = internal.NewGetOp(initKey)
}
if (i+1)%n == 0 {
err := c.conn.SendBulk(rqs, &rh)
if err != nil {
return nil, err
}
for j, r := range rqs {
if r.Result == nil {
return nil, fmt.Errorf("Corrupted packet of key %x", r.Key)
}
results[i-n+1+j] = r.Result
}
}
}
if i%n != 0 {
err := c.conn.SendBulk(rqs[:(i%n)], &rh)
if err != nil {
return nil, err
}
for j := 0; j < i%n; j++ {
if rqs[j].Result == nil {
return nil, fmt.Errorf("Corrupted packet of key %x", rqs[j].Key)
}
results[i-i%n+j] = rqs[j].Result
}
}
return results, nil
}
func (c Client) GetBulk(keys [][]byte, getCondNo int, getNo int) ([][]byte, error) {
if getCondNo%perturbedGroupSize != 0 {
return nil, fmt.Errorf("Error GetBulk: getCondNo should be multiple of perturbedGroupSize.")
}
rqs := make([]*internal.Operation, getCondNo+getNo)
for i := 0; i < getCondNo+getNo; i++ {
initKey := make([]byte, len(keys[i])+idxLen)
copy(initKey[idxLen:], keys[i])
if i < getCondNo {
value := []byte{0xFF}
rqs[i] = internal.NewGetCondOp(initKey, value)
} else {
rqs[i] = internal.NewGetOp(initKey)
}
}
rh := internal.ValueResHandler{}
err := c.conn.SendBulk(rqs, &rh)
if err != nil {
return nil, err
}
results := make([][]byte, getCondNo+getNo)
for i, r := range rqs {
if r.Result == nil {
return nil, fmt.Errorf("Corrupted packet of key %x", r.Key)
}
results[i] = r.Result
}
return results, nil
}
func (c Client) GetRotationMatrix(key []byte) error {
if len(key) > maxKLen {
return newKeyError(maxKLen)
......
......@@ -8,7 +8,6 @@ import (
"os"
"strconv"
//"fmt"
"io"
"math"
"reflect"
......@@ -232,7 +231,7 @@ func (processor *Processor) GetMetaData(key []byte) (*pq.FileMetaData, []int, er
return parquetFileMetaData, processor.columnChunksValuesNo, nil
}
func (processor *Processor) GetPerturbedRows(key []byte, columnPermutation []int) ([][]float64, error) {
func (processor *Processor) GetPerturbedRows(key []byte, columnPermutation []int, n int) ([][]float64, error) {
var err error
if processor.parquetFileMetaData == nil {
......@@ -242,6 +241,19 @@ func (processor *Processor) GetPerturbedRows(key []byte, columnPermutation []int
}
}
var dirVal []byte
if len(processor.arrayDir) == 0 {
dirVal, err = processor.Client.GetArrayDir(key)
if dirVal == nil {
return nil, nil
}
if err != nil {
return nil, err
}
} else {
dirVal = processor.arrayDir
}
var enabledColumnsIndices []int
var disabledColumnsIndices []int
for i := 0; i < processor.ColumnsNo; i++ {
......@@ -254,24 +266,14 @@ func (processor *Processor) GetPerturbedRows(key []byte, columnPermutation []int
//columnPermutation := rotpert.RandomPermutation(len(enabledColumnsIndices))
keys := make([][]byte, 0, processor.columnChunksValuesNo[processor.ColumnsNo])
getCondNo := 0
getNo := 0
// fmt.Printf("Column permutation: %v\n", columnPermutation)
// fmt.Printf("Column chunks values no: %v\n", processor.columnChunksValuesNo)
// fmt.Printf("valueNoOffsets: %v\n", processor.valueNoOffsets)
var dirVal []byte
if len(processor.arrayDir) == 0 {
dirVal, err = processor.Client.GetArrayDir(key)
if dirVal == nil {
return nil, nil
}
if err != nil {
return nil, err
}
} else {
dirVal = processor.arrayDir
}
outCols := make([][]float64, processor.ColumnsNo)
// fmt.Printf("getCondNo = %d; getNo = %d\n\n", getCondNo, getNo)
for i := 0; i < len(columnPermutation); i += 3 {
for {
......@@ -288,41 +290,90 @@ func (processor *Processor) GetPerturbedRows(key []byte, columnPermutation []int
break
}
// start1 := time.Now()
var keys [3][]byte
for j := 0; j < 3; j++ {
keys[j], err = processor.Client.ArrayGetElemKey(dirVal, key, processor.columnChunksValuesNo[enabledColumnsIndices[columnPermutation[i+j]]]+
processor.valueNoOffsets[enabledColumnsIndices[columnPermutation[i+j]]])
k, err := processor.Client.ArrayGetElemKey(dirVal, key,
processor.columnChunksValuesNo[enabledColumnsIndices[columnPermutation[i+j]]]+
processor.valueNoOffsets[enabledColumnsIndices[columnPermutation[i+j]]])
if err != nil {
return nil, err
}
keys = append(keys, k)
getCondNo++
processor.valueNoOffsets[enabledColumnsIndices[columnPermutation[i+j]]]++
}
// t1 := float64(time.Since(start1).Nanoseconds()) / 1e3
// fmt.Printf("I1 = %f\n", t1)
// start2 := time.Now()
pages, err := processor.Client.GetPerturbed(keys)
// fmt.Printf("Column permutation: %v\n", columnPermutation)
// fmt.Printf("Column chunks values no: %v\n", processor.columnChunksValuesNo)
// fmt.Printf("valueNoOffsets: %v\n", processor.valueNoOffsets)
// fmt.Printf("getCondNo = %d; getNo = %d\n\n", getCondNo, getNo)
}
for j := 0; j < 3; j++ {
processor.valueNoOffsets[enabledColumnsIndices[columnPermutation[i+j]]] = 0
}
}
for i := 0; i < len(disabledColumnsIndices); i++ {
for processor.valueNoOffsets[disabledColumnsIndices[i]] < processor.columnChunksValuesNo[disabledColumnsIndices[i]+1]-
processor.columnChunksValuesNo[disabledColumnsIndices[i]] {
k, err := processor.Client.ArrayGetElemKey(dirVal, key, processor.columnChunksValuesNo[disabledColumnsIndices[i]]+
processor.valueNoOffsets[disabledColumnsIndices[i]])
if err != nil {
return nil, err
}
// t2 := float64(time.Since(start2).Nanoseconds()) / 1e3
// fmt.Printf("I2 = %f\n", t2)
keys = append(keys, k)
getNo++
processor.valueNoOffsets[disabledColumnsIndices[i]]++
// fmt.Printf("Column permutation: %v\n", columnPermutation)
// fmt.Printf("Column chunks values no: %v\n", processor.columnChunksValuesNo)
// fmt.Printf("valueNoOffsets: %v\n", processor.valueNoOffsets)
// fmt.Printf("getCondNo = %d; getNo = %d\n\n", getCondNo, getNo)
}
processor.valueNoOffsets[disabledColumnsIndices[i]] = 0
}
fmt.Printf("Column permutation: %v\n", columnPermutation)
fmt.Printf("Column chunks values no: %v\n", processor.columnChunksValuesNo)
fmt.Printf("valueNoOffsets: %v\n", processor.valueNoOffsets)
fmt.Printf("getCondNo = %d; getNo = %d\n\n", getCondNo, getNo)
pages, err := processor.Client.GetBulkN(keys, getCondNo, getNo, n)
//pages, err := processor.Client.GetBulk(keys, getCondNo, getNo)
if err != nil {
return nil, err
}
outCols := make([][]float64, processor.ColumnsNo)
pagesIdx := 0
for i := 0; i < len(columnPermutation); i += 3 {
for {
done := false
for j := 0; j < 3; j++ {
if processor.valueNoOffsets[enabledColumnsIndices[columnPermutation[i+j]]] >=
processor.columnChunksValuesNo[enabledColumnsIndices[columnPermutation[i+j]]+1]-
processor.columnChunksValuesNo[enabledColumnsIndices[columnPermutation[i+j]]] {
done = true
break
}
}
if done {
break
}
// start3 := time.Now()
for j := 0; j < 3; j++ {
if i+j < len(enabledColumnsIndices) {
for k := 0; k < len(pages[j]); k += 8 {
bits := binary.LittleEndian.Uint64(pages[j][k : k+8])
for k := 0; k < len(pages[pagesIdx]); k += 8 {
bits := binary.LittleEndian.Uint64(pages[pagesIdx][k : k+8])
n := math.Float64frombits(bits)
outCols[enabledColumnsIndices[columnPermutation[i+j]]] = append(outCols[enabledColumnsIndices[columnPermutation[i+j]]], n)
}
}
pagesIdx++
processor.valueNoOffsets[enabledColumnsIndices[columnPermutation[i+j]]]++
}
// t3 := float64(time.Since(start3).Nanoseconds()) / 1e3
// fmt.Printf("I3 = %f\n", t3)
}
for j := 0; j < 3; j++ {
processor.valueNoOffsets[enabledColumnsIndices[columnPermutation[i+j]]] = 0
}
......@@ -332,40 +383,24 @@ func (processor *Processor) GetPerturbedRows(key []byte, columnPermutation []int
for processor.valueNoOffsets[disabledColumnsIndices[i]] < processor.columnChunksValuesNo[disabledColumnsIndices[i]+1]-
processor.columnChunksValuesNo[disabledColumnsIndices[i]] {
key0, err := processor.Client.ArrayGetElemKey(dirVal, key, processor.columnChunksValuesNo[disabledColumnsIndices[i]]+
processor.valueNoOffsets[disabledColumnsIndices[i]])
if err != nil {
return nil, err
}
page, err := processor.Client.Get(key0)
if err != nil {
return nil, err
}
var k int
if page[41] == 0x00 && page[42] == 0x03 {
if pages[pagesIdx][41] == 0x00 && pages[pagesIdx][42] == 0x03 {
k = 49
} else if page[41] == 0x02 && page[42] == 0x00 {
} else if pages[pagesIdx][41] == 0x02 && pages[pagesIdx][42] == 0x00 {
k = 47
}
for ; k < len(page); k += 8 {
bits := binary.LittleEndian.Uint64(page[k : k+8])
for ; k < len(pages[pagesIdx]); k += 8 {
bits := binary.LittleEndian.Uint64(pages[pagesIdx][k : k+8])
n := math.Float64frombits(bits)
outCols[disabledColumnsIndices[i]] = append(outCols[disabledColumnsIndices[i]], n)
}
pagesIdx++
processor.valueNoOffsets[disabledColumnsIndices[i]]++
}
}
processor.valueNoOffsets[disabledColumnsIndices[i]] = 0
}
// fmt.Printf("Column permutation: %v\n", columnPermutation)
// fmt.Printf("Column chunks values no: %v\n", processor.columnChunksValuesNo)
// fmt.Printf("valueNoOffsets: %v\n", processor.valueNoOffsets)
return outCols, nil
}
......
......@@ -35,10 +35,12 @@ func main() {
err error
inFilePath string
hostAddress string
n int
)
flag.StringVar(&hostAddress, "h", "localhost:11211", "The address of the server (host:port)")
flag.StringVar(&inFilePath, "f", "diabetes.parquet", "Path to the .parquet input file.")
flag.IntVar(&n, "n", 3, "no of requests for bulk sending")
flag.Parse()
inFilePathWithoutExtension := inFilePath[:strings.IndexByte(inFilePath, '.')]
......@@ -109,6 +111,9 @@ func main() {
key := []byte("ttt")
// p.Client.GetWithCheckpoint([]byte{0xFF, 0xFF, 0xFF}, 0, 2, 1, 128)
// p.Client.GetWithCheckpoint([]byte{0xFF, 0xFF, 0xFF}, 1, 16, 1, 512)
err = p.StoreFile(key, inFilePath)
if err != nil {
log.Fatalf("Error store file: %s\n", err)
......@@ -136,11 +141,11 @@ func main() {
p.DisableColumn(p.ColumnsNo - 1)
start1 := time.Now()
fpgaData, err := p.GetPerturbedRows(key, columnPermutation)
fpgaData, err := p.GetPerturbedRows(key, columnPermutation, n)
if err != nil {
log.Fatalf("Error GetPerturbedRows: %s\n", err)
}
t1 := float64(time.Since(start1).Nanoseconds()) / 1e3
t1 := float64(time.Since(start1).Microseconds()) / 1e3
fmt.Printf("T1 = %f\n", t1)
stringData := make([][]string, len(fpgaData[0])+1)
......@@ -170,7 +175,7 @@ func main() {
}
start2 := time.Now()
fileData, err := p.GetPerturbedRows(key, []int{})
fileData, err := p.GetPerturbedRows(key, []int{}, n)
if err != nil {
log.Fatalf("Error GetPerturbedRows: %s\n", err)
}
......@@ -195,16 +200,16 @@ func main() {
for i := 0; i < len(fileData[0]); i++ {
rotatedData[i][len(data[0])] = fileData[len(data[0])][i]
}
t2 := float64(time.Since(start2).Nanoseconds()) / 1e3
t2 := float64(time.Since(start2).Microseconds()) / 1e3
fmt.Printf("T2 = %f\n", t2)
stringData = make([][]string, len(fileData[0])+1)
for i := 0; i < len(fileData[0])+1; i++ {
stringData[i] = make([]string, len(fileData))
stringData = make([][]string, len(rotatedData)+1)
for i := 0; i < len(rotatedData)+1; i++ {
stringData[i] = make([]string, 9)
}
for i := 0; i < len(fileData); i++ {
for i := 0; i < 9; i++ {
stringData[0][i] = columns[i]
for j := 1; j <= len(fileData[0]); j++ {
for j := 1; j <= len(rotatedData); j++ {
stringData[j][i] = fmt.Sprintf("%f", rotatedData[j-1][i])
}
}
......
......@@ -38,10 +38,12 @@ func main() {
err error
inFilePath string
hostAddress string
n int
)
flag.StringVar(&hostAddress, "h", "localhost:11211", "The address of the server (host:port)")
flag.StringVar(&inFilePath, "f", "diabetes.parquet", "Path to the .parquet input file.")
flag.IntVar(&n, "n", 3, "no of requests for bulk sending")
flag.Parse()
inFilePathWithoutExtension := inFilePath[:strings.IndexByte(inFilePath, '.')]
......@@ -139,11 +141,11 @@ func main() {
p.DisableColumn(p.ColumnsNo - 1)
start1 := time.Now()
fpgaData, err := p.GetPerturbedRows(key, columnPermutation)
fpgaData, err := p.GetPerturbedRows(key, columnPermutation, n)
if err != nil {
log.Fatalf("Error GetPerturbedRows: %s\n", err)
}
t1 := float64(time.Since(start1).Nanoseconds()) / 1e3
t1 := float64(time.Since(start1).Microseconds()) / 1e3
fmt.Printf("T1 = %f\n", t1)
stringData := make([][]string, len(fpgaData[0])+1)
......@@ -178,22 +180,22 @@ func main() {
}
start2 := time.Now()
n := int(pr.GetNumRows())
rowsNo := int(pr.GetNumRows())
f := make([]parquet.PimaIndiansDiabetesData, n)
f := make([]parquet.PimaIndiansDiabetesData, rowsNo)
err = pr.Read(&f)
if err != nil {
log.Fatalf("Error read: %v\n", err)
}
rotatedData := make([][]float64, n)
rotatedData := make([][]float64, rowsNo)
for i := range rotatedData {
rotatedData[i] = make([]float64, 9)
}
var rotated []float64
for i := 0; i < n; i++ {
for i := 0; i < rowsNo; i++ {
rotated = rotpert.RotateVector3D(rotationMatrix, []float64{*f[i].Pregnancies, *f[i].Glucose, *f[i].BloodPressure})
for k := 0; k < 3; k++ {