1 # Tests for SamDb password change audit logging.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2018
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from __future__ import print_function
19 """Tests for the SamDb logging of password changes.
23 from samba.dcerpc.messaging import MSG_DSDB_LOG, DSDB_EVENT_NAME
24 from ldb import ERR_NO_SUCH_OBJECT
25 from samba.samdb import SamDB
26 from samba.auth import system_session
29 from samba.tests.audit_log_base import AuditLogTestBase
30 from samba.tests import delete_force
31 from samba.net import Net
33 from samba.dcerpc import security, lsa
35 USER_NAME = "auditlogtestuser"
36 USER_PASS = samba.generate_random_password(32, 32)
39 class AuditLogDsdbTests(AuditLogTestBase):
42 self.message_type = MSG_DSDB_LOG
43 self.event_type = DSDB_EVENT_NAME
44 super(AuditLogDsdbTests, self).setUp()
46 self.remoteAddress = os.environ["CLIENT_IP"]
47 self.server_ip = os.environ["SERVER_IP"]
49 host = "ldap://%s" % os.environ["SERVER"]
50 self.ldb = SamDB(url=host,
51 session_info=system_session(),
52 credentials=self.get_credentials(),
53 lp=self.get_loadparm())
54 self.server = os.environ["SERVER"]
56 # Gets back the basedn
57 self.base_dn = self.ldb.domain_dn()
59 # Get the old "dSHeuristics" if it was set
60 dsheuristics = self.ldb.get_dsheuristics()
62 # Set the "dSHeuristics" to activate the correct "userPassword"
64 self.ldb.set_dsheuristics("000000001")
66 # Reset the "dSHeuristics" as they were before
67 self.addCleanup(self.ldb.set_dsheuristics, dsheuristics)
69 # Get the old "minPwdAge"
70 minPwdAge = self.ldb.get_minPwdAge()
72 # Set it temporarily to "0"
73 self.ldb.set_minPwdAge("0")
74 self.base_dn = self.ldb.domain_dn()
76 # Reset the "minPwdAge" as it was before
77 self.addCleanup(self.ldb.set_minPwdAge, minPwdAge)
79 # (Re)adds the test user USER_NAME with password USER_PASS
80 delete_force(self.ldb, "cn=" + USER_NAME + ",cn=users," + self.base_dn)
82 "dn": "cn=" + USER_NAME + ",cn=users," + self.base_dn,
83 "objectclass": "user",
84 "sAMAccountName": USER_NAME,
85 "userPassword": USER_PASS
89 # Discard the messages from the setup code
91 def discardSetupMessages(self, dn):
92 self.waitForMessages(2, dn=dn)
93 self.discardMessages()
96 self.discardMessages()
97 super(AuditLogDsdbTests, self).tearDown()
99 def haveExpectedTxn(self, expected):
100 if self.context["txnMessage"] is not None:
101 txn = self.context["txnMessage"]["dsdbTransaction"]
102 if txn["transactionId"] == expected:
106 def waitForTransaction(self, expected, connection=None):
107 """Wait for a transaction message to arrive
108 The connection is passed through to keep the connection alive
109 until all the logging messages have been received.
112 self.connection = connection
114 start_time = time.time()
115 while not self.haveExpectedTxn(expected):
116 self.msg_ctx.loop_once(0.1)
117 if time.time() - start_time > 1:
118 self.connection = None
121 self.connection = None
122 return self.context["txnMessage"]
124 def test_net_change_password(self):
126 dn = "CN=" + USER_NAME + ",CN=Users," + self.base_dn
127 self.discardSetupMessages(dn)
129 creds = self.insta_creds(template=self.get_credentials())
131 lp = self.get_loadparm()
132 net = Net(creds, lp, server=self.server)
133 password = "newPassword!!42"
135 net.change_password(newpassword=password.encode('utf-8'),
137 oldpassword=USER_PASS)
139 messages = self.waitForMessages(1, net, dn=dn)
140 print("Received %d messages" % len(messages))
143 "Did not receive the expected number of messages")
145 audit = messages[0]["dsdbChange"]
146 self.assertEquals("Modify", audit["operation"])
147 self.assertFalse(audit["performedAsSystem"])
148 self.assertTrue(dn.lower(), audit["dn"].lower())
149 self.assertRegexpMatches(audit["remoteAddress"],
151 session_id = self.get_session()
152 self.assertEquals(session_id, audit["sessionId"])
153 # We skip the check for self.get_service_description() as this
154 # is subject to a race between smbd and the s4 rpc_server code
155 # as to which will set the description as it is DCE/RPC over SMB
157 self.assertTrue(self.is_guid(audit["transactionId"]))
159 attributes = audit["attributes"]
160 self.assertEquals(1, len(attributes))
161 actions = attributes["clearTextPassword"]["actions"]
162 self.assertEquals(1, len(actions))
163 self.assertTrue(actions[0]["redacted"])
164 self.assertEquals("replace", actions[0]["action"])
166 def test_net_set_password(self):
168 dn = "CN=" + USER_NAME + ",CN=Users," + self.base_dn
169 self.discardSetupMessages(dn)
171 creds = self.insta_creds(template=self.get_credentials())
173 lp = self.get_loadparm()
174 net = Net(creds, lp, server=self.server)
175 password = "newPassword!!42"
176 domain = lp.get("workgroup")
178 net.set_password(newpassword=password.encode('utf-8'),
179 account_name=USER_NAME,
181 messages = self.waitForMessages(1, net, dn=dn)
182 print("Received %d messages" % len(messages))
185 "Did not receive the expected number of messages")
186 audit = messages[0]["dsdbChange"]
187 self.assertEquals("Modify", audit["operation"])
188 self.assertFalse(audit["performedAsSystem"])
189 self.assertEquals(dn, audit["dn"])
190 self.assertRegexpMatches(audit["remoteAddress"],
192 session_id = self.get_session()
193 self.assertEquals(session_id, audit["sessionId"])
194 # We skip the check for self.get_service_description() as this
195 # is subject to a race between smbd and the s4 rpc_server code
196 # as to which will set the description as it is DCE/RPC over SMB
198 self.assertTrue(self.is_guid(audit["transactionId"]))
200 attributes = audit["attributes"]
201 self.assertEquals(1, len(attributes))
202 actions = attributes["clearTextPassword"]["actions"]
203 self.assertEquals(1, len(actions))
204 self.assertTrue(actions[0]["redacted"])
205 self.assertEquals("replace", actions[0]["action"])
207 def test_ldap_change_password(self):
209 dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
210 self.discardSetupMessages(dn)
212 new_password = samba.generate_random_password(32, 32)
213 dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
214 self.ldb.modify_ldif(
216 "changetype: modify\n" +
217 "delete: userPassword\n" +
218 "userPassword: " + USER_PASS + "\n" +
219 "add: userPassword\n" +
220 "userPassword: " + new_password + "\n")
222 messages = self.waitForMessages(1)
223 print("Received %d messages" % len(messages))
226 "Did not receive the expected number of messages")
228 audit = messages[0]["dsdbChange"]
229 self.assertEquals("Modify", audit["operation"])
230 self.assertFalse(audit["performedAsSystem"])
231 self.assertEquals(dn, audit["dn"])
232 self.assertRegexpMatches(audit["remoteAddress"],
234 self.assertTrue(self.is_guid(audit["sessionId"]))
235 session_id = self.get_session()
236 self.assertEquals(session_id, audit["sessionId"])
237 service_description = self.get_service_description()
238 self.assertEquals(service_description, "LDAP")
240 attributes = audit["attributes"]
241 self.assertEquals(1, len(attributes))
242 actions = attributes["userPassword"]["actions"]
243 self.assertEquals(2, len(actions))
244 self.assertTrue(actions[0]["redacted"])
245 self.assertEquals("delete", actions[0]["action"])
246 self.assertTrue(actions[1]["redacted"])
247 self.assertEquals("add", actions[1]["action"])
249 def test_ldap_replace_password(self):
251 dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
252 self.discardSetupMessages(dn)
254 new_password = samba.generate_random_password(32, 32)
255 self.ldb.modify_ldif(
257 "changetype: modify\n" +
258 "replace: userPassword\n" +
259 "userPassword: " + new_password + "\n")
261 messages = self.waitForMessages(1, dn=dn)
262 print("Received %d messages" % len(messages))
265 "Did not receive the expected number of messages")
267 audit = messages[0]["dsdbChange"]
268 self.assertEquals("Modify", audit["operation"])
269 self.assertFalse(audit["performedAsSystem"])
270 self.assertTrue(dn.lower(), audit["dn"].lower())
271 self.assertRegexpMatches(audit["remoteAddress"],
273 self.assertTrue(self.is_guid(audit["sessionId"]))
274 session_id = self.get_session()
275 self.assertEquals(session_id, audit["sessionId"])
276 service_description = self.get_service_description()
277 self.assertEquals(service_description, "LDAP")
278 self.assertTrue(self.is_guid(audit["transactionId"]))
280 attributes = audit["attributes"]
281 self.assertEquals(1, len(attributes))
282 actions = attributes["userPassword"]["actions"]
283 self.assertEquals(1, len(actions))
284 self.assertTrue(actions[0]["redacted"])
285 self.assertEquals("replace", actions[0]["action"])
287 def test_ldap_add_user(self):
289 # The setup code adds a user, so we check for the dsdb events
291 dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
292 messages = self.waitForMessages(2, dn=dn)
293 print("Received %d messages" % len(messages))
296 "Did not receive the expected number of messages")
298 audit = messages[1]["dsdbChange"]
299 self.assertEquals("Add", audit["operation"])
300 self.assertFalse(audit["performedAsSystem"])
301 self.assertEquals(dn, audit["dn"])
302 self.assertRegexpMatches(audit["remoteAddress"],
304 session_id = self.get_session()
305 self.assertEquals(session_id, audit["sessionId"])
306 service_description = self.get_service_description()
307 self.assertEquals(service_description, "LDAP")
308 self.assertTrue(self.is_guid(audit["sessionId"]))
309 self.assertTrue(self.is_guid(audit["transactionId"]))
311 attributes = audit["attributes"]
312 self.assertEquals(3, len(attributes))
314 actions = attributes["objectclass"]["actions"]
315 self.assertEquals(1, len(actions))
316 self.assertEquals("add", actions[0]["action"])
317 self.assertEquals(1, len(actions[0]["values"]))
318 self.assertEquals("user", actions[0]["values"][0]["value"])
320 actions = attributes["sAMAccountName"]["actions"]
321 self.assertEquals(1, len(actions))
322 self.assertEquals("add", actions[0]["action"])
323 self.assertEquals(1, len(actions[0]["values"]))
324 self.assertEquals(USER_NAME, actions[0]["values"][0]["value"])
326 actions = attributes["userPassword"]["actions"]
327 self.assertEquals(1, len(actions))
328 self.assertEquals("add", actions[0]["action"])
329 self.assertTrue(actions[0]["redacted"])
331 def test_samdb_delete_user(self):
333 dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
334 self.discardSetupMessages(dn)
336 self.ldb.deleteuser(USER_NAME)
338 messages = self.waitForMessages(1, dn=dn)
339 print("Received %d messages" % len(messages))
342 "Did not receive the expected number of messages")
344 audit = messages[0]["dsdbChange"]
345 self.assertEquals("Delete", audit["operation"])
346 self.assertFalse(audit["performedAsSystem"])
347 self.assertTrue(dn.lower(), audit["dn"].lower())
348 self.assertRegexpMatches(audit["remoteAddress"],
350 self.assertTrue(self.is_guid(audit["sessionId"]))
351 self.assertEquals(0, audit["statusCode"])
352 self.assertEquals("Success", audit["status"])
353 session_id = self.get_session()
354 self.assertEquals(session_id, audit["sessionId"])
355 service_description = self.get_service_description()
356 self.assertEquals(service_description, "LDAP")
358 transactionId = audit["transactionId"]
359 message = self.waitForTransaction(transactionId)
360 audit = message["dsdbTransaction"]
361 self.assertEquals("commit", audit["action"])
362 self.assertTrue(self.is_guid(audit["transactionId"]))
363 self.assertTrue(audit["duration"] > 0)
365 def test_samdb_delete_non_existent_dn(self):
367 DOES_NOT_EXIST = "doesNotExist"
368 dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
369 self.discardSetupMessages(dn)
371 dn = "cn=" + DOES_NOT_EXIST + ",cn=users," + self.base_dn
374 self.fail("Exception not thrown")
378 messages = self.waitForMessages(1)
379 print("Received %d messages" % len(messages))
382 "Did not receive the expected number of messages")
384 audit = messages[0]["dsdbChange"]
385 self.assertEquals("Delete", audit["operation"])
386 self.assertFalse(audit["performedAsSystem"])
387 self.assertTrue(dn.lower(), audit["dn"].lower())
388 self.assertRegexpMatches(audit["remoteAddress"],
390 self.assertEquals(ERR_NO_SUCH_OBJECT, audit["statusCode"])
391 self.assertEquals("No such object", audit["status"])
392 self.assertTrue(self.is_guid(audit["sessionId"]))
393 session_id = self.get_session()
394 self.assertEquals(session_id, audit["sessionId"])
395 service_description = self.get_service_description()
396 self.assertEquals(service_description, "LDAP")
398 transactionId = audit["transactionId"]
399 message = self.waitForTransaction(transactionId)
400 audit = message["dsdbTransaction"]
401 self.assertEquals("rollback", audit["action"])
402 self.assertTrue(self.is_guid(audit["transactionId"]))
403 self.assertTrue(audit["duration"] > 0)
405 def test_create_and_delete_secret_over_lsa(self):
407 dn = "cn=Test Secret,CN=System," + self.base_dn
408 self.discardSetupMessages(dn)
410 creds = self.insta_creds(template=self.get_credentials())
411 lsa_conn = lsa.lsarpc(
412 "ncacn_np:%s" % self.server,
415 lsa_handle = lsa_conn.OpenPolicy2(
417 attr=lsa.ObjectAttribute(),
418 access_mask=security.SEC_FLAG_MAXIMUM_ALLOWED)
419 secret_name = lsa.String()
420 secret_name.string = "G$Test"
421 lsa_conn.CreateSecret(
424 access_mask=security.SEC_FLAG_MAXIMUM_ALLOWED)
426 messages = self.waitForMessages(1, dn=dn)
427 print("Received %d messages" % len(messages))
430 "Did not receive the expected number of messages")
432 audit = messages[0]["dsdbChange"]
433 self.assertEquals("Add", audit["operation"])
434 self.assertTrue(audit["performedAsSystem"])
435 self.assertTrue(dn.lower(), audit["dn"].lower())
436 self.assertRegexpMatches(audit["remoteAddress"],
438 self.assertTrue(self.is_guid(audit["sessionId"]))
439 session_id = self.get_session()
440 self.assertEquals(session_id, audit["sessionId"])
442 # We skip the check for self.get_service_description() as this
443 # is subject to a race between smbd and the s4 rpc_server code
444 # as to which will set the description as it is DCE/RPC over SMB
446 attributes = audit["attributes"]
447 self.assertEquals(2, len(attributes))
449 object_class = attributes["objectClass"]
450 self.assertEquals(1, len(object_class["actions"]))
451 action = object_class["actions"][0]
452 self.assertEquals("add", action["action"])
453 values = action["values"]
454 self.assertEquals(1, len(values))
455 self.assertEquals("secret", values[0]["value"])
457 cn = attributes["cn"]
458 self.assertEquals(1, len(cn["actions"]))
459 action = cn["actions"][0]
460 self.assertEquals("add", action["action"])
461 values = action["values"]
462 self.assertEquals(1, len(values))
463 self.assertEquals("Test Secret", values[0]["value"])
466 # Now delete the secret.
467 self.discardMessages()
468 h = lsa_conn.OpenSecret(
471 access_mask=security.SEC_FLAG_MAXIMUM_ALLOWED)
473 lsa_conn.DeleteObject(h)
474 messages = self.waitForMessages(1, dn=dn)
475 print("Received %d messages" % len(messages))
478 "Did not receive the expected number of messages")
480 dn = "cn=Test Secret,CN=System," + self.base_dn
481 audit = messages[0]["dsdbChange"]
482 self.assertEquals("Delete", audit["operation"])
483 self.assertTrue(audit["performedAsSystem"])
484 self.assertTrue(dn.lower(), audit["dn"].lower())
485 self.assertRegexpMatches(audit["remoteAddress"],
487 self.assertTrue(self.is_guid(audit["sessionId"]))
488 session_id = self.get_session()
489 self.assertEquals(session_id, audit["sessionId"])
491 # We skip the check for self.get_service_description() as this
492 # is subject to a race between smbd and the s4 rpc_server code
493 # as to which will set the description as it is DCE/RPC over SMB
495 def test_modify(self):
497 dn = "cn=" + USER_NAME + ",cn=users," + self.base_dn
498 self.discardSetupMessages(dn)
501 # Add an attribute value
503 self.ldb.modify_ldif(
505 "changetype: modify\n" +
506 "add: carLicense\n" +
507 "carLicense: license-01\n")
509 messages = self.waitForMessages(1, dn=dn)
510 print("Received %d messages" % len(messages))
513 "Did not receive the expected number of messages")
515 audit = messages[0]["dsdbChange"]
516 self.assertEquals("Modify", audit["operation"])
517 self.assertFalse(audit["performedAsSystem"])
518 self.assertEquals(dn, audit["dn"])
519 self.assertRegexpMatches(audit["remoteAddress"],
521 self.assertTrue(self.is_guid(audit["sessionId"]))
522 session_id = self.get_session()
523 self.assertEquals(session_id, audit["sessionId"])
524 service_description = self.get_service_description()
525 self.assertEquals(service_description, "LDAP")
527 attributes = audit["attributes"]
528 self.assertEquals(1, len(attributes))
529 actions = attributes["carLicense"]["actions"]
530 self.assertEquals(1, len(actions))
531 self.assertEquals("add", actions[0]["action"])
532 values = actions[0]["values"]
533 self.assertEquals(1, len(values))
534 self.assertEquals("license-01", values[0]["value"])
537 # Add an another value to the attribute
539 self.discardMessages()
540 self.ldb.modify_ldif(
542 "changetype: modify\n" +
543 "add: carLicense\n" +
544 "carLicense: license-02\n")
546 messages = self.waitForMessages(1, dn=dn)
547 print("Received %d messages" % len(messages))
550 "Did not receive the expected number of messages")
551 attributes = messages[0]["dsdbChange"]["attributes"]
552 self.assertEquals(1, len(attributes))
553 actions = attributes["carLicense"]["actions"]
554 self.assertEquals(1, len(actions))
555 self.assertEquals("add", actions[0]["action"])
556 values = actions[0]["values"]
557 self.assertEquals(1, len(values))
558 self.assertEquals("license-02", values[0]["value"])
561 # Add an another two values to the attribute
563 self.discardMessages()
564 self.ldb.modify_ldif(
566 "changetype: modify\n" +
567 "add: carLicense\n" +
568 "carLicense: license-03\n" +
569 "carLicense: license-04\n")
571 messages = self.waitForMessages(1, dn=dn)
572 print("Received %d messages" % len(messages))
575 "Did not receive the expected number of messages")
576 attributes = messages[0]["dsdbChange"]["attributes"]
577 self.assertEquals(1, len(attributes))
578 actions = attributes["carLicense"]["actions"]
579 self.assertEquals(1, len(actions))
580 self.assertEquals("add", actions[0]["action"])
581 values = actions[0]["values"]
582 self.assertEquals(2, len(values))
583 self.assertEquals("license-03", values[0]["value"])
584 self.assertEquals("license-04", values[1]["value"])
587 # delete two values to the attribute
589 self.discardMessages()
590 self.ldb.modify_ldif(
592 "changetype: delete\n" +
593 "delete: carLicense\n" +
594 "carLicense: license-03\n" +
595 "carLicense: license-04\n")
597 messages = self.waitForMessages(1, dn=dn)
598 print("Received %d messages" % len(messages))
601 "Did not receive the expected number of messages")
602 attributes = messages[0]["dsdbChange"]["attributes"]
603 self.assertEquals(1, len(attributes))
604 actions = attributes["carLicense"]["actions"]
605 self.assertEquals(1, len(actions))
606 self.assertEquals("delete", actions[0]["action"])
607 values = actions[0]["values"]
608 self.assertEquals(2, len(values))
609 self.assertEquals("license-03", values[0]["value"])
610 self.assertEquals("license-04", values[1]["value"])
613 # replace two values to the attribute
615 self.discardMessages()
616 self.ldb.modify_ldif(
618 "changetype: delete\n" +
619 "replace: carLicense\n" +
620 "carLicense: license-05\n" +
621 "carLicense: license-06\n")
623 messages = self.waitForMessages(1, dn=dn)
624 print("Received %d messages" % len(messages))
627 "Did not receive the expected number of messages")
628 attributes = messages[0]["dsdbChange"]["attributes"]
629 self.assertEquals(1, len(attributes))
630 actions = attributes["carLicense"]["actions"]
631 self.assertEquals(1, len(actions))
632 self.assertEquals("replace", actions[0]["action"])
633 values = actions[0]["values"]
634 self.assertEquals(2, len(values))
635 self.assertEquals("license-05", values[0]["value"])
636 self.assertEquals("license-06", values[1]["value"])