PY3: change shebang to python3 in source4/torture dir
[vlendec/samba-autobuild/.git] / source4 / torture / drs / python / drs_base.py
1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*-
3 #
4 # Unix SMB/CIFS implementation.
5 # Copyright (C) Kamen Mazdrashki <kamenim@samba.org> 2011
6 # Copyright (C) Andrew Bartlett <abartlet@samba.org> 2016
7 # Copyright (C) Catalyst IT Ltd. 2016
8 #
9 # This program is free software; you can redistribute it and/or modify
10 # it under the terms of the GNU General Public License as published by
11 # the Free Software Foundation; either version 3 of the License, or
12 # (at your option) any later version.
13 #
14 # This program is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 # GNU General Public License for more details.
18 #
19 # You should have received a copy of the GNU General Public License
20 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 #
22
23 from __future__ import print_function
24 import sys
25 import time
26 import os
27 import ldb
28
29 sys.path.insert(0, "bin/python")
30 import samba.tests
31 from samba.tests.samba_tool.base import SambaToolCmdTest
32 from samba import dsdb
33 from samba.dcerpc import drsuapi, misc, drsblobs, security
34 from samba.ndr import ndr_unpack, ndr_pack
35 from samba.drs_utils import drs_DsBind
36 from samba import gensec
37 from ldb import (
38     SCOPE_BASE,
39     Message,
40     FLAG_MOD_REPLACE,
41 )
42 from samba.compat import cmp_fn
43 from samba.compat import get_string
44
45
46 class DrsBaseTestCase(SambaToolCmdTest):
47     """Base class implementation for all DRS python tests.
48        It is intended to provide common initialization and
49        and functionality used by all DRS tests in drs/python
50        test package. For instance, DC1 and DC2 are always used
51        to pass URLs for DCs to test against"""
52
53     def setUp(self):
54         super(DrsBaseTestCase, self).setUp()
55         creds = self.get_credentials()
56         creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
57
58         # connect to DCs
59         url_dc = samba.tests.env_get_var_value("DC1")
60         (self.ldb_dc1, self.info_dc1) = samba.tests.connect_samdb_ex(url_dc,
61                                                                      ldap_only=True)
62         url_dc = samba.tests.env_get_var_value("DC2")
63         (self.ldb_dc2, self.info_dc2) = samba.tests.connect_samdb_ex(url_dc,
64                                                                      ldap_only=True)
65         self.test_ldb_dc = self.ldb_dc1
66
67         # cache some of RootDSE props
68         self.schema_dn = str(self.info_dc1["schemaNamingContext"][0])
69         self.domain_dn = str(self.info_dc1["defaultNamingContext"][0])
70         self.config_dn = str(self.info_dc1["configurationNamingContext"][0])
71         self.forest_level = int(self.info_dc1["forestFunctionality"][0])
72
73         # we will need DCs DNS names for 'samba-tool drs' command
74         self.dnsname_dc1 = str(self.info_dc1["dnsHostName"][0])
75         self.dnsname_dc2 = str(self.info_dc2["dnsHostName"][0])
76
77         # for debugging the test code
78         self._debug = False
79
80     def tearDown(self):
81         super(DrsBaseTestCase, self).tearDown()
82
83     def set_test_ldb_dc(self, ldb_dc):
84         """Sets which DC's LDB we perform operations on during the test"""
85         self.test_ldb_dc = ldb_dc
86
87     def _GUID_string(self, guid):
88         return get_string(self.test_ldb_dc.schema_format_value("objectGUID", guid))
89
90     def _ldap_schemaUpdateNow(self, sam_db):
91         rec = {"dn": "",
92                "schemaUpdateNow": "1"}
93         m = Message.from_dict(sam_db, rec, FLAG_MOD_REPLACE)
94         sam_db.modify(m)
95
96     def _deleted_objects_dn(self, sam_ldb):
97         wkdn = "<WKGUID=18E2EA80684F11D2B9AA00C04F79F805,%s>" % self.domain_dn
98         res = sam_ldb.search(base=wkdn,
99                              scope=SCOPE_BASE,
100                              controls=["show_deleted:1"])
101         self.assertEquals(len(res), 1)
102         return str(res[0]["dn"])
103
104     def _lost_and_found_dn(self, sam_ldb, nc):
105         wkdn = "<WKGUID=%s,%s>" % (dsdb.DS_GUID_LOSTANDFOUND_CONTAINER, nc)
106         res = sam_ldb.search(base=wkdn,
107                              scope=SCOPE_BASE)
108         self.assertEquals(len(res), 1)
109         return str(res[0]["dn"])
110
111     def _make_obj_name(self, prefix):
112         return prefix + time.strftime("%s", time.gmtime())
113
114     def _samba_tool_cmd_list(self, drs_command):
115         # make command line credentials string
116
117         ccache_name = self.get_creds_ccache_name()
118
119         # Tunnel the command line credentials down to the
120         # subcommand to avoid a new kinit
121         cmdline_auth = "--krb5-ccache=%s" % ccache_name
122
123         # bin/samba-tool drs <drs_command> <cmdline_auth>
124         return ["drs", drs_command, cmdline_auth]
125
126     def _net_drs_replicate(self, DC, fromDC, nc_dn=None, forced=True,
127                            local=False, full_sync=False, single=False):
128         if nc_dn is None:
129             nc_dn = self.domain_dn
130         # make base command line
131         samba_tool_cmdline = self._samba_tool_cmd_list("replicate")
132         # bin/samba-tool drs replicate <Dest_DC_NAME> <Src_DC_NAME> <Naming Context>
133         samba_tool_cmdline += [DC, fromDC, nc_dn]
134
135         if forced:
136             samba_tool_cmdline += ["--sync-forced"]
137         if local:
138             samba_tool_cmdline += ["--local"]
139         if full_sync:
140             samba_tool_cmdline += ["--full-sync"]
141         if single:
142             samba_tool_cmdline += ["--single-object"]
143
144         (result, out, err) = self.runsubcmd(*samba_tool_cmdline)
145         self.assertCmdSuccess(result, out, err)
146         self.assertEquals(err, "", "Shouldn't be any error messages")
147
148     def _enable_inbound_repl(self, DC):
149         # make base command line
150         samba_tool_cmd = self._samba_tool_cmd_list("options")
151         # disable replication
152         samba_tool_cmd += [DC, "--dsa-option=-DISABLE_INBOUND_REPL"]
153         (result, out, err) = self.runsubcmd(*samba_tool_cmd)
154         self.assertCmdSuccess(result, out, err)
155         self.assertEquals(err, "", "Shouldn't be any error messages")
156
157     def _disable_inbound_repl(self, DC):
158         # make base command line
159         samba_tool_cmd = self._samba_tool_cmd_list("options")
160         # disable replication
161         samba_tool_cmd += [DC, "--dsa-option=+DISABLE_INBOUND_REPL"]
162         (result, out, err) = self.runsubcmd(*samba_tool_cmd)
163         self.assertCmdSuccess(result, out, err)
164         self.assertEquals(err, "", "Shouldn't be any error messages")
165
166     def _enable_all_repl(self, DC):
167         self._enable_inbound_repl(DC)
168         # make base command line
169         samba_tool_cmd = self._samba_tool_cmd_list("options")
170         # enable replication
171         samba_tool_cmd += [DC, "--dsa-option=-DISABLE_OUTBOUND_REPL"]
172         (result, out, err) = self.runsubcmd(*samba_tool_cmd)
173         self.assertCmdSuccess(result, out, err)
174         self.assertEquals(err, "", "Shouldn't be any error messages")
175
176     def _disable_all_repl(self, DC):
177         self._disable_inbound_repl(DC)
178         # make base command line
179         samba_tool_cmd = self._samba_tool_cmd_list("options")
180         # disable replication
181         samba_tool_cmd += [DC, "--dsa-option=+DISABLE_OUTBOUND_REPL"]
182         (result, out, err) = self.runsubcmd(*samba_tool_cmd)
183         self.assertCmdSuccess(result, out, err)
184         self.assertEquals(err, "", "Shouldn't be any error messages")
185
186     def _get_highest_hwm_utdv(self, ldb_conn):
187         res = ldb_conn.search("", scope=ldb.SCOPE_BASE, attrs=["highestCommittedUSN"])
188         hwm = drsuapi.DsReplicaHighWaterMark()
189         hwm.tmp_highest_usn = int(res[0]["highestCommittedUSN"][0])
190         hwm.reserved_usn = 0
191         hwm.highest_usn = hwm.tmp_highest_usn
192
193         utdv = drsuapi.DsReplicaCursorCtrEx()
194         cursors = []
195         c1 = drsuapi.DsReplicaCursor()
196         c1.source_dsa_invocation_id = misc.GUID(ldb_conn.get_invocation_id())
197         c1.highest_usn = hwm.highest_usn
198         cursors.append(c1)
199         utdv.count = len(cursors)
200         utdv.cursors = cursors
201         return (hwm, utdv)
202
203     def _get_identifier(self, ldb_conn, dn):
204         res = ldb_conn.search(dn, scope=ldb.SCOPE_BASE,
205                               attrs=["objectGUID", "objectSid"])
206         id = drsuapi.DsReplicaObjectIdentifier()
207         id.guid = ndr_unpack(misc.GUID, res[0]['objectGUID'][0])
208         if "objectSid" in res[0]:
209             id.sid = ndr_unpack(security.dom_sid, res[0]['objectSid'][0])
210         id.dn = str(res[0].dn)
211         return id
212
213     def _get_ctr6_links(self, ctr6):
214         """
215         Unpacks the linked attributes from a DsGetNCChanges response
216         and returns them as a list.
217         """
218         ctr6_links = []
219         for lidx in range(0, ctr6.linked_attributes_count):
220             l = ctr6.linked_attributes[lidx]
221             try:
222                 target = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3,
223                                     l.value.blob)
224             except:
225                 target = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3Binary,
226                                     l.value.blob)
227             al = AbstractLink(l.attid, l.flags,
228                               l.identifier.guid,
229                               target.guid, target.dn)
230             ctr6_links.append(al)
231
232         return ctr6_links
233
234     def _get_ctr6_object_guids(self, ctr6):
235         """Returns all the object GUIDs in a GetNCChanges response"""
236         guid_list = []
237
238         obj = ctr6.first_object
239         for i in range(0, ctr6.object_count):
240             guid_list.append(str(obj.object.identifier.guid))
241             obj = obj.next_object
242
243         return guid_list
244
245     def _ctr6_debug(self, ctr6):
246         """
247         Displays basic info contained in a DsGetNCChanges response.
248         Having this debug code allows us to see the difference in behaviour
249         between Samba and Windows easier. Turn on the self._debug flag to see it.
250         """
251
252         if self._debug:
253             print("------------ recvd CTR6 -------------")
254
255             next_object = ctr6.first_object
256             for i in range(0, ctr6.object_count):
257                 print("Obj %d: %s %s" % (i, next_object.object.identifier.dn[:25],
258                                          next_object.object.identifier.guid))
259                 next_object = next_object.next_object
260
261             print("Linked Attributes: %d" % ctr6.linked_attributes_count)
262             for lidx in range(0, ctr6.linked_attributes_count):
263                 l = ctr6.linked_attributes[lidx]
264                 try:
265                     target = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3,
266                                         l.value.blob)
267                 except:
268                     target = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3Binary,
269                                         l.value.blob)
270
271                 print("Link Tgt %s... <-- Src %s"
272                       % (target.dn[:25], l.identifier.guid))
273                 state = "Del"
274                 if l.flags & drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE:
275                     state = "Act"
276                 print("  v%u %s changed %u" % (l.meta_data.version, state,
277                                                l.meta_data.originating_change_time))
278
279             print("HWM:     %d" % (ctr6.new_highwatermark.highest_usn))
280             print("Tmp HWM: %d" % (ctr6.new_highwatermark.tmp_highest_usn))
281             print("More data: %d" % (ctr6.more_data))
282
283     def _get_replication(self, replica_flags,
284                          drs_error=drsuapi.DRSUAPI_EXOP_ERR_NONE, drs=None, drs_handle=None,
285                          highwatermark=None, uptodateness_vector=None,
286                          more_flags=0, max_objects=133, exop=0,
287                          dest_dsa=drsuapi.DRSUAPI_DS_BIND_GUID_W2K3,
288                          source_dsa=None, invocation_id=None, nc_dn_str=None):
289         """
290         Builds a DsGetNCChanges request based on the information provided
291         and returns the response received from the DC.
292         """
293         if source_dsa is None:
294             source_dsa = self.test_ldb_dc.get_ntds_GUID()
295         if invocation_id is None:
296             invocation_id = self.test_ldb_dc.get_invocation_id()
297         if nc_dn_str is None:
298             nc_dn_str = self.test_ldb_dc.domain_dn()
299
300         if highwatermark is None:
301             if self.default_hwm is None:
302                 (highwatermark, _) = self._get_highest_hwm_utdv(self.test_ldb_dc)
303             else:
304                 highwatermark = self.default_hwm
305
306         if drs is None:
307             drs = self.drs
308         if drs_handle is None:
309             drs_handle = self.drs_handle
310
311         req10 = self._getnc_req10(dest_dsa=dest_dsa,
312                                   invocation_id=invocation_id,
313                                   nc_dn_str=nc_dn_str,
314                                   exop=exop,
315                                   max_objects=max_objects,
316                                   replica_flags=replica_flags,
317                                   more_flags=more_flags)
318         req10.highwatermark = highwatermark
319         if uptodateness_vector is not None:
320             uptodateness_vector_v1 = drsuapi.DsReplicaCursorCtrEx()
321             cursors = []
322             for i in range(0, uptodateness_vector.count):
323                 c = uptodateness_vector.cursors[i]
324                 c1 = drsuapi.DsReplicaCursor()
325                 c1.source_dsa_invocation_id = c.source_dsa_invocation_id
326                 c1.highest_usn = c.highest_usn
327                 cursors.append(c1)
328             uptodateness_vector_v1.count = len(cursors)
329             uptodateness_vector_v1.cursors = cursors
330             req10.uptodateness_vector = uptodateness_vector_v1
331         (level, ctr) = drs.DsGetNCChanges(drs_handle, 10, req10)
332         self._ctr6_debug(ctr)
333
334         self.assertEqual(level, 6, "expected level 6 response!")
335         self.assertEqual(ctr.source_dsa_guid, misc.GUID(source_dsa))
336         self.assertEqual(ctr.source_dsa_invocation_id, misc.GUID(invocation_id))
337         self.assertEqual(ctr.extended_ret, drs_error)
338
339         return ctr
340
341     def _check_replication(self, expected_dns, replica_flags, expected_links=[],
342                            drs_error=drsuapi.DRSUAPI_EXOP_ERR_NONE, drs=None, drs_handle=None,
343                            highwatermark=None, uptodateness_vector=None,
344                            more_flags=0, more_data=False,
345                            dn_ordered=True, links_ordered=True,
346                            max_objects=133, exop=0,
347                            dest_dsa=drsuapi.DRSUAPI_DS_BIND_GUID_W2K3,
348                            source_dsa=None, invocation_id=None, nc_dn_str=None,
349                            nc_object_count=0, nc_linked_attributes_count=0):
350         """
351         Makes sure that replication returns the specific error given.
352         """
353
354         # send a DsGetNCChanges to the DC
355         ctr6 = self._get_replication(replica_flags,
356                                      drs_error, drs, drs_handle,
357                                      highwatermark, uptodateness_vector,
358                                      more_flags, max_objects, exop, dest_dsa,
359                                      source_dsa, invocation_id, nc_dn_str)
360
361         # check the response is what we expect
362         self._check_ctr6(ctr6, expected_dns, expected_links,
363                          nc_object_count=nc_object_count, more_data=more_data,
364                          dn_ordered=dn_ordered)
365         return (ctr6.new_highwatermark, ctr6.uptodateness_vector)
366
367     def _get_ctr6_dn_list(self, ctr6):
368         """
369         Returns the DNs contained in a DsGetNCChanges response.
370         """
371         dn_list = []
372         next_object = ctr6.first_object
373         for i in range(0, ctr6.object_count):
374             dn_list.append(next_object.object.identifier.dn)
375             next_object = next_object.next_object
376         self.assertEqual(next_object, None)
377
378         return dn_list
379
380     def _check_ctr6(self, ctr6, expected_dns=[], expected_links=[],
381                     dn_ordered=True, links_ordered=True,
382                     more_data=False, nc_object_count=0,
383                     nc_linked_attributes_count=0, drs_error=0):
384         """
385         Check that a ctr6 matches the specified parameters.
386         """
387         ctr6_raw_dns = self._get_ctr6_dn_list(ctr6)
388
389         # filter out changes to the RID Set objects, as these can happen
390         # intermittently and mess up the test assertions
391         ctr6_dns = []
392         for dn in ctr6_raw_dns:
393             if "CN=RID Set," in dn or "CN=RID Manager$," in dn:
394                 print("Removing {0} from GetNCChanges reply".format(dn))
395             else:
396                 ctr6_dns.append(dn)
397
398         self.assertEqual(len(ctr6_dns), len(expected_dns),
399                          "Received unexpected objects (%s)" % ctr6_dns)
400         self.assertEqual(ctr6.object_count, len(ctr6_raw_dns))
401         self.assertEqual(ctr6.linked_attributes_count, len(expected_links))
402         self.assertEqual(ctr6.more_data, more_data)
403         self.assertEqual(ctr6.nc_object_count, nc_object_count)
404         self.assertEqual(ctr6.nc_linked_attributes_count, nc_linked_attributes_count)
405         self.assertEqual(ctr6.drs_error[0], drs_error)
406
407         i = 0
408         for dn in expected_dns:
409             # Expect them back in the exact same order as specified.
410             if dn_ordered:
411                 self.assertNotEqual(ctr6_dns[i], None)
412                 self.assertEqual(ctr6_dns[i], dn)
413                 i = i + 1
414             # Don't care what order
415             else:
416                 self.assertTrue(dn in ctr6_dns, "Couldn't find DN '%s' anywhere in ctr6 response." % dn)
417
418         # Extract the links from the response
419         ctr6_links = self._get_ctr6_links(ctr6)
420         expected_links.sort()
421
422         lidx = 0
423         for el in expected_links:
424             if links_ordered:
425                 self.assertEqual(el, ctr6_links[lidx])
426                 lidx += 1
427             else:
428                 self.assertTrue(el in ctr6_links, "Couldn't find link '%s' anywhere in ctr6 response." % el)
429
430     def _exop_req8(self, dest_dsa, invocation_id, nc_dn_str, exop,
431                    replica_flags=0, max_objects=0, partial_attribute_set=None,
432                    partial_attribute_set_ex=None, mapping_ctr=None):
433         req8 = drsuapi.DsGetNCChangesRequest8()
434
435         req8.destination_dsa_guid = misc.GUID(dest_dsa) if dest_dsa else misc.GUID()
436         req8.source_dsa_invocation_id = misc.GUID(invocation_id)
437         req8.naming_context = drsuapi.DsReplicaObjectIdentifier()
438         req8.naming_context.dn = str(nc_dn_str)
439         req8.highwatermark = drsuapi.DsReplicaHighWaterMark()
440         req8.highwatermark.tmp_highest_usn = 0
441         req8.highwatermark.reserved_usn = 0
442         req8.highwatermark.highest_usn = 0
443         req8.uptodateness_vector = None
444         req8.replica_flags = replica_flags
445         req8.max_object_count = max_objects
446         req8.max_ndr_size = 402116
447         req8.extended_op = exop
448         req8.fsmo_info = 0
449         req8.partial_attribute_set = partial_attribute_set
450         req8.partial_attribute_set_ex = partial_attribute_set_ex
451         if mapping_ctr:
452             req8.mapping_ctr = mapping_ctr
453         else:
454             req8.mapping_ctr.num_mappings = 0
455             req8.mapping_ctr.mappings = None
456
457         return req8
458
459     def _getnc_req10(self, dest_dsa, invocation_id, nc_dn_str, exop,
460                      replica_flags=0, max_objects=0, partial_attribute_set=None,
461                      partial_attribute_set_ex=None, mapping_ctr=None,
462                      more_flags=0):
463         req10 = drsuapi.DsGetNCChangesRequest10()
464
465         req10.destination_dsa_guid = misc.GUID(dest_dsa) if dest_dsa else misc.GUID()
466         req10.source_dsa_invocation_id = misc.GUID(invocation_id)
467         req10.naming_context = drsuapi.DsReplicaObjectIdentifier()
468         req10.naming_context.dn = str(nc_dn_str)
469         req10.highwatermark = drsuapi.DsReplicaHighWaterMark()
470         req10.highwatermark.tmp_highest_usn = 0
471         req10.highwatermark.reserved_usn = 0
472         req10.highwatermark.highest_usn = 0
473         req10.uptodateness_vector = None
474         req10.replica_flags = replica_flags
475         req10.max_object_count = max_objects
476         req10.max_ndr_size = 402116
477         req10.extended_op = exop
478         req10.fsmo_info = 0
479         req10.partial_attribute_set = partial_attribute_set
480         req10.partial_attribute_set_ex = partial_attribute_set_ex
481         if mapping_ctr:
482             req10.mapping_ctr = mapping_ctr
483         else:
484             req10.mapping_ctr.num_mappings = 0
485             req10.mapping_ctr.mappings = None
486         req10.more_flags = more_flags
487
488         return req10
489
490     def _ds_bind(self, server_name, creds=None):
491         binding_str = "ncacn_ip_tcp:%s[seal]" % server_name
492
493         if creds is None:
494             creds = self.get_credentials()
495         drs = drsuapi.drsuapi(binding_str, self.get_loadparm(), creds)
496         (drs_handle, supported_extensions) = drs_DsBind(drs)
497         return (drs, drs_handle)
498
499     def get_partial_attribute_set(self, attids=[drsuapi.DRSUAPI_ATTID_objectClass]):
500         partial_attribute_set = drsuapi.DsPartialAttributeSet()
501         partial_attribute_set.attids = attids
502         partial_attribute_set.num_attids = len(attids)
503         return partial_attribute_set
504
505
506 class AbstractLink:
507     def __init__(self, attid, flags, identifier, targetGUID,
508                  targetDN=""):
509         self.attid = attid
510         self.flags = flags
511         self.identifier = str(identifier)
512         self.selfGUID_blob = ndr_pack(identifier)
513         self.targetGUID = str(targetGUID)
514         self.targetGUID_blob = ndr_pack(targetGUID)
515         self.targetDN = targetDN
516
517     def __repr__(self):
518         return "AbstractLink(0x%08x, 0x%08x, %s, %s)" % (
519                 self.attid, self.flags, self.identifier, self.targetGUID)
520
521     def __internal_cmp__(self, other, verbose=False):
522         """See CompareLinks() in MS-DRSR section 4.1.10.5.17"""
523         if not isinstance(other, AbstractLink):
524             if verbose:
525                 print("AbstractLink.__internal_cmp__(%r, %r) => wrong type" % (self, other))
526             return NotImplemented
527
528         c = cmp_fn(self.selfGUID_blob, other.selfGUID_blob)
529         if c != 0:
530             if verbose:
531                 print("AbstractLink.__internal_cmp__(%r, %r) => %d different identifier" % (self, other, c))
532             return c
533
534         c = other.attid - self.attid
535         if c != 0:
536             if verbose:
537                 print("AbstractLink.__internal_cmp__(%r, %r) => %d different attid" % (self, other, c))
538             return c
539
540         self_active = self.flags & drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE
541         other_active = other.flags & drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE
542
543         c = self_active - other_active
544         if c != 0:
545             if verbose:
546                 print("AbstractLink.__internal_cmp__(%r, %r) => %d different FLAG_ACTIVE" % (self, other, c))
547             return c
548
549         c = cmp_fn(self.targetGUID_blob, other.targetGUID_blob)
550         if c != 0:
551             if verbose:
552                 print("AbstractLink.__internal_cmp__(%r, %r) => %d different target" % (self, other, c))
553             return c
554
555         c = self.flags - other.flags
556         if c != 0:
557             if verbose:
558                 print("AbstractLink.__internal_cmp__(%r, %r) => %d different flags" % (self, other, c))
559             return c
560
561         return 0
562
563     def __lt__(self, other):
564         c = self.__internal_cmp__(other)
565         if c == NotImplemented:
566             return NotImplemented
567         if c < 0:
568             return True
569         return False
570
571     def __le__(self, other):
572         c = self.__internal_cmp__(other)
573         if c == NotImplemented:
574             return NotImplemented
575         if c <= 0:
576             return True
577         return False
578
579     def __eq__(self, other):
580         c = self.__internal_cmp__(other, verbose=True)
581         if c == NotImplemented:
582             return NotImplemented
583         if c == 0:
584             return True
585         return False
586
587     def __ne__(self, other):
588         c = self.__internal_cmp__(other)
589         if c == NotImplemented:
590             return NotImplemented
591         if c != 0:
592             return True
593         return False
594
595     def __gt__(self, other):
596         c = self.__internal_cmp__(other)
597         if c == NotImplemented:
598             return NotImplemented
599         if c > 0:
600             return True
601         return False
602
603     def __ge__(self, other):
604         c = self.__internal_cmp__(other)
605         if c == NotImplemented:
606             return NotImplemented
607         if c >= 0:
608             return True
609         return False
610
611     def __hash__(self):
612         return hash((self.attid, self.flags, self.identifier, self.targetGUID))