Merge git://git.kernel.org/pub/scm/linux/kernel/git/steve/gfs2-2.6-nmw
authorLinus Torvalds <torvalds@woody.linux-foundation.org>
Wed, 7 Feb 2007 16:09:00 +0000 (08:09 -0800)
committerLinus Torvalds <torvalds@woody.linux-foundation.org>
Wed, 7 Feb 2007 16:09:00 +0000 (08:09 -0800)
* git://git.kernel.org/pub/scm/linux/kernel/git/steve/gfs2-2.6-nmw: (57 commits)
  [GFS2] make gfs2_writepages() static
  [GFS2] Unlock page on prepare_write try lock failure
  [GFS2] nfsd readdirplus assertion failure
  [DLM] fix softlockup in dlm_recv
  [DLM] zero new user lvbs
  [DLM/GFS2] indent help text
  [GFS2] Fix unlink deadlocks
  [GFS2] Put back semaphore to avoid umount problem
  [GFS2] more CURRENT_TIME_SEC
  [GFS2/DLM] fix GFS2 circular dependency
  [GFS2/DLM] use sysfs
  [GFS2] make lock_dlm drop_count tunable in sysfs
  [GFS2] increase default lock limit
  [GFS2] Fix list corruption in lops.c
  [GFS2] Fix recursive locking attempt with NFS
  [DLM] can miss clearing resend flag
  [DLM] saved dlm message can be dropped
  [DLM] Make sock_sem into a mutex
  [GFS2] Fix typo in glock.c
  [GFS2] use CURRENT_TIME_SEC instead of get_seconds in gfs2
  ...

39 files changed:
fs/dlm/Kconfig
fs/dlm/config.c
fs/dlm/config.h
fs/dlm/dlm_internal.h
fs/dlm/lock.c
fs/dlm/lockspace.c
fs/dlm/lowcomms-sctp.c
fs/dlm/lowcomms-tcp.c
fs/dlm/midcomms.c
fs/dlm/rcom.c
fs/dlm/recover.c
fs/dlm/recoverd.c
fs/dlm/user.c
fs/dlm/util.c
fs/gfs2/Kconfig
fs/gfs2/bmap.c
fs/gfs2/dir.c
fs/gfs2/dir.h
fs/gfs2/eattr.c
fs/gfs2/glock.c
fs/gfs2/glock.h
fs/gfs2/glops.c
fs/gfs2/incore.h
fs/gfs2/inode.c
fs/gfs2/lm.c
fs/gfs2/locking/dlm/lock_dlm.h
fs/gfs2/locking/dlm/main.c
fs/gfs2/locking/dlm/mount.c
fs/gfs2/locking/dlm/sysfs.c
fs/gfs2/lops.c
fs/gfs2/ops_address.c
fs/gfs2/ops_dentry.c
fs/gfs2/ops_export.c
fs/gfs2/ops_file.c
fs/gfs2/ops_inode.c
fs/gfs2/ops_super.c
fs/gfs2/ops_vm.c
fs/gfs2/super.c
fs/gfs2/sys.c

index b5654a284fef1283b1186d7e39dcc1e0b3426396..6fa7b0d5c04387b6ab7cf9b3f90f7dc21e21831a 100644 (file)
@@ -3,21 +3,21 @@ menu "Distributed Lock Manager"
 
 config DLM
        tristate "Distributed Lock Manager (DLM)"
-       depends on IPV6 || IPV6=n
+       depends on SYSFS && (IPV6 || IPV6=n)
        select CONFIGFS_FS
        select IP_SCTP if DLM_SCTP
        help
-       A general purpose distributed lock manager for kernel or userspace
-       applications.
+         A general purpose distributed lock manager for kernel or userspace
+         applications.
 
 choice
        prompt "Select DLM communications protocol"
        depends on DLM
        default DLM_TCP
        help
-       The DLM Can use TCP or SCTP for it's network communications.
-       SCTP supports multi-homed operations whereas TCP doesn't.
-       However, SCTP seems to have stability problems at the moment.
+         The DLM Can use TCP or SCTP for it's network communications.
+         SCTP supports multi-homed operations whereas TCP doesn't.
+         However, SCTP seems to have stability problems at the moment.
 
 config DLM_TCP
        bool "TCP/IP"
@@ -31,8 +31,8 @@ config DLM_DEBUG
        bool "DLM debugging"
        depends on DLM
        help
-       Under the debugfs mount point, the name of each lockspace will
-       appear as a file in the "dlm" directory.  The output is the
-       list of resource and locks the local node knows about.
+         Under the debugfs mount point, the name of each lockspace will
+         appear as a file in the "dlm" directory.  The output is the
+         list of resource and locks the local node knows about.
 
 endmenu
index 88553054bbfa4cd7a03efdf41036e75b54255d7a..8665c88e5af26c674bc9e066355193e99f5c7061 100644 (file)
@@ -54,6 +54,11 @@ static struct config_item *make_node(struct config_group *, const char *);
 static void drop_node(struct config_group *, struct config_item *);
 static void release_node(struct config_item *);
 
+static ssize_t show_cluster(struct config_item *i, struct configfs_attribute *a,
+                           char *buf);
+static ssize_t store_cluster(struct config_item *i,
+                            struct configfs_attribute *a,
+                            const char *buf, size_t len);
 static ssize_t show_comm(struct config_item *i, struct configfs_attribute *a,
                         char *buf);
 static ssize_t store_comm(struct config_item *i, struct configfs_attribute *a,
@@ -73,6 +78,101 @@ static ssize_t node_nodeid_write(struct node *nd, const char *buf, size_t len);
 static ssize_t node_weight_read(struct node *nd, char *buf);
 static ssize_t node_weight_write(struct node *nd, const char *buf, size_t len);
 
+struct cluster {
+       struct config_group group;
+       unsigned int cl_tcp_port;
+       unsigned int cl_buffer_size;
+       unsigned int cl_rsbtbl_size;
+       unsigned int cl_lkbtbl_size;
+       unsigned int cl_dirtbl_size;
+       unsigned int cl_recover_timer;
+       unsigned int cl_toss_secs;
+       unsigned int cl_scan_secs;
+       unsigned int cl_log_debug;
+};
+
+enum {
+       CLUSTER_ATTR_TCP_PORT = 0,
+       CLUSTER_ATTR_BUFFER_SIZE,
+       CLUSTER_ATTR_RSBTBL_SIZE,
+       CLUSTER_ATTR_LKBTBL_SIZE,
+       CLUSTER_ATTR_DIRTBL_SIZE,
+       CLUSTER_ATTR_RECOVER_TIMER,
+       CLUSTER_ATTR_TOSS_SECS,
+       CLUSTER_ATTR_SCAN_SECS,
+       CLUSTER_ATTR_LOG_DEBUG,
+};
+
+struct cluster_attribute {
+       struct configfs_attribute attr;
+       ssize_t (*show)(struct cluster *, char *);
+       ssize_t (*store)(struct cluster *, const char *, size_t);
+};
+
+static ssize_t cluster_set(struct cluster *cl, unsigned int *cl_field,
+                          unsigned int *info_field, int check_zero,
+                          const char *buf, size_t len)
+{
+       unsigned int x;
+
+       if (!capable(CAP_SYS_ADMIN))
+               return -EACCES;
+
+       x = simple_strtoul(buf, NULL, 0);
+
+       if (check_zero && !x)
+               return -EINVAL;
+
+       *cl_field = x;
+       *info_field = x;
+
+       return len;
+}
+
+#define __CONFIGFS_ATTR(_name,_mode,_read,_write) {                           \
+       .attr   = { .ca_name = __stringify(_name),                            \
+                   .ca_mode = _mode,                                         \
+                   .ca_owner = THIS_MODULE },                                \
+       .show   = _read,                                                      \
+       .store  = _write,                                                     \
+}
+
+#define CLUSTER_ATTR(name, check_zero)                                        \
+static ssize_t name##_write(struct cluster *cl, const char *buf, size_t len)  \
+{                                                                             \
+       return cluster_set(cl, &cl->cl_##name, &dlm_config.ci_##name,         \
+                          check_zero, buf, len);                             \
+}                                                                             \
+static ssize_t name##_read(struct cluster *cl, char *buf)                     \
+{                                                                             \
+       return snprintf(buf, PAGE_SIZE, "%u\n", cl->cl_##name);               \
+}                                                                             \
+static struct cluster_attribute cluster_attr_##name =                         \
+__CONFIGFS_ATTR(name, 0644, name##_read, name##_write)
+
+CLUSTER_ATTR(tcp_port, 1);
+CLUSTER_ATTR(buffer_size, 1);
+CLUSTER_ATTR(rsbtbl_size, 1);
+CLUSTER_ATTR(lkbtbl_size, 1);
+CLUSTER_ATTR(dirtbl_size, 1);
+CLUSTER_ATTR(recover_timer, 1);
+CLUSTER_ATTR(toss_secs, 1);
+CLUSTER_ATTR(scan_secs, 1);
+CLUSTER_ATTR(log_debug, 0);
+
+static struct configfs_attribute *cluster_attrs[] = {
+       [CLUSTER_ATTR_TCP_PORT] = &cluster_attr_tcp_port.attr,
+       [CLUSTER_ATTR_BUFFER_SIZE] = &cluster_attr_buffer_size.attr,
+       [CLUSTER_ATTR_RSBTBL_SIZE] = &cluster_attr_rsbtbl_size.attr,
+       [CLUSTER_ATTR_LKBTBL_SIZE] = &cluster_attr_lkbtbl_size.attr,
+       [CLUSTER_ATTR_DIRTBL_SIZE] = &cluster_attr_dirtbl_size.attr,
+       [CLUSTER_ATTR_RECOVER_TIMER] = &cluster_attr_recover_timer.attr,
+       [CLUSTER_ATTR_TOSS_SECS] = &cluster_attr_toss_secs.attr,
+       [CLUSTER_ATTR_SCAN_SECS] = &cluster_attr_scan_secs.attr,
+       [CLUSTER_ATTR_LOG_DEBUG] = &cluster_attr_log_debug.attr,
+       NULL,
+};
+
 enum {
        COMM_ATTR_NODEID = 0,
        COMM_ATTR_LOCAL,
@@ -152,10 +252,6 @@ struct clusters {
        struct configfs_subsystem subsys;
 };
 
-struct cluster {
-       struct config_group group;
-};
-
 struct spaces {
        struct config_group ss_group;
 };
@@ -197,6 +293,8 @@ static struct configfs_group_operations clusters_ops = {
 
 static struct configfs_item_operations cluster_ops = {
        .release = release_cluster,
+       .show_attribute = show_cluster,
+       .store_attribute = store_cluster,
 };
 
 static struct configfs_group_operations spaces_ops = {
@@ -237,6 +335,7 @@ static struct config_item_type clusters_type = {
 
 static struct config_item_type cluster_type = {
        .ct_item_ops = &cluster_ops,
+       .ct_attrs = cluster_attrs,
        .ct_owner = THIS_MODULE,
 };
 
@@ -317,6 +416,16 @@ static struct config_group *make_cluster(struct config_group *g,
        cl->group.default_groups[1] = &cms->cs_group;
        cl->group.default_groups[2] = NULL;
 
+       cl->cl_tcp_port = dlm_config.ci_tcp_port;
+       cl->cl_buffer_size = dlm_config.ci_buffer_size;
+       cl->cl_rsbtbl_size = dlm_config.ci_rsbtbl_size;
+       cl->cl_lkbtbl_size = dlm_config.ci_lkbtbl_size;
+       cl->cl_dirtbl_size = dlm_config.ci_dirtbl_size;
+       cl->cl_recover_timer = dlm_config.ci_recover_timer;
+       cl->cl_toss_secs = dlm_config.ci_toss_secs;
+       cl->cl_scan_secs = dlm_config.ci_scan_secs;
+       cl->cl_log_debug = dlm_config.ci_log_debug;
+
        space_list = &sps->ss_group;
        comm_list = &cms->cs_group;
        return &cl->group;
@@ -509,6 +618,25 @@ void dlm_config_exit(void)
  * Functions for user space to read/write attributes
  */
 
+static ssize_t show_cluster(struct config_item *i, struct configfs_attribute *a,
+                           char *buf)
+{
+       struct cluster *cl = to_cluster(i);
+       struct cluster_attribute *cla =
+                       container_of(a, struct cluster_attribute, attr);
+       return cla->show ? cla->show(cl, buf) : 0;
+}
+
+static ssize_t store_cluster(struct config_item *i,
+                            struct configfs_attribute *a,
+                            const char *buf, size_t len)
+{
+       struct cluster *cl = to_cluster(i);
+       struct cluster_attribute *cla =
+               container_of(a, struct cluster_attribute, attr);
+       return cla->store ? cla->store(cl, buf, len) : -EINVAL;
+}
+
 static ssize_t show_comm(struct config_item *i, struct configfs_attribute *a,
                         char *buf)
 {
@@ -775,15 +903,17 @@ int dlm_our_addr(struct sockaddr_storage *addr, int num)
 #define DEFAULT_RECOVER_TIMER      5
 #define DEFAULT_TOSS_SECS         10
 #define DEFAULT_SCAN_SECS          5
+#define DEFAULT_LOG_DEBUG          0
 
 struct dlm_config_info dlm_config = {
-       .tcp_port = DEFAULT_TCP_PORT,
-       .buffer_size = DEFAULT_BUFFER_SIZE,
-       .rsbtbl_size = DEFAULT_RSBTBL_SIZE,
-       .lkbtbl_size = DEFAULT_LKBTBL_SIZE,
-       .dirtbl_size = DEFAULT_DIRTBL_SIZE,
-       .recover_timer = DEFAULT_RECOVER_TIMER,
-       .toss_secs = DEFAULT_TOSS_SECS,
-       .scan_secs = DEFAULT_SCAN_SECS
+       .ci_tcp_port = DEFAULT_TCP_PORT,
+       .ci_buffer_size = DEFAULT_BUFFER_SIZE,
+       .ci_rsbtbl_size = DEFAULT_RSBTBL_SIZE,
+       .ci_lkbtbl_size = DEFAULT_LKBTBL_SIZE,
+       .ci_dirtbl_size = DEFAULT_DIRTBL_SIZE,
+       .ci_recover_timer = DEFAULT_RECOVER_TIMER,
+       .ci_toss_secs = DEFAULT_TOSS_SECS,
+       .ci_scan_secs = DEFAULT_SCAN_SECS,
+       .ci_log_debug = DEFAULT_LOG_DEBUG
 };
 
index 9da7839958a99c87a033efc9c6069041d6aa525b..1e978611a96e7165b480d09ac3a129f1839b0950 100644 (file)
 #define DLM_MAX_ADDR_COUNT 3
 
 struct dlm_config_info {
-       int tcp_port;
-       int buffer_size;
-       int rsbtbl_size;
-       int lkbtbl_size;
-       int dirtbl_size;
-       int recover_timer;
-       int toss_secs;
-       int scan_secs;
+       int ci_tcp_port;
+       int ci_buffer_size;
+       int ci_rsbtbl_size;
+       int ci_lkbtbl_size;
+       int ci_dirtbl_size;
+       int ci_recover_timer;
+       int ci_toss_secs;
+       int ci_scan_secs;
+       int ci_log_debug;
 };
 
 extern struct dlm_config_info dlm_config;
index 1ee8195e6fc0c17a34fcd351b9248f64d5171025..61d93201e1b22c8783996d86c850d5ccbd1ee835 100644 (file)
@@ -41,6 +41,7 @@
 #include <asm/uaccess.h>
 
 #include <linux/dlm.h>
+#include "config.h"
 
 #define DLM_LOCKSPACE_LEN      64
 
@@ -69,12 +70,12 @@ struct dlm_mhandle;
 #define log_error(ls, fmt, args...) \
        printk(KERN_ERR "dlm: %s: " fmt "\n", (ls)->ls_name , ##args)
 
-#define DLM_LOG_DEBUG
-#ifdef DLM_LOG_DEBUG
-#define log_debug(ls, fmt, args...) log_error(ls, fmt, ##args)
-#else
-#define log_debug(ls, fmt, args...)
-#endif
+#define log_debug(ls, fmt, args...) \
+do { \
+       if (dlm_config.ci_log_debug) \
+               printk(KERN_DEBUG "dlm: %s: " fmt "\n", \
+                      (ls)->ls_name , ##args); \
+} while (0)
 
 #define DLM_ASSERT(x, do) \
 { \
@@ -309,8 +310,8 @@ static inline int rsb_flag(struct dlm_rsb *r, enum rsb_flags flag)
 
 /* dlm_header is first element of all structs sent between nodes */
 
-#define DLM_HEADER_MAJOR       0x00020000
-#define DLM_HEADER_MINOR       0x00000001
+#define DLM_HEADER_MAJOR       0x00030000
+#define DLM_HEADER_MINOR       0x00000000
 
 #define DLM_MSG                        1
 #define DLM_RCOM               2
@@ -386,6 +387,8 @@ struct dlm_rcom {
        uint32_t                rc_type;        /* DLM_RCOM_ */
        int                     rc_result;      /* multi-purpose */
        uint64_t                rc_id;          /* match reply with request */
+       uint64_t                rc_seq;         /* sender's ls_recover_seq */
+       uint64_t                rc_seq_reply;   /* remote ls_recover_seq */
        char                    rc_buf[0];
 };
 
@@ -523,6 +526,7 @@ struct dlm_user_proc {
        spinlock_t              asts_spin;
        struct list_head        locks;
        spinlock_t              locks_spin;
+       struct list_head        unlocking;
        wait_queue_head_t       wait;
 };
 
index 30878defaeb6d23a92378233d260f6cee082f7ea..e725005fafd024cd41de5bdb9c0271497a7d2e1f 100644 (file)
@@ -754,6 +754,11 @@ static void add_to_waiters(struct dlm_lkb *lkb, int mstype)
        mutex_unlock(&ls->ls_waiters_mutex);
 }
 
+/* We clear the RESEND flag because we might be taking an lkb off the waiters
+   list as part of process_requestqueue (e.g. a lookup that has an optimized
+   request reply on the requestqueue) between dlm_recover_waiters_pre() which
+   set RESEND and dlm_recover_waiters_post() */
+
 static int _remove_from_waiters(struct dlm_lkb *lkb)
 {
        int error = 0;
@@ -764,6 +769,7 @@ static int _remove_from_waiters(struct dlm_lkb *lkb)
                goto out;
        }
        lkb->lkb_wait_type = 0;
+       lkb->lkb_flags &= ~DLM_IFL_RESEND;
        list_del(&lkb->lkb_wait_reply);
        unhold_lkb(lkb);
  out:
@@ -810,7 +816,7 @@ static int shrink_bucket(struct dlm_ls *ls, int b)
                list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
                                            res_hashchain) {
                        if (!time_after_eq(jiffies, r->res_toss_time +
-                                          dlm_config.toss_secs * HZ))
+                                          dlm_config.ci_toss_secs * HZ))
                                continue;
                        found = 1;
                        break;
@@ -2144,12 +2150,24 @@ static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
        if (lkb->lkb_astaddr)
                ms->m_asts |= AST_COMP;
 
-       if (ms->m_type == DLM_MSG_REQUEST || ms->m_type == DLM_MSG_LOOKUP)
-               memcpy(ms->m_extra, r->res_name, r->res_length);
+       /* compare with switch in create_message; send_remove() doesn't
+          use send_args() */
 
-       else if (lkb->lkb_lvbptr)
+       switch (ms->m_type) {
+       case DLM_MSG_REQUEST:
+       case DLM_MSG_LOOKUP:
+               memcpy(ms->m_extra, r->res_name, r->res_length);
+               break;
+       case DLM_MSG_CONVERT:
+       case DLM_MSG_UNLOCK:
+       case DLM_MSG_REQUEST_REPLY:
+       case DLM_MSG_CONVERT_REPLY:
+       case DLM_MSG_GRANT:
+               if (!lkb->lkb_lvbptr)
+                       break;
                memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
-
+               break;
+       }
 }
 
 static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
@@ -2418,8 +2436,12 @@ static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
 
        DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
 
-       if (receive_lvb(ls, lkb, ms))
-               return -ENOMEM;
+       if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
+               /* lkb was just created so there won't be an lvb yet */
+               lkb->lkb_lvbptr = allocate_lvb(ls);
+               if (!lkb->lkb_lvbptr)
+                       return -ENOMEM;
+       }
 
        return 0;
 }
@@ -3002,7 +3024,7 @@ int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
 {
        struct dlm_message *ms = (struct dlm_message *) hd;
        struct dlm_ls *ls;
-       int error;
+       int error = 0;
 
        if (!recovery)
                dlm_message_in(ms);
@@ -3119,7 +3141,7 @@ int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
  out:
        dlm_put_lockspace(ls);
        dlm_astd_wake();
-       return 0;
+       return error;
 }
 
 
@@ -3132,6 +3154,7 @@ static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
        if (middle_conversion(lkb)) {
                hold_lkb(lkb);
                ls->ls_stub_ms.m_result = -EINPROGRESS;
+               ls->ls_stub_ms.m_flags = lkb->lkb_flags;
                _remove_from_waiters(lkb);
                _receive_convert_reply(lkb, &ls->ls_stub_ms);
 
@@ -3205,6 +3228,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
                case DLM_MSG_UNLOCK:
                        hold_lkb(lkb);
                        ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
+                       ls->ls_stub_ms.m_flags = lkb->lkb_flags;
                        _remove_from_waiters(lkb);
                        _receive_unlock_reply(lkb, &ls->ls_stub_ms);
                        dlm_put_lkb(lkb);
@@ -3213,6 +3237,7 @@ void dlm_recover_waiters_pre(struct dlm_ls *ls)
                case DLM_MSG_CANCEL:
                        hold_lkb(lkb);
                        ls->ls_stub_ms.m_result = -DLM_ECANCEL;
+                       ls->ls_stub_ms.m_flags = lkb->lkb_flags;
                        _remove_from_waiters(lkb);
                        _receive_cancel_reply(lkb, &ls->ls_stub_ms);
                        dlm_put_lkb(lkb);
@@ -3571,6 +3596,14 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
        lock_rsb(r);
 
        switch (error) {
+       case -EBADR:
+               /* There's a chance the new master received our lock before
+                  dlm_recover_master_reply(), this wouldn't happen if we did
+                  a barrier between recover_masters and recover_locks. */
+               log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
+                         (unsigned long)r, r->res_name);
+               dlm_send_rcom_lock(r, lkb);
+               goto out;
        case -EEXIST:
                log_debug(ls, "master copy exists %x", lkb->lkb_id);
                /* fall through */
@@ -3585,7 +3618,7 @@ int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
        /* an ack for dlm_recover_locks() which waits for replies from
           all the locks it sends to new masters */
        dlm_recovered_lock(r);
-
+ out:
        unlock_rsb(r);
        put_rsb(r);
        dlm_put_lkb(lkb);
@@ -3610,7 +3643,7 @@ int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
        }
 
        if (flags & DLM_LKF_VALBLK) {
-               ua->lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
+               ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
                if (!ua->lksb.sb_lvbptr) {
                        kfree(ua);
                        __put_lkb(ls, lkb);
@@ -3679,7 +3712,7 @@ int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
        ua = (struct dlm_user_args *)lkb->lkb_astparam;
 
        if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
-               ua->lksb.sb_lvbptr = kmalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
+               ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
                if (!ua->lksb.sb_lvbptr) {
                        error = -ENOMEM;
                        goto out_put;
@@ -3745,12 +3778,10 @@ int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
                goto out_put;
 
        spin_lock(&ua->proc->locks_spin);
-       list_del_init(&lkb->lkb_ownqueue);
+       /* dlm_user_add_ast() may have already taken lkb off the proc list */
+       if (!list_empty(&lkb->lkb_ownqueue))
+               list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
        spin_unlock(&ua->proc->locks_spin);
-
-       /* this removes the reference for the proc->locks list added by
-          dlm_user_request */
-       unhold_lkb(lkb);
  out_put:
        dlm_put_lkb(lkb);
  out:
@@ -3790,9 +3821,8 @@ int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
        /* this lkb was removed from the WAITING queue */
        if (lkb->lkb_grmode == DLM_LOCK_IV) {
                spin_lock(&ua->proc->locks_spin);
-               list_del_init(&lkb->lkb_ownqueue);
+               list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
                spin_unlock(&ua->proc->locks_spin);
-               unhold_lkb(lkb);
        }
  out_put:
        dlm_put_lkb(lkb);
@@ -3853,11 +3883,6 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
        mutex_lock(&ls->ls_clear_proc_locks);
 
        list_for_each_entry_safe(lkb, safe, &proc->locks, lkb_ownqueue) {
-               if (lkb->lkb_ast_type) {
-                       list_del(&lkb->lkb_astqueue);
-                       unhold_lkb(lkb);
-               }
-
                list_del_init(&lkb->lkb_ownqueue);
 
                if (lkb->lkb_exflags & DLM_LKF_PERSISTENT) {
@@ -3874,6 +3899,20 @@ void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
 
                dlm_put_lkb(lkb);
        }
+
+       /* in-progress unlocks */
+       list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
+               list_del_init(&lkb->lkb_ownqueue);
+               lkb->lkb_flags |= DLM_IFL_DEAD;
+               dlm_put_lkb(lkb);
+       }
+
+       list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
+               list_del(&lkb->lkb_astqueue);
+               dlm_put_lkb(lkb);
+       }
+
        mutex_unlock(&ls->ls_clear_proc_locks);
        unlock_recovery(ls);
 }
+
index 59012b089e8d68ba77444a8ef0fd82f8743f5ca6..f40817b53c6fc33fd39e112a7b3d2a07e7089c16 100644 (file)
@@ -236,7 +236,7 @@ static int dlm_scand(void *data)
        while (!kthread_should_stop()) {
                list_for_each_entry(ls, &lslist, ls_list)
                        dlm_scan_rsbs(ls);
-               schedule_timeout_interruptible(dlm_config.scan_secs * HZ);
+               schedule_timeout_interruptible(dlm_config.ci_scan_secs * HZ);
        }
        return 0;
 }
@@ -422,7 +422,7 @@ static int new_lockspace(char *name, int namelen, void **lockspace,
        ls->ls_count = 0;
        ls->ls_flags = 0;
 
-       size = dlm_config.rsbtbl_size;
+       size = dlm_config.ci_rsbtbl_size;
        ls->ls_rsbtbl_size = size;
 
        ls->ls_rsbtbl = kmalloc(sizeof(struct dlm_rsbtable) * size, GFP_KERNEL);
@@ -434,7 +434,7 @@ static int new_lockspace(char *name, int namelen, void **lockspace,
                rwlock_init(&ls->ls_rsbtbl[i].lock);
        }
 
-       size = dlm_config.lkbtbl_size;
+       size = dlm_config.ci_lkbtbl_size;
        ls->ls_lkbtbl_size = size;
 
        ls->ls_lkbtbl = kmalloc(sizeof(struct dlm_lkbtable) * size, GFP_KERNEL);
@@ -446,7 +446,7 @@ static int new_lockspace(char *name, int namelen, void **lockspace,
                ls->ls_lkbtbl[i].counter = 1;
        }
 
-       size = dlm_config.dirtbl_size;
+       size = dlm_config.ci_dirtbl_size;
        ls->ls_dirtbl_size = size;
 
        ls->ls_dirtbl = kmalloc(sizeof(struct dlm_dirtable) * size, GFP_KERNEL);
@@ -489,7 +489,7 @@ static int new_lockspace(char *name, int namelen, void **lockspace,
        mutex_init(&ls->ls_requestqueue_mutex);
        mutex_init(&ls->ls_clear_proc_locks);
 
-       ls->ls_recover_buf = kmalloc(dlm_config.buffer_size, GFP_KERNEL);
+       ls->ls_recover_buf = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL);
        if (!ls->ls_recover_buf)
                goto out_dirfree;
 
index fe158d7a92853074da36daa33dcbcbabaae00ec7..dc83a9d979b5285bb5e9704a9d32cef199a5c129 100644 (file)
@@ -72,6 +72,8 @@ struct nodeinfo {
        struct list_head        writequeue; /* outgoing writequeue_entries */
        spinlock_t              writequeue_lock;
        int                     nodeid;
+       struct work_struct      swork; /* Send workqueue */
+       struct work_struct      lwork; /* Locking workqueue */
 };
 
 static DEFINE_IDR(nodeinfo_idr);
@@ -96,6 +98,7 @@ struct connection {
        atomic_t                waiting_requests;
        struct cbuf             cb;
        int                     eagain_flag;
+       struct work_struct      work; /* Send workqueue */
 };
 
 /* An entry waiting to be sent */
@@ -137,19 +140,23 @@ static void cbuf_eat(struct cbuf *cb, int n)
 static LIST_HEAD(write_nodes);
 static DEFINE_SPINLOCK(write_nodes_lock);
 
+
 /* Maximum number of incoming messages to process before
  * doing a schedule()
  */
 #define MAX_RX_MSG_COUNT 25
 
-/* Manage daemons */
-static struct task_struct *recv_task;
-static struct task_struct *send_task;
-static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_wait);
+/* Work queues */
+static struct workqueue_struct *recv_workqueue;
+static struct workqueue_struct *send_workqueue;
+static struct workqueue_struct *lock_workqueue;
 
 /* The SCTP connection */
 static struct connection sctp_con;
 
+static void process_send_sockets(struct work_struct *work);
+static void process_recv_sockets(struct work_struct *work);
+static void process_lock_request(struct work_struct *work);
 
 static int nodeid_to_addr(int nodeid, struct sockaddr *retaddr)
 {
@@ -222,6 +229,8 @@ static struct nodeinfo *nodeid2nodeinfo(int nodeid, gfp_t alloc)
        spin_lock_init(&ni->lock);
        INIT_LIST_HEAD(&ni->writequeue);
        spin_lock_init(&ni->writequeue_lock);
+       INIT_WORK(&ni->lwork, process_lock_request);
+       INIT_WORK(&ni->swork, process_send_sockets);
        ni->nodeid = nodeid;
 
        if (nodeid > max_nodeid)
@@ -249,11 +258,8 @@ static struct nodeinfo *assoc2nodeinfo(sctp_assoc_t assoc)
 /* Data or notification available on socket */
 static void lowcomms_data_ready(struct sock *sk, int count_unused)
 {
-       atomic_inc(&sctp_con.waiting_requests);
        if (test_and_set_bit(CF_READ_PENDING, &sctp_con.flags))
-               return;
-
-       wake_up_interruptible(&lowcomms_recv_wait);
+               queue_work(recv_workqueue, &sctp_con.work);
 }
 
 
@@ -361,10 +367,10 @@ static void init_failed(void)
                                spin_lock_bh(&write_nodes_lock);
                                list_add_tail(&ni->write_list, &write_nodes);
                                spin_unlock_bh(&write_nodes_lock);
+                               queue_work(send_workqueue, &ni->swork);
                        }
                }
        }
-       wake_up_process(send_task);
 }
 
 /* Something happened to an association */
@@ -446,8 +452,8 @@ static void process_sctp_notification(struct msghdr *msg, char *buf)
                                spin_lock_bh(&write_nodes_lock);
                                list_add_tail(&ni->write_list, &write_nodes);
                                spin_unlock_bh(&write_nodes_lock);
+                               queue_work(send_workqueue, &ni->swork);
                        }
-                       wake_up_process(send_task);
                }
                break;
 
