Commit 5d76c5b0 authored by Gari Singh's avatar Gari Singh Committed by Gerrit Code Review

Merge changes I6ffa3ef6,Ic75bee7c into feature/convergence

* changes:
  Allow ledger selection to be done at runtime for solo.
  Add a fileledger implementation in rawledger.
parents 2326cf90 987b7575
# Hyperledger Ordering Service
The hyperledger fabric ordering service is intended to provide an atomic broadcast ordering service for consumption by the peers. This means that many clients may submit messages for ordering, and all clients are delivered the same series of ordered batches in response.
## Protocol definition
The atomic broadcast ordering protocol for hyperledger fabric is described in `hyperledger/fabric/orderer/atomicbroadcast/ab.proto`. There are two services: the `Broadcast` service for injecting messages into the system, and the `Deliver` service for receiving ordered batches from the service. Sometimes the service will reside over the network, while at other times the service may be bound locally into a peer process. The service may be bound locally for single-process development deployments, or when the underlying ordering service has its own backing network protocol and the proto serves only as a wrapper.
## Service types
* Solo Orderer:
The solo orderer is intended to be an extremely easy-to-deploy, non-production orderer. It consists of a single process which serves all clients, so no `consensus` is required, as there is a single central authority. There is correspondingly no high availability or scalability. This makes solo ideal for development and testing, but not for deployment. The solo orderer depends on a backing raw ledger.
* Kafka Orderer (pending):
The Kafka orderer leverages the Kafka pub/sub system to perform the ordering, but wraps this in the familiar `ab.proto` definition so that the peer orderer client code does not need to be written specifically for Kafka. In real-world deployments, it is expected that the Kafka proto service would be bound locally in process, as Kafka has its own robust wire protocol. However, for testing or novel deployment scenarios, the Kafka orderer may be deployed as a network service. Kafka is anticipated to be the preferred choice for production deployments which demand high throughput and high availability but do not require byzantine fault tolerance. The Kafka orderer does not utilize a backing raw ledger because this is handled by the Kafka brokers.
* PBFT Orderer (pending):
The PBFT orderer uses the hyperledger fabric PBFT implementation to order messages in a byzantine fault tolerant way. Because the implementation is being developed expressly for the hyperledger fabric, the `ab.proto` is used for wireline communication to the PBFT orderer. Therefore it is unusual to bind the PBFT orderer into the peer process, though this might be desirable for some deployments. The PBFT orderer depends on a backing raw ledger.
## Raw Ledger Types
Because the ordering service must allow clients to seek within the ordered batch stream, orderers must maintain a local copy of past batches. The length of time batches are retained may be configurable (or all batches may be retained indefinitely). Not all ledgers are crash fault tolerant, so care should be used when selecting a ledger for an application. Because the raw ledger interface is abstracted, the ledger type for a particular orderer may be selected at runtime. Not all orderers require (or can utilize) a backing raw ledger (Kafka, for instance, does not).
* RAM Ledger
The RAM ledger implementation is a simple, development-oriented ledger which stores batches purely in RAM, with a configurable history size for retention. This ledger is not crash fault tolerant; restarting the process will reset the ledger to the genesis block. This is the default ledger.
* File Ledger
The file ledger implementation is a simple, development-oriented ledger which stores batches as JSON-encoded files on the filesystem. This is intended to make inspecting the ledger easy and to allow for crash fault tolerance. This ledger is not intended to be performant, but rather simple, easy to deploy, and easy to understand. This ledger may be enabled before executing the `orderer` binary by setting `ORDERER_LEDGER_TYPE=file` (note: this is a temporary hack and may not persist into the future).
* Other Ledgers
There are currently no other raw ledgers available, although it is anticipated that some high-performance database or other log-based storage system will eventually be adapted for production deployments.
## Experimenting with the orderer service
To experiment with the orderer service you may build the orderer binary by simply typing `go build` in the `hyperledger/fabric/orderer` directory. You may then invoke the orderer binary with no parameters, or you can override the bind address, port, and backing ledger by setting the environment variables `ORDERER_LISTEN_ADDRESS`, `ORDERER_LISTEN_PORT`, and `ORDERER_LEDGER_TYPE` respectively. Presently, only the solo orderer is supported. The deployment and configuration is a stopgap at this point, so expect this to change noticeably in the future.
There are sample clients in the `fabric/orderer/sample_clients` directory. The `broadcast_timestamp` client sends a message containing the timestamp to the `Broadcast` service. The `deliver_stdout` client prints received batches to stdout from the `Deliver` interface. These may both be built simply by typing `go build` in their respective directories. Neither presently supports config, so editing the source manually to adjust address and port is required.
@@ -18,10 +18,14 @@ package main
import (
"fmt"
"io/ioutil"
"net"
"os"
"time"
"github.com/hyperledger/fabric/orderer/rawledger"
"github.com/hyperledger/fabric/orderer/rawledger/fileledger"
"github.com/hyperledger/fabric/orderer/rawledger/ramledger"
"github.com/hyperledger/fabric/orderer/solo"
"google.golang.org/grpc"
@@ -47,6 +51,27 @@ func main() {
grpcServer := grpc.NewServer()
// Stand in until real config
ledgerType := os.Getenv("ORDERER_LEDGER_TYPE")
var rawledger rawledger.ReadWriter
switch ledgerType {
case "file":
name, err := ioutil.TempDir("", "hyperledger") // TODO, config
if err != nil {
panic(fmt.Errorf("Error creating temp dir: %s", err))
}
rawledger = fileledger.New(name)
case "ram":
fallthrough
default:
historySize := 10 // TODO, config
rawledger = ramledger.New(historySize)
}
queueSize := 100 // TODO configure
batchSize := 10
batchTimeout := 10 * time.Second
solo.New(queueSize, batchSize, batchTimeout, rawledger, grpcServer)
grpcServer.Serve(lis)
}
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fileledger
import (
"fmt"
"io/ioutil"
"os"
ab "github.com/hyperledger/fabric/orderer/atomicbroadcast"
"github.com/hyperledger/fabric/orderer/rawledger"
"github.com/golang/protobuf/jsonpb"
"github.com/op/go-logging"
)
var logger = logging.MustGetLogger("rawledger/fileledger")
var closedChan chan struct{}
func init() {
logging.SetLevel(logging.DEBUG, "")
closedChan = make(chan struct{})
close(closedChan)
}
const blockFileFormatString string = "block_%020d.json"
type cursor struct {
fl *fileLedger
blockNumber uint64
}
type fileLedger struct {
directory string
fqFormatString string
height uint64
signal chan struct{}
lastHash []byte
marshaler *jsonpb.Marshaler
}
// New creates a new instance of the file ledger
func New(directory string) rawledger.ReadWriter {
logger.Debugf("Initializing fileLedger at '%s'", directory)
if err := os.MkdirAll(directory, 0700); err != nil {
panic(err)
}
fl := &fileLedger{
directory: directory,
fqFormatString: directory + "/" + blockFileFormatString,
signal: make(chan struct{}),
marshaler: &jsonpb.Marshaler{Indent: " "},
}
genesisBlock := &ab.Block{
Number: 0,
PrevHash: []byte("GENESIS"),
}
if _, err := os.Stat(fl.blockFilename(genesisBlock.Number)); os.IsNotExist(err) {
fl.writeBlock(genesisBlock)
}
fl.initializeBlockHeight()
logger.Debugf("Initialized to block height %d with hash %x", fl.height-1, fl.lastHash)
return fl
}
// initializeBlockHeight verifies all blocks exist between 0 and the block height, and populates the lastHash
func (fl *fileLedger) initializeBlockHeight() {
infos, err := ioutil.ReadDir(fl.directory)
if err != nil {
panic(err)
}
nextNumber := uint64(0)
for _, info := range infos {
if info.IsDir() {
continue
}
var number uint64
_, err := fmt.Sscanf(info.Name(), blockFileFormatString, &number)
if err != nil {
continue
}
if number != nextNumber {
panic(fmt.Errorf("Missing block %d in the chain", nextNumber))
}
nextNumber++
}
fl.height = nextNumber
block, found := fl.readBlock(fl.height - 1)
if !found {
panic(fmt.Errorf("Block %d was in directory listing but error reading", fl.height-1))
}
if block == nil {
panic(fmt.Errorf("Error reading block %d", fl.height-1))
}
fl.lastHash = block.Hash()
}
// blockFilename returns the fully qualified path to where a block of a given number should be stored on disk
func (fl *fileLedger) blockFilename(number uint64) string {
return fmt.Sprintf(fl.fqFormatString, number)
}
// writeBlock commits a block to disk
func (fl *fileLedger) writeBlock(block *ab.Block) {
file, err := os.Create(fl.blockFilename(block.Number))
if err != nil {
panic(err)
}
defer file.Close()
err = fl.marshaler.Marshal(file, block)
logger.Debugf("Wrote block %d", block.Number)
if err != nil {
panic(err)
}
}
// readBlock returns the block or nil, and whether the block was found or not, (nil,true) generally indicates an irrecoverable problem
func (fl *fileLedger) readBlock(number uint64) (*ab.Block, bool) {
file, err := os.Open(fl.blockFilename(number))
if err == nil {
defer file.Close()
block := &ab.Block{}
err = jsonpb.Unmarshal(file, block)
if err != nil {
return nil, true
}
logger.Debugf("Read block %d", block.Number)
return block, true
}
return nil, false
}
// Height returns the highest block number in the chain, plus one
func (fl *fileLedger) Height() uint64 {
return fl.height
}
// Append creates a new block and appends it to the ledger
func (fl *fileLedger) Append(messages []*ab.BroadcastMessage, proof []byte) *ab.Block {
block := &ab.Block{
Number: fl.height,
PrevHash: fl.lastHash,
Messages: messages,
Proof: proof,
}
fl.writeBlock(block)
fl.height++
close(fl.signal)
fl.signal = make(chan struct{})
return block
}
// Iterator implements the rawledger.Reader definition
func (fl *fileLedger) Iterator(startType ab.SeekInfo_StartType, specified uint64) (rawledger.Iterator, uint64) {
switch startType {
case ab.SeekInfo_OLDEST:
return &cursor{fl: fl, blockNumber: 0}, 0
case ab.SeekInfo_NEWEST:
high := fl.height - 1
return &cursor{fl: fl, blockNumber: high}, high
case ab.SeekInfo_SPECIFIED:
if specified > fl.height {
return &rawledger.NotFoundErrorIterator{}, 0
}
return &cursor{fl: fl, blockNumber: specified}, specified
}
// This line should be unreachable, but the compiler requires it
return &rawledger.NotFoundErrorIterator{}, 0
}
// Next blocks until there is a new block available, or returns an error if the next block is no longer retrievable
func (cu *cursor) Next() (*ab.Block, ab.Status) {
// This only loops once, as signal reading indicates the new block has been written
for {
block, found := cu.fl.readBlock(cu.blockNumber)
if found {
if block == nil {
return nil, ab.Status_SERVICE_UNAVAILABLE
}
cu.blockNumber++
return block, ab.Status_SUCCESS
}
<-cu.fl.signal
}
}
// ReadyChan returns a channel that will close when Next is ready to be called without blocking
func (cu *cursor) ReadyChan() <-chan struct{} {
signal := cu.fl.signal
if _, err := os.Stat(cu.fl.blockFilename(cu.blockNumber)); os.IsNotExist(err) {
return signal
}
return closedChan
}
/*
Copyright IBM Corp. 2016 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fileledger
import (
"bytes"
"io/ioutil"
"os"
"testing"
ab "github.com/hyperledger/fabric/orderer/atomicbroadcast"
)
type testEnv struct {
t *testing.T
location string
}
func initialize(t *testing.T) (*testEnv, *fileLedger) {
name, err := ioutil.TempDir("", "hyperledger")
if err != nil {
t.Fatalf("Error creating temp dir: %s", err)
}
return &testEnv{location: name, t: t}, New(name).(*fileLedger)
}
func (tev *testEnv) tearDown() {
err := os.RemoveAll(tev.location)
if err != nil {
tev.t.Fatalf("Error tearing down env: %s", err)
}
}
func TestInitialization(t *testing.T) {
tev, fl := initialize(t)
defer tev.tearDown()
if fl.height != 1 {
t.Fatalf("Block height should be 1")
}
block, found := fl.readBlock(0)
if block == nil || !found {
t.Fatalf("Error retrieving genesis block")
}
if !bytes.Equal(block.Hash(), fl.lastHash) {
t.Fatalf("Block hashes did not match")
}
}
func TestReinitialization(t *testing.T) {
tev, ofl := initialize(t)
defer tev.tearDown()
ofl.Append([]*ab.BroadcastMessage{&ab.BroadcastMessage{Data: []byte("My Data")}}, nil)
fl := New(tev.location).(*fileLedger)
if fl.height != 2 {
t.Fatalf("Block height should be 2")
}
block, found := fl.readBlock(1)
if block == nil || !found {
t.Fatalf("Error retrieving block 1")
}
if !bytes.Equal(block.Hash(), fl.lastHash) {
t.Fatalf("Block hashes did not match")
}
}
func TestAddition(t *testing.T) {
tev, fl := initialize(t)
defer tev.tearDown()
prevHash := fl.lastHash
fl.Append([]*ab.BroadcastMessage{&ab.BroadcastMessage{Data: []byte("My Data")}}, nil)
if fl.height != 2 {
t.Fatalf("Block height should be 2")
}
block, found := fl.readBlock(1)
if block == nil || !found {
t.Fatalf("Error retrieving genesis block")
}
if !bytes.Equal(block.PrevHash, prevHash) {
t.Fatalf("Block hashes did not match")
}
}
func TestRetrieval(t *testing.T) {
tev, fl := initialize(t)
defer tev.tearDown()
fl.Append([]*ab.BroadcastMessage{&ab.BroadcastMessage{Data: []byte("My Data")}}, nil)
it, num := fl.Iterator(ab.SeekInfo_OLDEST, 99)
if num != 0 {
t.Fatalf("Expected genesis block iterator, but got %d", num)
}
signal := it.ReadyChan()
select {
case <-signal:
default:
t.Fatalf("Should be ready for block read")
}
block, status := it.Next()
if status != ab.Status_SUCCESS {
t.Fatalf("Expected to successfully read the genesis block")
}
if block.Number != 0 {
t.Fatalf("Expected to successfully retrieve the genesis block")
}
signal = it.ReadyChan()
select {
case <-signal:
default:
t.Fatalf("Should still be ready for block read")
}
block, status = it.Next()
if status != ab.Status_SUCCESS {
t.Fatalf("Expected to successfully read the second block")
}
if block.Number != 1 {
t.Fatalf("Expected to successfully retrieve the second block but got block number %d", block.Number)
}
}
func TestBlockedRetrieval(t *testing.T) {
tev, fl := initialize(t)
defer tev.tearDown()
it, num := fl.Iterator(ab.SeekInfo_SPECIFIED, 1)
if num != 1 {
t.Fatalf("Expected block iterator at 1, but got %d", num)
}
signal := it.ReadyChan()
select {
case <-signal:
t.Fatalf("Should not be ready for block read")
default:
}
fl.Append([]*ab.BroadcastMessage{&ab.BroadcastMessage{Data: []byte("My Data")}}, nil)
select {
case <-signal:
default:
t.Fatalf("Should now be ready for block read")
}
block, status := it.Next()
if status != ab.Status_SUCCESS {
t.Fatalf("Expected to successfully read the second block")
}
if block.Number != 1 {
t.Fatalf("Expected to successfully retrieve the second block")
}
}
@@ -21,7 +21,6 @@ import (
ab "github.com/hyperledger/fabric/orderer/atomicbroadcast"
"github.com/hyperledger/fabric/orderer/rawledger"
"github.com/op/go-logging"
"google.golang.org/grpc"
@@ -39,13 +38,11 @@ const MagicLargestWindow = 1000
type server struct {
bs *broadcastServer
ds *deliverServer
rl rawledger.ReadWriter
}
// New creates an ab.AtomicBroadcastServer based on the solo orderer implementation
func New(queueSize, batchSize int, batchTimeout time.Duration, rl rawledger.ReadWriter, grpcServer *grpc.Server) ab.AtomicBroadcastServer {
logger.Infof("Starting solo with queueSize=%d, batchSize=%d batchTimeout=%v and ledger=%T", queueSize, batchSize, batchTimeout, rl)
s := &server{
bs: newBroadcastServer(queueSize, batchSize, batchTimeout, rl),
ds: newDeliverServer(rl, MagicLargestWindow),
......