Commit 5cfdffce authored by Claudiu Mihali's avatar Claudiu Mihali
Browse files

Bugfixes; added working end-to-end prototype with decompression + perturbation in hardware

parent f790292b
This diff is collapsed.
......@@ -30,6 +30,23 @@ module decompress_group_512to64
(* mark_debug = "true" *)input wire out_pred_ready
);
// Debug counters for the output handshake of this module.
//   cnt_valid_deco: number of clock cycles in which out_valid and out_ready
//                   were both 1.
//   cnt_last_deco:  number of those cycles in which out_last was also 1.
// Both registers carry the mark_debug attribute.
// NOTE(review): out_valid / out_ready / out_last are declared outside this
// hunk; presumably they are the module's output stream signals - confirm.
(* mark_debug = "true" *)reg [63:0] cnt_valid_deco;
(* mark_debug = "true" *)reg [63:0] cnt_last_deco;
always @(posedge clk) begin
if (rst == 1) begin
// Synchronous reset: clear both counters.
cnt_valid_deco <= 0;
cnt_last_deco <= 0;
end else begin
// Count only cycles where valid and ready are asserted together.
if (out_valid == 1 && out_ready == 1) begin
cnt_valid_deco <= cnt_valid_deco + 1;
// out_last is only counted when it coincides with an accepted cycle.
if (out_last == 1) begin
cnt_last_deco <= cnt_last_deco + 1;
end
end
end
end
reg [$clog2(DECOMPRESS_ENGINES_NO)-1:0] prev_in_engine_addr;
reg [$clog2(DECOMPRESS_ENGINES_NO)-1:0] cur_in_engine_addr;
reg [$clog2(DECOMPRESS_ENGINES_NO)-1:0] cur_out_engine_addr;
......
......@@ -27,27 +27,28 @@ wire [MEMORY_WIDTH:0] buffer_input_data [COL_COUNT-1:0];
wire [COL_COUNT-1:0] buffer_input_valid;
wire [COL_COUNT-1:0] buffer_input_ready;
wire [MEMORY_WIDTH:0] buffer_output_data [COL_COUNT-1:0];
wire [COL_COUNT-1:0] buffer_output_valid;
reg [COL_COUNT-1:0] buffer_output_ready;
(* mark_debug = "true" *)wire [MEMORY_WIDTH:0] buffer_output_data [COL_COUNT-1:0];
(* mark_debug = "true" *)wire [COL_COUNT-1:0] buffer_output_valid;
(* mark_debug = "true" *)reg [COL_COUNT-1:0] buffer_output_ready;
reg [MEMORY_WIDTH-1:0] colword_buf [COL_COUNT-1:0];
reg [COL_COUNT-1:0] colword_last;
reg [$clog2(MEMORY_WIDTH)-1:0] colword_addr [COL_COUNT-1:0];
(* mark_debug = "true" *)reg [MEMORY_WIDTH-1:0] colword_buf [COL_COUNT-1:0];
(* mark_debug = "true" *)reg [COL_COUNT-1:0] colword_last;
(* mark_debug = "true" *)reg [$clog2(MEMORY_WIDTH)-1:0] colword_addr [COL_COUNT-1:0];
reg [COL_COUNT-1:0] first_word_flag;
(* mark_debug = "true" *)reg [COL_COUNT-1:0] first_word_flag;
reg [8*VALUE_SIZE_BYTES_NO-1:0] value_bytes_counter [COL_COUNT-1:0];
(* mark_debug = "true" *)reg [8*VALUE_SIZE_BYTES_NO-1:0] value_bytes_counter [COL_COUNT-1:0];
reg [COL_WIDTH*COL_COUNT-1:0] assembled_data;
reg [COL_COUNT-1:0] assembled_valid_pre;
wire assembled_valid;
wire assembled_last;
wire assembled_ready;
(* mark_debug = "true" *)reg [COL_WIDTH*COL_COUNT-1:0] assembled_data;
(* mark_debug = "true" *)reg [COL_COUNT-1:0] assembled_valid_pre;
(* mark_debug = "true" *)wire assembled_valid;
(* mark_debug = "true" *)wire assembled_last;
(* mark_debug = "true" *)wire assembled_ready;
reg [$clog2(COL_WIDTH/8)-1:0] offset [COL_COUNT-1:0];
(* mark_debug = "true" *)reg [$clog2(COL_WIDTH/8)-1:0] offset1 [COL_COUNT-1:0];
(* mark_debug = "true" *)reg [$clog2(COL_WIDTH/8)-1:0] offset2 [COL_COUNT-1:0];
integer idx, byte;
integer idx, byteIdx;
genvar i;
generate
......@@ -130,32 +131,34 @@ always @(posedge clk) begin
// HACK: there are some unwanted bytes in the beginning of the page
if (colword_buf[idx][8*VALUE_SIZE_BYTES_NO +: 8] == 8'h02) begin
value_bytes_counter[idx] <= colword_buf[idx][0 +: 8*VALUE_SIZE_BYTES_NO] - VALUE_SIZE_BYTES_NO - 6;
colword_addr[idx] <= 8 * (VALUE_SIZE_BYTES_NO + 6);
colword_addr[idx] <= VALUE_SIZE_BYTES_NO + 6;
first_word_flag[idx] <= 0;
if (idx == 0) begin
value_size_data <= colword_buf[idx][0 +: 8*VALUE_SIZE_BYTES_NO] - 6;
end
offset[idx] <= (MEMORY_WIDTH/8-(VALUE_SIZE_BYTES_NO + 6)) % (COL_WIDTH/8);
offset1[idx] <= (MEMORY_WIDTH/8-(VALUE_SIZE_BYTES_NO + 6)) % (COL_WIDTH/8);
offset2[idx] <= COL_WIDTH/8-(MEMORY_WIDTH/8-(VALUE_SIZE_BYTES_NO + 6)) % (COL_WIDTH/8);
end else begin
if (colword_buf[idx][8*VALUE_SIZE_BYTES_NO +: 8] == 8'h03) begin
value_bytes_counter[idx] <= colword_buf[idx][0 +: 8*VALUE_SIZE_BYTES_NO] - VALUE_SIZE_BYTES_NO - 7;
colword_addr[idx] <= 8 * (VALUE_SIZE_BYTES_NO + 7);
colword_addr[idx] <= VALUE_SIZE_BYTES_NO + 7;
first_word_flag[idx] <= 0;
if (idx == 0) begin
value_size_data <= colword_buf[idx][0 +: 8*VALUE_SIZE_BYTES_NO] - 7;
end
offset[idx] <= (MEMORY_WIDTH/8-(VALUE_SIZE_BYTES_NO + 7)) % (COL_WIDTH/8);
offset1[idx] <= (MEMORY_WIDTH/8-(VALUE_SIZE_BYTES_NO + 7)) % (COL_WIDTH/8);
offset2[idx] <= COL_WIDTH/8-(MEMORY_WIDTH/8-(VALUE_SIZE_BYTES_NO + 7)) % (COL_WIDTH/8);
end
end
end else begin
if (colword_addr[idx] == 0) begin
for (byte = 0; byte < (COL_WIDTH/8-offset[idx]); byte = byte + 1) begin
assembled_data[idx * COL_WIDTH + offset[idx]*8 + byte*8 +: 8] <= colword_buf[idx][colword_addr[idx] + byte*8 +: 8];
for (byteIdx = 0; byteIdx < offset2[idx]; byteIdx = byteIdx + 1) begin
assembled_data[idx * COL_WIDTH + offset1[idx]*8 + byteIdx*8 +: 8] <= colword_buf[idx][colword_addr[idx]*8 + byteIdx*8 +: 8];
end
assembled_valid_pre[idx] <= 1;
colword_addr[idx] <= colword_addr[idx] + (COL_WIDTH-offset[idx]*8);
colword_addr[idx] <= colword_addr[idx] + offset2[idx];
value_bytes_counter[idx] <= value_bytes_counter[idx] - COL_WIDTH / 8;
if (colword_addr[idx] + (COL_WIDTH-offset[idx]*8) >= MEMORY_WIDTH) begin
if (colword_addr[idx] + offset2[idx] >= MEMORY_WIDTH/8) begin
colword_addr[idx] <= 0;
buffer_output_ready[idx] <= 1;
end
......@@ -165,14 +168,14 @@ always @(posedge clk) begin
first_word_flag[idx] <= 1;
end
end else begin
if (colword_addr[idx] <= MEMORY_WIDTH - COL_WIDTH) begin
for (byte = 0; byte < COL_WIDTH/8; byte = byte + 1) begin
assembled_data[idx * COL_WIDTH + byte*8 +: 8] <= colword_buf[idx][colword_addr[idx] + byte*8 +: 8];
if (colword_addr[idx] <= MEMORY_WIDTH/8 - COL_WIDTH/8) begin
for (byteIdx = 0; byteIdx < COL_WIDTH/8; byteIdx = byteIdx + 1) begin
assembled_data[idx * COL_WIDTH + byteIdx*8 +: 8] <= colword_buf[idx][colword_addr[idx]*8 + byteIdx*8 +: 8];
end
assembled_valid_pre[idx] <= 1;
colword_addr[idx] <= colword_addr[idx] + COL_WIDTH;
colword_addr[idx] <= colword_addr[idx] + COL_WIDTH/8;
value_bytes_counter[idx] <= value_bytes_counter[idx] - COL_WIDTH / 8;
if (colword_addr[idx] + COL_WIDTH == MEMORY_WIDTH) begin
if (colword_addr[idx] + COL_WIDTH/8 == MEMORY_WIDTH/8) begin
colword_addr[idx] <= 0;
buffer_output_ready[idx] <= 1;
end
......@@ -182,12 +185,12 @@ always @(posedge clk) begin
first_word_flag[idx] <= 1;
end
end else begin
if (colword_addr[idx] >= MEMORY_WIDTH) begin
if (colword_addr[idx] >= MEMORY_WIDTH/8) begin
colword_addr[idx] <= 0;
buffer_output_ready[idx] <= 1;
end else begin
for (byte = 0; byte < offset[idx]; byte = byte + 1) begin
assembled_data[idx * COL_WIDTH + byte*8 +: 8] <= colword_buf[idx][colword_addr[idx] + byte*8 +: 8];
for (byteIdx = 0; byteIdx < offset1[idx]; byteIdx = byteIdx + 1) begin
assembled_data[idx * COL_WIDTH + byteIdx*8 +: 8] <= colword_buf[idx][colword_addr[idx]*8 + byteIdx*8 +: 8];
end
colword_addr[idx] <= 0;
buffer_output_ready[idx] <= 1;
......
......@@ -43,38 +43,18 @@ module nukv_Privacy_Pipeline
);
reg [1:0] skip_blocks;
reg first_cnt_word;
(* mark_debug = "true" *)reg [63:0] cnt_valid;
(* mark_debug = "true" *)reg [63:0] cnt_last;
(* mark_debug = "true" *)reg [63:0] cnt_valid_priv;
(* mark_debug = "true" *)reg [63:0] cnt_last_priv;
always @(posedge clk) begin
if (rst == 1) begin
cnt_valid <= 0;
cnt_last <= 0;
skip_blocks <= 0;
first_cnt_word <= 1;
cnt_valid_priv <= 0;
cnt_last_priv <= 0;
end else begin
if (skip_blocks < 3) begin
if (output_valid == 1 && output_ready == 1 && output_last == 1) begin
skip_blocks <= skip_blocks + 1;
end
end else begin
if (first_cnt_word == 1) begin
if (output_valid == 1 && output_ready == 1) begin
first_cnt_word <= 0;
cnt_valid <= 1;
if (output_last == 1) begin
cnt_last <= 1;
end
end
end else begin
if (output_valid == 1 && output_ready == 1) begin
cnt_valid <= cnt_valid + 1;
if (output_last == 1) begin
cnt_last <= cnt_last + 1;
end
end
if (output_valid == 1 && output_ready == 1) begin
cnt_valid_priv <= cnt_valid_priv + 1;
if (output_last == 1) begin
cnt_last_priv <= cnt_last_priv + 1;
end
end
end
......
......@@ -4,7 +4,6 @@ import (
"bytes"
"context"
"encoding/binary"
"fmt"
"github.com/apache/thrift/lib/go/thrift"
......@@ -204,8 +203,6 @@ func (divider *Divider) readMetaData(columnChunksValuesNo []int) error {
// }
// defer fp.Close()
fmt.Printf("%v\n", columnChunksValuesNo)
footerBuff, err := divider.serializer.Write(context.TODO(), divider.reader.Footer)
if err != nil {
return err
......
......@@ -332,10 +332,6 @@ func (processor *Processor) GetPerturbedRows(key []byte, shouldDecompress bool,
}
}
// fmt.Printf("perturbedColumnsIndices = %v\n", perturbedColumnsIndices)
// fmt.Printf("outputColumnsIndices = %v\n", outputColumnsIndices)
// fmt.Printf("processor.ColumnPermutation = %v\n", processor.ColumnPermutation)
keys := make([][]byte, 0, processor.columnChunksValuesNo[processor.columnsNo])
getCondNo := 0
getNo := 0
......@@ -438,10 +434,10 @@ func (processor *Processor) GetPerturbedRows(key []byte, shouldDecompress bool,
processor.columnChunksValuesNo[outputColumnsIndices[i]] {
var k int
if pages[pagesIdx][41] == 0x00 && pages[pagesIdx][42] == 0x03 {
k = 49
} else if pages[pagesIdx][41] == 0x02 && pages[pagesIdx][42] == 0x00 {
k = 47
if pages[pagesIdx][0] == 0x03 {
k = 7
} else if pages[pagesIdx][0] == 0x02 {
k = 6
}
for ; k < len(pages[pagesIdx]); k += 8 {
bits := binary.LittleEndian.Uint64(pages[pagesIdx][k : k+8])
......
......@@ -18,15 +18,22 @@ import (
func main() {
var (
err error
inFilePath string
err error
inFilePath string
shouldCompress bool
)
flag.StringVar(&inFilePath, "f", "bank_labeled.csv", "Path to the .csv input file.")
flag.BoolVar(&shouldCompress, "c", false, "Set true if the file should be compressed.")
flag.Parse()
inFilePathWithoutExtension := inFilePath[:strings.IndexByte(inFilePath, '.')]
parquetFilePath := fmt.Sprintf("%s.parquet", inFilePathWithoutExtension)
var parquetFilePath string
if shouldCompress {
parquetFilePath = fmt.Sprintf("%s_c.parquet", inFilePathWithoutExtension)
} else {
parquetFilePath = fmt.Sprintf("%s.parquet", inFilePathWithoutExtension)
}
pfin, err := os.Open(inFilePath)
if err != nil {
......@@ -56,7 +63,11 @@ func main() {
}
pw.RowGroupSize = 1024 * 1024 * 1024 * 1024
pw.CompressionType = pq.CompressionCodec_UNCOMPRESSED
if shouldCompress {
pw.CompressionType = pq.CompressionCodec_SNAPPY
} else {
pw.CompressionType = pq.CompressionCodec_UNCOMPRESSED
}
pw.PageSize = 2000
for _, csvRowString := range csvDataString[1:] {
......@@ -87,7 +98,11 @@ func main() {
}
pw.RowGroupSize = 1024 * 1024 * 1024 * 1024
pw.CompressionType = pq.CompressionCodec_UNCOMPRESSED
if shouldCompress {
pw.CompressionType = pq.CompressionCodec_SNAPPY
} else {
pw.CompressionType = pq.CompressionCodec_UNCOMPRESSED
}
pw.PageSize = 2000
for _, csvRowString := range csvDataString[1:] {
......@@ -118,7 +133,11 @@ func main() {
}
pw.RowGroupSize = 1024 * 1024 * 1024 * 1024
pw.CompressionType = pq.CompressionCodec_UNCOMPRESSED
if shouldCompress {
pw.CompressionType = pq.CompressionCodec_SNAPPY
} else {
pw.CompressionType = pq.CompressionCodec_UNCOMPRESSED
}
pw.PageSize = 2000
for _, csvRowString := range csvDataString[1:] {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment