#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
PDF-to-RAG knowledge base processing module.

Converts world-setting documents (e.g. COC.pdf) into a format suitable for
RAG retrieval.
'''

import json
|
||
import os
|
||
import re
|
||
from typing import List, Dict, Tuple
|
||
import hashlib
|
||
from datetime import datetime
|
||
|
||
# Optional PDF back-ends. Each one is imported in its own ``try`` so that a
# missing PyPDF2 no longer prevents PyMuPDF from being detected (previously a
# failed ``import PyPDF2`` skipped ``import fitz`` and left both unusable).
# A back-end that cannot be imported is bound to ``None`` so the name always
# exists at module level.
try:
    import PyPDF2
    PYPDF2_AVAILABLE = True
except ImportError:
    PyPDF2 = None
    PYPDF2_AVAILABLE = False

try:
    import fitz  # pymupdf
    PYMUPDF_AVAILABLE = True
except ImportError:
    fitz = None
    PYMUPDF_AVAILABLE = False

# Report exactly which back-ends are usable.
if PYPDF2_AVAILABLE and not PYMUPDF_AVAILABLE:
    print("Warning: pymupdf not available, using PyPDF2 only")
elif PYMUPDF_AVAILABLE and not PYPDF2_AVAILABLE:
    print("Warning: PyPDF2 not available, using pymupdf only")
elif not PYMUPDF_AVAILABLE and not PYPDF2_AVAILABLE:
    print("Warning: neither pymupdf nor PyPDF2 is available, PDF text extraction will fail")

# Optional embedding stack: all three packages are needed for vector search,
# so they are imported together and share a single availability flag.
try:
    from sentence_transformers import SentenceTransformer
    import faiss
    import numpy as np
except ImportError:
    EMBEDDING_AVAILABLE = False
    print("Warning: sentence-transformers or faiss not available, using simple text matching")
else:
    EMBEDDING_AVAILABLE = True


class PDFToRAGProcessor:
    """Turn a PDF document into a small retrieval (RAG) knowledge base.

    The pipeline is: extract text -> clean it -> split it into chunks ->
    extract key concepts -> optionally build a FAISS vector index -> save
    everything to disk. A saved knowledge base can be loaded again and
    searched, either by vector similarity or by plain keyword matching.
    """

    # Sentence terminators, written as escapes so the full-width forms
    # survive re-encoding: ideographic full stop, full-width ! ? ;, and the
    # ASCII ! ? ; equivalents.
    _SENTENCE_ENDINGS = '\u3002\uff01\uff1f\uff1b!?;'
    # One sentence = a run of non-terminators plus its own terminator(s) and
    # the whitespace that follows them.
    _SENTENCE_RE = re.compile(
        '[^{0}]+(?:[{0}]+\\s*)?'.format(_SENTENCE_ENDINGS)
    )
    _PARAGRAPH_BREAK_RE = re.compile(r'\n\s*\n')
    _PAGE_MARKER_RES = (
        re.compile(r'第\s*\d+\s*页'),
        re.compile(r'Page\s*\d+'),
    )
    _CONCEPT_PATTERNS = (
        # "skill/attribute/rule: ..." with an ASCII or full-width colon
        re.compile(r'(?:技能|属性|规则)[:\uff1a]\s*([^\u3002\n]+)'),
        # capitalised English proper nouns
        re.compile(r'([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*)'),
        # terms wrapped in lenticular brackets
        re.compile(r'【([^】]+)】'),
        # quoted text
        re.compile(r'"([^"]+)"'),
    )

    def __init__(self, embedding_model: str = "./sentence-transformers/all-MiniLM-L6-v2"):
        """Create a processor and try to load the embedding model.

        Args:
            embedding_model: Name or local path of the SentenceTransformer
                model to load.

        If the embedding libraries are missing or the model fails to load,
        ``self.embedding_model`` stays ``None`` and the processor falls back
        to keyword matching. A load failure only affects this instance; it
        does not flip the module-wide ``EMBEDDING_AVAILABLE`` flag, so a bad
        model path cannot disable embeddings for later instances.
        """
        self.chunks = []
        self.embeddings = None
        self.index = None
        self.embedding_model = None

        if not EMBEDDING_AVAILABLE:
            return

        try:
            self.embedding_model = SentenceTransformer(embedding_model)
            print(f"✓ 向量模型加载成功: {embedding_model}")
        except Exception as e:
            print(f"✗ 向量模型加载失败: {e}")
            self.embedding_model = None

    def extract_text_from_pdf(self, pdf_path: str) -> str:
        """Return the text of every page of ``pdf_path`` concatenated.

        PyMuPDF is tried first when available; PyPDF2 is the fallback.

        Raises:
            FileNotFoundError: ``pdf_path`` does not exist.
            RuntimeError: the PyPDF2 fallback failed as well.
        """
        if not os.path.exists(pdf_path):
            raise FileNotFoundError(f"PDF文件不存在: {pdf_path}")

        if PYMUPDF_AVAILABLE:
            try:
                doc = fitz.open(pdf_path)
                try:
                    pages = [page.get_text() for page in doc]
                finally:
                    # Close the document even when a page fails to parse.
                    doc.close()
                print("✓ 使用pymupdf提取文本成功")
                return "".join(pages)
            except Exception as e:
                # Pages collected so far are discarded, so the fallback does
                # not append to a partial PyMuPDF result.
                print(f"✗ pymupdf提取失败: {e}, 尝试PyPDF2")

        try:
            with open(pdf_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                # Guard against a page yielding no text object at all.
                pages = [page.extract_text() or "" for page in pdf_reader.pages]
            print("✓ 使用PyPDF2提取文本成功")
        except Exception as e:
            raise RuntimeError(f"PDF文本提取失败: {e}") from e

        return "".join(pages)

    def clean_text(self, text: str) -> str:
        """Normalise whitespace and drop page markers, keeping paragraphs.

        Paragraphs (separated by blank lines in the input) are each collapsed
        to a single line and joined with exactly one blank line. Keeping that
        separator is what lets ``chunk_text_by_semantic`` split on paragraph
        boundaries.
        """
        # Full-width (ideographic) space -> ASCII space.
        text = text.replace('\u3000', ' ')

        cleaned = []
        for paragraph in self._PARAGRAPH_BREAK_RE.split(text):
            for marker in self._PAGE_MARKER_RES:
                paragraph = marker.sub('', paragraph)
            # Collapse runs of whitespace, including single line breaks.
            paragraph = re.sub(r'\s+', ' ', paragraph).strip()
            if paragraph:
                cleaned.append(paragraph)

        return '\n\n'.join(cleaned)

    def _split_sentences(self, paragraph: str, max_len: int) -> List[str]:
        """Split ``paragraph`` into pieces no longer than ``max_len``.

        Each sentence keeps its own terminator. A sentence longer than
        ``max_len`` is cut into fixed-size slices.
        """
        pieces = []
        for match in self._SENTENCE_RE.finditer(paragraph):
            sentence = match.group()
            # Skip fragments that hold nothing but whitespace/terminators.
            if not sentence.strip().rstrip(self._SENTENCE_ENDINGS):
                continue
            for start in range(0, len(sentence), max_len):
                pieces.append(sentence[start:start + max_len])
        return pieces

    def chunk_text_by_semantic(self, text: str, max_chunk_size: int = 500) -> List[Dict]:
        """Split ``text`` into chunks of at most ``max_chunk_size`` characters.

        Paragraphs (blank-line separated) are packed together while they fit.
        A paragraph longer than the limit is split into sentences, which are
        packed the same way.

        Args:
            text: Cleaned document text.
            max_chunk_size: Maximum chunk length in characters; must be > 0.

        Returns:
            A list of dicts with keys ``content``, ``size`` (the length of
            ``content``), ``type`` (``"sentence"`` when the chunk was closed
            during sentence packing, otherwise ``"paragraph"``), ``id``
            (``chunk_NNNN``) and ``hash`` (first 8 hex digits of the MD5 of
            the content).

        Raises:
            ValueError: ``max_chunk_size`` is not positive.
        """
        if max_chunk_size <= 0:
            raise ValueError("max_chunk_size must be a positive integer")

        paragraphs = [
            p.strip() for p in self._PARAGRAPH_BREAK_RE.split(text) if p.strip()
        ]

        chunks = []
        buffer = ""

        def flush(kind: str) -> None:
            """Emit the buffered text as one chunk and reset the buffer."""
            nonlocal buffer
            content = buffer.strip()
            if content:
                chunks.append({
                    "content": content,
                    "size": len(content),
                    "type": kind,
                })
            buffer = ""

        for para in paragraphs:
            if len(para) > max_chunk_size:
                # Close whatever is pending, then pack this paragraph
                # sentence by sentence.
                flush("paragraph")
                for piece in self._split_sentences(para, max_chunk_size):
                    if buffer and len(buffer) + len(piece) > max_chunk_size:
                        flush("sentence")
                    buffer += piece
            else:
                # +1 accounts for the newline that joins paragraphs.
                joined_size = len(buffer) + 1 + len(para) if buffer else len(para)
                if buffer and joined_size > max_chunk_size:
                    flush("paragraph")
                buffer = f"{buffer}\n{para}" if buffer else para

        flush("paragraph")

        # Attach a sequential id and a short content fingerprint.
        for i, chunk in enumerate(chunks):
            chunk["id"] = f"chunk_{i:04d}"
            chunk["hash"] = hashlib.md5(chunk["content"].encode("utf-8")).hexdigest()[:8]

        return chunks

    def extract_key_concepts(self, chunks: List[Dict]) -> List[Dict]:
        """Collect candidate key terms from each chunk using regex rules.

        Returns one entry per chunk that produced at least one match, with
        keys ``chunk_id``, ``concepts`` (de-duplicated, in order of first
        appearance so the saved output is stable between runs) and
        ``content_preview`` (first 100 characters, with ``...`` appended when
        truncated).
        """
        concepts = []

        for chunk in chunks:
            content = chunk["content"]

            found = []
            for pattern in self._CONCEPT_PATTERNS:
                found.extend(match.strip() for match in pattern.findall(content))

            # dict.fromkeys de-duplicates while preserving order.
            unique = list(dict.fromkeys(term for term in found if term))
            if not unique:
                continue

            preview = content[:100] + "..." if len(content) > 100 else content
            concepts.append({
                "chunk_id": chunk["id"],
                "concepts": unique,
                "content_preview": preview,
            })

        return concepts

    def build_vector_index(self, chunks: List[Dict]) -> bool:
        """Embed every chunk and build an L2 FAISS index over the vectors.

        Returns:
            True when the index was built and stored on ``self.index`` /
            ``self.embeddings``; False when embeddings are unavailable, there
            is nothing to index, or building failed.
        """
        if not EMBEDDING_AVAILABLE or self.embedding_model is None:
            print("✗ 向量化功能不可用,将使用文本匹配")
            return False

        if not chunks:
            print("✗ 向量索引构建失败: 没有可用的文档块")
            return False

        try:
            texts = [chunk["content"] for chunk in chunks]

            print("生成文档向量...")
            # FAISS expects contiguous float32 input.
            embeddings = np.ascontiguousarray(
                self.embedding_model.encode(texts, show_progress_bar=True),
                dtype=np.float32,
            )

            dimension = embeddings.shape[1]
            index = faiss.IndexFlatL2(dimension)
            index.add(embeddings)

            # Publish only after everything succeeded.
            self.index = index
            self.embeddings = embeddings
            print(f"✓ 向量索引构建完成,维度: {dimension}, 文档数: {len(chunks)}")
            return True

        except Exception as e:
            print(f"✗ 向量索引构建失败: {e}")
            return False

    def process_pdf_to_rag(self, pdf_path: str, output_dir: str = "./rag_knowledge") -> Dict:
        """Run the full PDF -> knowledge base pipeline and write the results.

        Files written into ``output_dir``: ``extracted_text.txt``,
        ``knowledge_base.json`` and, when the vector index was built,
        ``vector_index.faiss`` and ``embeddings.npy``.

        Args:
            pdf_path: Path of the PDF to process.
            output_dir: Directory for the generated files (created if needed).

        Returns:
            A summary dict with ``status``, ``chunks_count``,
            ``concepts_count``, ``vector_enabled`` and ``output_dir``.
        """
        print(f"开始处理PDF: {pdf_path}")

        os.makedirs(output_dir, exist_ok=True)

        # 1. Extract text
        raw_text = self.extract_text_from_pdf(pdf_path)
        print(f"✓ 提取文本长度: {len(raw_text)} 字符")

        # 2. Clean text and keep a copy on disk
        cleaned_text = self.clean_text(raw_text)
        text_output_path = os.path.join(output_dir, "extracted_text.txt")
        with open(text_output_path, 'w', encoding='utf-8') as f:
            f.write(cleaned_text)
        print(f"✓ 清理后文本保存至: {text_output_path}")

        # 3. Chunk
        chunks = self.chunk_text_by_semantic(cleaned_text)
        self.chunks = chunks
        print(f"✓ 文档分块完成: {len(chunks)} 个块")

        # 4. Key concepts
        concepts = self.extract_key_concepts(chunks)
        print(f"✓ 提取关键概念: {len(concepts)} 个")

        # 5. Vector index (optional)
        vector_success = self.build_vector_index(chunks)

        # 6. Persist the knowledge base
        knowledge_base = {
            "metadata": {
                "source_file": os.path.basename(pdf_path),
                "processed_time": datetime.now().isoformat(),
                "total_chunks": len(chunks),
                "total_concepts": len(concepts),
                "vector_enabled": vector_success,
            },
            "chunks": chunks,
            "concepts": concepts,
        }

        kb_output_path = os.path.join(output_dir, "knowledge_base.json")
        with open(kb_output_path, 'w', encoding='utf-8') as f:
            json.dump(knowledge_base, f, ensure_ascii=False, indent=2)
        print(f"✓ 知识库保存至: {kb_output_path}")

        if vector_success:
            index_path = os.path.join(output_dir, "vector_index.faiss")
            faiss.write_index(self.index, index_path)

            embeddings_path = os.path.join(output_dir, "embeddings.npy")
            np.save(embeddings_path, self.embeddings)
            print(f"✓ 向量索引保存至: {index_path}")

        return {
            "status": "success",
            "chunks_count": len(chunks),
            "concepts_count": len(concepts),
            "vector_enabled": vector_success,
            "output_dir": output_dir,
        }

    def load_knowledge_base(self, knowledge_dir: str) -> bool:
        """Load a knowledge base previously written by ``process_pdf_to_rag``.

        Returns:
            True on success, False if the JSON file (or the index files, when
            present) could not be read.
        """
        try:
            kb_path = os.path.join(knowledge_dir, "knowledge_base.json")
            with open(kb_path, 'r', encoding='utf-8') as f:
                knowledge_base = json.load(f)

            self.chunks = knowledge_base["chunks"]

            # Forget any index belonging to previously loaded chunks; a stale
            # index would return positions that no longer match self.chunks.
            self.index = None
            self.embeddings = None

            if EMBEDDING_AVAILABLE:
                index_path = os.path.join(knowledge_dir, "vector_index.faiss")
                embeddings_path = os.path.join(knowledge_dir, "embeddings.npy")

                if os.path.exists(index_path) and os.path.exists(embeddings_path):
                    self.index = faiss.read_index(index_path)
                    self.embeddings = np.load(embeddings_path)
                    print("✓ 向量索引加载成功")

            print(f"✓ 知识库加载成功: {len(self.chunks)} 个文档块")
            return True

        except Exception as e:
            print(f"✗ 知识库加载失败: {e}")
            return False

    def _vector_search(self, query: str, top_k: int) -> List[Dict]:
        """Return up to ``top_k`` chunks nearest to ``query`` in the index."""
        query_vector = np.ascontiguousarray(
            self.embedding_model.encode([query]), dtype=np.float32
        )
        k = min(top_k, len(self.chunks))
        distances, indices = self.index.search(query_vector, k)

        results = []
        for distance, idx in zip(distances[0], indices[0]):
            idx = int(idx)
            # FAISS pads missing neighbours with label -1; without the lower
            # bound that would silently select the last chunk.
            if not 0 <= idx < len(self.chunks):
                continue
            result = self.chunks[idx].copy()
            # Map an L2 distance to a (0, 1] similarity score.
            result["relevance_score"] = float(1 / (1 + distance))
            result["rank"] = len(results) + 1
            results.append(result)

        return results

    def _keyword_search(self, query: str, top_k: int) -> List[Dict]:
        """Rank chunks by how often the query's words occur in them."""
        query_words = query.lower().split()
        if not query_words:
            return []

        scored_chunks = []
        for chunk in self.chunks:
            content_lower = chunk["content"].lower()
            score = sum(content_lower.count(word) for word in query_words)
            if score > 0:
                result = chunk.copy()
                result["relevance_score"] = score
                scored_chunks.append(result)

        scored_chunks.sort(key=lambda item: item["relevance_score"], reverse=True)

        top_results = scored_chunks[:top_k]
        for rank, result in enumerate(top_results, start=1):
            result["rank"] = rank

        return top_results

    def search_relevant_content(self, query: str, top_k: int = 3) -> List[Dict]:
        """Return the ``top_k`` chunks most relevant to ``query``.

        Vector search is used when both an index and an embedding model are
        loaded on this instance; if it is unavailable or raises, the search
        falls back to case-insensitive keyword counting.

        Returns:
            Copies of the matching chunk dicts, each with ``relevance_score``
            and a 1-based ``rank`` added. Empty when there are no chunks,
            ``top_k`` is not positive, or nothing matches.
        """
        if not self.chunks or top_k <= 0:
            return []

        # Explicit None checks: the truthiness of model/index objects is not
        # a reliable "is loaded" test.
        if self.index is not None and self.embedding_model is not None:
            try:
                return self._vector_search(query, top_k)
            except Exception as e:
                print(f"向量搜索失败: {e}, 使用文本匹配")

        return self._keyword_search(query, top_k)


def main():
    """Interactive smoke test: process one PDF, then run keyword queries.

    Prompts for a PDF path, runs the full pipeline on it, prints a summary
    and then answers search queries until the user types ``quit``.
    """
    processor = PDFToRAGProcessor()

    # Example input: the COC rule book
    pdf_path = input("请输入PDF文件路径 (如: ./coc.pdf): ").strip()
    if not os.path.exists(pdf_path):
        print(f"文件不存在: {pdf_path}")
        return

    banner = '=' * 50

    try:
        summary = processor.process_pdf_to_rag(pdf_path)

        vector_state = '启用' if summary['vector_enabled'] else '未启用'
        print(f"\n{banner}")
        print("PDF处理完成!")
        print(f"状态: {summary['status']}")
        print(f"文档块数量: {summary['chunks_count']}")
        print(f"关键概念数量: {summary['concepts_count']}")
        print(f"向量索引: {vector_state}")
        print(f"输出目录: {summary['output_dir']}")

        # Interactive search loop
        print(f"\n{banner}")
        print("测试知识库搜索:")

        while True:
            query = input("\n请输入搜索关键词 (输入'quit'退出): ").strip()
            if query.lower() == 'quit':
                break

            hits = processor.search_relevant_content(query, top_k=3)
            if not hits:
                print("未找到相关内容")
                continue

            print(f"\n找到 {len(hits)} 个相关结果:")
            for hit in hits:
                print(f"\n排名 {hit['rank']} (相关度: {hit['relevance_score']:.3f}):")
                body = hit['content']
                # Show at most the first 200 characters of each hit.
                if len(body) > 200:
                    body = body[:200] + "..."
                print(f"{body}")
                print("-" * 40)

    except Exception as e:
        print(f"处理失败: {e}")


# Run the interactive demo only when executed as a script, not on import.
if __name__ == '__main__':
    main()