r11004: r10083@SERNOX: metze | 2005-09-08 16:50:32 +0200
[kai/samba.git] / source / wrepl_server / wrepl_server.c
1 /* 
2    Unix SMB/CIFS implementation.
3    
4    WINS Replication server
5    
6    Copyright (C) Stefan Metzmacher      2005
7    
8    This program is free software; you can redistribute it and/or modify
9    it under the terms of the GNU General Public License as published by
10    the Free Software Foundation; either version 2 of the License, or
11    (at your option) any later version.
12    
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17    
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21 */
22
23 #include "includes.h"
24 #include "dlinklist.h"
25 #include "lib/events/events.h"
26 #include "lib/socket/socket.h"
27 #include "smbd/service_task.h"
28 #include "smbd/service_stream.h"
29 #include "lib/messaging/irpc.h"
30 #include "librpc/gen_ndr/ndr_winsrepl.h"
31
/*
 * Forward declarations for the structure tags used in this file.
 *
 * wreplsrv_pull_partner and wreplsrv_push_partner are the tags that
 * struct wreplsrv_service actually points at, so they are declared
 * here next to the *_item variants.
 * NOTE(review): confirm whether the *_item names or the plain names
 * are the intended ones and drop the unused pair.
 */
struct wreplsrv_service;
struct wreplsrv_in_connection;
struct wreplsrv_out_connection;
struct wreplsrv_partner;
struct wreplsrv_pull_partner;
struct wreplsrv_push_partner;
struct wreplsrv_pull_partner_item;
struct wreplsrv_push_partner_item;
38
/*
  state of an incoming wrepl call

  One of these is allocated per request by wreplsrv_recv(): the request
  is unmarshalled into req_packet, the wreplsrv_in_*() handlers fill in
  rep_packet, and the call is freed once the reply has been marshalled.
*/
struct wreplsrv_in_call {
        /* the connection this call arrived on */
        struct wreplsrv_in_connection *wreplconn;
        /* the parsed request */
        struct wrepl_packet req_packet;
        /* the reply that will be sent back */
        struct wrepl_packet rep_packet;
};
47
48 /*
49   state of an incoming wrepl connection
50 */
51 struct wreplsrv_in_connection {
52         struct wreplsrv_in_connection *prev,*next;
53         struct stream_connection *conn;
54
55         /* our global service context */
56         struct wreplsrv_service *service;
57
58         /*
59          * the partner that connects us,
60          * can be NULL, when we got a connection
61          * from an unknown address
62          */
63         struct wreplsrv_partner *partner;
64
65         /*
66          * we need to take care of our own ip address,
67          * as this is the WINS-Owner ID the peer expect
68          * from us.
69          */
70         const char *our_ip;
71
72         /* the partial input on the connection */
73         DATA_BLOB partial;
74         size_t partial_read;
75
76         /*
77          * are we currently processing a request?
78          * this prevents loops, with half async code
79          */
80         BOOL processing;
81
82         /* the list of outgoing DATA_BLOB's that needs to be send */
83         struct data_blob_list_item *send_queue;
84 };
85
/*
  state of an outgoing wrepl connection
*/
struct wreplsrv_out_connection {
        /* the partner this connection belongs to */
        struct wreplsrv_partner *partner;
};
92
/*
  state of the whole wrepl service

  Allocated once in wreplsrv_task_init() and stored in task->private.
*/
struct wreplsrv_service {
        /* the whole wrepl service is in one task */
        struct task_server *task;

        /* all incoming connections */
        struct wreplsrv_in_connection *in_connections;

        /* all partners (pull and push) */
        struct wreplsrv_partner *partners;

        /*
         * all pull partners
         * NOTE(review): the forward declarations at the top of this file
         * name wreplsrv_pull_partner_item, not wreplsrv_pull_partner -
         * confirm which tag is intended
         */
        struct wreplsrv_pull_partner *pull_partners;

