Python缓存入门实战:核心概念与用法详解
一、缓存的核心概念
1.1 什么是缓存?
缓存是一种临时存储技术,将频繁访问的数据存储在高速存储介质中,以减少对慢速数据源(如数据库、API)的访问次数,提升系统性能。
1.2 缓存的核心优势
- 提升性能:从内存读取比从磁盘/网络读取快100-1000倍
- 减轻后端压力:减少对数据库/API的重复查询
- 降低成本:减少外部服务的调用次数
二、Python内置缓存机制
2.1 functools.lru_cache - 最常用的缓存装饰器
from functools import lru_cache
import time
# 基础用法
@lru_cache(maxsize=128)
def fibonacci(n):
"""计算斐波那契数列"""
if n < 2:
return n
return fibonacci(n-1) + fibonacci(n-2)
# 测试性能
def test_fibonacci():
start = time.time()
result = fibonacci(35)
elapsed = time.time() - start
print(f"fibonacci(35) = {result}, 耗时: {elapsed:.4f}秒")
# 第二次调用会从缓存读取
start = time.time()
result = fibonacci(35)
elapsed = time.time() - start
print(f"第二次调用耗时: {elapsed:.6f}秒")
# 带参数的缓存
@lru_cache(maxsize=32)
def get_user_data(user_id, include_profile=False):
"""模拟获取用户数据"""
print(f"从数据库获取用户 {user_id} 的数据...")
time.sleep(1) # 模拟耗时操作
return {
"id": user_id,
"name": f"用户{user_id}",
"profile": {"age": 25} if include_profile else None
}
# 查看缓存信息
def show_cache_info():
print(f"缓存命中率: {get_user_data.cache_info().hits}")
print(f"缓存未命中: {get_user_data.cache_info().misses}")
# 清空缓存
get_user_data.cache_clear()
三、内存缓存实战
3.1 使用 cachetools 库
# 首先安装:pip install cachetools
from cachetools import TTLCache, LRUCache, cached
import time
# 1. TTL缓存(自动过期)
ttl_cache = TTLCache(maxsize=100, ttl=300) # 最大100条,5分钟过期
# 2. LRU缓存
lru_cache = LRUCache(maxsize=50)
# 3. 使用装饰器
from cachetools import cached
@cached(cache=TTLCache(maxsize=100, ttl=60))
def api_call(endpoint, params=None):
"""模拟API调用"""
print(f"调用API: {endpoint}")
time.sleep(0.5)
return {"data": f"来自{endpoint}的响应", "timestamp": time.time()}
# 4. 带键函数的高级用法
def custom_key(endpoint, params=None):
"""自定义缓存键生成规则"""
return f"{endpoint}:{hash(str(params) if params else '')}"
@cached(cache=TTLCache(maxsize=50, ttl=30), key=custom_key)
def cached_api_call(endpoint, params=None):
print(f"实际调用API: {endpoint}")
time.sleep(0.5)
return {"result": "success", "endpoint": endpoint}
四、文件系统缓存
4.1 使用 diskcache 库
# 首先安装:pip install diskcache
from diskcache import Cache
import json
import time
class FileSystemCache:
def __init__(self, directory=".cache"):
self.cache = Cache(directory)
def get_or_set(self, key, func, expire=3600, **kwargs):
"""
获取缓存,如果没有则执行函数并缓存结果
"""
if key in self.cache:
print(f"从缓存读取: {key}")
return self.cache[key]
print(f"缓存未命中,执行函数: {key}")
result = func(**kwargs)
self.cache.set(key, result, expire=expire)
return result
def clear_old(self):
"""清理过期缓存"""
self.cache.expire()
def stats(self):
"""获取缓存统计信息"""
return {
"size": self.cache.volume(),
"count": len(self.cache),
"hits": self.cache.stats()[0],
"misses": self.cache.stats()[1]
}
# 使用示例
def expensive_operation(data_id):
"""模拟耗时操作"""
print(f"执行耗时操作: {data_id}")
time.sleep(2)
return {"id": data_id, "result": "processed", "time": time.time()}
# 实战应用
fs_cache = FileSystemCache("data_cache")
# 模拟API数据缓存
def fetch_api_data(api_url, params=None):
"""模拟获取API数据"""
# 这里应该是实际的API调用
print(f"调用外部API: {api_url}")
time.sleep(1)
return {
"url": api_url,
"params": params,
"data": [1, 2, 3, 4, 5],
"timestamp": time.time()
}
# 使用缓存获取数据
data = fs_cache.get_or_set(
key="api:users:list",
func=fetch_api_data,
expire=300, # 5分钟过期
api_url="https://api.example.com/users",
params={"page": 1, "limit": 10}
)
五、Redis缓存实战
5.1 分布式缓存方案
# 首先安装:pip install redis
import redis
import json
import pickle
from functools import wraps
import time
class RedisCache:
def __init__(self, host='localhost', port=6379, db=0, password=None):
self.redis_client = redis.Redis(
host=host,
port=port,
db=db,
password=password,
decode_responses=False
)
def set(self, key, value, expire=None):
"""设置缓存"""
serialized = pickle.dumps(value)
if expire:
self.redis_client.setex(key, expire, serialized)
else:
self.redis_client.set(key, serialized)
def get(self, key):
"""获取缓存"""
data = self.redis_client.get(key)
if data:
return pickle.loads(data)
return None
def delete(self, key):
"""删除缓存"""
return self.redis_client.delete(key)
def exists(self, key):
"""检查键是否存在"""
return self.redis_client.exists(key)
def clear_pattern(self, pattern="*"):
"""按模式清除缓存"""
keys = self.redis_client.keys(pattern)
if keys:
return self.redis_client.delete(*keys)
return 0
def cache_decorator(self, expire=3600):
"""缓存装饰器工厂"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# 生成缓存键
cache_key = f"{func.__name__}:{args}:{kwargs}"
# 尝试从缓存获取
cached_result = self.get(cache_key)
if cached_result is not None:
print(f"缓存命中: {cache_key}")
return cached_result
# 缓存未命中,执行函数
print(f"缓存未命中: {cache_key}")
result = func(*args, **kwargs)
# 缓存结果
self.set(cache_key, result, expire=expire)
return result
return wrapper
return decorator
# 使用示例
def setup_redis_cache():
cache = RedisCache(host='localhost', port=6379)
# 测试基本操作
cache.set("user:1001", {"name": "张三", "age": 25}, expire=60)
user_data = cache.get("user:1001")
print(f"用户数据: {user_data}")
return cache
# 实战:缓存数据库查询结果
class DatabaseService:
def __init__(self):
self.cache = RedisCache()
@cached(cache=TTLCache(maxsize=100, ttl=300)) # 内存缓存
def get_user_from_memory(self, user_id):
"""内存缓存:快速但容量有限"""
print(f"从数据库查询用户 {user_id}")
time.sleep(0.5) # 模拟数据库查询
return {"id": user_id, "name": f"用户{user_id}", "source": "memory_cache"}
def get_user_from_redis(self, user_id):
"""Redis缓存:分布式缓存"""
cache_key = f"user:{user_id}"
# 尝试从Redis获取
cached_data = self.cache.get(cache_key)
if cached_data:
cached_data["source"] = "redis_cache"
return cached_data
# 从数据库查询
print(f"从数据库查询用户 {user_id}")
time.sleep(0.5)
user_data = {"id": user_id, "name": f"用户{user_id}", "source": "database"}
# 存入Redis,1小时过期
self.cache.set(cache_key, user_data, expire=3600)
return user_data
六、多层缓存架构
class MultiLevelCache:
"""
多层缓存:内存缓存 + Redis缓存 + 数据库
访问顺序:内存 -> Redis -> 数据库
"""
def __init__(self):
# L1: 内存缓存(快速,容量小)
self.l1_cache = TTLCache(maxsize=1000, ttl=60)
# L2: Redis缓存(较慢,容量大,可分布式)
self.l2_cache = RedisCache()
# 命中统计
self.stats = {"l1_hits": 0, "l2_hits": 0, "misses": 0}
def get(self, key, fetch_func=None, expire=3600):
"""
获取数据,支持回源查询
fetch_func: 缓存未命中时的数据获取函数
"""
# 1. 检查L1缓存
if key in self.l1_cache:
self.stats["l1_hits"] += 1
return self.l1_cache[key]
# 2. 检查L2缓存(Redis)
l2_data = self.l2_cache.get(key)
if l2_data is not None:
self.stats["l2_hits"] += 1
# 回填到L1缓存
self.l1_cache[key] = l2_data
return l2_data
# 3. 缓存未命中,执行回源查询
self.stats["misses"] += 1
if fetch_func:
data = fetch_func()
# 写入两级缓存
self.l2_cache.set(key, data, expire=expire)
self.l1_cache[key] = data
return data
return None
def set(self, key, value, l1_expire=60, l2_expire=3600):
"""设置多级缓存"""
self.l1_cache[key] = value
self.l2_cache.set(key, value, expire=l2_expire)
def get_stats(self):
"""获取缓存统计"""
total = sum(self.stats.values())
if total > 0:
hit_rate = (self.stats["l1_hits"] + self.stats["l2_hits"]) / total
else:
hit_rate = 0
return {
**self.stats,
"total_requests": total,
"hit_rate": f"{hit_rate:.2%}",
"l1_size": len(self.l1_cache),
"l1_maxsize": self.l1_cache.maxsize
}
七、缓存策略与最佳实践
7.1 缓存更新策略
class CacheUpdateStrategies:
"""
缓存更新策略示例
"""
@staticmethod
def write_through(cache, key, value):
"""
直写策略:同时更新缓存和数据库
优点:数据一致性高
缺点:写入性能较低
"""
# 1. 写入数据库
# db.write(key, value)
# 2. 更新缓存
cache.set(key, value)
@staticmethod
def write_back(cache, key, value):
"""
回写策略:先写缓存,异步写数据库
优点:写入性能高
缺点:数据可能丢失
"""
# 1. 只更新缓存
cache.set(key, value)
# 2. 标记为脏数据,异步写入数据库
# async_write_to_db(key, value)
@staticmethod
def cache_aside(key, cache, db_query_func):
"""
旁路缓存策略
读:先读缓存,未命中读数据库并更新缓存
写:直接写数据库,删除缓存
"""
# 读操作
data = cache.get(key)
if data is None:
data = db_query_func() # 从数据库读取
cache.set(key, data)
return data
@staticmethod
def refresh_ahead(cache, key, fetch_func, refresh_time=30):
"""
预刷新策略:在缓存过期前主动刷新
"""
data = cache.get(key)
if data and data.get("expire_time", 0) - time.time() < refresh_time:
# 接近过期,异步刷新
# threading.Thread(target=fetch_func).start()
pass
return data
7.2 缓存穿透、击穿、雪崩解决方案
class CacheProtection:
"""
缓存保护机制
"""
def __init__(self, cache):
self.cache = cache
def prevent_penetration(self, key, fetch_func, empty_marker=None, ttl=60):
"""
防止缓存穿透
空结果也缓存,避免频繁查询数据库
"""
# 1. 尝试从缓存获取
data = self.cache.get(key)
# 2. 如果是空值标记,直接返回
if data == empty_marker:
return None
# 3. 如果有数据,直接返回
if data is not None:
return data
# 4. 缓存未命中,查询数据库
result = fetch_func()
# 5. 缓存结果(即使是空值)
if result is None:
self.cache.set(key, empty_marker, expire=ttl)
else:
self.cache.set(key, result, expire=ttl)
return result
def prevent_breakdown(self, key, fetch_func, lock_timeout=10, ttl=300):
"""
防止缓存击穿
使用互斥锁,避免大量并发请求同时查询数据库
"""
import threading
# 1. 尝试获取数据
data = self.cache.get(key)
if data is not None:
return data
# 2. 尝试获取锁
lock_key = f"lock:{key}"
lock_acquired = False
try:
# 这里可以使用Redis分布式锁
# 简化版本使用本地锁
lock = threading.Lock()
if lock.acquire(timeout=lock_timeout):
lock_acquired = True
# 再次检查缓存(双检锁)
data = self.cache.get(key)
if data is not None:
return data
# 查询数据库
data = fetch_func()
# 更新缓存
if data is not None:
self.cache.set(key, data, expire=ttl)
finally:
if lock_acquired:
lock.release()
return data
def prevent_avalanche(self, keys, fetch_func, base_ttl=300, random_range=60):
"""
防止缓存雪崩
为缓存设置随机过期时间
"""
import random
results = {}
for key in keys:
# 为每个key设置不同的过期时间
ttl = base_ttl + random.randint(0, random_range)
data = self.cache.get(key)
if data is None:
data = fetch_func(key)
if data is not None:
self.cache.set(key, data, expire=ttl)
results[key] = data
return results
八、实战案例:Web应用缓存优化
from flask import Flask, jsonify, request
import time
app = Flask(__name__)
# 初始化缓存
cache = MultiLevelCache()
class ProductService:
"""商品服务示例"""
@staticmethod
def get_product_from_db(product_id):
"""模拟从数据库获取商品"""
print(f"查询数据库,商品ID: {product_id}")
time.sleep(1) # 模拟数据库查询
return {
"id": product_id,
"name": f"商品{product_id}",
"price": 100 + product_id,
"stock": 50,
"updated_at": time.time()
}
# API路由
@app.route('/product/<int:product_id>')
def get_product(product_id):
"""获取商品信息(带缓存)"""
cache_key = f"product:{product_id}"
# 使用多层缓存获取数据
product = cache.get(
key=cache_key,
fetch_func=lambda: ProductService.get_product_from_db(product_id),
expire=300 # 5分钟过期
)
if product:
return jsonify({
"data": product,
"source": "cache" if "source" not in product else product["source"],
"cached": True
})
return jsonify({"error": "Product not found"}), 404
@app.route('/cache/stats')
def cache_stats():
"""查看缓存统计"""
return jsonify(cache.get_stats())
@app.route('/cache/clear')
def clear_cache():
"""清理缓存"""
# 在实际应用中,这里应该更精确地清理
cache.l1_cache.clear()
return jsonify({"message": "缓存已清理"})
if __name__ == '__main__':
# 启动前预热缓存
print("预热缓存...")
for i in range(1, 6):
cache_key = f"product:{i}"
cache.get(
key=cache_key,
fetch_func=lambda i=i: ProductService.get_product_from_db(i)
)
print("缓存预热完成")
app.run(debug=True)
九、性能对比测试
def performance_comparison():
"""不同缓存方案的性能对比"""
import time
import random
# 测试数据
test_data = {i: f"value_{i}" for i in range(1000)}
# 1. 无缓存
def no_cache(key):
time.sleep(0.01) # 模拟10ms延迟
return test_data.get(key)
# 2. 内存缓存
from functools import lru_cache
@lru_cache(maxsize=100)
def memory_cache(key):
time.sleep(0.01)
return test_data.get(key)
# 3. 模拟Redis缓存
class MockRedis:
def __init__(self):
self.cache = {}
def get(self, key):
return self.cache.get(key)
def set(self, key, value):
self.cache[key] = value
redis_cache = MockRedis()
def redis_cached(key):
value = redis_cache.get(key)
if value is not None:
return value
time.sleep(0.01)
value = test_data.get(key)
redis_cache.set(key, value)
return value
# 测试函数
def test_performance(func, name, iterations=100):
start = time.time()
for _ in range(iterations):
key = random.randint(0, 99) # 使用前100个key,有一定重复
func(key)
elapsed = time.time() - start
print(f"{name}: {iterations}次调用耗时 {elapsed:.4f}秒,平均 {elapsed/iterations*1000:.2f}ms/次")
# 执行测试
print("=== 缓存性能对比测试 ===")
test_performance(no_cache, "无缓存")
test_performance(memory_cache, "内存缓存")
test_performance(redis_cached, "Redis缓存")
# 测试缓存命中率
print("\n=== 缓存命中率测试 ===")
hits = memory_cache.cache_info().hits
misses = memory_cache.cache_info().misses
total = hits + misses
if total > 0:
hit_rate = hits / total
print(f"内存缓存命中率: {hit_rate:.2%} ({hits}/{total})")
if __name__ == "__main__":
performance_comparison()
十、总结与最佳实践
10.1 选择合适的缓存方案
- 小型应用:使用
functools.lru_cache 或 cachetools
- 单机应用:使用
diskcache 文件缓存
- 分布式系统:使用 Redis 或 Memcached
- 多层缓存:结合内存缓存 + Redis
10.2 缓存键设计原则
- 使用有意义的键名
- 包含版本信息以便清理
- 避免键名过长
- 使用一致的命名规范
10.3 缓存失效策略
- 设置合理的过期时间
- 使用惰性删除 + 定期清理
- 重要数据使用主动更新
10.4 监控与维护
- 监控缓存命中率
- 设置缓存大小限制
- 定期清理无用缓存
- 记录缓存访问日志
通过以上实战示例,你应该能够理解Python中缓存的核心概念,并根据具体需求选择合适的缓存方案。记住:缓存不是万能的,需要根据实际场景合理使用,避免过度缓存导致的数据不一致和内存浪费。