1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from __future__ import print_function
19 """Tests for the Auth and AuthZ logging.
23 from samba.messaging import Messaging
24 from samba.dcerpc.messaging import MSG_AUTH_LOG, AUTH_EVENT_NAME
32 class AuthLogTestBase(samba.tests.TestCase):
def setUp(self):
    """Join the messaging bus and start collecting auth log events.

    Creates a Messaging context, adds the irpc name AUTH_EVENT_NAME to
    it and registers a handler for MSG_AUTH_LOG that decodes each payload
    as JSON and appends it to ``self.context["messages"]``.  Anything
    already queued is then discarded.  The server name is taken from the
    SERVER environment variable (KeyError if it is not set).

    NOTE(review): the listing this block was recovered from had lost the
    ``def`` line, all indentation and the statement that the debug
    comment in messageHandler describes.  The method name and the
    print() call are inferred -- confirm against the real file.
    """
    super(AuthLogTestBase, self).setUp()
    lp_ctx = self.get_loadparm()
    self.msg_ctx = Messaging((1,), lp_ctx=lp_ctx)
    # msg_ctxs is a module-level list defined outside this block;
    # presumably it keeps every context referenced -- TODO confirm.
    msg_ctxs.append(self.msg_ctx)
    self.msg_ctx.irpc_add_name(AUTH_EVENT_NAME)

    def messageHandler(context, msgType, src, message):
        # This does not look like sub unit output and it
        # makes these tests much easier to debug.
        print(message)
        jsonMsg = json.loads(message)
        context["messages"].append(jsonMsg)

    # The handler receives this dict as its first argument, so the
    # list must always be reached through self.context["messages"].
    self.context = {"messages": []}
    self.msg_handler_and_context = (messageHandler, self.context)
    self.msg_ctx.register(self.msg_handler_and_context,
                          msg_type=MSG_AUTH_LOG)

    self.discardMessages()

    self.remoteAddress = None
    self.server = os.environ["SERVER"]
    self.connection = None
def tearDown(self):
    """Detach the auth log handler and irpc name added during set-up.

    NOTE(review): the ``def`` line was missing from the listing this
    block was recovered from; the name tearDown is inferred from the
    fact that it undoes the registration made in setUp -- confirm.
    """
    if self.msg_handler_and_context:
        self.msg_ctx.deregister(self.msg_handler_and_context,
                                msg_type=MSG_AUTH_LOG)
        self.msg_ctx.irpc_remove_name(AUTH_EVENT_NAME)
    # setUp chains to the base class, so the tear-down does as well.
    super(AuthLogTestBase, self).tearDown()
def waitForMessages(self, isLastExpectedMessage, connection=None):
    """Wait for all the expected messages to arrive.

    The connection is passed through to keep the connection alive
    until all the logging messages have been received.

    :param isLastExpectedMessage: callable given one decoded message;
        a true result for a message from self.remoteAddress ends the
        wait.
    :param connection: stored in self.connection for the duration of
        the wait and dropped again on every exit path.
    :return: the collected messages whose remote address matches
        self.remoteAddress, or [] if the closing message has not
        arrived after roughly one second.

    NOTE(review): the return statements were missing from the listing
    this block was recovered from; the [] timeout result and the False
    fallbacks in isRemote are reconstructions -- confirm.
    """

    def isRemote(message):
        # Only these two event types carry an address we compare.
        kind = message["type"]
        if kind not in ("Authorization", "Authentication"):
            return False
        remote = message[kind]["remoteAddress"]
        try:
            # The second ":"-separated field is what gets compared.
            return remote.split(":")[1] == self.remoteAddress
        except (AttributeError, IndexError):
            # No address at all, or one without a ":" separator.
            return False

    def completed(messages):
        return any(isRemote(message) and isLastExpectedMessage(message)
                   for message in messages)

    self.connection = connection
    try:
        start_time = time.time()
        while not completed(self.context["messages"]):
            self.msg_ctx.loop_once(0.1)
            if time.time() - start_time > 1:
                return []
        return list(filter(isRemote, self.context["messages"]))
    finally:
        # Release the connection even if polling or the predicate raised.
        self.connection = None
# Discard any previously queued messages.
def discardMessages(self):
    """Drain the message bus until a poll delivers nothing new.

    Polls once, then alternates between emptying
    ``self.context["messages"]`` and polling again for as long as a poll
    keeps delivering messages.  On return the list is empty.
    """
    self.msg_ctx.loop_once(0.001)
    while self.context["messages"]:
        # Reset before polling so that a non-empty list after the poll
        # means "more arrived" rather than "old ones still here".
        self.context["messages"] = []
        self.msg_ctx.loop_once(0.001)
# Remove any NETLOGON authentication messages
# NETLOGON is only performed once per session, so to avoid ordering
# dependencies within the tests it's best to strip out NETLOGON messages.
#
def remove_netlogon_messages(self, messages):
    """Return a new list of *messages* without NETLOGON authentications.

    A message is dropped only when it has an "Authentication" entry
    whose "serviceDescription" is "NETLOGON"; messages without an
    "Authentication" entry are always kept.  The input is not modified.

    :param messages: iterable of decoded message dicts.
    :return: list of the messages that were kept, in their original order.
    """
    def is_not_netlogon(msg):
        if "Authentication" not in msg:
            # Not an authentication event, so it cannot be a NETLOGON one.
            return True
        return msg["Authentication"]["serviceDescription"] != "NETLOGON"

    return [msg for msg in messages if is_not_netlogon(msg)]
GUID_RE = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"

#
# Is the supplied GUID string correctly formatted
#
def is_guid(self, guid):
    """Check that *guid* is exactly one lower-case, hyphenated GUID.

    :param guid: the string to check.
    :return: the match object (truthy) when the whole string has the
        8-4-4-4-12 hexadecimal form described by GUID_RE, otherwise None.
    """
    # re.match only anchors the start; \Z stops a valid GUID followed
    # by trailing characters from being accepted.
    return re.match(self.GUID_RE + r"\Z", guid)