Merge tag 'armsoc-dt64' of git://git.kernel.org/pub/scm/linux/kernel/git/arm/arm-soc
[sfrench/cifs-2.6.git] / fs / afs / rxrpc.c
1 /* Maintain an RxRPC server socket to do AFS communications through
2  *
3  * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
4  * Written by David Howells (dhowells@redhat.com)
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License
8  * as published by the Free Software Foundation; either version
9  * 2 of the License, or (at your option) any later version.
10  */
11
12 #include <linux/slab.h>
13 #include <net/sock.h>
14 #include <net/af_rxrpc.h>
15 #include <rxrpc/packet.h>
16 #include "internal.h"
17 #include "afs_cm.h"
18
19 static struct socket *afs_socket; /* my RxRPC socket */
20 static struct workqueue_struct *afs_async_calls;
21 static atomic_t afs_outstanding_calls;
22 static atomic_t afs_outstanding_skbs;
23
24 static void afs_wake_up_call_waiter(struct afs_call *);
25 static int afs_wait_for_call_to_complete(struct afs_call *);
26 static void afs_wake_up_async_call(struct afs_call *);
27 static int afs_dont_wait_for_call_to_complete(struct afs_call *);
28 static void afs_process_async_call(struct afs_call *);
29 static void afs_rx_interceptor(struct sock *, unsigned long, struct sk_buff *);
30 static int afs_deliver_cm_op_id(struct afs_call *, struct sk_buff *, bool);
31
32 /* synchronous call management */
33 const struct afs_wait_mode afs_sync_call = {
34         .rx_wakeup      = afs_wake_up_call_waiter,
35         .wait           = afs_wait_for_call_to_complete,
36 };
37
38 /* asynchronous call management */
39 const struct afs_wait_mode afs_async_call = {
40         .rx_wakeup      = afs_wake_up_async_call,
41         .wait           = afs_dont_wait_for_call_to_complete,
42 };
43
44 /* asynchronous incoming call management */
45 static const struct afs_wait_mode afs_async_incoming_call = {
46         .rx_wakeup      = afs_wake_up_async_call,
47 };
48
49 /* asynchronous incoming call initial processing */
50 static const struct afs_call_type afs_RXCMxxxx = {
51         .name           = "CB.xxxx",
52         .deliver        = afs_deliver_cm_op_id,
53         .abort_to_error = afs_abort_to_error,
54 };
55
56 static void afs_collect_incoming_call(struct work_struct *);
57
58 static struct sk_buff_head afs_incoming_calls;
59 static DECLARE_WORK(afs_collect_incoming_call_work, afs_collect_incoming_call);
60
61 static void afs_async_workfn(struct work_struct *work)
62 {
63         struct afs_call *call = container_of(work, struct afs_call, async_work);
64
65         call->async_workfn(call);
66 }
67
68 static int afs_wait_atomic_t(atomic_t *p)
69 {
70         schedule();
71         return 0;
72 }
73
74 /*
75  * open an RxRPC socket and bind it to be a server for callback notifications
76  * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT
77  */
78 int afs_open_socket(void)
79 {
80         struct sockaddr_rxrpc srx;
81         struct socket *socket;
82         int ret;
83
84         _enter("");
85
86         skb_queue_head_init(&afs_incoming_calls);
87
88         afs_async_calls = create_singlethread_workqueue("kafsd");
89         if (!afs_async_calls) {
90                 _leave(" = -ENOMEM [wq]");
91                 return -ENOMEM;
92         }
93
94         ret = sock_create_kern(&init_net, AF_RXRPC, SOCK_DGRAM, PF_INET, &socket);
95         if (ret < 0) {
96                 destroy_workqueue(afs_async_calls);
97                 _leave(" = %d [socket]", ret);
98                 return ret;
99         }
100
101         socket->sk->sk_allocation = GFP_NOFS;
102
103         /* bind the callback manager's address to make this a server socket */
104         srx.srx_family                  = AF_RXRPC;
105         srx.srx_service                 = CM_SERVICE;
106         srx.transport_type              = SOCK_DGRAM;
107         srx.transport_len               = sizeof(srx.transport.sin);
108         srx.transport.sin.sin_family    = AF_INET;
109         srx.transport.sin.sin_port      = htons(AFS_CM_PORT);
110         memset(&srx.transport.sin.sin_addr, 0,
111                sizeof(srx.transport.sin.sin_addr));
112
113         ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx));
114         if (ret < 0) {
115                 sock_release(socket);
116                 destroy_workqueue(afs_async_calls);
117                 _leave(" = %d [bind]", ret);
118                 return ret;
119         }
120
121         rxrpc_kernel_intercept_rx_messages(socket, afs_rx_interceptor);
122
123         afs_socket = socket;
124         _leave(" = 0");
125         return 0;
126 }
127
128 /*
129  * close the RxRPC socket AFS was using
130  */
131 void afs_close_socket(void)
132 {
133         _enter("");
134
135         wait_on_atomic_t(&afs_outstanding_calls, afs_wait_atomic_t,
136                          TASK_UNINTERRUPTIBLE);
137         _debug("no outstanding calls");
138
139         sock_release(afs_socket);
140
141         _debug("dework");
142         destroy_workqueue(afs_async_calls);
143
144         ASSERTCMP(atomic_read(&afs_outstanding_skbs), ==, 0);
145         _leave("");
146 }
147
148 /*
149  * note that the data in a socket buffer is now delivered and that the buffer
150  * should be freed
151  */
152 static void afs_data_delivered(struct sk_buff *skb)
153 {
154         if (!skb) {
155                 _debug("DLVR NULL [%d]", atomic_read(&afs_outstanding_skbs));
156                 dump_stack();
157         } else {
158                 _debug("DLVR %p{%u} [%d]",
159                        skb, skb->mark, atomic_read(&afs_outstanding_skbs));
160                 if (atomic_dec_return(&afs_outstanding_skbs) == -1)
161                         BUG();
162                 rxrpc_kernel_data_delivered(skb);
163         }
164 }
165
166 /*
167  * free a socket buffer
168  */
169 static void afs_free_skb(struct sk_buff *skb)
170 {
171         if (!skb) {
172                 _debug("FREE NULL [%d]", atomic_read(&afs_outstanding_skbs));
173                 dump_stack();
174         } else {
175                 _debug("FREE %p{%u} [%d]",
176                        skb, skb->mark, atomic_read(&afs_outstanding_skbs));
177                 if (atomic_dec_return(&afs_outstanding_skbs) == -1)
178                         BUG();
179                 rxrpc_kernel_free_skb(skb);
180         }
181 }
182
183 /*
184  * free a call
185  */
186 static void afs_free_call(struct afs_call *call)
187 {
188         _debug("DONE %p{%s} [%d]",
189                call, call->type->name, atomic_read(&afs_outstanding_calls));
190
191         ASSERTCMP(call->rxcall, ==, NULL);
192         ASSERT(!work_pending(&call->async_work));
193         ASSERT(skb_queue_empty(&call->rx_queue));
194         ASSERT(call->type->name != NULL);
195
196         kfree(call->request);
197         kfree(call);
198
199         if (atomic_dec_and_test(&afs_outstanding_calls))
200                 wake_up_atomic_t(&afs_outstanding_calls);
201 }
202
203 /*
204  * End a call but do not free it
205  */
206 static void afs_end_call_nofree(struct afs_call *call)
207 {
208         if (call->rxcall) {
209                 rxrpc_kernel_end_call(call->rxcall);
210                 call->rxcall = NULL;
211         }
212         if (call->type->destructor)
213                 call->type->destructor(call);
214 }
215
216 /*
217  * End a call and free it
218  */
219 static void afs_end_call(struct afs_call *call)
220 {
221         afs_end_call_nofree(call);
222         afs_free_call(call);
223 }
224
225 /*
226  * allocate a call with flat request and reply buffers
227  */
228 struct afs_call *afs_alloc_flat_call(const struct afs_call_type *type,
229                                      size_t request_size, size_t reply_size)
230 {
231         struct afs_call *call;
232
233         call = kzalloc(sizeof(*call), GFP_NOFS);
234         if (!call)
235                 goto nomem_call;
236
237         _debug("CALL %p{%s} [%d]",
238                call, type->name, atomic_read(&afs_outstanding_calls));
239         atomic_inc(&afs_outstanding_calls);
240
241         call->type = type;
242         call->request_size = request_size;
243         call->reply_max = reply_size;
244
245         if (request_size) {
246                 call->request = kmalloc(request_size, GFP_NOFS);
247                 if (!call->request)
248                         goto nomem_free;
249         }
250
251         if (reply_size) {
252                 call->buffer = kmalloc(reply_size, GFP_NOFS);
253                 if (!call->buffer)
254                         goto nomem_free;
255         }
256
257         init_waitqueue_head(&call->waitq);
258         skb_queue_head_init(&call->rx_queue);
259         return call;
260
261 nomem_free:
262         afs_free_call(call);
263 nomem_call:
264         return NULL;
265 }
266
267 /*
268  * clean up a call with flat buffer
269  */
270 void afs_flat_call_destructor(struct afs_call *call)
271 {
272         _enter("");
273
274         kfree(call->request);
275         call->request = NULL;
276         kfree(call->buffer);
277         call->buffer = NULL;
278 }
279
280 /*
281  * attach the data from a bunch of pages on an inode to a call
282  */
283 static int afs_send_pages(struct afs_call *call, struct msghdr *msg,
284                           struct kvec *iov)
285 {
286         struct page *pages[8];
287         unsigned count, n, loop, offset, to;
288         pgoff_t first = call->first, last = call->last;
289         int ret;
290
291         _enter("");
292
293         offset = call->first_offset;
294         call->first_offset = 0;
295
296         do {
297                 _debug("attach %lx-%lx", first, last);
298
299                 count = last - first + 1;
300                 if (count > ARRAY_SIZE(pages))
301                         count = ARRAY_SIZE(pages);
302                 n = find_get_pages_contig(call->mapping, first, count, pages);
303                 ASSERTCMP(n, ==, count);
304
305                 loop = 0;
306                 do {
307                         msg->msg_flags = 0;
308                         to = PAGE_SIZE;
309                         if (first + loop >= last)
310                                 to = call->last_to;
311                         else
312                                 msg->msg_flags = MSG_MORE;
313                         iov->iov_base = kmap(pages[loop]) + offset;
314                         iov->iov_len = to - offset;
315                         offset = 0;
316
317                         _debug("- range %u-%u%s",
318                                offset, to, msg->msg_flags ? " [more]" : "");
319                         iov_iter_kvec(&msg->msg_iter, WRITE | ITER_KVEC,
320                                       iov, 1, to - offset);
321
322                         /* have to change the state *before* sending the last
323                          * packet as RxRPC might give us the reply before it
324                          * returns from sending the request */
325                         if (first + loop >= last)
326                                 call->state = AFS_CALL_AWAIT_REPLY;
327                         ret = rxrpc_kernel_send_data(call->rxcall, msg,
328                                                      to - offset);
329                         kunmap(pages[loop]);
330                         if (ret < 0)
331                                 break;
332                 } while (++loop < count);
333                 first += count;
334
335                 for (loop = 0; loop < count; loop++)
336                         put_page(pages[loop]);
337                 if (ret < 0)
338                         break;
339         } while (first <= last);
340
341         _leave(" = %d", ret);
342         return ret;
343 }
344
345 /*
346  * initiate a call
347  */
348 int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp,
349                   const struct afs_wait_mode *wait_mode)
350 {
351         struct sockaddr_rxrpc srx;
352         struct rxrpc_call *rxcall;
353         struct msghdr msg;
354         struct kvec iov[1];
355         int ret;
356         struct sk_buff *skb;
357
358         _enter("%x,{%d},", addr->s_addr, ntohs(call->port));
359
360         ASSERT(call->type != NULL);
361         ASSERT(call->type->name != NULL);
362
363         _debug("____MAKE %p{%s,%x} [%d]____",
364                call, call->type->name, key_serial(call->key),
365                atomic_read(&afs_outstanding_calls));
366
367         call->wait_mode = wait_mode;
368         call->async_workfn = afs_process_async_call;
369         INIT_WORK(&call->async_work, afs_async_workfn);
370
371         memset(&srx, 0, sizeof(srx));
372         srx.srx_family = AF_RXRPC;
373         srx.srx_service = call->service_id;
374         srx.transport_type = SOCK_DGRAM;
375         srx.transport_len = sizeof(srx.transport.sin);
376         srx.transport.sin.sin_family = AF_INET;
377         srx.transport.sin.sin_port = call->port;
378         memcpy(&srx.transport.sin.sin_addr, addr, 4);
379
380         /* create a call */
381         rxcall = rxrpc_kernel_begin_call(afs_socket, &srx, call->key,
382                                          (unsigned long) call, gfp);
383         call->key = NULL;
384         if (IS_ERR(rxcall)) {
385                 ret = PTR_ERR(rxcall);
386                 goto error_kill_call;
387         }
388
389         call->rxcall = rxcall;
390
391         /* send the request */
392         iov[0].iov_base = call->request;
393         iov[0].iov_len  = call->request_size;
394
395         msg.msg_name            = NULL;
396         msg.msg_namelen         = 0;
397         iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, iov, 1,
398                       call->request_size);
399         msg.msg_control         = NULL;
400         msg.msg_controllen      = 0;
401         msg.msg_flags           = (call->send_pages ? MSG_MORE : 0);
402
403         /* have to change the state *before* sending the last packet as RxRPC
404          * might give us the reply before it returns from sending the
405          * request */
406         if (!call->send_pages)
407                 call->state = AFS_CALL_AWAIT_REPLY;
408         ret = rxrpc_kernel_send_data(rxcall, &msg, call->request_size);
409         if (ret < 0)
410                 goto error_do_abort;
411
412         if (call->send_pages) {
413                 ret = afs_send_pages(call, &msg, iov);
414                 if (ret < 0)
415                         goto error_do_abort;
416         }
417
418         /* at this point, an async call may no longer exist as it may have
419          * already completed */
420         return wait_mode->wait(call);
421
422 error_do_abort:
423         rxrpc_kernel_abort_call(rxcall, RX_USER_ABORT);
424         while ((skb = skb_dequeue(&call->rx_queue)))
425                 afs_free_skb(skb);
426 error_kill_call:
427         afs_end_call(call);
428         _leave(" = %d", ret);
429         return ret;
430 }
431
432 /*
433  * Handles intercepted messages that were arriving in the socket's Rx queue.
434  *
435  * Called from the AF_RXRPC call processor in waitqueue process context.  For
436  * each call, it is guaranteed this will be called in order of packet to be
437  * delivered.
438  */
439 static void afs_rx_interceptor(struct sock *sk, unsigned long user_call_ID,
440                                struct sk_buff *skb)
441 {
442         struct afs_call *call = (struct afs_call *) user_call_ID;
443
444         _enter("%p,,%u", call, skb->mark);
445
446         _debug("ICPT %p{%u} [%d]",
447                skb, skb->mark, atomic_read(&afs_outstanding_skbs));
448
449         ASSERTCMP(sk, ==, afs_socket->sk);
450         atomic_inc(&afs_outstanding_skbs);
451
452         if (!call) {
453                 /* its an incoming call for our callback service */
454                 skb_queue_tail(&afs_incoming_calls, skb);
455                 queue_work(afs_wq, &afs_collect_incoming_call_work);
456         } else {
457                 /* route the messages directly to the appropriate call */
458                 skb_queue_tail(&call->rx_queue, skb);
459                 call->wait_mode->rx_wakeup(call);
460         }
461
462         _leave("");
463 }
464
465 /*
466  * deliver messages to a call
467  */
468 static void afs_deliver_to_call(struct afs_call *call)
469 {
470         struct sk_buff *skb;
471         bool last;
472         u32 abort_code;
473         int ret;
474
475         _enter("");
476
477         while ((call->state == AFS_CALL_AWAIT_REPLY ||
478                 call->state == AFS_CALL_AWAIT_OP_ID ||
479                 call->state == AFS_CALL_AWAIT_REQUEST ||
480                 call->state == AFS_CALL_AWAIT_ACK) &&
481                (skb = skb_dequeue(&call->rx_queue))) {
482                 switch (skb->mark) {
483                 case RXRPC_SKB_MARK_DATA:
484                         _debug("Rcv DATA");
485                         last = rxrpc_kernel_is_data_last(skb);
486                         ret = call->type->deliver(call, skb, last);
487                         switch (ret) {
488                         case 0:
489                                 if (last &&
490                                     call->state == AFS_CALL_AWAIT_REPLY)
491                                         call->state = AFS_CALL_COMPLETE;
492                                 break;
493                         case -ENOTCONN:
494                                 abort_code = RX_CALL_DEAD;
495                                 goto do_abort;
496                         case -ENOTSUPP:
497                                 abort_code = RX_INVALID_OPERATION;
498                                 goto do_abort;
499                         default:
500                                 abort_code = RXGEN_CC_UNMARSHAL;
501                                 if (call->state != AFS_CALL_AWAIT_REPLY)
502                                         abort_code = RXGEN_SS_UNMARSHAL;
503                         do_abort:
504                                 rxrpc_kernel_abort_call(call->rxcall,
505                                                         abort_code);
506                                 call->error = ret;
507                                 call->state = AFS_CALL_ERROR;
508                                 break;
509                         }
510                         afs_data_delivered(skb);
511                         skb = NULL;
512                         continue;
513                 case RXRPC_SKB_MARK_FINAL_ACK:
514                         _debug("Rcv ACK");
515                         call->state = AFS_CALL_COMPLETE;
516                         break;
517                 case RXRPC_SKB_MARK_BUSY:
518                         _debug("Rcv BUSY");
519                         call->error = -EBUSY;
520                         call->state = AFS_CALL_BUSY;
521                         break;
522                 case RXRPC_SKB_MARK_REMOTE_ABORT:
523                         abort_code = rxrpc_kernel_get_abort_code(skb);
524                         call->error = call->type->abort_to_error(abort_code);
525                         call->state = AFS_CALL_ABORTED;
526                         _debug("Rcv ABORT %u -> %d", abort_code, call->error);
527                         break;
528                 case RXRPC_SKB_MARK_LOCAL_ABORT:
529                         abort_code = rxrpc_kernel_get_abort_code(skb);
530                         call->error = call->type->abort_to_error(abort_code);
531                         call->state = AFS_CALL_ABORTED;
532                         _debug("Loc ABORT %u -> %d", abort_code, call->error);
533                         break;
534                 case RXRPC_SKB_MARK_NET_ERROR:
535                         call->error = -rxrpc_kernel_get_error_number(skb);
536                         call->state = AFS_CALL_ERROR;
537                         _debug("Rcv NET ERROR %d", call->error);
538                         break;
539                 case RXRPC_SKB_MARK_LOCAL_ERROR:
540                         call->error = -rxrpc_kernel_get_error_number(skb);
541                         call->state = AFS_CALL_ERROR;
542                         _debug("Rcv LOCAL ERROR %d", call->error);
543                         break;
544                 default:
545                         BUG();
546                         break;
547                 }
548
549                 afs_free_skb(skb);
550         }
551
552         /* make sure the queue is empty if the call is done with (we might have
553          * aborted the call early because of an unmarshalling error) */
554         if (call->state >= AFS_CALL_COMPLETE) {
555                 while ((skb = skb_dequeue(&call->rx_queue)))
556                         afs_free_skb(skb);
557                 if (call->incoming)
558                         afs_end_call(call);
559         }
560
561         _leave("");
562 }
563
564 /*
565  * wait synchronously for a call to complete
566  */
567 static int afs_wait_for_call_to_complete(struct afs_call *call)
568 {
569         struct sk_buff *skb;
570         int ret;
571
572         DECLARE_WAITQUEUE(myself, current);
573
574         _enter("");
575
576         add_wait_queue(&call->waitq, &myself);
577         for (;;) {
578                 set_current_state(TASK_INTERRUPTIBLE);
579
580                 /* deliver any messages that are in the queue */
581                 if (!skb_queue_empty(&call->rx_queue)) {
582                         __set_current_state(TASK_RUNNING);
583                         afs_deliver_to_call(call);
584                         continue;
585                 }
586
587                 ret = call->error;
588                 if (call->state >= AFS_CALL_COMPLETE)
589                         break;
590                 ret = -EINTR;
591                 if (signal_pending(current))
592                         break;
593                 schedule();
594         }
595
596         remove_wait_queue(&call->waitq, &myself);
597         __set_current_state(TASK_RUNNING);
598
599         /* kill the call */
600         if (call->state < AFS_CALL_COMPLETE) {
601                 _debug("call incomplete");
602                 rxrpc_kernel_abort_call(call->rxcall, RX_CALL_DEAD);
603                 while ((skb = skb_dequeue(&call->rx_queue)))
604                         afs_free_skb(skb);
605         }
606
607         _debug("call complete");
608         afs_end_call(call);
609         _leave(" = %d", ret);
610         return ret;
611 }
612
613 /*
614  * wake up a waiting call
615  */
616 static void afs_wake_up_call_waiter(struct afs_call *call)
617 {
618         wake_up(&call->waitq);
619 }
620
621 /*
622  * wake up an asynchronous call
623  */
624 static void afs_wake_up_async_call(struct afs_call *call)
625 {
626         _enter("");
627         queue_work(afs_async_calls, &call->async_work);
628 }
629
630 /*
631  * put a call into asynchronous mode
632  * - mustn't touch the call descriptor as the call my have completed by the
633  *   time we get here
634  */
635 static int afs_dont_wait_for_call_to_complete(struct afs_call *call)
636 {
637         _enter("");
638         return -EINPROGRESS;
639 }
640
641 /*
642  * delete an asynchronous call
643  */
644 static void afs_delete_async_call(struct afs_call *call)
645 {
646         _enter("");
647
648         afs_free_call(call);
649
650         _leave("");
651 }
652
653 /*
654  * perform processing on an asynchronous call
655  * - on a multiple-thread workqueue this work item may try to run on several
656  *   CPUs at the same time
657  */
658 static void afs_process_async_call(struct afs_call *call)
659 {
660         _enter("");
661
662         if (!skb_queue_empty(&call->rx_queue))
663                 afs_deliver_to_call(call);
664
665         if (call->state >= AFS_CALL_COMPLETE && call->wait_mode) {
666                 if (call->wait_mode->async_complete)
667                         call->wait_mode->async_complete(call->reply,
668                                                         call->error);
669                 call->reply = NULL;
670
671                 /* kill the call */
672                 afs_end_call_nofree(call);
673
674                 /* we can't just delete the call because the work item may be
675                  * queued */
676                 call->async_workfn = afs_delete_async_call;
677                 queue_work(afs_async_calls, &call->async_work);
678         }
679
680         _leave("");
681 }
682
683 /*
684  * empty a socket buffer into a flat reply buffer
685  */
686 void afs_transfer_reply(struct afs_call *call, struct sk_buff *skb)
687 {
688         size_t len = skb->len;
689
690         if (skb_copy_bits(skb, 0, call->buffer + call->reply_size, len) < 0)
691                 BUG();
692         call->reply_size += len;
693 }
694
695 /*
696  * accept the backlog of incoming calls
697  */
698 static void afs_collect_incoming_call(struct work_struct *work)
699 {
700         struct rxrpc_call *rxcall;
701         struct afs_call *call = NULL;
702         struct sk_buff *skb;
703
704         while ((skb = skb_dequeue(&afs_incoming_calls))) {
705                 _debug("new call");
706
707                 /* don't need the notification */
708                 afs_free_skb(skb);
709
710                 if (!call) {
711                         call = kzalloc(sizeof(struct afs_call), GFP_KERNEL);
712                         if (!call) {
713                                 rxrpc_kernel_reject_call(afs_socket);
714                                 return;
715                         }
716
717                         call->async_workfn = afs_process_async_call;
718                         INIT_WORK(&call->async_work, afs_async_workfn);
719                         call->wait_mode = &afs_async_incoming_call;
720                         call->type = &afs_RXCMxxxx;
721                         init_waitqueue_head(&call->waitq);
722                         skb_queue_head_init(&call->rx_queue);
723                         call->state = AFS_CALL_AWAIT_OP_ID;
724
725                         _debug("CALL %p{%s} [%d]",
726                                call, call->type->name,
727                                atomic_read(&afs_outstanding_calls));
728                         atomic_inc(&afs_outstanding_calls);
729                 }
730
731                 rxcall = rxrpc_kernel_accept_call(afs_socket,
732                                                   (unsigned long) call);
733                 if (!IS_ERR(rxcall)) {
734                         call->rxcall = rxcall;
735                         call = NULL;
736                 }
737         }
738
739         if (call)
740                 afs_free_call(call);
741 }
742
743 /*
744  * grab the operation ID from an incoming cache manager call
745  */
746 static int afs_deliver_cm_op_id(struct afs_call *call, struct sk_buff *skb,
747                                 bool last)
748 {
749         size_t len = skb->len;
750         void *oibuf = (void *) &call->operation_ID;
751
752         _enter("{%u},{%zu},%d", call->offset, len, last);
753
754         ASSERTCMP(call->offset, <, 4);
755
756         /* the operation ID forms the first four bytes of the request data */
757         len = min_t(size_t, len, 4 - call->offset);
758         if (skb_copy_bits(skb, 0, oibuf + call->offset, len) < 0)
759                 BUG();
760         if (!pskb_pull(skb, len))
761                 BUG();
762         call->offset += len;
763
764         if (call->offset < 4) {
765                 if (last) {
766                         _leave(" = -EBADMSG [op ID short]");
767                         return -EBADMSG;
768                 }
769                 _leave(" = 0 [incomplete]");
770                 return 0;
771         }
772
773         call->state = AFS_CALL_AWAIT_REQUEST;
774
775         /* ask the cache manager to route the call (it'll change the call type
776          * if successful) */
777         if (!afs_cm_incoming_call(call))
778                 return -ENOTSUPP;
779
780         /* pass responsibility for the remainer of this message off to the
781          * cache manager op */
782         return call->type->deliver(call, skb, last);
783 }
784
785 /*
786  * send an empty reply
787  */
788 void afs_send_empty_reply(struct afs_call *call)
789 {
790         struct msghdr msg;
791
792         _enter("");
793
794         msg.msg_name            = NULL;
795         msg.msg_namelen         = 0;
796         iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, NULL, 0, 0);
797         msg.msg_control         = NULL;
798         msg.msg_controllen      = 0;
799         msg.msg_flags           = 0;
800
801         call->state = AFS_CALL_AWAIT_ACK;
802         switch (rxrpc_kernel_send_data(call->rxcall, &msg, 0)) {
803         case 0:
804                 _leave(" [replied]");
805                 return;
806
807         case -ENOMEM:
808                 _debug("oom");
809                 rxrpc_kernel_abort_call(call->rxcall, RX_USER_ABORT);
810         default:
811                 afs_end_call(call);
812                 _leave(" [error]");
813                 return;
814         }
815 }
816
817 /*
818  * send a simple reply
819  */
820 void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len)
821 {
822         struct msghdr msg;
823         struct kvec iov[1];
824         int n;
825
826         _enter("");
827
828         iov[0].iov_base         = (void *) buf;
829         iov[0].iov_len          = len;
830         msg.msg_name            = NULL;
831         msg.msg_namelen         = 0;
832         iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, iov, 1, len);
833         msg.msg_control         = NULL;
834         msg.msg_controllen      = 0;
835         msg.msg_flags           = 0;
836
837         call->state = AFS_CALL_AWAIT_ACK;
838         n = rxrpc_kernel_send_data(call->rxcall, &msg, len);
839         if (n >= 0) {
840                 /* Success */
841                 _leave(" [replied]");
842                 return;
843         }
844
845         if (n == -ENOMEM) {
846                 _debug("oom");
847                 rxrpc_kernel_abort_call(call->rxcall, RX_USER_ABORT);
848         }
849         afs_end_call(call);
850         _leave(" [error]");
851 }
852
853 /*
854  * extract a piece of data from the received data socket buffers
855  */
856 int afs_extract_data(struct afs_call *call, struct sk_buff *skb,
857                      bool last, void *buf, size_t count)
858 {
859         size_t len = skb->len;
860
861         _enter("{%u},{%zu},%d,,%zu", call->offset, len, last, count);
862
863         ASSERTCMP(call->offset, <, count);
864
865         len = min_t(size_t, len, count - call->offset);
866         if (skb_copy_bits(skb, 0, buf + call->offset, len) < 0 ||
867             !pskb_pull(skb, len))
868                 BUG();
869         call->offset += len;
870
871         if (call->offset < count) {
872                 if (last) {
873                         _leave(" = -EBADMSG [%d < %zu]", call->offset, count);
874                         return -EBADMSG;
875                 }
876                 _leave(" = -EAGAIN");
877                 return -EAGAIN;
878         }
879         return 0;
880 }