s3: New notify implementation
authorVolker Lendecke <vl@samba.org>
Wed, 4 Apr 2012 12:51:43 +0000 (14:51 +0200)
committerVolker Lendecke <vl@samba.org>
Tue, 17 Apr 2012 08:21:02 +0000 (10:21 +0200)
From notify_internal.c:

        /*
         * The notify database is split up into two databases: One
         * relatively static index db and the real notify db with the
         * volatile entries.
         */

This change is necessary to make notify scale better in a cluster

librpc/idl/notify.idl
source3/include/smb.h
source3/librpc/idl/messaging.idl
source3/smbd/files.c
source3/smbd/notify.c
source3/smbd/notify_internal.c
source3/smbd/proto.h
source3/smbd/server.c

index 845010601ea6b982a3717b2f756767c4505ef320..ec81e8c64ac065c58e424af4c47737a273e52de5 100644 (file)
@@ -32,6 +32,13 @@ interface notify
                notify_entry entries[num_entries];
        } notify_entry_array;
 
+       typedef [public] struct {
+               server_id server;
+               uint32 filter; /* filter to apply in this directory */
+               uint32 subdir_filter; /* filter to apply in child directories */
+               pointer private_data;
+       } notify_db_entry;
+
        /*
          to allow for efficient search for matching entries, we
          divide them by the directory depth, with a separate array
@@ -62,6 +69,12 @@ interface notify
                pointer private_data;
        } notify_event;
 
+       typedef [public] struct {
+               uint32 action;
+               uint32 filter;
+               utf8string path;
+       } notify_remote_event;
+
        typedef [v1_enum] enum {
                FILE_ACTION_ADDED               = 0x00000001,
                FILE_ACTION_REMOVED             = 0x00000002,
index 758ad9959d129d419564bccb989dd5283a3a53ec..b5c674dce606e6d8e06dc98052d3025904b457e0 100644 (file)
@@ -182,7 +182,7 @@ struct notify_change {
 };
 
 struct notify_mid_map;
-struct notify_entry;
+struct notify_db_entry;
 struct notify_event;
 struct notify_change_request;
 struct sys_notify_backend;
index 1c80cc2b3074661ab054a3618214adeed51790f7..cda42fcf1e42331f38940f6cef5726c1e60b39b5 100644 (file)
@@ -85,6 +85,9 @@ interface messaging
                /*Close a specific file given a share entry. */
                MSG_SMB_CLOSE_FILE              = 0x0313,
 
+               /* Trigger a notify cleanup run */
+               MSG_SMB_NOTIFY_CLEANUP          = 0x0314,
+
                /* winbind messages */
                MSG_WINBIND_FINISHED            = 0x0401,
                MSG_WINBIND_FORGET_STATE        = 0x0402,
index c71e864648a9ad4a30a60fa09b43692a6129d05a..edcd98cd85ebb21a4a4cf7d8c66ee9a226fdff8c 100644 (file)
@@ -449,10 +449,6 @@ void file_free(struct smb_request *req, files_struct *fsp)
        if (fsp->notify) {
                struct notify_context *notify_ctx =
                        fsp->conn->sconn->notify_ctx;
-               if (fsp->is_directory) {
-                       notify_remove_onelevel(notify_ctx,
-                                              &fsp->file_id, fsp);
-               }
                notify_remove(notify_ctx, fsp);
                TALLOC_FREE(fsp->notify);
        }
index fd9e5524a7c8e25fde66732fc971fafe9dbc3d22..0401b65a6a65511d9d73e05891a273913d5fc915 100644 (file)
@@ -188,7 +188,7 @@ NTSTATUS change_notify_create(struct files_struct *fsp, uint32 filter,
 {
        char *fullpath;
        size_t len;
-       struct notify_entry e;
+       uint32_t subdir_filter;
        NTSTATUS status = NT_STATUS_NOT_IMPLEMENTED;
 
        if (fsp->notify != NULL) {
@@ -220,22 +220,14 @@ NTSTATUS change_notify_create(struct files_struct *fsp, uint32 filter,
                fullpath[len-2] = '\0';
        }
 
-       ZERO_STRUCT(e);
-       e.path = fullpath;
-       e.dir_fd = fsp->fh->fd;
-       e.dir_id = fsp->file_id;
-       e.filter = filter;
-       e.subdir_filter = 0;
-       if (recursive) {
-               e.subdir_filter = filter;
-       }
+       subdir_filter = recursive ? filter : 0;
 
        if (fsp->conn->sconn->sys_notify_ctx != NULL) {
                void *sys_notify_handle = NULL;
 
                status = SMB_VFS_NOTIFY_WATCH(
                        fsp->conn, fsp->conn->sconn->sys_notify_ctx,
-                       e.path, &e.filter, &e.subdir_filter,
+                       fullpath, &filter, &subdir_filter,
                        sys_notify_callback, fsp, &sys_notify_handle);
 
                if (NT_STATUS_IS_OK(status)) {
@@ -243,9 +235,10 @@ NTSTATUS change_notify_create(struct files_struct *fsp, uint32 filter,
                }
        }
 
-       if ((e.filter != 0) || (e.subdir_filter != 0)) {
-               status = notify_add(fsp->conn->sconn->notify_ctx, fsp->conn,
-                                   &e, notify_callback, fsp);
+       if ((filter != 0) || (subdir_filter != 0)) {
+               status = notify_add(fsp->conn->sconn->notify_ctx,
+                                   fullpath, filter, subdir_filter,
+                                   notify_callback, fsp);
        }
        TALLOC_FREE(fullpath);
        return status;
@@ -389,25 +382,10 @@ void notify_fname(connection_struct *conn, uint32 action, uint32 filter,
 {
        struct notify_context *notify_ctx = conn->sconn->notify_ctx;
        char *fullpath;
-       char *parent;
-       const char *name;
 
        if (path[0] == '.' && path[1] == '/') {
                path += 2;
        }
-       if (parent_dirname(talloc_tos(), path, &parent, &name)) {
-               struct smb_filename smb_fname_parent;
-
-               ZERO_STRUCT(smb_fname_parent);
-               smb_fname_parent.base_name = parent;
-
-               if (SMB_VFS_STAT(conn, &smb_fname_parent) != -1) {
-                       notify_onelevel(notify_ctx, action, filter,
-                           SMB_VFS_FILE_ID_CREATE(conn, &smb_fname_parent.st),
-                           name);
-               }
-       }
-
        fullpath = talloc_asprintf(talloc_tos(), "%s/%s", conn->connectpath,
                                   path);
        if (fullpath == NULL) {
index c036e8a000684553ef4cf65c4d1385a0fb8cf0a2..6e6bdf7b0368294c9c60397c9976bb1ac34b6f14 100644 (file)
@@ -2,6 +2,7 @@
    Unix SMB/CIFS implementation.
 
    Copyright (C) Andrew Tridgell 2006
+   Copyright (C) Volker Lendecke 2012
 
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
 #include "librpc/gen_ndr/ndr_notify.h"
 #include "dbwrap/dbwrap.h"
 #include "dbwrap/dbwrap_open.h"
+#include "dbwrap/dbwrap_tdb.h"
 #include "smbd/smbd.h"
 #include "messages.h"
 #include "lib/tdb_wrap/tdb_wrap.h"
 #include "util_tdb.h"
 #include "lib/param/param.h"
-
-struct notify_context {
-       struct db_context *db_recursive;
-       struct db_context *db_onelevel;
-       struct server_id server;
-       struct messaging_context *messaging_ctx;
-       struct notify_list *list;
-       struct notify_array *array;
-       int seqnum;
-       TDB_DATA key;
-};
-
+#include "lib/dbwrap/dbwrap_cache.h"
+#include "ctdb_srvids.h"
+#include "ctdbd_conn.h"
+#include "ctdb_conn.h"
+#include "lib/util/tevent_unix.h"
 
 struct notify_list {
        struct notify_list *next, *prev;
-       void *private_data;
+       const char *path;
        void (*callback)(void *, const struct notify_event *);
-       int depth;
+       void *private_data;
 };
 
-#define NOTIFY_KEY "notify array"
-
-#define NOTIFY_ENABLE          "notify:enable"
-#define NOTIFY_ENABLE_DEFAULT  True
+struct notify_context {
+       struct messaging_context *msg;
+       struct notify_list *list;
 
-static NTSTATUS notify_remove_all(struct notify_context *notify,
-                                 const struct server_id *server);
-static void notify_handler(struct messaging_context *msg_ctx, void *private_data,
-                          uint32_t msg_type, struct server_id server_id, DATA_BLOB *data);
+       /*
+        * The notify database is split up into two databases: One
+        * relatively static index db and the real notify db with the
+        * volatile entries.
+        */
 
-/*
-  destroy the notify context
-*/
-static int notify_destructor(struct notify_context *notify)
-{
-       messaging_deregister(notify->messaging_ctx, MSG_PVFS_NOTIFY, notify);
+       /*
+        * "db_notify" is indexed by pathname. Per record it stores an
+        * array of notify_db_entry structs. These represent the
+        * notify records as requested by the smb client. This
+        * database is always held locally, it is never clustered.
+        */
+       struct db_context *db_notify;
 
-       if (notify->list != NULL) {
-               notify_remove_all(notify, &notify->server);
-       }
+       /*
+        * "db_index" is indexed by pathname. The records are an array
+        * of VNNs which have any interest in notifies for this path
+        * name.
+        *
+        * In the non-clustered case this database is cached in RAM by
+        * means of db_cache_open, which maintains a cache per
+        * process. Cache consistency is maintained by the tdb
+        * sequence number.
+        *
+        * In the clustered case right now we can not use the tdb
+        * sequence number, but by means of read only records we
+        * should be able to avoid a lot of full migrations.
+        *
+        * In both cases, it is important to keep the update
+        * operations to db_index to a minimum. This is achieved by
+        * delayed deletion. When a db_notify is initially created,
+        * the db_index record is also created. When more notifies are
+        * add for a path, then only the db_notify record needs to be
+        * modified, the db_index record is not touched. When the last
+        * entry from the db_notify record is deleted, the db_index
+        * record is not immediately deleted. Instead, the db_notify
+        * record is replaced with a current timestamp. A regular
+        * cleanup process will delete all db_index records that are
+        * older than a minute.
+        */
+       struct db_context *db_index;
+};
 
-       return 0;
-}
+static void notify_trigger_local(struct notify_context *notify,
+                                uint32_t action, uint32_t filter,
+                                const char *path, size_t path_len,
+                                bool recursive);
+static NTSTATUS notify_send(struct notify_context *notify,
+                           struct server_id *pid,
+                           const char *path, uint32_t action,
+                           void *private_data);
+static NTSTATUS notify_add_entry(struct db_record *rec,
+                                const struct notify_db_entry *e,
+                                bool *p_add_idx);
+static NTSTATUS notify_add_idx(struct db_record *rec, uint32_t vnn);
+
+static NTSTATUS notify_del_entry(struct db_record *rec,
+                                const struct server_id *pid,
+                                void *private_data);
+static NTSTATUS notify_del_idx(struct db_record *rec, uint32_t vnn);
+
+static int notify_context_destructor(struct notify_context *notify);
+
+static void notify_handler(struct messaging_context *msg_ctx,
+                          void *private_data, uint32_t msg_type,
+                          struct server_id server_id, DATA_BLOB *data);
 
-/*
-  Open up the notify.tdb database. You should close it down using
-  talloc_free(). We need the messaging_ctx to allow for notifications
-  via internal messages
-*/
 struct notify_context *notify_init(TALLOC_CTX *mem_ctx,
-                                  struct messaging_context *messaging_ctx,
+                                  struct messaging_context *msg,
                                   struct event_context *ev)
 {
        struct notify_context *notify;
 
        notify = talloc(mem_ctx, struct notify_context);
        if (notify == NULL) {
-               return NULL;
-       }
-
-       notify->db_recursive = db_open(notify, lock_path("notify.tdb"),
-                                      0, TDB_SEQNUM|TDB_CLEAR_IF_FIRST|TDB_INCOMPATIBLE_HASH,
-                                      O_RDWR|O_CREAT, 0644,
-                                      DBWRAP_LOCK_ORDER_2);
-       if (notify->db_recursive == NULL) {
-               talloc_free(notify);
-               return NULL;
+               goto fail;
        }
+       notify->msg = msg;
+       notify->list = NULL;
 
-       notify->db_onelevel = db_open(notify, lock_path("notify_onelevel.tdb"),
-                                     0, TDB_CLEAR_IF_FIRST|TDB_INCOMPATIBLE_HASH,
-                                     O_RDWR|O_CREAT, 0644,
-                                     DBWRAP_LOCK_ORDER_2);
-       if (notify->db_onelevel == NULL) {
-               talloc_free(notify);
-               return NULL;
+       notify->db_notify = db_open_tdb(
+               notify, lock_path("notify.tdb"),
+               0, TDB_CLEAR_IF_FIRST|TDB_INCOMPATIBLE_HASH,
+               O_RDWR|O_CREAT, 0644, DBWRAP_LOCK_ORDER_2);
+       if (notify->db_notify == NULL) {
+               goto fail;
+       }
+       notify->db_index = db_open(
+               notify, lock_path("notify_index.tdb"),
+               0, TDB_SEQNUM|TDB_CLEAR_IF_FIRST|TDB_INCOMPATIBLE_HASH,
+               O_RDWR|O_CREAT, 0644, DBWRAP_LOCK_ORDER_3);
+       if (notify->db_index == NULL) {
+               goto fail;
+       }
+       if (!lp_clustering()) {
+               notify->db_index = db_open_cache(notify, notify->db_index);
+               if (notify->db_index == NULL) {
+                       goto fail;
+               }
        }
 
-       notify->server = messaging_server_id(messaging_ctx);
-       notify->messaging_ctx = messaging_ctx;
-       notify->list = NULL;
-       notify->array = NULL;
-       notify->seqnum = dbwrap_get_seqnum(notify->db_recursive);
-       notify->key = string_term_tdb_data(NOTIFY_KEY);
+       if (notify->msg != NULL) {
+               NTSTATUS status;
 
-       talloc_set_destructor(notify, notify_destructor);
+               status = messaging_register(notify->msg, notify,
+                                           MSG_PVFS_NOTIFY, notify_handler);
+               if (!NT_STATUS_IS_OK(status)) {
+                       DEBUG(1, ("messaging_register returned %s\n",
+                                 nt_errstr(status)));
+                       goto fail;
+               }
+       }
 
-       /* register with the messaging subsystem for the notify
-          message type */
-       messaging_register(notify->messaging_ctx, notify,
-                          MSG_PVFS_NOTIFY, notify_handler);
+       talloc_set_destructor(notify, notify_context_destructor);
 
        return notify;
+fail:
+       TALLOC_FREE(notify);
+       return NULL;
 }
 
-bool notify_internal_parent_init(TALLOC_CTX *mem_ctx)
+static int notify_context_destructor(struct notify_context *notify)
 {
-       struct tdb_wrap *db1, *db2;
-       struct loadparm_context *lp_ctx;
-
-       if (lp_clustering()) {
-               return true;
-       }
+       DEBUG(10, ("notify_context_destructor called\n"));
 
-       lp_ctx = loadparm_init_s3(mem_ctx, loadparm_s3_context());
-       if (lp_ctx == NULL) {
-               DEBUG(0, ("loadparm_init_s3 failed\n"));
-               return false;
-       }
-       /*
-        * Open the tdbs in the parent process (smbd) so that our
-        * CLEAR_IF_FIRST optimization in tdb_reopen_all can properly
-        * work.
-        */
-
-       db1 = tdb_wrap_open(mem_ctx, lock_path("notify.tdb"),
-                           0, TDB_SEQNUM|TDB_CLEAR_IF_FIRST|TDB_INCOMPATIBLE_HASH,
-                           O_RDWR|O_CREAT, 0644, lp_ctx);
-       if (db1 == NULL) {
-               talloc_unlink(mem_ctx, lp_ctx);
-               DEBUG(1, ("could not open notify.tdb: %s\n", strerror(errno)));
-               return false;
-       }
-       db2 = tdb_wrap_open(mem_ctx, lock_path("notify_onelevel.tdb"),
-                           0, TDB_CLEAR_IF_FIRST|TDB_INCOMPATIBLE_HASH, O_RDWR|O_CREAT, 0644, lp_ctx);
-       talloc_unlink(mem_ctx, lp_ctx);
-       if (db2 == NULL) {
-               DEBUG(1, ("could not open notify_onelevel.tdb: %s\n",
-                         strerror(errno)));
-               TALLOC_FREE(db1);
-               return false;
+       if (notify->msg != NULL) {
+               messaging_deregister(notify->msg, MSG_PVFS_NOTIFY, notify);
        }
-       return true;
-}
 
-/*
-  lock and fetch the record
-*/
-static NTSTATUS notify_fetch_locked(struct notify_context *notify, struct db_record **rec)
-{
-       *rec = dbwrap_fetch_locked(notify->db_recursive, notify, notify->key);
-       if (*rec == NULL) {
-               return NT_STATUS_INTERNAL_DB_CORRUPTION;
+       while (notify->list != NULL) {
+               DEBUG(10, ("Removing private_data=%p\n",
+                          notify->list->private_data));
+               notify_remove(notify, notify->list->private_data);
        }
-       return NT_STATUS_OK;
+       return 0;
 }
 
-/*
-  load the notify array
-*/
-static NTSTATUS notify_load(struct notify_context *notify, struct db_record *rec)
+NTSTATUS notify_add(struct notify_context *notify,
+                   const char *path, uint32_t filter, uint32_t subdir_filter,
+                   void (*callback)(void *, const struct notify_event *),
+                   void *private_data)
 {
-       TDB_DATA dbuf;
-       DATA_BLOB blob;
+       struct notify_db_entry e;
+       struct notify_list *listel;
+       struct db_record *notify_rec, *idx_rec;
+       bool add_idx;
        NTSTATUS status;
-       int seqnum;
+       TDB_DATA key, notify_copy;
+
+       if (notify == NULL) {
+               return NT_STATUS_NOT_IMPLEMENTED;
+       }
 
-       seqnum = dbwrap_get_seqnum(notify->db_recursive);
+       DEBUG(10, ("notify_add: path=[%s], private_data=%p\n", path,
+                  private_data));
 
-       if (seqnum == notify->seqnum && notify->array != NULL) {
-               return NT_STATUS_OK;
+       listel = talloc(notify, struct notify_list);
+       if (listel == NULL) {
+               return NT_STATUS_NO_MEMORY;
+       }
+       listel->callback = callback;
+       listel->private_data = private_data;
+       listel->path = talloc_strdup(listel, path);
+       if (listel->path == NULL) {
+               TALLOC_FREE(listel);
+               return NT_STATUS_NO_MEMORY;
        }
+       DLIST_ADD(notify->list, listel);
 
-       notify->seqnum = seqnum;
+       ZERO_STRUCT(e);
+       e.filter = filter;
+       e.subdir_filter = subdir_filter;
+       e.server = messaging_server_id(notify->msg);
+       e.private_data = private_data;
 
-       talloc_free(notify->array);
-       notify->array = talloc_zero(notify, struct notify_array);
-       NT_STATUS_HAVE_NO_MEMORY(notify->array);
+       key = string_tdb_data(path);
 
-       if (!rec) {
-               status = dbwrap_fetch(notify->db_recursive, notify,
-                                     notify->key, &dbuf);
-               if (!NT_STATUS_IS_OK(status)) {
-                       return NT_STATUS_INTERNAL_DB_CORRUPTION;
-               }
-       } else {
-               dbuf = dbwrap_record_get_value(rec);
-       }
-
-       blob.data = (uint8 *)dbuf.dptr;
-       blob.length = dbuf.dsize;
-
-       status = NT_STATUS_OK;
-       if (blob.length > 0) {
-               enum ndr_err_code ndr_err;
-               ndr_err = ndr_pull_struct_blob(&blob, notify->array, notify->array,
-                                              (ndr_pull_flags_fn_t)ndr_pull_notify_array);
-               if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
-                       /* 1. log that we got a corrupt notify_array
-                        * 2. clear the variable the garbage was stored into to not trip
-                        *  over it next time this method is entered with the same seqnum
-                        * 3. delete it from the database */
-                       DEBUG(2, ("notify_array is corrupt, discarding it\n"));
-
-                       ZERO_STRUCTP(notify->array);
-                       if (rec != NULL) {
-                               dbwrap_record_delete(rec);
-                       }
+       notify_rec = dbwrap_fetch_locked(notify->db_notify,
+                                        talloc_tos(), key);
+       if (notify_rec == NULL) {
+               status = NT_STATUS_INTERNAL_DB_CORRUPTION;
+               goto fail;
+       }
 
-               } else {
-                       if (DEBUGLEVEL >= 10) {
-                               DEBUG(10, ("notify_load:\n"));
-                               NDR_PRINT_DEBUG(notify_array, notify->array);
-                       }
+       /*
+        * Make a copy of the notify_rec for easy restore in case
+        * updating the index_rec fails;
+        */
+       notify_copy = dbwrap_record_get_value(notify_rec);
+       if (notify_copy.dsize != 0) {
+               notify_copy.dptr = (uint8_t *)talloc_memdup(
+                       notify_rec, notify_copy.dptr,
+                       notify_copy.dsize);
+               if (notify_copy.dptr == NULL) {
+                       TALLOC_FREE(notify_rec);
+                       status = NT_STATUS_NO_MEMORY;
+                       goto fail;
                }
        }
 
+       if (DEBUGLEVEL >= 10) {
+               NDR_PRINT_DEBUG(notify_db_entry, &e);
+       }
 
-       if (!rec) {
-               talloc_free(dbuf.dptr);
+       status = notify_add_entry(notify_rec, &e, &add_idx);
+       if (!NT_STATUS_IS_OK(status)) {
+               goto fail;
+       }
+       if (!add_idx) {
+               /*
+                * Someone else has added the idx entry already
+                */
+               TALLOC_FREE(notify_rec);
+               return NT_STATUS_OK;
        }
 
-       return status;
-}
+       idx_rec = dbwrap_fetch_locked(notify->db_index,
+                                     talloc_tos(), key);
+       if (idx_rec == NULL) {
+               status = NT_STATUS_INTERNAL_DB_CORRUPTION;
+               goto restore_notify;
+       }
+       status = notify_add_idx(idx_rec, get_my_vnn());
+       if (!NT_STATUS_IS_OK(status)) {
+               goto restore_notify;
+       }
 
-/*
-  compare notify entries for sorting
-*/
-static int notify_compare(const struct notify_entry *e1, const struct notify_entry *e2)
-{
-       return strcmp(e1->path, e2->path);
+       TALLOC_FREE(idx_rec);
+       TALLOC_FREE(notify_rec);
+       return NT_STATUS_OK;
+
+restore_notify:
+       if (notify_copy.dsize != 0) {
+               dbwrap_record_store(notify_rec, notify_copy, 0);
+       } else {
+               dbwrap_record_delete(notify_rec);
+       }
+       TALLOC_FREE(notify_rec);
+fail:
+       DLIST_REMOVE(notify->list, listel);
+       TALLOC_FREE(listel);
+       return status;
 }
 
-/*
-  save the notify array
-*/
-static NTSTATUS notify_save(struct notify_context *notify, struct db_record *rec)
+static NTSTATUS notify_add_entry(struct db_record *rec,
+                                const struct notify_db_entry *e,
+                                bool *p_add_idx)
 {
-       TDB_DATA dbuf;
-       DATA_BLOB blob;
+       TDB_DATA value = dbwrap_record_get_value(rec);
+       struct notify_db_entry *entries;
+       size_t num_entries;
+       bool add_idx = true;
        NTSTATUS status;
-       enum ndr_err_code ndr_err;
-       TALLOC_CTX *tmp_ctx;
 
-       /* if possible, remove some depth arrays */
-       while (notify->array->num_depths > 0 &&
-              notify->array->depth[notify->array->num_depths-1].num_entries == 0) {
-               notify->array->num_depths--;
+       if (value.dsize == sizeof(time_t)) {
+               DEBUG(10, ("Re-using deleted entry\n"));
+               value.dsize = 0;
+               add_idx = false;
        }
 
-       /* we might just be able to delete the record */
-       if (notify->array->num_depths == 0) {
-               return dbwrap_record_delete(rec);
+       if ((value.dsize % sizeof(struct notify_db_entry)) != 0) {
+               DEBUG(1, ("Invalid value.dsize = %u\n",
+                         (unsigned)value.dsize));
+               return NT_STATUS_INTERNAL_DB_CORRUPTION;
        }
+       num_entries = value.dsize / sizeof(struct notify_db_entry);
 
-       tmp_ctx = talloc_new(notify);
-       NT_STATUS_HAVE_NO_MEMORY(tmp_ctx);
+       if (num_entries != 0) {
+               add_idx = false;
+       }
 
-       ndr_err = ndr_push_struct_blob(&blob, tmp_ctx, notify->array,
-                                     (ndr_push_flags_fn_t)ndr_push_notify_array);
-       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
-               talloc_free(tmp_ctx);
-               return ndr_map_error2ntstatus(ndr_err);
+       entries = talloc_array(rec, struct notify_db_entry, num_entries + 1);
+       if (entries == NULL) {
+               return NT_STATUS_NO_MEMORY;
        }
+       memcpy(entries, value.dptr, value.dsize);
 
-       if (DEBUGLEVEL >= 10) {
-               DEBUG(10, ("notify_save:\n"));
-               NDR_PRINT_DEBUG(notify_array, notify->array);
+       entries[num_entries] = *e;
+       value = make_tdb_data((uint8_t *)entries, talloc_get_size(entries));
+       status = dbwrap_record_store(rec, value, 0);
+       TALLOC_FREE(entries);
+       if (!NT_STATUS_IS_OK(status)) {
+               return status;
        }
+       *p_add_idx = add_idx;
+       return NT_STATUS_OK;
+}
 
-       dbuf.dptr = blob.data;
-       dbuf.dsize = blob.length;
+static NTSTATUS notify_add_idx(struct db_record *rec, uint32_t vnn)
+{
+       TDB_DATA value = dbwrap_record_get_value(rec);
+       uint32_t *vnns;
+       size_t i, num_vnns;
+       NTSTATUS status;
 
-       status = dbwrap_record_store(rec, dbuf, TDB_REPLACE);
-       talloc_free(tmp_ctx);
+       if ((value.dsize % sizeof(uint32_t)) != 0) {
+               DEBUG(1, ("Invalid value.dsize = %u\n",
+                         (unsigned)value.dsize));
+               return NT_STATUS_INTERNAL_DB_CORRUPTION;
+       }
+       num_vnns = value.dsize / sizeof(uint32_t);
+       vnns = (uint32_t *)value.dptr;
 
-       return status;
-}
+       for (i=0; i<num_vnns; i++) {
+               if (vnns[i] == vnn) {
+                       return NT_STATUS_OK;
+               }
+               if (vnns[i] > vnn) {
+                       break;
+               }
+       }
 
+       value.dptr = (uint8_t *)talloc_realloc(
+               rec, value.dptr, uint32_t, num_vnns + 1);
+       if (value.dptr == NULL) {
+               return NT_STATUS_NO_MEMORY;
+       }
+       value.dsize = talloc_get_size(value.dptr);
 
-/*
-  handle incoming notify messages
-*/
-static void notify_handler(struct messaging_context *msg_ctx, void *private_data,
-                          uint32_t msg_type, struct server_id server_id, DATA_BLOB *data)
+       vnns = (uint32_t *)value.dptr;
+
+       memmove(&vnns[i+1], &vnns[i], sizeof(uint32_t) * (num_vnns - i));
+       vnns[i] = vnn;
+
+       status = dbwrap_record_store(rec, value, 0);
+       if (!NT_STATUS_IS_OK(status)) {
+               return status;
+       }
+       return NT_STATUS_OK;
+}
+
+NTSTATUS notify_remove(struct notify_context *notify, void *private_data)
 {
-       struct notify_context *notify = talloc_get_type(private_data, struct notify_context);
-       enum ndr_err_code ndr_err;
-       struct notify_event ev;
-       TALLOC_CTX *tmp_ctx = talloc_new(notify);
+       struct server_id pid = messaging_server_id(notify->msg);
        struct notify_list *listel;
+       struct db_record *notify_rec;
+       NTSTATUS status;
 
-       if (tmp_ctx == NULL) {
-               return;
+       if (notify == NULL) {
+               return NT_STATUS_NOT_IMPLEMENTED;
        }
 
-       ndr_err = ndr_pull_struct_blob(data, tmp_ctx, &ev,
-                                      (ndr_pull_flags_fn_t)ndr_pull_notify_event);
-       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
-               talloc_free(tmp_ctx);
-               return;
-       }
+       DEBUG(10, ("notify_remove: private_data=%p\n", private_data));
 
        for (listel=notify->list;listel;listel=listel->next) {
-               if (listel->private_data == ev.private_data) {
-                       listel->callback(listel->private_data, &ev);
+               if (listel->private_data == private_data) {
+                       DLIST_REMOVE(notify->list, listel);
                        break;
                }
        }
-
-       talloc_free(tmp_ctx);
+       if (listel == NULL) {
+               DEBUG(10, ("%p not found\n", private_data));
+               return NT_STATUS_NOT_FOUND;
+       }
+       notify_rec = dbwrap_fetch_locked(notify->db_notify, talloc_tos(),
+                                        string_tdb_data(listel->path));
+       TALLOC_FREE(listel);
+       if (notify_rec == NULL) {
+               return NT_STATUS_INTERNAL_DB_CORRUPTION;
+       }
+       status = notify_del_entry(notify_rec, &pid, private_data);
+       DEBUG(10, ("del_entry returned %s\n", nt_errstr(status)));
+       TALLOC_FREE(notify_rec);
+       return status;
 }
 
-/*
-  add an entry to the notify array
-*/
-static NTSTATUS notify_add_array(struct notify_context *notify, struct db_record *rec,
-                                struct notify_entry *e,
-                                void *private_data, int depth)
+static NTSTATUS notify_del_entry(struct db_record *rec,
+                                const struct server_id *pid,
+                                void *private_data)
 {
-       int i;
-       struct notify_depth *d;
-       struct notify_entry *ee;
-
-       /* possibly expand the depths array */
-       if (depth >= notify->array->num_depths) {
-               d = talloc_realloc(notify->array, notify->array->depth,
-                                  struct notify_depth, depth+1);
-               NT_STATUS_HAVE_NO_MEMORY(d);
-               for (i=notify->array->num_depths;i<=depth;i++) {
-                       ZERO_STRUCT(d[i]);
-               }
-               notify->array->depth = d;
-               notify->array->num_depths = depth+1;
-       }
-       d = &notify->array->depth[depth];
+       TDB_DATA value = dbwrap_record_get_value(rec);
+       struct notify_db_entry *entries;
+       size_t i, num_entries;
+       time_t now;
 
-       /* expand the entries array */
-       ee = talloc_realloc(notify->array->depth, d->entries, struct notify_entry,
-                           d->num_entries+1);
-       NT_STATUS_HAVE_NO_MEMORY(ee);
-       d->entries = ee;
+       DEBUG(10, ("del_entry called for %s %p\n", procid_str_static(pid),
+                  private_data));
 
-       d->entries[d->num_entries] = *e;
-       d->entries[d->num_entries].private_data = private_data;
-       d->entries[d->num_entries].server = notify->server;
-       d->entries[d->num_entries].path_len = strlen(e->path);
-       d->num_entries++;
-
-       d->max_mask |= e->filter;
-       d->max_mask_subdir |= e->subdir_filter;
+       if ((value.dsize % sizeof(struct notify_db_entry)) != 0) {
+               DEBUG(1, ("Invalid value.dsize = %u\n",
+                         (unsigned)value.dsize));
+               return NT_STATUS_INTERNAL_DB_CORRUPTION;
+       }
+       num_entries = value.dsize / sizeof(struct notify_db_entry);
+       entries = (struct notify_db_entry *)value.dptr;
 
-       TYPESAFE_QSORT(d->entries, d->num_entries, notify_compare);
+       for (i=0; i<num_entries; i++) {
+               struct notify_db_entry *e = &entries[i];
 
-       /* recalculate the maximum masks */
-       d->max_mask = 0;
-       d->max_mask_subdir = 0;
+               if (DEBUGLEVEL >= 10) {
+                       NDR_PRINT_DEBUG(notify_db_entry, e);
+               }
 
-       for (i=0;i<d->num_entries;i++) {
-               d->max_mask |= d->entries[i].filter;
-               d->max_mask_subdir |= d->entries[i].subdir_filter;
+               if (e->private_data != private_data) {
+                       continue;
+               }
+               if (procid_equal(&e->server, pid)) {
+                       break;
+               }
+       }
+       if (i == num_entries) {
+               return NT_STATUS_NOT_FOUND;
        }
+       entries[i] = entries[num_entries-1];
+       value.dsize -= sizeof(struct notify_db_entry);
 
-       return notify_save(notify, rec);
+       if (value.dsize == 0) {
+               now = time(NULL);
+               value.dptr = (uint8_t *)&now;
+               value.dsize = sizeof(now);
+       }
+       return dbwrap_record_store(rec, value, 0);
 }
 
-/*
-  Add a non-recursive watch
-*/
+struct notify_trigger_index_state {
+       TALLOC_CTX *mem_ctx;
+       uint32_t *vnns;
+       uint32_t my_vnn;
+       bool found_my_vnn;
+};
 
-static void notify_add_onelevel(struct notify_context *notify,
-                               struct notify_entry *e, void *private_data)
+static void notify_trigger_index_parser(TDB_DATA key, TDB_DATA data,
+                                       void *private_data)
 {
-       struct notify_entry_array *array;
-       struct db_record *rec;
-       DATA_BLOB blob;
-       TDB_DATA dbuf;
-       TDB_DATA value;
-       enum ndr_err_code ndr_err;
-       NTSTATUS status;
-
-       array = talloc_zero(talloc_tos(), struct notify_entry_array);
-       if (array == NULL) {
+       struct notify_trigger_index_state *state =
+               (struct notify_trigger_index_state *)private_data;
+       uint32_t *new_vnns;
+       size_t i, num_vnns, num_new_vnns;
+
+       if ((data.dsize % sizeof(uint32_t)) != 0) {
+               DEBUG(1, ("Invalid record size in notify index db: %u\n",
+                         (unsigned)data.dsize));
                return;
        }
+       new_vnns = (uint32_t *)data.dptr;
+       num_new_vnns = data.dsize / sizeof(uint32_t);
 
-       rec = dbwrap_fetch_locked(notify->db_onelevel, array,
-                                 make_tdb_data((uint8_t *)&e->dir_id,
-                                  sizeof(e->dir_id)));
-       if (rec == NULL) {
-               DEBUG(10, ("notify_add_onelevel: fetch_locked for %s failed"
-                          "\n", file_id_string_tos(&e->dir_id)));
-               TALLOC_FREE(array);
-               return;
-       }
+       num_vnns = talloc_array_length(state->vnns);
 
-       value = dbwrap_record_get_value(rec);
-       blob.data = (uint8_t *)value.dptr;
-       blob.length = value.dsize;
-
-       if (blob.length > 0) {
-               ndr_err = ndr_pull_struct_blob(&blob, array, array,
-                       (ndr_pull_flags_fn_t)ndr_pull_notify_entry_array);
-               if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
-                       DEBUG(10, ("ndr_pull_notify_entry_array failed: %s\n",
-                                  ndr_errstr(ndr_err)));
-                       TALLOC_FREE(array);
-                       return;
-               }
-               if (DEBUGLEVEL >= 10) {
-                       DEBUG(10, ("notify_add_onelevel:\n"));
-                       NDR_PRINT_DEBUG(notify_entry_array, array);
+       for (i=0; i<num_new_vnns; i++) {
+               if (new_vnns[i] == state->my_vnn) {
+                       state->found_my_vnn = true;
                }
        }
 
-       array->entries = talloc_realloc(array, array->entries,
-                                       struct notify_entry,
-                                       array->num_entries+1);
-       if (array->entries == NULL) {
-               TALLOC_FREE(array);
+       state->vnns = talloc_realloc(state->mem_ctx, state->vnns, uint32_t,
+                                    num_vnns + num_new_vnns);
+       if ((num_vnns + num_new_vnns != 0) && (state->vnns == NULL)) {
+               DEBUG(1, ("talloc_realloc failed\n"));
                return;
        }
-       array->entries[array->num_entries] = *e;
-       array->entries[array->num_entries].private_data = private_data;
-       array->entries[array->num_entries].server = notify->server;
-       array->num_entries += 1;
+       memcpy(&state->vnns[num_vnns], data.dptr, data.dsize);
+}
 
-       ndr_err = ndr_push_struct_blob(&blob, rec, array,
-               (ndr_push_flags_fn_t)ndr_push_notify_entry_array);
-       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
-               DEBUG(10, ("ndr_push_notify_entry_array failed: %s\n",
-                          ndr_errstr(ndr_err)));
-               TALLOC_FREE(array);
-               return;
+static int vnn_cmp(const void *p1, const void *p2)
+{
+       uint32_t *vnn1 = (uint32_t *)p1;
+       uint32_t *vnn2 = (uint32_t *)p2;
+
+       if (*vnn1 < *vnn2) {
+               return -1;
        }
+       if (*vnn1 == *vnn2) {
+               return 0;
+       }
+       return 1;
+}
+
+static bool notify_push_remote_blob(TALLOC_CTX *mem_ctx, uint32_t action,
+                                   uint32_t filter, const char *path,
+                                   uint8_t **pblob, size_t *pblob_len)
+{
+       struct notify_remote_event ev;
+       DATA_BLOB data;
+       enum ndr_err_code ndr_err;
+
+       ev.action = action;
+       ev.filter = filter;
+       ev.path = path;
 
        if (DEBUGLEVEL >= 10) {
-               DEBUG(10, ("notify_add_onelevel:\n"));
-               NDR_PRINT_DEBUG(notify_entry_array, array);
+               NDR_PRINT_DEBUG(notify_remote_event, &ev);
        }
 
-       dbuf.dptr = blob.data;
-       dbuf.dsize = blob.length;
-
-       status = dbwrap_record_store(rec, dbuf, TDB_REPLACE);
-       TALLOC_FREE(array);
-       if (!NT_STATUS_IS_OK(status)) {
-               DEBUG(10, ("notify_add_onelevel: store failed: %s\n",
-                          nt_errstr(status)));
-               return;
+       ndr_err = ndr_push_struct_blob(
+               &data, mem_ctx, &ev,
+               (ndr_push_flags_fn_t)ndr_push_notify_remote_event);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+               return false;
        }
-       e->filter = 0;
-       return;
+       *pblob = data.data;
+       *pblob_len = data.length;
+       return true;
 }
 
+static bool notify_pull_remote_blob(TALLOC_CTX *mem_ctx,
+                                   const uint8_t *blob, size_t blob_len,
+                                   uint32_t *paction, uint32_t *pfilter,
+                                   char **path)
+{
+       struct notify_remote_event *ev;
+       enum ndr_err_code ndr_err;
+       DATA_BLOB data;
 
-/*
-  add a notify watch. This is called when a notify is first setup on a open
-  directory handle.
-*/
-NTSTATUS notify_add(struct notify_context *notify, connection_struct *conn,
-                   struct notify_entry *e0,
-                   void (*callback)(void *, const struct notify_event *),
-                   void *private_data)
+       data.data = discard_const_p(uint8_t, blob);
+       data.length = blob_len;
+
+       ev = talloc(mem_ctx, struct notify_remote_event);
+       if (ev == NULL) {
+               return false;
+       }
+
+       ndr_err = ndr_pull_struct_blob(
+               &data, ev, ev,
+               (ndr_pull_flags_fn_t)ndr_pull_notify_remote_event);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+               TALLOC_FREE(ev);
+               return false;
+       }
+       if (DEBUGLEVEL >= 10) {
+               NDR_PRINT_DEBUG(notify_remote_event, ev);
+       }
+       *paction = ev->action;
+       *pfilter = ev->filter;
+       *path = talloc_move(mem_ctx, (char **)&ev->path);
+
+       TALLOC_FREE(ev);
+       return true;
+}
+
+void notify_trigger(struct notify_context *notify,
+                   uint32_t action, uint32_t filter, const char *path)
 {
-       struct notify_entry e = *e0;
-       NTSTATUS status;
-       struct notify_list *listel;
-       int depth;
+       struct ctdbd_connection *ctdbd_conn;
+       struct notify_trigger_index_state idx_state;
+       const char *p, *next_p;
+       size_t i, num_vnns;
+       uint32_t last_vnn;
+       uint8_t *remote_blob = NULL;
+       size_t remote_blob_len = 0;
+
+       DEBUG(10, ("notify_trigger called action=0x%x, filter=0x%x, "
+                  "path=%s\n", (unsigned)action, (unsigned)filter, path));
 
        /* see if change notify is enabled at all */
        if (notify == NULL) {
-               return NT_STATUS_NOT_IMPLEMENTED;
+               return;
        }
 
-       depth = count_chars(e.path, '/');
+       idx_state.mem_ctx = talloc_tos();
+       idx_state.vnns = NULL;
+       idx_state.my_vnn = get_my_vnn();
 
-       listel = talloc_zero(notify, struct notify_list);
-       if (listel == NULL) {
-               status = NT_STATUS_NO_MEMORY;
+       for (p = path; p != NULL; p = next_p) {
+               ptrdiff_t path_len = p - path;
+               bool recursive;
+
+               next_p = strchr(p+1, '/');
+               recursive = (next_p != NULL);
+
+               idx_state.found_my_vnn = false;
+
+               dbwrap_parse_record(
+                       notify->db_index,
+                       make_tdb_data((uint8_t *)path, path_len),
+                       notify_trigger_index_parser, &idx_state);
+
+               if (!idx_state.found_my_vnn) {
+                       continue;
+               }
+               notify_trigger_local(notify, action, filter,
+                                    path, path_len, recursive);
+       }
+
+       ctdbd_conn = messaging_ctdbd_connection();
+       if (ctdbd_conn == NULL) {
                goto done;
        }
 
-       listel->private_data = private_data;
-       listel->callback = callback;
-       listel->depth = depth;
-       DLIST_ADD(notify->list, listel);
+       num_vnns = talloc_array_length(idx_state.vnns);
+       qsort(idx_state.vnns, num_vnns, sizeof(uint32_t), vnn_cmp);
 
-       if (e.filter != 0) {
-               notify_add_onelevel(notify, &e, private_data);
-               status = NT_STATUS_OK;
-       }
+       last_vnn = 0xffffffff;
+       remote_blob = NULL;
 
-       /* if the system notify handler couldn't handle some of the
-          filter bits, or couldn't handle a request for recursion
-          then we need to install it in the array used for the
-          intra-samba notify handling */
-       if (e.filter != 0 || e.subdir_filter != 0) {
-               struct db_record *rec;
+       for (i=0; i<num_vnns; i++) {
+               uint32_t vnn = idx_state.vnns[i];
+               NTSTATUS status;
 
-               status = notify_fetch_locked(notify, &rec);
-               if (!NT_STATUS_IS_OK(status)) {
-                       goto done;
+               if (vnn == last_vnn) {
+                       continue;
+               }
+               if (vnn == idx_state.my_vnn) {
+                       continue;
+               }
+               if ((remote_blob == NULL) &&
+                   !notify_push_remote_blob(
+                           talloc_tos(), action, filter,
+                           path, &remote_blob, &remote_blob_len)) {
+                       break;
                }
-               status = notify_load(notify, rec);
+
+               status = ctdbd_messaging_send_blob(
+                       ctdbd_conn, vnn, CTDB_SRVID_SAMBA_NOTIFY_PROXY,
+                       remote_blob, remote_blob_len);
                if (!NT_STATUS_IS_OK(status)) {
-                       TALLOC_FREE(rec);
-                       goto done;
+                       DEBUG(10, ("ctdbd_messaging_send_blob to vnn %d "
+                                  "returned %s, ignoring\n", (int)vnn,
+                                  nt_errstr(status)));
                }
-               status = notify_add_array(notify, rec, &e, private_data,
-                                         depth);
-               TALLOC_FREE(rec);
+
+               last_vnn = vnn;
        }
-       status = NT_STATUS_OK;
+
 done:
-       return status;
+       TALLOC_FREE(remote_blob);
+       TALLOC_FREE(idx_state.vnns);
 }
 
-NTSTATUS notify_remove_onelevel(struct notify_context *notify,
-                               const struct file_id *fid,
-                               void *private_data)
+static void notify_trigger_local(struct notify_context *notify,
+                                uint32_t action, uint32_t filter,
+                                const char *path, size_t path_len,
+                                bool recursive)
 {
-       struct notify_entry_array *array;
-       struct db_record *rec;
-       DATA_BLOB blob;
-       TDB_DATA dbuf;
-       TDB_DATA value;
-       enum ndr_err_code ndr_err;
+       TDB_DATA data;
+       struct notify_db_entry *entries;
+       size_t i, num_entries;
        NTSTATUS status;
-       int i;
 
-       if (notify == NULL) {
-               return NT_STATUS_NOT_IMPLEMENTED;
-       }
+       DEBUG(10, ("notify_trigger_local called for %*s, path_len=%d, "
+                  "filter=%d\n", (int)path_len, path, (int)path_len,
+                  (int)filter));
 
-       array = talloc_zero(talloc_tos(), struct notify_entry_array);
-       if (array == NULL) {
-               return NT_STATUS_NO_MEMORY;
+       status = dbwrap_fetch(
+               notify->db_notify, talloc_tos(),
+               make_tdb_data((uint8_t *)path, path_len), &data);
+       if (!NT_STATUS_IS_OK(status)) {
+               DEBUG(10, ("dbwrap_fetch returned %s\n",
+                          nt_errstr(status)));
+               return;
        }
-
-       rec = dbwrap_fetch_locked(
-               notify->db_onelevel, array,
-               make_tdb_data((const uint8_t *)fid, sizeof(*fid)));
-       if (rec == NULL) {
-               DEBUG(10, ("notify_remove_onelevel: fetch_locked for %s failed"
-                          "\n", file_id_string_tos(fid)));
-               TALLOC_FREE(array);
-               return NT_STATUS_INTERNAL_DB_CORRUPTION;
+       if (data.dsize == sizeof(time_t)) {
+               DEBUG(10, ("Got deleted record\n"));
+               goto done;
+       }
+       if ((data.dsize % sizeof(struct notify_db_entry)) != 0) {
+               DEBUG(1, ("Invalid data.dsize = %u\n",
+                         (unsigned)data.dsize));
+               goto done;
        }
 
-       value = dbwrap_record_get_value(rec);
-       blob.data = (uint8_t *)value.dptr;
-       blob.length = value.dsize;
-
-       if (blob.length > 0) {
-               ndr_err = ndr_pull_struct_blob(&blob, array, array,
-                       (ndr_pull_flags_fn_t)ndr_pull_notify_entry_array);
-               if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
-                       DEBUG(10, ("ndr_pull_notify_entry_array failed: %s\n",
-                                  ndr_errstr(ndr_err)));
-                       TALLOC_FREE(array);
-                       return ndr_map_error2ntstatus(ndr_err);
-               }
+       entries = (struct notify_db_entry *)data.dptr;
+       num_entries = data.dsize / sizeof(struct notify_db_entry);
+
+       DEBUG(10, ("recursive = %s pathlen=%d (%c)\n",
+                  recursive ? "true" : "false", (int)path_len,
+                  path[path_len]));
+
+       for (i=0; i<num_entries; i++) {
+               struct notify_db_entry *e = &entries[i];
+               uint32_t e_filter;
+
                if (DEBUGLEVEL >= 10) {
-                       DEBUG(10, ("notify_remove_onelevel:\n"));
-                       NDR_PRINT_DEBUG(notify_entry_array, array);
+                       NDR_PRINT_DEBUG(notify_db_entry, e);
                }
-       }
 
-       for (i=0; i<array->num_entries; i++) {
-               if ((private_data == array->entries[i].private_data) &&
-                   procid_equal(&notify->server, &array->entries[i].server)) {
-                       break;
-               }
-       }
+               e_filter = recursive ? e->subdir_filter : e->filter;
 
-       if (i == array->num_entries) {
-               TALLOC_FREE(array);
-               return NT_STATUS_OBJECT_NAME_NOT_FOUND;
-       }
+               if ((filter & e_filter) == 0) {
+                       continue;
+               }
 
-       array->entries[i] = array->entries[array->num_entries-1];
-       array->num_entries -= 1;
+               if (!procid_is_local(&e->server)) {
+                       DEBUG(1, ("internal error: Non-local pid %s in "
+                                 "notify.tdb\n",
+                                 procid_str_static(&e->server)));
+                       continue;
+               }
 
-       if (array->num_entries == 0) {
-               dbwrap_record_delete(rec);
-               TALLOC_FREE(array);
-               return NT_STATUS_OK;
+               status = notify_send(notify, &e->server, path + path_len + 1,
+                                    action, e->private_data);
+               if (!NT_STATUS_IS_OK(status)) {
+                       DEBUG(10, ("notify_send returned %s\n",
+                                  nt_errstr(status)));
+               }
        }
 
-       ndr_err = ndr_push_struct_blob(&blob, rec, array,
-               (ndr_push_flags_fn_t)ndr_push_notify_entry_array);
-       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
-               DEBUG(10, ("ndr_push_notify_entry_array failed: %s\n",
-                          ndr_errstr(ndr_err)));
-               TALLOC_FREE(array);
-               return ndr_map_error2ntstatus(ndr_err);
-       }
+done:
+       TALLOC_FREE(data.dptr);
+}
 
-       if (DEBUGLEVEL >= 10) {
-               DEBUG(10, ("notify_add_onelevel:\n"));
-               NDR_PRINT_DEBUG(notify_entry_array, array);
-       }
+static NTSTATUS notify_send(struct notify_context *notify,
+                           struct server_id *pid,
+                           const char *path, uint32_t action,
+                           void *private_data)
+{
+       struct notify_event ev;
+       DATA_BLOB data;
+       NTSTATUS status;
+       enum ndr_err_code ndr_err;
 
-       dbuf.dptr = blob.data;
-       dbuf.dsize = blob.length;
+       ev.action = action;
+       ev.path = path;
+       ev.private_data = private_data;
 
-       status = dbwrap_record_store(rec, dbuf, TDB_REPLACE);
-       TALLOC_FREE(array);
-       if (!NT_STATUS_IS_OK(status)) {
-               DEBUG(10, ("notify_add_onelevel: store failed: %s\n",
-                          nt_errstr(status)));
-               return status;
+       ndr_err = ndr_push_struct_blob(
+               &data, talloc_tos(), &ev,
+               (ndr_push_flags_fn_t)ndr_push_notify_event);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+               return ndr_map_error2ntstatus(ndr_err);
        }
-       return NT_STATUS_OK;
+       status = messaging_send(notify->msg, *pid, MSG_PVFS_NOTIFY,
+                               &data);
+       TALLOC_FREE(data.data);
+       return status;
 }
 
-/*
-  remove a notify watch. Called when the directory handle is closed
-*/
-NTSTATUS notify_remove(struct notify_context *notify, void *private_data)
+static void notify_handler(struct messaging_context *msg_ctx,
+                          void *private_data, uint32_t msg_type,
+                          struct server_id server_id, DATA_BLOB *data)
 {
-       NTSTATUS status;
+       struct notify_context *notify = talloc_get_type_abort(
+               private_data, struct notify_context);
+       enum ndr_err_code ndr_err;
+       struct notify_event *n;
        struct notify_list *listel;
-       int i, depth;
-       struct notify_depth *d;
-       struct db_record *rec;
 
-       /* see if change notify is enabled at all */
-       if (notify == NULL) {
-               return NT_STATUS_NOT_IMPLEMENTED;
+       n = talloc(talloc_tos(), struct notify_event);
+       if (n == NULL) {
+               DEBUG(1, ("talloc failed\n"));
+               return;
+       }
+
+       ndr_err = ndr_pull_struct_blob(
+               data, n, n, (ndr_pull_flags_fn_t)ndr_pull_notify_event);
+       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
+               TALLOC_FREE(n);
+               return;
+       }
+       if (DEBUGLEVEL >= 10) {
+               NDR_PRINT_DEBUG(notify_event, n);
        }
 
        for (listel=notify->list;listel;listel=listel->next) {
-               if (listel->private_data == private_data) {
-                       DLIST_REMOVE(notify->list, listel);
+               if (listel->private_data == n->private_data) {
+                       listel->callback(listel->private_data, n);
                        break;
                }
        }
-       if (listel == NULL) {
-               return NT_STATUS_OBJECT_NAME_NOT_FOUND;
-       }
+       TALLOC_FREE(n);
+}
 
-       depth = listel->depth;
+struct notify_walk_idx_state {
+       void (*fn)(const char *path,
+                  uint32_t *vnns, size_t num_vnns,
+                  void *private_data);
+       void *private_data;
+};
 
-       talloc_free(listel);
+static int notify_walk_idx_fn(struct db_record *rec, void *private_data)
+{
+       struct notify_walk_idx_state *state =
+               (struct notify_walk_idx_state *)private_data;
+       TDB_DATA key, value;
+       char *path;
 
-       status = notify_fetch_locked(notify, &rec);
-       NT_STATUS_NOT_OK_RETURN(status);
+       key = dbwrap_record_get_key(rec);
+       value = dbwrap_record_get_value(rec);
 
-       status = notify_load(notify, rec);
-       if (!NT_STATUS_IS_OK(status)) {
-               talloc_free(rec);
-               return status;
+       if ((value.dsize % sizeof(uint32_t)) != 0) {
+               DEBUG(1, ("invalid value size in notify index db: %u\n",
+                         (unsigned)(value.dsize)));
+               return 0;
        }
 
-       if (depth >= notify->array->num_depths) {
-               talloc_free(rec);
-               return NT_STATUS_OBJECT_NAME_NOT_FOUND;
+       path = talloc_strndup(talloc_tos(), (char *)key.dptr, key.dsize);
+       if (path == NULL) {
+               DEBUG(1, ("talloc_strndup failed\n"));
+               return 0;
        }
+       state->fn(path, (uint32_t *)value.dptr, value.dsize/sizeof(uint32_t),
+                 state->private_data);
+       TALLOC_FREE(path);
+       return 0;
+}
 
-       /* we only have to search at the depth of this element */
-       d = &notify->array->depth[depth];
+void notify_walk_idx(struct notify_context *notify,
+                    void (*fn)(const char *path,
+                               uint32_t *vnns, size_t num_vnns,
+                               void *private_data),
+                    void *private_data)
+{
+       struct notify_walk_idx_state state;
+       state.fn = fn;
+       state.private_data = private_data;
+       dbwrap_traverse_read(notify->db_index, notify_walk_idx_fn, &state,
+                            NULL);
+}
 
-       for (i=0;i<d->num_entries;i++) {
-               if (private_data == d->entries[i].private_data &&
-                   procid_equal(&notify->server, &d->entries[i].server)) {
-                       break;
+struct notify_walk_state {
+       void (*fn)(const char *path,
+                  struct notify_db_entry *entries, size_t num_entries,
+                  time_t deleted_time, void *private_data);
+       void *private_data;
+};
+
+static int notify_walk_fn(struct db_record *rec, void *private_data)
+{
+       struct notify_walk_state *state =
+               (struct notify_walk_state *)private_data;
+       TDB_DATA key, value;
+       struct notify_db_entry *entries;
+       size_t num_entries;
+       time_t deleted_time;
+       char *path;
+
+       key = dbwrap_record_get_key(rec);
+       value = dbwrap_record_get_value(rec);
+
+       if (value.dsize == sizeof(deleted_time)) {
+               memcpy(&deleted_time, value.dptr, sizeof(deleted_time));
+               entries = NULL;
+               num_entries = 0;
+       } else {
+               if ((value.dsize % sizeof(struct notify_db_entry)) != 0) {
+                       DEBUG(1, ("invalid value size in notify db: %u\n",
+                                 (unsigned)(value.dsize)));
+                       return 0;
                }
-       }
-       if (i == d->num_entries) {
-               talloc_free(rec);
-               return NT_STATUS_OBJECT_NAME_NOT_FOUND;
+               entries = (struct notify_db_entry *)value.dptr;
+               num_entries = value.dsize / sizeof(struct notify_db_entry);
+               deleted_time = 0;
        }
 
-       if (i < d->num_entries-1) {
-               memmove(&d->entries[i], &d->entries[i+1],
-                       sizeof(d->entries[i])*(d->num_entries-(i+1)));
+       path = talloc_strndup(talloc_tos(), (char *)key.dptr, key.dsize);
+       if (path == NULL) {
+               DEBUG(1, ("talloc_strndup failed\n"));
+               return 0;
        }
-       d->num_entries--;
-
-       status = notify_save(notify, rec);
-
-       talloc_free(rec);
-
-       return status;
+       state->fn(path, entries, num_entries, deleted_time,
+                 state->private_data);
+       TALLOC_FREE(path);
+       return 0;
 }
 
-/*
-  remove all notify watches for a messaging server
-*/
-static NTSTATUS notify_remove_all(struct notify_context *notify,
-                                 const struct server_id *server)
+void notify_walk(struct notify_context *notify,
+                void (*fn)(const char *path,
+                           struct notify_db_entry *entries,
+                           size_t num_entries,
+                           time_t deleted_time, void *private_data),
+                void *private_data)
 {
-       NTSTATUS status;
-       int i, depth, del_count=0;
-       struct db_record *rec;
+       struct notify_walk_state state;
+       state.fn = fn;
+       state.private_data = private_data;
+       dbwrap_traverse_read(notify->db_notify, notify_walk_fn, &state,
+                            NULL);
+}
 
-       status = notify_fetch_locked(notify, &rec);
-       NT_STATUS_NOT_OK_RETURN(status);
+struct notify_cleanup_state {
+       TALLOC_CTX *mem_ctx;
+       time_t delete_before;
+       ssize_t array_size;
+       uint32_t num_paths;
+       char **paths;
+};
 
-       status = notify_load(notify, rec);
-       if (!NT_STATUS_IS_OK(status)) {
-               talloc_free(rec);
-               return status;
-       }
+static void notify_cleanup_collect(
+       const char *path, struct notify_db_entry *entries, size_t num_entries,
+       time_t deleted_time, void *private_data)
+{
+       struct notify_cleanup_state *state =
+               (struct notify_cleanup_state *)private_data;
+       char *p;
 
-       /* we have to search for all entries across all depths, looking for matches
-          for the server id */
-       for (depth=0;depth<notify->array->num_depths;depth++) {
-               struct notify_depth *d = &notify->array->depth[depth];
-               for (i=0;i<d->num_entries;i++) {
-                       if (procid_equal(server, &d->entries[i].server)) {
-                               if (i < d->num_entries-1) {
-                                       memmove(&d->entries[i], &d->entries[i+1],
-                                               sizeof(d->entries[i])*(d->num_entries-(i+1)));
-                               }
-                               i--;
-                               d->num_entries--;
-                               del_count++;
-                       }
-               }
+       if (num_entries != 0) {
+               return;
        }
-
-       if (del_count > 0) {
-               status = notify_save(notify, rec);
+       if (deleted_time >= state->delete_before) {
+               return;
        }
 
-       talloc_free(rec);
-
-       return status;
+       p = talloc_strdup(state->mem_ctx, path);
+       if (p == NULL) {
+               DEBUG(1, ("talloc_strdup failed\n"));
+               return;
+       }
+       add_to_large_array(state->mem_ctx, sizeof(p), (void *)&p,
+                          &state->paths, &state->num_paths,
+                          &state->array_size);
+       if (state->array_size == -1) {
+               TALLOC_FREE(p);
+       }
 }
 
+static bool notify_cleanup_path(struct notify_context *notify,
+                             const char *path, time_t delete_before);
 
-/*
-  send a notify message to another messaging server
-*/
-static NTSTATUS notify_send(struct notify_context *notify, struct notify_entry *e,
-                           const char *path, uint32_t action)
+void notify_cleanup(struct notify_context *notify)
 {
-       struct notify_event ev;
-       DATA_BLOB data;
-       NTSTATUS status;
-       enum ndr_err_code ndr_err;
+       struct notify_cleanup_state state;
+       uint32_t failure_pool;
 
-       ev.action = action;
-       ev.path = path;
-       ev.private_data = e->private_data;
+       ZERO_STRUCT(state);
+       state.mem_ctx = talloc_stackframe();
 
-       ndr_err = ndr_push_struct_blob(&data, talloc_tos(), &ev,
-                                      (ndr_push_flags_fn_t)ndr_push_notify_event);
-       if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
-               return ndr_map_error2ntstatus(ndr_err);
-       }
+       state.delete_before = time(NULL)
+               - lp_parm_int(-1, "smbd", "notify cleanup interval", 60);
 
-       status = messaging_send(notify->messaging_ctx, e->server,
-                               MSG_PVFS_NOTIFY, &data);
-       TALLOC_FREE(data.data);
-       return status;
+       notify_walk(notify, notify_cleanup_collect, &state);
+
+       failure_pool = state.num_paths;
+
+       while (state.num_paths != 0) {
+               size_t idx;
+
+               /*
+                * This loop is designed to be as kind as possible to
+                * ctdb. ctdb does not like it if many smbds hammer on a
+                * single record. If on many nodes the cleanup process starts
+                * running, it can happen that all of them need to clean up
+                * records in the same order. This would generate a ctdb
+                * migrate storm on these records. Randomizing the load across
+                * multiple records reduces the load on the individual record.
+                */
+
+               generate_random_buffer((uint8_t *)&idx, sizeof(idx));
+               idx = idx % state.num_paths;
+
+               if (!notify_cleanup_path(notify, state.paths[idx],
+                                        state.delete_before)) {
+                       /*
+                        * notify_cleanup_path failed, the most likely reason
+                        * is that dbwrap_try_fetch_locked failed due to
+                        * contention. We allow one failed attempt per deleted
+                        * path on average before we give up.
+                        */
+                       failure_pool -= 1;
+                       if (failure_pool == 0) {
+                               /*
+                                * Too many failures. We will come back here,
+                                * maybe next time there is less contention.
+                                */
+                               break;
+                       }
+               }
+
+               TALLOC_FREE(state.paths[idx]);
+               state.paths[idx] = state.paths[state.num_paths-1];
+               state.num_paths -= 1;
+       }
+       TALLOC_FREE(state.mem_ctx);
 }
 
-void notify_onelevel(struct notify_context *notify, uint32_t action,
-                    uint32_t filter, struct file_id fid, const char *name)
+static bool notify_cleanup_path(struct notify_context *notify,
+                               const char *path, time_t delete_before)
 {
-       struct notify_entry_array *array;
-       TDB_DATA dbuf;
-       DATA_BLOB blob;
-       bool have_dead_entries = false;
-       int i;
+       struct db_record *notify_rec = NULL;
+       struct db_record *idx_rec = NULL;
+       TDB_DATA key = string_tdb_data(path);
+       TDB_DATA value;
+       time_t deleted;
        NTSTATUS status;
 
-       if (notify == NULL) {
-               return;
+       notify_rec = dbwrap_fetch_locked(notify->db_notify, talloc_tos(), key);
+       if (notify_rec == NULL) {
+               DEBUG(10, ("Could not fetch notify_rec\n"));
+               return false;
        }
+       value = dbwrap_record_get_value(notify_rec);
 
-       array = talloc_zero(talloc_tos(), struct notify_entry_array);
-       if (array == NULL) {
-               return;
+       if (value.dsize != sizeof(deleted)) {
+               DEBUG(10, ("record %s has been re-used\n", path));
+               goto done;
        }
+       memcpy(&deleted, value.dptr, sizeof(deleted));
 
-       status = dbwrap_fetch(notify->db_onelevel, array,
-                             make_tdb_data((uint8_t *)&fid, sizeof(fid)),
-                             &dbuf);
-       if (!NT_STATUS_IS_OK(status)) {
-               TALLOC_FREE(array);
-               return;
+       if (deleted >= delete_before) {
+               DEBUG(10, ("record %s too young\n", path));
+               goto done;
        }
 
-       blob.data = (uint8 *)dbuf.dptr;
-       blob.length = dbuf.dsize;
-
-       if (blob.length > 0) {
-               enum ndr_err_code ndr_err;
-               ndr_err = ndr_pull_struct_blob(&blob, array, array,
-                       (ndr_pull_flags_fn_t)ndr_pull_notify_entry_array);
-               if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
-                       DEBUG(10, ("ndr_pull_notify_entry_array failed: %s\n",
-                                  ndr_errstr(ndr_err)));
-                       TALLOC_FREE(array);
-                       return;
-               }
-               if (DEBUGLEVEL >= 10) {
-                       DEBUG(10, ("notify_onelevel:\n"));
-                       NDR_PRINT_DEBUG(notify_entry_array, array);
-               }
+       /*
+        * Be kind to ctdb and only try one dmaster migration at most.
+        */
+       idx_rec = dbwrap_try_fetch_locked(notify->db_index, talloc_tos(), key);
+       if (idx_rec == NULL) {
+               DEBUG(10, ("Could not fetch idx_rec\n"));
+               goto done;
        }
 
-       for (i=0; i<array->num_entries; i++) {
-               struct notify_entry *e = &array->entries[i];
+       status = dbwrap_record_delete(notify_rec);
+       if (!NT_STATUS_IS_OK(status)) {
+               DEBUG(10, ("Could not delete notify_rec: %s\n",
+                          nt_errstr(status)));
+       }
 
-               if ((e->filter & filter) != 0) {
-                       status = notify_send(notify, e, name, action);
-                       if (NT_STATUS_EQUAL(
-                                   status, NT_STATUS_INVALID_HANDLE)) {
-                               /*
-                                * Mark the entry as dead. All entries have a
-                                * path set. The marker used here is setting
-                                * that to NULL.
-                                */
-                               e->path = NULL;
-                               have_dead_entries = true;
-                       }
-               }
+       status = notify_del_idx(idx_rec, get_my_vnn());
+       if (!NT_STATUS_IS_OK(status)) {
+               DEBUG(10, ("Could not delete idx_rec: %s\n",
+                          nt_errstr(status)));
        }
 
-       if (!have_dead_entries) {
-               TALLOC_FREE(array);
-               return;
+done:
+       TALLOC_FREE(idx_rec);
+       TALLOC_FREE(notify_rec);
+       return true;
+}
+
+static NTSTATUS notify_del_idx(struct db_record *rec, uint32_t vnn)
+{
+       TDB_DATA value = dbwrap_record_get_value(rec);
+       uint32_t *vnns;
+       size_t i, num_vnns;
+
+       if ((value.dsize % sizeof(uint32_t)) != 0) {
+               DEBUG(1, ("Invalid value.dsize = %u\n",
+                         (unsigned)value.dsize));
+               return NT_STATUS_INTERNAL_DB_CORRUPTION;
        }
+       num_vnns = value.dsize / sizeof(uint32_t);
+       vnns = (uint32_t *)value.dptr;
 
-       for (i=0; i<array->num_entries; i++) {
-               struct notify_entry *e = &array->entries[i];
-               if (e->path != NULL) {
-                       continue;
+       for (i=0; i<num_vnns; i++) {
+               if (vnns[i] == vnn) {
+                       break;
                }
-               DEBUG(10, ("Deleting notify entries for process %s because "
-                          "it's gone\n", procid_str_static(&e->server)));
+       }
+
+       if (i == num_vnns) {
                /*
-                * Potential TODO: This might need optimizing,
-                * notify_remove_onelevel() does a fetch_locked() operation at
-                * every call. But this would only matter if a process with
-                * MANY notifies has died without shutting down properly.
+                * Not found. Should not happen, but okay...
                 */
-               notify_remove_onelevel(notify, &e->dir_id, e->private_data);
+               return NT_STATUS_OK;
        }
 
-       TALLOC_FREE(array);
-       return;
+       memmove(&vnns[i], &vnns[i+1], sizeof(uint32_t) * (num_vnns - i - 1));
+       value.dsize -= sizeof(uint32_t);
+
+       if (value.dsize == 0) {
+               return dbwrap_record_delete(rec);
+       }
+       return dbwrap_record_store(rec, value, 0);
 }
 
-/*
-  trigger a notify message for anyone waiting on a matching event
+struct notify_cluster_proxy_state {
+       struct tevent_context *ev;
+       struct notify_context *notify;
+       struct ctdb_msg_channel *chan;
+};
 
-  This function is called a lot, and needs to be very fast. The unusual data structure
-  and traversal is designed to be fast in the average case, even for large numbers of
-  notifies
-*/
-void notify_trigger(struct notify_context *notify,
-                   uint32_t action, uint32_t filter, const char *path)
+static void notify_cluster_proxy_got_chan(struct tevent_req *subreq);
+static void notify_cluster_proxy_got_msg(struct tevent_req *subreq);
+static void notify_cluster_proxy_trigger(struct notify_context *notify,
+                                        uint32_t action, uint32_t filter,
+                                        char *path);
+
+struct tevent_req *notify_cluster_proxy_send(
+       TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+       struct notify_context *notify)
 {
-       NTSTATUS status;
-       int depth;
-       const char *p, *next_p;
+       struct tevent_req *req, *subreq;
+       struct notify_cluster_proxy_state *state;
 
-       DEBUG(10, ("notify_trigger called action=0x%x, filter=0x%x, "
-                  "path=%s\n", (unsigned)action, (unsigned)filter, path));
+       req = tevent_req_create(mem_ctx, &state,
+                               struct notify_cluster_proxy_state);
+       if (req == NULL) {
+               return NULL;
+       }
+       state->ev = ev;
+       state->notify = notify;
 
-       /* see if change notify is enabled at all */
-       if (notify == NULL) {
-               return;
+       subreq = ctdb_msg_channel_init_send(
+               state, state->ev,  lp_ctdbd_socket(),
+               CTDB_SRVID_SAMBA_NOTIFY_PROXY);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
        }
+       tevent_req_set_callback(subreq, notify_cluster_proxy_got_chan, req);
+       return req;
+}
 
- again:
-       status = notify_load(notify, NULL);
-       if (!NT_STATUS_IS_OK(status)) {
+static void notify_cluster_proxy_got_chan(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct notify_cluster_proxy_state *state = tevent_req_data(
+               req, struct notify_cluster_proxy_state);
+       int ret;
+
+       ret = ctdb_msg_channel_init_recv(subreq, state, &state->chan);
+       TALLOC_FREE(subreq);
+       if (ret != 0) {
+               tevent_req_error(req, ret);
                return;
        }
+       subreq = ctdb_msg_read_send(state, state->ev, state->chan);
+       if (tevent_req_nomem(subreq, req)) {
+               return;
+       }
+       tevent_req_set_callback(subreq, notify_cluster_proxy_got_msg, req);
+}
 
-       /* loop along the given path, working with each directory depth separately */
-       for (depth=0,p=path;
-            p && depth < notify->array->num_depths;
-            p=next_p,depth++) {
-               int p_len = p - path;
-               int min_i, max_i, i;
-               struct notify_depth *d = &notify->array->depth[depth];
-               uint32_t d_max_mask;
-               next_p = strchr(p+1, '/');
-
-               /* see if there are any entries at this depth */
-               if (d->num_entries == 0) continue;
-
-               /* try to skip based on the maximum mask. If next_p is
-                NULL then we know it will be a 'this directory'
-                match, otherwise it must be a subdir match */
-
-               d_max_mask = next_p ? d->max_mask_subdir : d->max_mask;
-
-               if ((filter & d_max_mask) == 0) {
-                       continue;
-               }
+static void notify_cluster_proxy_got_msg(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct notify_cluster_proxy_state *state = tevent_req_data(
+               req, struct notify_cluster_proxy_state);
+       uint8_t *msg;
+       size_t msg_len;
+       uint32_t action, filter;
+       char *path;
+       int ret;
+       bool res;
+
+       ret = ctdb_msg_read_recv(subreq, talloc_tos(), &msg, &msg_len);
+       TALLOC_FREE(subreq);
+       if (ret != 0) {
+               tevent_req_error(req, ret);
+               return;
+       }
 
-               /* we know there is an entry here worth looking
-                for. Use a bisection search to find the first entry
-                with a matching path */
-               min_i = 0;
-               max_i = d->num_entries-1;
-
-               while (min_i < max_i) {
-                       struct notify_entry *e;
-                       int cmp;
-                       i = (min_i+max_i)/2;
-                       e = &d->entries[i];
-                       cmp = strncmp(path, e->path, p_len);
-                       if (cmp == 0) {
-                               if (p_len == e->path_len) {
-                                       max_i = i;
-                               } else {
-                                       max_i = i-1;
-                               }
-                       } else if (cmp < 0) {
-                               max_i = i-1;
-                       } else {
-                               min_i = i+1;
-                       }
-               }
+       res = notify_pull_remote_blob(talloc_tos(), msg, msg_len,
+                                     &action, &filter, &path);
+       TALLOC_FREE(msg);
+       if (!res) {
+               tevent_req_error(req, EIO);
+               return;
+       }
+       notify_cluster_proxy_trigger(state->notify, action, filter, path);
+       TALLOC_FREE(path);
 
-               if (min_i != max_i) {
-                       /* none match */
-                       continue;
-               }
+       subreq = ctdb_msg_read_send(state, state->ev, state->chan);
+       if (tevent_req_nomem(subreq, req)) {
+               return;
+       }
+       tevent_req_set_callback(subreq, notify_cluster_proxy_got_msg, req);
+}
 
-               /* we now know that the entries start at min_i */
-               for (i=min_i;i<d->num_entries;i++) {
-                       struct notify_entry *e = &d->entries[i];
-                       uint32_t e_filter;
-                       if (p_len != e->path_len ||
-                           strncmp(path, e->path, p_len) != 0) break;
+static void notify_cluster_proxy_trigger(struct notify_context *notify,
+                                        uint32_t action, uint32_t filter,
+                                        char *path)
+{
+       const char *p, *next_p;
 
-                       e_filter = next_p ? e->subdir_filter : e->filter;
+       for (p = path; p != NULL; p = next_p) {
+               ptrdiff_t path_len = p - path;
+               bool recursive;
 
-                       if ((filter & e_filter) == 0) {
-                               continue;
-                       }
+               next_p = strchr(p+1, '/');
+               recursive = (next_p != NULL);
 
-                       status = notify_send(notify, e, path + e->path_len + 1,
-                                            action);
+               notify_trigger_local(notify, action, filter,
+                                    path, path_len, recursive);
+       }
+}
 
-                       if (NT_STATUS_EQUAL(
-                                   status, NT_STATUS_INVALID_HANDLE)) {
-                               struct server_id server = e->server;
+int notify_cluster_proxy_recv(struct tevent_req *req)
+{
+       int err;
 
-                               DEBUG(10, ("Deleting notify entries for "
-                                          "process %s because it's gone\n",
-                                          procid_str_static(&e->server)));
-                               notify_remove_all(notify, &server);
-                               goto again;
-                       }
-               }
+       if (tevent_req_is_unix_error(req, &err)) {
+               return err;
        }
+       return 0;
 }
index a770c3a2a4d5030e78280c4029c00bd85fa8d55a..4300ee3bcaa03d33a79f4ad5af04c9654ab2870b 100644 (file)
@@ -540,19 +540,30 @@ NTSTATUS inotify_watch(struct sys_notify_context *ctx,
 struct notify_context *notify_init(TALLOC_CTX *mem_ctx,
                                   struct messaging_context *messaging_ctx,
                                   struct event_context *ev);
-bool notify_internal_parent_init(TALLOC_CTX *mem_ctx);
-NTSTATUS notify_add(struct notify_context *notify, connection_struct *conn,
-                   struct notify_entry *e0,
+NTSTATUS notify_add(struct notify_context *notify,
+                   const char *path, uint32_t filter, uint32_t subdir_filter,
                    void (*callback)(void *, const struct notify_event *),
                    void *private_data);
 NTSTATUS notify_remove(struct notify_context *notify, void *private_data);
-NTSTATUS notify_remove_onelevel(struct notify_context *notify,
-                               const struct file_id *fid,
-                               void *private_data);
-void notify_onelevel(struct notify_context *notify, uint32_t action,
-                    uint32_t filter, struct file_id fid, const char *name);
 void notify_trigger(struct notify_context *notify,
                    uint32_t action, uint32_t filter, const char *path);
+void notify_walk_idx(struct notify_context *notify,
+                    void (*fn)(const char *path,
+                               uint32_t *vnns, size_t num_vnns,
+                               void *private_data),
+                    void *private_data);
+void notify_walk(struct notify_context *notify,
+                void (*fn)(const char *path,
+                           struct notify_db_entry *entries,
+                           size_t num_entries,
+                           time_t deleted_time, void *private_data),
+                void *private_data);
+void notify_cleanup(struct notify_context *notify);
+
+struct tevent_req *notify_cluster_proxy_send(
+       TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+       struct notify_context *notify);
+int notify_cluster_proxy_recv(struct tevent_req *req);
 
 /* The following definitions come from smbd/ntquotas.c  */
 
index 2920bbd7fb1c4b80c3f1a15c7da3c96f85430f2e..912d7a129bdbf16735aa36ac2fe00ab712a83c4a 100644 (file)
@@ -41,6 +41,7 @@
 #include "smbprofile.h"
 #include "lib/id_cache.h"
 #include "lib/param/param.h"
+#include "lib/background.h"
 
 struct smbd_open_socket;
 struct smbd_child_pid;
@@ -269,6 +270,103 @@ static void smbd_parent_id_cache_delete(struct messaging_context *ctx,
        messaging_send_to_children(ctx, msg_type, msg_data);
 }
 
+struct smbd_parent_notify_state {
+       struct tevent_context *ev;
+       struct messaging_context *msg;
+       uint32_t msgtype;
+       struct notify_context *notify;
+};
+
+static int smbd_parent_notify_cleanup(void *private_data);
+static void smbd_parent_notify_cleanup_done(struct tevent_req *req);
+static void smbd_parent_notify_proxy_done(struct tevent_req *req);
+
+static bool smbd_parent_notify_init(TALLOC_CTX *mem_ctx,
+                                   struct messaging_context *msg,
+                                   struct tevent_context *ev)
+{
+       struct smbd_parent_notify_state *state;
+       struct tevent_req *req;
+
+       state = talloc(mem_ctx, struct smbd_parent_notify_state);
+       if (state == NULL) {
+               return NULL;
+       }
+       state->msg = msg;
+       state->ev = ev;
+       state->msgtype = MSG_SMB_NOTIFY_CLEANUP;
+
+       state->notify = notify_init(state, msg, ev);
+       if (state->notify == NULL) {
+               goto fail;
+       }
+       req = background_job_send(
+               state, state->ev, state->msg, &state->msgtype, 1,
+               lp_parm_int(-1, "smbd", "notify cleanup interval", 60),
+               smbd_parent_notify_cleanup, state->notify);
+       if (req == NULL) {
+               goto fail;
+       }
+       tevent_req_set_callback(req, smbd_parent_notify_cleanup_done, state);
+
+       if (!lp_clustering()) {
+               return true;
+       }
+
+       req = notify_cluster_proxy_send(state, ev, state->notify);
+       if (req == NULL) {
+               goto fail;
+       }
+       tevent_req_set_callback(req, smbd_parent_notify_proxy_done, state);
+
+       return true;
+fail:
+       TALLOC_FREE(state);
+       return false;
+}
+
+static int smbd_parent_notify_cleanup(void *private_data)
+{
+       struct notify_context *notify = talloc_get_type_abort(
+               private_data, struct notify_context);
+       notify_cleanup(notify);
+       return lp_parm_int(-1, "smbd", "notify cleanup interval", 60);
+}
+
+static void smbd_parent_notify_cleanup_done(struct tevent_req *req)
+{
+       struct smbd_parent_notify_state *state = tevent_req_callback_data(
+               req, struct smbd_parent_notify_state);
+       NTSTATUS status;
+
+       status = background_job_recv(req);
+       TALLOC_FREE(req);
+       DEBUG(1, ("notify cleanup job ended with %s\n", nt_errstr(status)));
+
+       /*
+        * Provide self-healing: Whatever the error condition was, it
+        * will have printed it into log.smbd. Just retrying and
+        * spamming log.smbd once a minute should be fine.
+        */
+       req = background_job_send(
+               state, state->ev, state->msg, &state->msgtype, 1, 60,
+               smbd_parent_notify_cleanup, state->notify);
+       if (req == NULL) {
+               DEBUG(1, ("background_job_send failed\n"));
+               return;
+       }
+       tevent_req_set_callback(req, smbd_parent_notify_cleanup_done, state);
+}
+
+static void smbd_parent_notify_proxy_done(struct tevent_req *req)
+{
+       int ret;
+
+       ret = notify_cluster_proxy_recv(req);
+       TALLOC_FREE(req);
+       DEBUG(1, ("notify proxy job ended with %s\n", strerror(ret)));
+}
+
 static void smb_parent_force_tdis(struct messaging_context *ctx,
                                  void* data,
                                  uint32_t msg_type,
@@ -1319,7 +1417,7 @@ extern void build_options(bool screen);
                exit(1);
        }
 
-       if (!notify_internal_parent_init(ev_ctx)) {
+       if (!smbd_parent_notify_init(NULL, msg_ctx, ev_ctx)) {
                exit(1);
        }