news 2026/6/9 23:51:58

Python缓存策略与实现

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Python缓存策略与实现

Python缓存策略与实现

一、缓存的基本概念

缓存是一种用空间换时间的优化策略,将计算结果或频繁访问的数据存储在快速访问的位置,避免重复计算或IO操作。

缓存的关键指标:
- 命中率:缓存命中次数 / 总请求次数
- 淘汰策略:缓存满时如何选择移除的数据
- 过期策略:数据何时失效
- 一致性:缓存与源数据的同步


二、functools.lru_cache

from functools import lru_cache, cache

# lru_cache:最近最少使用缓存
@lru_cache(maxsize=128)
def fibonacci(n):
if n < 2:
return n
return fibonacci(n - 1) + fibonacci(n - 2)

print(fibonacci(100))
print(fibonacci.cache_info())
# CacheInfo(hits=98, misses=101, maxsize=128, currsize=101)

# cache:无限制缓存(Python 3.9+,等价于lru_cache(maxsize=None))
@cache
def factorial(n):
return n * factorial(n - 1) if n else 1

# 清除缓存
fibonacci.cache_clear()

# 注意:lru_cache要求参数可哈希
# 对于不可哈希参数,需要转换
def make_hashable(obj):
if isinstance(obj, dict):
return tuple(sorted((k, make_hashable(v)) for k, v in obj.items()))
if isinstance(obj, (list, tuple)):
return tuple(make_hashable(x) for x in obj)
return obj


三、带过期时间的缓存

import time
from functools import wraps

def timed_cache(seconds=300, maxsize=128):
"""带过期时间的缓存装饰器"""
def decorator(func):
cache = {}
access_order = []

@wraps(func)
def wrapper(*args, **kwargs):
key = (args, tuple(sorted(kwargs.items())))
now = time.time()

# 检查缓存
if key in cache:
value, timestamp = cache[key]
if now - timestamp < seconds:
return value
else:
del cache[key]
access_order.remove(key)

# 计算结果
result = func(*args, **kwargs)
cache[key] = (result, now)
access_order.append(key)

# 淘汰超出容量的条目
while len(cache) > maxsize:
oldest_key = access_order.pop(0)
cache.pop(oldest_key, None)

return result

wrapper.cache_clear = lambda: (cache.clear(), access_order.clear())
wrapper.cache_info = lambda: {
'size': len(cache), 'maxsize': maxsize, 'ttl': seconds
}
return wrapper
return decorator

@timed_cache(seconds=60, maxsize=100)
def get_user_profile(user_id):
"""缓存60秒的用户信息"""
return database.query(f"SELECT * FROM users WHERE id = {user_id}")


四、多级缓存

class MultiLevelCache:
"""多级缓存:内存 -> Redis -> 数据库"""
def __init__(self):
self.l1_cache = {} # 内存缓存
self.l1_ttl = {}
self.l1_max_size = 1000
self.l1_default_ttl = 60

def get(self, key, fetch_func=None):
# L1: 内存缓存
value = self._l1_get(key)
if value is not None:
return value

# L2: Redis缓存(示意)
value = self._l2_get(key)
if value is not None:
self._l1_set(key, value)
return value

# L3: 数据源
if fetch_func:
value = fetch_func()
if value is not None:
self._l1_set(key, value)
self._l2_set(key, value)
return value

return None

def _l1_get(self, key):
if key in self.l1_cache:
if time.time() < self.l1_ttl.get(key, 0):
return self.l1_cache[key]
else:
del self.l1_cache[key]
del self.l1_ttl[key]
return None

def _l1_set(self, key, value, ttl=None):
if len(self.l1_cache) >= self.l1_max_size:
# 淘汰最早过期的
oldest = min(self.l1_ttl, key=self.l1_ttl.get)
del self.l1_cache[oldest]
del self.l1_ttl[oldest]
self.l1_cache[key] = value
self.l1_ttl[key] = time.time() + (ttl or self.l1_default_ttl)

