tests dsdb encrypted secrets module
[metze/samba/wip.git] / python / samba / tests / messaging.py
1 # -*- coding: utf-8 -*-
2 #
3 # Unix SMB/CIFS implementation.
4 # Copyright © Jelmer Vernooij <jelmer@samba.org> 2008
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 3 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 #
19
20 """Tests for samba.messaging."""
21 import samba
22 from samba.messaging import Messaging
23 from samba.tests import TestCase
24 import time
25 from samba.ndr import ndr_print
26 from samba.dcerpc import server_id
27 import random
28 import os
29 from samba.compat import integer_types
30
31
32 class MessagingTests(TestCase):
33
34     def get_context(self, *args, **kwargs):
35         kwargs['lp_ctx'] = samba.tests.env_loadparm()
36         return Messaging(*args, **kwargs)
37
38     def test_register(self):
39         x = self.get_context()
40         def callback():
41             pass
42         msg_type = x.register((callback, None))
43         self.assertTrue(isinstance(msg_type, integer_types))
44         x.deregister(callback, msg_type)
45
46     def test_all_servers(self):
47         x = self.get_context()
48         self.assertTrue(isinstance(x.irpc_all_servers(), list))
49
50     def test_by_name(self):
51         x = self.get_context()
52         for name in x.irpc_all_servers():
53             self.assertTrue(isinstance(x.irpc_servers_byname(name.name), list))
54
55     def test_unknown_name(self):
56         x = self.get_context()
57         self.assertRaises(KeyError,
58                           x.irpc_servers_byname, "samba.messaging test NONEXISTING")
59
60     def test_assign_server_id(self):
61         x = self.get_context()
62         self.assertTrue(isinstance(x.server_id, server_id.server_id))
63
64     def test_add_remove_name(self):
65         x = self.get_context()
66         name = "samba.messaging test-%d" % random.randint(1, 1000000)
67         x.irpc_add_name(name)
68         name_list = x.irpc_servers_byname(name)
69         self.assertEqual(len(name_list), 1)
70         self.assertEqual(ndr_print(x.server_id),
71                          ndr_print(name_list[0]))
72         x.irpc_remove_name(name)
73         self.assertRaises(KeyError,
74                           x.irpc_servers_byname, name)
75
76     def test_ping_speed(self):
77         got_ping = {"count": 0}
78         got_pong = {"count": 0}
79         timeout = False
80
81         msg_pong = 0
82         msg_ping = 0
83
84         server_ctx = self.get_context((0, 1))
85         def ping_callback(got_ping, msg_type, src, data):
86             got_ping["count"] += 1
87             server_ctx.send(src, msg_pong, data)
88
89         msg_ping = server_ctx.register((ping_callback, got_ping))
90
91         def pong_callback(got_pong, msg_type, src, data):
92             got_pong["count"] += 1
93
94         client_ctx = self.get_context((0, 2))
95         msg_pong = client_ctx.register((pong_callback, got_pong))
96
97         # Try both server_id forms (structure and tuple)
98         client_ctx.send((0, 1), msg_ping, "testing")
99
100         client_ctx.send((0, 1), msg_ping, "testing2")
101
102         start_time = time.time()
103
104         # NOTE WELL: If debugging this with GDB, then the timeout will
105         # fire while you are trying to understand it.
106
107         while (got_ping["count"] < 2 or got_pong["count"] < 2) and not timeout:
108             client_ctx.loop_once(0.1)
109             server_ctx.loop_once(0.1)
110             if time.time() - start_time > 1:
111                 timeout = True
112
113         self.assertEqual(got_ping["count"], 2)
114         self.assertEqual(got_pong["count"], 2)
115
116     def test_pid_defaulting(self):
117         got_ping = {"count": 0}
118         got_pong = {"count": 0}
119         timeout = False
120
121         msg_pong = 0
122         msg_ping = 0
123
124         pid = os.getpid()
125         server_ctx = self.get_context((pid, 1))
126         def ping_callback(got_ping, msg_type, src, data):
127             got_ping["count"] += 1
128             server_ctx.send(src, msg_pong, data)
129
130         msg_ping = server_ctx.register((ping_callback, got_ping))
131
132         def pong_callback(got_pong, msg_type, src, data):
133             got_pong["count"] += 1
134
135         client_ctx = self.get_context((2,))
136         msg_pong = client_ctx.register((pong_callback, got_pong))
137
138         # Try one and two element tuple forms
139         client_ctx.send((pid, 1), msg_ping, "testing")
140
141         client_ctx.send((1,), msg_ping, "testing2")
142
143         start_time = time.time()
144
145         # NOTE WELL: If debugging this with GDB, then the timeout will
146         # fire while you are trying to understand it.
147
148         while (got_ping["count"] < 2 or got_pong["count"] < 2) and not timeout:
149             client_ctx.loop_once(0.1)
150             server_ctx.loop_once(0.1)
151             if time.time() - start_time > 1:
152                 timeout = True
153
154         self.assertEqual(got_ping["count"], 2)
155         self.assertEqual(got_pong["count"], 2)