diff --git a/API Reference.md b/API Reference.md index 2169d98915a1ca931fdcde7489c908d50f23f030..172921640727ac59d99224c3b4cae0722fefff4b 100644 --- a/API Reference.md +++ b/API Reference.md @@ -134,9 +134,23 @@ Parameters: - **t** (String): The type of the definition. - ***args** (String): The properties of the definition. +```python +@pecan_swagger.decorators.consumes(method_name, *consume) +``` +Parameters: +- **method_name** (String): The name of the method. +- ***consume** (String): The consumes of the method. + +```python +@pecan_swagger.decorators.produces(method_name, *pro) +``` +Parameters: +- **method_name** (String): The name of the method. +- ***pro** (String): The produces of the method. + ```python @pecan_swagger.decorators.parameter(method_name, *args) ``` Parameters: - **method_name** (String): The name of the method. -- ***args** (String): The parameters of the definition. \ No newline at end of file +- ***args** (String): The parameters of the method. \ No newline at end of file diff --git a/README.md b/README.md index 44097608828d5ba025441313f1a1ba4c1864b728..fe5761ce76e602416ece7c0b897467965cebbe1a 100644 --- a/README.md +++ b/README.md @@ -1,19 +1,157 @@ # Pecan Swagger -## 1. 项目背景 +## Usage example 用法实例 +- 在项目根目录下创建一个 ``myapp-doc.py``文件, +- Name a file ``myapp-doc.py`` under the root of pecan project. +```python +from pecan_swagger import utils # import pecan_swagger +import industrialai.api.controllers.root # import root of the controller -Pecan-Swagger 用于为 Pecan Web 服务生成 Swagger 文档。该项目源自 ,目前原 repo 已无人维护,[原示例链接](examples/README.rst)。 +s = utils.swagger_build('industrialai', '1.0') # Must have. Including the title and the version of the project. +description = 'This is a This is a test project' +termsOfService = 'http://swagger.io/terms/' +contact = {'email': "apiteam@swagger.io"} +license = {'name': "Apache 2.0", + 'url': "http://www.apache.org/licenses/LICENSE-2.0.html"} +s = utils.add_info(s, description, termsOfService, contact, license) # Optional. Add infos into swagger. +externalDocs = {'description': "Find out more about Swagger", + 'url': "http://swagger.io"} +s = utils.add_external_docs(s, externalDocs) # Optional. Add external docs into swagger. +host = "127.0.0.1" +s = utils.add_host(s, host) # Optional. Add host into swagger. +basePath = "/v1" +s = utils.add_base_path(s, basePath) # Optional. Add base path into swagger. +output_path = "test.json" +utils.print_json(s, output_path) # Optional. Print swagger as json. +output_path = "test.yaml" +utils.print_yaml(s, output_path) # Optional. Print swagger as json. +``` +- 项目文件中,首先导入Pecan Swagger的装饰器。 +- In project files, import Pecan Swagger's decorators. +```python +from pecan_swagger import decorators as swagger # import pecan_swagger +``` +- 在你想要生成Swagger文档处添加装饰器。 +- Add decorators where you want to generate Swagger documentation. +```python +@swagger.definitions('object') +@swagger.parameter('index', {'in': "body", + 'name': "body", + 'description': "get user", + 'required': True, + 'schema': 'userInfo'}) +@swagger.response('myself', '401', 'fail') +@swagger.path("/", "Root") +... +``` +- 运行``myapp-doc.py`` +如果你使用了```print_json```或```print_yaml```,你会得到如下输出。 +- Run ``myapp-doc.py``! +If you use method ```print_json``` or ```print_yaml```, you will get the output file as follows. +JSON output: +```json +{ + "swagger": "2.0", + "info": { + "title": "industrialai", + "version": "1.0", + "description": "This is a This is a test project", + "termsOfService": "http://swagger.io/terms/", + "contact": { + "email": "apiteam@swagger.io" + }, + "license": { + "name": "Apache 2.0", + "url": "http://www.apache.org/licenses/LICENSE-2.0.html" + } + }, + "consumes": [], + "produces": [], + "paths": { + "/roles/get_all": { + "get": { + "responses": { + "401": { + "description": "fail" + } + }, + "parameters": [ + { + "in": "body", + "name": "body", + "description": "get all roles", + "required": true, + "schema": { + "$ref": "#/definitions/RoleItem" + } + } + ] + } + } + }, + "tags": [], + "schemes": [], + "definitions": { + "Roletest": { + "type": "object", + "properties": { + "message": { + "type": "string" + }, + "roleId": { + "type": "integer" + } + } + } + }, + "externalDocs": { + "description": "Find out more about Swagger", + "url": "http://swagger.io" + }, + "host": "127.0.0.1", + "basePath": "/v1" +} +``` +YAML output: +```yaml +basePath: /v1 +consumes: [] +definitions: + Roletest: + properties: + message: + type: string + roleId: + type: integer + type: object +externalDocs: + description: Find out more about Swagger + url: http://swagger.io +host: 127.0.0.1 +info: + contact: + email: apiteam@swagger.io + description: This is a This is a test project + license: + name: Apache 2.0 + url: http://www.apache.org/licenses/LICENSE-2.0.html + termsOfService: http://swagger.io/terms/ + title: industrialai + version: '1.0' +paths: + /: + get: + responses: + '401': + description: fail +produces: [] +schemes: [] +swagger: '2.0' +tags: [] +``` +更多用法请参考[API reference.md](API reference.md)。 +For more usage, please refer to [API reference.md](API reference.md). +更多实例请参考[industrial-ai-apiserver](examples/industrial-ai-apiserver)。 +For more example, please refer to [industrial-ai-apiserver](examples/industrial-ai-apiserver). -Pecan 是 OpenStack 诸多模块中比较受欢迎的一种 Restful API 框架,新兴的项目大多采用 Pecan,比如 Magnum / CloudKitty 等等。Pecan-Swagger 依然是有很大价值的,因此我们再这个 repo 继续更新 Pecan-Swagger。 - -## 2. 需要预先掌握的技能 - -1. [Python 三日课程](https://gitee.com/wu-wen-xiang/training-python/blob/master/doc/learning-python-in-3-days.md) -2. [Git 基本使用](https://gitee.com/wu-wen-xiang/training-python/blob/master/doc/learning-python-in-3-days.md#31-%E7%89%88%E6%9C%AC%E6%8E%A7%E5%88%B6) -3. [Markdown 基本语法](https://markdown.com.cn/basic-syntax/) -4. [Gitee 合作](https://gitee.com/help/articles/4346) -5. [Swagger](https://editor.swagger.io/) - -## 3. 预期目标 - -1. 对接 Pecan 最近的 Stable 版本,提供 Pecan-Swagger 模块,为 Pecan 项目提供在线的 Swagger 文档。 +Enjoy it! :) diff --git a/README.rst b/README.rst deleted file mode 100644 index 1bc6070dd2d9c0851f07aedf2b0cb1b8a7a9966c..0000000000000000000000000000000000000000 --- a/README.rst +++ /dev/null @@ -1,66 +0,0 @@ -===================== -Pecan Swagger Project -===================== - -some helpers to create swagger output from a pecan app - -example usage -------------- - -given a file named ``myapp.py`` - -:: - - import pecan - from pecan_swagger import decorators as swagger - - - @swagger.path('profile', 'Profile', 'Root') - class ProfileController(object): - - @pecan.expose(generic=True, template='index.html') - def index(self): - return dict() - - @index.when(method='POST') - def index_post(self, **kw): - print(kw) - pecan.redirect('/profile') - - - @swagger.path('/', 'Root') - class RootController(object): - - profile = ProfileController() - -and another file named ``myapp-doc.py`` - -:: - - import pprint - - from pecan_swagger import utils - import myapp - - pp = pprint.PrettyPrinter(indent=2) - pp.pprint(utils.swagger_build('myapp', '1.0')) - - -the following will be produced when run - -:: - - $ python myapp-doc.py - { - "swagger": "2.0", - "info": { - "version": "1.0", - "title": "myapp" - }, - "paths": { - "/profile": { - "POST": {}, - "GET": {} - } - } - } diff --git a/examples/industrial-ai-apiserver/myapp-doc.py b/examples/industrial-ai-apiserver/myapp-doc.py index 88c51afba95579067a619fda6d3e64482db63ff5..6c540dde5ea7ce365c9cbcbf0838bb25107658e2 100644 --- a/examples/industrial-ai-apiserver/myapp-doc.py +++ b/examples/industrial-ai-apiserver/myapp-doc.py @@ -1,24 +1,21 @@ -# import pprint -import json -from pecan_swagger import utils -import industrialai.api.controllers.root +from pecan_swagger import utils # import pecan_swagger +import industrialai.api.controllers.root # import root of the controller -# pp = pprint.PrettyPrinter(indent=2) -# pp.pprint(utils.swagger_build('industrialai', '1.0')) -# Create JSON file -s = utils.swagger_build('industrialai', '1.0') +s = utils.swagger_build('industrialai', '1.0') # Must have. Including the title and the version of the project. description = 'This is a This is a test project' termsOfService = 'http://swagger.io/terms/' contact = {'email': "apiteam@swagger.io"} license = {'name': "Apache 2.0", 'url': "http://www.apache.org/licenses/LICENSE-2.0.html"} -s = utils.add_info(s, description, termsOfService, contact, license) +s = utils.add_info(s, description, termsOfService, contact, license) # Optional. Add infos into swagger. externalDocs = {'description': "Find out more about Swagger", 'url': "http://swagger.io"} -s = utils.add_external_docs(s, externalDocs) +s = utils.add_external_docs(s, externalDocs) # Optional. Add external docs into swagger. host = "127.0.0.1" -s = utils.add_host(s, host) +s = utils.add_host(s, host) # Optional. Add host into swagger. basePath = "/v1" -s = utils.add_base_path(s, basePath) +s = utils.add_base_path(s, basePath) # Optional. Add base path into swagger. output_path = "test.json" -utils.print_json(s, output_path) +utils.print_json(s, output_path) # Optional. Print swagger as json. +output_path = "test.yaml" +utils.print_yaml(s, output_path) # Optional. Print swagger as json. diff --git a/examples/industrial-ai-apiserver/pecan_swagger/decorators.py b/examples/industrial-ai-apiserver/pecan_swagger/decorators.py index 8c014866900bff251412917f3076a85eb41a214a..ca65b7c9d0cfb7df218eff4da2702d9d0a4afdad 100644 --- a/examples/industrial-ai-apiserver/pecan_swagger/decorators.py +++ b/examples/industrial-ai-apiserver/pecan_swagger/decorators.py @@ -27,6 +27,7 @@ def path(endpoint, name, parent=None): c.__swag = dict(endpoint=endpoint, name=name, parent=parent) g.add_path(c) return c + return decorator @@ -170,6 +171,7 @@ def definitions(t, *args): return decorator + def consumes(method_name, *consume): """ 获取方法的consumes @@ -192,6 +194,7 @@ def consumes(method_name, *consume): return decorator + def produces(method_name, *pro): """ 获取方法的produces @@ -214,6 +217,7 @@ def produces(method_name, *pro): return decorator + def parameter(method_name, *args): """ 获取方法的parameter diff --git a/examples/myapp-doc.py b/examples/myapp-doc.py deleted file mode 100644 index f48ef1b760c110abcb3a9ba9b47e7deeb148eb36..0000000000000000000000000000000000000000 --- a/examples/myapp-doc.py +++ /dev/null @@ -1,8 +0,0 @@ -#!/bin/env python -import pprint - -from pecan_swagger import utils -import myapp - -pp = pprint.PrettyPrinter(indent=2) -pp.pprint(utils.swagger_build('myapp', '1.0')) diff --git a/examples/myapp.py b/examples/myapp.py deleted file mode 100644 index 88dcc0beeca489ffc3b306012b73a82e5fc0e4d3..0000000000000000000000000000000000000000 --- a/examples/myapp.py +++ /dev/null @@ -1,47 +0,0 @@ -import pecan -from pecan_swagger import decorators as swagger - - -@swagger.path('messages', 'Messages', 'Root') -class MessagesController(object): - - @pecan.expose(generic=True, template='messages.html') - def index(self): - return list() - - @index.when(method='POST') - def index_post(self, **kw): - print(kw) - pecan.redirect('/messages') - - -@swagger.path('profile', 'Profile', 'Root') -class ProfileController(object): - - @pecan.expose(generic=True, template='profile.html') - def index(self): - return dict() - - @index.when(method='POST') - def index_post(self, **kw): - print(kw) - pecan.redirect('/profile') - - @pecan.expose(generic=True) - def image(self): - print('no image uploaded') - - @image.when(method='POST') - def image_post(self): - print('not supported') - - @pecan.expose() - def stats(self): - return dict() - - -@swagger.path('/', 'Root') -class RootController(object): - - profile = ProfileController() - messages = MessagesController() diff --git a/pecan_swagger/decorators.py b/pecan_swagger/decorators.py index 5b8ef4aefaa1919dbeca2d4e6f9715f41b14c6ed..ca65b7c9d0cfb7df218eff4da2702d9d0a4afdad 100644 --- a/pecan_swagger/decorators.py +++ b/pecan_swagger/decorators.py @@ -1,38 +1,243 @@ -import pecan_swagger.g as g - -""" -decorators module - -these decorators are meant to be used by developers who wish to markup -their pecan applications to produce swagger. -""" - - -def path(endpoint, name, parent=None): - """ - path decorator - - this decorator should be used on pecan controllers to instruct how - they map to routes. - - :param endpoint: the root uri for this controller. - :param name: the name of this path. - :param parent: an optional path name to indicate a parent/child - relationship between this path and another. - """ - def decorator(c): - if hasattr(c, '__swag'): - raise Exception('{} already has swag'.format(c.__name__)) - c.__swag = dict(endpoint=endpoint, name=name, parent=parent) - g.add_path(c) - return c - return decorator - - -def method(method): - def decorator(m): - if hasattr(m, '__swag'): - raise Exception('{} already has swag'.format(m.__name__)) - m.__swag = dict(method=method) - return m - return decorator +import pecan_swagger.g as g + +""" +decorators module + +these decorators are meant to be used by developers who wish to markup +their pecan applications to produce swagger. +""" + + +def path(endpoint, name, parent=None): + """ + path decorator + + this decorator should be used on pecan controllers to instruct how + they map to routes. + + :param endpoint: the root uri for this controller. + :param name: the name of this path. + :param parent: an optional path name to indicate a parent/child + relationship between this path and another. + """ + + def decorator(c): + if hasattr(c, '__swag'): + raise Exception('{} already has swag'.format(c.__name__)) + c.__swag = dict(endpoint=endpoint, name=name, parent=parent) + g.add_path(c) + return c + + return decorator + + +def response(method_name, res_code, des, schema=None): + """ + 获取方法的response + 生成字典{response:res_code:{description:des}} + 并加入controller.__swag[method][method_name] + """ + + def decorator(m): + if not hasattr(m, '__swag'): + raise Exception('You need to write a path in {} first'.format(m.__name__)) + if not 'method' in m.__swag: + raise Exception('You need to write the methods in {}'.format(m.__name__)) + if not method_name in m.__swag['method']: + raise Exception('There is no method called {} in {}'.format(method_name, m.__name__)) + if not m.__swag['method'][method_name].get('responses'): + m.__swag['method'][method_name]['responses'] = {} + if not m.__swag['method'][method_name]['responses'].get(res_code): + m.__swag['method'][method_name]['responses'][res_code] = {} + m.__swag['method'][method_name]['responses'][res_code]['description'] = des + if schema: + m.__swag['method'][method_name]['responses'][res_code]['schema'] = schema + return m + + return decorator + + +def tags(method_name, *tags): + """ + 获取方法的tags + 生成字典{tags:tag} + 并加入controller.__swag[method][method_name] + """ + + def decorator(m): + if not hasattr(m, '__swag'): + raise Exception('You need to write a path in {} first'.format(m.__name__)) + if not 'method' in m.__swag: + raise Exception('You need to write the methods in {}'.format(m.__name__)) + if not method_name in m.__swag['method']: + raise Exception('There is no method called {} in {}'.format(method_name, m.__name__)) + if not m.__swag['method'][method_name].get('tags'): + m.__swag['method'][method_name]['tags'] = [] + for i in tags: + m.__swag['method'][method_name]['tags'].append(i) + return m + + return decorator + + +def summary(method_name, summary): + """ + 获取方法的summary + 生成字典{summary:summary} + 并加入controller.__swag[method][method_name] + """ + + def decorator(m): + if not hasattr(m, '__swag'): + raise Exception('You need to write a path in {} first'.format(m.__name__)) + if not 'method' in m.__swag: + raise Exception('You need to write the methods in {}'.format(m.__name__)) + if not method_name in m.__swag['method']: + raise Exception('There is no method called {} in {}'.format(method_name, m.__name__)) + m.__swag['method'][method_name]['summary'] = summary + return m + + return decorator + + +def description(method_name, des): + """ + 获取方法的description + 生成字典{description:des} + 并加入controller.__swag[method][method_name] + """ + + def decorator(m): + if not hasattr(m, '__swag'): + raise Exception('You need to write a path in {} first'.format(m.__name__)) + if not 'method' in m.__swag: + raise Exception('You need to write the methods in {}'.format(m.__name__)) + if not method_name in m.__swag['method']: + raise Exception('There is no method called {} in {}'.format(method_name, m.__name__)) + m.__swag['method'][method_name]['description'] = des + return m + + return decorator + + +def operationId(method_name, oi): + """ + 获取方法的operationId + 生成字典{operationId:oi} + 并加入controller.__swag[method][method_name] + """ + + def decorator(m): + if not hasattr(m, '__swag'): + raise Exception('You need to write a path in {} first'.format(m.__name__)) + if not 'method' in m.__swag: + raise Exception('You need to write the methods in {}'.format(m.__name__)) + if not method_name in m.__swag['method']: + raise Exception('There is no method called {} in {}'.format(method_name, m.__name__)) + m.__swag['method'][method_name]['operationId'] = oi + return m + + return decorator + + +def method(*args): + """ + 获取被装饰类包含的方法 + 生成method字典 + 并加入controller.__swag + """ + + def decorator(m): + if not hasattr(m, '__swag'): + raise Exception('You need to write a path in {} first'.format(m.__name__)) + d = {} + for i in args: + if hasattr(m, i): + d[i] = {} + m.__swag['method'] = d + return m + + return decorator + + +def definitions(t, *args): + """ + 添加definitions到g.py中 + """ + + def decorator(m): + g.add_definitions(t, m) + return m + + return decorator + + +def consumes(method_name, *consume): + """ + 获取方法的consumes + 生成字典{consumes:consumes} + 并加入controller.__swag[method][method_name] + """ + + def decorator(m): + if not hasattr(m, '__swag'): + raise Exception('You need to write a path in {} first'.format(m.__name__)) + if not 'method' in m.__swag: + raise Exception('You need to write the methods in {}'.format(m.__name__)) + if not method_name in m.__swag['method']: + raise Exception('There is no method called {} in {}'.format(method_name, m.__name__)) + if not m.__swag['method'][method_name].get('consumes'): + m.__swag['method'][method_name]['consumes'] = [] + for i in consume: + m.__swag['method'][method_name]['consumes'].append(i) + return m + + return decorator + + +def produces(method_name, *pro): + """ + 获取方法的produces + 生成字典{produces:pro} + 并加入controller.__swag[method][method_name] + """ + + def decorator(m): + if not hasattr(m, '__swag'): + raise Exception('You need to write a path in {} first'.format(m.__name__)) + if not 'method' in m.__swag: + raise Exception('You need to write the methods in {}'.format(m.__name__)) + if not method_name in m.__swag['method']: + raise Exception('There is no method called {} in {}'.format(method_name, m.__name__)) + if not m.__swag['method'][method_name].get('produces'): + m.__swag['method'][method_name]['produces'] = [] + for i in pro: + m.__swag['method'][method_name]['produces'].append(i) + return m + + return decorator + + +def parameter(method_name, *args): + """ + 获取方法的parameter + 生成字典{parameter:[]} + 并加入controller.__swag[method][method_name] + """ + + def decorator(m): + if not hasattr(m, '__swag'): + raise Exception('You need to write a path in {} first'.format(m.__name__)) + if not 'method' in m.__swag: + raise Exception('You need to write the methods in {}'.format(m.__name__)) + if not method_name in m.__swag['method']: + raise Exception('There is no method called {} in {}'.format(method_name, m.__name__)) + if not m.__swag['method'][method_name].get('parameters'): + m.__swag['method'][method_name]['parameters'] = [] + for i in args: + if i.get('schema'): + i['schema'] = {'$ref': '#/definitions/' + i['schema']} + m.__swag['method'][method_name]['parameters'].append(i) + return m + + return decorator diff --git a/pecan_swagger/g.py b/pecan_swagger/g.py index 383c2c6fda52f76e98717f62d5b99155dc7fc12f..15f610405e3b349cff971e534609cfc901d180c0 100644 --- a/pecan_swagger/g.py +++ b/pecan_swagger/g.py @@ -1,487 +1,617 @@ -import copy -import inspect - -from pecan import util as p_u -import wsme.types as wtypes -import six -import weakref - - -""" -global hierarchy module - -this module is at the heart of the swagger conversion utility. it -contains a global "hierarchy" object which will provide a single point -to assemble an application's swagger output. - -there are also several helper functions to assist in building the -hierarchy of controllers, routes, and methods. -""" - -_hierarchy = {} - -_http_methods = {'get': '', 'post': '', 'put': '', - 'patch': '', 'delete': '', - 'head': '', 'trace': ''} - -_all_wsme_types = wtypes.native_types + ( - wtypes.ArrayType, wtypes.DictType, wtypes.BinaryType, - wtypes.IntegerType, wtypes.StringType, wtypes.IPv4AddressType, - wtypes.IPv6AddressType, wtypes.UuidType, wtypes.Enum, wtypes.UnsetType) - -_definitions = {} - - -def add_path(c): - """adds a named controller to the hierarchy.""" - if _hierarchy.get(c.__swag['name']): - raise Exception( - 'name {} already exists in hierarchy'.format(c.__swag['name'])) - _hierarchy[c.__swag['name']] = c - - -def build_path(swaginfo): - """return the route path for a swag metadata item.""" - if swaginfo.get('parent') is not None: - return path_join(build_path(get_swag(swaginfo.get('parent'))), - swaginfo.get('endpoint')) - return swaginfo.get('endpoint') - - -def get_controller_paths(controllers, wsme_defs): - """ - get a list of paths with methods - - returns a list of tuples (controller path, methods). - """ - def get_methods_for_generic(name): - methods = [] - generic_handlers = lc.get(name).get('generic_handlers', {}) - for method, func in generic_handlers.items(): - if method == 'DEFAULT': - methods.append('get') - else: - methods.append(method.lower()) - # TODO drill down through decorators to get true function name - truename = getrealname(func) - del lc[truename] - return methods - - def append_methods_for_specific(name, tpl): - paths.append(tpl) - del lc[name] - - lc = copy.deepcopy(controllers) - paths = [] - # TODO incorporate method decorator, removing functions marked - if lc.get('index'): - for method in get_methods_for_generic('index'): - paths.append(('', (method, {}))) - - # for REST controller - for method, path in _http_methods.items(): - if lc.get(method): - spec = wsme_defs.get(method, {}) # add wsme definitions - append_methods_for_specific(method, (path, (method, spec))) - - if lc.get('get_all'): - # add wsme definitions - spec = wsme_defs.get('get_all') # add wsme definitions - append_methods_for_specific('get_all', ('', ('get', spec))) - if lc.get('get_one'): - # add wsme definitions - spec = wsme_defs.get('get_one') # add wsme definitions - append_methods_for_specific('get_one', ('', ('get', spec))) - - if lc.get('_default'): - append_methods_for_specific('_default', ('', ('*', {}))) - if lc.get('_route'): - append_methods_for_specific('_route', ('', ('*', {}))) - if lc.get('_lookup'): - del lc['_lookup'] - - if lc.get('_custom_actions'): - for ca, ca_method in lc['_custom_actions'].items(): - spec = wsme_defs.get(ca) # add wsme definitions - for m in ca_method: - paths.append((ca, (m.lower(), spec))) - del lc['_custom_actions'] - - generic_controllers = [c for c in lc if lc[c].get('generic')] - for controller in generic_controllers: - for method in get_methods_for_generic(controller): - paths.append((controller, (method, {}))) - for controller in lc: - if controller not in [path[0] for path in paths]: - paths.append((controller, ('get', {}))) - return paths - - -def get_wsme_defs(name): - - def datatype_to_type_and_format(datatype): - tf = {} - if datatype == 'uuid' or datatype == 'ipv4address' \ - or datatype == 'ipv6address' or datatype == 'date' \ - or datatype == 'time': - tf['type'] = 'string' - tf['format'] = datatype - elif datatype == 'datetime': - tf['type'] = 'string' - tf['format'] = 'date-time' - elif datatype == 'binary' or datatype == 'bytes': - # base64 encoded characters - tf['type'] = 'string' - tf['format'] = 'byte' - elif datatype == 'array' or datatype == 'boolean' \ - or datatype == 'integer' or datatype == 'string': - # no format - tf['type'] = datatype - elif datatype == 'float': - # number - tf['type'] = 'number' - tf['format'] = datatype - elif datatype == 'unicode' or datatype == 'str' \ - or datatype == 'text': - # primitive, no format - tf['type'] = 'string' - elif datatype == 'int' or datatype == 'decimal': - # primitive, no format - tf['type'] = 'integer' - elif datatype == 'bool': - # primitive, no format - tf['type'] = 'boolean' - elif datatype == 'enum': - tf['type'] = 'enum' - elif datatype == 'unset' or datatype == 'none': - tf['type'] = None - elif datatype == 'dict': - tf['type'] = 'object' - else: - tf['type'] = 'object' - return tf - - def class_to_name_str(c): - return (('%s' % c).replace('<', '').replace('>', '') - .replace('class ', '').replace('\'', '') - .split(' ', 1)[0].rsplit('.', 1)[-1]) - - def conv_class_name_to_type_str(cn): - type_str = '' - if cn.endswith('Type'): - type_str = cn[:-4] - elif cn == 'str': - type_str = 'string' - else: - type_str = cn - type_str = type_str.lower() - return type_str - - def get_type_str(obj, src_dict=None): - type_str = '' - if hasattr(obj, '__name__'): - type_str = obj.__name__ - else: - type_str = obj.__class__.__name__ - type_str = conv_class_name_to_type_str(type_str) - - tf = datatype_to_type_and_format(type_str) - - if hasattr(obj, 'basetype') and \ - (obj.__class__ not in _all_wsme_types or type_str == 'enum'): - # UserType or Enum - tf['type'] = get_type_str(obj.basetype) - - if isinstance(src_dict, dict): - # if dict is in args, set 'type' and 'format' into the dict and - # return - src_dict.update({'type': tf['type']}) - if 'format' in tf: - src_dict.update({'format': tf['format']}) - - # get datatype options. ex.) min_length, minimum, .. - for k, v in inspect.getmembers(obj): - if ((k == 'minimum' or k == 'maxmum' - or k == 'min_length' or k == 'max_length') - and v is not None): - src_dict[to_swag_key(k)] = v - elif k == 'pattern' and v is not None: - src_dict[to_swag_key(k)] = v.pattern - # TODO(shu-mutou): this should be removed for production. - # else: - # src_dict[to_swag_key(k)] = v - - if type_str == 'enum': - # EnumType use 'set' that doesn't have sequence. - # So use 'sorted' for keeping static output. - src_dict['enum'] = sorted([v for v in obj.values]) - - # return 'type' only - return tf['type'] - - def to_swag_key(key): - keys = { - 'doc': 'description', - 'arguments': 'parameters', - 'return_type': 'schema', - 'datatype': 'type', - 'mandatory': 'required', - 'sample': 'examples', - 'readonly': 'readOnly', - 'min_length': 'minLength', - 'max_length': 'maxLength', - } - if key in keys: - return keys[key] - else: - return key - - def get_wm_item_prop(item, isparams=False): - # Add prop into 'properties' and 'required' in 'Items Object' - # 'Items Object' can be part of 'Schema Object' or 'Items Object', - # and can use recursively - prop_dict = {} - # TODO(shu-mutou): this should be removed for production. - # prop_dict['obj'] = inspect.getmembers(item) - - _item = item - if wtypes.iscomplex(item): - _item = weakref.ref(item) - - for a, i in inspect.getmembers(_item): - if a == 'datatype': - datatype = get_type_str(i, prop_dict) - if datatype == 'array': - # if array, do recursively - prop_dict['items'] = inspect_wm_schema(i.item_type) - elif datatype == 'object': - # if obuject, do recursively - prop_dict['items'] = inspect_wm_schema(i) - elif a == 'default' and i: - prop_dict[to_swag_key(a)] = i - elif a == 'name' and isparams: - prop_dict[to_swag_key(a)] = i - elif a == 'mandatory' and i: - prop_dict[to_swag_key(a)] = i - elif a == 'readonly' and i: - prop_dict[to_swag_key(a)] = i - elif a == 'doc' and i is not None: - prop_dict[to_swag_key(a)] = i - - if isparams and prop_dict['type'] in ['object', 'array']: - prop_dict['schema'] = {'items': prop_dict['items'], - 'type': prop_dict['type']} - del prop_dict['type'] - del prop_dict['items'] - return prop_dict - - def get_wsattr_and_wsproperty(obj): - ws_dict = {} - for key, value in inspect.getmembers(obj): - if (key[0] != '_' and - (value.__class__.__name__ == 'wsattr' - or value.__class__.__name__ == 'wsproperty')): - ws_dict[key] = value - return ws_dict - - def inspect_wm_schema(schema_obj, isparams=False): - schema_dict = {} - # TODO(shu-mutou): this should be removed for production. - # schema_dict['obj'] = get_wsattr_and_wsproperty(schema_obj) - ws_len = len(get_wsattr_and_wsproperty(schema_obj)) - model_name = class_to_name_str(schema_obj) - - for key, obj in inspect.getmembers(schema_obj): - if (key[0] != '_' and - (obj.__class__.__name__ == 'wsattr' - or obj.__class__.__name__ == 'wsproperty')): - # TODO(shu-mutou): this should be removed for production. - # _definitions[model_name][to_swag_key(key)] = \ - # {'obj': inspect.getmembers(obj)} - - if ws_len == 1: - # single schema - schema_dict = get_wm_item_prop(obj, isparams) - else: - # multi property schema - schema_dict.update({'type': 'object'}) - # properties - if 'items' not in schema_dict: - schema_dict['items'] = {'properties': {}} - prop = {key: get_wm_item_prop(obj, isparams)} - # required as array of string - if 'required' in prop[key] and prop[key]['required'] \ - and isinstance(prop[key]['required'], bool): - if 'required' not in schema_dict: - schema_dict['required'] = [] - schema_dict['required'].append(key) - del prop[key]['required'] - schema_dict['items']['properties'].update(prop) - - if schema_obj not in _all_wsme_types: - if model_name not in _definitions: - _definitions[model_name] = schema_dict - schema_dict = {'$ref': "#/definitions/%s" % model_name} - - return schema_dict - - def get_wm_def(o): - wsme = {'description': ''} - for w, d in inspect.getmembers(o): - if w == 'arguments': - wsme[to_swag_key(w)] = [] - for arg in d: - # set each 'Parameter Object', it can include - # 'Items Object' recursively - item_dict = get_wm_item_prop(arg, True) - # TODO: MUST be set one of - # 'body|query|path|header|formData' - item_dict['in'] = 'query' - wsme[to_swag_key(w)].append(item_dict) - # 'body' should be set due to existing 'body' option - # in WSME expose - if o.body_type is not None: - # if body is set, last arg is it. - wsme[to_swag_key(w)][-1]['in'] = 'body' - elif w == 'doc' and d: - wsme[to_swag_key(w)] = d - elif w == 'return_type': - wsme['responses'] = {'status': {'description': ''}} - if d: - if d in _all_wsme_types: - # d is set single WSME type class or implicit type - wsme['responses']['status'][to_swag_key(w)] = ( - datatype_to_type_and_format( - conv_class_name_to_type_str( - class_to_name_str(d)))) - else: - # d is model class - wsme['responses']['status'][to_swag_key(w)] = ( - inspect_wm_schema(d, False)) - doc = inspect.getdoc(d) - if doc is not None: - wsme['responses']['status']['description'] = doc - elif w == 'status_code': - wsme['responses'][d] = wsme['responses']['status'] - del wsme['responses']['status'] - # TODO(shu-mutou): this should be removed for production. - # elif w == 'body_type': - # wsme[to_swag_key(w)] = get_type_str(d) - # elif w == 'extra_options' or w == 'ignore_extra_args' \ - # or w == 'name' or w == 'pass_request': - # wsme[to_swag_key(w)] = d - # else: - # wsme[to_swag_key(w)] = d - return wsme - - c = _hierarchy[name] - wsme_defs = {} - for k, v in inspect.getmembers(c): - if p_u.iscontroller(v): - for m, o in inspect.getmembers(v): - if m == "_wsme_definition": - wsme_defs[k] = get_wm_def(o) - - # TODO(shu-mutou): this should be removed for production. - # output wsme info into files by each controller for dev - # import pprint - # with open(name + '.txt', 'w') as fout: - # pprint.pprint(wsme_defs, stream=fout, indent=2) - - return wsme_defs - - -def get_controllers(name): - """ - get all the controllers associated with a path - - returns a dictionary of controllers indexed by their names. - """ - c = _hierarchy[name] - controllers = {k: p_u._cfg(v) - for k, v in c.__dict__.items() if p_u.iscontroller(v)} - cacts = {k: v for k, v in c.__dict__.items() if k == '_custom_actions'} - if len(cacts): - controllers.update(cacts) - return controllers - - -def get_paths(): - """ - return all the registered paths - - loops through the hierarchy and retuns a list of tuples containing the - paths and their methods. - - :returns: [(path, methods), ...] - """ - pathlist = [] - for name in _hierarchy: - fullpath = build_path(get_swag(name)) - controllers = get_controllers(name) - wsme_defs = get_wsme_defs(name) - paths = get_controller_paths(controllers, wsme_defs) - for path in paths: - ptuple = (path_join(fullpath, path[0]), path[1]) - pathlist.append(ptuple) - return pathlist - - -def get_swag(name): - """return the swag metadata from an named controller.""" - return _hierarchy.get(name).__swag - - -def getrealname(method): - """attempt to get a method's real name.""" - argspec = inspect.getargspec(method) - args = argspec[0] - if args and args[0] == 'self': - return method.__name__ - if hasattr(method, '__func__'): - method = method.__func__ - - func_closure = six.get_function_closure(method) - - # NOTE(sileht): if the closure is None we cannot look deeper, - # so return actual argspec, this occurs when the method - # is static for example. - if func_closure is None: - return method.__name__ - - closure = next( - ( - c for c in func_closure if six.callable(c.cell_contents) - ), - None - ) - method = closure.cell_contents - return getrealname(method) - - -def methods_get(name): - """get all the methods for a named controller.""" - c = _hierarchy[name] - mlist = [] - if hasattr(c, 'index') and p_u.iscontroller(c.index): - cfg = p_u._cfg(c.index) - if cfg.get('generic_handlers').get('DEFAULT'): - mlist.append('get') - mlist += cfg.get('allowed_methods') - for i in c.__dict__: - ii = getattr(c, i) - if hasattr(ii, '__swag'): - m = ii.__swag.get('method') - if m is not None: - mlist.append(m) - return mlist - - -def path_join(part1, part2): - """join two url paths.""" - if len(part2) == 0: - return part1 - sep = '/' - if part1[-1] == sep: - sep = '' - return part1 + sep + part2 +import copy +import inspect + +from pecan import util as p_u +import wsme.types as wtypes +import six +import weakref + +""" +global hierarchy module + +this module is at the heart of the swagger conversion utility. it +contains a global "hierarchy" object which will provide a single point +to assemble an application's swagger output. + +there are also several helper functions to assist in building the +hierarchy of controllers, routes, and methods. +""" + +_hierarchy = {} + +_http_methods = {'get': '', 'post': '', 'put': '', + 'patch': '', 'delete': '', + 'head': '', 'trace': ''} + +_all_wsme_types = wtypes.native_types + ( + wtypes.ArrayType, wtypes.DictType, wtypes.BinaryType, + wtypes.IntegerType, wtypes.StringType, wtypes.IPv4AddressType, + wtypes.IPv6AddressType, wtypes.UuidType, wtypes.Enum, wtypes.UnsetType) + +_definitions = {} + +all_definitions = {} + + +def add_path(c): + """adds a named controller to the hierarchy.""" + if _hierarchy.get(c.__swag['name']): + raise Exception( + 'name {} already exists in hierarchy'.format(c.__swag['name'])) + _hierarchy[c.__swag['name']] = c + + +def build_path(swaginfo): + """return the route path for a swag metadata item.""" + if swaginfo.get('parent') is not None: + return path_join(build_path(get_swag(swaginfo.get('parent'))), + swaginfo.get('endpoint')) + return swaginfo.get('endpoint') + + +def get_controller_paths(controllers, wsme_defs): + """ + get a list of paths with methods + + returns a list of tuples (controller path, methods). + """ + + def get_methods_for_generic(name): + methods = [] + generic_handlers = lc.get(name).get('generic_handlers', {}) + for method, func in generic_handlers.items(): + if method == 'DEFAULT': + methods.append('get') + else: + methods.append(method.lower()) + # TODO drill down through decorators to get true function name + truename = getrealname(func) + del lc[truename] + return methods + + def append_methods_for_specific(name, tpl): + paths.append(tpl) + del lc[name] + + lc = copy.deepcopy(controllers) + paths = [] + # TODO incorporate method decorator, removing functions marked + if lc.get('index'): + for method in get_methods_for_generic('index'): + paths.append(('', (method, {}))) + spec = wsme_defs.get('index') # add wsme definitions + append_methods_for_specific('index', ('', ('get', spec))) + + # for REST controller + for method, path in _http_methods.items(): + if lc.get(method): + spec = wsme_defs.get(method, {}) # add wsme definitions + append_methods_for_specific(method, (path, (method, spec))) + + if lc.get('get_all'): + # add wsme definitions + spec = wsme_defs.get('get_all') # add wsme definitions + append_methods_for_specific('get_all', ('get_all', ('get', spec))) + if lc.get('get_one'): + # add wsme definitions + spec = wsme_defs.get('get_one') # add wsme definitions + append_methods_for_specific('get_one', ('get_one', ('get', spec))) + + if lc.get('_default'): + append_methods_for_specific('_default', ('', ('*', {}))) + if lc.get('_route'): + append_methods_for_specific('_route', ('', ('*', {}))) + if lc.get('_lookup'): + del lc['_lookup'] + + if lc.get('_custom_actions'): + for ca, ca_method in lc['_custom_actions'].items(): + spec = wsme_defs.get(ca) # add wsme definitions + for m in ca_method: + paths.append((ca, (m.lower(), spec))) + del lc['_custom_actions'] + + generic_controllers = [c for c in lc if lc[c].get('generic')] + for controller in generic_controllers: + for method in get_methods_for_generic(controller): + paths.append((controller, (method, {}))) + for controller in lc: + if controller not in [path[0] for path in paths]: + paths.append((controller, ('get', {}))) + return paths + + +def get_wsme_defs(name): + """获得wsexpose装饰方法的definitions.""" + + def datatype_to_type_and_format(datatype): + tf = {} + if datatype == 'uuid' or datatype == 'ipv4address' \ + or datatype == 'ipv6address' or datatype == 'date' \ + or datatype == 'time': + tf['type'] = 'string' + tf['format'] = datatype + elif datatype == 'datetime': + tf['type'] = 'string' + tf['format'] = 'date-time' + elif datatype == 'binary' or datatype == 'bytes': + # base64 encoded characters + tf['type'] = 'string' + tf['format'] = 'byte' + elif datatype == 'array' or datatype == 'boolean' \ + or datatype == 'integer' or datatype == 'string': + # no format + tf['type'] = datatype + elif datatype == 'float': + # number + tf['type'] = 'number' + tf['format'] = datatype + elif datatype == 'unicode' or datatype == 'str' \ + or datatype == 'text': + # primitive, no format + tf['type'] = 'string' + elif datatype == 'int' or datatype == 'decimal': + # primitive, no format + tf['type'] = 'integer' + elif datatype == 'bool': + # primitive, no format + tf['type'] = 'boolean' + elif datatype == 'enum': + tf['type'] = 'enum' + elif datatype == 'unset' or datatype == 'none': + tf['type'] = None + elif datatype == 'dict': + tf['type'] = 'object' + else: + tf['type'] = 'object' + return tf + + def class_to_name_str(c): + return (('%s' % c).replace('<', '').replace('>', '') + .replace('class ', '').replace('\'', '') + .split(' ', 1)[0].rsplit('.', 1)[-1]) + + def conv_class_name_to_type_str(cn): + type_str = '' + if cn.endswith('Type'): + type_str = cn[:-4] + elif cn == 'str': + type_str = 'string' + else: + type_str = cn + type_str = type_str.lower() + return type_str + + def get_type_str(obj, src_dict=None): + type_str = '' + if hasattr(obj, '__name__'): + type_str = obj.__name__ + else: + type_str = obj.__class__.__name__ + type_str = conv_class_name_to_type_str(type_str) + + tf = datatype_to_type_and_format(type_str) + + if hasattr(obj, 'basetype') and \ + (obj.__class__ not in _all_wsme_types or type_str == 'enum'): + # UserType or Enum + tf['type'] = get_type_str(obj.basetype) + + if isinstance(src_dict, dict): + # if dict is in args, set 'type' and 'format' into the dict and + # return + src_dict.update({'type': tf['type']}) + if 'format' in tf: + src_dict.update({'format': tf['format']}) + + # get datatype options. ex.) min_length, minimum, .. + for k, v in inspect.getmembers(obj): + if ((k == 'minimum' or k == 'maxmum' + or k == 'min_length' or k == 'max_length') + and v is not None): + src_dict[to_swag_key(k)] = v + elif k == 'pattern' and v is not None: + src_dict[to_swag_key(k)] = v.pattern + # TODO(shu-mutou): this should be removed for production. + # else: + # src_dict[to_swag_key(k)] = v + + if type_str == 'enum': + # EnumType use 'set' that doesn't have sequence. + # So use 'sorted' for keeping static output. + src_dict['enum'] = sorted([v for v in obj.values]) + + # return 'type' only + return tf['type'] + + def to_swag_key(key): + keys = { + 'doc': 'description', + 'arguments': 'parameters', + 'return_type': 'schema', + 'datatype': 'type', + 'mandatory': 'required', + 'sample': 'examples', + 'readonly': 'readOnly', + 'min_length': 'minLength', + 'max_length': 'maxLength', + } + if key in keys: + return keys[key] + else: + return key + + def get_wm_item_prop(item, isparams=False): + # Add prop into 'properties' and 'required' in 'Items Object' + # 'Items Object' can be part of 'Schema Object' or 'Items Object', + # and can use recursively + prop_dict = {} + # TODO(shu-mutou): this should be removed for production. + # prop_dict['obj'] = inspect.getmembers(item) + + _item = item + if wtypes.iscomplex(item): + _item = weakref.ref(item) + + for a, i in inspect.getmembers(_item): + if a == 'datatype': + datatype = get_type_str(i, prop_dict) + if datatype == 'array': + # if array, do recursively + prop_dict['items'] = inspect_wm_schema(i.item_type) + elif datatype == 'object': + # if obuject, do recursively + prop_dict['items'] = inspect_wm_schema(i) + elif a == 'default' and i: + prop_dict[to_swag_key(a)] = i + elif a == 'name' and isparams: + prop_dict[to_swag_key(a)] = i + elif a == 'mandatory' and i: + prop_dict[to_swag_key(a)] = i + elif a == 'readonly' and i: + prop_dict[to_swag_key(a)] = i + elif a == 'doc' and i is not None: + prop_dict[to_swag_key(a)] = i + + if isparams and prop_dict['type'] in ['object', 'array']: + prop_dict['schema'] = {'items': prop_dict['items'], + 'type': prop_dict['type']} + if prop_dict['type'] == 'object': + prop_dict['schema'] = prop_dict['items'] + del prop_dict['type'] + del prop_dict['items'] + return prop_dict + + def get_wsattr_and_wsproperty(obj): + ws_dict = {} + for key, value in inspect.getmembers(obj): + if (key[0] != '_' and + (value.__class__.__name__ == 'wsattr' + or value.__class__.__name__ == 'wsproperty')): + ws_dict[key] = value + return ws_dict + + def inspect_wm_schema(schema_obj, isparams=False): + schema_dict = {} + # TODO(shu-mutou): this should be removed for production. + # schema_dict['obj'] = get_wsattr_and_wsproperty(schema_obj) + ws_len = len(get_wsattr_and_wsproperty(schema_obj)) + model_name = class_to_name_str(schema_obj) + + for key, obj in inspect.getmembers(schema_obj): + if (key[0] != '_' and + (obj.__class__.__name__ == 'wsattr' + or obj.__class__.__name__ == 'wsproperty')): + # TODO(shu-mutou): this should be removed for production. + # _definitions[model_name][to_swag_key(key)] = \ + # {'obj': inspect.getmembers(obj)} + + if ws_len == 1: + # single schema + schema_dict = get_wm_item_prop(obj, isparams) + else: + # multi property schema + schema_dict.update({'type': 'object'}) + # properties + if 'properties' not in schema_dict: + schema_dict['properties'] = {} + prop = {key: get_wm_item_prop(obj, isparams)} + # required as array of string + if 'required' in prop[key] and prop[key]['required'] \ + and isinstance(prop[key]['required'], bool): + if 'required' not in schema_dict: + schema_dict['required'] = [] + schema_dict['required'].append(key) + del prop[key]['required'] + schema_dict['properties'].update(prop) + + if schema_obj not in _all_wsme_types: + if model_name not in _definitions: + _definitions[model_name] = schema_dict + schema_dict = {'$ref': "#/definitions/%s" % model_name} + + return schema_dict + + def get_wm_def(o): + wsme = {'description': ''} + for w, d in inspect.getmembers(o): + if w == 'arguments': + wsme[to_swag_key(w)] = [] + for arg in d: + # set each 'Parameter Object', it can include + # 'Items Object' recursively + item_dict = get_wm_item_prop(arg, True) + # TODO: MUST be set one of + # 'body|query|path|header|formData' + item_dict['in'] = 'query' + wsme[to_swag_key(w)].append(item_dict) + # 'body' should be set due to existing 'body' option + # in WSME expose + if o.body_type is not None: + # if body is set, last arg is it. + wsme[to_swag_key(w)][-1]['in'] = 'body' + elif w == 'doc' and d: + wsme[to_swag_key(w)] = d + elif w == 'return_type': + wsme['responses'] = {'status': {'description': ''}} + if d: + if d in _all_wsme_types: + # d is set single WSME type class or implicit type + wsme['responses']['status'][to_swag_key(w)] = ( + datatype_to_type_and_format( + conv_class_name_to_type_str( + class_to_name_str(d)))) + else: + # d is model class + wsme['responses']['status'][to_swag_key(w)] = ( + inspect_wm_schema(d, False)) + doc = inspect.getdoc(d) + if doc is not None: + wsme['responses']['status']['description'] = doc + elif w == 'status_code': + wsme['responses'][d] = wsme['responses']['status'] + del wsme['responses']['status'] + # TODO(shu-mutou): this should be removed for production. + # elif w == 'body_type': + # wsme[to_swag_key(w)] = get_type_str(d) + # elif w == 'extra_options' or w == 'ignore_extra_args' \ + # or w == 'name' or w == 'pass_request': + # wsme[to_swag_key(w)] = d + # else: + # wsme[to_swag_key(w)] = d + return wsme + + c = _hierarchy[name] + wsme_defs = {} + for k, v in inspect.getmembers(c): + if p_u.iscontroller(v): + for m, o in inspect.getmembers(v): + if m == "_wsme_definition": + wsme_defs[k] = get_wm_def(o) + + # TODO(shu-mutou): this should be removed for production. + # output wsme info into files by each controller for dev + # import pprint + # with open(name + '.txt', 'w') as fout: + # pprint.pprint(wsme_defs, stream=fout, indent=2) + + return wsme_defs + + +def get_controllers(name): + """ + get all the controllers associated with a path + + returns a dictionary of controllers indexed by their names. + """ + c = _hierarchy[name] + controllers = {k: p_u._cfg(v) + for k, v in c.__dict__.items() if p_u.iscontroller(v)} + cacts = {k: v for k, v in c.__dict__.items() if k == '_custom_actions'} + if len(cacts): + controllers.update(cacts) + return controllers + + +def get_paths(): + """ + return all the registered paths + + loops through the hierarchy and retuns a list of tuples containing the + paths and their methods. + + :returns: [(path, methods), ...] + """ + pathlist = [] + for name in _hierarchy: + fullpath = build_path(get_swag(name)) + controllers = get_controllers(name) + paths = get_controller_paths(controllers, combine_method_defs(name)) + for path in paths: + ptuple = (path_join(fullpath, path[0]), path[1]) + pathlist.append(ptuple) + combine_all_definitions() + return pathlist + + +def get_swag(name): + """return the swag metadata from an named controller.""" + return _hierarchy.get(name).__swag + + +def getrealname(method): + """attempt to get a method's real name.""" + argspec = inspect.getargspec(method) + args = argspec[0] + if args and args[0] == 'self': + return method.__name__ + if hasattr(method, '__func__'): + method = method.__func__ + + func_closure = six.get_function_closure(method) + + # NOTE(sileht): if the closure is None we cannot look deeper, + # so return actual argspec, this occurs when the method + # is static for example. + if func_closure is None: + return method.__name__ + + closure = next( + ( + c for c in func_closure if six.callable(c.cell_contents) + ), + None + ) + method = closure.cell_contents + return getrealname(method) + + +def methods_get(name): + """get all the methods for a named controller.""" + c = _hierarchy[name] + mlist = [] + if hasattr(c, 'index') and p_u.iscontroller(c.index): + cfg = p_u._cfg(c.index) + if cfg.get('generic_handlers') and cfg.get('generic_handlers').get('DEFAULT'): + mlist.append('get') + if cfg.get('allowed_methods'): + mlist += cfg.get('allowed_methods') + if hasattr(c, '__swag') and c.__swag.get('method'): + for i in c.__swag['method']: + mlist.append(i) + # for i in c.__dict__: + # ii = getattr(c, i) + # if hasattr(ii, '__swag'): + # m = ii.__swag.get('method') + # if m is not None: + # mlist.append(m) + return mlist + + +def path_join(part1, part2): + """join two url paths.""" + if len(part2) == 0: + return part1 + sep = '/' + if part1[-1] == sep: + sep = '' + return part1 + sep + part2 + + +def get_def(name): + """获取所有expose装饰方法的definitions.""" + c = _hierarchy[name] + if hasattr(c, '__swag') and c.__swag.get('method'): + return c.__swag['method'] + return {} + + +def combine_method_defs(name): + """合并expose和wsexpose装饰方法的definitions.""" + + c = _hierarchy[name] + lc = copy.deepcopy(c) + w_d = get_wsme_defs(name) + e_d = get_def(name) + if w_d and e_d: + for i in w_d: + if e_d.get(i): + for j in w_d[i]: + e_d[i][j] = w_d[i][j] + else: + e_d[i] = w_d[i] + elif w_d: + return w_d + else: + return e_d + return e_d + + +def format_type(t): + """ + 判断数据类型是否为自定义的数据类型 + """ + + def datatype_to_type_and_format(datatype): + + tf = {} + if datatype == 'uuid' or datatype == 'ipv4address' \ + or datatype == 'ipv6address' or datatype == 'date' \ + or datatype == 'time': + tf['type'] = 'string' + tf['format'] = datatype + elif datatype == 'datetime': + tf['type'] = 'string' + tf['format'] = 'date-time' + elif datatype == 'binary' or datatype == 'bytes': + # base64 encoded characters + tf['type'] = 'string' + tf['format'] = 'byte' + elif datatype == 'array' or datatype == 'boolean' \ + or datatype == 'integer' or datatype == 'string': + # no format + tf['type'] = datatype + elif datatype == 'float': + # number + tf['type'] = 'number' + tf['format'] = datatype + elif datatype == 'unicode' or datatype == 'str' \ + or datatype == 'text': + # primitive, no format + tf['type'] = 'string' + elif datatype == 'int' or datatype == 'decimal': + # primitive, no format + tf['type'] = 'integer' + elif datatype == 'bool': + # primitive, no format + tf['type'] = 'boolean' + elif datatype == 'enum': + tf['type'] = 'enum' + elif datatype == 'unset' or datatype == 'none': + tf['type'] = None + elif datatype == 'dict': + tf['type'] = 'object' + else: + tf['$ref'] = '#/definitions/' + datatype + return tf + + def conv_class_name_to_type_str(cn): + type_str = '' + if cn.endswith('Type'): + type_str = cn[:-4] + elif cn == 'str': + type_str = 'string' + else: + type_str = cn + type_str = type_str + return type_str + + def class_to_name_str(c): + return (('%s' % c).replace('<', '').replace('>', '') + .replace('class ', '').replace('\'', '') + .split(' ', 1)[0].rsplit('.', 1)[-1]) + + return datatype_to_type_and_format(conv_class_name_to_type_str(class_to_name_str(t))) + + +def add_definitions(t, m): + """将定义的类添加到all_definitions.""" + + if not all_definitions.get(m.__name__): + d = {} + d['type'] = t + d['properties'] = {} + for i, j in inspect.getmembers(m): + if not i.endswith('__'): + d['properties'][i] = format_type(j) + all_definitions[m.__name__] = d + + +def combine_all_definitions(): + """合并自定义和wsexpose解析出的definitions.""" + + for i in all_definitions: + if not _definitions.get(i): + _definitions[i] = all_definitions[i] + + return _definitions diff --git a/pecan_swagger/utils.py b/pecan_swagger/utils.py index aa3d3b6155a6322005791e833abb278cd6ed2a01..889fbf4d19bf24a2948c156be442f15db5ec7ccd 100644 --- a/pecan_swagger/utils.py +++ b/pecan_swagger/utils.py @@ -1,64 +1,108 @@ -from pecan_swagger import g as g - - -""" -utility function module - -this module contains the utility functions to assemble the swagger -dictionary object. they can be consumed by end-user applications to -build up swagger objects for applications. - -functions: -swagger_build -- build a full swagger dictionary -""" - - -def swagger_build(title, version): - swag = dict() - swag['swagger'] = '2.0' - swag['info'] = dict(title=title, version=version) - swag['consumes'] = [] - swag['produces'] = [] - swag['paths'] = {} - for p in g.get_paths(): - if p[0] not in swag['paths']: - swag['paths'][p[0]] = _tuple_to_dict(p[1]) - elif len(p[1]) > 0: - swag['paths'][p[0]].update(_tuple_to_dict(p[1])) - swag['definitions'] = g._definitions - return swag - - -def _tuple_to_dict(tpl): - """Convert tuple to dictionary - - each tuple must have key and value. - This function arrows taple including lists. - - ex.) acceptable taple - OK: ('/', ('get',(('desc',{}),('res',{}),('params',['id','name'])))) - NG: ('/', ('get',('desc','res',('params',['id','name'])))) - """ - if isinstance(tpl, list): - d = [] - for e in tpl: - d.append(_tuple_to_dict(e)) - elif isinstance(tpl, tuple): - d = {} - if isinstance(tpl[0], tuple): - # tuple has some child tuple - for e in tpl: - d[e[0]] = _tuple_to_dict(e[1]) - elif isinstance(tpl[0], list): - # list member should be processed recursively - d = _tuple_to_dict(tpl[0]) - else: - if len(tpl) == 2: - # single tuple node - d[tpl[0]] = _tuple_to_dict(tpl[1]) - else: - raise Exception(tpl) - else: - # value or dict - d = tpl - return d +from pecan_swagger import g as g +import json +import yaml + +""" +utility function module + +this module contains the utility functions to assemble the swagger +dictionary object. they can be consumed by end-user applications to +build up swagger objects for applications. + +functions: +swagger_build -- build a full swagger dictionary +""" + + +def swagger_build(title, version): + swag = dict() + swag['swagger'] = '2.0' + swag['info'] = dict(title=title, version=version) + swag['consumes'] = [] + swag['produces'] = [] + swag['paths'] = {} + swag['tags'] = [] + swag['schemes'] = [] + for p in g.get_paths(): + if p[0] not in swag['paths']: + swag['paths'][p[0]] = _tuple_to_dict(p[1]) + elif len(p[1]) > 0: + swag['paths'][p[0]].update(_tuple_to_dict(p[1])) + swag['definitions'] = g._definitions + return swag + + +def print_json(s, output_path): + json_str = json.dumps(s, indent=4) + f = open(output_path, "w") + f.write(json_str) + f.close() + + +def print_yaml(s, output_path): + yaml_str = yaml.dump(s, indent=4) + f = open(output_path, "w") + f.write(yaml_str) + f.close() + + +def add_info(s, description=None, terms_of_service=None, contact=None, license=None): + if description is not None: + s['info']['description'] = description + if terms_of_service is not None: + s['info']['termsOfService'] = terms_of_service + if contact is not None: + s['info']['contact'] = contact + if license is not None: + s['info']['license'] = license + return s + + +def add_external_docs(s, external_docs): + s["externalDocs"] = external_docs + return s + + +def add_host(s, host): + s["host"] = host + return s + + +def add_base_path(s, basepath): + s["basePath"] = basepath + return s + + +def _tuple_to_dict(tpl): + """Convert tuple to dictionary + + each tuple must have key and value. + This function arrows taple including lists. + + ex.) acceptable taple + OK: ('/', ('get',(('desc',{}),('res',{}),('params',['id','name'])))) + NG: ('/', ('get',('desc','res',('params',['id','name'])))) + """ + if isinstance(tpl, list): + d = [] + for e in tpl: + d.append(_tuple_to_dict(e)) + elif isinstance(tpl, tuple): + d = {} + if isinstance(tpl[0], tuple): + # tuple has some child tuple + for e in tpl: + d[e[0]] = _tuple_to_dict(e[1]) + elif isinstance(tpl[0], list): + # list member should be processed recursively + d = _tuple_to_dict(tpl[0]) + else: + if len(tpl) == 2: + # single tuple node + d[tpl[0]] = _tuple_to_dict(tpl[1]) + else: + raise Exception(tpl) + else: + # value or dict + d = tpl + return d