s3:lib/conn_tdb: implement connections_forall_read() based on smbXsrv_*_global_traverse()
[kai/samba.git] / source3 / lib / ctdb_conn.c
1 /*
2    Unix SMB/CIFS implementation.
3    Samba3 ctdb connection handling
4    Copyright (C) Volker Lendecke 2012
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/util/tevent_unix.h"
22 #include "ctdb_conn.h"
23
24 #ifdef CLUSTER_SUPPORT
25
26 #include "lib/async_req/async_sock.h"
27
28 struct ctdb_conn {
29         int fd;
30         struct tevent_queue *outqueue;
31 };
32
33 struct ctdb_conn_init_state {
34         struct sockaddr_un addr;
35         struct ctdb_conn *conn;
36 };
37
38 static void ctdb_conn_init_done(struct tevent_req *subreq);
39 static int ctdb_conn_destructor(struct ctdb_conn *conn);
40
41 struct tevent_req *ctdb_conn_init_send(TALLOC_CTX *mem_ctx,
42                                        struct tevent_context *ev,
43                                        const char *sock)
44 {
45         struct tevent_req *req, *subreq;
46         struct ctdb_conn_init_state *state;
47
48         req = tevent_req_create(mem_ctx, &state, struct ctdb_conn_init_state);
49         if (req == NULL) {
50                 return NULL;
51         }
52
53         if (!lp_clustering()) {
54                 tevent_req_error(req, ENOSYS);
55                 return tevent_req_post(req, ev);
56         }
57
58         if (strlen(sock) >= sizeof(state->addr.sun_path)) {
59                 tevent_req_error(req, ENAMETOOLONG);
60                 return tevent_req_post(req, ev);
61         }
62
63         state->conn = talloc(state, struct ctdb_conn);
64         if (tevent_req_nomem(state->conn, req)) {
65                 return tevent_req_post(req, ev);
66         }
67
68         state->conn->outqueue = tevent_queue_create(
69                 state->conn, "ctdb outqueue");
70         if (tevent_req_nomem(state->conn->outqueue, req)) {
71                 return tevent_req_post(req, ev);
72         }
73
74         state->conn->fd = socket(AF_UNIX, SOCK_STREAM, 0);
75         if (state->conn->fd == -1) {
76                 tevent_req_error(req, errno);
77                 return tevent_req_post(req, ev);
78         }
79         talloc_set_destructor(state->conn, ctdb_conn_destructor);
80
81         state->addr.sun_family = AF_UNIX;
82         strncpy(state->addr.sun_path, sock, sizeof(state->addr.sun_path));
83
84         subreq = async_connect_send(state, ev, state->conn->fd,
85                                     (struct sockaddr *)&state->addr,
86                                     sizeof(state->addr));
87         if (tevent_req_nomem(subreq, req)) {
88                 return tevent_req_post(req, ev);
89         }
90         tevent_req_set_callback(subreq, ctdb_conn_init_done, req);
91         return req;
92 }
93
94 static int ctdb_conn_destructor(struct ctdb_conn *c)
95 {
96         if (c->fd != -1) {
97                 close(c->fd);
98                 c->fd = -1;
99         }
100         return 0;
101 }
102
103 static void ctdb_conn_init_done(struct tevent_req *subreq)
104 {
105         struct tevent_req *req = tevent_req_callback_data(
106                 subreq, struct tevent_req);
107         int ret, err;
108
109         ret = async_connect_recv(subreq, &err);
110         TALLOC_FREE(subreq);
111         if (ret == -1) {
112                 tevent_req_error(req, err);
113                 return;
114         }
115         tevent_req_done(req);
116 }
117
118 int ctdb_conn_init_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
119                         struct ctdb_conn **pconn)
120 {
121         struct ctdb_conn_init_state *state = tevent_req_data(
122                 req, struct ctdb_conn_init_state);
123         int err;
124
125         if (tevent_req_is_unix_error(req, &err)) {
126                 return err;
127         }
128         *pconn = talloc_move(mem_ctx, &state->conn);
129
130         return 0;
131 }
132
133 struct ctdb_conn_control_state {
134         struct tevent_context *ev;
135         struct ctdb_conn *conn;
136         struct ctdb_req_control req;
137         struct iovec iov[2];
138         struct ctdb_reply_control *reply;
139 };
140
141 static void ctdb_conn_control_written(struct tevent_req *subreq);
142 static void ctdb_conn_control_done(struct tevent_req *subreq);
143 static ssize_t ctdb_packet_more(uint8_t *buf, size_t buflen, void *p);
144
145 struct tevent_req *ctdb_conn_control_send(TALLOC_CTX *mem_ctx,
146                                           struct tevent_context *ev,
147                                           struct ctdb_conn *conn,
148                                           uint32_t vnn, uint32_t opcode,
149                                           uint64_t srvid, uint32_t flags,
150                                           uint8_t *data, size_t datalen)
151 {
152         struct tevent_req *req, *subreq;
153         struct ctdb_conn_control_state *state;
154         struct ctdb_req_header *hdr;
155
156         req = tevent_req_create(mem_ctx, &state,
157                                 struct ctdb_conn_control_state);
158         if (req == NULL) {
159                 return NULL;
160         }
161         state->ev = ev;
162         state->conn = conn;
163
164         hdr = &state->req.hdr;
165         hdr->length = offsetof(struct ctdb_req_control, data) + datalen;
166         hdr->ctdb_magic    = CTDB_MAGIC;
167         hdr->ctdb_version  = CTDB_VERSION;
168         hdr->operation     = CTDB_REQ_CONTROL;
169         hdr->reqid         = 1; /* FIXME */
170         hdr->destnode      = vnn;
171         state->req.opcode  = opcode;
172         state->req.srvid   = srvid;
173         state->req.datalen = datalen;
174         state->req.flags   = flags;
175
176         state->iov[0].iov_base = &state->req;
177         state->iov[0].iov_len = offsetof(struct ctdb_req_control, data);
178         state->iov[1].iov_base = data;
179         state->iov[1].iov_len = datalen;
180
181         subreq = writev_send(state, ev, conn->outqueue, conn->fd, false,
182                              state->iov, 2);
183         if (tevent_req_nomem(subreq, req)) {
184                 return tevent_req_post(req, ev);
185         }
186         tevent_req_set_callback(subreq, ctdb_conn_control_written, req);
187         return req;
188 }
189
190 static void ctdb_conn_control_written(struct tevent_req *subreq)
191 {
192         struct tevent_req *req = tevent_req_callback_data(
193                 subreq, struct tevent_req);
194         struct ctdb_conn_control_state *state = tevent_req_data(
195                 req, struct ctdb_conn_control_state);
196         ssize_t written;
197         int err;
198
199         written = writev_recv(subreq, &err);
200         TALLOC_FREE(subreq);
201         if (written == -1) {
202                 tevent_req_error(req, err);
203                 return;
204         }
205         subreq = read_packet_send(
206                 state, state->ev, state->conn->fd, sizeof(uint32_t),
207                 ctdb_packet_more, NULL);
208         if (tevent_req_nomem(subreq, req)) {
209                 return;
210         }
211         tevent_req_set_callback(subreq, ctdb_conn_control_done, req);
212 }
213
214 static ssize_t ctdb_packet_more(uint8_t *buf, size_t buflen, void *p)
215 {
216         uint32_t len;
217
218         if (buflen > sizeof(uint32_t)) {
219                 /* Been here, done */
220                 return 0;
221         }
222         memcpy(&len, buf, sizeof(len));
223         return (len - sizeof(uint32_t));
224 }
225
226 static void ctdb_conn_control_done(struct tevent_req *subreq)
227 {
228         struct tevent_req *req = tevent_req_callback_data(
229                 subreq, struct tevent_req);
230         struct ctdb_conn_control_state *state = tevent_req_data(
231                 req, struct ctdb_conn_control_state);
232         ssize_t nread;
233         uint8_t *buf;
234         int err;
235
236         nread = read_packet_recv(subreq, state, &buf, &err);
237         TALLOC_FREE(subreq);
238         if (nread == -1) {
239                 tevent_req_error(req, err);
240                 return;
241         }
242         state->reply = (struct ctdb_reply_control *)buf;
243         tevent_req_done(req);
244 }
245
246 int ctdb_conn_control_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
247                            struct ctdb_reply_control **preply)
248 {
249         struct ctdb_conn_control_state *state = tevent_req_data(
250                 req, struct ctdb_conn_control_state);
251         int err;
252
253         if (tevent_req_is_unix_error(req, &err)) {
254                 return err;
255         }
256         if (preply != NULL) {
257                 *preply = talloc_move(mem_ctx, &state->reply);
258         }
259         return 0;
260 }
261
262 struct ctdb_conn_msg_write_state {
263         struct ctdb_req_message ctdb_msg;
264         struct iovec iov[2];
265 };
266
267 static void ctdb_conn_msg_write_done(struct tevent_req *subreq);
268
269 struct tevent_req *ctdb_conn_msg_write_send(TALLOC_CTX *mem_ctx,
270                                             struct tevent_context *ev,
271                                             struct ctdb_conn *conn,
272                                             uint32_t vnn, uint64_t srvid,
273                                             uint8_t *msg, size_t msg_len)
274 {
275         struct tevent_req *req, *subreq;
276         struct ctdb_conn_msg_write_state *state;
277         struct ctdb_req_header *h;
278
279         req = tevent_req_create(mem_ctx, &state,
280                                 struct ctdb_conn_msg_write_state);
281         if (req == NULL) {
282                 return NULL;
283         }
284
285         h = &state->ctdb_msg.hdr;
286
287         h->length = offsetof(struct ctdb_req_message, data) + msg_len;
288         h->ctdb_magic = CTDB_MAGIC;
289         h->ctdb_version = CTDB_VERSION;
290         h->generation = 1;
291         h->operation  = CTDB_REQ_MESSAGE;
292         h->destnode   = vnn;
293         h->srcnode    = CTDB_CURRENT_NODE;
294         h->reqid      = 0;
295         state->ctdb_msg.srvid   = srvid;
296         state->ctdb_msg.datalen = msg_len;
297
298         state->iov[0].iov_base = &state->ctdb_msg;
299         state->iov[0].iov_len = offsetof(struct ctdb_req_message, data);
300         state->iov[1].iov_base = msg;
301         state->iov[1].iov_len = msg_len;
302
303         subreq = writev_send(state, ev, conn->outqueue, conn->fd, false,
304                              state->iov, 2);
305         if (tevent_req_nomem(subreq, req)) {
306                 return tevent_req_post(req, ev);
307         }
308         tevent_req_set_callback(subreq, ctdb_conn_msg_write_done, req);
309         return req;
310 }
311
312 static void ctdb_conn_msg_write_done(struct tevent_req *subreq)
313 {
314         struct tevent_req *req = tevent_req_callback_data(
315                 subreq, struct tevent_req);
316         ssize_t written;
317         int err;
318
319         written = writev_recv(subreq, &err);
320         TALLOC_FREE(subreq);
321         if (written == -1) {
322                 tevent_req_error(req, err);
323                 return;
324         }
325         tevent_req_done(req);
326 }
327
328 int ctdb_conn_msg_write_recv(struct tevent_req *req)
329 {
330         int err;
331         if (tevent_req_is_unix_error(req, &err)) {
332                 return err;
333         }
334         return 0;
335 }
336
337 struct ctdb_msg_channel {
338         struct ctdb_conn *conn;
339 };
340
341 struct ctdb_msg_channel_init_state {
342         struct tevent_context *ev;
343         struct ctdb_conn *conn;
344         uint64_t srvid;
345         struct ctdb_msg_channel *channel;
346 };
347
348 static void ctdb_msg_channel_init_connected(struct tevent_req *subreq);
349 static void ctdb_msg_channel_init_registered_srvid(struct tevent_req *subreq);
350
351 struct tevent_req *ctdb_msg_channel_init_send(
352         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
353         const char *sock, uint64_t srvid)
354 {
355         struct tevent_req *req, *subreq;
356         struct ctdb_msg_channel_init_state *state;
357
358         req = tevent_req_create(mem_ctx, &state,
359                                 struct ctdb_msg_channel_init_state);
360         if (req == NULL) {
361                 return NULL;
362         }
363         state->ev = ev;
364         state->srvid = srvid;
365
366         subreq = ctdb_conn_init_send(state, ev, sock);
367         if (tevent_req_nomem(subreq, req)) {
368                 return tevent_req_post(req, ev);
369         }
370         tevent_req_set_callback(subreq, ctdb_msg_channel_init_connected, req);
371         return req;
372 }
373
374 static void ctdb_msg_channel_init_connected(struct tevent_req *subreq)
375 {
376         struct tevent_req *req = tevent_req_callback_data(
377                 subreq, struct tevent_req);
378         struct ctdb_msg_channel_init_state *state = tevent_req_data(
379                 req, struct ctdb_msg_channel_init_state);
380         int ret;
381
382         ret = ctdb_conn_init_recv(subreq, state, &state->conn);
383         TALLOC_FREE(subreq);
384         if (tevent_req_error(req, ret)) {
385                 return;
386         }
387         subreq = ctdb_conn_control_send(state, state->ev, state->conn,
388                                         CTDB_CURRENT_NODE,
389                                         CTDB_CONTROL_REGISTER_SRVID,
390                                         state->srvid, 0, NULL, 0);
391         if (tevent_req_nomem(subreq, req)) {
392                 return;
393         }
394         tevent_req_set_callback(
395                 subreq, ctdb_msg_channel_init_registered_srvid, req);
396 }
397
398 static void ctdb_msg_channel_init_registered_srvid(struct tevent_req *subreq)
399 {
400         struct tevent_req *req = tevent_req_callback_data(
401                 subreq, struct tevent_req);
402         struct ctdb_msg_channel_init_state *state = tevent_req_data(
403                 req, struct ctdb_msg_channel_init_state);
404         struct ctdb_reply_control *reply;
405         int ret;
406
407         ret = ctdb_conn_control_recv(subreq, talloc_tos(), &reply);
408         TALLOC_FREE(subreq);
409         if (tevent_req_error(req, ret)) {
410                 return;
411         }
412         if (reply->status != 0) {
413                 tevent_req_error(req, EIO);
414                 return;
415         }
416         state->channel = talloc(state, struct ctdb_msg_channel);
417         if (tevent_req_nomem(state->channel, req)) {
418                 return;
419         }
420         state->channel->conn = talloc_move(state->channel, &state->conn);
421         tevent_req_done(req);
422 }
423
424 int ctdb_msg_channel_init_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
425                                struct ctdb_msg_channel **pchannel)
426 {
427         struct ctdb_msg_channel_init_state *state = tevent_req_data(
428                 req, struct ctdb_msg_channel_init_state);
429         int err;
430
431         if (tevent_req_is_unix_error(req, &err)) {
432                 return err;
433         }
434         *pchannel = talloc_move(mem_ctx, &state->channel);
435         return 0;
436 }
437
438 struct ctdb_msg_read_state {
439         size_t buflen;
440         uint8_t *buf;
441 };
442
443 static void ctdb_msg_channel_got_msg(struct tevent_req *subreq);
444
445 struct tevent_req *ctdb_msg_read_send(TALLOC_CTX *mem_ctx,
446                                       struct tevent_context *ev,
447                                       struct ctdb_msg_channel *channel)
448 {
449         struct tevent_req *req, *subreq;
450         struct ctdb_msg_read_state *state;
451
452         req = tevent_req_create(mem_ctx, &state,
453                                 struct ctdb_msg_read_state);
454         if (req == NULL) {
455                 return NULL;
456         }
457         subreq = read_packet_send(state, ev, channel->conn->fd,
458                 sizeof(uint32_t), ctdb_packet_more, NULL);
459         if (tevent_req_nomem(subreq, req)) {
460                 return tevent_req_post(req, ev);
461         }
462         tevent_req_set_callback(subreq, ctdb_msg_channel_got_msg, req);
463         return req;
464 }
465
466 static void ctdb_msg_channel_got_msg(struct tevent_req *subreq)
467 {
468         struct tevent_req *req = tevent_req_callback_data(
469                 subreq, struct tevent_req);
470         struct ctdb_msg_read_state *state = tevent_req_data(
471                 req, struct ctdb_msg_read_state);
472         ssize_t nread;
473         uint8_t *buf;
474         int err;
475
476         nread = read_packet_recv(subreq, state, &buf, &err);
477         if (nread == -1) {
478                 tevent_req_error(req, err);
479                 return;
480         }
481         state->buflen = nread;
482         state->buf = buf;
483         tevent_req_done(req);
484 }
485
486 int ctdb_msg_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
487                        uint8_t **pmsg, size_t *pmsg_len)
488 {
489         struct ctdb_msg_read_state *state = tevent_req_data(
490                 req, struct ctdb_msg_read_state);
491         struct ctdb_req_header *hdr;
492         struct ctdb_req_message *msg;
493         uint8_t *buf;
494         int err;
495
496         if (tevent_req_is_unix_error(req, &err)) {
497                 return err;
498         }
499
500         hdr = (struct ctdb_req_header *)state->buf;
501         if (hdr->length != state->buflen) {
502                 DEBUG(10, ("Got invalid header length\n"));
503                 return EIO;
504         }
505         if (hdr->operation != CTDB_REQ_MESSAGE) {
506                 DEBUG(10, ("Expected %d (CTDB_REQ_MESSAGE), got %d\n",
507                            CTDB_REQ_MESSAGE, (int)hdr->operation));
508                 return EIO;
509         }
510         if (hdr->length < offsetof(struct ctdb_req_message, data)) {
511                 DEBUG(10, ("Got short msg, len=%d\n", (int)hdr->length));
512                 return EIO;
513         }
514
515         msg = (struct ctdb_req_message *)hdr;
516         if (msg->datalen >
517             hdr->length - offsetof(struct ctdb_req_message, data)) {
518                 DEBUG(10, ("Got invalid datalen %d\n", (int)msg->datalen));
519                 return EIO;
520         }
521
522         buf = (uint8_t *)talloc_memdup(mem_ctx, msg->data, msg->datalen);
523         if (buf == NULL) {
524                 return ENOMEM;
525         }
526         *pmsg = buf;
527         *pmsg_len = msg->datalen;
528         return 0;
529 }
530
531 #else
532
533 struct dummy_state {
534         uint8_t dummy;
535 };
536
537 static struct tevent_req *dummy_send(TALLOC_CTX *mem_ctx,
538                                      struct tevent_context *ev)
539 {
540         struct tevent_req *req;
541         struct dummy_state *state;
542         req = tevent_req_create(mem_ctx, &state, struct dummy_state);
543         if (req == NULL) {
544                 return NULL;
545         }
546         tevent_req_done(req);
547         return tevent_req_post(req, ev);
548 }
549
550 struct tevent_req *ctdb_conn_init_send(TALLOC_CTX *mem_ctx,
551                                        struct tevent_context *ev,
552                                        const char *sock)
553 {
554         return dummy_send(mem_ctx, ev);
555 }
556
557 int ctdb_conn_init_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
558                         struct ctdb_conn **pconn)
559 {
560         return ENOSYS;
561 }
562
563 struct tevent_req *ctdb_conn_msg_write_send(TALLOC_CTX *mem_ctx,
564                                             struct tevent_context *ev,
565                                             struct ctdb_conn *conn,
566                                             uint32_t vnn, uint64_t srvid,
567                                             uint8_t *msg, size_t msg_len)
568 {
569         return dummy_send(mem_ctx, ev);
570 }
571
572 int ctdb_conn_msg_write_recv(struct tevent_req *req)
573 {
574         return ENOSYS;
575 }
576
577 struct tevent_req *ctdb_msg_channel_init_send(
578         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
579         const char *sock, uint64_t srvid)
580 {
581         return dummy_send(mem_ctx, ev);
582 }
583
584 int ctdb_msg_channel_init_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
585                                struct ctdb_msg_channel **pchannel)
586 {
587         return ENOSYS;
588 }
589
590 struct tevent_req *ctdb_msg_read_send(TALLOC_CTX *mem_ctx,
591                                       struct tevent_context *ev,
592                                       struct ctdb_msg_channel *channel)
593 {
594         return dummy_send(mem_ctx, ev);
595 }
596
597 int ctdb_msg_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
598                        uint8_t **pmsg, size_t *pmsg_len)
599 {
600         return ENOSYS;
601 }
602
603 #endif