@@ -580,8 +586,8 @@ static int receive_from_sock(void)
                                spin_lock_bh(&write_nodes_lock);
                                list_add_tail(&ni->write_list, &write_nodes);
                                spin_unlock_bh(&write_nodes_lock);
+                               queue_work(send_workqueue, &ni->swork);
                        }
-                       wake_up_process(send_task);
                }
        }
 
@@ -590,6 +596,7 @@ static int receive_from_sock(void)
                return 0;
 
        cbuf_add(&sctp_con.cb, ret);
+       // PJC: TODO: Add to node's workqueue....can we ??
        ret = dlm_process_incoming_buffer(cpu_to_le32(sinfo->sinfo_ppid),
                                          page_address(sctp_con.rx_page),
                                          sctp_con.cb.base, sctp_con.cb.len,
@@ -635,7 +642,7 @@ static int add_bind_addr(struct sockaddr_storage *addr, int addr_len, int num)
 
        if (result < 0)
                log_print("Can't bind to port %d addr number %d",
-                         dlm_config.tcp_port, num);
+                         dlm_config.ci_tcp_port, num);
 
        return result;
 }
@@ -711,7 +718,7 @@ static int init_sock(void)
        /* Bind to all interfaces. */
        for (i = 0; i < dlm_local_count; i++) {
                memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
-               make_sockaddr(&localaddr, dlm_config.tcp_port, &addr_len);
+               make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len);
 
                result = add_bind_addr(&localaddr, addr_len, num);
                if (result)
@@ -820,7 +827,8 @@ void dlm_lowcomms_commit_buffer(void *arg)
                spin_lock_bh(&write_nodes_lock);
                list_add_tail(&ni->write_list, &write_nodes);
                spin_unlock_bh(&write_nodes_lock);
-               wake_up_process(send_task);
+
+               queue_work(send_workqueue, &ni->swork);
        }
        return;
 
@@ -863,7 +871,7 @@ static void initiate_association(int nodeid)
                return;
        }
 
-       make_sockaddr(&rem_addr, dlm_config.tcp_port, &addrlen);
+       make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen);
 
        outmessage.msg_name = &rem_addr;
        outmessage.msg_namelen = addrlen;
@@ -1088,101 +1096,75 @@ int dlm_lowcomms_close(int nodeid)
        return 0;
 }
 
-static int write_list_empty(void)
+// PJC: The work queue function for receiving.
+static void process_recv_sockets(struct work_struct *work)
 {
-       int status;
-
-       spin_lock_bh(&write_nodes_lock);
-       status = list_empty(&write_nodes);
-       spin_unlock_bh(&write_nodes_lock);
-
-       return status;
-}
-
-static int dlm_recvd(void *data)
-{
-       DECLARE_WAITQUEUE(wait, current);
-
-       while (!kthread_should_stop()) {
+       if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) {
+               int ret;
                int count = 0;
 
-               set_current_state(TASK_INTERRUPTIBLE);
-               add_wait_queue(&lowcomms_recv_wait, &wait);
-               if (!test_bit(CF_READ_PENDING, &sctp_con.flags))
-                       cond_resched();
-               remove_wait_queue(&lowcomms_recv_wait, &wait);
-               set_current_state(TASK_RUNNING);
-
-               if (test_and_clear_bit(CF_READ_PENDING, &sctp_con.flags)) {
-                       int ret;
-
-                       do {
-                               ret = receive_from_sock();
+               do {
+                       ret = receive_from_sock();
 
-                               /* Don't starve out everyone else */
-                               if (++count >= MAX_RX_MSG_COUNT) {
-                                       cond_resched();
-                                       count = 0;
-                               }
-                       } while (!kthread_should_stop() && ret >=0);
-               }
-               cond_resched();
+                       /* Don't starve out everyone else */
+                       if (++count >= MAX_RX_MSG_COUNT) {
+                               cond_resched();
+                               count = 0;
+                       }
+               } while (!kthread_should_stop() && ret >=0);
        }
-
-       return 0;
+       cond_resched();
 }
 
-static int dlm_sendd(void *data)
+// PJC: the work queue function for sending
+static void process_send_sockets(struct work_struct *work)
 {
-       DECLARE_WAITQUEUE(wait, current);
-
-       add_wait_queue(sctp_con.sock->sk->sk_sleep, &wait);
-
-       while (!kthread_should_stop()) {
-               set_current_state(TASK_INTERRUPTIBLE);
-               if (write_list_empty())
-                       cond_resched();
-               set_current_state(TASK_RUNNING);
-
-               if (sctp_con.eagain_flag) {
-                       sctp_con.eagain_flag = 0;
-                       refill_write_queue();
-               }
-               process_output_queue();
+       if (sctp_con.eagain_flag) {
+               sctp_con.eagain_flag = 0;
+               refill_write_queue();
        }
+       process_output_queue();
+}
 
-       remove_wait_queue(sctp_con.sock->sk->sk_sleep, &wait);
-
-       return 0;
+// PJC: Process lock requests from a particular node.
+// TODO: can we optimise this out on UP ??
+static void process_lock_request(struct work_struct *work)
+{
 }
 
 static void daemons_stop(void)
 {
-       kthread_stop(recv_task);
-       kthread_stop(send_task);
+       destroy_workqueue(recv_workqueue);
+       destroy_workqueue(send_workqueue);
+       destroy_workqueue(lock_workqueue);
 }
 
 static int daemons_start(void)
 {
-       struct task_struct *p;
        int error;
+       recv_workqueue = create_workqueue("dlm_recv");
+       error = IS_ERR(recv_workqueue);
+       if (error) {
+               log_print("can't start dlm_recv %d", error);
+               return error;
+       }
 
-       p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
-       error = IS_ERR(p);
+       send_workqueue = create_singlethread_workqueue("dlm_send");
+       error = IS_ERR(send_workqueue);
        if (error) {
-               log_print("can't start dlm_recvd %d", error);
+               log_print("can't start dlm_send %d", error);
+               destroy_workqueue(recv_workqueue);
                return error;
        }
-       recv_task = p;
 
-       p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
-       error = IS_ERR(p);
+       lock_workqueue = create_workqueue("dlm_rlock");
+       error = IS_ERR(lock_workqueue);
        if (error) {
-               log_print("can't start dlm_sendd %d", error);
-               kthread_stop(recv_task);
+               log_print("can't start dlm_rlock %d", error);
+               destroy_workqueue(send_workqueue);
+               destroy_workqueue(recv_workqueue);
                return error;
        }
-       send_task = p;
 
        return 0;
 }
@@ -1194,6 +1176,8 @@ int dlm_lowcomms_start(void)
 {
        int error;
 
+       INIT_WORK(&sctp_con.work, process_recv_sockets);
+
        error = init_sock();
        if (error)
                goto fail_sock;
@@ -1224,4 +1208,3 @@ void dlm_lowcomms_stop(void)
        for (i = 0; i < dlm_local_count; i++)
                kfree(dlm_local_addr[i]);
 }
-
index 9be3a440c42a084ba5c335d05d77218c1a480b21..f1efd17b2614b28cc0be7aff2e5f1fff8f65463f 100644 (file)
@@ -2,7 +2,7 @@
 *******************************************************************************
 **
 **  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
-**  Copyright (C) 2004-2006 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2004-2007 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -96,10 +96,7 @@ static bool cbuf_empty(struct cbuf *cb)
 struct connection {
        struct socket *sock;    /* NULL if not connected */
        uint32_t nodeid;        /* So we know who we are in the list */
-       struct rw_semaphore sock_sem; /* Stop connect races */
-       struct list_head read_list;   /* On this list when ready for reading */
-       struct list_head write_list;  /* On this list when ready for writing */
-       struct list_head state_list;  /* On this list when ready to connect */
+       struct mutex sock_mutex;
        unsigned long flags;    /* bit 1,2 = We are on the read/write lists */
 #define CF_READ_PENDING 1
 #define CF_WRITE_PENDING 2
@@ -112,9 +109,10 @@ struct connection {
        struct page *rx_page;
        struct cbuf cb;
        int retries;
-       atomic_t waiting_requests;
 #define MAX_CONNECT_RETRIES 3
        struct connection *othercon;
+       struct work_struct rwork; /* Receive workqueue */
+       struct work_struct swork; /* Send workqueue */
 };
 #define sock2con(x) ((struct connection *)(x)->sk_user_data)
 
@@ -131,14 +129,9 @@ struct writequeue_entry {
 
 static struct sockaddr_storage dlm_local_addr;
 
-/* Manage daemons */
-static struct task_struct *recv_task;
-static struct task_struct *send_task;
-
-static wait_queue_t lowcomms_send_waitq_head;
-static DECLARE_WAIT_QUEUE_HEAD(lowcomms_send_waitq);
-static wait_queue_t lowcomms_recv_waitq_head;
-static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_waitq);
+/* Work queues */
+static struct workqueue_struct *recv_workqueue;
+static struct workqueue_struct *send_workqueue;
 
 /* An array of pointers to connections, indexed by NODEID */
 static struct connection **connections;
@@ -146,17 +139,8 @@ static DECLARE_MUTEX(connections_lock);
 static struct kmem_cache *con_cache;
 static int conn_array_size;
 
-/* List of sockets that have reads pending */
-static LIST_HEAD(read_sockets);
-static DEFINE_SPINLOCK(read_sockets_lock);
-
-/* List of sockets which have writes pending */
-static LIST_HEAD(write_sockets);
-static DEFINE_SPINLOCK(write_sockets_lock);
-
-/* List of sockets which have connects pending */
-static LIST_HEAD(state_sockets);
-static DEFINE_SPINLOCK(state_sockets_lock);
+static void process_recv_sockets(struct work_struct *work);
+static void process_send_sockets(struct work_struct *work);
 
 static struct connection *nodeid2con(int nodeid, gfp_t allocation)
 {
@@ -186,9 +170,11 @@ static struct connection *nodeid2con(int nodeid, gfp_t allocation)
                        goto finish;
 
                con->nodeid = nodeid;
-               init_rwsem(&con->sock_sem);
+               mutex_init(&con->sock_mutex);
                INIT_LIST_HEAD(&con->writequeue);
                spin_lock_init(&con->writequeue_lock);
+               INIT_WORK(&con->swork, process_send_sockets);
+               INIT_WORK(&con->rwork, process_recv_sockets);
 
                connections[nodeid] = con;
        }
@@ -203,41 +189,22 @@ static void lowcomms_data_ready(struct sock *sk, int count_unused)
 {
        struct connection *con = sock2con(sk);
 
-       atomic_inc(&con->waiting_requests);
-       if (test_and_set_bit(CF_READ_PENDING, &con->flags))
-               return;
-
-       spin_lock_bh(&read_sockets_lock);
-       list_add_tail(&con->read_list, &read_sockets);
-       spin_unlock_bh(&read_sockets_lock);
-
-       wake_up_interruptible(&lowcomms_recv_waitq);
+       if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
+               queue_work(recv_workqueue, &con->rwork);
 }
 
 static void lowcomms_write_space(struct sock *sk)
 {
        struct connection *con = sock2con(sk);
 
-       if (test_and_set_bit(CF_WRITE_PENDING, &con->flags))
-               return;
-
-       spin_lock_bh(&write_sockets_lock);
-       list_add_tail(&con->write_list, &write_sockets);
-       spin_unlock_bh(&write_sockets_lock);
-
-       wake_up_interruptible(&lowcomms_send_waitq);
+       if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
+               queue_work(send_workqueue, &con->swork);
 }
 
 static inline void lowcomms_connect_sock(struct connection *con)
 {
-       if (test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
-               return;
-
-       spin_lock_bh(&state_sockets_lock);
-       list_add_tail(&con->state_list, &state_sockets);
-       spin_unlock_bh(&state_sockets_lock);
-
-       wake_up_interruptible(&lowcomms_send_waitq);
+       if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
+               queue_work(send_workqueue, &con->swork);
 }
 
 static void lowcomms_state_change(struct sock *sk)
@@ -279,7 +246,7 @@ static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
 /* Close a remote connection and tidy up */
 static void close_connection(struct connection *con, bool and_other)
 {
-       down_write(&con->sock_sem);
+       mutex_lock(&con->sock_mutex);
 
        if (con->sock) {
                sock_release(con->sock);
@@ -294,7 +261,7 @@ static void close_connection(struct connection *con, bool and_other)
                con->rx_page = NULL;
        }
        con->retries = 0;
-       up_write(&con->sock_sem);
+       mutex_unlock(&con->sock_mutex);
 }
 
 /* Data received from remote end */
@@ -308,10 +275,13 @@ static int receive_from_sock(struct connection *con)
        int r;
        int call_again_soon = 0;
 
-       down_read(&con->sock_sem);
+       mutex_lock(&con->sock_mutex);
+
+       if (con->sock == NULL) {
+               ret = -EAGAIN;
+               goto out_close;
+       }
 
-       if (con->sock == NULL)
-               goto out;
        if (con->rx_page == NULL) {
                /*
                 * This doesn't need to be atomic, but I think it should
@@ -359,6 +329,9 @@ static int receive_from_sock(struct connection *con)
 
        if (ret <= 0)
                goto out_close;
+       if (ret == -EAGAIN)
+               goto out_resched;
+
        if (ret == len)
                call_again_soon = 1;
        cbuf_add(&con->cb, ret);
@@ -381,24 +354,26 @@ static int receive_from_sock(struct connection *con)
                con->rx_page = NULL;
        }
 
-out:
        if (call_again_soon)
                goto out_resched;
-       up_read(&con->sock_sem);
+       mutex_unlock(&con->sock_mutex);
        return 0;
 
 out_resched:
-       lowcomms_data_ready(con->sock->sk, 0);
-       up_read(&con->sock_sem);
-       cond_resched();
-       return 0;
+       if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
+               queue_work(recv_workqueue, &con->rwork);
+       mutex_unlock(&con->sock_mutex);
+       return -EAGAIN;
 
 out_close:
-       up_read(&con->sock_sem);
+       mutex_unlock(&con->sock_mutex);
        if (ret != -EAGAIN && !test_bit(CF_IS_OTHERCON, &con->flags)) {
                close_connection(con, false);
                /* Reconnect when there is something to send */
        }
+       /* Don't return success if we really got EOF */
+       if (ret == 0)
+               ret = -EAGAIN;
 
        return ret;
 }
@@ -412,6 +387,7 @@ static int accept_from_sock(struct connection *con)
        int len;
        int nodeid;
        struct connection *newcon;
+       struct connection *addcon;
 
        memset(&peeraddr, 0, sizeof(peeraddr));
        result = sock_create_kern(dlm_local_addr.ss_family, SOCK_STREAM,
@@ -419,7 +395,7 @@ static int accept_from_sock(struct connection *con)
        if (result < 0)
                return -ENOMEM;
 
-       down_read(&con->sock_sem);
+       mutex_lock_nested(&con->sock_mutex, 0);
 
        result = -ENOTCONN;
        if (con->sock == NULL)
@@ -445,7 +421,7 @@ static int accept_from_sock(struct connection *con)
        if (dlm_addr_to_nodeid(&peeraddr, &nodeid)) {
                printk("dlm: connect from non cluster node\n");
                sock_release(newsock);
-               up_read(&con->sock_sem);
+               mutex_unlock(&con->sock_mutex);
                return -1;
        }
 
@@ -462,7 +438,7 @@ static int accept_from_sock(struct connection *con)
                result = -ENOMEM;
                goto accept_err;
        }
-       down_write(&newcon->sock_sem);
+       mutex_lock_nested(&newcon->sock_mutex, 1);
        if (newcon->sock) {
                struct connection *othercon = newcon->othercon;
 
@@ -470,41 +446,45 @@ static int accept_from_sock(struct connection *con)
                        othercon = kmem_cache_zalloc(con_cache, GFP_KERNEL);
                        if (!othercon) {
                                printk("dlm: failed to allocate incoming socket\n");
-                               up_write(&newcon->sock_sem);
+                               mutex_unlock(&newcon->sock_mutex);
                                result = -ENOMEM;
                                goto accept_err;
                        }
                        othercon->nodeid = nodeid;
                        othercon->rx_action = receive_from_sock;
-                       init_rwsem(&othercon->sock_sem);
+                       mutex_init(&othercon->sock_mutex);
+                       INIT_WORK(&othercon->swork, process_send_sockets);
+                       INIT_WORK(&othercon->rwork, process_recv_sockets);
                        set_bit(CF_IS_OTHERCON, &othercon->flags);
                        newcon->othercon = othercon;
                }
                othercon->sock = newsock;
                newsock->sk->sk_user_data = othercon;
                add_sock(newsock, othercon);
+               addcon = othercon;
        }
        else {
                newsock->sk->sk_user_data = newcon;
                newcon->rx_action = receive_from_sock;
                add_sock(newsock, newcon);
-
+               addcon = newcon;
        }
 
-       up_write(&newcon->sock_sem);
+       mutex_unlock(&newcon->sock_mutex);
 
        /*
         * Add it to the active queue in case we got data
         * beween processing the accept adding the socket
         * to the read_sockets list
         */
-       lowcomms_data_ready(newsock->sk, 0);
-       up_read(&con->sock_sem);
+       if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
+               queue_work(recv_workqueue, &addcon->rwork);
+       mutex_unlock(&con->sock_mutex);
 
        return 0;
 
 accept_err:
-       up_read(&con->sock_sem);
+       mutex_unlock(&con->sock_mutex);
        sock_release(newsock);
 
        if (result != -EAGAIN)
@@ -525,7 +505,7 @@ static void connect_to_sock(struct connection *con)
                return;
        }
 
-       down_write(&con->sock_sem);
+       mutex_lock(&con->sock_mutex);
        if (con->retries++ > MAX_CONNECT_RETRIES)
                goto out;
 
@@ -548,7 +528,7 @@ static void connect_to_sock(struct connection *con)
        sock->sk->sk_user_data = con;
        con->rx_action = receive_from_sock;
 
-       make_sockaddr(&saddr, dlm_config.tcp_port, &addr_len);
+       make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);
 
        add_sock(sock, con);
 
@@ -577,7 +557,7 @@ out_err:
                result = 0;
        }
 out:
-       up_write(&con->sock_sem);
+       mutex_unlock(&con->sock_mutex);
        return;
 }
 
@@ -616,10 +596,10 @@ static struct socket *create_listen_sock(struct connection *con,
        con->sock = sock;
 
        /* Bind to our port */
-       make_sockaddr(saddr, dlm_config.tcp_port, &addr_len);
+       make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
        result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
        if (result < 0) {
-               printk("dlm: Can't bind to port %d\n", dlm_config.tcp_port);
+               printk("dlm: Can't bind to port %d\n", dlm_config.ci_tcp_port);
                sock_release(sock);
                sock = NULL;
                con->sock = NULL;
@@ -638,7 +618,7 @@ static struct socket *create_listen_sock(struct connection *con,
 
        result = sock->ops->listen(sock, 5);
        if (result < 0) {
-               printk("dlm: Can't listen on port %d\n", dlm_config.tcp_port);
+               printk("dlm: Can't listen on port %d\n", dlm_config.ci_tcp_port);
                sock_release(sock);
                sock = NULL;
                goto create_out;
@@ -709,6 +689,7 @@ void *dlm_lowcomms_get_buffer(int nodeid, int len,
        if (!con)
                return NULL;
 
+       spin_lock(&con->writequeue_lock);
        e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
        if ((&e->list == &con->writequeue) ||
            (PAGE_CACHE_SIZE - e->end < len)) {
@@ -747,6 +728,7 @@ void dlm_lowcomms_commit_buffer(void *mh)
        struct connection *con = e->con;
        int users;
 
+       spin_lock(&con->writequeue_lock);
        users = --e->users;
        if (users)
                goto out;
@@ -754,12 +736,8 @@ void dlm_lowcomms_commit_buffer(void *mh)
        kunmap(e->page);
        spin_unlock(&con->writequeue_lock);
 
-       if (test_and_set_bit(CF_WRITE_PENDING, &con->flags) == 0) {
-               spin_lock_bh(&write_sockets_lock);
-               list_add_tail(&con->write_list, &write_sockets);
-               spin_unlock_bh(&write_sockets_lock);
-
-               wake_up_interruptible(&lowcomms_send_waitq);
+       if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
+               queue_work(send_workqueue, &con->swork);
        }
        return;
 
@@ -783,7 +761,7 @@ static void send_to_sock(struct connection *con)
        struct writequeue_entry *e;
        int len, offset;
 
-       down_read(&con->sock_sem);
+       mutex_lock(&con->sock_mutex);
        if (con->sock == NULL)
                goto out_connect;
 
@@ -800,6 +778,7 @@ static void send_to_sock(struct connection *con)
                offset = e->offset;
                BUG_ON(len == 0 && e->users == 0);
                spin_unlock(&con->writequeue_lock);
+               kmap(e->page);
 
                ret = 0;
                if (len) {
@@ -828,18 +807,18 @@ static void send_to_sock(struct connection *con)
        }
        spin_unlock(&con->writequeue_lock);
 out:
-       up_read(&con->sock_sem);
+       mutex_unlock(&con->sock_mutex);
        return;
 
 send_error:
-       up_read(&con->sock_sem);
+       mutex_unlock(&con->sock_mutex);
        close_connection(con, false);
        lowcomms_connect_sock(con);
        return;
 
 out_connect:
-       up_read(&con->sock_sem);
-       lowcomms_connect_sock(con);
+       mutex_unlock(&con->sock_mutex);
+       connect_to_sock(con);
        return;
 }
 
@@ -872,7 +851,6 @@ int dlm_lowcomms_close(int nodeid)
        if (con) {
                clean_one_writequeue(con);
                close_connection(con, true);
-               atomic_set(&con->waiting_requests, 0);
        }
        return 0;
 
@@ -880,102 +858,29 @@ out:
        return -1;
 }
 
-/* API send message call, may queue the request */
-/* N.B. This is the old interface - use the new one for new calls */
-int lowcomms_send_message(int nodeid, char *buf, int len, gfp_t allocation)
-{
-       struct writequeue_entry *e;
-       char *b;
-
-       e = dlm_lowcomms_get_buffer(nodeid, len, allocation, &b);
-       if (e) {
-               memcpy(b, buf, len);
-               dlm_lowcomms_commit_buffer(e);
-               return 0;
-       }
-       return -ENOBUFS;
-}
-
 /* Look for activity on active sockets */
-static void process_sockets(void)
+static void process_recv_sockets(struct work_struct *work)
 {
-       struct list_head *list;
-       struct list_head *temp;
-       int count = 0;
-
-       spin_lock_bh(&read_sockets_lock);
-       list_for_each_safe(list, temp, &read_sockets) {
+       struct connection *con = container_of(work, struct connection, rwork);
+       int err;
 
-               struct connection *con =
-                       list_entry(list, struct connection, read_list);
-               list_del(&con->read_list);
-               clear_bit(CF_READ_PENDING, &con->flags);
-
-               spin_unlock_bh(&read_sockets_lock);
-
-               /* This can reach zero if we are processing requests
-                * as they come in.
-                */
-               if (atomic_read(&con->waiting_requests) == 0) {
-                       spin_lock_bh(&read_sockets_lock);
-                       continue;
-               }
-
-               do {
-                       con->rx_action(con);
-
-                       /* Don't starve out everyone else */
-                       if (++count >= MAX_RX_MSG_COUNT) {
-                               cond_resched();
-                               count = 0;
-                       }
-
-               } while (!atomic_dec_and_test(&con->waiting_requests) &&
-                        !kthread_should_stop());
-
-               spin_lock_bh(&read_sockets_lock);
-       }
-       spin_unlock_bh(&read_sockets_lock);
+       clear_bit(CF_READ_PENDING, &con->flags);
+       do {
+               err = con->rx_action(con);
+       } while (!err);
 }
 
-/* Try to send any messages that are pending
- */
-static void process_output_queue(void)
-{
-       struct list_head *list;
-       struct list_head *temp;
-
-       spin_lock_bh(&write_sockets_lock);
-       list_for_each_safe(list, temp, &write_sockets) {
-               struct connection *con =
-                       list_entry(list, struct connection, write_list);
-               clear_bit(CF_WRITE_PENDING, &con->flags);
-               list_del(&con->write_list);
-
-               spin_unlock_bh(&write_sockets_lock);
-               send_to_sock(con);
-               spin_lock_bh(&write_sockets_lock);
-       }
-       spin_unlock_bh(&write_sockets_lock);
-}
 
-static void process_state_queue(void)
+static void process_send_sockets(struct work_struct *work)
 {
-       struct list_head *list;
-       struct list_head *temp;
-
-       spin_lock_bh(&state_sockets_lock);
-       list_for_each_safe(list, temp, &state_sockets) {
-               struct connection *con =
-                       list_entry(list, struct connection, state_list);
-               list_del(&con->state_list);
-               clear_bit(CF_CONNECT_PENDING, &con->flags);
-               spin_unlock_bh(&state_sockets_lock);
+       struct connection *con = container_of(work, struct connection, swork);
 
+       if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
                connect_to_sock(con);
-               spin_lock_bh(&state_sockets_lock);
        }
-       spin_unlock_bh(&state_sockets_lock);
+
+       clear_bit(CF_WRITE_PENDING, &con->flags);
+       send_to_sock(con);
 }
 
 
@@ -992,109 +897,33 @@ static void clean_writequeues(void)
        }
 }
 
-static int read_list_empty(void)
+static void work_stop(void)
 {
-       int status;
-
-       spin_lock_bh(&read_sockets_lock);
-       status = list_empty(&read_sockets);
-       spin_unlock_bh(&read_sockets_lock);
-
-       return status;
-}
-
-/* DLM Transport comms receive daemon */
-static int dlm_recvd(void *data)
-{
-       init_waitqueue_entry(&lowcomms_recv_waitq_head, current);
-       add_wait_queue(&lowcomms_recv_waitq, &lowcomms_recv_waitq_head);
-
-       while (!kthread_should_stop()) {
-               set_current_state(TASK_INTERRUPTIBLE);
-               if (read_list_empty())
-                       cond_resched();
-               set_current_state(TASK_RUNNING);
-
-               process_sockets();
-       }
-
-       return 0;
+       destroy_workqueue(recv_workqueue);
+       destroy_workqueue(send_workqueue);
 }
 
-static int write_and_state_lists_empty(void)
+static int work_start(void)
 {
-       int status;
-
-       spin_lock_bh(&write_sockets_lock);
-       status = list_empty(&write_sockets);
-       spin_unlock_bh(&write_sockets_lock);
-
-       spin_lock_bh(&state_sockets_lock);
-       if (list_empty(&state_sockets) == 0)
-               status = 0;
-       spin_unlock_bh(&state_sockets_lock);
-
-       return status;
-}
-
-/* DLM Transport send daemon */
-static int dlm_sendd(void *data)
-{
-       init_waitqueue_entry(&lowcomms_send_waitq_head, current);
-       add_wait_queue(&lowcomms_send_waitq, &lowcomms_send_waitq_head);
-
-       while (!kthread_should_stop()) {
-               set_current_state(TASK_INTERRUPTIBLE);
-               if (write_and_state_lists_empty())
-                       cond_resched();
-               set_current_state(TASK_RUNNING);
-
-               process_state_queue();
-               process_output_queue();
-       }
-
-       return 0;
-}
-
-static void daemons_stop(void)
-{
-       kthread_stop(recv_task);
-       kthread_stop(send_task);
-}
-
-static int daemons_start(void)
-{
-       struct task_struct *p;
        int error;
-
-       p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
-       error = IS_ERR(p);
+       recv_workqueue = create_workqueue("dlm_recv");
+       error = IS_ERR(recv_workqueue);
        if (error) {
-               log_print("can't start dlm_recvd %d", error);
+               log_print("can't start dlm_recv %d", error);
                return error;
        }
-       recv_task = p;
 
-       p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
-       error = IS_ERR(p);
+       send_workqueue = create_singlethread_workqueue("dlm_send");
+       error = IS_ERR(send_workqueue);
        if (error) {
-               log_print("can't start dlm_sendd %d", error);
-               kthread_stop(recv_task);
+               log_print("can't start dlm_send %d", error);
+               destroy_workqueue(recv_workqueue);
                return error;
        }
-       send_task = p;
 
        return 0;
 }
 
-/*
- * Return the largest buffer size we can cope with.
- */
-int lowcomms_max_buffer_size(void)
-{
-       return PAGE_CACHE_SIZE;
-}
-
 void dlm_lowcomms_stop(void)
 {
        int i;
@@ -1107,7 +936,7 @@ void dlm_lowcomms_stop(void)
                        connections[i]->flags |= 0xFF;
        }
 
-       daemons_stop();
+       work_stop();
        clean_writequeues();
 
        for (i = 0; i < conn_array_size; i++) {
@@ -1159,7 +988,7 @@ int dlm_lowcomms_start(void)
        if (error)
                goto fail_unlisten;
 
-       error = daemons_start();
+       error = work_start();
        if (error)
                goto fail_unlisten;
 
index c9b1c3d535f4c0e3a1904db462c0b5827090700f..a5126e0c68a69e6983e7d77833586bd928e7a3a7 100644 (file)
@@ -82,7 +82,7 @@ int dlm_process_incoming_buffer(int nodeid, const void *base,
                if (msglen < sizeof(struct dlm_header))
                        break;
                err = -E2BIG;
-               if (msglen > dlm_config.buffer_size) {
+               if (msglen > dlm_config.ci_buffer_size) {
                        log_print("message size %d from %d too big, buf len %d",
                                  msglen, nodeid, len);
                        break;
@@ -103,7 +103,7 @@ int dlm_process_incoming_buffer(int nodeid, const void *base,
 
                if (msglen > sizeof(__tmp) &&
                    msg == (struct dlm_header *) __tmp) {
-                       msg = kmalloc(dlm_config.buffer_size, GFP_KERNEL);
+                       msg = kmalloc(dlm_config.ci_buffer_size, GFP_KERNEL);
                        if (msg == NULL)
                                return ret;
                }
index 4cc31be9cd9d849c69e986397451f54387c03236..6bfbd61538094f72cb1001bc782cc246faf55552 100644 (file)
@@ -56,6 +56,10 @@ static int create_rcom(struct dlm_ls *ls, int to_nodeid, int type, int len,
 
        rc->rc_type = type;
 
+       spin_lock(&ls->ls_recover_lock);
+       rc->rc_seq = ls->ls_recover_seq;
+       spin_unlock(&ls->ls_recover_lock);
+
        *mh_ret = mh;
        *rc_ret = rc;
        return 0;
@@ -78,8 +82,17 @@ static void make_config(struct dlm_ls *ls, struct rcom_config *rf)
        rf->rf_lsflags = ls->ls_exflags;
 }
 
-static int check_config(struct dlm_ls *ls, struct rcom_config *rf, int nodeid)
+static int check_config(struct dlm_ls *ls, struct dlm_rcom *rc, int nodeid)
 {
+       struct rcom_config *rf = (struct rcom_config *) rc->rc_buf;
+
+       if ((rc->rc_header.h_version & 0xFFFF0000) != DLM_HEADER_MAJOR) {
+               log_error(ls, "version mismatch: %x nodeid %d: %x",
+                         DLM_HEADER_MAJOR | DLM_HEADER_MINOR, nodeid,
+                         rc->rc_header.h_version);
+               return -EINVAL;
+       }
+
        if (rf->rf_lvblen != ls->ls_lvblen ||
            rf->rf_lsflags != ls->ls_exflags) {
                log_error(ls, "config mismatch: %d,%x nodeid %d: %d,%x",
@@ -125,7 +138,7 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid)
                goto out;
 
        allow_sync_reply(ls, &rc->rc_id);
-       memset(ls->ls_recover_buf, 0, dlm_config.buffer_size);
+       memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
 
        send_rcom(ls, mh, rc);
 
@@ -141,8 +154,7 @@ int dlm_rcom_status(struct dlm_ls *ls, int nodeid)
                log_debug(ls, "remote node %d not ready", nodeid);
                rc->rc_result = 0;
        } else
-               error = check_config(ls, (struct rcom_config *) rc->rc_buf,
-                                    nodeid);
+               error = check_config(ls, rc, nodeid);
        /* the caller looks at rc_result for the remote recovery status */
  out:
        return error;
@@ -159,6 +171,7 @@ static void receive_rcom_status(struct dlm_ls *ls, struct dlm_rcom *rc_in)
        if (error)
                return;
        rc->rc_id = rc_in->rc_id;
+       rc->rc_seq_reply = rc_in->rc_seq;
        rc->rc_result = dlm_recover_status(ls);
        make_config(ls, (struct rcom_config *) rc->rc_buf);
 
@@ -200,7 +213,7 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
        if (nodeid == dlm_our_nodeid()) {
                dlm_copy_master_names(ls, last_name, last_len,
                                      ls->ls_recover_buf + len,
-                                     dlm_config.buffer_size - len, nodeid);
+                                     dlm_config.ci_buffer_size - len, nodeid);
                goto out;
        }
 
@@ -210,7 +223,7 @@ int dlm_rcom_names(struct dlm_ls *ls, int nodeid, char *last_name, int last_len)
        memcpy(rc->rc_buf, last_name, last_len);
 
        allow_sync_reply(ls, &rc->rc_id);
-       memset(ls->ls_recover_buf, 0, dlm_config.buffer_size);
+       memset(ls->ls_recover_buf, 0, dlm_config.ci_buffer_size);
 
        send_rcom(ls, mh, rc);
 
@@ -224,30 +237,17 @@ static void receive_rcom_names(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 {
        struct dlm_rcom *rc;
        struct dlm_mhandle *mh;
-       int error, inlen, outlen;
-       int nodeid = rc_in->rc_header.h_nodeid;
-       uint32_t status = dlm_recover_status(ls);
-
-       /*
-        * We can't run dlm_dir_rebuild_send (which uses ls_nodes) while
-        * dlm_recoverd is running ls_nodes_reconfig (which changes ls_nodes).
-        * It could only happen in rare cases where we get a late NAMES
-        * message from a previous instance of recovery.
-        */
-
-       if (!(status & DLM_RS_NODES)) {
-               log_debug(ls, "ignoring RCOM_NAMES from %u", nodeid);
-               return;
-       }
+       int error, inlen, outlen, nodeid;
 
        nodeid = rc_in->rc_header.h_nodeid;
        inlen = rc_in->rc_header.h_length - sizeof(struct dlm_rcom);
-       outlen = dlm_config.buffer_size - sizeof(struct dlm_rcom);
+       outlen = dlm_config.ci_buffer_size - sizeof(struct dlm_rcom);
 
        error = create_rcom(ls, nodeid, DLM_RCOM_NAMES_REPLY, outlen, &rc, &mh);
        if (error)
                return;
        rc->rc_id = rc_in->rc_id;
+       rc->rc_seq_reply = rc_in->rc_seq;
 
        dlm_copy_master_names(ls, rc_in->rc_buf, inlen, rc->rc_buf, outlen,
                              nodeid);
@@ -294,6 +294,7 @@ static void receive_rcom_lookup(struct dlm_ls *ls, struct dlm_rcom *rc_in)
                ret_nodeid = error;
        rc->rc_result = ret_nodeid;
        rc->rc_id = rc_in->rc_id;
+       rc->rc_seq_reply = rc_in->rc_seq;
 
        send_rcom(ls, mh, rc);
 }
@@ -375,20 +376,13 @@ static void receive_rcom_lock(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 
        memcpy(rc->rc_buf, rc_in->rc_buf, sizeof(struct rcom_lock));
        rc->rc_id = rc_in->rc_id;
+       rc->rc_seq_reply = rc_in->rc_seq;
 
        send_rcom(ls, mh, rc);
 }
 
 static void receive_rcom_lock_reply(struct dlm_ls *ls, struct dlm_rcom *rc_in)
 {
-       uint32_t status = dlm_recover_status(ls);
-
-       if (!(status & DLM_RS_DIR)) {
-               log_debug(ls, "ignoring RCOM_LOCK_REPLY from %u",
-                         rc_in->rc_header.h_nodeid);
-               return;
-       }
-
        dlm_recover_process_copy(ls, rc_in);
 }
 
@@ -415,6 +409,7 @@ static int send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
 
        rc->rc_type = DLM_RCOM_STATUS_REPLY;
        rc->rc_id = rc_in->rc_id;
+       rc->rc_seq_reply = rc_in->rc_seq;
        rc->rc_result = -ESRCH;
 
        rf = (struct rcom_config *) rc->rc_buf;
@@ -426,6 +421,31 @@ static int send_ls_not_ready(int nodeid, struct dlm_rcom *rc_in)
        return 0;
 }
 
+static int is_old_reply(struct dlm_ls *ls, struct dlm_rcom *rc)
+{
+       uint64_t seq;
+       int rv = 0;
+
+       switch (rc->rc_type) {
+       case DLM_RCOM_STATUS_REPLY:
+       case DLM_RCOM_NAMES_REPLY:
+       case DLM_RCOM_LOOKUP_REPLY:
+       case DLM_RCOM_LOCK_REPLY:
+               spin_lock(&ls->ls_recover_lock);
+               seq = ls->ls_recover_seq;
+               spin_unlock(&ls->ls_recover_lock);
+               if (rc->rc_seq_reply != seq) {
+                       log_debug(ls, "ignoring old reply %x from %d "
+                                     "seq_reply %llx expect %llx",
+                                     rc->rc_type, rc->rc_header.h_nodeid,
+                                     (unsigned long long)rc->rc_seq_reply,
+                                     (unsigned long long)seq);
+                       rv = 1;
+               }
+       }
+       return rv;
+}
+
 /* Called by dlm_recvd; corresponds to dlm_receive_message() but special
    recovery-only comms are sent through here. */
 
@@ -449,11 +469,14 @@ void dlm_receive_rcom(struct dlm_header *hd, int nodeid)
        }
 
        if (dlm_recovery_stopped(ls) && (rc->rc_type != DLM_RCOM_STATUS)) {
-               log_error(ls, "ignoring recovery message %x from %d",
+               log_debug(ls, "ignoring recovery message %x from %d",
                          rc->rc_type, nodeid);
                goto out;
        }
 
+       if (is_old_reply(ls, rc))
+               goto out;
+
        if (nodeid != rc->rc_header.h_nodeid) {
                log_error(ls, "bad rcom nodeid %d from %d",
                          rc->rc_header.h_nodeid, nodeid);
index cf9f6831bab57c34e9c733e4328a0c1a38ea534f..c2cc7694cd164b6847f35c12257b6a783bf82525 100644 (file)
@@ -44,7 +44,7 @@
 static void dlm_wait_timer_fn(unsigned long data)
 {
        struct dlm_ls *ls = (struct dlm_ls *) data;
-       mod_timer(&ls->ls_timer, jiffies + (dlm_config.recover_timer * HZ));
+       mod_timer(&ls->ls_timer, jiffies + (dlm_config.ci_recover_timer * HZ));
        wake_up(&ls->ls_wait_general);
 }
 
@@ -55,7 +55,7 @@ int dlm_wait_function(struct dlm_ls *ls, int (*testfn) (struct dlm_ls *ls))
        init_timer(&ls->ls_timer);
        ls->ls_timer.function = dlm_wait_timer_fn;
        ls->ls_timer.data = (long) ls;
-       ls->ls_timer.expires = jiffies + (dlm_config.recover_timer * HZ);
+       ls->ls_timer.expires = jiffies + (dlm_config.ci_recover_timer * HZ);
        add_timer(&ls->ls_timer);
 
        wait_event(ls->ls_wait_general, testfn(ls) || dlm_recovery_stopped(ls));
@@ -397,7 +397,9 @@ int dlm_recover_masters(struct dlm_ls *ls)
 
                if (dlm_no_directory(ls))
                        count += recover_master_static(r);
-               else if (!is_master(r) && dlm_is_removed(ls, r->res_nodeid)) {
+               else if (!is_master(r) &&
+                        (dlm_is_removed(ls, r->res_nodeid) ||
+                         rsb_flag(r, RSB_NEW_MASTER))) {
                        recover_master(r);
                        count++;
                }
index 650536aa513930948a1c5b5eaf6250aa68c837a2..3cb636d6024912b8aa96b55f6e0c731c11d56b82 100644 (file)
@@ -77,7 +77,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 
        error = dlm_recover_members(ls, rv, &neg);
        if (error) {
-               log_error(ls, "recover_members failed %d", error);
+               log_debug(ls, "recover_members failed %d", error);
                goto fail;
        }
        start = jiffies;
@@ -89,7 +89,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 
        error = dlm_recover_directory(ls);
        if (error) {
-               log_error(ls, "recover_directory failed %d", error);
+               log_debug(ls, "recover_directory failed %d", error);
                goto fail;
        }
 
@@ -99,7 +99,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 
        error = dlm_recover_directory_wait(ls);
        if (error) {
-               log_error(ls, "recover_directory_wait failed %d", error);
+               log_debug(ls, "recover_directory_wait failed %d", error);
                goto fail;
        }
 
@@ -129,7 +129,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 
                error = dlm_recover_masters(ls);
                if (error) {
-                       log_error(ls, "recover_masters failed %d", error);
+                       log_debug(ls, "recover_masters failed %d", error);
                        goto fail;
                }
 
@@ -139,13 +139,13 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 
                error = dlm_recover_locks(ls);
                if (error) {
-                       log_error(ls, "recover_locks failed %d", error);
+                       log_debug(ls, "recover_locks failed %d", error);
                        goto fail;
                }
 
                error = dlm_recover_locks_wait(ls);
                if (error) {
-                       log_error(ls, "recover_locks_wait failed %d", error);
+                       log_debug(ls, "recover_locks_wait failed %d", error);
                        goto fail;
                }
 
@@ -166,7 +166,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 
                error = dlm_recover_locks_wait(ls);
                if (error) {
-                       log_error(ls, "recover_locks_wait failed %d", error);
+                       log_debug(ls, "recover_locks_wait failed %d", error);
                        goto fail;
                }
        }
@@ -184,7 +184,7 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
        dlm_set_recover_status(ls, DLM_RS_DONE);
        error = dlm_recover_done_wait(ls);
        if (error) {
-               log_error(ls, "recover_done_wait failed %d", error);
+               log_debug(ls, "recover_done_wait failed %d", error);
                goto fail;
        }
 
@@ -192,19 +192,19 @@ static int ls_recover(struct dlm_ls *ls, struct dlm_recover *rv)
 
        error = enable_locking(ls, rv->seq);
        if (error) {
-               log_error(ls, "enable_locking failed %d", error);
+               log_debug(ls, "enable_locking failed %d", error);
                goto fail;
        }
 
        error = dlm_process_requestqueue(ls);
        if (error) {
-               log_error(ls, "process_requestqueue failed %d", error);
+               log_debug(ls, "process_requestqueue failed %d", error);
                goto fail;
        }
 
        error = dlm_recover_waiters_post(ls);
        if (error) {
-               log_error(ls, "recover_waiters_post failed %d", error);
+               log_debug(ls, "recover_waiters_post failed %d", error);
                goto fail;
        }
 
index c37e93e4f2df6f8625ee31064f857469bb544290..d378b7fe2a1ea6d4dc7846a6e95e46dff051f8b0 100644 (file)
@@ -180,6 +180,14 @@ void dlm_user_add_ast(struct dlm_lkb *lkb, int type)
            ua->lksb.sb_status == -EAGAIN && !list_empty(&lkb->lkb_ownqueue))
                remove_ownqueue = 1;
 
+       /* unlocks or cancels of waiting requests need to be removed from the
+          proc's unlocking list, again there must be a better way...  */
+
+       if (ua->lksb.sb_status == -DLM_EUNLOCK ||
+           (ua->lksb.sb_status == -DLM_ECANCEL &&
+            lkb->lkb_grmode == DLM_LOCK_IV))
+               remove_ownqueue = 1;
+
        /* We want to copy the lvb to userspace when the completion
           ast is read if the status is 0, the lock has an lvb and
           lvb_ops says we should.  We could probably have set_lvb_lock()
@@ -523,6 +531,7 @@ static int device_open(struct inode *inode, struct file *file)
        proc->lockspace = ls->ls_local_handle;
        INIT_LIST_HEAD(&proc->asts);
        INIT_LIST_HEAD(&proc->locks);
+       INIT_LIST_HEAD(&proc->unlocking);
        spin_lock_init(&proc->asts_spin);
        spin_lock_init(&proc->locks_spin);
        init_waitqueue_head(&proc->wait);
index 767197db994404749e6411eb2eebf1158ad44de6..963889cf674063fb5591cc0ab7b933afb06b24b5 100644 (file)
@@ -134,6 +134,8 @@ void dlm_rcom_out(struct dlm_rcom *rc)
        rc->rc_type             = cpu_to_le32(rc->rc_type);
        rc->rc_result           = cpu_to_le32(rc->rc_result);
        rc->rc_id               = cpu_to_le64(rc->rc_id);
+       rc->rc_seq              = cpu_to_le64(rc->rc_seq);
+       rc->rc_seq_reply        = cpu_to_le64(rc->rc_seq_reply);
 
        if (type == DLM_RCOM_LOCK)
                rcom_lock_out((struct rcom_lock *) rc->rc_buf);
@@ -151,6 +153,8 @@ void dlm_rcom_in(struct dlm_rcom *rc)
        rc->rc_type             = le32_to_cpu(rc->rc_type);
        rc->rc_result           = le32_to_cpu(rc->rc_result);
        rc->rc_id               = le64_to_cpu(rc->rc_id);
+       rc->rc_seq              = le64_to_cpu(rc->rc_seq);
+       rc->rc_seq_reply        = le64_to_cpu(rc->rc_seq_reply);
 
        if (rc->rc_type == DLM_RCOM_LOCK)
                rcom_lock_in((struct rcom_lock *) rc->rc_buf);
index 6a2ffa2db14f55566db9b6f9d643218afa6d5b59..de8e64c03f730abd9c7c509a52efebf844e96fda 100644 (file)
@@ -4,44 +4,43 @@ config GFS2_FS
        select FS_POSIX_ACL
        select CRC32
        help
-       A cluster filesystem.
+         A cluster filesystem.
 
-       Allows a cluster of computers to simultaneously use a block device
-       that is shared between them (with FC, iSCSI, NBD, etc...).  GFS reads
-       and writes to the block device like a local filesystem, but also uses
-       a lock module to allow the computers coordinate their I/O so
-       filesystem consistency is maintained.  One of the nifty features of
-       GFS is perfect consistency -- changes made to the filesystem on one
-       machine show up immediately on all other machines in the cluster.
+         Allows a cluster of computers to simultaneously use a block device
+         that is shared between them (with FC, iSCSI, NBD, etc...).  GFS reads
+         and writes to the block device like a local filesystem, but also uses
+         a lock module to allow the computers coordinate their I/O so
+         filesystem consistency is maintained.  One of the nifty features of
+         GFS is perfect consistency -- changes made to the filesystem on one
+         machine show up immediately on all other machines in the cluster.
 
-       To use the GFS2 filesystem, you will need to enable one or more of
-       the below locking modules. Documentation and utilities for GFS2 can
-       be found here: http://sources.redhat.com/cluster
+         To use the GFS2 filesystem, you will need to enable one or more of
+         the below locking modules. Documentation and utilities for GFS2 can
+         be found here: http://sources.redhat.com/cluster
 
 config GFS2_FS_LOCKING_NOLOCK
        tristate "GFS2 \"nolock\" locking module"
        depends on GFS2_FS
        help
-       Single node locking module for GFS2.
+         Single node locking module for GFS2.
 
-       Use this module if you want to use GFS2 on a single node without
-       its clustering features. You can still take advantage of the
-       large file support, and upgrade to running a full cluster later on
-       if required.
+         Use this module if you want to use GFS2 on a single node without
+         its clustering features. You can still take advantage of the
+         large file support, and upgrade to running a full cluster later on
+         if required.
 
-       If you will only be using GFS2 in cluster mode, you do not need this
-       module.
+         If you will only be using GFS2 in cluster mode, you do not need this
+         module.
 
 config GFS2_FS_LOCKING_DLM
        tristate "GFS2 DLM locking module"
-       depends on GFS2_FS && NET && INET && (IPV6 || IPV6=n)
+       depends on GFS2_FS && SYSFS && NET && INET && (IPV6 || IPV6=n)
        select IP_SCTP if DLM_SCTP
        select CONFIGFS_FS
        select DLM
        help
-       Multiple node locking module for GFS2
-
-       Most users of GFS2 will require this module. It provides the locking
-       interface between GFS2 and the DLM, which is required to use GFS2
-       in a cluster environment.
+         Multiple node locking module for GFS2
 
+         Most users of GFS2 will require this module. It provides the locking
+         interface between GFS2 and the DLM, which is required to use GFS2
+         in a cluster environment.
index 8240c1ff94f4ef009151b7c1d9d4170c5a192553..113f6c9110c745c31a6759a613ef628b4a3f68cb 100644 (file)
@@ -773,7 +773,7 @@ static int do_strip(struct gfs2_inode *ip, struct buffer_head *dibh,
                        gfs2_free_data(ip, bstart, blen);
        }
 
-       ip->i_inode.i_mtime.tv_sec = ip->i_inode.i_ctime.tv_sec = get_seconds();
+       ip->i_inode.i_mtime = ip->i_inode.i_ctime = CURRENT_TIME_SEC;
 
        gfs2_dinode_out(ip, dibh->b_data);
 
@@ -848,7 +848,7 @@ static int do_grow(struct gfs2_inode *ip, u64 size)
        }
 
        ip->i_di.di_size = size;
-       ip->i_inode.i_mtime.tv_sec = ip->i_inode.i_ctime.tv_sec = get_seconds();
+       ip->i_inode.i_mtime = ip->i_inode.i_ctime = CURRENT_TIME_SEC;
 
        error = gfs2_meta_inode_buffer(ip, &dibh);
        if (error)
@@ -963,7 +963,7 @@ static int trunc_start(struct gfs2_inode *ip, u64 size)
 
        if (gfs2_is_stuffed(ip)) {
                ip->i_di.di_size = size;
-               ip->i_inode.i_mtime.tv_sec = ip->i_inode.i_ctime.tv_sec = get_seconds();
+               ip->i_inode.i_mtime = ip->i_inode.i_ctime = CURRENT_TIME_SEC;
                gfs2_trans_add_bh(ip->i_gl, dibh, 1);
                gfs2_dinode_out(ip, dibh->b_data);
                gfs2_buffer_clear_tail(dibh, sizeof(struct gfs2_dinode) + size);
@@ -975,7 +975,7 @@ static int trunc_start(struct gfs2_inode *ip, u64 size)
 
                if (!error) {
                        ip->i_di.di_size = size;
-                       ip->i_inode.i_mtime.tv_sec = ip->i_inode.i_ctime.tv_sec = get_seconds();
+                       ip->i_inode.i_mtime = ip->i_inode.i_ctime = CURRENT_TIME_SEC;
                        ip->i_di.di_flags |= GFS2_DIF_TRUNC_IN_PROG;
                        gfs2_trans_add_bh(ip->i_gl, dibh, 1);
                        gfs2_dinode_out(ip, dibh->b_data);
@@ -1048,7 +1048,7 @@ static int trunc_end(struct gfs2_inode *ip)
                        ip->i_num.no_addr;
                gfs2_buffer_clear_tail(dibh, sizeof(struct gfs2_dinode));
        }
-       ip->i_inode.i_mtime.tv_sec = ip->i_inode.i_ctime.tv_sec = get_seconds();
+       ip->i_inode.i_mtime = ip->i_inode.i_ctime = CURRENT_TIME_SEC;
        ip->i_di.di_flags &= ~GFS2_DIF_TRUNC_IN_PROG;
 
        gfs2_trans_add_bh(ip->i_gl, dibh, 1);
index 0fdcb7713cd9378c1fe723b61b6eda9e7a1711ee..c93ca8f361b55fa09bcd72cd9a2d21e1a3014393 100644 (file)
@@ -131,7 +131,7 @@ static int gfs2_dir_write_stuffed(struct gfs2_inode *ip, const char *buf,
        memcpy(dibh->b_data + offset + sizeof(struct gfs2_dinode), buf, size);
        if (ip->i_di.di_size < offset + size)
                ip->i_di.di_size = offset + size;
-       ip->i_inode.i_mtime.tv_sec = ip->i_inode.i_ctime.tv_sec = get_seconds();
+       ip->i_inode.i_mtime = ip->i_inode.i_ctime = CURRENT_TIME_SEC;
        gfs2_dinode_out(ip, dibh->b_data);
 
        brelse(dibh);
@@ -229,7 +229,7 @@ out:
 
        if (ip->i_di.di_size < offset + copied)
                ip->i_di.di_size = offset + copied;
-       ip->i_inode.i_mtime.tv_sec = ip->i_inode.i_ctime.tv_sec = get_seconds();
+       ip->i_inode.i_mtime = ip->i_inode.i_ctime = CURRENT_TIME_SEC;
 
        gfs2_trans_add_bh(ip->i_gl, dibh, 1);
        gfs2_dinode_out(ip, dibh->b_data);
@@ -1198,12 +1198,11 @@ static int compare_dents(const void *a, const void *b)
  */
 
 static int do_filldir_main(struct gfs2_inode *dip, u64 *offset,
-                          void *opaque, gfs2_filldir_t filldir,
+                          void *opaque, filldir_t filldir,
                           const struct gfs2_dirent **darr, u32 entries,
                           int *copied)
 {
        const struct gfs2_dirent *dent, *dent_next;
-       struct gfs2_inum_host inum;
        u64 off, off_next;
        unsigned int x, y;
        int run = 0;
@@ -1240,11 +1239,9 @@ static int do_filldir_main(struct gfs2_inode *dip, u64 *offset,
                        *offset = off;
                }
 
-               gfs2_inum_in(&inum, (char *)&dent->de_inum);
-
                error = filldir(opaque, (const char *)(dent + 1),
                                be16_to_cpu(dent->de_name_len),
-                               off, &inum,
+                               off, be64_to_cpu(dent->de_inum.no_addr),
                                be16_to_cpu(dent->de_type));
                if (error)
                        return 1;
@@ -1262,8 +1259,8 @@ static int do_filldir_main(struct gfs2_inode *dip, u64 *offset,
 }
 
 static int gfs2_dir_read_leaf(struct inode *inode, u64 *offset, void *opaque,
-                             gfs2_filldir_t filldir, int *copied,
-                             unsigned *depth, u64 leaf_no)
+                             filldir_t filldir, int *copied, unsigned *depth,
+                             u64 leaf_no)
 {
        struct gfs2_inode *ip = GFS2_I(inode);
        struct buffer_head *bh;
@@ -1343,7 +1340,7 @@ out:
  */
 
 static int dir_e_read(struct inode *inode, u64 *offset, void *opaque,
-                     gfs2_filldir_t filldir)
+                     filldir_t filldir)
 {
        struct gfs2_inode *dip = GFS2_I(inode);
        struct gfs2_sbd *sdp = GFS2_SB(inode);
@@ -1402,7 +1399,7 @@ out:
 }
 
 int gfs2_dir_read(struct inode *inode, u64 *offset, void *opaque,
-                 gfs2_filldir_t filldir)
+                 filldir_t filldir)
 {
        struct gfs2_inode *dip = GFS2_I(inode);
        struct dirent_gather g;
@@ -1568,7 +1565,7 @@ int gfs2_dir_add(struct inode *inode, const struct qstr *name,
                                break;
                        gfs2_trans_add_bh(ip->i_gl, bh, 1);
                        ip->i_di.di_entries++;
-                       ip->i_inode.i_mtime.tv_sec = ip->i_inode.i_ctime.tv_sec = get_seconds();
+                       ip->i_inode.i_mtime = ip->i_inode.i_ctime = CURRENT_TIME_SEC;
                        gfs2_dinode_out(ip, bh->b_data);
                        brelse(bh);
                        error = 0;
@@ -1654,7 +1651,7 @@ int gfs2_dir_del(struct gfs2_inode *dip, const struct qstr *name)
                gfs2_consist_inode(dip);
        gfs2_trans_add_bh(dip->i_gl, bh, 1);
        dip->i_di.di_entries--;
-       dip->i_inode.i_mtime.tv_sec = dip->i_inode.i_ctime.tv_sec = get_seconds();
+       dip->i_inode.i_mtime = dip->i_inode.i_ctime = CURRENT_TIME_SEC;
        gfs2_dinode_out(dip, bh->b_data);
        brelse(bh);
        mark_inode_dirty(&dip->i_inode);
@@ -1702,7 +1699,7 @@ int gfs2_dir_mvino(struct gfs2_inode *dip, const struct qstr *filename,
                gfs2_trans_add_bh(dip->i_gl, bh, 1);
        }
 
-       dip->i_inode.i_mtime.tv_sec = dip->i_inode.i_ctime.tv_sec = get_seconds();
+       dip->i_inode.i_mtime = dip->i_inode.i_ctime = CURRENT_TIME_SEC;
        gfs2_dinode_out(dip, bh->b_data);
        brelse(bh);
        return 0;
index b21b33668a5bc4ee2ea29c7658c2f8f343232900..48fe89046bbad39af0277bb3a38d0e1f7f5de813 100644 (file)
@@ -16,30 +16,13 @@ struct inode;
 struct gfs2_inode;
 struct gfs2_inum;
 
-/**
- * gfs2_filldir_t - Report a directory entry to the caller of gfs2_dir_read()
- * @opaque: opaque data used by the function
- * @name: the name of the directory entry
- * @length: the length of the name
- * @offset: the entry's offset in the directory
- * @inum: the inode number the entry points to
- * @type: the type of inode the entry points to
- *
- * Returns: 0 on success, 1 if buffer full
- */
-
-typedef int (*gfs2_filldir_t) (void *opaque,
-                             const char *name, unsigned int length,
-                             u64 offset,
-                             struct gfs2_inum_host *inum, unsigned int type);
-
 int gfs2_dir_search(struct inode *dir, const struct qstr *filename,
                    struct gfs2_inum_host *inum, unsigned int *type);
 int gfs2_dir_add(struct inode *inode, const struct qstr *filename,
                 const struct gfs2_inum_host *inum, unsigned int type);
 int gfs2_dir_del(struct gfs2_inode *dip, const struct qstr *filename);
-int gfs2_dir_read(struct inode *inode, u64 * offset, void *opaque,
-                 gfs2_filldir_t filldir);
+int gfs2_dir_read(struct inode *inode, u64 *offset, void *opaque,
+                 filldir_t filldir);
 int gfs2_dir_mvino(struct gfs2_inode *dip, const struct qstr *filename,
                   struct gfs2_inum_host *new_inum, unsigned int new_type);
 
index ebebbdcd7057cf2b2dd1b700eb1ac18aadab5a92..0c83c7f4dda85f4d7c12cce58c4cf1dbcb8db9a6 100644 (file)
@@ -301,7 +301,7 @@ static int ea_dealloc_unstuffed(struct gfs2_inode *ip, struct buffer_head *bh,
 
        error = gfs2_meta_inode_buffer(ip, &dibh);
        if (!error) {
-               ip->i_inode.i_ctime.tv_sec = get_seconds();
+               ip->i_inode.i_ctime = CURRENT_TIME_SEC;
                gfs2_trans_add_bh(ip->i_gl, dibh, 1);
                gfs2_dinode_out(ip, dibh->b_data);
                brelse(dibh);
@@ -718,7 +718,7 @@ static int ea_alloc_skeleton(struct gfs2_inode *ip, struct gfs2_ea_request *er,
                                            (er->er_mode & S_IFMT));
                        ip->i_inode.i_mode = er->er_mode;
                }
-               ip->i_inode.i_ctime.tv_sec = get_seconds();
+               ip->i_inode.i_ctime = CURRENT_TIME_SEC;
                gfs2_trans_add_bh(ip->i_gl, dibh, 1);
                gfs2_dinode_out(ip, dibh->b_data);
                brelse(dibh);
@@ -853,7 +853,7 @@ static int ea_set_simple_noalloc(struct gfs2_inode *ip, struct buffer_head *bh,
                        (ip->i_inode.i_mode & S_IFMT) == (er->er_mode & S_IFMT));
                ip->i_inode.i_mode = er->er_mode;
        }
-       ip->i_inode.i_ctime.tv_sec = get_seconds();
+       ip->i_inode.i_ctime = CURRENT_TIME_SEC;
        gfs2_trans_add_bh(ip->i_gl, dibh, 1);
        gfs2_dinode_out(ip, dibh->b_data);
        brelse(dibh);
@@ -1134,7 +1134,7 @@ static int ea_remove_stuffed(struct gfs2_inode *ip, struct gfs2_ea_location *el)
 
        error = gfs2_meta_inode_buffer(ip, &dibh);
        if (!error) {
-               ip->i_inode.i_ctime.tv_sec = get_seconds();
+               ip->i_inode.i_ctime = CURRENT_TIME_SEC;
                gfs2_trans_add_bh(ip->i_gl, dibh, 1);
                gfs2_dinode_out(ip, dibh->b_data);
                brelse(dibh);
index 438146904b5839727ac2c378c5df87f574cfedaa..6618c1190252881f6aae8266d7303deec76398ae 100644 (file)
@@ -19,6 +19,8 @@
 #include <linux/gfs2_ondisk.h>
 #include <linux/list.h>
 #include <linux/lm_interface.h>
+#include <linux/wait.h>
+#include <linux/rwsem.h>
 #include <asm/uaccess.h>
 
 #include "gfs2.h"
 #include "super.h"
 #include "util.h"
 
-struct greedy {
-       struct gfs2_holder gr_gh;
-       struct delayed_work gr_work;
-};
-
 struct gfs2_gl_hash_bucket {
         struct hlist_head hb_list;
 };
@@ -47,6 +44,9 @@ typedef void (*glock_examiner) (struct gfs2_glock * gl);
 static int gfs2_dump_lockstate(struct gfs2_sbd *sdp);
 static int dump_glock(struct gfs2_glock *gl);
 static int dump_inode(struct gfs2_inode *ip);
+static void gfs2_glock_xmote_th(struct gfs2_holder *gh);
+static void gfs2_glock_drop_th(struct gfs2_glock *gl);
+static DECLARE_RWSEM(gfs2_umount_flush_sem);
 
 #define GFS2_GL_HASH_SHIFT      15
 #define GFS2_GL_HASH_SIZE       (1 << GFS2_GL_HASH_SHIFT)
@@ -212,30 +212,6 @@ out:
        return rv;
 }
 
-/**
- * queue_empty - check to see if a glock's queue is empty
- * @gl: the glock
- * @head: the head of the queue to check
- *
- * This function protects the list in the event that a process already
- * has a holder on the list and is adding a second holder for itself.
- * The glmutex lock is what generally prevents processes from working
- * on the same glock at once, but the special case of adding a second
- * holder for yourself ("recursive" locking) doesn't involve locking
- * glmutex, making the spin lock necessary.
- *
- * Returns: 1 if the queue is empty
- */
-
-static inline int queue_empty(struct gfs2_glock *gl, struct list_head *head)
-{
-       int empty;
-       spin_lock(&gl->gl_spin);
-       empty = list_empty(head);
-       spin_unlock(&gl->gl_spin);
-       return empty;
-}
-
 /**
  * search_bucket() - Find struct gfs2_glock by lock number
  * @bucket: the bucket to search
@@ -395,11 +371,6 @@ void gfs2_holder_init(struct gfs2_glock *gl, unsigned int state, unsigned flags,
        gh->gh_flags = flags;
        gh->gh_error = 0;
        gh->gh_iflags = 0;
-       init_completion(&gh->gh_wait);
-
-       if (gh->gh_state == LM_ST_EXCLUSIVE)
-               gh->gh_flags |= GL_LOCAL_EXCL;
-
        gfs2_glock_hold(gl);
 }
 
@@ -417,9 +388,6 @@ void gfs2_holder_reinit(unsigned int state, unsigned flags, struct gfs2_holder *
 {
        gh->gh_state = state;
        gh->gh_flags = flags;
-       if (gh->gh_state == LM_ST_EXCLUSIVE)
-               gh->gh_flags |= GL_LOCAL_EXCL;
-
        gh->gh_iflags &= 1 << HIF_ALLOCED;
        gh->gh_ip = (unsigned long)__builtin_return_address(0);
 }
@@ -479,6 +447,29 @@ static void gfs2_holder_put(struct gfs2_holder *gh)
        kfree(gh);
 }
 
+static void gfs2_holder_dispose_or_wake(struct gfs2_holder *gh)
+{
+       if (test_bit(HIF_DEALLOC, &gh->gh_iflags)) {
+               gfs2_holder_put(gh);
+               return;
+       }
+       clear_bit(HIF_WAIT, &gh->gh_iflags);
+       smp_mb();
+       wake_up_bit(&gh->gh_iflags, HIF_WAIT);
+}
+
+static int holder_wait(void *word)
+{
+        schedule();
+        return 0;
+}
+
+static void wait_on_holder(struct gfs2_holder *gh)
+{
+       might_sleep();
+       wait_on_bit(&gh->gh_iflags, HIF_WAIT, holder_wait, TASK_UNINTERRUPTIBLE);
+}
+
 /**
  * rq_mutex - process a mutex request in the queue
  * @gh: the glock holder
@@ -493,7 +484,9 @@ static int rq_mutex(struct gfs2_holder *gh)
        list_del_init(&gh->gh_list);
        /*  gh->gh_error never examined.  */
        set_bit(GLF_LOCK, &gl->gl_flags);
-       complete(&gh->gh_wait);
+       clear_bit(HIF_WAIT, &gh->gh_iflags);
+       smp_mb();
+       wake_up_bit(&gh->gh_iflags, HIF_WAIT);
 
        return 1;
 }
@@ -511,7 +504,6 @@ static int rq_promote(struct gfs2_holder *gh)
 {
        struct gfs2_glock *gl = gh->gh_gl;
        struct gfs2_sbd *sdp = gl->gl_sbd;
-       const struct gfs2_glock_operations *glops = gl->gl_ops;
 
        if (!relaxed_state_ok(gl->gl_state, gh->gh_state, gh->gh_flags)) {
                if (list_empty(&gl->gl_holders)) {
@@ -526,7 +518,7 @@ static int rq_promote(struct gfs2_holder *gh)
                                gfs2_reclaim_glock(sdp);
                        }
 
-                       glops->go_xmote_th(gl, gh->gh_state, gh->gh_flags);
+                       gfs2_glock_xmote_th(gh);
                        spin_lock(&gl->gl_spin);
                }
                return 1;
@@ -537,11 +529,11 @@ static int rq_promote(struct gfs2_holder *gh)
                set_bit(GLF_LOCK, &gl->gl_flags);
        } else {
                struct gfs2_holder *next_gh;
-               if (gh->gh_flags & GL_LOCAL_EXCL)
+               if (gh->gh_state == LM_ST_EXCLUSIVE)
                        return 1;
                next_gh = list_entry(gl->gl_holders.next, struct gfs2_holder,
                                     gh_list);
-               if (next_gh->gh_flags & GL_LOCAL_EXCL)
+               if (next_gh->gh_state == LM_ST_EXCLUSIVE)
                         return 1;
        }
 
@@ -549,7 +541,7 @@ static int rq_promote(struct gfs2_holder *gh)
        gh->gh_error = 0;
        set_bit(HIF_HOLDER, &gh->gh_iflags);
 
-       complete(&gh->gh_wait);
+       gfs2_holder_dispose_or_wake(gh);
 
        return 0;
 }
@@ -564,7 +556,6 @@ static int rq_promote(struct gfs2_holder *gh)
 static int rq_demote(struct gfs2_holder *gh)
 {
        struct gfs2_glock *gl = gh->gh_gl;
-       const struct gfs2_glock_operations *glops = gl->gl_ops;
 
        if (!list_empty(&gl->gl_holders))
                return 1;
@@ -573,10 +564,7 @@ static int rq_demote(struct gfs2_holder *gh)
                list_del_init(&gh->gh_list);
                gh->gh_error = 0;
                spin_unlock(&gl->gl_spin);
-               if (test_bit(HIF_DEALLOC, &gh->gh_iflags))
-                       gfs2_holder_put(gh);
-               else
-                       complete(&gh->gh_wait);
+               gfs2_holder_dispose_or_wake(gh);
                spin_lock(&gl->gl_spin);
        } else {
                gl->gl_req_gh = gh;
@@ -585,9 +573,9 @@ static int rq_demote(struct gfs2_holder *gh)
 
                if (gh->gh_state == LM_ST_UNLOCKED ||
                    gl->gl_state != LM_ST_EXCLUSIVE)
-                       glops->go_drop_th(gl);
+                       gfs2_glock_drop_th(gl);
                else
-                       glops->go_xmote_th(gl, gh->gh_state, gh->gh_flags);
+                       gfs2_glock_xmote_th(gh);
 
                spin_lock(&gl->gl_spin);
        }
@@ -595,30 +583,6 @@ static int rq_demote(struct gfs2_holder *gh)
        return 0;
 }
 
-/**
- * rq_greedy - process a queued request to drop greedy status
- * @gh: the glock holder
- *
- * Returns: 1 if the queue is blocked
- */
-
-static int rq_greedy(struct gfs2_holder *gh)
-{
-       struct gfs2_glock *gl = gh->gh_gl;
-
-       list_del_init(&gh->gh_list);
-       /*  gh->gh_error never examined.  */
-       clear_bit(GLF_GREEDY, &gl->gl_flags);
-       spin_unlock(&gl->gl_spin);
-
-       gfs2_holder_uninit(gh);
-       kfree(container_of(gh, struct greedy, gr_gh));
-
-       spin_lock(&gl->gl_spin);
-
-       return 0;
-}
-
 /**
  * run_queue - process holder structures on a glock
  * @gl: the glock
@@ -649,8 +613,6 @@ static void run_queue(struct gfs2_glock *gl)
 
                        if (test_bit(HIF_DEMOTE, &gh->gh_iflags))
                                blocked = rq_demote(gh);
-                       else if (test_bit(HIF_GREEDY, &gh->gh_iflags))
-                               blocked = rq_greedy(gh);
                        else
                                gfs2_assert_warn(gl->gl_sbd, 0);
 
@@ -684,6 +646,8 @@ static void gfs2_glmutex_lock(struct gfs2_glock *gl)
 
        gfs2_holder_init(gl, 0, 0, &gh);
        set_bit(HIF_MUTEX, &gh.gh_iflags);
+       if (test_and_set_bit(HIF_WAIT, &gh.gh_iflags))
+               BUG();
 
        spin_lock(&gl->gl_spin);
        if (test_and_set_bit(GLF_LOCK, &gl->gl_flags)) {
@@ -691,11 +655,13 @@ static void gfs2_glmutex_lock(struct gfs2_glock *gl)
        } else {
                gl->gl_owner = current;
                gl->gl_ip = (unsigned long)__builtin_return_address(0);
-               complete(&gh.gh_wait);
+               clear_bit(HIF_WAIT, &gh.gh_iflags);
+               smp_mb();
+               wake_up_bit(&gh.gh_iflags, HIF_WAIT);
        }
        spin_unlock(&gl->gl_spin);
 
-       wait_for_completion(&gh.gh_wait);
+       wait_on_holder(&gh);
        gfs2_holder_uninit(&gh);
 }
 
@@ -774,6 +740,7 @@ restart:
                        return;
                set_bit(HIF_DEMOTE, &new_gh->gh_iflags);
                set_bit(HIF_DEALLOC, &new_gh->gh_iflags);
+               set_bit(HIF_WAIT, &new_gh->gh_iflags);
 
                goto restart;
        }
@@ -825,7 +792,7 @@ static void xmote_bh(struct gfs2_glock *gl, unsigned int ret)
        int op_done = 1;
 
        gfs2_assert_warn(sdp, test_bit(GLF_LOCK, &gl->gl_flags));
-       gfs2_assert_warn(sdp, queue_empty(gl, &gl->gl_holders));
+       gfs2_assert_warn(sdp, list_empty(&gl->gl_holders));
        gfs2_assert_warn(sdp, !(ret & LM_OUT_ASYNC));
 
        state_change(gl, ret & LM_OUT_ST_MASK);
@@ -908,12 +875,8 @@ static void xmote_bh(struct gfs2_glock *gl, unsigned int ret)
 
        gfs2_glock_put(gl);
 
-       if (gh) {
-               if (test_bit(HIF_DEALLOC, &gh->gh_iflags))
-                       gfs2_holder_put(gh);
-               else
-                       complete(&gh->gh_wait);
-       }
+       if (gh)
+               gfs2_holder_dispose_or_wake(gh);
 }
 
 /**
@@ -924,23 +887,26 @@ static void xmote_bh(struct gfs2_glock *gl, unsigned int ret)
  *
  */
 
-void gfs2_glock_xmote_th(struct gfs2_glock *gl, unsigned int state, int flags)
+void gfs2_glock_xmote_th(struct gfs2_holder *gh)
 {
+       struct gfs2_glock *gl = gh->gh_gl;
        struct gfs2_sbd *sdp = gl->gl_sbd;
+       int flags = gh->gh_flags;
+       unsigned state = gh->gh_state;
        const struct gfs2_glock_operations *glops = gl->gl_ops;
        int lck_flags = flags & (LM_FLAG_TRY | LM_FLAG_TRY_1CB |
                                 LM_FLAG_NOEXP | LM_FLAG_ANY |
                                 LM_FLAG_PRIORITY);
        unsigned int lck_ret;
 
+       if (glops->go_xmote_th)
+               glops->go_xmote_th(gl);
+
        gfs2_assert_warn(sdp, test_bit(GLF_LOCK, &gl->gl_flags));
-       gfs2_assert_warn(sdp, queue_empty(gl, &gl->gl_holders));
+       gfs2_assert_warn(sdp, list_empty(&gl->gl_holders));
        gfs2_assert_warn(sdp, state != LM_ST_UNLOCKED);
        gfs2_assert_warn(sdp, state != gl->gl_state);
 
-       if (gl->gl_state == LM_ST_EXCLUSIVE && glops->go_sync)
-               glops->go_sync(gl);
-
        gfs2_glock_hold(gl);
        gl->gl_req_bh = xmote_bh;
 
@@ -971,10 +937,8 @@ static void drop_bh(struct gfs2_glock *gl, unsigned int ret)
        const struct gfs2_glock_operations *glops = gl->gl_ops;
        struct gfs2_holder *gh = gl->gl_req_gh;
 
-       clear_bit(GLF_PREFETCH, &gl->gl_flags);
-
        gfs2_assert_warn(sdp, test_bit(GLF_LOCK, &gl->gl_flags));
-       gfs2_assert_warn(sdp, queue_empty(gl, &gl->gl_holders));
+       gfs2_assert_warn(sdp, list_empty(&gl->gl_holders));
        gfs2_assert_warn(sdp, !ret);
 
        state_change(gl, LM_ST_UNLOCKED);
@@ -1001,12 +965,8 @@ static void drop_bh(struct gfs2_glock *gl, unsigned int ret)
 
        gfs2_glock_put(gl);
 
-       if (gh) {
-               if (test_bit(HIF_DEALLOC, &gh->gh_iflags))
-                       gfs2_holder_put(gh);
-               else
-                       complete(&gh->gh_wait);
-       }
+       if (gh)
+               gfs2_holder_dispose_or_wake(gh);
 }
 
 /**
@@ -1015,19 +975,19 @@ static void drop_bh(struct gfs2_glock *gl, unsigned int ret)
  *
  */
 
-void gfs2_glock_drop_th(struct gfs2_glock *gl)
+static void gfs2_glock_drop_th(struct gfs2_glock *gl)
 {
        struct gfs2_sbd *sdp = gl->gl_sbd;
        const struct gfs2_glock_operations *glops = gl->gl_ops;
        unsigned int ret;
 
+       if (glops->go_drop_th)
+               glops->go_drop_th(gl);
+
        gfs2_assert_warn(sdp, test_bit(GLF_LOCK, &gl->gl_flags));
-       gfs2_assert_warn(sdp, queue_empty(gl, &gl->gl_holders));
+       gfs2_assert_warn(sdp, list_empty(&gl->gl_holders));
        gfs2_assert_warn(sdp, gl->gl_state != LM_ST_UNLOCKED);
 
-       if (gl->gl_state == LM_ST_EXCLUSIVE && glops->go_sync)
-               glops->go_sync(gl);
-
        gfs2_glock_hold(gl);
        gl->gl_req_bh = drop_bh;
 
@@ -1107,8 +1067,7 @@ static int glock_wait_internal(struct gfs2_holder *gh)
        if (gh->gh_flags & LM_FLAG_PRIORITY)
                do_cancels(gh);
 
-       wait_for_completion(&gh->gh_wait);
-
+       wait_on_holder(gh);
        if (gh->gh_error)
                return gh->gh_error;
 
@@ -1164,6 +1123,8 @@ static void add_to_queue(struct gfs2_holder *gh)
        struct gfs2_holder *existing;
 
        BUG_ON(!gh->gh_owner);
+       if (test_and_set_bit(HIF_WAIT, &gh->gh_iflags))
+               BUG();
 
        existing = find_holder_by_owner(&gl->gl_holders, gh->gh_owner);
        if (existing) {
@@ -1227,8 +1188,6 @@ restart:
                }
        }
 
-       clear_bit(GLF_PREFETCH, &gl->gl_flags);
-
        return error;
 }
 
@@ -1320,98 +1279,6 @@ void gfs2_glock_dq(struct gfs2_holder *gh)
        spin_unlock(&gl->gl_spin);
 }
 
-/**
- * gfs2_glock_prefetch - Try to prefetch a glock
- * @gl: the glock
- * @state: the state to prefetch in
- * @flags: flags passed to go_xmote_th()
- *
- */
-
-static void gfs2_glock_prefetch(struct gfs2_glock *gl, unsigned int state,
-                               int flags)
-{
-       const struct gfs2_glock_operations *glops = gl->gl_ops;
-
-       spin_lock(&gl->gl_spin);
-
-       if (test_bit(GLF_LOCK, &gl->gl_flags) || !list_empty(&gl->gl_holders) ||
-           !list_empty(&gl->gl_waiters1) || !list_empty(&gl->gl_waiters2) ||
-           !list_empty(&gl->gl_waiters3) ||
-           relaxed_state_ok(gl->gl_state, state, flags)) {
-               spin_unlock(&gl->gl_spin);
-               return;
-       }
-
-       set_bit(GLF_PREFETCH, &gl->gl_flags);
-       set_bit(GLF_LOCK, &gl->gl_flags);
-       spin_unlock(&gl->gl_spin);
-
-       glops->go_xmote_th(gl, state, flags);
-}
-
-static void greedy_work(struct work_struct *work)
-{
-       struct greedy *gr = container_of(work, struct greedy, gr_work.work);
-       struct gfs2_holder *gh = &gr->gr_gh;
-       struct gfs2_glock *gl = gh->gh_gl;
-       const struct gfs2_glock_operations *glops = gl->gl_ops;
-
-       clear_bit(GLF_SKIP_WAITERS2, &gl->gl_flags);
-
-       if (glops->go_greedy)
-               glops->go_greedy(gl);
-
-       spin_lock(&gl->gl_spin);
-
-       if (list_empty(&gl->gl_waiters2)) {
-               clear_bit(GLF_GREEDY, &gl->gl_flags);
-               spin_unlock(&gl->gl_spin);
-               gfs2_holder_uninit(gh);
-               kfree(gr);
-       } else {
-               gfs2_glock_hold(gl);
-               list_add_tail(&gh->gh_list, &gl->gl_waiters2);
-               run_queue(gl);
-               spin_unlock(&gl->gl_spin);
-               gfs2_glock_put(gl);
-       }
-}
-
-/**
- * gfs2_glock_be_greedy -
- * @gl:
- * @time:
- *
- * Returns: 0 if go_greedy will be called, 1 otherwise
- */
-
-int gfs2_glock_be_greedy(struct gfs2_glock *gl, unsigned int time)
-{
-       struct greedy *gr;
-       struct gfs2_holder *gh;
-
-       if (!time || gl->gl_sbd->sd_args.ar_localcaching ||
-           test_and_set_bit(GLF_GREEDY, &gl->gl_flags))
-               return 1;
-
-       gr = kmalloc(sizeof(struct greedy), GFP_KERNEL);
-       if (!gr) {
-               clear_bit(GLF_GREEDY, &gl->gl_flags);
-               return 1;
-       }
-       gh = &gr->gr_gh;
-
-       gfs2_holder_init(gl, 0, 0, gh);
-       set_bit(HIF_GREEDY, &gh->gh_iflags);
-       INIT_DELAYED_WORK(&gr->gr_work, greedy_work);
-
-       set_bit(GLF_SKIP_WAITERS2, &gl->gl_flags);
-       schedule_delayed_work(&gr->gr_work, time);
-
-       return 0;
-}
-
 /**
  * gfs2_glock_dq_uninit - dequeue a holder from a glock and initialize it
  * @gh: the holder structure
@@ -1470,10 +1337,7 @@ static int glock_compare(const void *arg_a, const void *arg_b)
                return 1;
        if (a->ln_number < b->ln_number)
                return -1;
-       if (gh_a->gh_state == LM_ST_SHARED && gh_b->gh_state == LM_ST_EXCLUSIVE)
-               return 1;
-       if (!(gh_a->gh_flags & GL_LOCAL_EXCL) && (gh_b->gh_flags & GL_LOCAL_EXCL))
-               return 1;
+       BUG_ON(gh_a->gh_gl->gl_ops->go_type == gh_b->gh_gl->gl_ops->go_type);
        return 0;
 }
 
@@ -1617,34 +1481,6 @@ void gfs2_glock_dq_uninit_m(unsigned int num_gh, struct gfs2_holder *ghs)
                gfs2_glock_dq_uninit(&ghs[x]);
 }
 
-/**
- * gfs2_glock_prefetch_num - prefetch a glock based on lock number
- * @sdp: the filesystem
- * @number: the lock number
- * @glops: the glock operations for the type of glock
- * @state: the state to acquire the glock in
- * @flags: modifier flags for the aquisition
- *
- * Returns: errno
- */
-
-void gfs2_glock_prefetch_num(struct gfs2_sbd *sdp, u64 number,
-                            const struct gfs2_glock_operations *glops,
-                            unsigned int state, int flags)
-{
-       struct gfs2_glock *gl;
-       int error;
-
-       if (atomic_read(&sdp->sd_reclaim_count) <
-           gfs2_tune_get(sdp, gt_reclaim_limit)) {
-               error = gfs2_glock_get(sdp, number, glops, CREATE, &gl);
-               if (!error) {
-                       gfs2_glock_prefetch(gl, state, flags);
-                       gfs2_glock_put(gl);
-               }
-       }
-}
-
 /**
  * gfs2_lvb_hold - attach a LVB from a glock
  * @gl: The glock in question
@@ -1703,8 +1539,6 @@ static void blocking_cb(struct gfs2_sbd *sdp, struct lm_lockname *name,
        if (!gl)
                return;
 
-       if (gl->gl_ops->go_callback)
-               gl->gl_ops->go_callback(gl, state);
        handle_callback(gl, state);
 
        spin_lock(&gl->gl_spin);
@@ -1746,12 +1580,14 @@ void gfs2_glock_cb(void *cb_data, unsigned int type, void *data)
                struct lm_async_cb *async = data;
                struct gfs2_glock *gl;
 
+               down_read(&gfs2_umount_flush_sem);
                gl = gfs2_glock_find(sdp, &async->lc_name);
                if (gfs2_assert_warn(sdp, gl))
                        return;
                if (!gfs2_assert_warn(sdp, gl->gl_req_bh))
                        gl->gl_req_bh(gl, async->lc_ret);
                gfs2_glock_put(gl);
+               up_read(&gfs2_umount_flush_sem);
                return;
        }
 
@@ -1781,15 +1617,11 @@ void gfs2_glock_cb(void *cb_data, unsigned int type, void *data)
 
 static int demote_ok(struct gfs2_glock *gl)
 {
-       struct gfs2_sbd *sdp = gl->gl_sbd;
        const struct gfs2_glock_operations *glops = gl->gl_ops;
        int demote = 1;
 
        if (test_bit(GLF_STICKY, &gl->gl_flags))
                demote = 0;
-       else if (test_bit(GLF_PREFETCH, &gl->gl_flags))
-               demote = time_after_eq(jiffies, gl->gl_stamp +
-                                   gfs2_tune_get(sdp, gt_prefetch_secs) * HZ);
        else if (glops->go_demote_ok)
                demote = glops->go_demote_ok(gl);
 
@@ -1845,7 +1677,7 @@ void gfs2_reclaim_glock(struct gfs2_sbd *sdp)
        atomic_inc(&sdp->sd_reclaimed);
 
        if (gfs2_glmutex_trylock(gl)) {
-               if (queue_empty(gl, &gl->gl_holders) &&
+               if (list_empty(&gl->gl_holders) &&
                    gl->gl_state != LM_ST_UNLOCKED && demote_ok(gl))
                        handle_callback(gl, LM_ST_UNLOCKED);
                gfs2_glmutex_unlock(gl);
@@ -1909,7 +1741,7 @@ static void scan_glock(struct gfs2_glock *gl)
                return;
 
        if (gfs2_glmutex_trylock(gl)) {
-               if (queue_empty(gl, &gl->gl_holders) &&
+               if (list_empty(&gl->gl_holders) &&
                    gl->gl_state != LM_ST_UNLOCKED && demote_ok(gl))
                        goto out_schedule;
                gfs2_glmutex_unlock(gl);
@@ -1958,7 +1790,7 @@ static void clear_glock(struct gfs2_glock *gl)
        }
 
        if (gfs2_glmutex_trylock(gl)) {
-               if (queue_empty(gl, &gl->gl_holders) &&
+               if (list_empty(&gl->gl_holders) &&
                    gl->gl_state != LM_ST_UNLOCKED)
                        handle_callback(gl, LM_ST_UNLOCKED);
                gfs2_glmutex_unlock(gl);
@@ -2000,7 +1832,9 @@ void gfs2_gl_hash_clear(struct gfs2_sbd *sdp, int wait)
                        t = jiffies;
                }
 
+               down_write(&gfs2_umount_flush_sem);
                invalidate_inodes(sdp->sd_vfs);
+               up_write(&gfs2_umount_flush_sem);
                msleep(10);
        }
 }
index fb39108fc05c2e378f6d349775ab808a986e3594..f50e40ceca432c721d429eaa20ddf4bd7c29b4e5 100644 (file)
@@ -20,7 +20,6 @@
 #define LM_FLAG_ANY            0x00000008
 #define LM_FLAG_PRIORITY       0x00000010 */
 
-#define GL_LOCAL_EXCL          0x00000020
 #define GL_ASYNC               0x00000040
 #define GL_EXACT               0x00000080
 #define GL_SKIP                        0x00000100
@@ -83,17 +82,11 @@ void gfs2_holder_init(struct gfs2_glock *gl, unsigned int state, unsigned flags,
 void gfs2_holder_reinit(unsigned int state, unsigned flags,
                        struct gfs2_holder *gh);
 void gfs2_holder_uninit(struct gfs2_holder *gh);
-
-void gfs2_glock_xmote_th(struct gfs2_glock *gl, unsigned int state, int flags);
-void gfs2_glock_drop_th(struct gfs2_glock *gl);
-
 int gfs2_glock_nq(struct gfs2_holder *gh);
 int gfs2_glock_poll(struct gfs2_holder *gh);
 int gfs2_glock_wait(struct gfs2_holder *gh);
 void gfs2_glock_dq(struct gfs2_holder *gh);
 
-int gfs2_glock_be_greedy(struct gfs2_glock *gl, unsigned int time);
-
 void gfs2_glock_dq_uninit(struct gfs2_holder *gh);
 int gfs2_glock_nq_num(struct gfs2_sbd *sdp,
                      u64 number, const struct gfs2_glock_operations *glops,
@@ -103,10 +96,6 @@ int gfs2_glock_nq_m(unsigned int num_gh, struct gfs2_holder *ghs);
 void gfs2_glock_dq_m(unsigned int num_gh, struct gfs2_holder *ghs);
 void gfs2_glock_dq_uninit_m(unsigned int num_gh, struct gfs2_holder *ghs);
 
-void gfs2_glock_prefetch_num(struct gfs2_sbd *sdp, u64 number,
-                            const struct gfs2_glock_operations *glops,
-                            unsigned int state, int flags);
-
 /**
  * gfs2_glock_nq_init - intialize a holder and enqueue it on a glock
  * @gl: the glock
index b068d10bcb6e75b981eef0b977cd4ca912743960..c4b0391b7aa23cbb5433d2235cc202793cf5797e 100644 (file)
@@ -117,12 +117,14 @@ static void gfs2_pte_inval(struct gfs2_glock *gl)
 
 static void meta_go_sync(struct gfs2_glock *gl)
 {
+       if (gl->gl_state != LM_ST_EXCLUSIVE)
+               return;
+
        if (test_and_clear_bit(GLF_DIRTY, &gl->gl_flags)) {
                gfs2_log_flush(gl->gl_sbd, gl);
                gfs2_meta_sync(gl);
                gfs2_ail_empty_gl(gl);
        }
-
 }
 
 /**
@@ -141,6 +143,37 @@ static void meta_go_inval(struct gfs2_glock *gl, int flags)
        gl->gl_vn++;
 }
 
+/**
+ * inode_go_sync - Sync the dirty data and/or metadata for an inode glock
+ * @gl: the glock protecting the inode
+ *
+ */
+
+static void inode_go_sync(struct gfs2_glock *gl)
+{
+       struct gfs2_inode *ip = gl->gl_object;
+
+       if (ip && !S_ISREG(ip->i_inode.i_mode))
+               ip = NULL;
+
+       if (test_bit(GLF_DIRTY, &gl->gl_flags)) {
+               gfs2_log_flush(gl->gl_sbd, gl);
+               if (ip)
+                       filemap_fdatawrite(ip->i_inode.i_mapping);
+               gfs2_meta_sync(gl);
+               if (ip) {
+                       struct address_space *mapping = ip->i_inode.i_mapping;
+                       int error = filemap_fdatawait(mapping);
+                       if (error == -ENOSPC)
+                               set_bit(AS_ENOSPC, &mapping->flags);
+                       else if (error)
+                               set_bit(AS_EIO, &mapping->flags);
+               }
+               clear_bit(GLF_DIRTY, &gl->gl_flags);
+               gfs2_ail_empty_gl(gl);
+       }
+}
+
 /**
  * inode_go_xmote_th - promote/demote a glock
  * @gl: the glock
@@ -149,12 +182,12 @@ static void meta_go_inval(struct gfs2_glock *gl, int flags)
  *
  */
 
-static void inode_go_xmote_th(struct gfs2_glock *gl, unsigned int state,
-                             int flags)
+static void inode_go_xmote_th(struct gfs2_glock *gl)
 {
        if (gl->gl_state != LM_ST_UNLOCKED)
                gfs2_pte_inval(gl);
-       gfs2_glock_xmote_th(gl, state, flags);
+       if (gl->gl_state == LM_ST_EXCLUSIVE)
+               inode_go_sync(gl);
 }
 
 /**
@@ -189,38 +222,8 @@ static void inode_go_xmote_bh(struct gfs2_glock *gl)
 static void inode_go_drop_th(struct gfs2_glock *gl)
 {
        gfs2_pte_inval(gl);
-       gfs2_glock_drop_th(gl);
-}
-
-/**
- * inode_go_sync - Sync the dirty data and/or metadata for an inode glock
- * @gl: the glock protecting the inode
- *
- */
-
-static void inode_go_sync(struct gfs2_glock *gl)
-{
-       struct gfs2_inode *ip = gl->gl_object;
-
-       if (ip && !S_ISREG(ip->i_inode.i_mode))
-               ip = NULL;
-
-       if (test_bit(GLF_DIRTY, &gl->gl_flags)) {
-               gfs2_log_flush(gl->gl_sbd, gl);
-               if (ip)
-                       filemap_fdatawrite(ip->i_inode.i_mapping);
-               gfs2_meta_sync(gl);
-               if (ip) {
-                       struct address_space *mapping = ip->i_inode.i_mapping;
-                       int error = filemap_fdatawait(mapping);
-                       if (error == -ENOSPC)
-                               set_bit(AS_ENOSPC, &mapping->flags);
-                       else if (error)
-                               set_bit(AS_EIO, &mapping->flags);
-               }
-               clear_bit(GLF_DIRTY, &gl->gl_flags);
-               gfs2_ail_empty_gl(gl);
-       }
+       if (gl->gl_state == LM_ST_EXCLUSIVE)
+               inode_go_sync(gl);
 }
 
 /**
@@ -295,7 +298,7 @@ static int inode_go_lock(struct gfs2_holder *gh)
 
        if ((ip->i_di.di_flags & GFS2_DIF_TRUNC_IN_PROG) &&
            (gl->gl_state == LM_ST_EXCLUSIVE) &&
-           (gh->gh_flags & GL_LOCAL_EXCL))
+           (gh->gh_state == LM_ST_EXCLUSIVE))
                error = gfs2_truncatei_resume(ip);
 
        return error;
@@ -318,39 +321,6 @@ static void inode_go_unlock(struct gfs2_holder *gh)
                gfs2_meta_cache_flush(ip);
 }
 
-/**
- * inode_greedy -
- * @gl: the glock
- *
- */
-
-static void inode_greedy(struct gfs2_glock *gl)
-{
-       struct gfs2_sbd *sdp = gl->gl_sbd;
-       struct gfs2_inode *ip = gl->gl_object;
-       unsigned int quantum = gfs2_tune_get(sdp, gt_greedy_quantum);
-       unsigned int max = gfs2_tune_get(sdp, gt_greedy_max);
-       unsigned int new_time;
-
-       spin_lock(&ip->i_spin);
-
-       if (time_after(ip->i_last_pfault + quantum, jiffies)) {
-               new_time = ip->i_greedy + quantum;
-               if (new_time > max)
-                       new_time = max;
-       } else {
-               new_time = ip->i_greedy - quantum;
-               if (!new_time || new_time > max)
-                       new_time = 1;
-       }
-
-       ip->i_greedy = new_time;
-
-       spin_unlock(&ip->i_spin);
-
-       iput(&ip->i_inode);
-}
-
 /**
  * rgrp_go_demote_ok - Check to see if it's ok to unlock a RG's glock
  * @gl: the glock
@@ -398,8 +368,7 @@ static void rgrp_go_unlock(struct gfs2_holder *gh)
  *
  */
 
-static void trans_go_xmote_th(struct gfs2_glock *gl, unsigned int state,
-                             int flags)
+static void trans_go_xmote_th(struct gfs2_glock *gl)
 {
        struct gfs2_sbd *sdp = gl->gl_sbd;
 
@@ -408,8 +377,6 @@ static void trans_go_xmote_th(struct gfs2_glock *gl, unsigned int state,
                gfs2_meta_syncfs(sdp);
                gfs2_log_shutdown(sdp);
        }
-
-       gfs2_glock_xmote_th(gl, state, flags);
 }
 
 /**
@@ -461,8 +428,6 @@ static void trans_go_drop_th(struct gfs2_glock *gl)
                gfs2_meta_syncfs(sdp);
                gfs2_log_shutdown(sdp);
        }
-
-       gfs2_glock_drop_th(gl);
 }
 
 /**
@@ -478,8 +443,8 @@ static int quota_go_demote_ok(struct gfs2_glock *gl)
 }
 
 const struct gfs2_glock_operations gfs2_meta_glops = {
-       .go_xmote_th = gfs2_glock_xmote_th,
-       .go_drop_th = gfs2_glock_drop_th,
+       .go_xmote_th = meta_go_sync,
+       .go_drop_th = meta_go_sync,
        .go_type = LM_TYPE_META,
 };
 
@@ -487,19 +452,14 @@ const struct gfs2_glock_operations gfs2_inode_glops = {
        .go_xmote_th = inode_go_xmote_th,
        .go_xmote_bh = inode_go_xmote_bh,
        .go_drop_th = inode_go_drop_th,
-       .go_sync = inode_go_sync,
        .go_inval = inode_go_inval,
        .go_demote_ok = inode_go_demote_ok,
        .go_lock = inode_go_lock,
        .go_unlock = inode_go_unlock,
-       .go_greedy = inode_greedy,
        .go_type = LM_TYPE_INODE,
 };
 
 const struct gfs2_glock_operations gfs2_rgrp_glops = {
-       .go_xmote_th = gfs2_glock_xmote_th,
-       .go_drop_th = gfs2_glock_drop_th,
-       .go_sync = meta_go_sync,
        .go_inval = meta_go_inval,
        .go_demote_ok = rgrp_go_demote_ok,
        .go_lock = rgrp_go_lock,
@@ -515,33 +475,23 @@ const struct gfs2_glock_operations gfs2_trans_glops = {
 };
 
 const struct gfs2_glock_operations gfs2_iopen_glops = {
-       .go_xmote_th = gfs2_glock_xmote_th,
-       .go_drop_th = gfs2_glock_drop_th,
        .go_type = LM_TYPE_IOPEN,
 };
 
 const struct gfs2_glock_operations gfs2_flock_glops = {
-       .go_xmote_th = gfs2_glock_xmote_th,
-       .go_drop_th = gfs2_glock_drop_th,
        .go_type = LM_TYPE_FLOCK,
 };
 
 const struct gfs2_glock_operations gfs2_nondisk_glops = {
-       .go_xmote_th = gfs2_glock_xmote_th,
-       .go_drop_th = gfs2_glock_drop_th,
        .go_type = LM_TYPE_NONDISK,
 };
 
 const struct gfs2_glock_operations gfs2_quota_glops = {
-       .go_xmote_th = gfs2_glock_xmote_th,
-       .go_drop_th = gfs2_glock_drop_th,
        .go_demote_ok = quota_go_demote_ok,
        .go_type = LM_TYPE_QUOTA,
 };
 
 const struct gfs2_glock_operations gfs2_journal_glops = {
-       .go_xmote_th = gfs2_glock_xmote_th,
-       .go_drop_th = gfs2_glock_drop_th,
        .go_type = LM_TYPE_JOURNAL,
 };
 
index 734421edae85a21ac02bf00e6735228208965919..12c80fd28db51d8f3b92fdebddcfee954bc926d5 100644 (file)
@@ -101,17 +101,14 @@ struct gfs2_bufdata {
 };
 
 struct gfs2_glock_operations {
-       void (*go_xmote_th) (struct gfs2_glock *gl, unsigned int state, int flags);
+       void (*go_xmote_th) (struct gfs2_glock *gl);
        void (*go_xmote_bh) (struct gfs2_glock *gl);
        void (*go_drop_th) (struct gfs2_glock *gl);
        void (*go_drop_bh) (struct gfs2_glock *gl);
-       void (*go_sync) (struct gfs2_glock *gl);
        void (*go_inval) (struct gfs2_glock *gl, int flags);
        int (*go_demote_ok) (struct gfs2_glock *gl);
        int (*go_lock) (struct gfs2_holder *gh);
        void (*go_unlock) (struct gfs2_holder *gh);
-       void (*go_callback) (struct gfs2_glock *gl, unsigned int state);
-       void (*go_greedy) (struct gfs2_glock *gl);
        const int go_type;
 };
 
@@ -120,7 +117,6 @@ enum {
        HIF_MUTEX               = 0,
        HIF_PROMOTE             = 1,
        HIF_DEMOTE              = 2,
-       HIF_GREEDY              = 3,
 
        /* States */
        HIF_ALLOCED             = 4,
@@ -128,6 +124,7 @@ enum {
        HIF_HOLDER              = 6,
        HIF_FIRST               = 7,
        HIF_ABORTED             = 9,
+       HIF_WAIT                = 10,
 };
 
 struct gfs2_holder {
@@ -140,17 +137,14 @@ struct gfs2_holder {
 
        int gh_error;
        unsigned long gh_iflags;
-       struct completion gh_wait;
        unsigned long gh_ip;
 };
 
 enum {
        GLF_LOCK                = 1,
        GLF_STICKY              = 2,
-       GLF_PREFETCH            = 3,
        GLF_DIRTY               = 5,
        GLF_SKIP_WAITERS2       = 6,
-       GLF_GREEDY              = 7,
 };
 
 struct gfs2_glock {
@@ -167,7 +161,7 @@ struct gfs2_glock {
        unsigned long gl_ip;
        struct list_head gl_holders;
        struct list_head gl_waiters1;   /* HIF_MUTEX */
-       struct list_head gl_waiters2;   /* HIF_DEMOTE, HIF_GREEDY */
+       struct list_head gl_waiters2;   /* HIF_DEMOTE */
        struct list_head gl_waiters3;   /* HIF_PROMOTE */
 
        const struct gfs2_glock_operations *gl_ops;
@@ -236,7 +230,6 @@ struct gfs2_inode {
 
        spinlock_t i_spin;
        struct rw_semaphore i_rw_mutex;
-       unsigned int i_greedy;
        unsigned long i_last_pfault;
 
        struct buffer_head *i_cache[GFS2_MAX_META_HEIGHT];
@@ -418,17 +411,12 @@ struct gfs2_tune {
        unsigned int gt_atime_quantum; /* Min secs between atime updates */
        unsigned int gt_new_files_jdata;
        unsigned int gt_new_files_directio;
-       unsigned int gt_max_atomic_write; /* Split big writes into this size */
        unsigned int gt_max_readahead; /* Max bytes to read-ahead from disk */
        unsigned int gt_lockdump_size;
        unsigned int gt_stall_secs; /* Detects trouble! */
        unsigned int gt_complain_secs;
        unsigned int gt_reclaim_limit; /* Max num of glocks in reclaim list */
        unsigned int gt_entries_per_readdir;
-       unsigned int gt_prefetch_secs; /* Usage window for prefetched glocks */
-       unsigned int gt_greedy_default;
-       unsigned int gt_greedy_quantum;
-       unsigned int gt_greedy_max;
        unsigned int gt_statfs_quantum;
        unsigned int gt_statfs_slow;
 };
index d122074c45e16d3163dd61d9e819b6c01ae652aa..0d6831a40565015da1af7f343544e25551bcaf57 100644 (file)
@@ -287,10 +287,8 @@ out:
  *
  * Returns: errno
  */
-
 int gfs2_change_nlink(struct gfs2_inode *ip, int diff)
 {
-       struct gfs2_sbd *sdp = ip->i_inode.i_sb->s_fs_info;
        struct buffer_head *dibh;
        u32 nlink;
        int error;
@@ -315,42 +313,34 @@ int gfs2_change_nlink(struct gfs2_inode *ip, int diff)
        else
                drop_nlink(&ip->i_inode);
 
-       ip->i_inode.i_ctime.tv_sec = get_seconds();
+       ip->i_inode.i_ctime = CURRENT_TIME_SEC;
 
        gfs2_trans_add_bh(ip->i_gl, dibh, 1);
        gfs2_dinode_out(ip, dibh->b_data);
        brelse(dibh);
        mark_inode_dirty(&ip->i_inode);
 
-       if (ip->i_inode.i_nlink == 0) {
-               struct gfs2_rgrpd *rgd;
-               struct gfs2_holder ri_gh, rg_gh;
-
-               error = gfs2_rindex_hold(sdp, &ri_gh);
-               if (error)
-                       goto out;
-               error = -EIO;
-               rgd = gfs2_blk2rgrpd(sdp, ip->i_num.no_addr);
-               if (!rgd)
-                       goto out_norgrp;
-               error = gfs2_glock_nq_init(rgd->rd_gl, LM_ST_EXCLUSIVE, 0, &rg_gh);
-               if (error)
-                       goto out_norgrp;
-
+       if (ip->i_inode.i_nlink == 0)
                gfs2_unlink_di(&ip->i_inode); /* mark inode unlinked */
-               gfs2_glock_dq_uninit(&rg_gh);
-out_norgrp:
-               gfs2_glock_dq_uninit(&ri_gh);
-       }
-out:
+
        return error;
 }
 
 struct inode *gfs2_lookup_simple(struct inode *dip, const char *name)
 {
        struct qstr qstr;
+       struct inode *inode;
        gfs2_str2qstr(&qstr, name);
-       return gfs2_lookupi(dip, &qstr, 1, NULL);
+       inode = gfs2_lookupi(dip, &qstr, 1, NULL);
+       /* gfs2_lookupi has inconsistent callers: vfs
+        * related routines expect NULL for no entry found,
+        * gfs2_lookup_simple callers expect ENOENT
+        * and do not check for NULL.
+        */
+       if (inode == NULL)
+               return ERR_PTR(-ENOENT);
+       else
+               return inode;
 }
 
 
@@ -361,8 +351,10 @@ struct inode *gfs2_lookup_simple(struct inode *dip, const char *name)
  * @is_root: If 1, ignore the caller's permissions
  * @i_gh: An uninitialized holder for the new inode glock
  *
- * There will always be a vnode (Linux VFS inode) for the d_gh inode unless
- * @is_root is true.
+ * This can be called via the VFS filldir function when NFS is doing
+ * a readdirplus and the inode which its intending to stat isn't
+ * already in cache. In this case we must not take the directory glock
+ * again, since the readdir call will have already taken that lock.
  *
  * Returns: errno
  */
@@ -375,8 +367,9 @@ struct inode *gfs2_lookupi(struct inode *dir, const struct qstr *name,
        struct gfs2_holder d_gh;
        struct gfs2_inum_host inum;
        unsigned int type;
-       int error = 0;
+       int error;
        struct inode *inode = NULL;
+       int unlock = 0;
 
        if (!name->len || name->len > GFS2_FNAMESIZE)
                return ERR_PTR(-ENAMETOOLONG);
@@ -388,9 +381,12 @@ struct inode *gfs2_lookupi(struct inode *dir, const struct qstr *name,
                return dir;
        }
 
-       error = gfs2_glock_nq_init(dip->i_gl, LM_ST_SHARED, 0, &d_gh);
-       if (error)
-               return ERR_PTR(error);
+       if (gfs2_glock_is_locked_by_me(dip->i_gl) == 0) {
+               error = gfs2_glock_nq_init(dip->i_gl, LM_ST_SHARED, 0, &d_gh);
+               if (error)
+                       return ERR_PTR(error);
+               unlock = 1;
+       }
 
        if (!is_root) {
                error = permission(dir, MAY_EXEC, NULL);
@@ -405,10 +401,11 @@ struct inode *gfs2_lookupi(struct inode *dir, const struct qstr *name,
        inode = gfs2_inode_lookup(sb, &inum, type);
 
 out:
-       gfs2_glock_dq_uninit(&d_gh);
+       if (unlock)
+               gfs2_glock_dq_uninit(&d_gh);
        if (error == -ENOENT)
                return NULL;
-       return inode;
+       return inode ? inode : ERR_PTR(error);
 }
 
 static int pick_formal_ino_1(struct gfs2_sbd *sdp, u64 *formal_ino)
index effe4a337c1dbce0d8156545b393794d4163aeb6..e30673dd37e017565d801492dda8248754c8e6a8 100644 (file)
@@ -104,15 +104,9 @@ int gfs2_lm_withdraw(struct gfs2_sbd *sdp, char *fmt, ...)
        vprintk(fmt, args);
        va_end(args);
 
-       fs_err(sdp, "about to withdraw from the cluster\n");
+       fs_err(sdp, "about to withdraw this file system\n");
        BUG_ON(sdp->sd_args.ar_debug);
 
-
-       fs_err(sdp, "waiting for outstanding I/O\n");
-
-       /* FIXME: suspend dm device so oustanding bio's complete
-          and all further io requests fail */
-
        fs_err(sdp, "telling LM to withdraw\n");
        gfs2_withdraw_lockproto(&sdp->sd_lockstruct);
        fs_err(sdp, "withdrawn\n");
index 33af707a4d3f39eed36bfb1fdefb9ff5cc9b3117..a87c7bf3c568afd0a5a19252eb2e5a35c8c9d7b5 100644 (file)
@@ -36,7 +36,7 @@
 
 #define GDLM_STRNAME_BYTES     24
 #define GDLM_LVB_SIZE          32
-#define GDLM_DROP_COUNT                50000
+#define GDLM_DROP_COUNT                200000
 #define GDLM_DROP_PERIOD       60
 #define GDLM_NAME_LEN          128
 
index 2194b1d5b5ec7163138717b6f89d0057e894eac6..a0e7eda643ed415dd337eef0f2813cb1a46c5562 100644 (file)
@@ -11,9 +11,6 @@
 
 #include "lock_dlm.h"
 
-extern int gdlm_drop_count;
-extern int gdlm_drop_period;
-
 extern struct lm_lockops gdlm_ops;
 
 static int __init init_lock_dlm(void)
@@ -40,9 +37,6 @@ static int __init init_lock_dlm(void)
                return error;
        }
 
-       gdlm_drop_count = GDLM_DROP_COUNT;
-       gdlm_drop_period = GDLM_DROP_PERIOD;
-
        printk(KERN_INFO
               "Lock_DLM (built %s %s) installed\n", __DATE__, __TIME__);
        return 0;
index cdd1694e889bac2c866f3fdc328acc16b47d169f..1d8faa3da8af1f984168de88796f89281c8773ea 100644 (file)
@@ -9,8 +9,6 @@
 
 #include "lock_dlm.h"
 
-int gdlm_drop_count;
-int gdlm_drop_period;
 const struct lm_lockops gdlm_ops;
 
 
@@ -24,8 +22,8 @@ static struct gdlm_ls *init_gdlm(lm_callback_t cb, struct gfs2_sbd *sdp,
        if (!ls)
                return NULL;
 
-       ls->drop_locks_count = gdlm_drop_count;
-       ls->drop_locks_period = gdlm_drop_period;
+       ls->drop_locks_count = GDLM_DROP_COUNT;
+       ls->drop_locks_period = GDLM_DROP_PERIOD;
        ls->fscb = cb;
        ls->sdp = sdp;
        ls->fsflags = flags;
index 29ae06f949445c735fac49c62cb65f3aa090a9a5..4746b884662ddc61cbd8d7ccfae950deefcd7793 100644 (file)
@@ -116,6 +116,17 @@ static ssize_t recover_status_show(struct gdlm_ls *ls, char *buf)
        return sprintf(buf, "%d\n", ls->recover_jid_status);
 }
 
+static ssize_t drop_count_show(struct gdlm_ls *ls, char *buf)
+{
+       return sprintf(buf, "%d\n", ls->drop_locks_count);
+}
+
+static ssize_t drop_count_store(struct gdlm_ls *ls, const char *buf, size_t len)
+{
+       ls->drop_locks_count = simple_strtol(buf, NULL, 0);
+       return len;
+}
+
 struct gdlm_attr {
        struct attribute attr;
        ssize_t (*show)(struct gdlm_ls *, char *);
@@ -135,6 +146,7 @@ GDLM_ATTR(first_done,     0444, first_done_show,     NULL);
 GDLM_ATTR(recover,        0644, recover_show,        recover_store);
 GDLM_ATTR(recover_done,   0444, recover_done_show,   NULL);
 GDLM_ATTR(recover_status, 0444, recover_status_show, NULL);
+GDLM_ATTR(drop_count,     0644, drop_count_show,     drop_count_store);
 
 static struct attribute *gdlm_attrs[] = {
        &gdlm_attr_proto_name.attr,
@@ -147,6 +159,7 @@ static struct attribute *gdlm_attrs[] = {
        &gdlm_attr_recover.attr,
        &gdlm_attr_recover_done.attr,
        &gdlm_attr_recover_status.attr,
+       &gdlm_attr_drop_count.attr,
        NULL,
 };
 
index 4d7f94d8c7bd2ca07162fed385efcc1ae65ea835..16bb4b4561aed9d653a4f7b23746b70495e7718b 100644 (file)
@@ -69,13 +69,16 @@ static void buf_lo_add(struct gfs2_sbd *sdp, struct gfs2_log_element *le)
        struct gfs2_bufdata *bd = container_of(le, struct gfs2_bufdata, bd_le);
        struct gfs2_trans *tr;
 
-       if (!list_empty(&bd->bd_list_tr))
+       gfs2_log_lock(sdp);
+       if (!list_empty(&bd->bd_list_tr)) {
+               gfs2_log_unlock(sdp);
                return;
-
+       }
        tr = current->journal_info;
        tr->tr_touched = 1;
        tr->tr_num_buf++;
        list_add(&bd->bd_list_tr, &tr->tr_list_buf);
+       gfs2_log_unlock(sdp);
 
        if (!list_empty(&le->le_list))
                return;
@@ -84,7 +87,6 @@ static void buf_lo_add(struct gfs2_sbd *sdp, struct gfs2_log_element *le)
 
        gfs2_meta_check(sdp, bd->bd_bh);
        gfs2_pin(sdp, bd->bd_bh);
-
        gfs2_log_lock(sdp);
        sdp->sd_log_num_buf++;
        list_add(&le->le_list, &sdp->sd_log_le_buf);
@@ -98,11 +100,13 @@ static void buf_lo_incore_commit(struct gfs2_sbd *sdp, struct gfs2_trans *tr)
        struct list_head *head = &tr->tr_list_buf;
        struct gfs2_bufdata *bd;
 
+       gfs2_log_lock(sdp);
        while (!list_empty(head)) {
                bd = list_entry(head->next, struct gfs2_bufdata, bd_list_tr);
                list_del_init(&bd->bd_list_tr);
                tr->tr_num_buf--;
        }
+       gfs2_log_unlock(sdp);
        gfs2_assert_warn(sdp, !tr->tr_num_buf);
 }
 
@@ -462,13 +466,17 @@ static void databuf_lo_add(struct gfs2_sbd *sdp, struct gfs2_log_element *le)
        struct address_space *mapping = bd->bd_bh->b_page->mapping;
        struct gfs2_inode *ip = GFS2_I(mapping->host);
 
+       gfs2_log_lock(sdp);
        tr->tr_touched = 1;
        if (list_empty(&bd->bd_list_tr) &&
            (ip->i_di.di_flags & GFS2_DIF_JDATA)) {
                tr->tr_num_buf++;
                list_add(&bd->bd_list_tr, &tr->tr_list_buf);
+               gfs2_log_unlock(sdp);
                gfs2_pin(sdp, bd->bd_bh);
                tr->tr_num_buf_new++;
+       } else {
+               gfs2_log_unlock(sdp);
        }
        gfs2_trans_add_gl(bd->bd_gl);
        gfs2_log_lock(sdp);
index d8d69a72a10dd819bfcdbe6a736009bda5a92e27..56e33590b65661cec2bd8cdf8c375513d450c153 100644 (file)
@@ -16,6 +16,7 @@
 #include <linux/pagevec.h>
 #include <linux/mpage.h>
 #include <linux/fs.h>
+#include <linux/writeback.h>
 #include <linux/gfs2_ondisk.h>
 #include <linux/lm_interface.h>
 
@@ -156,6 +157,32 @@ out_ignore:
        return 0;
 }
 
+/**
+ * gfs2_writepages - Write a bunch of dirty pages back to disk
+ * @mapping: The mapping to write
+ * @wbc: Write-back control
+ *
+ * For journaled files and/or ordered writes this just falls back to the
+ * kernel's default writepages path for now. We will probably want to change
+ * that eventually (i.e. when we look at allocate on flush).
+ *
+ * For the data=writeback case though we can already ignore buffer heads
+ * and write whole extents at once. This is a big reduction in the
+ * number of I/O requests we send and the bmap calls we make in this case.
+ */
+static int gfs2_writepages(struct address_space *mapping,
+                          struct writeback_control *wbc)
+{
+       struct inode *inode = mapping->host;
+       struct gfs2_inode *ip = GFS2_I(inode);
+       struct gfs2_sbd *sdp = GFS2_SB(inode);
+
+       if (sdp->sd_args.ar_data == GFS2_DATA_WRITEBACK && !gfs2_is_jdata(ip))
+               return mpage_writepages(mapping, wbc, gfs2_get_block_noalloc);
+
+       return generic_writepages(mapping, wbc);
+}
+
 /**
  * stuffed_readpage - Fill in a Linux page with stuffed file data
  * @ip: the inode
@@ -256,7 +283,7 @@ out_unlock:
  *    the page lock and the glock) and return having done no I/O. Its
  *    obviously not something we'd want to do on too regular a basis.
  *    Any I/O we ignore at this time will be done via readpage later.
- * 2. We have to handle stuffed files here too.
+ * 2. We don't handle stuffed files here we let readpage do the honours.
  * 3. mpage_readpages() does most of the heavy lifting in the common case.
  * 4. gfs2_get_block() is relied upon to set BH_Boundary in the right places.
  * 5. We use LM_FLAG_TRY_1CB here, effectively we then have lock-ahead as
@@ -269,8 +296,7 @@ static int gfs2_readpages(struct file *file, struct address_space *mapping,
        struct gfs2_inode *ip = GFS2_I(inode);
        struct gfs2_sbd *sdp = GFS2_SB(inode);
        struct gfs2_holder gh;
-       unsigned page_idx;
-       int ret;
+       int ret = 0;
        int do_unlock = 0;
 
        if (likely(file != &gfs2_internal_file_sentinel)) {
@@ -289,29 +315,8 @@ static int gfs2_readpages(struct file *file, struct address_space *mapping,
                        goto out_unlock;
        }
 skip_lock:
-       if (gfs2_is_stuffed(ip)) {
-               struct pagevec lru_pvec;
-               pagevec_init(&lru_pvec, 0);
-               for (page_idx = 0; page_idx < nr_pages; page_idx++) {
-                       struct page *page = list_entry(pages->prev, struct page, lru);
-                       prefetchw(&page->flags);
-                       list_del(&page->lru);
-                       if (!add_to_page_cache(page, mapping,
-                                              page->index, GFP_KERNEL)) {
-                               ret = stuffed_readpage(ip, page);
-                               unlock_page(page);
-                               if (!pagevec_add(&lru_pvec, page))
-                                        __pagevec_lru_add(&lru_pvec);
-                       } else {
-                               page_cache_release(page);
-                       }
-               }
-               pagevec_lru_add(&lru_pvec);
-               ret = 0;
-       } else {
-               /* What we really want to do .... */
+       if (!gfs2_is_stuffed(ip))
                ret = mpage_readpages(mapping, pages, nr_pages, gfs2_get_block);
-       }
 
        if (do_unlock) {
                gfs2_glock_dq_m(1, &gh);
@@ -356,8 +361,10 @@ static int gfs2_prepare_write(struct file *file, struct page *page,
        gfs2_holder_init(ip->i_gl, LM_ST_EXCLUSIVE, GL_ATIME|LM_FLAG_TRY_1CB, &ip->i_gh);
        error = gfs2_glock_nq_atime(&ip->i_gh);
        if (unlikely(error)) {
-               if (error == GLR_TRYFAILED)
+               if (error == GLR_TRYFAILED) {
+                       unlock_page(page);
                        error = AOP_TRUNCATED_PAGE;
+               }
                goto out_uninit;
        }
 
@@ -594,6 +601,36 @@ static void gfs2_invalidatepage(struct page *page, unsigned long offset)
        return;
 }
 
+/**
+ * gfs2_ok_for_dio - check that dio is valid on this file
+ * @ip: The inode
+ * @rw: READ or WRITE
+ * @offset: The offset at which we are reading or writing
+ *
+ * Returns: 0 (to ignore the i/o request and thus fall back to buffered i/o)
+ *          1 (to accept the i/o request)
+ */
+static int gfs2_ok_for_dio(struct gfs2_inode *ip, int rw, loff_t offset)
+{
+       /*
+        * Should we return an error here? I can't see that O_DIRECT for
+        * a journaled file makes any sense. For now we'll silently fall
+        * back to buffered I/O, likewise we do the same for stuffed
+        * files since they are (a) small and (b) unaligned.
+        */
+       if (gfs2_is_jdata(ip))
+               return 0;
+
+       if (gfs2_is_stuffed(ip))
+               return 0;
+
+       if (offset > i_size_read(&ip->i_inode))
+               return 0;
+       return 1;
+}
+
+
+
 static ssize_t gfs2_direct_IO(int rw, struct kiocb *iocb,
                              const struct iovec *iov, loff_t offset,
                              unsigned long nr_segs)
@@ -604,42 +641,28 @@ static ssize_t gfs2_direct_IO(int rw, struct kiocb *iocb,
        struct gfs2_holder gh;
        int rv;
 
-       if (rw == READ)
-               mutex_lock(&inode->i_mutex);
        /*
-        * Shared lock, even if its a write, since we do no allocation
-        * on this path. All we need change is atime.
+        * Deferred lock, even if its a write, since we do no allocation
+        * on this path. All we need change is atime, and this lock mode
+        * ensures that other nodes have flushed their buffered read caches
+        * (i.e. their page cache entries for this inode). We do not,
+        * unfortunately have the option of only flushing a range like
+        * the VFS does.
         */
-       gfs2_holder_init(ip->i_gl, LM_ST_SHARED, GL_ATIME, &gh);
+       gfs2_holder_init(ip->i_gl, LM_ST_DEFERRED, GL_ATIME, &gh);
        rv = gfs2_glock_nq_atime(&gh);
        if (rv)
-               goto out;
-
-       if (offset > i_size_read(inode))
-               goto out;
-
-       /*
-        * Should we return an error here? I can't see that O_DIRECT for
-        * a journaled file makes any sense. For now we'll silently fall
-        * back to buffered I/O, likewise we do the same for stuffed
-        * files since they are (a) small and (b) unaligned.
-        */
-       if (gfs2_is_jdata(ip))
-               goto out;
-
-       if (gfs2_is_stuffed(ip))
-               goto out;
-
-       rv = blockdev_direct_IO_own_locking(rw, iocb, inode,
-                                           inode->i_sb->s_bdev,
-                                           iov, offset, nr_segs,
-                                           gfs2_get_block_direct, NULL);
+               return rv;
+       rv = gfs2_ok_for_dio(ip, rw, offset);
+       if (rv != 1)
+               goto out; /* dio not valid, fall back to buffered i/o */
+
+       rv = blockdev_direct_IO_no_locking(rw, iocb, inode, inode->i_sb->s_bdev,
+                                          iov, offset, nr_segs,
+                                          gfs2_get_block_direct, NULL);
 out:
        gfs2_glock_dq_m(1, &gh);
        gfs2_holder_uninit(&gh);
-       if (rw == READ)
-               mutex_unlock(&inode->i_mutex);
-
        return rv;
 }
 
@@ -763,6 +786,7 @@ out:
 
 const struct address_space_operations gfs2_file_aops = {
        .writepage = gfs2_writepage,
+       .writepages = gfs2_writepages,
        .readpage = gfs2_readpage,
        .readpages = gfs2_readpages,
        .sync_page = block_sync_page,
index d355899585d822b800d15f2d8826734bc2d02a6f..9187eb174b43a0c91547bbbe36760a8cd9d65789 100644 (file)
@@ -46,6 +46,7 @@ static int gfs2_drevalidate(struct dentry *dentry, struct nameidata *nd)
        struct gfs2_inum_host inum;
        unsigned int type;
        int error;
+       int had_lock=0;
 
        if (inode && is_bad_inode(inode))
                goto invalid;
@@ -53,9 +54,12 @@ static int gfs2_drevalidate(struct dentry *dentry, struct nameidata *nd)
        if (sdp->sd_args.ar_localcaching)
                goto valid;
 
-       error = gfs2_glock_nq_init(dip->i_gl, LM_ST_SHARED, 0, &d_gh);
-       if (error)
-               goto fail;
+       had_lock = gfs2_glock_is_locked_by_me(dip->i_gl);
+       if (!had_lock) {
+               error = gfs2_glock_nq_init(dip->i_gl, LM_ST_SHARED, 0, &d_gh);
+               if (error)
+                       goto fail;
+       } 
 
        error = gfs2_dir_search(parent->d_inode, &dentry->d_name, &inum, &type);
        switch (error) {
@@ -82,13 +86,15 @@ static int gfs2_drevalidate(struct dentry *dentry, struct nameidata *nd)
        }
 
 valid_gunlock:
-       gfs2_glock_dq_uninit(&d_gh);
+       if (!had_lock)
+               gfs2_glock_dq_uninit(&d_gh);
 valid:
        dput(parent);
        return 1;
 
 invalid_gunlock:
-       gfs2_glock_dq_uninit(&d_gh);
+       if (!had_lock)
+               gfs2_glock_dq_uninit(&d_gh);
 invalid:
        if (inode && S_ISDIR(inode->i_mode)) {
                if (have_submounts(dentry))
index b4e7b8775315e34dd70ad60b935875d3ed274b1c..4855e8cca62269e01b1c5e70433771747c64b95c 100644 (file)
@@ -22,6 +22,7 @@
 #include "glock.h"
 #include "glops.h"
 #include "inode.h"
+#include "ops_dentry.h"
 #include "ops_export.h"
 #include "rgrp.h"
 #include "util.h"
@@ -112,13 +113,12 @@ struct get_name_filldir {
        char *name;
 };
 
-static int get_name_filldir(void *opaque, const char *name, unsigned int length,
-                           u64 offset, struct gfs2_inum_host *inum,
-                           unsigned int type)
+static int get_name_filldir(void *opaque, const char *name, int length,
+                           loff_t offset, u64 inum, unsigned int type)
 {
-       struct get_name_filldir *gnfd = (struct get_name_filldir *)opaque;
+       struct get_name_filldir *gnfd = opaque;
 
-       if (!gfs2_inum_equal(inum, &gnfd->inum))
+       if (inum != gnfd->inum.no_addr)
                return 0;
 
        memcpy(gnfd->name, name, length);
@@ -189,6 +189,7 @@ static struct dentry *gfs2_get_parent(struct dentry *child)
                return ERR_PTR(-ENOMEM);
        }
 
+       dentry->d_op = &gfs2_dops;
        return dentry;
 }
 
@@ -215,8 +216,7 @@ static struct dentry *gfs2_get_dentry(struct super_block *sb, void *inum_obj)
        }
 
        error = gfs2_glock_nq_num(sdp, inum->no_addr, &gfs2_inode_glops,
-                                 LM_ST_SHARED, LM_FLAG_ANY | GL_LOCAL_EXCL,
-                                 &i_gh);
+                                 LM_ST_SHARED, LM_FLAG_ANY, &i_gh);
        if (error)
                return ERR_PTR(error);
 
@@ -269,6 +269,7 @@ out_inode:
                return ERR_PTR(-ENOMEM);
        }
 
+       dentry->d_op = &gfs2_dops;
        return dentry;
 
 fail_rgd:
index faa07e4b97d025709cc246a22d51fd367e2c6d89..c996aa739a0515f5c11229591a83805a4abb6f99 100644 (file)
 #include "util.h"
 #include "eaops.h"
 
-/* For regular, non-NFS */
-struct filldir_reg {
-       struct gfs2_sbd *fdr_sbd;
-       int fdr_prefetch;
-
-       filldir_t fdr_filldir;
-       void *fdr_opaque;
-};
-
 /*
  * Most fields left uninitialised to catch anybody who tries to
  * use them. f_flags set to prevent file_accessed() from touching
@@ -127,41 +118,6 @@ static loff_t gfs2_llseek(struct file *file, loff_t offset, int origin)
        return error;
 }
 
-/**
- * filldir_func - Report a directory entry to the caller of gfs2_dir_read()
- * @opaque: opaque data used by the function
- * @name: the name of the directory entry
- * @length: the length of the name
- * @offset: the entry's offset in the directory
- * @inum: the inode number the entry points to
- * @type: the type of inode the entry points to
- *
- * Returns: 0 on success, 1 if buffer full
- */
-
-static int filldir_func(void *opaque, const char *name, unsigned int length,
-                       u64 offset, struct gfs2_inum_host *inum,
-                       unsigned int type)
-{
-       struct filldir_reg *fdr = (struct filldir_reg *)opaque;
-       struct gfs2_sbd *sdp = fdr->fdr_sbd;
-       int error;
-
-       error = fdr->fdr_filldir(fdr->fdr_opaque, name, length, offset,
-                                inum->no_addr, type);
-       if (error)
-               return 1;
-
-       if (fdr->fdr_prefetch && !(length == 1 && *name == '.')) {
-               gfs2_glock_prefetch_num(sdp, inum->no_addr, &gfs2_inode_glops,
-                                      LM_ST_SHARED, LM_FLAG_TRY | LM_FLAG_ANY);
-               gfs2_glock_prefetch_num(sdp, inum->no_addr, &gfs2_iopen_glops,
-                                      LM_ST_SHARED, LM_FLAG_TRY);
-       }
-
-       return 0;
-}
-
 /**
  * gfs2_readdir - Read directory entries from a directory
  * @file: The directory to read from
@@ -175,16 +131,10 @@ static int gfs2_readdir(struct file *file, void *dirent, filldir_t filldir)
 {
        struct inode *dir = file->f_mapping->host;
        struct gfs2_inode *dip = GFS2_I(dir);
-       struct filldir_reg fdr;
        struct gfs2_holder d_gh;
        u64 offset = file->f_pos;
        int error;
 
-       fdr.fdr_sbd = GFS2_SB(dir);
-       fdr.fdr_prefetch = 1;
-       fdr.fdr_filldir = filldir;
-       fdr.fdr_opaque = dirent;
-
        gfs2_holder_init(dip->i_gl, LM_ST_SHARED, GL_ATIME, &d_gh);
        error = gfs2_glock_nq_atime(&d_gh);
        if (error) {
@@ -192,7 +142,7 @@ static int gfs2_readdir(struct file *file, void *dirent, filldir_t filldir)
                return error;
        }
 
-       error = gfs2_dir_read(dir, &offset, &fdr, filldir_func);
+       error = gfs2_dir_read(dir, &offset, dirent, filldir);
 
        gfs2_glock_dq_uninit(&d_gh);
 
index 636dda4c7d38d688c76923c6289e130e8d4754ac..f40a84807d75468f5b9a302d9b3bf9d1a07f2618 100644 (file)
@@ -264,13 +264,23 @@ static int gfs2_unlink(struct inode *dir, struct dentry *dentry)
        struct gfs2_inode *dip = GFS2_I(dir);
        struct gfs2_sbd *sdp = GFS2_SB(dir);
        struct gfs2_inode *ip = GFS2_I(dentry->d_inode);
-       struct gfs2_holder ghs[2];
+       struct gfs2_holder ghs[3];
+       struct gfs2_rgrpd *rgd;
+       struct gfs2_holder ri_gh;
        int error;
 
+       error = gfs2_rindex_hold(sdp, &ri_gh);
+       if (error)
+               return error;
+
        gfs2_holder_init(dip->i_gl, LM_ST_EXCLUSIVE, 0, ghs);
-       gfs2_holder_init(ip->i_gl, LM_ST_EXCLUSIVE, 0, ghs + 1);
+       gfs2_holder_init(ip->i_gl,  LM_ST_EXCLUSIVE, 0, ghs + 1);
 
-       error = gfs2_glock_nq_m(2, ghs);
+       rgd = gfs2_blk2rgrpd(sdp, ip->i_num.no_addr);
+       gfs2_holder_init(rgd->rd_gl, LM_ST_EXCLUSIVE, 0, ghs + 2);
+
+
+       error = gfs2_glock_nq_m(3, ghs);
        if (error)
                goto out;
 
@@ -291,10 +301,12 @@ static int gfs2_unlink(struct inode *dir, struct dentry *dentry)
 out_end_trans:
        gfs2_trans_end(sdp);
 out_gunlock:
-       gfs2_glock_dq_m(2, ghs);
+       gfs2_glock_dq_m(3, ghs);
 out:
        gfs2_holder_uninit(ghs);
        gfs2_holder_uninit(ghs + 1);
+       gfs2_holder_uninit(ghs + 2);
+       gfs2_glock_dq_uninit(&ri_gh);
        return error;
 }
 
@@ -449,13 +461,22 @@ static int gfs2_rmdir(struct inode *dir, struct dentry *dentry)
        struct gfs2_inode *dip = GFS2_I(dir);
        struct gfs2_sbd *sdp = GFS2_SB(dir);
        struct gfs2_inode *ip = GFS2_I(dentry->d_inode);
-       struct gfs2_holder ghs[2];
+       struct gfs2_holder ghs[3];
+       struct gfs2_rgrpd *rgd;
+       struct gfs2_holder ri_gh;
        int error;
 
+
+       error = gfs2_rindex_hold(sdp, &ri_gh);
+       if (error)
+               return error;
        gfs2_holder_init(dip->i_gl, LM_ST_EXCLUSIVE, 0, ghs);
        gfs2_holder_init(ip->i_gl, LM_ST_EXCLUSIVE, 0, ghs + 1);
 
-       error = gfs2_glock_nq_m(2, ghs);
+       rgd = gfs2_blk2rgrpd(sdp, ip->i_num.no_addr);
+       gfs2_holder_init(rgd->rd_gl, LM_ST_EXCLUSIVE, 0, ghs + 2);
+
+       error = gfs2_glock_nq_m(3, ghs);
        if (error)
                goto out;
 
@@ -483,10 +504,12 @@ static int gfs2_rmdir(struct inode *dir, struct dentry *dentry)
        gfs2_trans_end(sdp);
 
 out_gunlock:
-       gfs2_glock_dq_m(2, ghs);
+       gfs2_glock_dq_m(3, ghs);
 out:
        gfs2_holder_uninit(ghs);
        gfs2_holder_uninit(ghs + 1);
+       gfs2_holder_uninit(ghs + 2);
+       gfs2_glock_dq_uninit(&ri_gh);
        return error;
 }
 
@@ -547,7 +570,8 @@ static int gfs2_rename(struct inode *odir, struct dentry *odentry,
        struct gfs2_inode *ip = GFS2_I(odentry->d_inode);
        struct gfs2_inode *nip = NULL;
        struct gfs2_sbd *sdp = GFS2_SB(odir);
-       struct gfs2_holder ghs[4], r_gh;
+       struct gfs2_holder ghs[5], r_gh;
+       struct gfs2_rgrpd *nrgd;
        unsigned int num_gh;
        int dir_rename = 0;
        int alloc_required;
@@ -587,6 +611,13 @@ static int gfs2_rename(struct inode *odir, struct dentry *odentry,
        if (nip) {
                gfs2_holder_init(nip->i_gl, LM_ST_EXCLUSIVE, 0, ghs + num_gh);
                num_gh++;
+               /* grab the resource lock for unlink flag twiddling 
+                * this is the case of the target file already existing
+                * so we unlink before doing the rename
+                */
+               nrgd = gfs2_blk2rgrpd(sdp, nip->i_num.no_addr);
+               if (nrgd)
+                       gfs2_holder_init(nrgd->rd_gl, LM_ST_EXCLUSIVE, 0, ghs + num_gh++);
        }
 
        error = gfs2_glock_nq_m(num_gh, ghs);
@@ -684,12 +715,12 @@ static int gfs2_rename(struct inode *odir, struct dentry *odentry,
                error = gfs2_trans_begin(sdp, sdp->sd_max_dirres +
                                         al->al_rgd->rd_ri.ri_length +
                                         4 * RES_DINODE + 4 * RES_LEAF +
-                                        RES_STATFS + RES_QUOTA, 0);
+                                        RES_STATFS + RES_QUOTA + 4, 0);
                if (error)
                        goto out_ipreserv;
        } else {
                error = gfs2_trans_begin(sdp, 4 * RES_DINODE +
-                                        5 * RES_LEAF, 0);
+                                        5 * RES_LEAF + 4, 0);
                if (error)
                        goto out_gunlock;
        }
@@ -728,7 +759,7 @@ static int gfs2_rename(struct inode *odir, struct dentry *odentry,
                error = gfs2_meta_inode_buffer(ip, &dibh);
                if (error)
                        goto out_end_trans;
-               ip->i_inode.i_ctime.tv_sec = get_seconds();
+               ip->i_inode.i_ctime = CURRENT_TIME_SEC;
                gfs2_trans_add_bh(ip->i_gl, dibh, 1);
                gfs2_dinode_out(ip, dibh->b_data);
                brelse(dibh);
@@ -1018,7 +1049,7 @@ static int gfs2_getattr(struct vfsmount *mnt, struct dentry *dentry,
        }
 
        generic_fillattr(inode, stat);
-       if (unlock);
+       if (unlock)
                gfs2_glock_dq_uninit(&gh);
 
        return 0;
index 7685b46f934b4ad07c419a4b11b4eef60aa0c167..47369d0112147d8c7fb814234bf2f4cd9bdd80e4 100644 (file)
@@ -173,6 +173,9 @@ static void gfs2_write_super_lockfs(struct super_block *sb)
        struct gfs2_sbd *sdp = sb->s_fs_info;
        int error;
 
+       if (test_bit(SDF_SHUTDOWN, &sdp->sd_flags))
+               return;
+
        for (;;) {
                error = gfs2_freeze_fs(sdp);
                if (!error)
@@ -426,6 +429,12 @@ static void gfs2_delete_inode(struct inode *inode)
        }
 
        error = gfs2_dinode_dealloc(ip);
+       /*
+        * Must do this before unlock to avoid trying to write back
+        * potentially dirty data now that inode no longer exists
+        * on disk.
+        */
+       truncate_inode_pages(&inode->i_data, 0);
 
 out_unlock:
        gfs2_glock_dq(&ip->i_iopen_gh);
@@ -443,14 +452,12 @@ out:
 
 static struct inode *gfs2_alloc_inode(struct super_block *sb)
 {
-       struct gfs2_sbd *sdp = sb->s_fs_info;
        struct gfs2_inode *ip;
 
        ip = kmem_cache_alloc(gfs2_inode_cachep, GFP_KERNEL);
        if (ip) {
                ip->i_flags = 0;
                ip->i_gl = NULL;
-               ip->i_greedy = gfs2_tune_get(sdp, gt_greedy_default);
                ip->i_last_pfault = jiffies;
        }
        return &ip->i_inode;
index 45a5f11fc39a88b0a53953e590cae739c4ad920b..14b380fb060290a77f24a96872e8b6b439707128 100644 (file)
 #include "trans.h"
 #include "util.h"
 
-static void pfault_be_greedy(struct gfs2_inode *ip)
-{
-       unsigned int time;
-
-       spin_lock(&ip->i_spin);
-       time = ip->i_greedy;
-       ip->i_last_pfault = jiffies;
-       spin_unlock(&ip->i_spin);
-
-       igrab(&ip->i_inode);
-       if (gfs2_glock_be_greedy(ip->i_gl, time))
-               iput(&ip->i_inode);
-}
-
 static struct page *gfs2_private_nopage(struct vm_area_struct *area,
                                        unsigned long address, int *type)
 {
        struct gfs2_inode *ip = GFS2_I(area->vm_file->f_mapping->host);
-       struct page *result;
 
        set_bit(GIF_PAGED, &ip->i_flags);
-
-       result = filemap_nopage(area, address, type);
-
-       if (result && result != NOPAGE_OOM)
-               pfault_be_greedy(ip);
-
-       return result;
+       return filemap_nopage(area, address, type);
 }
 
 static int alloc_page_backing(struct gfs2_inode *ip, struct page *page)
@@ -167,7 +146,6 @@ static struct page *gfs2_sharewrite_nopage(struct vm_area_struct *area,
                set_page_dirty(result);
        }
 
-       pfault_be_greedy(ip);
 out:
        gfs2_glock_dq_uninit(&i_gh);
 
index 43a24f2e5905f9a26be19bcc032b4761a777baef..70f424fcf1cdb8468c825367aead70c48a462254 100644 (file)
@@ -71,17 +71,12 @@ void gfs2_tune_init(struct gfs2_tune *gt)
        gt->gt_atime_quantum = 3600;
        gt->gt_new_files_jdata = 0;
        gt->gt_new_files_directio = 0;
-       gt->gt_max_atomic_write = 4 << 20;
        gt->gt_max_readahead = 1 << 18;
        gt->gt_lockdump_size = 131072;
        gt->gt_stall_secs = 600;
        gt->gt_complain_secs = 10;
        gt->gt_reclaim_limit = 5000;
        gt->gt_entries_per_readdir = 32;
-       gt->gt_prefetch_secs = 10;
-       gt->gt_greedy_default = HZ / 10;
-       gt->gt_greedy_quantum = HZ / 40;
-       gt->gt_greedy_max = HZ / 4;
        gt->gt_statfs_quantum = 30;
        gt->gt_statfs_slow = 0;
 }
@@ -359,8 +354,7 @@ int gfs2_jindex_hold(struct gfs2_sbd *sdp, struct gfs2_holder *ji_gh)
        mutex_lock(&sdp->sd_jindex_mutex);
 
        for (;;) {
-               error = gfs2_glock_nq_init(dip->i_gl, LM_ST_SHARED,
-                                          GL_LOCAL_EXCL, ji_gh);
+               error = gfs2_glock_nq_init(dip->i_gl, LM_ST_SHARED, 0, ji_gh);
                if (error)
                        break;
 
@@ -529,8 +523,7 @@ int gfs2_make_fs_rw(struct gfs2_sbd *sdp)
        struct gfs2_log_header_host head;
        int error;
 
-       error = gfs2_glock_nq_init(sdp->sd_trans_gl, LM_ST_SHARED,
-                                  GL_LOCAL_EXCL, &t_gh);
+       error = gfs2_glock_nq_init(sdp->sd_trans_gl, LM_ST_SHARED, 0, &t_gh);
        if (error)
                return error;
 
@@ -583,9 +576,8 @@ int gfs2_make_fs_ro(struct gfs2_sbd *sdp)
        gfs2_quota_sync(sdp);
        gfs2_statfs_sync(sdp);
 
-       error = gfs2_glock_nq_init(sdp->sd_trans_gl, LM_ST_SHARED,
-                               GL_LOCAL_EXCL | GL_NOCACHE,
-                               &t_gh);
+       error = gfs2_glock_nq_init(sdp->sd_trans_gl, LM_ST_SHARED, GL_NOCACHE,
+                                  &t_gh);
        if (error && !test_bit(SDF_SHUTDOWN, &sdp->sd_flags))
                return error;
 
index 983eaf1e06becb3e0c6516bb04ffa397bc121ab9..d01f9f0fda261de2b95cff57353b0f8ca44ff79e 100644 (file)
@@ -436,17 +436,12 @@ TUNE_ATTR(atime_quantum, 0);
 TUNE_ATTR(max_readahead, 0);
 TUNE_ATTR(complain_secs, 0);
 TUNE_ATTR(reclaim_limit, 0);
-TUNE_ATTR(prefetch_secs, 0);
 TUNE_ATTR(statfs_slow, 0);
 TUNE_ATTR(new_files_jdata, 0);
 TUNE_ATTR(new_files_directio, 0);
 TUNE_ATTR(quota_simul_sync, 1);
 TUNE_ATTR(quota_cache_secs, 1);
-TUNE_ATTR(max_atomic_write, 1);
 TUNE_ATTR(stall_secs, 1);
-TUNE_ATTR(greedy_default, 1);
-TUNE_ATTR(greedy_quantum, 1);
-TUNE_ATTR(greedy_max, 1);
 TUNE_ATTR(statfs_quantum, 1);
 TUNE_ATTR_DAEMON(scand_secs, scand_process);
 TUNE_ATTR_DAEMON(recoverd_secs, recoverd_process);
@@ -465,15 +460,10 @@ static struct attribute *tune_attrs[] = {
        &tune_attr_max_readahead.attr,
        &tune_attr_complain_secs.attr,
        &tune_attr_reclaim_limit.attr,
-       &tune_attr_prefetch_secs.attr,
        &tune_attr_statfs_slow.attr,
        &tune_attr_quota_simul_sync.attr,
        &tune_attr_quota_cache_secs.attr,
-       &tune_attr_max_atomic_write.attr,
        &tune_attr_stall_secs.attr,
-       &tune_attr_greedy_default.attr,
-       &tune_attr_greedy_quantum.attr,
-       &tune_attr_greedy_max.attr,
        &tune_attr_statfs_quantum.attr,
        &tune_attr_scand_secs.attr,
        &tune_attr_recoverd_secs.attr,