Revert "smbd: add an effective connection_struct->user_ev_ctx that holds the event...
[samba.git] / source3 / torture / test_messaging_send_all.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * Test for a messaging_send_all bug
4  * Copyright (C) Volker Lendecke 2017
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "includes.h"
21 #include "torture/proto.h"
22 #include "lib/util/tevent_unix.h"
23 #include "messages.h"
24 #include "lib/async_req/async_sock.h"
25 #include "lib/util/sys_rw.h"
26
27 static pid_t fork_responder(struct messaging_context *msg_ctx,
28                             int exit_pipe[2])
29 {
30         struct tevent_context *ev = messaging_tevent_context(msg_ctx);
31         struct tevent_req *req;
32         pid_t child_pid;
33         int ready_pipe[2];
34         char c = 0;
35         bool ok;
36         int ret, err;
37         NTSTATUS status;
38         ssize_t nwritten;
39
40         ret = pipe(ready_pipe);
41         if (ret == -1) {
42                 perror("pipe failed");
43                 return -1;
44         }
45
46         child_pid = fork();
47         if (child_pid == -1) {
48                 perror("fork failed");
49                 close(ready_pipe[0]);
50                 close(ready_pipe[1]);
51                 return -1;
52         }
53
54         if (child_pid != 0) {
55                 ssize_t nread;
56                 close(ready_pipe[1]);
57                 nread = read(ready_pipe[0], &c, 1);
58                 close(ready_pipe[0]);
59                 if (nread != 1) {
60                         perror("read failed");
61                         return -1;
62                 }
63                 return child_pid;
64         }
65
66         close(ready_pipe[0]);
67         close(exit_pipe[1]);
68
69         status = messaging_reinit(msg_ctx);
70         if (!NT_STATUS_IS_OK(status)) {
71                 fprintf(stderr, "messaging_reinit failed: %s\n",
72                         nt_errstr(status));
73                 close(ready_pipe[1]);
74                 exit(1);
75         }
76
77         nwritten = sys_write(ready_pipe[1], &c, 1);
78         if (nwritten != 1) {
79                 fprintf(stderr, "write failed: %s\n", strerror(errno));
80                 exit(1);
81         }
82
83         close(ready_pipe[1]);
84
85         req = wait_for_read_send(ev, ev, exit_pipe[0], false);
86         if (req == NULL) {
87                 fprintf(stderr, "wait_for_read_send failed\n");
88                 exit(1);
89         }
90
91         ok = tevent_req_poll_unix(req, ev, &err);
92         if (!ok) {
93                 fprintf(stderr, "tevent_req_poll_unix failed: %s\n",
94                         strerror(err));
95                 exit(1);
96         }
97
98         exit(0);
99 }
100
/*
 * State of one collect_pong_send()/collect_pong_recv() request.
 */
struct messaging_send_all_state {
        /*
         * Private copy of the pids we still expect a MSG_PONG from.
         * A slot is set to 0 once that sender has answered.
         */
        pid_t *senders;
        /* How many of the senders have answered so far */
        size_t num_received;

        /* Contexts used to post the next MSG_PONG read */
        struct tevent_context *ev;
        struct messaging_context *msg;
};
107
108 static void collect_pong_received(struct tevent_req *subreq);
109
110 static struct tevent_req *collect_pong_send(TALLOC_CTX *mem_ctx,
111                                             struct tevent_context *ev,
112                                             struct messaging_context *msg,
113                                             const pid_t *senders,
114                                             size_t num_senders)
115 {
116         struct tevent_req *req, *subreq;
117         struct messaging_send_all_state *state;
118
119         req = tevent_req_create(mem_ctx, &state,
120                                 struct messaging_send_all_state);
121         if (req == NULL) {
122                 return NULL;
123         }
124         state->senders = talloc_memdup(
125                 state, senders, num_senders * sizeof(pid_t));
126         if (tevent_req_nomem(state->senders, req)) {
127                 return tevent_req_post(req, ev);
128         }
129         state->ev = ev;
130         state->msg = msg;
131
132         subreq = messaging_read_send(state, state->ev, state->msg, MSG_PONG);
133         if (tevent_req_nomem(subreq, req)) {
134                 return tevent_req_post(req, ev);
135         }
136         tevent_req_set_callback(subreq, collect_pong_received, req);
137         return req;
138 }
139
140 static void collect_pong_received(struct tevent_req *subreq)
141 {
142         struct tevent_req *req = tevent_req_callback_data(
143                 subreq, struct tevent_req);
144         struct messaging_send_all_state *state = tevent_req_data(
145                 req, struct messaging_send_all_state);
146         size_t num_senders = talloc_array_length(state->senders);
147         size_t i;
148         struct messaging_rec *rec;
149         int ret;
150
151         ret = messaging_read_recv(subreq, state, &rec);
152         TALLOC_FREE(subreq);
153         if (tevent_req_error(req, ret)) {
154                 return;
155         }
156
157         /*
158          * We need to make sure we don't receive our own broadcast!
159          */
160
161         if (rec->src.pid == (uint64_t)getpid()) {
162                 fprintf(stderr, "Received my own broadcast!\n");
163                 tevent_req_error(req, EMULTIHOP);
164                 return;
165         }
166
167         for (i=0; i<num_senders; i++) {
168                 if (state->senders[i] == (pid_t)rec->src.pid) {
169                         printf("got message from %"PRIu64"\n", rec->src.pid);
170                         state->senders[i] = 0;
171                         state->num_received += 1;
172                         break;
173                 }
174         }
175
176         if (state->num_received == num_senders) {
177                 printf("done\n");
178                 tevent_req_done(req);
179                 return;
180         }
181
182         subreq = messaging_read_send(state, state->ev, state->msg, MSG_PONG);
183         if (tevent_req_nomem(subreq, req)) {
184                 return;
185         }
186         tevent_req_set_callback(subreq, collect_pong_received, req);
187 }
188
/*
 * Fetch the result of a collect_pong_send() request.
 *
 * Hands back whatever tevent_req_simple_recv_unix() reports for the
 * request; the caller in this file treats 0 as success and any other
 * value as an errno to pass to strerror().
 */
static int collect_pong_recv(struct tevent_req *req)
{
        int result = tevent_req_simple_recv_unix(req);

        return result;
}
193
194 extern int torture_nprocs;
195
196 bool run_messaging_send_all(int dummy)
197 {
198         struct tevent_context *ev = NULL;
199         struct messaging_context *msg_ctx = NULL;
200         int exit_pipe[2];
201         pid_t children[MAX(5, torture_nprocs)];
202         struct tevent_req *req;
203         size_t i;
204         bool ok;
205         int ret, err;
206
207         ev = samba_tevent_context_init(talloc_tos());
208         if (ev == NULL) {
209                 fprintf(stderr, "tevent_context_init failed\n");
210                 return false;
211         }
212         msg_ctx = messaging_init(ev, ev);
213         if (msg_ctx == NULL) {
214                 fprintf(stderr, "messaging_init failed\n");
215                 return false;
216         }
217         ret = pipe(exit_pipe);
218         if (ret != 0) {
219                 perror("parent: pipe failed for exit_pipe");
220                 return false;
221         }
222
223         for (i=0; i<ARRAY_SIZE(children); i++) {
224                 children[i] = fork_responder(msg_ctx, exit_pipe);
225                 if (children[i] == -1) {
226                         fprintf(stderr, "fork_responder(%zu) failed\n", i);
227                         return false;
228                 }
229         }
230
231         req = collect_pong_send(ev, ev, msg_ctx, children,
232                                 ARRAY_SIZE(children));
233         if (req == NULL) {
234                 perror("collect_pong failed");
235                 return false;
236         }
237
238         ok = tevent_req_set_endtime(req, ev,
239                                     tevent_timeval_current_ofs(10, 0));
240         if (!ok) {
241                 perror("tevent_req_set_endtime failed");
242                 return false;
243         }
244
245         messaging_send_all(msg_ctx, MSG_PING, NULL, 0);
246
247         ok = tevent_req_poll_unix(req, ev, &err);
248         if (!ok) {
249                 perror("tevent_req_poll_unix failed");
250                 return false;
251         }
252
253         ret = collect_pong_recv(req);
254         TALLOC_FREE(req);
255
256         if (ret != 0) {
257                 fprintf(stderr, "collect_pong_send returned %s\n",
258                         strerror(ret));
259                 return false;
260         }
261
262         close(exit_pipe[1]);
263
264         for (i=0; i<ARRAY_SIZE(children); i++) {
265                 pid_t child;
266                 int status;
267
268                 do {
269                         child = waitpid(children[i], &status, 0);
270                 } while ((child == -1) && (errno == EINTR));
271
272                 if (child != children[i]) {
273                         printf("waitpid(%d) failed\n", children[i]);
274                         return false;
275                 }
276         }
277
278         return true;
279 }