1 Star 1 Fork 2

tube / Flask-AD

forked from 注册个名好难 / Flask-AD 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
ad_manage_views.py 9.56 KB
一键复制 编辑 原始数据 按行查看 历史
注册个名好难 提交于 2023-05-27 13:06 . 后端功能
import time
import openpyxl
import threading
from ldap_lib import *
from function_lib import *
ad_manage_views = Blueprint('ad_manage_views', __name__)
def get_ldap():
"""
初始化ldap连接,返回连接信息
:return:
"""
try:
with app.app_context():
yml = ServerConfig.query.filter(ServerConfig.server_name == '默认服务器').first()
new_ldap1 = LDAP(yml)
return new_ldap1
except Exception as e:
print('获取yml失败', e)
return None
new_ldap = get_ldap()
def df(time1=180):
"""
:param time1: 程序执行时间间隔
:return:
"""
global new_ldap
while True:
new_ldap = get_ldap()
time.sleep(time1)
# 子线程定时刷新ldap连接
t = threading.Thread(target=df, args=(180,))
t.daemon = True
t.start()
@ad_manage_views.route('/refresh_ldap_conn', methods=DEFAULT_METHODS)
@login_required
def refresh_ldap_conn():
"""
重新初始化ldap的连接
:return: 无需返回任何信息
"""
global new_ldap
new_ldap = get_ldap()
return redirect(url_for('manage'))
@ad_manage_views.route('/batch', methods=API_METHODS)
@login_required
def batch():
"""
从前端接收批量导入的文件,然后导入文件里的用户和部门信息
:return:
"""
try:
# 接收文件
file = request.files.get("file")
# 打开文件
wb = openpyxl.load_workbook(file)
# 将上传的文件备份保存到本地
# wb.save(f".\{datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H%M%S')}批量导入数据.xlsx")
# 读取表格
sheet_user = wb['users']
msg_add_ou = [] # 保存创建部门的结果
msg_add_user = [] # 保存创建用户的结果
dic = {} # 用来保存部门对应的用户信息的字典
# 判断dc信息与服务器是否一致
if check_dn(new_ldap.dc, sheet_user.cell(3, 3).value):
return 'dc信息与服务器不符'
# 遍历表格的第3列,获取dn信息
for i in range(3, sheet_user.max_row + 1):
# dn作为键,值为空
dic[sheet_user.cell(i, 3).value] = []
# 键值为dn,轮询键值,创建dn
for k, _ in dic.items():
result = new_ldap.create_ou(k.split(','))
msg_add_ou.append([k, result])
# 遍历添加用户
for i in range(3, sheet_user.max_row + 1):
user = {'name': sheet_user.cell(i, 1).value,
'user_passwrod': sheet_user.cell(i, 2).value,
'dn': sheet_user.cell(i, 3).value.split(',')}
result = new_ldap.create_user(**user)
msg_add_user.append([user['dn'], result])
data = {'code': 1, 'msg': [msg_add_ou, msg_add_user]}
except Exception as e:
data = {'code': 2, 'msg': [e]}
return data
@ad_manage_views.route('/manage_users', methods=DEFAULT_METHODS)
@login_required
def manage_users():
"""
AD用户信息展示
:return: 返回html页面
"""
return render_template('manage_users.html')
@ad_manage_views.route('/data_users', methods=API_METHODS)
@login_required
def data_users():
"""
获取所有AD用户信息
:return: 返回操作结果以及操作成功获取到的数据
"""
try:
all_users = new_ldap.get_all_users()
data = {"code": 0, "msg": "", "count": len(all_users), "data": all_users}
except Exception as e:
data = {"code": 1, "msg": e, "count": 0, "data": [[]]}
return data
@ad_manage_views.route('/add_user', methods=API_METHODS)
@login_required
def add_user():
"""
添加AD用户
:return: 返回操作结果
"""
try:
username = request.form.get('username')
password = request.form.get('password')
dn = request.form.get('dn').split(',')
dic = {'name': username, 'user_passwrod': password, 'dn': dn}
msg = new_ldap.create_user(**dic)
data = {'code': 1, 'msg': msg}
except Exception as e:
data = {'code': 2, 'msg': e}
return data
@ad_manage_views.route('/delete_user', methods=API_METHODS)
@login_required
def delete_user():
"""
删除AD用户
:return: 返回操作结果
"""
try:
dn = request.form.get('dn').split(',')
msg = new_ldap.delete_user(dn)
if msg:
data = {'code': 1, 'msg': '删除成功'}
else:
data = {'code': 1, 'msg': '删除失败,请联系管理员'}
except Exception as e:
data = {'code': 2, 'msg': e}
return data
@ad_manage_views.route('/change_password', methods=API_METHODS)
@login_required
def change_password():
"""
修改AD用户的密码
:return: 返回操作结果
"""
try:
# 新密码
password = request.form.get('password')
# 用户完整的DN
dn = request.form.get('dn')
msg = new_ldap.modify_password(dn, password)
data = {'code': 1, 'msg': msg}
except Exception as e:
data = {'code': 2, 'msg': e}
return data
@ad_manage_views.route('/disable_user', methods=API_METHODS)
@login_required
def disable_user():
pass
@ad_manage_views.route('/manage_ou', methods=DEFAULT_METHODS)
@login_required
def manage_ou():
"""
返回html页面即可
:return:
"""
return render_template('manage_ou.html')
@ad_manage_views.route('/manage_group', methods=DEFAULT_METHODS)
@login_required
def manage_group():
"""
返回html页面即可
:return:
"""
return render_template('manage_group.html')
@ad_manage_views.route('/data_ou_bak', methods=API_METHODS)
@login_required
def data_ou_bak():
"""
废弃,等待删除
获取所有部门信息,返回操作结果以及操作成功的数据
:return:
"""
try:
# yml = ServerConfig.query.filter(ServerConfig.server_name == '默认服务器').first()
# new_ldap1 = LDAP(yml)
ee = new_ldap.get_all_ou(query_dn='')
ou_list = []
for i in range(len(ee[0])):
e = ee[1][i]
name = e.split(',')[0]
name_dn = ee[0][i]
parent_id = str(e.split(',')[1:]).replace('[', '').replace(']', '').replace('\'', '').replace(' ', '')
ou_list.append({'dn': e, 'parent_id': parent_id, 'name': name, 'name_dn': name_dn})
tree = list_to_tree(ou_list, ee[2])
data = {"code": 0, "msg": "success", "count": 1, "data": [tree]}
except Exception as e:
data = {"code": 0, "msg": e, "count": 1, "data": [{}]}
return data
@ad_manage_views.route('/data_ou', methods=API_METHODS)
@login_required
def data_ou():
"""
获取所有部门信息,返回操作结果以及操作成功的数据
:return:
"""
try:
# yml = ServerConfig.query.filter(ServerConfig.server_name == '默认服务器').first()
# new_ldap1 = LDAP(yml)
ee = new_ldap.get_all_ou(query_dn='')
ou_list = []
for i in range(len(ee[0])):
e = ee[1][i]
title = e.split(',')[0]
name_dn = ee[0][i]
parent_id = str(e.split(',')[1:]).replace('[', '').replace(']', '').replace('\'', '').replace(' ', '')
ou_list.append(
{'id': i + 1, 'dn': e, 'parent_id': parent_id, 'title': title, 'name': title, 'name_dn': name_dn})
tree = list_to_tree(ou_list, ee[2])
data = {"code": 0, "msg": "success", "count": 1, "data": [tree]}
except Exception as e:
data = {"code": 0, "msg": e, "count": 1, "data": [{}]}
return data
@ad_manage_views.route('/add_ou', methods=API_METHODS)
@login_required
def add_ou():
"""
添加部门,返回操作结果
:return:
"""
try:
# 从前端获取数据
ou = request.form.get('ou_name')
dn = request.form.get('keywords')
if not dn:
ou_dn = new_ldap.dc.split(',')
else:
if type(dn) == 'list':
ou_dn = dn
else:
ou_dn = dn.split(',')
ou_dn.insert(0, 'OU=' + ou)
msg = new_ldap.create_ou(ou_dn, dn)
data = {'code': 1, 'msg': msg}
except Exception as e:
# 获取失败
data = {'code': 2, 'msg': e}
return data
@ad_manage_views.route('/delete_ou', methods=API_METHODS)
@login_required
def delete_ou():
"""
删除部门,返回操作结果
:return:
"""
try:
# 数据格式的问题,需要处理
dn = str(request.form.get('keywords').split(','))
msg = new_ldap.delete_ou(dn)
data = {'code': 1, 'msg': msg}
except Exception as e:
data = {'code': 2, 'msg': e}
return data
@ad_manage_views.route('/modify_ou', methods=API_METHODS)
@login_required
def modify_ou():
try:
# 完整的dn,包括用户名cn
old_ou = request.form.get('old_ou')
# 提取用户名cn
cn = old_ou.split(',')[0]
# 新的ou,无需cn
new_ou = request.form.get('new_ou')
'''
CN = 'sdfjsadkfj3'
old_dn ='CN=sdfjsadkfj3,OU=自主注册,DC=145t,DC=com'
new_dn = 'OU=人事,DC=145t,DC=com'
'''
res = new_ldap.conn.modify_dn(dn=old_ou, relative_dn=cn, new_superior=new_ou)
data = {'code': 1, 'msg': res}
except Exception as e:
data = {'code': 2, 'msg': e}
return data
Python
1
https://gitee.com/0x0021/flask-ad.git
git@gitee.com:0x0021/flask-ad.git
0x0021
flask-ad
Flask-AD
master

搜索帮助