python/samba/tests: Use io.StringIO for py3 and StringIO.StrinIO for py2
[samba.git] / python / samba / tests / emulate / traffic.py
1 # Unit and integration tests for traffic.py
2 #
3 # Copyright (C) Catalyst IT Ltd. 2017
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 # from pprint import pprint
19 from samba.compat import StringIO
20
21 import samba.tests
22
23 from samba.emulate import traffic
24
25
26 TEST_FILE = 'testdata/traffic-sample-very-short.txt'
27
28
29 class TrafficEmulatorTests(samba.tests.TestCase):
30     def setUp(self):
31         self.model = traffic.TrafficModel()
32
33     def tearDown(self):
34         del self.model
35
36     def test_parse_ngrams_dns_included(self):
37         model = traffic.TrafficModel()
38         f = open(TEST_FILE)
39         (conversations,
40          interval,
41          duration,
42          dns_counts) = traffic.ingest_summaries([f], dns_mode='include')
43         f.close()
44         model.learn(conversations)
45         expected_ngrams = {
46             ('-', '-'): ['dns:0', 'dns:0', 'dns:0', 'ldap:3'],
47             ('-', 'dns:0'): ['dns:0', 'dns:0', 'dns:0'],
48             ('-', 'ldap:3'): ['wait:0'],
49             ('cldap:3', 'cldap:3'): ['cldap:3', 'wait:0'],
50             ('cldap:3', 'wait:0'): ['rpc_netlogon:29'],
51             ('dns:0', 'dns:0'): ['dns:0', 'dns:0', 'dns:0', 'wait:0'],
52             ('dns:0', 'wait:0'): ['cldap:3'],
53             ('kerberos:', 'ldap:3'): ['-'],
54             ('ldap:3', 'wait:0'): ['ldap:2'],
55             ('rpc_netlogon:29', 'kerberos:'): ['ldap:3'],
56             ('wait:0', 'cldap:3'): ['cldap:3'],
57             ('wait:0', 'rpc_netlogon:29'): ['kerberos:']
58         }
59         expected_query_details = {
60             'cldap:3': [('', '', '', 'Netlogon', '', '', ''),
61                         ('', '', '', 'Netlogon', '', '', ''),
62                         ('', '', '', 'Netlogon', '', '', '')],
63             'dns:0': [(), (), (), (), (), (), (), (), ()],
64             'kerberos:': [('',)],
65             'ldap:2': [('', '', '', '', '', '', '')],
66             'ldap:3': [('',
67                         '',
68                         '',
69                         'subschemaSubentry,dsServiceName,namingContexts,'
70                         'defaultNamingContext,schemaNamingContext,'
71                         'configurationNamingContext,rootDomainNamingContext,'
72                         'supportedControl,supportedLDAPVersion,'
73                         'supportedLDAPPolicies,supportedSASLMechanisms,'
74                         'dnsHostName,ldapServiceName,serverName,'
75                         'supportedCapabilities',
76                         '',
77                         '',
78                         ''),
79                        ('2', 'DC,DC', '', 'cn', '', '', '')],
80             'rpc_netlogon:29': [()]
81         }
82         self.maxDiff = 5000
83         ngrams = {k: sorted(v) for k, v in model.ngrams.items()}
84         details = {k: sorted(v) for k, v in model.query_details.items()}
85
86         self.assertEqual(expected_ngrams, ngrams)
87         self.assertEqual(expected_query_details, details)
88         # We use a stringIO instead of a temporary file
89         f = StringIO()
90         model.save(f)
91
92         model2 = traffic.TrafficModel()
93         f.seek(0)
94         model2.load(f)
95
96         self.assertEqual(expected_ngrams, model2.ngrams)
97         self.assertEqual(expected_query_details, model2.query_details)
98
99     def test_parse_ngrams(self):
100         f = open(TEST_FILE)
101         (conversations,
102          interval,
103          duration,
104          dns_counts) = traffic.ingest_summaries([f])
105         f.close()
106         self.model.learn(conversations, dns_counts)
107         # print 'ngrams'
108         # pprint(self.model.ngrams, width=50)
109         # print 'query_details'
110         # pprint(self.model.query_details, width=55)
111         expected_ngrams = {
112             ('-', '-'): ['cldap:3', 'ldap:3'],
113             ('-', 'cldap:3'): ['cldap:3'],
114             ('-', 'ldap:3'): ['wait:0'],
115             ('cldap:3', 'cldap:3'): ['cldap:3', 'wait:0'],
116             ('cldap:3', 'wait:0'): ['rpc_netlogon:29'],
117             ('kerberos:', 'ldap:3'): ['-'],
118             ('ldap:3', 'wait:0'): ['ldap:2'],
119             ('rpc_netlogon:29', 'kerberos:'): ['ldap:3'],
120             ('wait:0', 'rpc_netlogon:29'): ['kerberos:']
121         }
122
123         expected_query_details = {
124             'cldap:3': [('', '', '', 'Netlogon', '', '', ''),
125                         ('', '', '', 'Netlogon', '', '', ''),
126                         ('', '', '', 'Netlogon', '', '', '')],
127             'kerberos:': [('',)],
128             'ldap:2': [('', '', '', '', '', '', '')],
129             'ldap:3': [('',
130                         '',
131                         '',
132                         'subschemaSubentry,dsServiceName,namingContexts,'
133                         'defaultNamingContext,schemaNamingContext,'
134                         'configurationNamingContext,rootDomainNamingContext,'
135                         'supportedControl,supportedLDAPVersion,'
136                         'supportedLDAPPolicies,supportedSASLMechanisms,'
137                         'dnsHostName,ldapServiceName,serverName,'
138                         'supportedCapabilities',
139                         '',
140                         '',
141                         ''),
142                        ('2', 'DC,DC', '', 'cn', '', '', '')],
143             'rpc_netlogon:29': [()]
144         }
145         self.maxDiff = 5000
146         ngrams = {k: sorted(v) for k, v in self.model.ngrams.items()}
147         details = {k: sorted(v) for k, v in self.model.query_details.items()}
148
149         self.assertEqual(expected_ngrams, ngrams)
150         self.assertEqual(expected_query_details, details)
151         # We use a stringIO instead of a temporary file
152         f = StringIO()
153         self.model.save(f)
154
155         model2 = traffic.TrafficModel()
156         f.seek(0)
157         model2.load(f)
158
159         self.assertEqual(expected_ngrams, model2.ngrams)
160         self.assertEqual(expected_query_details, model2.query_details)