1 Star 0 Fork 0

0x43/dubbo-go

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
unmarshal_rds.go 16.31 KB
一键复制 编辑 原始数据 按行查看 历史
0x43 提交于 2024-12-28 17:37 . module
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
*
* Copyright 2021 gRPC authors.
*
*/
package resource
import (
"fmt"
"regexp"
"strings"
"time"
)
import (
dubbogoLogger "gitee.com/git4chen/gost/log/logger"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
v3typepb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
"github.com/golang/protobuf/proto"
"google.golang.org/grpc/codes"
"google.golang.org/protobuf/types/known/anypb"
)
import (
"gitee.com/git4chen/dubbo-go/xds/client/resource/version"
"gitee.com/git4chen/dubbo-go/xds/clusterspecifier"
"gitee.com/git4chen/dubbo-go/xds/utils/envconfig"
"gitee.com/git4chen/dubbo-go/xds/utils/pretty"
)
// UnmarshalRouteConfig processes resources received in an RDS response,
// validates them, and transforms them into a native struct which contains only
// fields we are interested in. The provided hostname determines the route
// configuration resources of interest.
func UnmarshalRouteConfig(opts *UnmarshalOptions) (map[string]RouteConfigUpdateErrTuple, UpdateMetadata, error) {
update := make(map[string]RouteConfigUpdateErrTuple)
md, err := processAllResources(opts, update)
return update, md, err
}
func unmarshalRouteConfigResource(r *anypb.Any, logger dubbogoLogger.Logger) (string, RouteConfigUpdate, error) {
if !IsRouteConfigResource(r.GetTypeUrl()) {
return "", RouteConfigUpdate{}, fmt.Errorf("unexpected resource type: %q ", r.GetTypeUrl())
}
rc := &v3routepb.RouteConfiguration{}
if err := proto.Unmarshal(r.GetValue(), rc); err != nil {
return "", RouteConfigUpdate{}, fmt.Errorf("failed to unmarshal resource: %v", err)
}
dubbogoLogger.Debugf("Resource with name: %v, type: %T, contains: %v.", rc.GetName(), rc, pretty.ToJSON(rc))
// TODO: Pass version.TransportAPI instead of relying upon the type URL
v2 := r.GetTypeUrl() == version.V2RouteConfigURL
u, err := generateRDSUpdateFromRouteConfiguration(rc, logger, v2)
if err != nil {
return rc.GetName(), RouteConfigUpdate{}, err
}
u.Raw = r
return rc.GetName(), u, nil
}
// generateRDSUpdateFromRouteConfiguration checks if the provided
// RouteConfiguration meets the expected criteria. If so, it returns a
// RouteConfigUpdate with nil error.
//
// A RouteConfiguration resource is considered valid when only if it contains a
// VirtualHost whose domain field matches the server name from the URI passed
// to the gRPC channel, and it contains a clusterName or a weighted cluster.
//
// The RouteConfiguration includes a list of virtualHosts, which may have zero
// or more elements. We are interested in the element whose domains field
// matches the server name specified in the "xds:" URI. The only field in the
// VirtualHost proto that the we are interested in is the list of routes. We
// only look at the last route in the list (the default route), whose match
// field must be empty and whose route field must be set. Inside that route
// message, the cluster field will contain the clusterName or weighted clusters
// we are looking for.
func generateRDSUpdateFromRouteConfiguration(rc *v3routepb.RouteConfiguration, logger dubbogoLogger.Logger, v2 bool) (RouteConfigUpdate, error) {
vhs := make([]*VirtualHost, 0, len(rc.GetVirtualHosts()))
csps := make(map[string]clusterspecifier.BalancerConfig)
if envconfig.XDSRLS {
var err error
csps, err = processClusterSpecifierPlugins(rc.ClusterSpecifierPlugins)
if err != nil {
return RouteConfigUpdate{}, fmt.Errorf("received route is invalid %v", err)
}
}
// cspNames represents all the cluster specifiers referenced by Route
// Actions - any cluster specifiers not referenced by a Route Action can be
// ignored and not emitted by the xdsclient.
var cspNames = make(map[string]bool)
for _, vh := range rc.GetVirtualHosts() {
routes, cspNs, err := routesProtoToSlice(vh.Routes, csps, logger, v2)
if err != nil {
return RouteConfigUpdate{}, fmt.Errorf("received route is invalid: %v", err)
}
for n := range cspNs {
cspNames[n] = true
}
rc, err := generateRetryConfig(vh.GetRetryPolicy())
if err != nil {
return RouteConfigUpdate{}, fmt.Errorf("received route is invalid: %v", err)
}
vhOut := &VirtualHost{
Domains: vh.GetDomains(),
Routes: routes,
RetryConfig: rc,
}
if !v2 {
cfgs, err := processHTTPFilterOverrides(vh.GetTypedPerFilterConfig())
if err != nil {
return RouteConfigUpdate{}, fmt.Errorf("virtual host %+v: %v", vh, err)
}
vhOut.HTTPFilterConfigOverride = cfgs
}
vhs = append(vhs, vhOut)
}
// "For any entry in the RouteConfiguration.cluster_specifier_plugins not
// referenced by an enclosed ActionType's cluster_specifier_plugin, the xDS
// client should not provide it to its consumers." - RLS in xDS Design
for name := range csps {
if !cspNames[name] {
delete(csps, name)
}
}
return RouteConfigUpdate{VirtualHosts: vhs, ClusterSpecifierPlugins: csps}, nil
}
func processClusterSpecifierPlugins(csps []*v3routepb.ClusterSpecifierPlugin) (map[string]clusterspecifier.BalancerConfig, error) {
cspCfgs := make(map[string]clusterspecifier.BalancerConfig)
// "The xDS client will inspect all elements of the
// cluster_specifier_plugins field looking up a plugin based on the
// extension.typed_config of each." - RLS in xDS design
for _, csp := range csps {
cs := clusterspecifier.Get(csp.GetExtension().GetTypedConfig().GetTypeUrl())
if cs == nil {
// "If no plugin is registered for it, the resource will be NACKed."
// - RLS in xDS design
return nil, fmt.Errorf("cluster specifier %q of type %q was not found", csp.GetExtension().GetName(), csp.GetExtension().GetTypedConfig().GetTypeUrl())
}
lbCfg, err := cs.ParseClusterSpecifierConfig(csp.GetExtension().GetTypedConfig())
if err != nil {
// "If a plugin is found, the value of the typed_config field will
// be passed to it's conversion method, and if an error is
// encountered, the resource will be NACKED." - RLS in xDS design
return nil, fmt.Errorf("error: %q parsing config %q for cluster specifier %q of type %q", err, csp.GetExtension().GetTypedConfig(), csp.GetExtension().GetName(), csp.GetExtension().GetTypedConfig().GetTypeUrl())
}
// "If all cluster specifiers are valid, the xDS client will store the
// configurations in a map keyed by the name of the extension instance." -
// RLS in xDS Design
cspCfgs[csp.GetExtension().GetName()] = lbCfg
}
return cspCfgs, nil
}
func generateRetryConfig(rp *v3routepb.RetryPolicy) (*RetryConfig, error) {
if rp == nil {
return nil, nil
}
cfg := &RetryConfig{RetryOn: make(map[codes.Code]bool)}
for _, s := range strings.Split(rp.GetRetryOn(), ",") {
switch strings.TrimSpace(strings.ToLower(s)) {
// FIXME, is this misspelled by grpc?
case "cancel" + "led":
cfg.RetryOn[codes.Canceled] = true
case "deadline-exceeded":
cfg.RetryOn[codes.DeadlineExceeded] = true
case "internal":
cfg.RetryOn[codes.Internal] = true
case "resource-exhausted":
cfg.RetryOn[codes.ResourceExhausted] = true
case "unavailable":
cfg.RetryOn[codes.Unavailable] = true
}
}
if rp.NumRetries == nil {
cfg.NumRetries = 1
} else {
cfg.NumRetries = rp.GetNumRetries().Value
if cfg.NumRetries < 1 {
return nil, fmt.Errorf("retry_policy.num_retries = %v; must be >= 1", cfg.NumRetries)
}
}
backoff := rp.GetRetryBackOff()
if backoff == nil {
cfg.RetryBackoff.BaseInterval = 25 * time.Millisecond
} else {
cfg.RetryBackoff.BaseInterval = backoff.GetBaseInterval().AsDuration()
if cfg.RetryBackoff.BaseInterval <= 0 {
return nil, fmt.Errorf("retry_policy.base_interval = %v; must be > 0", cfg.RetryBackoff.BaseInterval)
}
}
if max := backoff.GetMaxInterval(); max == nil {
cfg.RetryBackoff.MaxInterval = 10 * cfg.RetryBackoff.BaseInterval
} else {
cfg.RetryBackoff.MaxInterval = max.AsDuration()
if cfg.RetryBackoff.MaxInterval <= 0 {
return nil, fmt.Errorf("retry_policy.max_interval = %v; must be > 0", cfg.RetryBackoff.MaxInterval)
}
}
if len(cfg.RetryOn) == 0 {
return &RetryConfig{}, nil
}
return cfg, nil
}
func routesProtoToSlice(routes []*v3routepb.Route, csps map[string]clusterspecifier.BalancerConfig, logger dubbogoLogger.Logger, v2 bool) ([]*Route, map[string]bool, error) {
var routesRet []*Route
var cspNames = make(map[string]bool)
for _, r := range routes {
match := r.GetMatch()
if match == nil {
return nil, nil, fmt.Errorf("route %+v doesn't have a match", r)
}
if len(match.GetQueryParameters()) != 0 {
// Ignore route with query parameters.
logger.Warnf("route %+v has query parameter matchers, the route will be ignored", r)
continue
}
pathSp := match.GetPathSpecifier()
if pathSp == nil {
return nil, nil, fmt.Errorf("route %+v doesn't have a path specifier", r)
}
var route Route
switch pt := pathSp.(type) {
case *v3routepb.RouteMatch_Prefix:
route.Prefix = &pt.Prefix
case *v3routepb.RouteMatch_Path:
route.Path = &pt.Path
case *v3routepb.RouteMatch_SafeRegex:
regex := pt.SafeRegex.GetRegex()
re, err := regexp.Compile(regex)
if err != nil {
return nil, nil, fmt.Errorf("route %+v contains an invalid regex %q", r, regex)
}
route.Regex = re
default:
return nil, nil, fmt.Errorf("route %+v has an unrecognized path specifier: %+v", r, pt)
}
if caseSensitive := match.GetCaseSensitive(); caseSensitive != nil {
route.CaseInsensitive = !caseSensitive.Value
}
for _, h := range match.GetHeaders() {
var header HeaderMatcher
switch ht := h.GetHeaderMatchSpecifier().(type) {
case *v3routepb.HeaderMatcher_ExactMatch:
header.ExactMatch = &ht.ExactMatch
case *v3routepb.HeaderMatcher_SafeRegexMatch:
regex := ht.SafeRegexMatch.GetRegex()
re, err := regexp.Compile(regex)
if err != nil {
return nil, nil, fmt.Errorf("route %+v contains an invalid regex %q", r, regex)
}
header.RegexMatch = re
case *v3routepb.HeaderMatcher_RangeMatch:
header.RangeMatch = &Int64Range{
Start: ht.RangeMatch.Start,
End: ht.RangeMatch.End,
}
case *v3routepb.HeaderMatcher_PresentMatch:
header.PresentMatch = &ht.PresentMatch
case *v3routepb.HeaderMatcher_PrefixMatch:
header.PrefixMatch = &ht.PrefixMatch
case *v3routepb.HeaderMatcher_SuffixMatch:
header.SuffixMatch = &ht.SuffixMatch
default:
return nil, nil, fmt.Errorf("route %+v has an unrecognized header matcher: %+v", r, ht)
}
header.Name = h.GetName()
invert := h.GetInvertMatch()
header.InvertMatch = &invert
route.Headers = append(route.Headers, &header)
}
if fr := match.GetRuntimeFraction(); fr != nil {
d := fr.GetDefaultValue()
n := d.GetNumerator()
switch d.GetDenominator() {
case v3typepb.FractionalPercent_HUNDRED:
n *= 10000
case v3typepb.FractionalPercent_TEN_THOUSAND:
n *= 100
case v3typepb.FractionalPercent_MILLION:
}
route.Fraction = &n
}
switch r.GetAction().(type) {
case *v3routepb.Route_Route:
route.WeightedClusters = make(map[string]WeightedCluster)
action := r.GetRoute()
// Hash Policies are only applicable for a Ring Hash LB.
if envconfig.XDSRingHash {
hp, err := hashPoliciesProtoToSlice(action.HashPolicy, logger)
if err != nil {
return nil, nil, err
}
route.HashPolicies = hp
}
switch a := action.GetClusterSpecifier().(type) {
case *v3routepb.RouteAction_Cluster:
route.WeightedClusters[a.Cluster] = WeightedCluster{Weight: 1}
case *v3routepb.RouteAction_WeightedClusters:
wcs := a.WeightedClusters
var totalWeight uint32
for _, c := range wcs.Clusters {
w := c.GetWeight().GetValue()
if w == 0 {
continue
}
wc := WeightedCluster{Weight: w}
if !v2 {
cfgs, err := processHTTPFilterOverrides(c.GetTypedPerFilterConfig())
if err != nil {
return nil, nil, fmt.Errorf("route %+v, action %+v: %v", r, a, err)
}
wc.HTTPFilterConfigOverride = cfgs
}
route.WeightedClusters[c.GetName()] = wc
totalWeight += w
}
// envoy xds doc
// default TotalWeight https://www.envoyproxy.io/docs/envoy/latest/api-v3/config/route/v3/route_components.proto.html#envoy-v3-api-field-config-route-v3-weightedcluster-total-weight
wantTotalWeight := uint32(100)
if tw := wcs.GetTotalWeight(); tw != nil {
wantTotalWeight = tw.GetValue()
}
if totalWeight != wantTotalWeight {
return nil, nil, fmt.Errorf("route %+v, action %+v, weights of clusters do not add up to total total weight, got: %v, expected total weight from response: %v", r, a, totalWeight, wantTotalWeight)
}
if totalWeight == 0 {
return nil, nil, fmt.Errorf("route %+v, action %+v, has no valid cluster in WeightedCluster action", r, a)
}
case *v3routepb.RouteAction_ClusterHeader:
continue
case *v3routepb.RouteAction_ClusterSpecifierPlugin:
if !envconfig.XDSRLS {
return nil, nil, fmt.Errorf("route %+v, has an unknown ClusterSpecifier: %+v", r, a)
}
if _, ok := csps[a.ClusterSpecifierPlugin]; !ok {
// "When processing RouteActions, if any action includes a
// cluster_specifier_plugin value that is not in
// RouteConfiguration.cluster_specifier_plugins, the
// resource will be NACKed." - RLS in xDS design
return nil, nil, fmt.Errorf("route %+v, action %+v, specifies a cluster specifier plugin %+v that is not in Route Configuration", r, a, a.ClusterSpecifierPlugin)
}
cspNames[a.ClusterSpecifierPlugin] = true
route.ClusterSpecifierPlugin = a.ClusterSpecifierPlugin
default:
return nil, nil, fmt.Errorf("route %+v, has an unknown ClusterSpecifier: %+v", r, a)
}
msd := action.GetMaxStreamDuration()
// Prefer grpc_timeout_header_max, if set.
dur := msd.GetGrpcTimeoutHeaderMax()
if dur == nil {
dur = msd.GetMaxStreamDuration()
}
if dur != nil {
d := dur.AsDuration()
route.MaxStreamDuration = &d
}
var err error
route.RetryConfig, err = generateRetryConfig(action.GetRetryPolicy())
if err != nil {
return nil, nil, fmt.Errorf("route %+v, action %+v: %v", r, action, err)
}
route.ActionType = RouteActionRoute
case *v3routepb.Route_NonForwardingAction:
// Expected to be used on server side.
route.ActionType = RouteActionNonForwardingAction
default:
route.ActionType = RouteActionUnsupported
}
if !v2 {
cfgs, err := processHTTPFilterOverrides(r.GetTypedPerFilterConfig())
if err != nil {
return nil, nil, fmt.Errorf("route %+v: %v", r, err)
}
route.HTTPFilterConfigOverride = cfgs
}
routesRet = append(routesRet, &route)
}
return routesRet, cspNames, nil
}
func hashPoliciesProtoToSlice(policies []*v3routepb.RouteAction_HashPolicy, logger dubbogoLogger.Logger) ([]*HashPolicy, error) {
var hashPoliciesRet []*HashPolicy
for _, p := range policies {
policy := HashPolicy{Terminal: p.Terminal}
switch p.GetPolicySpecifier().(type) {
case *v3routepb.RouteAction_HashPolicy_Header_:
policy.HashPolicyType = HashPolicyTypeHeader
policy.HeaderName = p.GetHeader().GetHeaderName()
if rr := p.GetHeader().GetRegexRewrite(); rr != nil {
regex := rr.GetPattern().GetRegex()
re, err := regexp.Compile(regex)
if err != nil {
return nil, fmt.Errorf("hash policy %+v contains an invalid regex %q", p, regex)
}
policy.Regex = re
policy.RegexSubstitution = rr.GetSubstitution()
}
case *v3routepb.RouteAction_HashPolicy_FilterState_:
if p.GetFilterState().GetKey() != "io.grpc.channel_id" {
logger.Infof("hash policy %+v contains an invalid key for filter state policy %q", p, p.GetFilterState().GetKey())
continue
}
policy.HashPolicyType = HashPolicyTypeChannelID
default:
logger.Infof("hash policy %T is an unsupported hash policy", p.GetPolicySpecifier())
continue
}
hashPoliciesRet = append(hashPoliciesRet, &policy)
}
return hashPoliciesRet, nil
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/git4chen/dubbo-go.git
git@gitee.com:git4chen/dubbo-go.git
git4chen
dubbo-go
dubbo-go
v1.0.0

搜索帮助