ldb: pack function for new pack format
authorAaron Haslett <aaronhaslett@catalyst.net.nz>
Fri, 10 May 2019 06:10:55 +0000 (18:10 +1200)
committerAndrew Bartlett <abartlet@samba.org>
Wed, 22 May 2019 04:42:28 +0000 (04:42 +0000)
Pack function for new pack format with values separated from other data
so that while unpacking, the value section (which is probably large)
doesn't have to be loaded into cache/memory.
The new format is disabled for now.
Two tests are added that operate on a detailed binary breakdown of the
new format.

NOTE: Configure with --abi-check-disable to build this commit. This
patch is part of a set of LDB ABI changes, and the version update is
done on the last commit.

Signed-off-by: Aaron Haslett <aaronhaslett@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Gary Lockyer <gary@catalyst.net.nz>
lib/ldb/common/ldb_pack.c
lib/ldb/include/ldb_module.h
lib/ldb/ldb_key_value/ldb_kv.c
source4/torture/ldb/ldb.c

index 21c0708ffdc1920e1066dcba5def5de2809239cb..9d87a10b9f1ded6e21bfc8ce7b1d6340c5922975 100644 (file)
@@ -95,17 +95,9 @@ static int attribute_storable_values(const struct ldb_message_element *el)
        return el->num_values;
 }
 
-/*
-  pack a ldb message into a linear buffer in a ldb_val
-
-  note that this routine avoids saving elements with zero values,
-  as these are equivalent to having no element
-
-  caller frees the data buffer after use
-*/
-int ldb_pack_data(struct ldb_context *ldb,
-                 const struct ldb_message *message,
-                 struct ldb_val *data)
+static int ldb_pack_data_v1(struct ldb_context *ldb,
+                           const struct ldb_message *message,
+                           struct ldb_val *data)
 {
        unsigned int i, j, real_elements=0;
        size_t size, dn_len, attr_len, value_len;
@@ -213,6 +205,331 @@ int ldb_pack_data(struct ldb_context *ldb,
        return 0;
 }
 
+/*
+ * New pack version designed based on performance profiling of version 1.
+ * The approach is to separate value data from the rest of the record's data.
+ * This improves performance because value data is not needed during unpacking
+ * or filtering of the message's attribute list. During filtering we only copy
+ * attributes which are present in the attribute list, however at the parse
+ * stage we need to point to all attributes as they may be referenced in the
+ * search expression.
+ * With this new format, we don't lose time loading data (eg via
+ * talloc_memdup()) that is never needed (for the vast majority of attributes
+ * are are never found in either the search expression or attribute list).
+ * Additional changes include adding a canonicalized DN (for later
+ * optimizations) and variable width length fields for faster unpacking.
+ * The pack and unpack performance improvement is tested in the torture
+ * test torture_ldb_pack_format_perf.
+ *
+ * Layout:
+ *
+ * Version (4 bytes)
+ * Number of Elements (4 bytes)
+ * DN length (4 bytes)
+ * DN with null terminator (DN length + 1 bytes)
+ * Canonicalized DN length (4 bytes)
+ * Canonicalized DN with null terminator (Canonicalized DN length + 1 bytes)
+ * Number of bytes from here to value data section (4 bytes)
+ * # For each element:
+ *     Element name length (4 bytes)
+ *     Element name with null terminator (Element name length + 1 bytes)
+ *     Number of values (4 bytes)
+ *     Width of value lengths
+ *     # For each value:
+ *             Value data length (#bytes given by width field above)
+ * # For each element:
+ *     # For each value:
+ *             Value data (#bytes given by corresponding length above)
+ */
+static int ldb_pack_data_v2(struct ldb_context *ldb,
+                           const struct ldb_message *message,
+                           struct ldb_val *data)
+{
+       unsigned int i, j, real_elements=0;
+       size_t size, dn_len, dn_canon_len, attr_len, value_len;
+       const char *dn, *dn_canon;
+       uint8_t *p, *q;
+       size_t len;
+       size_t max_val_len;
+       uint8_t val_len_width;
+
+       /*
+        * First half of this function will calculate required size for
+        * packed data. Initial size is 20 = 5 * 4.  5 fixed fields are:
+        * version, num elements, dn len, canon dn len, attr section len
+        */
+       size = U32_LEN * 5;
+
+       /*
+        * Get linearized and canonicalized form of the DN and add the lengths
+        * of each to size, plus 1 for null terminator.
+        */
+       dn = ldb_dn_get_linearized(message->dn);
+       if (dn == NULL) {
+               errno = ENOMEM;
+               return -1;
+       }
+
+       dn_len = strlen(dn) + NULL_PAD_BYTE_LEN;
+       if (size + dn_len < size) {
+               errno = ENOMEM;
+               return -1;
+       }
+       size += dn_len;
+
+       if (ldb_dn_is_special(message->dn)) {
+               dn_canon_len = NULL_PAD_BYTE_LEN;
+               dn_canon = discard_const_p(char, "\0");
+       } else {
+               dn_canon = ldb_dn_canonical_string(message->dn, message->dn);
+               if (dn_canon == NULL) {
+                       errno = ENOMEM;
+                       return -1;
+               }
+
+               dn_canon_len = strlen(dn_canon) + NULL_PAD_BYTE_LEN;
+               if (size + dn_canon_len < size) {
+                       errno = ENOMEM;
+                       return -1;
+               }
+       }
+       size += dn_canon_len;
+
+       /* Add the size required by each element */
+       for (i=0;i<message->num_elements;i++) {
+               if (attribute_storable_values(&message->elements[i]) == 0) {
+                       continue;
+               }
+
+               real_elements++;
+
+               /*
+                * Add length of element name + 9 for:
+                * 1 for null terminator
+                * 4 for element name length field
+                * 4 for number of values field
+                */
+               attr_len = strlen(message->elements[i].name);
+               if (size + attr_len + U32_LEN * 2 + NULL_PAD_BYTE_LEN < size) {
+                       errno = ENOMEM;
+                       return -1;
+               }
+               size += attr_len + U32_LEN * 2 + NULL_PAD_BYTE_LEN;
+
+               /*
+                * Find the max value length, so we can calculate the width
+                * required for the value length fields.
+                */
+               max_val_len = 0;
+               for (j=0;j<message->elements[i].num_values;j++) {
+                       value_len = message->elements[i].values[j].length;
+                       if (value_len > max_val_len) {
+                               max_val_len = value_len;
+                       }
+
+                       if (size + value_len + NULL_PAD_BYTE_LEN < size) {
+                               errno = ENOMEM;
+                               return -1;
+                       }
+                       size += value_len + NULL_PAD_BYTE_LEN;
+               }
+
+               if (max_val_len <= UCHAR_MAX) {
+                       val_len_width = U8_LEN;
+               } else if (max_val_len <= USHRT_MAX) {
+                       val_len_width = U16_LEN;
+               } else if (max_val_len <= UINT_MAX) {
+                       val_len_width = U32_LEN;
+               } else {
+                       errno = EMSGSIZE;
+                       return -1;
+               }
+
+               /* Total size required for val lengths (re-using variable) */
+               max_val_len = (val_len_width*message->elements[i].num_values);
+
+               /* Add one for storing the width */
+               max_val_len += U8_LEN;
+               if (size + max_val_len < size) {
+                       errno = ENOMEM;
+                       return -1;
+               }
+               size += max_val_len;
+       }
+
+       /* Allocate */
+       data->data = talloc_array(ldb, uint8_t, size);
+       if (!data->data) {
+               errno = ENOMEM;
+               return -1;
+       }
+       data->length = size;
+
+       /* Packing format version and number of element */
+       p = data->data;
+       PUSH_LE_U32(p, 0, LDB_PACKING_FORMAT_V2);
+       p += U32_LEN;
+       PUSH_LE_U32(p, 0, real_elements);
+       p += U32_LEN;
+
+       /* Pack DN and Canonicalized DN */
+       PUSH_LE_U32(p, 0, dn_len-NULL_PAD_BYTE_LEN);
+       p += U32_LEN;
+       memcpy(p, dn, dn_len);
+       p += dn_len;
+
+       PUSH_LE_U32(p, 0, dn_canon_len-NULL_PAD_BYTE_LEN);
+       p += U32_LEN;
+       memcpy(p, dn_canon, dn_canon_len);
+       p += dn_canon_len;
+
+       /*
+        * Save pointer at this point and leave a U32_LEN gap for
+        * storing the size of the attribute names and value lengths
+        * section
+        */
+       q = p;
+       p += U32_LEN;
+
+       for (i=0;i<message->num_elements;i++) {
+               if (attribute_storable_values(&message->elements[i]) == 0) {
+                       continue;
+               }
+
+               /* Length of el name */
+               len = strlen(message->elements[i].name);
+               PUSH_LE_U32(p, 0, len);
+               p += U32_LEN;
+
+               /*
+                * Even though we have the element name's length, put a null
+                * terminator at the end so if any code uses the name
+                * directly, it'll be safe to do things requiring null
+                * termination like strlen
+                */
+               memcpy(p, message->elements[i].name, len+NULL_PAD_BYTE_LEN);
+               p += len + NULL_PAD_BYTE_LEN;
+               /* Num values */
+               PUSH_LE_U32(p, 0, message->elements[i].num_values);
+               p += U32_LEN;
+
+               /*
+                * Calculate value length width again. It's faster to
+                * calculate it again than do the array management to
+                * store the result during size calculation.
+                */
+               max_val_len = 0;
+               for (j=0;j<message->elements[i].num_values;j++) {
+                       value_len = message->elements[i].values[j].length;
+                       if (value_len > max_val_len) {
+                               max_val_len = value_len;
+                       }
+               }
+
+               if (max_val_len <= UCHAR_MAX) {
+                       val_len_width = U8_LEN;
+               } else if (max_val_len <= USHRT_MAX) {
+                       val_len_width = U16_LEN;
+               } else if (max_val_len <= UINT_MAX) {
+                       val_len_width = U32_LEN;
+               } else {
+                       errno = EMSGSIZE;
+                       return -1;
+               }
+
+               /* Pack the width */
+               *p = val_len_width & 0xFF;
+               p += U8_LEN;
+
+               /*
+                * Pack each value's length using the minimum number of bytes
+                * required, which we just calculated. We repeat the loop
+                * for each case here so the compiler can inline code.
+                */
+               if (val_len_width == U8_LEN) {
+                       for (j=0;j<message->elements[i].num_values;j++) {
+                               PUSH_LE_U8(p, 0,
+                                       message->elements[i].values[j].length);
+                               p += U8_LEN;
+                       }
+               } else if (val_len_width == U16_LEN) {
+                       for (j=0;j<message->elements[i].num_values;j++) {
+                               PUSH_LE_U16(p, 0,
+                                       message->elements[i].values[j].length);
+                               p += U16_LEN;
+                       }
+               } else if (val_len_width == U32_LEN) {
+                       for (j=0;j<message->elements[i].num_values;j++) {
+                               PUSH_LE_U32(p, 0,
+                                       message->elements[i].values[j].length);
+                               p += U32_LEN;
+                       }
+               }
+       }
+
+       /*
+        * We've finished packing the attr names and value lengths
+        * section, so store the size in the U32_LEN gap we left
+        * earlier
+        */
+       PUSH_LE_U32(q, 0, p-q);
+
+       /* Now pack the values */
+       for (i=0;i<message->num_elements;i++) {
+               if (attribute_storable_values(&message->elements[i]) == 0) {
+                       continue;
+               }
+               for (j=0;j<message->elements[i].num_values;j++) {
+                       memcpy(p, message->elements[i].values[j].data,
+                              message->elements[i].values[j].length);
+
+                       /*
+                        * Even though we have the data length, put a null
+                        * terminator at the end of each value's data so if
+                        * any code uses the data directly, it'll  be safe to
+                        * do things requiring null termination like strlen.
+                        */
+                       p[message->elements[i].values[j].length] = 0;
+                       p += message->elements[i].values[j].length +
+                               NULL_PAD_BYTE_LEN;
+               }
+       }
+
+       /*
+        * If we didn't end up at the end of the data here, something has
+        * gone very wrong.
+        */
+       if (p != data->data + size) {
+               errno = ENOMEM;
+               return -1;
+       }
+
+       return 0;
+}
+
+/*
+  pack a ldb message into a linear buffer in a ldb_val
+
+  note that this routine avoids saving elements with zero values,
+  as these are equivalent to having no element
+
+  caller frees the data buffer after use
+*/
+int ldb_pack_data(struct ldb_context *ldb,
+                 const struct ldb_message *message,
+                 struct ldb_val *data,
+                 uint32_t pack_format_version) {
+
+       if (pack_format_version == LDB_PACKING_FORMAT) {
+               return ldb_pack_data_v1(ldb, message, data);
+       } else if (pack_format_version == LDB_PACKING_FORMAT_V2) {
+               return ldb_pack_data_v2(ldb, message, data);
+       } else {
+               errno = EINVAL;
+               return -1;
+       }
+}
+
 /*
  * Unpack a ldb message from a linear buffer in ldb_val
  */
index db80b28fcfa3ddfc0fb0f1d9de53a66d4e645e3e..3b7286149094394503b7f5428a3c0283e552d7bc 100644 (file)
@@ -509,7 +509,8 @@ int ldb_register_extended_match_rule(struct ldb_context *ldb,
  */
 int ldb_pack_data(struct ldb_context *ldb,
                  const struct ldb_message *message,
-                 struct ldb_val *data);
+                 struct ldb_val *data,
+                 uint32_t pack_format_version);
 /*
  * Unpack a ldb message from a linear buffer in ldb_val
  */
index 81a77462c8307d4e5bc68bc482d18ee3b93debfa..c8f7fd1396d7d46a0cd99cec2be1777d530c4d2d 100644 (file)
@@ -384,7 +384,8 @@ int ldb_kv_store(struct ldb_module *module,
        }
 
        ret = ldb_pack_data(ldb_module_get_ctx(module),
-                           msg, &ldb_data);
+                           msg, &ldb_data,
+                           ldb_kv->pack_format_version);
        if (ret == -1) {
                TALLOC_FREE(key_ctx);
                return LDB_ERR_OTHER;
index ba59b025e9fdee8c2c17c255b0195649c2509ce4..1de24cbc7edbfbce87303bd1cb4e567725cecd39 100644 (file)
@@ -1371,6 +1371,143 @@ static bool torture_ldb_unpack_data_corrupt(struct torture_context *torture)
        return true;
 }
 
