Commit c820bcb9 authored by Jay Guo

[FAB-12708] 1/4 remove current orderer benchmark tests



Remove the current orderer benchmark tests. We will be using nwo going forward.

Change-Id: I20b9fc26c16e57ffe2bcd0b7b7139e31795a87e4
Signed-off-by: Jay Guo <guojiannan1101@gmail.com>
parent 38157497
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package performance
import (
"context"
"io"
"sync"
"github.com/hyperledger/fabric/common/flogging"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
"google.golang.org/grpc"
"google.golang.org/grpc/peer"
)
var logger = flogging.MustGetLogger("orderer.common.performance")
// BenchmarkServer is a pseudo-server that gRPC services can be registered to
type BenchmarkServer struct {
server ab.AtomicBroadcastServer
start chan struct{}
halt chan struct{}
}
var (
servers []*BenchmarkServer
index int
mutex sync.Mutex
)
// InitializeServerPool instantiates a benchmark server pool of size `number`
func InitializeServerPool(number int) {
mutex = sync.Mutex{}
index = 0
servers = make([]*BenchmarkServer, number)
for i := 0; i < number; i++ {
servers[i] = &BenchmarkServer{
server: nil,
start: make(chan struct{}),
halt: make(chan struct{}),
}
}
}
// GetBenchmarkServer retrieves the next unused server in the pool.
// This method should ONLY be called by the orderer main(), and only
// after initialization.
func GetBenchmarkServer() *BenchmarkServer {
mutex.Lock()
defer mutex.Unlock()
if index >= len(servers) {
panic("Not enough servers in the pool!")
}
defer func() { index++ }()
return servers[index]
}
// GetBenchmarkServerPool returns the whole server pool for clients to use.
// It should only be called after initialization.
func GetBenchmarkServerPool() []*BenchmarkServer {
return servers
}
// Start blocks until the server is halted. It prevents the main process from exiting.
func (server *BenchmarkServer) Start() {
server.halt = make(chan struct{})
close(server.start) // signal waiters that service is registered
// Block reading here to prevent process exit
<-server.halt
}
// Halt stops the given server
func Halt(server *BenchmarkServer) { server.Halt() }
// Halt stops the server and resets it so a new service registration is required
func (server *BenchmarkServer) Halt() {
logger.Debug("Stopping benchmark server")
server.server = nil
server.start = make(chan struct{})
close(server.halt)
}
// WaitForService blocks waiting for service to be registered
func WaitForService(server *BenchmarkServer) { server.WaitForService() }
// WaitForService blocks waiting for service to be registered
func (server *BenchmarkServer) WaitForService() { <-server.start }
// RegisterService registers a grpc service to server
func (server *BenchmarkServer) RegisterService(s ab.AtomicBroadcastServer) {
server.server = s
}
// CreateBroadcastClient creates a broadcast client for this server. The client
// doubles as the server-side stream passed to the Broadcast handler, which is
// run in a background goroutine.
func (server *BenchmarkServer) CreateBroadcastClient() *BroadcastClient {
client := &BroadcastClient{
requestChan: make(chan *cb.Envelope),
responseChan: make(chan *ab.BroadcastResponse),
errChan: make(chan error),
}
go func() {
client.errChan <- server.server.Broadcast(client)
}()
return client
}
// BroadcastClient represents a broadcast client that is used to interact
// with the `broadcast` API
type BroadcastClient struct {
grpc.ServerStream
requestChan chan *cb.Envelope
responseChan chan *ab.BroadcastResponse
errChan chan error
}
// Context returns a stub peer context for the stream
func (BroadcastClient) Context() context.Context {
return peer.NewContext(context.Background(), &peer.Peer{})
}
// SendRequest sends an envelope to the `broadcast` API synchronously
func (bc *BroadcastClient) SendRequest(request *cb.Envelope) {
// TODO make this async
bc.requestChan <- request
}
// GetResponse waits for a response from the `broadcast` API synchronously
func (bc *BroadcastClient) GetResponse() *ab.BroadcastResponse {
return <-bc.responseChan
}
// Close closes a broadcast client
func (bc *BroadcastClient) Close() {
close(bc.requestChan)
}
// Errors returns the channel to which the return value of the broadcast handler is sent
func (bc *BroadcastClient) Errors() <-chan error {
return bc.errChan
}
// Send implements the AtomicBroadcast_BroadcastServer interface
func (bc *BroadcastClient) Send(br *ab.BroadcastResponse) error {
bc.responseChan <- br
return nil
}
// Recv implements the AtomicBroadcast_BroadcastServer interface
func (bc *BroadcastClient) Recv() (*cb.Envelope, error) {
msg, ok := <-bc.requestChan
if !ok {
return msg, io.EOF
}
return msg, nil
}
// CreateDeliverClient creates a deliver client for this server
func (server *BenchmarkServer) CreateDeliverClient() *DeliverClient {
client := &DeliverClient{
requestChan: make(chan *cb.Envelope),
ResponseChan: make(chan *ab.DeliverResponse),
ResultChan: make(chan error),
}
go func() {
client.ResultChan <- server.server.Deliver(client)
}()
return client
}
// DeliverClient represents a deliver client that is used to interact
// with the `deliver` API
type DeliverClient struct {
grpc.ServerStream
requestChan chan *cb.Envelope
ResponseChan chan *ab.DeliverResponse
ResultChan chan error
}
// Context returns a stub peer context for the stream
func (DeliverClient) Context() context.Context {
return peer.NewContext(context.Background(), &peer.Peer{})
}
// SendRequest sends an envelope to the `deliver` API synchronously
func (bc *DeliverClient) SendRequest(request *cb.Envelope) {
// TODO make this async
bc.requestChan <- request
}
// GetResponse waits for a response from the `deliver` API synchronously
func (bc *DeliverClient) GetResponse() *ab.DeliverResponse {
return <-bc.ResponseChan
}
// Close closes a deliver client
func (bc *DeliverClient) Close() {
close(bc.requestChan)
}
// Send implements the AtomicBroadcast_DeliverServer interface
func (bc *DeliverClient) Send(br *ab.DeliverResponse) error {
bc.ResponseChan <- br
return nil
}
// Recv implements the AtomicBroadcast_DeliverServer interface
func (bc *DeliverClient) Recv() (*cb.Envelope, error) {
msg, ok := <-bc.requestChan
if !ok {
return msg, io.EOF
}
return msg, nil
}
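
The types above let the orderer's Broadcast and Deliver handlers be exercised entirely in-process: BroadcastClient and DeliverClient implement the server-side stream interfaces, so no real network is involved. A minimal lifecycle sketch, using the `perf` import alias from the benchmark test further below; `myService` (an ab.AtomicBroadcastServer) and `env` (a *cb.Envelope) are hypothetical stand-ins:

	// Sketch only: one pooled server driven through its full lifecycle.
	perf.InitializeServerPool(1)

	// Orderer side: claim a server, register the service, and block.
	go func() {
		server := perf.GetBenchmarkServer()
		server.RegisterService(myService)
		server.Start() // unblocks WaitForService; blocks until Halt
	}()

	// Benchmark side: wait for registration, then broadcast one envelope.
	server := perf.GetBenchmarkServerPool()[0]
	server.WaitForService()

	client := server.CreateBroadcastClient()
	client.SendRequest(env)      // blocks until the handler calls Recv
	resp := client.GetResponse() // blocks until the handler calls Send
	fmt.Printf("broadcast status: %s\n", resp.Status)

	client.Close() // Recv returns io.EOF; the handler returns
	<-client.Errors()
	server.Halt()
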
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package performance
import (
"fmt"
"math/rand"
"sync"
"time"
"github.com/hyperledger/fabric/common/localmsp"
"github.com/hyperledger/fabric/common/tools/configtxgen/encoder"
genesisconfig "github.com/hyperledger/fabric/common/tools/configtxgen/localconfig"
cb "github.com/hyperledger/fabric/protos/common"
ab "github.com/hyperledger/fabric/protos/orderer"
protosutils "github.com/hyperledger/fabric/protos/utils"
)
const (
// Kilo allows us to convert byte units to kB.
Kilo = 1024 // TODO Consider adding a unit pkg
)
var letterRunes = []rune("abcdefghijklmnopqrstuvwxyz")
var seedOnce sync.Once
// MakeNormalTx creates a properly signed transaction that can be used against the `broadcast` API
func MakeNormalTx(channelID string, size int) *cb.Envelope {
env, err := protosutils.CreateSignedEnvelope(
cb.HeaderType_ENDORSER_TRANSACTION,
channelID,
localmsp.NewSigner(),
&cb.Envelope{Payload: make([]byte, size*Kilo)},
0,
0,
)
if err != nil {
panic(fmt.Errorf("Failed to create signed envelope because: %s", err))
}
return env
}
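// For example, a 10 KB transaction for an illustrative channel ID:
//
//	env := MakeNormalTx("benchmarkchannel", 10) // payload of 10 * Kilo bytes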
// OrdererExecWithArgs executes func for each orderer in parallel
func OrdererExecWithArgs(f func(s *BenchmarkServer, i ...interface{}), i ...interface{}) {
servers := GetBenchmarkServerPool()
var wg sync.WaitGroup
wg.Add(len(servers))
for _, server := range servers {
go func(server *BenchmarkServer) {
f(server, i...)
wg.Done()
}(server)
}
wg.Wait()
}
// OrdererExec executes func for each orderer in parallel
func OrdererExec(f func(s *BenchmarkServer)) {
servers := GetBenchmarkServerPool()
var wg sync.WaitGroup
wg.Add(len(servers))
for _, server := range servers {
go func(server *BenchmarkServer) {
f(server)
wg.Done()
}(server)
}
wg.Wait()
}
// RandomID generates a random string of `num` lowercase letters
func RandomID(num int) string {
seedOnce.Do(func() { rand.Seed(time.Now().UnixNano()) })
b := make([]rune, num)
for i := range b {
b[i] = letterRunes[rand.Intn(len(letterRunes))]
}
return string(b)
}
// CreateChannel creates a channel with a randomly generated ID of length 10
func CreateChannel(server *BenchmarkServer, channelProfile *genesisconfig.Profile) string {
client := server.CreateBroadcastClient()
defer client.Close()
channelID := RandomID(10)
createChannelTx, err := encoder.MakeChannelCreationTransaction(channelID, localmsp.NewSigner(), channelProfile)
if err != nil {
logger.Panicf("Failed to create channel creation transaction: %s", err)
}
client.SendRequest(createChannelTx)
response := client.GetResponse()
if response.Status != cb.Status_SUCCESS {
logger.Panicf("Failed to create channel: %s -- %v:%s", channelID, response.Status, response.Info)
}
return channelID
}
// WaitForChannels probes each channel until it is ready
func WaitForChannels(server *BenchmarkServer, channelIDs ...interface{}) {
var scoutWG sync.WaitGroup
scoutWG.Add(len(channelIDs))
for _, channelID := range channelIDs {
id, ok := channelID.(string)
if !ok {
panic("Expect a string as channelID")
}
go func(channelID string) {
logger.Infof("Scouting for channel: %s", channelID)
for {
status, err := SeekAllBlocks(server.CreateDeliverClient(), channelID, 0)
if err != nil {
panic(fmt.Errorf("Failed to call deliver because: %s", err))
}
switch status {
case cb.Status_SUCCESS:
logger.Infof("Channel '%s' is ready", channelID)
scoutWG.Done()
return
case cb.Status_SERVICE_UNAVAILABLE:
fallthrough
case cb.Status_NOT_FOUND:
logger.Debugf("Channel '%s' is not ready yet, keep scouting", channelID)
time.Sleep(time.Second)
default:
logger.Fatalf("Unexpected reply status '%s' while scouting for channel %s, exit", status.String(), channelID)
}
}
}(id)
}
scoutWG.Wait()
}
var seekOldest = &ab.SeekPosition{Type: &ab.SeekPosition_Oldest{Oldest: &ab.SeekOldest{}}}
// SeekAllBlocks seeks blocks from the oldest up to the specified number
func SeekAllBlocks(c *DeliverClient, channelID string, number uint64) (status cb.Status, err error) {
env, err := protosutils.CreateSignedEnvelope(
cb.HeaderType_DELIVER_SEEK_INFO,
channelID,
localmsp.NewSigner(),
&ab.SeekInfo{Start: seekOldest, Stop: seekSpecified(number), Behavior: ab.SeekInfo_BLOCK_UNTIL_READY},
0,
0,
)
if err != nil {
panic(fmt.Errorf("Failed to create signed envelope because: %s", err))
}
c.SendRequest(env)
for {
select {
case reply := <-c.ResponseChan:
if reply.GetBlock() == nil {
status = reply.GetStatus()
c.Close()
}
case err = <-c.ResultChan:
return
}
}
}
func seekSpecified(number uint64) *ab.SeekPosition {
return &ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: number}}}
}
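
Taken together, these helpers support the typical benchmark flow: create a channel on one orderer, wait until every orderer in the pool serves it, then seek it to confirm delivery. A hedged sketch written as if from inside this package, assuming the configtxgen localconfig Load helper and simplified error handling:

	// Sketch only: channel creation, scouting, and a sanity-check seek.
	profile := genesisconfig.Load(genesisconfig.SampleSingleMSPChannelProfile)

	server := GetBenchmarkServerPool()[0]
	channelID := CreateChannel(server, profile)

	// Block until every orderer in the pool serves the new channel.
	OrdererExecWithArgs(func(s *BenchmarkServer, i ...interface{}) {
		WaitForChannels(s, i...)
	}, channelID)

	// Seek block 0 to confirm delivery works end to end.
	status, err := SeekAllBlocks(server.CreateDeliverClient(), channelID, 0)
	if err != nil || status != cb.Status_SUCCESS {
		logger.Panicf("seek failed: %v / %s", err, status)
	}
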
// Copyright IBM Corp. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package server
import (
"fmt"
"io/ioutil"
"os"
"strconv"
"sync"
"sync/atomic"
"testing"
"time"
genesisconfig "github.com/hyperledger/fabric/common/tools/configtxgen/localconfig"
"github.com/hyperledger/fabric/core/config/configtest"
"github.com/hyperledger/fabric/orderer/common/localconfig"
perf "github.com/hyperledger/fabric/orderer/common/performance"
cb "github.com/hyperledger/fabric/protos/common"
"github.com/stretchr/testify/assert"
)
// USAGE
//
// BENCHMARK=true go test -run=TestOrdererBenchmark[Solo|Kafka][Broadcast|Deliver]
//
// You can run a specific test permutation by supplying the complete subtest
// name corresponding to that permutation, e.g.:
//
// TestOrdererBenchmark[Solo|Kafka][Broadcast|Deliver]/10ch/10000tx/10kb/10bc/0dc/10ord
//
// (The permutation has to be valid as defined in the source code)
//
// RUNNING KAFKA ORDERER BENCHMARKS
//
// A Kafka cluster is required to run the Kafka-based benchmark. The benchmark
// expects to find a seed broker listening on localhost:9092.
//
// A suitable Kafka cluster is provided as a docker compose application defined
// in the docker-compose.yml file provided with this package. To run the Kafka
// benchmarks with the provided Kafka cluster:
//
// From this package's directory, first run:
//
// docker-compose up -d
//
// Then execute:
//
// BENCHMARK=true go test -run TestOrdererBenchmarkKafkaBroadcast
//
// If you are not using the Kafka cluster provided in the docker-compose.yml,
// the list of seed brokers can be adjusted by setting the value of
// x_ORDERER_KAFKA_BROKERS in the `envvars` map below.
//
// DESCRIPTION
//
// The benchmark test makes [ch] channels and creates [bc] clients per channel per
// orderer. There are [ord] orderer instances in total. A client ONLY interacts with
// ONE channel and ONE orderer, so the total number of clients is [ch * bc * ord].
// Note that all clients execute concurrently.
//
// The test sends [tx] transactions of size [kb] in total. These tx are evenly distributed
// among all clients, which gives us [tx / (ch * bc * ord)] tx per client.
//
// For example, given following configuration:
//
// channels [ch]: 4 (CH0, CH1, CH2, CH3)
// broadcast clients [bc]: 5
// orderers [ord]: 2 (ORD0, ORD1)
// transactions [tx]: 200
//
// We will spawn 4 * 5 * 2 = 40 simultaneous broadcast clients in total, 20 clients per
// orderer. For each orderer, there will be 5 clients per channel. Each client will send
// 200 / 40 = 5 transactions. Pseudo-code would be:
//
// for each one of [ord] orderers:
// for each one of [ch] channels:
// for each one of [bc] clients:
// go send [tx / (ch * bc * ord)] tx
//
// Additionally, [dc] deliver clients per channel per orderer seek all the blocks in
// that channel. Each deliver client 'predicts' the last block number and seeks from
// the oldest block to that one. In this manner, we can reliably assert that all
// transactions we send are ordered. This is important for evaluating the elapsed
// time of async broadcast operations.
//
// Again, each deliver client only interacts with one channel and one orderer, which
// results in [ch * dc * ord] deliver clients in total.
//
// ch -> channelCounts
// bc -> broadcastClientPerChannel
// tx -> totalTx
// kb -> messagesSizes
// ord -> numOfOrderer
// dc -> deliverClientPerChannel
//
// If `multiplex` is true, broadcast and deliver run simultaneously. Otherwise,
// broadcast runs before deliver. The latter is useful when testing deliver
// performance in isolation, since deliver then effectively retrieves pre-generated
// blocks and is not throttled by slower broadcast.
//
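// For the example configuration above (ch=4, tx=200, bc=5, ord=2), with 10 KB
// messages and no deliver clients, the corresponding invocation would be (an
// illustration only; benchmarkOrderer is defined later in this file, and the
// trailing argument is `multiplex`):
//
//	benchmarkOrderer(t, 4, 200, 10, 5, 0, 2, true)
//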
const (
MaxMessageCount = 10
// This is the hard limit for all types of tx, including config tx, which is normally
// larger than 13 KB. Therefore, for config tx not to be rejected, this value cannot
// be less than 13 KB.
AbsoluteMaxBytes = 16 // KB
PreferredMaxBytes = 10 // KB
ChannelProfile = genesisconfig.SampleSingleMSPChannelProfile
)
var envvars = map[string]string{
"FABRIC_LOGGING_SPEC": "error",
"ORDERER_GENERAL_GENESISPROFILE": genesisconfig.SampleDevModeSoloProfile,
"ORDERER_GENERAL_LEDGERTYPE": "file",
"ORDERER_KAFKA_VERBOSE": "false",
genesisconfig.Prefix + "_ORDERER_BATCHSIZE_MAXMESSAGECOUNT": strconv.Itoa(MaxMessageCount),
genesisconfig.Prefix + "_ORDERER_BATCHSIZE_ABSOLUTEMAXBYTES": strconv.Itoa(AbsoluteMaxBytes) + " KB",
genesisconfig.Prefix + "_ORDERER_BATCHSIZE_PREFERREDMAXBYTES": strconv.Itoa(PreferredMaxBytes) + " KB",
genesisconfig.Prefix + "_ORDERER_KAFKA_BROKERS": "[localhost:9092]",
}
type factors struct {
numOfChannels int // number of channels
totalTx int // total number of messages
messageSize int // message size in KB
broadcastClientPerChannel int // concurrent broadcast clients
deliverClientPerChannel int // concurrent deliver clients
numOfOrderer int // number of orderer instances (Kafka ONLY)
}
// String gives each test run a descriptive name. The output
// looks something like '4ch/100tx/5kb/100bc/100dc/5ord'.
func (f factors) String() string {
return fmt.Sprintf(
"%dch/%dtx/%dkb/%dbc/%ddc/%dord",
f.numOfChannels,
f.totalTx,
f.messageSize,
f.broadcastClientPerChannel,
f.deliverClientPerChannel,
f.numOfOrderer,
)
}
// As benchmark tests are skipped by default, we put this test here to catch
// potential code changes that might break benchmark tests. If this test fails,
// it is likely that benchmark tests need to be updated.
func TestOrdererBenchmarkSolo(t *testing.T) {
for key, value := range envvars {
os.Setenv(key, value)
defer os.Unsetenv(key)
}
t.Run("Benchmark Sample Test (Solo)", func(t *testing.T) {
benchmarkOrderer(t, 1, 5, PreferredMaxBytes, 1, 0, 1, true)
})
}
// Benchmark broadcast API in Solo mode
func TestOrdererBenchmarkSoloBroadcast(t *testing.T) {
if os.Getenv("BENCHMARK") == "" {
t.Skip("Skipping benchmark test")
}
for key, value := range envvars {
os.Setenv(key, value)
defer os.Unsetenv(key)
}
var (
channelCounts = []int{1, 10, 50}
totalTx = []int{10000}
messagesSizes = []int{1, 2, 10}
broadcastClientPerChannel = []int{1, 10, 50}
deliverClientPerChannel = []int{0} // We are not interested in deliver performance here
numOfOrderer = []int{1}
args = [][]int{
channelCounts,
totalTx,
messagesSizes,
broadcastClientPerChannel,
deliverClientPerChannel,
numOfOrderer,
}
)
for factors := range combinations(args) {
t.Run(factors.String(), func(t *testing.T) {
benchmarkOrderer(
t,
factors.numOfChannels,
factors.totalTx,
factors.messageSize,
factors.broadcastClientPerChannel,
factors.deliverClientPerChannel,
1, // For solo orderer, we should always have exactly one instance
true,
)
})
}
}
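// The `combinations` helper ranged over above is defined elsewhere in this
// package. A minimal sketch consistent with its use here (an assumption, not
// the actual implementation) would enumerate the cross product of the argument
// slices and emit one `factors` value per permutation over a channel:
//
//	func combinations(args [][]int) <-chan factors {
//		ch := make(chan factors)
//		go func() {
//			defer close(ch)
//			for _, numOfChannels := range args[0] {
//				for _, totalTx := range args[1] {
//					for _, messageSize := range args[2] {
//						for _, bc := range args[3] {
//							for _, dc := range args[4] {
//								for _, ord := range args[5] {
//									ch <- factors{numOfChannels, totalTx, messageSize, bc, dc, ord}
//								}
//							}
//						}
//					}
//				}
//			}
//		}()
//		return ch
//	}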
// Benchmark deliver API in Solo mode
func TestOrdererBenchmarkSoloDeliver(t *testing.T) {
if os.Getenv("BENCHMARK") == "" {
t.Skip("Skipping benchmark test")
}
for key, value := range envvars {
os.Setenv(key, value)
defer os.Unsetenv(key)
}