3 Star 2 Fork 1

vrv_media/go-micro-framework

加入 Gitee
与超过 1400万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
discovery.go 8.52 KB
一键复制 编辑 原始数据 按行查看 历史
bujinyuan 提交于 2025-03-11 09:17 +08:00 . feat:修改
package kubernetes
import (
"context"
"errors"
"fmt"
"gitee.com/vrv_media/go-micro-framework/pkg/common/util/net"
"gitee.com/vrv_media/go-micro-framework/registry"
regOps "gitee.com/vrv_media/go-micro-framework/registry/options"
jsoniter "github.com/json-iterator/go"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
listerv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"os"
"path/filepath"
"strconv"
"strings"
"time"
)
const (
// LabelsKeyServiceID is used to define the ID of the service
LabelsKeyServiceID = "kratos-service-id"
// LabelsKeyServiceName is used to define the name of the service
LabelsKeyServiceName = "kratos-service-app"
// LabelsKeyServiceVersion is used to define the version of the service
LabelsKeyServiceVersion = "kratos-service-version"
// AnnotationsKeyMetadata is used to define the metadata of the service
AnnotationsKeyMetadata = "kratos-service-metadata"
// AnnotationsKeyProtocolMap is used to define the protocols of the service
// Through the value of this field, Kratos can obtain the application layer protocol corresponding to the port
// Example value: {"80": "http", "8081": "grpc"}
AnnotationsKeyProtocolMap = "kratos-service-protocols"
)
type KubernetesDiscovery struct {
clientSet *kubernetes.Clientset
informerFactory informers.SharedInformerFactory
podLister listerv1.PodLister
podInformer cache.SharedIndexInformer
stopCh chan struct{}
}
func NewKubernetesDiscovery(options *regOps.DiscoveryOptions) (*KubernetesDiscovery, error) {
address := options.Address
host, port, err := net.ExtractHostPort(address)
if err != nil {
return nil, err
}
_ = os.Setenv("KUBERNETES_SERVICE_HOST", host)
_ = os.Setenv("KUBERNETES_SERVICE_PORT", strconv.FormatUint(port, 10))
restConfig, err := rest.InClusterConfig()
home := homedir.HomeDir()
if err != nil {
kubeConfig := filepath.Join(home, ".kube", "config")
restConfig, err = clientcmd.BuildConfigFromFlags("", kubeConfig)
if err != nil {
return nil, err
}
}
clientSet, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, err
}
return newKubernetesDiscovery(clientSet), nil
}
// NewKubernetesDiscovery is used to initialize the discovery
func newKubernetesDiscovery(clientSet *kubernetes.Clientset) *KubernetesDiscovery {
informerFactory := informers.NewSharedInformerFactory(clientSet, time.Minute*10)
podInformer := informerFactory.Core().V1().Pods().Informer()
podLister := informerFactory.Core().V1().Pods().Lister()
return &KubernetesDiscovery{
clientSet: clientSet,
informerFactory: informerFactory,
podInformer: podInformer,
podLister: podLister,
stopCh: make(chan struct{}),
}
}
// GetService return the service instances in memory according to the service name.
func (d *KubernetesDiscovery) GetService(ctx context.Context, name string) ([]*registry.ServiceInstance, error) {
pods, err := d.podLister.List(labels.SelectorFromSet(map[string]string{
LabelsKeyServiceName: name,
}))
if err != nil {
return nil, err
}
ret := make([]*registry.ServiceInstance, 0, len(pods))
for _, pod := range pods {
if pod.Status.Phase != corev1.PodRunning {
continue
}
instance, err := getServiceInstanceFromPod(pod)
if err != nil {
return nil, err
}
ret = append(ret, instance)
}
return ret, nil
}
// Watch creates a watcher according to the service name.
func (d *KubernetesDiscovery) Watch(ctx context.Context, name string) (registry.Watcher, error) {
stopCh := make(chan struct{}, 1)
announcement := make(chan []*registry.ServiceInstance, 1)
_, err := d.podInformer.AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
select {
case <-stopCh:
return false
case <-d.stopCh:
return false
default:
pod := obj.(*corev1.Pod)
val := pod.GetLabels()[LabelsKeyServiceName]
return val == name
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
d.sendLatestInstances(ctx, name, announcement)
},
UpdateFunc: func(oldObj, newObj interface{}) {
d.sendLatestInstances(ctx, name, announcement)
},
DeleteFunc: func(obj interface{}) {
d.sendLatestInstances(ctx, name, announcement)
},
},
})
if err != nil {
return nil, err
}
return NewIterator(announcement, stopCh), nil
}
// Start is used to start the Registry
// It is non-blocking
func (d *KubernetesDiscovery) Start() {
d.informerFactory.Start(d.stopCh)
if !cache.WaitForCacheSync(d.stopCh, d.podInformer.HasSynced) {
return
}
}
// Close is used to close the Registry
// After closing, any callbacks generated by Watch will not be executed
func (d *KubernetesDiscovery) Close() {
select {
case <-d.stopCh:
default:
close(d.stopCh)
}
}
// Iterator performs the conversion from channel to iterator
// It reads the latest changes from the `chan []*registry.ServiceInstance`
// And the outside can sense the closure of Iterator through stopCh
type Iterator struct {
ch chan []*registry.ServiceInstance
stopCh chan struct{}
}
// NewIterator is used to initialize Iterator
func NewIterator(channel chan []*registry.ServiceInstance, stopCh chan struct{}) *Iterator {
return &Iterator{
ch: channel,
stopCh: stopCh,
}
}
// Next will block until ServiceInstance changes
func (iter *Iterator) Next() ([]*registry.ServiceInstance, error) {
select {
case instances := <-iter.ch:
return instances, nil
case <-iter.stopCh:
return nil, ErrIteratorClosed
}
}
// Stop is used to close the iterator
func (iter *Iterator) Stop() error {
select {
case <-iter.stopCh:
default:
close(iter.stopCh)
}
return nil
}
func (d *KubernetesDiscovery) sendLatestInstances(ctx context.Context, name string, announcement chan []*registry.ServiceInstance) {
instances, err := d.GetService(ctx, name)
if err != nil {
panic(err)
}
announcement <- instances
}
func getServiceInstanceFromPod(pod *corev1.Pod) (*registry.ServiceInstance, error) {
podIP := pod.Status.PodIP
podLabels := pod.GetLabels()
// Get Metadata
metadata, err := getMetadataFromPod(pod)
if err != nil {
return nil, err
}
// Get Protocols Definition
protocolMap, err := getProtocolMapFromPod(pod)
if err != nil {
return nil, err
}
// Get Endpoints
var endpoints []string
for _, container := range pod.Spec.Containers {
for _, cp := range container.Ports {
port := cp.ContainerPort
protocol := protocolMap.GetProtocol(port)
if protocol == "" {
if cp.Name != "" {
protocol = strings.Split(cp.Name, "-")[0]
} else {
protocol = string(cp.Protocol)
}
}
addr := fmt.Sprintf("%s://%s:%d", protocol, podIP, port)
endpoints = append(endpoints, addr)
}
}
return &registry.ServiceInstance{
ID: podLabels[LabelsKeyServiceID],
Name: podLabels[LabelsKeyServiceName],
Version: podLabels[LabelsKeyServiceVersion],
Metadata: metadata,
Endpoints: endpoints,
}, nil
}
func getProtocolMapFromPod(pod *corev1.Pod) (protocolMap, error) {
protoMap := protocolMap{}
if s := pod.Annotations[AnnotationsKeyProtocolMap]; !isEmptyObjectString(s) {
err := unmarshal(s, &protoMap)
if err != nil {
return nil, &ErrorHandleResource{Namespace: pod.Namespace, Name: pod.Name, Reason: err}
}
}
return protoMap, nil
}
func getMetadataFromPod(pod *corev1.Pod) (map[string]string, error) {
metadata := map[string]string{}
if s := pod.Annotations[AnnotationsKeyMetadata]; !isEmptyObjectString(s) {
err := unmarshal(s, &metadata)
if err != nil {
return nil, &ErrorHandleResource{Namespace: pod.Namespace, Name: pod.Name, Reason: err}
}
}
return metadata, nil
}
func isEmptyObjectString(s string) bool {
switch s {
case "", "{}", "null", "nil", "[]":
return true
}
return false
}
func unmarshal(data string, in interface{}) error {
return jsoniter.UnmarshalFromString(data, in)
}
// ErrIteratorClosed defines the error that the iterator is closed
var ErrIteratorClosed = errors.New("iterator closed")
type protocolMap map[string]string
func (m protocolMap) GetProtocol(port int32) string {
return m[strconv.Itoa(int(port))]
}
// ErrorHandleResource defines the error that cannot handle K8S resources normally
type ErrorHandleResource struct {
Namespace string
Name string
Reason error
}
// Error implements the error interface
func (err *ErrorHandleResource) Error() string {
return fmt.Sprintf("failed to handle resource(namespace=%s, name=%s): %s",
err.Namespace, err.Name, err.Reason)
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Go
1
https://gitee.com/vrv_media/go-micro-framework.git
git@gitee.com:vrv_media/go-micro-framework.git
vrv_media
go-micro-framework
go-micro-framework
4d2a37d3a8d1

搜索帮助