一、为什么需要 EmbeddingsCache
-
减少重复计算
对同一段文本,向量化模型会每次返回相同的嵌入。借助缓存,首次计算后无论后续何时再请求,都能直接复用上次结果。 -
降低业务延迟
嵌入模型推理耗时(数十毫秒到百毫秒不等),对实时系统尤为明显。缓存命中时,仅需一次 Redis GET,毫秒级响应,极大提升用户体验。 -
可扩展元数据管理
除了嵌入向量,往往还需对 “文本来源”、“类别标签”、“处理时间戳” 等信息进行关联存储。EmbeddingsCache 内置 metadata 字段,丰富应用场景。
二、核心设计与实现原理
-
Redis 哈希 + 二进制字段
每条缓存项以 Redis Hash 存储,字段包括:text
、model_name
、embedding
(二进制 Float32 数组)和metadata
(JSON 序列化),并可附加 TTL。 -
Key 计算
通常以 SHA1(text + model_name) 生成唯一 Key,如:embedcache:<sha1>
,避免长文本直接作为 Redis Key。 -
批量操作管道化
mset/mget/mexists/mdrop
等批量接口在底层使用 Redis pipeline 或 MGET/MSET 命令,实现高吞吐。 -
异步支持
提供aset/aget/amexists/amdrop
,与主流 async 框架(asyncio、FastAPI)无缝集成。
三、快速上手示例
3.1 环境与依赖
pip install redisvl sentence-transformers
import os, time
import numpy as np
os.environ["TOKENIZERS_PARALLELISM"] = "False"from redisvl.extensions.cache.embeddings import EmbeddingsCache
from redisvl.utils.vectorize import HFTextVectorizer
3.2 初始化与配置
# 1. 向量化器:选用 all-mpnet-base-v2
vectorizer = HFTextVectorizer(model="redis/langcache-embed-v1",cache_folder=os.getenv("SENTENCE_TRANSFORMERS_HOME")
)# 2. 缓存:不开启过期
cache = EmbeddingsCache(name="embedcache",redis_url="redis://localhost:6379",ttl=None
)
四、核心 API 详解
4.1 单条存取
-
存储
text, model = "What is AI?", "redis/langcache-embed-v1" emb = vectorizer.embed(text) meta = {"category":"ai","src":"user"} key = cache.set(text=text, model_name=model, embedding=emb, metadata=meta)
- 返回 Redis Key,可用于后续按键操作。
-
检索
record = cache.get(text=text, model_name=model) # 返回 dict: {'text', 'model_name', 'embedding', 'metadata', 'ttl_remain'}
-
存在性检查 & 删除
cache.exists(text=text, model_name=model) # bool cache.drop(text=text, model_name=model)
4.2 键名操作
无需提供原文本,只需 Key:
cache.exists_by_key(key)
cache.get_by_key(key)
cache.drop_by_key(key)
4.3 批量操作
最佳实践:高并发场景下尽量使用批量接口,减少网络延迟。
items = [{"text": t, "model_name": m, "embedding": vectorizer.embed(t)} for t in texts]
keys = cache.mset(items)
exists = cache.mexists(texts, model_name=m)
recs = cache.mget(texts, model_name=m)
cache.mdrop(texts, model_name=m)
注意:异步对应
amset
/amexists
/amget
/amdrop
。
五、TTL 与过期策略
- 全局 TTL:在初始化时指定
ttl=N
,所有条目默认 N 秒后自动过期。 - 单条覆盖:
set(..., ttl=5)
只对该条生效。
示例:5 秒默认、1 秒覆盖、验证过期行为:
ttl_cache = EmbeddingsCache("ttl","redis://localhost:6379", ttl=5)
k1 = ttl_cache.set(text, model, emb) # 5s
k2 = ttl_cache.set("short", model, emb, ttl=1) # 1stime.sleep(2)
assert not ttl_cache.exists_by_key(k2)
assert ttl_cache.exists_by_key(k1)
六、异步场景集成
import asyncioasync def async_demo():k = await cache.aset(text="Async", model_name=model, embedding=emb)hit = await cache.aexists_by_key(k)print("Async hit?", hit)await cache.adrop_by_key(k)asyncio.run(async_demo())
适用于 FastAPI、AIOHTTP 等异步框架。
七、实战案例:文本分类系统
# 1. 新建缓存 & 绑定向量化器
example = EmbeddingsCache("demo","redis://...", ttl=3600)
vectorizer.cache = example# 2. 处理流式请求
queries = ["What is AI?","How to learn ML?","What is AI?","NN?","How to learn ML?"]
hits=0
for q in queries:if example.exists(text=q, model_name=model): hits+=1emb = vectorizer.embed(q) # 自动缓存 or 计算print(f"命中率:{hits}/{len(queries)}={(hits/len(queries))*100:.1f}%")
八、性能对比实测
bench = EmbeddingsCache("bench","redis://...", ttl=3600)
vectorizer.cache=bench
n=20# 跳过缓存
t1=time.time()
for _ in range(n): vectorizer.embed(text, skip_cache=True)
t2=time.time()# 使用缓存
t3=time.time()
for _ in range(n): vectorizer.embed(text)
t4=time.time()print(f"NoCache avg={(t2-t1)/n:.4f}s,WithCache avg={(t4-t3)/n:.4f}s,加速{(t2-t1)/(t4-t3):.2f}×")
典型可达 5×–10× 加速。
九、架构扩展与最佳实践
-
多模型并行
不同业务可用不同model_name
,缓存同一文本多版本嵌入。 -
容量监控与清理
定期扫描 Redis 内存、基于 LFU/LRU 策略删除冷门条目,避免缓存膨胀。 -
分布式部署
在大规模集群中,可将 EmbeddingsCache 前置于微服务,后端统一 Redis;或使用 Redis Cluster、哨兵实现高可用。 -
落地监控
- 缓存命中率(hits/queries)
- 平均响应时延
- Redis 内存使用
-
安全与隔离
- 在多租户场景下,可为每个租户设置不同
name
前缀,避免 Key 冲突 - 使用 Redis ACL/密码保护访问
- 在多租户场景下,可为每个租户设置不同
十、小结
RedisVL EmbeddingsCache 通过简洁一致的 API,将文本嵌入与元数据高效落地到 Redis,支持单/批量、同步/异步、TTL 管理。本指南覆盖了从入门到进阶、实战到运维的全流程,结合典型业务场景与性能数据,助你轻松构建低成本、高性能的嵌入缓存层,显著提升向量检索应用的用户体验和系统效率。