域账号后台管理

  • 主要用于用户、共享目录、邮箱组权限等日常管理
  • 基于pyad模块(https://github.com/zakird/pyad)
  • 支持邮件通知及批量操作
  • LDAP登录(仅限CN Infrastructure Team和CN Development Team成员使用域账号登录后台)
  • 后台地址:http://xx.xx.genewiz.com/admin
  • 部署于10.105.xx.xx,应用对AD Server进行操作!
  • Windows下的Apache无法通过daemon模式运行多个wsgi应用,故以Django dev模式运行,再通过Apache转发!(或者在另外一台Windows服务器上单独部署)
  • \\GWNGFS\xx\Dev\AdminSet
  • 用户的全局管理尚未收敛至统一管理,为确保用户信息的准确性,对用户进行任何变更前,请先更新用户信息!
账号管理

初次添加
upload successful
upload successful

点击保存按钮后,将会自动更新用户及用户属组信息!

已存在
upload successful

如果账号已添加过,可以勾选用户,更新用户最新的用户信息,支持批量!

def update_user(self, user):
    """Refresh one user's local profile from AD and sync its group links.

    The account is looked up in Active Directory by ``user``; its
    identifiers, timestamps and account flags are copied onto the
    ``UserProfile`` row keyed by ``username=user`` (created if missing).
    Every group returned by ``get_memberOfs()`` is then upserted as a
    ``GroupProfile`` row, and ``userobj.groups_of`` is adjusted so that it
    contains exactly those groups.

    :param user: account name passed to ``adsearch.by_san`` and used as the
        ``UserProfile.username`` lookup key.
    :returns: ``None``. If the group list cannot be read from AD, the user
        row is still saved, the error is logged and the group sync is
        skipped.
    """
    # Local import: this block cannot see (or edit) the file's import section.
    import logging

    dn = adsearch.by_san(user)
    user_from_ad = aduser.ADUser.from_dn(dn)
    guid = user_from_ad.guid_str
    # Keep only the part of the SID's string form after the last ":".
    sid = str(user_from_ad.sid).split(":")[-1]
    san = user_from_ad.get_attribute('SamAccountName', False)
    # Original author's note: a user without an email raises AttributeError.
    try:
        email = user_from_ad.get_attribute('mail', False).lower()
    except AttributeError:
        email = None
    create_time = parse(user_from_ad.get_attribute('whenCreated', False).Format())
    # Original author's note: a user who never changed the password raises
    # ValueError.
    try:
        pwd_last_set_time = user_from_ad.get_password_last_set()
    except ValueError:
        pwd_last_set_time = None
    # Original author's note: a user who never logged in raises
    # AttributeError.
    try:
        last_login_time = user_from_ad.get_last_login()
    except AttributeError:
        last_login_time = None
    uac = user_from_ad.get_user_account_control_settings()
    is_enabled = not uac['ACCOUNTDISABLE']
    is_locked_out = uac['LOCKOUT']
    # An unset dial-in attribute (None) is stored as False.
    is_dial_in = user_from_ad.get_attribute('msNPAllowDialin', False)
    if is_dial_in is None:
        is_dial_in = False

    userobj, _ = UserProfile.objects.get_or_create(username=user)
    userobj.dn = dn
    userobj.guid = guid
    userobj.sid = sid
    userobj.san = san
    userobj.email = email
    userobj.pwd_last_set_time = pwd_last_set_time
    userobj.create_time = create_time
    userobj.last_login_time = last_login_time
    userobj.is_enabled = is_enabled
    userobj.is_locked_out = is_locked_out
    userobj.is_dial_in = is_dial_in
    userobj.save()

    try:
        groups = user_from_ad.get_memberOfs()
    except Exception:
        # Best effort: the profile saved above stays; only the membership
        # sync is skipped. Log it so the skip is not invisible.
        logging.getLogger(__name__).exception(
            "could not read group memberships of %s from AD; "
            "group sync skipped", user)
        return

    # Upsert a GroupProfile per AD group, remembering the saved rows by
    # sAMAccountName so the membership sync below needs no further queries.
    ad_groups = {}
    for group in groups:
        # Read every attribute before touching the database, so a failing
        # AD read cannot leave a half-initialised row behind.
        group_san = group.get_attribute('SamAccountName', False)
        group_guid = group.guid_str
        group_sid = str(group.sid).split(":")[-1]
        group_type = group.get_group_type()
        group_scope = group.get_group_scope()
        group_dn = group.dn
        group_created = parse(group.get_attribute('whenCreated', False).Format())

        groupobj, _ = GroupProfile.objects.get_or_create(san=group_san)
        groupobj.san = group_san
        groupobj.guid = group_guid
        groupobj.sid = group_sid
        groupobj.gc = group_type
        groupobj.gs = group_scope
        groupobj.dn = group_dn
        groupobj.create_time = group_created
        groupobj.save()
        ad_groups[group_san] = groupobj

    db_groups = dict((g.san, g) for g in userobj.groups_of.all())

    # Drop links that exist only locally, then add links that exist only in
    # AD. With no overlap this removes every old link and adds every AD
    # group, so no separate "clear everything" branch is needed.
    for group_san, groupobj in db_groups.items():
        if group_san not in ad_groups:
            userobj.groups_of.remove(groupobj)
    for group_san, groupobj in ad_groups.items():
        if group_san not in db_groups:
            userobj.groups_of.add(groupobj)

初次添加

upload successful

upload successful

点击保存按钮后,将会自动更新用户及用户属组信息!

已存在

upload successful

如果账号已添加过,可以勾选用户,更新用户最新的用户信息,支持批量!

def update_group(self, group):
    """Create or refresh the local ``GroupProfile`` row for one AD group.

    :param group: group name passed to ``adsearch.by_san`` and used as the
        ``GroupProfile.san`` lookup key.
    :returns: ``None``; the row is saved as a side effect.
    """
    group_from_ad = adgroup.ADGroup.from_dn(adsearch.by_san(group))

    # Read everything from AD first so that a failing read cannot leave a
    # freshly created, half-filled row behind.
    values = (
        ('san', group_from_ad.get_attribute('SamAccountName', False)),
        ('guid', group_from_ad.guid_str),
        # Keep only the part of the SID's string form after the last ":".
        ('sid', str(group_from_ad.sid).split(":")[-1]),
        ('gc', group_from_ad.get_group_type()),
        ('gs', group_from_ad.get_group_scope()),
        ('dn', group_from_ad.dn),
        ('create_time',
         parse(group_from_ad.get_attribute('whenCreated', False).Format())),
    )

    groupobj, _ = GroupProfile.objects.get_or_create(san=group)
    for field, value in values:
        setattr(groupobj, field, value)
    groupobj.save()

upload successful

def enabled_user(self, obj_list):
    """Enable the AD account behind each object in ``obj_list``.

    :param obj_list: iterable of objects exposing a ``dn`` attribute that is
        handed to ``aduser.ADUser.from_dn``.
    :returns: dict with two lists of the input objects,
        ``'enabled_success'`` and ``'enabled_failer'``.
    """
    res = {
        'enabled_success': [],
        'enabled_failer': [],
    }
    for obj in obj_list:
        # The lookup is inside the try on purpose: one stale or missing DN
        # is reported as a failure instead of aborting the whole batch.
        try:
            aduser.ADUser.from_dn(obj.dn).enable()
        except Exception:
            res['enabled_failer'].append(obj)
        else:
            res['enabled_success'].append(obj)
    return res
def disabled_user(self, obj_list):
    """Disable the AD account behind each object in ``obj_list``.

    :param obj_list: iterable of objects exposing a ``dn`` attribute that is
        handed to ``aduser.ADUser.from_dn``.
    :returns: dict with two lists of the input objects,
        ``'disabled_success'`` and ``'disabled_failer'``.
    """
    res = {
        'disabled_success': [],
        'disabled_failer': [],
    }
    for obj in obj_list:
        # The lookup is inside the try on purpose: one stale or missing DN
        # is reported as a failure instead of aborting the whole batch.
        try:
            aduser.ADUser.from_dn(obj.dn).disable()
        except Exception:
            res['disabled_failer'].append(obj)
        else:
            res['disabled_success'].append(obj)
    return res
def disabled_user_lizhi(self, obj_list):
    """Disable departed employees' accounts and move them to the disabled OU.

    Each account is disabled and then moved into
    ``OU=China,OU=DisabledAccounts``.

    :param obj_list: iterable of objects exposing a ``dn`` attribute that is
        handed to ``aduser.ADUser.from_dn``.
    :returns: dict with two lists of the input objects,
        ``'disabled_success'`` and ``'disabled_failer'``. An account whose
        ``disable()`` succeeded but whose ``move()`` failed is listed under
        ``'disabled_failer'`` even though it is already disabled.
    """
    res = {
        'disabled_success': [],
        'disabled_failer': [],
    }
    for obj in obj_list:
        # Both lookups are inside the try on purpose: one stale DN is
        # reported as a failure instead of aborting the whole batch.
        try:
            user_from_ad = aduser.ADUser.from_dn(obj.dn)
            ou = adcontainer.ADContainer.from_dn(
                "OU=China,OU=DisabledAccounts,DC=local,DC=genewiz,DC=com")
            user_from_ad.disable()
            # NOTE (original author): the move needs a global AD controller,
            # so the Suzhou domain controller must be used, e.g.
            #   pyad.set_defaults(ldap_server="10.105.21.20",
            #                     username=..., password=...)
            user_from_ad.move(ou)
        except Exception:
            res['disabled_failer'].append(obj)
        else:
            res['disabled_success'].append(obj)
    return res
def unlock_user(self, obj_list):
    """Unlock the AD account behind each object in ``obj_list``.

    Unlocking is done by writing ``0`` to the account's ``lockoutTime``
    attribute.

    :param obj_list: iterable of objects exposing a ``dn`` attribute that is
        handed to ``aduser.ADUser.from_dn``.
    :returns: dict with two lists of the input objects,
        ``'unlock_success'`` and ``'unlock_failer'``.
    """
    res = {
        'unlock_success': [],
        'unlock_failer': [],
    }
    for obj in obj_list:
        # The lookup is inside the try on purpose: one stale or missing DN
        # is reported as a failure instead of aborting the whole batch.
        try:
            aduser.ADUser.from_dn(obj.dn).update_attribute('lockoutTime', 0)
        except Exception:
            res['unlock_failer'].append(obj)
        else:
            res['unlock_success'].append(obj)
    return res

upload successful

def enabled_vpn(self, obj_list):
    """Enable VPN (dial-in) access for each object in ``obj_list``.

    Writes ``True`` to the account's ``msNPAllowDialin`` attribute. Per the
    original author's note the attribute is ``True`` when enabled, ``False``
    when disabled and ``None`` when not set.

    :param obj_list: iterable of objects exposing a ``dn`` attribute that is
        handed to ``aduser.ADUser.from_dn``.
    :returns: dict with two lists of the input objects,
        ``'enabled_success'`` and ``'enabled_failer'``.
    """
    res = {
        'enabled_success': [],
        'enabled_failer': [],
    }
    for obj in obj_list:
        # The lookup is inside the try on purpose: one stale or missing DN
        # is reported as a failure instead of aborting the whole batch.
        try:
            aduser.ADUser.from_dn(obj.dn).update_attribute('msNPAllowDialin', True)
        except Exception:
            res['enabled_failer'].append(obj)
        else:
            res['enabled_success'].append(obj)
    return res
def disabled_vpn(self, obj_list):
    """Disable VPN (dial-in) access for each object in ``obj_list``.

    Writes ``False`` to the account's ``msNPAllowDialin`` attribute. Per the
    original author's note the attribute is ``True`` when enabled, ``False``
    when disabled and ``None`` when not set.

    :param obj_list: iterable of objects exposing a ``dn`` attribute that is
        handed to ``aduser.ADUser.from_dn``.
    :returns: dict with two lists of the input objects,
        ``'disabled_success'`` and ``'disabled_failer'``.
    """
    res = {
        'disabled_success': [],
        'disabled_failer': [],
    }
    for obj in obj_list:
        # The lookup is inside the try on purpose: one stale or missing DN
        # is reported as a failure instead of aborting the whole batch.
        try:
            aduser.ADUser.from_dn(obj.dn).update_attribute('msNPAllowDialin', False)
        except Exception:
            res['disabled_failer'].append(obj)
        else:
            res['disabled_success'].append(obj)
    return res
def reset_password(self, obj_list, init_pwd):
    """Reset the password of the AD account behind each object in ``obj_list``.

    :param obj_list: iterable of objects exposing a ``dn`` attribute that is
        handed to ``aduser.ADUser.from_dn``.
    :param init_pwd: new password passed to ``set_password`` for every
        account.
    :returns: dict with two lists of the input objects,
        ``'reset_success'`` and ``'reset_failer'``.
    """
    res = {
        'reset_success': [],
        'reset_failer': [],
    }
    for obj in obj_list:
        # The lookup is inside the try on purpose: one stale or missing DN
        # is reported as a failure instead of aborting the whole batch.
        try:
            aduser.ADUser.from_dn(obj.dn).set_password(init_pwd)
        except Exception:
            res['reset_failer'].append(obj)
        else:
            res['reset_success'].append(obj)
    return res

upload successful

def reset_password_login_modify(self, obj_list, init_pwd):
    """Reset each account's password and require a change at next login.

    Original author's note: if the account's password is set to never
    expire, the change-at-login requirement does not take effect.

    :param obj_list: iterable of objects exposing a ``dn`` attribute that is
        handed to ``aduser.ADUser.from_dn``.
    :param init_pwd: new password passed to ``set_password`` for every
        account.
    :returns: dict with two lists of the input objects,
        ``'reset_success'`` and ``'reset_failer'``. An account whose
        password was set but whose ``force_pwd_change_on_login()`` failed is
        listed under ``'reset_failer'``.
    """
    res = {
        'reset_success': [],
        'reset_failer': [],
    }
    for obj in obj_list:
        # The lookup is inside the try on purpose: one stale or missing DN
        # is reported as a failure instead of aborting the whole batch.
        try:
            user_from_ad = aduser.ADUser.from_dn(obj.dn)
            user_from_ad.set_password(init_pwd)
            user_from_ad.force_pwd_change_on_login()
        except Exception:
            res['reset_failer'].append(obj)
        else:
            res['reset_success'].append(obj)
    return res
权限管理

分别新增共享目录(二级目录)只读及读写的信息!

upload successful
upload successful
upload successful
upload successful
upload successful

def add_members_to_smb(self, dn, smb_folders):
    """Add one user to the AD group attached to each shared folder.

    :param dn: distinguished name of the user, handed to
        ``aduser.ADUser.from_dn``.
    :param smb_folders: iterable of objects whose ``group.dn`` is handed to
        ``adgroup.ADGroup.from_dn``.
    :returns: dict with two lists of the given folder objects:
        ``'user_to_group_new'`` for folders whose group the user was just
        added to, and ``'user_to_group_added'`` for folders whose group
        already contained the user (nothing changed).
    """
    member = aduser.ADUser.from_dn(dn)
    res = {
        'user_to_group_added': [],
        'user_to_group_new': [],
    }
    for smb_folder in smb_folders:
        group_object = adgroup.ADGroup.from_dn(smb_folder.group.dn)
        if member.is_member_of(group_object):
            # Already a member: report it, change nothing.
            res['user_to_group_added'].append(smb_folder)
            continue
        group_object.add_members(member)
        res['user_to_group_new'].append(smb_folder)
    return res

def del_members_from_smb(self, dn, smb_folders):
    """Remove one user from the AD group attached to each shared folder.

    :param dn: distinguished name of the user, handed to
        ``aduser.ADUser.from_dn``.
    :param smb_folders: iterable of objects whose ``group.dn`` is handed to
        ``adgroup.ADGroup.from_dn``.
    :returns: dict with two lists of the given folder objects:
        ``'user_to_group_new'`` for folders whose group the user was just
        removed from, and ``'user_to_group_deleted'`` for folders whose
        group did not contain the user (nothing changed).
    """
    member = aduser.ADUser.from_dn(dn)
    res = {
        'user_to_group_deleted': [],
        'user_to_group_new': [],
    }
    for smb_folder in smb_folders:
        group_object = adgroup.ADGroup.from_dn(smb_folder.group.dn)
        if not member.is_member_of(group_object):
            # Not a member: report it, change nothing.
            res['user_to_group_deleted'].append(smb_folder)
            continue
        group_object.remove_members(member)
        res['user_to_group_new'].append(smb_folder)
    return res

upload successful
upload successful

def add_members_to_eg(self, dn, enabled_egs):
    """Add one user to each of the given AD groups.

    :param dn: distinguished name of the user, handed to
        ``aduser.ADUser.from_dn``.
    :param enabled_egs: iterable of objects whose ``dn`` is handed to
        ``adgroup.ADGroup.from_dn``.
    :returns: dict with two lists of the given group objects:
        ``'user_to_group_new'`` for groups the user was just added to, and
        ``'user_to_group_added'`` for groups that already contained the
        user (nothing changed).
    """
    member = aduser.ADUser.from_dn(dn)
    res = {
        'user_to_group_added': [],
        'user_to_group_new': [],
    }
    for enabled_eg in enabled_egs:
        group_object = adgroup.ADGroup.from_dn(enabled_eg.dn)
        if member.is_member_of(group_object):
            # Already a member: report it, change nothing.
            res['user_to_group_added'].append(enabled_eg)
            continue
        group_object.add_members(member)
        res['user_to_group_new'].append(enabled_eg)
    return res

def del_members_from_eg(self, dn, disabled_egs):
    """Remove one user from each of the given AD groups.

    :param dn: distinguished name of the user, handed to
        ``aduser.ADUser.from_dn``.
    :param disabled_egs: iterable of objects whose ``dn`` is handed to
        ``adgroup.ADGroup.from_dn``.
    :returns: dict with two lists of the given group objects:
        ``'user_to_group_new'`` for groups the user was just removed from,
        and ``'user_to_group_deleted'`` for groups that did not contain the
        user (nothing changed).
    """
    member = aduser.ADUser.from_dn(dn)
    res = {
        'user_to_group_deleted': [],
        'user_to_group_new': [],
    }
    for disabled_eg in disabled_egs:
        group_object = adgroup.ADGroup.from_dn(disabled_eg.dn)
        if not member.is_member_of(group_object):
            # Not a member: report it, change nothing.
            res['user_to_group_deleted'].append(disabled_eg)
            continue
        group_object.remove_members(member)
        res['user_to_group_new'].append(disabled_eg)
    return res
通讯录管理
0%