代码拉取完成,页面将自动刷新
from gmssl import sm4
from cryptography.hazmat.primitives.asymmetric import dh
from cryptography.hazmat.primitives import serialization
import socket
import sys
from common import fileSelector, showImage
import base64
RFC3526_PRIME_2048 = 0xFFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE45B3DC2007CB8A163BF0598DA48361C55D39A69163FA8FD24CF5F83655D23DCA3AD961C62F356208552BB9ED529077096966D670C354E4ABC9804F1746C08CA18217C32905E462E36CE3BE39E772C180E86039B2783A2EC07A28FB5C55DF06F4C52C9DE2BCBF6955817183995497CEA956AE515D2261898FA051015728E5A8AACAA68FFFFFFFFFFFFFFFF
RFC3526_GENERATOR = 2
def generate_dh_parameters():
parameter_numbers = dh.DHParameterNumbers(
p=RFC3526_PRIME_2048,
g=RFC3526_GENERATOR
)
return parameter_numbers.parameters()
def client_dh_key_agreement(conn, parameters):
"""客户端DH密钥交换"""
# 生成客户端DH密钥对
client_private_key = parameters.generate_private_key()
# 接收服务端公钥
server_public_bytes = conn.recv(1024)
server_public_key = serialization.load_pem_public_key(
server_public_bytes,
)
# 发送公钥
client_public_key = client_private_key.public_key()
client_public_bytes = client_public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
conn.sendall(client_public_bytes)
# 计算共享密钥
shared_key = client_private_key.exchange(server_public_key)
return shared_key[:16] # SM4需要16字节密钥
def sm4_encrypt(data, key):
"""SM4加密"""
crypt_sm4 = sm4.CryptSM4()
crypt_sm4.set_key(key, sm4.SM4_ENCRYPT)
return crypt_sm4.crypt_ecb(data)
def sm4_decrypt(data, key):
"""SM4解密"""
crypt_sm4 = sm4.CryptSM4()
crypt_sm4.set_key(key, sm4.SM4_DECRYPT)
return crypt_sm4.crypt_ecb(data)
class SecureSocketIO:
"""客户端安全I/O包装器"""
def __init__(self, conn, key):
self.conn = conn
self.key = key
self.buffer = b""
def write(self, data):
"""加密并发送数据到服务器"""
try:
encrypted = sm4_encrypt(data.encode('utf-8'), self.key)
self.conn.sendall(len(encrypted).to_bytes(4, 'big') + encrypted)
except Exception as e:
print(f"发送错误: {e}", file=sys.__stdout__)
def readline(self):
"""从服务器接收并解密数据"""
while True:
# 检查缓冲区是否有完整消息
if len(self.buffer) >= 4:
msg_len = int.from_bytes(self.buffer[:4], 'big')
if len(self.buffer) >= 4 + msg_len:
encrypted = self.buffer[4:4+msg_len]
self.buffer = self.buffer[4+msg_len:]
try:
decrypted = sm4_decrypt(encrypted, self.key)
return decrypted.decode('utf-8') + '\0'
except Exception as e:
print(f"解密错误: {e}", file=sys.__stdout__)
return '\n'
# 接收更多数据
try:
data = self.conn.recv(4096)
if not data:
return ''
self.buffer += data
except Exception as e:
print(f"接收错误: {e}", file=sys.__stdout__)
return ''
def flush(self):
pass
def start_client(host='localhost', port=65432):
ihost = input("输入服务器主机(默认:localhost):")
if ihost:
host=ihost
"""启动客户端并建立安全连接"""
try:
# 建立连接
conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
conn.connect((host, port))
print(f"已连接到服务器 {host}:{port}")
# 生成DH参数并执行密钥交换
parameters = generate_dh_parameters()
shared_key = client_dh_key_agreement(conn, parameters)
print(f"共享密钥已建立: {shared_key.hex()}")
# 创建安全I/O包装器
secure_io = SecureSocketIO(conn, shared_key)
# 主交互循环
while True:
try:
# 从服务器获取提示
prompt = secure_io.readline()
if not prompt:
break
if "__!input()_called!__" in prompt:
# 显示提示并获取用户输入
user_input = input()
# 发送输入到服务器
secure_io.write(user_input)
elif "__!fileSelector()_called!__" in prompt:
data = fileSelector()
data = base64.b64encode(data).decode()
secure_io.write(data)
secure_io.readline()
elif "__!showImage()_called!__" in prompt:
data = secure_io.readline().strip()
data = base64.b64decode(data)
showImage(data)
else:
print(prompt, end='')
except KeyboardInterrupt:
print("\n客户端中断")
break
except Exception as e:
print(f"通信错误: {e}")
break
finally:
conn.close()
print("连接已关闭")
if __name__ == "__main__":
start_client()
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。