pymessaging: Add a hook to run the event loop, make callbacks practical
authorAndrew Bartlett <abartlet@samba.org>
Mon, 13 Mar 2017 23:39:13 +0000 (12:39 +1300)
committerAndrew Bartlett <abartlet@samba.org>
Tue, 28 Mar 2017 07:23:11 +0000 (09:23 +0200)
These change allow us to write a messaging server in python.

The previous ping_speed test did not actually test anything, so
we use .loop_once() to make it actually work.  To enable practial use
a context is supplied in the tuple with the callback, and the server_id
for the reply is not placed inside an additional tuple.

In order to get at the internal event context on which to loop, we
expose imessaging_context in messaging_internal.h and allow the python
bindings to use that header.

Signed-off-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Garming Sam <garming@catalyst.net.nz>
python/samba/tests/messaging.py
source4/lib/messaging/messaging.c
source4/lib/messaging/messaging_internal.h [new file with mode: 0644]
source4/lib/messaging/pymessaging.c

index a70be96edc27bf13e98914938c64d42252a48a3c..6ee18e791e361e021b93209d7cee27f14c00912d 100644 (file)
@@ -21,8 +21,9 @@
 import samba
 from samba.messaging import Messaging
 from samba.tests import TestCase
-from samba.dcerpc.server_id import server_id
+import time
 from samba.ndr import ndr_print
+from samba.dcerpc import server_id
 import random
 
 class MessagingTests(TestCase):
@@ -35,7 +36,8 @@ class MessagingTests(TestCase):
         x = self.get_context()
         def callback():
             pass
-        msg_type = x.register(callback)
+        msg_type = x.register((callback, None))
+        self.assertTrue(isinstance(msg_type, long))
         x.deregister(callback, msg_type)
 
     def test_all_servers(self):
@@ -54,7 +56,7 @@ class MessagingTests(TestCase):
 
     def test_assign_server_id(self):
         x = self.get_context()
-        self.assertTrue(isinstance(x.server_id, server_id))
+        self.assertTrue(isinstance(x.server_id, server_id.server_id))
 
     def test_add_remove_name(self):
         x = self.get_context()
@@ -69,19 +71,41 @@ class MessagingTests(TestCase):
                           x.irpc_servers_byname, name)
 
     def test_ping_speed(self):
+        got_ping = {"count": 0}
+        got_pong = {"count": 0}
+        timeout = False
+
+        msg_pong = 0
+        msg_ping = 0
+
         server_ctx = self.get_context((0, 1))
-        def ping_callback(src, data):
-                server_ctx.send(src, data)
-        def exit_callback():
-                print "received exit"
-        msg_ping = server_ctx.register(ping_callback)
-        msg_exit = server_ctx.register(exit_callback)
-
-        def pong_callback():
-                print "received pong"
+        def ping_callback(got_ping, msg_type, src, data):
+            got_ping["count"] += 1
+            server_ctx.send(src, msg_pong, data)
+
+        msg_ping = server_ctx.register((ping_callback, got_ping))
+
+        def pong_callback(got_pong, msg_type, src, data):
+            got_pong["count"] += 1
+
         client_ctx = self.get_context((0, 2))
-        msg_pong = client_ctx.register(pong_callback)
+        msg_pong = client_ctx.register((pong_callback, got_pong))
 
+        # Try both server_id forms (structure and tuple)
         client_ctx.send((0, 1), msg_ping, "testing")
-        client_ctx.send((0, 1), msg_ping, "")
 
+        client_ctx.send((0, 1), msg_ping, "testing2")
+
+        start_time = time.time()
+
+        # NOTE WELL: If debugging this with GDB, then the timeout will
+        # fire while you are trying to understand it.
+
+        while (got_ping["count"] < 2 or got_pong["count"] < 2) and not timeout:
+            client_ctx.loop_once(0.1)
+            server_ctx.loop_once(0.1)
+            if time.time() - start_time > 1:
+                timeout = True
+
+        self.assertEqual(got_ping["count"], 2)
+        self.assertEqual(got_pong["count"], 2)
index 84df9345d80f3b24608ab1037c670ec4da7e5227..4d75f0976dcba1b5ad18a19d953d976db703a524 100644 (file)
@@ -24,6 +24,7 @@
 #include "lib/util/server_id.h"
 #include "system/filesys.h"
 #include "messaging/messaging.h"
