1 Star 0 Fork 3

JOHNSON / multithreaded-socket

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
protocol.py 2.25 KB
一键复制 编辑 原始数据 按行查看 历史
JOHNSON 提交于 2021-07-27 12:17 . 仿 HTTP 协议传输
from buff import Buff
from urllib.parse import quote, unquote
from typing import AnyStr
# Message type tags.
# NOTE(review): presumably these are the values passed as a Protocol message's
# type (the first token of the start line) -- confirm against callers.
JSON_MESSAGE = "JSON"
TEXT_MESSAGE = "TEXT"
FILE_MESSAGE = "FILE"
class HeaderList(dict):
    """A dict of header names to values that can render itself as header lines."""

    # The property is named ``str`` (shadowing the builtin inside this class
    # body only); the name is kept because callers read ``headers.str``.
    @property
    def str(self):
        """Return every entry as a ``Key: Value`` line terminated by CRLF.

        Entries are emitted in the dict's iteration order. An empty mapping
        yields an empty string.
        """
        # A single join builds the text in one pass instead of growing a
        # string with repeated ``+=``.
        return "".join(
            "{KEY}: {VALUE}\r\n".format(KEY=key, VALUE=value)
            for key, value in self.items()
        )
class Protocol(object):
    """
    A small message format modelled on the layout of an HTTP packet.

    Structure (every line is terminated by CRLF)::

        type version
        key: value
        <empty line>
        data

    Example as produced by ``raw``::

        FILE 1.0
        Content-Length: 5

        12345

    Headers:
        Content-Length -- length of the body in bytes; written by
        ``set_content`` and required by ``handler`` to read the body.
    """

    VERSION: str = "1.0"
    TYPE: str
    HEADER: HeaderList
    BODY: bytes

    def __init__(self, ty=None, data=None):
        """Create a message.

        :param ty: message type written as the first token of the start line;
            left unset when falsy.
        :param data: body as ``str`` or ``bytes``; ``None`` means no body.
        """
        # Headers and body are per-instance state. A HeaderList created at
        # class level would be shared, so headers set on (or parsed into) one
        # message would show up in every other message.
        self.HEADER = HeaderList()
        self.BODY = b""
        # ``is not None`` so that empty content is still framed with
        # ``Content-Length: 0`` instead of being silently dropped.
        if data is not None:
            self.set_content(data)
        if ty:
            self.set_type(ty)

    def set_type(self, ty: str):
        """Set the message type (first token of the start line)."""
        self.TYPE = ty

    def set_header(self, key, value):
        """Set or replace a single header."""
        self.HEADER[key] = value

    def set_content(self, data: AnyStr):
        """Store the body and update ``Content-Length`` to its byte length.

        ``str`` input is encoded with the default ``str.encode()`` codec first.
        """
        if isinstance(data, str):
            data = data.encode()
        self.set_header("Content-Length", len(data))
        self.BODY = data

    @property
    def header(self):
        """The message's ``HeaderList``."""
        return self.HEADER

    @property
    def body(self):
        """The message body."""
        return self.BODY

    @property
    def type(self):
        """The message type."""
        return self.TYPE

    @property
    def version(self):
        """The protocol version string."""
        return self.VERSION

    @staticmethod
    def handler(buff: Buff):
        """Parse one message from ``buff``.

        Reads the start line, then header lines up to the first line that
        does not contain ``": "``, then ``Content-Length`` bytes of body.

        :param buff: object providing ``readLine(deocde=True)`` and
            ``read(n)``.
        :return: the parsed ``Protocol``, or ``None`` when the start line has
            no space separator, or ``Content-Length`` is missing, empty,
            not an integer, or negative.
        """
        # NOTE(review): the keyword is spelled "deocde" exactly as this file
        # has always passed it; presumably it matches Buff.readLine's
        # parameter name -- confirm in Buff before renaming.
        start_line: str = buff.readLine(deocde=True)
        parts = start_line.split(" ", 1)
        if len(parts) < 2:
            return None

        p = Protocol()
        p.TYPE = parts[0].strip()
        p.VERSION = parts[1].strip()

        while True:
            line: str = buff.readLine(deocde=True)
            # Split on the first ": " only, so a value that itself contains
            # ": " stays intact instead of ending header parsing early.
            key, sep, value = line.partition(": ")
            if not sep:
                break
            p.HEADER[key.strip()] = value.strip()

        length_text = p.HEADER.get('Content-Length')
        if not length_text:
            return None
        try:
            content_length = int(length_text)
        except ValueError:
            # Malformed length: treated like the other malformed-input cases.
            return None
        if content_length < 0:
            return None

        p.BODY = buff.read(content_length)
        return p

    @property
    def raw(self):
        """Serialise the message to bytes in the wire layout.

        ``TYPE`` must have been set (via the constructor, ``set_type`` or
        ``handler``); otherwise attribute access fails.
        """
        # Packet layout as bytes: start line, header lines, empty line, body.
        return b"".join((
            self.TYPE.encode(), b" ", self.VERSION.encode(), b"\r\n",
            self.HEADER.str.encode(), b"\r\n",
            self.BODY,
        ))
Python
1
https://gitee.com/J0hNs0N/multithreaded-socket.git
git@gitee.com:J0hNs0N/multithreaded-socket.git
J0hNs0N
multithreaded-socket
multithreaded-socket
master

搜索帮助