二、基金知识库构建(核心 RAG 数据层)(第 5 周)
一、先安装依赖(复制运行)
pip install langchain langchain-community pymilvus unstructured pdfplumber pandas sentence-transformers torch

二、基金知识库构建 完整代码(三合一)
PDF 文档解析
使用 Unstructured 解析文本、表格
处理跨页表格、复杂段落、目录、页眉页脚
文本分块(Chunking)
使用 RecursiveCharacterTextSplitter 递归分块
设置块大小、重叠长度,保证金融语义完整
向量入库
使用 BAAI/bge-large-zh 生成向量
存入 Milvus/PGVector,建立索引
构建基金知识结构化索引表
import os
import re

import pandas as pd

try:
    import torch
except ImportError:
    # torch is optional: without it we simply embed on CPU.
    torch = None

from langchain_community.document_loaders import PyPDFLoader
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Milvus
from langchain_core.documents import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter

# ===================== Configuration =====================
PDF_FOLDER = "./fund_documents"
CSV_PATH = "./fund_basic_info.csv"
MILVUS_HOST = "localhost"
MILVUS_PORT = 19530
COLLECTION = "fund_knowledge_base"
EMBEDDING_MODEL = "BAAI/bge-large-zh"
# ==========================================================


def resolve_embedding_device():
    """Pick the device for embedding and return ``(device, error_or_None)``.

    ``torch.cuda.is_available()`` alone is not enough: on cards whose compute
    capability is newer than the installed wheel supports (e.g. RTX 50-series,
    sm_120, with an older wheel) CUDA reports available but real kernels fail
    with "no kernel image is available for execution on the device".  We run a
    small matmul as a probe; on failure we fall back to CPU and return the
    error text so the caller can explain the fallback.
    """
    if torch is None or not torch.cuda.is_available():
        return "cpu", None
    try:
        # Probe: a real GPU kernel launch, not just device enumeration.
        x = torch.randn(128, 128, device="cuda", dtype=torch.float32)
        _ = (x @ x).sum().cpu()
        torch.cuda.synchronize()
        return "cuda", None
    except Exception as e:
        return "cpu", str(e)


# 1. Load PDFs
def load_pdfs(pdf_folder):
    """Load every PDF in *pdf_folder* into a list of LangChain ``Document``s.

    Each page becomes one Document; whitespace is normalized and the fund
    code is taken from the filename prefix before the first underscore
    (assumes files are named like ``<code>_<name>.pdf`` — TODO confirm).
    """
    docs = []
    for filename in os.listdir(pdf_folder):
        # Case-insensitive match so ".PDF" files are not silently skipped.
        if not filename.lower().endswith(".pdf"):
            continue
        path = os.path.join(pdf_folder, filename)
        pages = PyPDFLoader(path).load()
        fund_code = filename.split("_")[0]
        for page in pages:
            # Collapse repeated newlines/spaces left over from PDF extraction.
            txt = page.page_content
            txt = re.sub(r"\n+", "\n", txt)
            txt = re.sub(r" +", " ", txt)
            docs.append(Document(
                page_content=txt,
                metadata={
                    "source": filename,
                    "fund_code": fund_code
                }
            ))
        # Fix: original f-string had lost its placeholder and printed a
        # literal "(unknown)" instead of the loaded file's name.
        print(f"✅ 已加载:{filename}")
    return docs


# 2. Chunking
def split_docs(docs):
    """Recursively split documents into overlapping chunks.

    512-char chunks with 64-char overlap; separators prefer paragraph,
    line, Chinese sentence/clause boundaries, then spaces, to keep
    financial statements semantically intact.
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=512,
        chunk_overlap=64,
        separators=["\n\n", "\n", "。", ";", " "]
    )
    return splitter.split_documents(docs)


# 3. Embed and store in Milvus
def build_milvus_db(chunks):
    """Embed *chunks* with bge-large-zh and build the Milvus collection.

    Chooses GPU only when the probe in ``resolve_embedding_device``
    succeeds; otherwise explains why it fell back to CPU.
    Returns the LangChain ``Milvus`` vector store.
    """
    device, cuda_probe_err = resolve_embedding_device()
    if device == "cuda" and torch is not None:
        print(f"🖥️ 嵌入设备: GPU ({torch.cuda.get_device_name(0)})")
    elif torch is not None and torch.cuda.is_available() and cuda_probe_err:
        # CUDA is nominally available but the kernel probe failed —
        # typical arch mismatch (e.g. sm_120 card with an older wheel).
        print("🖥️ 嵌入设备: CPU(本机显卡与当前 PyTorch 的 CUDA 架构不匹配,已在 GPU 上探测失败并回退)")
        print(f" 原因摘要: {cuda_probe_err[:200]}…" if len(cuda_probe_err) > 200 else f" 原因: {cuda_probe_err}")
        print(" 解决: 按 https://pytorch.org/get-started/locally/ 安装支持本卡(如 CUDA 12.8/13.0、含 sm_120)的 PyTorch 后再用 GPU。")
    else:
        print("🖥️ 嵌入设备: CPU(无可用 CUDA,或需安装带 CUDA 的 PyTorch)")
    embeddings = HuggingFaceEmbeddings(
        model_name=EMBEDDING_MODEL,
        model_kwargs={"device": device},
    )
    db = Milvus.from_documents(
        chunks,
        embeddings,
        collection_name=COLLECTION,
        connection_args={
            "host": MILVUS_HOST,
            "port": MILVUS_PORT
        }
    )
    print("🎉 Milvus 向量库构建完成!")
    return db


# 4. Build the structured fund index table
def build_index():
    """Extract key columns from the fund CSV into a compact index CSV.

    Reads CSV_PATH (utf-8-sig handles a BOM from Excel exports) and writes
    fund_knowledge_index.csv with code/name/risk/1y-return/type columns.
    Raises ``KeyError`` if any expected Chinese column header is missing.
    """
    df = pd.read_csv(CSV_PATH, encoding="utf-8-sig")
    index_df = df[["基金代码", "基金简称", "风险等级", "近1年涨幅", "基金类型"]]
    index_df.to_csv("fund_knowledge_index.csv", index=False, encoding="utf-8-sig")
    print("📊 基金知识索引表已保存")


# ===================== Main =====================
if __name__ == "__main__":
    print("=" * 50)
    print(" 基金 RAG 知识库构建(Milvus 版)")
    print("=" * 50)
    docs = load_pdfs(PDF_FOLDER)
    chunks = split_docs(docs)
    build_milvus_db(chunks)
    build_index()
    print("\n✅ 全部完成!")

# 当前环境里是 CPU 轮子,换成官方 CUDA 轮子(体积约 2.6GB,
python -m pip install torch torchvision --index-url https://download.pytorch.org/whl/cu128

完成后在本机同一 Python 里执行:
python -c "import torch; print(torch.__version__); x=torch.randn(256,256,device='cuda'); print((x@x).sum())"

若上述命令能正常打印版本号和计算结果(未报错),再跑 build_knowledge_base.py 就会用 RTX 5070 做嵌入。