Commit b7dca4f6 authored by Claudiu Mihali's avatar Claudiu Mihali
Browse files

Added decompression integration code in client

parent a6d9be5d
`timescale 1ns / 1ps
//////////////////////////////////////////////////////////////////////////////////
// Company:
// Engineer:
//
// Create Date: 09/06/2019 03:27:27 PM
// Design Name:
// Module Name: decompress_group_512to64
// Project Name:
// Target Devices:
// Tool Versions:
// Description:
//
// Dependencies:
//
// Revision:
// Revision 0.01 - File Created
// Additional Comments:
//
//////////////////////////////////////////////////////////////////////////////////
module decompress_group_512to64
#(
parameter MEMORY_WIDTH = 512,
parameter DECOMPRESS_MODE_SIZE = 8,
parameter DECOMPRESS_ENGINES_NO = 24,
parameter VALUE_SIZE_BYTES_NO = 2,
......@@ -36,7 +16,7 @@ module decompress_group_512to64
input wire in_valid,
output reg in_ready,
input wire [MEMORY_WIDTH-1:0] in_pred_data,
input wire [WORD_SIZE-1:0] in_pred_data,
input wire in_pred_valid,
output reg in_pred_ready,
......@@ -45,11 +25,15 @@ module decompress_group_512to64
output reg out_last,
input wire out_ready,
output reg [MEMORY_WIDTH-1:0] out_pred_data,
output reg out_pred_valid,
output wire [WORD_SIZE-1:0] out_pred_data,
output wire out_pred_valid,
input wire out_pred_ready
);
reg [WORD_SIZE-1:0] out_pred_data_interm;
reg out_pred_valid_interm;
wire out_pred_ready_interm;
reg [DECOMPRESS_MODE_SIZE-1:0] mode_data_buf;
reg [DECOMPRESS_ENGINES_NO-1:0] mode_valids;
......@@ -91,13 +75,14 @@ reg first_word_flag = 1;
reg [8*VALUE_SIZE_BYTES_NO-1:0] value_bytes_counter;
reg [$clog2(DECOMPRESS_ENGINES_NO)-1:0] cur_in_engine_addr = 0;
reg [$clog2(DECOMPRESS_ENGINES_NO)-1:0] prev_in_engine_addr = DECOMPRESS_ENGINES_NO-1;
reg [$clog2(DECOMPRESS_ENGINES_NO)-1:0] cur_out_engine_addr = 0;
genvar i;
generate
for (i = 0; i < DECOMPRESS_ENGINES_NO; i = i + 1) begin
nukv_fifogen #(
.ADDR_BITS(8),
.ADDR_BITS(9),
.DATA_SIZE(DECOMPRESS_MODE_SIZE)
) fifo_mode (
.clk(clk),
......@@ -114,7 +99,7 @@ generate
);
nukv_fifogen #(
.ADDR_BITS(8),
.ADDR_BITS(9),
.DATA_SIZE(DECOMPRESS_WORD_SIZE)
) fifo_in (
.clk(clk),
......@@ -152,7 +137,7 @@ generate
);
nukv_fifogen #(
.ADDR_BITS(8),
.ADDR_BITS(9),
.DATA_SIZE(DECOMPRESS_WORD_SIZE+1)
) fifo_out (
.clk(clk),
......@@ -170,14 +155,30 @@ generate
end
endgenerate
// Output FIFO for the out_pred stream: decouples the registered intermediate
// signals (out_pred_data_interm / out_pred_valid_interm, driven by the always
// block below) from the module's out_pred_* output ports.
// NOTE(review): the instance name "fifo_mode" is the same as the per-engine
// mode FIFO instantiated inside the generate loop above, although this
// instance carries the out_pred stream rather than mode data -- consider
// renaming; confirm nothing references it hierarchically first.
nukv_fifogen #(
// presumably the log2 of the FIFO depth (i.e. 512 entries) -- TODO confirm against nukv_fifogen
.ADDR_BITS(9),
// one entry per WORD_SIZE-wide out_pred word
.DATA_SIZE(WORD_SIZE)
) fifo_mode (
.clk(clk),
.rst(rst),
// write side (s_axis_*): fed from the intermediate registers; the ready
// signal returned here is what clears out_pred_valid_interm
.s_axis_tdata(out_pred_data_interm),
.s_axis_tvalid(out_pred_valid_interm),
.s_axis_tready(out_pred_ready_interm),
// read side (m_axis_*): drives the module outputs directly and is
// back-pressured by the out_pred_ready input
.m_axis_tdata(out_pred_data),
.m_axis_tvalid(out_pred_valid),
.m_axis_tready(out_pred_ready)
);
always @(posedge clk) begin
if (rst == 1) begin
first_word_flag <= 1;
if (rst == 1) begin
first_word_flag <= 1;
value_bytes_counter <= 0;
in_last <= 0;
in_ready <= 1;
in_pred_ready <= 1;
out_pred_valid <= 0;
out_pred_valid_interm <= 0;
out_valid <= 0;
out_last <= 0;
out_data <= 0;
......@@ -187,6 +188,7 @@ always @(posedge clk) begin
out_readys <= 0;
cur_in_engine_addr <= 0;
prev_in_engine_addr <= DECOMPRESS_ENGINES_NO-1;
cur_out_engine_addr <= 0;
out_data_addr <= 0;
......@@ -196,25 +198,24 @@ always @(posedge clk) begin
end else begin
// mode data round-robin logic
if (in_pred_valid == 1 && in_pred_ready == 1) begin
if (in_pred_data[0 +: 8*VALUE_SIZE_BYTES_NO] == 0 || in_pred_data[0 +: 8*VALUE_SIZE_BYTES_NO] == 2) begin
if (in_pred_data[0 +: 8*VALUE_SIZE_BYTES_NO] <= 2) begin
mode_data_buf <= 0;
out_pred_data <= in_pred_data;
out_pred_data_interm <= in_pred_data;
end else begin
mode_data_buf <= in_pred_data[8*VALUE_SIZE_BYTES_NO +: DECOMPRESS_MODE_SIZE];
out_pred_data[0 +: 8*VALUE_SIZE_BYTES_NO] <= in_pred_data[0 +: 8*VALUE_SIZE_BYTES_NO] - DECOMPRESS_MODE_SIZE/8;
out_pred_data[8*VALUE_SIZE_BYTES_NO +: MEMORY_WIDTH-8*VALUE_SIZE_BYTES_NO] <= // shift
in_pred_data[8*VALUE_SIZE_BYTES_NO+DECOMPRESS_MODE_SIZE +: MEMORY_WIDTH-8*VALUE_SIZE_BYTES_NO-DECOMPRESS_MODE_SIZE];
out_pred_data_interm[0 +: 8*VALUE_SIZE_BYTES_NO] <= in_pred_data[0 +: 8*VALUE_SIZE_BYTES_NO] - DECOMPRESS_MODE_SIZE/8;
out_pred_data_interm[8*VALUE_SIZE_BYTES_NO +: WORD_SIZE-8*VALUE_SIZE_BYTES_NO] <= // shift
in_pred_data[8*VALUE_SIZE_BYTES_NO+DECOMPRESS_MODE_SIZE +: WORD_SIZE-8*VALUE_SIZE_BYTES_NO-DECOMPRESS_MODE_SIZE];
end
in_pred_ready <= 0;
mode_valids[cur_in_engine_addr] <= 1;
out_pred_valid <= 1;
end else begin
if (mode_readys[cur_in_engine_addr] == 1) begin
mode_valids <= 0;
end
if (out_pred_valid == 1 && out_pred_ready == 1) begin
out_pred_valid <= 0;
end
out_pred_valid_interm <= 1;
end
if (mode_valids[cur_in_engine_addr] == 1 && mode_readys[cur_in_engine_addr] == 1) begin
mode_valids <= 0;
end
if (out_pred_valid_interm == 1 && out_pred_ready_interm == 1) begin
out_pred_valid_interm <= 0;
end
// input data round-robin logic
......@@ -225,11 +226,31 @@ always @(posedge clk) begin
in_valids[cur_in_engine_addr] <= 1;
in_data_buf_addr <= DECOMPRESS_WORD_SIZE;
if (cur_in_engine_addr != prev_in_engine_addr) begin
in_valids[prev_in_engine_addr] <= 0;
end
in_last <= 0;
if (first_word_flag == 1) begin
if (in_data[0 +: 8*VALUE_SIZE_BYTES_NO] <= DECOMPRESS_WORD_SIZE/8) begin
in_last <= 1;
in_data_buf_addr <= 0;
if (in_readys[cur_in_engine_addr] == 1) begin
in_ready <= 1;
in_pred_ready <= 1;
if (cur_in_engine_addr == DECOMPRESS_ENGINES_NO-1) begin
cur_in_engine_addr <= 0;
prev_in_engine_addr <= DECOMPRESS_ENGINES_NO-1;
end else begin
cur_in_engine_addr <= cur_in_engine_addr + 1;
if (prev_in_engine_addr == DECOMPRESS_ENGINES_NO-1) begin
prev_in_engine_addr <= 0;
end else begin
prev_in_engine_addr <= prev_in_engine_addr + 1;
end
end
end
end else begin
in_last <= 0;
first_word_flag <= 0;
value_bytes_counter <= in_data[0 +: 8*VALUE_SIZE_BYTES_NO] - DECOMPRESS_WORD_SIZE/8;
end
......@@ -238,14 +259,28 @@ always @(posedge clk) begin
in_last <= 1;
first_word_flag <= 1;
in_data_buf_addr <= 0;
if (in_readys[cur_in_engine_addr] == 1) begin
in_ready <= 1;
in_pred_ready <= 1;
if (cur_in_engine_addr == DECOMPRESS_ENGINES_NO-1) begin
cur_in_engine_addr <= 0;
prev_in_engine_addr <= DECOMPRESS_ENGINES_NO-1;
end else begin
cur_in_engine_addr <= cur_in_engine_addr + 1;
if (prev_in_engine_addr == DECOMPRESS_ENGINES_NO-1) begin
prev_in_engine_addr <= 0;
end else begin
prev_in_engine_addr <= prev_in_engine_addr + 1;
end
end
end
end else begin
in_last <= 0;
first_word_flag <= 0;
value_bytes_counter <= value_bytes_counter - DECOMPRESS_WORD_SIZE/8;
end
end
end else begin
if (in_readys[cur_in_engine_addr] == 1) begin
if (in_valids[cur_in_engine_addr] == 1 && in_readys[cur_in_engine_addr] == 1) begin
if (in_data_buf_addr != 0 && in_last != 1) begin
in_data_decompress <= in_data_buf[in_data_buf_addr +: DECOMPRESS_WORD_SIZE];
in_valids[cur_in_engine_addr] <= 1;
......@@ -259,6 +294,21 @@ always @(posedge clk) begin
in_last <= 1;
first_word_flag <= 1;
in_data_buf_addr <= 0;
if (in_readys[cur_in_engine_addr] == 1) begin
in_ready <= 1;
in_pred_ready <= 1;
if (cur_in_engine_addr == DECOMPRESS_ENGINES_NO-1) begin
cur_in_engine_addr <= 0;
prev_in_engine_addr <= DECOMPRESS_ENGINES_NO-1;
end else begin
cur_in_engine_addr <= cur_in_engine_addr + 1;
if (prev_in_engine_addr == DECOMPRESS_ENGINES_NO-1) begin
prev_in_engine_addr <= 0;
end else begin
prev_in_engine_addr <= prev_in_engine_addr + 1;
end
end
end
end else begin
in_last <= 0;
first_word_flag <= 0;
......@@ -272,44 +322,70 @@ always @(posedge clk) begin
in_pred_ready <= 1;
if (cur_in_engine_addr == DECOMPRESS_ENGINES_NO-1) begin
cur_in_engine_addr <= 0;
prev_in_engine_addr <= DECOMPRESS_ENGINES_NO-1;
end else begin
cur_in_engine_addr <= cur_in_engine_addr + 1;
if (prev_in_engine_addr == DECOMPRESS_ENGINES_NO-1) begin
prev_in_engine_addr <= 0;
end else begin
prev_in_engine_addr <= prev_in_engine_addr + 1;
end
end
end
end
end else begin
if (in_valids[prev_in_engine_addr] == 1 && in_readys[prev_in_engine_addr] == 1) begin
in_ready <= 1;
in_valids <= 0;
in_last <= 0;
in_pred_ready <= 1;
end
end
end
// output round-robin logic
if (out_valid == 0) begin
out_readys[cur_out_engine_addr] <= 1;
if (out_valids[cur_out_engine_addr] == 1 && out_readys[cur_out_engine_addr] == 1) begin
out_data[out_data_addr +: DECOMPRESS_WORD_SIZE] <= out_datas[cur_out_engine_addr];
out_data_addr <= out_data_addr + DECOMPRESS_WORD_SIZE;
if (out_data_addr == WORD_SIZE-DECOMPRESS_WORD_SIZE || out_lasts[cur_out_engine_addr] == 1) begin
out_valid <= 1;
out_readys <= 0;
out_last <= out_lasts[cur_out_engine_addr];
out_data_addr <= 0;
end
end
end else begin
if (out_ready == 1) begin
out_valid <= 0;
out_data <= 0;
if (out_last == 1) begin
out_last <= 0;
if (cur_out_engine_addr == DECOMPRESS_ENGINES_NO-1) begin
cur_out_engine_addr <= 0;
end else begin
cur_out_engine_addr <= cur_out_engine_addr + 1;
if (out_readys == 0) begin
if (cur_out_engine_addr == DECOMPRESS_ENGINES_NO-1) begin
cur_out_engine_addr <= 0;
end else begin
cur_out_engine_addr <= cur_out_engine_addr + 1;
end
end
end
end else begin
out_readys <= 0;
end
end
if (out_valids[cur_out_engine_addr] == 1 && out_readys[cur_out_engine_addr] == 1) begin
out_data[out_data_addr +: DECOMPRESS_WORD_SIZE] <= out_datas[cur_out_engine_addr];
out_data_addr <= out_data_addr + DECOMPRESS_WORD_SIZE;
if (out_data_addr == WORD_SIZE-DECOMPRESS_WORD_SIZE || out_lasts[cur_out_engine_addr] == 1) begin
out_valid <= 1;
out_last <= out_lasts[cur_out_engine_addr];
out_data_addr <= 0;
if (out_ready == 1) begin
if (out_lasts[cur_out_engine_addr] == 1) begin
out_readys[cur_out_engine_addr] <= 0;
if (cur_out_engine_addr == DECOMPRESS_ENGINES_NO-1) begin
cur_out_engine_addr <= 0;
out_readys[0] <= 1;
end else begin
cur_out_engine_addr <= cur_out_engine_addr + 1;
out_readys[cur_out_engine_addr + 1] <= 1;
end
end
end else begin
out_readys <= 0;
end
end
end
end
end
endmodule
\ No newline at end of file
endmodule
......@@ -131,7 +131,7 @@ func (c Client) GetWithCheckpoint(key []byte, tokenBucketIdx int, tokensEachTick
return gOp.Result, nil
}
func (c Client) GetPerturbed(key [][]byte) ([][]byte, error) {
func (c Client) GetPerturbed(key [][]byte, shouldDecompress bool) ([][]byte, error) {
if len(key)%perturbedGroupSize != 0 {
return nil, fmt.Errorf("Error GetPerturbed: the number of keys should be multiple of perturbedGroupSize.")
}
......@@ -145,7 +145,12 @@ func (c Client) GetPerturbed(key [][]byte) ([][]byte, error) {
initKey := make([]byte, len(key[i])+idxLen)
copy(initKey[idxLen:], key[i])
value := []byte{0xFF}
var value []byte
if shouldDecompress {
value = []byte{0x01, 0xFF}
} else {
value = []byte{0x00, 0xFF}
}
rqs[i] = internal.NewGetCondOp(initKey, value)
}
......@@ -167,7 +172,7 @@ func (c Client) GetPerturbed(key [][]byte) ([][]byte, error) {
return results, nil
}
func (c Client) GetBulkN(keys [][]byte, getCondNo int, getNo int, n int) ([][]byte, error) {
func (c Client) GetBulkN(keys [][]byte, getCondNo int, getNo int, shouldDecompress bool, n int) ([][]byte, error) {
if getCondNo%perturbedGroupSize != 0 {
return nil, fmt.Errorf("Error GetBulkN: getCondNo should be multiple of perturbedGroupSize.")
}
......@@ -183,11 +188,21 @@ func (c Client) GetBulkN(keys [][]byte, getCondNo int, getNo int, n int) ([][]by
for ; i < getCondNo+getNo; i++ {
initKey := make([]byte, len(keys[i])+idxLen)
copy(initKey[idxLen:], keys[i])
if i < getCondNo {
value := []byte{0xFF}
rqs[i%n] = internal.NewGetCondOp(initKey, value)
if shouldDecompress {
if i < getCondNo {
value := []byte{0x01, 0xFF}
rqs[i%n] = internal.NewGetCondOp(initKey, value)
} else {
value := []byte{0x01, 0x00}
rqs[i%n] = internal.NewGetCondOp(initKey, value)
}
} else {
rqs[i%n] = internal.NewGetOp(initKey)
if i < getCondNo {
value := []byte{0x00, 0xFF}
rqs[i%n] = internal.NewGetCondOp(initKey, value)
} else {
rqs[i%n] = internal.NewGetOp(initKey)
}
}
if (i+1)%n == 0 {
......@@ -237,7 +252,7 @@ func (c Client) GetRotationMatrix() error {
initKey := make([]byte, len(key)+idxLen)
copy(initKey[idxLen:], key)
value := []byte{0xFE}
value := []byte{0x00, 0xFE}
op := internal.NewGetCondOp(initKey, value)
......
......@@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/binary"
"fmt"
"github.com/apache/thrift/lib/go/thrift"
......@@ -14,7 +15,7 @@ import (
)
// var ok = true
var col = 0
// var col = 0
type Divider struct {
reader *ParquetReader.ParquetReader
......@@ -102,7 +103,7 @@ func readPageStruct(column *ParquetReader.ColumnBufferType) (*Layout.Page, error
func (divider *Divider) readPagesFromColumn(column *ParquetReader.ColumnBufferType) error {
var err error
col++
//col++
// // test
// fp, err := os.Create("out/page" + strconv.Itoa(divider.pagesRead))
......@@ -140,25 +141,14 @@ func (divider *Divider) readPagesFromColumn(column *ParquetReader.ColumnBufferTy
// }
}
headerBuff, err := divider.serializer.Write(context.TODO(), page.Header)
if err != nil {
return err
}
// headerBuff, err := divider.serializer.Write(context.TODO(), page.Header)
// if err != nil {
// return err
// }
divider.pageValues[divider.pagesRead] = append(divider.pageValues[divider.pagesRead], headerBuff...)
// divider.pageValues[divider.pagesRead] = append(divider.pageValues[divider.pagesRead], headerBuff...)
divider.pageValues[divider.pagesRead] = append(divider.pageValues[divider.pagesRead], page.RawData...)
// // test
// manyPages := true
// if len(headerBuff)+len(page.RawData) == len(divider.pageValues[divider.pagesRead]) {
// manyPages = false
// }
// fmt.Printf("Column %d, page %d: headerSize=%d; dataSize=%d; valueSize=%d; pageSize=%d; manyPages=%t\n",
// col, divider.pagesRead, len(headerBuff), len(page.RawData), len(page.RawData)-7, len(divider.pageValues[divider.pagesRead]), manyPages)
// if len(divider.pageValues[divider.pagesRead]) > 2048 {
// fmt.Printf("\nToo big page!!!\n")
// }
// // test
// uncompressed, err := Compress.Uncompress(page.RawData, page.CompressType)
// if err != nil {
......@@ -186,14 +176,14 @@ func (divider *Divider) readPagesFromColumn(column *ParquetReader.ColumnBufferTy
// if err != nil {
// return err
// }
// _, err = fp.Write(headerBuff)
// if err != nil {
// return err
// }
// _, err = fp.Write([]byte("+++++++++"))
// if err != nil {
// return err
// }
// // _, err = fp.Write(headerBuff)
// // if err != nil {
// // return err
// // }
// // _, err = fp.Write([]byte("+++++++++"))
// // if err != nil {
// // return err
// // }
// _, err = fp.Write(page.RawData)
// if err != nil {
// return err
......@@ -214,6 +204,8 @@ func (divider *Divider) readMetaData(columnChunksValuesNo []int) error {
// }
// defer fp.Close()
fmt.Printf("%v\n", columnChunksValuesNo)
footerBuff, err := divider.serializer.Write(context.TODO(), divider.reader.Footer)
if err != nil {
return err
......
......@@ -5,6 +5,8 @@ import (
"encoding/binary"
"errors"
"fmt"
"os"
"strconv"
"io"
"math"
......@@ -162,35 +164,35 @@ func (processor *Processor) GetFile(key []byte, parquetFilePath string) error {
return errors.New("No value found at the given key")
}
// // test
// err = os.Mkdir("out2", 0777)
// if err != nil {
// return err
// }
// for i, v := range pageValues {
// fp, err := os.Create("out2/page" + strconv.Itoa(i))
// if err != nil {
// return err
// }
// defer fp.Close()
// _, err = fp.Write(v)
// if err != nil {
// return err
// }
// fmt.Printf("Page %d len: %d\n", i, len(v))
// }
composer, err := NewComposer(pageValues, parquetFilePath, processor.parquetSchema)
// test
err = os.Mkdir("out2", 0777)
if err != nil {
return err
}
defer composer.Close()
err = composer.ComposeFile()
if err != nil {
return err
for i, v := range pageValues {
fp, err := os.Create("out2/page" + strconv.Itoa(i))
if err != nil {
return err
}
defer fp.Close()
_, err = fp.Write(v)
if err != nil {
return err
}
}
// Composing the output file no longer works now that the page header has been eliminated; the header should be rebuilt manually in the future.
// composer, err := NewComposer(pageValues, parquetFilePath, processor.parquetSchema)
// if err != nil {
// return err
// }
// defer composer.Close()
// err = composer.ComposeFile()
// if err != nil {
// return err
// }
return nil
}
......@@ -298,7 +300,7 @@ func (processor *Processor) SetRotationMatrix(parquetFilePath string) error {
return processor.client.GetRotationMatrix()
}
func (processor *Processor) GetPerturbedRows(key []byte, n int) ([][]float64, error) {
func (processor *Processor) GetPerturbedRows(key []byte, shouldDecompress bool, n int) ([][]float64, error) {
var err error
if processor.parquetFileMetaData == nil {
......@@ -389,7 +391,7 @@ func (processor *Processor) GetPerturbedRows(key []byte, n int) ([][]float64, er
processor.valueNoOffsets[outputColumnsIndices[i]] = 0
}
pages, err := processor.client.GetBulkN(keys, getCondNo, getNo, n)
pages, err := processor.client.GetBulkN(keys, getCondNo, getNo, shouldDecompress, n)
if err != nil {
return nil, err
}
......
......@@ -117,15 +117,16 @@ func getRot(file string) ([][]float64, []int) {
func main() {
var (
err error
hostAddress string
inFilePath string
keyString string
repeatsNo int
bulkSize int
shouldWrite bool
mode string
shouldPrint bool
err error
hostAddress string
inFilePath string
keyString string
repeatsNo int
bulkSize int
shouldWrite bool
mode string
shouldPrint bool
shouldDecompress bool
)
flag.StringVar(&hostAddress, "h", "localhost:11211", "The address of the server (host:port).")
......@@ -136,6 +137,7 @@ func main() {
flag.BoolVar(&shouldWrite, "w", false, "Set true if result files should be written.")