# compat functions
from functools import cmp_to_key as cmp_to_key_fn
- # compat types
- text_type = str
-
# alias
def ConfigParser(defaults=None, dict_type=dict, allow_no_value=False):
from configparser import ConfigParser
from io import BytesIO
from xml.etree.ElementTree import Element, SubElement
from samba.gp_parse import GPParser
-from samba.compat import text_type
# [MS-GPAC] Group Policy Audit Configuration
class GPAuditCsvParser(GPParser):
encoding = 'utf-8'
header = False
self.header = []
for v in r.findall('Value'):
- if not isinstance(v.text, text_type):
+ if not isinstance(v.text, str):
v.text = v.text.decode(self.output_encoding)
self.header.append(v.text)
else:
line = {}
for i, v in enumerate(r.findall('Value')):
line[self.header[i]] = v.text if v.text is not None else ''
- if not isinstance(self.header[i], text_type):
+ if not isinstance(self.header[i], str):
line[self.header[i]] = line[self.header[i]].decode(self.output_encoding)
self.lines.append(line)
import os
import tempfile
from collections import OrderedDict
-from samba.compat import text_type
from samba.compat import get_string
from samba.netcmd import CommandError
v = [rec[a]]
else:
v = rec[a]
- v = [x.encode('utf8') if isinstance(x, text_type) else x for x in v]
+ v = [x.encode('utf8') if isinstance(x, str) else x for x in v]
rattr = ctx.tmp_samdb.dsdb_DsReplicaAttribute(ctx.tmp_samdb, a, v)
attrs.append(rattr)
SuperCommand,
Option,
)
-from samba.compat import text_type
from samba.compat import get_bytes
from samba.compat import get_string
from . import common
self.outf.write("Sorry, passwords do not match.\n")
try:
- if not isinstance(password, text_type):
+ if not isinstance(password, str):
password = password.decode('utf8')
net.change_password(password)
except Exception as msg:
def get_utf8(a, b, username):
try:
- u = text_type(get_bytes(b), 'utf-16-le')
+ u = str(get_bytes(b), 'utf-16-le')
except UnicodeDecodeError as e:
self.outf.write("WARNING: '%s': CLEARTEXT is invalid UTF-16-LE unable to generate %s\n" % (
username, a))
import re
from samba.kcc import KCC, ldif_import_export
from samba.kcc.kcc_utils import KCCError
-from samba.compat import text_type
from samba.uptodateness import (
get_partition_maps,
get_partition,
given object."""
from hashlib import md5
tmp_str = str(x)
- if isinstance(tmp_str, text_type):
+ if isinstance(tmp_str, str):
tmp_str = tmp_str.encode('utf8')
c = int(md5(tmp_str).hexdigest()[:6], base=16) & 0x7f7f7f
return '#%06x' % c
from samba.ndr import ndr_unpack, ndr_pack
from samba.dcerpc import drsblobs, misc
from samba.common import normalise_int32
-from samba.compat import text_type
from samba.compat import get_bytes
from samba.dcerpc import security
if len(res) > 1:
raise Exception('Matched %u multiple users with filter "%s"' % (len(res), search_filter))
user_dn = res[0].dn
- if not isinstance(password, text_type):
+ if not isinstance(password, str):
pw = password.decode('utf-8')
else:
pw = password
from enum import IntEnum, unique
import samba.auth
import samba.dcerpc.base
-from samba.compat import text_type
from random import randint
from random import SystemRandom
from contextlib import contextmanager
from samba.tests import delete_force
from samba.dsdb import UF_WORKSTATION_TRUST_ACCOUNT, UF_PASSWD_NOTREQD
from samba.dcerpc.misc import SEC_CHAN_WKSTA
-from samba.compat import text_type
from samba.dcerpc.windows_event_ids import (
EVT_ID_SUCCESSFUL_LOGON,
EVT_LOGON_NETWORK
self.base_dn = self.ldb.domain_dn()
self.dn = ("cn=%s,cn=users,%s" % (self.netbios_name, self.base_dn))
- utf16pw = text_type('"' + self.machinepass + '"').encode('utf-16-le')
+ utf16pw = ('"' + self.machinepass + '"').encode('utf-16-le')
self.ldb.add({
"dn": self.dn,
"objectclass": "computer",
from samba.tests import delete_force
from samba.dsdb import UF_WORKSTATION_TRUST_ACCOUNT, UF_PASSWD_NOTREQD
from samba.dcerpc.misc import SEC_CHAN_WKSTA
-from samba.compat import text_type
from samba.dcerpc.windows_event_ids import (
EVT_ID_SUCCESSFUL_LOGON,
EVT_LOGON_NETWORK
else:
binding = "[schannel]"
- utf16pw = text_type('"' + self.machinepass + '"').encode('utf-16-le')
+ utf16pw = ('"' + self.machinepass + '"').encode('utf-16-le')
self.ldb.add({
"dn": self.samlogon_dn,
"objectclass": "computer",
from samba.credentials import Credentials
from samba.tests import TestCase
from samba.ndr import ndr_pack, ndr_unpack, ndr_unpack_out
-from samba.compat import text_type
from samba.ntstatus import (
NT_STATUS_CONNECTION_DISCONNECTED,
NT_STATUS_PIPE_DISCONNECTED,
from samba.dcerpc import unixinfo
from samba.tests import RpcInterfaceTestCase
-from samba.compat import text_type
class UnixinfoTests(RpcInterfaceTestCase):
infos = self.conn.GetPWUid(range(512))
self.assertEqual(512, len(infos))
self.assertEqual("/bin/false", infos[0].shell)
- self.assertTrue(isinstance(infos[0].homedir, text_type))
+ self.assertTrue(isinstance(infos[0].homedir, str))
def test_gidtosid(self):
self.conn.GidToSid(1000)
import binascii
from hashlib import md5
import crypt
-from samba.compat import text_type
USER_NAME = "PasswordHashTestUser"
def calc_digest(user, realm, password):
data = "%s:%s:%s" % (user, realm, password)
- if isinstance(data, text_type):
+ if isinstance(data, str):
data = data.encode('utf8')
return md5(data).hexdigest()
from samba.dcerpc import drsblobs, drsuapi, misc
from samba import drs_utils, net
from samba.credentials import Credentials
-from samba.compat import text_type
import binascii
import os
req8.destination_dsa_guid = null_guid
req8.source_dsa_invocation_id = null_guid
req8.naming_context = drsuapi.DsReplicaObjectIdentifier()
- req8.naming_context.dn = text_type(dn)
+ req8.naming_context.dn = dn
req8.highwatermark = drsuapi.DsReplicaHighWaterMark()
req8.highwatermark.tmp_highest_usn = 0
from hashlib import md5
import random
import string
-from samba.compat import text_type
+
USER_NAME = "WdigestTestUser"
# Create a random 32 character password, containing only letters and
def calc_digest(user, realm, password):
data = "%s:%s:%s" % (user, realm, password)
- if isinstance(data, text_type):
+ if isinstance(data, str):
data = data.encode('utf8')
return "%s:%s:%s" % (user, realm, md5(data).hexdigest())
from samba.credentials import Credentials, DONT_USE_KERBEROS
from samba.auth import system_session
from samba.compat import get_string
-from samba.compat import text_type
+
from ldb import SCOPE_BASE, LdbError
from ldb import ERR_NO_SUCH_OBJECT, ERR_ATTRIBUTE_OR_VALUE_EXISTS
from ldb import ERR_ENTRY_ALREADY_EXISTS, ERR_UNWILLING_TO_PERFORM
username = "ldaptestuser"
password = "thatsAcomplPASS2"
- utf16pw = text_type('"' + password + '"').encode('utf-16-le')
+ utf16pw = ('"' + password + '"').encode('utf-16-le')
ldb.add({
"dn": "cn=ldaptestuser,cn=users," + self.base_dn,
from samba.tests.subunitrun import SubunitOptions, TestProgram
from samba.compat import cmp_fn
from samba.compat import cmp_to_key_fn
-from samba.compat import text_type
import samba.getopt as options
from samba.auth import system_session
def norm(x):
- if not isinstance(x, text_type):
+ if not isinstance(x, str):
x = x.decode('utf8')
return normalize('NFKC', x).upper()
from samba import gensec
from samba.kcc import kcc_utils
from samba.compat import get_string
-from samba.compat import text_type
import ldb
import dns.resolver
else:
name.dns_register = False
dns_names.names = [ name ]
- site_name = text_type(sub_vars['SITE'])
+ site_name = sub_vars['SITE']
global error_count
from samba.samdb import SamDB
from samba.auth import system_session
from samba.ndr import ndr_pack, ndr_unpack
-from samba.compat import text_type
+
def _samdb_fetch_pfm(samdb):
"""Fetch prefixMap stored in SamDB using LDB connection"""
req8.destination_dsa_guid = dest_dsa
req8.source_dsa_invocation_id = misc.GUID(samdb.get_invocation_id())
req8.naming_context = drsuapi.DsReplicaObjectIdentifier()
- req8.naming_context.dn = text_type(samdb.get_schema_basedn())
+ req8.naming_context.dn = samdb.get_schema_basedn()
req8.highwatermark = drsuapi.DsReplicaHighWaterMark()
req8.highwatermark.tmp_highest_usn = 0
req8.highwatermark.reserved_usn = 0
import json
import ldb
import random
-from samba.compat import text_type
from samba.compat import get_string
GUID_RE = r'[\da-f]{8}-[\da-f]{4}-[\da-f]{4}-[\da-f]{4}-[\da-f]{12}'
for k in ("last attempt time",
"last attempt message",
"last success"):
- self.assertTrue(isinstance(r[k], text_type))
+ self.assertTrue(isinstance(r[k], str))
self.assertRegexpMatches(r["DSA objectGUID"], '^%s$' % GUID_RE)
self.assertTrue(isinstance(r["consecutive failures"], int))