s3:ctdbd_conn: add ctdb_serverids_exist_supported()
[obnox/samba/samba-obnox.git] / source3 / lib / ctdbd_conn.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) 2007 by Volker Lendecke
5    Copyright (C) 2007 by Andrew Tridgell
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 #include "includes.h"
22 #include "util_tdb.h"
23 #include "serverid.h"
24 #include "ctdbd_conn.h"
25
26 #ifdef CLUSTER_SUPPORT
27
28 #include "ctdb_packet.h"
29 #include "messages.h"
30
31 /*
32  * It is not possible to include ctdb.h and tdb_compat.h (included via
33  * some other include above) without warnings. This fixes those
34  * warnings.
35  */
36
37 #ifdef typesafe_cb
38 #undef typesafe_cb
39 #endif
40
41 #ifdef typesafe_cb_preargs
42 #undef typesafe_cb_preargs
43 #endif
44
45 #ifdef typesafe_cb_postargs
46 #undef typesafe_cb_postargs
47 #endif
48
49 /* paths to these include files come from --with-ctdb= in configure */
50
51 #include "ctdb.h"
52 #include "ctdb_private.h"
53
/*
 * State kept for one connection to ctdbd.
 */
struct ctdbd_connection {
	/* Messaging context; set by ctdbd_register_msg_ctx(), NULL before */
	struct messaging_context *msg_ctx;
	/* Last request id handed out by ctdbd_next_reqid() */
	uint32_t reqid;
	/* Our own node number as fetched by get_cluster_vnn() */
	uint32_t our_vnn;
	/* Random srvid registered with ctdbd in ctdbd_init_connection() */
	uint64_t rand_srvid;
	/* Packet queue on top of the ctdbd socket */
	struct ctdb_packet_context *pkt;
	/* fd event for async reads; set by ctdbd_register_msg_ctx() */
	struct tevent_fd *fde;

	/*
	 * Callback run for CTDB_SRVID_RELEASE_IP messages together with
	 * its private pointer. Both are reset to NULL once the callback
	 * has returned true.
	 */
	bool (*release_ip_handler)(const char *ip_addr, void *private_data);
	void *release_ip_priv;
};
65
66 static uint32_t ctdbd_next_reqid(struct ctdbd_connection *conn)
67 {
68         conn->reqid += 1;
69         if (conn->reqid == 0) {
70                 conn->reqid += 1;
71         }
72         return conn->reqid;
73 }
74
75 static NTSTATUS ctdbd_control(struct ctdbd_connection *conn,
76                               uint32_t vnn, uint32_t opcode,
77                               uint64_t srvid, uint32_t flags, TDB_DATA data,
78                               TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
79                               int *cstatus);
80
/*
 * Log a fatal error in the communication with ctdbd at debug level 0
 * and terminate the process right away with exit code 1.
 */
static void cluster_fatal(const char *why)
{
	DEBUG(0,("cluster fatal event: %s - exiting immediately\n", why));

	/*
	 * smb_panic() is deliberately not used here: writing a core file
	 * would delay the exit. This process id has to be released
	 * immediately so that someone else can take over without getting
	 * sharing violations.
	 */
	_exit(1);
}
93
94 /*
95  *
96  */
97 static void ctdb_packet_dump(struct ctdb_req_header *hdr)
98 {
99         if (DEBUGLEVEL < 11) {
100                 return;
101         }
102         DEBUGADD(11, ("len=%d, magic=%x, vers=%d, gen=%d, op=%d, reqid=%d\n",
103                       (int)hdr->length, (int)hdr->ctdb_magic,
104                       (int)hdr->ctdb_version, (int)hdr->generation,
105                       (int)hdr->operation, (int)hdr->reqid));
106 }
107
108 /*
109  * Register a srvid with ctdbd
110  */
111 NTSTATUS register_with_ctdbd(struct ctdbd_connection *conn, uint64_t srvid)
112 {
113
114         int cstatus;
115         return ctdbd_control(conn, CTDB_CURRENT_NODE,
116                              CTDB_CONTROL_REGISTER_SRVID, srvid, 0,
117                              tdb_null, NULL, NULL, &cstatus);
118 }
119
120 /*
121  * get our vnn from the cluster
122  */
123 static NTSTATUS get_cluster_vnn(struct ctdbd_connection *conn, uint32_t *vnn)
124 {
125         int32_t cstatus=-1;
126         NTSTATUS status;
127         status = ctdbd_control(conn,
128                                CTDB_CURRENT_NODE, CTDB_CONTROL_GET_PNN, 0, 0,
129                                tdb_null, NULL, NULL, &cstatus);
130         if (!NT_STATUS_IS_OK(status)) {
131                 DEBUG(1, ("ctdbd_control failed: %s\n", nt_errstr(status)));
132                 return status;
133         }
134         *vnn = (uint32_t)cstatus;
135         return status;
136 }
137
138 /*
139  * Are we active (i.e. not banned or stopped?)
140  */
141 static bool ctdbd_working(struct ctdbd_connection *conn, uint32_t vnn)
142 {
143         int32_t cstatus=-1;
144         NTSTATUS status;
145         TDB_DATA outdata;
146         struct ctdb_node_map *m;
147         uint32_t failure_flags;
148         bool ret = false;
149         int i;
150
151         status = ctdbd_control(conn, CTDB_CURRENT_NODE,
152                                CTDB_CONTROL_GET_NODEMAP, 0, 0,
153                                tdb_null, talloc_tos(), &outdata, &cstatus);
154         if (!NT_STATUS_IS_OK(status)) {
155                 DEBUG(1, ("ctdbd_control failed: %s\n", nt_errstr(status)));
156                 return false;
157         }
158         if ((cstatus != 0) || (outdata.dptr == NULL)) {
159                 DEBUG(2, ("Received invalid ctdb data\n"));
160                 return false;
161         }
162
163         m = (struct ctdb_node_map *)outdata.dptr;
164
165         for (i=0; i<m->num; i++) {
166                 if (vnn == m->nodes[i].pnn) {
167                         break;
168                 }
169         }
170
171         if (i == m->num) {
172                 DEBUG(2, ("Did not find ourselves (node %d) in nodemap\n",
173                           (int)vnn));
174                 goto fail;
175         }
176
177         failure_flags = NODE_FLAGS_BANNED | NODE_FLAGS_DISCONNECTED
178                 | NODE_FLAGS_PERMANENTLY_DISABLED | NODE_FLAGS_STOPPED;
179
180         if ((m->nodes[i].flags & failure_flags) != 0) {
181                 DEBUG(2, ("Node has status %x, not active\n",
182                           (int)m->nodes[i].flags));
183                 goto fail;
184         }
185
186         ret = true;
187 fail:
188         TALLOC_FREE(outdata.dptr);
189         return ret;
190 }
191
192 uint32_t ctdbd_vnn(const struct ctdbd_connection *conn)
193 {
194         return conn->our_vnn;
195 }
196
197 /*
198  * Get us a ctdb connection
199  */
200
201 static NTSTATUS ctdbd_connect(TALLOC_CTX *mem_ctx,
202                               struct ctdb_packet_context **presult)
203 {
204         struct ctdb_packet_context *result;
205         const char *sockname = lp_ctdbd_socket();
206         struct sockaddr_un addr = { 0, };
207         int fd;
208         socklen_t salen;
209
210         fd = socket(AF_UNIX, SOCK_STREAM, 0);
211         if (fd == -1) {
212                 DEBUG(3, ("Could not create socket: %s\n", strerror(errno)));
213                 return map_nt_error_from_unix(errno);
214         }
215
216         addr.sun_family = AF_UNIX;
217         snprintf(addr.sun_path, sizeof(addr.sun_path), "%s", sockname);
218
219         salen = sizeof(struct sockaddr_un);
220         if (connect(fd, (struct sockaddr *)(void *)&addr, salen) == -1) {
221                 DEBUG(1, ("connect(%s) failed: %s\n", sockname,
222                           strerror(errno)));
223                 close(fd);
224                 return map_nt_error_from_unix(errno);
225         }
226
227         if (!(result = ctdb_packet_init(mem_ctx, fd))) {
228                 close(fd);
229                 return NT_STATUS_NO_MEMORY;
230         }
231
232         *presult = result;
233         return NT_STATUS_OK;
234 }
235
236 /*
237  * Do we have a complete ctdb packet in the queue?
238  */
239
240 static bool ctdb_req_complete(const uint8_t *buf, size_t available,
241                               size_t *length,
242                               void *private_data)
243 {
244         uint32_t msglen;
245
246         if (available < sizeof(msglen)) {
247                 return False;
248         }
249
250         msglen = *((const uint32_t *)buf);
251
252         DEBUG(11, ("msglen = %d\n", msglen));
253
254         if (msglen < sizeof(struct ctdb_req_header)) {
255                 DEBUG(0, ("Got invalid msglen: %d, expected at least %d for "
256                           "the req_header\n", (int)msglen,
257                           (int)sizeof(struct ctdb_req_header)));
258                 cluster_fatal("ctdbd protocol error\n");
259         }
260
261         if (available < msglen) {
262                 return false;
263         }
264
265         *length = msglen;
266         return true;
267 }
268
/*
 * Everything needed to dispatch an incoming message later: used when a
 * message arrives while we are waiting for a ctdb reply.
 */

struct deferred_msg_state {
	/* messaging context the record is dispatched on */
	struct messaging_context *msg_ctx;
	/* the decoded message to dispatch */
	struct messaging_rec *rec;
};
278
279 /*
280  * Timed event handler for the deferred message
281  */
282
283 static void deferred_message_dispatch(struct tevent_context *event_ctx,
284                                       struct tevent_timer *te,
285                                       struct timeval now,
286                                       void *private_data)
287 {
288         struct deferred_msg_state *state = talloc_get_type_abort(
289                 private_data, struct deferred_msg_state);
290
291         messaging_dispatch_rec(state->msg_ctx, state->rec);
292         TALLOC_FREE(state);
293         TALLOC_FREE(te);
294 }
295
296 struct req_pull_state {
297         TALLOC_CTX *mem_ctx;
298         DATA_BLOB req;
299 };
300
301 /*
302  * Pull a ctdb request out of the incoming ctdb_packet queue
303  */
304
305 static NTSTATUS ctdb_req_pull(uint8_t *buf, size_t length,
306                               void *private_data)
307 {
308         struct req_pull_state *state = (struct req_pull_state *)private_data;
309
310         state->req.data = talloc_move(state->mem_ctx, &buf);
311         state->req.length = length;
312         return NT_STATUS_OK;
313 }
314
315 /*
316  * Fetch a messaging_rec from an incoming ctdb style message
317  */
318
319 static struct messaging_rec *ctdb_pull_messaging_rec(TALLOC_CTX *mem_ctx,
320                                                      size_t overall_length,
321                                                      struct ctdb_req_message *msg)
322 {
323         struct messaging_rec *result;
324         DATA_BLOB blob;
325         enum ndr_err_code ndr_err;
326
327         if ((overall_length < offsetof(struct ctdb_req_message, data))
328             || (overall_length
329                 < offsetof(struct ctdb_req_message, data) + msg->datalen)) {
330
331                 cluster_fatal("got invalid msg length");
332         }
333
334         if (!(result = talloc(mem_ctx, struct messaging_rec))) {
335                 DEBUG(0, ("talloc failed\n"));
336                 return NULL;
337         }
338
339         blob = data_blob_const(msg->data, msg->datalen);
340
341         ndr_err = ndr_pull_struct_blob(
342                 &blob, result, result,
343                 (ndr_pull_flags_fn_t)ndr_pull_messaging_rec);
344
345         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
346                 DEBUG(0, ("ndr_pull_struct_blob failed: %s\n",
347                           ndr_errstr(ndr_err)));
348                 TALLOC_FREE(result);
349                 return NULL;
350         }
351
352         if (DEBUGLEVEL >= 11) {
353                 DEBUG(11, ("ctdb_pull_messaging_rec:\n"));
354                 NDR_PRINT_DEBUG(messaging_rec, result);
355         }
356
357         return result;
358 }
359
360 static NTSTATUS ctdb_packet_fd_read_sync(struct ctdb_packet_context *ctx)
361 {
362         int timeout = lp_ctdb_timeout();
363
364         if (timeout == 0) {
365                 timeout = -1;
366         }
367         return ctdb_packet_fd_read_sync_timeout(ctx, timeout);
368 }
369
370 /*
371  * Read a full ctdbd request. If we have a messaging context, defer incoming
372  * messages that might come in between.
373  */
374
375 static NTSTATUS ctdb_read_req(struct ctdbd_connection *conn, uint32_t reqid,
376                               TALLOC_CTX *mem_ctx, void *result)
377 {
378         struct ctdb_req_header *hdr;
379         struct req_pull_state state;
380         NTSTATUS status;
381
382  next_pkt:
383         ZERO_STRUCT(state);
384         state.mem_ctx = mem_ctx;
385
386         while (!ctdb_packet_handler(conn->pkt, ctdb_req_complete,
387                                     ctdb_req_pull, &state, &status)) {
388                 /*
389                  * Not enough data
390                  */
391                 status = ctdb_packet_fd_read_sync(conn->pkt);
392
393                 if (NT_STATUS_EQUAL(status, NT_STATUS_NETWORK_BUSY)) {
394                         /* EAGAIN */
395                         continue;
396                 } else if (NT_STATUS_EQUAL(status, NT_STATUS_RETRY)) {
397                         /* EAGAIN */
398                         continue;
399                 }
400
401                 if (!NT_STATUS_IS_OK(status)) {
402                         DEBUG(0, ("packet_fd_read failed: %s\n", nt_errstr(status)));
403                         cluster_fatal("ctdbd died\n");
404                 }
405         }
406
407         if (!NT_STATUS_IS_OK(status)) {
408                 DEBUG(0, ("Could not read ctdb_packet: %s\n", nt_errstr(status)));
409                 cluster_fatal("ctdbd died\n");
410         }
411
412         hdr = (struct ctdb_req_header *)state.req.data;
413
414         DEBUG(11, ("Received ctdb packet\n"));
415         ctdb_packet_dump(hdr);
416
417         if (hdr->operation == CTDB_REQ_MESSAGE) {
418                 struct tevent_timer *evt;
419                 struct deferred_msg_state *msg_state;
420                 struct ctdb_req_message *msg = (struct ctdb_req_message *)hdr;
421
422                 if (conn->msg_ctx == NULL) {
423                         DEBUG(1, ("Got a message without having a msg ctx, "
424                                   "dropping msg %llu\n",
425                                   (long long unsigned)msg->srvid));
426                         goto next_pkt;
427                 }
428
429                 if ((conn->release_ip_handler != NULL)
430                     && (msg->srvid == CTDB_SRVID_RELEASE_IP)) {
431                         bool ret;
432
433                         /* must be dispatched immediately */
434                         DEBUG(10, ("received CTDB_SRVID_RELEASE_IP\n"));
435                         ret = conn->release_ip_handler((const char *)msg->data,
436                                                        conn->release_ip_priv);
437                         if (ret) {
438                                 /*
439                                  * We need to release the ip,
440                                  * so return an error to the upper layers.
441                                  *
442                                  * We make sure we don't trigger this again.
443                                  */
444                                 conn->release_ip_handler = NULL;
445                                 conn->release_ip_priv = NULL;
446                                 return NT_STATUS_ADDRESS_CLOSED;
447                         }
448                         TALLOC_FREE(hdr);
449                         goto next_pkt;
450                 }
451
452                 if ((msg->srvid == CTDB_SRVID_RECONFIGURE)
453                     || (msg->srvid == CTDB_SRVID_SAMBA_NOTIFY)) {
454
455                         DEBUG(1, ("ctdb_read_req: Got %s message\n",
456                                   (msg->srvid == CTDB_SRVID_RECONFIGURE)
457                                   ? "cluster reconfigure" : "SAMBA_NOTIFY"));
458
459                         messaging_send(conn->msg_ctx,
460                                        messaging_server_id(conn->msg_ctx),
461                                        MSG_SMB_BRL_VALIDATE, &data_blob_null);
462                         messaging_send(conn->msg_ctx,
463                                        messaging_server_id(conn->msg_ctx),
464                                        MSG_DBWRAP_G_LOCK_RETRY,
465                                        &data_blob_null);
466                         TALLOC_FREE(hdr);
467                         goto next_pkt;
468                 }
469
470                 msg_state = talloc(NULL, struct deferred_msg_state);
471                 if (msg_state == NULL) {
472                         DEBUG(0, ("talloc failed\n"));
473                         TALLOC_FREE(hdr);
474                         goto next_pkt;
475                 }
476
477                 if (!(msg_state->rec = ctdb_pull_messaging_rec(
478                               msg_state, state.req.length, msg))) {
479                         DEBUG(0, ("ctdbd_pull_messaging_rec failed\n"));
480                         TALLOC_FREE(msg_state);
481                         TALLOC_FREE(hdr);
482                         goto next_pkt;
483                 }
484
485                 TALLOC_FREE(hdr);
486
487                 msg_state->msg_ctx = conn->msg_ctx;
488
489                 /*
490                  * We're waiting for a call reply, but an async message has
491                  * crossed. Defer dispatching to the toplevel event loop.
492                  */
493                 evt = tevent_add_timer(conn->msg_ctx->event_ctx,
494                                       conn->msg_ctx->event_ctx,
495                                       timeval_zero(),
496                                       deferred_message_dispatch,
497                                       msg_state);
498                 if (evt == NULL) {
499                         DEBUG(0, ("event_add_timed failed\n"));
500                         TALLOC_FREE(msg_state);
501                         TALLOC_FREE(hdr);
502                         goto next_pkt;
503                 }
504
505                 goto next_pkt;
506         }
507
508         if ((reqid != 0) && (hdr->reqid != reqid)) {
509                 /* we got the wrong reply */
510                 DEBUG(0,("Discarding mismatched ctdb reqid %u should have "
511                          "been %u\n", hdr->reqid, reqid));
512                 TALLOC_FREE(hdr);
513                 goto next_pkt;
514         }
515
516         *((void **)result) = talloc_move(mem_ctx, &hdr);
517
518         return NT_STATUS_OK;
519 }
520
521 /*
522  * Get us a ctdbd connection
523  */
524
525 static NTSTATUS ctdbd_init_connection(TALLOC_CTX *mem_ctx,
526                                       struct ctdbd_connection **pconn)
527 {
528         struct ctdbd_connection *conn;
529         NTSTATUS status;
530
531         if (!(conn = talloc_zero(mem_ctx, struct ctdbd_connection))) {
532                 DEBUG(0, ("talloc failed\n"));
533                 return NT_STATUS_NO_MEMORY;
534         }
535
536         status = ctdbd_connect(conn, &conn->pkt);
537
538         if (!NT_STATUS_IS_OK(status)) {
539                 DEBUG(10, ("ctdbd_connect failed: %s\n", nt_errstr(status)));
540                 goto fail;
541         }
542
543         status = get_cluster_vnn(conn, &conn->our_vnn);
544
545         if (!NT_STATUS_IS_OK(status)) {
546                 DEBUG(10, ("get_cluster_vnn failed: %s\n", nt_errstr(status)));
547                 goto fail;
548         }
549
550         if (!ctdbd_working(conn, conn->our_vnn)) {
551                 DEBUG(2, ("Node is not working, can not connect\n"));
552                 status = NT_STATUS_INTERNAL_DB_ERROR;
553                 goto fail;
554         }
555
556         generate_random_buffer((unsigned char *)&conn->rand_srvid,
557                                sizeof(conn->rand_srvid));
558
559         status = register_with_ctdbd(conn, conn->rand_srvid);
560
561         if (!NT_STATUS_IS_OK(status)) {
562                 DEBUG(5, ("Could not register random srvid: %s\n",
563                           nt_errstr(status)));
564                 goto fail;
565         }
566
567         *pconn = conn;
568         return NT_STATUS_OK;
569
570  fail:
571         TALLOC_FREE(conn);
572         return status;
573 }
574
575 /*
576  * Get us a ctdbd connection and register us as a process
577  */
578
579 NTSTATUS ctdbd_messaging_connection(TALLOC_CTX *mem_ctx,
580                                     struct ctdbd_connection **pconn)
581 {
582         struct ctdbd_connection *conn;
583         NTSTATUS status;
584
585         status = ctdbd_init_connection(mem_ctx, &conn);
586
587         if (!NT_STATUS_IS_OK(status)) {
588                 return status;
589         }
590
591         status = register_with_ctdbd(conn, (uint64_t)getpid());
592         if (!NT_STATUS_IS_OK(status)) {
593                 goto fail;
594         }
595
596         status = register_with_ctdbd(conn, MSG_SRVID_SAMBA);
597         if (!NT_STATUS_IS_OK(status)) {
598                 goto fail;
599         }
600
601         status = register_with_ctdbd(conn, CTDB_SRVID_SAMBA_NOTIFY);
602         if (!NT_STATUS_IS_OK(status)) {
603                 goto fail;
604         }
605
606         *pconn = conn;
607         return NT_STATUS_OK;
608
609  fail:
610         TALLOC_FREE(conn);
611         return status;
612 }
613
614 struct messaging_context *ctdb_conn_msg_ctx(struct ctdbd_connection *conn)
615 {
616         return conn->msg_ctx;
617 }
618
619 int ctdbd_conn_get_fd(struct ctdbd_connection *conn)
620 {
621         return ctdb_packet_get_fd(conn->pkt);
622 }
623
624 /*
625  * Packet handler to receive and handle a ctdb message
626  */
627 static NTSTATUS ctdb_handle_message(uint8_t *buf, size_t length,
628                                     void *private_data)
629 {
630         struct ctdbd_connection *conn = talloc_get_type_abort(
631                 private_data, struct ctdbd_connection);
632         struct ctdb_req_message *msg;
633         struct messaging_rec *msg_rec;
634
635         msg = (struct ctdb_req_message *)buf;
636
637         if (msg->hdr.operation != CTDB_REQ_MESSAGE) {
638                 DEBUG(0, ("Received async msg of type %u, discarding\n",
639                           msg->hdr.operation));
640                 TALLOC_FREE(buf);
641                 return NT_STATUS_INVALID_PARAMETER;
642         }
643
644         if ((conn->release_ip_handler != NULL)
645             && (msg->srvid == CTDB_SRVID_RELEASE_IP)) {
646                 bool ret;
647
648                 /* must be dispatched immediately */
649                 DEBUG(10, ("received CTDB_SRVID_RELEASE_IP\n"));
650                 ret = conn->release_ip_handler((const char *)msg->data,
651                                                conn->release_ip_priv);
652                 if (ret) {
653                         /*
654                          * We need to release the ip.
655                          *
656                          * We make sure we don't trigger this again.
657                          */
658                         conn->release_ip_handler = NULL;
659                         conn->release_ip_priv = NULL;
660                 }
661                 TALLOC_FREE(buf);
662                 return NT_STATUS_OK;
663         }
664
665         SMB_ASSERT(conn->msg_ctx != NULL);
666
667         if ((msg->srvid == CTDB_SRVID_RECONFIGURE)
668             || (msg->srvid == CTDB_SRVID_SAMBA_NOTIFY)){
669                 DEBUG(0,("Got cluster reconfigure message\n"));
670                 /*
671                  * when the cluster is reconfigured or someone of the
672                  * family has passed away (SAMBA_NOTIFY), we need to
673                  * clean the brl database
674                  */
675                 messaging_send(conn->msg_ctx,
676                                messaging_server_id(conn->msg_ctx),
677                                MSG_SMB_BRL_VALIDATE, &data_blob_null);
678
679                 messaging_send(conn->msg_ctx,
680                                messaging_server_id(conn->msg_ctx),
681                                MSG_DBWRAP_G_LOCK_RETRY,
682                                &data_blob_null);
683
684                 TALLOC_FREE(buf);
685                 return NT_STATUS_OK;
686         }
687
688         /* only messages to our pid or the broadcast are valid here */
689         if (msg->srvid != getpid() && msg->srvid != MSG_SRVID_SAMBA) {
690                 DEBUG(0,("Got unexpected message with srvid=%llu\n", 
691                          (unsigned long long)msg->srvid));
692                 TALLOC_FREE(buf);
693                 return NT_STATUS_OK;
694         }
695
696         if (!(msg_rec = ctdb_pull_messaging_rec(NULL, length, msg))) {
697                 DEBUG(10, ("ctdb_pull_messaging_rec failed\n"));
698                 TALLOC_FREE(buf);
699                 return NT_STATUS_NO_MEMORY;
700         }
701
702         messaging_dispatch_rec(conn->msg_ctx, msg_rec);
703
704         TALLOC_FREE(msg_rec);
705         TALLOC_FREE(buf);
706         return NT_STATUS_OK;
707 }
708
709 /*
710  * The ctdbd socket is readable asynchronuously
711  */
712
713 static void ctdbd_socket_handler(struct tevent_context *event_ctx,
714                                  struct tevent_fd *event,
715                                  uint16 flags,
716                                  void *private_data)
717 {
718         struct ctdbd_connection *conn = talloc_get_type_abort(
719                 private_data, struct ctdbd_connection);
720
721         NTSTATUS status;
722
723         status = ctdb_packet_fd_read(conn->pkt);
724
725         if (!NT_STATUS_IS_OK(status)) {
726                 DEBUG(0, ("packet_fd_read failed: %s\n", nt_errstr(status)));
727                 cluster_fatal("ctdbd died\n");
728         }
729
730         while (ctdb_packet_handler(conn->pkt, ctdb_req_complete,
731                               ctdb_handle_message, conn, &status)) {
732                 if (!NT_STATUS_IS_OK(status)) {
733                         DEBUG(10, ("could not handle incoming message: %s\n",
734                                    nt_errstr(status)));
735                 }
736         }
737 }
738
739 /*
740  * Prepare a ctdbd connection to receive messages
741  */
742
743 NTSTATUS ctdbd_register_msg_ctx(struct ctdbd_connection *conn,
744                                 struct messaging_context *msg_ctx)
745 {
746         SMB_ASSERT(conn->msg_ctx == NULL);
747         SMB_ASSERT(conn->fde == NULL);
748
749         if (!(conn->fde = tevent_add_fd(msg_ctx->event_ctx, conn,
750                                        ctdb_packet_get_fd(conn->pkt),
751                                        TEVENT_FD_READ,
752                                        ctdbd_socket_handler,
753                                        conn))) {
754                 DEBUG(0, ("event_add_fd failed\n"));
755                 return NT_STATUS_NO_MEMORY;
756         }
757
758         conn->msg_ctx = msg_ctx;
759
760         return NT_STATUS_OK;
761 }
762
763 /*
764  * Send a messaging message across a ctdbd
765  */
766
767 NTSTATUS ctdbd_messaging_send(struct ctdbd_connection *conn,
768                               uint32_t dst_vnn, uint64_t dst_srvid,
769                               struct messaging_rec *msg)
770 {
771         DATA_BLOB blob;
772         NTSTATUS status;
773         enum ndr_err_code ndr_err;
774
775         ndr_err = ndr_push_struct_blob(
776                 &blob, talloc_tos(), msg,
777                 (ndr_push_flags_fn_t)ndr_push_messaging_rec);
778
779         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
780                 DEBUG(0, ("ndr_push_struct_blob failed: %s\n",
781                           ndr_errstr(ndr_err)));
782                 return ndr_map_error2ntstatus(ndr_err);
783         }
784
785         status = ctdbd_messaging_send_blob(conn, dst_vnn, dst_srvid,
786                                            blob.data, blob.length);
787         TALLOC_FREE(blob.data);
788         return status;
789 }
790
791 NTSTATUS ctdbd_messaging_send_blob(struct ctdbd_connection *conn,
792                                    uint32_t dst_vnn, uint64_t dst_srvid,
793                                    const uint8_t *buf, size_t buflen)
794 {
795         struct ctdb_req_message r;
796         NTSTATUS status;
797
798         r.hdr.length = offsetof(struct ctdb_req_message, data) + buflen;
799         r.hdr.ctdb_magic = CTDB_MAGIC;
800         r.hdr.ctdb_version = CTDB_VERSION;
801         r.hdr.generation = 1;
802         r.hdr.operation  = CTDB_REQ_MESSAGE;
803         r.hdr.destnode   = dst_vnn;
804         r.hdr.srcnode    = conn->our_vnn;
805         r.hdr.reqid      = 0;
806         r.srvid          = dst_srvid;
807         r.datalen        = buflen;
808
809         DEBUG(10, ("ctdbd_messaging_send: Sending ctdb packet\n"));
810         ctdb_packet_dump(&r.hdr);
811
812         status = ctdb_packet_send(
813                 conn->pkt, 2,
814                 data_blob_const(&r, offsetof(struct ctdb_req_message, data)),
815                 data_blob_const(buf, buflen));
816
817         if (!NT_STATUS_IS_OK(status)) {
818                 DEBUG(0, ("ctdb_packet_send failed: %s\n", nt_errstr(status)));
819                 return status;
820         }
821
822         status = ctdb_packet_flush(conn->pkt);
823         if (!NT_STATUS_IS_OK(status)) {
824                 DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status)));
825                 cluster_fatal("cluster dispatch daemon msg write error\n");
826         }
827         return NT_STATUS_OK;
828 }
829
830 /*
831  * send/recv a generic ctdb control message
832  */
833 static NTSTATUS ctdbd_control(struct ctdbd_connection *conn,
834                               uint32_t vnn, uint32_t opcode,
835                               uint64_t srvid, uint32_t flags,
836                               TDB_DATA data,
837                               TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
838                               int *cstatus)
839 {
840         struct ctdb_req_control req;
841         struct ctdb_reply_control *reply = NULL;
842         struct ctdbd_connection *new_conn = NULL;
843         NTSTATUS status;
844
845         if (conn == NULL) {
846                 status = ctdbd_init_connection(NULL, &new_conn);
847
848                 if (!NT_STATUS_IS_OK(status)) {
849                         DEBUG(10, ("Could not init temp connection: %s\n",
850                                    nt_errstr(status)));
851                         goto fail;
852                 }
853
854                 conn = new_conn;
855         }
856
857         ZERO_STRUCT(req);
858         req.hdr.length = offsetof(struct ctdb_req_control, data) + data.dsize;
859         req.hdr.ctdb_magic   = CTDB_MAGIC;
860         req.hdr.ctdb_version = CTDB_VERSION;
861         req.hdr.operation    = CTDB_REQ_CONTROL;
862         req.hdr.reqid        = ctdbd_next_reqid(conn);
863         req.hdr.destnode     = vnn;
864         req.opcode           = opcode;
865         req.srvid            = srvid;
866         req.datalen          = data.dsize;
867         req.flags            = flags;
868
869         DEBUG(10, ("ctdbd_control: Sending ctdb packet\n"));
870         ctdb_packet_dump(&req.hdr);
871
872         status = ctdb_packet_send(
873                 conn->pkt, 2,
874                 data_blob_const(&req, offsetof(struct ctdb_req_control, data)),
875                 data_blob_const(data.dptr, data.dsize));
876
877         if (!NT_STATUS_IS_OK(status)) {
878                 DEBUG(3, ("ctdb_packet_send failed: %s\n", nt_errstr(status)));
879                 goto fail;
880         }
881
882         status = ctdb_packet_flush(conn->pkt);
883
884         if (!NT_STATUS_IS_OK(status)) {
885                 DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status)));
886                 cluster_fatal("cluster dispatch daemon control write error\n");
887         }
888
889         if (flags & CTDB_CTRL_FLAG_NOREPLY) {
890                 TALLOC_FREE(new_conn);
891                 if (cstatus) {
892                         *cstatus = 0;
893                 }
894                 return NT_STATUS_OK;
895         }
896
897         status = ctdb_read_req(conn, req.hdr.reqid, NULL, (void *)&reply);
898
899         if (!NT_STATUS_IS_OK(status)) {
900                 DEBUG(10, ("ctdb_read_req failed: %s\n", nt_errstr(status)));
901                 goto fail;
902         }
903
904         if (reply->hdr.operation != CTDB_REPLY_CONTROL) {
905                 DEBUG(0, ("received invalid reply\n"));
906                 goto fail;
907         }
908
909         if (outdata) {
910                 if (!(outdata->dptr = (uint8 *)talloc_memdup(
911                               mem_ctx, reply->data, reply->datalen))) {
912                         TALLOC_FREE(reply);
913                         return NT_STATUS_NO_MEMORY;
914                 }
915                 outdata->dsize = reply->datalen;
916         }
917         if (cstatus) {
918                 (*cstatus) = reply->status;
919         }
920
921         status = NT_STATUS_OK;
922
923  fail:
924         TALLOC_FREE(new_conn);
925         TALLOC_FREE(reply);
926         return status;
927 }
928
929 /*
930  * see if a remote process exists
931  */
932 bool ctdbd_process_exists(struct ctdbd_connection *conn, uint32_t vnn, pid_t pid)
933 {
934         struct server_id id;
935         bool result;
936
937         id.pid = pid;
938         id.vnn = vnn;
939
940         if (!ctdb_processes_exist(conn, &id, 1, &result)) {
941                 DEBUG(10, ("ctdb_processes_exist failed\n"));
942                 return false;
943         }
944         return result;
945 }
946
947 bool ctdb_processes_exist(struct ctdbd_connection *conn,
948                           const struct server_id *pids, int num_pids,
949                           bool *results)
950 {
951         TALLOC_CTX *frame = talloc_stackframe();
952         int i, num_received;
953         NTSTATUS status;
954         uint32_t *reqids;
955         bool result = false;
956
957         reqids = talloc_array(talloc_tos(), uint32_t, num_pids);
958         if (reqids == NULL) {
959                 goto fail;
960         }
961
962         for (i=0; i<num_pids; i++) {
963                 struct ctdb_req_control req;
964                 pid_t pid;
965
966                 results[i] = false;
967                 reqids[i] = ctdbd_next_reqid(conn);
968
969                 ZERO_STRUCT(req);
970
971                 /*
972                  * pids[i].pid is uint64_t, scale down to pid_t which
973                  * is the wire protocol towards ctdb.
974                  */
975                 pid = pids[i].pid;
976
977                 DEBUG(10, ("Requesting PID %d/%d, reqid=%d\n",
978                            (int)pids[i].vnn, (int)pid,
979                            (int)reqids[i]));
980
981                 req.hdr.length = offsetof(struct ctdb_req_control, data);
982                 req.hdr.length += sizeof(pid);
983                 req.hdr.ctdb_magic   = CTDB_MAGIC;
984                 req.hdr.ctdb_version = CTDB_VERSION;
985                 req.hdr.operation    = CTDB_REQ_CONTROL;
986                 req.hdr.reqid        = reqids[i];
987                 req.hdr.destnode     = pids[i].vnn;
988                 req.opcode           = CTDB_CONTROL_PROCESS_EXISTS;
989                 req.srvid            = 0;
990                 req.datalen          = sizeof(pid);
991                 req.flags            = 0;
992
993                 DEBUG(10, ("ctdbd_control: Sending ctdb packet\n"));
994                 ctdb_packet_dump(&req.hdr);
995
996                 status = ctdb_packet_send(
997                         conn->pkt, 2,
998                         data_blob_const(
999                                 &req, offsetof(struct ctdb_req_control, data)),
1000                         data_blob_const(&pid, sizeof(pid)));
1001                 if (!NT_STATUS_IS_OK(status)) {
1002                         DEBUG(10, ("ctdb_packet_send failed: %s\n",
1003                                    nt_errstr(status)));
1004                         goto fail;
1005                 }
1006         }
1007
1008         status = ctdb_packet_flush(conn->pkt);
1009         if (!NT_STATUS_IS_OK(status)) {
1010                 DEBUG(10, ("ctdb_packet_flush failed: %s\n",
1011                            nt_errstr(status)));
1012                 goto fail;
1013         }
1014
1015         num_received = 0;
1016
1017         while (num_received < num_pids) {
1018                 struct ctdb_reply_control *reply = NULL;
1019                 uint32_t reqid;
1020
1021                 status = ctdb_read_req(conn, 0, talloc_tos(), (void *)&reply);
1022                 if (!NT_STATUS_IS_OK(status)) {
1023                         DEBUG(10, ("ctdb_read_req failed: %s\n",
1024                                    nt_errstr(status)));
1025                         goto fail;
1026                 }
1027
1028                 if (reply->hdr.operation != CTDB_REPLY_CONTROL) {
1029                         DEBUG(10, ("Received invalid reply\n"));
1030                         goto fail;
1031                 }
1032
1033                 reqid = reply->hdr.reqid;
1034
1035                 DEBUG(10, ("Received reqid %d\n", (int)reqid));
1036
1037                 for (i=0; i<num_pids; i++) {
1038                         if (reqid == reqids[i]) {
1039                                 break;
1040                         }
1041                 }
1042                 if (i == num_pids) {
1043                         DEBUG(10, ("Received unknown record number %u\n",
1044                                    (unsigned)reqid));
1045                         goto fail;
1046                 }
1047                 results[i] = ((reply->status) == 0);
1048                 TALLOC_FREE(reply);
1049                 num_received += 1;
1050         }
1051
1052         result = true;
1053 fail:
1054         TALLOC_FREE(frame);
1055         return result;
1056 }
1057
/*
 * Per-node bookkeeping for a batched CHECK_SRVIDS query: all server
 * ids that live on one vnn, plus where each of them came from in the
 * caller's server_id array.
 */
struct ctdb_vnn_list {
        /* node number these server ids belong to */
        uint32_t vnn;
        /* request id of the control sent to this node */
        uint32_t reqid;
        /* number of server ids collected for this node */
        unsigned int num_srvids;
        /* number of slots in srvids/pid_indexes already filled */
        unsigned int num_filled;
        /* unique ids to be checked, num_srvids entries */
        uint64_t *srvids;
        /* for each srvids[] entry: index into the caller's pids array */
        unsigned int *pid_indexes;
};
1066
1067 /*
1068  * Get a list of all vnns mentioned in a list of
1069  * server_ids. vnn_indexes tells where in the vnns array we have to
1070  * place the pids.
1071  */
1072 static bool ctdb_collect_vnns(TALLOC_CTX *mem_ctx,
1073                               const struct server_id *pids, unsigned num_pids,
1074                               struct ctdb_vnn_list **pvnns,
1075                               unsigned *pnum_vnns)
1076 {
1077         struct ctdb_vnn_list *vnns = NULL;
1078         unsigned *vnn_indexes = NULL;
1079         unsigned i, num_vnns = 0;
1080
1081         vnn_indexes = talloc_array(mem_ctx, unsigned, num_pids);
1082         if (vnn_indexes == NULL) {
1083                 DEBUG(1, ("talloc_array failed\n"));
1084                 goto fail;
1085         }
1086
1087         for (i=0; i<num_pids; i++) {
1088                 unsigned j;
1089                 uint32_t vnn = pids[i].vnn;
1090
1091                 for (j=0; j<num_vnns; j++) {
1092                         if (vnn == vnns[j].vnn) {
1093                                 break;
1094                         }
1095                 }
1096                 vnn_indexes[i] = j;
1097
1098                 if (j < num_vnns) {
1099                         /*
1100                          * Already in the array
1101                          */
1102                         vnns[j].num_srvids += 1;
1103                         continue;
1104                 }
1105                 vnns = talloc_realloc(mem_ctx, vnns, struct ctdb_vnn_list,
1106                                       num_vnns+1);
1107                 if (vnns == NULL) {
1108                         DEBUG(1, ("talloc_realloc failed\n"));
1109                         goto fail;
1110                 }
1111                 vnns[num_vnns].vnn = vnn;
1112                 vnns[num_vnns].num_srvids = 1;
1113                 vnns[num_vnns].num_filled = 0;
1114                 num_vnns += 1;
1115         }
1116         for (i=0; i<num_vnns; i++) {
1117                 struct ctdb_vnn_list *vnn = &vnns[i];
1118
1119                 vnn->srvids = talloc_array(vnns, uint64_t, vnn->num_srvids);
1120                 if (vnn->srvids == NULL) {
1121                         DEBUG(1, ("talloc_array failed\n"));
1122                         goto fail;
1123                 }
1124                 vnn->pid_indexes = talloc_array(vnns, unsigned,
1125                                                 vnn->num_srvids);
1126                 if (vnn->pid_indexes == NULL) {
1127                         DEBUG(1, ("talloc_array failed\n"));
1128                         goto fail;
1129                 }
1130         }
1131         for (i=0; i<num_pids; i++) {
1132                 struct ctdb_vnn_list *vnn = &vnns[vnn_indexes[i]];
1133                 vnn->srvids[vnn->num_filled] = pids[i].unique_id;
1134                 vnn->pid_indexes[vnn->num_filled] = i;
1135                 vnn->num_filled += 1;
1136         }
1137
1138         TALLOC_FREE(vnn_indexes);
1139         *pvnns = vnns;
1140         *pnum_vnns = num_vnns;
1141         return true;
1142 fail:
1143         TALLOC_FREE(vnns);
1144         TALLOC_FREE(vnn_indexes);
1145         return false;
1146 }
1147
/*
 * Tell whether this build can use ctdb_serverids_exist(), i.e. whether
 * CTDB_CONTROL_CHECK_SRVIDS was available at configure time. The
 * connection argument is not looked at.
 */
