r11035: r10347@SERNOX: metze | 2005-09-20 13:40:24 +0200
[bbaumbach/samba-autobuild/.git] / source4 / wrepl_server / wrepl_server.c
1 /* 
2    Unix SMB/CIFS implementation.
3    
4    WINS Replication server
5    
6    Copyright (C) Stefan Metzmacher      2005
7    
8    This program is free software; you can redistribute it and/or modify
9    it under the terms of the GNU General Public License as published by
10    the Free Software Foundation; either version 2 of the License, or
11    (at your option) any later version.
12    
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17    
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21 */
22
23 #include "includes.h"
24 #include "dlinklist.h"
25 #include "lib/events/events.h"
26 #include "lib/socket/socket.h"
27 #include "smbd/service_task.h"
28 #include "smbd/service_stream.h"
29 #include "lib/messaging/irpc.h"
30 #include "librpc/gen_ndr/ndr_winsrepl.h"
31 #include "wrepl_server/wrepl_server.h"
32 #include "nbt_server/wins/winsdb.h"
33 #include "ldb/include/ldb.h"
34
35 void wreplsrv_terminate_connection(struct wreplsrv_in_connection *wreplconn, const char *reason)
36 {
37         stream_terminate_connection(wreplconn->conn, reason);
38 }
39
40 static struct wreplsrv_partner *wreplsrv_find_partner(struct wreplsrv_service *service, const char *peer_addr)
41 {
42         struct wreplsrv_partner *cur;
43
44         for (cur = service->partners; cur; cur = cur->next) {
45                 if (strcmp(cur->address, peer_addr) == 0) {
46                         return cur;
47                 }
48         }
49
50         return NULL;
51 }
52
53 /*
54   called when we get a new connection
55 */
56 static void wreplsrv_accept(struct stream_connection *conn)
57 {
58         struct wreplsrv_service *service = talloc_get_type(conn->private, struct wreplsrv_service);
59         struct wreplsrv_in_connection *wreplconn;
60         const char *peer_ip;
61
62         wreplconn = talloc_zero(conn, struct wreplsrv_in_connection);
63         if (!wreplconn) {
64                 stream_terminate_connection(conn, "wreplsrv_accept: out of memory");
65                 return;
66         }
67
68         wreplconn->conn         = conn;
69         wreplconn->service      = service;
70         wreplconn->our_ip       = socket_get_my_addr(conn->socket, wreplconn);
71         if (!wreplconn->our_ip) {
72                 wreplsrv_terminate_connection(wreplconn, "wreplsrv_accept: out of memory");
73                 return;
74         }
75
76         peer_ip = socket_get_peer_addr(conn->socket, wreplconn);
77         if (!peer_ip) {
78                 wreplsrv_terminate_connection(wreplconn, "wreplsrv_accept: out of memory");
79                 return;
80         }
81
82         wreplconn->partner      = wreplsrv_find_partner(service, peer_ip);
83
84         conn->private = wreplconn;
85
86         irpc_add_name(conn->msg_ctx, "wreplsrv_connection");
87 }
88
89 /*
90   receive some data on a WREPL connection
91 */
92 static void wreplsrv_recv(struct stream_connection *conn, uint16_t flags)
93 {
94         struct wreplsrv_in_connection *wreplconn = talloc_get_type(conn->private, struct wreplsrv_in_connection);
95         struct wreplsrv_in_call *call;
96         DATA_BLOB packet_in_blob;
97         DATA_BLOB packet_out_blob;
98         struct wrepl_wrap packet_out_wrap;
99         struct data_blob_list_item *rep;
100         NTSTATUS status = NT_STATUS_UNSUCCESSFUL;
101         size_t nread;
102
103         /* avoid recursion, because of half async code */
104         if (wreplconn->processing) {
105                 EVENT_FD_NOT_READABLE(conn->event.fde);
106                 return;
107         }
108
109         if (wreplconn->partial.length == 0) {
110                 wreplconn->partial = data_blob_talloc(wreplconn, NULL, 4);
111                 if (wreplconn->partial.data == NULL) {
112                         status = NT_STATUS_NO_MEMORY;
113                         goto failed;
114                 }
115                 wreplconn->partial_read = 0;
116         }
117
118         /* read in the packet length */
119         if (wreplconn->partial_read < 4) {
120                 uint32_t packet_length;
121
122                 status = socket_recv(conn->socket, 
123                                      wreplconn->partial.data + wreplconn->partial_read,
124                                      4 - wreplconn->partial_read,
125                                      &nread, 0);
126                 if (NT_STATUS_IS_ERR(status)) goto failed;
127                 if (!NT_STATUS_IS_OK(status)) return;
128
129                 wreplconn->partial_read += nread;
130                 if (wreplconn->partial_read != 4) return;
131
132                 packet_length = RIVAL(wreplconn->partial.data, 0) + 4;
133
134                 wreplconn->partial.data = talloc_realloc(wreplconn, wreplconn->partial.data, 
135                                                          uint8_t, packet_length);
136                 if (wreplconn->partial.data == NULL) {
137                         status = NT_STATUS_NO_MEMORY;
138                         goto failed;
139                 }
140                 wreplconn->partial.length = packet_length;
141         }
142
143         /* read in the body */
144         status = socket_recv(conn->socket, 
145                              wreplconn->partial.data + wreplconn->partial_read,
146                              wreplconn->partial.length - wreplconn->partial_read,
147                              &nread, 0);
148         if (NT_STATUS_IS_ERR(status)) goto failed;
149         if (!NT_STATUS_IS_OK(status)) return;
150
151         wreplconn->partial_read += nread;
152         if (wreplconn->partial_read != wreplconn->partial.length) return;
153
154         packet_in_blob.data = wreplconn->partial.data + 4;
155         packet_in_blob.length = wreplconn->partial.length - 4;
156
157         call = talloc_zero(wreplconn, struct wreplsrv_in_call);
158         if (!call) {
159                 status = NT_STATUS_NO_MEMORY;
160                 goto failed;
161         }
162         call->wreplconn = wreplconn;
163
164         /* we have a full request - parse it */
165         status = ndr_pull_struct_blob(&packet_in_blob,
166                                       call, &call->req_packet,
167                                       (ndr_pull_flags_fn_t)ndr_pull_wrepl_packet);
168         if (!NT_STATUS_IS_OK(status)) {
169                 DEBUG(2,("Failed to parse incoming WINS-Replication packet - %s\n",
170                          nt_errstr(status)));
171                 DEBUG(10,("packet length %u\n", wreplconn->partial.length));
172                 NDR_PRINT_DEBUG(wrepl_packet, &call->req_packet);
173                 goto failed;
174         }
175
176         /*
177          * we have parsed the request, so we can reset the wreplconn->partial_read,
178          * maybe we could also free wreplconn->partial, but for now we keep it,
179          * and overwrite it the next time
180          */
181         wreplconn->partial_read = 0;
182
183         if (DEBUGLVL(10)) {
184                 DEBUG(10,("Received WINS-Replication packet of length %u\n", wreplconn->partial.length));
185                 NDR_PRINT_DEBUG(wrepl_packet, &call->req_packet);
186         }
187
188         /* actually process the request */
189         wreplconn->processing = True;
190         status = wreplsrv_in_call(call);
191         wreplconn->processing = False;
192         if (NT_STATUS_IS_ERR(status)) goto failed;
193         if (!NT_STATUS_IS_OK(status)) {
194                 /* w2k just ignores invalid packets, so we do */
195                 DEBUG(10,("Received WINS-Replication packet was invalid, we just ignore it\n"));
196                 talloc_free(call);
197                 return;
198         }
199
200         /* and now encode the reply */
201         packet_out_wrap.packet = call->rep_packet;
202         status = ndr_push_struct_blob(&packet_out_blob, call, &packet_out_wrap,
203                                       (ndr_push_flags_fn_t)ndr_push_wrepl_wrap);
204         if (!NT_STATUS_IS_OK(status)) goto failed;
205
206         if (DEBUGLVL(10)) {
207                 DEBUG(10,("Sending WINS-Replication packet of length %d\n", (int)packet_out_blob.length));
208                 NDR_PRINT_DEBUG(wrepl_packet, &call->rep_packet);
209         }
210
211         rep = talloc(wreplconn, struct data_blob_list_item);
212         if (!rep) {
213                 status = NT_STATUS_NO_MEMORY;
214                 goto failed;
215         }
216
217         rep->blob = packet_out_blob;
218         talloc_steal(rep, packet_out_blob.data);
219         /* we don't need the call anymore */
220         talloc_free(call);
221
222         if (!wreplconn->send_queue) {
223                 EVENT_FD_WRITEABLE(conn->event.fde);
224         }
225         DLIST_ADD_END(wreplconn->send_queue, rep, struct data_blob_list_item *);
226
227         if (wreplconn->terminate) {
228                 EVENT_FD_NOT_READABLE(conn->event.fde);
229         } else {
230                 EVENT_FD_READABLE(conn->event.fde);
231         }
232         return;
233
234 failed:
235         wreplsrv_terminate_connection(wreplconn, nt_errstr(status));
236 }
237
238 /*
239   called when we can write to a connection
240 */
241 static void wreplsrv_send(struct stream_connection *conn, uint16_t flags)
242 {
243         struct wreplsrv_in_connection *wreplconn = talloc_get_type(conn->private, struct wreplsrv_in_connection);
244         NTSTATUS status;
245
246         while (wreplconn->send_queue) {
247                 struct data_blob_list_item *rep = wreplconn->send_queue;
248                 size_t sendlen;
249
250                 status = socket_send(conn->socket, &rep->blob, &sendlen, 0);
251                 if (NT_STATUS_IS_ERR(status)) goto failed;
252                 if (!NT_STATUS_IS_OK(status)) return;
253
254                 rep->blob.length -= sendlen;
255                 rep->blob.data   += sendlen;
256
257                 if (rep->blob.length == 0) {
258                         DLIST_REMOVE(wreplconn->send_queue, rep);
259                         talloc_free(rep);
260                 }
261         }
262
263         if (wreplconn->terminate) {
264                 wreplsrv_terminate_connection(wreplconn, "connection terminated after all pending packets are send");
265                 return;
266         }
267
268         EVENT_FD_NOT_WRITEABLE(conn->event.fde);
269         return;
270
271 failed:
272         wreplsrv_terminate_connection(wreplconn, nt_errstr(status));
273 }
274
275 static const struct stream_server_ops wreplsrv_stream_ops = {
276         .name                   = "wreplsrv",
277         .accept_connection      = wreplsrv_accept,
278         .recv_handler           = wreplsrv_recv,
279         .send_handler           = wreplsrv_send,
280 };
281
282 /*
283   open winsdb
284 */
285 static NTSTATUS wreplsrv_open_winsdb(struct wreplsrv_service *service)
286 {
287         service->wins_db     = winsdb_connect(service);
288         if (!service->wins_db) {
289                 return NT_STATUS_INTERNAL_DB_ERROR;
290         }
291
292         return NT_STATUS_OK;
293 }
294
295 /*
296   load our replication partners
297 */
298 static NTSTATUS wreplsrv_load_partners(struct wreplsrv_service *service)
299 {
300         struct ldb_message **res = NULL;
301         int ret;
302         TALLOC_CTX *tmp_ctx = talloc_new(service);
303         int i;
304
305         /* find the record in the WINS database */
306         ret = ldb_search(service->wins_db, ldb_dn_explode(tmp_ctx, "CN=PARTNERS"), LDB_SCOPE_ONELEVEL,
307                          "(objectClass=wreplPartner)", NULL, &res);
308         if (res != NULL) {
309                 talloc_steal(tmp_ctx, res);
310         }
311         if (ret < 0) goto failed;
312         if (ret == 0) goto done;
313
314         for (i=0; i < ret; i++) {
315                 struct wreplsrv_partner *partner;
316
317                 partner = talloc(service, struct wreplsrv_partner);
318                 if (partner == NULL) goto failed;
319
320                 partner->address        = ldb_msg_find_string(res[i], "address", NULL);
321                 if (!partner->address) goto failed;
322                 partner->name           = ldb_msg_find_string(res[i], "name", partner->address);
323                 partner->type           = ldb_msg_find_int(res[i], "type", WINSREPL_PARTNER_BOTH);
324                 partner->pull.interval  = ldb_msg_find_int(res[i], "pullInterval", WINSREPL_DEFAULT_PULL_INTERVAL);
325                 partner->our_address    = ldb_msg_find_string(res[i], "ourAddress", NULL);
326
327                 talloc_steal(partner, partner->address);
328                 talloc_steal(partner, partner->name);
329                 talloc_steal(partner, partner->our_address);
330
331                 DLIST_ADD(service->partners, partner);
332         }
333 done:
334         talloc_free(tmp_ctx);
335         return NT_STATUS_OK;
336 failed:
337         talloc_free(tmp_ctx);
338         return NT_STATUS_FOOBAR;
339 }
340
341 uint64_t wreplsrv_local_max_version(struct wreplsrv_service *service)
342 {
343         int ret;
344         struct ldb_context *ldb = service->wins_db;
345         struct ldb_dn *dn;
346         struct ldb_message **res = NULL;
347         TALLOC_CTX *tmp_ctx = talloc_new(service);
348         uint64_t maxVersion = 0;
349
350         dn = ldb_dn_explode(tmp_ctx, "CN=VERSION");
351         if (!dn) goto failed;
352
353         /* find the record in the WINS database */
354         ret = ldb_search(ldb, dn, LDB_SCOPE_BASE, 
355                          NULL, NULL, &res);
356         if (res != NULL) {
357                 talloc_steal(tmp_ctx, res);
358         }
359         if (ret < 0) goto failed;
360         if (ret > 1) goto failed;
361
362         if (ret == 1) {
363                 maxVersion = ldb_msg_find_uint64(res[0], "maxVersion", 0);
364         }
365
366 failed:
367         talloc_free(tmp_ctx);
368         return maxVersion;
369 }
370
371 struct wreplsrv_owner *wreplsrv_find_owner(struct wreplsrv_owner *table, const char *wins_owner)
372 {
373         struct wreplsrv_owner *cur;
374
375         for (cur = table; cur; cur = cur->next) {
376                 if (strcmp(cur->owner.address, wins_owner) == 0) {
377                         return cur;
378                 }
379         }
380
381         return NULL;
382 }
383
384 /*
385  update the wins_owner_table max_version, if the given version is the highest version
386  if no entry for the wins_owner exists yet, create one
387 */
388 static NTSTATUS wreplsrv_add_table(struct wreplsrv_service *service,
389                                    TALLOC_CTX *mem_ctx, struct wreplsrv_owner **_table,
390                                    const char *wins_owner, uint64_t version)
391 {
392         struct wreplsrv_owner *table = *_table;
393         struct wreplsrv_owner *cur;
394
395         if (strcmp(WINSDB_OWNER_LOCAL, wins_owner) == 0) {
396                 return NT_STATUS_OK;
397         }
398
399         cur = wreplsrv_find_owner(table, wins_owner);
400
401         /* if it doesn't exists yet, create one */
402         if (!cur) {
403                 cur = talloc_zero(mem_ctx, struct wreplsrv_owner);
404                 NT_STATUS_HAVE_NO_MEMORY(cur);
405
406                 cur->owner.address      = talloc_strdup(cur, wins_owner);
407                 NT_STATUS_HAVE_NO_MEMORY(cur->owner.address);
408                 cur->owner.min_version  = 0;
409                 cur->owner.max_version  = 0;
410                 cur->owner.type         = 1; /* don't know why this is always 1 */
411
412                 cur->partner            = wreplsrv_find_partner(service, wins_owner);
413
414                 DLIST_ADD(table, cur);
415                 *_table = table;
416         }
417
418         /* the min_version is always 0 here, and won't be updated */
419
420         /* if the given version is higher the then current nax_version, update */
421         if (cur->owner.max_version < version) {
422                 cur->owner.max_version = version;
423         }
424
425         return NT_STATUS_OK;
426 }
427
428 /*
429   load the partner table
430 */
431 static NTSTATUS wreplsrv_load_table(struct wreplsrv_service *service)
432 {
433         struct ldb_message **res = NULL;
434         int ret;
435         NTSTATUS status;
436         TALLOC_CTX *tmp_ctx = talloc_new(service);
437         int i;
438         const char *wins_owner;
439         uint64_t version;
440         const char * const attrs[] = {
441                 "winsOwner",
442                 "version",
443                 NULL
444         };
445
446         /* find the record in the WINS database */
447         ret = ldb_search(service->wins_db, NULL, LDB_SCOPE_SUBTREE,
448                          "(objectClass=wins)", attrs, &res);
449         if (res != NULL) {
450                 talloc_steal(tmp_ctx, res);
451         }
452         status = NT_STATUS_INTERNAL_DB_CORRUPTION;
453         if (ret < 0) goto failed;
454         if (ret == 0) goto done;
455
456         for (i=0; i < ret; i++) {
457                 wins_owner     = ldb_msg_find_string(res[i], "winsOwner", NULL);
458                 version        = ldb_msg_find_uint64(res[i], "version", 0);
459
460                 if (wins_owner) { 
461                         status = wreplsrv_add_table(service,
462                                                     service, &service->table,
463                                                     wins_owner, version);
464                         if (!NT_STATUS_IS_OK(status)) goto failed;
465                 }
466                 talloc_free(res[i]);
467
468                 /* TODO: what's abut the per address owners? */
469         }
470 done:
471         talloc_free(tmp_ctx);
472         return NT_STATUS_OK;
473 failed:
474         talloc_free(tmp_ctx);
475         return status;
476 }
477
478 /*
479   setup our replication partners
480 */
481 static NTSTATUS wreplsrv_setup_partners(struct wreplsrv_service *service)
482 {
483         NTSTATUS status;
484
485         status = wreplsrv_load_partners(service);
486         NT_STATUS_NOT_OK_RETURN(status);
487
488         status = wreplsrv_load_table(service);
489         NT_STATUS_NOT_OK_RETURN(status);
490
491         return NT_STATUS_OK;
492 }
493
494 /*
495   startup the wrepl port 42 server sockets
496 */
497 static NTSTATUS wreplsrv_setup_sockets(struct wreplsrv_service *service)
498 {
499         NTSTATUS status;
500         struct task_server *task = service->task;
501         const struct model_ops *model_ops;
502         const char *address;
503         uint16_t port = WINS_REPLICATION_PORT;
504
505         /* within the wrepl task we want to be a single process, so
506            ask for the single process model ops and pass these to the
507            stream_setup_socket() call. */
508         model_ops = process_model_byname("single");
509         if (!model_ops) {
510                 DEBUG(0,("Can't find 'single' process model_ops"));
511                 return NT_STATUS_INTERNAL_ERROR;
512         }
513
514         if (lp_interfaces() && lp_bind_interfaces_only()) {
515                 int num_interfaces = iface_count();
516                 int i;
517
518                 /* We have been given an interfaces line, and been 
519                    told to only bind to those interfaces. Create a
520                    socket per interface and bind to only these.
521                 */
522                 for(i = 0; i < num_interfaces; i++) {
523                         address = iface_n_ip(i);
524                         status = stream_setup_socket(task->event_ctx, model_ops, &wreplsrv_stream_ops,
525                                                      "ipv4", address, &port, service);
526                         if (!NT_STATUS_IS_OK(status)) {
527                                 DEBUG(0,("stream_setup_socket(address=%s,port=%u) failed - %s\n",
528                                          address, port, nt_errstr(status)));
529                                 return status;
530                         }
531                 }
532         } else {
533                 address = lp_socket_address();
534                 status = stream_setup_socket(task->event_ctx, model_ops, &wreplsrv_stream_ops,
535                                              "ipv4", address, &port, service);
536                 if (!NT_STATUS_IS_OK(status)) {
537                         DEBUG(0,("stream_setup_socket(address=%s,port=%u) failed - %s\n",
538                                  address, port, nt_errstr(status)));
539                         return status;
540                 }
541         }
542
543         return NT_STATUS_OK;
544 }
545
546 /*
547   startup the wrepl task
548 */
549 static void wreplsrv_task_init(struct task_server *task)
550 {
551         NTSTATUS status;
552         struct wreplsrv_service *service;
553
554         service = talloc_zero(task, struct wreplsrv_service);
555         if (!service) {
556                 task_server_terminate(task, "wreplsrv_task_init: out of memory");
557                 return;
558         }
559         service->task = task;
560         task->private = service;
561
562         /*
563          * setup up all partners, and open the winsdb
564          */
565         status = wreplsrv_open_winsdb(service);
566         if (!NT_STATUS_IS_OK(status)) {
567                 task_server_terminate(task, "wreplsrv_task_init: wreplsrv_open_winsdb() failed");
568                 return;
569         }
570
571         /*
572          * setup timed events for each partner we want to pull from
573          */
574         status = wreplsrv_setup_partners(service);
575         if (!NT_STATUS_IS_OK(status)) {
576                 task_server_terminate(task, "wreplsrv_task_init: wreplsrv_setup_partners() failed");
577                 return;
578         }
579
580         /* 
581          * setup listen sockets, so we can anwser requests from our partners,
582          * which pull from us
583          */
584         status = wreplsrv_setup_sockets(service);
585         if (!NT_STATUS_IS_OK(status)) {
586                 task_server_terminate(task, "wreplsrv_task_init: wreplsrv_setup_sockets() failed");
587                 return;
588         }
589
590         irpc_add_name(task->msg_ctx, "wrepl_server");
591 }
592
593 /*
594   initialise the WREPL server
595  */
596 static NTSTATUS wreplsrv_init(struct event_context *event_ctx, const struct model_ops *model_ops)
597 {
598         return task_server_startup(event_ctx, model_ops, wreplsrv_task_init);
599 }
600
601 /*
602   register ourselves as a available server
603 */
604 NTSTATUS server_service_wrepl_init(void)
605 {
606         return register_server_service("wrepl", wreplsrv_init);
607 }