def _l2_get(self, key):
# 实际项目中连接Redis
return None

def _l2_set(self, key, value, ttl=300):
# 实际项目中写入Redis
pass

def invalidate(self, key):
"""使缓存失效"""
self.l1_cache.pop(key, None)
self.l1_ttl.pop(key, None)
# self.redis.delete(key)


五、缓存淘汰策略

from collections import OrderedDict
import heapq

class LRUCache:
"""最近最少使用"""
def __init__(self, capacity):
self.cache = OrderedDict()
self.capacity = capacity

def get(self, key):
if key in self.cache:
self.cache.move_to_end(key)
return self.cache[key]
return None

def put(self, key, value):
if key in self.cache:
self.cache.move_to_end(key)
self.cache[key] = value
if len(self.cache) > self.capacity:
self.cache.popitem(last=False)

class LFUCache:
"""最不经常使用"""
def __init__(self, capacity):
self.capacity = capacity
self.cache = {} # key -> value
self.freq = {} # key -> frequency
self.freq_map = {} # frequency -> OrderedDict of keys
self.min_freq = 0

def get(self, key):
if key not in self.cache:
return None
self._increase_freq(key)
return self.cache[key]

def put(self, key, value):
if self.capacity <= 0:
return

if key in self.cache:
self.cache[key] = value
self._increase_freq(key)
return

if len(self.cache) >= self.capacity:
self._evict()

self.cache[key] = value
self.freq[key] = 1
self.freq_map.setdefault(1, OrderedDict())[key] = None
self.min_freq = 1

def _increase_freq(self, key):
f = self.freq[key]
self.freq[key] = f + 1
del self.freq_map[f][key]
if not self.freq_map[f]:
del self.freq_map[f]
if self.min_freq == f:
self.min_freq += 1
self.freq_map.setdefault(f + 1, OrderedDict())[key] = None

def _evict(self):
keys = self.freq_map[self.min_freq]
evict_key, _ = keys.popitem(last=False)
if not keys:
del self.freq_map[self.min_freq]
del self.cache[evict_key]
del self.freq[evict_key]


六、缓存模式

6.1 Cache-Aside(旁路缓存)

class CacheAside:
def __init__(self, cache, db):
self.cache = cache
self.db = db

def read(self, key):
# 先查缓存
value = self.cache.get(key)
if value is not None:
return value
# 缓存未命中,查数据库
value = self.db.get(key)
if value is not None:
self.cache.set(key, value)
return value

def write(self, key, value):
# 先更新数据库
self.db.set(key, value)
# 再删除缓存(而非更新,避免并发问题)
self.cache.delete(key)

6.2 Write-Through(写穿透)

class WriteThrough:
def __init__(self, cache, db):
self.cache = cache
self.db = db

def write(self, key, value):
# 同时写入缓存和数据库
self.cache.set(key, value)
self.db.set(key, value)

6.3 Write-Behind(写回)

import threading
from queue import Queue

class WriteBehind:
def __init__(self, cache, db, flush_interval=5):
self.cache = cache
self.db = db
self.write_queue = Queue()
self._start_flush_thread(flush_interval)

def write(self, key, value):
self.cache.set(key, value)
self.write_queue.put((key, value))

def _start_flush_thread(self, interval):
def flush():
while True:
batch = []
while not self.write_queue.empty():
batch.append(self.write_queue.get())
if batch:
self.db.batch_write(batch)
time.sleep(interval)

thread = threading.Thread(target=flush, daemon=True)
thread.start()


七、缓存穿透、击穿、雪崩

import random
import hashlib

class RobustCache:
def __init__(self, cache):
self.cache = cache
self.null_keys = set() # 布隆过滤器的简化版

def get_with_protection(self, key, fetch_func, ttl=300):
"""防穿透:缓存空值"""
if key in self.null_keys:
return None

value = self.cache.get(key)
if value is not None:
return value

value = fetch_func(key)
if value is None:
self.null_keys.add(key) # 记录不存在的key
return None

# 防雪崩:TTL加随机偏移
jitter = random.randint(0, 60)
self.cache.set(key, value, ttl=ttl + jitter)
return value

def get_with_mutex(self, key, fetch_func, ttl=300):
"""防击穿:互斥锁"""
value = self.cache.get(key)
if value is not None:
return value

lock_key = f"lock:{key}"
if self.cache.set_nx(lock_key, "1", ttl=10): # 获取锁
try:
value = fetch_func(key)
self.cache.set(key, value, ttl=ttl)
return value
finally:
self.cache.delete(lock_key)
else:
# 等待其他线程填充缓存
time.sleep(0.1)
return self.cache.get(key)


八、异步缓存

import asyncio

class AsyncCache:
def __init__(self, maxsize=1000, ttl=300):
self._cache = {}
self._ttl = {}
self._locks = {}
self._maxsize = maxsize
self._default_ttl = ttl

async def get_or_set(self, key, factory, ttl=None):
"""获取缓存,不存在则通过factory创建"""
# 检查缓存
if key in self._cache and time.time() < self._ttl.get(key, 0):
return self._cache[key]

# 使用锁防止并发重复计算
if key not in self._locks:
self._locks[key] = asyncio.Lock()

async with self._locks[key]:
# 双重检查
if key in self._cache and time.time() < self._ttl.get(key, 0):
return self._cache[key]

value = await factory()
self._cache[key] = value
self._ttl[key] = time.time() + (ttl or self._default_ttl)

# 清理过期和超量
self._cleanup()
return value

def _cleanup(self):
now = time.time()
expired = [k for k, t in self._ttl.items() if t < now]
for k in expired:
self._cache.pop(k, None)
self._ttl.pop(k, None)

# 使用
cache = AsyncCache(ttl=60)

async def get_user(user_id):
return await cache.get_or_set(
f"user:{user_id}",
lambda: db.fetch_user(user_id)
)


九、缓存装饰器工厂

def cached(ttl=300, key_func=None, cache_none=False):
"""通用缓存装饰器"""
def decorator(func):
_cache = {}

def default_key(*args, **kwargs):
return (args, tuple(sorted(kwargs.items())))

_key_func = key_func or default_key

@wraps(func)
def wrapper(*args, **kwargs):
key = _key_func(*args, **kwargs)
now = time.time()

if key in _cache:
value, expire_at = _cache[key]
if now < expire_at:
return value

result = func(*args, **kwargs)

if result is not None or cache_none:
_cache[key] = (result, now + ttl)

return result

wrapper.invalidate = lambda *args, **kwargs: _cache.pop(
_key_func(*args, **kwargs), None
)
wrapper.clear = _cache.clear
return wrapper
return decorator

# 使用
@cached(ttl=120, key_func=lambda user_id: f"profile:{user_id}")
def get_profile(user_id):
return db.query_profile(user_id)

# 手动失效
get_profile.invalidate(123)


十、总结

缓存策略选择:
1. 读多写少 -> Cache-Aside
2. 数据一致性要求高 -> Write-Through
3. 写入频繁 -> Write-Behind(异步写回)
4. 热点数据 -> LFU淘汰策略
5. 时间局部性强 -> LRU淘汰策略

注意事项:
- 缓存不是银弹,增加了系统复杂度
- 必须考虑缓存失效和数据一致性
- 监控缓存命中率,命中率低说明缓存策略有问题
- 设置合理的TTL,避免数据过期太慢或缓存抖动

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/9 23:36:59

JN5169无线模块选型与设计实战:从硬件布局到低功耗优化

1. 从芯片到模块&#xff1a;JN5169的核心价值与设计哲学在物联网硬件开发的圈子里&#xff0c;选型永远是项目启动时最让人纠结又兴奋的一环。尤其是当你需要为智能家居传感器、工业无线控制器或者环境监测节点寻找一颗“心脏”时&#xff0c;面对市面上琳琅满目的无线微控制器…

作者头像 李华