1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from __future__ import print_function
19 """Tests for the Auth and AuthZ logging.
23 from samba.messaging import Messaging
24 from samba.dcerpc.messaging import MSG_AUTH_LOG, AUTH_EVENT_NAME
31 class AuthLogTestBase(samba.tests.TestCase):
34 super(AuthLogTestBase, self).setUp()
35 lp_ctx = self.get_loadparm()
36 self.msg_ctx = Messaging((1,), lp_ctx=lp_ctx)
37 self.msg_ctx.irpc_add_name(AUTH_EVENT_NAME)
39 def messageHandler(context, msgType, src, message):
40 # This does not look like sub unit output and it
41 # makes these tests much easier to debug.
43 jsonMsg = json.loads(message)
44 context["messages"].append(jsonMsg)
46 self.context = {"messages": []}
47 self.msg_handler_and_context = (messageHandler, self.context)
48 self.msg_ctx.register(self.msg_handler_and_context,
49 msg_type=MSG_AUTH_LOG)
51 self.discardMessages()
53 self.remoteAddress = None
54 self.server = os.environ["SERVER"]
55 self.connection = None
58 if self.msg_handler_and_context:
59 self.msg_ctx.deregister(self.msg_handler_and_context,
60 msg_type=MSG_AUTH_LOG)
61 self.msg_ctx.irpc_remove_name(AUTH_EVENT_NAME)
63 def waitForMessages(self, isLastExpectedMessage, connection=None):
64 """Wait for all the expected messages to arrive
65 The connection is passed through to keep the connection alive
66 until all the logging messages have been received.
69 def completed(messages):
70 for message in messages:
71 if isRemote(message) and isLastExpectedMessage(message):
75 def isRemote(message):
77 if message["type"] == "Authorization":
78 remote = message["Authorization"]["remoteAddress"]
79 elif message["type"] == "Authentication":
80 remote = message["Authentication"]["remoteAddress"]
85 addr = remote.split(":")
86 return addr[1] == self.remoteAddress
90 self.connection = connection
92 start_time = time.time()
93 while not completed(self.context["messages"]):
94 self.msg_ctx.loop_once(0.1)
95 if time.time() - start_time > 1:
96 self.connection = None
99 self.connection = None
100 return filter(isRemote, self.context["messages"])
102 # Discard any previously queued messages.
103 def discardMessages(self):
104 self.msg_ctx.loop_once(0.001)
105 while len(self.context["messages"]):
106 self.msg_ctx.loop_once(0.001)
107 self.context["messages"] = []
109 # Remove any NETLOGON authentication messages
110 # NETLOGON is only performed once per session, so to avoid ordering
111 # dependencies within the tests it's best to strip out NETLOGON messages.
113 def remove_netlogon_messages(self, messages):
114 def is_not_netlogon(msg):
115 if "Authentication" not in msg:
117 sd = msg["Authentication"]["serviceDescription"]
118 return sd != "NETLOGON"
120 return list(filter(is_not_netlogon, messages))
122 GUID_RE = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"
125 # Is the supplied GUID string correctly formatted
127 def is_guid(self, guid):
128 return re.match(self.GUID_RE, guid)