63 Star 183 Fork 3

Gitee 极速下载/hyperledger-fabric

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
此仓库是为了提升国内下载速度的镜像仓库,每日同步一次。 原始仓库: https://github.com/hyperledger/fabric
克隆/下载
dockercontroller.go 15.62 KB
一键复制 编辑 原始数据 按行查看 历史
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package dockercontroller
import (
"archive/tar"
"bufio"
"bytes"
"compress/gzip"
"context"
"encoding/hex"
"fmt"
"io"
"regexp"
"strconv"
"strings"
"time"
docker "github.com/fsouza/go-dockerclient"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/metrics"
"github.com/hyperledger/fabric/common/util"
"github.com/hyperledger/fabric/core/container"
"github.com/hyperledger/fabric/core/container/ccintf"
cutil "github.com/hyperledger/fabric/core/container/util"
"github.com/pkg/errors"
"github.com/spf13/viper"
)
// ContainerType is the string which the docker container type
// is registered with the container.VMController
const ContainerType = "DOCKER"
var (
dockerLogger = flogging.MustGetLogger("dockercontroller")
hostConfig *docker.HostConfig
vmRegExp = regexp.MustCompile("[^a-zA-Z0-9-_.]")
imageRegExp = regexp.MustCompile("^[a-z0-9]+(([._-][a-z0-9]+)+)?$")
)
// getClient returns an instance that implements dockerClient interface
type getClient func() (dockerClient, error)
// DockerVM is a vm. It is identified by an image id
type DockerVM struct {
getClientFnc getClient
PeerID string
NetworkID string
BuildMetrics *BuildMetrics
}
// dockerClient represents a docker client
type dockerClient interface {
// CreateContainer creates a docker container, returns an error in case of failure
CreateContainer(opts docker.CreateContainerOptions) (*docker.Container, error)
// UploadToContainer uploads a tar archive to be extracted to a path in the
// filesystem of the container.
UploadToContainer(id string, opts docker.UploadToContainerOptions) error
// StartContainer starts a docker container, returns an error in case of failure
StartContainer(id string, cfg *docker.HostConfig) error
// AttachToContainer attaches to a docker container, returns an error in case of
// failure
AttachToContainer(opts docker.AttachToContainerOptions) error
// BuildImage builds an image from a tarball's url or a Dockerfile in the input
// stream, returns an error in case of failure
BuildImage(opts docker.BuildImageOptions) error
// RemoveImageExtended removes a docker image by its name or ID, returns an
// error in case of failure
RemoveImageExtended(id string, opts docker.RemoveImageOptions) error
// StopContainer stops a docker container, killing it after the given timeout
// (in seconds). Returns an error in case of failure
StopContainer(id string, timeout uint) error
// KillContainer sends a signal to a docker container, returns an error in
// case of failure
KillContainer(opts docker.KillContainerOptions) error
// RemoveContainer removes a docker container, returns an error in case of failure
RemoveContainer(opts docker.RemoveContainerOptions) error
// PingWithContext pings the docker daemon. The context object can be used
// to cancel the ping request.
PingWithContext(context.Context) error
// WaitContainer blocks until the given container stops, and returns the exit
// code of the container status.
WaitContainer(containerID string) (int, error)
}
// Provider implements container.VMProvider
type Provider struct {
PeerID string
NetworkID string
BuildMetrics *BuildMetrics
}
// NewProvider creates a new instance of Provider
func NewProvider(peerID, networkID string, metricsProvider metrics.Provider) *Provider {
return &Provider{
PeerID: peerID,
NetworkID: networkID,
BuildMetrics: NewBuildMetrics(metricsProvider),
}
}
// NewVM creates a new DockerVM instance
func (p *Provider) NewVM() container.VM {
return NewDockerVM(p.PeerID, p.NetworkID, p.BuildMetrics)
}
// NewDockerVM returns a new DockerVM instance
func NewDockerVM(peerID, networkID string, buildMetrics *BuildMetrics) *DockerVM {
return &DockerVM{
PeerID: peerID,
NetworkID: networkID,
getClientFnc: getDockerClient,
BuildMetrics: buildMetrics,
}
}
func getDockerClient() (dockerClient, error) {
return cutil.NewDockerClient()
}
func getDockerHostConfig() *docker.HostConfig {
if hostConfig != nil {
return hostConfig
}
dockerKey := func(key string) string { return "vm.docker.hostConfig." + key }
getInt64 := func(key string) int64 { return int64(viper.GetInt(dockerKey(key))) }
var logConfig docker.LogConfig
err := viper.UnmarshalKey(dockerKey("LogConfig"), &logConfig)
if err != nil {
dockerLogger.Warningf("load docker HostConfig.LogConfig failed, error: %s", err.Error())
}
networkMode := viper.GetString(dockerKey("NetworkMode"))
if networkMode == "" {
networkMode = "host"
}
dockerLogger.Debugf("docker container hostconfig NetworkMode: %s", networkMode)
return &docker.HostConfig{
CapAdd: viper.GetStringSlice(dockerKey("CapAdd")),
CapDrop: viper.GetStringSlice(dockerKey("CapDrop")),
DNS: viper.GetStringSlice(dockerKey("Dns")),
DNSSearch: viper.GetStringSlice(dockerKey("DnsSearch")),
ExtraHosts: viper.GetStringSlice(dockerKey("ExtraHosts")),
NetworkMode: networkMode,
IpcMode: viper.GetString(dockerKey("IpcMode")),
PidMode: viper.GetString(dockerKey("PidMode")),
UTSMode: viper.GetString(dockerKey("UTSMode")),
LogConfig: logConfig,
ReadonlyRootfs: viper.GetBool(dockerKey("ReadonlyRootfs")),
SecurityOpt: viper.GetStringSlice(dockerKey("SecurityOpt")),
CgroupParent: viper.GetString(dockerKey("CgroupParent")),
Memory: getInt64("Memory"),
MemorySwap: getInt64("MemorySwap"),
MemorySwappiness: getInt64("MemorySwappiness"),
OOMKillDisable: viper.GetBool(dockerKey("OomKillDisable")),
CPUShares: getInt64("CpuShares"),
CPUSet: viper.GetString(dockerKey("Cpuset")),
CPUSetCPUs: viper.GetString(dockerKey("CpusetCPUs")),
CPUSetMEMs: viper.GetString(dockerKey("CpusetMEMs")),
CPUQuota: getInt64("CpuQuota"),
CPUPeriod: getInt64("CpuPeriod"),
BlkioWeight: getInt64("BlkioWeight"),
}
}
func (vm *DockerVM) createContainer(client dockerClient, imageID, containerID string, args, env []string, attachStdout bool) error {
logger := dockerLogger.With("imageID", imageID, "containerID", containerID)
logger.Debugw("create container")
_, err := client.CreateContainer(docker.CreateContainerOptions{
Name: containerID,
Config: &docker.Config{
Cmd: args,
Image: imageID,
Env: env,
AttachStdout: attachStdout,
AttachStderr: attachStdout,
},
HostConfig: getDockerHostConfig(),
})
if err != nil {
return err
}
logger.Debugw("created container")
return nil
}
func (vm *DockerVM) deployImage(client dockerClient, ccid ccintf.CCID, reader io.Reader) error {
id, err := vm.GetVMNameForDocker(ccid)
if err != nil {
return err
}
outputbuf := bytes.NewBuffer(nil)
opts := docker.BuildImageOptions{
Name: id,
Pull: viper.GetBool("chaincode.pull"),
InputStream: reader,
OutputStream: outputbuf,
}
startTime := time.Now()
err = client.BuildImage(opts)
vm.BuildMetrics.ChaincodeImageBuildDuration.With(
"chaincode", ccid.Name+":"+ccid.Version,
"success", strconv.FormatBool(err == nil),
).Observe(time.Since(startTime).Seconds())
if err != nil {
dockerLogger.Errorf("Error building image: %s", err)
dockerLogger.Errorf("Build Output:\n********************\n%s\n********************", outputbuf.String())
return err
}
dockerLogger.Debugf("Created image: %s", id)
return nil
}
// Start starts a container using a previously created docker image
func (vm *DockerVM) Start(ccid ccintf.CCID, args, env []string, filesToUpload map[string][]byte, builder container.Builder) error {
imageName, err := vm.GetVMNameForDocker(ccid)
if err != nil {
return err
}
attachStdout := viper.GetBool("vm.docker.attachStdout")
containerName := vm.GetVMName(ccid)
logger := dockerLogger.With("imageName", imageName, "containerName", containerName)
client, err := vm.getClientFnc()
if err != nil {
logger.Debugf("failed to get docker client", "error", err)
return err
}
vm.stopInternal(client, containerName, 0, false, false)
err = vm.createContainer(client, imageName, containerName, args, env, attachStdout)
if err == docker.ErrNoSuchImage {
reader, err := builder.Build()
if err != nil {
return errors.Wrapf(err, "failed to generate Dockerfile to build %s", containerName)
}
err = vm.deployImage(client, ccid, reader)
if err != nil {
return err
}
err = vm.createContainer(client, imageName, containerName, args, env, attachStdout)
if err != nil {
logger.Errorf("failed to create container: %s", err)
return err
}
} else if err != nil {
logger.Errorf("create container failed: %s", err)
return err
}
// stream stdout and stderr to chaincode logger
if attachStdout {
containerLogger := flogging.MustGetLogger("peer.chaincode." + containerName)
streamOutput(dockerLogger, client, containerName, containerLogger)
}
// upload specified files to the container before starting it
// this can be used for configurations such as TLS key and certs
if len(filesToUpload) != 0 {
// the docker upload API takes a tar file, so we need to first
// consolidate the file entries to a tar
payload := bytes.NewBuffer(nil)
gw := gzip.NewWriter(payload)
tw := tar.NewWriter(gw)
for path, fileToUpload := range filesToUpload {
cutil.WriteBytesToPackage(path, fileToUpload, tw)
}
// Write the tar file out
if err := tw.Close(); err != nil {
return fmt.Errorf("Error writing files to upload to Docker instance into a temporary tar blob: %s", err)
}
gw.Close()
err := client.UploadToContainer(containerName, docker.UploadToContainerOptions{
InputStream: bytes.NewReader(payload.Bytes()),
Path: "/",
NoOverwriteDirNonDir: false,
})
if err != nil {
return fmt.Errorf("Error uploading files to the container instance %s: %s", containerName, err)
}
}
// start container with HostConfig was deprecated since v1.10 and removed in v1.2
err = client.StartContainer(containerName, nil)
if err != nil {
dockerLogger.Errorf("start-could not start container: %s", err)
return err
}
dockerLogger.Debugf("Started container %s", containerName)
return nil
}
// streamOutput mirrors output from the named container to a fabric logger.
func streamOutput(logger *flogging.FabricLogger, client dockerClient, containerName string, containerLogger *flogging.FabricLogger) {
// Launch a few go routines to manage output streams from the container.
// They will be automatically destroyed when the container exits
attached := make(chan struct{})
r, w := io.Pipe()
go func() {
// AttachToContainer will fire off a message on the "attached" channel once the
// attachment completes, and then block until the container is terminated.
// The returned error is not used outside the scope of this function. Assign the
// error to a local variable to prevent clobbering the function variable 'err'.
err := client.AttachToContainer(docker.AttachToContainerOptions{
Container: containerName,
OutputStream: w,
ErrorStream: w,
Logs: true,
Stdout: true,
Stderr: true,
Stream: true,
Success: attached,
})
// If we get here, the container has terminated. Send a signal on the pipe
// so that downstream may clean up appropriately
_ = w.CloseWithError(err)
}()
go func() {
defer r.Close() // ensure the pipe reader gets closed
// Block here until the attachment completes or we timeout
select {
case <-attached: // successful attach
close(attached) // close indicates the streams can now be copied
case <-time.After(10 * time.Second):
logger.Errorf("Timeout while attaching to IO channel in container %s", containerName)
return
}
is := bufio.NewReader(r)
for {
// Loop forever dumping lines of text into the containerLogger
// until the pipe is closed
line, err := is.ReadString('\n')
switch err {
case nil:
containerLogger.Info(line)
case io.EOF:
logger.Infof("Container %s has closed its IO channel", containerName)
return
default:
logger.Errorf("Error reading container output: %s", err)
return
}
}
}()
}
// Stop stops a running chaincode
func (vm *DockerVM) Stop(ccid ccintf.CCID, timeout uint, dontkill bool, dontremove bool) error {
client, err := vm.getClientFnc()
if err != nil {
dockerLogger.Debugf("stop - cannot create client %s", err)
return err
}
id := vm.ccidToContainerID(ccid)
return vm.stopInternal(client, id, timeout, dontkill, dontremove)
}
// Wait blocks until the container stops and returns the exit code of the container.
func (vm *DockerVM) Wait(ccid ccintf.CCID) (int, error) {
client, err := vm.getClientFnc()
if err != nil {
dockerLogger.Debugf("stop - cannot create client %s", err)
return 0, err
}
id := vm.ccidToContainerID(ccid)
return client.WaitContainer(id)
}
func (vm *DockerVM) ccidToContainerID(ccid ccintf.CCID) string {
return strings.Replace(vm.GetVMName(ccid), ":", "_", -1)
}
// HealthCheck checks if the DockerVM is able to communicate with the Docker
// daemon.
func (vm *DockerVM) HealthCheck(ctx context.Context) error {
client, err := vm.getClientFnc()
if err != nil {
return errors.Wrap(err, "failed to connect to Docker daemon")
}
if err := client.PingWithContext(ctx); err != nil {
return errors.Wrap(err, "failed to ping to Docker daemon")
}
return nil
}
func (vm *DockerVM) stopInternal(client dockerClient, id string, timeout uint, dontkill, dontremove bool) error {
logger := dockerLogger.With("id", id)
logger.Debugw("stopping container")
err := client.StopContainer(id, timeout)
dockerLogger.Debugw("stop container result", "error", err)
if !dontkill {
logger.Debugw("killing container")
err = client.KillContainer(docker.KillContainerOptions{ID: id})
logger.Debugw("kill container result", "error", err)
}
if !dontremove {
logger.Debugw("removing container")
err = client.RemoveContainer(docker.RemoveContainerOptions{ID: id, Force: true})
logger.Debugw("remove container result", "error", err)
}
return err
}
// GetVMName generates the VM name from peer information. It accepts a format
// function parameter to allow different formatting based on the desired use of
// the name.
func (vm *DockerVM) GetVMName(ccid ccintf.CCID) string {
// replace any invalid characters with "-" (either in network id, peer id, or in the
// entire name returned by any format function)
return vmRegExp.ReplaceAllString(vm.preFormatImageName(ccid), "-")
}
// GetVMNameForDocker formats the docker image from peer information. This is
// needed to keep image (repository) names unique in a single host, multi-peer
// environment (such as a development environment). It computes the hash for the
// supplied image name and then appends it to the lowercase image name to ensure
// uniqueness.
func (vm *DockerVM) GetVMNameForDocker(ccid ccintf.CCID) (string, error) {
name := vm.preFormatImageName(ccid)
hash := hex.EncodeToString(util.ComputeSHA256([]byte(name)))
saniName := vmRegExp.ReplaceAllString(name, "-")
imageName := strings.ToLower(fmt.Sprintf("%s-%s", saniName, hash))
// Check that name complies with Docker's repository naming rules
if !imageRegExp.MatchString(imageName) {
dockerLogger.Errorf("Error constructing Docker VM Name. '%s' breaks Docker's repository naming rules", name)
return "", fmt.Errorf("Error constructing Docker VM Name. '%s' breaks Docker's repository naming rules", imageName)
}
return imageName, nil
}
func (vm *DockerVM) preFormatImageName(ccid ccintf.CCID) string {
name := ccid.GetName()
if vm.NetworkID != "" && vm.PeerID != "" {
name = fmt.Sprintf("%s-%s-%s", vm.NetworkID, vm.PeerID, name)
} else if vm.NetworkID != "" {
name = fmt.Sprintf("%s-%s", vm.NetworkID, name)
} else if vm.PeerID != "" {
name = fmt.Sprintf("%s-%s", vm.PeerID, name)
}
return name
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/mirrors/hyperledger-fabric.git
git@gitee.com:mirrors/hyperledger-fabric.git
mirrors
hyperledger-fabric
hyperledger-fabric
v1.4.3

搜索帮助

0d507c66 1850385 C8b1a773 1850385