Commit 4843e7f5 authored by Jonathan Levi, committed by Gerrit Code Review

Merge "Add Kafka-based orderer" into feature/convergence

parents 8ff4a0b7 f6640f22
ordererBase:
  image: hyperledger/fabric-orderer
  environment:
    - ORDERER_GENERAL_LEDGERTYPE=ram
    - ORDERER_GENERAL_BATCHTIMEOUT=10s
    - ORDERER_GENERAL_BATCHSIZE=10
    - ORDERER_GENERAL_MAXWINDOWSIZE=1000
    - ORDERER_GENERAL_LISTENADDRESS=0.0.0.0
    - ORDERER_GENERAL_LISTENPORT=5005
    - ORDERER_RAMLEDGER_HISTORY_SIZE=100
  working_dir: /opt/gopath/src/github.com/hyperledger/fabric/orderer
  command: orderer
kafka0:
  image: kchristidis/kafka
  environment:
    - ADVERTISED_PORT=9092

orderer0:
  extends:
    file: docker-compose-orderer-base.yml
    service: ordererBase
  environment:
    - ORDERER_GENERAL_ORDERERTYPE=kafka
    - ORDERER_KAFKA_BROKERS=[kafka0:9092]
  links:
    - kafka0
  command: orderer -loglevel debug -verbose true
orderer0:
  extends:
    file: docker-compose-orderer-base.yml
    service: ordererBase
  environment:
    - ORDERER_GENERAL_ORDERERTYPE=solo
#
# Test Orderer
#
# Tags that can be used and will affect test internals:
# @doNotDecompose will NOT decompose the named compose_yaml after the scenario ends.
#   Useful for setting up the environment and reviewing it after the scenario.
# @chaincodeImagesUpToDate use this if all scenarios' chaincode images are up to date and do NOT require building. BE SURE!!!
#@chaincodeImagesUpToDate
@orderer
Feature: Orderer
  As a Fabric developer
  I want to run and validate an orderer service
# @doNotDecompose
  Scenario Outline: Basic orderer function
    Given we compose "<ComposeFile>"
    And I wait "<BootTime>" seconds
    And user "binhn" is an authorized user of the ordering service
    When user "binhn" broadcasts "<NumMsgsToBroadcast>" unique messages on "orderer0"
    And user "binhn" connects to deliver function on "orderer0" with Ack of "<SendAck>" and properties:
      | Start     | SpecifiedNumber | WindowSize |
      | SPECIFIED | 1               | 10         |
    Then user "binhn" should get a delivery from "orderer0" of "<ExpectedBlocks>" blocks with "<NumMsgsToBroadcast>" messages within "<BatchTimeout>" seconds

    Examples: Orderer Options
      | ComposeFile                      | SendAck | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout | BootTime |
      | docker-compose-orderer-solo.yml  | true    | 20                 | 2              | 10           | .5       |
      | docker-compose-orderer-solo.yml  | true    | 40                 | 4              | 10           | .5       |
      | docker-compose-orderer-solo.yml  | true    | 60                 | 6              | 10           | .5       |
      | docker-compose-orderer-kafka.yml | true    | 20                 | 2              | 10           | 5        |
      | docker-compose-orderer-kafka.yml | true    | 40                 | 4              | 10           | 5        |
      | docker-compose-orderer-kafka.yml | true    | 60                 | 6              | 10           | 5        |
# @doNotDecompose
  Scenario Outline: Basic seek orderer function (Utilizing properties for atomic broadcast)
    Given we compose "<ComposeFile>"
    And I wait "<BootTime>" seconds
    And user "binhn" is an authorized user of the ordering service
    When user "binhn" broadcasts "<NumMsgsToBroadcast>" unique messages on "orderer0"
    And user "binhn" connects to deliver function on "orderer0" with Ack of "<SendAck>" and properties:
      | Start     | SpecifiedNumber | WindowSize |
      | SPECIFIED | 1               | 10         |
    Then user "binhn" should get a delivery from "orderer0" of "<ExpectedBlocks>" blocks with "<NumMsgsToBroadcast>" messages within "<BatchTimeout>" seconds
    When user "binhn" seeks to block "1" on deliver function on "orderer0"
    Then user "binhn" should get a delivery from "orderer0" of "<ExpectedBlocks>" blocks with "<NumMsgsToBroadcast>" messages within "1" seconds

    Examples: Orderer Options
      | ComposeFile                      | SendAck | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout | BootTime |
      | docker-compose-orderer-solo.yml  | true    | 20                 | 2              | 10           | .5       |
      | docker-compose-orderer-solo.yml  | true    | 40                 | 4              | 10           | .5       |
      | docker-compose-orderer-solo.yml  | true    | 60                 | 6              | 10           | .5       |
      | docker-compose-orderer-kafka.yml | true    | 20                 | 2              | 10           | 5        |
      | docker-compose-orderer-kafka.yml | true    | 40                 | 4              | 10           | 5        |
      | docker-compose-orderer-kafka.yml | true    | 60                 | 6              | 10           | 5        |
# @doNotDecompose
  Scenario Outline: Basic orderer function varying ACK
    Given we compose "<ComposeFile>"
    And I wait "<BootTime>" seconds
    And user "binhn" is an authorized user of the ordering service
    When user "binhn" broadcasts "<NumMsgsToBroadcast>" unique messages on "orderer0"
    And user "binhn" connects to deliver function on "orderer0" with Ack of "<SendAck>" and properties:
      | Start     | SpecifiedNumber | WindowSize |
      | SPECIFIED | 1               | 1          |
    Then user "binhn" should get a delivery from "orderer0" of "<ExpectedBlocks>" blocks with "<NumMsgsToBroadcast>" messages within "<BatchTimeout>" seconds

    Examples: Orderer Options
      | ComposeFile                      | SendAck | NumMsgsToBroadcast | ExpectedBlocks | BatchTimeout | BootTime |
      | docker-compose-orderer-solo.yml  | false   | 20                 | 1              | 10           | .5       |
      | docker-compose-orderer-solo.yml  | true    | 20                 | 2              | 10           | .5       |
      | docker-compose-orderer-kafka.yml | false   | 20                 | 1              | 10           | 5        |
      | docker-compose-orderer-kafka.yml | true    | 20                 | 2              | 10           | 5        |
@@ -23,6 +23,7 @@ import (
    "strings"
    "time"

    "github.com/Shopify/sarama"
    "github.com/op/go-logging"
    "github.com/spf13/viper"
)
@@ -59,6 +60,21 @@ type FileLedger struct {
    Prefix string
}

// Kafka contains config for the Kafka orderer
type Kafka struct {
    Brokers     []string
    Topic       string
    PartitionID int32
    Retry       Retry
    Version     sarama.KafkaVersion // TODO For now set this in code
}

// Retry contains config for the reconnection attempts to the Kafka brokers
type Retry struct {
    Period time.Duration
    Stop   time.Duration
}
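The two Retry fields are meant to be read together: keep redialing every Period until Stop has elapsed in total. A minimal sketch of that behavior (illustrative only; retryConnect and dial are hypothetical names, not part of this patch):

func retryConnect(dial func() error, r Retry) error {
    deadline := time.Now().Add(r.Stop) // give up once Stop has elapsed overall
    for {
        err := dial()
        if err == nil {
            return nil // connected
        }
        if time.Now().After(deadline) {
            return err // out of time, surface the last error
        }
        time.Sleep(r.Period) // wait Period between attempts
    }
}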
// TopLevel directly corresponds to the orderer config yaml
// Note, for non 1-1 mappings, you may append
// something like `mapstructure:"weirdFoRMat"` to
@@ -68,6 +84,7 @@ type TopLevel struct {
    General    General
    RAMLedger  RAMLedger
    FileLedger FileLedger
    Kafka      Kafka
}
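To illustrate the mapstructure note above with a hypothetical field (not part of this patch): a YAML key that does not match its Go field name 1-1 would be declared as:

type exampleConfig struct {
    WeirdFormat string `mapstructure:"weirdFoRMat"` // binds the YAML key "weirdFoRMat"
}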
var defaults = TopLevel{
@@ -88,6 +105,16 @@ var defaults = TopLevel{
        Location: "",
        Prefix:   "hyperledger-fabric-rawledger",
    },
    Kafka: Kafka{
        Brokers:     []string{"127.0.0.1:9092"},
        Topic:       "test",
        PartitionID: 0,
        Version:     sarama.V0_9_0_1,
        Retry: Retry{
            Period: 3 * time.Second,
            Stop:   60 * time.Second,
        },
    },
}
func (c *TopLevel) completeInitialization() {
@@ -122,7 +149,22 @@ func (c *TopLevel) completeInitialization() {
    case c.FileLedger.Prefix == "":
        logger.Infof("FileLedger.Prefix unset, setting to %s", defaults.FileLedger.Prefix)
        c.FileLedger.Prefix = defaults.FileLedger.Prefix
    case c.Kafka.Brokers == nil:
        logger.Infof("Kafka.Brokers unset, setting to %v", defaults.Kafka.Brokers)
        c.Kafka.Brokers = defaults.Kafka.Brokers
    case c.Kafka.Topic == "":
        logger.Infof("Kafka.Topic unset, setting to %v", defaults.Kafka.Topic)
        c.Kafka.Topic = defaults.Kafka.Topic
    case c.Kafka.Retry.Period == 0*time.Second:
        logger.Infof("Kafka.Retry.Period unset, setting to %v", defaults.Kafka.Retry.Period)
        c.Kafka.Retry.Period = defaults.Kafka.Retry.Period
    case c.Kafka.Retry.Stop == 0*time.Second:
        logger.Infof("Kafka.Retry.Stop unset, setting to %v", defaults.Kafka.Retry.Stop)
        c.Kafka.Retry.Stop = defaults.Kafka.Retry.Stop
    default:
        // A bit hacky, but its type makes it impossible to test for a nil value.
        // This may be overwritten by the Kafka orderer upon instantiation.
        c.Kafka.Version = defaults.Kafka.Version
        return
    }
}
@@ -140,6 +182,7 @@ func Load() *TopLevel {
    config.SetConfigName("orderer")
    config.AddConfigPath("./")
    config.AddConfigPath("../../.")
    config.AddConfigPath("../orderer/")
    config.AddConfigPath("../../orderer/")
    // Path to look for the config file in based on GOPATH
......
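For orientation, a minimal sketch of the caller's side (assuming only the Load function and the Kafka defaults shown above; fmt is for illustration):

conf := config.Load() // reads orderer.yaml, applying the defaults above for any unset key
fmt.Printf("Kafka brokers: %v, topic: %s\n", conf.Kafka.Brokers, conf.Kafka.Topic)
// => Kafka brokers: [127.0.0.1:9092], topic: test (when nothing is overridden)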
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kafka

import (
    "fmt"
    "sync"
    "time"

    ab "github.com/hyperledger/fabric/orderer/atomicbroadcast"
    "github.com/hyperledger/fabric/orderer/config"
)
// Broadcaster allows the caller to submit messages to the orderer
type Broadcaster interface {
    Broadcast(stream ab.AtomicBroadcast_BroadcastServer) error
    Closeable
}
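Closeable is not part of this hunk; judging from the Close method further down, its assumed shape is simply:

// Assumed definition (declared elsewhere in this package):
type Closeable interface {
    Close() error
}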
type broadcasterImpl struct {
    producer   Producer
    config     *config.TopLevel
    once       sync.Once
    batchChan  chan *ab.BroadcastMessage
    messages   []*ab.BroadcastMessage
    nextNumber uint64
    prevHash   []byte
}
func newBroadcaster(conf *config.TopLevel) Broadcaster {
    return &broadcasterImpl{
        producer:   newProducer(conf),
        config:     conf,
        batchChan:  make(chan *ab.BroadcastMessage, conf.General.BatchSize),
        messages:   []*ab.BroadcastMessage{&ab.BroadcastMessage{Data: []byte("genesis")}},
        nextNumber: 0,
    }
}
// Broadcast receives ordering requests by clients and sends back an
// acknowledgement for each received message in order, indicating
// success or type of failure
func (b *broadcasterImpl) Broadcast(stream ab.AtomicBroadcast_BroadcastServer) error {
    b.once.Do(func() {
        // Send the genesis block to create the topic
        // otherwise consumers will throw an exception.
        b.sendBlock()
        // Spawn the goroutine that cuts blocks
        go b.cutBlock(b.config.General.BatchTimeout, b.config.General.BatchSize)
    })
    return b.recvRequests(stream)
}
// Close shuts down the broadcast side of the orderer
func (b *broadcasterImpl) Close() error {
    if b.producer != nil {
        return b.producer.Close()
    }
    return nil
}
func (b *broadcasterImpl) sendBlock() error {
    block := &ab.Block{
        Messages: b.messages,
        Number:   b.nextNumber,
        PrevHash: b.prevHash,
    }
    logger.Debugf("Prepared block %d with %d messages (%+v)", block.Number, len(block.Messages), block)
    b.messages = []*ab.BroadcastMessage{}
    b.nextNumber++
    hash, data := hashBlock(block)
    b.prevHash = hash
    return b.producer.Send(data)
}
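hashBlock is defined elsewhere in this package. From the call site above we only know its shape: it returns the chainable hash plus the serialized bytes handed to the producer. A hypothetical sketch of it (an assumption, not the actual implementation; assumes crypto/sha256 and github.com/golang/protobuf/proto):

// Hypothetical sketch of hashBlock — the real implementation lives elsewhere
// in this package and may differ (e.g. in hash function or error handling).
func hashBlockSketch(block *ab.Block) (hash []byte, data []byte) {
    data, err := proto.Marshal(block) // serialize the block for the Kafka producer
    if err != nil {
        panic(fmt.Errorf("failed to marshal block: %s", err))
    }
    sum := sha256.Sum256(data) // content hash chained via PrevHash
    return sum[:], data
}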
func (b *broadcasterImpl) cutBlock(period time.Duration, maxSize uint) {
    timer := time.NewTimer(period)
    for {
        select {
        case msg := <-b.batchChan:
            b.messages = append(b.messages, msg)
            if len(b.messages) >= int(maxSize) {
                if err := b.sendBlock(); err != nil {
                    panic(fmt.Errorf("Cannot communicate with Kafka broker: %s", err))
                }
                // Stop the timer, draining its channel if it already fired,
                // so that Reset starts a clean countdown for the next batch.
                if !timer.Stop() {
                    <-timer.C
                }
                timer.Reset(period)
            }
        case <-timer.C:
            timer.Reset(period) // re-arm; a Timer does not repeat on its own
            if len(b.messages) > 0 {
                if err := b.sendBlock(); err != nil {
                    panic(fmt.Errorf("Cannot communicate with Kafka broker: %s", err))
                }
            }
        }
    }
}
func (b *broadcasterImpl) recvRequests(stream ab.AtomicBroadcast_BroadcastServer) error {
    reply := new(ab.BroadcastResponse)
    for {
        msg, err := stream.Recv()
        if err != nil {
            logger.Debug("Can no longer receive requests from client (exited?)")
            return err
        }
        b.batchChan <- msg
        reply.Status = ab.Status_SUCCESS // TODO This shouldn't always be a success
        if err := stream.Send(reply); err != nil {
            logger.Info("Cannot send broadcast reply to client")
            return err
        }
        logger.Debugf("Sent broadcast reply %v to client", reply.Status.String())
    }
}
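For context, this is roughly what the other end of that stream looks like. A hypothetical client sketch — it assumes the generated gRPC constructor ab.NewAtomicBroadcastClient, the google.golang.org/grpc and golang.org/x/net/context packages, and orderer0's listen port from the compose files above; error handling is elided:

conn, err := grpc.Dial("127.0.0.1:5005", grpc.WithInsecure()) // orderer0's listen port
if err != nil {
    panic(err)
}
defer conn.Close()
client := ab.NewAtomicBroadcastClient(conn)
stream, err := client.Broadcast(context.Background())
if err != nil {
    panic(err)
}
_ = stream.Send(&ab.BroadcastMessage{Data: []byte("hello, orderer")})
reply, _ := stream.Recv()  // expect Status_SUCCESS, per recvRequests above
fmt.Println(reply.Status)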
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kafka

import (
    "testing"

    ab "github.com/hyperledger/fabric/orderer/atomicbroadcast"
    "github.com/hyperledger/fabric/orderer/config"
)

func mockNewBroadcaster(t *testing.T, conf *config.TopLevel, seek int64, disk chan []byte) Broadcaster {
    mb := &broadcasterImpl{
        producer:   mockNewProducer(t, conf, seek, disk),
        config:     conf,
        batchChan:  make(chan *ab.BroadcastMessage, conf.General.BatchSize),
        messages:   []*ab.BroadcastMessage{&ab.BroadcastMessage{Data: []byte("checkpoint")}},
        nextNumber: uint64(seek),
    }
    return mb
}
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kafka

import (
    "bytes"
    "strconv"
    "testing"
    "time"

    "github.com/golang/protobuf/proto"
    ab "github.com/hyperledger/fabric/orderer/atomicbroadcast"
)
func TestBroadcastInit(t *testing.T) {
    disk := make(chan []byte)

    mb := mockNewBroadcaster(t, testConf, oldestOffset, disk)
    defer testClose(t, mb)

    mbs := newMockBroadcastStream(t)
    go func() {
        if err := mb.Broadcast(mbs); err != nil {
            t.Fatal("Broadcast error:", err)
        }
    }()

    for {
        select {
        case in := <-disk:
            block := new(ab.Block)
            err := proto.Unmarshal(in, block)
            if err != nil {
                t.Fatal("Expected a block on the broker's disk")
            }
            if !(bytes.Equal(block.GetMessages()[0].Data, []byte("checkpoint"))) {
                t.Fatal("Expected first block to be a checkpoint")
            }
            return
        case <-time.After(500 * time.Millisecond):
            t.Fatal("Should have received the initialization block by now")
        }
    }
}
func TestBroadcastResponse(t *testing.T) {
    disk := make(chan []byte)

    mb := mockNewBroadcaster(t, testConf, oldestOffset, disk)
    defer testClose(t, mb)

    mbs := newMockBroadcastStream(t)
    go func() {
        if err := mb.Broadcast(mbs); err != nil {
            t.Fatal("Broadcast error:", err)
        }
    }()

    <-disk // We tested the checkpoint block in a previous test, so we can ignore it now

    // Send a message to the orderer
    go func() {
        mbs.incoming <- &ab.BroadcastMessage{Data: []byte("single message")}
    }()

    for {
        select {
        case reply := <-mbs.outgoing:
            if reply.Status != ab.Status_SUCCESS {
                t.Fatal("Client should have received a SUCCESS reply")
            }
            return
        case <-time.After(500 * time.Millisecond):
            t.Fatal("Should have received a broadcast reply from the orderer by now")
        }
    }
}
func TestBroadcastBatch(t *testing.T) {
    disk := make(chan []byte)

    mb := mockNewBroadcaster(t, testConf, oldestOffset, disk)
    defer testClose(t, mb)

    mbs := newMockBroadcastStream(t)
    go func() {
        if err := mb.Broadcast(mbs); err != nil {
            t.Fatal("Broadcast error:", err)
        }
    }()

    <-disk // We tested the checkpoint block in a previous test, so we can ignore it now

    // Pump a batch's worth of messages into the system
    go func() {
        for i := 0; i < int(testConf.General.BatchSize); i++ {
            mbs.incoming <- &ab.BroadcastMessage{Data: []byte("message " + strconv.Itoa(i))}
        }
    }()

    // Ignore the broadcast replies as they have been tested elsewhere
    for i := 0; i < int(testConf.General.BatchSize); i++ {
        <-mbs.outgoing
    }

    for {
        select {
        case in := <-disk:
            block := new(ab.Block)
            err := proto.Unmarshal(in, block)
            if err != nil {
                t.Fatal("Expected a block on the broker's disk")
            }
            if len(block.Messages) != int(testConf.General.BatchSize) {
                t.Fatalf("Expected block to have %d messages instead of %d", testConf.General.BatchSize, len(block.Messages))
            }
            return
        case <-time.After(500 * time.Millisecond):
            t.Fatal("Should have received the batch block by now")
        }
    }
}
func TestBroadcastBatchAndQuitEarly(t *testing.T) {
    disk := make(chan []byte)

    mb := mockNewBroadcaster(t, testConf, oldestOffset, disk)
    defer testClose(t, mb)

    mbs := newMockBroadcastStream(t)
    go func() {
        if err := mb.Broadcast(mbs); err != nil {
            t.Fatal("Broadcast error:", err)
        }
    }()

    <-disk // We tested the checkpoint block in a previous test, so we can ignore it now

    // Pump a batch's worth of messages into the system
    go func() {
        for i := 0; i < int(testConf.General.BatchSize); i++ {
            mbs.incoming <- &ab.BroadcastMessage{Data: []byte("message " + strconv.Itoa(i))}
        }
    }()

    // In contrast to TestBroadcastBatch, do not receive any replies. This
    // simulates a client that quits early. (In a real-world scenario the
    // client would most likely still get some replies; the effect is the
    // same as long as it does not receive all of them.)
    for !mbs.CloseOut() {
    }

    for {
        select {
        case in := <-disk:
            block := new(ab.Block)
            err := proto.Unmarshal(in, block)
            if err != nil {
                t.Fatal("Expected a block on the broker's disk")
            }
            if len(block.Messages) != int(testConf.General.BatchSize) {
                t.Fatalf("Expected block to have %d messages instead of %d", testConf.General.BatchSize, len(block.Messages)