+#include "messaging/messaging_internal.h"
 #include "../lib/util/dlinklist.h"
 #include "lib/socket/socket.h"
 #include "librpc/gen_ndr/ndr_irpc.h"
@@ -55,22 +56,6 @@ struct irpc_request {
        } incoming;
 };
 
-struct imessaging_context {
-       struct imessaging_context *prev, *next;
-       struct tevent_context *ev;
-       struct server_id server_id;
-       const char *sock_dir;
-       const char *lock_dir;
-       struct dispatch_fn **dispatch;
-       uint32_t num_types;
-       struct idr_context *dispatch_tree;
-       struct irpc_list *irpc;
-       struct idr_context *idr;
-       struct server_id_db *names;
-       struct timeval start_time;
-       void *msg_dgm_ref;
-};
-
 /* we have a linked list of dispatch handlers for each msg_type that
    this messaging server can deal with */
 struct dispatch_fn {
diff --git a/source4/lib/messaging/messaging_internal.h b/source4/lib/messaging/messaging_internal.h
new file mode 100644 (file)
index 0000000..93c5c4b
--- /dev/null
@@ -0,0 +1,36 @@
+/*
+   Unix SMB/CIFS implementation.
+
+   Samba internal messaging functions
+
+   Copyright (C) Andrew Tridgell 2004
+
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+
+   You should have received a copy of the GNU General Public License
+   along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+struct imessaging_context {
+       struct imessaging_context *prev, *next;
+       struct tevent_context *ev;
+       struct server_id server_id;
+       const char *sock_dir;
+       const char *lock_dir;
+       struct dispatch_fn **dispatch;
+       uint32_t num_types;
+       struct idr_context *dispatch_tree;
+       struct irpc_list *irpc;
+       struct idr_context *idr;
+       struct server_id_db *names;
+       struct timeval start_time;
+       void *msg_dgm_ref;
+};
index 5b5408caddb435bf9502171a747c1db1df752621..2d1601d5fd38051de11e2036043311b5ca9fdc87 100644 (file)
@@ -34,6 +34,7 @@
 #include "librpc/rpc/dcerpc.h"
 #include "librpc/gen_ndr/server_id.h"
 #include <pytalloc.h>
+#include "messaging_internal.h"
 
 void initmessaging(void);
 
@@ -173,7 +174,8 @@ static void py_msg_callback_wrapper(struct imessaging_context *msg, void *privat
                               uint32_t msg_type, 
                               struct server_id server_id, DATA_BLOB *data)
 {
-       PyObject *py_server_id, *callback = (PyObject *)private_data;
+       PyObject *py_server_id, *callback_and_tuple = (PyObject *)private_data;
+       PyObject *callback, *py_private;
 
        struct server_id *p_server_id = talloc(NULL, struct server_id);
        if (!p_server_id) {
@@ -182,10 +184,18 @@ static void py_msg_callback_wrapper(struct imessaging_context *msg, void *privat
        }
        *p_server_id = server_id;
 
+       if (!PyArg_ParseTuple(callback_and_tuple, "OO",
+                             &callback,
+                             &py_private)) {
+               return;
+       }
+
        py_server_id = py_return_ndr_struct("samba.dcerpc.server_id", "server_id", p_server_id, p_server_id);
        talloc_unlink(NULL, p_server_id);
 
-       PyObject_CallFunction(callback, discard_const_p(char, "i(O)s#"), msg_type,
+       PyObject_CallFunction(callback, discard_const_p(char, "OiOs#"),
+                             py_private,
+                             msg_type,
                              py_server_id,
                              data->data, data->length);
 }
@@ -194,24 +204,30 @@ static PyObject *py_imessaging_register(PyObject *self, PyObject *args, PyObject
 {
        imessaging_Object *iface = (imessaging_Object *)self;
        int msg_type = -1;
-       PyObject *callback;
+       PyObject *callback_and_context;
        NTSTATUS status;
-       const char *kwnames[] = { "callback", "msg_type", NULL };
+       const char *kwnames[] = { "callback_and_context", "msg_type", NULL };
        
        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|i:register", 
-               discard_const_p(char *, kwnames), &callback, &msg_type)) {
+               discard_const_p(char *, kwnames),
+                                        &callback_and_context, &msg_type)) {
+               return NULL;
+       }
+       if (!PyTuple_Check(callback_and_context)
+           || PyTuple_Size(callback_and_context) != 2) {
+               PyErr_SetString(PyExc_ValueError, "Expected of size 2 for callback_and_context");
                return NULL;
        }
 
-       Py_INCREF(callback);
+       Py_INCREF(callback_and_context);
 
        if (msg_type == -1) {
                uint32_t msg_type32 = msg_type;
-               status = imessaging_register_tmp(iface->msg_ctx, callback,
+               status = imessaging_register_tmp(iface->msg_ctx, callback_and_context,
                                                py_msg_callback_wrapper, &msg_type32);
                msg_type = msg_type32;
        } else {
-               status = imessaging_register(iface->msg_ctx, callback,
+               status = imessaging_register(iface->msg_ctx, callback_and_context,
                                    msg_type, py_msg_callback_wrapper);
        }
        if (NT_STATUS_IS_ERR(status)) {
@@ -241,6 +257,52 @@ static PyObject *py_imessaging_deregister(PyObject *self, PyObject *args, PyObje
        Py_RETURN_NONE;
 }
 
+static void simple_timer_handler(struct tevent_context *ev,
+                                struct tevent_timer *te,
+                                struct timeval current_time,
+                                void *private_data)
+{
+       return;
+}
+
+static PyObject *py_imessaging_loop_once(PyObject *self, PyObject *args, PyObject *kwargs)
+{
+       imessaging_Object *iface = (imessaging_Object *)self;
+       double offset;
+       int seconds;
+       struct timeval next_event;
+       struct tevent_timer *timer = NULL;
+       const char *kwnames[] = { "timeout", NULL };
+
+       TALLOC_CTX *frame = talloc_stackframe();
+
+       if (!PyArg_ParseTupleAndKeywords(args, kwargs, "d",
+                                        discard_const_p(char *, kwnames), &offset)) {
+               TALLOC_FREE(frame);
+               return NULL;
+       }
+
+       if (offset != 0.0) {
+               seconds = offset;
+               offset -= seconds;
+               next_event = tevent_timeval_current_ofs(seconds, (int)(offset*1000000));
+
+               timer = tevent_add_timer(iface->msg_ctx->ev, frame, next_event, simple_timer_handler,
+                                        NULL);
+               if (timer == NULL) {
+                       PyErr_NoMemory();
+                       TALLOC_FREE(frame);
+                       return NULL;
+               }
+       }
+
+       tevent_loop_once(iface->msg_ctx->ev);
+
+       TALLOC_FREE(frame);
+
+       Py_RETURN_NONE;
+}
+
 static PyObject *py_irpc_add_name(PyObject *self, PyObject *args, PyObject *kwargs)
 {
        imessaging_Object *iface = (imessaging_Object *)self;
@@ -371,9 +433,17 @@ static PyMethodDef py_imessaging_methods[] = {
        { "send", (PyCFunction)py_imessaging_send, METH_VARARGS|METH_KEYWORDS,
                "S.send(target, msg_type, data) -> None\nSend a message" },
        { "register", (PyCFunction)py_imessaging_register, METH_VARARGS|METH_KEYWORDS,
-               "S.register(callback, msg_type=None) -> msg_type\nRegister a message handler" },
+               "S.register((callback, context), msg_type=None) -> msg_type\nRegister a message handler.  "
+               "The callback and context must be supplied as a two-element tuple." },
        { "deregister", (PyCFunction)py_imessaging_deregister, METH_VARARGS|METH_KEYWORDS,
-               "S.deregister(callback, msg_type) -> None\nDeregister a message handler" },
+               "S.deregister((callback, context), msg_type) -> None\nDeregister a message handler "
+               "The callback and context must be supplied as the exact same two-element tuple "
+               "as was used as registration time." },
+       { "loop_once", (PyCFunction)py_imessaging_loop_once, METH_VARARGS|METH_KEYWORDS,
+               "S.loop_once(timeout) -> None\n"
+               "Loop on the internal event context until we get an event "
+               "(which might be a message calling the callback), "
+               "timeout after timeout seconds (if not 0)" },
        { "irpc_add_name", (PyCFunction)py_irpc_add_name, METH_VARARGS,
                "S.irpc_add_name(name) -> None\n"
                "Add this context to the list of server_id values that "