From 801c1856a39cbc2fd65183a7d6d8b7526dbc251e Mon Sep 17 00:00:00 2001 From: Noel Power Date: Tue, 21 Mar 2017 08:29:59 +0000 Subject: [PATCH] s3/script/tests: Add simple (smb1 & smb2) get/set/list tests for smbcquotas BUG: https://bugzilla.samba.org/show_bug.cgi?id=13553 Signed-off-by: Noel Power Reviewed-by: Jeremy Allison --- selftest/target/Samba3.pm | 9 + source3/script/tests/getset_quota.py | 154 ++++++++++++++++ source3/script/tests/test_smbcquota.py | 244 +++++++++++++++++++++++++ source3/script/tests/test_smbcquota.sh | 46 +++++ source3/selftest/tests.py | 1 + 5 files changed, 454 insertions(+) create mode 100755 source3/script/tests/getset_quota.py create mode 100755 source3/script/tests/test_smbcquota.py create mode 100755 source3/script/tests/test_smbcquota.sh diff --git a/selftest/target/Samba3.pm b/selftest/target/Samba3.pm index 5b8cf9ea6d8..447c1e8e3a7 100755 --- a/selftest/target/Samba3.pm +++ b/selftest/target/Samba3.pm @@ -885,6 +885,9 @@ sub setup_fileserver push(@dirs, "$dfree_share_dir/subdir2"); push(@dirs, "$dfree_share_dir/subdir3"); + my $quotadir_dir="$share_dir/quota"; + push(@dirs, $quotadir_dir); + my $valid_users_sharedir="$share_dir/valid_users"; push(@dirs,$valid_users_sharedir); @@ -911,6 +914,8 @@ sub setup_fileserver usershare allow guests = yes usershare prefix allow list = $usershare_sharedir + get quota command = $prefix_abs/getset_quota.py + set quota command = $prefix_abs/getset_quota.py [lowercase] path = $lower_case_share_dir comment = smb username is [%U] @@ -2170,6 +2175,10 @@ sub provision($$$$$$$$$) vfs objects = acl_xattr fake_acls xattr_tdb fake_dfq inherit owner = yes include = $dfqconffile +[quotadir] + path = $shrdir/quota + admin users = $unix_name + [acl_xattr_ign_sysacl_posix] copy = tmp acl_xattr:ignore system acls = yes diff --git a/source3/script/tests/getset_quota.py b/source3/script/tests/getset_quota.py new file mode 100755 index 00000000000..0254aa5a3b3 --- /dev/null +++ b/source3/script/tests/getset_quota.py @@ -0,0 +1,154 @@ +#!/usr/bin/env python3 +# Unix SMB/CIFS implementation. +# Tests for smbcquotas +# Copyright (C) Noel Power 2017 + +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import sys +import traceback +import logging +import os + +USER_QUOTAS = 1 +USER_DEFAULT_QUOTAS = 2 +GROUP_QUOTAS = 3 +GROUP_DEFAULT_QUOTAS = 4 + +#Quota model + +class Quota: + def __init__(self): + self.flags = 0 + self.quotatype = USER_DEFAULT_QUOTAS + self.uid = 0 + self.usedblocks = 0 + self.softlimit = 0 + self.hardlimit = 0 + self.hardlimit = 0 + self.usedinodes = 0 + self.slimitinodes = 0 + self.hlimitinodes = 0 + +def quota_to_str(item): + result = str(item.flags) + " " + str(item.usedblocks) + " " + str(item.softlimit) + " " + str(item.hardlimit) + " " + str(item.usedinodes) + " " + str(item.slimitinodes) + " " + str(item.hlimitinodes) + return result + +def quota_to_db_str(item): + result = item.uid + " " + str(item.usedblocks) + " " + str(item.softlimit) + " " + str(item.hardlimit) + " " + str(item.usedinodes) + " " + str(item.slimitinodes) + " " + str(item.hlimitinodes) + return result + +def load_quotas(input_file): + fileContents = open(input_file,"r") + lineno = 0 + quotas = [] + for line in fileContents: + if line.strip().startswith("#"): + continue + content = line.strip().split() + quota = Quota() + if len(content) < 7: + logging.debug("ignoring line %d, doesn't have enough fields\n"%lineno) + else: + quota.flags = 2 + quota.uid = content[0] + quota.usedblocks = content[1] + quota.softlimit = content[2] + quota.hardlimit = content[3] + quota.usedinodes = content[4] + quota.slimitinodes = content[5] + quota.hlimitinodes = content[6] + quotas.append(quota) + + fileContents.close() + return quotas + +def set_quotas(quota_list, output_file): + filecontents = open(output_file,"w+") + if filecontents == None: + return False; + lines = "" + for quota in quota_list: + next_line = quota_to_db_str(quota) + if next_line: + lines = lines + next_line + "\n" + filecontents.write(lines) + filecontents.close() + return True + +def get_quotas(uid, quota_list): + logging.debug("in get_quotas\n") + for quota in quota_list: + if quota.uid == uid: + return quota + return None + +def main(): + logging.basicConfig(format='%(asctime)s %(message)s', level=logging.DEBUG) + logging.debug("system args passed are %s\n"% str(sys.argv)) + quota_file_dir = os.path.dirname(sys.argv[0]); + quota_file_db = os.path.join(quota_file_dir,"quotas.db") + logging.debug("quota db is located %s\n", quota_file_db) + quota_list = load_quotas(quota_file_db) + logging.debug("quotas loaded have %s entries\n", len(quota_list)) + result = None + if len(sys.argv) == 4: + # Get Quota + directory = sys.argv[1] + if sys.argv[2] == "1": + query_type = USER_QUOTAS + elif sys.argv[2] == "2": + query_type = USER_DEFAULT_QUOTAS + elif sys.argv[2] == "3": + query_type = GROUP_QUOTAS + elif sys.argv[2] == "4": + query_type = GROUP_DEFAULT_QUOTAS + uid = sys.argv[3] + quota = get_quotas(uid, quota_list) + if quota is None: + logging.debug("no result for uid %s"%uid) + else: + result = quota_to_str(quota) + logging.debug("got result for uid %s\n"%uid); + if result is None: + result = "0 0 0 0 0 0 0" + logging.debug("for uid %s returning quotas %s\n"%(uid,result)) + print("%s"%result) + elif len(sys.argv) > 8: + # Set Quota + quota = Quota() + directory = sys.argv[1] + quota.query_type = sys.argv[2] + quota.uid = sys.argv[3] + quota.flags = sys.argv[4] + quota.softlimit = sys.argv[5] + quota.hardlimit = sys.argv[6] + quota.slimitinodes = sys.argv[7] + quota.hlimitinodes = sys.argv[8] + found = get_quotas(quota.uid, quota_list) + if found: + found.query_type = quota.query_type + found.uid = quota.uid + found.flags = quota.flags + found.softlimit = quota.softlimit + found.hardlimit = quota.hardlimit + found.slimitinodes = quota.slimitinodes + found.hlimitinodes = quota.hlimitinodes + else: + quota_list.append(quota) + if set_quotas(quota_list,quota_file_db): + print ("%s\n"%quota_to_str(quota_list[-1])) + return +if __name__ == '__main__': + main() diff --git a/source3/script/tests/test_smbcquota.py b/source3/script/tests/test_smbcquota.py new file mode 100755 index 00000000000..52061f2989f --- /dev/null +++ b/source3/script/tests/test_smbcquota.py @@ -0,0 +1,244 @@ +#!/usr/bin/env python3 +# Unix SMB/CIFS implementation. +# Tests for smbcquotas +# Copyright (C) Noel Power 2017 + +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import os, subprocess, sys +import traceback +import logging +import shutil + +USER_QUOTAS = 1 +USER_DEFAULT_QUOTAS = 2 +GROUP_QUOTAS = 3 +GROUP_DEFAULT_QUOTAS = 4 +BLOCK_SIZE = 1024 +DEFAULT_SOFTLIM = 2 +DEFAULT_HARDLIM = 4 + +class test_env: + def __init__(self): + self.server = None + self.domain = None + self.username = None + self.password = None + self.envdir = None + self.quota_db = None + self.smbcquotas = None + self.users = [] + +class user_info: + def __init__(self): + self.uid = 0 + self.username = "" + self.softlim = 0 + self.hardlim = 0 + +class Quota: + def __init__(self): + self.flags = 0 + self.quotatype = USER_DEFAULT_QUOTAS + self.uid = 0 + self.usedblocks = 0 + self.softlimit = 0 + self.hardlimit = 0 + self.hardlimit = 0 + self.usedinodes = 0 + self.slimitinodes = 0 + self.hlimitinodes = 0 + +def init_quota_db(users, output_file): + filecontents = open(output_file,"w+") + lines = "" + default_values = "0 " + str(DEFAULT_SOFTLIM) + " " + str(DEFAULT_HARDLIM) + " 0 0 0" + for user in users: + lines = lines + user.uid + " " + default_values + "\n" + filecontents.write(lines) + filecontents.close() + +def load_quotas(input_file): + fileContents = open(input_file,"r") + lineno = 0 + quotas = [] + for line in fileContents: + if line.strip().startswith("#"): + continue + content = line.strip().split() + quota = Quota() + if len(content) < 7: + logging.debug("ignoring line %d, doesn't have enough fields\n"%lineno) + else: + quota.flags = 2 + quota.uid = content[0] + quota.usedblocks = content[1] + quota.softlimit = content[2] + quota.hardlimit = content[3] + quota.usedinodes = content[4] + quota.slimitinodes = content[5] + quota.hlimitinodes = content[6] + quotas.append(quota) + + fileContents.close() + return quotas + +def get_quotas(uid, quota_list): + for quota in quota_list: + if quota.uid == uid: + return quota + return None + +def get_users(): + output = subprocess.Popen(['getent', 'passwd'], + stdout=subprocess.PIPE).communicate()[0].decode("utf-8").split('\n') + users = [] + for line in output: + info = line.split(':') + if len(info) > 3 and info[0]: + user = user_info() + user.username = info[0] + user.uid = info[2] + logging.debug("Adding user ->%s<-\n"%user.username) + users.append(user) + return users + + + +def smbcquota_output_to_userinfo(output): + infos = [] + for line in output: + if len(line) > 1: + username = line.strip(':').split()[0] + quota_info = line.split(':')[1].split('/') + if len(quota_info) > 2: + info = user_info() + info.username = username.strip() + info.softlim = int(quota_info[1].strip()) / BLOCK_SIZE + info.hardlim = int(quota_info[2].strip()) / BLOCK_SIZE + infos.append(info) + return infos + +def check_quota_limits(infos, softlim, hardlim): + if len(infos) < 1: + logging.debug("no users info to check :-(\n") + return False + for info in infos: + if int(info.softlim) != softlim: + logging.debug("expected softlimit %s got ->%s<-\n"%(softlim, info.softlim)) + return False + if int(info.hardlim) != hardlim: + logging.debug("expected hardlimit limit %s got %s\n"%(hardlim,info.hardlim)) + return False + return True + +class test_base: + def __init__(self, env): + self.env = env + def run(self, protocol): + pass + +class listtest(test_base): + def run(self, protocol): + init_quota_db(self.env.users, self.env.quota_db) + quotas = load_quotas(self.env.quota_db) + args = [self.env.smbcquotas]; + remaining_args = ['-U' + self.env.username + "%" + self.env.password, '-L', '//' + self.env.server + '/quotadir'] + if protocol == 'smb2': + args.append('-m smb2') + args.extend(remaining_args) + output = subprocess.Popen([self.env.smbcquotas, '-U' + self.env.username + "%" + self.env.password, '-L', '//' + self.env.server + '/quotadir'], stdout=subprocess.PIPE).communicate()[0].decode("utf-8").split('\n') + infos = smbcquota_output_to_userinfo(output) + return check_quota_limits(infos, DEFAULT_SOFTLIM, DEFAULT_HARDLIM) +def get_uid(name, users): + for user in users: + if user.username == name: + return user.uid + return None + +class gettest(test_base): + def run(self, protocol): + init_quota_db(self.env.users, self.env.quota_db) + quotas = load_quotas(self.env.quota_db) + uid = get_uid(self.env.username, self.env.users) + output = subprocess.Popen([self.env.smbcquotas, '-U' + self.env.username + "%" + self.env.password, '-u' + self.env.username, '//' + self.env.server + '/quotadir'], stdout=subprocess.PIPE).communicate()[0].decode("utf-8").split('\n') + user_infos = smbcquota_output_to_userinfo(output) + db_user_info = get_quotas(uid, quotas) + # double check, we compare the results from the db file + # the quota script the server uses compared to what + # smbcquota is telling us + return check_quota_limits(user_infos, int(db_user_info.softlimit), int(db_user_info.hardlimit)) + +class settest(test_base): + def run(self, protocol): + init_quota_db(self.env.users, self.env.quota_db) + quotas = load_quotas(self.env.quota_db) + uid = get_uid(self.env.username, self.env.users) + old_db_user_info = get_quotas(uid, quotas) + + #increase limits by 2 blocks + new_soft_limit = (int(old_db_user_info.softlimit) + 2) * BLOCK_SIZE + new_hard_limit = (int(old_db_user_info.hardlimit) + 2) * BLOCK_SIZE + + new_limits = "UQLIM:%s:%d/%d"%(self.env.username, new_soft_limit, new_hard_limit) + logging.debug("setting new limits %s"%new_limits) + + output = subprocess.Popen([self.env.smbcquotas, '-U' + self.env.username + "%" + self.env.password, '//' + self.env.server + '/quotadir', '-S', new_limits], stdout=subprocess.PIPE).communicate()[0].decode("utf-8").split('\n') + logging.debug("output from smbcquota is %s"%output) + user_infos = smbcquota_output_to_userinfo(output) + return check_quota_limits(user_infos, new_soft_limit / BLOCK_SIZE, new_hard_limit / BLOCK_SIZE) + +# map of tests +subtest_descriptions = { + "list test" : listtest, + "get test" : gettest, + "set test" : settest +} + +def main(): + logging.basicConfig(format='%(asctime)s %(message)s', level=logging.DEBUG) + + logging.debug("got args %s\n"%str(sys.argv)) + + if len(sys.argv) < 7: + logging.debug ("Usage: test_smbcquota.py server domain username password envdir smbcquotas\n") + sys.exit(1) + env = test_env() + env.server = sys.argv[1] + env.domain = sys.argv[2] + env.username = sys.argv[3] + env.password = sys.argv[4] + env.envdir = sys.argv[5] + env.smbcquotas = sys.argv[6] + quota_script = os.path.join(os.path.dirname(sys.argv[0]), + "getset_quota.py") + #copy the quota script to the evironment + shutil.copy2(quota_script, env.envdir) + + env.quota_db = os.path.join(env.envdir, "quotas.db") + env.users = get_users() + for protocol in ['smb1', 'smb2']: + for key in subtest_descriptions.keys(): + test = subtest_descriptions[key](env) + logging.debug("running subtest '%s' using protocol '%s'\n"%(key,protocol)) + result = test.run(protocol) + if result == False: + logging.debug("subtest '%s' for '%s' failed\n"%(key,protocol)) + sys.exit(1) + else: + logging.debug("subtest '%s' for '%s' passed\n"%(key,protocol)) + sys.exit(0) + +if __name__ == '__main__': + main() diff --git a/source3/script/tests/test_smbcquota.sh b/source3/script/tests/test_smbcquota.sh new file mode 100755 index 00000000000..a61c2fe6d0b --- /dev/null +++ b/source3/script/tests/test_smbcquota.sh @@ -0,0 +1,46 @@ +#!/bin/sh +# Unix SMB/CIFS implementation. +# Tests for smbcquotas +# Copyright (C) Noel Power 2017 + +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +# +# Blackbox test wrapper for smbcquota +# +if [ $# -lt 6 ]; then +cat <