1 Star 0 Fork 0

7x24/grpc-go

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
balancer_test.go 24.22 KB
一键复制 编辑 原始数据 按行查看 历史
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795
/*
*
* Copyright 2016 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package grpc
import (
"context"
"fmt"
"math"
"strconv"
"sync"
"testing"
"time"
"google.golang.org/grpc/codes"
_ "google.golang.org/grpc/grpclog/glogger"
"google.golang.org/grpc/naming"
"google.golang.org/grpc/status"
// V1 balancer tests use passthrough resolver instead of dns.
// TODO(bar) remove this when removing v1 balancer entirely.
_ "google.golang.org/grpc/resolver/passthrough"
)
// pickFirstBalancerV1 builds a pickFirst balancer around a roundRobin
// balancer that uses r as its name resolver.
func pickFirstBalancerV1(r naming.Resolver) Balancer {
	rr := &roundRobin{r: r}
	return &pickFirst{rr}
}
// testWatcher is the watcher handed out by testNameResolver. Tests feed it
// batches of updates through inject, and Next hands them to the consumer.
type testWatcher struct {
	// update carries the individual name resolution updates.
	update chan *naming.Update
	// side announces how many updates make up the next batch.
	side chan int
	// readDone tells the update injector that a batch has been read.
	readDone chan int
}
// Next waits for a batch size on w.side, reads that many entries from
// w.update (nil entries are dropped), acknowledges the batch on w.readDone
// and returns the collected updates.
//
// It returns an error once w.side has been closed. Closure is detected with
// the two-value receive rather than by comparing the received count to zero,
// so an empty batch is acknowledged like any other instead of being mistaken
// for a closed channel (which would leave the injector waiting on readDone).
func (w *testWatcher) Next() (updates []*naming.Update, err error) {
	n, ok := <-w.side
	if !ok {
		return nil, fmt.Errorf("w.side is closed")
	}
	for i := 0; i < n; i++ {
		if u := <-w.update; u != nil {
			updates = append(updates, u)
		}
	}
	// Let the injector know the whole batch has been consumed.
	w.readDone <- 0
	return updates, nil
}
// Close closes the side channel, which causes Next to return an error
// instead of waiting for another batch.
func (w *testWatcher) Close() {
	close(w.side)
}
// inject feeds one batch of name resolution updates to the testWatcher: it
// first publishes the batch size on the side channel, then sends every
// update, and finally blocks until the reader acknowledges on readDone.
func (w *testWatcher) inject(updates []*naming.Update) {
	w.side <- len(updates)
	for i := range updates {
		w.update <- updates[i]
	}
	<-w.readDone
}
// testNameResolver is a resolver for tests. Resolve creates a testWatcher,
// stores it in w, and seeds it with a single Add update for addr.
type testNameResolver struct {
	// w is the watcher created by the most recent Resolve call.
	w *testWatcher
	// addr is the address announced by the initial update.
	addr string
}
// Resolve creates a fresh testWatcher, records it on r, and preloads it with
// a one-element batch that adds r.addr. The target argument is not used.
func (r *testNameResolver) Resolve(target string) (naming.Watcher, error) {
	watcher := &testWatcher{
		update:   make(chan *naming.Update, 1),
		side:     make(chan int, 1),
		readDone: make(chan int),
	}
	r.w = watcher
	// Both channels have capacity 1, so seeding the initial batch does not
	// block even though nobody is reading yet.
	watcher.side <- 1
	watcher.update <- &naming.Update{Op: naming.Add, Addr: r.addr}
	// The seeded batch has no injector waiting for its acknowledgement, so
	// drain readDone once in the background to keep Next from blocking.
	go func() {
		<-r.w.readDone
	}()
	return watcher, nil
}
// startServers launches numServers test servers, waiting up to two seconds
// for each one, and returns them together with a resolver whose initial
// address is servers[0] and a function that stops every server.
func startServers(t *testing.T, numServers int, maxStreams uint32) ([]*server, *testNameResolver, func()) {
	var servers []*server
	for remaining := numServers; remaining > 0; remaining-- {
		srv := newTestServer()
		servers = append(servers, srv)
		go srv.start(t, 0, maxStreams)
		srv.wait(t, 2*time.Second)
	}
	// The resolver initially points to servers[0].
	resolver := &testNameResolver{addr: "localhost:" + servers[0].port}
	stopAll := func() {
		for i := 0; i < numServers; i++ {
			servers[i].stop()
		}
	}
	return servers, resolver, stopAll
}
// TestNameDiscovery dials through a round-robin balancer that initially knows
// servers[0], then swaps the address for servers[1] via a resolver update and
// waits until RPCs are answered by servers[1].
func (s) TestNameDiscovery(t *testing.T) {
	// Bring up two servers, each on its own port.
	const numServers = 2
	servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
	defer cleanup()
	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
	if err != nil {
		t.Fatalf("Failed to create ClientConn: %v", err)
	}
	defer cc.Close()

	var (
		req   = "port"
		reply string
	)
	// The RPC is expected to fail with an error description equal to the
	// port of the server that handled it.
	err = cc.Invoke(context.Background(), "/foo/bar", &req, &reply)
	if err == nil || errorDesc(err) != servers[0].port {
		t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want %s", err, servers[0].port)
	}

	// In a single batch, drop servers[0] and introduce servers[1].
	r.w.inject([]*naming.Update{
		{Op: naming.Delete, Addr: "localhost:" + servers[0].port},
		{Op: naming.Add, Addr: "localhost:" + servers[1].port},
	})

	// Keep issuing RPCs until one of them is answered by servers[1].
	for {
		err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply)
		if err == nil || errorDesc(err) != servers[1].port {
			time.Sleep(10 * time.Millisecond)
			continue
		}
		break
	}
}
// TestEmptyAddrs checks that, with a round-robin balancer, an RPC succeeds
// while the only server is known and starts failing after its address has
// been removed by a resolver update.
func (s) TestEmptyAddrs(t *testing.T) {
	servers, r, cleanup := startServers(t, 1, math.MaxUint32)
	defer cleanup()
	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
	if err != nil {
		t.Fatalf("Failed to create ClientConn: %v", err)
	}
	defer cc.Close()

	var reply string
	err = cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply)
	if err != nil || reply != expectedResponse {
		t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, <nil>", err, reply, expectedResponse)
	}

	// Remove the only address so that nothing is left to connect to.
	removal := &naming.Update{
		Op:   naming.Delete,
		Addr: "localhost:" + servers[0].port,
	}
	r.w.inject([]*naming.Update{removal})

	// Poll with short-lived RPCs until one fails, i.e. the removal has been
	// applied.
	for {
		time.Sleep(10 * time.Millisecond)
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
		err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply)
		cancel()
		if err != nil {
			break
		}
	}
}
// TestRoundRobin adds servers[1] and servers[2] to the balancer one at a
// time, waits for each to answer an RPC, and then checks that ten consecutive
// RPCs are answered by servers[0], servers[1], servers[2], ... in rotation.
func (s) TestRoundRobin(t *testing.T) {
	// Start 3 servers on 3 ports.
	numServers := 3
	servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
	defer cleanup()
	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
	if err != nil {
		t.Fatalf("Failed to create ClientConn: %v", err)
	}
	defer cc.Close()

	req := "port"
	var reply string
	// announce adds servers[idx] to the service discovery.
	announce := func(idx int) {
		r.w.inject([]*naming.Update{{
			Op:   naming.Add,
			Addr: "localhost:" + servers[idx].port,
		}})
	}
	// waitUntilServing polls every 10ms until an RPC is answered by
	// servers[idx].
	waitUntilServing := func(idx int) {
		for {
			err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply)
			if err != nil && errorDesc(err) == servers[idx].port {
				return
			}
			time.Sleep(10 * time.Millisecond)
		}
	}

	// Bring servers[1] into rotation and wait for it to be up.
	announce(1)
	waitUntilServing(1)
	// Bring servers[2] into rotation and wait for it to be up.
	announce(2)
	waitUntilServing(2)

	// Check the incoming RPCs are served in a round-robin manner.
	for i := 0; i < 10; i++ {
		want := servers[i%numServers].port
		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != want {
			t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", i, err, want)
		}
	}
}
// TestCloseWithPendingRPC removes the only server from a round-robin
// balancer, starts two wait-for-ready RPCs that therefore cannot proceed, and
// verifies that both complete with an error once the ClientConn is closed.
func (s) TestCloseWithPendingRPC(t *testing.T) {
	servers, r, cleanup := startServers(t, 1, math.MaxUint32)
	defer cleanup()
	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
	if err != nil {
		t.Fatalf("Failed to create ClientConn: %v", err)
	}
	defer cc.Close()
	var reply string
	// This RPC is expected to succeed, so the failure message reports a nil
	// error as the wanted outcome.
	if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil {
		t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
	}
	// Remove the server.
	updates := []*naming.Update{{
		Op:   naming.Delete,
		Addr: "localhost:" + servers[0].port,
	}}
	r.w.inject(updates)
	// Loop until the above update applies: a wait-for-ready RPC with a short
	// deadline times out once no address is left.
	for {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
		err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, WaitForReady(true))
		cancel()
		if status.Code(err) == codes.DeadlineExceeded {
			break
		}
		time.Sleep(10 * time.Millisecond)
	}
	// Issue 2 RPCs which should be completed with error status once cc is closed.
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		var reply string
		if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil {
			t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
		}
	}()
	go func() {
		defer wg.Done()
		var reply string
		// Start this RPC at roughly the same moment cc is closed.
		time.Sleep(5 * time.Millisecond)
		if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil {
			t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
		}
	}()
	time.Sleep(5 * time.Millisecond)
	cc.Close()
	wg.Wait()
}
// TestGetOnWaitChannel removes every address from a round-robin balancer,
// starts a wait-for-ready RPC, re-adds the server, and expects that RPC to
// finish without error.
func (s) TestGetOnWaitChannel(t *testing.T) {
	servers, r, cleanup := startServers(t, 1, math.MaxUint32)
	defer cleanup()
	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
	if err != nil {
		t.Fatalf("Failed to create ClientConn: %v", err)
	}
	defer cc.Close()

	target := "localhost:" + servers[0].port
	// Remove all servers so that all upcoming RPCs will block on waitCh.
	r.w.inject([]*naming.Update{{Op: naming.Delete, Addr: target}})
	// Poll until a short-deadline wait-for-ready RPC times out, which shows
	// that the removal has been applied.
	for {
		var reply string
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
		err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, WaitForReady(true))
		cancel()
		if status.Code(err) == codes.DeadlineExceeded {
			break
		}
		time.Sleep(10 * time.Millisecond)
	}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		var reply string
		err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true))
		if err != nil {
			t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
		}
	}()

	// Add a connected server to get the above RPC through.
	r.w.inject([]*naming.Update{{Op: naming.Add, Addr: target}})
	// Wait until the above RPC succeeds.
	wg.Wait()
}
// TestOneServerDown runs a round-robin balancer over two servers, stops
// servers[0], and at roughly the same moment fires 100 wait-for-ready RPCs.
// The test passes once all of them return; their results are not inspected.
func (s) TestOneServerDown(t *testing.T) {
	// Start 2 servers.
	numServers := 2
	servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
	defer cleanup()
	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}), WithWaitForHandshake())
	if err != nil {
		t.Fatalf("Failed to create ClientConn: %v", err)
	}
	defer cc.Close()
	// Add servers[1] to the service discovery.
	var updates []*naming.Update
	updates = append(updates, &naming.Update{
		Op:   naming.Add,
		Addr: "localhost:" + servers[1].port,
	})
	r.w.inject(updates)
	req := "port"
	var reply string
	// Loop until servers[1] is up
	for {
		if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[1].port {
			break
		}
		time.Sleep(10 * time.Millisecond)
	}

	var wg sync.WaitGroup
	numRPC := 100
	sleepDuration := 10 * time.Millisecond
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(sleepDuration)
		// After sleepDuration, kill server[0].
		servers[0].stop()
	}()

	// All non-failfast RPCs should not block because there's at least one connection available.
	for i := 0; i < numRPC; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each goroutine gets its own reply so concurrent RPCs never
			// write to the same variable.
			var reply string
			time.Sleep(sleepDuration)
			// After sleepDuration, invoke RPC.
			// server[0] is killed around the same time to make it racy between balancer and gRPC internals.
			// The result is deliberately ignored: only termination matters.
			cc.Invoke(context.Background(), "/foo/bar", &req, &reply, WaitForReady(true))
		}()
	}
	wg.Wait()
}
// TestOneAddressRemoval runs a round-robin balancer over two servers, removes
// servers[0] through a resolver update, and at roughly the same moment fires
// 100 wait-for-ready RPCs, all of which must succeed.
func (s) TestOneAddressRemoval(t *testing.T) {
	// Start 2 servers.
	numServers := 2
	servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
	defer cleanup()
	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
	if err != nil {
		t.Fatalf("Failed to create ClientConn: %v", err)
	}
	defer cc.Close()

	// Add servers[1] to the service discovery.
	r.w.inject([]*naming.Update{{
		Op:   naming.Add,
		Addr: "localhost:" + servers[1].port,
	}})

	req := "port"
	var reply string
	// Loop until servers[1] is up
	for {
		err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply)
		if err != nil && errorDesc(err) == servers[1].port {
			break
		}
		time.Sleep(10 * time.Millisecond)
	}

	var wg sync.WaitGroup
	numRPC := 100
	sleepDuration := 10 * time.Millisecond

	// removeFirst deletes servers[0] from the balancer after sleepDuration.
	removeFirst := func() {
		time.Sleep(sleepDuration)
		r.w.inject([]*naming.Update{{
			Op:   naming.Delete,
			Addr: "localhost:" + servers[0].port,
		}})
		wg.Done()
	}
	// callOnce issues one wait-for-ready RPC after sleepDuration.
	// servers[0] is removed around the same time to make it racy between
	// balancer and gRPC internals.
	callOnce := func() {
		var reply string
		time.Sleep(sleepDuration)
		err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true))
		if err != nil {
			t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want nil", err)
		}
		wg.Done()
	}

	// One slot for the removal goroutine plus one per RPC.
	wg.Add(1 + numRPC)
	go removeFirst()
	// All non-failfast RPCs should not fail because there's at least one connection available.
	for i := 0; i < numRPC; i++ {
		go callOnce()
	}
	wg.Wait()
}
// checkServerUp dials currentServer directly on its own ClientConn and polls
// every 10ms until a "port" RPC fails with an error description equal to the
// server's port.
func checkServerUp(t *testing.T, currentServer *server) {
	port := currentServer.port
	cc, err := Dial("passthrough:///localhost:"+port, WithBlock(), WithInsecure(), WithCodec(testCodec{}))
	if err != nil {
		t.Fatalf("Failed to create ClientConn: %v", err)
	}
	defer cc.Close()

	req, reply := "port", ""
	for {
		err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply)
		if err == nil || errorDesc(err) != port {
			time.Sleep(10 * time.Millisecond)
			continue
		}
		return
	}
}
// TestPickFirstEmptyAddrs checks that, with the v1 pick-first balancer, an
// RPC succeeds while the only server is known and starts failing after its
// address has been removed by a resolver update.
func (s) TestPickFirstEmptyAddrs(t *testing.T) {
	servers, r, cleanup := startServers(t, 1, math.MaxUint32)
	defer cleanup()
	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
	if err != nil {
		t.Fatalf("Failed to create ClientConn: %v", err)
	}
	defer cc.Close()

	var reply string
	err = cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply)
	if err != nil || reply != expectedResponse {
		t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, reply = %q, want %q, <nil>", err, reply, expectedResponse)
	}

	// Take the server out of name resolution; no address remains afterwards.
	r.w.inject([]*naming.Update{{
		Op:   naming.Delete,
		Addr: "localhost:" + servers[0].port,
	}})

	// Poll with short-lived RPCs until one fails, i.e. the removal has been
	// applied.
	for applied := false; !applied; {
		time.Sleep(10 * time.Millisecond)
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
		applied = cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply) != nil
		cancel()
	}
}
// TestPickFirstCloseWithPendingRPC removes the only server from a v1
// pick-first balancer, starts two wait-for-ready RPCs that therefore cannot
// proceed, and verifies that both complete with an error once the ClientConn
// is closed.
func (s) TestPickFirstCloseWithPendingRPC(t *testing.T) {
	servers, r, cleanup := startServers(t, 1, math.MaxUint32)
	defer cleanup()
	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
	if err != nil {
		t.Fatalf("Failed to create ClientConn: %v", err)
	}
	defer cc.Close()
	var reply string
	// This RPC is expected to succeed, so the failure message reports a nil
	// error as the wanted outcome.
	if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err != nil {
		t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
	}
	// Remove the server.
	updates := []*naming.Update{{
		Op:   naming.Delete,
		Addr: "localhost:" + servers[0].port,
	}}
	r.w.inject(updates)
	// Loop until the above update applies: a wait-for-ready RPC with a short
	// deadline times out once no address is left.
	for {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
		err := cc.Invoke(ctx, "/foo/bar", &expectedRequest, &reply, WaitForReady(true))
		cancel()
		if status.Code(err) == codes.DeadlineExceeded {
			break
		}
		time.Sleep(10 * time.Millisecond)
	}
	// Issue 2 RPCs which should be completed with error status once cc is closed.
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		var reply string
		if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil {
			t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
		}
	}()
	go func() {
		defer wg.Done()
		var reply string
		// Start this RPC at roughly the same moment cc is closed.
		time.Sleep(5 * time.Millisecond)
		if err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true)); err == nil {
			t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want not nil", err)
		}
	}()
	time.Sleep(5 * time.Millisecond)
	cc.Close()
	wg.Wait()
}
// TestPickFirstOrderAllServerUp exercises the v1 pick-first balancer with
// three running servers. It adds and deletes addresses through resolver
// updates and, after each change, checks which server answers the RPCs.
func (s) TestPickFirstOrderAllServerUp(t *testing.T) {
	// Start 3 servers on 3 ports.
	numServers := 3
	servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
	defer cleanup()
	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
	if err != nil {
		t.Fatalf("Failed to create ClientConn: %v", err)
	}
	defer cc.Close()

	req := "port"
	var reply string

	// addAddr and delAddr inject a one-element batch for servers[idx].
	addAddr := func(idx int) {
		r.w.inject([]*naming.Update{{Op: naming.Add, Addr: "localhost:" + servers[idx].port}})
	}
	delAddr := func(idx int) {
		r.w.inject([]*naming.Update{{Op: naming.Delete, Addr: "localhost:" + servers[idx].port}})
	}
	// awaitServer polls, pausing for the given duration between attempts,
	// until an RPC is answered by servers[idx].
	awaitServer := func(idx int, pause time.Duration) {
		for {
			if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[idx].port {
				return
			}
			time.Sleep(pause)
		}
	}
	// expectServer issues 20 RPCs, 10ms apart, and fails the test unless
	// every one of them is answered by servers[idx].
	expectServer := func(idx int) {
		for i := 0; i < 20; i++ {
			if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[idx].port {
				t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", idx, err, servers[idx].port)
			}
			time.Sleep(10 * time.Millisecond)
		}
	}

	// Add servers[1] and [2] to the service discovery.
	addAddr(1)
	addAddr(2)
	// Loop until all 3 servers are up
	for _, srv := range servers[:3] {
		checkServerUp(t, srv)
	}

	// Check the incoming RPCs served in server[0]
	expectServer(0)

	// Delete server[0] in the balancer, the incoming RPCs served in server[1]
	// For test addrconn, close server[0] instead
	delAddr(0)
	// Loop until it changes to server[1]
	awaitServer(1, 10*time.Millisecond)
	expectServer(1)

	// Add server[0] back to the balancer, the incoming RPCs served in server[1]
	// Add is append operation, the order of Notify now is {server[1].port server[2].port server[0].port}
	addAddr(0)
	expectServer(1)

	// Delete server[1] in the balancer, the incoming RPCs served in server[2]
	delAddr(1)
	awaitServer(2, 1*time.Second)
	expectServer(2)

	// Delete server[2] in the balancer, the incoming RPCs served in server[0]
	delAddr(2)
	awaitServer(0, 1*time.Second)
	expectServer(0)
}
// TestPickFirstOrderOneServerDown exercises the v1 pick-first balancer with
// three servers. It stops servers[0], restarts a server on the same port, and
// later deletes servers[1] from the balancer, checking after each step which
// server answers the RPCs.
func (s) TestPickFirstOrderOneServerDown(t *testing.T) {
	// Start 3 servers on 3 ports.
	numServers := 3
	servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
	defer cleanup()
	cc, err := Dial("passthrough:///foo.bar.com", WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}), WithWaitForHandshake())
	if err != nil {
		t.Fatalf("Failed to create ClientConn: %v", err)
	}
	defer cc.Close()

	req := "port"
	var reply string

	// awaitServer polls, pausing for the given duration between attempts,
	// until an RPC is answered by servers[idx]. The port is re-read on every
	// attempt because servers[0] is replaced later in the test.
	awaitServer := func(idx int, pause time.Duration) {
		for {
			if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err != nil && errorDesc(err) == servers[idx].port {
				return
			}
			time.Sleep(pause)
		}
	}
	// expectServer issues 20 RPCs, 10ms apart, and fails the test unless
	// every one of them is answered by servers[idx].
	expectServer := func(idx int) {
		for i := 0; i < 20; i++ {
			if err := cc.Invoke(context.Background(), "/foo/bar", &req, &reply); err == nil || errorDesc(err) != servers[idx].port {
				t.Fatalf("Index %d: Invoke(_, _, _, _, _) = %v, want %s", idx, err, servers[idx].port)
			}
			time.Sleep(10 * time.Millisecond)
		}
	}

	// Add servers[1] and [2] to the service discovery.
	u := &naming.Update{
		Op:   naming.Add,
		Addr: "localhost:" + servers[1].port,
	}
	r.w.inject([]*naming.Update{u})
	u = &naming.Update{
		Op:   naming.Add,
		Addr: "localhost:" + servers[2].port,
	}
	r.w.inject([]*naming.Update{u})
	// Loop until all 3 servers are up
	checkServerUp(t, servers[0])
	checkServerUp(t, servers[1])
	checkServerUp(t, servers[2])

	// Check the incoming RPCs served in server[0]
	expectServer(0)

	// server[0] down, incoming RPCs served in server[1], but the order of Notify still remains
	// {server[0] server[1] server[2]}
	servers[0].stop()
	// Loop until it changes to server[1]
	awaitServer(1, 10*time.Millisecond)
	expectServer(1)

	// up the server[0] back, the incoming RPCs served in server[1]
	// A port that does not parse would otherwise become 0 and the restarted
	// server would not listen on the original port, so fail right away.
	p, err := strconv.Atoi(servers[0].port)
	if err != nil {
		t.Fatalf("Failed to parse port %q of servers[0]: %v", servers[0].port, err)
	}
	servers[0] = newTestServer()
	go servers[0].start(t, p, math.MaxUint32)
	defer servers[0].stop()
	servers[0].wait(t, 2*time.Second)
	checkServerUp(t, servers[0])
	expectServer(1)

	// Delete server[1] in the balancer, the incoming RPCs served in server[0]
	u = &naming.Update{
		Op:   naming.Delete,
		Addr: "localhost:" + servers[1].port,
	}
	r.w.inject([]*naming.Update{u})
	awaitServer(0, 1*time.Second)
	expectServer(0)
}
// TestPickFirstOneAddressRemoval runs the v1 pick-first balancer over two
// servers, removes servers[0] through a resolver update, and at roughly the
// same moment fires 100 wait-for-ready RPCs, all of which must succeed.
func (s) TestPickFirstOneAddressRemoval(t *testing.T) {
	// Start 2 servers.
	numServers := 2
	servers, r, cleanup := startServers(t, numServers, math.MaxUint32)
	defer cleanup()
	cc, err := Dial("passthrough:///localhost:"+servers[0].port, WithBalancer(pickFirstBalancerV1(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
	if err != nil {
		t.Fatalf("Failed to create ClientConn: %v", err)
	}
	defer cc.Close()

	// Add servers[1] to the service discovery.
	r.w.inject([]*naming.Update{{
		Op:   naming.Add,
		Addr: "localhost:" + servers[1].port,
	}})
	// checkServerUp dials each server on a separate ClientConn and loops
	// until that server answers.
	for _, srv := range servers[:2] {
		checkServerUp(t, srv)
	}

	var wg sync.WaitGroup
	numRPC := 100
	sleepDuration := 10 * time.Millisecond

	// removeFirst deletes servers[0] from the balancer after sleepDuration.
	removeFirst := func() {
		time.Sleep(sleepDuration)
		r.w.inject([]*naming.Update{{
			Op:   naming.Delete,
			Addr: "localhost:" + servers[0].port,
		}})
		wg.Done()
	}
	// callOnce issues one wait-for-ready RPC after sleepDuration.
	// servers[0] is removed around the same time to make it racy between
	// balancer and gRPC internals.
	callOnce := func() {
		var reply string
		time.Sleep(sleepDuration)
		err := cc.Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, WaitForReady(true))
		if err != nil {
			t.Errorf("grpc.Invoke(_, _, _, _, _) = %v, want nil", err)
		}
		wg.Done()
	}

	// One slot for the removal goroutine plus one per RPC.
	wg.Add(1 + numRPC)
	go removeFirst()
	// All non-failfast RPCs should not fail because there's at least one connection available.
	for i := 0; i < numRPC; i++ {
		go callOnce()
	}
	wg.Wait()
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/wangHvip/grpc-go.git
git@gitee.com:wangHvip/grpc-go.git
wangHvip
grpc-go
grpc-go
master

搜索帮助