1 Star 1 Fork 2

tube / Flask-AD

forked from 注册个名好难 / Flask-AD 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
克隆/下载
ldap_lib.py 7.98 KB
一键复制 编辑 原始数据 按行查看 历史
注册个名好难 提交于 2023-05-27 13:06 . 后端功能
# LDAP类,实现操作ad的所有方法,还有部分功能
#
from ldap3 import *
import json
class LDAP:
def __init__(self, yml):
"""
类的初始化函数,给类属性赋值,同时进行连接,避免后续每个函数执行时都需要连接
:param yml: 格式参考类属性
"""
self.host = yml.host
self.port = int(yml.port) or 636
self.username = yml.username
self.password = yml.password
self.dc = yml.base_dc
self.self_service_dc = 'OU=自主注册,' + self.dc
self.connect_secret()
# self.server = Server(self.host, get_info=ALL) # 未加密连接
# self.server = Server(self.host, use_ssl=True, get_info=ALL) # 加密连接
def connect(self):
# 未加密的连接,无法修改密码
self.server = Server(self.host, self.port, get_info=ALL)
self.conn = Connection(server=self.server, user=self.dc.split(',')[0].split('=')[-1] + '\\' + self.username,
password=self.password,
authentication=NTLM, auto_bind=True)
return self.conn.bind() # 绑定信息
def connect_secret(self):
# 加密的连接,功能齐全,目前会一直使用这个
self.server = Server(self.host, self.port, use_ssl=True, get_info=ALL)
self.conn = Connection(server=self.server, user=self.dc.split(',')[0].split('=')[-1] + '\\' + self.username,
password=self.password,
authentication=NTLM, auto_bind=True)
self.conn.start_tls()
return self.conn.bind() # 绑定信息
def get_all_users(self):
"""
获取objectclass属性名为person的所有用户,其实就是用户账号
:return: 作为返回给前端的数据接口的数据
"""
# self.connect_secret()
self.conn.search(self.dc, '(objectclass=person)')
# self.conn.search(self.dc, '(objectclass=user)') # 备选字段
data = []
# 数据格式说明示例:data = [{'dn':dn,'username':username},{'dn':dn,'username':username}]
for i in self.conn.entries:
b = i.entry_to_json()
# b实际上是str类型,并非字典类型
# 如果dn里面带OU字段,则是AD域用户,如果不带OU字段,则是本地用户。
#
# b = {
# "attributes": {},
# "dn": "CN=adsfasf,OU=\u8d22\u52a1,DC=145t,DC=com"
# }
if 'CN=Users' in b:
# 跳过Users组的用户
pass
else:
# 转换类型成字典,然后获取dn信息
dn = json.loads(b)["dn"]
# 从dn信息提取用户名
username = dn.split(',')[0].replace('CN=', '')
if 'OU=Domain Controllers' in dn:
# 跳过系统自带主机用户名
continue
data.append({'dn': dn, 'username': username})
return data
def get_all_ou(self, query_dn=''):
# self.connect_secret()
# 如果有dn信息,则只查询该OU下级的信息
# 如果没有dn信息,则默认查询所有信息
if not query_dn:
query_dn = self.dc
self.conn.search(search_base=query_dn, search_filter='(objectclass=organizationalUnit)',
attributes=ALL_ATTRIBUTES)
res = self.conn.response_to_json()
res = json.loads(res)['entries']
# 完整的DN信息,示例:'OU=会计,OU=财务,DC=145t,DC=com'
dn = []
# 去掉OU=,示例:'会计,财务,DC=145t,DC=com'
dn_name = []
for i in res:
# ou_name.append(i['attributes']['name'])
# name = i['attributes']['distinguishedName']
name = i['dn']
# if 'OU=Domain' in name:
# 跳过内置的组织
# continue
dn.append(name)
dn_name.append(name.replace('OU=', ''))
return dn, dn_name, query_dn
def create_ou(self, ou, dn=''):
self.connect_secret()
top_ou = []
for i in self.get_all_ou(dn)[1]:
top_ou.append(i[-3])
if ou in top_ou:
return 0, '部门已存在'
else:
a = self.conn.add(ou, 'organizationalUnit')
return a, '处理完成'
def delete_ou(self, dn):
self.connect_secret()
if type(dn) == 'list':
ou_dn = dn
else:
ou_dn = eval(dn)
if len(self.get_all_ou(ou_dn)[1]) > 1:
msg = '存在子部门,请先删除子部门'
else:
result = self.conn.delete(ou_dn)
if result:
msg = '删除成功'
else:
msg = '删除失败,请检查部门是否存在员工'
return msg
def create_user(self, **kwargs):
"""
创建用户
分为三步
1、添加用户名
2、设置密码
3、启用用户
:param kwargs:
:return:
"""
# self.connect_secret()
# kwargs = {'name': user_name, 'user_passwrod': user_password, 'dn': dn}
msg = []
dn = kwargs['dn']
dn.insert(0, 'CN=' + kwargs['name'])
msg_add = self.conn.add(dn, object_class=['top', 'Person', 'user', 'organizationalPerson'], attributes={
"cn": "{}".format(kwargs['name']),
"uid": "{}".format(kwargs['name']),
"sn": "{}".format(kwargs['name']),
"displayName": "{}".format(kwargs['name']),
'sAMAccountName': "{}".format(kwargs['name']),
'userPrincipalName': "{}".format(kwargs['name'])
})
if msg_add:
print('添加用户成功')
msg.append('添加用户成功')
else:
print('添加用户失败')
msg.append('添加用户失败')
return msg
# 先设置一个默认密码,创建完用户后再修改成输入的密码
default_password = '```111qqq'
msg_mp = self.conn.extend.microsoft.modify_password(dn, default_password)
if msg_mp: # 设置默认密码成功
# 修改密码为输入的密码
msg_mp1 = self.conn.extend.microsoft.modify_password(dn, kwargs['user_passwrod'])
if msg_mp1:
# 修改成功的提示
msg.append('密码设置成功')
else:
# 修改失败的提示
msg.append(f'您输入的密码复杂度不符合要求,已设置密码为:{default_password}')
else: # 设置默认密码失败
# 尝试直接设置成输入的密码
msg_mp2 = self.conn.extend.microsoft.modify_password(dn, default_password)
if msg_mp2:
# 设置成功的提示
msg.append('密码设置成功')
else:
# 设置失败的提示
msg.append(f'初始化密码失败,请联系管理员')
# 启用用户
msg_enable = self.conn.modify(dn, changes={'userAccountControl': [('MODIFY_REPLACE', [512])]})
if msg_enable:
msg.append('启用成功')
else:
msg.append('启用失败')
return msg
def delete_user(self, dn):
self.connect_secret()
return self.conn.delete(dn)
def modify_password(self, dn, password):
"""
:param dn:
:param password:
:return:
"""
# self.connect_secret()
msg_mp = self.conn.extend.microsoft.modify_password(dn, password)
if msg_mp:
msg = f'修改密码成功'
else:
msg = f'设置密码失败,请检查密码复杂度是否符合要求'
return msg
Python
1
https://gitee.com/0x0021/flask-ad.git
git@gitee.com:0x0021/flask-ad.git
0x0021
flask-ad
Flask-AD
master

搜索帮助