        /*
         * all push partners
         * NOTE(review): same tag mismatch as for pull_partners
         * (wreplsrv_push_partner_item vs. wreplsrv_push_partner)
         */
        struct wreplsrv_push_partner *push_partners;
};
112
113 static void wreplsrv_terminate_connection(struct wreplsrv_in_connection *wreplconn, const char *reason)
114 {
115         stream_terminate_connection(wreplconn->conn, reason);
116 }
117
118 static NTSTATUS wreplsrv_in_start_association(struct wreplsrv_in_call *call)
119 {
120         struct wrepl_stop *stop;
121
122         call->rep_packet.opcode         = WREPL_OPCODE_BITS;
123         call->rep_packet.assoc_ctx      = 0;
124         call->rep_packet.mess_type      = WREPL_STOP_ASSOCIATION;
125         stop                            = &call->rep_packet.message.stop;
126         stop->reason                    = 4;
127
128         return NT_STATUS_OK;
129 }
130
131 static NTSTATUS wreplsrv_in_replication(struct wreplsrv_in_call *call)
132 {
133         struct wrepl_replication *repl_in = &call->req_packet.message.replication;
134         struct wrepl_stop *stop_out;
135
136         switch (repl_in->command) {
137                 case WREPL_REPL_TABLE_QUERY:
138                         break;
139                 case WREPL_REPL_TABLE_REPLY:
140                         break;
141                 case WREPL_REPL_SEND_REQUEST:
142                         break;
143                 case WREPL_REPL_SEND_REPLY:
144                         break;
145                 case WREPL_REPL_UPDATE:
146                         break;
147                 case WREPL_REPL_INFORM:
148                         break;
149         }
150
151         call->rep_packet.opcode         = WREPL_OPCODE_BITS;
152         call->rep_packet.assoc_ctx      = 0;
153         call->rep_packet.mess_type      = WREPL_STOP_ASSOCIATION;
154         stop_out                        = &call->rep_packet.message.stop;
155         stop_out->reason                = 4;
156
157         return NT_STATUS_OK;
158 }
159
160 static NTSTATUS wreplsrv_in_call(struct wreplsrv_in_call *call)
161 {
162         struct wrepl_stop *stop_out;
163
164         /* TODO: check opcode and assoc_ctx */
165
166         switch (call->req_packet.mess_type) {
167                 case WREPL_START_ASSOCIATION:
168                         return wreplsrv_in_start_association(call);
169
170                 case WREPL_START_ASSOCIATION_REPLY:
171                         /* this is not valid here */
172                         break;
173                 case WREPL_STOP_ASSOCIATION:
174                         /* this is not valid here */
175                         break;
176
177                 case WREPL_REPLICATION:
178                         return wreplsrv_in_replication(call);
179         }
180
181         call->rep_packet.opcode         = WREPL_OPCODE_BITS;
182         call->rep_packet.assoc_ctx      = 0;
183         call->rep_packet.mess_type      = WREPL_STOP_ASSOCIATION;
184         call->rep_packet.padding        = data_blob(NULL, 0);
185         stop_out                        = &call->rep_packet.message.stop;
186         stop_out->reason                = 4;
187
188         return NT_STATUS_OK;
189 }
190
191 /*
192   called when we get a new connection
193 */
194 static void wreplsrv_accept(struct stream_connection *conn)
195 {
196         struct wreplsrv_service *service = talloc_get_type(conn->private, struct wreplsrv_service);
197         struct wreplsrv_in_connection *wreplconn;
198
199         wreplconn = talloc_zero(conn, struct wreplsrv_in_connection);
200         if (!wreplconn) {
201                 stream_terminate_connection(conn, "wreplsrv_accept: out of memory");
202                 return;
203         }
204
205         wreplconn->conn         = conn;
206         wreplconn->service      = service;
207         wreplconn->our_ip       = socket_get_my_addr(conn->socket, wreplconn);
208         if (!wreplconn->our_ip) {
209                 wreplsrv_terminate_connection(wreplconn, "wreplsrv_accept: out of memory");
210                 return;
211         }
212
213         conn->private = wreplconn;
214
215         irpc_add_name(conn->msg_ctx, "wreplsrv_connection");
216 }
217
218 /*
219   receive some data on a WREPL connection
220 */
221 static void wreplsrv_recv(struct stream_connection *conn, uint16_t flags)
222 {
223         struct wreplsrv_in_connection *wreplconn = talloc_get_type(conn->private, struct wreplsrv_in_connection);
224         struct wreplsrv_in_call *call;
225         DATA_BLOB packet_in_blob;
226         DATA_BLOB packet_out_blob;
227         struct wrepl_wrap packet_out_wrap;
228         struct data_blob_list_item *rep;
229         NTSTATUS status = NT_STATUS_UNSUCCESSFUL;
230         size_t nread;
231
232         /* avoid recursion, because of half async code */
233         if (wreplconn->processing) {
234                 EVENT_FD_NOT_READABLE(conn->event.fde);
235                 return;
236         }
237
238         if (wreplconn->partial.length == 0) {
239                 wreplconn->partial = data_blob_talloc(wreplconn, NULL, 4);
240                 if (wreplconn->partial.data == NULL) {
241                         status = NT_STATUS_NO_MEMORY;
242                         goto failed;
243                 }
244                 wreplconn->partial_read = 0;
245         }
246
247         /* read in the packet length */
248         if (wreplconn->partial_read < 4) {
249                 uint32_t packet_length;
250
251                 status = socket_recv(conn->socket, 
252                                      wreplconn->partial.data + wreplconn->partial_read,
253                                      4 - wreplconn->partial_read,
254                                      &nread, 0);
255                 if (NT_STATUS_IS_ERR(status)) goto failed;
256                 if (!NT_STATUS_IS_OK(status)) return;
257
258                 wreplconn->partial_read += nread;
259                 if (wreplconn->partial_read != 4) return;
260
261                 packet_length = RIVAL(wreplconn->partial.data, 0) + 4;
262
263                 wreplconn->partial.data = talloc_realloc(wreplconn, wreplconn->partial.data, 
264                                                          uint8_t, packet_length);
265                 if (wreplconn->partial.data == NULL) {
266                         status = NT_STATUS_NO_MEMORY;
267                         goto failed;
268                 }
269                 wreplconn->partial.length = packet_length;
270         }
271
272         /* read in the body */
273         status = socket_recv(conn->socket, 
274                              wreplconn->partial.data + wreplconn->partial_read,
275                              wreplconn->partial.length - wreplconn->partial_read,
276                              &nread, 0);
277         if (NT_STATUS_IS_ERR(status)) goto failed;
278         if (!NT_STATUS_IS_OK(status)) return;
279
280         wreplconn->partial_read += nread;
281         if (wreplconn->partial_read != wreplconn->partial.length) return;
282
283         packet_in_blob.data = wreplconn->partial.data + 4;
284         packet_in_blob.length = wreplconn->partial.length - 4;
285
286         call = talloc(wreplconn, struct wreplsrv_in_call);
287         if (!call) {
288                 status = NT_STATUS_NO_MEMORY;
289                 goto failed;
290         }
291         call->wreplconn = wreplconn;
292
293         /* we have a full request - parse it */
294         status = ndr_pull_struct_blob(&packet_in_blob,
295                                       call, &call->req_packet,
296                                       (ndr_pull_flags_fn_t)ndr_pull_wrepl_packet);
297         if (!NT_STATUS_IS_OK(status)) {
298                 DEBUG(2,("Failed to parse incoming WINS-Replication packet - %s\n",
299                          nt_errstr(status)));
300                 DEBUG(10,("packet length %u\n", wreplconn->partial.length));
301                 NDR_PRINT_DEBUG(wrepl_packet, &call->req_packet);
302                 goto failed;
303         }
304
305         /*
306          * we have parsed the request, so we can reset the wreplconn->partial_read,
307          * maybe we could also free wreplconn->partial, but for now we keep it,
308          * and overwrite it the next time
309          */
310         wreplconn->partial_read = 0;
311
312         if (DEBUGLVL(10)) {
313                 DEBUG(10,("Received WINS-Replication packet of length %u\n", wreplconn->partial.length));
314                 NDR_PRINT_DEBUG(wrepl_packet, &call->req_packet);
315         }
316
317         /* actually process the request */
318         wreplconn->processing = True;
319         status = wreplsrv_in_call(call);
320         wreplconn->processing = False;
321         if (!NT_STATUS_IS_OK(status)) goto failed;
322
323         /* and now encode the reply */
324         packet_out_wrap.packet = call->rep_packet;
325         status = ndr_push_struct_blob(&packet_out_blob, call, &packet_out_wrap,
326                                       (ndr_push_flags_fn_t)ndr_push_wrepl_wrap);
327         if (!NT_STATUS_IS_OK(status)) goto failed;
328
329         if (DEBUGLVL(10)) {
330                 DEBUG(10,("Sending WINS-Replication packet of length %d\n", (int)packet_out_blob.length));
331                 NDR_PRINT_DEBUG(wrepl_packet, &call->rep_packet);
332         }
333
334         rep = talloc(wreplconn, struct data_blob_list_item);
335         if (!rep) {
336                 status = NT_STATUS_NO_MEMORY;
337                 goto failed;
338         }
339
340         rep->blob = packet_out_blob;
341         talloc_steal(rep, packet_out_blob.data);
342         /* we don't need the call anymore */
343         talloc_free(call);
344
345         if (!wreplconn->send_queue) {
346                 EVENT_FD_WRITEABLE(conn->event.fde);
347         }
348         DLIST_ADD_END(wreplconn->send_queue, rep, struct data_blob_list_item *);
349
350         EVENT_FD_READABLE(conn->event.fde);
351         return;
352
353 failed:
354         wreplsrv_terminate_connection(wreplconn, nt_errstr(status));
355 }
356
357 /*
358   called when we can write to a connection
359 */
360 static void wreplsrv_send(struct stream_connection *conn, uint16_t flags)
361 {
362         struct wreplsrv_in_connection *wreplconn = talloc_get_type(conn->private, struct wreplsrv_in_connection);
363         NTSTATUS status;
364
365         while (wreplconn->send_queue) {
366                 struct data_blob_list_item *rep = wreplconn->send_queue;
367                 size_t sendlen;
368
369                 status = socket_send(conn->socket, &rep->blob, &sendlen, 0);
370                 if (NT_STATUS_IS_ERR(status)) goto failed;
371                 if (!NT_STATUS_IS_OK(status)) return;
372
373                 rep->blob.length -= sendlen;
374                 rep->blob.data   += sendlen;
375
376                 if (rep->blob.length == 0) {
377                         DLIST_REMOVE(wreplconn->send_queue, rep);
378                         talloc_free(rep);
379                 }
380         }
381
382         EVENT_FD_NOT_WRITEABLE(conn->event.fde);
383         return;
384
385 failed:
386         wreplsrv_terminate_connection(wreplconn, nt_errstr(status));
387 }
388
/*
  the stream server callbacks of the wrepl service; passed to
  stream_setup_socket() by wreplsrv_setup_sockets()
*/
static const struct stream_server_ops wreplsrv_stream_ops = {
        .name                   = "wreplsrv",
        .accept_connection      = wreplsrv_accept,
        .recv_handler           = wreplsrv_recv,
        .send_handler           = wreplsrv_send,
};
395
396 /*
397   startup the wrepl port 42 server sockets
398 */
399 static NTSTATUS wreplsrv_setup_sockets(struct wreplsrv_service *service)
400 {
401         NTSTATUS status;
402         struct task_server *task = service->task;
403         const struct model_ops *model_ops;
404         const char *address;
405         uint16_t port = WINS_REPLICATION_PORT;
406
407         /* within the wrepl task we want to be a single process, so
408            ask for the single process model ops and pass these to the
409            stream_setup_socket() call. */
410         model_ops = process_model_byname("single");
411         if (!model_ops) {
412                 DEBUG(0,("Can't find 'single' process model_ops"));
413                 return NT_STATUS_INTERNAL_ERROR;
414         }
415
416         if (lp_interfaces() && lp_bind_interfaces_only()) {
417                 int num_interfaces = iface_count();
418                 int i;
419
420                 /* We have been given an interfaces line, and been 
421                    told to only bind to those interfaces. Create a
422                    socket per interface and bind to only these.
423                 */
424                 for(i = 0; i < num_interfaces; i++) {
425                         address = iface_n_ip(i);
426                         status = stream_setup_socket(task->event_ctx, model_ops, &wreplsrv_stream_ops,
427                                                      "ipv4", address, &port, NULL);
428                         if (!NT_STATUS_IS_OK(status)) {
429                                 DEBUG(0,("stream_setup_socket(address=%s,port=%u) failed - %s\n",
430                                          address, port, nt_errstr(status)));
431                                 return status;
432                         }
433                 }
434         } else {
435                 address = lp_socket_address();
436                 status = stream_setup_socket(task->event_ctx, model_ops, &wreplsrv_stream_ops,
437                                              "ipv4", address, &port, NULL);
438                 if (!NT_STATUS_IS_OK(status)) {
439                         DEBUG(0,("stream_setup_socket(address=%s,port=%u) failed - %s\n",
440                                  address, port, nt_errstr(status)));
441                         return status;
442                 }
443         }
444
445         return NT_STATUS_OK;
446 }
447
448 /*
449   startup the wrepl task
450 */
451 static void wreplsrv_task_init(struct task_server *task)
452 {
453         NTSTATUS status;
454         struct wreplsrv_service *service;
455
456         service = talloc_zero(task, struct wreplsrv_service);
457         if (!service) {
458                 task_server_terminate(task, "wreplsrv_task_init: out of memory");
459                 return;
460         }
461         service->task = task;
462         task->private = service;
463
464         /*
465          * TODO: setup up all partners, and open the winsdb
466          */
467
468         /* 
469          * setup listen sockets, so we can anwser requests from our partners,
470          * which pull from us
471          */
472         status = wreplsrv_setup_sockets(service);
473         if (!NT_STATUS_IS_OK(status)) {
474                 task_server_terminate(task, "wreplsrv_task_init: wreplsrv_setup_sockets() failed");
475                 return;
476         }
477
478         /*
479          * TODO: setup timed events for each partner we want to pull from
480          */
481
482         irpc_add_name(task->msg_ctx, "wrepl_server");
483 }
484
485 /*
486   initialise the WREPL server
487  */
488 static NTSTATUS wreplsrv_init(struct event_context *event_ctx, const struct model_ops *model_ops)
489 {
490         return task_server_startup(event_ctx, model_ops, wreplsrv_task_init);
491 }
492
493 /*
494   register ourselves as a available server
495 */
496 NTSTATUS server_service_wrepl_init(void)
497 {
498         return register_server_service("wrepl", wreplsrv_init);
499 }