s4:ldap_server: add support for async notification requests
authorStefan Metzmacher <metze@samba.org>
Thu, 23 Jul 2015 10:08:42 +0000 (12:08 +0200)
committerGarming Sam <garming@samba.org>
Wed, 17 Feb 2016 02:43:23 +0000 (03:43 +0100)
This is a simplified version that works with the current
dsdb_notification module that requires the caller to retry
periodically. We do that every 5 seconds or 100 microseconds
if we're forcing a retry.

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Garming Sam <garming@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
source4/ldap_server/ldap_backend.c
source4/ldap_server/ldap_bind.c
source4/ldap_server/ldap_extended.c
source4/ldap_server/ldap_server.c
source4/ldap_server/ldap_server.h

index 7efb7ed4521c13a3d706c338f55b52194a5219b5..6a8a0cf5494ae9b1cac8555ff6e90fd62d8a3ad7 100644 (file)
@@ -513,6 +513,7 @@ static NTSTATUS ldapsrv_SearchRequest(struct ldapsrv_call *call)
        struct ldb_search_options_control *search_options;
        struct ldb_control *extended_dn_control;
        struct ldb_extended_dn_control *extended_dn_decoded = NULL;
+       struct ldb_control *notification_control = NULL;
        enum ldb_scope scope = LDB_SCOPE_DEFAULT;
        const char **attrs = NULL;
        const char *scope_str, *errstr = NULL;
@@ -617,6 +618,31 @@ static NTSTATUS ldapsrv_SearchRequest(struct ldapsrv_call *call)
                }
        }
 
+       notification_control = ldb_request_get_control(lreq, LDB_CONTROL_NOTIFICATION_OID);
+       if (notification_control != NULL) {
+               const struct ldapsrv_call *pc = NULL;
+               size_t count = 0;
+
+               for (pc = call->conn->pending_calls; pc != NULL; pc = pc->next) {
+                       count += 1;
+               }
+
+               if (count >= call->conn->limits.max_notifications) {
+                       DEBUG(10,("SearchRequest: error MaxNotificationPerConn\n"));
+                       result = map_ldb_error(local_ctx,
+                                              LDB_ERR_ADMIN_LIMIT_EXCEEDED,
+                                              "MaxNotificationPerConn reached",
+                                              &errstr);
+                       goto reply;
+               }
+
+               /*
+                * For now we need to do periodic retries on our own.
+                * As the dsdb_notification module will return after each run.
+                */
+               call->notification.busy = true;
+       }
+
        ldb_set_timeout(samdb, lreq, req->timelimit);
 
        if (!call->conn->is_privileged) {
@@ -667,6 +693,22 @@ queue_reply:
                        ldapsrv_queue_reply(call, ent_r);
                }
 
+               if (call->notification.busy) {
+                       /* Move/Add it to the end */
+                       DLIST_DEMOTE(call->conn->pending_calls, call);
+                       call->notification.generation =
+                               call->conn->service->notification.generation;
+
+                       if (res->count != 0) {
+                               call->notification.generation += 1;
+                               ldapsrv_notification_retry_setup(call->conn->service,
+                                                                true);
+                       }
+
+                       talloc_free(local_ctx);
+                       return NT_STATUS_OK;
+               }
+
                /* Send back referrals if they do exist (search operations) */
                if (res->refs != NULL) {
                        char **ref;
@@ -691,6 +733,9 @@ queue_reply:
        }
 
 reply:
+       DLIST_REMOVE(call->conn->pending_calls, call);
+       call->notification.busy = false;
+
        done_r = ldapsrv_init_reply(call, LDAP_TAG_SearchResultDone);
        NT_STATUS_HAVE_NO_MEMORY(done_r);
 
@@ -1157,8 +1202,23 @@ static NTSTATUS ldapsrv_CompareRequest(struct ldapsrv_call *call)
 
 static NTSTATUS ldapsrv_AbandonRequest(struct ldapsrv_call *call)
 {
-/*     struct ldap_AbandonRequest *req = &call->request.r.AbandonRequest;*/
+       struct ldap_AbandonRequest *req = &call->request->r.AbandonRequest;
+       struct ldapsrv_call *c = NULL;
+       struct ldapsrv_call *n = NULL;
+
        DEBUG(10, ("AbandonRequest\n"));
+
+       for (c = call->conn->pending_calls; c != NULL; c = n) {
+               n = c->next;
+
+               if (c->request->messageid != req->messageid) {
+                       continue;
+               }
+
+               DLIST_REMOVE(call->conn->pending_calls, c);
+               TALLOC_FREE(c);
+       }
+
        return NT_STATUS_OK;
 }
 
