Commit c39d69bd authored by Will Lahti's avatar Will Lahti
Browse files

[FAB-7273] Update deliver to facilitate usage on peer



This CR updates the deliver functionality to facilitate its usage on
a peer as well as an orderer.

This required:
- modifying the signal logic for when a new block is available due to
 the difference in addition of blocks to the ledger between the orderer
 and the peer. The signal logic is now handled using the iterator
 itself, which signals when it finds a new block
- adding a policy variable to the deliver handler to ensure the peer
and orderer each can control access to deliver

Change-Id: Iebb6c25a8c5ac32d65f909eb0519f26bfde0dc31
Signed-off-by: default avatarWill Lahti <wtlahti@us.ibm.com>
parent 0dfe4f35
......@@ -65,13 +65,15 @@ type Support interface {
}
type deliverServer struct {
sm SupportManager
sm SupportManager
policyName string
}
// NewHandlerImpl creates an implementation of the Handler interface
func NewHandlerImpl(sm SupportManager) Handler {
func NewHandlerImpl(sm SupportManager, policyName string) Handler {
return &deliverServer{
sm: sm,
sm: sm,
policyName: policyName,
}
}
......@@ -137,7 +139,7 @@ func (ds *deliverServer) deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, env
lastConfigSequence := chain.Sequence()
sf := NewSigFilter(policies.ChannelReaders, chain)
sf := NewSigFilter(ds.policyName, chain)
if err := sf.Apply(envelope); err != nil {
logger.Warningf("[channel: %s] Received unauthorized deliver request from %s: %s", chdr.ChannelId, addr, err)
return sendStatusReply(srv, cb.Status_FORBIDDEN)
......@@ -173,21 +175,21 @@ func (ds *deliverServer) deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, env
}
for {
if seekInfo.Behavior == ab.SeekInfo_BLOCK_UNTIL_READY {
select {
case <-erroredChan:
logger.Warningf("[channel: %s] Aborting deliver for request because of consenter error", chdr.ChannelId, addr)
return sendStatusReply(srv, cb.Status_SERVICE_UNAVAILABLE)
case <-cursor.ReadyChan():
}
} else {
select {
case <-cursor.ReadyChan():
default:
if seekInfo.Behavior == ab.SeekInfo_FAIL_IF_NOT_READY {
if number > chain.Reader().Height()-1 {
return sendStatusReply(srv, cb.Status_NOT_FOUND)
}
}
block, status := nextBlock(cursor, erroredChan)
if status != cb.Status_SUCCESS {
cursor.Close()
logger.Errorf("[channel: %s] Error reading from channel, cause was: %v", chdr.ChannelId, status)
return sendStatusReply(srv, status)
}
// increment block number to support FAIL_IF_NOT_READY deliver behavior
number++
currentConfigSequence := chain.Sequence()
if currentConfigSequence > lastConfigSequence {
lastConfigSequence = currentConfigSequence
......@@ -197,12 +199,6 @@ func (ds *deliverServer) deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, env
}
}
block, status := cursor.Next()
if status != cb.Status_SUCCESS {
logger.Errorf("[channel: %s] Error reading from channel, cause was: %v", chdr.ChannelId, status)
return sendStatusReply(srv, status)
}
logger.Debugf("[channel: %s] Delivering block for (%p) for %s", chdr.ChannelId, seekInfo, addr)
if err := sendBlockReply(srv, block); err != nil {
......@@ -226,6 +222,22 @@ func (ds *deliverServer) deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, env
}
func nextBlock(cursor blockledger.Iterator, cancel <-chan struct{}) (block *cb.Block, status cb.Status) {
done := make(chan struct{})
go func() {
defer close(done)
block, status = cursor.Next()
}()
select {
case <-done:
return
case <-cancel:
logger.Warningf("Aborting deliver for request because of background error")
return nil, cb.Status_SERVICE_UNAVAILABLE
}
}
func sendStatusReply(srv ab.AtomicBroadcast_DeliverServer, status cb.Status) error {
return srv.Send(&ab.DeliverResponse{
Type: &ab.DeliverResponse_Status{Status: status},
......
......@@ -41,6 +41,8 @@ var genesisBlock = cb.NewBlock(0, nil)
var systemChainID = "systemChain"
var policyName = policies.ChannelReaders
const ledgerSize = 10
func init() {
......@@ -156,7 +158,7 @@ func initializeDeliverHandler() Handler {
l.Append(blockledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", i))}}))
}
return NewHandlerImpl(mm)
return NewHandlerImpl(mm, policyName)
}
func newMockMultichainManager() *mockSupportManager {
......@@ -288,7 +290,7 @@ func TestUnauthorizedSeek(t *testing.T) {
m := newMockD()
defer close(m.recvChan)
ds := NewHandlerImpl(mm)
ds := NewHandlerImpl(mm, policyName)
go ds.Handle(m)
......@@ -313,7 +315,7 @@ func TestRevokedAuthorizationSeek(t *testing.T) {
m := newMockD()
defer close(m.recvChan)
ds := NewHandlerImpl(mm)
ds := NewHandlerImpl(mm, policyName)
go ds.Handle(m)
......@@ -396,7 +398,7 @@ func TestBlockingSeek(t *testing.T) {
m := newMockD()
defer close(m.recvChan)
ds := NewHandlerImpl(mm)
ds := NewHandlerImpl(mm, policyName)
go ds.Handle(m)
......@@ -450,7 +452,7 @@ func TestErroredSeek(t *testing.T) {
m := newMockD()
defer close(m.recvChan)
ds := NewHandlerImpl(mm)
ds := NewHandlerImpl(mm, policyName)
go ds.Handle(m)
......@@ -474,7 +476,7 @@ func TestErroredBlockingSeek(t *testing.T) {
m := newMockD()
defer close(m.recvChan)
ds := NewHandlerImpl(mm)
ds := NewHandlerImpl(mm, policyName)
go ds.Handle(m)
......@@ -499,7 +501,7 @@ func TestErroredBlockingSeek(t *testing.T) {
func TestSGracefulShutdown(t *testing.T) {
m := newMockD()
ds := NewHandlerImpl(nil)
ds := NewHandlerImpl(nil, policyName)
close(m.recvChan)
assert.NoError(t, ds.Handle(m), "Expected no error for hangup")
......@@ -527,7 +529,7 @@ func TestReversedSeqSeek(t *testing.T) {
}
func TestBadStreamRecv(t *testing.T) {
bh := NewHandlerImpl(nil)
bh := NewHandlerImpl(nil, policyName)
assert.Error(t, bh.Handle(&erroneousRecvMockD{}), "Should catch unexpected stream error")
}
......@@ -616,7 +618,7 @@ func TestChainNotFound(t *testing.T) {
m := newMockD()
defer close(m.recvChan)
ds := NewHandlerImpl(mm)
ds := NewHandlerImpl(mm, policyName)
go ds.Handle(m)
m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekNewest, Stop: seekNewest, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY})
......
......@@ -46,7 +46,7 @@ func (flf *fileLedgerFactory) GetOrCreate(chainID string) (blockledger.ReadWrite
if err != nil {
return nil, err
}
ledger = &fileLedger{blockStore: blockStore, signal: make(chan struct{})}
ledger = NewFileLedger(blockStore)
flf.ledgers[key] = ledger
return ledger, nil
}
......
......@@ -19,7 +19,6 @@ package fileledger
import (
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger"
"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/common/ledger/blockledger"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
......@@ -40,13 +39,26 @@ func init() {
}
// FileLedger is a struct used to interact with a node's ledger
type fileLedger struct {
blockStore blkstorage.BlockStore
type FileLedger struct {
blockStore FileLedgerBlockStore
signal chan struct{}
}
// FileLedgerBlockStore defines the interface to interact with deliver when using a
// file ledger
type FileLedgerBlockStore interface {
AddBlock(block *cb.Block) error
GetBlockchainInfo() (*cb.BlockchainInfo, error)
RetrieveBlocks(startBlockNumber uint64) (ledger.ResultsIterator, error)
}
// NewFileLedger creates a new FileLedger for interaction with the ledger
func NewFileLedger(blockStore FileLedgerBlockStore) *FileLedger {
return &FileLedger{blockStore: blockStore, signal: make(chan struct{})}
}
type fileLedgerIterator struct {
ledger *fileLedger
ledger *FileLedger
blockNumber uint64
commonIterator ledger.ResultsIterator
}
......@@ -54,17 +66,11 @@ type fileLedgerIterator struct {
// Next blocks until there is a new block available, or returns an error if the
// next block is no longer retrievable
func (i *fileLedgerIterator) Next() (*cb.Block, cb.Status) {
for {
if i.blockNumber < i.ledger.Height() {
result, err := i.commonIterator.Next()
if err != nil {
return nil, cb.Status_SERVICE_UNAVAILABLE
}
i.blockNumber++
return result.(*cb.Block), cb.Status_SUCCESS
}
<-i.ledger.signal
result, err := i.commonIterator.Next()
if err != nil {
return nil, cb.Status_SERVICE_UNAVAILABLE
}
return result.(*cb.Block), cb.Status_SUCCESS
}
// ReadyChan supplies a channel which will block until Next will not block
......@@ -83,7 +89,7 @@ func (i *fileLedgerIterator) Close() {
// Iterator returns an Iterator, as specified by an ab.SeekInfo message, and its
// starting block number
func (fl *fileLedger) Iterator(startPosition *ab.SeekPosition) (blockledger.Iterator, uint64) {
func (fl *FileLedger) Iterator(startPosition *ab.SeekPosition) (blockledger.Iterator, uint64) {
var startingBlockNumber uint64
switch start := startPosition.Type.(type) {
case *ab.SeekPosition_Oldest:
......@@ -114,7 +120,7 @@ func (fl *fileLedger) Iterator(startPosition *ab.SeekPosition) (blockledger.Iter
}
// Height returns the number of blocks on the ledger
func (fl *fileLedger) Height() uint64 {
func (fl *FileLedger) Height() uint64 {
info, err := fl.blockStore.GetBlockchainInfo()
if err != nil {
logger.Panic(err)
......@@ -123,7 +129,7 @@ func (fl *fileLedger) Height() uint64 {
}
// Append a new block to the ledger
func (fl *fileLedger) Append(block *cb.Block) error {
func (fl *FileLedger) Append(block *cb.Block) error {
err := fl.blockStore.AddBlock(block)
if err == nil {
close(fl.signal)
......
......@@ -46,7 +46,7 @@ type testEnv struct {
flf blockledger.Factory
}
func initialize(t *testing.T) (*testEnv, *fileLedger) {
func initialize(t *testing.T) (*testEnv, *FileLedger) {
name, err := ioutil.TempDir("", "hyperledger_fabric")
assert.NoError(t, err, "Error creating temp dir: %s", err)
......@@ -55,7 +55,7 @@ func initialize(t *testing.T) (*testEnv, *fileLedger) {
assert.NoError(t, err, "Error GetOrCreate chain")
fl.Append(genesisBlock)
return &testEnv{location: name, t: t, flf: flf}, fl.(*fileLedger)
return &testEnv{location: name, t: t, flf: flf}, fl.(*FileLedger)
}
func (tev *testEnv) tearDown() {
......@@ -155,14 +155,11 @@ func TestReinitialization(t *testing.T) {
ledger1.Append(b1)
fl, err := tev.flf.GetOrCreate(genesisconfig.TestChainID)
ledger1, ok := fl.(*fileLedger)
ledger1, ok := fl.(*FileLedger)
assert.NoError(t, err, "Expected to sucessfully get test chain")
assert.Equal(t, 1, len(tev.flf.ChainIDs()), "Exptected not new chain to be created")
assert.True(t, ok, "Exptected type assertion to succeed")
// shutdown the ledger
ledger1.blockStore.Shutdown()
// shut down the ledger provider
tev.shutDown()
......@@ -177,7 +174,7 @@ func TestReinitialization(t *testing.T) {
ledger2, err := provider2.GetOrCreate(chains[0])
assert.NoError(t, err, "Unexpected error: %s", err)
fl = ledger2.(*fileLedger)
fl = ledger2.(*FileLedger)
assert.Equal(t, uint64(2), fl.Height(), "Block height should be 2. Got %v", fl.Height())
block := blockledger.GetBlock(fl, 1)
......@@ -206,24 +203,10 @@ func TestRetrieval(t *testing.T) {
defer it.Close()
assert.Zero(t, num, "Expected genesis block iterator, but got %d", num)
signal := it.ReadyChan()
select {
case <-signal:
default:
t.Fatalf("Should be ready for block read")
}
block, status := it.Next()
assert.Equal(t, cb.Status_SUCCESS, status, "Expected to successfully read the genesis block")
assert.Zero(t, block.Header.Number, "Expected to successfully retrieve the genesis block")
signal = it.ReadyChan()
select {
case <-signal:
default:
t.Fatalf("Should still be ready for block read")
}
block, status = it.Next()
assert.Equal(t, cb.Status_SUCCESS, status, "Expected to successfully read the second block")
assert.Equal(
......@@ -243,18 +226,7 @@ func TestBlockedRetrieval(t *testing.T) {
}
assert.Equal(t, uint64(1), num, "Expected block iterator at 1, but got %d", num)
signal := it.ReadyChan()
select {
case <-signal:
t.Fatalf("Should not be ready for block read")
default:
}
fl.Append(blockledger.CreateNextBlock(fl, []*cb.Envelope{{Payload: []byte("My Data")}}))
select {
case <-signal:
default:
t.Fatalf("Should now be ready for block read")
}
block, status := it.Next()
assert.Equal(t, cb.Status_SUCCESS, status, "Expected to successfully read the second block")
......@@ -264,17 +236,11 @@ func TestBlockedRetrieval(t *testing.T) {
block.Header.Number,
"Expected to successfully retrieve the second block but got block number %d", block.Header.Number)
go func() {
fl.Append(blockledger.CreateNextBlock(fl, []*cb.Envelope{{Payload: []byte("My Data")}}))
}()
select {
case <-it.ReadyChan():
t.Fatalf("Should not be ready for block read")
default:
block, status = it.Next()
assert.Equal(t, cb.Status_SUCCESS, status, "Expected to successfully read the third block")
assert.Equal(t, uint64(2), block.Header.Number, "Expected to successfully retrieve the third block")
}
fl.Append(blockledger.CreateNextBlock(fl, []*cb.Envelope{{Payload: []byte("My Data")}}))
block, status = it.Next()
assert.Equal(t, cb.Status_SUCCESS, status, "Expected to successfully read the third block")
assert.Equal(t, uint64(2), block.Header.Number, "Expected to successfully retrieve the third block")
}
func TestBlockstoreError(t *testing.T) {
......@@ -282,7 +248,7 @@ func TestBlockstoreError(t *testing.T) {
// is properly handled. We don't bother creating fully
// legit ledgers here (without genesis block).
{
fl := &fileLedger{
fl := &FileLedger{
blockStore: &mockBlockStore{
blockchainInfo: nil,
getBlockchainInfoError: fmt.Errorf("Error getting blockchain info"),
......@@ -303,7 +269,7 @@ func TestBlockstoreError(t *testing.T) {
}
{
fl := &fileLedger{
fl := &FileLedger{
blockStore: &mockBlockStore{
blockchainInfo: &cb.BlockchainInfo{Height: uint64(1)},
getBlockchainInfoError: nil,
......@@ -324,7 +290,7 @@ func TestBlockstoreError(t *testing.T) {
resultsIterator := &mockBlockStoreIterator{}
resultsIterator.On("Next").Return(nil, errors.New("a mocked error"))
resultsIterator.On("Close").Return()
fl := &fileLedger{
fl := &FileLedger{
blockStore: &mockBlockStore{
blockchainInfo: &cb.BlockchainInfo{Height: uint64(1)},
getBlockchainInfoError: nil,
......
......@@ -16,6 +16,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/crypto"
"github.com/hyperledger/fabric/common/deliver"
"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/orderer/common/broadcast"
localconfig "github.com/hyperledger/fabric/orderer/common/localconfig"
"github.com/hyperledger/fabric/orderer/common/multichannel"
......@@ -48,7 +49,7 @@ type server struct {
// NewServer creates a ab.AtomicBroadcastServer based on the broadcast target and ledger Reader
func NewServer(r *multichannel.Registrar, _ crypto.LocalSigner, debug *localconfig.Debug) ab.AtomicBroadcastServer {
s := &server{
dh: deliver.NewHandlerImpl(deliverSupport{Registrar: r}),
dh: deliver.NewHandlerImpl(deliverSupport{Registrar: r}, policies.ChannelReaders),
bh: broadcast.NewHandlerImpl(broadcastSupport{Registrar: r}),
debug: debug,
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment