//单个数据库结构 typedefstructredisDb { dict *dict; /* The keyspace for this DB */ dict *expires; /* Timeout of keys with a timeout set */ dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ dict *ready_keys; /* Blocked keys that received a PUSH */ dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ int id; /* Database ID */ longlong avg_ttl; /* Average TTL, just for stats */ unsignedlong expires_cursor; /* Cursor of the active expire cycle. */ list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */ } redisDb;
intexpireIfNeeded(redisDb *db, robj *key){ if (!keyIsExpired(db,key)) return0;
/* If we are running in the context of a slave, instead of * evicting the expired key from the database, we return ASAP: * the slave key expiration is controlled by the master that will * send us synthesized DEL operations for expired keys. * * Still we try to return the right information to the caller, * that is, 0 if we think the key should be still valid, 1 if * we think the key is expired at this time. */ if (server.masterhost != NULL) return1;
/* If clients are paused, we keep the current dataset constant, * but return to the client what we believe is the right state. Typically, * at the end of the pause we will properly expire the key OR we will * have failed over and the new primary will send us the expire. */ if (checkClientPauseTimeoutAndReturnIfPaused()) return1;
/* Delete the key */ deleteExpiredKeyAndPropagate(db,key); return1; }
voidactiveExpireCycle(int type){ /* Adjust the running parameters according to the configured expire * effort. The default effort is 1, and the maximum configurable effort * is 10. */ unsignedlong effort = server.active_expire_effort-1, /* Rescale from 0 to 9. */ config_keys_per_loop = ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP + ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP/4*effort, config_cycle_fast_duration = ACTIVE_EXPIRE_CYCLE_FAST_DURATION + ACTIVE_EXPIRE_CYCLE_FAST_DURATION/4*effort, config_cycle_slow_time_perc = ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC + 2*effort, config_cycle_acceptable_stale = ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE- effort;
/* This function has some global state in order to continue the work * incrementally across calls. */ staticunsignedint current_db = 0; /* Next DB to test. */ staticint timelimit_exit = 0; /* Time limit hit in previous call? */ staticlonglong last_fast_cycle = 0; /* When last fast cycle ran. */
int j, iteration = 0; int dbs_per_call = CRON_DBS_PER_CALL; longlong start = ustime(), timelimit, elapsed;
/* When clients are paused the dataset should be static not just from the * POV of clients not being able to write, but also from the POV of * expires and evictions of keys not being performed. */ if (checkClientPauseTimeoutAndReturnIfPaused()) return;
if (type == ACTIVE_EXPIRE_CYCLE_FAST) { /* Don't start a fast cycle if the previous cycle did not exit * for time limit, unless the percentage of estimated stale keys is * too high. Also never repeat a fast cycle for the same period * as the fast cycle total duration itself. */ if (!timelimit_exit && server.stat_expired_stale_perc < config_cycle_acceptable_stale) return;
if (start < last_fast_cycle + (longlong)config_cycle_fast_duration*2) return;
last_fast_cycle = start; }
/* We usually should test CRON_DBS_PER_CALL per iteration, with * two exceptions: * * 1) Don't test more DBs than we have. * 2) If last time we hit the time limit, we want to scan all DBs * in this iteration, as there is work to do in some DB and we don't want * expired keys to use memory for too much time. */ if (dbs_per_call > server.dbnum || timelimit_exit) dbs_per_call = server.dbnum;
/* We can use at max 'config_cycle_slow_time_perc' percentage of CPU * time per iteration. Since this function gets called with a frequency of * server.hz times per second, the following is the max amount of * microseconds we can spend in this function. */ timelimit = config_cycle_slow_time_perc*1000000/server.hz/100; timelimit_exit = 0; if (timelimit <= 0) timelimit = 1;
if (type == ACTIVE_EXPIRE_CYCLE_FAST) timelimit = config_cycle_fast_duration; /* in microseconds. */
/* Accumulate some global stats as we expire keys, to have some idea * about the number of keys that are already logically expired, but still * existing inside the database. */ long total_sampled = 0; long total_expired = 0;
for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) { /* Expired and checked in a single loop. */ unsignedlong expired, sampled;
/* Increment the DB now so we are sure if we run out of time * in the current DB we'll restart from the next. This allows to * distribute the time evenly across DBs. */ current_db++;
/* Continue to expire if at the end of the cycle there are still * a big percentage of keys to expire, compared to the number of keys * we scanned. The percentage, stored in config_cycle_acceptable_stale * is not fixed, but depends on the Redis configured "expire effort". */ do { unsignedlong num, slots; longlong now, ttl_sum; int ttl_samples; iteration++;
/* If there is nothing to expire try next DB ASAP. */ if ((num = dictSize(db->expires)) == 0) { db->avg_ttl = 0; break; } slots = dictSlots(db->expires); now = mstime();
/* When there are less than 1% filled slots, sampling the key * space is expensive, so stop here waiting for better times... * The dictionary will be resized asap. */ if (slots > DICT_HT_INITIAL_SIZE && (num*100/slots < 1)) break;
/* The main collection cycle. Sample random keys among keys * with an expire set, checking for expired ones. */ expired = 0; sampled = 0; ttl_sum = 0; ttl_samples = 0;
if (num > config_keys_per_loop) num = config_keys_per_loop;
/* Here we access the low level representation of the hash table * for speed concerns: this makes this code coupled with dict.c, * but it hardly changed in ten years. * * Note that certain places of the hash table may be empty, * so we want also a stop condition about the number of * buckets that we scanned. However scanning for free buckets * is very fast: we are in the cache line scanning a sequential * array of NULL pointers, so we can scan a lot more buckets * than keys in the same time. */ long max_buckets = num*20; long checked_buckets = 0;
while (sampled < num && checked_buckets < max_buckets) { for (int table = 0; table < 2; table++) { if (table == 1 && !dictIsRehashing(db->expires)) break;
/* Scan the current bucket of the current table. */ checked_buckets++; while(de) { /* Get the next entry now since this entry may get * deleted. */ dictEntry *e = de; de = de->next;
ttl = dictGetSignedIntegerVal(e)-now; if (activeExpireCycleTryExpire(db,e,now)) expired++; if (ttl > 0) { /* We want the average TTL of keys yet * not expired. */ ttl_sum += ttl; ttl_samples++; } sampled++; } } db->expires_cursor++; } total_expired += expired; total_sampled += sampled;
/* Update the average TTL stats for this database. */ if (ttl_samples) { longlong avg_ttl = ttl_sum/ttl_samples;
/* Do a simple running average with a few samples. * We just use the current estimate with a weight of 2% * and the previous estimate with a weight of 98%. */ if (db->avg_ttl == 0) db->avg_ttl = avg_ttl; db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50); }
/* We can't block forever here even if there are many keys to * expire. So after a given amount of milliseconds return to the * caller waiting for the other active expire cycle. */ if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */ elapsed = ustime()-start; if (elapsed > timelimit) { timelimit_exit = 1; server.stat_expired_time_cap_reached_count++; break; } } /* We don't repeat the cycle for the current database if there are * an acceptable amount of stale keys (logically expired but yet * not reclaimed). */ } while (sampled == 0 || (expired*100/sampled) > config_cycle_acceptable_stale); }
/* Update our estimate of keys existing but yet to be expired. * Running average with this sample accounting for 5%. */ double current_perc; if (total_sampled) { current_perc = (double)total_expired/total_sampled; } else current_perc = 0; server.stat_expired_stale_perc = (current_perc*0.05)+ (server.stat_expired_stale_perc*0.95); }
AOF、RDB和复制功能对过期键的处理
生成RDB文件:执行 SAVE 或 BGSAVE 创建一个新的RDB文件时,程序会对数据库中的键进行检查,已过期的键不会被保存到新创建的RDB文件中
robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags){ robj *val;
if (expireIfNeeded(db,key) == 1) { /* If we are in the context of a master, expireIfNeeded() returns 1 * when the key is no longer valid, so we can return NULL ASAP. */ if (server.masterhost == NULL) goto keymiss;
//如果备机是只读模式,那么返回的就是空值 if (server.current_client && server.current_client != server.master && server.current_client->cmd && server.current_client->cmd->flags & CMD_READONLY) { goto keymiss; } } val = lookupKey(db,key,flags); if (val == NULL) goto keymiss; server.stat_keyspace_hits++; return val;