1 # Unit and integration tests for traffic.py
3 # Copyright (C) Catalyst IT Ltd. 2017
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 # from pprint import pprint
19 from cStringIO import StringIO
23 from samba.emulate import traffic
# Relative path to a test data file (a very short traffic sample, going by
# its name).
# NOTE(review): no visible line reads this constant; presumably the tests
# open it to obtain the `f` handed to traffic.ingest_summaries() -- confirm.
26 TEST_FILE = 'testdata/traffic-sample-very-short.txt'
# Tests for samba.emulate.traffic: each test passes summaries through
# traffic.ingest_summaries(), trains a traffic.TrafficModel with learn(),
# and compares the model's ngrams and query_details with literal tables.
# NOTE(review): the number leading each line skips values (e.g. 37 -> 42),
# so some statements of this class are not visible in this listing; the
# comments below describe only what is shown.
29 class TrafficEmulatorTests(samba.tests.TestCase):
# Model instance kept on self; test_parse_ngrams trains and inspects it.
# NOTE(review): the enclosing method header is not visible here --
# presumably setUp(); confirm.
31 self.model = traffic.TrafficModel()
# Checks the tables learned when ingest_summaries() is called with
# dns_mode='include'. Works on its own local model, not on self.model.
36 def test_parse_ngrams_dns_included(self):
37 model = traffic.TrafficModel()
# Tail of a tuple unpacking of the ingest_summaries() result; dns_counts
# is its last element.
# NOTE(review): `f` and the other unpacked names (presumably including
# `conversations`) are bound on lines that are not shown.
42 dns_counts) = traffic.ingest_summaries([f], dns_mode='include')
# learn() receives only the conversations here; dns_counts is not passed,
# unlike in test_parse_ngrams.
44 model.learn(conversations)
# Entries keyed by a pair of strings, each mapping to a list of strings.
# The lists are written in sorted order because the model's lists are
# sorted before the comparison further down.
# NOTE(review): presumably the body of `expected_ngrams`, whose opening
# line is not visible; the meaning of the key pair and of markers such as
# '-' and 'wait:0' should be confirmed against traffic.py.
46 ('-', '-'): ['dns:0', 'dns:0', 'dns:0', 'ldap:3'],
47 ('-', 'dns:0'): ['dns:0', 'dns:0', 'dns:0'],
48 ('-', 'ldap:3'): ['wait:0'],
49 ('cldap:3', 'cldap:3'): ['cldap:3', 'wait:0'],
50 ('cldap:3', 'wait:0'): ['rpc_netlogon:29'],
51 ('dns:0', 'dns:0'): ['dns:0', 'dns:0', 'dns:0', 'wait:0'],
52 ('dns:0', 'wait:0'): ['cldap:3'],
53 ('kerberos:', 'ldap:3'): ['-'],
54 ('ldap:3', 'wait:0'): ['ldap:2'],
55 ('rpc_netlogon:29', 'kerberos:'): ['ldap:3'],
56 ('wait:0', 'cldap:3'): ['cldap:3'],
57 ('wait:0', 'rpc_netlogon:29'): ['kerberos:']
# Expected query_details: each key string maps to a list of tuples (some
# of them empty tuples).
59 expected_query_details = {
60 'cldap:3': [('', '', '', 'Netlogon', '', '', ''),
61 ('', '', '', 'Netlogon', '', '', ''),
62 ('', '', '', 'Netlogon', '', '', '')],
63 'dns:0': [(), (), (), (), (), (), (), (), ()],
65 'ldap:2': [('', '', '', '', '', '', '')],
# NOTE(review): the key and the opening of the tuple that holds the
# following comma-separated attribute list are not visible in this listing.
69 'subschemaSubentry,dsServiceName,namingContexts,'
70 'defaultNamingContext,schemaNamingContext,'
71 'configurationNamingContext,rootDomainNamingContext,'
72 'supportedControl,supportedLDAPVersion,'
73 'supportedLDAPPolicies,supportedSASLMechanisms,'
74 'dnsHostName,ldapServiceName,serverName,'
75 'supportedCapabilities',
79 ('2', 'DC,DC', '', 'cn', '', '', '')],
80 'rpc_netlogon:29': [()]
# Sort every value list so the comparisons below do not depend on the
# order in which the model holds its entries.
83 ngrams = {k: sorted(v) for k, v in model.ngrams.items()}
84 details = {k: sorted(v) for k, v in model.query_details.items()}
86 self.assertEqual(expected_ngrams, ngrams)
87 self.assertEqual(expected_query_details, details)
88 # We use a stringIO instead of a temporary file
# NOTE(review): the statements that populate model2 are not visible; going
# by the comment above, presumably `model` is written to a StringIO and
# read back into model2 -- confirm. model2's tables are compared as they
# are, without the sorting applied to `model` above.
92 model2 = traffic.TrafficModel()
96 self.assertEqual(expected_ngrams, model2.ngrams)
97 self.assertEqual(expected_query_details, model2.query_details)
# Same check as test_parse_ngrams_dns_included, but ingest_summaries() is
# called without a dns_mode argument and the shared self.model is trained.
99 def test_parse_ngrams(self):
# Tail of a tuple unpacking of the ingest_summaries() result; no dns_mode
# is passed, so the function's default applies.
# NOTE(review): `f` and the other unpacked names (presumably including
# `conversations`) are bound on lines that are not shown.
104 dns_counts) = traffic.ingest_summaries([f])
# Here dns_counts is handed to learn() together with the conversations.
106 self.model.learn(conversations, dns_counts)
108 # pprint(self.model.ngrams, width=50)
109 # print 'query_details'
110 # pprint(self.model.query_details, width=55)
# Entries keyed by a pair of strings, each mapping to a sorted list of
# strings; none of the visible entries mentions 'dns:0', in contrast to
# test_parse_ngrams_dns_included.
# NOTE(review): presumably the body of `expected_ngrams`, whose opening
# line is not visible -- confirm.
112 ('-', '-'): ['cldap:3', 'ldap:3'],
113 ('-', 'cldap:3'): ['cldap:3'],
114 ('-', 'ldap:3'): ['wait:0'],
115 ('cldap:3', 'cldap:3'): ['cldap:3', 'wait:0'],
116 ('cldap:3', 'wait:0'): ['rpc_netlogon:29'],
117 ('kerberos:', 'ldap:3'): ['-'],
118 ('ldap:3', 'wait:0'): ['ldap:2'],
119 ('rpc_netlogon:29', 'kerberos:'): ['ldap:3'],
120 ('wait:0', 'rpc_netlogon:29'): ['kerberos:']
# Expected query_details: each key string maps to a list of tuples; the
# visible entries contain no 'dns:0' key.
123 expected_query_details = {
124 'cldap:3': [('', '', '', 'Netlogon', '', '', ''),
125 ('', '', '', 'Netlogon', '', '', ''),
126 ('', '', '', 'Netlogon', '', '', '')],
127 'kerberos:': [('',)],
128 'ldap:2': [('', '', '', '', '', '', '')],
# NOTE(review): the key and the opening of the tuple that holds the
# following comma-separated attribute list are not visible in this listing.
132 'subschemaSubentry,dsServiceName,namingContexts,'
133 'defaultNamingContext,schemaNamingContext,'
134 'configurationNamingContext,rootDomainNamingContext,'
135 'supportedControl,supportedLDAPVersion,'
136 'supportedLDAPPolicies,supportedSASLMechanisms,'
137 'dnsHostName,ldapServiceName,serverName,'
138 'supportedCapabilities',
142 ('2', 'DC,DC', '', 'cn', '', '', '')],
143 'rpc_netlogon:29': [()]
# Sort every value list so the comparisons below do not depend on the
# order in which the model holds its entries.
146 ngrams = {k: sorted(v) for k, v in self.model.ngrams.items()}
147 details = {k: sorted(v) for k, v in self.model.query_details.items()}
149 self.assertEqual(expected_ngrams, ngrams)
150 self.assertEqual(expected_query_details, details)
151 # We use a stringIO instead of a temporary file
# NOTE(review): the statements that populate model2 are not visible; going
# by the comment above, presumably self.model is written to a StringIO and
# read back into model2 -- confirm. model2's tables are compared as they
# are, without the sorting applied to self.model above.
155 model2 = traffic.TrafficModel()
159 self.assertEqual(expected_ngrams, model2.ngrams)
160 self.assertEqual(expected_query_details, model2.query_details)