pytest/segfault: segfault with nameless element
[sharpe/samba-autobuild/.git] / python / samba / tests / segfault.py
1 # Unix SMB/CIFS implementation.
2 #
3 # Copyright (C) Catalyst.Net Ltd. 2017
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17 #
18
19 """Test whether various python calls segfault when given unexpected input.
20 """
21
22 import samba.tests
23 import os
24 import sys
25 from samba.net import Net, LIBNET_JOIN_AUTOMATIC
26 from samba.credentials import DONT_USE_KERBEROS
27 from samba import NTSTATUSError, ntstatus
28 from samba.dcerpc import misc, drsuapi
29 from samba import auth, gensec
30 from samba.samdb import SamDB
31 from samba import netbios
32 from samba import registry
33 from samba import ldb
34
35 import traceback
36
37 def segfault_detector(f):
38     def wrapper(*args, **kwargs):
39         pid = os.fork()
40         if pid == 0:
41             try:
42                 f(*args, **kwargs)
43             except Exception as e:
44                 traceback.print_exc()
45             sys.stderr.flush()
46             sys.stdout.flush()
47             os._exit(0)
48
49         pid2, status = os.waitpid(pid, 0)
50         signal = status & 255
51         if os.WIFSIGNALED(status):
52             signal = os.WTERMSIG(status)
53             raise AssertionError("Failed with signal %d" % signal)
54
55     return wrapper
56
57
58 class SegfaultTests(samba.tests.TestCase):
59     def get_lp_et_al(self):
60         server = os.environ["SERVER"]
61         lp = self.get_loadparm()
62
63         creds = self.insta_creds(template=self.get_credentials(),
64                                  kerberos_state=DONT_USE_KERBEROS)
65         return lp, creds, server
66
67     def get_samdb(self):
68         lp, creds, server = self.get_lp_et_al()
69         url = 'ldap://' + server
70         ldb = SamDB(url, credentials=creds, lp=lp)
71         return ldb
72
73     @segfault_detector
74     def test_net_replicate_init__1(self):
75         lp, creds, server = self.get_lp_et_al()
76         net = Net(creds, lp, server=server)
77         net.replicate_init(42, lp, None, misc.GUID())
78
79     @segfault_detector
80     def test_net_replicate_init__3(self):
81         # third argument is also unchecked
82         samdb = self.get_samdb()
83         lp, creds, server = self.get_lp_et_al()
84         net = Net(creds, lp, server=server)
85         net.replicate_init(samdb, lp, 42, misc.GUID())
86
87     @segfault_detector
88     def test_net_replicate_chunk_1(self):
89         lp, creds, server = self.get_lp_et_al()
90         ctr = drsuapi.DsGetNCChangesCtr6()
91         net = Net(creds, lp, server=server)
92         net.replicate_chunk(42, 1, ctr)
93
94     @segfault_detector
95     def test_auth_context_gensec_start_server(self):
96         a = auth.AuthContext(ldb=42, methods=['sam'])
97         # there is no failure yet because the ldb is not actually
98         # dereferenced.
99         g = gensec.Security.start_server(auth_context=a)
100         # and still the ldb is not dereferenced...
101
102     @segfault_detector
103     def test_auth_user_session(self):
104         s = auth.user_session(ldb=42, principal='foo')
105
106     @segfault_detector
107     def test_gensec_start_server(self):
108         gensec.Security.start_server(auth_context=42)
109
110     @segfault_detector
111     def test_netbios_query_name(self):
112         n = netbios.Node()
113         t = n.query_name((42, 'foo'), 'localhost')
114
115     @segfault_detector
116     def test_encrypt_netr_crypt_password(self):
117         lp, creds, server = self.get_lp_et_al()
118         creds.encrypt_netr_crypt_password(42)
119
120     @segfault_detector
121     def test_hive_open_ldb(self):
122         # we don't need to provide a valid path because we segfault first
123         try:
124             registry.open_ldb('', credentials=42)
125         except ldb.LdbError as e:
126             print("failed with %s" % e)
127
128     @segfault_detector
129     def test_ldb_add_nameless_element(self):
130         m = ldb.Message()
131         e = ldb.MessageElement('q')
132         try:
133             m.add(e)
134         except ldb.LdbError:
135             pass
136         str(m)