1bba8e7e82d44ee80d863c8334ed26ac4a67baf7
[obnox/samba/samba-obnox.git] / source3 / lib / ctdb_conn.c
1 /*
2    Unix SMB/CIFS implementation.
3    Samba3 ctdb connection handling
4    Copyright (C) Volker Lendecke 2012
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/util/tevent_unix.h"
22 #include "ctdb_conn.h"
23
24 #include <tdb.h>
25
26 #include <ctdb_protocol.h>
27
28 #include "lib/async_req/async_sock.h"
29
30 struct ctdb_conn {
31         int fd;
32         struct tevent_queue *outqueue;
33 };
34
35 struct ctdb_conn_init_state {
36         struct sockaddr_un addr;
37         struct ctdb_conn *conn;
38 };
39
40 /*
41  * use the callbacks of async_connect_send to make sure
42  * we are connecting to CTDB as root
43  */
44 static void before_connect_cb(void *private_data) {
45         become_root();
46 }
47
48 static void after_connect_cb(void *private_data) {
49         unbecome_root();
50 }
51
52 static void ctdb_conn_init_done(struct tevent_req *subreq);
53 static int ctdb_conn_destructor(struct ctdb_conn *conn);
54
55 struct tevent_req *ctdb_conn_init_send(TALLOC_CTX *mem_ctx,
56                                        struct tevent_context *ev,
57                                        const char *sock)
58 {
59         struct tevent_req *req, *subreq;
60         struct ctdb_conn_init_state *state;
61
62         req = tevent_req_create(mem_ctx, &state, struct ctdb_conn_init_state);
63         if (req == NULL) {
64                 return NULL;
65         }
66
67         if (!lp_clustering()) {
68                 tevent_req_error(req, ENOSYS);
69                 return tevent_req_post(req, ev);
70         }
71
72         if (strlen(sock) >= sizeof(state->addr.sun_path)) {
73                 tevent_req_error(req, ENAMETOOLONG);
74                 return tevent_req_post(req, ev);
75         }
76
77         state->conn = talloc(state, struct ctdb_conn);
78         if (tevent_req_nomem(state->conn, req)) {
79                 return tevent_req_post(req, ev);
80         }
81
82         state->conn->outqueue = tevent_queue_create(
83                 state->conn, "ctdb outqueue");
84         if (tevent_req_nomem(state->conn->outqueue, req)) {
85                 return tevent_req_post(req, ev);
86         }
87
88         state->conn->fd = socket(AF_UNIX, SOCK_STREAM, 0);
89         if (state->conn->fd == -1) {
90                 tevent_req_error(req, errno);
91                 return tevent_req_post(req, ev);
92         }
93         talloc_set_destructor(state->conn, ctdb_conn_destructor);
94
95         state->addr.sun_family = AF_UNIX;
96         strncpy(state->addr.sun_path, sock, sizeof(state->addr.sun_path));
97
98         subreq = async_connect_send(state, ev, state->conn->fd,
99                                     (struct sockaddr *)&state->addr,
100                                     sizeof(state->addr), before_connect_cb,
101                                     after_connect_cb, NULL);
102         if (tevent_req_nomem(subreq, req)) {
103                 return tevent_req_post(req, ev);
104         }
105         tevent_req_set_callback(subreq, ctdb_conn_init_done, req);
106         return req;
107 }
108
109 static int ctdb_conn_destructor(struct ctdb_conn *c)
110 {
111         if (c->fd != -1) {
112                 close(c->fd);
113                 c->fd = -1;
114         }
115         return 0;
116 }
117
118 static void ctdb_conn_init_done(struct tevent_req *subreq)
119 {
120         struct tevent_req *req = tevent_req_callback_data(
121                 subreq, struct tevent_req);
122         int ret, err;
123
124         ret = async_connect_recv(subreq, &err);
125         TALLOC_FREE(subreq);
126         if (ret == -1) {
127                 tevent_req_error(req, err);
128                 return;
129         }
130         tevent_req_done(req);
131 }
132
133 int ctdb_conn_init_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
134                         struct ctdb_conn **pconn)
135 {
136         struct ctdb_conn_init_state *state = tevent_req_data(
137                 req, struct ctdb_conn_init_state);
138         int err;
139
140         if (tevent_req_is_unix_error(req, &err)) {
141                 return err;
142         }
143         *pconn = talloc_move(mem_ctx, &state->conn);
144
145         return 0;
146 }
147
148 struct ctdb_conn_control_state {
149         struct tevent_context *ev;
150         struct ctdb_conn *conn;
151         struct ctdb_req_control req;
152         struct iovec iov[2];
153         struct ctdb_reply_control *reply;
154 };
155
156 static void ctdb_conn_control_written(struct tevent_req *subreq);
157 static void ctdb_conn_control_done(struct tevent_req *subreq);
158 static ssize_t ctdb_packet_more(uint8_t *buf, size_t buflen, void *p);
159
160 struct tevent_req *ctdb_conn_control_send(TALLOC_CTX *mem_ctx,
161                                           struct tevent_context *ev,
162                                           struct ctdb_conn *conn,
163                                           uint32_t vnn, uint32_t opcode,
164                                           uint64_t srvid, uint32_t flags,
165                                           uint8_t *data, size_t datalen)
166 {
167         struct tevent_req *req, *subreq;
168         struct ctdb_conn_control_state *state;
169         struct ctdb_req_header *hdr;
170
171         req = tevent_req_create(mem_ctx, &state,
172                                 struct ctdb_conn_control_state);
173         if (req == NULL) {
174                 return NULL;
175         }
176         state->ev = ev;
177         state->conn = conn;
178
179         hdr = &state->req.hdr;
180         hdr->length = offsetof(struct ctdb_req_control, data) + datalen;
181         hdr->ctdb_magic    = CTDB_MAGIC;
182         hdr->ctdb_version  = CTDB_VERSION;
183         hdr->operation     = CTDB_REQ_CONTROL;
184         hdr->reqid         = 1; /* FIXME */
185         hdr->destnode      = vnn;
186         state->req.opcode  = opcode;
187         state->req.srvid   = srvid;
188         state->req.datalen = datalen;
189         state->req.flags   = flags;
190
191         state->iov[0].iov_base = &state->req;
192         state->iov[0].iov_len = offsetof(struct ctdb_req_control, data);
193         state->iov[1].iov_base = data;
194         state->iov[1].iov_len = datalen;
195
196         subreq = writev_send(state, ev, conn->outqueue, conn->fd, false,
197                              state->iov, 2);
198         if (tevent_req_nomem(subreq, req)) {
199                 return tevent_req_post(req, ev);
200         }
201         tevent_req_set_callback(subreq, ctdb_conn_control_written, req);
202         return req;
203 }
204
205 static void ctdb_conn_control_written(struct tevent_req *subreq)
206 {
207         struct tevent_req *req = tevent_req_callback_data(
208                 subreq, struct tevent_req);
209         struct ctdb_conn_control_state *state = tevent_req_data(
210                 req, struct ctdb_conn_control_state);
211         ssize_t written;
212         int err;
213
214         written = writev_recv(subreq, &err);
215         TALLOC_FREE(subreq);
216         if (written == -1) {
217                 tevent_req_error(req, err);
218                 return;
219         }
220         subreq = read_packet_send(
221                 state, state->ev, state->conn->fd, sizeof(uint32_t),
222                 ctdb_packet_more, NULL);
223         if (tevent_req_nomem(subreq, req)) {
224                 return;
225         }
226         tevent_req_set_callback(subreq, ctdb_conn_control_done, req);
227 }
228
229 static ssize_t ctdb_packet_more(uint8_t *buf, size_t buflen, void *p)
230 {
231         uint32_t len;
232
233         if (buflen > sizeof(uint32_t)) {
234                 /* Been here, done */
235                 return 0;
236         }
237         memcpy(&len, buf, sizeof(len));
238
239         if (len < sizeof(uint32_t)) {
240                 return -1;
241         }
242
243         return (len - sizeof(uint32_t));
244 }
245
246 static void ctdb_conn_control_done(struct tevent_req *subreq)
247 {
248         struct tevent_req *req = tevent_req_callback_data(
249                 subreq, struct tevent_req);
250         struct ctdb_conn_control_state *state = tevent_req_data(
251                 req, struct ctdb_conn_control_state);
252         ssize_t nread;
253         uint8_t *buf;
254         int err;
255
256         nread = read_packet_recv(subreq, state, &buf, &err);
257         TALLOC_FREE(subreq);
258         if (nread == -1) {
259                 tevent_req_error(req, err);
260                 return;
261         }
262         state->reply = (struct ctdb_reply_control *)buf;
263         tevent_req_done(req);
264 }
265
266 int ctdb_conn_control_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
267                            struct ctdb_reply_control **preply)
268 {
269         struct ctdb_conn_control_state *state = tevent_req_data(
270                 req, struct ctdb_conn_control_state);
271         int err;
272
273         if (tevent_req_is_unix_error(req, &err)) {
274                 return err;
275         }
276         if (preply != NULL) {
277                 *preply = talloc_move(mem_ctx, &state->reply);
278         }
279         return 0;
280 }
281
282 struct ctdb_conn_msg_write_state {
283         struct ctdb_req_message ctdb_msg;
284         struct iovec iov[2];
285 };
286
287 static void ctdb_conn_msg_write_done(struct tevent_req *subreq);
288
289 struct tevent_req *ctdb_conn_msg_write_send(TALLOC_CTX *mem_ctx,
290                                             struct tevent_context *ev,
291                                             struct ctdb_conn *conn,
292                                             uint32_t vnn, uint64_t srvid,
293                                             uint8_t *msg, size_t msg_len)
294 {
295         struct tevent_req *req, *subreq;
296         struct ctdb_conn_msg_write_state *state;
297         struct ctdb_req_header *h;
298
299         req = tevent_req_create(mem_ctx, &state,
300                                 struct ctdb_conn_msg_write_state);
301         if (req == NULL) {
302                 return NULL;
303         }
304
305         h = &state->ctdb_msg.hdr;
306
307         h->length = offsetof(struct ctdb_req_message, data) + msg_len;
308         h->ctdb_magic = CTDB_MAGIC;
309         h->ctdb_version = CTDB_VERSION;
310         h->generation = 1;
311         h->operation  = CTDB_REQ_MESSAGE;
312         h->destnode   = vnn;
313         h->srcnode    = CTDB_CURRENT_NODE;
314         h->reqid      = 0;
315         state->ctdb_msg.srvid   = srvid;
316         state->ctdb_msg.datalen = msg_len;
317
318         state->iov[0].iov_base = &state->ctdb_msg;
319         state->iov[0].iov_len = offsetof(struct ctdb_req_message, data);
320         state->iov[1].iov_base = msg;
321         state->iov[1].iov_len = msg_len;
322
323         subreq = writev_send(state, ev, conn->outqueue, conn->fd, false,
324                              state->iov, 2);
325         if (tevent_req_nomem(subreq, req)) {
326                 return tevent_req_post(req, ev);
327         }
328         tevent_req_set_callback(subreq, ctdb_conn_msg_write_done, req);
329         return req;
330 }
331
332 static void ctdb_conn_msg_write_done(struct tevent_req *subreq)
333 {
334         struct tevent_req *req = tevent_req_callback_data(
335                 subreq, struct tevent_req);
336         ssize_t written;
337         int err;
338
339         written = writev_recv(subreq, &err);
340         TALLOC_FREE(subreq);
341         if (written == -1) {
342                 tevent_req_error(req, err);
343                 return;
344         }
345         tevent_req_done(req);
346 }
347
348 int ctdb_conn_msg_write_recv(struct tevent_req *req)
349 {
350         int err;
351         if (tevent_req_is_unix_error(req, &err)) {
352                 return err;
353         }
354         return 0;
355 }
356
357 struct ctdb_msg_channel {
358         struct ctdb_conn *conn;
359 };
360
361 struct ctdb_msg_channel_init_state {
362         struct tevent_context *ev;
363         struct ctdb_conn *conn;
364         uint64_t srvid;
365         struct ctdb_msg_channel *channel;
366 };
367
368 static void ctdb_msg_channel_init_connected(struct tevent_req *subreq);
369 static void ctdb_msg_channel_init_registered_srvid(struct tevent_req *subreq);
370
371 struct tevent_req *ctdb_msg_channel_init_send(
372         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
373         const char *sock, uint64_t srvid)
374 {
375         struct tevent_req *req, *subreq;
376         struct ctdb_msg_channel_init_state *state;
377
378         req = tevent_req_create(mem_ctx, &state,
379                                 struct ctdb_msg_channel_init_state);
380         if (req == NULL) {
381                 return NULL;
382         }
383         state->ev = ev;
384         state->srvid = srvid;
385
386         subreq = ctdb_conn_init_send(state, ev, sock);
387         if (tevent_req_nomem(subreq, req)) {
388                 return tevent_req_post(req, ev);
389         }
390         tevent_req_set_callback(subreq, ctdb_msg_channel_init_connected, req);
391         return req;
392 }
393
394 static void ctdb_msg_channel_init_connected(struct tevent_req *subreq)
395 {
396         struct tevent_req *req = tevent_req_callback_data(
397                 subreq, struct tevent_req);
398         struct ctdb_msg_channel_init_state *state = tevent_req_data(
399                 req, struct ctdb_msg_channel_init_state);
400         int ret;
401
402         ret = ctdb_conn_init_recv(subreq, state, &state->conn);
403         TALLOC_FREE(subreq);
404         if (tevent_req_error(req, ret)) {
405                 return;
406         }
407         subreq = ctdb_conn_control_send(state, state->ev, state->conn,
408                                         CTDB_CURRENT_NODE,
409                                         CTDB_CONTROL_REGISTER_SRVID,
410                                         state->srvid, 0, NULL, 0);
411         if (tevent_req_nomem(subreq, req)) {
412                 return;
413         }
414         tevent_req_set_callback(
415                 subreq, ctdb_msg_channel_init_registered_srvid, req);
416 }
417
418 static void ctdb_msg_channel_init_registered_srvid(struct tevent_req *subreq)
419 {
420         struct tevent_req *req = tevent_req_callback_data(
421                 subreq, struct tevent_req);
422         struct ctdb_msg_channel_init_state *state = tevent_req_data(
423                 req, struct ctdb_msg_channel_init_state);
424         struct ctdb_reply_control *reply;
425         int ret;
426
427         ret = ctdb_conn_control_recv(subreq, talloc_tos(), &reply);
428         TALLOC_FREE(subreq);
429         if (tevent_req_error(req, ret)) {
430                 return;
431         }
432         if (reply->status != 0) {
433                 tevent_req_error(req, EIO);
434                 return;
435         }
436         state->channel = talloc(state, struct ctdb_msg_channel);
437         if (tevent_req_nomem(state->channel, req)) {
438                 return;
439         }
440         state->channel->conn = talloc_move(state->channel, &state->conn);
441         tevent_req_done(req);
442 }
443
444 int ctdb_msg_channel_init_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
445                                struct ctdb_msg_channel **pchannel)
446 {
447         struct ctdb_msg_channel_init_state *state = tevent_req_data(
448                 req, struct ctdb_msg_channel_init_state);
449         int err;
450
451         if (tevent_req_is_unix_error(req, &err)) {
452                 return err;
453         }
454         *pchannel = talloc_move(mem_ctx, &state->channel);
455         return 0;
456 }
457
458 struct ctdb_msg_read_state {
459         size_t buflen;
460         uint8_t *buf;
461 };
462
463 static void ctdb_msg_channel_got_msg(struct tevent_req *subreq);
464
465 struct tevent_req *ctdb_msg_read_send(TALLOC_CTX *mem_ctx,
466                                       struct tevent_context *ev,
467                                       struct ctdb_msg_channel *channel)
468 {
469         struct tevent_req *req, *subreq;
470         struct ctdb_msg_read_state *state;
471
472         req = tevent_req_create(mem_ctx, &state,
473                                 struct ctdb_msg_read_state);
474         if (req == NULL) {
475                 return NULL;
476         }
477         subreq = read_packet_send(state, ev, channel->conn->fd,
478                 sizeof(uint32_t), ctdb_packet_more, NULL);
479         if (tevent_req_nomem(subreq, req)) {
480                 return tevent_req_post(req, ev);
481         }
482         tevent_req_set_callback(subreq, ctdb_msg_channel_got_msg, req);
483         return req;
484 }
485
486 static void ctdb_msg_channel_got_msg(struct tevent_req *subreq)
487 {
488         struct tevent_req *req = tevent_req_callback_data(
489                 subreq, struct tevent_req);
490         struct ctdb_msg_read_state *state = tevent_req_data(
491                 req, struct ctdb_msg_read_state);
492         ssize_t nread;
493         uint8_t *buf;
494         int err;
495
496         nread = read_packet_recv(subreq, state, &buf, &err);
497         if (nread == -1) {
498                 tevent_req_error(req, err);
499                 return;
500         }
501         state->buflen = nread;
502         state->buf = buf;
503         tevent_req_done(req);
504 }
505
506 int ctdb_msg_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
507                        uint8_t **pmsg, size_t *pmsg_len)
508 {
509         struct ctdb_msg_read_state *state = tevent_req_data(
510                 req, struct ctdb_msg_read_state);
511         struct ctdb_req_header *hdr;
512         struct ctdb_req_message *msg;
513         uint8_t *buf;
514         int err;
515
516         if (tevent_req_is_unix_error(req, &err)) {
517                 return err;
518         }
519
520         hdr = (struct ctdb_req_header *)state->buf;
521         if (hdr->length != state->buflen) {
522                 DEBUG(10, ("Got invalid header length\n"));
523                 return EIO;
524         }
525         if (hdr->operation != CTDB_REQ_MESSAGE) {
526                 DEBUG(10, ("Expected %d (CTDB_REQ_MESSAGE), got %d\n",
527                            CTDB_REQ_MESSAGE, (int)hdr->operation));
528                 return EIO;
529         }
530         if (hdr->length < offsetof(struct ctdb_req_message, data)) {
531                 DEBUG(10, ("Got short msg, len=%d\n", (int)hdr->length));
532                 return EIO;
533         }
534
535         msg = (struct ctdb_req_message *)hdr;
536         if (msg->datalen >
537             hdr->length - offsetof(struct ctdb_req_message, data)) {
538                 DEBUG(10, ("Got invalid datalen %d\n", (int)msg->datalen));
539                 return EIO;
540         }
541
542         buf = (uint8_t *)talloc_memdup(mem_ctx, msg->data, msg->datalen);
543         if (buf == NULL) {
544                 return ENOMEM;
545         }
546         *pmsg = buf;
547         *pmsg_len = msg->datalen;
548         return 0;
549 }