bool ctdb_serverids_exist_supported(struct ctdbd_connection *conn)
{
#ifdef HAVE_CTDB_CONTROL_CHECK_SRVIDS_DECL
        return true;
#else /* !HAVE_CTDB_CONTROL_CHECK_SRVIDS_DECL */
        return false;
#endif /* HAVE_CTDB_CONTROL_CHECK_SRVIDS_DECL */
}
1156
1157 #ifdef HAVE_CTDB_CONTROL_CHECK_SRVIDS_DECL
1158
1159 bool ctdb_serverids_exist(struct ctdbd_connection *conn,
1160                           const struct server_id *pids, unsigned num_pids,
1161                           bool *results)
1162 {
1163         unsigned i, num_received;
1164         NTSTATUS status;
1165         struct ctdb_vnn_list *vnns = NULL;
1166         unsigned num_vnns;
1167         bool result = false;
1168
1169         if (!ctdb_collect_vnns(talloc_tos(), pids, num_pids,
1170                                &vnns, &num_vnns)) {
1171                 DEBUG(1, ("ctdb_collect_vnns failed\n"));
1172                 goto fail;
1173         }
1174
1175         for (i=0; i<num_vnns; i++) {
1176                 struct ctdb_vnn_list *vnn = &vnns[i];
1177                 struct ctdb_req_control req;
1178
1179                 vnn->reqid = ctdbd_next_reqid(conn);
1180
1181                 ZERO_STRUCT(req);
1182
1183                 DEBUG(10, ("Requesting VNN %d, reqid=%d, num_srvids=%u\n",
1184                            (int)vnn->vnn, (int)vnn->reqid, vnn->num_srvids));
1185
1186                 req.hdr.length = offsetof(struct ctdb_req_control, data);
1187                 req.hdr.ctdb_magic   = CTDB_MAGIC;
1188                 req.hdr.ctdb_version = CTDB_VERSION;
1189                 req.hdr.operation    = CTDB_REQ_CONTROL;
1190                 req.hdr.reqid        = vnn->reqid;
1191                 req.hdr.destnode     = vnn->vnn;
1192                 req.opcode           = CTDB_CONTROL_CHECK_SRVIDS;
1193                 req.srvid            = 0;
1194                 req.datalen          = sizeof(uint64_t) * vnn->num_srvids;
1195                 req.hdr.length      += req.datalen;
1196                 req.flags            = 0;
1197
1198                 DEBUG(10, ("ctdbd_control: Sending ctdb packet\n"));
1199                 ctdb_packet_dump(&req.hdr);
1200
1201                 status = ctdb_packet_send(
1202                         conn->pkt, 2,
1203                         data_blob_const(
1204                                 &req, offsetof(struct ctdb_req_control,
1205                                                data)),
1206                         data_blob_const(vnn->srvids, req.datalen));
1207                 if (!NT_STATUS_IS_OK(status)) {
1208                         DEBUG(1, ("ctdb_packet_send failed: %s\n",
1209                                   nt_errstr(status)));
1210                         goto fail;
1211                 }
1212         }
1213
1214         status = ctdb_packet_flush(conn->pkt);
1215         if (!NT_STATUS_IS_OK(status)) {
1216                 DEBUG(1, ("ctdb_packet_flush failed: %s\n",
1217                           nt_errstr(status)));
1218                 goto fail;
1219         }
1220
1221         num_received = 0;
1222
1223         while (num_received < num_vnns) {
1224                 struct ctdb_reply_control *reply = NULL;
1225                 struct ctdb_vnn_list *vnn;
1226                 uint32_t reqid;
1227                 uint8_t *reply_data;
1228
1229                 status = ctdb_read_req(conn, 0, talloc_tos(), (void *)&reply);
1230                 if (!NT_STATUS_IS_OK(status)) {
1231                         DEBUG(1, ("ctdb_read_req failed: %s\n",
1232                                   nt_errstr(status)));
1233                         goto fail;
1234                 }
1235
1236                 if (reply->hdr.operation != CTDB_REPLY_CONTROL) {
1237                         DEBUG(1, ("Received invalid reply %u\n",
1238                                   (unsigned)reply->hdr.operation));
1239                         goto fail;
1240                 }
1241
1242                 reqid = reply->hdr.reqid;
1243
1244                 DEBUG(10, ("Received reqid %d\n", (int)reqid));
1245
1246                 for (i=0; i<num_vnns; i++) {
1247                         if (reqid == vnns[i].reqid) {
1248                                 break;
1249                         }
1250                 }
1251                 if (i == num_vnns) {
1252                         DEBUG(1, ("Received unknown reqid number %u\n",
1253                                   (unsigned)reqid));
1254                         goto fail;
1255                 }
1256
1257                 DEBUG(10, ("Found index %u\n", i));
1258
1259                 vnn = &vnns[i];
1260
1261                 DEBUG(10, ("Received vnn %u, vnn->num_srvids %u, datalen %u\n",
1262                            (unsigned)vnn->vnn, vnn->num_srvids,
1263                            (unsigned)reply->datalen));
1264
1265                 if (reply->datalen >= ((vnn->num_srvids+7)/8)) {
1266                         /*
1267                          * Got a real reply
1268                          */
1269                         reply_data = reply->data;
1270                 } else {
1271                         /*
1272                          * Got an error reply
1273                          */
1274                         DEBUG(5, ("Received short reply len %d, status %u, "
1275                                   "errorlen %u\n",
1276                                   (unsigned)reply->datalen,
1277                                   (unsigned)reply->status,
1278                                   (unsigned)reply->errorlen));
1279                         dump_data(5, reply->data, reply->errorlen);
1280
1281                         /*
1282                          * This will trigger everything set to false
1283                          */
1284                         reply_data = NULL;
1285                 }
1286
1287                 for (i=0; i<vnn->num_srvids; i++) {
1288                         int idx = vnn->pid_indexes[i];
1289
1290                         if (pids[i].unique_id ==
1291                             SERVERID_UNIQUE_ID_NOT_TO_VERIFY) {
1292                                 results[idx] = true;
1293                                 continue;
1294                         }
1295                         results[idx] =
1296                                 (reply_data != NULL) &&
1297                                 ((reply_data[i/8] & (1<<(i%8))) != 0);
1298                 }
1299
1300                 TALLOC_FREE(reply);
1301                 num_received += 1;
1302         }
1303
1304         result = true;
1305 fail:
1306         TALLOC_FREE(vnns);
1307         return result;
1308 }
1309
1310 #endif /* HAVE_CTDB_CONTROL_CHECK_SRVIDS_DECL */
1311
1312 /*
1313  * Get a db path
1314  */
1315 char *ctdbd_dbpath(struct ctdbd_connection *conn,
1316                    TALLOC_CTX *mem_ctx, uint32_t db_id)
1317 {
1318         NTSTATUS status;
1319         TDB_DATA data;
1320         int32_t cstatus;
1321
1322         data.dptr = (uint8_t*)&db_id;
1323         data.dsize = sizeof(db_id);
1324
1325         status = ctdbd_control(conn, CTDB_CURRENT_NODE,
1326                                CTDB_CONTROL_GETDBPATH, 0, 0, data, 
1327                                mem_ctx, &data, &cstatus);
1328         if (!NT_STATUS_IS_OK(status) || cstatus != 0) {
1329                 DEBUG(0,(__location__ " ctdb_control for getdbpath failed\n"));
1330                 return NULL;
1331         }
1332
1333         return (char *)data.dptr;
1334 }
1335
1336 /*
1337  * attach to a ctdb database
1338  */
1339 NTSTATUS ctdbd_db_attach(struct ctdbd_connection *conn,
1340                          const char *name, uint32_t *db_id, int tdb_flags)
1341 {
1342         NTSTATUS status;
1343         TDB_DATA data;
1344         int32_t cstatus;
1345         bool persistent = (tdb_flags & TDB_CLEAR_IF_FIRST) == 0;
1346
1347         data = string_term_tdb_data(name);
1348
1349         status = ctdbd_control(conn, CTDB_CURRENT_NODE,
1350                                persistent
1351                                ? CTDB_CONTROL_DB_ATTACH_PERSISTENT
1352                                : CTDB_CONTROL_DB_ATTACH,
1353                                tdb_flags, 0, data, NULL, &data, &cstatus);
1354         if (!NT_STATUS_IS_OK(status)) {
1355                 DEBUG(0, (__location__ " ctdb_control for db_attach "
1356                           "failed: %s\n", nt_errstr(status)));
1357                 return status;
1358         }
1359
1360         if (cstatus != 0 || data.dsize != sizeof(uint32_t)) {
1361                 DEBUG(0,(__location__ " ctdb_control for db_attach failed\n"));
1362                 return NT_STATUS_INTERNAL_ERROR;
1363         }
1364
1365         *db_id = *(uint32_t *)data.dptr;
1366         talloc_free(data.dptr);
1367
1368         if (!(tdb_flags & TDB_SEQNUM)) {
1369                 return NT_STATUS_OK;
1370         }
1371
1372         data.dptr = (uint8_t *)db_id;
1373         data.dsize = sizeof(*db_id);
1374
1375         status = ctdbd_control(conn, CTDB_CURRENT_NODE,
1376                                CTDB_CONTROL_ENABLE_SEQNUM, 0, 0, data, 
1377                                NULL, NULL, &cstatus);
1378         if (!NT_STATUS_IS_OK(status) || cstatus != 0) {
1379                 DEBUG(0,(__location__ " ctdb_control for enable seqnum "
1380                          "failed\n"));
1381                 return NT_STATUS_IS_OK(status) ? NT_STATUS_INTERNAL_ERROR :
1382                         status;
1383         }
1384
1385         return NT_STATUS_OK;
1386 }
1387
1388 /*
1389  * force the migration of a record to this node
1390  */
1391 NTSTATUS ctdbd_migrate(struct ctdbd_connection *conn, uint32_t db_id,
1392                        TDB_DATA key)
1393 {
1394         struct ctdb_req_call req;
1395         struct ctdb_reply_call *reply;
1396         NTSTATUS status;
1397
1398         ZERO_STRUCT(req);
1399
1400         req.hdr.length = offsetof(struct ctdb_req_call, data) + key.dsize;
1401         req.hdr.ctdb_magic   = CTDB_MAGIC;
1402         req.hdr.ctdb_version = CTDB_VERSION;
1403         req.hdr.operation    = CTDB_REQ_CALL;
1404         req.hdr.reqid        = ctdbd_next_reqid(conn);
1405         req.flags            = CTDB_IMMEDIATE_MIGRATION;
1406         req.callid           = CTDB_NULL_FUNC;
1407         req.db_id            = db_id;
1408         req.keylen           = key.dsize;
1409
1410         DEBUG(10, ("ctdbd_migrate: Sending ctdb packet\n"));
1411         ctdb_packet_dump(&req.hdr);
1412
1413         status = ctdb_packet_send(
1414                 conn->pkt, 2,
1415                 data_blob_const(&req, offsetof(struct ctdb_req_call, data)),
1416                 data_blob_const(key.dptr, key.dsize));
1417
1418         if (!NT_STATUS_IS_OK(status)) {
1419                 DEBUG(3, ("ctdb_packet_send failed: %s\n", nt_errstr(status)));
1420                 return status;
1421         }
1422
1423         status = ctdb_packet_flush(conn->pkt);
1424
1425         if (!NT_STATUS_IS_OK(status)) {
1426                 DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status)));
1427                 cluster_fatal("cluster dispatch daemon control write error\n");
1428         }
1429
1430         status = ctdb_read_req(conn, req.hdr.reqid, NULL, (void *)&reply);
1431
1432         if (!NT_STATUS_IS_OK(status)) {
1433                 DEBUG(0, ("ctdb_read_req failed: %s\n", nt_errstr(status)));
1434                 goto fail;
1435         }
1436
1437         if (reply->hdr.operation != CTDB_REPLY_CALL) {
1438                 DEBUG(0, ("received invalid reply\n"));
1439                 status = NT_STATUS_INTERNAL_ERROR;
1440                 goto fail;
1441         }
1442
1443         status = NT_STATUS_OK;
1444  fail:
1445
1446         TALLOC_FREE(reply);
1447         return status;
1448 }
1449
1450 /*
1451  * Fetch a record and parse it
1452  */
1453 NTSTATUS ctdbd_parse(struct ctdbd_connection *conn, uint32_t db_id,
1454                      TDB_DATA key, bool local_copy,
1455                      void (*parser)(TDB_DATA key, TDB_DATA data,
1456                                     void *private_data),
1457                      void *private_data)
1458 {
1459         struct ctdb_req_call req;
1460         struct ctdb_reply_call *reply;
1461         NTSTATUS status;
1462         uint32_t flags;
1463
1464 #ifdef HAVE_CTDB_WANT_READONLY_DECL
1465         flags = local_copy ? CTDB_WANT_READONLY : 0;
1466 #else
1467         flags = 0;
1468 #endif
1469
1470         ZERO_STRUCT(req);
1471
1472         req.hdr.length = offsetof(struct ctdb_req_call, data) + key.dsize;
1473         req.hdr.ctdb_magic   = CTDB_MAGIC;
1474         req.hdr.ctdb_version = CTDB_VERSION;
1475         req.hdr.operation    = CTDB_REQ_CALL;
1476         req.hdr.reqid        = ctdbd_next_reqid(conn);
1477         req.flags            = flags;
1478         req.callid           = CTDB_FETCH_FUNC;
1479         req.db_id            = db_id;
1480         req.keylen           = key.dsize;
1481
1482         status = ctdb_packet_send(
1483                 conn->pkt, 2,
1484                 data_blob_const(&req, offsetof(struct ctdb_req_call, data)),
1485                 data_blob_const(key.dptr, key.dsize));
1486
1487         if (!NT_STATUS_IS_OK(status)) {
1488                 DEBUG(3, ("ctdb_packet_send failed: %s\n", nt_errstr(status)));
1489                 return status;
1490         }
1491
1492         status = ctdb_packet_flush(conn->pkt);
1493
1494         if (!NT_STATUS_IS_OK(status)) {
1495                 DEBUG(3, ("write to ctdbd failed: %s\n", nt_errstr(status)));
1496                 cluster_fatal("cluster dispatch daemon control write error\n");
1497         }
1498
1499         status = ctdb_read_req(conn, req.hdr.reqid, NULL, (void *)&reply);
1500
1501         if (!NT_STATUS_IS_OK(status)) {
1502                 DEBUG(0, ("ctdb_read_req failed: %s\n", nt_errstr(status)));
1503                 goto fail;
1504         }
1505
1506         if (reply->hdr.operation != CTDB_REPLY_CALL) {
1507                 DEBUG(0, ("received invalid reply\n"));
1508                 status = NT_STATUS_INTERNAL_ERROR;
1509                 goto fail;
1510         }
1511
1512         if (reply->datalen == 0) {
1513                 /*
1514                  * Treat an empty record as non-existing
1515                  */
1516                 status = NT_STATUS_NOT_FOUND;
1517                 goto fail;
1518         }
1519
1520         parser(key, make_tdb_data(&reply->data[0], reply->datalen),
1521                private_data);
1522
1523         status = NT_STATUS_OK;
1524  fail:
1525         TALLOC_FREE(reply);
1526         return status;
1527 }
1528
1529 struct ctdbd_traverse_state {
1530         void (*fn)(TDB_DATA key, TDB_DATA data, void *private_data);
1531         void *private_data;
1532 };
1533
1534 /*
1535  * Handle a traverse record coming in on the ctdbd connection
1536  */
1537
1538 static NTSTATUS ctdb_traverse_handler(uint8_t *buf, size_t length,
1539                                       void *private_data)
1540 {
1541         struct ctdbd_traverse_state *state =
1542                 (struct ctdbd_traverse_state *)private_data;
1543
1544         struct ctdb_req_message *m;
1545         struct ctdb_rec_data *d;
1546         TDB_DATA key, data;
1547
1548         m = (struct ctdb_req_message *)buf;
1549
1550         if (length < sizeof(*m) || m->hdr.length != length) {
1551                 DEBUG(0, ("Got invalid message of length %d\n", (int)length));
1552                 TALLOC_FREE(buf);
1553                 return NT_STATUS_UNEXPECTED_IO_ERROR;
1554         }
1555
1556         d = (struct ctdb_rec_data *)&m->data[0];
1557         if (m->datalen < sizeof(uint32_t) || m->datalen != d->length) {
1558                 DEBUG(0, ("Got invalid traverse data of length %d\n",
1559                           (int)m->datalen));
1560                 TALLOC_FREE(buf);
1561                 return NT_STATUS_UNEXPECTED_IO_ERROR;
1562         }
1563
1564         key.dsize = d->keylen;
1565         key.dptr  = &d->data[0];
1566         data.dsize = d->datalen;
1567         data.dptr = &d->data[d->keylen];                
1568
1569         if (key.dsize == 0 && data.dsize == 0) {
1570                 /* end of traverse */
1571                 return NT_STATUS_END_OF_FILE;
1572         }
1573
1574         if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
1575                 DEBUG(0, ("Got invalid ltdb header length %d\n",
1576                           (int)data.dsize));
1577                 TALLOC_FREE(buf);
1578                 return NT_STATUS_UNEXPECTED_IO_ERROR;
1579         }
1580         data.dsize -= sizeof(struct ctdb_ltdb_header);
1581         data.dptr += sizeof(struct ctdb_ltdb_header);
1582
1583         if (state->fn) {
1584                 state->fn(key, data, state->private_data);
1585         }
1586
1587         TALLOC_FREE(buf);
1588         return NT_STATUS_OK;
1589 }
1590
1591 /*
1592   Traverse a ctdb database. This uses a kind-of hackish way to open a second
1593   connection to ctdbd to avoid the hairy recursive and async problems with
1594   everything in-line.
1595 */
1596
1597 NTSTATUS ctdbd_traverse(uint32_t db_id,
1598                         void (*fn)(TDB_DATA key, TDB_DATA data,
1599                                    void *private_data),
1600                         void *private_data)
1601 {
1602         struct ctdbd_connection *conn;
1603         NTSTATUS status;
1604
1605         TDB_DATA data;
1606         struct ctdb_traverse_start t;
1607         int cstatus;
1608         struct ctdbd_traverse_state state;
1609
1610         become_root();
1611         status = ctdbd_init_connection(NULL, &conn);
1612         unbecome_root();
1613         if (!NT_STATUS_IS_OK(status)) {
1614                 DEBUG(0, ("ctdbd_init_connection failed: %s\n",
1615                           nt_errstr(status)));
1616                 return status;
1617         }
1618
1619         t.db_id = db_id;
1620         t.srvid = conn->rand_srvid;
1621         t.reqid = ctdbd_next_reqid(conn);
1622
1623         data.dptr = (uint8_t *)&t;
1624         data.dsize = sizeof(t);
1625
1626         status = ctdbd_control(conn, CTDB_CURRENT_NODE,
1627                                CTDB_CONTROL_TRAVERSE_START, conn->rand_srvid, 0,
1628                                data, NULL, NULL, &cstatus);
1629
1630         if (!NT_STATUS_IS_OK(status) || (cstatus != 0)) {
1631
1632                 DEBUG(0,("ctdbd_control failed: %s, %d\n", nt_errstr(status),
1633                          cstatus));
1634
1635                 if (NT_STATUS_IS_OK(status)) {
1636                         /*
1637                          * We need a mapping here
1638                          */
1639                         status = NT_STATUS_UNSUCCESSFUL;
1640                 }
1641                 goto done;
1642         }
1643
1644         state.fn = fn;
1645         state.private_data = private_data;
1646
1647         while (True) {
1648
1649                 status = NT_STATUS_OK;
1650
1651                 if (ctdb_packet_handler(conn->pkt, ctdb_req_complete,
1652                                    ctdb_traverse_handler, &state, &status)) {
1653
1654                         if (NT_STATUS_EQUAL(status, NT_STATUS_END_OF_FILE)) {
1655                                 status = NT_STATUS_OK;
1656                                 break;
1657                         }
1658
1659                         /*
1660                          * There might be more in the queue
1661                          */
1662                         continue;
1663                 }
1664
1665                 if (!NT_STATUS_IS_OK(status)) {
1666                         break;
1667                 }
1668
1669                 status = ctdb_packet_fd_read_sync(conn->pkt);
1670
1671                 if (NT_STATUS_EQUAL(status, NT_STATUS_RETRY)) {
1672                         /*
1673                          * There might be more in the queue
1674                          */
1675                         continue;
1676                 }
1677
1678                 if (NT_STATUS_EQUAL(status, NT_STATUS_END_OF_FILE)) {
1679                         status = NT_STATUS_OK;
1680                         break;
1681                 }
1682
1683                 if (!NT_STATUS_IS_OK(status)) {
1684                         DEBUG(0, ("ctdb_packet_fd_read_sync failed: %s\n", nt_errstr(status)));
1685                         cluster_fatal("ctdbd died\n");
1686                 }
1687         }
1688
1689  done:
1690         TALLOC_FREE(conn);
1691         return status;
1692 }
1693
/*
   This is used to canonicalize a ctdb_sock_addr structure.

   "in" is copied to "out" unchanged, except that (when built with
   HAVE_IPV6) an IPv4-mapped IPv6 address (::ffff:a.b.c.d) is rewritten
   into the equivalent plain AF_INET address with the same port. "in"
   and "out" must not point to the same storage.
*/
static void smbd_ctdb_canonicalize_ip(const struct sockaddr_storage *in,
                                      struct sockaddr_storage *out)
{
        memcpy(out, in, sizeof (*out));

#ifdef HAVE_IPV6
        if (in->ss_family == AF_INET6) {
                /* first 12 bytes of an IPv4-mapped IPv6 address */
                const uint8_t prefix[12] = { 0,0,0,0,0,0,0,0,0,0,0xff,0xff };
                const struct sockaddr_in6 *in6 =
                        (const struct sockaddr_in6 *)in;
                struct sockaddr_in *out4 = (struct sockaddr_in *)out;
                if (memcmp(&in6->sin6_addr, prefix, sizeof(prefix)) == 0) {
                        memset(out, 0, sizeof(*out));
#ifdef HAVE_SOCK_SIN_LEN
                        /* length of the sockaddr_in, not of the storage */
                        out4->sin_len = sizeof(*out4);
#endif
                        out4->sin_family = AF_INET;
                        out4->sin_port   = in6->sin6_port;
                        /* the IPv4 address is the last 4 bytes */
                        memcpy(&out4->sin_addr, &in6->sin6_addr.s6_addr[12], 4);
                }
        }
#endif
}
1720
1721 /*
1722  * Register us as a server for a particular tcp connection
1723  */
1724
1725 NTSTATUS ctdbd_register_ips(struct ctdbd_connection *conn,
1726                             const struct sockaddr_storage *_server,
1727                             const struct sockaddr_storage *_client,
1728                             bool (*release_ip_handler)(const char *ip_addr,
1729                                                        void *private_data),
1730                             void *private_data)
1731 {
1732         /*
1733          * we still use ctdb_control_tcp for ipv4
1734          * because we want to work against older ctdb
1735          * versions at runtime
1736          */
1737         struct ctdb_control_tcp p4;
1738 #ifdef HAVE_STRUCT_CTDB_CONTROL_TCP_ADDR
1739         struct ctdb_control_tcp_addr p;
1740 #endif
1741         TDB_DATA data;
1742         NTSTATUS status;
1743         struct sockaddr_storage client;
1744         struct sockaddr_storage server;
1745
1746         /*
1747          * Only one connection so far
1748          */
1749         SMB_ASSERT(conn->release_ip_handler == NULL);
1750
1751         smbd_ctdb_canonicalize_ip(_client, &client);
1752         smbd_ctdb_canonicalize_ip(_server, &server);
1753
1754         switch (client.ss_family) {
1755         case AF_INET:
1756                 memcpy(&p4.dest, &server, sizeof(p4.dest));
1757                 memcpy(&p4.src, &client, sizeof(p4.src));
1758                 data.dptr = (uint8_t *)&p4;
1759                 data.dsize = sizeof(p4);
1760                 break;
1761 #ifdef HAVE_STRUCT_CTDB_CONTROL_TCP_ADDR
1762         case AF_INET6:
1763                 memcpy(&p.dest.ip6, &server, sizeof(p.dest.ip6));
1764                 memcpy(&p.src.ip6, &client, sizeof(p.src.ip6));
1765                 data.dptr = (uint8_t *)&p;
1766                 data.dsize = sizeof(p);
1767                 break;
1768 #endif
1769         default:
1770                 return NT_STATUS_INTERNAL_ERROR;
1771         }
1772
1773         conn->release_ip_handler = release_ip_handler;
1774         conn->release_ip_priv = private_data;
1775
1776         /*
1777          * We want to be told about IP releases
1778          */
1779
1780         status = register_with_ctdbd(conn, CTDB_SRVID_RELEASE_IP);
1781         if (!NT_STATUS_IS_OK(status)) {
1782                 return status;
1783         }
1784
1785         /*
1786          * inform ctdb of our tcp connection, so if IP takeover happens ctdb
1787          * can send an extra ack to trigger a reset for our client, so it
1788          * immediately reconnects
1789          */
1790         return ctdbd_control(conn, CTDB_CURRENT_NODE, 
1791                              CTDB_CONTROL_TCP_CLIENT, 0,
1792                              CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL, NULL);
1793 }
1794
1795 /*
1796  * We want to handle reconfigure events
1797  */
1798 NTSTATUS ctdbd_register_reconfigure(struct ctdbd_connection *conn)
1799 {
1800         return register_with_ctdbd(conn, CTDB_SRVID_RECONFIGURE);
1801 }
1802
1803 /*
1804   call a control on the local node
1805  */
1806 NTSTATUS ctdbd_control_local(struct ctdbd_connection *conn, uint32_t opcode,
1807                              uint64_t srvid, uint32_t flags, TDB_DATA data,
1808                              TALLOC_CTX *mem_ctx, TDB_DATA *outdata,
1809                              int *cstatus)
1810 {
1811         return ctdbd_control(conn, CTDB_CURRENT_NODE, opcode, srvid, flags, data, mem_ctx, outdata, cstatus);
1812 }
1813
1814 NTSTATUS ctdb_watch_us(struct ctdbd_connection *conn)
1815 {
1816         struct ctdb_client_notify_register reg_data;
1817         size_t struct_len;
1818         NTSTATUS status;
1819         int cstatus;
1820
1821         reg_data.srvid = CTDB_SRVID_SAMBA_NOTIFY;
1822         reg_data.len = 1;
1823         reg_data.notify_data[0] = 0;
1824
1825         struct_len = offsetof(struct ctdb_client_notify_register,
1826                               notify_data) + reg_data.len;
1827
1828         status = ctdbd_control_local(
1829                 conn, CTDB_CONTROL_REGISTER_NOTIFY, conn->rand_srvid, 0,
1830                 make_tdb_data((uint8_t *)&reg_data, struct_len),
1831                 NULL, NULL, &cstatus);
1832         if (!NT_STATUS_IS_OK(status)) {
1833                 DEBUG(1, ("ctdbd_control_local failed: %s\n",
1834                           nt_errstr(status)));
1835         }
1836         return status;
1837 }
1838
1839 NTSTATUS ctdb_unwatch(struct ctdbd_connection *conn)
1840 {
1841         struct ctdb_client_notify_deregister dereg_data;
1842         NTSTATUS status;
1843         int cstatus;
1844
1845         dereg_data.srvid = CTDB_SRVID_SAMBA_NOTIFY;
1846
1847         status = ctdbd_control_local(
1848                 conn, CTDB_CONTROL_DEREGISTER_NOTIFY, conn->rand_srvid, 0,
1849                 make_tdb_data((uint8_t *)&dereg_data, sizeof(dereg_data)),
1850                 NULL, NULL, &cstatus);
1851         if (!NT_STATUS_IS_OK(status)) {
1852                 DEBUG(1, ("ctdbd_control_local failed: %s\n",
1853                           nt_errstr(status)));
1854         }
1855         return status;
1856 }
1857
1858 NTSTATUS ctdbd_probe(void)
1859 {
1860         /*
1861          * Do a very early check if ctdbd is around to avoid an abort and core
1862          * later
1863          */
1864         struct ctdbd_connection *conn = NULL;
1865         NTSTATUS status;
1866
1867         status = ctdbd_messaging_connection(talloc_tos(), &conn);
1868
1869         /*
1870          * We only care if we can connect.
1871          */
1872         TALLOC_FREE(conn);
1873
1874         return status;
1875 }
1876
1877 #else
1878
1879 NTSTATUS ctdbd_probe(void)
1880 {
1881         return NT_STATUS_OK;
1882 }
1883
1884 NTSTATUS ctdbd_messaging_send_blob(struct ctdbd_connection *conn,
1885                                    uint32_t dst_vnn, uint64_t dst_srvid,
1886                                    const uint8_t *buf, size_t buflen)
1887 {
1888         return NT_STATUS_NOT_IMPLEMENTED;
1889 }
1890
1891 #endif