r11023: r10310@SERNOX: metze | 2005-09-19 11:21:39 +0200
[garming/samba-autobuild/.git] / source4 / wrepl_server / wrepl_server.c
1 /* 
2    Unix SMB/CIFS implementation.
3    
4    WINS Replication server
5    
6    Copyright (C) Stefan Metzmacher      2005
7    
8    This program is free software; you can redistribute it and/or modify
9    it under the terms of the GNU General Public License as published by
10    the Free Software Foundation; either version 2 of the License, or
11    (at your option) any later version.
12    
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17    
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21 */
22
23 #include "includes.h"
24 #include "dlinklist.h"
25 #include "lib/events/events.h"
26 #include "lib/socket/socket.h"
27 #include "smbd/service_task.h"
28 #include "smbd/service_stream.h"
29 #include "lib/messaging/irpc.h"
30 #include "librpc/gen_ndr/ndr_winsrepl.h"
31 #include "wrepl_server/wrepl_server.h"
32
33 void wreplsrv_terminate_connection(struct wreplsrv_in_connection *wreplconn, const char *reason)
34 {
35         stream_terminate_connection(wreplconn->conn, reason);
36 }
37
38 /*
39   called when we get a new connection
40 */
41 static void wreplsrv_accept(struct stream_connection *conn)
42 {
43         struct wreplsrv_service *service = talloc_get_type(conn->private, struct wreplsrv_service);
44         struct wreplsrv_in_connection *wreplconn;
45
46         wreplconn = talloc_zero(conn, struct wreplsrv_in_connection);
47         if (!wreplconn) {
48                 stream_terminate_connection(conn, "wreplsrv_accept: out of memory");
49                 return;
50         }
51
52         wreplconn->conn         = conn;
53         wreplconn->service      = service;
54         wreplconn->our_ip       = socket_get_my_addr(conn->socket, wreplconn);
55         if (!wreplconn->our_ip) {
56                 wreplsrv_terminate_connection(wreplconn, "wreplsrv_accept: out of memory");
57                 return;
58         }
59
60         /* TODO: find out if it's a partner */
61
62         conn->private = wreplconn;
63
64         irpc_add_name(conn->msg_ctx, "wreplsrv_connection");
65 }
66
67 /*
68   receive some data on a WREPL connection
69 */
70 static void wreplsrv_recv(struct stream_connection *conn, uint16_t flags)
71 {
72         struct wreplsrv_in_connection *wreplconn = talloc_get_type(conn->private, struct wreplsrv_in_connection);
73         struct wreplsrv_in_call *call;
74         DATA_BLOB packet_in_blob;
75         DATA_BLOB packet_out_blob;
76         struct wrepl_wrap packet_out_wrap;
77         struct data_blob_list_item *rep;
78         NTSTATUS status = NT_STATUS_UNSUCCESSFUL;
79         size_t nread;
80
81         /* avoid recursion, because of half async code */
82         if (wreplconn->processing) {
83                 EVENT_FD_NOT_READABLE(conn->event.fde);
84                 return;
85         }
86
87         if (wreplconn->partial.length == 0) {
88                 wreplconn->partial = data_blob_talloc(wreplconn, NULL, 4);
89                 if (wreplconn->partial.data == NULL) {
90                         status = NT_STATUS_NO_MEMORY;
91                         goto failed;
92                 }
93                 wreplconn->partial_read = 0;
94         }
95
96         /* read in the packet length */
97         if (wreplconn->partial_read < 4) {
98                 uint32_t packet_length;
99
100                 status = socket_recv(conn->socket, 
101                                      wreplconn->partial.data + wreplconn->partial_read,
102                                      4 - wreplconn->partial_read,
103                                      &nread, 0);
104                 if (NT_STATUS_IS_ERR(status)) goto failed;
105                 if (!NT_STATUS_IS_OK(status)) return;
106
107                 wreplconn->partial_read += nread;
108                 if (wreplconn->partial_read != 4) return;
109
110                 packet_length = RIVAL(wreplconn->partial.data, 0) + 4;
111
112                 wreplconn->partial.data = talloc_realloc(wreplconn, wreplconn->partial.data, 
113                                                          uint8_t, packet_length);
114                 if (wreplconn->partial.data == NULL) {
115                         status = NT_STATUS_NO_MEMORY;
116                         goto failed;
117                 }
118                 wreplconn->partial.length = packet_length;
119         }
120
121         /* read in the body */
122         status = socket_recv(conn->socket, 
123                              wreplconn->partial.data + wreplconn->partial_read,
124                              wreplconn->partial.length - wreplconn->partial_read,
125                              &nread, 0);
126         if (NT_STATUS_IS_ERR(status)) goto failed;
127         if (!NT_STATUS_IS_OK(status)) return;
128
129         wreplconn->partial_read += nread;
130         if (wreplconn->partial_read != wreplconn->partial.length) return;
131
132         packet_in_blob.data = wreplconn->partial.data + 4;
133         packet_in_blob.length = wreplconn->partial.length - 4;
134
135         call = talloc_zero(wreplconn, struct wreplsrv_in_call);
136         if (!call) {
137                 status = NT_STATUS_NO_MEMORY;
138                 goto failed;
139         }
140         call->wreplconn = wreplconn;
141
142         /* we have a full request - parse it */
143         status = ndr_pull_struct_blob(&packet_in_blob,
144                                       call, &call->req_packet,
145                                       (ndr_pull_flags_fn_t)ndr_pull_wrepl_packet);
146         if (!NT_STATUS_IS_OK(status)) {
147                 DEBUG(2,("Failed to parse incoming WINS-Replication packet - %s\n",
148                          nt_errstr(status)));
149                 DEBUG(10,("packet length %u\n", wreplconn->partial.length));
150                 NDR_PRINT_DEBUG(wrepl_packet, &call->req_packet);
151                 goto failed;
152         }
153
154         /*
155          * we have parsed the request, so we can reset the wreplconn->partial_read,
156          * maybe we could also free wreplconn->partial, but for now we keep it,
157          * and overwrite it the next time
158          */
159         wreplconn->partial_read = 0;
160
161         if (DEBUGLVL(10)) {
162                 DEBUG(10,("Received WINS-Replication packet of length %u\n", wreplconn->partial.length));
163                 NDR_PRINT_DEBUG(wrepl_packet, &call->req_packet);
164         }
165
166         /* actually process the request */
167         wreplconn->processing = True;
168         status = wreplsrv_in_call(call);
169         wreplconn->processing = False;
170         if (NT_STATUS_IS_ERR(status)) goto failed;
171         if (!NT_STATUS_IS_OK(status)) {
172                 /* w2k just ignores invalid packets, so we do */
173                 DEBUG(10,("Received WINS-Replication packet was invalid, we just ignore it\n"));
174                 talloc_free(call);
175                 return;
176         }
177
178         /* and now encode the reply */
179         packet_out_wrap.packet = call->rep_packet;
180         status = ndr_push_struct_blob(&packet_out_blob, call, &packet_out_wrap,
181                                       (ndr_push_flags_fn_t)ndr_push_wrepl_wrap);
182         if (!NT_STATUS_IS_OK(status)) goto failed;
183
184         if (DEBUGLVL(10)) {
185                 DEBUG(10,("Sending WINS-Replication packet of length %d\n", (int)packet_out_blob.length));
186                 NDR_PRINT_DEBUG(wrepl_packet, &call->rep_packet);
187         }
188
189         rep = talloc(wreplconn, struct data_blob_list_item);
190         if (!rep) {
191                 status = NT_STATUS_NO_MEMORY;
192                 goto failed;
193         }
194
195         rep->blob = packet_out_blob;
196         talloc_steal(rep, packet_out_blob.data);
197         /* we don't need the call anymore */
198         talloc_free(call);
199
200         if (!wreplconn->send_queue) {
201                 EVENT_FD_WRITEABLE(conn->event.fde);
202         }
203         DLIST_ADD_END(wreplconn->send_queue, rep, struct data_blob_list_item *);
204
205         if (wreplconn->terminate) {
206                 EVENT_FD_NOT_READABLE(conn->event.fde);
207         } else {
208                 EVENT_FD_READABLE(conn->event.fde);
209         }
210         return;
211
212 failed:
213         wreplsrv_terminate_connection(wreplconn, nt_errstr(status));
214 }
215
216 /*
217   called when we can write to a connection
218 */
219 static void wreplsrv_send(struct stream_connection *conn, uint16_t flags)
220 {
221         struct wreplsrv_in_connection *wreplconn = talloc_get_type(conn->private, struct wreplsrv_in_connection);
222         NTSTATUS status;
223
224         while (wreplconn->send_queue) {
225                 struct data_blob_list_item *rep = wreplconn->send_queue;
226                 size_t sendlen;
227
228                 status = socket_send(conn->socket, &rep->blob, &sendlen, 0);
229                 if (NT_STATUS_IS_ERR(status)) goto failed;
230                 if (!NT_STATUS_IS_OK(status)) return;
231
232                 rep->blob.length -= sendlen;
233                 rep->blob.data   += sendlen;
234
235                 if (rep->blob.length == 0) {
236                         DLIST_REMOVE(wreplconn->send_queue, rep);
237                         talloc_free(rep);
238                 }
239         }
240
241         if (wreplconn->terminate) {
242                 wreplsrv_terminate_connection(wreplconn, "connection terminated after all pending packets are send");
243                 return;
244         }
245
246         EVENT_FD_NOT_WRITEABLE(conn->event.fde);
247         return;
248
249 failed:
250         wreplsrv_terminate_connection(wreplconn, nt_errstr(status));
251 }
252
253 static const struct stream_server_ops wreplsrv_stream_ops = {
254         .name                   = "wreplsrv",
255         .accept_connection      = wreplsrv_accept,
256         .recv_handler           = wreplsrv_recv,
257         .send_handler           = wreplsrv_send,
258 };
259
260 /*
261   open winsdb
262 */
263 static NTSTATUS wreplsrv_open_winsdb(struct wreplsrv_service *service)
264 {
265         service->wins_db     = winsdb_connect(service);
266         if (!service->wins_db) {
267                 return NT_STATUS_INTERNAL_DB_ERROR;
268         }
269
270         return NT_STATUS_OK;
271 }
272
273 /*
274   setup our replication partners
275 */
276 static NTSTATUS wreplsrv_setup_partners(struct wreplsrv_service *service)
277 {
278         return NT_STATUS_OK;
279 }
280
281 /*
282   startup the wrepl port 42 server sockets
283 */
284 static NTSTATUS wreplsrv_setup_sockets(struct wreplsrv_service *service)
285 {
286         NTSTATUS status;
287         struct task_server *task = service->task;
288         const struct model_ops *model_ops;
289         const char *address;
290         uint16_t port = WINS_REPLICATION_PORT;
291
292         /* within the wrepl task we want to be a single process, so
293            ask for the single process model ops and pass these to the
294            stream_setup_socket() call. */
295         model_ops = process_model_byname("single");
296         if (!model_ops) {
297                 DEBUG(0,("Can't find 'single' process model_ops"));
298                 return NT_STATUS_INTERNAL_ERROR;
299         }
300
301         if (lp_interfaces() && lp_bind_interfaces_only()) {
302                 int num_interfaces = iface_count();
303                 int i;
304
305                 /* We have been given an interfaces line, and been 
306                    told to only bind to those interfaces. Create a
307                    socket per interface and bind to only these.
308                 */
309                 for(i = 0; i < num_interfaces; i++) {
310                         address = iface_n_ip(i);
311                         status = stream_setup_socket(task->event_ctx, model_ops, &wreplsrv_stream_ops,
312                                                      "ipv4", address, &port, NULL);
313                         if (!NT_STATUS_IS_OK(status)) {
314                                 DEBUG(0,("stream_setup_socket(address=%s,port=%u) failed - %s\n",
315                                          address, port, nt_errstr(status)));
316                                 return status;
317                         }
318                 }
319         } else {
320                 address = lp_socket_address();
321                 status = stream_setup_socket(task->event_ctx, model_ops, &wreplsrv_stream_ops,
322                                              "ipv4", address, &port, NULL);
323                 if (!NT_STATUS_IS_OK(status)) {
324                         DEBUG(0,("stream_setup_socket(address=%s,port=%u) failed - %s\n",
325                                  address, port, nt_errstr(status)));
326                         return status;
327                 }
328         }
329
330         return NT_STATUS_OK;
331 }
332
333 /*
334   startup the wrepl task
335 */
336 static void wreplsrv_task_init(struct task_server *task)
337 {
338         NTSTATUS status;
339         struct wreplsrv_service *service;
340
341         service = talloc_zero(task, struct wreplsrv_service);
342         if (!service) {
343                 task_server_terminate(task, "wreplsrv_task_init: out of memory");
344                 return;
345         }
346         service->task = task;
347         task->private = service;
348
349         /*
350          * setup up all partners, and open the winsdb
351          */
352         status = wreplsrv_open_winsdb(service);
353         if (!NT_STATUS_IS_OK(status)) {
354                 task_server_terminate(task, "wreplsrv_task_init: wreplsrv_open_winsdb() failed");
355                 return;
356         }
357
358         /*
359          * setup timed events for each partner we want to pull from
360          */
361         status = wreplsrv_setup_partners(service);
362         if (!NT_STATUS_IS_OK(status)) {
363                 task_server_terminate(task, "wreplsrv_task_init: wreplsrv_setup_partners() failed");
364                 return;
365         }
366
367         /* 
368          * setup listen sockets, so we can anwser requests from our partners,
369          * which pull from us
370          */
371         status = wreplsrv_setup_sockets(service);
372         if (!NT_STATUS_IS_OK(status)) {
373                 task_server_terminate(task, "wreplsrv_task_init: wreplsrv_setup_sockets() failed");
374                 return;
375         }
376
377         irpc_add_name(task->msg_ctx, "wrepl_server");
378 }
379
380 /*
381   initialise the WREPL server
382  */
383 static NTSTATUS wreplsrv_init(struct event_context *event_ctx, const struct model_ops *model_ops)
384 {
385         return task_server_startup(event_ctx, model_ops, wreplsrv_task_init);
386 }
387
388 /*
389   register ourselves as a available server
390 */
391 NTSTATUS server_service_wrepl_init(void)
392 {
393         return register_server_service("wrepl", wreplsrv_init);
394 }