Project02/AITrain/pdf_to_rag_processor.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
PDF-to-RAG knowledge base processing module.
Converts worldbuilding/setting documents (e.g. COC.pdf) into a RAG retrieval format.
'''
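# A minimal usage sketch (the paths below are illustrative, not part of this module):
#
#     processor = PDFToRAGProcessor()
#     processor.process_pdf_to_rag("./coc.pdf", output_dir="./rag_knowledge")
#     hits = processor.search_relevant_content("sanity check rules", top_k=3)
#
# See main() at the bottom of this file for an interactive version.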
import json
import os
import re
from typing import List, Dict, Tuple
import hashlib
from datetime import datetime
try:
    import PyPDF2
    import fitz  # pymupdf
    PYMUPDF_AVAILABLE = True
except ImportError:
    PYMUPDF_AVAILABLE = False
    print("Warning: pymupdf not available, using PyPDF2 only")
try:
    from sentence_transformers import SentenceTransformer
    import faiss
    import numpy as np
    EMBEDDING_AVAILABLE = True
except ImportError:
    EMBEDDING_AVAILABLE = False
    print("Warning: sentence-transformers or faiss not available, using simple text matching")
class PDFToRAGProcessor:
    def __init__(self, embedding_model: str = "./sentence-transformers/all-MiniLM-L6-v2"):
        """
        Initialize the PDF processor.
        Args:
            embedding_model: name or path of the sentence-embedding model
        """
        global EMBEDDING_AVAILABLE
        self.chunks = []
        self.embeddings = None
        self.index = None
        if EMBEDDING_AVAILABLE:
            try:
                self.embedding_model = SentenceTransformer(embedding_model)
                print(f"✓ Embedding model loaded: {embedding_model}")
            except Exception as e:
                print(f"✗ Failed to load embedding model: {e}")
                self.embedding_model = None
                EMBEDDING_AVAILABLE = False
        else:
            self.embedding_model = None
    def extract_text_from_pdf(self, pdf_path: str) -> str:
        """Extract text from a PDF file."""
        if not os.path.exists(pdf_path):
            raise FileNotFoundError(f"PDF file does not exist: {pdf_path}")
        text = ""
        # Prefer pymupdf; it generally extracts cleaner text
        if PYMUPDF_AVAILABLE:
            try:
                doc = fitz.open(pdf_path)
                for page_num in range(len(doc)):
                    page = doc.load_page(page_num)
                    text += page.get_text()
                doc.close()
                print("✓ Text extracted with pymupdf")
                return text
            except Exception as e:
                print(f"✗ pymupdf extraction failed: {e}, falling back to PyPDF2")
        # Fallback: PyPDF2
        try:
            with open(pdf_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                for page in pdf_reader.pages:
                    text += page.extract_text()
            print("✓ Text extracted with PyPDF2")
        except Exception as e:
            raise Exception(f"PDF text extraction failed: {e}")
        return text
    def clean_text(self, text: str) -> str:
        """Clean up extracted text."""
        # Collapse redundant blank lines
        text = re.sub(r'\n\s*\n', '\n\n', text)
        # Remove page-number artifacts (e.g. "12 页", "Page 12")
        text = re.sub(r'\s*\d+\s*页', '', text)
        text = re.sub(r'Page\s*\d+', '', text)
        # Normalize whitespace
        text = text.replace('\u3000', ' ')   # full-width space -> half-width
        text = re.sub(r'[ \t]+', ' ', text)  # collapse repeated spaces, keep paragraph breaks
        return text.strip()
    def chunk_text_by_semantic(self, text: str, max_chunk_size: int = 500) -> List[Dict]:
        """Split text into semantically coherent chunks."""
        # Split on paragraph boundaries
        paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
        chunks = []
        current_chunk = ""
        current_size = 0
        for para in paragraphs:
            para_size = len(para)
            # A paragraph longer than the chunk limit must be split further
            if para_size > max_chunk_size:
                # Flush the current chunk first
                if current_chunk:
                    chunks.append({
                        "content": current_chunk.strip(),
                        "size": current_size,
                        "type": "paragraph"
                    })
                    current_chunk = ""
                    current_size = 0
                # Split the long paragraph on sentence boundaries
                sentences = re.split(r'[。!?;！？；]\s*', para)
                for sentence in sentences:
                    if sentence.strip():
                        sentence += '。'  # restore sentence-ending punctuation
                        if current_size + len(sentence) > max_chunk_size and current_chunk:
                            chunks.append({
                                "content": current_chunk.strip(),
                                "size": current_size,
                                "type": "sentence"
                            })
                            current_chunk = sentence
                            current_size = len(sentence)
                        else:
                            current_chunk += sentence
                            current_size += len(sentence)
            else:
                # Normal-sized paragraph
                if current_size + para_size > max_chunk_size and current_chunk:
                    chunks.append({
                        "content": current_chunk.strip(),
                        "size": current_size,
                        "type": "paragraph"
                    })
                    current_chunk = para
                    current_size = para_size
                else:
                    current_chunk += "\n" + para if current_chunk else para
                    current_size += para_size
        # Flush the final chunk
        if current_chunk:
            chunks.append({
                "content": current_chunk.strip(),
                "size": current_size,
                "type": "paragraph"
            })
        # Attach a unique ID and a content hash to every chunk
        for i, chunk in enumerate(chunks):
            chunk["id"] = f"chunk_{i:04d}"
            chunk["hash"] = hashlib.md5(chunk["content"].encode()).hexdigest()[:8]
        return chunks
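    # Shape of each chunk dict produced above (illustrative values, not real output):
    #     {"content": "...", "size": 412, "type": "paragraph",
    #      "id": "chunk_0007", "hash": "a1b2c3d4"}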
    def extract_key_concepts(self, chunks: List[Dict]) -> List[Dict]:
        """Extract key concepts and terminology from the chunks."""
        concepts = []
        # Simple keyword-extraction rules
        concept_patterns = [
            r'(?:技能|属性|规则)[:：]\s*([^。\n]+)',  # "skill/attribute/rule:" definitions
            r'([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*)',     # English proper nouns
            r'【([^】]+)】',                          # Chinese term markers
            r'["“]([^"”]+)["”]',                     # quoted phrases
        ]
        for chunk in chunks:
            content = chunk["content"]
            chunk_concepts = []
            for pattern in concept_patterns:
                matches = re.findall(pattern, content)
                chunk_concepts.extend(matches)
            if chunk_concepts:
                concepts.append({
                    "chunk_id": chunk["id"],
                    "concepts": list(set(chunk_concepts)),  # deduplicate
                    "content_preview": content[:100] + "..." if len(content) > 100 else content
                })
        return concepts
    def build_vector_index(self, chunks: List[Dict]) -> bool:
        """Build a FAISS vector index over the chunks."""
        if not EMBEDDING_AVAILABLE or not self.embedding_model:
            print("✗ Embeddings unavailable, falling back to text matching")
            return False
        try:
            # Collect the chunk texts
            texts = [chunk["content"] for chunk in chunks]
            # Encode them into vectors
            print("Encoding document vectors...")
            embeddings = self.embedding_model.encode(texts, show_progress_bar=True)
            # Build a flat L2 FAISS index
            dimension = embeddings.shape[1]
            self.index = faiss.IndexFlatL2(dimension)
            self.index.add(embeddings.astype(np.float32))
            self.embeddings = embeddings
            print(f"✓ Vector index built, dimension: {dimension}, documents: {len(chunks)}")
            return True
        except Exception as e:
            print(f"✗ Failed to build vector index: {e}")
            return False
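    # Note: IndexFlatL2 performs exact (brute-force) L2 search and needs no training,
    # which is adequate for the modest number of chunks a single rulebook produces.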
    def process_pdf_to_rag(self, pdf_path: str, output_dir: str = "./rag_knowledge") -> Dict:
        """
        Run the full PDF-to-RAG pipeline.
        Args:
            pdf_path: path to the PDF file
            output_dir: output directory
        Returns:
            processing statistics
        """
        print(f"Processing PDF: {pdf_path}")
        # Create the output directory
        os.makedirs(output_dir, exist_ok=True)
        # 1. Extract text
        raw_text = self.extract_text_from_pdf(pdf_path)
        print(f"✓ Extracted text length: {len(raw_text)} characters")
        # 2. Clean the text
        cleaned_text = self.clean_text(raw_text)
        # Save the cleaned text
        text_output_path = os.path.join(output_dir, "extracted_text.txt")
        with open(text_output_path, 'w', encoding='utf-8') as f:
            f.write(cleaned_text)
        print(f"✓ Cleaned text saved to: {text_output_path}")
        # 3. Semantic chunking
        chunks = self.chunk_text_by_semantic(cleaned_text)
        self.chunks = chunks
        print(f"✓ Document chunking finished: {len(chunks)} chunks")
        # 4. Extract key concepts
        concepts = self.extract_key_concepts(chunks)
        print(f"✓ Key concepts extracted: {len(concepts)}")
        # 5. Build the vector index
        vector_success = self.build_vector_index(chunks)
        # 6. Save the knowledge base files
        knowledge_base = {
            "metadata": {
                "source_file": os.path.basename(pdf_path),
                "processed_time": datetime.now().isoformat(),
                "total_chunks": len(chunks),
                "total_concepts": len(concepts),
                "vector_enabled": vector_success
            },
            "chunks": chunks,
            "concepts": concepts
        }
        # Save the JSON knowledge base
        kb_output_path = os.path.join(output_dir, "knowledge_base.json")
        with open(kb_output_path, 'w', encoding='utf-8') as f:
            json.dump(knowledge_base, f, ensure_ascii=False, indent=2)
        print(f"✓ Knowledge base saved to: {kb_output_path}")
        # Save the vector index
        if vector_success:
            index_path = os.path.join(output_dir, "vector_index.faiss")
            faiss.write_index(self.index, index_path)
            embeddings_path = os.path.join(output_dir, "embeddings.npy")
            np.save(embeddings_path, self.embeddings)
            print(f"✓ Vector index saved to: {index_path}")
        return {
            "status": "success",
            "chunks_count": len(chunks),
            "concepts_count": len(concepts),
            "vector_enabled": vector_success,
            "output_dir": output_dir
        }
    def load_knowledge_base(self, knowledge_dir: str) -> bool:
        """Load an existing knowledge base from disk."""
        try:
            # Load the JSON knowledge base
            kb_path = os.path.join(knowledge_dir, "knowledge_base.json")
            with open(kb_path, 'r', encoding='utf-8') as f:
                knowledge_base = json.load(f)
            self.chunks = knowledge_base["chunks"]
            # Load the vector index, if present
            if EMBEDDING_AVAILABLE:
                index_path = os.path.join(knowledge_dir, "vector_index.faiss")
                embeddings_path = os.path.join(knowledge_dir, "embeddings.npy")
                if os.path.exists(index_path) and os.path.exists(embeddings_path):
                    self.index = faiss.read_index(index_path)
                    self.embeddings = np.load(embeddings_path)
                    print("✓ Vector index loaded")
            print(f"✓ Knowledge base loaded: {len(self.chunks)} chunks")
            return True
        except Exception as e:
            print(f"✗ Failed to load knowledge base: {e}")
            return False
    def search_relevant_content(self, query: str, top_k: int = 3) -> List[Dict]:
        """Search the knowledge base for content relevant to a query."""
        if not self.chunks:
            return []
        # Vector search
        if EMBEDDING_AVAILABLE and self.embedding_model and self.index:
            try:
                query_vector = self.embedding_model.encode([query])
                distances, indices = self.index.search(query_vector.astype(np.float32), top_k)
                results = []
                for i, (distance, idx) in enumerate(zip(distances[0], indices[0])):
                    if 0 <= idx < len(self.chunks):
                        result = self.chunks[idx].copy()
                        result["relevance_score"] = float(1 / (1 + distance))  # map L2 distance to a similarity score
                        result["rank"] = i + 1
                        results.append(result)
                return results
            except Exception as e:
                print(f"Vector search failed: {e}, falling back to text matching")
        # Text-matching search
        query_lower = query.lower()
        scored_chunks = []
        for chunk in self.chunks:
            content_lower = chunk["content"].lower()
            # Simple relevance score: total occurrence count of query words
            score = 0
            query_words = query_lower.split()
            for word in query_words:
                if word in content_lower:
                    score += content_lower.count(word)
            if score > 0:
                result = chunk.copy()
                result["relevance_score"] = score
                scored_chunks.append(result)
        # Sort by score
        scored_chunks.sort(key=lambda x: x["relevance_score"], reverse=True)
        # Attach ranks to the top results
        for i, chunk in enumerate(scored_chunks[:top_k]):
            chunk["rank"] = i + 1
        return scored_chunks[:top_k]
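    # Each search hit is a copy of a chunk dict with two extra keys:
    # "relevance_score" (similarity or keyword count) and "rank" (1-based).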

def main():
    """Test the PDF processing pipeline interactively."""
    processor = PDFToRAGProcessor()
    # Example: process a CoC rulebook
    pdf_path = input("Enter the PDF file path (e.g. ./coc.pdf): ").strip()
    if not os.path.exists(pdf_path):
        print(f"File does not exist: {pdf_path}")
        return
    try:
        result = processor.process_pdf_to_rag(pdf_path)
        print(f"\n{'='*50}")
        print("PDF processing finished!")
        print(f"Status: {result['status']}")
        print(f"Number of chunks: {result['chunks_count']}")
        print(f"Number of key concepts: {result['concepts_count']}")
        print(f"Vector index: {'enabled' if result['vector_enabled'] else 'disabled'}")
        print(f"Output directory: {result['output_dir']}")
        # Interactive search test
        print(f"\n{'='*50}")
        print("Knowledge base search test:")
        while True:
            query = input("\nEnter a search query (type 'quit' to exit): ").strip()
            if query.lower() == 'quit':
                break
            results = processor.search_relevant_content(query, top_k=3)
            if results:
                print(f"\nFound {len(results)} relevant results:")
                for result in results:
                    print(f"\nRank {result['rank']} (relevance: {result['relevance_score']:.3f}):")
                    content = result['content']
                    preview = content[:200] + "..." if len(content) > 200 else content
                    print(preview)
                    print("-" * 40)
            else:
                print("No relevant content found")
    except Exception as e:
        print(f"Processing failed: {e}")

if __name__ == '__main__':
    main()