1 Star 0 Fork 2

who7708/etcd

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
lease.go 3.24 KB
一键复制 编辑 原始数据 按行查看 历史
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package v3rpc
import (
"io"
"github.com/coreos/etcd/etcdserver"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/coreos/etcd/lease"
"golang.org/x/net/context"
)
type LeaseServer struct {
hdr header
le etcdserver.Lessor
}
func NewLeaseServer(s *etcdserver.EtcdServer) pb.LeaseServer {
return &LeaseServer{le: s, hdr: newHeader(s)}
}
func (ls *LeaseServer) LeaseGrant(ctx context.Context, cr *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
resp, err := ls.le.LeaseGrant(ctx, cr)
if err != nil {
return nil, togRPCError(err)
}
ls.hdr.fill(resp.Header)
return resp, nil
}
func (ls *LeaseServer) LeaseRevoke(ctx context.Context, rr *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
resp, err := ls.le.LeaseRevoke(ctx, rr)
if err != nil {
return nil, togRPCError(err)
}
ls.hdr.fill(resp.Header)
return resp, nil
}
func (ls *LeaseServer) LeaseTimeToLive(ctx context.Context, rr *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
resp, err := ls.le.LeaseTimeToLive(ctx, rr)
if err != nil && err != lease.ErrLeaseNotFound {
return nil, togRPCError(err)
}
if err == lease.ErrLeaseNotFound {
resp = &pb.LeaseTimeToLiveResponse{
Header: &pb.ResponseHeader{},
ID: rr.ID,
TTL: -1,
}
}
ls.hdr.fill(resp.Header)
return resp, nil
}
func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) (err error) {
errc := make(chan error, 1)
go func() {
errc <- ls.leaseKeepAlive(stream)
}()
select {
case err = <-errc:
case <-stream.Context().Done():
// the only server-side cancellation is noleader for now.
err = stream.Context().Err()
if err == context.Canceled {
err = rpctypes.ErrGRPCNoLeader
}
}
return err
}
func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error {
for {
req, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
// Create header before we sent out the renew request.
// This can make sure that the revision is strictly smaller or equal to
// when the keepalive happened at the local server (when the local server is the leader)
// or remote leader.
// Without this, a lease might be revoked at rev 3 but client can see the keepalive succeeded
// at rev 4.
resp := &pb.LeaseKeepAliveResponse{ID: req.ID, Header: &pb.ResponseHeader{}}
ls.hdr.fill(resp.Header)
ttl, err := ls.le.LeaseRenew(stream.Context(), lease.LeaseID(req.ID))
if err == lease.ErrLeaseNotFound {
err = nil
ttl = 0
}
if err != nil {
return togRPCError(err)
}
resp.TTL = ttl
err = stream.Send(resp)
if err != nil {
return err
}
}
}
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/who7708/etcd.git
git@gitee.com:who7708/etcd.git
who7708
etcd
etcd
v3.2.8

搜索帮助

23e8dbc6 1850385 7e0993f3 1850385