158fe3c82fb0eb16d8ef5f7f30ad9daf743ba937
[kai/samba-autobuild/.git] / source3 / torture / msg_sink.c
1 /*
2  *  Unix SMB/CIFS implementation.
3  *  Receive and count messages
4  *  Copyright (C) Volker Lendecke 2014
5  *
6  *  This program is free software; you can redistribute it and/or modify
7  *  it under the terms of the GNU General Public License as published by
8  *  the Free Software Foundation; either version 3 of the License, or
9  *  (at your option) any later version.
10  *
11  *  This program is distributed in the hope that it will be useful,
12  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *  GNU General Public License for more details.
15  *
16  *  You should have received a copy of the GNU General Public License
17  *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "replace.h"
21 #include "includes.h"
22 #include "messages.h"
23 #include "lib/util/tevent_unix.h"
24 #include <stdio.h>
25
26 struct sink_state {
27         struct tevent_context *ev;
28         struct messaging_context *msg_ctx;
29         int msg_type;
30         unsigned *counter;
31 };
32
33 static void sink_done(struct tevent_req *subreq);
34
35 static struct tevent_req *sink_send(TALLOC_CTX *mem_ctx,
36                                     struct tevent_context *ev,
37                                     struct messaging_context *msg_ctx,
38                                     int msg_type, unsigned *counter)
39 {
40         struct tevent_req *req, *subreq;
41         struct sink_state *state;
42
43         req = tevent_req_create(mem_ctx, &state, struct sink_state);
44         if (req == NULL) {
45                 return NULL;
46         }
47         state->ev = ev;
48         state->msg_ctx = msg_ctx;
49         state->msg_type = msg_type;
50         state->counter = counter;
51
52         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
53                                      state->msg_type);
54         if (tevent_req_nomem(subreq, req)) {
55                 return tevent_req_post(req, ev);
56         }
57         tevent_req_set_callback(subreq, sink_done, req);
58         return req;
59 }
60
61 static void sink_done(struct tevent_req *subreq)
62 {
63         struct tevent_req *req = tevent_req_callback_data(
64                 subreq, struct tevent_req);
65         struct sink_state *state = tevent_req_data(
66                 req, struct sink_state);
67         int ret;
68
69         ret = messaging_read_recv(subreq, NULL, NULL);
70         TALLOC_FREE(subreq);
71         if (tevent_req_error(req, ret)) {
72                 return;
73         }
74
75         *state->counter += 1;
76
77         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
78                                      state->msg_type);
79         if (tevent_req_nomem(subreq, req)) {
80                 return;
81         }
82         tevent_req_set_callback(subreq, sink_done, req);
83 }
84
85 static int sink_recv(struct tevent_req *req)
86 {
87         int err;
88
89         if (tevent_req_is_unix_error(req, &err)) {
90                 return err;
91         }
92         return 0;
93 }
94
95 struct prcount_state {
96         struct tevent_context *ev;
97         struct timeval interval;
98         unsigned *counter;
99 };
100
101 static void prcount_waited(struct tevent_req *subreq);
102
103 static struct tevent_req *prcount_send(TALLOC_CTX *mem_ctx,
104                                        struct tevent_context *ev,
105                                        struct timeval interval,
106                                        unsigned *counter)
107 {
108         struct tevent_req *req, *subreq;
109         struct prcount_state *state;
110
111         req = tevent_req_create(mem_ctx, &state, struct prcount_state);
112         if (req == NULL) {
113                 return NULL;
114         }
115         state->ev = ev;
116         state->interval = interval;
117         state->counter = counter;
118
119         subreq = tevent_wakeup_send(
120                 state, state->ev,
121                 timeval_current_ofs(state->interval.tv_sec,
122                                     state->interval.tv_usec));
123         if (tevent_req_nomem(subreq, req)) {
124                 return tevent_req_post(req, ev);
125         }
126         tevent_req_set_callback(subreq, prcount_waited, req);
127         return req;
128 }
129
130 static void prcount_waited(struct tevent_req *subreq)
131 {
132         struct tevent_req *req = tevent_req_callback_data(
133                 subreq, struct tevent_req);
134         struct prcount_state *state = tevent_req_data(
135                 req, struct prcount_state);
136         bool ok;
137
138         ok = tevent_wakeup_recv(subreq);
139         TALLOC_FREE(subreq);
140         if (!ok) {
141                 tevent_req_error(req, ENOMEM);
142                 return;
143         }
144
145         printf("%u\n", *state->counter);
146
147         subreq = tevent_wakeup_send(
148                 state, state->ev,
149                 timeval_current_ofs(state->interval.tv_sec,
150                                     state->interval.tv_usec));
151         if (tevent_req_nomem(subreq, req)) {
152                 return;
153         }
154         tevent_req_set_callback(subreq, prcount_waited, req);
155 }
156
157 static int prcount_recv(struct tevent_req *req)
158 {
159         int err;
160
161         if (tevent_req_is_unix_error(req, &err)) {
162                 return err;
163         }
164         return 0;
165 }
166
167 struct msgcount_state {
168         unsigned count;
169 };
170
171 static void msgcount_sunk(struct tevent_req *subreq);
172 static void msgcount_printed(struct tevent_req *subreq);
173
174 static struct tevent_req *msgcount_send(TALLOC_CTX *mem_ctx,
175                                         struct tevent_context *ev,
176                                         struct messaging_context *msg_ctx,
177                                         int msg_type, struct timeval interval)
178 {
179         struct tevent_req *req, *subreq;
180         struct msgcount_state *state;
181
182         req = tevent_req_create(mem_ctx, &state, struct msgcount_state);
183         if (req == NULL) {
184                 return NULL;
185         }
186
187         subreq = sink_send(state, ev, msg_ctx, msg_type, &state->count);
188         if (tevent_req_nomem(subreq, req)) {
189                 return tevent_req_post(req, ev);
190         }
191         tevent_req_set_callback(subreq, msgcount_sunk, req);
192
193         subreq = prcount_send(state, ev, interval, &state->count);
194         if (tevent_req_nomem(subreq, req)) {
195                 return tevent_req_post(req, ev);
196         }
197         tevent_req_set_callback(subreq, msgcount_printed, req);
198
199         return req;
200 }
201
202 static void msgcount_sunk(struct tevent_req *subreq)
203 {
204         struct tevent_req *req = tevent_req_callback_data(
205                 subreq, struct tevent_req);
206         int ret;
207
208         ret = sink_recv(subreq);
209         TALLOC_FREE(subreq);
210         if (tevent_req_error(req, ret)) {
211                 return;
212         }
213         tevent_req_done(req);
214 }
215
216 static void msgcount_printed(struct tevent_req *subreq)
217 {
218         struct tevent_req *req = tevent_req_callback_data(
219                 subreq, struct tevent_req);
220         int ret;
221
222         ret = prcount_recv(subreq);
223         TALLOC_FREE(subreq);
224         if (tevent_req_error(req, ret)) {
225                 return;
226         }
227         tevent_req_done(req);
228 }
229
230 static int msgcount_recv(struct tevent_req *req)
231 {
232         int err;
233
234         if (tevent_req_is_unix_error(req, &err)) {
235                 return err;
236         }
237         return 0;
238 }
239
240 int main(void)
241 {
242         TALLOC_CTX *frame = talloc_stackframe();
243         struct tevent_context *ev;
244         struct messaging_context *msg_ctx;
245         struct tevent_req *req;
246         int ret;
247         struct server_id id;
248         struct server_id_buf tmp;
249
250         lp_load_global(get_dyn_CONFIGFILE());
251
252         ev = tevent_context_init(frame);
253         if (ev == NULL) {
254                 perror("tevent_context_init failed");
255                 return -1;
256         }
257
258         msg_ctx = messaging_init(ev, ev);
259         if (msg_ctx == NULL) {
260                 perror("messaging_init failed");
261                 return -1;
262         }
263
264         id = messaging_server_id(msg_ctx);
265
266         printf("server_id: %s\n", server_id_str_buf(id, &tmp));
267
268         req = msgcount_send(ev, ev, msg_ctx, MSG_SMB_NOTIFY,
269                             timeval_set(1, 0));
270         if (req == NULL) {
271                 perror("msgcount_send failed");
272                 return -1;
273         }
274
275         if (!tevent_req_poll(req, ev)) {
276                 perror("tevent_req_poll failed");
277                 return -1;
278         }
279
280         ret = msgcount_recv(req);
281         printf("msgcount_recv returned %d\n", ret);
282
283         return 0;
284 }