c316bdd5785b322448e4ebcc5213fb79000be971
[samba.git] / python / samba / tests / segfault.py
1 # Unix SMB/CIFS implementation.
2 #
3 # Copyright (C) Catalyst.Net Ltd. 2017
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18
19 """Test whether various python calls segfault when given unexpected input.
20 """
21
22 import samba.tests
23 import os
24 import sys
25 from samba.net import Net, LIBNET_JOIN_AUTOMATIC
26 from samba.credentials import DONT_USE_KERBEROS
27 from samba import NTSTATUSError, ntstatus
28 from samba.dcerpc import misc, drsuapi, samr, unixinfo, dnsserver
29 from samba import auth, gensec
30 from samba.samdb import SamDB
31 from samba import netbios
32 from samba import registry
33 from samba import ldb
34 from samba import messaging
35
36 import traceback
37
38
39 def segfault_detector(f):
40     def wrapper(*args, **kwargs):
41         pid = os.fork()
42         if pid == 0:
43             try:
44                 f(*args, **kwargs)
45             except Exception as e:
46                 traceback.print_exc()
47             sys.stderr.flush()
48             sys.stdout.flush()
49             os._exit(0)
50
51         pid2, status = os.waitpid(pid, 0)
52         if os.WIFSIGNALED(status):
53             signal = os.WTERMSIG(status)
54             raise AssertionError("Failed with signal %d" % signal)
55
56     return wrapper
57
58
59 def no_gdb_backtrace(f):
60     from os import environ
61     def w(*args, **kwargs):
62         environ['PLEASE_NO_GDB_BACKTRACE'] = '1'
63         f(*args, **kwargs)
64         del environ['PLEASE_NO_GDB_BACKTRACE']
65
66     return w
67
68
69 class SegfaultTests(samba.tests.TestCase):
70     def get_lp_et_al(self):
71         server = os.environ["SERVER"]
72         lp = self.get_loadparm()
73
74         creds = self.insta_creds(template=self.get_credentials(),
75                                  kerberos_state=DONT_USE_KERBEROS)
76         return lp, creds, server
77
78     def get_samdb(self):
79         lp, creds, server = self.get_lp_et_al()
80         url = 'ldap://' + server
81         ldb = SamDB(url, credentials=creds, lp=lp)
82         return ldb
83
84     @segfault_detector
85     def test_net_replicate_init__1(self):
86         lp, creds, server = self.get_lp_et_al()
87         net = Net(creds, lp, server=server)
88         net.replicate_init(42, lp, None, misc.GUID())
89
90     @no_gdb_backtrace
91     @segfault_detector
92     def test_net_replicate_init__3(self):
93         # third argument is also unchecked
94         samdb = self.get_samdb()
95         lp, creds, server = self.get_lp_et_al()
96         net = Net(creds, lp, server=server)
97         net.replicate_init(samdb, lp, 42, misc.GUID())
98
99     @segfault_detector
100     def test_net_replicate_chunk_1(self):
101         lp, creds, server = self.get_lp_et_al()
102         ctr = drsuapi.DsGetNCChangesCtr6()
103         net = Net(creds, lp, server=server)
104         net.replicate_chunk(42, 1, ctr)
105
106     @segfault_detector
107     def test_auth_context_gensec_start_server(self):
108         a = auth.AuthContext(ldb=42, methods=['sam'])
109         # there is no failure yet because the ldb is not actually
110         # dereferenced.
111         g = gensec.Security.start_server(auth_context=a)
112         # and still the ldb is not dereferenced...
113
114     @segfault_detector
115     def test_auth_user_session(self):
116         s = auth.user_session(ldb=42, principal='foo')
117
118     @segfault_detector
119     def test_gensec_start_server(self):
120         gensec.Security.start_server(auth_context=42)
121
122     @segfault_detector
123     def test_netbios_query_name(self):
124         n = netbios.Node()
125         t = n.query_name((42, 'foo'), 'localhost')
126
127     @segfault_detector
128     def test_encrypt_netr_crypt_password(self):
129         lp, creds, server = self.get_lp_et_al()
130         creds.encrypt_netr_crypt_password(42)
131
132     @segfault_detector
133     def test_hive_open_ldb(self):
134         # we don't need to provide a valid path because we segfault first
135         try:
136             registry.open_ldb('', credentials=42)
137         except ldb.LdbError as e:
138             print("failed with %s" % e)
139
140     @segfault_detector
141     def test_hive_open_hive(self):
142         # we don't need to provide a valid path because we segfault first
143         try:
144             registry.open_hive('s', 's', 's', 's')
145         except ldb.LdbError as e:
146             print("failed with %s" % e)
147
148     @segfault_detector
149     def test_ldb_add_nameless_element(self):
150         m = ldb.Message()
151         e = ldb.MessageElement('q')
152         try:
153             m.add(e)
154         except ldb.LdbError:
155             pass
156         str(m)
157
158     @segfault_detector
159     def test_ldb_register_module(self):
160         ldb.register_module('')
161
162     @segfault_detector
163     def test_messaging_deregister(self):
164         messaging.deregister('s', 's', 's', False)
165
166     @segfault_detector
167     def test_rpcecho(self):
168         from samba.dcerpc import echo
169         echo.rpcecho("")
170
171     @segfault_detector
172     def test_dcerpc_idl_ref_elements(self):
173         """There are many pidl generated functions that crashed on this
174         pattern, where a NULL pointer was created rather than an empty
175         structure."""
176         samr.Connect5().out_info_out = 1
177
178     @segfault_detector
179     def test_dcerpc_idl_unixinfo_elements(self):
180         """Dereferencing is sufficient to crash"""
181         unixinfo.GetPWUid().out_infos
182
183     @segfault_detector
184     def test_dcerpc_idl_inline_arrays(self):
185         """Inline arrays were incorrectly handled."""
186         dnsserver.DNS_RPC_SERVER_INFO_DOTNET().pExtensions
187
188     @segfault_detector
189     def test_dcerpc_idl_set_inline_arrays(self):
190         """Setting an inline array was incorrectly handled."""
191         a = dnsserver.DNS_EXTENSION();
192         x = dnsserver.DNS_RPC_DP_INFO();
193         x.pwszReserved = [a, a, a]
194
195     @no_gdb_backtrace
196     @segfault_detector
197     def test_dnsp_string_list(self):
198         from samba.dcerpc import dnsp
199         # We segfault if s.count is greater than the length of s.str
200         s = dnsp.string_list()
201         s.count = 3
202         s.str
203
204     @no_gdb_backtrace
205     @segfault_detector
206     def test_dns_record(self):
207         from samba.dnsserver import TXTRecord
208         from samba.dcerpc import dnsp, dnsserver
209         # there are many others here
210         rec = TXTRecord(["a", "b", "c"])
211         rec.wType = dnsp.DNS_TYPE_A
212         rec.data
213
214     @no_gdb_backtrace
215     @segfault_detector
216     def test_ldb_msg_diff(self):
217         samdb = self.get_samdb()
218
219         msg = ldb.Message()
220         msg.dn = ldb.Dn(samdb, '')
221         diff = samdb.msg_diff(msg, msg)
222
223         del msg
224         diff.dn