python/samba/tests: Fix auth_log messaging problems in py3
[amitay/samba.git] / python / samba / tests / auth_log_base.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 from __future__ import print_function
19 """Tests for the Auth and AuthZ logging.
20 """
21
22 import samba.tests
23 from samba.messaging import Messaging
24 from samba.dcerpc.messaging import MSG_AUTH_LOG, AUTH_EVENT_NAME
25 import time
26 import json
27 import os
28 import re
29
# Module-level list to which AuthLogTestBase.setUp appends every Messaging
# context it creates, so a reference to each context outlives the test.
# NOTE(review): presumably this keeps the contexts from being garbage
# collected while the messaging layer still refers to them - confirm.
msg_ctxs = []
31
class AuthLogTestBase(samba.tests.TestCase):
    """Base class for tests that examine Auth and AuthZ log messages.

    setUp() creates a Messaging context, registers it under
    AUTH_EVENT_NAME and installs a handler for MSG_AUTH_LOG messages.
    The handler decodes every message as JSON and appends it to
    self.context["messages"], where waitForMessages() picks it up.
    """

    def setUp(self):
        """Create the messaging context and start collecting messages.

        Raises KeyError if the SERVER environment variable is not set.
        """
        super(AuthLogTestBase, self).setUp()
        lp_ctx = self.get_loadparm()
        self.msg_ctx = Messaging((1,), lp_ctx=lp_ctx)
        # Keep a module-level reference to every context that is created.
        global msg_ctxs
        msg_ctxs.append(self.msg_ctx)
        self.msg_ctx.irpc_add_name(AUTH_EVENT_NAME)

        def messageHandler(context, msgType, src, message):
            # This does not look like sub unit output and it
            # makes these tests much easier to debug.
            print(message)
            jsonMsg = json.loads(message)
            context["messages"].append(jsonMsg)

        self.context = {"messages": []}
        self.msg_handler_and_context = (messageHandler, self.context)
        self.msg_ctx.register(self.msg_handler_and_context,
                              msg_type=MSG_AUTH_LOG)

        self.discardMessages()

        self.remoteAddress = None
        self.server = os.environ["SERVER"]
        self.connection = None

    def tearDown(self):
        """Deregister the message handler and release the irpc name."""
        if self.msg_handler_and_context:
            self.msg_ctx.deregister(self.msg_handler_and_context,
                                    msg_type=MSG_AUTH_LOG)
            self.msg_ctx.irpc_remove_name(AUTH_EVENT_NAME)
        # setUp chains to the parent class, so tearDown does as well.
        super(AuthLogTestBase, self).tearDown()

    def waitForMessages(self, isLastExpectedMessage, connection=None):
        """Wait for all the expected messages to arrive.

        The connection is passed through to keep the connection alive
        until all the logging messages have been received.

        isLastExpectedMessage is a callable taking one decoded message
        and returning True for the final message the caller expects.

        Returns the list of collected messages whose remote address
        matches self.remoteAddress, or an empty list if the last
        expected message has not arrived after roughly one second.
        """

        def isRemote(message):
            # Only Authorization and Authentication messages carry the
            # remote address that is compared here.
            msg_type = message["type"]
            if msg_type not in ("Authorization", "Authentication"):
                return False
            remote = message[msg_type]["remoteAddress"]

            try:
                # The address is the second ":" separated field.
                return remote.split(":")[1] == self.remoteAddress
            except (IndexError, AttributeError):
                # No ":" in the address, or the address is null/not a
                # string: this can not be one of our messages.
                return False

        def completed(messages):
            return any(isRemote(message) and isLastExpectedMessage(message)
                       for message in messages)

        self.connection = connection
        try:
            start_time = time.time()
            while not completed(self.context["messages"]):
                self.msg_ctx.loop_once(0.1)
                if time.time() - start_time > 1:
                    return []

            return list(filter(isRemote, self.context["messages"]))
        finally:
            # Drop the keep-alive reference on every exit path,
            # including when an exception propagates.
            self.connection = None

    # Discard any previously queued messages.
    def discardMessages(self):
        """Drain and throw away any messages that are already queued.

        The collected list is emptied before every further poll, so the
        loop ends as soon as one poll delivers no new messages.
        """
        self.msg_ctx.loop_once(0.001)
        while self.context["messages"]:
            # Without this reset the list could never become empty
            # again and the loop would never terminate.
            self.context["messages"] = []
            self.msg_ctx.loop_once(0.001)

    # Remove any NETLOGON authentication messages
    # NETLOGON is only performed once per session, so to avoid ordering
    # dependencies within the tests it's best to strip out NETLOGON messages.
    #
    def remove_netlogon_messages(self, messages):
        """Return a new list of messages without NETLOGON authentications.

        Messages that have no "Authentication" entry are always kept.
        """
        def is_not_netlogon(msg):
            if "Authentication" not in msg:
                return True
            sd = msg["Authentication"]["serviceDescription"]
            return sd != "NETLOGON"

        return [msg for msg in messages if is_not_netlogon(msg)]

    GUID_RE = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"

    #
    # Is the supplied GUID string correctly formatted
    #
    def is_guid(self, guid):
        """Return a match object if guid is a well formed GUID, else None.

        The whole string has to match: the pattern is anchored at the
        end so a GUID followed by trailing characters is rejected.
        """
        return re.match(self.GUID_RE + r"\Z", guid)