-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathrealm.py
More file actions
201 lines (161 loc) · 5.89 KB
/
realm.py
File metadata and controls
201 lines (161 loc) · 5.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
import os, sys, subprocess, configparser
from linuxmusterLinuxclient7 import logging, computer
def join(domain, user):
"""
Joins the computer to an AD
:param domain: The domain to join
:type domain: str
:param user: The admin user used for joining
:type user: str
:return: True on success, False otherwise
:rtype: bool
"""
# join the domain using the kerberos ticket
joinCommand = ["realm", "join", "-v", domain, "-U", user]
if subprocess.call(joinCommand) != 0:
print()
logging.error('Failed! Did you enter the correct password?')
return False
logging.info("It looks like the domain was joined successfully.")
return True
def leave(domain):
"""
Leave a domain
:param domain: The domain to leave
:type domain: str
:return: True on success, False otherwise
:rtype: bool
"""
leaveCommand = ["realm", "leave", domain]
return subprocess.call(leaveCommand) == 0
def leaveAll():
"""
Leaves all joined domains
:return: True on success, False otherwise
:rtype: bool
"""
logging.info("Cleaning / leaving all domain joins")
rc, joinedDomains = getJoinedDomains()
if not rc:
return False
for joinedDomain in joinedDomains:
logging.info("* {}".format(joinedDomain))
if not leave(joinedDomain):
logging.error("-> Failed! Aborting!")
return False
logging.info("-> Done!")
return True
def isJoined():
"""
Checks if the computer is joined to a domain
:return: True if it is joined to one or more domains, False otherwise
:rtype: bool
"""
rc, joinedDomains = getJoinedDomains()
if not rc:
return False
else:
return len(joinedDomains) > 0
def pullKerberosTicketForComputerAccount():
"""
Pulls a kerberos ticket using the computer account from `/etc/krb5.keytab`
:return: True on success, False otherwise
:rtype: bool
"""
return subprocess.call(["kinit", "-k", computer.krbHostName()]) == 0
def verifyDomainJoin():
"""
Checks if the domain join actually works.
:return: True if it does, False otherwise
:rtype: bool
"""
logging.info("Testing if the domain join actually works")
if not isJoined():
logging.error("No domain is joined!")
return False
logging.info("* Checking if the group \"domain users\" exists")
if subprocess.call(["getent", "group", "domain users"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) != 0:
logging.error("The \"domain users\" group does not exists! Users wont be able to log in!")
logging.error("This is sometimes related to /etc/nsswitch.conf.")
return False
# Try to get a kerberos ticket for the computer account
logging.info("* Trying to get a kerberos ticket for the Computer Account")
if not pullKerberosTicketForComputerAccount():
logging.error("Could not get a kerberos ticket for the Computer Account!")
logging.error("Logins of non-cached users WILL NOT WORK!")
logging.error("Please try to re-join the Domain.")
return False
logging.info("The domain join is working!")
return True
def getJoinedDomains():
"""
Returns all joined domains
:return: Tuple (success, list of joined domians)
:rtype: tuple
"""
result = subprocess.run("realm list --name-only", stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, shell=True)
if result.returncode != 0:
logging.error("Failed to read domain joins!")
return False, None
return True, list(filter(None, result.stdout.split("\n")))
def discoverDomains(domain=None):
"""
Searches for avialable domains on the current network
:param domain: The domain to discover (optional)
:type domain: str
:return: Tuple (success, list of available domains)
:rtype: tuple
"""
command = ["realm", "discover", "--name-only"]
if domain != None:
command.append(domain)
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
if result.returncode != 0:
logging.error("Failed to discover available domains!")
return False, None
return True, list(filter(None, result.stdout.split("\n")))
def getDomainConfig(domain):
"""
Looks up all relevant properties of a domain:
- domain controller IP
- domain name
:param domain: The domain to check
:type domain: str
:return: Tuple (success, dict with domain config)
:rtype: tuple
"""
result = subprocess.run("adcli info '{}'".format(domain), stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, shell=True)
if result.returncode != 0:
logging.error("Failed to get details of domain {}!".format(domain))
return False, None
rawConfig = _readConfigFromString(result.stdout)
try:
rawDomainConfig = rawConfig["domain"]
except KeyError:
logging.error("Error when reading domain details")
return False, None
domainConfig = {}
try:
domainConfig["domain-controller"] = rawDomainConfig["domain-controller"]
domainConfig["domain-name"] = rawDomainConfig["domain-name"]
except KeyError:
logging.error("Error when reading domain details (2)")
return False, None
return True, domainConfig
def clearUserCache():
"""
Clears the local user cache
:return: True on success, False otherwise
:rtype: bool
"""
# clean sssd cache
logging.info("Cleaning sssd cache.")
subprocess.call(["sssctl", "cache-remove", "--stop", "--start", "--override"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return True
# --------------------
# - Helper functions -
# --------------------
def _readConfigFromString(string):
configParser = configparser.ConfigParser()
configParser.read_string(string)
return configParser