aefd57e551615325ea6b68f95408d7957f1ad9e4
[kai/samba-autobuild/.git] / python / samba / tests / auth_log_base.py
1 # Unix SMB/CIFS implementation.
2 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
3 #
4 # This program is free software; you can redistribute it and/or modify
5 # it under the terms of the GNU General Public License as published by
6 # the Free Software Foundation; either version 3 of the License, or
7 # (at your option) any later version.
8 #
9 # This program is distributed in the hope that it will be useful,
10 # but WITHOUT ANY WARRANTY; without even the implied warranty of
11 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 # GNU General Public License for more details.
13 #
14 # You should have received a copy of the GNU General Public License
15 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 #
17
18 """Tests for the Auth and AuthZ logging.
19 """
20
21 from samba import auth
22 import samba.tests
23 from samba.messaging import Messaging
24 from samba.dcerpc.messaging import MSG_AUTH_LOG, AUTH_EVENT_NAME
25 from samba.dcerpc import srvsvc, dnsserver
26 import time
27 import json
28 import os
29 from samba import smb
30 from samba.samdb import SamDB
31
class AuthLogTestBase(samba.tests.TestCase):
    """Base class for tests that inspect Auth and AuthZ log messages.

    setUp() registers a handler for MSG_AUTH_LOG messages on a messaging
    context.  Every message received is decoded from JSON and appended to
    self.context["messages"], where the helper methods below can wait for,
    filter and discard them.

    self.remoteAddress is initialised to None; waitForMessages() compares it
    with the address carried by each message, so it is presumably set by the
    derived test before waiting (confirm against the subclasses).
    """

    def setUp(self):
        """Register for auth log messages and reset the per-test state.

        Raises:
            KeyError: if the SERVER environment variable is not set.
        """
        super(AuthLogTestBase, self).setUp()
        lp_ctx = self.get_loadparm()
        self.msg_ctx = Messaging((1,), lp_ctx=lp_ctx)
        self.msg_ctx.irpc_add_name(AUTH_EVENT_NAME)

        def messageHandler(context, msgType, src, message):
            # This does not look like sub unit output and it
            # makes these tests much easier to debug.
            # The parenthesised form behaves the same on Python 2 and also
            # parses on Python 3.
            print(message)
            jsonMsg = json.loads(message)
            context["messages"].append(jsonMsg)

        # The handler receives this very dict, so replacing the list stored
        # under "messages" is visible to the handler as well.
        self.context = {"messages": []}
        self.msg_handler_and_context = (messageHandler, self.context)
        self.msg_ctx.register(self.msg_handler_and_context,
                              msg_type=MSG_AUTH_LOG)

        self.discardMessages()

        self.remoteAddress = None
        self.server = os.environ["SERVER"]
        self.connection = None

    def tearDown(self):
        """Deregister the auth log message handler installed by setUp()."""
        if self.msg_handler_and_context:
            self.msg_ctx.deregister(self.msg_handler_and_context,
                                    msg_type=MSG_AUTH_LOG)
        # setUp() chains to the base class, so tearDown() must as well.
        super(AuthLogTestBase, self).tearDown()

    def waitForMessages(self, isLastExpectedMessage, connection=None):
        """Wait for all the expected messages to arrive.

        The connection is passed through to keep the connection alive
        until all the logging messages have been received.

        Args:
            isLastExpectedMessage: callable taking a decoded message and
                returning True for the message that ends the wait.
            connection: optional object held in self.connection for the
                duration of the wait.

        Returns:
            A list of the collected messages whose remote address matches
            self.remoteAddress, or an empty list if the final expected
            message did not arrive before the timeout (about one second).
        """

        def isRemote(message):
            # Only Authorization and Authentication messages are inspected
            # for a remote address; anything else is never "remote".
            msg_type = message["type"]
            if msg_type not in ("Authorization", "Authentication"):
                return False
            remote = message[msg_type]["remoteAddress"]

            # The second ":"-separated field is compared with
            # self.remoteAddress (presumably the address looks like
            # "ipv4:ADDRESS:PORT" -- confirm against the server output).
            # A missing field or a non-string value (e.g. JSON null) simply
            # means the message is not from the expected remote.
            try:
                return remote.split(":")[1] == self.remoteAddress
            except (AttributeError, IndexError):
                return False

        def completed(messages):
            return any(isRemote(message) and isLastExpectedMessage(message)
                       for message in messages)

        self.connection = connection
        try:
            start_time = time.time()
            while not completed(self.context["messages"]):
                self.msg_ctx.loop_once(0.1)
                if time.time() - start_time > 1:
                    return []

            # Return a real list (not a lazy filter object) so the result
            # has the same type as the timeout path on every Python version.
            return [message for message in self.context["messages"]
                    if isRemote(message)]
        finally:
            # Drop the keep-alive reference on every exit path, including
            # an exception raised while processing messages.
            self.connection = None

    def discardMessages(self):
        """Discard any previously queued messages.

        Keeps processing the message queue until a pass delivers no new
        messages, leaving self.context["messages"] empty.
        """
        self.msg_ctx.loop_once(0.001)
        while self.context["messages"]:
            # Empty the list before polling again; otherwise the loop
            # condition could never become false once a message arrived.
            self.context["messages"] = []
            self.msg_ctx.loop_once(0.001)

    def remove_netlogon_messages(self, messages):
        """Return a list of messages with NETLOGON authentications removed.

        NETLOGON is only performed once per session, so to avoid ordering
        dependencies within the tests it's best to strip out NETLOGON
        messages.  Messages without an "Authentication" entry are kept.
        """

        def is_not_netlogon(msg):
            if "Authentication" not in msg:
                return True
            sd = msg["Authentication"]["serviceDescription"]
            return sd != "NETLOGON"

        return [msg for msg in messages if is_not_netlogon(msg)]