TTL not really work #55
Comments
Hi, could you provide some code to help me reproduce your situation?
Yeah, sorry for opening the issue without a repro. I will try to make a small example.
@al8n Okay, here is a repro:

use tokio::time::{sleep, Duration};
use stretto::AsyncCache;

#[tokio::main]
async fn main() {
    let cache: AsyncCache<String, String> = AsyncCache::new(12960, 1e6 as i64, tokio::spawn).unwrap();
    // Insert 10000 entries, each with cost 1 and a 60 second TTL, about 1 ms apart.
    for i in 0..10000 {
        cache.insert_with_ttl(
            format!("key{}", i),
            format!("value{}", i),
            1,
            Duration::from_secs(60),
        )
        .await;
        sleep(Duration::from_millis(1)).await;
    }
    // Wait until all pending inserts have been applied.
    cache.wait().await.unwrap();
    println!("Current size: {}", cache.len());
    // Sleep well past the 60 second TTL, then check the size again.
    sleep(Duration::from_secs(100)).await;
    println!("New size: {}", cache.len());
}

On my machine, I got something like this:
If I have no Regards, |
I hope you can reproduce my issue.
@al8n, maybe next week I can look into it, but I suppose it's a bug.
Okay, I found why: https://github.com/al8n/stretto/blob/main/src/store.rs#L285C29-L285C40 you have no guarantee what
Okay, this one works, but I am not sure how efficient it is:

diff --git a/src/ttl.rs b/src/ttl.rs
index 1c24bf2..1b7c36a 100644
--- a/src/ttl.rs
+++ b/src/ttl.rs
@@ -3,6 +3,7 @@ use std::collections::{hash_map::RandomState, HashMap};
 use std::hash::BuildHasher;
 use std::ops::{Deref, DerefMut};
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use itertools::Itertools;
 use crate::CacheError;
@@ -200,11 +201,24 @@ impl<S: BuildHasher + Clone + 'static> ExpirationMap<S> {
     pub fn try_cleanup(&self, now: Time) -> Result<Option<HashMap<u64, u64, S>>, CacheError> {
         let bucket_num = cleanup_bucket(now);
-        Ok(self
-            .buckets
-            .write()
-            .remove(&bucket_num)
-            .map(|bucket| bucket.map))
+        let bucket_keys: Vec<i64> = self.buckets.read().keys().sorted().cloned().collect();
+        // println!("try_cleanup bucket_num: {} buckets:{:#?}", bucket_num, bucket_keys);
+        let mut ret_map: HashMap<u64, u64, S> = HashMap::with_hasher(self.hasher.clone());
+        for map in bucket_keys
+            .iter()
+            .filter(|key| **key < bucket_num)
+            .map(|key| {
+                self
+                    .buckets
+                    .write()
+                    .remove(key)
+                    .map(|bucket| bucket.map)
+            }) {
+            if map.is_some() {
+                ret_map.extend(map.unwrap().iter());
+            }
+        }
+        Ok(Some(ret_map))
     }
     pub fn hasher(&self) -> S {

It would be better to use a BTreeMap and range for fast filtering over the index.
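To illustrate the BTreeMap and range idea on its own, here is a minimal standalone sketch. It does not use stretto types: the `buckets` layout (bucket number mapped to a list of keys) and the helper name `expired_bucket_ids` are assumptions made up for this example.

```rust
use std::collections::BTreeMap;

/// Collects the numbers of every bucket strictly older than `cutoff`.
/// `buckets` maps a bucket number to the keys stored in it
/// (a hypothetical layout, not the one used in src/ttl.rs).
fn expired_bucket_ids(buckets: &BTreeMap<i64, Vec<u64>>, cutoff: i64) -> Vec<i64> {
    // `range(..cutoff)` visits only the keys below `cutoff`, already in
    // ascending order, so no sorting or full scan of the map is needed.
    buckets.range(..cutoff).map(|(id, _)| *id).collect()
}

fn main() {
    let mut buckets = BTreeMap::new();
    for id in [7_i64, 3, 5, 9] {
        buckets.insert(id, vec![id as u64 * 10]);
    }
    // Buckets 3 and 5 are below the cutoff; 7 and 9 are kept.
    let expired = expired_bucket_ids(&buckets, 7);
    println!("{:?}", expired);
}
```

With a HashMap the same selection needs a pass over all keys (plus a sort if order matters), which is what the first patch does via itertools.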
Okay, here is the option with BTreeMap and range:

diff --git a/src/ttl.rs b/src/ttl.rs
index 1c24bf2..e72ed4d 100644
--- a/src/ttl.rs
+++ b/src/ttl.rs
@@ -1,5 +1,5 @@
 use parking_lot::RwLock;
-use std::collections::{hash_map::RandomState, HashMap};
+use std::collections::{hash_map::RandomState, HashMap, BTreeMap};
 use std::hash::BuildHasher;
 use std::ops::{Deref, DerefMut};
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
@@ -100,7 +100,7 @@ impl<S: BuildHasher> DerefMut for Bucket<S> {
 #[derive(Debug)]
 pub(crate) struct ExpirationMap<S = RandomState> {
-    buckets: RwLock<HashMap<i64, Bucket<S>, S>>,
+    buckets: RwLock<BTreeMap<i64, Bucket<S>>>,
     hasher: S,
 }
@@ -108,7 +108,7 @@ impl Default for ExpirationMap {
     fn default() -> Self {
         let hasher = RandomState::default();
         Self {
-            buckets: RwLock::new(HashMap::with_hasher(hasher.clone())),
+            buckets: RwLock::new(BTreeMap::new()),
             hasher,
         }
     }
@@ -123,7 +123,7 @@ impl ExpirationMap {
 impl<S: BuildHasher + Clone + 'static> ExpirationMap<S> {
     pub(crate) fn with_hasher(hasher: S) -> ExpirationMap<S> {
         ExpirationMap {
-            buckets: RwLock::new(HashMap::with_hasher(hasher.clone())),
+            buckets: RwLock::new(BTreeMap::new()),
             hasher,
         }
     }
@@ -200,11 +200,22 @@ impl<S: BuildHasher + Clone + 'static> ExpirationMap<S> {
     pub fn try_cleanup(&self, now: Time) -> Result<Option<HashMap<u64, u64, S>>, CacheError> {
         let bucket_num = cleanup_bucket(now);
-        Ok(self
-            .buckets
-            .write()
-            .remove(&bucket_num)
-            .map(|bucket| bucket.map))
+        let bucket_keys: Vec<i64> = self.buckets.read().range(..bucket_num).map(|(key, _)| *key).collect();
+        let mut ret_map: HashMap<u64, u64, S> = HashMap::with_hasher(self.hasher.clone());
+        for map in bucket_keys
+            .iter()
+            .map(|key| {
+                self
+                    .buckets
+                    .write()
+                    .remove(key)
+                    .map(|bucket| bucket.map)
+            }) {
+            if map.is_some() {
+                ret_map.extend(map.unwrap().iter());
+            }
+        }
+        Ok(Some(ret_map))
     }
     pub fn hasher(&self) -> S {
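On the efficiency question: the patch takes a read lock to collect keys and then a separate write lock per bucket. One possible alternative, not proposed anywhere in this thread, is to drain all older buckets under a single write lock with `BTreeMap::split_off`. The sketch below is only an illustration under assumptions: `Buckets`, `insert`, and `drain_before` are hypothetical names, it uses `std::sync::RwLock` instead of `parking_lot`, and a plain `HashMap<u64, u64>` stands in for the crate's `Bucket<S>`.

```rust
use std::collections::{BTreeMap, HashMap};
use std::sync::RwLock;

/// Hypothetical stand-in for the expiration map: bucket number -> (key -> conflict).
struct Buckets {
    inner: RwLock<BTreeMap<i64, HashMap<u64, u64>>>,
}

impl Buckets {
    fn new() -> Self {
        Buckets { inner: RwLock::new(BTreeMap::new()) }
    }

    fn insert(&self, bucket: i64, key: u64, conflict: u64) {
        self.inner.write().unwrap().entry(bucket).or_default().insert(key, conflict);
    }

    /// Removes every bucket numbered below `bucket_num` and merges their entries.
    fn drain_before(&self, bucket_num: i64) -> HashMap<u64, u64> {
        let mut guard = self.inner.write().unwrap();
        // `split_off` leaves keys < bucket_num in `*guard` and returns the rest.
        let keep = guard.split_off(&bucket_num);
        // Swap so the map holds the buckets to keep and we own the expired ones.
        let expired = std::mem::replace(&mut *guard, keep);
        drop(guard); // release the lock before merging
        expired.into_values().flatten().collect()
    }
}

fn main() {
    let buckets = Buckets::new();
    buckets.insert(1, 10, 100);
    buckets.insert(2, 20, 200);
    buckets.insert(3, 30, 300);

    let expired = buckets.drain_before(3);
    println!("expired entries: {}", expired.len());
    println!("buckets left: {}", buckets.inner.read().unwrap().len());
}
```

Like the `range(..bucket_num)` version in the patch, this leaves the bucket numbered exactly `bucket_num` in place until a later cleanup.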
Sorry for the late response, thanks! Would you mind opening a PR so we can see if we can merge it?
I don't think I will have time in the next few weeks.
If I insert 10000 items with a 60 sec TTL, my on_evict callback sees only ~1000 evicted items after 60 sec. Checking len() of such a cache also shows ~9000 items. I suppose the cleaner process is not working correctly, and there is something wrong with the TTL map.
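One way this symptom could arise, as a toy model only: if expirations are grouped into time buckets and each cleanup pass removes just one bucket, then any pass that is skipped or delayed leaves its bucket behind for good. Everything below is an assumption for illustration (the 5 second `BUCKET_SECS`, the function names, and the missed tick); it is not stretto's code and does not try to reproduce the exact ~1000 / ~9000 split.

```rust
use std::collections::BTreeMap;

const BUCKET_SECS: i64 = 5; // hypothetical bucket width

fn bucket_of(t_secs: i64) -> i64 {
    t_secs / BUCKET_SECS
}

/// 1000 items expire in each second from t=60 to t=69 (10_000 in total).
/// The map stores bucket number -> number of items expiring in that bucket.
fn seed() -> BTreeMap<i64, usize> {
    let mut buckets = BTreeMap::new();
    for t in 60..70 {
        *buckets.entry(bucket_of(t)).or_insert(0) += 1000;
    }
    buckets
}

/// Strategy 1: remove only the single bucket just before the current one.
fn cleanup_exact(buckets: &mut BTreeMap<i64, usize>, now_secs: i64) -> usize {
    buckets.remove(&(bucket_of(now_secs) - 1)).unwrap_or(0)
}

/// Strategy 2: remove every bucket older than the current one.
fn cleanup_older(buckets: &mut BTreeMap<i64, usize>, now_secs: i64) -> usize {
    let keep = buckets.split_off(&bucket_of(now_secs));
    let expired = std::mem::replace(buckets, keep);
    expired.values().sum()
}

fn main() {
    // Assume the cleanup pass at t=65 never ran and only the one at t=70 fires.
    let mut a = seed();
    let removed = cleanup_exact(&mut a, 70);
    println!("exact: removed {}, buckets left {}", removed, a.len());

    let mut b = seed();
    let removed = cleanup_older(&mut b, 70);
    println!("older: removed {}, buckets left {}", removed, b.len());
}
```

In this model the single-bucket strategy never revisits the bucket it missed, so those items stay counted until something else evicts them.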