Commit db46110c authored by Saad Karim's avatar Saad Karim Committed by Matthew Sykes
Browse files

FAB-12372 Collect info on go routines



Collect information about all of the currently executing go routines
within a process by handling SIGUSR1. SIGUSR1 will not terminate the
process.

Change-Id: Ide5a26a8f1ee9bae93d3c6ba4b0b178661cbfd6c
Signed-off-by: default avatarSaad Karim <skarim@us.ibm.com>
Signed-off-by: default avatarMatthew Sykes <sykesmat@us.ibm.com>
parent ac63a6f5
/*
Copyright IBM Corp All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package diag
import (
"bytes"
"runtime/pprof"
)
type Logger interface {
Infof(template string, args ...interface{})
Errorf(template string, args ...interface{})
}
func CaptureGoRoutines() (string, error) {
var buf bytes.Buffer
err := pprof.Lookup("goroutine").WriteTo(&buf, 2)
if err != nil {
return "", err
}
return buf.String(), nil
}
func LogGoRoutines(logger Logger) {
output, err := CaptureGoRoutines()
if err != nil {
logger.Errorf("failed to capture go routines: %s", err)
return
}
logger.Infof("Go routines report:\n%s", output)
}
/*
Copyright IBM Corp All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package diag_test
import (
"testing"
"github.com/hyperledger/fabric/common/diag"
"github.com/hyperledger/fabric/common/flogging/floggingtest"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gbytes"
)
func TestCaptureGoRoutines(t *testing.T) {
gt := NewGomegaWithT(t)
output, err := diag.CaptureGoRoutines()
gt.Expect(err).NotTo(HaveOccurred())
gt.Expect(output).To(MatchRegexp(`goroutine \d+ \[running\]:`))
gt.Expect(output).To(ContainSubstring("github.com/hyperledger/fabric/common/diag.CaptureGoRoutines"))
}
func TestLogGoRoutines(t *testing.T) {
gt := NewGomegaWithT(t)
logger, recorder := floggingtest.NewTestLogger(t, floggingtest.Named("goroutine"))
diag.LogGoRoutines(logger)
gt.Expect(recorder).To(gbytes.Say(`goroutine \d+ \[running\]:`))
}
/*
Copyright IBM Corp All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package e2e
import (
"io/ioutil"
"os"
"syscall"
docker "github.com/fsouza/go-dockerclient"
"github.com/hyperledger/fabric/integration/nwo"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gbytes"
"github.com/tedsuo/ifrit"
"github.com/tedsuo/ifrit/ginkgomon"
)
var _ = Describe("SignalHandling", func() {
var (
testDir string
client *docker.Client
network *nwo.Network
peerRunner, ordererRunner *ginkgomon.Runner
peerProcess, ordererProcess ifrit.Process
)
BeforeEach(func() {
var err error
testDir, err = ioutil.TempDir("", "e2e-sigs")
Expect(err).NotTo(HaveOccurred())
client, err = docker.NewClientFromEnv()
Expect(err).NotTo(HaveOccurred())
network = nwo.New(nwo.BasicSolo(), testDir, client, BasePort(), components)
network.GenerateConfigTree()
network.Bootstrap()
ordererRunner = network.OrdererRunner(network.Orderers[0])
ordererProcess = ifrit.Invoke(ordererRunner)
Eventually(ordererProcess.Ready(), network.EventuallyTimeout).Should(BeClosed())
peerRunner = network.PeerRunner(network.Peers[0])
peerProcess = ifrit.Invoke(peerRunner)
Eventually(peerProcess.Ready(), network.EventuallyTimeout).Should(BeClosed())
})
AfterEach(func() {
if peerProcess != nil {
peerProcess.Signal(syscall.SIGKILL)
}
if ordererProcess != nil {
ordererProcess.Signal(syscall.SIGKILL)
}
if network != nil {
network.Cleanup()
}
os.RemoveAll(testDir)
})
It("handles signals", func() {
By("verifying SIGUSR1 to the peer dumps go routines")
peerProcess.Signal(syscall.SIGUSR1)
Eventually(peerRunner.Err()).Should(gbytes.Say("Received signal: "))
Eventually(peerRunner.Err()).Should(gbytes.Say(`Go routines report`))
By("verifying SIGUSR1 to the orderer dumps go routines")
ordererProcess.Signal(syscall.SIGUSR1)
Eventually(ordererRunner.Err()).Should(gbytes.Say("Received signal: "))
Eventually(ordererRunner.Err()).Should(gbytes.Say(`Go routines report`))
By("verifying SIGUSR1 does not terminate processes")
Consistently(peerProcess.Wait()).ShouldNot(Receive())
Consistently(ordererProcess.Wait()).ShouldNot(Receive())
By("verifying SIGTERM to the peer stops the process")
peerProcess.Signal(syscall.SIGTERM)
Eventually(peerRunner.Err()).Should(gbytes.Say("Received signal: "))
Eventually(peerProcess.Wait()).Should(Receive())
peerProcess = nil
By("verifying SIGTERM to the orderer stops the process")
ordererProcess.Signal(syscall.SIGTERM)
Eventually(ordererRunner.Err()).Should(gbytes.Say("Received signal: "))
Eventually(ordererProcess.Wait()).Should(Receive())
ordererProcess = nil
})
})
......@@ -14,11 +14,14 @@ import (
"net/http"
_ "net/http/pprof" // This is essentially the main package for the orderer
"os"
"os/signal"
"syscall"
"time"
"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/crypto"
"github.com/hyperledger/fabric/common/diag"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/grpclogging"
"github.com/hyperledger/fabric/common/grpcmetrics"
......@@ -129,6 +132,10 @@ func Start(cmd string, conf *localconfig.TopLevel) {
switch cmd {
case start.FullCommand(): // "start" command
logger.Infof("Starting %s", metadata.GetVersionInfo())
go handleSignals(map[os.Signal]func(){
syscall.SIGTERM: func() { grpcServer.Stop() },
syscall.SIGUSR1: func() { diag.LogGoRoutines(logger.Named("diag")) },
})
initializeProfilingService(conf)
ab.RegisterAtomicBroadcastServer(grpcServer.Server(), server)
logger.Info("Beginning to serve requests")
......@@ -162,6 +169,21 @@ func initializeProfilingService(conf *localconfig.TopLevel) {
}
}
func handleSignals(handlers map[os.Signal]func()) {
var signals []os.Signal
for sig := range handlers {
signals = append(signals, sig)
}
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, signals...)
for sig := range signalChan {
logger.Infof("Received signal: %d (%s)", sig, sig)
handlers[sig]()
}
}
func initializeClusterConfig(conf *localconfig.TopLevel) comm.ClientConfig {
cc := comm.ClientConfig{
AsyncConnect: true,
......
......@@ -20,6 +20,7 @@ import (
ccdef "github.com/hyperledger/fabric/common/chaincode"
"github.com/hyperledger/fabric/common/crypto/tlsgen"
"github.com/hyperledger/fabric/common/deliver"
"github.com/hyperledger/fabric/common/diag"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/grpclogging"
"github.com/hyperledger/fabric/common/grpcmetrics"
......@@ -357,14 +358,6 @@ func serve(args []string) error {
// genesis block if needed.
serve := make(chan error)
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigs
logger.Debugf("sig: %s", sig)
serve <- nil
}()
go func() {
var grpcErr error
if grpcErr = peerServer.Start(); grpcErr != nil {
......@@ -385,12 +378,33 @@ func serve(args []string) error {
}()
}
go handleSignals(map[os.Signal]func(){
syscall.SIGUSR1: func() { diag.LogGoRoutines(logger.Named("diag")) },
syscall.SIGINT: func() { serve <- nil },
syscall.SIGTERM: func() { serve <- nil },
})
logger.Infof("Started peer with ID=[%s], network ID=[%s], address=[%s]", peerEndpoint.Id, networkID, peerEndpoint.Address)
// Block until grpc server exits
return <-serve
}
func handleSignals(handlers map[os.Signal]func()) {
var signals []os.Signal
for sig := range handlers {
signals = append(signals, sig)
}
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, signals...)
for sig := range signalChan {
logger.Infof("Received signal: %d (%s)", sig, sig)
handlers[sig]()
}
}
func localPolicy(policyObject proto.Message) policies.Policy {
localMSP := mgmt.GetLocalMSP()
pp := cauthdsl.NewPolicyProvider(localMSP)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment