vfs_ceph: remove ceph_fallocate/ceph_ftruncate fallback
[vlendec/samba-autobuild/.git] / source3 / modules / vfs_ceph.c
index d61213110e01402aca3671cdcf1c0475eb7ff89d..3fb5943eca619b135d30b130e9dc0ab8fcd49b0c 100644 (file)
@@ -37,6 +37,7 @@
 #include "cephfs/libcephfs.h"
 #include "smbprofile.h"
 #include "modules/posixacl_xattr.h"
+#include "lib/util/tevent_unix.h"
 
 #undef DBGC_CLASS
 #define DBGC_CLASS DBGC_VFS
@@ -131,6 +132,11 @@ static int cephwrap_connect(struct vfs_handle_struct *handle,  const char *servi
        handle->data = cmount;
        cmount_cnt++;
 
+       /*
+        * Unless we have an async implementation of getxattrat turn this off.
+        */
+       lp_do_parameter(SNUM(handle->conn), "smbd:async dosmode", "false");
+
        return 0;
 
 err_cm_release:
@@ -254,19 +260,20 @@ static int cephwrap_statvfs(struct vfs_handle_struct *handle,
        ret = ceph_statfs(handle->data, smb_fname->base_name, &statvfs_buf);
        if (ret < 0) {
                WRAP_RETURN(ret);
-       } else {
-               statbuf->OptimalTransferSize = statvfs_buf.f_frsize;
-               statbuf->BlockSize = statvfs_buf.f_bsize;
-               statbuf->TotalBlocks = statvfs_buf.f_blocks;
-               statbuf->BlocksAvail = statvfs_buf.f_bfree;
-               statbuf->UserBlocksAvail = statvfs_buf.f_bavail;
-               statbuf->TotalFileNodes = statvfs_buf.f_files;
-               statbuf->FreeFileNodes = statvfs_buf.f_ffree;
-               statbuf->FsIdentifier = statvfs_buf.f_fsid;
-               DBG_DEBUG("[CEPH] f_bsize: %ld, f_blocks: %ld, f_bfree: %ld, f_bavail: %ld\n",
-                       (long int)statvfs_buf.f_bsize, (long int)statvfs_buf.f_blocks,
-                       (long int)statvfs_buf.f_bfree, (long int)statvfs_buf.f_bavail);
        }
+
+       statbuf->OptimalTransferSize = statvfs_buf.f_frsize;
+       statbuf->BlockSize = statvfs_buf.f_bsize;
+       statbuf->TotalBlocks = statvfs_buf.f_blocks;
+       statbuf->BlocksAvail = statvfs_buf.f_bfree;
+       statbuf->UserBlocksAvail = statvfs_buf.f_bavail;
+       statbuf->TotalFileNodes = statvfs_buf.f_files;
+       statbuf->FreeFileNodes = statvfs_buf.f_ffree;
+       statbuf->FsIdentifier = statvfs_buf.f_fsid;
+       DBG_DEBUG("[CEPH] f_bsize: %ld, f_blocks: %ld, f_bfree: %ld, f_bavail: %ld\n",
+               (long int)statvfs_buf.f_bsize, (long int)statvfs_buf.f_blocks,
+               (long int)statvfs_buf.f_bfree, (long int)statvfs_buf.f_bavail);
+
        return ret;
 }
 
@@ -367,7 +374,6 @@ static int cephwrap_mkdir(struct vfs_handle_struct *handle,
                          mode_t mode)
 {
        int result;
-       bool has_dacl = False;
        char *parent = NULL;
        const char *path = smb_fname->base_name;
 
@@ -375,34 +381,14 @@ static int cephwrap_mkdir(struct vfs_handle_struct *handle,
 
        if (lp_inherit_acls(SNUM(handle->conn))
            && parent_dirname(talloc_tos(), path, &parent, NULL)
-           && (has_dacl = directory_has_default_acl(handle->conn, parent)))
+           && directory_has_default_acl(handle->conn, parent)) {
                mode = 0777;
+       }
 
        TALLOC_FREE(parent);
 
        result = ceph_mkdir(handle->data, path, mode);
-
-       /*
-        * Note. This order is important
-        */
-       if (result) {
-               WRAP_RETURN(result);
-       } else if (result == 0 && !has_dacl) {
-               /*
-                * We need to do this as the default behavior of POSIX ACLs
-                * is to set the mask to be the requested group permission
-                * bits, not the group permission bits to be the requested
-                * group permission bits. This is not what we want, as it will
-                * mess up any inherited ACL bits that were set. JRA.
-                */
-               int saved_errno = errno; /* We may get ENOSYS */
-               if ((SMB_VFS_CHMOD_ACL(handle->conn, smb_fname, mode) == -1) &&
-                               (errno == ENOSYS)) {
-                       errno = saved_errno;
-               }
-       }
-
-       return result;
+       return WRAP_RETURN(result);
 }
 
 static int cephwrap_rmdir(struct vfs_handle_struct *handle,
@@ -457,18 +443,6 @@ static int cephwrap_close(struct vfs_handle_struct *handle, files_struct *fsp)
        WRAP_RETURN(result);
 }
 
-static ssize_t cephwrap_read(struct vfs_handle_struct *handle, files_struct *fsp, void *data, size_t n)
-{
-       ssize_t result;
-
-       DBG_DEBUG("[CEPH] read(%p, %p, %p, %llu)\n", handle, fsp, data, llu(n));
-
-       /* Using -1 for the offset means read/write rather than pread/pwrite */
-       result = ceph_read(handle->data, fsp->fh->fd, data, n, -1);
-       DBG_DEBUG("[CEPH] read(...) = %llu\n", llu(result));
-       WRAP_RETURN(result);
-}
-
 static ssize_t cephwrap_pread(struct vfs_handle_struct *handle, files_struct *fsp, void *data,
                        size_t n, off_t offset)
 {
@@ -481,21 +455,56 @@ static ssize_t cephwrap_pread(struct vfs_handle_struct *handle, files_struct *fs
        WRAP_RETURN(result);
 }
 
+struct cephwrap_pread_state {
+       ssize_t bytes_read;
+       struct vfs_aio_state vfs_aio_state;
+};
 
-static ssize_t cephwrap_write(struct vfs_handle_struct *handle, files_struct *fsp, const void *data, size_t n)
-{
-       ssize_t result;
+/*
+ * Fake up an async ceph read by calling the synchronous API.
+ */
+static struct tevent_req *cephwrap_pread_send(struct vfs_handle_struct *handle,
+                                             TALLOC_CTX *mem_ctx,
+                                             struct tevent_context *ev,
+                                             struct files_struct *fsp,
+                                             void *data,
+                                             size_t n, off_t offset)
+{
+       struct tevent_req *req = NULL;
+       struct cephwrap_pread_state *state = NULL;
+       int ret = -1;
+
+       DBG_DEBUG("[CEPH] %s\n", __func__);
+       req = tevent_req_create(mem_ctx, &state, struct cephwrap_pread_state);
+       if (req == NULL) {
+               return NULL;
+       }
 
-       DBG_DEBUG("[CEPH] write(%p, %p, %p, %llu)\n", handle, fsp, data, llu(n));
+       ret = ceph_read(handle->data, fsp->fh->fd, data, n, offset);
+       if (ret < 0) {
+               /* ceph returns -errno on error. */
+               tevent_req_error(req, -ret);
+               return tevent_req_post(req, ev);
+       }
 
-       result = ceph_write(handle->data, fsp->fh->fd, data, n, -1);
+       state->bytes_read = ret;
+       tevent_req_done(req);
+       /* Return and schedule the completion of the call. */
+       return tevent_req_post(req, ev);
+}
 
-       DBG_DEBUG("[CEPH] write(...) = %llu\n", llu(result));
-       if (result < 0) {
-               WRAP_RETURN(result);
+static ssize_t cephwrap_pread_recv(struct tevent_req *req,
+                                  struct vfs_aio_state *vfs_aio_state)
+{
+       struct cephwrap_pread_state *state =
+               tevent_req_data(req, struct cephwrap_pread_state);
+
+       DBG_DEBUG("[CEPH] %s\n", __func__);
+       if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) {
+               return -1;
        }
-       fsp->fh->pos += result;
-       return result;
+       *vfs_aio_state = state->vfs_aio_state;
+       return state->bytes_read;
 }
 
 static ssize_t cephwrap_pwrite(struct vfs_handle_struct *handle, files_struct *fsp, const void *data,
@@ -509,6 +518,58 @@ static ssize_t cephwrap_pwrite(struct vfs_handle_struct *handle, files_struct *f
        WRAP_RETURN(result);
 }
 
+struct cephwrap_pwrite_state {
+       ssize_t bytes_written;
+       struct vfs_aio_state vfs_aio_state;
+};
+
+/*
+ * Fake up an async ceph write by calling the synchronous API.
+ */
+static struct tevent_req *cephwrap_pwrite_send(struct vfs_handle_struct *handle,
+                                              TALLOC_CTX *mem_ctx,
+                                              struct tevent_context *ev,
+                                              struct files_struct *fsp,
+                                              const void *data,
+                                              size_t n, off_t offset)
+{
+       struct tevent_req *req = NULL;
+       struct cephwrap_pwrite_state *state = NULL;
+       int ret = -1;
+
+       DBG_DEBUG("[CEPH] %s\n", __func__);
+       req = tevent_req_create(mem_ctx, &state, struct cephwrap_pwrite_state);
+       if (req == NULL) {
+               return NULL;
+       }
+
+       ret = ceph_write(handle->data, fsp->fh->fd, data, n, offset);
+       if (ret < 0) {
+               /* ceph returns -errno on error. */
+               tevent_req_error(req, -ret);
+               return tevent_req_post(req, ev);
+       }
+
+       state->bytes_written = ret;
+       tevent_req_done(req);
+       /* Return and schedule the completion of the call. */
+       return tevent_req_post(req, ev);
+}
+
+static ssize_t cephwrap_pwrite_recv(struct tevent_req *req,
+                                   struct vfs_aio_state *vfs_aio_state)
+{
+       struct cephwrap_pwrite_state *state =
+               tevent_req_data(req, struct cephwrap_pwrite_state);
+
+       DBG_DEBUG("[CEPH] %s\n", __func__);
+       if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) {
+               return -1;
+       }
+       *vfs_aio_state = state->vfs_aio_state;
+       return state->bytes_written;
+}
+
 static off_t cephwrap_lseek(struct vfs_handle_struct *handle, files_struct *fsp, off_t offset, int whence)
 {
        off_t result = 0;
@@ -561,12 +622,54 @@ static int cephwrap_rename(struct vfs_handle_struct *handle,
        WRAP_RETURN(result);
 }
 
-static int cephwrap_fsync(struct vfs_handle_struct *handle, files_struct *fsp)
+/*
+ * Fake up an async ceph fsync by calling the synchronous API.
+ */
+
+static struct tevent_req *cephwrap_fsync_send(struct vfs_handle_struct *handle,
+                                       TALLOC_CTX *mem_ctx,
+                                       struct tevent_context *ev,
+                                       files_struct *fsp)
 {
-       int result;
-       DBG_DEBUG("[CEPH] cephwrap_fsync\n");
-       result = ceph_fsync(handle->data, fsp->fh->fd, false);
-       WRAP_RETURN(result);
+       struct tevent_req *req = NULL;
+       struct vfs_aio_state *state = NULL;
+       int ret = -1;
+
+       DBG_DEBUG("[CEPH] cephwrap_fsync_send\n");
+
+       req = tevent_req_create(mem_ctx, &state, struct vfs_aio_state);
+       if (req == NULL) {
+               return NULL;
+       }
+
+       /* Make sync call. */
+       ret = ceph_fsync(handle->data, fsp->fh->fd, false);
+
+       if (ret != 0) {
+               /* ceph_fsync returns -errno on error. */
+               tevent_req_error(req, -ret);
+               return tevent_req_post(req, ev);
+       }
+
+       /* Mark it as done. */
+       tevent_req_done(req);
+       /* Return and schedule the completion of the call. */
+       return tevent_req_post(req, ev);
+}
+
+static int cephwrap_fsync_recv(struct tevent_req *req,
+                               struct vfs_aio_state *vfs_aio_state)
+{
+       struct vfs_aio_state *state =
+               tevent_req_data(req, struct vfs_aio_state);
+
+       DBG_DEBUG("[CEPH] cephwrap_fsync_recv\n");
+
+       if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) {
+               return -1;
+       }
+       *vfs_aio_state = *state;
+       return 0;
 }
 
 #ifdef HAVE_CEPH_STATX
@@ -574,9 +677,21 @@ static int cephwrap_fsync(struct vfs_handle_struct *handle, files_struct *fsp)
 
 static void init_stat_ex_from_ceph_statx(struct stat_ex *dst, const struct ceph_statx *stx)
 {
-       if ((stx->stx_mask & SAMBA_STATX_ATTR_MASK) != SAMBA_STATX_ATTR_MASK)
+       DBG_DEBUG("[CEPH]\tstx = {dev = %llx, ino = %llu, mode = 0x%x, "
+                 "nlink = %llu, uid = %d, gid = %d, rdev = %llx, size = %llu, "
+                 "blksize = %llu, blocks = %llu, atime = %llu, mtime = %llu, "
+                 "ctime = %llu, btime = %llu}\n",
+                 llu(stx->stx_dev), llu(stx->stx_ino), stx->stx_mode,
+                 llu(stx->stx_nlink), stx->stx_uid, stx->stx_gid,
+                 llu(stx->stx_rdev), llu(stx->stx_size), llu(stx->stx_blksize),
+                 llu(stx->stx_blocks), llu(stx->stx_atime.tv_sec),
+                 llu(stx->stx_mtime.tv_sec), llu(stx->stx_ctime.tv_sec),
+                 llu(stx->stx_btime.tv_sec));
+
+       if ((stx->stx_mask & SAMBA_STATX_ATTR_MASK) != SAMBA_STATX_ATTR_MASK) {
                DBG_WARNING("%s: stx->stx_mask is incorrect (wanted %x, got %x)",
                                __func__, SAMBA_STATX_ATTR_MASK, stx->stx_mask);
+       }
 
        dst->st_ex_dev = stx->stx_dev;
        dst->st_ex_rdev = stx->stx_rdev;
@@ -613,16 +728,8 @@ static int cephwrap_stat(struct vfs_handle_struct *handle,
        DBG_DEBUG("[CEPH] statx(...) = %d\n", result);
        if (result < 0) {
                WRAP_RETURN(result);
-       } else {
-               DBG_DEBUG("[CEPH]\tstx = {dev = %llx, ino = %llu, mode = 0x%x, nlink = %llu, "
-                          "uid = %d, gid = %d, rdev = %llx, size = %llu, blksize = %llu, "
-                          "blocks = %llu, atime = %llu, mtime = %llu, ctime = %llu, btime = %llu}\n",
-                          llu(stx.stx_dev), llu(stx.stx_ino), stx.stx_mode,
-                          llu(stx.stx_nlink), stx.stx_uid, stx.stx_gid, llu(stx.stx_rdev),
-                          llu(stx.stx_size), llu(stx.stx_blksize),
-                          llu(stx.stx_blocks), llu(stx.stx_atime.tv_sec), llu(stx.stx_mtime.tv_sec),
-                          llu(stx.stx_ctime.tv_sec), llu(stx.stx_btime.tv_sec));
        }
+
        init_stat_ex_from_ceph_statx(&smb_fname->st, &stx);
        DBG_DEBUG("[CEPH] mode = 0x%x\n", smb_fname->st.st_ex_mode);
        return result;
@@ -639,16 +746,8 @@ static int cephwrap_fstat(struct vfs_handle_struct *handle, files_struct *fsp, S
        DBG_DEBUG("[CEPH] fstat(...) = %d\n", result);
        if (result < 0) {
                WRAP_RETURN(result);
-       } else {
-               DBG_DEBUG("[CEPH]\tstx = {dev = %llx, ino = %llu, mode = 0x%x, nlink = %llu, "
-                          "uid = %d, gid = %d, rdev = %llx, size = %llu, blksize = %llu, "
-                          "blocks = %llu, atime = %llu, mtime = %llu, ctime = %llu, btime = %llu}\n",
-                          llu(stx.stx_dev), llu(stx.stx_ino), stx.stx_mode,
-                          llu(stx.stx_nlink), stx.stx_uid, stx.stx_gid, llu(stx.stx_rdev),
-                          llu(stx.stx_size), llu(stx.stx_blksize),
-                          llu(stx.stx_blocks), llu(stx.stx_atime.tv_sec), llu(stx.stx_mtime.tv_sec),
-                          llu(stx.stx_ctime.tv_sec), llu(stx.stx_btime.tv_sec));
        }
+
        init_stat_ex_from_ceph_statx(sbuf, &stx);
        DBG_DEBUG("[CEPH] mode = 0x%x\n", sbuf->st_ex_mode);
        return result;
@@ -673,6 +772,7 @@ static int cephwrap_lstat(struct vfs_handle_struct *handle,
        if (result < 0) {
                WRAP_RETURN(result);
        }
+
        init_stat_ex_from_ceph_statx(&smb_fname->st, &stx);
        return result;
 }
@@ -728,14 +828,15 @@ static int cephwrap_stat(struct vfs_handle_struct *handle,
        DBG_DEBUG("[CEPH] stat(...) = %d\n", result);
        if (result < 0) {
                WRAP_RETURN(result);
-       } else {
-               DBG_DEBUG("[CEPH]\tstbuf = {dev = %llu, ino = %llu, mode = 0x%x, nlink = %llu, "
-                          "uid = %d, gid = %d, rdev = %llu, size = %llu, blksize = %llu, "
-                          "blocks = %llu, atime = %llu, mtime = %llu, ctime = %llu}\n",
-                          llu(stbuf.st_dev), llu(stbuf.st_ino), stbuf.st_mode, llu(stbuf.st_nlink),
-                          stbuf.st_uid, stbuf.st_gid, llu(stbuf.st_rdev), llu(stbuf.st_size), llu(stbuf.st_blksize),
-                          llu(stbuf.st_blocks), llu(stbuf.st_atime), llu(stbuf.st_mtime), llu(stbuf.st_ctime));
        }
+
+       DBG_DEBUG("[CEPH]\tstbuf = {dev = %llu, ino = %llu, mode = 0x%x, nlink = %llu, "
+                  "uid = %d, gid = %d, rdev = %llu, size = %llu, blksize = %llu, "
+                  "blocks = %llu, atime = %llu, mtime = %llu, ctime = %llu}\n",
+                  llu(stbuf.st_dev), llu(stbuf.st_ino), stbuf.st_mode, llu(stbuf.st_nlink),
+                  stbuf.st_uid, stbuf.st_gid, llu(stbuf.st_rdev), llu(stbuf.st_size), llu(stbuf.st_blksize),
+                  llu(stbuf.st_blocks), llu(stbuf.st_atime), llu(stbuf.st_mtime), llu(stbuf.st_ctime));
+
        init_stat_ex_from_stat(
                        &smb_fname->st, &stbuf,
                        lp_fake_directory_create_times(SNUM(handle->conn)));
@@ -753,15 +854,15 @@ static int cephwrap_fstat(struct vfs_handle_struct *handle, files_struct *fsp, S
        DBG_DEBUG("[CEPH] fstat(...) = %d\n", result);
        if (result < 0) {
                WRAP_RETURN(result);
-       } else {
-               DBG_DEBUG("[CEPH]\tstbuf = {dev = %llu, ino = %llu, mode = 0x%x, nlink = %llu, "
-                          "uid = %d, gid = %d, rdev = %llu, size = %llu, blksize = %llu, "
-                          "blocks = %llu, atime = %llu, mtime = %llu, ctime = %llu}\n",
-                          llu(stbuf.st_dev), llu(stbuf.st_ino), stbuf.st_mode, llu(stbuf.st_nlink),
-                          stbuf.st_uid, stbuf.st_gid, llu(stbuf.st_rdev), llu(stbuf.st_size), llu(stbuf.st_blksize),
-                          llu(stbuf.st_blocks), llu(stbuf.st_atime), llu(stbuf.st_mtime), llu(stbuf.st_ctime));
        }
 
+       DBG_DEBUG("[CEPH]\tstbuf = {dev = %llu, ino = %llu, mode = 0x%x, nlink = %llu, "
+                  "uid = %d, gid = %d, rdev = %llu, size = %llu, blksize = %llu, "
+                  "blocks = %llu, atime = %llu, mtime = %llu, ctime = %llu}\n",
+                  llu(stbuf.st_dev), llu(stbuf.st_ino), stbuf.st_mode, llu(stbuf.st_nlink),
+                  stbuf.st_uid, stbuf.st_gid, llu(stbuf.st_rdev), llu(stbuf.st_size), llu(stbuf.st_blksize),
+                  llu(stbuf.st_blocks), llu(stbuf.st_atime), llu(stbuf.st_mtime), llu(stbuf.st_ctime));
+
        init_stat_ex_from_stat(
                        sbuf, &stbuf,
                        lp_fake_directory_create_times(SNUM(handle->conn)));
@@ -787,6 +888,7 @@ static int cephwrap_lstat(struct vfs_handle_struct *handle,
        if (result < 0) {
                WRAP_RETURN(result);
        }
+
        init_stat_ex_from_stat(
                        &smb_fname->st, &stbuf,
                        lp_fake_directory_create_times(SNUM(handle->conn)));
@@ -849,26 +951,6 @@ static int cephwrap_chmod(struct vfs_handle_struct *handle,
        int result;
 
        DBG_DEBUG("[CEPH] chmod(%p, %s, %d)\n", handle, smb_fname->base_name, mode);
-
-       /*
-        * We need to do this due to the fact that the default POSIX ACL
-        * chmod modifies the ACL *mask* for the group owner, not the
-        * group owner bits directly. JRA.
-        */
-
-
-       {
-               int saved_errno = errno; /* We might get ENOSYS */
-               result = SMB_VFS_CHMOD_ACL(handle->conn,
-                                       smb_fname,
-                                       mode);
-               if (result == 0) {
-                       return result;
-               }
-               /* Error - return the old errno. */
-               errno = saved_errno;
-       }
-
        result = ceph_chmod(handle->data, smb_fname->base_name, mode);
        DBG_DEBUG("[CEPH] chmod(...) = %d\n", result);
        WRAP_RETURN(result);
@@ -880,21 +962,6 @@ static int cephwrap_fchmod(struct vfs_handle_struct *handle, files_struct *fsp,
 
        DBG_DEBUG("[CEPH] fchmod(%p, %p, %d)\n", handle, fsp, mode);
 
-       /*
-        * We need to do this due to the fact that the default POSIX ACL
-        * chmod modifies the ACL *mask* for the group owner, not the
-        * group owner bits directly. JRA.
-        */
-
-       {
-               int saved_errno = errno; /* We might get ENOSYS */
-               if ((result = SMB_VFS_FCHMOD_ACL(fsp, mode)) == 0) {
-                       return result;
-               }
-               /* Error - return the old errno. */
-               errno = saved_errno;
-       }
-
 #if defined(HAVE_FCHMOD)
        result = ceph_fchmod(handle->data, fsp->fh->fd, mode);
        DBG_DEBUG("[CEPH] fchmod(...) = %d\n", result);
@@ -970,9 +1037,7 @@ static struct smb_filename *cephwrap_getwd(struct vfs_handle_struct *handle,
 static int strict_allocate_ftruncate(struct vfs_handle_struct *handle, files_struct *fsp, off_t len)
 {
        off_t space_to_write;
-       uint64_t space_avail;
-       uint64_t bsize,dfree,dsize;
-       int ret;
+       int result;
        NTSTATUS status;
        SMB_STRUCT_STAT *pst;
 
@@ -991,111 +1056,45 @@ static int strict_allocate_ftruncate(struct vfs_handle_struct *handle, files_str
                return 0;
 
        /* Shrink - just ftruncate. */
-       if (pst->st_ex_size > len)
-               return ftruncate(fsp->fh->fd, len);
-
-       space_to_write = len - pst->st_ex_size;
-
-       /* for allocation try fallocate first. This can fail on some
-          platforms e.g. when the filesystem doesn't support it and no
-          emulation is being done by the libc (like on AIX with JFS1). In that
-          case we do our own emulation. fallocate implementations can
-          return ENOTSUP or EINVAL in cases like that. */
-       ret = SMB_VFS_FALLOCATE(fsp, 0, pst->st_ex_size, space_to_write);
-       if (ret == -1 && errno == ENOSPC) {
-               return -1;
-       }
-       if (ret == 0) {
-               return 0;
-       }
-       DEBUG(10,("[CEPH] strict_allocate_ftruncate: SMB_VFS_FALLOCATE failed with "
-               "error %d. Falling back to slow manual allocation\n", errno));
-
-       /* available disk space is enough or not? */
-       space_avail =
-           get_dfree_info(fsp->conn, fsp->fsp_name, &bsize, &dfree, &dsize);
-       /* space_avail is 1k blocks */
-       if (space_avail == (uint64_t)-1 ||
-                       ((uint64_t)space_to_write/1024 > space_avail) ) {
-               errno = ENOSPC;
-               return -1;
+       if (pst->st_ex_size > len) {
+               result = ceph_ftruncate(handle->data, fsp->fh->fd, len);
+               WRAP_RETURN(result);
        }
 
-       /* Write out the real space on disk. */
-       return vfs_slow_fallocate(fsp, pst->st_ex_size, space_to_write);
+       space_to_write = len - pst->st_ex_size;
+       result = ceph_fallocate(handle->data, fsp->fh->fd, 0, pst->st_ex_size,
+                               space_to_write);
+       WRAP_RETURN(result);
 }
 
 static int cephwrap_ftruncate(struct vfs_handle_struct *handle, files_struct *fsp, off_t len)
 {
        int result = -1;
-       SMB_STRUCT_STAT st;
-       char c = 0;
-       off_t currpos;
 
        DBG_DEBUG("[CEPH] ftruncate(%p, %p, %llu\n", handle, fsp, llu(len));
 
        if (lp_strict_allocate(SNUM(fsp->conn))) {
-               result = strict_allocate_ftruncate(handle, fsp, len);
-               return result;
+               return strict_allocate_ftruncate(handle, fsp, len);
        }
 
-       /* we used to just check HAVE_FTRUNCATE_EXTEND and only use
-          sys_ftruncate if the system supports it. Then I discovered that
-          you can have some filesystems that support ftruncate
-          expansion and some that don't! On Linux fat can't do
-          ftruncate extend but ext2 can. */
-
        result = ceph_ftruncate(handle->data, fsp->fh->fd, len);
-       if (result == 0)
-               goto done;
-
-       /* According to W. R. Stevens advanced UNIX prog. Pure 4.3 BSD cannot
-          extend a file with ftruncate. Provide alternate implementation
-          for this */
-       currpos = SMB_VFS_LSEEK(fsp, 0, SEEK_CUR);
-       if (currpos == -1) {
-               goto done;
-       }
-
-       /* Do an fstat to see if the file is longer than the requested
-          size in which case the ftruncate above should have
-          succeeded or shorter, in which case seek to len - 1 and
-          write 1 byte of zero */
-       if (SMB_VFS_FSTAT(fsp, &st) == -1) {
-               goto done;
-       }
-
-#ifdef S_ISFIFO
-       if (S_ISFIFO(st.st_ex_mode)) {
-               result = 0;
-               goto done;
-       }
-#endif
-
-       if (st.st_ex_size == len) {
-               result = 0;
-               goto done;
-       }
-
-       if (st.st_ex_size > len) {
-               /* the sys_ftruncate should have worked */
-               goto done;
-       }
-
-       if (SMB_VFS_LSEEK(fsp, len-1, SEEK_SET) != len -1)
-               goto done;
-
-       if (SMB_VFS_WRITE(fsp, &c, 1)!=1)
-               goto done;
-
-       /* Seek to where we were */
-       if (SMB_VFS_LSEEK(fsp, currpos, SEEK_SET) != currpos)
-               goto done;
-       result = 0;
+       WRAP_RETURN(result);
+}
 
-  done:
+static int cephwrap_fallocate(struct vfs_handle_struct *handle,
+                             struct files_struct *fsp,
+                             uint32_t mode,
+                             off_t offset,
+                             off_t len)
+{
+       int result;
 
-       return result;
+       DBG_DEBUG("[CEPH] fallocate(%p, %p, %u, %llu, %llu\n",
+                 handle, fsp, mode, llu(offset), llu(len));
+       /* unsupported mode flags are rejected by libcephfs */
+       result = ceph_fallocate(handle->data, fsp->fh->fd, mode, offset, len);
+       DBG_DEBUG("[CEPH] fallocate(...) = %d\n", result);
+       WRAP_RETURN(result);
 }
 
 static bool cephwrap_lock(struct vfs_handle_struct *handle, files_struct *fsp, int op, off_t offset, off_t count, int type)
@@ -1107,12 +1106,11 @@ static bool cephwrap_lock(struct vfs_handle_struct *handle, files_struct *fsp, i
 static int cephwrap_kernel_flock(struct vfs_handle_struct *handle, files_struct *fsp,
                                uint32_t share_mode, uint32_t access_mask)
 {
-       DBG_DEBUG("[CEPH] kernel_flock\n");
-       /*
-        * We must return zero here and pretend all is good.
-        * One day we might have this in CEPH.
-        */
-       return 0;
+       DBG_ERR("[CEPH] flock unsupported! Consider setting "
+               "\"kernel share modes = no\"\n");
+
+       errno = ENOSYS;
+       return -1;
 }
 
 static bool cephwrap_getlock(struct vfs_handle_struct *handle, files_struct *fsp, off_t *poffset, off_t *pcount, int *ptype, pid_t *ppid)
@@ -1201,30 +1199,31 @@ static struct smb_filename *cephwrap_realpath(struct vfs_handle_struct *handle,
                                TALLOC_CTX *ctx,
                                const struct smb_filename *smb_fname)
 {
-       char *result;
+       char *result = NULL;
        const char *path = smb_fname->base_name;
        size_t len = strlen(path);
        struct smb_filename *result_fname = NULL;
+       int r = -1;
 
-       result = SMB_MALLOC_ARRAY(char, PATH_MAX+1);
        if (len && (path[0] == '/')) {
-               int r = asprintf(&result, "%s", path);
-               if (r < 0) return NULL;
+               r = asprintf(&result, "%s", path);
        } else if ((len >= 2) && (path[0] == '.') && (path[1] == '/')) {
                if (len == 2) {
-                       int r = asprintf(&result, "%s",
+                       r = asprintf(&result, "%s",
                                        handle->conn->connectpath);
-                       if (r < 0) return NULL;
                } else {
-                       int r = asprintf(&result, "%s/%s",
+                       r = asprintf(&result, "%s/%s",
                                        handle->conn->connectpath, &path[2]);
-                       if (r < 0) return NULL;
                }
        } else {
-               int r = asprintf(&result, "%s/%s",
+               r = asprintf(&result, "%s/%s",
                                handle->conn->connectpath, path);
-               if (r < 0) return NULL;
        }
+
+       if (r < 0) {
+               return NULL;
+       }
+
        DBG_DEBUG("[CEPH] realpath(%p, %s) = %s\n", handle, path, result);
        result_fname = synthetic_smb_fname(ctx,
                                result,
@@ -1281,9 +1280,8 @@ static ssize_t cephwrap_getxattr(struct vfs_handle_struct *handle,
        DBG_DEBUG("[CEPH] getxattr(...) = %d\n", ret);
        if (ret < 0) {
                WRAP_RETURN(ret);
-       } else {
-               return (ssize_t)ret;
        }
+       return (ssize_t)ret;
 }
 
 static ssize_t cephwrap_fgetxattr(struct vfs_handle_struct *handle, struct files_struct *fsp, const char *name, void *value, size_t size)
@@ -1298,9 +1296,8 @@ static ssize_t cephwrap_fgetxattr(struct vfs_handle_struct *handle, struct files
        DBG_DEBUG("[CEPH] fgetxattr(...) = %d\n", ret);
        if (ret < 0) {
                WRAP_RETURN(ret);
-       } else {
-               return (ssize_t)ret;
        }
+       return (ssize_t)ret;
 }
 
 static ssize_t cephwrap_listxattr(struct vfs_handle_struct *handle,
@@ -1315,9 +1312,8 @@ static ssize_t cephwrap_listxattr(struct vfs_handle_struct *handle,
        DBG_DEBUG("[CEPH] listxattr(...) = %d\n", ret);
        if (ret < 0) {
                WRAP_RETURN(ret);
-       } else {
-               return (ssize_t)ret;
        }
+       return (ssize_t)ret;
 }
 
 static ssize_t cephwrap_flistxattr(struct vfs_handle_struct *handle, struct files_struct *fsp, char *list, size_t size)
@@ -1332,9 +1328,8 @@ static ssize_t cephwrap_flistxattr(struct vfs_handle_struct *handle, struct file
        DBG_DEBUG("[CEPH] flistxattr(...) = %d\n", ret);
        if (ret < 0) {
                WRAP_RETURN(ret);
-       } else {
-               return (ssize_t)ret;
        }
+       return (ssize_t)ret;
 }
 
 static int cephwrap_removexattr(struct vfs_handle_struct *handle,
@@ -1431,15 +1426,18 @@ static struct vfs_fn_pointers ceph_fns = {
 
        .open_fn = cephwrap_open,
        .close_fn = cephwrap_close,
-       .read_fn = cephwrap_read,
        .pread_fn = cephwrap_pread,
-       .write_fn = cephwrap_write,
+       .pread_send_fn = cephwrap_pread_send,
+       .pread_recv_fn = cephwrap_pread_recv,
        .pwrite_fn = cephwrap_pwrite,
+       .pwrite_send_fn = cephwrap_pwrite_send,
+       .pwrite_recv_fn = cephwrap_pwrite_recv,
        .lseek_fn = cephwrap_lseek,
        .sendfile_fn = cephwrap_sendfile,
        .recvfile_fn = cephwrap_recvfile,
        .rename_fn = cephwrap_rename,
-       .fsync_fn = cephwrap_fsync,
+       .fsync_send_fn = cephwrap_fsync_send,
+       .fsync_recv_fn = cephwrap_fsync_recv,
        .stat_fn = cephwrap_stat,
        .fstat_fn = cephwrap_fstat,
        .lstat_fn = cephwrap_lstat,
@@ -1453,6 +1451,7 @@ static struct vfs_fn_pointers ceph_fns = {
        .getwd_fn = cephwrap_getwd,
        .ntimes_fn = cephwrap_ntimes,
        .ftruncate_fn = cephwrap_ftruncate,
+       .fallocate_fn = cephwrap_fallocate,
        .lock_fn = cephwrap_lock,
        .kernel_flock_fn = cephwrap_kernel_flock,
        .linux_setlease_fn = cephwrap_linux_setlease,
@@ -1468,6 +1467,8 @@ static struct vfs_fn_pointers ceph_fns = {
 
        /* EA operations. */
        .getxattr_fn = cephwrap_getxattr,
+       .getxattrat_send_fn = vfs_not_implemented_getxattrat_send,
+       .getxattrat_recv_fn = vfs_not_implemented_getxattrat_recv,
        .fgetxattr_fn = cephwrap_fgetxattr,
        .listxattr_fn = cephwrap_listxattr,
        .flistxattr_fn = cephwrap_flistxattr,