# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
+from __future__ import print_function
"""Tests for the Auth and AuthZ logging.
"""
-
from samba import auth
import samba.tests
from samba.messaging import Messaging
msg = messages[2]
self.assertEquals("Authorization", msg["type"])
serviceDescription = "SMB"
- print "binding %s" % binding
+ print("binding %s" % binding)
if binding == "[smb2]":
serviceDescription = "SMB2"
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
+from __future__ import print_function
"""Tests for the Auth and AuthZ logging.
"""
def messageHandler( context, msgType, src, message):
# This does not look like sub unit output and it
# makes these tests much easier to debug.
- print message
+ print(message)
jsonMsg = json.loads(message)
context["messages"].append( jsonMsg)
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
+from __future__ import print_function
"""Tests for the Auth and AuthZ logging of password changes.
"""
credentials=self.get_credentials(),
lp=self.get_loadparm())
- print "ldb %s" % type(self.ldb)
+ print("ldb %s" % type(self.ldb))
# Gets back the basedn
base_dn = self.ldb.domain_dn()
- print "base_dn %s" % base_dn
+ print("base_dn %s" % base_dn)
# Gets back the configuration basedn
configuration_dn = self.ldb.get_config_basedn().get_linearized()
messages = self.waitForMessages(isLastExpectedMessage)
- print "Received %d messages" % len(messages)
+ print("Received %d messages" % len(messages))
self.assertEquals(8,
len(messages),
"Did not receive the expected number of messages")
)
messages = self.waitForMessages(isLastExpectedMessage)
- print "Received %d messages" % len(messages)
+ print("Received %d messages" % len(messages))
self.assertEquals(4,
len(messages),
"Did not receive the expected number of messages")
pass
messages = self.waitForMessages(isLastExpectedMessage)
- print "Received %d messages" % len(messages)
+ print("Received %d messages" % len(messages))
self.assertEquals(3,
len(messages),
"Did not receive the expected number of messages")
pass
messages = self.waitForMessages(isLastExpectedMessage)
- print "Received %d messages" % len(messages)
+ print("Received %d messages" % len(messages))
self.assertEquals(4,
len(messages),
"Did not receive the expected number of messages")
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
+from __future__ import print_function
"""Blackbox tests for ndrdump."""
import os
for p in [ "../../../../../source4/librpc/tests", "../../../../../librpc/tests"]:
data_path_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), p))
- print data_path_dir
+ print(data_path_dir)
if os.path.exists(data_path_dir):
break
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
+from __future__ import print_function
"""Tests for samba.dcerpc.dnsserver"""
import os
self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=0)
self.delete_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False)
except AssertionError as e:
- print e
+ print(e)
num_failures = num_failures + 1
# Also try to update valid records to invalid ones, making sure this fails
try:
self.add_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False)
except AssertionError as e:
- print e
+ print(e)
num_failures = num_failures + 1
self.delete_record(self.custom_zone, "testrecord", record_type_str, good_record_str)
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
+from __future__ import print_function
import sys
sys.path.insert(0, "bin/python")
nblocks -= self.initial_blocks
leaked_blocks = (nblocks - num_expected)
if leaked_blocks != 0:
- print "Leaked %d blocks" % leaked_blocks
+ print("Leaked %d blocks" % leaked_blocks)
def check_type(self, interface, typename, type):
- print "Checking type %s" % typename
+ print("Checking type %s" % typename)
v = type()
for n in dir(v):
if n[0] == '_':
value = getattr(v, n)
except TypeError as errstr:
if str(errstr) == "unknown union level":
- print "ERROR: Unknown union level in %s.%s" % (typename, n)
+ print("ERROR: Unknown union level in %s.%s" % (typename, n))
self.errcount += 1
continue
- print str(errstr)[1:21]
+ print(str(errstr)[1:21])
if str(errstr)[0:21] == "Can not convert C Type":
- print "ERROR: Unknown C type for %s.%s" % (typename, n)
+ print("ERROR: Unknown C type for %s.%s" % (typename, n))
self.errcount += 1
continue
else:
- print "ERROR: Failed to instantiate %s.%s" % (typename, n)
+ print("ERROR: Failed to instantiate %s.%s" % (typename, n))
self.errcount += 1
continue
except Exception:
- print "ERROR: Failed to instantiate %s.%s" % (typename, n)
+ print("ERROR: Failed to instantiate %s.%s" % (typename, n))
self.errcount += 1
continue
# now try setting the value back
try:
- print "Setting %s.%s" % (typename, n)
+ print("Setting %s.%s" % (typename, n))
setattr(v, n, value)
except Exception as e:
if isinstance(e, AttributeError) and str(e).endswith("is read-only"):
# readonly, ignore
continue
else:
- print "ERROR: Failed to set %s.%s: %r: %s" % (typename, n, e.__class__, e)
+ print("ERROR: Failed to set %s.%s: %r: %s" % (typename, n, e.__class__, e))
self.errcount += 1
continue
# and try a comparison
try:
if value != getattr(v, n):
- print "ERROR: Comparison failed for %s.%s: %r != %r" % (typename, n, value, getattr(v, n))
+ print("ERROR: Comparison failed for %s.%s: %r != %r" % (typename, n, value, getattr(v, n)))
continue
except Exception as e:
- print "ERROR: compare exception for %s.%s: %r: %s" % (typename, n, e.__class__, e)
+ print("ERROR: compare exception for %s.%s: %r: %s" % (typename, n, e.__class__, e))
continue
def check_interface(self, interface, iname):
self.check_type(interface, n, value)
self.check_blocks(None, initial_blocks)
except Exception as e:
- print "ERROR: Failed to check_type %s.%s: %r: %s" % (iname, n, e.__class__, e)
+ print("ERROR: Failed to check_type %s.%s: %r: %s" % (iname, n, e.__class__, e))
self.errcount += 1
elif callable(value):
pass # Method
else:
- print "UNKNOWN: %s=%s" % (n, value)
+ print("UNKNOWN: %s=%s" % (n, value))
if self.errcount - errcount != 0:
- print "Found %d errors in %s" % (self.errcount - errcount, iname)
+ print("Found %d errors in %s" % (self.errcount - errcount, iname))
def check_all_interfaces(self):
for iname in dir(samba.dcerpc):
continue
if iname == 'ClientConnection' or iname == 'base':
continue
- print "Checking interface %s" % iname
+ print("Checking interface %s" % iname)
iface = getattr(samba.dcerpc, iname)
initial_blocks = talloc.total_blocks(None)
self.check_interface(iface, iname)
if errcount == 0:
sys.exit(0)
else:
- print "%d failures" % errcount
+ print("%d failures" % errcount)
sys.exit(1)
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
+from __future__ import print_function
import os
import sys
import struct
name = "%s.%s" % (self.server, self.get_dns_domain())
q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
- print "asking for ", q.name
+ print("asking for ", q.name)
questions.append(q)
self.finish_name_packet(p, questions)
name = "%s" % (self.get_dns_domain())
q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
- print "asking for ", q.name
+ print("asking for ", q.name)
questions.append(q)
self.finish_name_packet(p, questions)
name = "%s.%s" % (self.server, self.get_dns_domain())
q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
- print "asking for ", q.name
+ print("asking for ", q.name)
questions.append(q)
self.finish_name_packet(p, questions)
name = "%s.%s" % (self.server, self.get_dns_domain())
q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
- print "asking for ", q.name
+ print("asking for ", q.name)
questions.append(q)
self.finish_name_packet(p, questions)
name = "invalid-%s.%s" % (self.server, self.get_dns_domain())
q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
- print "asking for ", q.name
+ print("asking for ", q.name)
questions.append(q)
self.finish_name_packet(p, questions)
name = "%s.%s" % (self.server, self.get_dns_domain())
q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
- print "asking for ", q.name
+ print("asking for ", q.name)
questions.append(q)
self.finish_name_packet(p, questions)
# Check the record
name = "cname_test.%s" % self.get_dns_domain()
q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
- print "asking for ", q.name
+ print("asking for ", q.name)
questions.append(q)
self.finish_name_packet(p, questions)
name = "%s.%s" % (self.server, self.get_dns_domain())
q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
- print "asking for ", q.name
+ print("asking for ", q.name)
questions.append(q)
self.finish_name_packet(p, questions)
name = "%s.%s" % ('fakefakefake', self.get_dns_domain())
q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
- print "asking for ", q.name
+ print("asking for ", q.name)
questions.append(q)
self.finish_name_packet(p, questions)
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
+from __future__ import print_function
from samba.tests import TestCaseInTempDir
from samba.dcerpc import dns, dnsp
from samba import gensec, tests
try:
send_packet = ndr.ndr_pack(packet)
if dump:
- print self.hexdump(send_packet)
+ print(self.hexdump(send_packet))
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
s.settimeout(timeout)
s.connect((host, 53))
s.sendall(send_packet, 0)
recv_packet = s.recv(2048, 0)
if dump:
- print self.hexdump(recv_packet)
+ print(self.hexdump(recv_packet))
response = ndr.ndr_unpack(dns.name_packet, recv_packet)
return (response, recv_packet)
finally:
try:
send_packet = ndr.ndr_pack(packet)
if dump:
- print self.hexdump(send_packet)
+ print(self.hexdump(send_packet))
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
s.settimeout(timeout)
s.connect((host, 53))
recv_packet = s.recv(0xffff + 2, 0)
if dump:
- print self.hexdump(recv_packet)
+ print(self.hexdump(recv_packet))
response = ndr.ndr_unpack(dns.name_packet, recv_packet[2:])
finally:
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
+from __future__ import print_function
import os
import sys
import struct
try:
send_packet = ndr.ndr_pack(packet)
if dump:
- print self.hexdump(send_packet)
+ print(self.hexdump(send_packet))
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
s.settimeout(timeout)
s.connect((host, 53))
s.send(send_packet, 0)
recv_packet = s.recv(2048, 0)
if dump:
- print self.hexdump(recv_packet)
+ print(self.hexdump(recv_packet))
return ndr.ndr_unpack(dns.name_packet, recv_packet)
finally:
if s is not None:
def test_double_forwarder_first_frozen(self):
if len(dns_servers) < 2:
- print "Ignoring test_double_forwarder_first_frozen"
+ print("Ignoring test_double_forwarder_first_frozen")
return
s1 = self.start_toy_server(dns_servers[0], 53, 'forwarder1')
s2 = self.start_toy_server(dns_servers[1], 53, 'forwarder2')
def test_double_forwarder_first_down(self):
if len(dns_servers) < 2:
- print "Ignoring test_double_forwarder_first_down"
+ print("Ignoring test_double_forwarder_first_down")
return
s2 = self.start_toy_server(dns_servers[1], 53, 'forwarder2')
ad = contact_real_server(server_ip, 53)
def test_double_forwarder_both_slow(self):
if len(dns_servers) < 2:
- print "Ignoring test_double_forwarder_both_slow"
+ print("Ignoring test_double_forwarder_both_slow")
return
s1 = self.start_toy_server(dns_servers[0], 53, 'forwarder1')
s2 = self.start_toy_server(dns_servers[1], 53, 'forwarder2')
def test_cname_forwarding_with_slow_server(self):
if len(dns_servers) < 2:
- print "Ignoring test_cname_forwarding_with_slow_server"
+ print("Ignoring test_cname_forwarding_with_slow_server")
return
s1 = self.start_toy_server(dns_servers[0], 53, 'forwarder1')
s2 = self.start_toy_server(dns_servers[1], 53, 'forwarder2')
def test_cname_forwarding_with_server_down(self):
if len(dns_servers) < 2:
- print "Ignoring test_cname_forwarding_with_server_down"
+ print("Ignoring test_cname_forwarding_with_server_down")
return
s2 = self.start_toy_server(dns_servers[1], 53, 'forwarder2')
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Based on the EchoServer example from python docs
+from __future__ import print_function
import SocketServer
import time
import sys
def debug(msg):
if VERBOSE:
sys.stdout.flush()
- print "\033[00;36m%s\033[00m" % msg
+ print("\033[00;36m%s\033[00m" % msg)
sys.stdout.flush()
timeout = 0
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
+from __future__ import print_function
"""Tests for the samba3sam LDB module, which maps Samba3 LDAP to AD LDAP."""
import os
return open(os.path.join(datadir, filename), 'r').read()
def ldb_debug(l, text):
- print text
+ print(text)
class MapBaseTestCase(TestCaseInTempDir):