6edd0f6e953523dcc216c79b5e7fcd28ed36b8fe
[amitay/samba.git] / python / samba / tests / auth_log_base.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
"""Tests for the Auth and AuthZ logging.
"""

# The docstring must be the first statement in the module to be picked up as
# __doc__; a __future__ import is still permitted directly after it.
from __future__ import print_function
21
22 import samba.tests
23 from samba.messaging import Messaging
24 from samba.dcerpc.messaging import MSG_AUTH_LOG, AUTH_EVENT_NAME
25 import time
26 import json
27 import os
28 import re
29
30
31 class AuthLogTestBase(samba.tests.TestCase):
32
33     def setUp(self):
34         super(AuthLogTestBase, self).setUp()
35         lp_ctx = self.get_loadparm()
36         self.msg_ctx = Messaging((1,), lp_ctx=lp_ctx)
37         self.msg_ctx.irpc_add_name(AUTH_EVENT_NAME)
38
39         def messageHandler(context, msgType, src, message):
40             # This does not look like sub unit output and it
41             # makes these tests much easier to debug.
42             print(message)
43             jsonMsg = json.loads(message)
44             context["messages"].append(jsonMsg)
45
46         self.context = {"messages": []}
47         self.msg_handler_and_context = (messageHandler, self.context)
48         self.msg_ctx.register(self.msg_handler_and_context,
49                               msg_type=MSG_AUTH_LOG)
50
51         self.discardMessages()
52
53         self.remoteAddress = None
54         self.server = os.environ["SERVER"]
55         self.connection = None
56
57     def tearDown(self):
58         if self.msg_handler_and_context:
59             self.msg_ctx.deregister(self.msg_handler_and_context,
60                                     msg_type=MSG_AUTH_LOG)
61             self.msg_ctx.irpc_remove_name(AUTH_EVENT_NAME)
62
63     def waitForMessages(self, isLastExpectedMessage, connection=None):
64         """Wait for all the expected messages to arrive
65         The connection is passed through to keep the connection alive
66         until all the logging messages have been received.
67         """
68
69         def completed(messages):
70             for message in messages:
71                 if isRemote(message) and isLastExpectedMessage(message):
72                     return True
73             return False
74
75         def isRemote(message):
76             remote = None
77             if message["type"] == "Authorization":
78                 remote = message["Authorization"]["remoteAddress"]
79             elif message["type"] == "Authentication":
80                 remote = message["Authentication"]["remoteAddress"]
81             else:
82                 return False
83
84             try:
85                 addr = remote.split(":")
86                 return addr[1] == self.remoteAddress
87             except IndexError:
88                 return False
89
90         self.connection = connection
91
92         start_time = time.time()
93         while not completed(self.context["messages"]):
94             self.msg_ctx.loop_once(0.1)
95             if time.time() - start_time > 1:
96                 self.connection = None
97                 return []
98
99         self.connection = None
100         return filter(isRemote, self.context["messages"])
101
102     # Discard any previously queued messages.
103     def discardMessages(self):
104         self.msg_ctx.loop_once(0.001)
105         while len(self.context["messages"]):
106             self.msg_ctx.loop_once(0.001)
107         self.context["messages"] = []
108
109     # Remove any NETLOGON authentication messages
110     # NETLOGON is only performed once per session, so to avoid ordering
111     # dependencies within the tests it's best to strip out NETLOGON messages.
112     #
113     def remove_netlogon_messages(self, messages):
114         def is_not_netlogon(msg):
115             if "Authentication" not in msg:
116                 return True
117             sd = msg["Authentication"]["serviceDescription"]
118             return sd != "NETLOGON"
119
120         return list(filter(is_not_netlogon, messages))
121
122     GUID_RE = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"
123
124     #
125     # Is the supplied GUID string correctly formatted
126     #
127     def is_guid(self, guid):
128         return re.match(self.GUID_RE, guid)