Don't set next_interval to 0.
[metze/ctdb/wip.git] / libctdb / messages.c
1 #include "libctdb_private.h"
2 #include "messages.h"
3 #include "io_elem.h"
4 #include <ctdb.h>
5 #include <tdb.h>
6 #include <ctdb_protocol.h>
7 #include <stdlib.h>
8 #include <string.h>
9 #include <errno.h>
10
11 /* Remove type-safety macros. */
12 #undef ctdb_set_message_handler_send
13 #undef ctdb_set_message_handler_recv
14 #undef ctdb_remove_message_handler_send
15
16 struct message_handler_info {
17         struct message_handler_info *next, *prev;
18
19         uint64_t srvid;
20         ctdb_message_fn_t handler;
21         void *handler_data;
22 };
23
24 void deliver_message(struct ctdb_connection *ctdb, struct ctdb_req_header *hdr)
25 {
26         struct message_handler_info *i;
27         struct ctdb_req_message *msg = (struct ctdb_req_message *)hdr;
28         TDB_DATA data;
29         bool found;
30
31         data.dptr = msg->data;
32         data.dsize = msg->datalen;
33
34         /* Note: we want to call *every* handler: there may be more than one */
35         for (i = ctdb->message_handlers; i; i = i->next) {
36                 if (i->srvid == msg->srvid) {
37                         i->handler(ctdb, msg->srvid, data, i->handler_data);
38                         found = true;
39                 }
40         }
41         if (!found) {
42                 DEBUG(ctdb, LOG_WARNING,
43                       "ctdb_service: messsage for unregistered srvid %llu",
44                       msg->srvid);
45         }
46 }
47
48 void remove_message_handlers(struct ctdb_connection *ctdb)
49 {
50         struct message_handler_info *i;
51
52         /* ctdbd should unregister automatically when we close fd, so we don't
53            need to do that here. */
54         while ((i = ctdb->message_handlers) != NULL) {
55                 DLIST_REMOVE(ctdb->message_handlers, i);
56                 free(i);
57         }
58 }
59
60 bool ctdb_set_message_handler_recv(struct ctdb_connection *ctdb,
61                                    struct ctdb_request *req)
62 {
63         struct message_handler_info *info = req->extra;
64         struct ctdb_reply_control *reply;
65
66         reply = unpack_reply_control(ctdb, req, CTDB_CONTROL_REGISTER_SRVID);
67         if (!reply) {
68                 return false;
69         }
70         if (reply->status != 0) {
71                 DEBUG(ctdb, LOG_ERR,
72                       "ctdb_set_message_handler_recv: status %i",
73                       reply->status);
74                 return false;
75         }
76
77         /* Put ourselves in list of handlers. */
78         DLIST_ADD(ctdb->message_handlers, info);
79         /* Keep safe from destructor */
80         req->extra = NULL;
81         return true;
82 }
83
84 static void free_info(struct ctdb_connection *ctdb, struct ctdb_request *req)
85 {
86         free(req->extra);
87 }
88
89 struct ctdb_request *
90 ctdb_set_message_handler_send(struct ctdb_connection *ctdb, uint64_t srvid,
91                               ctdb_message_fn_t handler, void *handler_data,
92                               ctdb_callback_t callback, void *private_data)
93 {
94         struct message_handler_info *info;
95         struct ctdb_request *req;
96
97         info = malloc(sizeof(*info));
98         if (!info) {
99                 DEBUG(ctdb, LOG_ERR,
100                       "ctdb_set_message_handler_send: allocating info");
101                 return NULL;
102         }
103
104         req = new_ctdb_control_request(ctdb, CTDB_CONTROL_REGISTER_SRVID,
105                                        CTDB_CURRENT_NODE, NULL, 0,
106                                        callback, private_data);
107         if (!req) {
108                 DEBUG(ctdb, LOG_ERR,
109                       "ctdb_set_message_handler_send: allocating request");
110                 free(info);
111                 return NULL;
112         }
113         req->extra = info;
114         req->extra_destructor = free_info;
115         req->hdr.control->srvid = srvid;
116
117         info->srvid = srvid;
118         info->handler = handler;
119         info->handler_data = handler_data;
120
121         DEBUG(ctdb, LOG_DEBUG,
122               "ctdb_set_message_handler_send: sending request %u for id %llu",
123               req->hdr.hdr->reqid, srvid);
124         return req;
125 }
126
127 struct ctdb_request *
128 ctdb_remove_message_handler_send(struct ctdb_connection *ctdb, uint64_t srvid,
129                                  ctdb_message_fn_t handler, void *hdata,
130                                  ctdb_callback_t callback, void *cbdata)
131 {
132         struct message_handler_info *i;
133         struct ctdb_request *req;
134
135         for (i = ctdb->message_handlers; i; i = i->next) {
136                 if (i->srvid == srvid
137                     && i->handler == handler && i->handler_data == hdata) {
138                         break;
139                 }
140         }
141         if (!i) {
142                 DEBUG(ctdb, LOG_ALERT,
143                       "ctdb_remove_message_handler_send: no such handler");
144                 errno = ENOENT;
145                 return NULL;
146         }
147
148         req = new_ctdb_control_request(ctdb, CTDB_CONTROL_DEREGISTER_SRVID,
149                                        CTDB_CURRENT_NODE, NULL, 0,
150                                        callback, cbdata);
151         if (!req) {
152                 DEBUG(ctdb, LOG_ERR,
153                       "ctdb_remove_message_handler_send: allocating request");
154                 return NULL;
155         }
156         req->hdr.control->srvid = srvid;
157         req->extra = i;
158
159         DEBUG(ctdb, LOG_DEBUG,
160               "ctdb_set_remove_handler_send: sending request %u for id %llu",
161               req->hdr.hdr->reqid, srvid);
162         return req;
163 }
164
165 bool ctdb_remove_message_handler_recv(struct ctdb_connection *ctdb,
166                                       struct ctdb_request *req)
167 {
168         struct message_handler_info *handler = req->extra;
169         struct ctdb_reply_control *reply;
170
171         reply = unpack_reply_control(ctdb, req, CTDB_CONTROL_DEREGISTER_SRVID);
172         if (!reply) {
173                 return false;
174         }
175         if (reply->status != 0) {
176                 DEBUG(ctdb, LOG_ERR,
177                       "ctdb_remove_message_handler_recv: status %i",
178                       reply->status);
179                 return false;
180         }
181
182         /* Remove ourselves from list of handlers. */
183         DLIST_REMOVE(ctdb->message_handlers, handler);
184         free(handler);
185         /* Crash if they call this again! */
186         req->extra = NULL;
187         return true;
188 }
189
190 bool ctdb_send_message(struct ctdb_connection *ctdb,
191                       uint32_t pnn, uint64_t srvid,
192                       TDB_DATA data)
193 {
194         struct ctdb_request *req;
195         struct ctdb_req_message *pkt;
196
197         /* We just discard it once it's finished: no reply. */
198         req = new_ctdb_request(offsetof(struct ctdb_req_message, data) + data.dsize,
199                                ctdb_cancel_callback, NULL);
200         if (!req) {
201                 DEBUG(ctdb, LOG_ERR, "ctdb_set_message: allocating message");
202                 return false;
203         }
204
205         io_elem_init_req_header(req->io,
206                                 CTDB_REQ_MESSAGE, pnn, new_reqid(ctdb));
207
208         pkt = req->hdr.message;
209         pkt->srvid = srvid;
210         pkt->datalen = data.dsize;
211         memcpy(pkt->data, data.dptr, data.dsize);
212         DLIST_ADD_END(ctdb->outq, req, struct ctdb_request);
213         return true;
214 }