5 Star 0 Fork 0

杨谨徽/公文传输系统

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
views.py 37.78 KB
一键复制 编辑 原始数据 按行查看 历史
杨谨徽 提交于 2023-12-01 14:25 . views.py
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009
from django.shortcuts import render, redirect
from django.shortcuts import HttpResponse
from django.contrib import messages
from .models import UserProfile, Document, Log
from django.http import JsonResponse
from django.conf import settings
from rest_framework.decorators import api_view
from rest_framework.response import Response
from django.contrib.auth.decorators import login_required
from django.http import HttpRequest
from .forms import UserForm, AvatarUploadForm
from django.http import HttpResponseRedirect
from django.urls import reverse
from django.shortcuts import get_object_or_404
from django.views.decorators.csrf import csrf_exempt
from random import SystemRandom
from gmssl import sm2, func
import binascii
from gmssl.sm4 import CryptSM4, SM4_ENCRYPT, SM4_DECRYPT
from gmssl.sm3 import sm3_hash
import secrets
import logging
from django.utils import timezone
from docx import Document as Docu
import traceback # 导入 traceback 模块
import time, os, json
from PIL import Image
from os.path import join
# Create your views here.
def login(request):
if request.method == 'POST':
id_in = request.POST['id_in']
password_in = request.POST['password_in']
# 查询数据库,检查用户是否存在
try:
user_profile = UserProfile.objects.get(id=id_in, password_up=password_in)
except UserProfile.DoesNotExist:
messages.error(request, '学号或密码错误,请重新输入。')
return redirect('login')
# 登录成功,将用户信息存储到session中
request.session['user_id'] = user_profile.id
request.session['username'] = user_profile.username_up
LogData = Log.objects.create(
username = user_profile.username_up,
documentname = "无",
operation = f'用户{user_profile.username_up}{timezone.now()}登录了系统。'
)
LogData.save()
# 可以在这里添加其他处理,例如重定向到成功页面或显示成功消息
# 登录成功,添加消息
time.sleep(3)
return redirect('index')
return render(request, 'login.html') # 替换为你的模板路径
def register(request):
if request.method == 'POST':
id = request.POST['id']
username_up = request.POST['username_up']
email = request.POST['email']
password_up = request.POST['password_up']
priKey = PrivateKey()
pubKey = priKey.publicKey()
new_user = UserProfile.objects.create(
id=id,
username_up=username_up,
email=email,
password_up=password_up,
public_key=pubKey.toString(compressed=False), # 存储公钥
private_key=priKey.toString(), # 存储私钥
avatar='avatars/default_avatar.png'
)
new_user.save()
LogData = Log.objects.create(
username = new_user.username_up,
documentname = "无",
operation = f'用户{new_user.username_up}{timezone.now()}注册了账号。'
)
LogData.save()
# 添加成功消息
messages.success(request, '注册成功,请登录。')
time.sleep(3)
return redirect('login')
return render(request, 'register.html') # 替换为你的模板路径
def index(request):
# 检查用户是否登录
if 'user_id' in request.session:
user_id = request.session['user_id']
username = request.session['username']
try:
# 根据用户名查询用户的访问权限
user_profile = UserProfile.objects.get(username_up=username)
access_level = user_profile.access_level
# 获取用户头像的 URL
user_avatar_url = user_profile.avatar.url if user_profile.avatar else '/avatars/default_avatar.png'
print(user_avatar_url)
# 调整图片大小
# 假设用户头像在media文件夹下的avatars文件夹内
if user_avatar_url and 'avatars' in user_avatar_url:
image_path = join(settings.BASE_DIR, 'web', user_avatar_url.lstrip('/').replace('/', '\\')) # 移除url开头的斜杠并替换斜杠为反斜杠
print(settings.MEDIA_ROOT)
print(user_avatar_url[1:])
print(image_path)
image = Image.open(image_path)
resized_image = image.resize((60, 60)) # 设置新的宽度和高度
resized_image.save(image_path) # 覆盖原始图片文件
# 构建完整的 URL
user_avatar_full_url = request.build_absolute_uri(user_avatar_url)
return render(request, 'index.html',
{'user_id': user_id, 'username': username, 'access_level': access_level,
'user_avatar_url': user_avatar_full_url})
except UserProfile.DoesNotExist:
# 处理未找到用户的情况
# 可以引发 Http404 异常或者进行其他适当的处理
pass
else:
# 用户未登录,可以重定向到登录页面或其他处理
return redirect('login')
def save_document(request: HttpRequest):
if request.method == 'POST':
try:
# 获取当前登录的用户
current_user = request.session['user_id']
current_user_name = request.session['username']
data = json.loads(request.body)
title = data.get('title')
content = data.get('content')
security_level = data.get('securityLevel')
cc_office = data.get('cc_office')
print(data)
file_address = data.get('file_address')
# 获取当前时间
current_time = timezone.now()
docname = file_address + '.docx'
# 创建一个新的文档对象
doc = Docu()
# 添加标题
doc.add_heading(title, level=1)
# 添加 HTML 内容
doc.add_paragraph(content)
# 保存文档
file_path = os.path.join(settings.BASE_DIR, 'web', 'static', 'docx', docname)
# 在保存文档前打印文件路径
print("File path:", file_path)
# 检查文档对象是否被正确创建
print("Document:", doc)
doc.save(file_path)
# 保存文档信息到数据库
new_document = Document.objects.create(
document_name=title,
document_owner = current_user,
issuing_office=current_user,
issue_date=current_time,
security_level=security_level,
cc_office=cc_office,
file_type='docx',
modifier=current_user,
modified_date=current_time,
file_address=docname
)
new_document.save()
print("Document saved successfully to the database.")
key = get_or_generate_key(file_address) ##sm4密钥
encrypt_and_hash_file(file_path, key, file_address)
# 删除原始文件
os.remove(file_path)
print(f'原文件已删除:{file_path}')
sender = UserProfile.objects.get(id=current_user)
sender_private_key = sender.private_key
sender_public_key = sender.public_key
cc_office_user = UserProfile.objects.get(username_up=cc_office)
cc_office_private_key = cc_office_user.private_key
cc_office_public_key = cc_office_user.public_key
key_path = os.path.join(settings.BASE_DIR, 'web', 'static', 'key', (file_address + 'key.dat'))
print(key_path)
key_encrypt_path = os.path.join(settings.BASE_DIR, 'web', 'static', 'key', (file_address + 'encryptkey.dat'))
encrypt_data(public_key=cc_office_public_key,
private_key=cc_office_private_key,
input_file=key_path,
output_file=key_encrypt_path)
os.remove(key_path)
key_sign_path = os.path.join(settings.BASE_DIR, 'web', 'static', 'sign', (file_address + 'signkey.dat'))
print(key_sign_path)
sign_data(public_key=sender_public_key,
private_key=sender_private_key,
input_file=key_encrypt_path,
signature_file=key_sign_path)
LogData = Log.objects.create(
username=current_user_name,
documentname=new_document.document_name,
operation=f'用户{current_user_name}{timezone.now()}创建了公文:{new_document.document_name}。'
)
LogData.save()
return HttpResponse({'message': 'Document saved successfully'}) # 返回成功消息
except Exception as e:
traceback.print_exc() # 打印异常信息到控制台
return HttpResponse({'message': 'Internal Server Error'}, status=500)
return HttpResponse({'message': 'Invalid request'}, status=400) # 处理无效请求
@csrf_exempt
def delete_document(request, documentname):
current_user = request.session['username']
if request.method == 'DELETE':
print(f'要删除的公文:{documentname}')
# 从数据库中查找要删除的文档
document = get_object_or_404(Document, document_name=documentname)
file_enc_path = os.path.join(settings.BASE_DIR, 'web', 'static', 'docx', (documentname + '.enc'))
file_path = os.path.join(settings.BASE_DIR, 'web', 'static', 'docxs', (documentname + '.docx'))
key_path = os.path.join(settings.BASE_DIR, 'web', 'static', 'key', (documentname + 'key.dat'))
key_enc_path = os.path.join(settings.BASE_DIR, 'web', 'static', 'key', (documentname + 'encryptkey.dat'))
keyhash_path = os.path.join(settings.BASE_DIR, 'web', 'static', 'secure', (documentname + 'hash_decrypted.dat'))
keyorig_path = os.path.join(settings.BASE_DIR, 'web', 'static', 'secure', (documentname + 'hash_original.dat'))
keysign_path = os.path.join(settings.BASE_DIR, 'web', 'static', 'sign', (documentname + 'signkey.dat'))
if os.path.exists(file_enc_path):
os.remove(file_enc_path)
print(f"File {file_enc_path} deleted successfully")
else:
print(f"File {file_enc_path} does not exist")
if os.path.exists(file_path):
os.remove(file_path)
print(f"File {file_path} deleted successfully")
else:
print(f"File {file_path} does not exist")
if os.path.exists(key_path):
os.remove(key_path)
print(f"File {key_path} deleted successfully")
else:
print(f"File {key_path} does not exist")
if os.path.exists(key_enc_path):
os.remove(key_enc_path)
print(f"File {key_enc_path} deleted successfully")
else:
print(f"File {key_enc_path} does not exist")
if os.path.exists(keyhash_path):
os.remove(keyhash_path)
print(f"File {keyhash_path} deleted successfully")
else:
print(f"File {keyhash_path} does not exist")
if os.path.exists(keyorig_path):
os.remove(keyorig_path)
print(f"File {keyorig_path} deleted successfully")
else:
print(f"File {keyorig_path} does not exist")
if os.path.exists(keysign_path):
os.remove(keysign_path)
print(f"File {keysign_path} deleted successfully")
else:
print(f"File {keysign_path} does not exist")
LogData = Log.objects.create(
username=current_user,
documentname=document.document_name,
operation=f'用户{current_user}{timezone.now()}删除了公文:{document.document_name}。'
)
LogData.save()
# 删除文档
document.delete()
# 返回成功的 JSON 响应
return JsonResponse({'message': 'Document deleted successfully'})
# 如果请求方法不是 DELETE,则返回错误响应
return JsonResponse({'error': 'Invalid request method'}, status=400)
def get_users(request):
# 从数据库中获取用户信息
users = UserProfile.objects.all().values() # 假设 User 有合适的字段来表示用户信息
# 将查询到的数据转换为列表,并以 JSON 格式返回给前端
return JsonResponse(list(users), safe=False)
from .forms import UserForm
def create_user(request):
current_user = request.session['username']
if request.method == 'POST':
form = UserForm(request.POST)
if form.is_valid():
# 获取表单数据并保存到数据库
id = form.cleaned_data['id']
username = form.cleaned_data['username_up']
email = form.cleaned_data['email']
password = form.cleaned_data['password_up']
priKey = PrivateKey()
pubKey = priKey.publicKey()
# 保存到数据库中
UserProfile.objects.create(
id=id,
username_up=username,
email=email,
password_up=password,
public_key=pubKey.toString(compressed=False), # 存储公钥
private_key=priKey.toString(), # 存储私钥
avatar='avatars/default_avatar.png'
)
LogData = Log.objects.create(
username=current_user,
documentname="无",
operation=f'用户{current_user}{timezone.now()}创建了新用户{username}。'
)
LogData.save()
# 重定向到 index 页面,使用 HttpResponseRedirect 对象
return HttpResponseRedirect(reverse('index'))
else:
form = UserForm()
return render(request, 'adduser.html', {'form': form})
def delete_user(request, user_id):
current_user = request.session['username']
if request.method == 'DELETE':
user = get_object_or_404(UserProfile, id=user_id)
LogData = Log.objects.create(
username=current_user,
documentname="无",
operation=f'用户{current_user}{timezone.now()}删除了用户{user.username_up}。'
)
LogData.save()
user.delete()
return JsonResponse({'message': 'User deleted successfully'}, status=200)
else:
return JsonResponse({'message': 'Invalid request method'}, status=400)
def change_userinfo(request, user_id):
current_user = request.session['username']
try:
user_profile = UserProfile.objects.get(id=user_id)
# 如果是 POST 请求,即提交表单
if request.method == 'POST':
# 获取表单中的数据
username = request.POST.get('username_up')
email = request.POST.get('email')
password = request.POST.get('password_up')
# 更新用户信息
user_profile.username_up = username
user_profile.email = email
user_profile.password_up = password
LogData = Log.objects.create(
username=current_user,
documentname="无",
operation=f'用户{current_user}{timezone.now()}修改了{username}的用户信息。'
)
LogData.save()
# 保存更新后的信息到数据库
user_profile.save()
return redirect('index') # 更新成功后重定向到首页
else:
# 如果是 GET 请求,即用户访问修改页面
return render(request, 'change_userinfo.html', {'user_profile': user_profile})
except UserProfile.DoesNotExist:
pass # 处理用户不存在的情况
def manage_permission(request, user_id):
current_user = request.session['username']
# 根据 user_id 获取特定用户的信息或权限
user = UserProfile.objects.get(id=user_id)
if request.method == 'POST':
access_level = request.POST['access_level']
user.access_level = access_level
LogData = Log.objects.create(
username=current_user,
documentname="无",
operation=f'用户{current_user}{timezone.now()}修改了{user.username_up}的访问权限。'
)
LogData.save()
user.save() # 将更改后的访问权限保存到数据库
# 重定向到 index 页面
return redirect('index')
# 渲染 manage_permission.html 页面,并传递用户信息或权限
return render(request, 'manage_permission.html', {'user': user})
def get_documents_in_docmanager(request):
current_user = request.session['user_id']
# 从数据库中获取所有公文
documents = Document.objects.filter(document_owner=current_user, is_sent=0)
# 将公文数据转换为 JSON 格式
data = [
{
'current_user': current_user,
'id': doc.document_id,
'title': doc.document_name,
'securityLevel': doc.security_level,
'owner': doc.document_owner,
'office': doc.issuing_office,
'sendTime': doc.issue_date.strftime('%Y-%m-%d'), # 将日期格式化为字符串
'is_sent': doc.is_sent,
'cc_office': doc.cc_office,
'file_address': doc.file_address
# 可以添加其他字段
}
for doc in documents
]
# 发送 JSON 格式的数据给前端
return JsonResponse(data, safe=False)
def get_documents_in_docjudge(request):
current_user = request.session['username']
# 从数据库中获取所有公文
documents = Document.objects.filter(cc_office=current_user, is_sent=1)
# 将公文数据转换为 JSON 格式
data = [
{
'current_user': current_user,
'id': doc.document_id,
'title': doc.document_name,
'securityLevel': doc.security_level,
'owner': doc.document_owner,
'office': doc.issuing_office,
'sendTime': doc.issue_date.strftime('%Y-%m-%d'), # 将日期格式化为字符串
'is_sent': doc.is_sent,
'cc_office': doc.cc_office,
'file_address': doc.file_address,
'is_pass': doc.is_pass,
# 可以添加其他字段
}
for doc in documents
]
# 发送 JSON 格式的数据给前端
return JsonResponse(data, safe=False)
@csrf_exempt
def update_document_status(request):
current_user = request.session['username']
if request.method == 'POST':
data = json.loads(request.body)
document_name = data.get('documentName')
cc_office = data.get('ccOffice')
# 添加日志打印
print("Received document_name:", document_name)
print("Received cc_office:", cc_office)
try:
# 根据 document_id 获取对应的文档
document = Document.objects.get(document_name=document_name)
document.is_sent = 1
document.document_owner = cc_office
LogData = Log.objects.create(
username=current_user,
documentname=document.document_name,
operation=f'用户{current_user}{timezone.now()}发送了公文:{document.document_name}。'
)
LogData.save()
document.save()
return JsonResponse({'message': 'Document status updated successfully'})
except Document.DoesNotExist:
print(f"Document with ID {document_name} does not exist.")
return JsonResponse({'message': 'Document does not exist'}, status=404)
except Exception as e:
print("Error:", e)
return JsonResponse({'message': 'Internal Server Error'}, status=500)
return JsonResponse({'message': 'Invalid request'}, status=400)
@csrf_exempt
def approveDocument(request):
current_user = request.session['username']
if request.method == 'POST':
data = json.loads(request.body)
document_name = data.get('documentName')
# 添加日志打印
print("Received document_name:", document_name)
try:
# 根据 document_id 获取对应的文档
document = Document.objects.get(document_name=document_name)
document.is_pass = 1
document.is_sent = 1
LogData = Log.objects.create(
username=current_user,
documentname=document.document_name,
operation=f'用户{current_user}{timezone.now()}通过了公文:{document.document_name}。'
)
LogData.save()
document.save()
return JsonResponse({'message': 'Document status updated successfully'})
except Document.DoesNotExist:
print(f"Document with ID {document_name} does not exist.")
return JsonResponse({'message': 'Document does not exist'}, status=404)
except Exception as e:
print("Error:", e)
return JsonResponse({'message': 'Internal Server Error'}, status=500)
return JsonResponse({'message': 'Invalid request'}, status=400)
@csrf_exempt
def rejectDocument(request):
current_user = request.session['username']
if request.method == 'POST':
data = json.loads(request.body)
document_name = data.get('documentName')
doc_issuing_office = data.get('issuing_office')
# 添加日志打印
print("Received document_name:", document_name)
print("Received doc_issuing_office:", doc_issuing_office)
try:
# 根据 document_id 获取对应的文档
document = Document.objects.get(document_name=document_name)
document.is_pass = 0
document.is_sent = 0
document.document_owner = document.issuing_office
LogData = Log.objects.create(
username=current_user,
documentname=document.document_name,
operation=f'用户{current_user}{timezone.now()}拒绝了公文:{document.document_name}。'
)
LogData.save()
document.save()
return JsonResponse({'message': 'Document status updated successfully'})
except Document.DoesNotExist:
print(f"Document with ID {document_name} does not exist.")
return JsonResponse({'message': 'Document does not exist'}, status=404)
except Exception as e:
print("Error:", e)
return JsonResponse({'message': 'Internal Server Error'}, status=500)
return JsonResponse({'message': 'Invalid request'}, status=400)
def decrypt_document(request):
current_user = request.session['username']
if request.method == 'POST':
# 获取传递的文件地址或其他必要信息
data = json.loads(request.body)
file_title = data.get('file_title')
file_name = data.get('file_name')
print(file_title)
print(file_name)
doc = Document.objects.get(document_name=file_title)
file_path = os.path.join(settings.BASE_DIR, 'web', 'static', 'docx', file_name)
print(file_path)
doc_owner = doc.issuing_office
cc_office = doc.cc_office
sender_keyowner = UserProfile.objects.get(id=doc_owner)
sender_publickey = sender_keyowner.public_key
sender_privatekey = sender_keyowner.private_key
key_owner = UserProfile.objects.get(username_up=cc_office)
publickey = key_owner.public_key
privatekey = key_owner.private_key
key_sign_path = os.path.join(settings.BASE_DIR, 'web', 'static', 'sign', (file_title + 'signkey.dat'))
key_encrypt_path = os.path.join(settings.BASE_DIR, 'web', 'static', 'key', (file_title + 'encryptkey.dat'))
verify_signature(public_key=sender_publickey,
private_key=sender_privatekey,
input_file=key_encrypt_path,
signature_file=key_sign_path)
key_path = os.path.join(settings.BASE_DIR, 'web', 'static', 'key', (file_title + 'key.dat'))
decrypt_data(public_key=publickey,
private_key=privatekey,
input_file=key_encrypt_path,
output_file=key_path)
key = get_or_generate_key(file_title)
print(f'对称密钥的类型为:{type(key)}')
# 在这里执行文件解密的操作
decrypted_file_address = decrypt_and_hash_file(file_path, key, file_title)
LogData = Log.objects.create(
username=current_user,
documentname=doc.document_name,
operation=f'用户{current_user}{timezone.now()}下载了公文:{doc.document_name}。'
)
LogData.save()
# 返回解密结果(成功或失败)
return JsonResponse({'message': '文件解密成功', 'decrypted_file_address': decrypted_file_address}) # 或者其他成功信息
else:
return JsonResponse({'message': '无效的请求'}, status=400)
## 密码功能实现
## SM4对称密码算法、SM3哈希摘要算法 ##
def get_or_generate_key(document_name):
# 尝试从外部输入获取密钥
key_file_path = os.path.join(settings.BASE_DIR, 'web', 'static', 'key', (document_name + 'key.dat'))
print(key_file_path)
if os.path.exists(key_file_path):
with open(key_file_path, 'rb') as key_file:
key = key_file.read()
print(key)
print(len(key))
if len(key) != 16:
print("密钥长度必须为16字节")
return None
else:
# 生成随机的16字节密钥
key = secrets.token_bytes(16)
with open(key_file_path, 'wb') as key_file:
key_file.write(key)
return key
def encrypt_and_hash_file(input_file_path, key, document_name):
# 读取文件内容
with open(input_file_path, 'rb') as file:
plaintext = file.read()
# 计算文件的哈希值
hash_value = sm3_hash(list(plaintext)) # Convert bytes to list
hash_file_path = os.path.join(settings.BASE_DIR, 'web', 'static', 'secure', (document_name + 'hash_original.dat'))
with open(hash_file_path, 'w') as hash_file:
hash_file.write(hash_value)
print(f'原文件的哈希值已保存到:{hash_file_path}')
# 初始化SM4加密器
crypt_sm4 = CryptSM4()
# 设置密钥
crypt_sm4.set_key(key, SM4_ENCRYPT)
# 加密文件内容
ciphertext = crypt_sm4.crypt_ecb(plaintext)
# 创建加密后的文件
encrypted_file_path = os.path.join(settings.BASE_DIR, 'web', 'static', 'docx', (document_name + '.enc'))
with open(encrypted_file_path, 'wb') as file:
file.write(ciphertext)
print(f'文件加密成功:{encrypted_file_path}')
def decrypt_and_hash_file(encrypted_file_path, key, document_name):
# 初始化SM4解密器
crypt_sm4 = CryptSM4()
# 设置密钥
crypt_sm4.set_key(key, SM4_DECRYPT)
# 读取加密文件内容
with open(encrypted_file_path, 'rb') as file:
ciphertext = file.read()
# 解密文件内容
plaintext = crypt_sm4.crypt_ecb(ciphertext)
# 创建解密后的文件
decrypted_file_path = os.path.join(settings.BASE_DIR, 'web', 'static', 'docxs', (document_name + '.docx'))
with open(decrypted_file_path, 'wb') as file:
file.write(plaintext)
print(f'文件解密成功:{decrypted_file_path}')
# 计算解密文件的哈希值
hash_value = sm3_hash(list(plaintext)) # Convert bytes to list
# 将哈希值保存到hash_decrypted.txt文件
hash_decrypted_file_path = os.path.join(settings.BASE_DIR, 'web', 'static', 'secure', (document_name + 'hash_decrypted.dat'))
with open(hash_decrypted_file_path, 'w') as hash_file:
hash_file.write(hash_value)
print(f'解密文件的哈希值已保存到:{hash_decrypted_file_path}')
# 比较原始哈希和解密后的哈希
hash_original_file = os.path.join(settings.BASE_DIR, 'web', 'static', 'secure',
(document_name + 'hash_original.dat'))
hash_decrypted_file = os.path.join(settings.BASE_DIR, 'web', 'static', 'secure',
(document_name + 'hash_decrypted.dat'))
with open(hash_original_file, 'rb') as original, open(hash_decrypted_file, 'rb') as decrypted:
original_hash = original.read()
decrypted_hash = decrypted.read()
if original_hash == decrypted_hash:
print("加密和解密后的文件内容一致。")
else:
print("加密和解密后的文件内容不一致。")
decrypted_file_path_str = f'/static/docxs/{document_name}'+'.docx'
return decrypted_file_path_str
## SM2算法实现 ##
## SM2密钥生成 ##
class CurveFp:
def __init__(self, A, B, P, N, Gx, Gy, name):
self.A = A
self.B = B
self.P = P
self.N = N
self.Gx = Gx
self.Gy = Gy
self.name = name
sm2p256v1 = CurveFp(
name="sm2p256v1",
A=0xFFFFFFFEFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF00000000FFFFFFFFFFFFFFFC,
B=0x28E9FA9E9D9F5E344D5A9E4BCF6509A7F39789F515AB8F92DDBCBD414D940E93,
P=0xFFFFFFFEFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF00000000FFFFFFFFFFFFFFFF,
N=0xFFFFFFFEFFFFFFFFFFFFFFFFFFFFFFFF7203DF6B21C6052B53BBF40939D54123,
Gx=0x32C4AE2C1F1981195F9904466A39C9948FE30BBFF2660BE1715A4589334C74C7,
Gy=0xBC3736A2F4F6779C59BDCEE36B692153D0A9877CC62A474002DF32E52139F0A0
)
def multiply(a, n, N, A, P):
return fromJacobian(jacobianMultiply(toJacobian(a), n, N, A, P), P)
def add(a, b, A, P):
return fromJacobian(jacobianAdd(toJacobian(a), toJacobian(b), A, P), P)
def inv(a, n):
if a == 0:
return 0
lm, hm = 1, 0
low, high = a % n, n
while low > 1:
r = high // low
nm, new = hm - lm * r, high - low * r
lm, low, hm, high = nm, new, lm, low
return lm % n
def toJacobian(Xp_Yp):
Xp, Yp = Xp_Yp
return (Xp, Yp, 1)
def fromJacobian(Xp_Yp_Zp, P):
Xp, Yp, Zp = Xp_Yp_Zp
z = inv(Zp, P)
return ((Xp * z ** 2) % P, (Yp * z ** 3) % P)
def jacobianDouble(Xp_Yp_Zp, A, P):
Xp, Yp, Zp = Xp_Yp_Zp
if not Yp:
return (0, 0, 0)
ysq = (Yp ** 2) % P
S = (4 * Xp * ysq) % P
M = (3 * Xp ** 2 + A * Zp ** 4) % P
nx = (M ** 2 - 2 * S) % P
ny = (M * (S - nx) - 8 * ysq ** 2) % P
nz = (2 * Yp * Zp) % P
return (nx, ny, nz)
def jacobianAdd(Xp_Yp_Zp, Xq_Yq_Zq, A, P):
Xp, Yp, Zp = Xp_Yp_Zp
Xq, Yq, Zq = Xq_Yq_Zq
if not Yp:
return (Xq, Yq, Zq)
if not Yq:
return (Xp, Yp, Zp)
U1 = (Xp * Zq ** 2) % P
U2 = (Xq * Zp ** 2) % P
S1 = (Yp * Zq ** 3) % P
S2 = (Yq * Zp ** 3) % P
if U1 == U2:
if S1 != S2:
return (0, 0, 1)
return jacobianDouble((Xp, Yp, Zp), A, P)
H = U2 - U1
R = S2 - S1
H2 = (H * H) % P
H3 = (H * H2) % P
U1H2 = (U1 * H2) % P
nx = (R ** 2 - H3 - 2 * U1H2) % P
ny = (R * (U1H2 - nx) - S1 * H3) % P
nz = (H * Zp * Zq) % P
return (nx, ny, nz)
def jacobianMultiply(Xp_Yp_Zp, n, N, A, P):
Xp, Yp, Zp = Xp_Yp_Zp
if Yp == 0 or n == 0:
return (0, 0, 1)
if n == 1:
return (Xp, Yp, Zp)
if n < 0 or n >= N:
return jacobianMultiply((Xp, Yp, Zp), n % N, N, A, P)
if (n % 2) == 0:
return jacobianDouble(jacobianMultiply((Xp, Yp, Zp), n // 2, N, A, P), A, P)
if (n % 2) == 1:
return jacobianAdd(jacobianDouble(jacobianMultiply((Xp, Yp, Zp), n // 2, N, A, P), A, P), (Xp, Yp, Zp), A, P)
class PrivateKey:
def __init__(self, curve=sm2p256v1, secret=None):
self.curve = curve
self.secret = secret or SystemRandom().randrange(1, curve.N)
def publicKey(self):
curve = self.curve
xPublicKey, yPublicKey = multiply((curve.Gx, curve.Gy), self.secret, A=curve.A, P=curve.P, N=curve.N)
return PublicKey(xPublicKey, yPublicKey, curve)
def toString(self):
return "{}".format(str(hex(self.secret))[2:].zfill(64))
class PublicKey:
def __init__(self, x, y, curve):
self.x = x
self.y = y
self.curve = curve
def toString(self, compressed=True):
return {
True: str(hex(self.x))[2:],
False: "{}{}".format(str(hex(self.x))[2:].zfill(64), str(hex(self.y))[2:].zfill(64))
}.get(compressed)
## SM2加解密和签名验签 ##
def encrypt_data(public_key, private_key, input_file, output_file):
sm2_crypt = sm2.CryptSM2(public_key=public_key, private_key=private_key)
with open(input_file, 'rb') as f:
data = f.read()
print(public_key)
print(private_key)
print(type(public_key))
print(type(private_key))
encrypted_data = sm2_crypt.encrypt(data)
with open(output_file, 'wb') as f:
f.write(encrypted_data)
def decrypt_data(public_key, private_key, input_file, output_file):
sm2_crypt = sm2.CryptSM2(public_key=public_key, private_key=private_key)
with open(input_file, 'rb') as f:
encrypted_data = f.read()
decrypted_data = sm2_crypt.decrypt(encrypted_data)
with open(output_file, 'wb') as f:
f.write(decrypted_data)
def sign_data(public_key, private_key, input_file, signature_file):
sm2_crypt = sm2.CryptSM2(public_key=public_key, private_key=private_key)
with open(input_file, 'rb') as f:
data = f.read()
random_hex_str = func.random_hex(sm2_crypt.para_len)
signature = sm2_crypt.sign(data, random_hex_str)
with open(signature_file, 'wb') as f:
f.write(binascii.unhexlify(signature))
def verify_signature(public_key, private_key, input_file, signature_file):
sm2_crypt = sm2.CryptSM2(public_key=public_key, private_key=private_key)
with open(input_file, 'rb') as f:
data = f.read()
with open(signature_file, 'rb') as f:
signature = f.read()
if sm2_crypt.verify(binascii.hexlify(signature).decode(), data):
print("Signature verification successful")
else:
print("Signature verification failed")
def get_user_logs(request):
current_user = request.session.get('username')
user = UserProfile.objects.get(username_up=current_user)
print(user.username_up)
if user.access_level == 1: # 管理员权限
user_logs = Log.objects.filter(documentname="无") # 获取所有用户日志
user_logs_data = list(user_logs.values()) # 转换为字典列表
print(user_logs_data)
return JsonResponse(user_logs_data, safe=False)
if user.access_level != 1:
user_logs = Log.objects.filter(username=user.username_up, documentname="无")
user_logs_data = list(user_logs.values()) # 转换为字典列表
print(user_logs_data)
return JsonResponse(user_logs_data, safe=False)
def get_document_logs(request):
current_user = request.session.get('username')
user = UserProfile.objects.get(username_up=current_user)
if user.access_level == 1: # 管理员权限
document_logs = Log.objects.exclude(documentname="无") # 获取所有公文操作日志
document_logs_data = list(document_logs.values()) # 转换为字典列表
print(document_logs_data)
return JsonResponse(document_logs_data, safe=False)
if user.access_level != 1:
document_logs = Log.objects.exclude(documentname="无").filter(username=user.username_up)
document_logs_data = list(document_logs.values()) # 转换为字典列表
print(document_logs_data)
return JsonResponse(document_logs_data, safe=False)
def logout(request):
current_user = request.session['username']
# 清除会话信息
if 'user_id' in request.session:
LogData = Log.objects.create(
username=current_user,
documentname="无",
operation=f'用户{current_user}{timezone.now()}退出了系统。'
)
LogData.save()
del request.session['user_id']
del request.session['username']
# 重定向到主页或登录页
return redirect('index') # 'index' 应该是你的主页 URL 名称或路径
def upload_avatar_page(request):
# 传递表单给模板,以便在页面上显示上传表单
form = AvatarUploadForm()
return render(request, 'upload_avatar.html', {'form': form})
def upload_avatar(request):
current_user = request.session.get('username') # 使用 get 方法以避免键不存在时引发 KeyError
if request.method == 'POST':
form = AvatarUploadForm(request.POST, request.FILES)
if form.is_valid():
try:
user_profile = UserProfile.objects.get(username_up=current_user)
avatar_image = form.cleaned_data['avatarFile']
image = Image.open(avatar_image)
# 将图像转换为PNG格式并覆盖原始图像文件
image = image.convert("RGB")
image.save(user_profile.avatar.path, 'JPEG')
user_profile.save()
return redirect('index') # 重定向到首页或其他页面
except UserProfile.DoesNotExist:
# 处理用户不存在的情况
pass
else:
form = AvatarUploadForm()
return render(request, 'upload_avatar.html', {'form': form})
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
1
https://gitee.com/SHIBATORI/document-transmission-system.git
git@gitee.com:SHIBATORI/document-transmission-system.git
SHIBATORI
document-transmission-system
公文传输系统
master

搜索帮助