Commit 7400cc17 authored by Latitia M Haskins's avatar Latitia M Haskins Committed by Gari Singh
Browse files

[FAB-9227] Add Kafka Runner



This CR adds the Kafka runner for use with the integration
tests.

Change-Id: I358c12606b413021d317d8732b0c97f50413cb39
Signed-off-by: default avatarLatitia M Haskins <latitia.haskins@gmail.com>
parent d13d614b
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package runner
import (
"context"
"fmt"
"io"
"net"
"os"
"strconv"
"sync"
"time"
docker "github.com/fsouza/go-dockerclient"
"github.com/pkg/errors"
"github.com/tedsuo/ifrit"
)
// KafkaDefaultImage is the docker image used when the caller does not
// specify one.
const KafkaDefaultImage = "hyperledger/fabric-kafka:latest"

// Kafka manages the execution of an instance of a dockerized Kafka broker
// for tests.
type Kafka struct {
	Client        *docker.Client // docker client; created from the environment when nil
	Image         string         // container image; defaults to KafkaDefaultImage
	HostIP        string         // host IP for the port binding; defaults to 127.0.0.1
	HostPort      int            // host port to bind; 0 lets docker choose
	ContainerPort docker.Port    // broker port inside the container; defaults to 9092/tcp
	Name          string         // container name; generated when empty
	StartTimeout  time.Duration  // how long Run waits for the broker to accept connections

	// Broker settings surfaced to the container as KAFKA_* environment
	// variables by setEnv.
	MessageMaxBytes              int
	ReplicaFetchMaxBytes         int
	UncleanLeaderElectionEnable  bool
	DefaultReplicationFactor     int
	MinInsyncReplicas            int
	BrokerID                     int
	ZookeeperConnect             string
	ReplicaFetchResponseMaxBytes int
	// NOTE(review): AdvertisedListeners is never read by setEnv or Run —
	// confirm whether it should feed KAFKA_ADVERTISED_LISTENERS.
	AdvertisedListeners string
	LogLevel            string

	ErrorStream  io.Writer // receives container stderr when set
	OutputStream io.Writer // receives container stdout when set

	NetworkID   string // docker network the container joins
	NetworkName string

	ContainerID      string // set by Run after the container is created
	HostAddress      string // host-side broker address, set by Run
	ContainerAddress string // network-internal broker address, set by Run
	Address          string // whichever of the two addresses became ready first

	mutex   sync.Mutex // guards stopped
	stopped bool       // true once Stop has been invoked
}
// Run runs a Kafka container. It implements the ifrit.Runner interface
func (k *Kafka) Run(sigCh <-chan os.Signal, ready chan<- struct{}) error {
	// Fill in defaults for every setting the caller left at its zero value.
	if k.Image == "" {
		k.Image = KafkaDefaultImage
	}
	if k.Name == "" {
		k.Name = DefaultNamer()
	}
	if k.HostIP == "" {
		k.HostIP = "127.0.0.1"
	}
	if k.ContainerPort == docker.Port("") {
		k.ContainerPort = docker.Port("9092/tcp")
	}
	if k.StartTimeout == 0 {
		k.StartTimeout = DefaultStartTimeout
	}
	if k.Client == nil {
		client, err := docker.NewClientFromEnv()
		if err != nil {
			return err
		}
		k.Client = client
	}
	if k.DefaultReplicationFactor == 0 {
		k.DefaultReplicationFactor = 1
	}
	if k.MinInsyncReplicas == 0 {
		k.MinInsyncReplicas = 1
	}
	if k.ZookeeperConnect == "" {
		k.ZookeeperConnect = "zookeeper:2181/kafka"
	}
	if k.MessageMaxBytes == 0 {
		k.MessageMaxBytes = 1000012
	}
	if k.ReplicaFetchMaxBytes == 0 {
		k.ReplicaFetchMaxBytes = 1048576
	}
	if k.ReplicaFetchResponseMaxBytes == 0 {
		k.ReplicaFetchResponseMaxBytes = 10485760
	}
	if k.LogLevel == "" {
		k.LogLevel = "warn"
	}

	// AutoRemove lets docker delete the container as soon as it exits;
	// the broker port is published on the configured host IP/port.
	hostConfig := &docker.HostConfig{
		AutoRemove: true,
		PortBindings: map[docker.Port][]docker.PortBinding{
			k.ContainerPort: []docker.PortBinding{{
				HostIP:   k.HostIP,
				HostPort: strconv.Itoa(k.HostPort),
			}},
		},
	}
	config := &docker.Config{
		Image: k.Image,
		Env:   k.setEnv(),
	}
	// Attach the container to the caller-supplied docker network so it can
	// reach zookeeper and the other brokers by name.
	networkingConfig := &docker.NetworkingConfig{
		EndpointsConfig: map[string]*docker.EndpointConfig{
			k.NetworkName: &docker.EndpointConfig{
				NetworkID: k.NetworkID,
			},
		},
	}

	container, err := k.Client.CreateContainer(
		docker.CreateContainerOptions{
			Name:             k.Name,
			Config:           config,
			HostConfig:       hostConfig,
			NetworkingConfig: networkingConfig,
		})
	if err != nil {
		return err
	}
	k.ContainerID = container.ID

	err = k.Client.StartContainer(container.ID, nil)
	if err != nil {
		return err
	}
	// From here on the container is running; make sure it is stopped no
	// matter how Run returns.
	defer k.Stop()

	// Re-inspect to learn the dynamically assigned host port and the
	// network-internal IP address.
	container, err = k.Client.InspectContainer(container.ID)
	if err != nil {
		return err
	}
	k.HostAddress = net.JoinHostPort(
		container.NetworkSettings.Ports[k.ContainerPort][0].HostIP,
		container.NetworkSettings.Ports[k.ContainerPort][0].HostPort,
	)
	k.ContainerAddress = net.JoinHostPort(
		container.NetworkSettings.Networks[k.NetworkName].IPAddress,
		k.ContainerPort.Port(),
	)

	// Stream container logs to the configured writers for the lifetime of Run.
	logContext, cancelLogs := context.WithCancel(context.Background())
	defer cancelLogs()
	go k.streamLogs(logContext)

	// NOTE(review): with wait() as written, containerExit only receives when
	// WaitContainer itself errors — a clean container exit is not signalled.
	containerExit := k.wait()
	ctx, cancel := context.WithTimeout(context.Background(), k.StartTimeout)
	defer cancel()

	// Race two readiness probes: whichever address (container-internal or
	// host-published) accepts a TCP connection first becomes k.Address.
	select {
	case <-ctx.Done():
		return errors.Wrapf(ctx.Err(), "kafka broker in container %s did not start", k.ContainerID)
	case <-containerExit:
		return errors.New("container exited before ready")
	case <-k.ready(ctx, k.ContainerAddress):
		k.Address = k.ContainerAddress
	case <-k.ready(ctx, k.HostAddress):
		k.Address = k.HostAddress
	}

	// Stop the losing readiness probe promptly; cancel is idempotent, so the
	// deferred call above remains harmless.
	cancel()
	close(ready)

	// Block until the container exits or the runner is signalled to stop.
	select {
	case err := <-containerExit:
		return err
	case <-sigCh:
		return k.Stop()
	}
}
// setEnv assembles the KAFKA_* environment variables that configure the
// broker inside the container from the runner's settings.
func (k *Kafka) setEnv() []string {
	return []string{
		"KAFKA_LOG_RETENTION_MS=-1",
		fmt.Sprintf("KAFKA_MESSAGE_MAX_BYTES=%d", k.MessageMaxBytes),
		fmt.Sprintf("KAFKA_REPLICA_FETCH_MAX_BYTES=%d", k.ReplicaFetchMaxBytes),
		fmt.Sprintf("KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE=%s", strconv.FormatBool(k.UncleanLeaderElectionEnable)),
		fmt.Sprintf("KAFKA_DEFAULT_REPLICATION_FACTOR=%d", k.DefaultReplicationFactor),
		fmt.Sprintf("KAFKA_MIN_INSYNC_REPLICAS=%d", k.MinInsyncReplicas),
		fmt.Sprintf("KAFKA_BROKER_ID=%d", k.BrokerID),
		fmt.Sprintf("KAFKA_ZOOKEEPER_CONNECT=%s", k.ZookeeperConnect),
		fmt.Sprintf("KAFKA_REPLICA_FETCH_RESPONSE_MAX_BYTES=%d", k.ReplicaFetchResponseMaxBytes),
		fmt.Sprintf("KAFKA_ADVERTISED_LISTENERS=EXTERNAL://localhost:%d,%s://%s:9093", k.HostPort, k.NetworkName, k.Name),
		fmt.Sprintf("KAFKA_LISTENERS=EXTERNAL://0.0.0.0:9092,%s://0.0.0.0:9093", k.NetworkName),
		fmt.Sprintf("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=EXTERNAL:PLAINTEXT,%s:PLAINTEXT", k.NetworkName),
		fmt.Sprintf("KAFKA_INTER_BROKER_LISTENER_NAME=%s", k.NetworkName),
	}
}
// ready returns a channel that is closed once a TCP connection to addr
// succeeds. The probe retries every 100ms and gives up when ctx is done,
// in which case the channel is never closed.
func (k *Kafka) ready(ctx context.Context, addr string) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		retry := time.NewTicker(100 * time.Millisecond)
		defer retry.Stop()
		for {
			if conn, err := net.DialTimeout("tcp", addr, 50*time.Millisecond); err == nil {
				conn.Close()
				close(done)
				return
			}
			select {
			case <-ctx.Done():
				return
			case <-retry.C:
			}
		}
	}()
	return done
}
// wait returns a channel that receives an error when the container process
// terminates. A clean exit is reported as an error carrying the exit code:
// previously nothing was sent unless WaitContainer itself failed, so callers
// of Run were never notified when the broker exited normally (or crashed
// with a non-zero code) and could block forever waiting on this channel.
func (k *Kafka) wait() <-chan error {
	exitCh := make(chan error)
	go func() {
		exitCode, err := k.Client.WaitContainer(k.ContainerID)
		if err == nil {
			// Surface container termination to the caller even on a
			// successful wait; include the exit code for diagnostics.
			err = fmt.Errorf("kafka: process exited with %d", exitCode)
		}
		exitCh <- err
	}()
	return exitCh
}
// streamLogs follows the container's output and copies it to the configured
// writers until ctx is cancelled. It is a no-op when neither writer is set.
func (k *Kafka) streamLogs(ctx context.Context) error {
	if k.ErrorStream == nil && k.OutputStream == nil {
		return nil
	}
	return k.Client.Logs(docker.LogsOptions{
		Context:      ctx,
		Container:    k.ContainerID,
		ErrorStream:  k.ErrorStream,
		OutputStream: k.OutputStream,
		Stderr:       k.ErrorStream != nil,
		Stdout:       k.OutputStream != nil,
		Follow:       true,
	})
}
// Start starts the Kafka container using an ifrit runner and blocks until
// the runner signals ready or exits with an error.
func (k *Kafka) Start() error {
	proc := ifrit.Invoke(k)
	select {
	case err := <-proc.Wait():
		return err
	case <-proc.Ready():
		return nil
	}
}
// Stop stops and removes the Kafka container
func (k *Kafka) Stop() error {
	// Record the previous state under the lock, then release it before the
	// docker call so Stop never blocks other callers on container I/O.
	k.mutex.Lock()
	alreadyStopped := k.stopped
	k.stopped = true
	k.mutex.Unlock()

	if alreadyStopped {
		return errors.Errorf("container %s already stopped", k.ContainerID)
	}
	return k.Client.StopContainer(k.ContainerID, 0)
}
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package runner_test
import (
"fmt"
"io"
"net"
"syscall"
"time"
"github.com/fsouza/go-dockerclient"
"github.com/hyperledger/fabric/integration/runner"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gbytes"
"github.com/tedsuo/ifrit"
)
// Integration tests for the Kafka runner. Each spec creates a dedicated
// docker network, starts a zookeeper for coordination, then exercises one
// or more Kafka broker containers against a real docker daemon.
var _ = Describe("Kafka Runner", func() {
	var (
		err         error
		client      *docker.Client
		network     *docker.Network
		networkName string
		errBuffer   *gbytes.Buffer
		outBuffer   *gbytes.Buffer
		kafka       *runner.Kafka
		zookeeper   *runner.Zookeeper
		process     ifrit.Process
	)

	BeforeEach(func() {
		errBuffer = gbytes.NewBuffer()
		outBuffer = gbytes.NewBuffer()
		process = nil

		client, err = docker.NewClientFromEnv()
		Expect(err).NotTo(HaveOccurred())

		// Create a network for zookeeper and the brokers to share.
		networkName = runner.UniqueName()
		network, err = client.CreateNetwork(
			docker.CreateNetworkOptions{
				Name:   networkName,
				Driver: "bridge",
			},
		)
		// Fix: this error was previously ignored, which would surface later
		// as a nil-pointer panic on network.ID instead of a clear failure.
		Expect(err).NotTo(HaveOccurred())

		// Start a zookeeper
		zookeeper = &runner.Zookeeper{
			Name:        "zookeeper0",
			ZooMyID:     1,
			NetworkID:   network.ID,
			NetworkName: network.Name,
		}
		err = zookeeper.Start()
		Expect(err).NotTo(HaveOccurred())

		kafka = &runner.Kafka{
			Name:             "kafka1",
			ErrorStream:      io.MultiWriter(errBuffer, GinkgoWriter),
			OutputStream:     io.MultiWriter(outBuffer, GinkgoWriter),
			ZookeeperConnect: "zookeeper0:2181",
			BrokerID:         1,
			NetworkID:        network.ID,
			NetworkName:      network.Name,
		}
	})

	AfterEach(func() {
		if process != nil {
			process.Signal(syscall.SIGTERM)
		}
		err := zookeeper.Stop()
		Expect(err).NotTo(HaveOccurred())
		if network != nil {
			client.RemoveNetwork(networkName)
		}
	})

	It("starts and stops a docker container with the specified image", func() {
		By("starting kafka broker")
		process = ifrit.Invoke(kafka)
		Eventually(process.Ready(), runner.DefaultStartTimeout).Should(BeClosed())
		Consistently(process.Wait()).ShouldNot(Receive())

		By("inspecting the container by name")
		container, err := client.InspectContainer("kafka1")
		Expect(err).NotTo(HaveOccurred())
		Expect(container.Name).To(Equal("/kafka1"))
		Expect(container.State.Status).To(Equal("running"))
		Expect(container.Config).NotTo(BeNil())
		Expect(container.Config.Image).To(Equal("hyperledger/fabric-kafka:latest"))
		Expect(container.ID).To(Equal(kafka.ContainerID))

		// The runner records both the host-published and network-internal
		// addresses; verify they match what docker reports.
		portBindings := container.NetworkSettings.Ports[docker.Port("9092/tcp")]
		Expect(portBindings).To(HaveLen(1))
		Expect(kafka.HostAddress).To(Equal(net.JoinHostPort(portBindings[0].HostIP, portBindings[0].HostPort)))
		Expect(kafka.ContainerAddress).To(Equal(net.JoinHostPort(container.NetworkSettings.Networks[kafka.NetworkName].IPAddress, "9092")))

		By("getting the container logs")
		Eventually(outBuffer, 30*time.Second).Should(gbytes.Say(`\Q[KafkaServer id=1] started (kafka.server.KafkaServer)\E`))

		By("accessing the kafka broker")
		address := kafka.Address
		Expect(address).NotTo(BeEmpty())

		By("terminating the container")
		process.Signal(syscall.SIGTERM)
		Eventually(process.Wait()).Should(Receive(BeNil()))
		process = nil

		// AutoRemove means the container should be gone once stopped.
		_, err = client.InspectContainer("kafka1")
		Expect(err).To(MatchError("No such container: kafka1"))
	})

	It("can be started and stopped without ifrit", func() {
		err := kafka.Start()
		Expect(err).NotTo(HaveOccurred())

		err = kafka.Stop()
		Expect(err).NotTo(HaveOccurred())
	})

	It("multiples can be started and stopped", func() {
		k1 := &runner.Kafka{
			Name:                     "kafka1",
			ZookeeperConnect:         "zookeeper0:2181",
			BrokerID:                 1,
			MinInsyncReplicas:        2,
			DefaultReplicationFactor: 3,
			NetworkID:                network.ID,
			NetworkName:              network.Name,
		}
		err := k1.Start()
		Expect(err).NotTo(HaveOccurred())

		k2 := &runner.Kafka{
			Name:                     "kafka2",
			ZookeeperConnect:         "zookeeper0:2181",
			BrokerID:                 2,
			MinInsyncReplicas:        2,
			DefaultReplicationFactor: 3,
			NetworkID:                network.ID,
			NetworkName:              network.Name,
		}
		err = k2.Start()
		Expect(err).NotTo(HaveOccurred())

		k3 := &runner.Kafka{
			Name:                     "kafka3",
			ZookeeperConnect:         "zookeeper0:2181",
			BrokerID:                 3,
			MinInsyncReplicas:        2,
			DefaultReplicationFactor: 3,
			NetworkID:                network.ID,
			NetworkName:              network.Name,
		}
		err = k3.Start()
		Expect(err).NotTo(HaveOccurred())

		k4 := &runner.Kafka{
			Name:                     "kafka4",
			ZookeeperConnect:         "zookeeper0:2181",
			BrokerID:                 4,
			MinInsyncReplicas:        2,
			DefaultReplicationFactor: 3,
			NetworkID:                network.ID,
			NetworkName:              network.Name,
		}
		err = k4.Start()
		Expect(err).NotTo(HaveOccurred())

		err = k1.Stop()
		Expect(err).NotTo(HaveOccurred())
		err = k2.Stop()
		Expect(err).NotTo(HaveOccurred())
		err = k3.Stop()
		Expect(err).NotTo(HaveOccurred())
		err = k4.Stop()
		Expect(err).NotTo(HaveOccurred())
	})

	Context("when the container has already been stopped", func() {
		It("returns an error", func() {
			err := kafka.Start()
			Expect(err).NotTo(HaveOccurred())
			containerID := kafka.ContainerID

			err = kafka.Stop()
			Expect(err).NotTo(HaveOccurred())

			// A second Stop must fail rather than issue another docker call.
			err = kafka.Stop()
			Expect(err).To(MatchError(fmt.Sprintf("container %s already stopped", containerID)))
		})
	})

	Context("when a name isn't provided", func() {
		It("generates a unique name", func() {
			// Without zookeeper connectivity these brokers fail to start,
			// but a unique name must still have been assigned.
			k1 := &runner.Kafka{}
			err := k1.Start()
			Expect(err).To(HaveOccurred())
			Expect(k1.Name).ShouldNot(BeEmpty())
			Expect(k1.Name).To(HaveLen(26))

			k2 := &runner.Kafka{}
			err = k2.Start()
			Expect(err).To(HaveOccurred())
			Expect(k2.Name).ShouldNot(BeEmpty())
			Expect(k2.Name).To(HaveLen(26))
			Expect(k1.Name).NotTo(Equal(k2.Name))

			err = k1.Stop()
			Expect(err).To(HaveOccurred())
			err = k2.Stop()
			Expect(err).To(HaveOccurred())
		})
	})
})
......@@ -7,7 +7,6 @@ SPDX-License-Identifier: Apache-2.0
package world
import (
"fmt"
"os"
"time"
......@@ -77,7 +76,14 @@ func (c *Components) Peer() *runner.Peer {
// Zookeeper constructs a Zookeeper runner attached to the given docker
// network. The id becomes the ZooKeeper myid value and is also used to
// derive the container name ("zookeeper<id>").
func (c *Components) Zookeeper(id int, network *docker.Network) *runner.Zookeeper {
	return &runner.Zookeeper{
		ZooMyID:     id,
		Name:        fmt.Sprintf("zookeeper%d", id),
		NetworkID:   network.ID,
		NetworkName: network.Name,
	}
}
func (c *Components) Kafka(id int, network *docker.Network) *runner.Kafka {
return &runner.Kafka{
BrokerID: id,
NetworkID: network.ID,
NetworkName: network.Name,
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment