1 Star 13 Fork 2

tencentcloud / tencentcloud-sdk-python

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
eb_client.py 27.85 KB
一键复制 编辑 原始数据 按行查看 历史
tencentcloud 提交于 2024-01-17 04:17 . release 3.0.1077
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762
# -*- coding: utf8 -*-
# Copyright (c) 2017-2021 THL A29 Limited, a Tencent company. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.common.abstract_client import AbstractClient
from tencentcloud.eb.v20210416 import models
class EbClient(AbstractClient):
    """Client for the Tencent Cloud ``eb`` service, API version 2021-04-16.

    Every public method takes the matching ``<Action>Request`` model, sends
    it through :meth:`AbstractClient.call`, and returns the matching
    ``<Action>Response`` model populated from the ``"Response"`` field of the
    JSON reply.
    """

    _apiVersion = '2021-04-16'
    _endpoint = 'eb.tencentcloudapi.com'
    _service = 'eb'

    def _request_model(self, action, request):
        """Send ``request`` as ``action`` and return the deserialized response.

        Shared implementation behind every public API method of this client.

        :param action: API action name, e.g. ``"CheckRule"``. The response
            model class is looked up in ``models`` as ``action + "Response"``.
        :type action: str
        :param request: Request model instance for ``action``.
        :return: A populated ``<action>Response`` model instance.
        :raises TencentCloudSDKException: SDK exceptions are re-raised
            unchanged; any other exception is wrapped, with its class name as
            the first argument and ``str(e)`` as the second.
        """
        try:
            params = request._serialize()
            headers = request.headers
            body = self.call(action, params, headers=headers)
            response = json.loads(body)
            # Resolve the model inside the try block so that a lookup failure
            # is wrapped exactly like any other unexpected error.
            model = getattr(models, action + "Response")()
            model._deserialize(response["Response"])
            return model
        except TencentCloudSDKException:
            # Already an SDK error: propagate untouched.
            raise
        except Exception as e:
            raise TencentCloudSDKException(type(e).__name__, str(e))

    def CheckRule(self, request):
        """Validate a rule.

        :param request: Request instance for CheckRule.
        :type request: :class:`tencentcloud.eb.v20210416.models.CheckRuleRequest`
        :rtype: :class:`tencentcloud.eb.v20210416.models.CheckRuleResponse`
        :raises TencentCloudSDKException: if the call or response parsing fails.
        """
        return self._request_model("CheckRule", request)

    def CheckTransformation(self, request):
        """Test rules and data from the ETL configuration page.

        :param request: Request instance for CheckTransformation.
        :type request: :class:`tencentcloud.eb.v20210416.models.CheckTransformationRequest`
        :rtype: :class:`tencentcloud.eb.v20210416.models.CheckTransformationResponse`
        :raises TencentCloudSDKException: if the call or response parsing fails.
        """
        return self._request_model("CheckTransformation", request)

    def CreateConnection(self, request):
        """Create an event connector.

        :param request: Request instance for CreateConnection.
        :type request: :class:`tencentcloud.eb.v20210416.models.CreateConnectionRequest`
        :rtype: :class:`tencentcloud.eb.v20210416.models.CreateConnectionResponse`
        :raises TencentCloudSDKException: if the call or response parsing fails.
        """
        return self._request_model("CreateConnection", request)

    def CreateEventBus(self, request):
        """Create an event bus.

        :param request: Request instance for CreateEventBus.
        :type request: :class:`tencentcloud.eb.v20210416.models.CreateEventBusRequest`
        :rtype: :class:`tencentcloud.eb.v20210416.models.CreateEventBusResponse`
        :raises TencentCloudSDKException: if the call or response parsing fails.
        """
        return self._request_model("CreateEventBus", request)

    def CreateRule(self, request):
        """Create an event rule.

        :param request: Request instance for CreateRule.
        :type request: :class:`tencentcloud.eb.v20210416.models.CreateRuleRequest`
        :rtype: :class:`tencentcloud.eb.v20210416.models.CreateRuleResponse`
        :raises TencentCloudSDKException: if the call or response parsing fails.
        """
        return self._request_model("CreateRule", request)

    def CreateTarget(self, request):
        """Create an event target.

        :param request: Request instance for CreateTarget.
        :type request: :class:`tencentcloud.eb.v20210416.models.CreateTargetRequest`
        :rtype: :class:`tencentcloud.eb.v20210416.models.CreateTargetResponse`
        :raises TencentCloudSDKException: if the call or response parsing fails.
        """
        return self._request_model("CreateTarget", request)

    def CreateTransformation(self, request):
        """Create a transformer.

        :param request: Request instance for CreateTransformation.
        :type request: :class:`tencentcloud.eb.v20210416.models.CreateTransformationRequest`
        :rtype: :class:`tencentcloud.eb.v20210416.models.CreateTransformationResponse`
        :raises TencentCloudSDKException: if the call or response parsing fails.
        """
        return self._request_model("CreateTransformation", request)

    def DeleteConnection(self, request):
        """Delete an event connector.

        :param request: Request instance for DeleteConnection.
        :type request: :class:`tencentcloud.eb.v20210416.models.DeleteConnectionRequest`
        :rtype: :class:`tencentcloud.eb.v20210416.models.DeleteConnectionResponse`
        :raises TencentCloudSDKException: if the call or response parsing fails.
        """
        return self._request_model("DeleteConnection", request)

    def DeleteEventBus(self, request):
        """Delete an event bus.

        :param request: Request instance for DeleteEventBus.
        :type request: :class:`tencentcloud.eb.v20210416.models.DeleteEventBusRequest`
        :rtype: :class:`tencentcloud.eb.v20210416.models.DeleteEventBusResponse`
        :raises TencentCloudSDKException: if the call or response parsing fails.
        """
        return self._request_model("DeleteEventBus", request)

    def DeleteRule(self, request):
        """Delete an event rule.

        :param request: Request instance for DeleteRule.
        :type request: :class:`tencentcloud.eb.v20210416.models.DeleteRuleRequest`
        :rtype: :class:`tencentcloud.eb.v20210416.models.DeleteRuleResponse`
        :raises TencentCloudSDKException: if the call or response parsing fails.
        """
        return self._request_model("DeleteRule", request)

    def DeleteTarget(self, request):
        """Delete an event target.

        :param request: Request instance for DeleteTarget.
        :type request: :class:`tencentcloud.eb.v20210416.models.DeleteTargetRequest`
        :rtype: :class:`tencentcloud.eb.v20210416.models.DeleteTargetResponse`
        :raises TencentCloudSDKException: if the call or response parsing fails.
        """
        return self._request_model("DeleteTarget", request)

    def DeleteTransformation(self, request):
        """Delete a transformer.

        :param request: Request instance for DeleteTransformation.
        :type request: :class:`tencentcloud.eb.v20210416.models.DeleteTransformationRequest`
        :rtype: :class:`tencentcloud.eb.v20210416.models.DeleteTransformationResponse`
        :raises TencentCloudSDKException: if the call or response parsing fails.
        """
        return self._request_model("DeleteTransformation", request)

    def DescribeLogTagValue(self, request):
        """Query event dimension values.

        Prerequisite: event storage must be enabled.

        :param request: Request instance for DescribeLogTagValue.
        :type request: :class:`tencentcloud.eb.v20210416.models.DescribeLogTagValueRequest`
        :rtype: :class:`tencentcloud.eb.v20210416.models.DescribeLogTagValueResponse`
        :raises TencentCloudSDKException: if the call or response parsing fails.
        """
        return self._request_model("DescribeLogTagValue", request)

    def GetEventBus(self, request):
        """Get the details of an event bus.

        :param request: Request instance for GetEventBus.
        :type request: :class:`tencentcloud.eb.v20210416.models.GetEventBusRequest`
        :rtype: :class:`tencentcloud.eb.v20210416.models.GetEventBusResponse`
        :raises TencentCloudSDKException: if the call or response parsing fails.
        """
        return self._request_model("GetEventBus", request)

    def GetPlatformEventTemplate(self, request):
        """Get the event template of a platform product.

        :param request: Request instance for GetPlatformEventTemplate.
        :type request: :class:`tencentcloud.eb.v20210416.models.GetPlatformEventTemplateRequest`
        :rtype: :class:`tencentcloud.eb.v20210416.models.GetPlatformEventTemplateResponse`
        :raises TencentCloudSDKException: if the call or response parsing fails.
        """
        return self._request_model("GetPlatformEventTemplate", request)

    def GetRule(self, request):
        """Get the details of an event rule.

        :param request: Request instance for GetRule.
        :type request: :class:`tencentcloud.eb.v20210416.models.GetRuleRequest`
        :rtype: :class:`tencentcloud.eb.v20210416.models.GetRuleResponse`
        :raises TencentCloudSDKException: if the call or response parsing fails.
        """
        return self._request_model("GetRule", request)

    def GetTransformation(self, request):
        """Get the details of a transformer.

        :param request: Request instance for GetTransformation.
        :type request: :class:`tencentcloud.eb.v20210416.models.GetTransformationRequest`
        :rtype: :class:`tencentcloud.eb.v20210416.models.GetTransformationResponse`
        :raises TencentCloudSDKException: if the call or response parsing fails.
        """
        return self._request_model("GetTransformation", request)

    def ListConnections(self, request):
        """List event connectors.

        :param request: Request instance for ListConnections.
        :type request: :class:`tencentcloud.eb.v20210416.models.ListConnectionsRequest`
        :rtype: :class:`tencentcloud.eb.v20210416.models.ListConnectionsResponse`
        :raises TencentCloudSDKException: if the call or response parsing fails.
        """
        return self._request_model("ListConnections", request)

    def ListEventBuses(self, request):
        """List event buses.

        :param request: Request instance for ListEventBuses.
        :type request: :class:`tencentcloud.eb.v20210416.models.ListEventBusesRequest`
        :rtype: :class:`tencentcloud.eb.v20210416.models.ListEventBusesResponse`
        :raises TencentCloudSDKException: if the call or response parsing fails.
        """
        return self._request_model("ListEventBuses", request)

    def ListPlatformEventNames(self, request):
        """List the event names of a platform product.

        :param request: Request instance for ListPlatformEventNames.
        :type request: :class:`tencentcloud.eb.v20210416.models.ListPlatformEventNamesRequest`
        :rtype: :class:`tencentcloud.eb.v20210416.models.ListPlatformEventNamesResponse`
        :raises TencentCloudSDKException: if the call or response parsing fails.
        """
        return self._request_model("ListPlatformEventNames", request)

    def ListPlatformEventPatterns(self, request):
        """List the event matching rules of a platform product.

        :param request: Request instance for ListPlatformEventPatterns.
        :type request: :class:`tencentcloud.eb.v20210416.models.ListPlatformEventPatternsRequest`
        :rtype: :class:`tencentcloud.eb.v20210416.models.ListPlatformEventPatternsResponse`
        :raises TencentCloudSDKException: if the call or response parsing fails.
        """
        return self._request_model("ListPlatformEventPatterns", request)

    def ListPlatformProducts(self, request):
        """List platform products.

        :param request: Request instance for ListPlatformProducts.
        :type request: :class:`tencentcloud.eb.v20210416.models.ListPlatformProductsRequest`
        :rtype: :class:`tencentcloud.eb.v20210416.models.ListPlatformProductsResponse`
        :raises TencentCloudSDKException: if the call or response parsing fails.
        """
        return self._request_model("ListPlatformProducts", request)

    def ListRules(self, request):
        """List event rules.

        :param request: Request instance for ListRules.
        :type request: :class:`tencentcloud.eb.v20210416.models.ListRulesRequest`
        :rtype: :class:`tencentcloud.eb.v20210416.models.ListRulesResponse`
        :raises TencentCloudSDKException: if the call or response parsing fails.
        """
        return self._request_model("ListRules", request)

    def ListTargets(self, request):
        """List event targets.

        :param request: Request instance for ListTargets.
        :type request: :class:`tencentcloud.eb.v20210416.models.ListTargetsRequest`
        :rtype: :class:`tencentcloud.eb.v20210416.models.ListTargetsResponse`
        :raises TencentCloudSDKException: if the call or response parsing fails.
        """
        return self._request_model("ListTargets", request)

    def PublishEvent(self, request):
        """(Deprecated) Deliver events.

        :param request: Request instance for PublishEvent.
        :type request: :class:`tencentcloud.eb.v20210416.models.PublishEventRequest`
        :rtype: :class:`tencentcloud.eb.v20210416.models.PublishEventResponse`
        :raises TencentCloudSDKException: if the call or response parsing fails.
        """
        return self._request_model("PublishEvent", request)

    def PutEvents(self, request):
        """Deliver events.

        :param request: Request instance for PutEvents.
        :type request: :class:`tencentcloud.eb.v20210416.models.PutEventsRequest`
        :rtype: :class:`tencentcloud.eb.v20210416.models.PutEventsResponse`
        :raises TencentCloudSDKException: if the call or response parsing fails.
        """
        return self._request_model("PutEvents", request)

    def SearchLog(self, request):
        """Query historically pushed events.

        Prerequisite: event storage must be enabled.

        :param request: Request instance for SearchLog.
        :type request: :class:`tencentcloud.eb.v20210416.models.SearchLogRequest`
        :rtype: :class:`tencentcloud.eb.v20210416.models.SearchLogResponse`
        :raises TencentCloudSDKException: if the call or response parsing fails.
        """
        return self._request_model("SearchLog", request)

    def UpdateConnection(self, request):
        """Update an event connector.

        :param request: Request instance for UpdateConnection.
        :type request: :class:`tencentcloud.eb.v20210416.models.UpdateConnectionRequest`
        :rtype: :class:`tencentcloud.eb.v20210416.models.UpdateConnectionResponse`
        :raises TencentCloudSDKException: if the call or response parsing fails.
        """
        return self._request_model("UpdateConnection", request)

    def UpdateEventBus(self, request):
        """Update an event bus.

        :param request: Request instance for UpdateEventBus.
        :type request: :class:`tencentcloud.eb.v20210416.models.UpdateEventBusRequest`
        :rtype: :class:`tencentcloud.eb.v20210416.models.UpdateEventBusResponse`
        :raises TencentCloudSDKException: if the call or response parsing fails.
        """
        return self._request_model("UpdateEventBus", request)

    def UpdateRule(self, request):
        """Update an event rule.

        :param request: Request instance for UpdateRule.
        :type request: :class:`tencentcloud.eb.v20210416.models.UpdateRuleRequest`
        :rtype: :class:`tencentcloud.eb.v20210416.models.UpdateRuleResponse`
        :raises TencentCloudSDKException: if the call or response parsing fails.
        """
        return self._request_model("UpdateRule", request)

    def UpdateTarget(self, request):
        """Update an event target.

        :param request: Request instance for UpdateTarget.
        :type request: :class:`tencentcloud.eb.v20210416.models.UpdateTargetRequest`
        :rtype: :class:`tencentcloud.eb.v20210416.models.UpdateTargetResponse`
        :raises TencentCloudSDKException: if the call or response parsing fails.
        """
        return self._request_model("UpdateTarget", request)

    def UpdateTransformation(self, request):
        """Update a transformer.

        :param request: Request instance for UpdateTransformation.
        :type request: :class:`tencentcloud.eb.v20210416.models.UpdateTransformationRequest`
        :rtype: :class:`tencentcloud.eb.v20210416.models.UpdateTransformationResponse`
        :raises TencentCloudSDKException: if the call or response parsing fails.
        """
        return self._request_model("UpdateTransformation", request)
1
https://gitee.com/tencentcloud/tencentcloud-sdk-python.git
git@gitee.com:tencentcloud/tencentcloud-sdk-python.git
tencentcloud
tencentcloud-sdk-python
tencentcloud-sdk-python
master

搜索帮助