s3:libsmb: allow store_cldap_reply() to work with a ipv6 response
[samba.git] / python / samba / tests / blackbox / smbcontrol_process.py
1 # Blackbox tests for the smbcontrol fault injection commands
2 #
3 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2018
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18 # As the test terminates and sleeps samba processes these tests need to run
19 # in the preforkrestartdc test environment to prevent them impacting other
20 # tests.
21 #
22 from __future__ import print_function
23 import time
24 from samba.tests import BlackboxTestCase, BlackboxProcessError
25 from samba.messaging import Messaging
26
27 COMMAND = "bin/smbcontrol"
28 PING = "ping"
29
30
31 class SmbcontrolProcessBlockboxTests(BlackboxTestCase):
32
33     def setUp(self):
34         super(SmbcontrolProcessBlockboxTests, self).setUp()
35         lp_ctx = self.get_loadparm()
36         self.msg_ctx = Messaging(lp_ctx=lp_ctx)
37
38     def get_process_data(self):
39         services = self.msg_ctx.irpc_all_servers()
40
41         processes = []
42         for service in services:
43             for id in service.ids:
44                 processes.append((service.name, id.pid))
45         return processes
46
47     def get_process(self, name):
48         processes = self.get_process_data()
49         for pname, pid in processes:
50             if name == pname:
51                 return pid
52         return None
53
54     def test_inject_fault(self):
55         INJECT = "inject"
56         FAULT = "segv"
57         #
58         # Note that this process name needs to be different to the one used
59         # in the sleep test to avoid a race condition
60         #
61         pid = self.get_process("rpc_server")
62
63         #
64         # Ensure we can ping the process before injecting a fault.
65         #
66         try:
67             self.check_run("%s %s %s" % (COMMAND, pid, PING),
68                            msg="trying to ping rpc_server")
69         except BlackboxProcessError as e:
70             self.fail("Unable to ping rpc_server process")
71
72         #
73         # Now inject a fault.
74         #
75         try:
76             self.check_run("%s %s %s %s" % (COMMAND, pid, INJECT, FAULT),
77                            msg="injecting fault into rpc_server")
78         except BlackboxProcessError as e:
79             print(e)
80             self.fail("Unable to inject a fault into the rpc_server process")
81
82         #
83         # The process should have died, so we should not be able to ping it
84         #
85         try:
86             self.check_run("%s %s %s" % (COMMAND, pid, PING),
87                            msg="trying to ping rpc_server")
88             self.fail("Could ping rpc_server process")
89         except BlackboxProcessError as e:
90             pass
91
92     def test_sleep(self):
93         SLEEP = "sleep"  # smbcontrol sleep command
94         DURATION = 5     # duration to sleep server for
95         DELTA = 1        # permitted error for the sleep duration
96
97         #
98         # Note that this process name needs to be different to the one used
99         # in the inject fault test to avoid a race condition
100         #
101         pid = self.get_process("ldap_server")
102         #
103         # Ensure we can ping the process before getting it to sleep
104         #
105         try:
106             self.check_run("%s %s %s" % (COMMAND, pid, PING),
107                            msg="trying to ping rpc_server")
108         except BlackboxProcessError as e:
109             self.fail("Unable to ping rpc_server process")
110
111         #
112         # Now ask it to sleep
113         #
114         start = time.time()
115         try:
116             self.check_run("%s %s %s %s" % (COMMAND, pid, SLEEP, DURATION),
117                            msg="putting rpc_server to sleep for %d" % DURATION)
118         except BlackboxProcessError as e:
119             print(e)
120             self.fail("Failed to get rpc_server to sleep for %d" % DURATION)
121
122         #
123         # The process should be sleeping and not respond until it wakes
124         #
125         try:
126             self.check_run("%s %s %s" % (COMMAND, pid, PING),
127                            msg="trying to ping rpc_server")
128             end = time.time()
129             duration = end - start
130             self.assertGreater(duration + DELTA, DURATION)
131         except BlackboxProcessError as e:
132             self.fail("Unable to ping rpc_server process")