+static bool torture_ldb_pack_data_v2(struct torture_context *torture)
+{
+       TALLOC_CTX *mem_ctx = talloc_new(torture);
+       struct ldb_context *ldb;
+       struct ldb_val binary;
+
+       uint8_t bin[] = {0x68, 0x19, 0x01, 0x26, /* version */
+               2, 0, 0, 0, /* num elements */
+               4, 0, 0, 0, /* dn length */
+               'D', 'N', '=', 'A', 0, /* dn with null term */
+               2, 0, 0, 0, /* canonicalized dn length */
+               '/', 'A', 0, /* canonicalized dn with null term */
+               42, 0, 0, 0, /* distance from here to values section */
+               3, 0, 0, 0, /* el name length */
+               'a', 'b', 'c', 0, /* name with null term */
+               4, 0, 0, 0, 1, /* num values and length width */
+               1, 1, 1, 1, /* value lengths */
+               3, 0, 0, 0, /* el name length */
+               'd', 'e', 'f', 0, /* name def with null term */
+               4, 0, 0, 0, 2, /* num of values and length width */
+               1, 0, 1, 0, 1, 0, 0, 1, /* value lengths */
+               '1', 0, '2', 0, '3', 0, '4', 0, /* values for abc */
+               '5', 0, '6', 0, '7', 0}; /* first 3 values for def */
+
+       char eight_256[257] =\
+               "88888888888888888888888888888888888888888888888888888888888"
+               "88888888888888888888888888888888888888888888888888888888888"
+               "88888888888888888888888888888888888888888888888888888888888"
+               "88888888888888888888888888888888888888888888888888888888888"
+               "88888888888888888888"; /* def's 4th value */
+
+       struct ldb_val vals[4] = {{.data=discard_const_p(uint8_t, "1"),
+                                  .length=1},
+                                 {.data=discard_const_p(uint8_t, "2"),
+                                  .length=1},
+                                 {.data=discard_const_p(uint8_t, "3"),
+                                  .length=1},
+                                 {.data=discard_const_p(uint8_t, "4"),
+                                  .length=1}};
+       struct ldb_val vals2[4] = {{.data=discard_const_p(uint8_t,"5"),
+                                  .length=1},
+                                 {.data=discard_const_p(uint8_t, "6"),
+                                  .length=1},
+                                 {.data=discard_const_p(uint8_t, "7"),
+                                  .length=1},
+                                 {.data=discard_const_p(uint8_t, eight_256),
+                                  .length=256}};
+       struct ldb_message_element els[2] = {{.name=discard_const_p(char, "abc"),
+                                          .num_values=4, .values=vals},
+                                         {.name=discard_const_p(char, "def"),
+                                          .num_values=4, .values=vals2}};
+       struct ldb_message msg = {.num_elements=2, .elements=els};
+
+       uint8_t *expect_bin;
+       struct ldb_val expect_bin_ldb;
+       size_t expect_size = sizeof(bin) + sizeof(eight_256);
+       expect_bin = talloc_size(NULL, expect_size);
+       memcpy(expect_bin, bin, sizeof(bin));
+       memcpy(expect_bin + sizeof(bin), eight_256, sizeof(eight_256));
+       expect_bin_ldb = data_blob_const(expect_bin, expect_size);
+
+       ldb = samba_ldb_init(mem_ctx, torture->ev, NULL,NULL,NULL);
+       torture_assert(torture, ldb != NULL, "Failed to init ldb");
+
+       msg.dn = ldb_dn_new(NULL, ldb, "DN=A");
+
+       torture_assert_int_equal(torture,
+                                ldb_pack_data(ldb, &msg, &binary,
+                                              LDB_PACKING_FORMAT_V2),
+                                0, "ldb_pack_data failed");
+
+       torture_assert_int_equal(torture, expect_bin_ldb.length,
+                                binary.length,
+                                "packed data length not as expected");
+
+       torture_assert_mem_equal(torture,
+                                expect_bin_ldb.data,
+                                binary.data,
+                                binary.length,
+                                "packed data not as expected");
+       talloc_free(expect_bin);
+
+       return true;
+}
+
+static bool torture_ldb_pack_data_v2_special(struct torture_context *torture)
+{
+       TALLOC_CTX *mem_ctx = talloc_new(torture);
+       struct ldb_context *ldb;
+       struct ldb_val binary;
+
+       uint8_t bin[] = {0x68, 0x19, 0x01, 0x26, /* version */
+               1, 0, 0, 0, /* num elements */
+               2, 0, 0, 0, /* dn length */
+               '@', 'A', 0, /* dn with null term */
+               0, 0, 0, 0, /* canonicalized dn length */
+               0, /* no canonicalized dn, just null term */
+               18, 0, 0, 0, /* distance from here to values section */
+               3, 0, 0, 0, /* el name length */
+               'a', 'b', 'c', 0, /* name with null term */
+               1, 0, 0, 0, 1, /* num values and length width */
+               1, /* value lengths */
+               '1', 0}; /* values for abc */
+
+       struct ldb_val vals[1] = {{.data=discard_const_p(uint8_t, "1"),
+                                  .length=1}};
+       struct ldb_message_element els[1] = {{.name=discard_const_p(char,
+                                                                   "abc"),
+                                             .num_values=1, .values=vals}};
+       struct ldb_message msg = {.num_elements=1, .elements=els};
+
+       struct ldb_val expect_bin_ldb;
+       expect_bin_ldb = data_blob_const(bin, sizeof(bin));
+
+       ldb = samba_ldb_init(mem_ctx, torture->ev, NULL,NULL,NULL);
+       torture_assert(torture, ldb != NULL, "Failed to init ldb");
+
+       msg.dn = ldb_dn_new(NULL, ldb, "@A");
+
+       torture_assert_int_equal(torture,
+                                ldb_pack_data(ldb, &msg, &binary,
+                                              LDB_PACKING_FORMAT_V2),
+                                0, "ldb_pack_data failed");
+
+       torture_assert_int_equal(torture, expect_bin_ldb.length,
+                                binary.length,
+                                "packed data length not as expected");
+
+       torture_assert_mem_equal(torture,
+                                expect_bin_ldb.data,
+                                binary.data,
+                                binary.length,
+                                "packed data not as expected");
+
+       return true;
+}
+
 static bool torture_ldb_parse_ldif(struct torture_context *torture,
                                   const void *data_p)
 {
@@ -1394,8 +1531,9 @@ static bool torture_ldb_parse_ldif(struct torture_context *torture,
        torture_assert_int_equal(torture, ldif->changetype, LDB_CHANGETYPE_NONE,
                                 "changetype is incorrect");
        torture_assert_int_equal(torture,
-                                ldb_pack_data(ldb, ldif->msg, &binary), 0,
-                                "ldb_pack_data failed");
+                                ldb_pack_data(ldb, ldif->msg, &binary,
+                                              LDB_PACKING_FORMAT_V2),
+                                0, "ldb_pack_data failed");
 
        torture_assert_int_equal(torture, ldb_unpack_data(ldb, &data, msg), 0,
                                 "ldb_unpack_data failed");
@@ -1443,8 +1581,9 @@ static bool torture_ldb_pack_format_perf(struct torture_context *torture)
        torture_assert_int_equal(torture, ldif->changetype, LDB_CHANGETYPE_NONE,
                                 "changetype is incorrect");
        torture_assert_int_equal(torture,
-                                ldb_pack_data(ldb, ldif->msg, &binary), 0,
-                                "ldb_pack_data failed");
+                                ldb_pack_data(ldb, ldif->msg, &binary,
+                                              LDB_PACKING_FORMAT_V2),
+                                0, "ldb_pack_data failed");
 
        ret = ldb_unpack_data(ldb, &binary, msg);
        torture_assert_int_equal(torture, ret, 0,
@@ -1457,7 +1596,7 @@ static bool torture_ldb_pack_format_perf(struct torture_context *torture)
        i = 0;
        start = clock();
        while (true) {
-               ldb_pack_data(ldb, ldif->msg, &binary);
+               ldb_pack_data(ldb, ldif->msg, &binary, LDB_PACKING_FORMAT_V2);
                i++;
 
                if (i >= 1000) {
@@ -1624,6 +1763,10 @@ struct torture_suite *torture_ldb(TALLOC_CTX *mem_ctx)
        torture_suite_add_simple_test(suite, "dn", torture_ldb_dn);
        torture_suite_add_simple_test(suite, "pack-format-perf",
                                      torture_ldb_pack_format_perf);
+       torture_suite_add_simple_test(suite, "pack-data-v2",
+                                     torture_ldb_pack_data_v2);
+       torture_suite_add_simple_test(suite, "pack-data-special-v2",
+                                     torture_ldb_pack_data_v2_special);
        torture_suite_add_simple_test(suite, "unpack-corrupt-v2",
                                      torture_ldb_unpack_data_corrupt);