common: FreeBSD+kFreeBSD: Implement get_process_name (same as in Linux)
[kai/samba-autobuild/.git] / ctdb / common / ctdb_io.c
1 /* 
2    ctdb database library
3    Utility functions to read/write blobs of data from a file descriptor
4    and handle the case where we might need multiple read/writes to get all the
5    data.
6
7    Copyright (C) Andrew Tridgell  2006
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, see <http://www.gnu.org/licenses/>.
21 */
22
23 #include "includes.h"
24 #include "lib/tdb/include/tdb.h"
25 #include "lib/util/dlinklist.h"
26 #include "system/network.h"
27 #include "system/filesys.h"
28 #include "../include/ctdb_private.h"
29 #include "../include/ctdb_client.h"
30 #include <stdarg.h>
31
/* structures for packet queueing */

/* An incoming packet that has only been partly received so far. */
struct ctdb_partial {
	uint8_t *data;		/* bytes received so far; NULL between packets */
	uint32_t length;	/* number of valid bytes in data */
};
37
/* One outgoing packet waiting on a queue's out_queue list. */
struct ctdb_queue_pkt {
	struct ctdb_queue_pkt *next;	/* list linkage (DLIST macros) */
	struct ctdb_queue_pkt *prev;
	uint8_t *data;			/* first byte not yet written */
	uint32_t length;		/* bytes still to be written */
	uint32_t full_length;		/* size of the whole packet; differs
					   from length once part was sent */
};
44
45 struct ctdb_queue {
46         struct ctdb_context *ctdb;
47         struct ctdb_partial partial; /* partial input packet */
48         struct ctdb_queue_pkt *out_queue, *out_queue_tail;
49         uint32_t out_queue_length;
50         struct fd_event *fde;
51         int fd;
52         size_t alignment;
53         void *private_data;
54         ctdb_queue_cb_fn_t callback;
55         bool *destroyed;
56         const char *name;
57 };
58
59
60
61 int ctdb_queue_length(struct ctdb_queue *queue)
62 {
63         return queue->out_queue_length;
64 }
65
66 /*
67   called when an incoming connection is readable
68   This function MUST be safe for reentry via the queue callback!
69 */
70 static void queue_io_read(struct ctdb_queue *queue)
71 {
72         int num_ready = 0;
73         uint32_t sz_bytes_req;
74         uint32_t pkt_size;
75         uint32_t pkt_bytes_remaining;
76         uint32_t to_read;
77         ssize_t nread;
78         uint8_t *data;
79
80         /* check how much data is available on the socket for immediately
81            guaranteed nonblocking access.
82            as long as we are careful never to try to read more than this
83            we know all reads will be successful and will neither block
84            nor fail with a "data not available right now" error
85         */
86         if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
87                 return;
88         }
89         if (num_ready == 0) {
90                 /* the descriptor has been closed */
91                 goto failed;
92         }
93
94         if (queue->partial.data == NULL) {
95                 /* starting fresh, allocate buf for size bytes */
96                 sz_bytes_req = sizeof(pkt_size);
97                 queue->partial.data = talloc_size(queue, sz_bytes_req);
98                 if (queue->partial.data == NULL) {
99                         DEBUG(DEBUG_ERR,("read error alloc failed for %u\n",
100                                          sz_bytes_req));
101                         goto failed;
102                 }
103         } else if (queue->partial.length < sizeof(pkt_size)) {
104                 /* yet to find out the packet length */
105                 sz_bytes_req = sizeof(pkt_size) - queue->partial.length;
106         } else {
107                 /* partial packet, length known, full buf allocated */
108                 sz_bytes_req = 0;
109         }
110         data = queue->partial.data;
111
112         if (sz_bytes_req > 0) {
113                 to_read = MIN(sz_bytes_req, num_ready);
114                 nread = read(queue->fd, data + queue->partial.length,
115                              to_read);
116                 if (nread <= 0) {
117                         DEBUG(DEBUG_ERR,("read error nread=%d\n", (int)nread));
118                         goto failed;
119                 }
120                 queue->partial.length += nread;
121
122                 if (nread < sz_bytes_req) {
123                         /* not enough to know the length */
124                         DEBUG(DEBUG_DEBUG,("Partial packet length read\n"));
125                         return;
126                 }
127                 /* size now known, allocate buffer for the full packet */
128                 queue->partial.data = talloc_realloc_size(queue, data,
129                                                           *(uint32_t *)data);
130                 if (queue->partial.data == NULL) {
131                         DEBUG(DEBUG_ERR,("read error alloc failed for %u\n",
132                                          *(uint32_t *)data));
133                         goto failed;
134                 }
135                 data = queue->partial.data;
136                 num_ready -= nread;
137         }
138
139         pkt_size = *(uint32_t *)data;
140         if (pkt_size == 0) {
141                 DEBUG(DEBUG_CRIT,("Invalid packet of length 0\n"));
142                 goto failed;
143         }
144
145         pkt_bytes_remaining = pkt_size - queue->partial.length;
146         to_read = MIN(pkt_bytes_remaining, num_ready);
147         nread = read(queue->fd, data + queue->partial.length,
148                      to_read);
149         if (nread <= 0) {
150                 DEBUG(DEBUG_ERR,("read error nread=%d\n",
151                                  (int)nread));
152                 goto failed;
153         }
154         queue->partial.length += nread;
155
156         if (queue->partial.length < pkt_size) {
157                 DEBUG(DEBUG_DEBUG,("Partial packet data read\n"));
158                 return;
159         }
160
161         queue->partial.data = NULL;
162         queue->partial.length = 0;
163         /* it is the responsibility of the callback to free 'data' */
164         queue->callback(data, pkt_size, queue->private_data);
165         return;
166
167 failed:
168         queue->callback(NULL, 0, queue->private_data);
169 }
170
171
172 /* used when an event triggers a dead queue */
173 static void queue_dead(struct event_context *ev, struct timed_event *te, 
174                        struct timeval t, void *private_data)
175 {
176         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
177         queue->callback(NULL, 0, queue->private_data);
178 }
179
180
181 /*
182   called when an incoming connection is writeable
183 */
184 static void queue_io_write(struct ctdb_queue *queue)
185 {
186         while (queue->out_queue) {
187                 struct ctdb_queue_pkt *pkt = queue->out_queue;
188                 ssize_t n;
189                 if (queue->ctdb->flags & CTDB_FLAG_TORTURE) {
190                         n = write(queue->fd, pkt->data, 1);
191                 } else {
192                         n = write(queue->fd, pkt->data, pkt->length);
193                 }
194
195                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
196                         if (pkt->length != pkt->full_length) {
197                                 /* partial packet sent - we have to drop it */
198                                 DLIST_REMOVE(queue->out_queue, pkt);
199                                 queue->out_queue_length--;
200                                 talloc_free(pkt);
201                         }
202                         talloc_free(queue->fde);
203                         queue->fde = NULL;
204                         queue->fd = -1;
205                         event_add_timed(queue->ctdb->ev, queue, timeval_zero(), 
206                                         queue_dead, queue);
207                         return;
208                 }
209                 if (n <= 0) return;
210                 
211                 if (n != pkt->length) {
212                         pkt->length -= n;
213                         pkt->data += n;
214                         return;
215                 }
216
217                 DLIST_REMOVE(queue->out_queue, pkt);
218                 queue->out_queue_length--;
219                 talloc_free(pkt);
220         }
221
222         EVENT_FD_NOT_WRITEABLE(queue->fde);
223 }
224
225 /*
226   called when an incoming connection is readable or writeable
227 */
228 static void queue_io_handler(struct event_context *ev, struct fd_event *fde, 
229                              uint16_t flags, void *private_data)
230 {
231         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
232
233         if (flags & EVENT_FD_READ) {
234                 queue_io_read(queue);
235         } else {
236                 queue_io_write(queue);
237         }
238 }
239
240
241 /*
242   queue a packet for sending
243 */
244 int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
245 {
246         struct ctdb_queue_pkt *pkt;
247         uint32_t length2, full_length;
248
249         if (queue->alignment) {
250                 /* enforce the length and alignment rules from the tcp packet allocator */
251                 length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
252                 *(uint32_t *)data = length2;
253         } else {
254                 length2 = length;
255         }
256
257         if (length2 != length) {
258                 memset(data+length, 0, length2-length);
259         }
260
261         full_length = length2;
262         
263         /* if the queue is empty then try an immediate write, avoiding
264            queue overhead. This relies on non-blocking sockets */
265         if (queue->out_queue == NULL && queue->fd != -1 &&
266             !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) {
267                 ssize_t n = write(queue->fd, data, length2);
268                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
269                         talloc_free(queue->fde);
270                         queue->fde = NULL;
271                         queue->fd = -1;
272                         event_add_timed(queue->ctdb->ev, queue, timeval_zero(), 
273                                         queue_dead, queue);
274                         /* yes, we report success, as the dead node is 
275                            handled via a separate event */
276                         return 0;
277                 }
278                 if (n > 0) {
279                         data += n;
280                         length2 -= n;
281                 }
282                 if (length2 == 0) return 0;
283         }
284
285         pkt = talloc(queue, struct ctdb_queue_pkt);
286         CTDB_NO_MEMORY(queue->ctdb, pkt);
287
288         pkt->data = talloc_memdup(pkt, data, length2);
289         CTDB_NO_MEMORY(queue->ctdb, pkt->data);
290
291         pkt->length = length2;
292         pkt->full_length = full_length;
293
294         if (queue->out_queue == NULL && queue->fd != -1) {
295                 EVENT_FD_WRITEABLE(queue->fde);
296         }
297
298         DLIST_ADD_END(queue->out_queue, pkt, NULL);
299
300         queue->out_queue_length++;
301
302         if (queue->ctdb->tunable.verbose_memory_names != 0) {
303                 struct ctdb_req_header *hdr = (struct ctdb_req_header *)pkt->data;
304                 switch (hdr->operation) {
305                 case CTDB_REQ_CONTROL: {
306                         struct ctdb_req_control *c = (struct ctdb_req_control *)hdr;
307                         talloc_set_name(pkt, "ctdb_queue_pkt: %s control opcode=%u srvid=%llu datalen=%u",
308                                         queue->name, (unsigned)c->opcode, (unsigned long long)c->srvid, (unsigned)c->datalen);
309                         break;
310                 }
311                 case CTDB_REQ_MESSAGE: {
312                         struct ctdb_req_message *m = (struct ctdb_req_message *)hdr;
313                         talloc_set_name(pkt, "ctdb_queue_pkt: %s message srvid=%llu datalen=%u",
314                                         queue->name, (unsigned long long)m->srvid, (unsigned)m->datalen);
315                         break;
316                 }
317                 default:
318                         talloc_set_name(pkt, "ctdb_queue_pkt: %s operation=%u length=%u src=%u dest=%u",
319                                         queue->name, (unsigned)hdr->operation, (unsigned)hdr->length,
320                                         (unsigned)hdr->srcnode, (unsigned)hdr->destnode);
321                         break;
322                 }
323         }
324
325         return 0;
326 }
327
328
329 /*
330   setup the fd used by the queue
331  */
332 int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
333 {
334         queue->fd = fd;
335         talloc_free(queue->fde);
336         queue->fde = NULL;
337
338         if (fd != -1) {
339                 queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ,
340                                           queue_io_handler, queue);
341                 if (queue->fde == NULL) {
342                         return -1;
343                 }
344                 tevent_fd_set_auto_close(queue->fde);
345
346                 if (queue->out_queue) {
347                         EVENT_FD_WRITEABLE(queue->fde);         
348                 }
349         }
350
351         return 0;
352 }
353
354 /* If someone sets up this pointer, they want to know if the queue is freed */
355 static int queue_destructor(struct ctdb_queue *queue)
356 {
357         if (queue->destroyed != NULL)
358                 *queue->destroyed = true;
359         return 0;
360 }
361
362 /*
363   setup a packet queue on a socket
364  */
365 struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
366                                     TALLOC_CTX *mem_ctx, int fd, int alignment,
367                                     
368                                     ctdb_queue_cb_fn_t callback,
369                                     void *private_data, const char *fmt, ...)
370 {
371         struct ctdb_queue *queue;
372         va_list ap;
373
374         queue = talloc_zero(mem_ctx, struct ctdb_queue);
375         CTDB_NO_MEMORY_NULL(ctdb, queue);
376         va_start(ap, fmt);
377         queue->name = talloc_vasprintf(mem_ctx, fmt, ap);
378         va_end(ap);
379         CTDB_NO_MEMORY_NULL(ctdb, queue->name);
380
381         queue->ctdb = ctdb;
382         queue->fd = fd;
383         queue->alignment = alignment;
384         queue->private_data = private_data;
385         queue->callback = callback;
386         if (fd != -1) {
387                 if (ctdb_queue_set_fd(queue, fd) != 0) {
388                         talloc_free(queue);
389                         return NULL;
390                 }
391         }
392         talloc_set_destructor(queue, queue_destructor);
393
394         return queue;
395 }