Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 39 additions & 73 deletions src/storage/src/redis_sets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,7 @@ impl Redis {
let meta_get = db.get_cf(&cf, &base_meta_key).context(RocksSnafu)?;
let (mut set_meta_value, version, is_new_set) = match meta_get {
Some(val) => {
// Type check
self.check_type(&val, DataType::Set)?;
let set_meta_value = ParsedSetsMetaValue::new(&val[..])?;
// Check if expired
if set_meta_value.is_stale() {
if self.is_stale(&val)? {
// Create new set if expired
let count_bytes = 0u64.to_le_bytes().to_vec();
let mut new_meta = BaseMetaValue::new(Bytes::from(count_bytes));
Expand All @@ -93,6 +89,8 @@ impl Redis {
let version = new_set_meta.initial_meta_value();
(new_set_meta, version, true)
} else {
self.check_type(&val, DataType::Set)?;
let set_meta_value = ParsedSetsMetaValue::new(&val[..])?;
let version = set_meta_value.version();
(set_meta_value, version, false)
}
Expand Down Expand Up @@ -186,16 +184,14 @@ impl Redis {

match db.get_cf(&cf, &base_meta_key).context(RocksSnafu)? {
Some(val) => {
// Type check
self.check_type(&val, DataType::Set)?;
let set_meta = ParsedSetsMetaValue::new(&val[..])?;
// Validity check (not expired and count > 0)
if !set_meta.is_valid() {
if self.is_stale(&val)? {
return KeyNotFoundSnafu {
key: String::from_utf8_lossy(key).to_string(),
}
.fail();
}
self.check_type(&val, DataType::Set)?;
let set_meta = ParsedSetsMetaValue::new(&val[..])?;
Ok(set_meta.count() as i32)
}
None => KeyNotFoundSnafu {
Expand Down Expand Up @@ -231,15 +227,12 @@ impl Redis {
return Ok(Vec::new());
};

// Type check
self.check_type(&val, DataType::Set)?;

let set_meta = ParsedSetsMetaValue::new(&val[..])?;
// Validity check (not expired and count > 0)
if !set_meta.is_valid() {
if self.is_stale(&val)? {
// Redis SMEMBERS returns empty array for expired/invalid keys
return Ok(Vec::new());
}
self.check_type(&val, DataType::Set)?;
let set_meta = ParsedSetsMetaValue::new(&val[..])?;
let version = set_meta.version();

// Iterate from prefix and collect members until prefix no longer matches
Expand Down Expand Up @@ -290,15 +283,12 @@ impl Redis {
return Ok(false);
};

// Type check
self.check_type(&val, DataType::Set)?;

let set_meta = ParsedSetsMetaValue::new(&val[..])?;
// Validity check (not expired and count > 0)
if !set_meta.is_valid() {
if self.is_stale(&val)? {
// Redis SISMEMBER returns 0 for expired/invalid keys
return Ok(false);
}
self.check_type(&val, DataType::Set)?;
let set_meta = ParsedSetsMetaValue::new(&val[..])?;
let version = set_meta.version();

// Check if member exists
Expand Down Expand Up @@ -337,15 +327,12 @@ impl Redis {
return Ok(Vec::new());
};

// Type check
self.check_type(&val, DataType::Set)?;

let set_meta = ParsedSetsMetaValue::new(&val[..])?;
// Validity check (not expired and count > 0)
if !set_meta.is_valid() {
if self.is_stale(&val)? {
// Redis SRANDMEMBER returns empty array for expired/invalid keys
return Ok(Vec::new());
}
self.check_type(&val, DataType::Set)?;
let set_meta = ParsedSetsMetaValue::new(&val[..])?;

let version = set_meta.version();
let set_size = set_meta.count() as usize;
Expand Down Expand Up @@ -459,15 +446,12 @@ impl Redis {
return Ok(0);
};

// Type check
self.check_type(&val, DataType::Set)?;

let mut set_meta = ParsedSetsMetaValue::new(&val[..])?;
// Validity check (not expired and count > 0)
if !set_meta.is_valid() {
if self.is_stale(&val)? {
// Redis SREM returns 0 for expired/invalid keys
return Ok(0);
}
self.check_type(&val, DataType::Set)?;
let mut set_meta = ParsedSetsMetaValue::new(&val[..])?;

let version = set_meta.version();
let mut removed_count = 0i32;
Expand Down Expand Up @@ -554,15 +538,12 @@ impl Redis {
return Ok(Vec::new());
};

// Type check
self.check_type(&val, DataType::Set)?;

let mut set_meta = ParsedSetsMetaValue::new(&val[..])?;
// Validity check (not expired and count > 0)
if !set_meta.is_valid() {
if self.is_stale(&val)? {
// Redis SPOP returns empty array for expired/invalid keys
return Ok(Vec::new());
}
self.check_type(&val, DataType::Set)?;
let mut set_meta = ParsedSetsMetaValue::new(&val[..])?;

let version = set_meta.version();
let set_size = set_meta.count() as usize;
Expand Down Expand Up @@ -712,14 +693,12 @@ impl Redis {
return Ok(false);
};

// Type check for source
self.check_type(&source_val, DataType::Set)?;

let mut source_meta = ParsedSetsMetaValue::new(&source_val[..])?;
if !source_meta.is_valid() {
if self.is_stale(&source_val)? {
// Source set is expired/invalid
return Ok(false);
}
self.check_type(&source_val, DataType::Set)?;
let mut source_meta = ParsedSetsMetaValue::new(&source_val[..])?;

let source_version = source_meta.version();

Expand Down Expand Up @@ -749,15 +728,13 @@ impl Redis {
// Handle destination set
let dest_meta_val = db.get_cf(&cf_meta, &dest_meta_key).context(RocksSnafu)?;
let (mut dest_meta, dest_version, dest_exists) = if let Some(dest_val) = dest_meta_val {
// Type check for destination
self.check_type(&dest_val, DataType::Set)?;

let mut meta = ParsedSetsMetaValue::new(&dest_val[..])?;
let version = if meta.is_valid() {
meta.version()
} else {
let version = if self.is_stale(&dest_val)? {
// Destination set is expired, create new version
meta.initial_meta_value()
} else {
self.check_type(&dest_val, DataType::Set)?;
meta.version()
};
(meta, version, true)
} else {
Expand Down Expand Up @@ -1987,15 +1964,12 @@ impl Redis {
return Ok(result);
};

// Type check
self.check_type(&val, DataType::Set)?;

let set_meta = ParsedSetsMetaValue::new(&val[..])?;
// Validity check (not expired and count > 0)
if !set_meta.is_valid() {
if self.is_stale(&val)? {
// Return empty result for expired/invalid keys
return Ok(result);
}
self.check_type(&val, DataType::Set)?;
let set_meta = ParsedSetsMetaValue::new(&val[..])?;

let version = set_meta.version();

Expand Down Expand Up @@ -2081,14 +2055,12 @@ impl Redis {
}

let val = meta_val.expect("meta_val checked non-None above");
// Type check
self.check_type(&val, DataType::Set)?;

let set_meta = ParsedSetsMetaValue::new(&val[..])?;
if !set_meta.is_valid() {
if self.is_stale(&val)? {
// If any set is expired/invalid, intersection is empty
return Ok(result);
}
self.check_type(&val, DataType::Set)?;
let set_meta = ParsedSetsMetaValue::new(&val[..])?;

let count = set_meta.count() as i32;
if count < smallest_size {
Expand Down Expand Up @@ -2156,11 +2128,8 @@ impl Redis {
let meta_val = db.get_cf(&cf_meta, &base_meta_key).context(RocksSnafu)?;

if let Some(val) = meta_val {
// Type check
self.check_type(&val, DataType::Set)?;

let set_meta = ParsedSetsMetaValue::new(&val[..])?;
if set_meta.is_valid() {
if !self.is_stale(&val)? {
self.check_type(&val, DataType::Set)?;
// Get all members of this set and add to union
let members = self.smembers(key)?;
for member in members {
Expand Down Expand Up @@ -2383,15 +2352,12 @@ impl Redis {
return Ok((0, Vec::new()));
};

// Type check
self.check_type(&val, DataType::Set)?;

let set_meta = ParsedSetsMetaValue::new(&val[..])?;
// Validity check (not expired and count > 0)
if !set_meta.is_valid() {
if self.is_stale(&val)? {
// Return empty result with cursor 0 for expired/invalid keys
return Ok((0, Vec::new()));
}
self.check_type(&val, DataType::Set)?;
let set_meta = ParsedSetsMetaValue::new(&val[..])?;

let version = set_meta.version();
let _set_size = set_meta.count() as usize;
Expand Down
10 changes: 4 additions & 6 deletions src/storage/src/redis_strings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,21 +503,19 @@ impl Redis {
.context(RocksSnafu)?
{
Some(val) => {
// Check type - if not string type, return None (like Redis does)
if self.check_type(val.as_slice(), DataType::String).is_err() {
results.push(None);
continue;
}

let string_value = ParsedStringsValue::new(&val[..])?;

// Check if key is expired
if string_value.is_stale() {
results.push(None);
} else {
let user_value = string_value.user_value();
results.push(Some(String::from_utf8_lossy(&user_value).to_string()));
continue;
}

let user_value = string_value.user_value();
results.push(Some(String::from_utf8_lossy(&user_value).to_string()));
}
None => {
results.push(None);
Expand Down
Loading
Loading