diff --git a/.config/typos.toml b/.config/typos.toml index 6aef279d64a..10103279c57 100644 --- a/.config/typos.toml +++ b/.config/typos.toml @@ -56,6 +56,7 @@ seeked = "seeked" [type.c.extend-words] arange = "arange" +Threadsave = "Threadsave" fo = "fo" frst = "frst" limite = "limite" diff --git a/src/config.c b/src/config.c index 8b5c84ea899..b51beff21c9 100644 --- a/src/config.c +++ b/src/config.c @@ -3280,6 +3280,7 @@ standardConfig static_configs[] = { createBoolConfig("replica-ignore-maxmemory", "slave-ignore-maxmemory", MODIFIABLE_CONFIG, server.repl_replica_ignore_maxmemory, 1, NULL, NULL), createBoolConfig("jemalloc-bg-thread", NULL, MODIFIABLE_CONFIG, server.jemalloc_bg_thread, 1, NULL, updateJemallocBgThread), createBoolConfig("activedefrag", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.active_defrag_enabled, CONFIG_ACTIVE_DEFRAG_DEFAULT, isValidActiveDefrag, NULL), + createBoolConfig("forkless-options-supported", NULL, IMMUTABLE_CONFIG, server.forkless_options_supported, 0, NULL, NULL), createBoolConfig("syslog-enabled", NULL, IMMUTABLE_CONFIG, server.syslog_enabled, 0, NULL, NULL), createBoolConfig("cluster-enabled", NULL, IMMUTABLE_CONFIG, server.cluster_enabled, 0, NULL, NULL), createBoolConfig("appendonly", NULL, MODIFIABLE_CONFIG | DENY_LOADING_CONFIG, server.aof_enabled, 0, NULL, updateAppendOnly), diff --git a/src/object.c b/src/object.c index fe2e76fa6fb..21eb57e5cbd 100644 --- a/src/object.c +++ b/src/object.c @@ -49,6 +49,82 @@ * so if expire is set later, we don't need to reallocate the object. */ #define KEY_SIZE_TO_INCLUDE_EXPIRE_THRESHOLD 128 +/* Get beginning of embedded data, which may contain expire, metadata, key, and/or value. + * Embedded data flags must be accurate when called. */ +static unsigned char *objectEmbeddedData(const robj *o) { + unsigned char *data = (void *)(o + 1); + if (o->hasembval) data -= sizeof(void *); + return data; +} + +/* ===================== Object Metadata Management ========================= */ + +/* Static variable to store metadata size. Set once at server initialization. */ +static size_t object_metadata_size = 0; + +/* Set the metadata size. + * Size should not be changed once set. */ +void objectSetMetadataSize(size_t size) { + /* Metadata size already set - only allow setting to the same value */ + if (object_metadata_size == size) return; + + /* When current size is 0 and the incoming size is not - setting for the first time */ + serverAssert(object_metadata_size == 0); + + /* Check that all databases are empty */ + if (server.db != NULL) { + for (int j = 0; j < server.dbnum; j++) { + if (server.db[j] != NULL) { + serverAssert(kvstoreSize(server.db[j]->keys) == 0); + } + } + } + + object_metadata_size = size; +} + +/* Calculate the size of metadata for an object. + * Returns the configured metadata size if the object has an embedded key, 0 otherwise. */ +size_t objectGetMetadataSize(const robj *o) { + if (o->hasembkey) return object_metadata_size; + return 0; +} + +/* Get a void pointer to the metadata for an object. + * Returns NULL if the object doesn't have metadata. + * The caller must cast this to the appropriate metadata structure type. + * + * Memory layout visualization for objects: + * + * ┌─────────────────────────────────────────────────────────────────┐ + * │ robj (struct serverObject) │ + * │ - type, encoding, lru, hasexpire, hasembkey, hasembval... │ + * ├─────────────────────────────────────────────────────────────────┤ + * │ expire field (optional, if hasexpire == 1) │ + * │ - long long (8 bytes) │ + * ├─────────────────────────────────────────────────────────────────┤ + * │ metadata (optional, if hasembkey == 1 && metadata_size > 0) │ + * │ - (object_metadata_size) │ ← objectGetMetadata returns pointer here + * ├─────────────────────────────────────────────────────────────────┤ + * │ embedded key (if hasembkey == 1) │ + * ├─────────────────────────────────────────────────────────────────┤ + * │ embedded value (if hasembval == 1) │ + * └─────────────────────────────────────────────────────────────────┘ + */ +void *objectGetMetadata(const robj *o) { + if (object_metadata_size == 0 || !o->hasembkey) return NULL; + + /* The memory after the struct where we embedded metadata. */ + unsigned char *data = objectEmbeddedData(o); + + /* If expire field exists, metadata is after it */ + if (o->hasexpire) { + data += sizeof(long long); + } + + return (void *)data; +} + /* ===================== Creation and parsing of objects ==================== */ /* Creates an object, optionally with embedded key and expire fields. The key @@ -62,10 +138,12 @@ static robj *createUnembeddedObjectWithKeyAndExpire(int type, void *val, const_s size_t key_sds_len = has_embkey ? sdslen(key) : 0; char key_sds_type = has_embkey ? sdsReqType(key_sds_len) : 0; size_t key_sds_size = has_embkey ? sdsReqSize(key_sds_len, key_sds_type) : 0; + size_t metadata_size = has_embkey ? object_metadata_size : 0; size_t min_size = sizeof(robj); if (has_expire) { min_size += sizeof(long long); } + min_size += metadata_size; if (has_embkey) { /* Size of embedded key, incl. 1 byte for prefixed sds hdr size. */ min_size += 1 + key_sds_size; @@ -98,6 +176,12 @@ static robj *createUnembeddedObjectWithKeyAndExpire(int type, void *val, const_s data += sizeof(long long); } + /* Initialize metadata to zero */ + if (metadata_size > 0) { + memset(data, 0, metadata_size); + data += metadata_size; + } + /* Copy embedded key. */ if (o->hasembkey) { *data++ = sdsHdrSize(key_sds_type); @@ -140,12 +224,6 @@ robj *createRawStringObject(const char *ptr, size_t len) { return createObject(OBJ_STRING, sdsnewlen(ptr, len)); } -/* Get beginning of embedded data, which may contain expire, key, and/or value. Embedded data flags must be accurate when called. */ -static unsigned char *objectEmbeddedData(const robj *o) { - unsigned char *data = (void *)(o + 1); - if (o->hasembval) data -= sizeof(void *); - return data; -} /* Creates a new embedded string object and copies the content of key, val_ptr * and expire to the new object. LRU is set to 0. */ @@ -159,6 +237,7 @@ static robj *createEmbeddedStringObjectWithKeyAndExpire(const char *val_ptr, char key_sds_type = has_embkey ? sdsReqType(key_sds_len) : 0; size_t key_sds_size = has_embkey ? sdsReqSize(key_sds_len, key_sds_type) : 0; size_t val_sds_size = sdsReqSize(val_len, SDS_TYPE_8); + size_t metadata_size = has_embkey ? object_metadata_size : 0; if (val_sds_size < sizeof(void *)) { val_sds_size = sizeof(void *); /* Ensure it's possible to "unembed" value later */ } @@ -168,6 +247,7 @@ static robj *createEmbeddedStringObjectWithKeyAndExpire(const char *val_ptr, if (expire != EXPIRY_NONE) { min_size += sizeof(long long); } + min_size += metadata_size; if (has_embkey) { /* Size of embedded key, incl. 1 byte for prefixed sds hdr size. */ min_size += 1 + key_sds_size; @@ -201,6 +281,12 @@ static robj *createEmbeddedStringObjectWithKeyAndExpire(const char *val_ptr, data += sizeof(long long); } + /* Initialize metadata to zero */ + if (metadata_size > 0) { + memset(data, 0, metadata_size); + data += metadata_size; + } + /* Copy embedded key. */ if (o->hasembkey) { *data++ = sdsHdrSize(key_sds_type); @@ -234,6 +320,7 @@ static bool shouldEmbedStringObject(size_t val_len, const_sds key, long long exp if (key) { size_t key_len = sdslen(key); size += sdsReqSize(key_len, sdsReqType(key_len)) + 1; /* 1 byte for prefixed sds hdr size */ + size += object_metadata_size; } size += (expire != EXPIRY_NONE) * sizeof(long long); size += sdsReqSize(val_len, SDS_TYPE_8); @@ -269,6 +356,8 @@ void *objectGetVal(const robj *o) { data += sizeof(long long); } if (o->hasembkey) { + /* Skip metadata */ + data += objectGetMetadataSize(o); /* Skip embedded key */ uint8_t hdr_size = *(uint8_t *)data; data += 1 + hdr_size; /* +1 for header size byte */ @@ -287,6 +376,9 @@ sds objectGetKey(const robj *o) { data += sizeof(long long); } if (o->hasembkey) { + /* Skip metadata */ + data += objectGetMetadataSize(o); + /* Skip header size byte */ uint8_t hdr_size = *(uint8_t *)data; data += 1 + hdr_size; return (sds)data; diff --git a/src/server.c b/src/server.c index f5e5e6b38ad..fa70885ef8f 100644 --- a/src/server.c +++ b/src/server.c @@ -2316,6 +2316,7 @@ void initServerConfig(void) { for (j = 0; j < CONFIG_DEFAULT_BINDADDR_COUNT; j++) server.bindaddr[j] = zstrdup(default_bindaddr[j]); memset(server.listeners, 0x00, sizeof(server.listeners)); server.active_expire_enabled = 1; + server.forkless_options_supported = 0; server.lazy_expire_disabled = 0; server.skip_checksum_validation = 0; server.loading = 0; @@ -2995,6 +2996,13 @@ void initServer(void) { server.dbnum = server.cluster_enabled ? server.config_databases_cluster : server.config_databases; server.db = zcalloc(sizeof(serverDb *) * server.dbnum); + + /* Set object metadata size before creating any database key objects */ + if (server.forkless_options_supported) { + objectSetMetadataSize(sizeof(uint32_t)); /* This is a placeholder until Threadsave defines a metadata structure */ + /* 4 bytes for iterator_epoch for now*/ + } + createDatabaseIfNeeded(0); /* The default database should always exist */ evictionPoolAlloc(); /* Initialize the LRU keys pool. */ diff --git a/src/server.h b/src/server.h index 7ea4c9874ef..465e55d19fe 100644 --- a/src/server.h +++ b/src/server.h @@ -784,25 +784,27 @@ typedef struct ValkeyModuleType moduleType; * The optional variable-sized embedded data has 2 possible layouts. If value is embedded (hasembval == 1) * the `val_ptr` pointer is not used - instead the val data is embedded: * - * +------+----------+-----+------------+----------+--------+-----------------+---------+------------+ - * | type | encoding | lru | has* flags | refcount | expire | key_header_size | key sds | value data | - * +------+----------+-----+------------+----------+--------+-----------------+---------+------------+ - * ^ ^ ^ ^ - * | | | | - * | | | +--- present because hasembval == 1 - * | | | - * | +-----------------+--- present if hasembkey == 1 + * +------+----------+-----+------------+----------+--------+----------+-----------------+---------+------------+ + * | type | encoding | lru | has* flags | refcount | expire | metadata | key_header_size | key sds | value data | + * +------+----------+-----+------------+----------+--------+----------+-----------------+---------+------------+ + * ^ ^ ^ ^ ^ + * | | | | | + * | | | | +--- present because hasembval == 1 + * | | | | + * | +----------+-----------------+--- present if hasembkey == 1 + * | * | * +--- present if hasexpire == 1 * * Otherwise value is not embedded and we use the `val_ptr` pointer: * - * +------+----------+-----+------------+----------+---------+--------+-----------------+---------+ - * | type | encoding | lru | has* flags | refcount | val_ptr | expire | key_header_size | key sds | - * +------+----------+-----+------------+----------+---------+--------+-----------------+---------+ - * ^ ^ ^ ^ - * | | | | - * | | +-----------------+--- present if hasembkey == 1 + * +------+----------+-----+------------+----------+---------+--------+----------+-----------------+---------+ + * | type | encoding | lru | has* flags | refcount | val_ptr | expire | metadata | key_header_size | key sds | + * +------+----------+-----+------------+----------+---------+--------+----------+-----------------+---------+ + * ^ ^ ^ ^ ^ + * | | | | | + * | | +----------+-----------------+--- present if hasembkey == 1 + * | | * | | * | +--- present if hasexpire == 1 * | @@ -2040,6 +2042,7 @@ struct valkeyServer { int rdb_checksum; /* Use RDB checksum? */ int rdb_del_sync_files; /* Remove RDB files used only for SYNC if the instance does not use persistence. */ + int forkless_options_supported; /* Enable forkless options support. */ time_t lastsave; /* Unix time of last successful save */ time_t lastbgsave_try; /* Unix time of last attempted bgsave */ time_t rdb_save_time_last; /* Time used by last RDB save run. */ @@ -3157,6 +3160,11 @@ uint8_t objectGetLFUFrequency(robj *o); uint32_t objectGetLRUIdleSecs(robj *o); uint32_t objectGetIdleness(robj *o); +/* Object metadata management */ +void objectSetMetadataSize(size_t size); +size_t objectGetMetadataSize(const robj *o); +void *objectGetMetadata(const robj *o); + /* Synchronous I/O with timeout */ ssize_t syncWrite(int fd, char *ptr, ssize_t size, long long timeout); ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout); diff --git a/src/unit/test_object.cpp b/src/unit/test_object.cpp index 054802d0831..9e030bde002 100644 --- a/src/unit/test_object.cpp +++ b/src/unit/test_object.cpp @@ -15,7 +15,47 @@ extern "C" { #include "server.h" } +/* Metadata test helpers */ +typedef struct objMetadata { + uint32_t meta_int; +} objMetadata; + class ObjectTest : public ::testing::Test { + protected: + robj *createKeyValueObject(const char *k, const char *v) { + sds key = sdsnew(k); + robj *obj = createStringObject(v, strlen(v)); + robj *obj_with_key = objectSetKeyAndExpire(obj, key, -1); + sdsfree(key); + return obj_with_key; + } + + void objectSetMetaInt(robj *o, uint32_t metadata_int) { + objMetadata *meta = (objMetadata *)objectGetMetadata(o); + meta->meta_int = metadata_int; + } + + uint32_t objectGetMetaInt(const robj *o) { + objMetadata *meta = (objMetadata *)objectGetMetadata(o); + return meta->meta_int; + } + + /* Find the largest value length that still embeds with the given key and expire. */ + int findMaxEmbeddableValueLen(const char *key, long long expire) { + sds k = key ? sdsnew(key) : NULL; + + int len; + for (len = 1; len <= 256; len++) { + robj *obj = createStringObject(NULL, len); + if (k) obj = objectSetKeyAndExpire(obj, k, expire); + bool isEmbedded = (obj->encoding == OBJ_ENCODING_EMBSTR); + decrRefCount(obj); + if (!isEmbedded) break; + } + + sdsfree(k); + return len - 1; + } }; TEST_F(ObjectTest, object_with_key) { @@ -57,88 +97,71 @@ TEST_F(ObjectTest, object_with_key) { } TEST_F(ObjectTest, embedded_string_with_key) { - /* key of length 32 - type 8 */ - sds key = sdsnew("k:123456789012345678901234567890"); - ASSERT_EQ(sdslen(key), 32u); - - /* 32B key and 79B value should be embedded within 128B. Contents: - * - 8B robj (no ptr) + 1B key header size - * - 3B key header + 32B key + 1B null terminator - * - 3B val header + 79B val + 1B null terminator - * because no pointers are stored, there is no difference for 32 bit builds*/ - const char *short_value = "1234567890123456789012345678901234567890123456789012345678901234567890123456789"; - ASSERT_EQ(strlen(short_value), 79u); - robj *short_val_obj = createStringObject(short_value, strlen(short_value)); - robj *embstr_obj = objectSetKeyAndExpire(short_val_obj, key, -1); + const char *key = "k:123456789012345678901234567890"; + int max_len = findMaxEmbeddableValueLen(key, -1); + ASSERT_GT(max_len, 0); + + /* Value at max length should embed. */ + sds k1 = sdsnew(key); + robj *embstr_obj = createStringObject(NULL, max_len); + embstr_obj = objectSetKeyAndExpire(embstr_obj, k1, -1); ASSERT_EQ(embstr_obj->encoding, (unsigned)OBJ_ENCODING_EMBSTR); - ASSERT_EQ(sdslen(objectGetKey(embstr_obj)), 32u); - ASSERT_EQ(sdscmp(objectGetKey(embstr_obj), key), 0); - ASSERT_EQ(sdslen((sds)objectGetVal(embstr_obj)), 79u); - ASSERT_EQ(strcmp((const char *)objectGetVal(embstr_obj), short_value), 0); - - /* value of length 80 cannot be embedded with other contents within 128B */ - const char *longer_value = "12345678901234567890123456789012345678901234567890123456789012345678901234567890"; - ASSERT_EQ(strlen(longer_value), 80u); - robj *longer_val_obj = createStringObject(longer_value, strlen(longer_value)); - robj *raw_obj = objectSetKeyAndExpire(longer_val_obj, key, -1); + ASSERT_EQ(sdslen((sds)objectGetVal(embstr_obj)), (size_t)max_len); + + /* One byte more should not embed. */ + sds k2 = sdsnew(key); + robj *raw_obj = createStringObject(NULL, max_len + 1); + raw_obj = objectSetKeyAndExpire(raw_obj, k2, -1); ASSERT_EQ(raw_obj->encoding, (unsigned)OBJ_ENCODING_RAW); - ASSERT_EQ(sdslen(objectGetKey(raw_obj)), 32u); - ASSERT_EQ(sdscmp(objectGetKey(raw_obj), key), 0); - ASSERT_EQ(sdslen((sds)objectGetVal(raw_obj)), 80u); - ASSERT_EQ(strcmp((const char *)objectGetVal(raw_obj), longer_value), 0); + ASSERT_EQ(sdslen((sds)objectGetVal(raw_obj)), (size_t)(max_len + 1)); - sdsfree(key); + sdsfree(k1); + sdsfree(k2); decrRefCount(embstr_obj); decrRefCount(raw_obj); } TEST_F(ObjectTest, embedded_string_with_key_and_expire) { - /* key of length 32 - type 8 */ - sds key = sdsnew("k:123456789012345678901234567890"); - ASSERT_EQ(sdslen(key), 32u); - - /* 32B key and 71B value should be embedded within 128B. Contents: - * - 8B robj (no ptr) + 8B expire + 1B key header size - * - 3B key header + 32B key + 1B null terminator - * - 3B val header + 71B val + 1B null terminator - * because no pointers are stored, there is no difference for 32 bit builds*/ - const char *short_value = "12345678901234567890123456789012345678901234567890123456789012345678901"; - ASSERT_EQ(strlen(short_value), 71u); - robj *short_val_obj = createStringObject(short_value, strlen(short_value)); - robj *embstr_obj = objectSetKeyAndExpire(short_val_obj, key, 128); + const char *key = "k:123456789012345678901234567890"; + int max_len = findMaxEmbeddableValueLen(key, 128); + ASSERT_GT(max_len, 0); + + /* Adding an expire reduces the available space for the value. */ + int max_len_no_expire = findMaxEmbeddableValueLen(key, -1); + ASSERT_LT(max_len, max_len_no_expire); + + /* Value at max length should embed. */ + sds k1 = sdsnew(key); + robj *embstr_obj = createStringObject(NULL, max_len); + embstr_obj = objectSetKeyAndExpire(embstr_obj, k1, 128); ASSERT_EQ(embstr_obj->encoding, (unsigned)OBJ_ENCODING_EMBSTR); - ASSERT_EQ(sdslen(objectGetKey(embstr_obj)), 32u); - ASSERT_EQ(sdscmp(objectGetKey(embstr_obj), key), 0); - ASSERT_EQ(sdslen((sds)objectGetVal(embstr_obj)), 71u); - ASSERT_EQ(strcmp((const char *)objectGetVal(embstr_obj), short_value), 0); - - /* value of length 72 cannot be embedded with other contents within 128B */ - const char *longer_value = "123456789012345678901234567890123456789012345678901234567890123456789012"; - ASSERT_EQ(strlen(longer_value), 72u); - robj *longer_val_obj = createStringObject(longer_value, strlen(longer_value)); - robj *raw_obj = objectSetKeyAndExpire(longer_val_obj, key, 128); + + /* One byte more should not embed. */ + sds k2 = sdsnew(key); + robj *raw_obj = createStringObject(NULL, max_len + 1); + raw_obj = objectSetKeyAndExpire(raw_obj, k2, 128); ASSERT_EQ(raw_obj->encoding, (unsigned)OBJ_ENCODING_RAW); - ASSERT_EQ(sdslen(objectGetKey(raw_obj)), 32u); - ASSERT_EQ(sdscmp(objectGetKey(raw_obj), key), 0); - ASSERT_EQ(sdslen((sds)objectGetVal(raw_obj)), 72u); - ASSERT_EQ(strcmp((const char *)objectGetVal(raw_obj), longer_value), 0); - sdsfree(key); + sdsfree(k1); + sdsfree(k2); decrRefCount(embstr_obj); decrRefCount(raw_obj); } TEST_F(ObjectTest, embedded_value) { - /* with only value there is only 12B overhead, so we can embed up to 52B. - * 8B robj (no ptr) + 3B val header + 52B val + 1B null terminator */ - const char *val = "v:12345678901234567890123456789012345678901234567890"; - ASSERT_EQ(strlen(val), 52u); - robj *embstr_obj = createStringObject(val, strlen(val)); + /* Value-only object (no key): find the largest value that embeds. */ + int max_len = findMaxEmbeddableValueLen(NULL, -1); + ASSERT_GT(max_len, 0); + + robj *embstr_obj = createStringObject(NULL, max_len); ASSERT_EQ(embstr_obj->encoding, (unsigned)OBJ_ENCODING_EMBSTR); - ASSERT_EQ(sdslen((sds)objectGetVal(embstr_obj)), 52u); - ASSERT_EQ(strcmp((const char *)objectGetVal(embstr_obj), val), 0); + ASSERT_EQ(sdslen((sds)objectGetVal(embstr_obj)), (size_t)max_len); + + robj *raw_obj = createStringObject(NULL, max_len + 1); + ASSERT_EQ(raw_obj->encoding, (unsigned)OBJ_ENCODING_RAW); decrRefCount(embstr_obj); + decrRefCount(raw_obj); } TEST_F(ObjectTest, unembed_value) { @@ -166,3 +189,107 @@ TEST_F(ObjectTest, unembed_value) { sdsfree(key); decrRefCount(obj); } + + +TEST_F(ObjectTest, metadata_disabled) { + robj *obj_with_key = createKeyValueObject("testkey", "value"); + + ASSERT_EQ(objectGetMetadata(obj_with_key), nullptr); + ASSERT_EQ(objectGetMetadataSize(obj_with_key), 0u); + + decrRefCount(obj_with_key); +} + +TEST_F(ObjectTest, metadata_without_key) { + objectSetMetadataSize(sizeof(objMetadata)); + + robj *obj_no_key = createStringObject("value_without_key", 17); + + ASSERT_EQ(objectGetMetadata(obj_no_key), nullptr); + ASSERT_EQ(objectGetMetadataSize(obj_no_key), 0u); + + decrRefCount(obj_no_key); +} + +TEST_F(ObjectTest, metadata_with_key) { + objectSetMetadataSize(sizeof(objMetadata)); + + robj *obj_with_key = createKeyValueObject("testkey", "value"); + + ASSERT_EQ(objectGetMetadataSize(obj_with_key), sizeof(objMetadata)); + + objMetadata *meta = (objMetadata *)objectGetMetadata(obj_with_key); + ASSERT_NE(meta, nullptr); + EXPECT_EQ(meta->meta_int, 0u); + + decrRefCount(obj_with_key); +} + +TEST_F(ObjectTest, metadata_read_write) { + objectSetMetadataSize(sizeof(objMetadata)); + + robj *obj_with_key = createKeyValueObject("mykey", "myvalue"); + + ASSERT_EQ(objectGetMetadataSize(obj_with_key), sizeof(objMetadata)); + + objectSetMetaInt(obj_with_key, 12345); + EXPECT_EQ(objectGetMetaInt(obj_with_key), 12345u); + + objectSetMetaInt(obj_with_key, 67890); + EXPECT_EQ(objectGetMetaInt(obj_with_key), 67890u); + + decrRefCount(obj_with_key); +} + +TEST_F(ObjectTest, metadata_multiple_objects) { + objectSetMetadataSize(sizeof(objMetadata)); + + robj *obj_with_key1 = createKeyValueObject("key1", "val1"); + robj *obj_with_key2 = createKeyValueObject("key2", "val2"); + robj *obj_with_key3 = createKeyValueObject("key3", "val3"); + + ASSERT_EQ(objectGetMetadataSize(obj_with_key1), sizeof(objMetadata)); + ASSERT_EQ(objectGetMetadataSize(obj_with_key2), sizeof(objMetadata)); + ASSERT_EQ(objectGetMetadataSize(obj_with_key3), sizeof(objMetadata)); + + objectSetMetaInt(obj_with_key1, 100); + objectSetMetaInt(obj_with_key2, 200); + objectSetMetaInt(obj_with_key3, 300); + + EXPECT_EQ(objectGetMetaInt(obj_with_key1), 100u); + EXPECT_EQ(objectGetMetaInt(obj_with_key2), 200u); + EXPECT_EQ(objectGetMetaInt(obj_with_key3), 300u); + + objectSetMetaInt(obj_with_key2, 999); + EXPECT_EQ(objectGetMetaInt(obj_with_key1), 100u); + EXPECT_EQ(objectGetMetaInt(obj_with_key2), 999u); + EXPECT_EQ(objectGetMetaInt(obj_with_key3), 300u); + + decrRefCount(obj_with_key1); + decrRefCount(obj_with_key2); + decrRefCount(obj_with_key3); +} + +TEST_F(ObjectTest, metadata_changes_embed_threshold) { + /* Find the max embeddable value length without metadata, then verify + * that enabling metadata reduces it (some previously-embeddable objects + * become RAW). */ + const char *key = "k:123456789012345678901234567890"; + int max_without = findMaxEmbeddableValueLen(key, -1); + ASSERT_GT(max_without, 0); + + objectSetMetadataSize(sizeof(objMetadata)); + int max_with = findMaxEmbeddableValueLen(key, -1); + + /* Metadata takes space, so the threshold must shrink. */ + ASSERT_LT(max_with, max_without); + + /* An object that just fit before should now be RAW. */ + sds k = sdsnew(key); + robj *obj = createStringObject(NULL, max_without); + obj = objectSetKeyAndExpire(obj, k, -1); + ASSERT_EQ(obj->encoding, (unsigned)OBJ_ENCODING_RAW); + + sdsfree(k); + decrRefCount(obj); +} diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index d3c51905e35..4457c5dd1ea 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -888,9 +888,9 @@ start_server {tags {"repl external:skip"} overrides {save ""}} { set master_host [srv 0 host] set master_port [srv 0 port] set master_pid [srv 0 pid] - # put enough data in the db that the rdb file will be bigger than the socket buffers - # and since we'll have key-load-delay of 100, 20000 keys will take at least 2 seconds - # we also need the replica to process requests during transfer (which it does only once in 2mb) + # Put enough data in the db that the rdb file is bigger than the socket + # buffers so the primary can hit the blocked writer path while replicas + # consume the streamed RDB. $master debug populate 20000 test 10000 $master config set rdbcompression no # If running on Linux, we also measure utime/stime to detect possible I/O handling issues @@ -913,7 +913,6 @@ start_server {tags {"repl external:skip"} overrides {save ""}} { # so that the whole rdb generation process is bound to that set loglines [count_log_lines -2] [lindex $replicas 0] config set repl-diskless-load swapdb - [lindex $replicas 0] config set key-load-delay 100 ;# 20k keys and 100 microseconds sleep means at least 2 seconds [lindex $replicas 0] replicaof $master_host $master_port [lindex $replicas 1] replicaof $master_host $master_port @@ -927,13 +926,30 @@ start_server {tags {"repl external:skip"} overrides {save ""}} { set start_time [clock seconds] } - # wait a while so that the pipe socket writer will be - # blocked on write (since replica 0 is slow to read from the socket) + # Bound the slow-reader simulation to avoid very long + # scheduler-sensitive tails on slower CI runners. + set slow_replica_pid [srv -1 pid] + pause_process $slow_replica_pid + + # Wait a while so that the pipe socket writer will be + # blocked on write while replica 0 is slowed down. after 500 # add some command to be present in the command stream after the rdb. $master incr $all_drop + if {$all_drop == "no" || $all_drop == "fast"} { + set slow_replica_resume_delay [expr {$all_drop == "no" ? 1000 : 1500}] + after $slow_replica_resume_delay + resume_process $slow_replica_pid + } + + # Resume before terminating the paused slow replica so the + # disconnect is observed immediately instead of timing out. + if {$all_drop == "all" || $all_drop == "slow"} { + resume_process $slow_replica_pid + } + # disconnect replicas depending on the current test if {$all_drop == "all" || $all_drop == "fast"} { exec kill [srv 0 pid] @@ -950,8 +966,11 @@ start_server {tags {"repl external:skip"} overrides {save ""}} { after 2000 } - # wait for rdb child to exit - wait_for_condition 1200 100 { + # The "no" case still needs the most headroom because both + # replicas stay connected and the full RDB must finish + # streaming before the child exits. + set rdb_child_wait_tries [expr {$all_drop == "no" ? 2400 : 2100}] + wait_for_condition $rdb_child_wait_tries 100 { [s -2 rdb_bgsave_in_progress] == 0 } else { fail "rdb child didn't terminate" diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index c788abac0ba..c489cabee67 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -1274,6 +1274,7 @@ start_server {tags {"introspection"}} { rdma-rx-size rdma-bind rdma-port + forkless-options-supported } if {!$::tls} { diff --git a/valkey.conf b/valkey.conf index 9be05f68d6f..ef2ef47da48 100644 --- a/valkey.conf +++ b/valkey.conf @@ -551,6 +551,17 @@ locale-collate "" # # hash-seed example-seed-val +# Enable support for forkless save operations by allocating metadata for each key. +# This is an immutable configuration that must be set at server startup and +# cannot be changed at runtime. +# +# When enabled, the server allocates 4 additional bytes per key. +# +# Note: This only enables the infrastructure support. The actual forkless save +# behavior is controlled separately by the 'forkless-enabled' runtime configuration. +# +# forkless-options-supported no + ################################ SNAPSHOTTING ################################ # Save the DB to disk.