1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 # GNU General Public License for more details.
14 # You should have received a copy of the GNU General Public License
15 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 """Tests for the Auth and AuthZ logging.
21 from samba import auth
23 from samba.messaging import Messaging
24 from samba.dcerpc.messaging import MSG_AUTH_LOG, AUTH_EVENT_NAME
25 from samba.dcerpc import srvsvc, dnsserver
30 from samba.samdb import SamDB
32 class AuthLogTestBase(samba.tests.TestCase):
35 super(AuthLogTestBase, self).setUp()
36 lp_ctx = self.get_loadparm()
37 self.msg_ctx = Messaging((1,), lp_ctx=lp_ctx);
38 self.msg_ctx.irpc_add_name(AUTH_EVENT_NAME)
40 def messageHandler( context, msgType, src, message):
41 # This does not look like sub unit output and it
42 # makes these tests much easier to debug.
44 jsonMsg = json.loads(message)
45 context["messages"].append( jsonMsg)
47 self.context = { "messages": []}
48 self.msg_handler_and_context = (messageHandler, self.context)
49 self.msg_ctx.register(self.msg_handler_and_context,
50 msg_type=MSG_AUTH_LOG)
52 self.discardMessages()
54 self.remoteAddress = None
55 self.server = os.environ["SERVER"]
56 self.connection = None
59 if self.msg_handler_and_context:
60 self.msg_ctx.deregister(self.msg_handler_and_context,
61 msg_type=MSG_AUTH_LOG)
64 def waitForMessages(self, isLastExpectedMessage, connection=None):
65 """Wait for all the expected messages to arrive
66 The connection is passed through to keep the connection alive
67 until all the logging messages have been received.
70 def completed( messages):
71 for message in messages:
72 if isRemote( message) and isLastExpectedMessage( message):
76 def isRemote( message):
78 if message["type"] == "Authorization":
79 remote = message["Authorization"]["remoteAddress"]
80 elif message["type"] == "Authentication":
81 remote = message["Authentication"]["remoteAddress"]
86 addr = remote.split(":")
87 return addr[1] == self.remoteAddress
91 self.connection = connection
93 start_time = time.time()
94 while not completed( self.context["messages"]):
95 self.msg_ctx.loop_once(0.1)
96 if time.time() - start_time > 1:
97 self.connection = None
100 self.connection = None
101 return filter( isRemote, self.context["messages"])
103 # Discard any previously queued messages.
104 def discardMessages(self):
105 self.msg_ctx.loop_once(0.001)
106 while len( self.context["messages"]):
107 self.msg_ctx.loop_once(0.001)
108 self.context["messages"] = []
110 # Remove any NETLOGON authentication messages
111 # NETLOGON is only performed once per session, so to avoid ordering
112 # dependencies within the tests it's best to strip out NETLOGON messages.
114 def remove_netlogon_messages(self, messages):
115 def is_not_netlogon(msg):
116 if "Authentication" not in msg:
118 sd = msg["Authentication"]["serviceDescription"]
119 return sd != "NETLOGON"
121 return list(filter(is_not_netlogon, messages))