@@ -1169,6 +1229,8 @@ NTSTATUS ldapsrv_do_call(struct ldapsrv_call *call)
        struct ldb_context *samdb = call->conn->ldb;
        NTSTATUS status;
        time_t *lastts;
+       bool recheck_schema = false;
+
        /* Check for undecoded critical extensions */
        for (i=0; msg->controls && msg->controls[i]; i++) {
                if (!msg->controls_decoded[i] && 
@@ -1187,26 +1249,31 @@ NTSTATUS ldapsrv_do_call(struct ldapsrv_call *call)
        case LDAP_TAG_SearchRequest:
                return ldapsrv_SearchRequest(call);
        case LDAP_TAG_ModifyRequest:
+               recheck_schema = true;
                status = ldapsrv_ModifyRequest(call);
                break;
        case LDAP_TAG_AddRequest:
+               recheck_schema = true;
                status = ldapsrv_AddRequest(call);
                break;
        case LDAP_TAG_DelRequest:
-               return ldapsrv_DelRequest(call);
+               status = ldapsrv_DelRequest(call);
+               break;
        case LDAP_TAG_ModifyDNRequest:
-               return ldapsrv_ModifyDNRequest(call);
+               status = ldapsrv_ModifyDNRequest(call);
+               break;
        case LDAP_TAG_CompareRequest:
                return ldapsrv_CompareRequest(call);
        case LDAP_TAG_AbandonRequest:
                return ldapsrv_AbandonRequest(call);
        case LDAP_TAG_ExtendedRequest:
-               return ldapsrv_ExtendedRequest(call);
+               status = ldapsrv_ExtendedRequest(call);
+               break;
        default:
                return ldapsrv_unwilling(call, LDAP_PROTOCOL_ERROR);
        }
 
-       if (NT_STATUS_IS_OK(status)) {
+       if (NT_STATUS_IS_OK(status) && recheck_schema) {
                lastts = (time_t *)ldb_get_opaque(samdb, DSDB_OPAQUE_LAST_SCHEMA_UPDATE_MSG_OPAQUE_NAME);
                if (lastts && !*lastts) {
                        DEBUG(10, ("Schema update now was requested, "
@@ -1222,5 +1289,10 @@ NTSTATUS ldapsrv_do_call(struct ldapsrv_call *call)
                        ldb_set_opaque(samdb, DSDB_OPAQUE_LAST_SCHEMA_UPDATE_MSG_OPAQUE_NAME, lastts);
                }
        }
+
+       if (NT_STATUS_IS_OK(status)) {
+               ldapsrv_notification_retry_setup(call->conn->service, true);
+       }
+
        return status;
 }
index fcbdadf52ee7836d188af771021f89bdf52234b8..77dfe902fdf15ba266e65050c6ac63da407cdb82 100644 (file)
@@ -333,11 +333,25 @@ NTSTATUS ldapsrv_BindRequest(struct ldapsrv_call *call)
        struct ldapsrv_reply *reply;
        struct ldap_BindResponse *resp;
 
+       if (call->conn->pending_calls != NULL) {
+               reply = ldapsrv_init_reply(call, LDAP_TAG_BindResponse);
+               if (!reply) {
+                       return NT_STATUS_NO_MEMORY;
+               }
+
+               resp = &reply->msg->r.BindResponse;
+               resp->response.resultcode = LDAP_BUSY;
+               resp->response.dn = NULL;
+               resp->response.errormessage = talloc_asprintf(reply, "Pending requests on this LDAP session");
+               resp->response.referral = NULL;
+               resp->SASL.secblob = NULL;
+
+               ldapsrv_queue_reply(call, reply);
+               return NT_STATUS_OK;
+       }
+
        /* 
-        * TODO: we should fail the bind request
-        *       if there're any pending requests.
-        *
-        *       also a simple bind should cancel an
+        * TODO: a simple bind should cancel an
         *       inprogress SASL bind.
         *       (see RFC 4513)
         */
index 338858f0347a24874713bdf383a3abe0289fccd8..2d4a5345460fd88f40e12c6c723b8f7440b9665f 100644 (file)
@@ -112,8 +112,7 @@ static NTSTATUS ldapsrv_StartTLS(struct ldapsrv_call *call,
 
        /*
         * TODO: give LDAP_OPERATIONS_ERROR also when
-        *       there're pending requests or there's
-        *       a SASL bind in progress
+        *       there's a SASL bind in progress
         *       (see rfc4513 section 3.1.1)
         */
        if (call->conn->sockets.tls) {
@@ -126,6 +125,11 @@ static NTSTATUS ldapsrv_StartTLS(struct ldapsrv_call *call,
                return NT_STATUS_LDAP(LDAP_OPERATIONS_ERROR);
        }
 
+       if (call->conn->pending_calls != NULL) {
+               (*errstr) = talloc_asprintf(reply, "START-TLS: pending requests on this LDAP session");
+               return NT_STATUS_LDAP(LDAP_BUSY);
+       }
+
        context = talloc(call, struct ldapsrv_starttls_postprocess_context);
        NT_STATUS_HAVE_NO_MEMORY(context);
 
index 02c3c952920bc94a0051d52e5007bc83b1c7c23b..9b2f18595fc59c5e983ebfcb008cabced14d6b98 100644 (file)
@@ -62,6 +62,8 @@ static void ldapsrv_terminate_connection(struct ldapsrv_connection *conn,
                return;
        }
 
+       DLIST_REMOVE(conn->service->connections, conn);
+
        conn->limits.endtime = timeval_current_ofs(0, 500);
 
        tevent_queue_stop(conn->sockets.send_queue);
@@ -167,6 +169,7 @@ static int ldapsrv_load_limits(struct ldapsrv_connection *conn)
        conn->limits.initial_timeout = 120;
        conn->limits.conn_idle_time = 900;
        conn->limits.max_page_size = 1000;
+       conn->limits.max_notifications = 5;
        conn->limits.search_timeout = 120;
 
 
@@ -233,6 +236,10 @@ static int ldapsrv_load_limits(struct ldapsrv_connection *conn)
                        conn->limits.max_page_size = policy_value;
                        continue;
                }
+               if (strcasecmp("MaxNotificationPerConn", policy_name) == 0) {
+                       conn->limits.max_notifications = policy_value;
+                       continue;
+               }
                if (strcasecmp("MaxQueryDuration", policy_name) == 0) {
                        conn->limits.search_timeout = policy_value;
                        continue;
@@ -347,6 +354,8 @@ static void ldapsrv_accept(struct stream_connection *c,
        /* register the server */       
        irpc_add_name(c->msg_ctx, "ldap_server");
 
+       DLIST_ADD_END(ldapsrv_service->connections, conn);
+
        if (port != 636 && port != 3269) {
                ldapsrv_call_read_next(conn);
                return;
@@ -405,7 +414,11 @@ static bool ldapsrv_call_read_next(struct ldapsrv_connection *conn)
 {
        struct tevent_req *subreq;
 
-       if (timeval_is_zero(&conn->limits.endtime)) {
+       if (conn->pending_calls != NULL) {
+               conn->limits.endtime = timeval_zero();
+
+               ldapsrv_notification_retry_setup(conn->service, false);
+       } else if (timeval_is_zero(&conn->limits.endtime)) {
                conn->limits.endtime =
                        timeval_current_ofs(conn->limits.initial_timeout, 0);
        } else {
@@ -456,9 +469,11 @@ static bool ldapsrv_call_read_next(struct ldapsrv_connection *conn)
                                "no memory for tstream_read_pdu_blob_send");
                return false;
        }
-       tevent_req_set_endtime(subreq,
-                              conn->connection->event.ctx,
-                              conn->limits.endtime);
+       if (!timeval_is_zero(&conn->limits.endtime)) {
+               tevent_req_set_endtime(subreq,
+                                      conn->connection->event.ctx,
+                                      conn->limits.endtime);
+       }
        tevent_req_set_callback(subreq, ldapsrv_call_read_done, conn);
        conn->sockets.read_req = subreq;
        return true;
@@ -544,6 +559,7 @@ static void ldapsrv_call_read_done(struct tevent_req *subreq)
        conn->active_call = subreq;
 }
 
+
 static void ldapsrv_call_writev_done(struct tevent_req *subreq);
 
 static void ldapsrv_call_process_done(struct tevent_req *subreq)
@@ -590,7 +606,9 @@ static void ldapsrv_call_process_done(struct tevent_req *subreq)
        }
 
        if (blob.length == 0) {
-               TALLOC_FREE(call);
+               if (!call->notification.busy) {
+                       TALLOC_FREE(call);
+               }
 
                ldapsrv_call_read_next(conn);
                return;
@@ -654,7 +672,9 @@ static void ldapsrv_call_writev_done(struct tevent_req *subreq)
                return;
        }
 
-       TALLOC_FREE(call);
+       if (!call->notification.busy) {
+               TALLOC_FREE(call);
+       }
 
        ldapsrv_call_read_next(conn);
 }
@@ -688,6 +708,112 @@ static void ldapsrv_call_postprocess_done(struct tevent_req *subreq)
        ldapsrv_call_read_next(conn);
 }
 
+static void ldapsrv_notification_retry_done(struct tevent_req *subreq);
+
+void ldapsrv_notification_retry_setup(struct ldapsrv_service *service, bool force)
+{
+       struct ldapsrv_connection *conn = NULL;
+       struct timeval retry;
+       size_t num_pending = 0;
+       size_t num_active = 0;
+
+       if (force) {
+               TALLOC_FREE(service->notification.retry);
+               service->notification.generation += 1;
+       }
+
+       if (service->notification.retry != NULL) {
+               return;
+       }
+
+       for (conn = service->connections; conn != NULL; conn = conn->next) {
+               if (conn->pending_calls == NULL) {
+                       continue;
+               }
+
+               num_pending += 1;
+
+               if (conn->pending_calls->notification.generation !=
+                   service->notification.generation)
+               {
+                       num_active += 1;
+               }
+       }
+
+       if (num_pending == 0) {
+               return;
+       }
+
+       if (num_active != 0) {
+               retry = timeval_current_ofs(0, 100);
+       } else {
+               retry = timeval_current_ofs(5, 0);
+       }
+
+       service->notification.retry = tevent_wakeup_send(service,
+                                                        service->task->event_ctx,
+                                                        retry);
+       if (service->notification.retry == NULL) {
+               /* retry later */
+               return;
+       }
+
+       tevent_req_set_callback(service->notification.retry,
+                               ldapsrv_notification_retry_done,
+                               service);
+}
+
+static void ldapsrv_notification_retry_done(struct tevent_req *subreq)
+{
+       struct ldapsrv_service *service =
+               tevent_req_callback_data(subreq,
+               struct ldapsrv_service);
+       struct ldapsrv_connection *conn = NULL;
+       struct ldapsrv_connection *conn_next = NULL;
+       bool ok;
+
+       service->notification.retry = NULL;
+
+       ok = tevent_wakeup_recv(subreq);
+       TALLOC_FREE(subreq);
+       if (!ok) {
+               /* ignore */
+       }
+
+       for (conn = service->connections; conn != NULL; conn = conn_next) {
+               struct ldapsrv_call *call = conn->pending_calls;
+
+               conn_next = conn->next;
+
+               if (conn->pending_calls == NULL) {
+                       continue;
+               }
+
+               if (conn->active_call != NULL) {
+                       continue;
+               }
+
+               DLIST_DEMOTE(conn->pending_calls, call);
+               call->notification.generation =
+                               service->notification.generation;
+
+               /* queue the call in the global queue */
+               subreq = ldapsrv_process_call_send(call,
+                                                  conn->connection->event.ctx,
+                                                  conn->service->call_queue,
+                                                  call);
+               if (subreq == NULL) {
+                       ldapsrv_terminate_connection(conn,
+                                       "ldapsrv_process_call_send failed");
+                       continue;
+               }
+               tevent_req_set_callback(subreq, ldapsrv_call_process_done, call);
+               conn->active_call = subreq;
+       }
+
+       ldapsrv_notification_retry_setup(service, false);
+}
+
 struct ldapsrv_process_call_state {
        struct ldapsrv_call *call;
 };
index bfd95c05c1832f09232f55321ecc907f26203735..27e0f1322bb5c7669e2d0fd3b2ab5e85aea88cf5 100644 (file)
@@ -24,6 +24,7 @@
 #include "system/network.h"
 
 struct ldapsrv_connection {
+       struct ldapsrv_connection *next, *prev;
        struct loadparm_context *lp_ctx;
        struct stream_connection *connection;
        struct gensec_security *gensec;
@@ -48,15 +49,19 @@ struct ldapsrv_connection {
                int initial_timeout;
                int conn_idle_time;
                int max_page_size;
+               int max_notifications;
                int search_timeout;
                struct timeval endtime;
                const char *reason;
        } limits;
 
        struct tevent_req *active_call;
+
+       struct ldapsrv_call *pending_calls;
 };
 
 struct ldapsrv_call {
+       struct ldapsrv_call *prev, *next;
        struct ldapsrv_connection *conn;
        struct ldap_message *request;
        struct ldapsrv_reply {
@@ -70,12 +75,22 @@ struct ldapsrv_call {
                                               void *private_data);
        NTSTATUS (*postprocess_recv)(struct tevent_req *req);
        void *postprocess_private;
+
+       struct {
+               bool busy;
+               uint64_t generation;
+       } notification;
 };
 
 struct ldapsrv_service {
        struct tstream_tls_params *tls_params;
        struct task_server *task;
        struct tevent_queue *call_queue;
+       struct ldapsrv_connection *connections;
+       struct {
+               uint64_t generation;
+               struct tevent_req *retry;
+       } notification;
 };
 
 #include "ldap_server/proto.h"