From fc97c59b504a64a4a87c217dc14f52e9783a5289 Mon Sep 17 00:00:00 2001 From: Stick_to_yourself <1422737964@qq.com> Date: Tue, 20 Dec 2022 15:00:48 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E4=BC=98=E5=8C=96vul=E4=B8=80=E9=94=AE?= =?UTF-8?q?=E6=89=AB=E6=8F=8F=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../sysom_vul/apps/vul/async_fetch.py | 196 ++++++++++++++++++ sysom_server/sysom_vul/apps/vul/vul.py | 13 +- 2 files changed, 208 insertions(+), 1 deletion(-) create mode 100644 sysom_server/sysom_vul/apps/vul/async_fetch.py diff --git a/sysom_server/sysom_vul/apps/vul/async_fetch.py b/sysom_server/sysom_vul/apps/vul/async_fetch.py new file mode 100644 index 00000000..e059b267 --- /dev/null +++ b/sysom_server/sysom_vul/apps/vul/async_fetch.py @@ -0,0 +1,196 @@ +''' +@File: async_fetch.py +@Time: 2022-12-20 14:54:18 +@Author: DM +@Email: wb-msm261421@alibaba-inc.com +@Desc: 异步获取VUl数据 +''' + +import json +import asyncio +import urllib.parse +import requests +from typing import List, Dict, Union +from loguru import logger +from django.db import connection +from aiohttp import ClientSession +from asyncio import AbstractEventLoop +from aiohttp.client_exceptions import ContentTypeError + +from .models import VulAddrModel + + +class FetchVulData: + ''' + description: + return {*} + ''' + CONCURRENCY = 7 + + def __init__(self, + loop: AbstractEventLoop = None, + instance: VulAddrModel = None, + session: ClientSession = None, + cve_data_path: List[str] = None, + total_page: int = 98 + ) -> None: + self._uri = 'https://anas.openanolis.cn/api/portal/v1/cves/?format=json&page_num={}&page_size=50' + self.session = session + self.loop = loop + self._instance = instance + self.total_page = total_page + self._cve_data_path = cve_data_path + self._tasks: List[asyncio.Task] = [] + self.semaphore = asyncio.Semaphore(self.CONCURRENCY) + self._results: List[Dict] = [] + + @property + def request_session(self) -> ClientSession: + if self.session is None: + self.session = ClientSession() + return self.session + + async def fetch(self, kwargs) -> Union[Dict, str]: + """ + 获取请求数据 + + """ + result = None + try: + async with self.semaphore: + try: + async with self.request_session.request(**kwargs) as response: + result = await response.json() + except ContentTypeError: + res = await response.text() + result = f'被检测为Spider: {res}' + finally: + # await self._session_close() + ... + return result + + def result_callback(self, future: asyncio.Task) -> None: + res = future.result() + if isinstance(res, str): + logger.error(res) + return + if not self._cve_data_path: + self._results.extend(res) + else: + if len(self._cve_data_path) >= 1: + for key in self._cve_data_path: + res = res.get(key) + self._results.extend(res) + + async def _session_close(self): + await self.session.close() + + async def start(self) -> None: + for i in range(1, self.total_page+1): + kwargs = self.make_request_params(instance=self._instance, page_num=i) + # kwargs['str_or_url'] = kwargs.pop('url') + task: asyncio.Task = self.loop.create_task( + self.fetch(kwargs)) + task.add_done_callback(self.result_callback) + self._tasks.append(task) + + await asyncio.gather(*self._tasks) + await self._session_close() + + @property + def results(self): + return self._results + + @staticmethod + def make_request_params(instance, page_num: int = 1) -> Dict: + """ + 结构化请求的参数 + + { + 'method': instance.get_method_display(), + 'url': instance.url, + 'headers': json.loads(instance.headers), + 'data': json.loads(instance.body), + 'params': json.loads(instance.params), + 'auth': auth + } + """ + def _parase_query(q: str) -> Dict: + return {item[0]: item[1] for item in [i.split('=') for i in q.split("&")]} + + kwargs = dict() + kwargs['method'] = instance.get_method_display() + kwargs['headers'] = json.loads(instance.headers) + kwargs['data'] = json.loads(instance.body) + kwargs['params'] = json.loads(instance.params) + + if instance.authorization_type.lower() == "basic": + authorization_body = json.loads(instance.authorization_body) + auth = (authorization_body.get('username'), + authorization_body.get('password')) + kwargs['auth'] = auth + + uri: str = instance.url + + uris = uri.split('?') + if len(uris) == 1: + kwargs['url'] = uri + else: + base_url, query = uris[0], uris[1] + query = _parase_query(query) + query['page_num'] = page_num + params = urllib.parse.urlencode(query=query) + kwargs['url'] = f'{base_url}?{params}' + return kwargs + + @classmethod + def _get_page_total_num(cls, kwargs) -> Union[bool, int]: + response = requests.request(**kwargs) + if response.status_code == 200: + result = response.json() + return result['data']['total_page'] + else: + return False + + @classmethod + def _update_vul_data_status(cls, instance: VulAddrModel, status: int): + try: + instance.status = status + instance.save() + finally: + connection.close() + + @classmethod + def run(cls, instance: VulAddrModel, cve_data_path:List[str], loop=None): + """实例化一个event loop + + @instance VulAddrModel对象 (必填参数) + @cve_data_path 解析cve数据的结构体 [] + : loop 默认值为None, 不传递可以new_event_loop + + return [] + """ + + kwargs = cls.make_request_params(instance=instance) + page_total_num = cls._get_page_total_num(kwargs) + if not page_total_num: + logger.error(f'总页码数获取失败, 参数: {kwargs}') + cls._update_vul_data_status(instance, 1) + raise Exception('总页码数获取失败, 参数: {kwargs}') + + cls._update_vul_data_status(instance, 0) + + logger.info(f'总页数: {page_total_num}') + + _loop = loop or asyncio.new_event_loop() + asyncio.set_event_loop(_loop) + spider = cls( + loop=_loop, + instance=instance, + cve_data_path=cve_data_path, + total_page=page_total_num + ) + spider.loop.run_until_complete(spider.start()) + spider.loop.close() + + return spider.results diff --git a/sysom_server/sysom_vul/apps/vul/vul.py b/sysom_server/sysom_vul/apps/vul/vul.py index 603269b6..25891bb9 100644 --- a/sysom_server/sysom_vul/apps/vul/vul.py +++ b/sysom_server/sysom_vul/apps/vul/vul.py @@ -19,6 +19,7 @@ from apps.host.models import HostModel from apps.vul.ssh_pool import SshProcessQueueManager, VulTaskManager from lib.utils import human_datetime +from .async_fetch import FetchVulData def update_vul(): @@ -45,7 +46,7 @@ def update_vul_db(): logger.info("Try to get vul db info") vul_addr_obj = VulDataParse(vul_addr) try: - body = vul_addr_obj.get_vul_data() + body = vul_addr_obj._get_vul_data() if body: vul_addr_obj.parse_and_store_vul_data(body) except Exception as e: @@ -140,6 +141,15 @@ class VulDataParse(object): logger.warning(e) return vul_data + def _get_vul_data(self): + """ + 异步获取vul + """ + return FetchVulData.run( + instance=self.vul_addr_obj, + cve_data_path=self.cve_data_path + ) + def parse_and_store_vul_data(self, body): cve_data = body logger.info("Update sys_vul vul data") @@ -257,6 +267,7 @@ done hosts = [item['ip'] for item in HostModel.objects.all().values('ip')] vtm = VulTaskManager(hosts, cmd) results = vtm.run(VulTaskManager.run_command) + logger.info(results) # # cve2host_info={"cve1":[(host,software,version,os)]} # [{'host': 'GqYLM32pIZaNH0rOjd7JViwxPs', 'ret': {'status': 1, 'result': timeout('timed out')}}] -- Gitee From 901a604680b37ffd36356ebf48a374dee4bce447 Mon Sep 17 00:00:00 2001 From: Stick_to_yourself <1422737964@qq.com> Date: Tue, 20 Dec 2022 15:04:00 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E6=B8=85=E9=99=A4=E6=89=93=E5=8D=B0,=20?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E4=BE=9D=E8=B5=96=E5=8C=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- script/server/0_env/requirements.txt | 1 + sysom_server/sysom_vul/apps/vul/vul.py | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/script/server/0_env/requirements.txt b/script/server/0_env/requirements.txt index 633f8a30..49431b0f 100644 --- a/script/server/0_env/requirements.txt +++ b/script/server/0_env/requirements.txt @@ -1,5 +1,6 @@ cffi==1.15.1 aiofiles==0.8.0 +aiohttp==3.8.3 alembic==1.7.7 anyio==3.6.2 asyncer==0.0.2 diff --git a/sysom_server/sysom_vul/apps/vul/vul.py b/sysom_server/sysom_vul/apps/vul/vul.py index 25891bb9..2c2bccf8 100644 --- a/sysom_server/sysom_vul/apps/vul/vul.py +++ b/sysom_server/sysom_vul/apps/vul/vul.py @@ -267,7 +267,6 @@ done hosts = [item['ip'] for item in HostModel.objects.all().values('ip')] vtm = VulTaskManager(hosts, cmd) results = vtm.run(VulTaskManager.run_command) - logger.info(results) # # cve2host_info={"cve1":[(host,software,version,os)]} # [{'host': 'GqYLM32pIZaNH0rOjd7JViwxPs', 'ret': {'status': 1, 'result': timeout('timed out')}}] -- Gitee From 1c54f94c513d9c95ae61b943ea7ec3e256b3a2c9 Mon Sep 17 00:00:00 2001 From: Stick_to_yourself <1422737964@qq.com> Date: Tue, 20 Dec 2022 15:04:52 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E6=B8=85=E9=99=A4=E6=89=93=E5=8D=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sysom_server/sysom_vul/apps/vul/async_fetch.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sysom_server/sysom_vul/apps/vul/async_fetch.py b/sysom_server/sysom_vul/apps/vul/async_fetch.py index e059b267..669a7c6d 100644 --- a/sysom_server/sysom_vul/apps/vul/async_fetch.py +++ b/sysom_server/sysom_vul/apps/vul/async_fetch.py @@ -2,7 +2,6 @@ @File: async_fetch.py @Time: 2022-12-20 14:54:18 @Author: DM -@Email: wb-msm261421@alibaba-inc.com @Desc: 异步获取VUl数据 ''' -- Gitee From a13d00302a7cce2527b7ba68e30f2c64cc079bd0 Mon Sep 17 00:00:00 2001 From: Stick_to_yourself <1422737964@qq.com> Date: Tue, 20 Dec 2022 15:05:41 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E6=B8=85=E9=99=A4=E6=89=93=E5=8D=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sysom_server/sysom_vul/apps/vul/async_fetch.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/sysom_server/sysom_vul/apps/vul/async_fetch.py b/sysom_server/sysom_vul/apps/vul/async_fetch.py index 669a7c6d..0b7f1d08 100644 --- a/sysom_server/sysom_vul/apps/vul/async_fetch.py +++ b/sysom_server/sysom_vul/apps/vul/async_fetch.py @@ -20,10 +20,7 @@ from .models import VulAddrModel class FetchVulData: - ''' - description: - return {*} - ''' + CONCURRENCY = 7 def __init__(self, -- Gitee