import json
import os
-from random import SystemRandom
-import string
import time
from samba.auth import system_session
msg = json.loads(m)
return (
msg["type"] == "Authentication" and
- msg["Authentication"]["serviceDescription"] == "SamLogon" and
- msg["Authentication"]["authDescription"] == "interactive")
+ msg["Authentication"]["serviceDescription"] == "SamLogon")
#
# Handler function for received authentication messages.
# Print the message to help debugging the tests.
# as it's a JSON message it does not look like a sub-unit message.
print(message)
- self.dc_msg = message
+ self.dc_msgs.append(message)
# Set up a messaging context to listen for authentication events on
# the domain controller.
msg_ctx.register(msg_handler_and_context, msg_type=MSG_AUTH_LOG)
# Wait for the SamLogon message.
- # As the SamLogon is the only message we're interested ignore any other
- # messages.
- self.dc_msg = None
+ # As there could be other SamLogon's in progress we need to collect
+ # all the SamLogons and let the caller match them to the session.
+ self.dc_msgs = []
start_time = time.time()
- while not is_sam_logon(self.dc_msg) and (time.time() - start_time < 1):
+ while (time.time() - start_time < 1):
msg_ctx.loop_once(0.1)
- if self.dc_msg is None:
- os.write(w1, get_bytes("None"))
+ # Only interested in SamLogon messages, filter out the rest
+ msgs = list(filter(is_sam_logon, self.dc_msgs))
+ if msgs:
+ for m in msgs:
+ m += "\n"
+ os.write(w1, get_bytes(m))
else:
- os.write(w1, get_bytes(self.dc_msg))
+ os.write(w1, get_bytes("None\n"))
+ os.close(w1)
msg_ctx.deregister(msg_handler_and_context, msg_type=MSG_AUTH_LOG)
msg_ctx.irpc_remove_name(AUTH_EVENT_NAME)
self.user_creds.set_username(self.user_name)
self.user_creds.set_workstation(self.server)
+ #
+ # Check that the domain server received a SamLogon request for the
+ # current logon.
+ #
+ def check_domain_server_authentication(self, pipe, logon_id, description):
+
+ messages = os.read(pipe, 8192)
+ messages = get_string(messages)
+ if len(messages) == 0 or messages == "None":
+ self.fail("No Domain server authentication message")
+
+ #
+ # Look for the SamLogon request matching logon_id
+ msg = None
+ for message in messages.split("\n"):
+ msg = json.loads(get_string(message))
+ if logon_id == msg["Authentication"]["logonId"]:
+ break
+ msg = None
+
+ if msg is None:
+ self.fail("No Domain server authentication message")
+
+ #
+ # Validate that message contains the expected data
+ #
+ self.assertEquals("Authentication", msg["type"])
+ self.assertEquals(logon_id, msg["Authentication"]["logonId"])
+ self.assertEquals("SamLogon",
+ msg["Authentication"]["serviceDescription"])
+ self.assertEquals(description,
+ msg["Authentication"]["authDescription"])
+
def test_ntlm_auth(self):
def isLastExpectedMessage(msg):
logon_id = msg["Authentication"]["logonId"]
- message = os.read(pipe, 4096)
- msg = json.loads(get_string(message))
- self.assertEquals("Authentication", msg["type"])
- self.assertEquals(logon_id, msg["Authentication"]["logonId"])
- self.assertEquals("SamLogon",
- msg["Authentication"]["serviceDescription"])
- self.assertEquals("interactive",
- msg["Authentication"]["authDescription"])
+ #
+ # Now check the Domain server authentication message
+ #
+ self.check_domain_server_authentication(pipe, logon_id, "interactive")
def test_wbinfo(self):
def isLastExpectedMessage(msg):
#
# Now check the Domain server authentication message
#
- message = os.read(pipe, 4096)
- msg = json.loads(get_string(message))
- self.assertEquals("Authentication", msg["type"])
- self.assertEquals(logon_id, msg["Authentication"]["logonId"])
- self.assertEquals("SamLogon",
- msg["Authentication"]["serviceDescription"])
- self.assertEquals("network",
- msg["Authentication"]["authDescription"])
+ self.check_domain_server_authentication(pipe, logon_id, "network")
def test_wbinfo_ntlmv1(self):
def isLastExpectedMessage(msg):
#
# Now check the Domain server authentication message
#
- message = os.read(pipe, 4096)
- msg = json.loads(get_string(message))
- self.assertEquals("Authentication", msg["type"])
- self.assertEquals(logon_id, msg["Authentication"]["logonId"])
- self.assertEquals("SamLogon",
- msg["Authentication"]["serviceDescription"])
- self.assertEquals("network",
- msg["Authentication"]["authDescription"])
+ self.check_domain_server_authentication(pipe, logon_id, "network")