1 Star 0 Fork 2

who7708/etcd

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
stresser.go 5.28 KB
一键复制 编辑 原始数据 按行查看 历史
Gyu-Ho Lee 提交于 2016-06-20 19:39 . *: use capnslog for grpclog
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"fmt"
"math/rand"
"net"
"net/http"
"sync"
"time"
clientV2 "github.com/coreos/etcd/client"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/transport"
)
func init() {
grpclog.SetLogger(plog)
}
type Stresser interface {
// Stress starts to stress the etcd cluster
Stress() error
// Cancel cancels the stress test on the etcd cluster
Cancel()
// Report reports the success and failure of the stress test
Report() (success int, failure int)
}
type stresser struct {
Endpoint string
KeySize int
KeySuffixRange int
N int
mu sync.Mutex
wg *sync.WaitGroup
cancel func()
conn *grpc.ClientConn
success int
}
func (s *stresser) Stress() error {
// TODO: add backoff option
conn, err := grpc.Dial(s.Endpoint, grpc.WithInsecure())
if err != nil {
return fmt.Errorf("%v (%s)", err, s.Endpoint)
}
defer conn.Close()
ctx, cancel := context.WithCancel(context.Background())
wg := &sync.WaitGroup{}
wg.Add(s.N)
s.mu.Lock()
s.conn = conn
s.cancel = cancel
s.wg = wg
s.mu.Unlock()
kvc := pb.NewKVClient(conn)
for i := 0; i < s.N; i++ {
go func(i int) {
defer wg.Done()
for {
// TODO: 10-second is enough timeout to cover leader failure
// and immediate leader election. Find out what other cases this
// could be timed out.
putctx, putcancel := context.WithTimeout(ctx, 10*time.Second)
_, err := kvc.Put(putctx, &pb.PutRequest{
Key: []byte(fmt.Sprintf("foo%d", rand.Intn(s.KeySuffixRange))),
Value: []byte(randStr(s.KeySize)),
})
putcancel()
if err != nil {
shouldContinue := false
switch grpc.ErrorDesc(err) {
case context.DeadlineExceeded.Error():
// This retries when request is triggered at the same time as
// leader failure. When we terminate the leader, the request to
// that leader cannot be processed, and times out. Also requests
// to followers cannot be forwarded to the old leader, so timing out
// as well. We want to keep stressing until the cluster elects a
// new leader and start processing requests again.
shouldContinue = true
case etcdserver.ErrStopped.Error():
// one of the etcd nodes stopped from failure injection
shouldContinue = true
case transport.ErrConnClosing.Desc:
// server closed the transport (failure injected node)
shouldContinue = true
case rpctypes.ErrNotCapable.Error():
// capability check has not been done (in the beginning)
shouldContinue = true
// default:
// errors from stresser.Cancel method:
// rpc error: code = 1 desc = context canceled (type grpc.rpcError)
// rpc error: code = 2 desc = grpc: the client connection is closing (type grpc.rpcError)
}
if shouldContinue {
continue
}
return
}
s.mu.Lock()
s.success++
s.mu.Unlock()
}
}(i)
}
<-ctx.Done()
return nil
}
func (s *stresser) Cancel() {
s.mu.Lock()
cancel, conn, wg := s.cancel, s.conn, s.wg
s.mu.Unlock()
cancel()
wg.Wait()
conn.Close()
}
func (s *stresser) Report() (int, int) {
s.mu.Lock()
defer s.mu.Unlock()
// TODO: find a better way to report v3 tests
return s.success, -1
}
type stresserV2 struct {
Endpoint string
KeySize int
KeySuffixRange int
N int
mu sync.Mutex
failure int
success int
cancel func()
}
func (s *stresserV2) Stress() error {
cfg := clientV2.Config{
Endpoints: []string{s.Endpoint},
Transport: &http.Transport{
Dial: (&net.Dialer{
Timeout: time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
MaxIdleConnsPerHost: s.N,
},
}
c, err := clientV2.New(cfg)
if err != nil {
return err
}
kv := clientV2.NewKeysAPI(c)
ctx, cancel := context.WithCancel(context.Background())
s.cancel = cancel
for i := 0; i < s.N; i++ {
go func() {
for {
setctx, setcancel := context.WithTimeout(ctx, clientV2.DefaultRequestTimeout)
key := fmt.Sprintf("foo%d", rand.Intn(s.KeySuffixRange))
_, err := kv.Set(setctx, key, randStr(s.KeySize), nil)
setcancel()
if err == context.Canceled {
return
}
s.mu.Lock()
if err != nil {
s.failure++
} else {
s.success++
}
s.mu.Unlock()
}
}()
}
<-ctx.Done()
return nil
}
func (s *stresserV2) Cancel() {
s.cancel()
}
func (s *stresserV2) Report() (success int, failure int) {
s.mu.Lock()
defer s.mu.Unlock()
return s.success, s.failure
}
func randStr(size int) string {
data := make([]byte, size)
for i := 0; i < size; i++ {
data[i] = byte(int('a') + rand.Intn(26))
}
return string(data)
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/who7708/etcd.git
git@gitee.com:who7708/etcd.git
who7708
etcd
etcd
v3.0.11

搜索帮助