ctdb-protocol: Add req_tunnel counts to ctdb statistics
[samba.git] / ctdb / protocol / protocol_types.c
1 /*
2    CTDB protocol marshalling
3
4    Copyright (C) Amitay Isaacs  2015-2017
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22
23 #include <talloc.h>
24 #include <tdb.h>
25
26 #include "protocol.h"
27 #include "protocol_private.h"
28 #include "protocol_api.h"
29
30 size_t ctdb_tdb_data_len(TDB_DATA *in)
31 {
32         return in->dsize > UINT32_MAX ? UINT32_MAX : in->dsize;
33 }
34
35 void ctdb_tdb_data_push(TDB_DATA *in, uint8_t *buf, size_t *npush)
36 {
37         size_t len = ctdb_tdb_data_len(in);
38
39         if (len > 0) {
40                 memcpy(buf, in->dptr, len);
41         }
42
43         *npush = len;
44 }
45
46 int ctdb_tdb_data_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
47                        TDB_DATA *out, size_t *npull)
48 {
49         TDB_DATA val;
50
51         if (buflen > UINT32_MAX) {
52                 return EMSGSIZE;
53         }
54
55         val.dsize = buflen;
56         if (val.dsize > 0) {
57                 val.dptr = talloc_memdup(mem_ctx, buf, buflen);
58                 if (val.dptr == NULL) {
59                         return ENOMEM;
60                 }
61         } else {
62                 val.dptr = NULL;
63         }
64
65         *out = val;
66         *npull = buflen;
67         return 0;
68 }
69
70 size_t ctdb_tdb_datan_len(TDB_DATA *in)
71 {
72         uint32_t u32 = ctdb_tdb_data_len(in);
73
74         return ctdb_uint32_len(&u32) + u32;
75 }
76
77 void ctdb_tdb_datan_push(TDB_DATA *in, uint8_t *buf, size_t *npush)
78 {
79         size_t offset = 0, np;
80         uint32_t u32 = ctdb_tdb_data_len(in);
81
82         ctdb_uint32_push(&u32, buf+offset, &np);
83         offset += np;
84
85         ctdb_tdb_data_push(in, buf+offset, &np);
86         offset += np;
87
88         *npush = offset;
89 }
90
91 int ctdb_tdb_datan_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
92                         TDB_DATA *out, size_t *npull)
93 {
94         size_t offset = 0, np;
95         uint32_t u32;
96         int ret;
97
98         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &u32, &np);
99         if (ret != 0) {
100                 return ret;
101         }
102         offset += np;
103
104         if (buflen-offset < u32) {
105                 return EMSGSIZE;
106         }
107
108         ret = ctdb_tdb_data_pull(buf+offset, u32, mem_ctx, out, &np);
109         if (ret != 0) {
110                 return ret;
111         }
112         offset += np;
113
114         *npull = offset;
115         return 0;
116 }
117
118 size_t ctdb_latency_counter_len(struct ctdb_latency_counter *in)
119 {
120         return ctdb_int32_len(&in->num) +
121                 ctdb_padding_len(4) +
122                 ctdb_double_len(&in->min) +
123                 ctdb_double_len(&in->max) +
124                 ctdb_double_len(&in->total);
125 }
126
127 void ctdb_latency_counter_push(struct ctdb_latency_counter *in, uint8_t *buf,
128                                size_t *npush)
129 {
130         size_t offset = 0, np;
131
132         ctdb_int32_push(&in->num, buf+offset, &np);
133         offset += np;
134
135         ctdb_padding_push(4, buf+offset, &np);
136         offset += np;
137
138         ctdb_double_push(&in->min, buf+offset, &np);
139         offset += np;
140
141         ctdb_double_push(&in->max, buf+offset, &np);
142         offset += np;
143
144         ctdb_double_push(&in->total, buf+offset, &np);
145         offset += np;
146
147         *npush = offset;
148 }
149
150 int ctdb_latency_counter_pull(uint8_t *buf, size_t buflen,
151                               struct ctdb_latency_counter *out, size_t *npull)
152 {
153         size_t offset = 0, np;
154         int ret;
155
156         ret = ctdb_int32_pull(buf+offset, buflen-offset, &out->num, &np);
157         if (ret != 0) {
158                 return ret;
159         }
160         offset += np;
161
162         ret = ctdb_padding_pull(buf+offset, buflen-offset, 4, &np);
163         if (ret != 0) {
164                 return ret;
165         }
166         offset += np;
167
168         ret = ctdb_double_pull(buf+offset, buflen-offset, &out->min, &np);
169         if (ret != 0) {
170                 return ret;
171         }
172         offset += np;
173
174         ret = ctdb_double_pull(buf+offset, buflen-offset, &out->max, &np);
175         if (ret != 0) {
176                 return ret;
177         }
178         offset += np;
179
180         ret = ctdb_double_pull(buf+offset, buflen-offset, &out->total, &np);
181         if (ret != 0) {
182                 return ret;
183         }
184         offset += np;
185
186         *npull = offset;
187         return 0;
188 }
189
190 size_t ctdb_statistics_len(struct ctdb_statistics *in)
191 {
192         return ctdb_uint32_len(&in->num_clients) +
193                 ctdb_uint32_len(&in->frozen) +
194                 ctdb_uint32_len(&in->recovering) +
195                 ctdb_uint32_len(&in->client_packets_sent) +
196                 ctdb_uint32_len(&in->client_packets_recv) +
197                 ctdb_uint32_len(&in->node_packets_sent) +
198                 ctdb_uint32_len(&in->node_packets_recv) +
199                 ctdb_uint32_len(&in->keepalive_packets_sent) +
200                 ctdb_uint32_len(&in->keepalive_packets_recv) +
201                 ctdb_uint32_len(&in->node.req_call) +
202                 ctdb_uint32_len(&in->node.reply_call) +
203                 ctdb_uint32_len(&in->node.req_dmaster) +
204                 ctdb_uint32_len(&in->node.reply_dmaster) +
205                 ctdb_uint32_len(&in->node.reply_error) +
206                 ctdb_uint32_len(&in->node.req_message) +
207                 ctdb_uint32_len(&in->node.req_control) +
208                 ctdb_uint32_len(&in->node.reply_control) +
209                 ctdb_uint32_len(&in->node.req_tunnel) +
210                 ctdb_uint32_len(&in->client.req_call) +
211                 ctdb_uint32_len(&in->client.req_message) +
212                 ctdb_uint32_len(&in->client.req_control) +
213                 ctdb_uint32_len(&in->client.req_tunnel) +
214                 ctdb_uint32_len(&in->timeouts.call) +
215                 ctdb_uint32_len(&in->timeouts.control) +
216                 ctdb_uint32_len(&in->timeouts.traverse) +
217                 ctdb_padding_len(4) +
218                 ctdb_latency_counter_len(&in->reclock.ctdbd) +
219                 ctdb_latency_counter_len(&in->reclock.recd) +
220                 ctdb_uint32_len(&in->locks.num_calls) +
221                 ctdb_uint32_len(&in->locks.num_current) +
222                 ctdb_uint32_len(&in->locks.num_pending) +
223                 ctdb_uint32_len(&in->locks.num_failed) +
224                 ctdb_latency_counter_len(&in->locks.latency) +
225                 MAX_COUNT_BUCKETS * ctdb_uint32_len(&in->locks.buckets[0]) +
226                 ctdb_uint32_len(&in->total_calls) +
227                 ctdb_uint32_len(&in->pending_calls) +
228                 ctdb_uint32_len(&in->childwrite_calls) +
229                 ctdb_uint32_len(&in->pending_childwrite_calls) +
230                 ctdb_uint32_len(&in->memory_used) +
231                 ctdb_uint32_len(&in->__last_counter) +
232                 ctdb_uint32_len(&in->max_hop_count) +
233                 MAX_COUNT_BUCKETS *
234                         ctdb_uint32_len(&in->hop_count_bucket[0]) +
235                 ctdb_padding_len(4) +
236                 ctdb_latency_counter_len(&in->call_latency) +
237                 ctdb_latency_counter_len(&in->childwrite_latency) +
238                 ctdb_uint32_len(&in->num_recoveries) +
239                 ctdb_padding_len(4) +
240                 ctdb_timeval_len(&in->statistics_start_time) +
241                 ctdb_timeval_len(&in->statistics_current_time) +
242                 ctdb_uint32_len(&in->total_ro_delegations) +
243                 ctdb_uint32_len(&in->total_ro_revokes);
244 }
245
246 void ctdb_statistics_push(struct ctdb_statistics *in, uint8_t *buf,
247                           size_t *npush)
248 {
249         size_t offset = 0, np;
250         int i;
251
252         ctdb_uint32_push(&in->num_clients, buf+offset, &np);
253         offset += np;
254
255         ctdb_uint32_push(&in->frozen, buf+offset, &np);
256         offset += np;
257
258         ctdb_uint32_push(&in->recovering, buf+offset, &np);
259         offset += np;
260
261         ctdb_uint32_push(&in->client_packets_sent, buf+offset, &np);
262         offset += np;
263
264         ctdb_uint32_push(&in->client_packets_recv, buf+offset, &np);
265         offset += np;
266
267         ctdb_uint32_push(&in->node_packets_sent, buf+offset, &np);
268         offset += np;
269
270         ctdb_uint32_push(&in->node_packets_recv, buf+offset, &np);
271         offset += np;
272
273         ctdb_uint32_push(&in->keepalive_packets_sent, buf+offset, &np);
274         offset += np;
275
276         ctdb_uint32_push(&in->keepalive_packets_recv, buf+offset, &np);
277         offset += np;
278
279         ctdb_uint32_push(&in->node.req_call, buf+offset, &np);
280         offset += np;
281
282         ctdb_uint32_push(&in->node.reply_call, buf+offset, &np);
283         offset += np;
284
285         ctdb_uint32_push(&in->node.req_dmaster, buf+offset, &np);
286         offset += np;
287
288         ctdb_uint32_push(&in->node.reply_dmaster, buf+offset, &np);
289         offset += np;
290
291         ctdb_uint32_push(&in->node.reply_error, buf+offset, &np);
292         offset += np;
293
294         ctdb_uint32_push(&in->node.req_message, buf+offset, &np);
295         offset += np;
296
297         ctdb_uint32_push(&in->node.req_control, buf+offset, &np);
298         offset += np;
299
300         ctdb_uint32_push(&in->node.reply_control, buf+offset, &np);
301         offset += np;
302
303         ctdb_uint32_push(&in->node.req_tunnel, buf+offset, &np);
304         offset += np;
305
306         ctdb_uint32_push(&in->client.req_call, buf+offset, &np);
307         offset += np;
308
309         ctdb_uint32_push(&in->client.req_message, buf+offset, &np);
310         offset += np;
311
312         ctdb_uint32_push(&in->client.req_control, buf+offset, &np);
313         offset += np;
314
315         ctdb_uint32_push(&in->client.req_tunnel, buf+offset, &np);
316         offset += np;
317
318         ctdb_uint32_push(&in->timeouts.call, buf+offset, &np);
319         offset += np;
320
321         ctdb_uint32_push(&in->timeouts.control, buf+offset, &np);
322         offset += np;
323
324         ctdb_uint32_push(&in->timeouts.traverse, buf+offset, &np);
325         offset += np;
326
327         ctdb_padding_push(4, buf+offset, &np);
328         offset += np;
329
330         ctdb_latency_counter_push(&in->reclock.ctdbd, buf+offset, &np);
331         offset += np;
332
333         ctdb_latency_counter_push(&in->reclock.recd, buf+offset, &np);
334         offset += np;
335
336         ctdb_uint32_push(&in->locks.num_calls, buf+offset, &np);
337         offset += np;
338
339         ctdb_uint32_push(&in->locks.num_current, buf+offset, &np);
340         offset += np;
341
342         ctdb_uint32_push(&in->locks.num_pending, buf+offset, &np);
343         offset += np;
344
345         ctdb_uint32_push(&in->locks.num_failed, buf+offset, &np);
346         offset += np;
347
348         ctdb_latency_counter_push(&in->locks.latency, buf+offset, &np);
349         offset += np;
350
351         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
352                 ctdb_uint32_push(&in->locks.buckets[i], buf+offset, &np);
353                 offset += np;
354         }
355
356         ctdb_uint32_push(&in->total_calls, buf+offset, &np);
357         offset += np;
358
359         ctdb_uint32_push(&in->pending_calls, buf+offset, &np);
360         offset += np;
361
362         ctdb_uint32_push(&in->childwrite_calls, buf+offset, &np);
363         offset += np;
364
365         ctdb_uint32_push(&in->pending_childwrite_calls, buf+offset, &np);
366         offset += np;
367
368         ctdb_uint32_push(&in->memory_used, buf+offset, &np);
369         offset += np;
370
371         ctdb_uint32_push(&in->__last_counter, buf+offset, &np);
372         offset += np;
373
374         ctdb_uint32_push(&in->max_hop_count, buf+offset, &np);
375         offset += np;
376
377         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
378                 ctdb_uint32_push(&in->hop_count_bucket[i], buf+offset, &np);
379                 offset += np;
380         }
381
382         ctdb_padding_push(4, buf+offset, &np);
383         offset += np;
384
385         ctdb_latency_counter_push(&in->call_latency, buf+offset, &np);
386         offset += np;
387
388         ctdb_latency_counter_push(&in->childwrite_latency, buf+offset, &np);
389         offset += np;
390
391         ctdb_uint32_push(&in->num_recoveries, buf+offset, &np);
392         offset += np;
393
394         ctdb_padding_push(4, buf+offset, &np);
395         offset += np;
396
397         ctdb_timeval_push(&in->statistics_start_time, buf+offset, &np);
398         offset += np;
399
400         ctdb_timeval_push(&in->statistics_current_time, buf+offset, &np);
401         offset += np;
402
403         ctdb_uint32_push(&in->total_ro_delegations, buf+offset, &np);
404         offset += np;
405
406         ctdb_uint32_push(&in->total_ro_revokes, buf+offset, &np);
407         offset += np;
408
409         *npush = offset;
410 }
411
412 static int ctdb_statistics_pull_elems(uint8_t *buf, size_t buflen,
413                                       TALLOC_CTX *mem_ctx,
414                                       struct ctdb_statistics *out,
415                                       size_t *npull)
416 {
417         size_t offset = 0, np;
418         int ret, i;
419
420         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &out->num_clients,
421                                &np);
422         if (ret != 0) {
423                 return ret;
424         }
425         offset += np;
426
427         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &out->frozen, &np);
428         if (ret != 0) {
429                 return ret;
430         }
431         offset += np;
432
433         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &out->recovering,
434                                &np);
435         if (ret != 0) {
436                 return ret;
437         }
438         offset += np;
439
440         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
441                                &out->client_packets_sent, &np);
442         if (ret != 0) {
443                 return ret;
444         }
445         offset += np;
446
447         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
448                                &out->client_packets_recv, &np);
449         if (ret != 0) {
450                 return ret;
451         }
452         offset += np;
453
454         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
455                                &out->node_packets_sent, &np);
456         if (ret != 0) {
457                 return ret;
458         }
459         offset += np;
460
461         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
462                                &out->node_packets_recv, &np);
463         if (ret != 0) {
464                 return ret;
465         }
466         offset += np;
467
468         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
469                                &out->keepalive_packets_sent, &np);
470         if (ret != 0) {
471                 return ret;
472         }
473         offset += np;
474
475         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
476                                &out->keepalive_packets_recv, &np);
477         if (ret != 0) {
478                 return ret;
479         }
480         offset += np;
481
482         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
483                                &out->node.req_call, &np);
484         if (ret != 0) {
485                 return ret;
486         }
487         offset += np;
488
489         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
490                                &out->node.reply_call, &np);
491         if (ret != 0) {
492                 return ret;
493         }
494         offset += np;
495
496         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
497                                &out->node.req_dmaster, &np);
498         if (ret != 0) {
499                 return ret;
500         }
501         offset += np;
502
503         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
504                                &out->node.reply_dmaster, &np);
505         if (ret != 0) {
506                 return ret;
507         }
508         offset += np;
509
510         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
511                                &out->node.reply_error, &np);
512         if (ret != 0) {
513                 return ret;
514         }
515         offset += np;
516
517         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
518                                &out->node.req_message, &np);
519         if (ret != 0) {
520                 return ret;
521         }
522         offset += np;
523
524         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
525                                &out->node.req_control, &np);
526         if (ret != 0) {
527                 return ret;
528         }
529         offset += np;
530
531         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
532                                &out->node.reply_control, &np);
533         if (ret != 0) {
534                 return ret;
535         }
536         offset += np;
537
538         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
539                                &out->node.req_tunnel, &np);
540         if (ret != 0) {
541                 return ret;
542         }
543         offset += np;
544
545         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
546                                &out->client.req_call, &np);
547         if (ret != 0) {
548                 return ret;
549         }
550         offset += np;
551
552         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
553                                &out->client.req_message, &np);
554         if (ret != 0) {
555                 return ret;
556         }
557         offset += np;
558
559         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
560                                &out->client.req_control, &np);
561         if (ret != 0) {
562                 return ret;
563         }
564         offset += np;
565
566         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
567                                &out->client.req_tunnel, &np);
568         if (ret != 0) {
569                 return ret;
570         }
571         offset += np;
572
573         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
574                                &out->timeouts.call, &np);
575         if (ret != 0) {
576                 return ret;
577         }
578         offset += np;
579
580         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
581                                &out->timeouts.control, &np);
582         if (ret != 0) {
583                 return ret;
584         }
585         offset += np;
586
587         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
588                                &out->timeouts.traverse, &np);
589         if (ret != 0) {
590                 return ret;
591         }
592         offset += np;
593
594         ret = ctdb_padding_pull(buf+offset, buflen-offset, 4, &np);
595         if (ret != 0) {
596                 return ret;
597         }
598         offset += np;
599
600         ret = ctdb_latency_counter_pull(buf+offset, buflen-offset,
601                                         &out->reclock.ctdbd, &np);
602         if (ret != 0) {
603                 return ret;
604         }
605         offset += np;
606
607         ret = ctdb_latency_counter_pull(buf+offset, buflen-offset,
608                                         &out->reclock.recd, &np);
609         if (ret != 0) {
610                 return ret;
611         }
612         offset += np;
613
614         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
615                                &out->locks.num_calls, &np);
616         if (ret != 0) {
617                 return ret;
618         }
619         offset += np;
620
621         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
622                                &out->locks.num_current, &np);
623         if (ret != 0) {
624                 return ret;
625         }
626         offset += np;
627
628         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
629                                &out->locks.num_pending, &np);
630         if (ret != 0) {
631                 return ret;
632         }
633         offset += np;
634
635         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
636                                &out->locks.num_failed, &np);
637         if (ret != 0) {
638                 return ret;
639         }
640         offset += np;
641
642         ret = ctdb_latency_counter_pull(buf+offset, buflen-offset,
643                                         &out->locks.latency, &np);
644         if (ret != 0) {
645                 return ret;
646         }
647         offset += np;
648
649         for (i=0;  i<MAX_COUNT_BUCKETS; i++) {
650                 ret = ctdb_uint32_pull(buf+offset, buflen-offset,
651                                        &out->locks.buckets[i], &np);
652                 if (ret != 0) {
653                         return ret;
654                 }
655                 offset += np;
656         }
657
658         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
659                                &out->total_calls, &np);
660         if (ret != 0) {
661                 return ret;
662         }
663         offset += np;
664
665         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
666                                &out->pending_calls, &np);
667         if (ret != 0) {
668                 return ret;
669         }
670         offset += np;
671
672         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
673                                &out->childwrite_calls, &np);
674         if (ret != 0) {
675                 return ret;
676         }
677         offset += np;
678
679         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
680                                &out->pending_childwrite_calls, &np);
681         if (ret != 0) {
682                 return ret;
683         }
684         offset += np;
685
686         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &out->memory_used,
687                                &np);
688         if (ret != 0) {
689                 return ret;
690         }
691         offset += np;
692
693         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
694                                &out->__last_counter, &np);
695         if (ret != 0) {
696                 return ret;
697         }
698         offset += np;
699
700         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
701                                &out->max_hop_count, &np);
702         if (ret != 0) {
703                 return ret;
704         }
705         offset += np;
706
707         for (i=0;  i<MAX_COUNT_BUCKETS; i++) {
708                 ret = ctdb_uint32_pull(buf+offset, buflen-offset,
709                                        &out->hop_count_bucket[i], &np);
710                 if (ret != 0) {
711                         return ret;
712                 }
713                 offset += np;
714         }
715
716         ret = ctdb_padding_pull(buf+offset, buflen-offset, 4, &np);
717         if (ret != 0) {
718                 return ret;
719         }
720         offset += np;
721
722         ret = ctdb_latency_counter_pull(buf+offset, buflen-offset,
723                                         &out->call_latency, &np);
724         if (ret != 0) {
725                 return ret;
726         }
727         offset += np;
728
729         ret = ctdb_latency_counter_pull(buf+offset, buflen-offset,
730                                         &out->childwrite_latency, &np);
731         if (ret != 0) {
732                 return ret;
733         }
734         offset += np;
735
736         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
737                                &out->num_recoveries, &np);
738         if (ret != 0) {
739                 return ret;
740         }
741         offset += np;
742
743         ret = ctdb_padding_pull(buf+offset, buflen-offset, 4, &np);
744         if (ret != 0) {
745                 return ret;
746         }
747         offset += np;
748
749         ret = ctdb_timeval_pull(buf+offset, buflen-offset,
750                                 &out->statistics_start_time, &np);
751         if (ret != 0) {
752                 return ret;
753         }
754         offset += np;
755
756         ret = ctdb_timeval_pull(buf+offset, buflen-offset,
757                                 &out->statistics_current_time, &np);
758         if (ret != 0) {
759                 return ret;
760         }
761         offset += np;
762
763         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
764                                &out->total_ro_delegations, &np);
765         if (ret != 0) {
766                 return ret;
767         }
768         offset += np;
769
770         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
771                                &out->total_ro_revokes, &np);
772         if (ret != 0) {
773                 return ret;
774         }
775         offset += np;
776
777         *npull = offset;
778         return 0;
779 }
780
781 int ctdb_statistics_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
782                          struct ctdb_statistics **out, size_t *npull)
783 {
784         struct ctdb_statistics *val;
785         size_t np;
786         int ret;
787
788         val = talloc(mem_ctx, struct ctdb_statistics);
789         if (val == NULL) {
790                 return ENOMEM;
791         }
792
793         ret = ctdb_statistics_pull_elems(buf, buflen, val, val, &np);
794         if (ret != 0) {
795                 talloc_free(val);
796                 return ret;
797         }
798
799         *out = val;
800         *npull = np;
801         return 0;
802 }
803
804 size_t ctdb_statistics_list_len(struct ctdb_statistics_list *in)
805 {
806         size_t len;
807
808         len = ctdb_int32_len(&in->num) + ctdb_padding_len(4);
809         if (in->num > 0) {
810                 len += in->num * ctdb_statistics_len(&in->stats[0]);
811         }
812
813         return len;
814 }
815
816 void ctdb_statistics_list_push(struct ctdb_statistics_list *in,
817                                uint8_t *buf, size_t *npush)
818 {
819         size_t offset = 0, np;
820         int i;
821
822         ctdb_int32_push(&in->num, buf+offset, &np);
823         offset += np;
824
825         ctdb_padding_push(4, buf+offset, &np);
826         offset += np;
827
828         for (i=0; i<in->num; i++) {
829                 ctdb_statistics_push(&in->stats[i], buf+offset, &np);
830                 offset += np;
831         }
832
833         *npush = offset;
834 }
835
836 int ctdb_statistics_list_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
837                               struct ctdb_statistics_list **out,
838                               size_t *npull)
839 {
840         struct ctdb_statistics_list *val;
841         size_t offset = 0, np;
842         int ret, i;
843
844         val = talloc(mem_ctx, struct ctdb_statistics_list);
845         if (val == NULL) {
846                 return ENOMEM;
847         }
848
849         ret = ctdb_int32_pull(buf+offset, buflen-offset, &val->num, &np);
850         if (ret != 0) {
851                 goto fail;
852         }
853         offset += np;
854
855         ret = ctdb_padding_pull(buf+offset, buflen-offset, 4, &np);
856         if (ret != 0) {
857                 goto fail;
858         }
859         offset += np;
860
861         if (val->num == 0) {
862                 val->stats = NULL;
863                 goto done;
864         }
865
866         val->stats = talloc_array(val, struct ctdb_statistics, val->num);
867         if (val->stats == NULL) {
868                 ret = ENOMEM;
869                 goto fail;
870         }
871
872         for (i=0; i<val->num; i++) {
873                 ret = ctdb_statistics_pull_elems(buf+offset, buflen-offset,
874                                                  val, &val->stats[i], &np);
875                 if (ret != 0) {
876                         goto fail;
877                 }
878                 offset += np;
879         }
880
881 done:
882         *out = val;
883         *npull = offset;
884         return 0;
885
886 fail:
887         talloc_free(val);
888         return ret;
889 }
890
891 size_t ctdb_vnn_map_len(struct ctdb_vnn_map *in)
892 {
893         size_t len;
894
895         len = ctdb_uint32_len(&in->generation) + ctdb_uint32_len(&in->size);
896         if (in->size > 0) {
897                 len += in->size * ctdb_uint32_len(&in->map[0]);
898         }
899
900         return len;
901 }
902
903 void ctdb_vnn_map_push(struct ctdb_vnn_map *in, uint8_t *buf, size_t *npush)
904 {
905         size_t offset = 0, np;
906         uint32_t i;
907
908         ctdb_uint32_push(&in->generation, buf+offset, &np);
909         offset += np;
910
911         ctdb_uint32_push(&in->size, buf+offset, &np);
912         offset += np;
913
914         for (i=0; i<in->size; i++) {
915                 ctdb_uint32_push(&in->map[i], buf+offset, &np);
916                 offset += np;
917         }
918
919         *npush = offset;
920 }
921
922 int ctdb_vnn_map_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
923                       struct ctdb_vnn_map **out, size_t *npull)
924 {
925         struct ctdb_vnn_map *val;
926         size_t offset = 0, np;
927         uint32_t i;
928         int ret;
929
930         val = talloc(mem_ctx, struct ctdb_vnn_map);
931         if (val == NULL) {
932                 return ENOMEM;
933         }
934
935         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->generation,
936                                &np);
937         if (ret != 0) {
938                 goto fail;
939         }
940         offset += np;
941
942         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->size, &np);
943         if (ret != 0) {
944                 goto fail;
945         }
946         offset += np;
947
948         if (val->size == 0) {
949                 val->map = NULL;
950                 goto done;
951         }
952
953         val->map = talloc_array(val, uint32_t, val->size);
954         if (val->map == NULL) {
955                 ret = ENOMEM;
956                 goto fail;
957         }
958
959         for (i=0; i<val->size; i++) {
960                 ret = ctdb_uint32_pull(buf+offset, buflen-offset,
961                                        &val->map[i], &np);
962                 if (ret != 0) {
963                         goto fail;
964                 }
965                 offset += np;
966         }
967
968 done:
969         *out = val;
970         *npull = offset;
971         return 0;
972
973 fail:
974         talloc_free(val);
975         return ret;
976 }
977
978 size_t ctdb_dbid_len(struct ctdb_dbid *in)
979 {
980         return ctdb_uint32_len(&in->db_id) +
981                 ctdb_uint8_len(&in->flags) +
982                 ctdb_padding_len(3);
983 }
984
985 void ctdb_dbid_push(struct ctdb_dbid *in, uint8_t *buf, size_t *npush)
986 {
987         size_t offset = 0, np;
988
989         ctdb_uint32_push(&in->db_id, buf+offset, &np);
990         offset += np;
991
992         ctdb_uint8_push(&in->flags, buf+offset, &np);
993         offset += np;
994
995         ctdb_padding_push(3, buf+offset, &np);
996         offset += np;
997
998         *npush = offset;
999 }
1000
1001 static int ctdb_dbid_pull_elems(uint8_t *buf, size_t buflen,
1002                                 TALLOC_CTX *mem_ctx, struct ctdb_dbid *out,
1003                                 size_t *npull)
1004 {
1005         size_t offset = 0, np;
1006         int ret;
1007
1008         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &out->db_id, &np);
1009         if (ret != 0) {
1010                 return ret;
1011         }
1012         offset += np;
1013
1014         ret = ctdb_uint8_pull(buf+offset, buflen-offset, &out->flags, &np);
1015         if (ret != 0) {
1016                 return ret;
1017         }
1018         offset += np;
1019
1020         ret = ctdb_padding_pull(buf+offset, buflen-offset, 3, &np);
1021         if (ret != 0) {
1022                 return ret;
1023         }
1024         offset += np;
1025
1026         *npull = offset;
1027         return 0;
1028 }
1029
1030 int ctdb_dbid_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
1031                    struct ctdb_dbid **out, size_t *npull)
1032 {
1033         struct ctdb_dbid *val;
1034         size_t np;
1035         int ret;
1036
1037         val = talloc(mem_ctx, struct ctdb_dbid);
1038         if (val == NULL) {
1039                 return ENOMEM;
1040         }
1041
1042         ret = ctdb_dbid_pull_elems(buf, buflen, val, val, &np);
1043         if (ret != 0) {
1044                 talloc_free(val);
1045                 return ret;
1046         }
1047
1048         *out = val;
1049         *npull = np;
1050         return 0;
1051 }
1052
1053 size_t ctdb_dbid_map_len(struct ctdb_dbid_map *in)
1054 {
1055         size_t len;
1056
1057         len = ctdb_uint32_len(&in->num);
1058         if (in->num > 0) {
1059                 len += in->num * ctdb_dbid_len(&in->dbs[0]);
1060         }
1061
1062         return len;
1063 }
1064
1065 void ctdb_dbid_map_push(struct ctdb_dbid_map *in, uint8_t *buf, size_t *npush)
1066 {
1067         size_t offset = 0, np;
1068         uint32_t i;
1069
1070         ctdb_uint32_push(&in->num, buf+offset, &np);
1071         offset += np;
1072
1073         for (i=0; i<in->num; i++) {
1074                 ctdb_dbid_push(&in->dbs[i], buf+offset, &np);
1075                 offset += np;
1076         }
1077
1078         *npush = offset;
1079 }
1080
1081 int ctdb_dbid_map_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
1082                        struct ctdb_dbid_map **out, size_t *npull)
1083 {
1084         struct ctdb_dbid_map *val;
1085         size_t offset = 0, np;
1086         uint32_t i;
1087         int ret;
1088
1089         val = talloc(mem_ctx, struct ctdb_dbid_map);
1090         if (val == NULL) {
1091                 return ENOMEM;
1092         }
1093
1094         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->num, &np);
1095         if (ret != 0) {
1096                 goto fail;
1097         }
1098         offset += np;
1099
1100         if (val->num == 0) {
1101                 val->dbs = NULL;
1102                 goto done;
1103         }
1104
1105         val->dbs = talloc_array(val, struct ctdb_dbid, val->num);
1106         if (val->dbs == NULL) {
1107                 ret = ENOMEM;
1108                 goto fail;
1109         }
1110
1111         for (i=0; i<val->num; i++) {
1112                 ret = ctdb_dbid_pull_elems(buf+offset, buflen-offset, val,
1113                                            &val->dbs[i], &np);
1114                 if (ret != 0) {
1115                         goto fail;
1116                 }
1117                 offset += np;
1118         }
1119
1120 done:
1121         *out = val;
1122         *npull = offset;
1123         return 0;
1124
1125 fail:
1126         talloc_free(val);
1127         return ret;
1128 }
1129
1130 size_t ctdb_pulldb_len(struct ctdb_pulldb *in)
1131 {
1132         return ctdb_uint32_len(&in->db_id) +
1133                 ctdb_uint32_len(&in->lmaster);
1134 }
1135
1136 void ctdb_pulldb_push(struct ctdb_pulldb *in, uint8_t *buf, size_t *npush)
1137 {
1138         size_t offset = 0, np;
1139
1140         ctdb_uint32_push(&in->db_id, buf+offset, &np);
1141         offset += np;
1142
1143         ctdb_uint32_push(&in->lmaster, buf+offset, &np);
1144         offset += np;
1145
1146         *npush = offset;
1147 }
1148
1149 int ctdb_pulldb_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
1150                      struct ctdb_pulldb **out, size_t *npull)
1151 {
1152         struct ctdb_pulldb *val;
1153         size_t offset = 0, np;
1154         int ret;
1155
1156         val = talloc(mem_ctx, struct ctdb_pulldb);
1157         if (val == NULL) {
1158                 return ENOMEM;
1159         }
1160
1161         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->db_id, &np);
1162         if (ret != 0) {
1163                 talloc_free(val);
1164                 return ret;
1165         }
1166         offset += np;
1167
1168         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->lmaster, &np);
1169         if (ret != 0) {
1170                 talloc_free(val);
1171                 return ret;
1172         }
1173         offset += np;
1174
1175         *out = val;
1176         *npull = offset;
1177         return 0;
1178 }
1179
1180 size_t ctdb_pulldb_ext_len(struct ctdb_pulldb_ext *in)
1181 {
1182         return ctdb_uint32_len(&in->db_id) +
1183                 ctdb_uint32_len(&in->lmaster) +
1184                 ctdb_uint64_len(&in->srvid);
1185 }
1186
1187 void ctdb_pulldb_ext_push(struct ctdb_pulldb_ext *in, uint8_t *buf,
1188                           size_t *npush)
1189 {
1190         size_t offset = 0, np;
1191
1192         ctdb_uint32_push(&in->db_id, buf+offset, &np);
1193         offset += np;
1194
1195         ctdb_uint32_push(&in->lmaster, buf+offset, &np);
1196         offset += np;
1197
1198         ctdb_uint64_push(&in->srvid, buf+offset, &np);
1199         offset += np;
1200
1201         *npush = offset;
1202 }
1203
1204 int ctdb_pulldb_ext_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
1205                          struct ctdb_pulldb_ext **out, size_t *npull)
1206 {
1207         struct ctdb_pulldb_ext *val;
1208         size_t offset = 0, np;
1209         int ret;
1210
1211         val = talloc(mem_ctx, struct ctdb_pulldb_ext);
1212         if (val == NULL) {
1213                 return ENOMEM;
1214         }
1215
1216         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->db_id, &np);
1217         if (ret != 0) {
1218                 goto fail;
1219         }
1220         offset += np;
1221
1222         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->lmaster, &np);
1223         if (ret != 0) {
1224                 goto fail;
1225         }
1226         offset += np;
1227
1228         ret = ctdb_uint64_pull(buf+offset, buflen-offset, &val->srvid, &np);
1229         if (ret != 0) {
1230                 goto fail;
1231         }
1232         offset += np;
1233
1234         *out = val;
1235         *npull = offset;
1236         return 0;
1237
1238 fail:
1239         talloc_free(val);
1240         return ret;
1241 }
1242
1243 size_t ctdb_ltdb_header_len(struct ctdb_ltdb_header *in)
1244 {
1245         return ctdb_uint64_len(&in->rsn) +
1246                 ctdb_uint32_len(&in->dmaster) +
1247                 ctdb_uint32_len(&in->reserved1) +
1248                 ctdb_uint32_len(&in->flags) +
1249                 ctdb_padding_len(4);
1250 }
1251
1252 void ctdb_ltdb_header_push(struct ctdb_ltdb_header *in, uint8_t *buf,
1253                            size_t *npush)
1254 {
1255         size_t offset = 0, np;
1256
1257         ctdb_uint64_push(&in->rsn, buf+offset, &np);
1258         offset += np;
1259
1260         ctdb_uint32_push(&in->dmaster, buf+offset, &np);
1261         offset += np;
1262
1263         ctdb_uint32_push(&in->reserved1, buf+offset, &np);
1264         offset += np;
1265
1266         ctdb_uint32_push(&in->flags, buf+offset, &np);
1267         offset += np;
1268
1269         ctdb_padding_push(4, buf+offset, &np);
1270         offset += np;
1271
1272         *npush = offset;
1273 }
1274
1275 int ctdb_ltdb_header_pull(uint8_t *buf, size_t buflen,
1276                           struct ctdb_ltdb_header *out, size_t *npull)
1277 {
1278         size_t offset = 0, np;
1279         int ret;
1280
1281         ret = ctdb_uint64_pull(buf+offset, buflen-offset, &out->rsn, &np);
1282         if (ret != 0) {
1283                 return ret;
1284         }
1285         offset += np;
1286
1287         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &out->dmaster, &np);
1288         if (ret != 0) {
1289                 return ret;
1290         }
1291         offset += np;
1292
1293         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &out->reserved1,
1294                                &np);
1295         if (ret != 0) {
1296                 return ret;
1297         }
1298         offset += np;
1299
1300         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &out->flags, &np);
1301         if (ret != 0) {
1302                 return ret;
1303         }
1304         offset += np;
1305
1306         ret = ctdb_padding_pull(buf+offset, buflen-offset, 4, &np);
1307         if (ret != 0) {
1308                 return ret;
1309         }
1310         offset += np;
1311
1312         *npull = offset;
1313         return 0;
1314 }
1315
1316 int ctdb_ltdb_header_extract(TDB_DATA *data, struct ctdb_ltdb_header *header)
1317 {
1318         size_t np;
1319         int ret;
1320
1321         ret = ctdb_ltdb_header_pull(data->dptr, data->dsize, header, &np);
1322         if (ret != 0) {
1323                 return ret;
1324         }
1325
1326         data->dptr += np;
1327         data->dsize -= np;
1328
1329         return 0;
1330 }
1331
1332 size_t ctdb_rec_data_len(struct ctdb_rec_data *in)
1333 {
1334         uint32_t u32;
1335
1336         u32 = ctdb_uint32_len(&in->reqid) +
1337                 ctdb_tdb_datan_len(&in->key) +
1338                 ctdb_tdb_datan_len(&in->data);
1339
1340         if (in->header != NULL) {
1341                 u32 += ctdb_ltdb_header_len(in->header);
1342         }
1343
1344         return ctdb_uint32_len(&u32) + u32;
1345 }
1346
1347 void ctdb_rec_data_push(struct ctdb_rec_data *in, uint8_t *buf, size_t *npush)
1348 {
1349         size_t offset = 0, np;
1350         uint32_t u32;
1351
1352         u32 = ctdb_rec_data_len(in);
1353         ctdb_uint32_push(&u32, buf+offset, &np);
1354         offset += np;
1355
1356         ctdb_uint32_push(&in->reqid, buf+offset, &np);
1357         offset += np;
1358
1359         u32 = ctdb_tdb_data_len(&in->key);
1360         ctdb_uint32_push(&u32, buf+offset, &np);
1361         offset += np;
1362
1363         u32 = ctdb_tdb_data_len(&in->data);
1364         if (in->header != NULL) {
1365                 u32 += ctdb_ltdb_header_len(in->header);
1366         }
1367
1368         ctdb_uint32_push(&u32, buf+offset, &np);
1369         offset += np;
1370
1371         ctdb_tdb_data_push(&in->key, buf+offset, &np);
1372         offset += np;
1373
1374         /* If ltdb header is not NULL, then it is pushed as part of the data */
1375         if (in->header != NULL) {
1376                 ctdb_ltdb_header_push(in->header, buf+offset, &np);
1377                 offset += np;
1378         }
1379         ctdb_tdb_data_push(&in->data, buf+offset, &np);
1380         offset += np;
1381
1382         *npush = offset;
1383 }
1384
1385 static int ctdb_rec_data_pull_data(uint8_t *buf, size_t buflen,
1386                                    uint32_t *reqid,
1387                                    TDB_DATA *key, TDB_DATA *data,
1388                                    size_t *npull)
1389 {
1390         size_t offset = 0, np;
1391         size_t len;
1392         uint32_t u32;
1393         int ret;
1394
1395         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &u32, &np);
1396         if (ret != 0) {
1397                 return ret;
1398         }
1399         offset += np;
1400
1401         if (buflen < u32) {
1402                 return EMSGSIZE;
1403         }
1404         len = u32;
1405
1406         ret = ctdb_uint32_pull(buf+offset, len-offset, reqid, &np);
1407         if (ret != 0) {
1408                 return ret;
1409         }
1410         offset += np;
1411
1412         ret = ctdb_uint32_pull(buf+offset, len-offset, &u32, &np);
1413         if (ret != 0) {
1414                 return ret;
1415         }
1416         offset += np;
1417         key->dsize = u32;
1418
1419         ret = ctdb_uint32_pull(buf+offset, len-offset, &u32, &np);
1420         if (ret != 0) {
1421                 return ret;
1422         }
1423         offset += np;
1424         data->dsize = u32;
1425
1426         if (len-offset < key->dsize) {
1427                 return EMSGSIZE;
1428         }
1429
1430         key->dptr = buf+offset;
1431         offset += key->dsize;
1432
1433         if (len-offset < data->dsize) {
1434                 return EMSGSIZE;
1435         }
1436
1437         data->dptr = buf+offset;
1438         offset += data->dsize;
1439
1440         *npull = offset;
1441         return 0;
1442 }
1443
1444 static int ctdb_rec_data_pull_elems(uint8_t *buf, size_t buflen,
1445                                     TALLOC_CTX *mem_ctx,
1446                                     struct ctdb_rec_data *out,
1447                                     size_t *npull)
1448 {
1449         uint32_t reqid;
1450         TDB_DATA key, data;
1451         size_t np;
1452         int ret;
1453
1454         ret = ctdb_rec_data_pull_data(buf, buflen, &reqid, &key, &data, &np);
1455         if (ret != 0) {
1456                 return ret;
1457         }
1458
1459         out->reqid = reqid;
1460
1461         /* Always set header to NULL.  If it is required, extract it using
1462          * ctdb_rec_data_extract_header()
1463          */
1464         out->header = NULL;
1465
1466         out->key.dsize = key.dsize;
1467         if (key.dsize > 0) {
1468                 out->key.dptr = talloc_memdup(mem_ctx, key.dptr, key.dsize);
1469                 if (out->key.dptr == NULL) {
1470                         return ENOMEM;
1471                 }
1472         }
1473
1474         out->data.dsize = data.dsize;
1475         if (data.dsize > 0) {
1476                 out->data.dptr = talloc_memdup(mem_ctx, data.dptr, data.dsize);
1477                 if (out->data.dptr == NULL) {
1478                         return ENOMEM;
1479                 }
1480         }
1481
1482         *npull = np;
1483         return 0;
1484 }
1485
1486 int ctdb_rec_data_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
1487                        struct ctdb_rec_data **out, size_t *npull)
1488 {
1489         struct ctdb_rec_data *val;
1490         size_t np;
1491         int ret;
1492
1493         val = talloc(mem_ctx, struct ctdb_rec_data);
1494         if (val == NULL) {
1495                 return ENOMEM;
1496         }
1497
1498         ret = ctdb_rec_data_pull_elems(buf, buflen, val, val, &np);
1499         if (ret != 0) {
1500                 TALLOC_FREE(val);
1501                 return ret;
1502         }
1503
1504         *out = val;
1505         *npull = np;
1506         return ret;
1507 }
1508
1509 size_t ctdb_rec_buffer_len(struct ctdb_rec_buffer *in)
1510 {
1511         return ctdb_uint32_len(&in->db_id) +
1512                 ctdb_uint32_len(&in->count) +
1513                 in->buflen;
1514 }
1515
1516 void ctdb_rec_buffer_push(struct ctdb_rec_buffer *in, uint8_t *buf,
1517                           size_t *npush)
1518 {
1519         size_t offset = 0, np;
1520
1521         ctdb_uint32_push(&in->db_id, buf+offset, &np);
1522         offset += np;
1523
1524         ctdb_uint32_push(&in->count, buf+offset, &np);
1525         offset += np;
1526
1527         memcpy(buf+offset, in->buf, in->buflen);
1528         offset += in->buflen;
1529
1530         *npush = offset;
1531 }
1532
1533 int ctdb_rec_buffer_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
1534                          struct ctdb_rec_buffer **out, size_t *npull)
1535 {
1536         struct ctdb_rec_buffer *val;
1537         size_t offset = 0, np;
1538         size_t length;
1539         int ret;
1540
1541         val = talloc(mem_ctx, struct ctdb_rec_buffer);
1542         if (val == NULL) {
1543                 return ENOMEM;
1544         }
1545
1546         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->db_id, &np);
1547         if (ret != 0) {
1548                 goto fail;
1549         }
1550         offset += np;
1551
1552         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->count, &np);
1553         if (ret != 0) {
1554                 goto fail;
1555         }
1556         offset += np;
1557
1558         /* Since there is no buflen provided, walk the records to
1559          * validate the length of the buffer.
1560          */
1561         val->buf = buf+offset;
1562         val->buflen = buflen-offset;
1563
1564         length = 0;
1565         ret = ctdb_rec_buffer_traverse(val, NULL, &length);
1566         if (ret != 0) {
1567                 goto fail;
1568         }
1569
1570         if (length > buflen-offset) {
1571                 ret = EMSGSIZE;
1572                 goto fail;
1573         }
1574
1575         val->buf = talloc_memdup(val, buf+offset, length);
1576         if (val->buf == NULL) {
1577                 ret = ENOMEM;
1578                 goto fail;
1579         }
1580         val->buflen = length;
1581         offset += length;
1582
1583         *out = val;
1584         *npull = offset;
1585         return 0;
1586
1587 fail:
1588         talloc_free(val);
1589         return ret;
1590 }
1591
1592 struct ctdb_rec_buffer *ctdb_rec_buffer_init(TALLOC_CTX *mem_ctx,
1593                                              uint32_t db_id)
1594 {
1595         struct ctdb_rec_buffer *recbuf;
1596
1597         recbuf = talloc_zero(mem_ctx, struct ctdb_rec_buffer);
1598         if (recbuf == NULL) {
1599                 return recbuf;
1600         }
1601
1602         recbuf->db_id = db_id;
1603
1604         return recbuf;
1605 }
1606
1607 int ctdb_rec_buffer_add(TALLOC_CTX *mem_ctx, struct ctdb_rec_buffer *recbuf,
1608                         uint32_t reqid, struct ctdb_ltdb_header *header,
1609                         TDB_DATA key, TDB_DATA data)
1610 {
1611         struct ctdb_rec_data recdata;
1612         size_t len, np;
1613         uint8_t *ptr;
1614
1615         recdata.reqid = reqid;
1616         recdata.header = header;
1617         recdata.key = key;
1618         recdata.data = data;
1619
1620         len = ctdb_rec_data_len(&recdata);
1621
1622         ptr = talloc_realloc(mem_ctx, recbuf->buf, uint8_t,
1623                              recbuf->buflen + len);
1624         if (ptr == NULL) {
1625                 return ENOMEM;
1626         }
1627
1628         ctdb_rec_data_push(&recdata, &ptr[recbuf->buflen], &np);
1629
1630         recbuf->count++;
1631         recbuf->buf = ptr;
1632         recbuf->buflen += np;
1633         return 0;
1634 }
1635
1636 int ctdb_rec_buffer_traverse(struct ctdb_rec_buffer *recbuf,
1637                              ctdb_rec_parser_func_t func,
1638                              void *private_data)
1639 {
1640         TDB_DATA key, data;
1641         uint32_t reqid;
1642         size_t offset, reclen;
1643         int ret = 0, i;
1644
1645         offset = 0;
1646         for (i=0; i<recbuf->count; i++) {
1647                 ret = ctdb_rec_data_pull_data(&recbuf->buf[offset],
1648                                               recbuf->buflen - offset,
1649                                               &reqid, &key, &data, &reclen);
1650                 if (ret != 0) {
1651                         return ret;
1652                 }
1653
1654                 if (func != NULL) {
1655                         ret = func(reqid, NULL, key, data, private_data);
1656                         if (ret != 0) {
1657                                 break;
1658                         }
1659                 }
1660
1661                 offset += reclen;
1662         }
1663
1664         if (ret != 0) {
1665                 return ret;
1666         }
1667
1668         if (func == NULL) {
1669                 size_t *length = (size_t *)private_data;
1670
1671                 *length = offset;
1672         }
1673
1674         return 0;
1675 }
1676
1677 int ctdb_rec_buffer_write(struct ctdb_rec_buffer *recbuf, int fd)
1678 {
1679         ssize_t n;
1680
1681         n = write(fd, &recbuf->db_id, sizeof(uint32_t));
1682         if (n == -1 || n != sizeof(uint32_t)) {
1683                 return (errno != 0 ? errno : EIO);
1684         }
1685         n = write(fd, &recbuf->count, sizeof(uint32_t));
1686         if (n == -1 || n != sizeof(uint32_t)) {
1687                 return (errno != 0 ? errno : EIO);
1688         }
1689         n = write(fd, &recbuf->buflen, sizeof(size_t));
1690         if (n == -1 || n != sizeof(size_t)) {
1691                 return (errno != 0 ? errno : EIO);
1692         }
1693         n = write(fd, recbuf->buf, recbuf->buflen);
1694         if (n == -1 || n != recbuf->buflen) {
1695                 return (errno != 0 ? errno : EIO);
1696         }
1697
1698         return 0;
1699 }
1700
1701 int ctdb_rec_buffer_read(int fd, TALLOC_CTX *mem_ctx,
1702                          struct ctdb_rec_buffer **out)
1703 {
1704         struct ctdb_rec_buffer *recbuf;
1705         ssize_t n;
1706
1707         recbuf = talloc(mem_ctx, struct ctdb_rec_buffer);
1708         if (recbuf == NULL) {
1709                 return ENOMEM;
1710         }
1711
1712         n = read(fd, &recbuf->db_id, sizeof(uint32_t));
1713         if (n == -1 || n != sizeof(uint32_t)) {
1714                 return (errno != 0 ? errno : EIO);
1715         }
1716         n = read(fd, &recbuf->count, sizeof(uint32_t));
1717         if (n == -1 || n != sizeof(uint32_t)) {
1718                 return (errno != 0 ? errno : EIO);
1719         }
1720         n = read(fd, &recbuf->buflen, sizeof(size_t));
1721         if (n == -1 || n != sizeof(size_t)) {
1722                 return (errno != 0 ? errno : EIO);
1723         }
1724
1725         recbuf->buf = talloc_size(recbuf, recbuf->buflen);
1726         if (recbuf->buf == NULL) {
1727                 return ENOMEM;
1728         }
1729
1730         n = read(fd, recbuf->buf, recbuf->buflen);
1731         if (n == -1 || n != recbuf->buflen) {
1732                 return (errno != 0 ? errno : EIO);
1733         }
1734
1735         *out = recbuf;
1736         return 0;
1737 }
1738
1739 size_t ctdb_traverse_start_len(struct ctdb_traverse_start *in)
1740 {
1741         return ctdb_uint32_len(&in->db_id) +
1742                 ctdb_uint32_len(&in->reqid) +
1743                 ctdb_uint64_len(&in->srvid);
1744 }
1745
1746 void ctdb_traverse_start_push(struct ctdb_traverse_start *in, uint8_t *buf,
1747                               size_t *npush)
1748 {
1749         size_t offset = 0, np;
1750
1751         ctdb_uint32_push(&in->db_id, buf+offset, &np);
1752         offset += np;
1753
1754         ctdb_uint32_push(&in->reqid, buf+offset, &np);
1755         offset += np;
1756
1757         ctdb_uint64_push(&in->srvid, buf+offset, &np);
1758         offset += np;
1759
1760         *npush = offset;
1761 }
1762
1763 int ctdb_traverse_start_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
1764                              struct ctdb_traverse_start **out, size_t *npull)
1765 {
1766         struct ctdb_traverse_start *val;
1767         size_t offset = 0, np;
1768         int ret;
1769
1770         val = talloc(mem_ctx, struct ctdb_traverse_start);
1771         if (val == NULL) {
1772                 return ENOMEM;
1773         }
1774
1775         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->db_id, &np);
1776         if (ret != 0) {
1777                 goto fail;
1778         }
1779         offset += np;
1780
1781         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->reqid, &np);
1782         if (ret != 0) {
1783                 goto fail;
1784         }
1785         offset += np;
1786
1787         ret = ctdb_uint64_pull(buf+offset, buflen-offset, &val->srvid, &np);
1788         if (ret != 0) {
1789                 goto fail;
1790         }
1791         offset += np;
1792
1793         *out = val;
1794         *npull = offset;
1795         return 0;
1796
1797 fail:
1798         talloc_free(val);
1799         return ret;
1800 }
1801
1802 size_t ctdb_traverse_all_len(struct ctdb_traverse_all *in)
1803 {
1804         return ctdb_uint32_len(&in->db_id) +
1805                 ctdb_uint32_len(&in->reqid) +
1806                 ctdb_uint32_len(&in->pnn) +
1807                 ctdb_uint32_len(&in->client_reqid) +
1808                 ctdb_uint64_len(&in->srvid);
1809 }
1810
1811 void ctdb_traverse_all_push(struct ctdb_traverse_all *in, uint8_t *buf,
1812                             size_t *npush)
1813 {
1814         size_t offset = 0, np;
1815
1816         ctdb_uint32_push(&in->db_id, buf+offset, &np);
1817         offset += np;
1818
1819         ctdb_uint32_push(&in->reqid, buf+offset, &np);
1820         offset += np;
1821
1822         ctdb_uint32_push(&in->pnn, buf+offset, &np);
1823         offset += np;
1824
1825         ctdb_uint32_push(&in->client_reqid, buf+offset, &np);
1826         offset += np;
1827
1828         ctdb_uint64_push(&in->srvid, buf+offset, &np);
1829         offset += np;
1830
1831         *npush = offset;
1832 }
1833
1834 int ctdb_traverse_all_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
1835                            struct ctdb_traverse_all **out, size_t *npull)
1836 {
1837         struct ctdb_traverse_all *val;
1838         size_t offset = 0, np;
1839         int ret;
1840
1841         val = talloc(mem_ctx, struct ctdb_traverse_all);
1842         if (val == NULL) {
1843                 return ENOMEM;
1844         }
1845
1846         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->db_id, &np);
1847         if (ret != 0) {
1848                 goto fail;
1849         }
1850         offset += np;
1851
1852         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->reqid, &np);
1853         if (ret != 0) {
1854                 goto fail;
1855         }
1856         offset += np;
1857
1858         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->pnn, &np);
1859         if (ret != 0) {
1860                 goto fail;
1861         }
1862         offset += np;
1863
1864         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->client_reqid,
1865                                &np);
1866         if (ret != 0) {
1867                 goto fail;
1868         }
1869         offset += np;
1870
1871         ret = ctdb_uint64_pull(buf+offset, buflen-offset, &val->srvid, &np);
1872         if (ret != 0) {
1873                 goto fail;
1874         }
1875         offset += np;
1876
1877         *out = val;
1878         *npull = offset;
1879         return 0;
1880
1881 fail:
1882         talloc_free(val);
1883         return ret;
1884 }
1885
1886 size_t ctdb_traverse_start_ext_len(struct ctdb_traverse_start_ext *in)
1887 {
1888         return ctdb_uint32_len(&in->db_id) +
1889                 ctdb_uint32_len(&in->reqid) +
1890                 ctdb_uint64_len(&in->srvid) +
1891                 ctdb_bool_len(&in->withemptyrecords) +
1892                 ctdb_padding_len(7);
1893 }
1894
1895 void ctdb_traverse_start_ext_push(struct ctdb_traverse_start_ext *in,
1896                                   uint8_t *buf, size_t *npush)
1897 {
1898         size_t offset = 0, np;
1899
1900         ctdb_uint32_push(&in->db_id, buf+offset, &np);
1901         offset += np;
1902
1903         ctdb_uint32_push(&in->reqid, buf+offset, &np);
1904         offset += np;
1905
1906         ctdb_uint64_push(&in->srvid, buf+offset, &np);
1907         offset += np;
1908
1909         ctdb_bool_push(&in->withemptyrecords, buf+offset, &np);
1910         offset += np;
1911
1912         ctdb_padding_push(7, buf+offset, &np);
1913         offset += np;
1914
1915         *npush = offset;
1916 }
1917
1918 int ctdb_traverse_start_ext_pull(uint8_t *buf, size_t buflen,
1919                                  TALLOC_CTX *mem_ctx,
1920                                  struct ctdb_traverse_start_ext **out,
1921                                  size_t *npull)
1922 {
1923         struct ctdb_traverse_start_ext *val;
1924         size_t offset = 0, np;
1925         int ret;
1926
1927         val = talloc(mem_ctx, struct ctdb_traverse_start_ext);
1928         if (val == NULL) {
1929                 return ENOMEM;
1930         }
1931
1932         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->db_id, &np);
1933         if (ret != 0) {
1934                 goto fail;
1935         }
1936         offset += np;
1937
1938         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->reqid, &np);
1939         if (ret != 0) {
1940                 goto fail;
1941         }
1942         offset += np;
1943
1944         ret = ctdb_uint64_pull(buf+offset, buflen-offset, &val->srvid, &np);
1945         if (ret != 0) {
1946                 goto fail;
1947         }
1948         offset += np;
1949
1950         ret = ctdb_bool_pull(buf+offset, buflen-offset,
1951                              &val->withemptyrecords, &np);
1952         if (ret != 0) {
1953                 goto fail;
1954         }
1955         offset += np;
1956
1957         ret = ctdb_padding_pull(buf+offset, buflen-offset, 7, &np);
1958         if (ret != 0) {
1959                 goto fail;
1960         }
1961         offset += np;
1962
1963         *out = val;
1964         *npull = offset;
1965         return 0;
1966
1967 fail:
1968         talloc_free(val);
1969         return ret;
1970 }
1971
1972 size_t ctdb_traverse_all_ext_len(struct ctdb_traverse_all_ext *in)
1973 {
1974         return ctdb_uint32_len(&in->db_id) +
1975                 ctdb_uint32_len(&in->reqid) +
1976                 ctdb_uint32_len(&in->pnn) +
1977                 ctdb_uint32_len(&in->client_reqid) +
1978                 ctdb_uint64_len(&in->srvid) +
1979                 ctdb_bool_len(&in->withemptyrecords) +
1980                 ctdb_padding_len(7);
1981 }
1982
1983 void ctdb_traverse_all_ext_push(struct ctdb_traverse_all_ext *in,
1984                                 uint8_t *buf, size_t *npush)
1985 {
1986         size_t offset = 0, np;
1987
1988         ctdb_uint32_push(&in->db_id, buf+offset, &np);
1989         offset += np;
1990
1991         ctdb_uint32_push(&in->reqid, buf+offset, &np);
1992         offset += np;
1993
1994         ctdb_uint32_push(&in->pnn, buf+offset, &np);
1995         offset += np;
1996
1997         ctdb_uint32_push(&in->client_reqid, buf+offset, &np);
1998         offset += np;
1999
2000         ctdb_uint64_push(&in->srvid, buf+offset, &np);
2001         offset += np;
2002
2003         ctdb_bool_push(&in->withemptyrecords, buf+offset, &np);
2004         offset += np;
2005
2006         ctdb_padding_push(7, buf+offset, &np);
2007         offset += np;
2008
2009         *npush = offset;
2010 }
2011
2012 int ctdb_traverse_all_ext_pull(uint8_t *buf, size_t buflen,
2013                                TALLOC_CTX *mem_ctx,
2014                                struct ctdb_traverse_all_ext **out,
2015                                size_t *npull)
2016 {
2017         struct ctdb_traverse_all_ext *val;
2018         size_t offset = 0, np;
2019         int ret;
2020
2021         val = talloc(mem_ctx, struct ctdb_traverse_all_ext);
2022         if (val == NULL) {
2023                 return ENOMEM;
2024         }
2025
2026         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->db_id, &np);
2027         if (ret != 0) {
2028                 goto fail;
2029         }
2030         offset += np;
2031
2032         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->reqid, &np);
2033         if (ret != 0) {
2034                 goto fail;
2035         }
2036         offset += np;
2037
2038         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->pnn, &np);
2039         if (ret != 0) {
2040                 goto fail;
2041         }
2042         offset += np;
2043
2044         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->client_reqid,
2045                                &np);
2046         if (ret != 0) {
2047                 goto fail;
2048         }
2049         offset += np;
2050
2051         ret = ctdb_uint64_pull(buf+offset, buflen-offset, &val->srvid, &np);
2052         if (ret != 0) {
2053                 goto fail;
2054         }
2055         offset += np;
2056
2057         ret = ctdb_bool_pull(buf+offset, buflen-offset,
2058                              &val->withemptyrecords, &np);
2059         if (ret != 0) {
2060                 goto fail;
2061         }
2062         offset += np;
2063
2064         ret = ctdb_padding_pull(buf+offset, buflen-offset, 7, &np);
2065         if (ret != 0) {
2066                 goto fail;
2067         }
2068         offset += np;
2069
2070         *out = val;
2071         *npull = offset;
2072         return 0;
2073
2074 fail:
2075         talloc_free(val);
2076         return ret;
2077 }
2078
2079 size_t ctdb_sock_addr_len(ctdb_sock_addr *in)
2080 {
2081         return sizeof(ctdb_sock_addr);
2082 }
2083
2084 void ctdb_sock_addr_push(ctdb_sock_addr *in, uint8_t *buf, size_t *npush)
2085 {
2086         memcpy(buf, in, sizeof(ctdb_sock_addr));
2087         *npush = sizeof(ctdb_sock_addr);
2088 }
2089
2090 int ctdb_sock_addr_pull_elems(uint8_t *buf, size_t buflen,
2091                               TALLOC_CTX *mem_ctx, ctdb_sock_addr *out,
2092                               size_t *npull)
2093 {
2094         if (buflen < sizeof(ctdb_sock_addr)) {
2095                 return EMSGSIZE;
2096         }
2097
2098         memcpy(out, buf, sizeof(ctdb_sock_addr));
2099         *npull = sizeof(ctdb_sock_addr);
2100
2101         return 0;
2102 }
2103
2104 int ctdb_sock_addr_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
2105                         ctdb_sock_addr **out, size_t *npull)
2106 {
2107         ctdb_sock_addr *val;
2108         size_t np;
2109         int ret;
2110
2111         val = talloc(mem_ctx, ctdb_sock_addr);
2112         if (val == NULL) {
2113                 return ENOMEM;
2114         }
2115
2116         ret = ctdb_sock_addr_pull_elems(buf, buflen, val, val, &np);
2117         if (ret != 0) {
2118                 talloc_free(val);
2119                 return ret;
2120         }
2121
2122         *out = val;
2123         *npull = np;
2124         return ret;
2125 }
2126
2127 size_t ctdb_connection_len(struct ctdb_connection *in)
2128 {
2129         return ctdb_sock_addr_len(&in->src) +
2130                 ctdb_sock_addr_len(&in->dst);
2131 }
2132
2133 void ctdb_connection_push(struct ctdb_connection *in, uint8_t *buf,
2134                           size_t *npush)
2135 {
2136         size_t offset = 0, np;
2137
2138         ctdb_sock_addr_push(&in->src, buf+offset, &np);
2139         offset += np;
2140
2141         ctdb_sock_addr_push(&in->dst, buf+offset, &np);
2142         offset += np;
2143
2144         *npush = offset;
2145 }
2146
2147 static int ctdb_connection_pull_elems(uint8_t *buf, size_t buflen,
2148                                       TALLOC_CTX *mem_ctx,
2149                                       struct ctdb_connection *out,
2150                                       size_t *npull)
2151 {
2152         size_t offset = 0, np;
2153         int ret;
2154
2155         ret = ctdb_sock_addr_pull_elems(buf+offset, buflen-offset,
2156                                         mem_ctx, &out->src, &np);
2157         if (ret != 0) {
2158                 return ret;
2159         }
2160         offset += np;
2161
2162         ret = ctdb_sock_addr_pull_elems(buf+offset, buflen-offset,
2163                                         mem_ctx, &out->dst, &np);
2164         if (ret != 0) {
2165                 return ret;
2166         }
2167         offset += np;
2168
2169         *npull = offset;
2170         return 0;
2171 }
2172
2173 int ctdb_connection_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
2174                          struct ctdb_connection **out, size_t *npull)
2175 {
2176         struct ctdb_connection *val;
2177         size_t np;
2178         int ret;
2179
2180         val = talloc(mem_ctx, struct ctdb_connection);
2181         if (val == NULL) {
2182                 return ENOMEM;
2183         }
2184
2185         ret = ctdb_connection_pull_elems(buf, buflen, val, val, &np);
2186         if (ret != 0) {
2187                 talloc_free(val);
2188                 return ret;
2189         }
2190
2191         *out = val;
2192         *npull = np;
2193         return ret;
2194 }
2195
2196 size_t ctdb_connection_list_len(struct ctdb_connection_list *in)
2197 {
2198         size_t len;
2199
2200         len = ctdb_uint32_len(&in->num);
2201         if (in->num > 0) {
2202                 len += in->num * ctdb_connection_len(&in->conn[0]);
2203         }
2204
2205         return len;
2206 }
2207
2208 void ctdb_connection_list_push(struct ctdb_connection_list *in, uint8_t *buf,
2209                                size_t *npush)
2210 {
2211         size_t offset = 0, np;
2212         uint32_t i;
2213
2214         ctdb_uint32_push(&in->num, buf+offset, &np);
2215         offset += np;
2216
2217         for (i=0; i<in->num; i++) {
2218                 ctdb_connection_push(&in->conn[i], buf+offset, &np);
2219                 offset += np;
2220         }
2221
2222         *npush = offset;
2223 }
2224
2225 int ctdb_connection_list_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
2226                               struct ctdb_connection_list **out, size_t *npull)
2227 {
2228         struct ctdb_connection_list *val;
2229         size_t offset = 0, np;
2230         uint32_t i;
2231         int ret;
2232
2233         val = talloc(mem_ctx, struct ctdb_connection_list);
2234         if (val == NULL) {
2235                 return ENOMEM;
2236         }
2237
2238         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->num, &np);
2239         if (ret != 0) {
2240                 goto fail;
2241         }
2242         offset += np;
2243
2244         if (val->num == 0) {
2245                 val->conn = NULL;
2246                 goto done;
2247         }
2248
2249         val->conn = talloc_array(val, struct ctdb_connection, val->num);
2250         if (val->conn == NULL) {
2251                 ret = ENOMEM;
2252                 goto fail;
2253         }
2254
2255         for (i=0; i<val->num; i++) {
2256                 ret = ctdb_connection_pull_elems(buf+offset, buflen-offset,
2257                                                  val, &val->conn[i], &np);
2258                 if (ret != 0) {
2259                         goto fail;
2260                 }
2261                 offset += np;
2262         }
2263
2264 done:
2265         *out = val;
2266         *npull = offset;
2267         return 0;
2268
2269 fail:
2270         talloc_free(val);
2271         return ret;
2272 }
2273
2274 size_t ctdb_tunable_len(struct ctdb_tunable *in)
2275 {
2276         return ctdb_uint32_len(&in->value) +
2277                 ctdb_stringn_len(&in->name);
2278 }
2279
2280 void ctdb_tunable_push(struct ctdb_tunable *in, uint8_t *buf, size_t *npush)
2281 {
2282         size_t offset = 0, np;
2283
2284         ctdb_uint32_push(&in->value, buf+offset, &np);
2285         offset += np;
2286
2287         ctdb_stringn_push(&in->name, buf+offset, &np);
2288         offset += np;
2289
2290         *npush = offset;
2291 }
2292
2293 int ctdb_tunable_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
2294                       struct ctdb_tunable **out, size_t *npull)
2295 {
2296         struct ctdb_tunable *val;
2297         size_t offset = 0, np;
2298         int ret;
2299
2300         val = talloc(mem_ctx, struct ctdb_tunable);
2301         if (val == NULL) {
2302                 return ENOMEM;
2303         }
2304
2305         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->value, &np);
2306         if (ret != 0) {
2307                 goto fail;
2308         }
2309         offset += np;
2310
2311         ret = ctdb_stringn_pull(buf+offset, buflen-offset, mem_ctx,
2312                                 &val->name, &np);
2313         if (ret != 0) {
2314                 goto fail;
2315         }
2316         offset += np;
2317
2318         *out = val;
2319         *npull = offset;
2320         return 0;
2321
2322 fail:
2323         talloc_free(val);
2324         return ret;
2325 }
2326
2327 size_t ctdb_node_flag_change_len(struct ctdb_node_flag_change *in)
2328 {
2329         return ctdb_uint32_len(&in->pnn) +
2330                 ctdb_uint32_len(&in->new_flags) +
2331                 ctdb_uint32_len(&in->old_flags);
2332 }
2333
2334 void ctdb_node_flag_change_push(struct ctdb_node_flag_change *in,
2335                                 uint8_t *buf, size_t *npush)
2336 {
2337         size_t offset = 0, np;
2338
2339         ctdb_uint32_push(&in->pnn, buf+offset, &np);
2340         offset += np;
2341
2342         ctdb_uint32_push(&in->new_flags, buf+offset, &np);
2343         offset += np;
2344
2345         ctdb_uint32_push(&in->old_flags, buf+offset, &np);
2346         offset += np;
2347
2348         *npush = offset;
2349 }
2350
2351 int ctdb_node_flag_change_pull(uint8_t *buf, size_t buflen,
2352                                TALLOC_CTX *mem_ctx,
2353                                struct ctdb_node_flag_change **out,
2354                                size_t *npull)
2355 {
2356         struct ctdb_node_flag_change *val;
2357         size_t offset = 0, np;
2358         int ret;
2359
2360         val = talloc(mem_ctx, struct ctdb_node_flag_change);
2361         if (val == NULL) {
2362                 return ENOMEM;
2363         }
2364
2365         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->pnn, &np);
2366         if (ret != 0) {
2367                 goto fail;
2368         }
2369         offset += np;
2370
2371         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->new_flags,
2372                                &np);
2373         if (ret != 0) {
2374                 goto fail;
2375         }
2376         offset += np;
2377
2378         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->old_flags,
2379                                &np);
2380         if (ret != 0) {
2381                 goto fail;
2382         }
2383         offset += np;
2384
2385         *out = val;
2386         *npull = offset;
2387         return 0;
2388
2389 fail:
2390         talloc_free(val);
2391         return ret;
2392 }
2393
2394 size_t ctdb_var_list_len(struct ctdb_var_list *in)
2395 {
2396         uint32_t u32 = 0;
2397         int i;
2398
2399         for (i=0; i<in->count; i++) {
2400                 u32 += ctdb_string_len(&in->var[i]);
2401         }
2402
2403         return ctdb_uint32_len(&u32) + u32;
2404 }
2405
2406 void ctdb_var_list_push(struct ctdb_var_list *in, uint8_t *buf, size_t *npush)
2407 {
2408         size_t offset = 0, np;
2409         uint32_t u32;
2410         int i;
2411         uint8_t sep = ':';
2412
2413         /* The length only corresponds to the payload size */
2414         u32 = ctdb_var_list_len(in);
2415         u32 -= ctdb_uint32_len(&u32);
2416
2417         ctdb_uint32_push(&u32, buf+offset, &np);
2418         offset += np;
2419
2420         /* The variables are separated by ':' and the complete string is null
2421          * terminated.
2422          */
2423         for (i=0; i<in->count; i++) {
2424                 ctdb_string_push(&in->var[i], buf+offset, &np);
2425                 offset += np;
2426
2427                 if (i < in->count - 1) {
2428                         /* Replace '\0' with ':' */
2429                         ctdb_uint8_push(&sep, buf+offset-1, &np);
2430                 }
2431         }
2432
2433         *npush = offset;
2434 }
2435
2436 int ctdb_var_list_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
2437                        struct ctdb_var_list **out, size_t *npull)
2438 {
2439         struct ctdb_var_list *val;
2440         const char *str, **list;
2441         char *s, *tok, *ptr = NULL;
2442         size_t offset = 0, np;
2443         uint32_t u32;
2444         int ret;
2445
2446         val = talloc_zero(mem_ctx, struct ctdb_var_list);
2447         if (val == NULL) {
2448                 return ENOMEM;
2449         }
2450
2451         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &u32, &np);
2452         if (ret != 0) {
2453                 goto fail;
2454         }
2455         offset += np;
2456
2457         if (buflen-offset < u32) {
2458                 ret = EMSGSIZE;
2459                 goto fail;
2460         }
2461
2462         ret = ctdb_string_pull(buf+offset, u32, val, &str, &np);
2463         if (ret != 0) {
2464                 goto fail;
2465         }
2466         offset += np;
2467
2468         s = discard_const(str);
2469         while ((tok = strtok_r(s, ":", &ptr)) != NULL) {
2470                 list = talloc_realloc(val, val->var, const char *,
2471                                       val->count+1);
2472                 if (list == NULL) {
2473                         ret = ENOMEM;
2474                         goto fail;
2475                 }
2476
2477                 val->var = list;
2478
2479                 s = talloc_strdup(val, tok);
2480                 if (s == NULL) {
2481                         ret = ENOMEM;
2482                         goto fail;
2483                 }
2484
2485                 val->var[val->count] = s;
2486                 val->count += 1;
2487                 s = NULL;
2488         }
2489
2490         talloc_free(discard_const(str));
2491         *out = val;
2492         *npull = offset;
2493         return 0;
2494
2495 fail:
2496         talloc_free(val);
2497         return ret;
2498 }
2499
2500 size_t ctdb_tunable_list_len(struct ctdb_tunable_list *in)
2501 {
2502         return ctdb_uint32_len(&in->max_redirect_count) +
2503                 ctdb_uint32_len(&in->seqnum_interval) +
2504                 ctdb_uint32_len(&in->control_timeout) +
2505                 ctdb_uint32_len(&in->traverse_timeout) +
2506                 ctdb_uint32_len(&in->keepalive_interval) +
2507                 ctdb_uint32_len(&in->keepalive_limit) +
2508                 ctdb_uint32_len(&in->recover_timeout) +
2509                 ctdb_uint32_len(&in->recover_interval) +
2510                 ctdb_uint32_len(&in->election_timeout) +
2511                 ctdb_uint32_len(&in->takeover_timeout) +
2512                 ctdb_uint32_len(&in->monitor_interval) +
2513                 ctdb_uint32_len(&in->tickle_update_interval) +
2514                 ctdb_uint32_len(&in->script_timeout) +
2515                 ctdb_uint32_len(&in->monitor_timeout_count) +
2516                 ctdb_uint32_len(&in->script_unhealthy_on_timeout) +
2517                 ctdb_uint32_len(&in->recovery_grace_period) +
2518                 ctdb_uint32_len(&in->recovery_ban_period) +
2519                 ctdb_uint32_len(&in->database_hash_size) +
2520                 ctdb_uint32_len(&in->database_max_dead) +
2521                 ctdb_uint32_len(&in->rerecovery_timeout) +
2522                 ctdb_uint32_len(&in->enable_bans) +
2523                 ctdb_uint32_len(&in->deterministic_public_ips) +
2524                 ctdb_uint32_len(&in->reclock_ping_period) +
2525                 ctdb_uint32_len(&in->no_ip_failback) +
2526                 ctdb_uint32_len(&in->disable_ip_failover) +
2527                 ctdb_uint32_len(&in->verbose_memory_names) +
2528                 ctdb_uint32_len(&in->recd_ping_timeout) +
2529                 ctdb_uint32_len(&in->recd_ping_failcount) +
2530                 ctdb_uint32_len(&in->log_latency_ms) +
2531                 ctdb_uint32_len(&in->reclock_latency_ms) +
2532                 ctdb_uint32_len(&in->recovery_drop_all_ips) +
2533                 ctdb_uint32_len(&in->verify_recovery_lock) +
2534                 ctdb_uint32_len(&in->vacuum_interval) +
2535                 ctdb_uint32_len(&in->vacuum_max_run_time) +
2536                 ctdb_uint32_len(&in->repack_limit) +
2537                 ctdb_uint32_len(&in->vacuum_limit) +
2538                 ctdb_uint32_len(&in->max_queue_depth_drop_msg) +
2539                 ctdb_uint32_len(&in->allow_unhealthy_db_read) +
2540                 ctdb_uint32_len(&in->stat_history_interval) +
2541                 ctdb_uint32_len(&in->deferred_attach_timeout) +
2542                 ctdb_uint32_len(&in->vacuum_fast_path_count) +
2543                 ctdb_uint32_len(&in->lcp2_public_ip_assignment) +
2544                 ctdb_uint32_len(&in->allow_client_db_attach) +
2545                 ctdb_uint32_len(&in->recover_pdb_by_seqnum) +
2546                 ctdb_uint32_len(&in->deferred_rebalance_on_node_add) +
2547                 ctdb_uint32_len(&in->fetch_collapse) +
2548                 ctdb_uint32_len(&in->hopcount_make_sticky) +
2549                 ctdb_uint32_len(&in->sticky_duration) +
2550                 ctdb_uint32_len(&in->sticky_pindown) +
2551                 ctdb_uint32_len(&in->no_ip_takeover) +
2552                 ctdb_uint32_len(&in->db_record_count_warn) +
2553                 ctdb_uint32_len(&in->db_record_size_warn) +
2554                 ctdb_uint32_len(&in->db_size_warn) +
2555                 ctdb_uint32_len(&in->pulldb_preallocation_size) +
2556                 ctdb_uint32_len(&in->no_ip_host_on_all_disabled) +
2557                 ctdb_uint32_len(&in->samba3_hack) +
2558                 ctdb_uint32_len(&in->mutex_enabled) +
2559                 ctdb_uint32_len(&in->lock_processes_per_db) +
2560                 ctdb_uint32_len(&in->rec_buffer_size_limit) +
2561                 ctdb_uint32_len(&in->queue_buffer_size) +
2562                 ctdb_uint32_len(&in->ip_alloc_algorithm) +
2563                 ctdb_uint32_len(&in->allow_mixed_versions);
2564 }
2565
2566 void ctdb_tunable_list_push(struct ctdb_tunable_list *in, uint8_t *buf,
2567                             size_t *npush)
2568 {
2569         size_t offset = 0, np;
2570
2571         ctdb_uint32_push(&in->max_redirect_count, buf+offset, &np);
2572         offset += np;
2573
2574         ctdb_uint32_push(&in->seqnum_interval, buf+offset, &np);
2575         offset += np;
2576
2577         ctdb_uint32_push(&in->control_timeout, buf+offset, &np);
2578         offset += np;
2579
2580         ctdb_uint32_push(&in->traverse_timeout, buf+offset, &np);
2581         offset += np;
2582
2583         ctdb_uint32_push(&in->keepalive_interval, buf+offset, &np);
2584         offset += np;
2585
2586         ctdb_uint32_push(&in->keepalive_limit, buf+offset, &np);
2587         offset += np;
2588
2589         ctdb_uint32_push(&in->recover_timeout, buf+offset, &np);
2590         offset += np;
2591
2592         ctdb_uint32_push(&in->recover_interval, buf+offset, &np);
2593         offset += np;
2594
2595         ctdb_uint32_push(&in->election_timeout, buf+offset, &np);
2596         offset += np;
2597
2598         ctdb_uint32_push(&in->takeover_timeout, buf+offset, &np);
2599         offset += np;
2600
2601         ctdb_uint32_push(&in->monitor_interval, buf+offset, &np);
2602         offset += np;
2603
2604         ctdb_uint32_push(&in->tickle_update_interval, buf+offset, &np);
2605         offset += np;
2606
2607         ctdb_uint32_push(&in->script_timeout, buf+offset, &np);
2608         offset += np;
2609
2610         ctdb_uint32_push(&in->monitor_timeout_count, buf+offset, &np);
2611         offset += np;
2612
2613         ctdb_uint32_push(&in->script_unhealthy_on_timeout, buf+offset, &np);
2614         offset += np;
2615
2616         ctdb_uint32_push(&in->recovery_grace_period, buf+offset, &np);
2617         offset += np;
2618
2619         ctdb_uint32_push(&in->recovery_ban_period, buf+offset, &np);
2620         offset += np;
2621
2622         ctdb_uint32_push(&in->database_hash_size, buf+offset, &np);
2623         offset += np;
2624
2625         ctdb_uint32_push(&in->database_max_dead, buf+offset, &np);
2626         offset += np;
2627
2628         ctdb_uint32_push(&in->rerecovery_timeout, buf+offset, &np);
2629         offset += np;
2630
2631         ctdb_uint32_push(&in->enable_bans, buf+offset, &np);
2632         offset += np;
2633
2634         ctdb_uint32_push(&in->deterministic_public_ips, buf+offset, &np);
2635         offset += np;
2636
2637         ctdb_uint32_push(&in->reclock_ping_period, buf+offset, &np);
2638         offset += np;
2639
2640         ctdb_uint32_push(&in->no_ip_failback, buf+offset, &np);
2641         offset += np;
2642
2643         ctdb_uint32_push(&in->disable_ip_failover, buf+offset, &np);
2644         offset += np;
2645
2646         ctdb_uint32_push(&in->verbose_memory_names, buf+offset, &np);
2647         offset += np;
2648
2649         ctdb_uint32_push(&in->recd_ping_timeout, buf+offset, &np);
2650         offset += np;
2651
2652         ctdb_uint32_push(&in->recd_ping_failcount, buf+offset, &np);
2653         offset += np;
2654
2655         ctdb_uint32_push(&in->log_latency_ms, buf+offset, &np);
2656         offset += np;
2657
2658         ctdb_uint32_push(&in->reclock_latency_ms, buf+offset, &np);
2659         offset += np;
2660
2661         ctdb_uint32_push(&in->recovery_drop_all_ips, buf+offset, &np);
2662         offset += np;
2663
2664         ctdb_uint32_push(&in->verify_recovery_lock, buf+offset, &np);
2665         offset += np;
2666
2667         ctdb_uint32_push(&in->vacuum_interval, buf+offset, &np);
2668         offset += np;
2669
2670         ctdb_uint32_push(&in->vacuum_max_run_time, buf+offset, &np);
2671         offset += np;
2672
2673         ctdb_uint32_push(&in->repack_limit, buf+offset, &np);
2674         offset += np;
2675
2676         ctdb_uint32_push(&in->vacuum_limit, buf+offset, &np);
2677         offset += np;
2678
2679         ctdb_uint32_push(&in->max_queue_depth_drop_msg, buf+offset, &np);
2680         offset += np;
2681
2682         ctdb_uint32_push(&in->allow_unhealthy_db_read, buf+offset, &np);
2683         offset += np;
2684
2685         ctdb_uint32_push(&in->stat_history_interval, buf+offset, &np);
2686         offset += np;
2687
2688         ctdb_uint32_push(&in->deferred_attach_timeout, buf+offset, &np);
2689         offset += np;
2690
2691         ctdb_uint32_push(&in->vacuum_fast_path_count, buf+offset, &np);
2692         offset += np;
2693
2694         ctdb_uint32_push(&in->lcp2_public_ip_assignment, buf+offset, &np);
2695         offset += np;
2696
2697         ctdb_uint32_push(&in->allow_client_db_attach, buf+offset, &np);
2698         offset += np;
2699
2700         ctdb_uint32_push(&in->recover_pdb_by_seqnum, buf+offset, &np);
2701         offset += np;
2702
2703         ctdb_uint32_push(&in->deferred_rebalance_on_node_add, buf+offset, &np);
2704         offset += np;
2705
2706         ctdb_uint32_push(&in->fetch_collapse, buf+offset, &np);
2707         offset += np;
2708
2709         ctdb_uint32_push(&in->hopcount_make_sticky, buf+offset, &np);
2710         offset += np;
2711
2712         ctdb_uint32_push(&in->sticky_duration, buf+offset, &np);
2713         offset += np;
2714
2715         ctdb_uint32_push(&in->sticky_pindown, buf+offset, &np);
2716         offset += np;
2717
2718         ctdb_uint32_push(&in->no_ip_takeover, buf+offset, &np);
2719         offset += np;
2720
2721         ctdb_uint32_push(&in->db_record_count_warn, buf+offset, &np);
2722         offset += np;
2723
2724         ctdb_uint32_push(&in->db_record_size_warn, buf+offset, &np);
2725         offset += np;
2726
2727         ctdb_uint32_push(&in->db_size_warn, buf+offset, &np);
2728         offset += np;
2729
2730         ctdb_uint32_push(&in->pulldb_preallocation_size, buf+offset, &np);
2731         offset += np;
2732
2733         ctdb_uint32_push(&in->no_ip_host_on_all_disabled, buf+offset, &np);
2734         offset += np;
2735
2736         ctdb_uint32_push(&in->samba3_hack, buf+offset, &np);
2737         offset += np;
2738
2739         ctdb_uint32_push(&in->mutex_enabled, buf+offset, &np);
2740         offset += np;
2741
2742         ctdb_uint32_push(&in->lock_processes_per_db, buf+offset, &np);
2743         offset += np;
2744
2745         ctdb_uint32_push(&in->rec_buffer_size_limit, buf+offset, &np);
2746         offset += np;
2747
2748         ctdb_uint32_push(&in->queue_buffer_size, buf+offset, &np);
2749         offset += np;
2750
2751         ctdb_uint32_push(&in->ip_alloc_algorithm, buf+offset, &np);
2752         offset += np;
2753
2754         ctdb_uint32_push(&in->allow_mixed_versions, buf+offset, &np);
2755         offset += np;
2756
2757         *npush = offset;
2758 }
2759
2760 static int ctdb_tunable_list_pull_elems(uint8_t *buf, size_t buflen,
2761                                         TALLOC_CTX *mem_ctx,
2762                                         struct ctdb_tunable_list *out,
2763                                         size_t *npull)
2764 {
2765         size_t offset = 0, np;
2766         int ret;
2767
2768         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2769                                &out->max_redirect_count, &np);
2770         if (ret != 0) {
2771                 return ret;
2772         }
2773         offset += np;
2774
2775         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2776                                &out->seqnum_interval, &np);
2777         if (ret != 0) {
2778                 return ret;
2779         }
2780         offset += np;
2781
2782         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2783                                &out->control_timeout, &np);
2784         if (ret != 0) {
2785                 return ret;
2786         }
2787         offset += np;
2788
2789         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2790                                &out->traverse_timeout, &np);
2791         if (ret != 0) {
2792                 return ret;
2793         }
2794         offset += np;
2795
2796         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2797                                &out->keepalive_interval, &np);
2798         if (ret != 0) {
2799                 return ret;
2800         }
2801         offset += np;
2802
2803         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2804                                &out->keepalive_limit, &np);
2805         if (ret != 0) {
2806                 return ret;
2807         }
2808         offset += np;
2809
2810         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2811                                &out->recover_timeout, &np);
2812         if (ret != 0) {
2813                 return ret;
2814         }
2815         offset += np;
2816
2817         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2818                                &out->recover_interval, &np);
2819         if (ret != 0) {
2820                 return ret;
2821         }
2822         offset += np;
2823
2824         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2825                                &out->election_timeout, &np);
2826         if (ret != 0) {
2827                 return ret;
2828         }
2829         offset += np;
2830
2831         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2832                                &out->takeover_timeout, &np);
2833         if (ret != 0) {
2834                 return ret;
2835         }
2836         offset += np;
2837
2838         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2839                                &out->monitor_interval, &np);
2840         if (ret != 0) {
2841                 return ret;
2842         }
2843         offset += np;
2844
2845         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2846                                &out->tickle_update_interval, &np);
2847         if (ret != 0) {
2848                 return ret;
2849         }
2850         offset += np;
2851
2852         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2853                                &out->script_timeout, &np);
2854         if (ret != 0) {
2855                 return ret;
2856         }
2857         offset += np;
2858
2859         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2860                                &out->monitor_timeout_count, &np);
2861         if (ret != 0) {
2862                 return ret;
2863         }
2864         offset += np;
2865
2866         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2867                                &out->script_unhealthy_on_timeout, &np);
2868         if (ret != 0) {
2869                 return ret;
2870         }
2871         offset += np;
2872
2873         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2874                                &out->recovery_grace_period, &np);
2875         if (ret != 0) {
2876                 return ret;
2877         }
2878         offset += np;
2879
2880         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2881                                &out->recovery_ban_period, &np);
2882         if (ret != 0) {
2883                 return ret;
2884         }
2885         offset += np;
2886
2887         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2888                                &out->database_hash_size, &np);
2889         if (ret != 0) {
2890                 return ret;
2891         }
2892         offset += np;
2893
2894         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2895                                &out->database_max_dead, &np);
2896         if (ret != 0) {
2897                 return ret;
2898         }
2899         offset += np;
2900
2901         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2902                                &out->rerecovery_timeout, &np);
2903         if (ret != 0) {
2904                 return ret;
2905         }
2906         offset += np;
2907
2908         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2909                                &out->enable_bans, &np);
2910         if (ret != 0) {
2911                 return ret;
2912         }
2913         offset += np;
2914
2915         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2916                                &out->deterministic_public_ips, &np);
2917         if (ret != 0) {
2918                 return ret;
2919         }
2920         offset += np;
2921
2922         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2923                                &out->reclock_ping_period, &np);
2924         if (ret != 0) {
2925                 return ret;
2926         }
2927         offset += np;
2928
2929         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2930                                &out->no_ip_failback, &np);
2931         if (ret != 0) {
2932                 return ret;
2933         }
2934         offset += np;
2935
2936         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2937                                &out->disable_ip_failover, &np);
2938         if (ret != 0) {
2939                 return ret;
2940         }
2941         offset += np;
2942
2943         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2944                                &out->verbose_memory_names, &np);
2945         if (ret != 0) {
2946                 return ret;
2947         }
2948         offset += np;
2949
2950         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2951                                &out->recd_ping_timeout, &np);
2952         if (ret != 0) {
2953                 return ret;
2954         }
2955         offset += np;
2956
2957         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2958                                &out->recd_ping_failcount, &np);
2959         if (ret != 0) {
2960                 return ret;
2961         }
2962         offset += np;
2963
2964         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2965                                &out->log_latency_ms, &np);
2966         if (ret != 0) {
2967                 return ret;
2968         }
2969         offset += np;
2970
2971         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2972                                &out->reclock_latency_ms, &np);
2973         if (ret != 0) {
2974                 return ret;
2975         }
2976         offset += np;
2977
2978         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2979                                &out->recovery_drop_all_ips, &np);
2980         if (ret != 0) {
2981                 return ret;
2982         }
2983         offset += np;
2984
2985         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2986                                &out->verify_recovery_lock, &np);
2987         if (ret != 0) {
2988                 return ret;
2989         }
2990         offset += np;
2991
2992         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
2993                                &out->vacuum_interval, &np);
2994         if (ret != 0) {
2995                 return ret;
2996         }
2997         offset += np;
2998
2999         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
3000                                &out->vacuum_max_run_time, &np);
3001         if (ret != 0) {
3002                 return ret;
3003         }
3004         offset += np;
3005
3006         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
3007                                &out->repack_limit, &np);
3008         if (ret != 0) {
3009                 return ret;
3010         }
3011         offset += np;
3012
3013         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
3014                                &out->vacuum_limit, &np);
3015         if (ret != 0) {
3016                 return ret;
3017         }
3018         offset += np;
3019
3020         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
3021                                &out->max_queue_depth_drop_msg, &np);
3022         if (ret != 0) {
3023                 return ret;
3024         }
3025         offset += np;
3026
3027         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
3028                                &out->allow_unhealthy_db_read, &np);
3029         if (ret != 0) {
3030                 return ret;
3031         }
3032         offset += np;
3033
3034         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
3035                                &out->stat_history_interval, &np);
3036         if (ret != 0) {
3037                 return ret;
3038         }
3039         offset += np;
3040
3041         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
3042                                &out->deferred_attach_timeout, &np);
3043         if (ret != 0) {
3044                 return ret;
3045         }
3046         offset += np;
3047
3048         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
3049                                &out->vacuum_fast_path_count, &np);
3050         if (ret != 0) {
3051                 return ret;
3052         }
3053         offset += np;
3054
3055         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
3056                                &out->lcp2_public_ip_assignment, &np);
3057         if (ret != 0) {
3058                 return ret;
3059         }
3060         offset += np;
3061
3062         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
3063                                &out->allow_client_db_attach, &np);
3064         if (ret != 0) {
3065                 return ret;
3066         }
3067         offset += np;
3068
3069         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
3070                                &out->recover_pdb_by_seqnum, &np);
3071         if (ret != 0) {
3072                 return ret;
3073         }
3074         offset += np;
3075
3076         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
3077                                &out->deferred_rebalance_on_node_add, &np);
3078         if (ret != 0) {
3079                 return ret;
3080         }
3081         offset += np;
3082
3083         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
3084                                &out->fetch_collapse, &np);
3085         if (ret != 0) {
3086                 return ret;
3087         }
3088         offset += np;
3089
3090         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
3091                                &out->hopcount_make_sticky, &np);
3092         if (ret != 0) {
3093                 return ret;
3094         }
3095         offset += np;
3096
3097         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
3098                                &out->sticky_duration, &np);
3099         if (ret != 0) {
3100                 return ret;
3101         }
3102         offset += np;
3103
3104         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
3105                                &out->sticky_pindown, &np);
3106         if (ret != 0) {
3107                 return ret;
3108         }
3109         offset += np;
3110
3111         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
3112                                &out->no_ip_takeover, &np);
3113         if (ret != 0) {
3114                 return ret;
3115         }
3116         offset += np;
3117
3118         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
3119                                &out->db_record_count_warn, &np);
3120         if (ret != 0) {
3121                 return ret;
3122         }
3123         offset += np;
3124
3125         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
3126                                &out->db_record_size_warn, &np);
3127         if (ret != 0) {
3128                 return ret;
3129         }
3130         offset += np;
3131
3132         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
3133                                &out->db_size_warn, &np);
3134         if (ret != 0) {
3135                 return ret;
3136         }
3137         offset += np;
3138
3139         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
3140                                &out->pulldb_preallocation_size, &np);
3141         if (ret != 0) {
3142                 return ret;
3143         }
3144         offset += np;
3145
3146         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
3147                                &out->no_ip_host_on_all_disabled, &np);
3148         if (ret != 0) {
3149                 return ret;
3150         }
3151         offset += np;
3152
3153         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
3154                                &out->samba3_hack, &np);
3155         if (ret != 0) {
3156                 return ret;
3157         }
3158         offset += np;
3159
3160         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
3161                                &out->mutex_enabled, &np);
3162         if (ret != 0) {
3163                 return ret;
3164         }
3165         offset += np;
3166
3167         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
3168                                &out->lock_processes_per_db, &np);
3169         if (ret != 0) {
3170                 return ret;
3171         }
3172         offset += np;
3173
3174         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
3175                                &out->rec_buffer_size_limit, &np);
3176         if (ret != 0) {
3177                 return ret;
3178         }
3179         offset += np;
3180
3181         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
3182                                &out->queue_buffer_size, &np);
3183         if (ret != 0) {
3184                 return ret;
3185         }
3186         offset += np;
3187
3188         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
3189                                &out->ip_alloc_algorithm, &np);
3190         if (ret != 0) {
3191                 return ret;
3192         }
3193         offset += np;
3194
3195         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
3196                                &out->allow_mixed_versions, &np);
3197         if (ret != 0) {
3198                 return ret;
3199         }
3200         offset += np;
3201
3202         *npull = offset;
3203         return 0;
3204 }
3205
3206 int ctdb_tunable_list_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
3207                            struct ctdb_tunable_list **out, size_t *npull)
3208 {
3209         struct ctdb_tunable_list *val;
3210         size_t np;
3211         int ret;
3212
3213         val = talloc(mem_ctx, struct ctdb_tunable_list);
3214         if (val == NULL) {
3215                 return ENOMEM;
3216         }
3217
3218         ret = ctdb_tunable_list_pull_elems(buf, buflen, val, val, &np);
3219         if (ret != 0) {
3220                 talloc_free(val);
3221                 return ret;
3222         }
3223
3224         *out = val;
3225         *npull = np;
3226         return 0;
3227 }
3228
3229 size_t ctdb_tickle_list_len(struct ctdb_tickle_list *in)
3230 {
3231         size_t len;
3232
3233         len = ctdb_sock_addr_len(&in->addr) +
3234                 ctdb_uint32_len(&in->num);
3235         if (in->num > 0) {
3236                 len += in->num * ctdb_connection_len(&in->conn[0]);
3237         }
3238
3239         return len;
3240 }
3241
3242 void ctdb_tickle_list_push(struct ctdb_tickle_list *in, uint8_t *buf,
3243                            size_t *npush)
3244 {
3245         size_t offset = 0, np;
3246         uint32_t i;
3247
3248         ctdb_sock_addr_push(&in->addr, buf+offset, &np);
3249         offset += np;
3250
3251         ctdb_uint32_push(&in->num, buf+offset, &np);
3252         offset += np;
3253
3254         for (i=0; i<in->num; i++) {
3255                 ctdb_connection_push(&in->conn[i], buf+offset, &np);
3256                 offset += np;
3257         }
3258
3259         *npush = offset;
3260 }
3261
3262 int ctdb_tickle_list_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
3263                            struct ctdb_tickle_list **out, size_t *npull)
3264 {
3265         struct ctdb_tickle_list *val;
3266         size_t offset = 0, np;
3267         uint32_t i;
3268         int ret;
3269
3270         val = talloc(mem_ctx, struct ctdb_tickle_list);
3271         if (val == NULL) {
3272                 return ENOMEM;
3273         }
3274
3275         ret = ctdb_sock_addr_pull_elems(buf+offset, buflen-offset, val,
3276                                         &val->addr, &np);
3277         if (ret != 0) {
3278                 goto fail;
3279         }
3280         offset += np;
3281
3282         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->num, &np);
3283         if (ret != 0) {
3284                 goto fail;
3285         }
3286         offset += np;
3287
3288         if (val->num == 0) {
3289                 val->conn = NULL;
3290                 goto done;
3291         }
3292
3293         val->conn = talloc_array(val, struct ctdb_connection, val->num);
3294         if (val->conn == NULL) {
3295                 ret = ENOMEM;
3296                 goto fail;
3297         }
3298
3299         for (i=0; i<val->num; i++) {
3300                 ret = ctdb_connection_pull_elems(buf+offset, buflen-offset,
3301                                                  val, &val->conn[i], &np);
3302                 if (ret != 0) {
3303                         goto fail;
3304                 }
3305                 offset += np;
3306         }
3307
3308 done:
3309         *out = val;
3310         *npull = offset;
3311         return 0;
3312
3313 fail:
3314         talloc_free(val);
3315         return ret;
3316 }
3317
3318 size_t ctdb_addr_info_len(struct ctdb_addr_info *in)
3319 {
3320         return ctdb_sock_addr_len(&in->addr) +
3321                 ctdb_uint32_len(&in->mask) +
3322                 ctdb_stringn_len(&in->iface);
3323 }
3324
3325 void ctdb_addr_info_push(struct ctdb_addr_info *in, uint8_t *buf,
3326                          size_t *npush)
3327 {
3328         size_t offset = 0, np;
3329
3330         ctdb_sock_addr_push(&in->addr, buf+offset, &np);
3331         offset += np;
3332
3333         ctdb_uint32_push(&in->mask, buf+offset, &np);
3334         offset += np;
3335
3336         ctdb_stringn_push(&in->iface, buf+offset, &np);
3337         offset += np;
3338
3339         *npush = offset;
3340 }
3341
3342 int ctdb_addr_info_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
3343                         struct ctdb_addr_info **out, size_t *npull)
3344 {
3345         struct ctdb_addr_info *val;
3346         size_t offset = 0, np;
3347         int ret;
3348
3349         val = talloc(mem_ctx, struct ctdb_addr_info);
3350         if (val == NULL) {
3351                 return ENOMEM;
3352         }
3353
3354         ret = ctdb_sock_addr_pull_elems(buf+offset, buflen-offset, val,
3355                                         &val->addr, &np);
3356         if (ret != 0) {
3357                 goto fail;
3358         }
3359         offset += np;
3360
3361         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->mask, &np);
3362         if (ret != 0) {
3363                 goto fail;
3364         }
3365         offset += np;
3366
3367         ret = ctdb_stringn_pull(buf+offset, buflen-offset, val, &val->iface,
3368                                 &np);
3369         if (ret != 0) {
3370                 goto fail;
3371         }
3372         offset += np;
3373
3374         *out = val;
3375         *npull = offset;
3376         return 0;
3377
3378 fail:
3379         talloc_free(val);
3380         return ret;
3381 }
3382
3383 size_t ctdb_transdb_len(struct ctdb_transdb *in)
3384 {
3385         return ctdb_uint32_len(&in->db_id) +
3386                 ctdb_uint32_len(&in->tid);
3387 }
3388
3389 void ctdb_transdb_push(struct ctdb_transdb *in, uint8_t *buf, size_t *npush)
3390 {
3391         size_t offset = 0, np;
3392
3393         ctdb_uint32_push(&in->db_id, buf+offset, &np);
3394         offset += np;
3395
3396         ctdb_uint32_push(&in->tid, buf+offset, &np);
3397         offset += np;
3398
3399         *npush = offset;
3400 }
3401
3402 int ctdb_transdb_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
3403                      struct ctdb_transdb **out, size_t *npull)
3404 {
3405         struct ctdb_transdb *val;
3406         size_t offset = 0, np;
3407         int ret;
3408
3409         val = talloc(mem_ctx, struct ctdb_transdb);
3410         if (val == NULL) {
3411                 return ENOMEM;
3412         }
3413
3414         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->db_id, &np);
3415         if (ret != 0) {
3416                 goto fail;
3417         }
3418         offset += np;
3419
3420         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->tid, &np);
3421         if (ret != 0) {
3422                 goto fail;
3423         }
3424         offset += np;
3425
3426         *out = val;
3427         *npull = offset;
3428         return 0;
3429
3430 fail:
3431         talloc_free(val);
3432         return ret;
3433 }
3434
3435 size_t ctdb_uptime_len(struct ctdb_uptime *in)
3436 {
3437         return ctdb_timeval_len(&in->current_time) +
3438                 ctdb_timeval_len(&in->ctdbd_start_time) +
3439                 ctdb_timeval_len(&in->last_recovery_started) +
3440                 ctdb_timeval_len(&in->last_recovery_finished);
3441 }
3442
3443 void ctdb_uptime_push(struct ctdb_uptime *in, uint8_t *buf, size_t *npush)
3444 {
3445         size_t offset = 0, np;
3446
3447         ctdb_timeval_push(&in->current_time, buf+offset, &np);
3448         offset += np;
3449
3450         ctdb_timeval_push(&in->ctdbd_start_time, buf+offset, &np);
3451         offset += np;
3452
3453         ctdb_timeval_push(&in->last_recovery_started, buf+offset, &np);
3454         offset += np;
3455
3456         ctdb_timeval_push(&in->last_recovery_finished, buf+offset, &np);
3457         offset += np;
3458
3459         *npush = offset;
3460 }
3461
3462 int ctdb_uptime_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
3463                      struct ctdb_uptime **out, size_t *npull)
3464 {
3465         struct ctdb_uptime *val;
3466         size_t offset = 0, np;
3467         int ret;
3468
3469         val = talloc(mem_ctx, struct ctdb_uptime);
3470         if (val == NULL) {
3471                 return ENOMEM;
3472         }
3473
3474         ret = ctdb_timeval_pull(buf+offset, buflen-offset, &val->current_time,
3475                                 &np);
3476         if (ret != 0) {
3477                 goto fail;
3478         }
3479         offset += np;
3480
3481         ret = ctdb_timeval_pull(buf+offset, buflen-offset,
3482                                 &val->ctdbd_start_time, &np);
3483         if (ret != 0) {
3484                 goto fail;
3485         }
3486         offset += np;
3487
3488         ret = ctdb_timeval_pull(buf+offset, buflen-offset,
3489                                 &val->last_recovery_started, &np);
3490         if (ret != 0) {
3491                 goto fail;
3492         }
3493         offset += np;
3494
3495         ret = ctdb_timeval_pull(buf+offset, buflen-offset,
3496                                 &val->last_recovery_finished, &np);
3497         if (ret != 0) {
3498                 goto fail;
3499         }
3500         offset += np;
3501
3502         *out = val;
3503         *npull = offset;
3504         return 0;
3505
3506 fail:
3507         talloc_free(val);
3508         return ret;
3509 }
3510
3511 size_t ctdb_public_ip_len(struct ctdb_public_ip *in)
3512 {
3513         return ctdb_uint32_len(&in->pnn) +
3514                 ctdb_sock_addr_len(&in->addr);
3515 }
3516
3517 void ctdb_public_ip_push(struct ctdb_public_ip *in, uint8_t *buf,
3518                          size_t *npush)
3519 {
3520         size_t offset = 0, np;
3521
3522         ctdb_uint32_push(&in->pnn, buf+offset, &np);
3523         offset += np;
3524
3525         ctdb_sock_addr_push(&in->addr, buf+offset, &np);
3526         offset += np;
3527
3528         *npush = offset;
3529 }
3530
3531 static int ctdb_public_ip_pull_elems(uint8_t *buf, size_t buflen,
3532                                      TALLOC_CTX *mem_ctx,
3533                                      struct ctdb_public_ip *out, size_t *npull)
3534 {
3535         size_t offset = 0, np;
3536         int ret;
3537
3538         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &out->pnn, &np);
3539         if (ret != 0) {
3540                 return ret;
3541         }
3542         offset += np;
3543
3544         ret = ctdb_sock_addr_pull_elems(buf+offset, buflen-offset, mem_ctx,
3545                                         &out->addr, &np);
3546         if (ret != 0) {
3547                 return ret;
3548         }
3549         offset += np;
3550
3551         *npull = offset;
3552         return 0;
3553 }
3554
3555 int ctdb_public_ip_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
3556                         struct ctdb_public_ip **out, size_t *npull)
3557 {
3558         struct ctdb_public_ip *val;
3559         size_t np;
3560         int ret;
3561
3562         val = talloc(mem_ctx, struct ctdb_public_ip);
3563         if (val == NULL) {
3564                 return ENOMEM;
3565         }
3566
3567         ret = ctdb_public_ip_pull_elems(buf, buflen, val, val, &np);
3568         if (ret != 0) {
3569                 TALLOC_FREE(val);
3570                 return ret;
3571         }
3572
3573         *out = val;
3574         *npull = np;
3575         return ret;
3576 }
3577
3578 size_t ctdb_public_ip_list_len(struct ctdb_public_ip_list *in)
3579 {
3580         size_t len;
3581
3582         len = ctdb_uint32_len(&in->num);
3583         if (in->num > 0) {
3584                 len += in->num * ctdb_public_ip_len(&in->ip[0]);
3585         }
3586
3587         return len;
3588 }
3589
3590 void ctdb_public_ip_list_push(struct ctdb_public_ip_list *in, uint8_t *buf,
3591                               size_t *npush)
3592 {
3593         size_t offset = 0, np;
3594         uint32_t i;
3595
3596         ctdb_uint32_push(&in->num, buf+offset, &np);
3597         offset += np;
3598
3599         for (i=0; i<in->num; i++) {
3600                 ctdb_public_ip_push(&in->ip[i], buf+offset, &np);
3601                 offset += np;
3602         }
3603
3604         *npush = offset;
3605 }
3606
3607 int ctdb_public_ip_list_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
3608                              struct ctdb_public_ip_list **out, size_t *npull)
3609 {
3610         struct ctdb_public_ip_list *val;
3611         size_t offset = 0, np;
3612         uint32_t i;
3613         int ret;
3614
3615         val = talloc(mem_ctx, struct ctdb_public_ip_list);
3616         if (val == NULL) {
3617                 return ENOMEM;
3618         }
3619
3620         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->num, &np);
3621         if (ret != 0) {
3622                 goto fail;
3623         }
3624         offset += np;
3625
3626         if (val->num == 0) {
3627                 val->ip = NULL;
3628                 goto done;
3629         }
3630
3631         val->ip = talloc_array(val, struct ctdb_public_ip, val->num);
3632         if (val->ip == NULL) {
3633                 ret = ENOMEM;
3634                 goto fail;
3635         }
3636
3637         for (i=0; i<val->num; i++) {
3638                 ret = ctdb_public_ip_pull_elems(buf+offset, buflen-offset,
3639                                                 val->ip, &val->ip[i], &np);
3640                 if (ret != 0) {
3641                         goto fail;
3642                 }
3643                 offset += np;
3644         }
3645
3646 done:
3647         *out = val;
3648         *npull = offset;
3649         return 0;
3650
3651 fail:
3652         talloc_free(val);
3653         return ret;
3654 }
3655
3656 size_t ctdb_node_and_flags_len(struct ctdb_node_and_flags *in)
3657 {
3658         return ctdb_uint32_len(&in->pnn) +
3659                 ctdb_uint32_len(&in->flags) +
3660                 ctdb_sock_addr_len(&in->addr);
3661 }
3662
3663 void ctdb_node_and_flags_push(struct ctdb_node_and_flags *in, uint8_t *buf,
3664                               size_t *npush)
3665 {
3666         size_t offset = 0, np;
3667
3668         ctdb_uint32_push(&in->pnn, buf+offset, &np);
3669         offset += np;
3670
3671         ctdb_uint32_push(&in->flags, buf+offset, &np);
3672         offset += np;
3673
3674         ctdb_sock_addr_push(&in->addr, buf+offset, &np);
3675         offset += np;
3676
3677         *npush = offset;
3678 }
3679
3680 static int ctdb_node_and_flags_pull_elems(uint8_t *buf, size_t buflen,
3681                                           TALLOC_CTX *mem_ctx,
3682                                           struct ctdb_node_and_flags *out,
3683                                           size_t *npull)
3684 {
3685         size_t offset = 0, np;
3686         int ret;
3687
3688         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &out->pnn, &np);
3689         if (ret != 0) {
3690                 return ret;
3691         }
3692         offset += np;
3693
3694         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &out->flags, &np);
3695         if (ret != 0) {
3696                 return ret;
3697         }
3698         offset += np;
3699
3700         ret = ctdb_sock_addr_pull_elems(buf+offset, buflen-offset, mem_ctx,
3701                                         &out->addr, &np);
3702         if (ret != 0) {
3703                 return ret;
3704         }
3705         offset += np;
3706
3707         *npull = offset;
3708         return 0;
3709 }
3710
3711 int ctdb_node_and_flags_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
3712                               struct ctdb_node_and_flags **out, size_t *npull)
3713 {
3714         struct ctdb_node_and_flags *val;
3715         size_t np;
3716         int ret;
3717
3718         val = talloc(mem_ctx, struct ctdb_node_and_flags);
3719         if (val == NULL) {
3720                 return ENOMEM;
3721         }
3722
3723         ret = ctdb_node_and_flags_pull_elems(buf, buflen, val, val, &np);
3724         if (ret != 0) {
3725                 TALLOC_FREE(val);
3726                 return ret;
3727         }
3728
3729         *out = val;
3730         *npull = np;
3731         return ret;
3732 }
3733
3734 size_t ctdb_node_map_len(struct ctdb_node_map *in)
3735 {
3736         size_t len;
3737
3738         len = ctdb_uint32_len(&in->num);
3739         if (in->num > 0) {
3740                 len += in->num * ctdb_node_and_flags_len(&in->node[0]);
3741         }
3742
3743         return len;
3744 }
3745
3746 void ctdb_node_map_push(struct ctdb_node_map *in, uint8_t *buf, size_t *npush)
3747 {
3748         size_t offset = 0, np;
3749         uint32_t i;
3750
3751         ctdb_uint32_push(&in->num, buf+offset, &np);
3752         offset += np;
3753
3754         for (i=0; i<in->num; i++) {
3755                 ctdb_node_and_flags_push(&in->node[i], buf+offset, &np);
3756                 offset += np;
3757         }
3758
3759         *npush = offset;
3760 }
3761
3762 int ctdb_node_map_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
3763                        struct ctdb_node_map **out, size_t *npull)
3764 {
3765         struct ctdb_node_map *val;
3766         size_t offset = 0, np;
3767         uint32_t i;
3768         int ret;
3769
3770         val = talloc(mem_ctx, struct ctdb_node_map);
3771         if (val == NULL) {
3772                 return ENOMEM;
3773         }
3774
3775         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->num, &np);
3776         if (ret != 0) {
3777                 goto fail;
3778         }
3779         offset += np;
3780
3781         if (val->num == 0) {
3782                 val->node = NULL;
3783                 goto done;
3784         }
3785
3786         val->node = talloc_array(val, struct ctdb_node_and_flags, val->num);
3787         if (val->node == NULL) {
3788                 ret = ENOMEM;
3789                 goto fail;
3790         }
3791
3792         for (i=0; i<val->num; i++) {
3793                 ret = ctdb_node_and_flags_pull_elems(buf+offset,
3794                                                      buflen-offset,
3795                                                      val->node, &val->node[i],
3796                                                      &np);
3797                 if (ret != 0) {
3798                         goto fail;
3799                 }
3800                 offset += np;
3801         }
3802
3803 done:
3804         *out = val;
3805         *npull = offset;
3806         return 0;
3807
3808 fail:
3809         talloc_free(val);
3810         return ret;
3811 }
3812
3813 size_t ctdb_script_len(struct ctdb_script *in)
3814 {
3815         return ctdb_chararray_len(in->name, MAX_SCRIPT_NAME+1) +
3816                 ctdb_timeval_len(&in->start) +
3817                 ctdb_timeval_len(&in->finished) +
3818                 ctdb_int32_len(&in->status) +
3819                 ctdb_chararray_len(in->output, MAX_SCRIPT_OUTPUT+1) +
3820                 ctdb_padding_len(4);
3821 }
3822
3823 void ctdb_script_push(struct ctdb_script *in, uint8_t *buf, size_t *npush)
3824 {
3825         size_t offset = 0, np;
3826
3827         ctdb_chararray_push(in->name, MAX_SCRIPT_NAME+1, buf+offset, &np);
3828         offset += np;
3829
3830         ctdb_timeval_push(&in->start, buf+offset, &np);
3831         offset += np;
3832
3833         ctdb_timeval_push(&in->finished, buf+offset, &np);
3834         offset += np;
3835
3836         ctdb_int32_push(&in->status, buf+offset, &np);
3837         offset += np;
3838
3839         ctdb_chararray_push(in->output, MAX_SCRIPT_OUTPUT+1, buf+offset, &np);
3840         offset += np;
3841
3842         ctdb_padding_push(4, buf+offset, &np);
3843         offset += np;
3844
3845         *npush = offset;
3846 }
3847
3848 static int ctdb_script_pull_elems(uint8_t *buf, size_t buflen,
3849                                   TALLOC_CTX *mem_ctx,
3850                                   struct ctdb_script *out, size_t *npull)
3851 {
3852         size_t offset = 0, np;
3853         int ret;
3854
3855         ret = ctdb_chararray_pull(buf+offset, buflen-offset,
3856                                   out->name, MAX_SCRIPT_NAME+1, &np);
3857         if (ret != 0) {
3858                 return ret;
3859         }
3860         offset += np;
3861
3862         ret = ctdb_timeval_pull(buf+offset, buflen-offset, &out->start, &np);
3863         if (ret != 0) {
3864                 return ret;
3865         }
3866         offset += np;
3867
3868         ret = ctdb_timeval_pull(buf+offset, buflen-offset, &out->finished,
3869                                 &np);
3870         if (ret != 0) {
3871                 return ret;
3872         }
3873         offset += np;
3874
3875         ret = ctdb_int32_pull(buf+offset, buflen-offset, &out->status, &np);
3876         if (ret != 0) {
3877                 return ret;
3878         }
3879         offset += np;
3880
3881         ret = ctdb_chararray_pull(buf+offset, buflen-offset,
3882                                   out->output, MAX_SCRIPT_OUTPUT+1, &np);
3883         if (ret != 0) {
3884                 return ret;
3885         }
3886         offset += np;
3887
3888         ret = ctdb_padding_pull(buf+offset, buflen-offset, 4, &np);
3889         if (ret != 0) {
3890                 return ret;
3891         }
3892         offset += np;
3893
3894         *npull = offset;
3895         return 0;
3896 }
3897
3898 int ctdb_script_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
3899                      struct ctdb_script **out, size_t *npull)
3900 {
3901         struct ctdb_script *val;
3902         size_t np;
3903         int ret;
3904
3905         val = talloc(mem_ctx, struct ctdb_script);
3906         if (val == NULL) {
3907                 return ENOMEM;
3908         }
3909
3910         ret = ctdb_script_pull_elems(buf, buflen, val, val, &np);
3911         if (ret != 0) {
3912                 TALLOC_FREE(val);
3913                 return ret;
3914         }
3915
3916         *out = val;
3917         *npull = np;
3918         return ret;
3919 }
3920
3921 size_t ctdb_script_list_len(struct ctdb_script_list *in)
3922 {
3923         size_t len;
3924
3925         if (in == NULL) {
3926                 return 0;
3927         }
3928
3929         len = ctdb_uint32_len(&in->num_scripts) + ctdb_padding_len(4);
3930         if (in->num_scripts > 0) {
3931                 len += in->num_scripts * ctdb_script_len(&in->script[0]);
3932         }
3933
3934         return len;
3935 }
3936
3937 void ctdb_script_list_push(struct ctdb_script_list *in, uint8_t *buf,
3938                            size_t *npush)
3939 {
3940         size_t offset = 0, np;
3941         uint32_t i;
3942
3943         if (in == NULL) {
3944                 *npush = 0;
3945                 return;
3946         }
3947
3948         ctdb_uint32_push(&in->num_scripts, buf+offset, &np);
3949         offset += np;
3950
3951         ctdb_padding_push(4, buf+offset, &np);
3952         offset += np;
3953
3954         for (i=0; i<in->num_scripts; i++) {
3955                 ctdb_script_push(&in->script[i], buf+offset, &np);
3956                 offset += np;
3957         }
3958
3959         *npush = offset;
3960 }
3961
3962 int ctdb_script_list_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
3963                           struct ctdb_script_list **out, size_t *npull)
3964 {
3965         struct ctdb_script_list *val;
3966         size_t offset = 0, np;
3967         uint32_t i;
3968         int ret;
3969
3970         /* If event scripts have never been run, the result will be NULL */
3971         if (buflen == 0) {
3972                 val = NULL;
3973                 goto done;
3974         }
3975
3976         val = talloc(mem_ctx, struct ctdb_script_list);
3977         if (val == NULL) {
3978                 return ENOMEM;
3979         }
3980
3981         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->num_scripts,
3982                                &np);
3983         if (ret != 0) {
3984                 goto fail;
3985         }
3986         offset += np;
3987
3988         ret = ctdb_padding_pull(buf+offset, buflen-offset, 4, &np);
3989         if (ret != 0) {
3990                 goto fail;
3991         }
3992         offset += np;
3993
3994         if (val->num_scripts == 0) {
3995                 val->script = NULL;
3996                 goto done;
3997         }
3998
3999         val->script = talloc_array(val, struct ctdb_script, val->num_scripts);
4000         if (val->script == NULL) {
4001                 ret = ENOMEM;
4002                 goto fail;
4003         }
4004
4005         for (i=0; i<val->num_scripts; i++) {
4006                 ret = ctdb_script_pull_elems(buf+offset, buflen-offset,
4007                                              val, &val->script[i], &np);
4008                 if (ret != 0) {
4009                         goto fail;
4010                 }
4011                 offset += np;
4012         }
4013
4014 done:
4015         *out = val;
4016         *npull = offset;
4017         return 0;
4018
4019 fail:
4020         talloc_free(val);
4021         return ret;
4022 }
4023
4024 size_t ctdb_ban_state_len(struct ctdb_ban_state *in)
4025 {
4026         return ctdb_uint32_len(&in->pnn) +
4027                 ctdb_uint32_len(&in->time);
4028 }
4029
4030 void ctdb_ban_state_push(struct ctdb_ban_state *in, uint8_t *buf,
4031                          size_t *npush)
4032 {
4033         size_t offset = 0, np;
4034
4035         ctdb_uint32_push(&in->pnn, buf+offset, &np);
4036         offset += np;
4037
4038         ctdb_uint32_push(&in->time, buf+offset, &np);
4039         offset += np;
4040
4041         *npush = offset;
4042 }
4043
4044 int ctdb_ban_state_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
4045                         struct ctdb_ban_state **out, size_t *npull)
4046 {
4047         struct ctdb_ban_state *val;
4048         size_t offset = 0, np;
4049         int ret;
4050
4051         val = talloc(mem_ctx, struct ctdb_ban_state);
4052         if (val == NULL) {
4053                 return ENOMEM;
4054         }
4055
4056         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->pnn, &np);
4057         if (ret != 0) {
4058                 goto fail;
4059         }
4060         offset += np;
4061
4062         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->time, &np);
4063         if (ret != 0) {
4064                 goto fail;
4065         }
4066         offset += np;
4067
4068         *out = val;
4069         *npull = offset;
4070         return 0;
4071
4072 fail:
4073         talloc_free(val);
4074         return ret;
4075 }
4076
4077 size_t ctdb_notify_data_len(struct ctdb_notify_data *in)
4078 {
4079         return ctdb_uint64_len(&in->srvid) +
4080                 ctdb_tdb_datan_len(&in->data);
4081 }
4082
4083 void ctdb_notify_data_push(struct ctdb_notify_data *in, uint8_t *buf,
4084                            size_t *npush)
4085 {
4086         size_t offset = 0, np;
4087
4088         ctdb_uint64_push(&in->srvid, buf+offset, &np);
4089         offset += np;
4090
4091         ctdb_tdb_datan_push(&in->data, buf+offset, &np);
4092         offset += np;
4093
4094         *npush = offset;
4095 }
4096
4097 int ctdb_notify_data_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
4098                           struct ctdb_notify_data **out, size_t *npull)
4099 {
4100         struct ctdb_notify_data *val;
4101         size_t offset = 0, np;
4102         int ret;
4103
4104         val = talloc(mem_ctx, struct ctdb_notify_data);
4105         if (val == NULL) {
4106                 return ENOMEM;
4107         }
4108
4109         ret = ctdb_uint64_pull(buf+offset, buflen-offset, &val->srvid, &np);
4110         if (ret != 0) {
4111                 goto fail;
4112         }
4113         offset += np;
4114
4115         ret = ctdb_tdb_datan_pull(buf+offset, buflen-offset, val, &val->data,
4116                                   &np);
4117         if (ret != 0) {
4118                 goto fail;
4119         }
4120         offset += np;
4121
4122         *out = val;
4123         *npull = offset;
4124         return 0;
4125
4126 fail:
4127         talloc_free(val);
4128         return ret;
4129 }
4130
4131 size_t ctdb_iface_len(struct ctdb_iface *in)
4132 {
4133         return ctdb_chararray_len(in->name, CTDB_IFACE_SIZE+2) +
4134                 ctdb_uint16_len(&in->link_state) +
4135                 ctdb_uint32_len(&in->references);
4136 }
4137
4138 void ctdb_iface_push(struct ctdb_iface *in, uint8_t *buf, size_t *npush)
4139 {
4140         size_t offset = 0, np;
4141
4142         ctdb_chararray_push(in->name, CTDB_IFACE_SIZE+2, buf+offset, &np);
4143         offset += np;
4144
4145         ctdb_uint16_push(&in->link_state, buf+offset, &np);
4146         offset += np;
4147
4148         ctdb_uint32_push(&in->references, buf+offset, &np);
4149         offset += np;
4150
4151         *npush = offset;
4152 }
4153
4154 static int ctdb_iface_pull_elems(uint8_t *buf, size_t buflen,
4155                                  TALLOC_CTX *mem_ctx,
4156                                  struct ctdb_iface *out, size_t *npull)
4157 {
4158         size_t offset = 0, np;
4159         int ret;
4160
4161         ret = ctdb_chararray_pull(buf+offset, buflen-offset,
4162                                   out->name, CTDB_IFACE_SIZE+2, &np);
4163         if (ret != 0) {
4164                 return ret;
4165         }
4166         offset += np;
4167
4168         ret = ctdb_uint16_pull(buf+offset, buflen-offset, &out->link_state,
4169                                &np);
4170         if (ret != 0) {
4171                 return ret;
4172         }
4173         offset += np;
4174
4175         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &out->references,
4176                                &np);
4177         if (ret != 0) {
4178                 return ret;
4179         }
4180         offset += np;
4181
4182         *npull = offset;
4183         return 0;
4184 }
4185
4186 int ctdb_iface_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
4187                     struct ctdb_iface **out, size_t *npull)
4188 {
4189         struct ctdb_iface *val;
4190         size_t np;
4191         int ret;
4192
4193         val = talloc(mem_ctx, struct ctdb_iface);
4194         if (val == NULL) {
4195                 return ENOMEM;
4196         }
4197
4198         ret = ctdb_iface_pull_elems(buf, buflen, val, val, &np);
4199         if (ret != 0) {
4200                 talloc_free(val);
4201                 return ret;
4202         }
4203
4204         *out = val;
4205         *npull = np;
4206         return ret;
4207 }
4208
4209 size_t ctdb_iface_list_len(struct ctdb_iface_list *in)
4210 {
4211         size_t len;
4212
4213         len = ctdb_uint32_len(&in->num);
4214         if (in->num > 0) {
4215                 len += in->num * ctdb_iface_len(&in->iface[0]);
4216         }
4217
4218         return len;
4219 }
4220
4221 void ctdb_iface_list_push(struct ctdb_iface_list *in, uint8_t *buf,
4222                           size_t *npush)
4223 {
4224         size_t offset = 0, np;
4225         uint32_t i;
4226
4227         ctdb_uint32_push(&in->num, buf+offset, &np);
4228         offset += np;
4229
4230         for (i=0; i<in->num; i++) {
4231                 ctdb_iface_push(&in->iface[i], buf+offset, &np);
4232                 offset += np;
4233         }
4234
4235         *npush = offset;
4236 }
4237
4238 int ctdb_iface_list_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
4239                          struct ctdb_iface_list **out, size_t *npull)
4240 {
4241         struct ctdb_iface_list *val;
4242         size_t offset = 0, np;
4243         uint32_t i;
4244         int ret;
4245
4246         val = talloc(mem_ctx, struct ctdb_iface_list);
4247         if (val == NULL) {
4248                 return ENOMEM;
4249         }
4250
4251         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->num, &np);
4252         if (ret != 0) {
4253                 goto fail;
4254         }
4255         offset += np;
4256
4257         if (val->num == 0) {
4258                 val->iface = NULL;
4259                 goto done;
4260         }
4261
4262         val->iface = talloc_array(val, struct ctdb_iface, val->num);
4263         if (val->iface == NULL) {
4264                 ret = ENOMEM;
4265                 goto fail;
4266         }
4267
4268         for (i=0; i<val->num; i++) {
4269                 ret = ctdb_iface_pull_elems(buf+offset, buflen-offset,
4270                                             val, &val->iface[i], &np);
4271                 if (ret != 0) {
4272                         goto fail;
4273                 }
4274                 offset += np;
4275         }
4276
4277 done:
4278         *out = val;
4279         *npull = offset;
4280         return 0;
4281
4282 fail:
4283         talloc_free(val);
4284         return ret;
4285 }
4286
4287 size_t ctdb_public_ip_info_len(struct ctdb_public_ip_info *in)
4288 {
4289         return ctdb_public_ip_len(&in->ip) +
4290                 ctdb_uint32_len(&in->active_idx) +
4291                 ctdb_iface_list_len(in->ifaces);
4292 }
4293
4294 void ctdb_public_ip_info_push(struct ctdb_public_ip_info *in, uint8_t *buf,
4295                               size_t  *npush)
4296 {
4297         size_t offset = 0, np;
4298
4299         ctdb_public_ip_push(&in->ip, buf+offset, &np);
4300         offset += np;
4301
4302         ctdb_uint32_push(&in->active_idx, buf+offset, &np);
4303         offset += np;
4304
4305         ctdb_iface_list_push(in->ifaces, buf+offset, &np);
4306         offset += np;
4307
4308         *npush = offset;
4309 }
4310
4311 int ctdb_public_ip_info_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
4312                              struct ctdb_public_ip_info **out, size_t *npull)
4313 {
4314         struct ctdb_public_ip_info *val;
4315         size_t offset = 0, np;
4316         int ret;
4317
4318         val = talloc(mem_ctx, struct ctdb_public_ip_info);
4319         if (val == NULL) {
4320                 return ENOMEM;
4321         }
4322
4323         ret = ctdb_public_ip_pull_elems(buf+offset, buflen-offset, val,
4324                                         &val->ip, &np);
4325         if (ret != 0) {
4326                 goto fail;
4327         }
4328         offset += np;
4329
4330         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->active_idx,
4331                                &np);
4332         if (ret != 0) {
4333                 goto fail;
4334         }
4335         offset += np;
4336
4337         ret = ctdb_iface_list_pull(buf+offset, buflen-offset, val,
4338                                    &val->ifaces, &np);
4339         if (ret != 0) {
4340                 goto fail;
4341         }
4342         offset += np;
4343
4344         *out = val;
4345         *npull = offset;
4346         return 0;
4347
4348 fail:
4349         talloc_free(val);
4350         return ret;
4351 }
4352
4353 size_t ctdb_key_data_len(struct ctdb_key_data *in)
4354 {
4355         return ctdb_uint32_len(&in->db_id) +
4356                 ctdb_padding_len(4) +
4357                 ctdb_ltdb_header_len(&in->header) +
4358                 ctdb_tdb_datan_len(&in->key);
4359 }
4360
4361 void ctdb_key_data_push(struct ctdb_key_data *in, uint8_t *buf, size_t *npush)
4362 {
4363         size_t offset = 0, np;
4364
4365         ctdb_uint32_push(&in->db_id, buf+offset, &np);
4366         offset += np;
4367
4368         ctdb_padding_push(4, buf+offset, &np);
4369         offset += np;
4370
4371         ctdb_ltdb_header_push(&in->header, buf+offset, &np);
4372         offset += np;
4373
4374         ctdb_tdb_datan_push(&in->key, buf+offset, &np);
4375         offset += np;
4376
4377         *npush = offset;
4378 }
4379
4380 int ctdb_key_data_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
4381                        struct ctdb_key_data **out, size_t *npull)
4382 {
4383         struct ctdb_key_data *val;
4384         size_t offset = 0, np;
4385         int ret;
4386
4387         val = talloc(mem_ctx, struct ctdb_key_data);
4388         if (val == NULL) {
4389                 return ENOMEM;
4390         }
4391
4392         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->db_id, &np);
4393         if (ret != 0) {
4394                 goto fail;
4395         }
4396         offset += np;
4397
4398         ret = ctdb_padding_pull(buf+offset, buflen-offset, 4, &np);
4399         if (ret != 0) {
4400                 goto fail;
4401         }
4402         offset += np;
4403
4404         ret = ctdb_ltdb_header_pull(buf+offset, buflen-offset, &val->header,
4405                                     &np);
4406         if (ret != 0) {
4407                 goto fail;
4408         }
4409         offset += np;
4410
4411         ret = ctdb_tdb_datan_pull(buf+offset, buflen-offset, val, &val->key,
4412                                   &np);
4413         if (ret != 0) {
4414                 goto fail;
4415         }
4416         offset += np;
4417
4418         *out = val;
4419         *npull = offset;
4420         return 0;
4421
4422 fail:
4423         talloc_free(val);
4424         return ret;
4425 }
4426
4427 /* In the tdb_data structure marshalling, we are only interested in dsize.
4428  * The dptr value is ignored.  The actual tdb_data blob is stored separately.
4429  *
4430  * This is only required for ctdb_db_statistics and will be dropped in future.
4431  */
4432
4433 static size_t tdb_data_struct_len(TDB_DATA *data)
4434 {
4435         return sizeof(void *) + sizeof(size_t);
4436 }
4437
4438 static void tdb_data_struct_push(TDB_DATA *data, uint8_t *buf, size_t *npush)
4439 {
4440         size_t offset = 0;
4441
4442         memcpy(buf+offset, &data->dptr, sizeof(void *));
4443         offset += sizeof(void *);
4444
4445         memcpy(buf+offset, &data->dsize, sizeof(size_t));
4446         offset += sizeof(size_t);
4447
4448         *npush = offset;
4449 }
4450
4451 static int tdb_data_struct_pull(uint8_t *buf, size_t buflen, TDB_DATA *data,
4452                                 size_t *npull)
4453 {
4454         size_t offset = 0;
4455         void *ptr;
4456
4457         if (buflen-offset < sizeof(void *)) {
4458                 return EMSGSIZE;
4459         }
4460
4461         memcpy(&ptr, buf+offset, sizeof(void *));
4462         offset += sizeof(void *);
4463         data->dptr = NULL;
4464
4465         if (buflen-offset < sizeof(size_t)) {
4466                 return EMSGSIZE;
4467         }
4468
4469         memcpy(&data->dsize, buf+offset, sizeof(size_t));
4470         offset += sizeof(size_t);
4471
4472         *npull = offset;
4473         return 0;
4474 }
4475
4476 size_t ctdb_db_statistics_len(struct ctdb_db_statistics *in)
4477 {
4478         TDB_DATA data = { 0 };
4479         size_t len;
4480         uint32_t u32 = 0;
4481         int i;
4482
4483         len = ctdb_uint32_len(&in->locks.num_calls) +
4484                 ctdb_uint32_len(&in->locks.num_current) +
4485                 ctdb_uint32_len(&in->locks.num_pending) +
4486                 ctdb_uint32_len(&in->locks.num_failed) +
4487                 ctdb_latency_counter_len(&in->locks.latency) +
4488                 MAX_COUNT_BUCKETS *
4489                         ctdb_uint32_len(&in->locks.buckets[0]) +
4490                 ctdb_latency_counter_len(&in->vacuum.latency) +
4491                 ctdb_uint32_len(&in->db_ro_delegations) +
4492                 ctdb_uint32_len(&in->db_ro_revokes) +
4493                 MAX_COUNT_BUCKETS *
4494                         ctdb_uint32_len(&in->hop_count_bucket[0]) +
4495                 ctdb_uint32_len(&in->num_hot_keys) +
4496                 ctdb_padding_len(4) +
4497                 MAX_HOT_KEYS *
4498                         (ctdb_uint32_len(&u32) + ctdb_padding_len(4) +
4499                          tdb_data_struct_len(&data));
4500
4501         for (i=0; i<MAX_HOT_KEYS; i++) {
4502                 len += ctdb_tdb_data_len(&in->hot_keys[i].key);
4503         }
4504
4505         return len;
4506 }
4507
4508 void ctdb_db_statistics_push(struct ctdb_db_statistics *in, uint8_t *buf,
4509                              size_t *npush)
4510 {
4511         size_t offset = 0, np;
4512         uint32_t num_hot_keys;
4513         int i;
4514
4515         ctdb_uint32_push(&in->locks.num_calls, buf+offset, &np);
4516         offset += np;
4517
4518         ctdb_uint32_push(&in->locks.num_current, buf+offset, &np);
4519         offset += np;
4520
4521         ctdb_uint32_push(&in->locks.num_pending, buf+offset, &np);
4522         offset += np;
4523
4524         ctdb_uint32_push(&in->locks.num_failed, buf+offset, &np);
4525         offset += np;
4526
4527         ctdb_latency_counter_push(&in->locks.latency, buf+offset, &np);
4528         offset += np;
4529
4530         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
4531                 ctdb_uint32_push(&in->locks.buckets[i], buf+offset, &np);
4532                 offset += np;
4533         }
4534
4535         ctdb_latency_counter_push(&in->vacuum.latency, buf+offset, &np);
4536         offset += np;
4537
4538         ctdb_uint32_push(&in->db_ro_delegations, buf+offset, &np);
4539         offset += np;
4540
4541         ctdb_uint32_push(&in->db_ro_revokes, buf+offset, &np);
4542         offset += np;
4543
4544         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
4545                 ctdb_uint32_push(&in->hop_count_bucket[i], buf+offset, &np);
4546                 offset += np;
4547         }
4548
4549         num_hot_keys = MAX_HOT_KEYS;
4550         ctdb_uint32_push(&num_hot_keys, buf+offset, &np);
4551         offset += np;
4552
4553         ctdb_padding_push(4, buf+offset, &np);
4554         offset += np;
4555
4556         for (i=0; i<MAX_HOT_KEYS; i++) {
4557                 ctdb_uint32_push(&in->hot_keys[i].count, buf+offset, &np);
4558                 offset += np;
4559
4560                 ctdb_padding_push(4, buf+offset, &np);
4561                 offset += np;
4562
4563                 tdb_data_struct_push(&in->hot_keys[i].key, buf+offset, &np);
4564                 offset += np;
4565         }
4566
4567         for (i=0; i<MAX_HOT_KEYS; i++) {
4568                 ctdb_tdb_data_push(&in->hot_keys[i].key, buf+offset, &np);
4569                 offset += np;
4570         }
4571
4572         *npush = offset;
4573 }
4574
4575 static int ctdb_db_statistics_pull_elems(uint8_t *buf, size_t buflen,
4576                                          TALLOC_CTX *mem_ctx,
4577                                          struct ctdb_db_statistics *out,
4578                                          size_t *npull)
4579 {
4580         size_t offset = 0, np;
4581         int ret, i;
4582
4583         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
4584                                &out->locks.num_calls, &np);
4585         if (ret != 0) {
4586                 return ret;
4587         }
4588         offset += np;
4589
4590         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
4591                                &out->locks.num_current, &np);
4592         if (ret != 0) {
4593                 return ret;
4594         }
4595         offset += np;
4596
4597         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
4598                                &out->locks.num_pending, &np);
4599         if (ret != 0) {
4600                 return ret;
4601         }
4602         offset += np;
4603
4604         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
4605                                &out->locks.num_failed, &np);
4606         if (ret != 0) {
4607                 return ret;
4608         }
4609         offset += np;
4610
4611         ret = ctdb_latency_counter_pull(buf+offset, buflen-offset,
4612                                         &out->locks.latency, &np);
4613         if (ret != 0) {
4614                 return ret;
4615         }
4616         offset += np;
4617
4618         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
4619                 ret = ctdb_uint32_pull(buf+offset, buflen-offset,
4620                                        &out->locks.buckets[i], &np);
4621                 if (ret != 0) {
4622                         return ret;
4623                 }
4624                 offset += np;
4625         }
4626
4627         ret = ctdb_latency_counter_pull(buf+offset, buflen-offset,
4628                                         &out->vacuum.latency, &np);
4629         if (ret != 0) {
4630                 return ret;
4631         }
4632         offset += np;
4633
4634         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
4635                                &out->db_ro_delegations, &np);
4636         if (ret != 0) {
4637                 return ret;
4638         }
4639         offset += np;
4640
4641         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
4642                                &out->db_ro_revokes, &np);
4643         if (ret != 0) {
4644                 return ret;
4645         }
4646         offset += np;
4647
4648         for (i=0; i<MAX_COUNT_BUCKETS; i++) {
4649                 ret = ctdb_uint32_pull(buf+offset, buflen-offset,
4650                                        &out->hop_count_bucket[i], &np);
4651                 if (ret != 0) {
4652                         return ret;
4653                 }
4654                 offset += np;
4655         }
4656
4657         ret = ctdb_uint32_pull(buf+offset, buflen-offset,
4658                                &out->num_hot_keys, &np);
4659         if (ret != 0) {
4660                 return ret;
4661         }
4662         offset += np;
4663
4664         ret = ctdb_padding_pull(buf+offset, buflen-offset, 4, &np);
4665         if (ret != 0) {
4666                 return ret;
4667         }
4668         offset += np;
4669
4670         for (i=0; i<MAX_HOT_KEYS; i++) {
4671                 ret = ctdb_uint32_pull(buf+offset, buflen-offset,
4672                                        &out->hot_keys[i].count, &np);
4673                 if (ret != 0) {
4674                         return ret;
4675                 }
4676                 offset += np;
4677
4678                 ret = ctdb_padding_pull(buf+offset, buflen-offset, 4, &np);
4679                 if (ret != 0) {
4680                         return ret;
4681                 }
4682                 offset += np;
4683
4684                 ret = tdb_data_struct_pull(buf+offset, buflen-offset,
4685                                            &out->hot_keys[i].key, &np);
4686                 if (ret != 0) {
4687                         return ret;
4688                 }
4689                 offset += np;
4690         }
4691
4692         for (i=0; i<MAX_HOT_KEYS; i++) {
4693                 ret = ctdb_tdb_data_pull(buf+offset,
4694                                          out->hot_keys[i].key.dsize,
4695                                          out, &out->hot_keys[i].key, &np);
4696                 if (ret != 0) {
4697                         return ret;
4698                 }
4699                 offset += np;
4700         }
4701
4702         *npull = offset;
4703         return 0;
4704 }
4705
4706 int ctdb_db_statistics_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
4707                             struct ctdb_db_statistics **out, size_t *npull)
4708 {
4709         struct ctdb_db_statistics *val;
4710         size_t np;
4711         int ret;
4712
4713         val = talloc(mem_ctx, struct ctdb_db_statistics);
4714         if (val == NULL) {
4715                 return ENOMEM;
4716         }
4717
4718         ret = ctdb_db_statistics_pull_elems(buf, buflen, val, val, &np);
4719         if (ret != 0) {
4720                 talloc_free(val);
4721                 return ret;
4722         }
4723
4724         *out = val;
4725         *npull = np;
4726         return 0;
4727 }
4728
4729 size_t ctdb_pid_srvid_len(struct ctdb_pid_srvid *in)
4730 {
4731         return ctdb_pid_len(&in->pid) +
4732                 ctdb_uint64_len(&in->srvid);
4733 }
4734
4735 void ctdb_pid_srvid_push(struct ctdb_pid_srvid *in, uint8_t *buf,
4736                          size_t *npush)
4737 {
4738         size_t offset = 0, np;
4739
4740         ctdb_pid_push(&in->pid, buf+offset, &np);
4741         offset += np;
4742
4743         ctdb_uint64_push(&in->srvid, buf+offset, &np);
4744         offset += np;
4745
4746         *npush = offset;
4747 }
4748
4749 int ctdb_pid_srvid_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
4750                         struct ctdb_pid_srvid **out, size_t *npull)
4751 {
4752         struct ctdb_pid_srvid *val;
4753         size_t offset = 0, np;
4754         int ret;
4755
4756         val = talloc(mem_ctx, struct ctdb_pid_srvid);
4757         if (val == NULL) {
4758                 return ENOMEM;
4759         }
4760
4761         ret = ctdb_pid_pull(buf+offset, buflen-offset, &val->pid, &np);
4762         if (ret != 0) {
4763                 goto fail;
4764         }
4765         offset += np;
4766
4767         ret = ctdb_uint64_pull(buf+offset, buflen-offset, &val->srvid, &np);
4768         if (ret != 0) {
4769                 goto fail;
4770         }
4771         offset += np;
4772
4773         *out = val;
4774         *npull = offset;
4775         return 0;
4776
4777 fail:
4778         talloc_free(val);
4779         return ret;
4780 }
4781
4782 size_t ctdb_election_message_len(struct ctdb_election_message *in)
4783 {
4784         return ctdb_uint32_len(&in->num_connected) +
4785                 ctdb_padding_len(4) +
4786                 ctdb_timeval_len(&in->priority_time) +
4787                 ctdb_uint32_len(&in->pnn) +
4788                 ctdb_uint32_len(&in->node_flags);
4789 }
4790
4791 void ctdb_election_message_push(struct ctdb_election_message *in,
4792                                 uint8_t *buf, size_t *npush)
4793 {
4794         size_t offset = 0, np;
4795
4796         ctdb_uint32_push(&in->num_connected, buf+offset, &np);
4797         offset += np;
4798
4799         ctdb_padding_push(4, buf+offset, &np);
4800         offset += np;
4801
4802         ctdb_timeval_push(&in->priority_time, buf+offset, &np);
4803         offset += np;
4804
4805         ctdb_uint32_push(&in->pnn, buf+offset, &np);
4806         offset += np;
4807
4808         ctdb_uint32_push(&in->node_flags, buf+offset, &np);
4809         offset += np;
4810
4811         *npush = offset;
4812 }
4813
4814 int ctdb_election_message_pull(uint8_t *buf, size_t buflen,
4815                                TALLOC_CTX *mem_ctx,
4816                                struct ctdb_election_message **out,
4817                                size_t *npull)
4818 {
4819         struct ctdb_election_message *val;
4820         size_t offset = 0, np;
4821         int ret;
4822
4823         val = talloc(mem_ctx, struct ctdb_election_message);
4824         if (val == NULL) {
4825                 return ENOMEM;
4826         }
4827
4828         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->num_connected,
4829                                &np);
4830         if (ret != 0) {
4831                 goto fail;
4832         }
4833         offset += np;
4834
4835         ret = ctdb_padding_pull(buf+offset, buflen-offset, 4, &np);
4836         if (ret != 0) {
4837                 goto fail;
4838         }
4839         offset += np;
4840
4841         ret = ctdb_timeval_pull(buf+offset, buflen-offset,
4842                                 &val->priority_time, &np);
4843         if (ret != 0) {
4844                 goto fail;
4845         }
4846         offset += np;
4847
4848         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->pnn, &np);
4849         if (ret != 0) {
4850                 goto fail;
4851         }
4852         offset += np;
4853
4854         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->node_flags,
4855                                &np);
4856         if (ret != 0) {
4857                 goto fail;
4858         }
4859         offset += np;
4860
4861         *out = val;
4862         *npull = offset;
4863         return 0;
4864
4865 fail:
4866         talloc_free(val);
4867         return ret;
4868 }
4869
4870 size_t ctdb_srvid_message_len(struct ctdb_srvid_message *in)
4871 {
4872         return ctdb_uint32_len(&in->pnn) +
4873                 ctdb_padding_len(4) +
4874                 ctdb_uint64_len(&in->srvid);
4875 }
4876
4877 void ctdb_srvid_message_push(struct ctdb_srvid_message *in, uint8_t *buf,
4878                              size_t *npush)
4879 {
4880         size_t offset = 0, np;
4881
4882         ctdb_uint32_push(&in->pnn, buf+offset, &np);
4883         offset += np;
4884
4885         ctdb_padding_push(4, buf+offset, &np);
4886         offset += np;
4887
4888         ctdb_uint64_push(&in->srvid, buf+offset, &np);
4889         offset += np;
4890
4891         *npush = offset;
4892 }
4893
4894 int ctdb_srvid_message_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
4895                             struct ctdb_srvid_message **out, size_t *npull)
4896 {
4897         struct ctdb_srvid_message *val;
4898         size_t offset = 0, np;
4899         int ret;
4900
4901         val = talloc(mem_ctx, struct ctdb_srvid_message);
4902         if (val == NULL) {
4903                 return ENOMEM;
4904         }
4905
4906         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->pnn, &np);
4907         if (ret != 0) {
4908                 goto fail;
4909         }
4910         offset += np;
4911
4912         ret = ctdb_padding_pull(buf+offset, buflen-offset, 4, &np);
4913         if (ret != 0) {
4914                 goto fail;
4915         }
4916         offset += np;
4917
4918         ret = ctdb_uint64_pull(buf+offset, buflen-offset, &val->srvid, &np);
4919         if (ret != 0) {
4920                 goto fail;
4921         }
4922         offset += np;
4923
4924         *out = val;
4925         *npull = offset;
4926         return 0;
4927
4928 fail:
4929         talloc_free(val);
4930         return ret;
4931 }
4932
4933 size_t ctdb_disable_message_len(struct ctdb_disable_message *in)
4934 {
4935         return ctdb_uint32_len(&in->pnn) +
4936                 ctdb_padding_len(4) +
4937                 ctdb_uint64_len(&in->srvid) +
4938                 ctdb_uint32_len(&in->timeout) +
4939                 ctdb_padding_len(4);
4940 }
4941
4942 void ctdb_disable_message_push(struct ctdb_disable_message *in, uint8_t *buf,
4943                                size_t *npush)
4944 {
4945         size_t offset = 0, np;
4946
4947         ctdb_uint32_push(&in->pnn, buf+offset, &np);
4948         offset += np;
4949
4950         ctdb_padding_push(4, buf+offset, &np);
4951         offset += np;
4952
4953         ctdb_uint64_push(&in->srvid, buf+offset, &np);
4954         offset += np;
4955
4956         ctdb_uint32_push(&in->timeout, buf+offset, &np);
4957         offset += np;
4958
4959         ctdb_padding_push(4, buf+offset, &np);
4960         offset += np;
4961
4962         *npush = offset;
4963 }
4964
4965 int ctdb_disable_message_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
4966                               struct ctdb_disable_message **out,
4967                               size_t *npull)
4968 {
4969         struct ctdb_disable_message *val;
4970         size_t offset = 0, np;
4971         int ret;
4972
4973         val = talloc(mem_ctx, struct ctdb_disable_message);
4974         if (val == NULL) {
4975                 return ENOMEM;
4976         }
4977
4978         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->pnn, &np);
4979         if (ret != 0) {
4980                 goto fail;
4981         }
4982         offset += np;
4983
4984         ret = ctdb_padding_pull(buf+offset, buflen-offset, 4, &np);
4985         if (ret != 0) {
4986                 goto fail;
4987         }
4988         offset += np;
4989
4990         ret = ctdb_uint64_pull(buf+offset, buflen-offset, &val->srvid, &np);
4991         if (ret != 0) {
4992                 goto fail;
4993         }
4994         offset += np;
4995
4996         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &val->timeout, &np);
4997         if (ret != 0) {
4998                 goto fail;
4999         }
5000         offset += np;
5001
5002         ret = ctdb_padding_pull(buf+offset, buflen-offset, 4, &np);
5003         if (ret != 0) {
5004                 goto fail;
5005         }
5006         offset += np;
5007
5008         *out = val;
5009         *npull = offset;
5010         return 0;
5011
5012 fail:
5013         talloc_free(val);
5014         return ret;
5015 }
5016
5017 size_t ctdb_server_id_len(struct ctdb_server_id *in)
5018 {
5019         return ctdb_uint64_len(&in->pid) +
5020                 ctdb_uint32_len(&in->task_id) +
5021                 ctdb_uint32_len(&in->vnn) +
5022                 ctdb_uint64_len(&in->unique_id);
5023 }
5024
5025 void ctdb_server_id_push(struct ctdb_server_id *in, uint8_t *buf,
5026                          size_t *npush)
5027 {
5028         size_t offset = 0, np;
5029
5030         ctdb_uint64_push(&in->pid, buf+offset, &np);
5031         offset += np;
5032
5033         ctdb_uint32_push(&in->task_id, buf+offset, &np);
5034         offset += np;
5035
5036         ctdb_uint32_push(&in->vnn, buf+offset, &np);
5037         offset += np;
5038
5039         ctdb_uint64_push(&in->unique_id, buf+offset, &np);
5040         offset += np;
5041
5042         *npush = offset;
5043 }
5044
5045 int ctdb_server_id_pull(uint8_t *buf, size_t buflen,
5046                         struct ctdb_server_id *out, size_t *npull)
5047 {
5048         size_t offset = 0, np;
5049         int ret;
5050
5051         ret = ctdb_uint64_pull(buf+offset, buflen-offset, &out->pid, &np);
5052         if (ret != 0) {
5053                 return ret;
5054         }
5055         offset += np;
5056
5057         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &out->task_id, &np);
5058         if (ret != 0) {
5059                 return ret;
5060         }
5061         offset += np;
5062
5063         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &out->vnn, &np);
5064         if (ret != 0) {
5065                 return ret;
5066         }
5067         offset += np;
5068
5069         ret = ctdb_uint64_pull(buf+offset, buflen-offset, &out->unique_id,
5070                                &np);
5071         if (ret != 0) {
5072                 return ret;
5073         }
5074         offset += np;
5075
5076         *npull = offset;
5077         return 0;
5078 }
5079
5080 size_t ctdb_g_lock_len(struct ctdb_g_lock *in)
5081 {
5082         return ctdb_uint32_len(&in->type) +
5083                 ctdb_padding_len(4) +
5084                 ctdb_server_id_len(&in->sid);
5085 }
5086
5087 void ctdb_g_lock_push(struct ctdb_g_lock *in, uint8_t *buf, size_t *npush)
5088 {
5089         size_t offset = 0, np;
5090         uint32_t type;
5091
5092         type = in->type;
5093         ctdb_uint32_push(&type, buf+offset, &np);
5094         offset += np;
5095
5096         ctdb_padding_push(4, buf+offset, &np);
5097         offset += np;
5098
5099         ctdb_server_id_push(&in->sid, buf+offset, &np);
5100         offset += np;
5101
5102         *npush = offset;
5103 }
5104
5105 int ctdb_g_lock_pull(uint8_t *buf, size_t buflen, struct ctdb_g_lock *out,
5106                      size_t *npull)
5107 {
5108         size_t offset = 0, np;
5109         int ret;
5110         uint32_t type;
5111
5112         ret = ctdb_uint32_pull(buf+offset, buflen-offset, &type, &np);
5113         if (ret != 0) {
5114                 return ret;
5115         }
5116         offset += np;
5117
5118         if (type == 0) {
5119                 out->type = CTDB_G_LOCK_READ;
5120         } else if (type == 1) {
5121                 out->type = CTDB_G_LOCK_WRITE;
5122         } else {
5123                 return EPROTO;
5124         }
5125
5126         ret = ctdb_padding_pull(buf+offset, buflen-offset, 4, &np);
5127         if (ret != 0) {
5128                 return ret;
5129         }
5130         offset += np;
5131
5132         ret = ctdb_server_id_pull(buf+offset, buflen-offset, &out->sid, &np);
5133         if (ret != 0) {
5134                 return ret;
5135         }
5136         offset += np;
5137
5138         *npull = offset;
5139         return 0;
5140 }
5141
5142 size_t ctdb_g_lock_list_len(struct ctdb_g_lock_list *in)
5143 {
5144         size_t len = 0;
5145
5146         if (in->num > 0) {
5147                 len += in->num * ctdb_g_lock_len(&in->lock[0]);
5148         }
5149
5150         return len;
5151 }
5152
5153 void ctdb_g_lock_list_push(struct ctdb_g_lock_list *in, uint8_t *buf,
5154                            size_t *npush)
5155 {
5156         size_t offset = 0, np;
5157         uint32_t i;
5158
5159         for (i=0; i<in->num; i++) {
5160                 ctdb_g_lock_push(&in->lock[i], buf+offset, &np);
5161                 offset += np;
5162         }
5163
5164         *npush = offset;
5165 }
5166
5167 int ctdb_g_lock_list_pull(uint8_t *buf, size_t buflen, TALLOC_CTX *mem_ctx,
5168                           struct ctdb_g_lock_list **out, size_t *npull)
5169 {
5170         struct ctdb_g_lock_list *val;
5171         struct ctdb_g_lock lock = { 0 };
5172         size_t offset = 0, np;
5173         uint32_t i;
5174         int ret;
5175
5176         val = talloc(mem_ctx, struct ctdb_g_lock_list);
5177         if (val == NULL) {
5178                 return ENOMEM;
5179         }
5180
5181         if (buflen == 0) {
5182                 val->lock = NULL;
5183                 val->num = 0;
5184                 goto done;
5185         }
5186
5187         val->num = buflen / ctdb_g_lock_len(&lock);
5188
5189         val->lock = talloc_array(val, struct ctdb_g_lock, val->num);
5190         if (val->lock == NULL) {
5191                 ret = ENOMEM;
5192                 goto fail;
5193         }
5194
5195         for (i=0; i<val->num; i++) {
5196                 ret = ctdb_g_lock_pull(buf+offset, buflen-offset,
5197                                        &val->lock[i], &np);
5198                 if (ret != 0) {
5199                         goto fail;
5200                 }
5201                 offset += np;
5202         }
5203
5204 done:
5205         *out = val;
5206         *npull = offset;
5207         return 0;
5208
5209 fail:
5210         talloc_free(val);
5211         return ENOMEM;
5212 }