ctdb-tests: Fix function names in protocol test
[vlendec/samba-autobuild/.git] / ctdb / tests / src / message_ring.c
1 /*
2    simple ctdb benchmark - send messages in a ring around cluster
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22
23 #include "lib/util/time.h"
24 #include "lib/util/tevent_unix.h"
25
26 #include "client/client.h"
27 #include "tests/src/test_options.h"
28 #include "tests/src/cluster_wait.h"
29
30 #define MSG_ID_BENCH    0
31
32 struct message_ring_state {
33         struct tevent_context *ev;
34         struct ctdb_client_context *client;
35         int num_nodes;
36         int timelimit;
37         int interactive;
38         int msg_count;
39         int msg_plus, msg_minus;
40         struct timeval start_time;
41 };
42
43 static void message_ring_wait(struct tevent_req *subreq);
44 static void message_ring_start(struct tevent_req *subreq);
45 static void message_ring_each_second(struct tevent_req *subreq);
46 static void message_ring_msg_sent(struct tevent_req *subreq);
47 static void message_ring_msg_handler(uint64_t srvid, TDB_DATA data,
48                                    void *private_data);
49 static void message_ring_finish(struct tevent_req *subreq);
50
51 static struct tevent_req *message_ring_send(TALLOC_CTX *mem_ctx,
52                                             struct tevent_context *ev,
53                                             struct ctdb_client_context *client,
54                                             int num_nodes, int timelimit,
55                                             int interactive)
56 {
57         struct tevent_req *req, *subreq;
58         struct message_ring_state *state;
59
60         req = tevent_req_create(mem_ctx, &state, struct message_ring_state);
61         if (req == NULL) {
62                 return NULL;
63         }
64
65         state->ev = ev;
66         state->client = client;
67         state->num_nodes = num_nodes;
68         state->timelimit = timelimit;
69         state->interactive = interactive;
70
71         subreq = ctdb_client_set_message_handler_send(
72                                         state, state->ev, state->client,
73                                         MSG_ID_BENCH,
74                                         message_ring_msg_handler, req);
75         if (tevent_req_nomem(subreq, req)) {
76                 return tevent_req_post(req, ev);
77         }
78         tevent_req_set_callback(subreq, message_ring_wait, req);
79
80         return req;
81 }
82
83 static void message_ring_wait(struct tevent_req *subreq)
84 {
85         struct tevent_req *req = tevent_req_callback_data(
86                 subreq, struct tevent_req);
87         struct message_ring_state *state = tevent_req_data(
88                 req, struct message_ring_state);
89         bool status;
90         int ret;
91
92         status = ctdb_client_set_message_handler_recv(subreq, &ret);
93         TALLOC_FREE(subreq);
94         if (! status) {
95                 tevent_req_error(req, ret);
96                 return;
97         }
98
99         subreq = cluster_wait_send(state, state->ev, state->client,
100                                    state->num_nodes);
101         if (tevent_req_nomem(subreq, req)) {
102                 return;
103         }
104         tevent_req_set_callback(subreq, message_ring_start, req);
105 }
106
107 static void message_ring_start(struct tevent_req *subreq)
108 {
109         struct tevent_req *req = tevent_req_callback_data(
110                 subreq, struct tevent_req);
111         struct message_ring_state *state = tevent_req_data(
112                 req, struct message_ring_state);
113         bool status;
114         int ret;
115
116         status = cluster_wait_recv(subreq, &ret);
117         TALLOC_FREE(subreq);
118         if (! status) {
119                 tevent_req_error(req, ret);
120                 return;
121         }
122
123         state->start_time = tevent_timeval_current();
124
125         if (ctdb_client_pnn(state->client) == 0) {
126                 subreq = tevent_wakeup_send(state, state->ev,
127                                             tevent_timeval_current_ofs(1, 0));
128                 if (tevent_req_nomem(subreq, req)) {
129                         return;
130                 }
131                 tevent_req_set_callback(subreq, message_ring_each_second, req);
132         }
133
134         subreq = tevent_wakeup_send(state, state->ev,
135                                     tevent_timeval_current_ofs(
136                                             state->timelimit, 0));
137         if (tevent_req_nomem(subreq, req)) {
138                 return;
139         }
140         tevent_req_set_callback(subreq, message_ring_finish, req);
141 }
142
143 static uint32_t next_node(struct ctdb_client_context *client,
144                           int num_nodes, int incr)
145 {
146         return (ctdb_client_pnn(client) + num_nodes + incr) % num_nodes;
147 }
148
149 static void message_ring_each_second(struct tevent_req *subreq)
150 {
151         struct tevent_req *req = tevent_req_callback_data(
152                 subreq, struct tevent_req);
153         struct message_ring_state *state = tevent_req_data(
154                 req, struct message_ring_state);
155         struct ctdb_req_message msg;
156         uint32_t pnn;
157         int incr;
158         bool status;
159
160         status = tevent_wakeup_recv(subreq);
161         TALLOC_FREE(subreq);
162         if (! status) {
163                 tevent_req_error(req, EIO);
164                 return;
165         }
166
167         pnn = ctdb_client_pnn(state->client);
168         if (pnn == 0 && state->interactive == 1) {
169                 double t;
170
171                 t = timeval_elapsed(&state->start_time);
172                 printf("Ring[%u]: %.2f msgs/sec (+ve=%d -ve=%d)\n",
173                        pnn, state->msg_count / t,
174                        state->msg_plus, state->msg_minus);
175                 fflush(stdout);
176         }
177
178         if (state->msg_plus == 0) {
179                 incr = 1;
180
181                 msg.srvid = 0;
182                 msg.data.data.dptr = (uint8_t *)&incr;
183                 msg.data.data.dsize = sizeof(incr);
184
185                 pnn = next_node(state->client, state->num_nodes, incr);
186
187                 subreq = ctdb_client_message_send(state, state->ev,
188                                                   state->client, pnn, &msg);
189                 if (tevent_req_nomem(subreq, req)) {
190                         return;
191                 }
192                 tevent_req_set_callback(subreq, message_ring_msg_sent, req);
193         }
194
195         if (state->msg_minus == 0) {
196                 incr = -1;
197
198                 msg.srvid = 0;
199                 msg.data.data.dptr = (uint8_t *)&incr;
200                 msg.data.data.dsize = sizeof(incr);
201
202                 pnn = next_node(state->client, state->num_nodes, incr);
203
204                 subreq = ctdb_client_message_send(state, state->ev,
205                                                   state->client, pnn, &msg);
206                 if (tevent_req_nomem(subreq, req)) {
207                         return;
208                 }
209                 tevent_req_set_callback(subreq, message_ring_msg_sent, req);
210         }
211
212         subreq = tevent_wakeup_send(state, state->ev,
213                                     tevent_timeval_current_ofs(1, 0));
214         if (tevent_req_nomem(subreq, req)) {
215                 return;
216         }
217         tevent_req_set_callback(subreq, message_ring_each_second, req);
218 }
219
220 static void message_ring_msg_sent(struct tevent_req *subreq)
221 {
222         struct tevent_req *req = tevent_req_callback_data(
223                 subreq, struct tevent_req);
224         bool status;
225         int ret;
226
227         status = ctdb_client_message_recv(subreq, &ret);
228         TALLOC_FREE(subreq);
229         if (! status) {
230                 tevent_req_error(req, ret);
231         }
232 }
233
234 static void message_ring_msg_handler(uint64_t srvid, TDB_DATA data,
235                                    void *private_data)
236 {
237         struct tevent_req *req = talloc_get_type_abort(
238                 private_data, struct tevent_req);
239         struct message_ring_state *state = tevent_req_data(
240                 req, struct message_ring_state);
241         struct ctdb_req_message msg;
242         struct tevent_req *subreq;
243         int incr;
244         uint32_t pnn;
245
246         if (srvid != MSG_ID_BENCH) {
247                 return;
248         }
249
250         if (data.dsize != sizeof(int)) {
251                 return;
252         }
253         incr = *(int *)data.dptr;
254
255         state->msg_count += 1;
256         if (incr == 1) {
257                 state->msg_plus += 1;
258         } else {
259                 state->msg_minus += 1;
260         }
261
262         pnn = next_node(state->client, state->num_nodes, incr);
263
264         msg.srvid = srvid;
265         msg.data.data = data;
266
267         subreq = ctdb_client_message_send(state, state->ev, state->client,
268                                           pnn, &msg);
269         if (tevent_req_nomem(subreq, req)) {
270                 return;
271         }
272         tevent_req_set_callback(subreq, message_ring_msg_sent, req);
273 }
274
275 static void message_ring_finish(struct tevent_req *subreq)
276 {
277         struct tevent_req *req = tevent_req_callback_data(
278                 subreq, struct tevent_req);
279         struct message_ring_state *state = tevent_req_data(
280                 req, struct message_ring_state);
281         bool status;
282         double t;
283
284         status = tevent_wakeup_recv(subreq);
285         TALLOC_FREE(subreq);
286         if (! status) {
287                 tevent_req_error(req, EIO);
288                 return;
289         }
290
291         t = timeval_elapsed(&state->start_time);
292
293         printf("Ring[%u]: %.2f msgs/sec (+ve=%d -ve=%d)\n",
294                ctdb_client_pnn(state->client), state->msg_count / t,
295                state->msg_plus, state->msg_minus);
296
297         tevent_req_done(req);
298 }
299
300 static bool message_ring_recv(struct tevent_req *req)
301 {
302         int ret;
303
304         if (tevent_req_is_unix_error(req, &ret)) {
305                 return false;
306         }
307         return true;
308 }
309
310 int main(int argc, const char *argv[])
311 {
312         const struct test_options *opts;
313         TALLOC_CTX *mem_ctx;
314         struct tevent_context *ev;
315         struct ctdb_client_context *client;
316         struct tevent_req *req;
317         int ret;
318         bool status;
319
320         status = process_options_basic(argc, argv, &opts);
321         if (! status) {
322                 exit(1);
323         }
324
325         mem_ctx = talloc_new(NULL);
326         if (mem_ctx == NULL) {
327                 fprintf(stderr, "Memory allocation error\n");
328                 exit(1);
329         }
330
331         ev = tevent_context_init(mem_ctx);
332         if (ev == NULL) {
333                 fprintf(stderr, "Memory allocation error\n");
334                 exit(1);
335         }
336
337         ret = ctdb_client_init(mem_ctx, ev, opts->socket, &client);
338         if (ret != 0) {
339                 fprintf(stderr, "Failed to initialize client, ret=%d\n", ret);
340                 exit(1);
341         }
342
343         if (! ctdb_recovery_wait(ev, client)) {
344                 fprintf(stderr, "Failed to wait for recovery\n");
345                 exit(1);
346         }
347
348         req = message_ring_send(mem_ctx, ev, client,
349                                 opts->num_nodes, opts->timelimit,
350                                 opts->interactive);
351         if (req == NULL) {
352                 fprintf(stderr, "Memory allocation error\n");
353                 exit(1);
354         }
355
356         tevent_req_poll(req, ev);
357
358         status = message_ring_recv(req);
359         if (! status) {
360                 fprintf(stderr, "message ring test failed\n");
361                 exit(1);
362         }
363
364         talloc_free(mem_ctx);
365         return 0;
366 }