/******************************************************************************
*******************************************************************************
**
**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
**  Copyright (C) 2004-2006 Red Hat, Inc.  All rights reserved.
**
**  This copyrighted material is made available to anyone wishing to use,
**  modify, copy, or redistribute it subject to the terms and conditions
**  of the GNU General Public License v.2.
**
*******************************************************************************
******************************************************************************/

/*
 * lowcomms-tcp.c
 *
 * This is the "low-level" comms layer.
 *
 * It is responsible for sending/receiving messages
 * from other nodes in the cluster.
 *
 * Cluster nodes are referred to by their nodeids. nodeids are
 * simply 32 bit numbers to the locking module - if they need to
 * be expanded for the cluster infrastructure then that is its
 * responsibility. It is this layer's
 * responsibility to resolve these into IP addresses or
 * whatever it needs for inter-node communication.
 *
 * The comms level is two kernel threads that deal mainly with
 * the receiving of messages from other nodes and passing them
 * up to the mid-level comms layer (which understands the
 * message format) for execution by the locking core, and
 * a send thread which does all the setting up of connections
 * to remote nodes and the sending of data. Threads are not allowed
 * to send their own data because it may cause them to wait in times
 * of high load. Also, this way, the sending thread can collect together
 * messages bound for one node and send them in one block.
 *
 * I don't see any problem with the recv thread executing the locking
 * code on behalf of remote processes as the locking code is
 * short, efficient and never waits.
 *
 */

#include <asm/ioctls.h>
#include <net/sock.h>
#include <net/tcp.h>
#include <linux/pagemap.h>

#include "dlm_internal.h"
#include "lowcomms.h"
#include "midcomms.h"
#include "config.h"

struct cbuf {
	unsigned int base;
	unsigned int len;
	unsigned int mask;
};

#define NODE_INCREMENT 32

static void cbuf_add(struct cbuf *cb, int n)
{
	cb->len += n;
}

static int cbuf_data(struct cbuf *cb)
{
	return ((cb->base + cb->len) & cb->mask);
}

static void cbuf_init(struct cbuf *cb, int size)
{
	cb->base = cb->len = 0;
	cb->mask = size - 1;
}

static void cbuf_eat(struct cbuf *cb, int n)
{
	cb->len  -= n;
	cb->base += n;
	cb->base &= cb->mask;
}

static bool cbuf_empty(struct cbuf *cb)
{
	return cb->len == 0;
}
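
/*
 * Illustrative sketch (not part of the driver): the cbuf helpers above
 * implement a power-of-two circular buffer over a single page.  Assuming
 * a 4096-byte page, cbuf_init() sets mask = 0xfff and cbuf_data() yields
 * the next write offset, wrapping modulo the page size:
 *
 *	struct cbuf cb;
 *	cbuf_init(&cb, 4096);	// base = 0, len = 0, mask = 0xfff
 *	cbuf_add(&cb, 100);	// producer wrote 100 bytes; cbuf_data() == 100
 *	cbuf_eat(&cb, 60);	// consumer took 60 bytes; base = 60, len = 40
 *	cbuf_add(&cb, 4000);	// len = 4040; cbuf_data() == (60 + 4040) & 0xfff == 4,
 *				// i.e. the write position has wrapped past the end
 */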

/* Maximum number of incoming messages to process before
   doing a cond_resched()
*/
#define MAX_RX_MSG_COUNT 25

struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	struct rw_semaphore sock_sem;	/* Stop connect races */
	struct list_head read_list;	/* On this list when ready for reading */
	struct list_head write_list;	/* On this list when ready for writing */
	struct list_head state_list;	/* On this list when ready to connect */
	unsigned long flags;	/* CF_xxx bits below say which lists we're on */
#define CF_READ_PENDING 1
#define CF_WRITE_PENDING 2
#define CF_CONNECT_PENDING 3
#define CF_IS_OTHERCON 4
	struct list_head writequeue;	/* List of outgoing writequeue_entries */
	struct list_head listenlist;	/* List of allocated listening sockets */
	spinlock_t writequeue_lock;
	int (*rx_action) (struct connection *);	/* What to do when active */
	struct page *rx_page;
	struct cbuf cb;
	int retries;
	atomic_t waiting_requests;
#define MAX_CONNECT_RETRIES 3
	struct connection *othercon;
};
#define sock2con(x) ((struct connection *)(x)->sk_user_data)

/* An entry waiting to be sent */
struct writequeue_entry {
	struct list_head list;
	struct page *page;
	int offset;
	int len;
	int end;
	int users;
	struct connection *con;
};

static struct sockaddr_storage dlm_local_addr;

/* Manage daemons */
static struct task_struct *recv_task;
static struct task_struct *send_task;

static wait_queue_t lowcomms_send_waitq_head;
static DECLARE_WAIT_QUEUE_HEAD(lowcomms_send_waitq);
static wait_queue_t lowcomms_recv_waitq_head;
static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_waitq);

/* An array of pointers to connections, indexed by NODEID */
static struct connection **connections;
static DECLARE_MUTEX(connections_lock);
static kmem_cache_t *con_cache;
static int conn_array_size;

/* List of sockets that have reads pending */
static LIST_HEAD(read_sockets);
static DEFINE_SPINLOCK(read_sockets_lock);

/* List of sockets which have writes pending */
static LIST_HEAD(write_sockets);
static DEFINE_SPINLOCK(write_sockets_lock);

/* List of sockets which have connects pending */
static LIST_HEAD(state_sockets);
static DEFINE_SPINLOCK(state_sockets_lock);
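
/*
 * Descriptive note: the three lists above drive the two daemons.
 * dlm_recvd drains read_sockets (connections with incoming data or a
 * pending accept); dlm_sendd drains state_sockets (connections that
 * need a connect attempt) and then write_sockets (connections with
 * queued output).  The socket callbacks below add a connection to the
 * appropriate list and wake the matching wait queue.
 */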

static struct connection *nodeid2con(int nodeid, gfp_t allocation)
{
	struct connection *con = NULL;

	down(&connections_lock);
	if (nodeid >= conn_array_size) {
		int new_size = nodeid + NODE_INCREMENT;
		struct connection **new_conns;

		new_conns = kzalloc(sizeof(struct connection *) *
				    new_size, allocation);
		if (!new_conns)
			goto finish;

		memcpy(new_conns, connections,
		       sizeof(struct connection *) * conn_array_size);
		conn_array_size = new_size;
		kfree(connections);
		connections = new_conns;
	}

	con = connections[nodeid];
	if (con == NULL && allocation) {
		con = kmem_cache_zalloc(con_cache, allocation);
		if (!con)
			goto finish;

		con->nodeid = nodeid;
		init_rwsem(&con->sock_sem);
		INIT_LIST_HEAD(&con->writequeue);
		spin_lock_init(&con->writequeue_lock);

		connections[nodeid] = con;
	}

finish:
	up(&connections_lock);
	return con;
}

/* Data available on socket or listen socket received a connect */
static void lowcomms_data_ready(struct sock *sk, int count_unused)
{
	struct connection *con = sock2con(sk);

	atomic_inc(&con->waiting_requests);
	if (test_and_set_bit(CF_READ_PENDING, &con->flags))
		return;

	spin_lock_bh(&read_sockets_lock);
	list_add_tail(&con->read_list, &read_sockets);
	spin_unlock_bh(&read_sockets_lock);

	wake_up_interruptible(&lowcomms_recv_waitq);
}

static void lowcomms_write_space(struct sock *sk)
{
	struct connection *con = sock2con(sk);

	if (test_and_set_bit(CF_WRITE_PENDING, &con->flags))
		return;

	spin_lock_bh(&write_sockets_lock);
	list_add_tail(&con->write_list, &write_sockets);
	spin_unlock_bh(&write_sockets_lock);

	wake_up_interruptible(&lowcomms_send_waitq);
}

static inline void lowcomms_connect_sock(struct connection *con)
{
	if (test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
		return;

	spin_lock_bh(&state_sockets_lock);
	list_add_tail(&con->state_list, &state_sockets);
	spin_unlock_bh(&state_sockets_lock);

	wake_up_interruptible(&lowcomms_send_waitq);
}

static void lowcomms_state_change(struct sock *sk)
{
	if (sk->sk_state == TCP_ESTABLISHED)
		lowcomms_write_space(sk);
}

/* Make a socket active */
static int add_sock(struct socket *sock, struct connection *con)
{
	con->sock = sock;

	/* Install a data_ready callback */
	con->sock->sk->sk_data_ready = lowcomms_data_ready;
	con->sock->sk->sk_write_space = lowcomms_write_space;
	con->sock->sk->sk_state_change = lowcomms_state_change;

	return 0;
}
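
/*
 * Descriptive note: the sk_data_ready/sk_write_space/sk_state_change
 * callbacks installed above are invoked by the networking layer,
 * typically in softirq context, which is why the pending lists in this
 * file are protected with spin_lock_bh() rather than plain spin_lock().
 */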

/* Add the port number to an IPv4 or IPv6 sockaddr and return the
   address length */
static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
			  int *addr_len)
{
	saddr->ss_family = dlm_local_addr.ss_family;
	if (saddr->ss_family == AF_INET) {
		struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
		in4_addr->sin_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in);
	} else {
		struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
		in6_addr->sin6_port = cpu_to_be16(port);
		*addr_len = sizeof(struct sockaddr_in6);
	}
}

/* Close a remote connection and tidy up */
static void close_connection(struct connection *con, bool and_other)
{
	down_write(&con->sock_sem);

	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	}
	if (con->othercon && and_other) {
		/* Will only re-enter once. */
		close_connection(con->othercon, false);
	}
	if (con->rx_page) {
		__free_page(con->rx_page);
		con->rx_page = NULL;
	}
	con->retries = 0;
	up_write(&con->sock_sem);
}

/* Data received from remote end */
static int receive_from_sock(struct connection *con)
{
	int ret = 0;
	struct msghdr msg;
	struct iovec iov[2];
	mm_segment_t fs;
	unsigned len;
	int r;
	int call_again_soon = 0;

	down_read(&con->sock_sem);

	if (con->sock == NULL)
		goto out;
	if (con->rx_page == NULL) {
		/*
		 * This doesn't need to be atomic, but I think it should
		 * improve performance if it is.
		 */
		con->rx_page = alloc_page(GFP_ATOMIC);
		if (con->rx_page == NULL)
			goto out_resched;
		cbuf_init(&con->cb, PAGE_CACHE_SIZE);
	}

	msg.msg_control = NULL;
	msg.msg_controllen = 0;
	msg.msg_iovlen = 1;
	msg.msg_iov = iov;
	msg.msg_name = NULL;
	msg.msg_namelen = 0;
	msg.msg_flags = 0;

	/*
	 * iov[0] is the bit of the circular buffer between the current end
	 * point (cb.base + cb.len) and the end of the buffer.
	 */
	iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
	iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
	iov[1].iov_len = 0;

	/*
	 * iov[1] is the bit of the circular buffer between the start of the
	 * buffer and the start of the currently used section (cb.base)
	 */
	if (cbuf_data(&con->cb) >= con->cb.base) {
		iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb);
		iov[1].iov_len = con->cb.base;
		iov[1].iov_base = page_address(con->rx_page);
		msg.msg_iovlen = 2;
	}
	len = iov[0].iov_len + iov[1].iov_len;

	fs = get_fs();
	set_fs(get_ds());
	r = ret = sock_recvmsg(con->sock, &msg, len,
			       MSG_DONTWAIT | MSG_NOSIGNAL);
	set_fs(fs);

	if (ret <= 0)
		goto out_close;
	if (ret == len)
		call_again_soon = 1;
	cbuf_add(&con->cb, ret);
	ret = dlm_process_incoming_buffer(con->nodeid,
					  page_address(con->rx_page),
					  con->cb.base, con->cb.len,
					  PAGE_CACHE_SIZE);
	if (ret == -EBADMSG) {
		printk(KERN_INFO "dlm: lowcomms: addr=%p, base=%u, len=%u, "
		       "iov_len=%u, iov_base[0]=%p, read=%d\n",
		       page_address(con->rx_page), con->cb.base, con->cb.len,
		       len, iov[0].iov_base, r);
	}
	if (ret < 0)
		goto out_close;
	cbuf_eat(&con->cb, ret);

	if (cbuf_empty(&con->cb) && !call_again_soon) {
		__free_page(con->rx_page);
		con->rx_page = NULL;
	}

out:
	if (call_again_soon)
		goto out_resched;
	up_read(&con->sock_sem);
	return 0;

out_resched:
	lowcomms_data_ready(con->sock->sk, 0);
	up_read(&con->sock_sem);
	cond_resched();
	return 0;

out_close:
	up_read(&con->sock_sem);
	if (ret != -EAGAIN && !test_bit(CF_IS_OTHERCON, &con->flags)) {
		close_connection(con, false);
		/* Reconnect when there is something to send */
	}

	return ret;
}

/* Listening socket is busy, accept a connection */
static int accept_from_sock(struct connection *con)
{
	int result;
	struct sockaddr_storage peeraddr;
	struct socket *newsock;
	int len;
	int nodeid;
	struct connection *newcon;

	memset(&peeraddr, 0, sizeof(peeraddr));
	result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
				  IPPROTO_TCP, &newsock);
	if (result < 0)
		return result;

	down_read(&con->sock_sem);

	result = -ENOTCONN;
	if (con->sock == NULL)
		goto accept_err;

	newsock->type = con->sock->type;
	newsock->ops = con->sock->ops;

	result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK);
	if (result < 0)
		goto accept_err;

	/* Get the connected socket's peer */
	memset(&peeraddr, 0, sizeof(peeraddr));
	if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr,
				  &len, 2)) {
		result = -ECONNABORTED;
		goto accept_err;
	}

	/* Get the new node's NODEID */
	make_sockaddr(&peeraddr, 0, &len);
	if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) {
		printk("dlm: connect from non-cluster node\n");
		sock_release(newsock);
		up_read(&con->sock_sem);
		return -1;
	}

	log_print("got connection from %d", nodeid);

	/*  Check to see if we already have a connection to this node. This
	 *  could happen if the two nodes initiate a connection at roughly
	 *  the same time and the connections cross on the wire.
	 * TEMPORARY FIX:
	 *  In this case we store the incoming one in "othercon"
	 */
	newcon = nodeid2con(nodeid, GFP_KERNEL);
	if (!newcon) {
		result = -ENOMEM;
		goto accept_err;
	}
	down_write(&newcon->sock_sem);
	if (newcon->sock) {
		struct connection *othercon = newcon->othercon;

		if (!othercon) {
			othercon = kmem_cache_zalloc(con_cache, GFP_KERNEL);
			if (!othercon) {
				printk("dlm: failed to allocate incoming socket\n");
				up_write(&newcon->sock_sem);
				result = -ENOMEM;
				goto accept_err;
			}
			othercon->nodeid = nodeid;
			othercon->rx_action = receive_from_sock;
			init_rwsem(&othercon->sock_sem);
			set_bit(CF_IS_OTHERCON, &othercon->flags);
			newcon->othercon = othercon;
		}
		othercon->sock = newsock;
		newsock->sk->sk_user_data = othercon;
		add_sock(newsock, othercon);
	} else {
		newsock->sk->sk_user_data = newcon;
		newcon->rx_action = receive_from_sock;
		add_sock(newsock, newcon);
	}

	up_write(&newcon->sock_sem);

	/*
	 * Add it to the active queue in case we got data
	 * between processing the accept and adding the socket
	 * to the read_sockets list
	 */
	lowcomms_data_ready(newsock->sk, 0);
	up_read(&con->sock_sem);

	return 0;

accept_err:
	up_read(&con->sock_sem);
	sock_release(newsock);

	if (result != -EAGAIN)
		printk("dlm: error accepting connection from node: %d\n",
		       result);
	return result;
}

/* Connect a new socket to its peer */
static void connect_to_sock(struct connection *con)
{
	int result = -EHOSTUNREACH;
	struct sockaddr_storage saddr;
	int addr_len;
	struct socket *sock;

	if (con->nodeid == 0) {
		log_print("attempt to connect sock 0 foiled");
		return;
	}

	down_write(&con->sock_sem);
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	/* Some odd races can cause double-connects, ignore them */
	if (con->sock) {
		result = 0;
		goto out;
	}

	/* Create a socket to communicate with */
	result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
				  IPPROTO_TCP, &sock);
	if (result < 0)
		goto out_err;

	memset(&saddr, 0, sizeof(saddr));
	if (dlm_nodeid_to_addr(con->nodeid, &saddr))
		goto out_err;

	sock->sk->sk_user_data = con;
	con->rx_action = receive_from_sock;

	make_sockaddr(&saddr, dlm_config.tcp_port, &addr_len);

	add_sock(sock, con);

	log_print("connecting to %d", con->nodeid);
	result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
				    O_NONBLOCK);
	if (result == -EINPROGRESS)
		result = 0;
	if (result == 0)
		goto out;

out_err:
	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	}
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH && result != -ENETUNREACH &&
	    result != -ENETDOWN && result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		lowcomms_connect_sock(con);
		result = 0;
	}
out:
	up_write(&con->sock_sem);
	return;
}
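
/*
 * Descriptive note: the connect above is non-blocking, so -EINPROGRESS
 * is treated as success.  When the TCP handshake later completes, the
 * socket enters TCP_ESTABLISHED and lowcomms_state_change() fires, which
 * queues the connection for writing so any pending messages go out.
 */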

static struct socket *create_listen_sock(struct connection *con,
					 struct sockaddr_storage *saddr)
{
	struct socket *sock = NULL;
	mm_segment_t fs;
	int result = 0;
	int one = 1;
	int addr_len;

	if (dlm_local_addr.ss_family == AF_INET)
		addr_len = sizeof(struct sockaddr_in);
	else
		addr_len = sizeof(struct sockaddr_in6);

	/* Create a socket to communicate with */
	result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
				  IPPROTO_TCP, &sock);
	if (result < 0) {
		printk("dlm: Can't create listening comms socket\n");
		goto create_out;
	}

	fs = get_fs();
	set_fs(get_ds());
	result = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
				 (char *)&one, sizeof(one));
	set_fs(fs);
	if (result < 0) {
		printk("dlm: Failed to set SO_REUSEADDR on socket: result=%d\n",
		       result);
	}
	sock->sk->sk_user_data = con;
	con->rx_action = accept_from_sock;
	con->sock = sock;

	/* Bind to our port */
	make_sockaddr(saddr, dlm_config.tcp_port, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *)saddr, addr_len);
	if (result < 0) {
		printk("dlm: Can't bind to port %d\n", dlm_config.tcp_port);
		sock_release(sock);
		sock = NULL;
		con->sock = NULL;
		goto create_out;
	}

	fs = get_fs();
	set_fs(get_ds());

	result = sock_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
				 (char *)&one, sizeof(one));
	set_fs(fs);
	if (result < 0) {
		printk("dlm: Set keepalive failed: %d\n", result);
	}

	result = sock->ops->listen(sock, 5);
	if (result < 0) {
		printk("dlm: Can't listen on port %d\n", dlm_config.tcp_port);
		sock_release(sock);
		sock = NULL;
		goto create_out;
	}

create_out:
	return sock;
}

/* Listen on all interfaces */
static int listen_for_all(void)
{
	struct socket *sock = NULL;
	struct connection *con = nodeid2con(0, GFP_KERNEL);
	int result = -EINVAL;

	if (!con)
		return -ENOMEM;

	/* We don't support multi-homed hosts */
	set_bit(CF_IS_OTHERCON, &con->flags);

	sock = create_listen_sock(con, &dlm_local_addr);
	if (sock) {
		add_sock(sock, con);
		result = 0;
	} else {
		result = -EADDRINUSE;
	}

	return result;
}

static struct writequeue_entry *new_writequeue_entry(struct connection *con,
						     gfp_t allocation)
{
	struct writequeue_entry *entry;

	entry = kmalloc(sizeof(struct writequeue_entry), allocation);
	if (!entry)
		return NULL;

	entry->page = alloc_page(allocation);
	if (!entry->page) {
		kfree(entry);
		return NULL;
	}

	entry->offset = 0;
	entry->len = 0;
	entry->end = 0;
	entry->users = 0;
	entry->con = con;

	return entry;
}

void *dlm_lowcomms_get_buffer(int nodeid, int len,
			      gfp_t allocation, char **ppc)
{
	struct connection *con;
	struct writequeue_entry *e;
	int offset = 0;
	int users = 0;

	con = nodeid2con(nodeid, allocation);
	if (!con)
		return NULL;

	spin_lock(&con->writequeue_lock);
	e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
	if ((&e->list == &con->writequeue) ||
	    (PAGE_CACHE_SIZE - e->end < len)) {
		e = NULL;
	} else {
		offset = e->end;
		e->end += len;
		users = e->users++;
	}
	spin_unlock(&con->writequeue_lock);

	if (e) {
	got_one:
		if (users == 0)
			kmap(e->page);
		*ppc = page_address(e->page) + offset;
		return e;
	}

	e = new_writequeue_entry(con, allocation);
	if (e) {
		spin_lock(&con->writequeue_lock);
		offset = e->end;
		e->end += len;
		users = e->users++;
		list_add_tail(&e->list, &con->writequeue);
		spin_unlock(&con->writequeue_lock);
		goto got_one;
	}
	return NULL;
}
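
/*
 * Illustrative sketch (not from the original file): the intended calling
 * pattern for the two functions above is get/fill/commit.  A caller
 * reserves space in a per-connection page, composes the message in the
 * mapped buffer, then commits it so the send daemon can push it out:
 *
 *	char *buf;
 *	void *mh = dlm_lowcomms_get_buffer(nodeid, msglen, GFP_KERNEL, &buf);
 *	if (!mh)
 *		return -ENOBUFS;
 *	memcpy(buf, msg, msglen);	// compose the message in place
 *	dlm_lowcomms_commit_buffer(mh);
 *
 * lowcomms_send_message() below is exactly this sequence.
 */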

void dlm_lowcomms_commit_buffer(void *mh)
{
	struct writequeue_entry *e = (struct writequeue_entry *)mh;
	struct connection *con = e->con;
	int users;

	spin_lock(&con->writequeue_lock);
	users = --e->users;
	if (users)
		goto out;
	e->len = e->end - e->offset;
	kunmap(e->page);
	spin_unlock(&con->writequeue_lock);

	if (test_and_set_bit(CF_WRITE_PENDING, &con->flags) == 0) {
		spin_lock_bh(&write_sockets_lock);
		list_add_tail(&con->write_list, &write_sockets);
		spin_unlock_bh(&write_sockets_lock);

		wake_up_interruptible(&lowcomms_send_waitq);
	}
	return;

out:
	spin_unlock(&con->writequeue_lock);
	return;
}

static void free_entry(struct writequeue_entry *e)
{
	__free_page(e->page);
	kfree(e);
}

/* Send a message */
static void send_to_sock(struct connection *con)
{
	int ret = 0;
	ssize_t (*sendpage)(struct socket *, struct page *, int, size_t, int);
	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	struct writequeue_entry *e;
	int len, offset;

	down_read(&con->sock_sem);
	if (con->sock == NULL)
		goto out_connect;

	sendpage = con->sock->ops->sendpage;

	spin_lock(&con->writequeue_lock);
	for (;;) {
		e = list_entry(con->writequeue.next, struct writequeue_entry,
			       list);
		if ((struct list_head *)e == &con->writequeue)
			break;

		len = e->len;
		offset = e->offset;
		BUG_ON(len == 0 && e->users == 0);
		spin_unlock(&con->writequeue_lock);

		ret = 0;
		if (len) {
			ret = sendpage(con->sock, e->page, offset, len,
				       msg_flags);
			if (ret == -EAGAIN || ret == 0)
				goto out;
			if (ret <= 0)
				goto send_error;
		} else {
			/* Don't starve people filling buffers */
			cond_resched();
		}

		spin_lock(&con->writequeue_lock);
		e->offset += ret;
		e->len -= ret;

		if (e->len == 0 && e->users == 0) {
			list_del(&e->list);
			kunmap(e->page);
			free_entry(e);
			continue;
		}
	}
	spin_unlock(&con->writequeue_lock);
out:
	up_read(&con->sock_sem);
	return;

send_error:
	up_read(&con->sock_sem);
	close_connection(con, false);
	lowcomms_connect_sock(con);
	return;

out_connect:
	up_read(&con->sock_sem);
	lowcomms_connect_sock(con);
	return;
}
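
/*
 * Descriptive note: sendpage() may transmit only part of an entry, so
 * the loop above advances e->offset/e->len by the amount actually sent
 * and retries.  An entry is freed only once e->len reaches 0 and
 * e->users is 0, i.e. no caller is still composing a message in that
 * page via dlm_lowcomms_get_buffer().
 */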

static void clean_one_writequeue(struct connection *con)
{
	struct list_head *list;
	struct list_head *temp;

	spin_lock(&con->writequeue_lock);
	list_for_each_safe(list, temp, &con->writequeue) {
		struct writequeue_entry *e =
			list_entry(list, struct writequeue_entry, list);
		list_del(&e->list);
		free_entry(e);
	}
	spin_unlock(&con->writequeue_lock);
}

/* Called from recovery when it knows that a node has
   left the cluster */
int dlm_lowcomms_close(int nodeid)
{
	struct connection *con;

	if (!connections)
		goto out;

	log_print("closing connection to node %d", nodeid);
	con = nodeid2con(nodeid, 0);
	if (con) {
		clean_one_writequeue(con);
		close_connection(con, true);
		atomic_set(&con->waiting_requests, 0);
	}
	return 0;

out:
	return -1;
}

/* API send message call, may queue the request */
/* N.B. This is the old interface - use the new one for new calls */
int lowcomms_send_message(int nodeid, char *buf, int len, gfp_t allocation)
{
	struct writequeue_entry *e;
	char *b;

	e = dlm_lowcomms_get_buffer(nodeid, len, allocation, &b);
	if (e) {
		memcpy(b, buf, len);
		dlm_lowcomms_commit_buffer(e);
		return 0;
	}
	return -ENOBUFS;
}

/* Look for activity on active sockets */
static void process_sockets(void)
{
	struct list_head *list;
	struct list_head *temp;
	int count = 0;

	spin_lock_bh(&read_sockets_lock);
	list_for_each_safe(list, temp, &read_sockets) {
		struct connection *con =
			list_entry(list, struct connection, read_list);
		list_del(&con->read_list);
		clear_bit(CF_READ_PENDING, &con->flags);

		spin_unlock_bh(&read_sockets_lock);

		/* This can reach zero if we are processing requests
		 * as they come in.
		 */
		if (atomic_read(&con->waiting_requests) == 0) {
			spin_lock_bh(&read_sockets_lock);
			continue;
		}

		do {
			con->rx_action(con);

			/* Don't starve out everyone else */
			if (++count >= MAX_RX_MSG_COUNT) {
				cond_resched();
				count = 0;
			}

		} while (!atomic_dec_and_test(&con->waiting_requests) &&
			 !kthread_should_stop());

		spin_lock_bh(&read_sockets_lock);
	}
	spin_unlock_bh(&read_sockets_lock);
}

/* Try to send any messages that are pending */
static void process_output_queue(void)
{
	struct list_head *list;
	struct list_head *temp;

	spin_lock_bh(&write_sockets_lock);
	list_for_each_safe(list, temp, &write_sockets) {
		struct connection *con =
			list_entry(list, struct connection, write_list);
		clear_bit(CF_WRITE_PENDING, &con->flags);
		list_del(&con->write_list);

		spin_unlock_bh(&write_sockets_lock);
		send_to_sock(con);
		spin_lock_bh(&write_sockets_lock);
	}
	spin_unlock_bh(&write_sockets_lock);
}

static void process_state_queue(void)
{
	struct list_head *list;
	struct list_head *temp;

	spin_lock_bh(&state_sockets_lock);
	list_for_each_safe(list, temp, &state_sockets) {
		struct connection *con =
			list_entry(list, struct connection, state_list);
		list_del(&con->state_list);
		clear_bit(CF_CONNECT_PENDING, &con->flags);
		spin_unlock_bh(&state_sockets_lock);

		connect_to_sock(con);
		spin_lock_bh(&state_sockets_lock);
	}
	spin_unlock_bh(&state_sockets_lock);
}

/* Discard all entries on the write queues */
static void clean_writequeues(void)
{
	int nodeid;

	for (nodeid = 1; nodeid < conn_array_size; nodeid++) {
		struct connection *con = nodeid2con(nodeid, 0);

		if (con)
			clean_one_writequeue(con);
	}
}

static int read_list_empty(void)
{
	int status;

	spin_lock_bh(&read_sockets_lock);
	status = list_empty(&read_sockets);
	spin_unlock_bh(&read_sockets_lock);

	return status;
}

/* DLM Transport comms receive daemon */
static int dlm_recvd(void *data)
{
	init_waitqueue_entry(&lowcomms_recv_waitq_head, current);
	add_wait_queue(&lowcomms_recv_waitq, &lowcomms_recv_waitq_head);

	while (!kthread_should_stop()) {
		set_current_state(TASK_INTERRUPTIBLE);
		if (read_list_empty())
			cond_resched();
		set_current_state(TASK_RUNNING);

		process_sockets();
	}

	return 0;
}

static int write_and_state_lists_empty(void)
{
	int status;

	spin_lock_bh(&write_sockets_lock);
	status = list_empty(&write_sockets);
	spin_unlock_bh(&write_sockets_lock);

	spin_lock_bh(&state_sockets_lock);
	if (!list_empty(&state_sockets))
		status = 0;
	spin_unlock_bh(&state_sockets_lock);

	return status;
}

/* DLM Transport send daemon */
static int dlm_sendd(void *data)
{
	init_waitqueue_entry(&lowcomms_send_waitq_head, current);
	add_wait_queue(&lowcomms_send_waitq, &lowcomms_send_waitq_head);

	while (!kthread_should_stop()) {
		set_current_state(TASK_INTERRUPTIBLE);
		if (write_and_state_lists_empty())
			cond_resched();
		set_current_state(TASK_RUNNING);

		process_state_queue();
		process_output_queue();
	}

	return 0;
}

static void daemons_stop(void)
{
	kthread_stop(recv_task);
	kthread_stop(send_task);
}

static int daemons_start(void)
{
	struct task_struct *p;
	int error;

	p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
	if (IS_ERR(p)) {
		error = PTR_ERR(p);
		log_print("can't start dlm_recvd %d", error);
		return error;
	}
	recv_task = p;

	p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
	if (IS_ERR(p)) {
		error = PTR_ERR(p);
		log_print("can't start dlm_sendd %d", error);
		kthread_stop(recv_task);
		return error;
	}
	send_task = p;

	return 0;
}

/*
 * Return the largest buffer size we can cope with.
 */
int lowcomms_max_buffer_size(void)
{
	return PAGE_CACHE_SIZE;
}

void dlm_lowcomms_stop(void)
{
	int i;

	/* Set all the flags to prevent any
	   socket activity.
	*/
	for (i = 0; i < conn_array_size; i++) {
		if (connections[i])
			connections[i]->flags |= 0xFF;
	}

	daemons_stop();
	clean_writequeues();

	for (i = 0; i < conn_array_size; i++) {
		if (connections[i]) {
			close_connection(connections[i], true);
			if (connections[i]->othercon)
				kmem_cache_free(con_cache,
						connections[i]->othercon);
			kmem_cache_free(con_cache, connections[i]);
		}
	}

	kfree(connections);
	connections = NULL;

	kmem_cache_destroy(con_cache);
}

/* This is quite likely to sleep... */
int dlm_lowcomms_start(void)
{
	int error = 0;

	error = -ENOMEM;
	connections = kzalloc(sizeof(struct connection *) *
			      NODE_INCREMENT, GFP_KERNEL);
	if (!connections)
		goto out;

	conn_array_size = NODE_INCREMENT;

	if (dlm_our_addr(&dlm_local_addr, 0)) {
		log_print("no local IP address has been set");
		goto fail_free_conn;
	}
	if (!dlm_our_addr(&dlm_local_addr, 1)) {
		log_print("This dlm comms module does not support "
			  "multi-homed clustering");
		goto fail_free_conn;
	}

	con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
				      __alignof__(struct connection), 0,
				      NULL, NULL);
	if (!con_cache)
		goto fail_free_conn;

	/* Start listening */
	error = listen_for_all();
	if (error)
		goto fail_unlisten;

	error = daemons_start();
	if (error)
		goto fail_unlisten;

	return 0;

fail_unlisten:
	close_connection(connections[0], false);
	kmem_cache_free(con_cache, connections[0]);
	kmem_cache_destroy(con_cache);

fail_free_conn:
	kfree(connections);

out:
	return error;
}

/*
 * Overrides for Emacs so that we follow Linus's tabbing style.
 * Emacs will notice this stuff at the end of the file and automatically
 * adjust the settings for this buffer only.  This must remain at the end
 * of the file.
 * ---------------------------------------------------------------------------
 * Local variables:
 * c-file-style: "linux"
 * End:
 */