tests: Add SMB Py binding .deltree test case
[sfrench/samba-autobuild/.git] / python / samba / tests / smb.py
1 # -*- coding: utf-8 -*-
2 # Unix SMB/CIFS implementation. Tests for smb manipulation
3 # Copyright (C) David Mulder <dmulder@suse.com> 2018
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 import samba
19 import os
20 import random
21 import sys
22 from samba import smb
23 from samba import NTSTATUSError
24 from samba.ntstatus import (NT_STATUS_OBJECT_NAME_NOT_FOUND,
25                             NT_STATUS_OBJECT_PATH_NOT_FOUND)
26
27 PY3 = sys.version_info[0] == 3
28 addom = 'addom.samba.example.com/'
29 test_contents = 'abcd' * 256
30 utf_contents = u'Süßigkeiten Äpfel ' * 128
31 test_literal_bytes_embed_nulls = b'\xff\xfe\x14\x61\x00\x00\x62\x63\x64' * 256
32 binary_contents = b'\xff\xfe'
33 binary_contents = binary_contents + "Hello cruel world of python3".encode('utf8') * 128
34 test_dir = os.path.join(addom, 'testing_%d' % random.randint(0, 0xFFFF))
35 test_file = os.path.join(test_dir, 'testing').replace('/', '\\')
36
37
38 class SMBTests(samba.tests.TestCase):
39     def setUp(self):
40         super(SMBTests, self).setUp()
41         self.server = os.environ["SERVER"]
42         creds = self.insta_creds(template=self.get_credentials())
43         self.conn = smb.SMB(self.server,
44                             "sysvol",
45                             lp=self.get_loadparm(),
46                             creds=creds)
47         self.conn.mkdir(test_dir)
48
49     def tearDown(self):
50         super(SMBTests, self).tearDown()
51         try:
52             self.conn.deltree(test_dir)
53         except:
54             pass
55
56     def test_list(self):
57         # check a basic listing returns the items we expect
58         ls = [f['name'] for f in self.conn.list(addom)]
59         self.assertIn('scripts', ls,
60                       msg='"scripts" directory not found in sysvol')
61         self.assertIn('Policies', ls,
62                       msg='"Policies" directory not found in sysvol')
63         self.assertNotIn('..', ls,
64                          msg='Parent (..) found in directory listing')
65         self.assertNotIn('.', ls,
66                          msg='Current dir (.) found in directory listing')
67
68         # using a '*' mask should be the same as using no mask
69         ls_wildcard = [f['name'] for f in self.conn.list(addom, "*")]
70         self.assertEqual(ls, ls_wildcard)
71
72         # applying a mask should only return items that match that mask
73         ls_pol = [f['name'] for f in self.conn.list(addom, "Pol*")]
74         expected = ["Policies"]
75         self.assertEqual(ls_pol, expected)
76
77         # each item in the listing is a has with expected keys
78         expected_keys = ['attrib', 'mtime', 'name', 'short_name', 'size']
79         for item in self.conn.list(addom):
80             for key in expected_keys:
81                 self.assertIn(key, item,
82                               msg="Key '%s' not in listing '%s'" % (key, item))
83
84     def test_deltree(self):
85         """The smb.deltree API should delete files and sub-dirs"""
86         # create some test sub-dirs
87         dirpaths = []
88         empty_dirs = []
89         cur_dir = test_dir
90         for subdir in ["subdir-X", "subdir-Y", "subdir-Z"]:
91             path = self.make_sysvol_path(cur_dir, subdir)
92             self.conn.mkdir(path)
93             dirpaths.append(path)
94             cur_dir = path
95
96             # create another empty dir just for kicks
97             path = self.make_sysvol_path(cur_dir, "another")
98             self.conn.mkdir(path)
99             empty_dirs.append(path)
100
101         # create some files in these directories
102         filepaths = []
103         for subdir in dirpaths:
104             for i in range(1, 4):
105                 contents = "I'm file {0} in dir {1}!".format(i, subdir)
106                 path = self.make_sysvol_path(subdir, "file-{0}.txt".format(i))
107                 self.conn.savefile(path, test_contents.encode('utf8'))
108                 filepaths.append(path)
109
110         # sanity-check these dirs/files exist
111         for subdir in dirpaths + empty_dirs:
112             self.assertTrue(self.conn.chkpath(subdir),
113                             "Failed to create {0}".format(subdir))
114         for path in filepaths:
115             self.assertTrue(self.file_exists(path),
116                             "Failed to create {0}".format(path))
117
118         # try using deltree to remove a single empty directory
119         path = empty_dirs.pop(0)
120         self.conn.deltree(path)
121         self.assertFalse(self.conn.chkpath(path),
122                          "Failed to delete {0}".format(path))
123
124         # try using deltree to remove a single file
125         path = filepaths.pop(0)
126         self.conn.deltree(path)
127         self.assertFalse(self.file_exists(path),
128                          "Failed to delete {0}".format(path))
129
130         # delete the top-level dir
131         self.conn.deltree(test_dir)
132
133         # now check that all the dirs/files are no longer there
134         for subdir in dirpaths + empty_dirs:
135             self.assertFalse(self.conn.chkpath(subdir),
136                              "Failed to delete {0}".format(subdir))
137         for path in filepaths:
138             self.assertFalse(self.file_exists(path),
139                              "Failed to delete {0}".format(path))
140
141     def file_exists(self, filepath):
142         """Returns whether a regular file exists (by trying to open it)"""
143         try:
144             self.conn.loadfile(filepath)
145             exists = True;
146         except NTSTATUSError as err:
147             if (err.args[0] == NT_STATUS_OBJECT_NAME_NOT_FOUND or
148                 err.args[0] == NT_STATUS_OBJECT_PATH_NOT_FOUND):
149                 exists = False
150             else:
151                 raise err
152         return exists
153
154     def test_unlink(self):
155         """
156         The smb.unlink API should delete file
157         """
158         # create the test file
159         self.assertFalse(self.file_exists(test_file))
160         self.conn.savefile(test_file, binary_contents)
161         self.assertTrue(self.file_exists(test_file))
162
163         # delete it and check that it's gone
164         self.conn.unlink(test_file)
165         self.assertFalse(self.file_exists(test_file))
166
167     def test_chkpath(self):
168         """Tests .chkpath determines whether or not a directory exists"""
169
170         self.assertTrue(self.conn.chkpath(test_dir))
171
172         # should return False for a non-existent directory
173         bad_dir = self.make_sysvol_path(test_dir, 'dont_exist')
174         self.assertFalse(self.conn.chkpath(bad_dir))
175
176         # should return False for files (because they're not directories)
177         self.conn.savefile(test_file, binary_contents)
178         self.assertFalse(self.conn.chkpath(test_file))
179
180         # check correct result after creating and then deleting a new dir
181         new_dir = self.make_sysvol_path(test_dir, 'test-new')
182         self.conn.mkdir(new_dir)
183         self.assertTrue(self.conn.chkpath(new_dir))
184         self.conn.rmdir(new_dir)
185         self.assertFalse(self.conn.chkpath(new_dir))
186
187     def test_save_load_text(self):
188
189         self.conn.savefile(test_file, test_contents.encode('utf8'))
190
191         contents = self.conn.loadfile(test_file)
192         self.assertEquals(contents.decode('utf8'), test_contents,
193                           msg='contents of test file did not match what was written')
194
195         # check we can overwrite the file with new contents
196         new_contents = 'wxyz' * 128
197         self.conn.savefile(test_file, new_contents.encode('utf8'))
198         contents = self.conn.loadfile(test_file)
199         self.assertEquals(contents.decode('utf8'), new_contents,
200                           msg='contents of test file did not match what was written')
201
202     # with python2 this will save/load str type (with embedded nulls)
203     # with python3 this will save/load bytes type
204     def test_save_load_string_bytes(self):
205         self.conn.savefile(test_file, test_literal_bytes_embed_nulls)
206
207         contents = self.conn.loadfile(test_file)
208         self.assertEquals(contents, test_literal_bytes_embed_nulls,
209                           msg='contents of test file did not match what was written')
210
211     # python3 only this will save/load unicode
212     def test_save_load_utfcontents(self):
213         if PY3:
214             self.conn.savefile(test_file, utf_contents.encode('utf8'))
215
216             contents = self.conn.loadfile(test_file)
217             self.assertEquals(contents.decode('utf8'), utf_contents,
218                               msg='contents of test file did not match what was written')
219
220     # with python2 this will save/load str type
221     # with python3 this will save/load bytes type
222     def test_save_binary_contents(self):
223         self.conn.savefile(test_file, binary_contents)
224
225         contents = self.conn.loadfile(test_file)
226         self.assertEquals(contents, binary_contents,
227                           msg='contents of test file did not match what was written')
228
229     def make_sysvol_path(self, dirpath, filename):
230         # return the dir + filename as a sysvol path
231         return os.path.join(dirpath, filename).replace('/', '\\')