d6bcf06ac40ad177d565e25547c527a551685d89
[obnox/samba/samba-obnox.git] / source3 / lib / ctdbd_conn.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) 2007 by Volker Lendecke
5    Copyright (C) 2007 by Andrew Tridgell
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "util_tdb.h"
23 #include "serverid.h"
24 #include "ctdbd_conn.h"
25 #include "system/select.h"
26 #include "lib/sys_rw_data.h"
27 #include "lib/util/iov_buf.h"
28
29 #include "messages.h"
30
31 /* paths to these include files come from --with-ctdb= in configure */
32
33 #include "ctdb.h"
34 #include "ctdb_private.h"
35
36 struct ctdbd_srvid_cb {
37         uint64_t srvid;
38         int (*cb)(uint32_t src_vnn, uint32_t dst_vnn,
39                   uint64_t dst_srvid,
40                   const uint8_t *msg, size_t msglen,
41                   void *private_data);
42         void *private_data;
43 };
44
45 struct ctdbd_connection {
46         const char *sockname;   /* Needed in ctdbd_traverse */
47         struct messaging_context *msg_ctx;
48         uint32_t reqid;
49         uint32_t our_vnn;
50         uint64_t rand_srvid;
51         struct ctdbd_srvid_cb *callbacks;
52         int fd;
53         struct tevent_fd *fde;
54         int timeout;
55 };
56
57 static uint32_t ctdbd_next_reqid(struct ctdbd_connection *conn)
58 {
59         conn->reqid += 1;
60         if (conn->reqid == 0) {
61                 conn->reqid += 1;
62         }
63         return conn->reqid;
64 }
65
66 static NTSTATUS ctdbd_control(struct ctdbd_connection *conn,
67                               uint32_t vnn, uint32_t opcode,
68                               uint64_t srvid, uint32_t flags, TDB_DATA data,
69                               TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
70                               int *cstatus);
71
72 /*
73  * exit on fatal communications errors with the ctdbd daemon
74  */
75 static void cluster_fatal(const char *why)
76 {
77         DEBUG(0,("cluster fatal event: %s - exiting immediately\n", why));
78         /* we don't use smb_panic() as we don't want to delay to write
79            a core file. We need to release this process id immediately
80            so that someone else can take over without getting sharing
81            violations */
82         _exit(1);
83 }
84
85 /*
86  *
87  */
88 static void ctdb_packet_dump(struct ctdb_req_header *hdr)
89 {
90         if (DEBUGLEVEL < 11) {
91                 return;
92         }
93         DEBUGADD(11, ("len=%d, magic=%x, vers=%d, gen=%d, op=%d, reqid=%d\n",
94                       (int)hdr->length, (int)hdr->ctdb_magic,
95                       (int)hdr->ctdb_version, (int)hdr->generation,
96                       (int)hdr->operation, (int)hdr->reqid));
97 }
98
99 /*
100  * Register a srvid with ctdbd
101  */
102 NTSTATUS register_with_ctdbd(struct ctdbd_connection *conn, uint64_t srvid,
103                              int (*cb)(uint32_t src_vnn, uint32_t dst_vnn,
104                                        uint64_t dst_srvid,
105                                        const uint8_t *msg, size_t msglen,
106                                        void *private_data),
107                              void *private_data)
108 {
109
110         NTSTATUS status;
111         int cstatus;
112         size_t num_callbacks;
113         struct ctdbd_srvid_cb *tmp;
114
115         status = ctdbd_control(conn, CTDB_CURRENT_NODE,
116                                CTDB_CONTROL_REGISTER_SRVID, srvid, 0,
117                                tdb_null, NULL, NULL, &cstatus);
118         if (!NT_STATUS_IS_OK(status)) {
119                 return status;
120         }
121
122         num_callbacks = talloc_array_length(conn->callbacks);
123
124         tmp = talloc_realloc(conn, conn->callbacks, struct ctdbd_srvid_cb,
125                              num_callbacks + 1);
126         if (tmp == NULL) {
127                 return NT_STATUS_NO_MEMORY;
128         }
129         conn->callbacks = tmp;
130
131         conn->callbacks[num_callbacks] = (struct ctdbd_srvid_cb) {
132                 .srvid = srvid, .cb = cb, .private_data = private_data
133         };
134
135         return NT_STATUS_OK;
136 }
137
138 static int ctdbd_msg_call_back(struct ctdbd_connection *conn,
139                                struct ctdb_req_message *msg)
140 {
141         size_t msg_len;
142         size_t i, num_callbacks;
143
144         msg_len = msg->hdr.length;
145         if (msg_len < offsetof(struct ctdb_req_message, data)) {
146                 DEBUG(10, ("%s: len %u too small\n", __func__,
147                            (unsigned)msg_len));
148                 return 0;
149         }
150         msg_len -= offsetof(struct ctdb_req_message, data);
151
152         if (msg_len < msg->datalen) {
153                 DEBUG(10, ("%s: msg_len=%u < msg->datalen=%u\n", __func__,
154                            (unsigned)msg_len, (unsigned)msg->datalen));
155                 return 0;
156         }
157
158         num_callbacks = talloc_array_length(conn->callbacks);
159
160         for (i=0; i<num_callbacks; i++) {
161                 struct ctdbd_srvid_cb *cb = &conn->callbacks[i];
162
163                 if ((cb->srvid == msg->srvid) && (cb->cb != NULL)) {
164                         int ret;
165
166                         ret = cb->cb(msg->hdr.srcnode, msg->hdr.destnode,
167                                      msg->srvid, msg->data, msg->datalen,
168                                      cb->private_data);
169                         if (ret != 0) {
170                                 return ret;
171                         }
172                 }
173         }
174         return 0;
175 }
176
177 /*
178  * get our vnn from the cluster
179  */
180 static NTSTATUS get_cluster_vnn(struct ctdbd_connection *conn, uint32_t *vnn)
181 {
182         int32_t cstatus=-1;
183         NTSTATUS status;
184         status = ctdbd_control(conn,
185                                CTDB_CURRENT_NODE, CTDB_CONTROL_GET_PNN, 0, 0,
186                                tdb_null, NULL, NULL, &cstatus);
187         if (!NT_STATUS_IS_OK(status)) {
188                 DEBUG(1, ("ctdbd_control failed: %s\n", nt_errstr(status)));
189                 return status;
190         }
191         *vnn = (uint32_t)cstatus;
192         return status;
193 }
194
195 /*
196  * Are we active (i.e. not banned or stopped?)
197  */
198 static bool ctdbd_working(struct ctdbd_connection *conn, uint32_t vnn)
199 {
200         int32_t cstatus=-1;
201         NTSTATUS status;
202         TDB_DATA outdata;
203         struct ctdb_node_map *m;
204         uint32_t failure_flags;
205         bool ret = false;
206         int i;
207
208         status = ctdbd_control(conn, CTDB_CURRENT_NODE,
209                                CTDB_CONTROL_GET_NODEMAP, 0, 0,
210                                tdb_null, talloc_tos(), &outdata, &cstatus);
211         if (!NT_STATUS_IS_OK(status)) {
212                 DEBUG(1, ("ctdbd_control failed: %s\n", nt_errstr(status)));
213                 return false;
214         }
215         if ((cstatus != 0) || (outdata.dptr == NULL)) {
216                 DEBUG(2, ("Received invalid ctdb data\n"));
217                 return false;
218         }
219
220         m = (struct ctdb_node_map *)outdata.dptr;
221
222         for (i=0; i<m->num; i++) {
223                 if (vnn == m->nodes[i].pnn) {
224                         break;
225                 }
226         }
227
228         if (i == m->num) {
229                 DEBUG(2, ("Did not find ourselves (node %d) in nodemap\n",
230                           (int)vnn));
231                 goto fail;
232         }
233
234         failure_flags = NODE_FLAGS_BANNED | NODE_FLAGS_DISCONNECTED
235                 | NODE_FLAGS_PERMANENTLY_DISABLED | NODE_FLAGS_STOPPED;
236
237         if ((m->nodes[i].flags & failure_flags) != 0) {
238                 DEBUG(2, ("Node has status %x, not active\n",
239                           (int)m->nodes[i].flags));
240                 goto fail;
241         }
242
243         ret = true;
244 fail:
245         TALLOC_FREE(outdata.dptr);
246         return ret;
247 }
248
249 uint32_t ctdbd_vnn(const struct ctdbd_connection *conn)
250 {
251         return conn->our_vnn;
252 }
253
254 /*
255  * Get us a ctdb connection
256  */
257
258 static int ctdbd_connect(const char *sockname, int *pfd)
259 {
260         struct sockaddr_un addr = { 0, };
261         int fd;
262         socklen_t salen;
263         size_t namelen;
264
265         fd = socket(AF_UNIX, SOCK_STREAM, 0);
266         if (fd == -1) {
267                 int err = errno;
268                 DEBUG(3, ("Could not create socket: %s\n", strerror(err)));
269                 return err;
270         }
271
272         addr.sun_family = AF_UNIX;
273
274         namelen = strlcpy(addr.sun_path, sockname, sizeof(addr.sun_path));
275         if (namelen >= sizeof(addr.sun_path)) {
276                 DEBUG(3, ("%s: Socket name too long: %s\n", __func__,
277                           sockname));
278                 close(fd);
279                 return ENAMETOOLONG;
280         }
281
282         salen = sizeof(struct sockaddr_un);
283
284         if (connect(fd, (struct sockaddr *)(void *)&addr, salen) == -1) {
285                 int err = errno;
286                 DEBUG(1, ("connect(%s) failed: %s\n", sockname,
287                           strerror(err)));
288                 close(fd);
289                 return err;
290         }
291
292         *pfd = fd;
293         return 0;
294 }
295
296 static int ctdb_read_packet(int fd, int timeout, TALLOC_CTX *mem_ctx,
297                             struct ctdb_req_header **result)
298 {
299         struct ctdb_req_header *req;
300         int ret, revents;
301         uint32_t msglen;
302         ssize_t nread;
303
304         if (timeout != -1) {
305                 ret = poll_one_fd(fd, POLLIN, timeout, &revents);
306                 if (ret == -1) {
307                         return errno;
308                 }
309                 if (ret == 0) {
310                         return ETIMEDOUT;
311                 }
312                 if (ret != 1) {
313                         return EIO;
314                 }
315         }
316
317         nread = read_data(fd, &msglen, sizeof(msglen));
318         if (nread == -1) {
319                 return errno;
320         }
321         if (nread == 0) {
322                 return EIO;
323         }
324
325         if (msglen < sizeof(struct ctdb_req_header)) {
326                 return EIO;
327         }
328
329         req = talloc_size(mem_ctx, msglen);
330         if (req == NULL) {
331                 return ENOMEM;
332         }
333         talloc_set_name_const(req, "struct ctdb_req_header");
334
335         req->length = msglen;
336
337         nread = read_data(fd, ((char *)req) + sizeof(msglen),
338                           msglen - sizeof(msglen));
339         if (nread == -1) {
340                 return errno;
341         }
342         if (nread == 0) {
343                 return EIO;
344         }
345
346         *result = req;
347         return 0;
348 }
349
350 /*
351  * Read a full ctdbd request. If we have a messaging context, defer incoming
352  * messages that might come in between.
353  */
354
355 static int ctdb_read_req(struct ctdbd_connection *conn, uint32_t reqid,
356                          TALLOC_CTX *mem_ctx, struct ctdb_req_header **result)
357 {
358         struct ctdb_req_header *hdr;
359         int ret;
360
361  next_pkt:
362
363         ret = ctdb_read_packet(conn->fd, conn->timeout, mem_ctx, &hdr);
364         if (ret != 0) {
365                 DEBUG(0, ("ctdb_read_packet failed: %s\n", strerror(ret)));
366                 cluster_fatal("ctdbd died\n");
367         }
368
369         DEBUG(11, ("Received ctdb packet\n"));
370         ctdb_packet_dump(hdr);
371
372         if (hdr->operation == CTDB_REQ_MESSAGE) {
373                 struct ctdb_req_message *msg = (struct ctdb_req_message *)hdr;
374
375                 if (conn->msg_ctx == NULL) {
376                         DEBUG(1, ("Got a message without having a msg ctx, "
377                                   "dropping msg %llu\n",
378                                   (long long unsigned)msg->srvid));
379                         TALLOC_FREE(hdr);
380                         goto next_pkt;
381                 }
382
383                 ret = ctdbd_msg_call_back(conn, msg);
384                 if (ret != 0) {
385                         TALLOC_FREE(hdr);
386                         return ret;
387                 }
388
389                 TALLOC_FREE(hdr);
390                 goto next_pkt;
391         }
392
393         if ((reqid != 0) && (hdr->reqid != reqid)) {
394                 /* we got the wrong reply */
395                 DEBUG(0,("Discarding mismatched ctdb reqid %u should have "
396                          "been %u\n", hdr->reqid, reqid));
397                 TALLOC_FREE(hdr);
398                 goto next_pkt;
399         }
400
401         *result = talloc_move(mem_ctx, &hdr);
402
403         return 0;
404 }
405
406 static int ctdbd_connection_destructor(struct ctdbd_connection *c)
407 {
408         TALLOC_FREE(c->fde);
409         if (c->fd != -1) {
410                 close(c->fd);
411                 c->fd = -1;
412         }
413         return 0;
414 }
415 /*
416  * Get us a ctdbd connection
417  */
418
419 static NTSTATUS ctdbd_init_connection(TALLOC_CTX *mem_ctx,
420                                       const char *sockname, int timeout,
421                                       struct ctdbd_connection **pconn)
422 {
423         struct ctdbd_connection *conn;
424         int ret;
425         NTSTATUS status;
426
427         if (!(conn = talloc_zero(mem_ctx, struct ctdbd_connection))) {
428                 DEBUG(0, ("talloc failed\n"));
429                 return NT_STATUS_NO_MEMORY;
430         }
431
432         conn->sockname = talloc_strdup(conn, sockname);
433         if (conn->sockname == NULL) {
434                 DBG_ERR("%s: talloc failed\n", __func__);
435                 status = NT_STATUS_NO_MEMORY;
436                 goto fail;
437         }
438
439         conn->timeout = timeout;
440
441         if (conn->timeout == 0) {
442                 conn->timeout = -1;
443         }
444
445         ret = ctdbd_connect(conn->sockname, &conn->fd);
446         if (ret != 0) {
447                 status = map_nt_error_from_unix(ret);
448                 DEBUG(1, ("ctdbd_connect failed: %s\n", strerror(ret)));
449                 goto fail;
450         }
451         talloc_set_destructor(conn, ctdbd_connection_destructor);
452
453         status = get_cluster_vnn(conn, &conn->our_vnn);
454
455         if (!NT_STATUS_IS_OK(status)) {
456                 DEBUG(10, ("get_cluster_vnn failed: %s\n", nt_errstr(status)));
457                 goto fail;
458         }
459
460         if (!ctdbd_working(conn, conn->our_vnn)) {
461                 DEBUG(2, ("Node is not working, can not connect\n"));
462                 status = NT_STATUS_INTERNAL_DB_ERROR;
463                 goto fail;
464         }
465
466         generate_random_buffer((unsigned char *)&conn->rand_srvid,
467                                sizeof(conn->rand_srvid));
468
469         status = register_with_ctdbd(conn, conn->rand_srvid, NULL, NULL);
470
471         if (!NT_STATUS_IS_OK(status)) {
472                 DEBUG(5, ("Could not register random srvid: %s\n",
473                           nt_errstr(status)));
474                 goto fail;
475         }
476
477         *pconn = conn;
478         return NT_STATUS_OK;
479
480  fail:
481         TALLOC_FREE(conn);
482         return status;
483 }
484
485 /*
486  * Get us a ctdbd connection and register us as a process
487  */
488
489 NTSTATUS ctdbd_messaging_connection(TALLOC_CTX *mem_ctx,
490                                     const char *sockname, int timeout,
491                                     struct ctdbd_connection **pconn)
492 {
493         struct ctdbd_connection *conn;
494         NTSTATUS status;
495
496         status = ctdbd_init_connection(mem_ctx, sockname, timeout, &conn);
497
498         if (!NT_STATUS_IS_OK(status)) {
499                 return status;
500         }
501
502         status = register_with_ctdbd(conn, MSG_SRVID_SAMBA, NULL, NULL);
503         if (!NT_STATUS_IS_OK(status)) {
504                 goto fail;
505         }
506
507         *pconn = conn;
508         return NT_STATUS_OK;
509
510  fail:
511         TALLOC_FREE(conn);
512         return status;
513 }
514
515 struct messaging_context *ctdb_conn_msg_ctx(struct ctdbd_connection *conn)
516 {
517         return conn->msg_ctx;
518 }
519
520 int ctdbd_conn_get_fd(struct ctdbd_connection *conn)
521 {
522         return conn->fd;
523 }
524
525 /*
526  * Packet handler to receive and handle a ctdb message
527  */
528 static int ctdb_handle_message(struct ctdbd_connection *conn,
529                                struct ctdb_req_header *hdr)
530 {
531         struct ctdb_req_message *msg;
532
533         if (hdr->operation != CTDB_REQ_MESSAGE) {
534                 DEBUG(0, ("Received async msg of type %u, discarding\n",
535                           hdr->operation));
536                 return EINVAL;
537         }
538
539         msg = (struct ctdb_req_message *)hdr;
540
541         ctdbd_msg_call_back(conn, msg);
542
543         return 0;
544 }
545
546 /*
547  * The ctdbd socket is readable asynchronuously
548  */
549
550 static void ctdbd_socket_handler(struct tevent_context *event_ctx,
551                                  struct tevent_fd *event,
552                                  uint16_t flags,
553                                  void *private_data)
554 {
555         struct ctdbd_connection *conn = talloc_get_type_abort(
556                 private_data, struct ctdbd_connection);
557         struct ctdb_req_header *hdr = NULL;
558         int ret;
559
560         ret = ctdb_read_packet(conn->fd, conn->timeout, talloc_tos(), &hdr);
561         if (ret != 0) {
562                 DEBUG(0, ("ctdb_read_packet failed: %s\n", strerror(ret)));
563                 cluster_fatal("ctdbd died\n");
564         }
565
566         ret = ctdb_handle_message(conn, hdr);
567
568         TALLOC_FREE(hdr);
569
570         if (ret != 0) {
571                 DEBUG(10, ("could not handle incoming message: %s\n",
572                            strerror(ret)));
573         }
574 }
575
576 /*
577  * Prepare a ctdbd connection to receive messages
578  */
579
580 NTSTATUS ctdbd_register_msg_ctx(struct ctdbd_connection *conn,
581                                 struct messaging_context *msg_ctx)
582 {
583         SMB_ASSERT(conn->msg_ctx == NULL);
584         SMB_ASSERT(conn->fde == NULL);
585
586         if (!(conn->fde = tevent_add_fd(messaging_tevent_context(msg_ctx),
587                                        conn,
588                                        conn->fd,
589                                        TEVENT_FD_READ,
590                                        ctdbd_socket_handler,
591                                        conn))) {
592                 DEBUG(0, ("event_add_fd failed\n"));
593                 return NT_STATUS_NO_MEMORY;
594         }
595
596         conn->msg_ctx = msg_ctx;
597
598         return NT_STATUS_OK;
599 }
600
601 NTSTATUS ctdbd_messaging_send_iov(struct ctdbd_connection *conn,
602                                   uint32_t dst_vnn, uint64_t dst_srvid,
603                                   const struct iovec *iov, int iovlen)
604 {
605         struct ctdb_req_message r;
606         struct iovec iov2[iovlen+1];
607         size_t buflen = iov_buflen(iov, iovlen);
608         ssize_t nwritten;
609
610         r.hdr.length = offsetof(struct ctdb_req_message, data) + buflen;
611         r.hdr.ctdb_magic = CTDB_MAGIC;
612         r.hdr.ctdb_version = CTDB_PROTOCOL;
613         r.hdr.generation = 1;
614         r.hdr.operation  = CTDB_REQ_MESSAGE;
615         r.hdr.destnode   = dst_vnn;
616         r.hdr.srcnode    = conn->our_vnn;
617         r.hdr.reqid      = 0;
618         r.srvid          = dst_srvid;
619         r.datalen        = buflen;
620
621         DEBUG(10, ("ctdbd_messaging_send: Sending ctdb packet\n"));
622         ctdb_packet_dump(&r.hdr);
623
624         iov2[0].iov_base = &r;
625         iov2[0].iov_len = offsetof(struct ctdb_req_message, data);
626         memcpy(&iov2[1], iov, iovlen * sizeof(struct iovec));
627
628         nwritten = write_data_iov(conn->fd, iov2, iovlen+1);
629         if (nwritten == -1) {
630                 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
631                 cluster_fatal("cluster dispatch daemon msg write error\n");
632         }
633
634         return NT_STATUS_OK;
635 }
636
637 /*
638  * send/recv a generic ctdb control message
639  */
640 static NTSTATUS ctdbd_control(struct ctdbd_connection *conn,
641                               uint32_t vnn, uint32_t opcode,
642                               uint64_t srvid, uint32_t flags,
643                               TDB_DATA data,
644                               TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
645                               int *cstatus)
646 {
647         struct ctdb_req_control req;
648         struct ctdb_req_header *hdr;
649         struct ctdb_reply_control *reply = NULL;
650         struct iovec iov[2];
651         ssize_t nwritten;
652         NTSTATUS status;
653         int ret;
654
655         ZERO_STRUCT(req);
656         req.hdr.length = offsetof(struct ctdb_req_control, data) + data.dsize;
657         req.hdr.ctdb_magic   = CTDB_MAGIC;
658         req.hdr.ctdb_version = CTDB_PROTOCOL;
659         req.hdr.operation    = CTDB_REQ_CONTROL;
660         req.hdr.reqid        = ctdbd_next_reqid(conn);
661         req.hdr.destnode     = vnn;
662         req.opcode           = opcode;
663         req.srvid            = srvid;
664         req.datalen          = data.dsize;
665         req.flags            = flags;
666
667         DEBUG(10, ("ctdbd_control: Sending ctdb packet\n"));
668         ctdb_packet_dump(&req.hdr);
669
670         iov[0].iov_base = &req;
671         iov[0].iov_len = offsetof(struct ctdb_req_control, data);
672         iov[1].iov_base = data.dptr;
673         iov[1].iov_len = data.dsize;
674
675         nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
676         if (nwritten == -1) {
677                 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
678                 cluster_fatal("cluster dispatch daemon msg write error\n");
679         }
680
681         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
682                 if (cstatus) {
683                         *cstatus = 0;
684                 }
685                 return NT_STATUS_OK;
686         }
687
688         ret = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
689         if (ret != 0) {
690                 DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret)));
691                 status = map_nt_error_from_unix(ret);
692                 goto fail;
693         }
694
695         if (hdr->operation != CTDB_REPLY_CONTROL) {
696                 DEBUG(0, ("received invalid reply\n"));
697                 goto fail;
698         }
699         reply = (struct ctdb_reply_control *)hdr;
700
701         if (outdata) {
702                 if (!(outdata->dptr = (uint8_t *)talloc_memdup(
703                               mem_ctx, reply->data, reply->datalen))) {
704                         TALLOC_FREE(reply);
705                         return NT_STATUS_NO_MEMORY;
706                 }
707                 outdata->dsize = reply->datalen;
708         }
709         if (cstatus) {
710                 (*cstatus) = reply->status;
711         }
712
713         status = NT_STATUS_OK;
714
715  fail:
716         TALLOC_FREE(reply);
717         return status;
718 }
719
720 /*
721  * see if a remote process exists
722  */
723 bool ctdbd_process_exists(struct ctdbd_connection *conn, uint32_t vnn, pid_t pid)
724 {
725         struct server_id id;
726         bool result;
727
728         id.pid = pid;
729         id.vnn = vnn;
730
731         if (!ctdb_processes_exist(conn, &id, 1, &result)) {
732                 DEBUG(10, ("ctdb_processes_exist failed\n"));
733                 return false;
734         }
735         return result;
736 }
737
738 bool ctdb_processes_exist(struct ctdbd_connection *conn,
739                           const struct server_id *pids, int num_pids,
740                           bool *results)
741 {
742         TALLOC_CTX *frame = talloc_stackframe();
743         int i, num_received;
744         uint32_t *reqids;
745         bool result = false;
746
747         reqids = talloc_array(talloc_tos(), uint32_t, num_pids);
748         if (reqids == NULL) {
749                 goto fail;
750         }
751
752         for (i=0; i<num_pids; i++) {
753                 struct ctdb_req_control req;
754                 pid_t pid;
755                 struct iovec iov[2];
756                 ssize_t nwritten;
757
758                 results[i] = false;
759                 reqids[i] = ctdbd_next_reqid(conn);
760
761                 ZERO_STRUCT(req);
762
763                 /*
764                  * pids[i].pid is uint64_t, scale down to pid_t which
765                  * is the wire protocol towards ctdb.
766                  */
767                 pid = pids[i].pid;
768
769                 DEBUG(10, ("Requesting PID %d/%d, reqid=%d\n",
770                            (int)pids[i].vnn, (int)pid,
771                            (int)reqids[i]));
772
773                 req.hdr.length = offsetof(struct ctdb_req_control, data);
774                 req.hdr.length += sizeof(pid);
775                 req.hdr.ctdb_magic   = CTDB_MAGIC;
776                 req.hdr.ctdb_version = CTDB_PROTOCOL;
777                 req.hdr.operation    = CTDB_REQ_CONTROL;
778                 req.hdr.reqid        = reqids[i];
779                 req.hdr.destnode     = pids[i].vnn;
780                 req.opcode           = CTDB_CONTROL_PROCESS_EXISTS;
781                 req.srvid            = 0;
782                 req.datalen          = sizeof(pid);
783                 req.flags            = 0;
784
785                 DEBUG(10, ("ctdbd_control: Sending ctdb packet\n"));
786                 ctdb_packet_dump(&req.hdr);
787
788                 iov[0].iov_base = &req;
789                 iov[0].iov_len = offsetof(struct ctdb_req_control, data);
790                 iov[1].iov_base = &pid;
791                 iov[1].iov_len = sizeof(pid);
792
793                 nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
794                 if (nwritten == -1) {
795                         DEBUG(10, ("write_data_iov failed: %s\n",
796                                    strerror(errno)));
797                         goto fail;
798                 }
799         }
800
801         num_received = 0;
802
803         while (num_received < num_pids) {
804                 struct ctdb_req_header *hdr;
805                 struct ctdb_reply_control *reply;
806                 uint32_t reqid;
807                 int ret;
808
809                 ret = ctdb_read_req(conn, 0, talloc_tos(), &hdr);
810                 if (ret != 0) {
811                         DEBUG(10, ("ctdb_read_req failed: %s\n",
812                                    strerror(ret)));
813                         goto fail;
814                 }
815
816                 if (hdr->operation != CTDB_REPLY_CONTROL) {
817                         DEBUG(10, ("Received invalid reply\n"));
818                         goto fail;
819                 }
820                 reply = (struct ctdb_reply_control *)hdr;
821
822                 reqid = reply->hdr.reqid;
823
824                 DEBUG(10, ("Received reqid %d\n", (int)reqid));
825
826                 for (i=0; i<num_pids; i++) {
827                         if (reqid == reqids[i]) {
828                                 break;
829                         }
830                 }
831                 if (i == num_pids) {
832                         DEBUG(10, ("Received unknown record number %u\n",
833                                    (unsigned)reqid));
834                         goto fail;
835                 }
836                 results[i] = ((reply->status) == 0);
837                 TALLOC_FREE(reply);
838                 num_received += 1;
839         }
840
841         result = true;
842 fail:
843         TALLOC_FREE(frame);
844         return result;
845 }
846
847 /*
848  * Get a db path
849  */
850 char *ctdbd_dbpath(struct ctdbd_connection *conn,
851                    TALLOC_CTX *mem_ctx, uint32_t db_id)
852 {
853         NTSTATUS status;
854         TDB_DATA data;
855         TDB_DATA rdata = {0};
856         int32_t cstatus = 0;
857
858         data.dptr = (uint8_t*)&db_id;
859         data.dsize = sizeof(db_id);
860
861         status = ctdbd_control(conn, CTDB_CURRENT_NODE,
862                                CTDB_CONTROL_GETDBPATH, 0, 0, data,
863                                mem_ctx, &rdata, &cstatus);
864         if (!NT_STATUS_IS_OK(status) || cstatus != 0) {
865                 DEBUG(0,(__location__ " ctdb_control for getdbpath failed\n"));
866                 return NULL;
867         }
868
869         return (char *)rdata.dptr;
870 }
871
872 /*
873  * attach to a ctdb database
874  */
875 NTSTATUS ctdbd_db_attach(struct ctdbd_connection *conn,
876                          const char *name, uint32_t *db_id, int tdb_flags)
877 {
878         NTSTATUS status;
879         TDB_DATA data;
880         int32_t cstatus;
881         bool persistent = (tdb_flags & TDB_CLEAR_IF_FIRST) == 0;
882
883         data = string_term_tdb_data(name);
884
885         status = ctdbd_control(conn, CTDB_CURRENT_NODE,
886                                persistent
887                                ? CTDB_CONTROL_DB_ATTACH_PERSISTENT
888                                : CTDB_CONTROL_DB_ATTACH,
889                                tdb_flags, 0, data, NULL, &data, &cstatus);
890         if (!NT_STATUS_IS_OK(status)) {
891                 DEBUG(0, (__location__ " ctdb_control for db_attach "
892                           "failed: %s\n", nt_errstr(status)));
893                 return status;
894         }
895
896         if (cstatus != 0 || data.dsize != sizeof(uint32_t)) {
897                 DEBUG(0,(__location__ " ctdb_control for db_attach failed\n"));
898                 return NT_STATUS_INTERNAL_ERROR;
899         }
900
901         *db_id = *(uint32_t *)data.dptr;
902         talloc_free(data.dptr);
903
904         if (!(tdb_flags & TDB_SEQNUM)) {
905                 return NT_STATUS_OK;
906         }
907
908         data.dptr = (uint8_t *)db_id;
909         data.dsize = sizeof(*db_id);
910
911         status = ctdbd_control(conn, CTDB_CURRENT_NODE,
912                                CTDB_CONTROL_ENABLE_SEQNUM, 0, 0, data,
913                                NULL, NULL, &cstatus);
914         if (!NT_STATUS_IS_OK(status) || cstatus != 0) {
915                 DEBUG(0,(__location__ " ctdb_control for enable seqnum "
916                          "failed\n"));
917                 return NT_STATUS_IS_OK(status) ? NT_STATUS_INTERNAL_ERROR :
918                         status;
919         }
920
921         return NT_STATUS_OK;
922 }
923
924 /*
925  * force the migration of a record to this node
926  */
927 NTSTATUS ctdbd_migrate(struct ctdbd_connection *conn, uint32_t db_id,
928                        TDB_DATA key)
929 {
930         struct ctdb_req_call req;
931         struct ctdb_req_header *hdr;
932         struct iovec iov[2];
933         ssize_t nwritten;
934         NTSTATUS status;
935         int ret;
936
937         ZERO_STRUCT(req);
938
939         req.hdr.length = offsetof(struct ctdb_req_call, data) + key.dsize;
940         req.hdr.ctdb_magic   = CTDB_MAGIC;
941         req.hdr.ctdb_version = CTDB_PROTOCOL;
942         req.hdr.operation    = CTDB_REQ_CALL;
943         req.hdr.reqid        = ctdbd_next_reqid(conn);
944         req.flags            = CTDB_IMMEDIATE_MIGRATION;
945         req.callid           = CTDB_NULL_FUNC;
946         req.db_id            = db_id;
947         req.keylen           = key.dsize;
948
949         DEBUG(10, ("ctdbd_migrate: Sending ctdb packet\n"));
950         ctdb_packet_dump(&req.hdr);
951
952         iov[0].iov_base = &req;
953         iov[0].iov_len = offsetof(struct ctdb_req_call, data);
954         iov[1].iov_base = key.dptr;
955         iov[1].iov_len = key.dsize;
956
957         nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
958         if (nwritten == -1) {
959                 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
960                 cluster_fatal("cluster dispatch daemon msg write error\n");
961         }
962
963         ret = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
964         if (ret != 0) {
965                 DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret)));
966                 status = map_nt_error_from_unix(ret);
967                 goto fail;
968         }
969
970         if (hdr->operation != CTDB_REPLY_CALL) {
971                 DEBUG(0, ("received invalid reply\n"));
972                 status = NT_STATUS_INTERNAL_ERROR;
973                 goto fail;
974         }
975
976         status = NT_STATUS_OK;
977  fail:
978
979         TALLOC_FREE(hdr);
980         return status;
981 }
982
983 /*
984  * Fetch a record and parse it
985  */
986 NTSTATUS ctdbd_parse(struct ctdbd_connection *conn, uint32_t db_id,
987                      TDB_DATA key, bool local_copy,
988                      void (*parser)(TDB_DATA key, TDB_DATA data,
989                                     void *private_data),
990                      void *private_data)
991 {
992         struct ctdb_req_call req;
993         struct ctdb_req_header *hdr = NULL;
994         struct ctdb_reply_call *reply;
995         struct iovec iov[2];
996         ssize_t nwritten;
997         NTSTATUS status;
998         uint32_t flags;
999         int ret;
1000
1001         flags = local_copy ? CTDB_WANT_READONLY : 0;
1002
1003         ZERO_STRUCT(req);
1004
1005         req.hdr.length = offsetof(struct ctdb_req_call, data) + key.dsize;
1006         req.hdr.ctdb_magic   = CTDB_MAGIC;
1007         req.hdr.ctdb_version = CTDB_PROTOCOL;
1008         req.hdr.operation    = CTDB_REQ_CALL;
1009         req.hdr.reqid        = ctdbd_next_reqid(conn);
1010         req.flags            = flags;
1011         req.callid           = CTDB_FETCH_FUNC;
1012         req.db_id            = db_id;
1013         req.keylen           = key.dsize;
1014
1015         iov[0].iov_base = &req;
1016         iov[0].iov_len = offsetof(struct ctdb_req_call, data);
1017         iov[1].iov_base = key.dptr;
1018         iov[1].iov_len = key.dsize;
1019
1020         nwritten = write_data_iov(conn->fd, iov, ARRAY_SIZE(iov));
1021         if (nwritten == -1) {
1022                 DEBUG(3, ("write_data_iov failed: %s\n", strerror(errno)));
1023                 cluster_fatal("cluster dispatch daemon msg write error\n");
1024         }
1025
1026         ret = ctdb_read_req(conn, req.hdr.reqid, NULL, &hdr);
1027         if (ret != 0) {
1028                 DEBUG(10, ("ctdb_read_req failed: %s\n", strerror(ret)));
1029                 status = map_nt_error_from_unix(ret);
1030                 goto fail;
1031         }
1032
1033         if ((hdr == NULL) || (hdr->operation != CTDB_REPLY_CALL)) {
1034                 DEBUG(0, ("received invalid reply\n"));
1035                 status = NT_STATUS_INTERNAL_ERROR;
1036                 goto fail;
1037         }
1038         reply = (struct ctdb_reply_call *)hdr;
1039
1040         if (reply->datalen == 0) {
1041                 /*
1042                  * Treat an empty record as non-existing
1043                  */
1044                 status = NT_STATUS_NOT_FOUND;
1045                 goto fail;
1046         }
1047
1048         parser(key, make_tdb_data(&reply->data[0], reply->datalen),
1049                private_data);
1050
1051         status = NT_STATUS_OK;
1052  fail:
1053         TALLOC_FREE(hdr);
1054         return status;
1055 }
1056
1057 /*
1058   Traverse a ctdb database. This uses a kind-of hackish way to open a second
1059   connection to ctdbd to avoid the hairy recursive and async problems with
1060   everything in-line.
1061 */
1062
1063 NTSTATUS ctdbd_traverse(struct ctdbd_connection *master, uint32_t db_id,
1064                         void (*fn)(TDB_DATA key, TDB_DATA data,
1065                                    void *private_data),
1066                         void *private_data)
1067 {
1068         struct ctdbd_connection *conn;
1069         NTSTATUS status;
1070
1071         TDB_DATA key, data;
1072         struct ctdb_traverse_start t;
1073         int cstatus;
1074
1075         become_root();
1076         status = ctdbd_init_connection(NULL, master->sockname, master->timeout,
1077                                        &conn);
1078         unbecome_root();
1079         if (!NT_STATUS_IS_OK(status)) {
1080                 DEBUG(0, ("ctdbd_init_connection failed: %s\n",
1081                           nt_errstr(status)));
1082                 return status;
1083         }
1084
1085         t.db_id = db_id;
1086         t.srvid = conn->rand_srvid;
1087         t.reqid = ctdbd_next_reqid(conn);
1088
1089         data.dptr = (uint8_t *)&t;
1090         data.dsize = sizeof(t);
1091
1092         status = ctdbd_control(conn, CTDB_CURRENT_NODE,
1093                                CTDB_CONTROL_TRAVERSE_START, conn->rand_srvid, 0,
1094                                data, NULL, NULL, &cstatus);
1095
1096         if (!NT_STATUS_IS_OK(status) || (cstatus != 0)) {
1097
1098                 DEBUG(0,("ctdbd_control failed: %s, %d\n", nt_errstr(status),
1099                          cstatus));
1100
1101                 if (NT_STATUS_IS_OK(status)) {
1102                         /*
1103                          * We need a mapping here
1104                          */
1105                         status = NT_STATUS_UNSUCCESSFUL;
1106                 }
1107                 TALLOC_FREE(conn);
1108                 return status;
1109         }
1110
1111         while (True) {
1112                 struct ctdb_req_header *hdr = NULL;
1113                 struct ctdb_req_message *m;
1114                 struct ctdb_rec_data *d;
1115                 int ret;
1116
1117                 ret = ctdb_read_packet(conn->fd, conn->timeout, conn, &hdr);
1118                 if (ret != 0) {
1119                         DEBUG(0, ("ctdb_read_packet failed: %s\n",
1120                                   strerror(ret)));
1121                         cluster_fatal("ctdbd died\n");
1122                 }
1123
1124                 if (hdr->operation != CTDB_REQ_MESSAGE) {
1125                         DEBUG(0, ("Got operation %u, expected a message\n",
1126                                   (unsigned)hdr->operation));
1127                         TALLOC_FREE(conn);
1128                         return NT_STATUS_UNEXPECTED_IO_ERROR;
1129                 }
1130
1131                 m = (struct ctdb_req_message *)hdr;
1132                 d = (struct ctdb_rec_data *)&m->data[0];
1133                 if (m->datalen < sizeof(uint32_t) || m->datalen != d->length) {
1134                         DEBUG(0, ("Got invalid traverse data of length %d\n",
1135                                   (int)m->datalen));
1136                         TALLOC_FREE(conn);
1137                         return NT_STATUS_UNEXPECTED_IO_ERROR;
1138                 }
1139
1140                 key.dsize = d->keylen;
1141                 key.dptr  = &d->data[0];
1142                 data.dsize = d->datalen;
1143                 data.dptr = &d->data[d->keylen];
1144
1145                 if (key.dsize == 0 && data.dsize == 0) {
1146                         /* end of traverse */
1147                         TALLOC_FREE(conn);
1148                         return NT_STATUS_OK;
1149                 }
1150
1151                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1152                         DEBUG(0, ("Got invalid ltdb header length %d\n",
1153                                   (int)data.dsize));
1154                         TALLOC_FREE(conn);
1155                         return NT_STATUS_UNEXPECTED_IO_ERROR;
1156                 }
1157                 data.dsize -= sizeof(struct ctdb_ltdb_header);
1158                 data.dptr += sizeof(struct ctdb_ltdb_header);
1159
1160                 if (fn != NULL) {
1161                         fn(key, data, private_data);
1162                 }
1163         }
1164         return NT_STATUS_OK;
1165 }
1166
1167 /*
1168    This is used to canonicalize a ctdb_sock_addr structure.
1169 */
1170 static void smbd_ctdb_canonicalize_ip(const struct sockaddr_storage *in,
1171                                       struct sockaddr_storage *out)
1172 {
1173         memcpy(out, in, sizeof (*out));
1174
1175 #ifdef HAVE_IPV6
1176         if (in->ss_family == AF_INET6) {
1177                 const char prefix[12] = { 0,0,0,0,0,0,0,0,0,0,0xff,0xff };
1178                 const struct sockaddr_in6 *in6 =
1179                         (const struct sockaddr_in6 *)in;
1180                 struct sockaddr_in *out4 = (struct sockaddr_in *)out;
1181                 if (memcmp(&in6->sin6_addr, prefix, 12) == 0) {
1182                         memset(out, 0, sizeof(*out));
1183 #ifdef HAVE_SOCK_SIN_LEN
1184                         out4->sin_len = sizeof(*out);
1185 #endif
1186                         out4->sin_family = AF_INET;
1187                         out4->sin_port   = in6->sin6_port;
1188                         memcpy(&out4->sin_addr, &in6->sin6_addr.s6_addr[12], 4);
1189                 }
1190         }
1191 #endif
1192 }
1193
1194 /*
1195  * Register us as a server for a particular tcp connection
1196  */
1197
1198 NTSTATUS ctdbd_register_ips(struct ctdbd_connection *conn,
1199                             const struct sockaddr_storage *_server,
1200                             const struct sockaddr_storage *_client,
1201                             int (*cb)(uint32_t src_vnn, uint32_t dst_vnn,
1202                                       uint64_t dst_srvid,
1203                                       const uint8_t *msg, size_t msglen,
1204                                       void *private_data),
1205                             void *private_data)
1206 {
1207         struct ctdb_control_tcp_addr p;
1208         TDB_DATA data = { .dptr = (uint8_t *)&p, .dsize = sizeof(p) };
1209         NTSTATUS status;
1210         struct sockaddr_storage client;
1211         struct sockaddr_storage server;
1212
1213         /*
1214          * Only one connection so far
1215          */
1216
1217         smbd_ctdb_canonicalize_ip(_client, &client);
1218         smbd_ctdb_canonicalize_ip(_server, &server);
1219
1220         switch (client.ss_family) {
1221         case AF_INET:
1222                 memcpy(&p.dest.ip, &server, sizeof(p.dest.ip));
1223                 memcpy(&p.src.ip, &client, sizeof(p.src.ip));
1224                 break;
1225         case AF_INET6:
1226                 memcpy(&p.dest.ip6, &server, sizeof(p.dest.ip6));
1227                 memcpy(&p.src.ip6, &client, sizeof(p.src.ip6));
1228                 break;
1229         default:
1230                 return NT_STATUS_INTERNAL_ERROR;
1231         }
1232
1233         /*
1234          * We want to be told about IP releases
1235          */
1236
1237         status = register_with_ctdbd(conn, CTDB_SRVID_RELEASE_IP,
1238                                      cb, private_data);
1239         if (!NT_STATUS_IS_OK(status)) {
1240                 return status;
1241         }
1242
1243         /*
1244          * inform ctdb of our tcp connection, so if IP takeover happens ctdb
1245          * can send an extra ack to trigger a reset for our client, so it
1246          * immediately reconnects
1247          */
1248         return ctdbd_control(conn, CTDB_CURRENT_NODE,
1249                              CTDB_CONTROL_TCP_CLIENT, 0,
1250                              CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL, NULL);
1251 }
1252
1253 /*
1254   call a control on the local node
1255  */
1256 NTSTATUS ctdbd_control_local(struct ctdbd_connection *conn, uint32_t opcode,
1257                              uint64_t srvid, uint32_t flags, TDB_DATA data,
1258                              TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
1259                              int *cstatus)
1260 {
1261         return ctdbd_control(conn, CTDB_CURRENT_NODE, opcode, srvid, flags, data, mem_ctx, outdata, cstatus);
1262 }
1263
1264 NTSTATUS ctdb_watch_us(struct ctdbd_connection *conn)
1265 {
1266         struct ctdb_client_notify_register reg_data;
1267         size_t struct_len;
1268         NTSTATUS status;
1269         int cstatus;
1270
1271         reg_data.srvid = CTDB_SRVID_SAMBA_NOTIFY;
1272         reg_data.len = 1;
1273         reg_data.notify_data[0] = 0;
1274
1275         struct_len = offsetof(struct ctdb_client_notify_register,
1276                               notify_data) + reg_data.len;
1277
1278         status = ctdbd_control_local(
1279                 conn, CTDB_CONTROL_REGISTER_NOTIFY, conn->rand_srvid, 0,
1280                 make_tdb_data((uint8_t *)&reg_data, struct_len),
1281                 NULL, NULL, &cstatus);
1282         if (!NT_STATUS_IS_OK(status)) {
1283                 DEBUG(1, ("ctdbd_control_local failed: %s\n",
1284                           nt_errstr(status)));
1285         }
1286         return status;
1287 }
1288
1289 NTSTATUS ctdb_unwatch(struct ctdbd_connection *conn)
1290 {
1291         struct ctdb_client_notify_deregister dereg_data;
1292         NTSTATUS status;
1293         int cstatus;
1294
1295         dereg_data.srvid = CTDB_SRVID_SAMBA_NOTIFY;
1296
1297         status = ctdbd_control_local(
1298                 conn, CTDB_CONTROL_DEREGISTER_NOTIFY, conn->rand_srvid, 0,
1299                 make_tdb_data((uint8_t *)&dereg_data, sizeof(dereg_data)),
1300                 NULL, NULL, &cstatus);
1301         if (!NT_STATUS_IS_OK(status)) {
1302                 DEBUG(1, ("ctdbd_control_local failed: %s\n",
1303                           nt_errstr(status)));
1304         }
1305         return status;
1306 }
1307
1308 NTSTATUS ctdbd_probe(const char *sockname, int timeout)
1309 {
1310         /*
1311          * Do a very early check if ctdbd is around to avoid an abort and core
1312          * later
1313          */
1314         struct ctdbd_connection *conn = NULL;
1315         NTSTATUS status;
1316
1317         status = ctdbd_messaging_connection(talloc_tos(), sockname, timeout,
1318                                             &conn);
1319
1320         /*
1321          * We only care if we can connect.
1322          */
1323         TALLOC_FREE(conn);
1324
1325         return status;
1326 }