474 lines
18 KiB
Python
474 lines
18 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
'''
|
||
游戏NPC角色对话生成器
|
||
基于微调后的LoRA模型生成角色对话
|
||
支持双模型对话系统,每个模型扮演一个角色
|
||
'''
|
||
|
||
import torch
|
||
import json
|
||
import random
|
||
from peft import PeftModel
|
||
from transformers import AutoModelForCausalLM, AutoTokenizer
|
||
from typing import Dict, List, Optional, Tuple
|
||
import platform
|
||
import os
|
||
|
||
# Windows multiprocessing兼容性修复
|
||
if platform.system() == "Windows":
|
||
import multiprocessing
|
||
multiprocessing.set_start_method('spawn', force=True)
|
||
|
||
class NPCDialogueGenerator:
|
||
def __init__(self, base_model_path: str, lora_model_path: Optional[str] = None, external_character_data: Optional[Dict] = None):
|
||
"""
|
||
初始化NPC对话生成器
|
||
|
||
Args:
|
||
base_model_path: 基础模型路径
|
||
lora_model_path: LoRA模型路径(可选)
|
||
external_character_data: 外部角色数据(可选,优先使用)
|
||
"""
|
||
self.base_model_path = base_model_path
|
||
self.lora_model_path = lora_model_path
|
||
self.model = None
|
||
self.tokenizer = None
|
||
|
||
# 优先使用外部角色数据,如果没有则使用内置数据
|
||
if external_character_data:
|
||
self.character_profiles = self._process_external_character_data(external_character_data)
|
||
print(f"✓ 使用外部角色数据: {list(self.character_profiles.keys())}")
|
||
|
||
self._load_model()
|
||
|
||
def _process_external_character_data(self, external_data: Dict) -> Dict:
|
||
"""
|
||
处理外部角色数据,转换为对话生成器可用的格式
|
||
|
||
Args:
|
||
external_data: 来自knowledge_base的角色数据
|
||
|
||
Returns:
|
||
处理后的角色数据字典
|
||
"""
|
||
processed_profiles = {}
|
||
|
||
for char_name, char_data in external_data.items():
|
||
# 提取基本信息
|
||
basic_info = char_data.get('basic_info', {})
|
||
personality = char_data.get('personality', {})
|
||
background = char_data.get('background', {})
|
||
skills = char_data.get('skills_and_abilities', {})
|
||
speech_patterns = char_data.get('speech_patterns', {})
|
||
|
||
# 构建角色画像
|
||
profile = {
|
||
"name": char_data.get('character_name', char_name),
|
||
"title": basic_info.get('occupation', '未知'),
|
||
"personality": personality.get('core_traits', []) + personality.get('strengths', []),
|
||
"background": background.get('childhood', '') + ' ' + background.get('education', ''),
|
||
"speech_patterns": speech_patterns.get('vocabulary', []) + speech_patterns.get('tone', []),
|
||
"sample_dialogues": self._generate_sample_dialogues(char_data),
|
||
# 保存完整数据供高级功能使用
|
||
"full_data": char_data
|
||
}
|
||
|
||
processed_profiles[char_name] = profile
|
||
|
||
return processed_profiles
|
||
|
||
def _generate_sample_dialogues(self, char_data: Dict) -> List[str]:
|
||
"""
|
||
基于角色数据生成示例对话
|
||
|
||
Args:
|
||
char_data: 角色数据
|
||
|
||
Returns:
|
||
示例对话列表
|
||
"""
|
||
# 这里可以根据角色的性格、背景等生成更合适的示例对话
|
||
# 暂时返回一些通用的示例
|
||
basic_info = char_data.get('basic_info', {})
|
||
occupation = basic_info.get('occupation', '角色')
|
||
|
||
if '侦探' in occupation or '调查员' in occupation:
|
||
return [
|
||
"我需要仔细分析这个案件。",
|
||
"每个细节都可能很重要。",
|
||
"让我重新梳理一下线索。"
|
||
]
|
||
elif '教授' in occupation or '博士' in occupation:
|
||
return [
|
||
"根据我的研究,这个现象很特殊。",
|
||
"我们需要更谨慎地处理这个问题。",
|
||
"知识就是力量,但也要小心使用。"
|
||
]
|
||
else:
|
||
return [
|
||
"我遇到了一些困难。",
|
||
"请帮帮我。",
|
||
"这太奇怪了。"
|
||
]
|
||
|
||
def _load_model(self):
|
||
"""加载模型和分词器"""
|
||
print(f"Loading tokenizer from: {self.base_model_path}")
|
||
self.tokenizer = AutoTokenizer.from_pretrained(
|
||
self.base_model_path,
|
||
use_fast=False,
|
||
trust_remote_code=True
|
||
)
|
||
|
||
if self.tokenizer.pad_token is None:
|
||
self.tokenizer.pad_token = self.tokenizer.eos_token
|
||
|
||
print(f"Loading base model from: {self.base_model_path}")
|
||
self.model = AutoModelForCausalLM.from_pretrained(
|
||
self.base_model_path,
|
||
device_map="auto",
|
||
torch_dtype=torch.bfloat16,
|
||
trust_remote_code=True
|
||
)
|
||
|
||
# 如果有LoRA模型,则加载
|
||
# if self.lora_model_path:
|
||
# print(f"Loading LoRA weights from: {self.lora_model_path}")
|
||
# self.model = PeftModel.from_pretrained(self.model, self.lora_model_path)
|
||
|
||
def generate_character_dialogue(
|
||
self,
|
||
character_name: str,
|
||
context: str = "",
|
||
temperature: float = 0.8,
|
||
max_new_tokens: int = 150,
|
||
top_p: float = 0.9,
|
||
dialogue_history: List[Dict] = None,
|
||
history_context_count: int = 3
|
||
) -> str:
|
||
"""
|
||
生成指定角色的对话
|
||
|
||
Args:
|
||
character_name: 角色名称
|
||
context: 对话上下文
|
||
user_input: 用户输入/触发内容
|
||
temperature: 采样温度
|
||
max_new_tokens: 最大生成token数
|
||
top_p: 核采样参数
|
||
dialogue_history: 对话历史记录列表,每个元素包含speaker和content
|
||
history_context_count: 使用的历史对话轮数(默认3轮)
|
||
|
||
Returns:
|
||
生成的对话内容
|
||
"""
|
||
if character_name not in self.character_profiles:
|
||
raise ValueError(f"Unknown character: {character_name}")
|
||
|
||
profile = self.character_profiles[character_name]
|
||
|
||
# 构建系统提示,包含历史对话数据
|
||
system_prompt = self._build_system_prompt(profile, context, dialogue_history, history_context_count)
|
||
|
||
# 构建用户输入
|
||
user_input = "请说一段符合你角色设定的话,保持对话的连贯性。"
|
||
|
||
|
||
# 准备消息
|
||
messages = [
|
||
{"role": "system", "content": system_prompt},
|
||
{"role": "user", "content": user_input}
|
||
]
|
||
|
||
# 应用对话模板
|
||
inputs = self.tokenizer.apply_chat_template(
|
||
messages,
|
||
add_generation_prompt=True,
|
||
tokenize=True,
|
||
return_tensors="pt",
|
||
return_dict=True,
|
||
enable_thinking=False
|
||
)
|
||
|
||
# 移动到设备
|
||
inputs = {k: v.to(self.model.device) for k, v in inputs.items()}
|
||
# 计算input token数并与模型最大token数比较
|
||
input_token_count = inputs['input_ids'].shape[1]
|
||
try:
|
||
max_model_tokens = self.model.config.max_position_embeddings
|
||
except AttributeError:
|
||
max_model_tokens = 2048
|
||
|
||
if input_token_count + max_new_tokens > max_model_tokens:
|
||
print(f"警告:当前输入token数({input_token_count})加上最大生成token数({max_new_tokens})超过模型最大token数({max_model_tokens}),可能导致生成结果不完整或报错。")
|
||
|
||
|
||
# 生成对话
|
||
with torch.no_grad():
|
||
outputs = self.model.generate(
|
||
**inputs,
|
||
max_new_tokens=max_new_tokens,
|
||
do_sample=True,
|
||
temperature=0.95,
|
||
top_p=0.92,
|
||
pad_token_id=self.tokenizer.eos_token_id,
|
||
repetition_penalty=1.15
|
||
)
|
||
|
||
# 解码输出
|
||
response = outputs[0][inputs['input_ids'].shape[1]:]
|
||
dialogue = self.tokenizer.decode(response, skip_special_tokens=True).strip()
|
||
|
||
return dialogue
|
||
|
||
def _build_system_prompt(self, profile: Dict, context: str = "", dialogue_history: List[Dict] = None, history_context_count: int = 3) -> str:
|
||
"""构建系统提示
|
||
|
||
Args:
|
||
profile: 角色配置信息
|
||
context: 当前情境
|
||
dialogue_history: 对话历史记录列表,每个元素包含speaker和content
|
||
history_context_count: 使用的历史对话轮数(默认3轮)
|
||
"""
|
||
personality_str = "、".join(profile["personality"])
|
||
speech_pattern_str = ";".join(profile["speech_patterns"])
|
||
|
||
system_prompt = f"""你是游戏中的NPC角色{profile["name"]}({profile["title"]})。
|
||
角色背景:{profile["background"]}
|
||
性格特点:{personality_str}
|
||
说话风格:{speech_pattern_str}
|
||
请严格按照这个角色的设定来回应,保持角色的一致性和独特性。"""
|
||
|
||
# 添加当前情境
|
||
if context:
|
||
system_prompt += f"\n\n当前情境:{context}"
|
||
|
||
# 添加历史对话数据(参考generate_character_prompt的实现)
|
||
if dialogue_history:
|
||
system_prompt += "\n\n最近的对话:"
|
||
# 使用参数控制历史对话轮数
|
||
history_to_use = dialogue_history[-history_context_count:] if history_context_count > 0 else []
|
||
for turn in history_to_use:
|
||
system_prompt += f"{turn.speaker}: {turn.content}"
|
||
# speaker = turn.get('speaker', '未知')
|
||
# content = turn.get('content', '')
|
||
# if content:
|
||
# system_prompt += f"\n{speaker}: {content}"
|
||
|
||
return system_prompt
|
||
|
||
def generate_dialogue_conversation(self, character1: str, character2: str, topic: str, turns: int = 4) -> List[Dict]:
|
||
"""生成两个角色之间的对话
|
||
|
||
Args:
|
||
character1: 第一个角色
|
||
character2: 第二个角色
|
||
topic: 对话主题
|
||
turns: 对话轮数
|
||
|
||
Returns:
|
||
对话列表,每个元素包含speaker和dialogue
|
||
"""
|
||
conversation = []
|
||
context = f"现在{character1}和{character2}在讨论关于{topic}的话题。"
|
||
|
||
for turn in range(turns):
|
||
if turn % 2 == 0:
|
||
# character1 说话
|
||
speaker = character1
|
||
if turn == 0:
|
||
user_input = f"开始和{character2}讨论{topic}这个话题。"
|
||
else:
|
||
# 基于上一轮对话内容
|
||
last_dialogue = conversation[-1]["dialogue"]
|
||
user_input = f"{character2}刚才说:\"{last_dialogue}\"。请回应。"
|
||
else:
|
||
# character2 说话
|
||
speaker = character2
|
||
last_dialogue = conversation[-1]["dialogue"]
|
||
user_input = f"{character1}刚才说:\"{last_dialogue}\"。请回应。"
|
||
|
||
dialogue = self.generate_character_dialogue(
|
||
speaker, context, user_input, temperature=0.8
|
||
)
|
||
|
||
conversation.append({
|
||
"speaker": speaker,
|
||
"dialogue": dialogue
|
||
})
|
||
|
||
return conversation
|
||
|
||
def get_character_info(self, character_name: str) -> Dict:
|
||
"""获取角色信息"""
|
||
return self.character_profiles.get(character_name, {})
|
||
|
||
def list_available_characters(self) -> List[str]:
|
||
"""列出所有可用角色"""
|
||
return list(self.character_profiles.keys())
|
||
|
||
class DualModelDialogueGenerator:
|
||
"""双模型对话生成器 - 每个模型扮演一个角色"""
|
||
|
||
def __init__(self,
|
||
base_model_path: str,
|
||
character1_config: Dict,
|
||
character2_config: Dict,
|
||
lora_model_path: Optional[str] = None):
|
||
"""
|
||
初始化双模型对话生成器
|
||
|
||
Args:
|
||
base_model_path: 基础模型路径
|
||
character1_config: 角色1配置 {"name": "角色名", "lora_path": "LoRA路径", "character_data": 角色数据}
|
||
character2_config: 角色2配置 {"name": "角色名", "lora_path": "LoRA路径", "character_data": 角色数据}
|
||
lora_model_path: 通用LoRA模型路径(可选)
|
||
"""
|
||
self.base_model_path = base_model_path
|
||
self.character1_config = character1_config
|
||
self.character2_config = character2_config
|
||
|
||
# 为每个角色创建独立的模型实例
|
||
self.character1_generator = None
|
||
self.character2_generator = None
|
||
|
||
self._initialize_character_models()
|
||
|
||
def _initialize_character_models(self):
|
||
"""初始化两个角色的模型"""
|
||
print("=== 初始化双模型对话系统 ===")
|
||
|
||
# 初始化角色1的模型
|
||
print(f"\n初始化角色1: {self.character1_config['name']}")
|
||
char1_lora_path = self.character1_config.get('lora_path') or self.character1_config.get('lora_model_path')
|
||
self.character1_generator = NPCDialogueGenerator(
|
||
self.base_model_path,
|
||
char1_lora_path,
|
||
{self.character1_config['name']: self.character1_config['character_data']}
|
||
)
|
||
|
||
# 初始化角色2的模型
|
||
print(f"\n初始化角色2: {self.character2_config['name']}")
|
||
char2_lora_path = self.character2_config.get('lora_path') or self.character2_config.get('lora_model_path')
|
||
self.character2_generator = NPCDialogueGenerator(
|
||
self.base_model_path,
|
||
char2_lora_path,
|
||
{self.character2_config['name']: self.character2_config['character_data']}
|
||
)
|
||
|
||
print("✓ 双模型对话系统初始化完成")
|
||
|
||
def generate_dual_character_dialogue(self,
|
||
character_name: str,
|
||
context: str = "",
|
||
temperature: float = 0.8,
|
||
max_new_tokens: int = 150,
|
||
dialogue_history: str = "",
|
||
history_context_count: int = 3) -> str:
|
||
"""
|
||
生成指定角色的对话(使用对应的模型)
|
||
|
||
Args:
|
||
character_name: 角色名称
|
||
context: 对话上下文
|
||
user_input: 用户输入
|
||
temperature: 采样温度
|
||
max_new_tokens: 最大生成token数
|
||
|
||
Returns:
|
||
生成的对话内容
|
||
"""
|
||
if character_name == self.character1_config['name']:
|
||
return self.character1_generator.generate_character_dialogue(
|
||
character_name, context, temperature, max_new_tokens, dialogue_history = dialogue_history, history_context_count=history_context_count,
|
||
)
|
||
elif character_name == self.character2_config['name']:
|
||
return self.character2_generator.generate_character_dialogue(
|
||
character_name, context, temperature, max_new_tokens, dialogue_history = dialogue_history, history_context_count=history_context_count,
|
||
)
|
||
else:
|
||
raise ValueError(f"Unknown character: {character_name}")
|
||
|
||
def run_dual_character_conversation(self,
|
||
topic: str = "",
|
||
turn_index: int = 4,
|
||
context: str = "",
|
||
dialogue_history: str = "",
|
||
history_context_count: int = 3,
|
||
temperature: float = 0.8,
|
||
max_new_tokens: int = 150) -> List[Dict]:
|
||
"""
|
||
运行双角色对话
|
||
|
||
Args:
|
||
topic: 对话主题
|
||
turns: 对话轮数
|
||
context: 额外上下文
|
||
temperature: 采样温度
|
||
max_new_tokens: 最大生成token数
|
||
|
||
Returns:
|
||
对话列表
|
||
"""
|
||
conversation = []
|
||
char1_name = self.character1_config['name']
|
||
char2_name = self.character2_config['name']
|
||
|
||
# 构建完整上下文
|
||
full_context = f"现在{char1_name}和{char2_name}在讨论关于{topic}的话题。{context}"
|
||
|
||
# print(f"\n=== 开始双角色对话 ===")
|
||
# print(f"主题: {topic}")
|
||
# print(f"角色: {char1_name} vs {char2_name}")
|
||
# print("-" * 50)
|
||
|
||
|
||
if turn_index % 2 == 0:
|
||
# 角色1说话
|
||
speaker = char1_name
|
||
# if turn_index == 0:
|
||
# user_input = f"开始和{char2_name}讨论{topic}这个话题。"
|
||
# else:
|
||
# last_dialogue = conversation[-1]["dialogue"]
|
||
# user_input = f"{char2_name}刚才说:\"{last_dialogue}\"。请回应。"
|
||
else:
|
||
# 角色2说话
|
||
speaker = char2_name
|
||
# last_dialogue = conversation[-1]["dialogue"]
|
||
# user_input = f"{char1_name}刚才说:\"{last_dialogue}\"。请回应。"
|
||
|
||
print(f"\n[第{turn_index+1}轮] {speaker}正在思考...")
|
||
|
||
# 使用对应角色的模型生成对话
|
||
dialogue = self.generate_dual_character_dialogue(
|
||
speaker, full_context, temperature, max_new_tokens, dialogue_history, history_context_count
|
||
)
|
||
|
||
conversation.append({
|
||
"turn": turn_index + 1,
|
||
"speaker": speaker,
|
||
"dialogue": dialogue,
|
||
"context_used": full_context[:100] + "..." if len(full_context) > 100 else full_context
|
||
})
|
||
|
||
print(f"{speaker}: {dialogue}")
|
||
|
||
|
||
|
||
return conversation
|
||
|
||
def get_character_info(self, character_name: str) -> Dict:
|
||
"""获取角色信息"""
|
||
if character_name == self.character1_config['name']:
|
||
return self.character1_generator.get_character_info(character_name)
|
||
elif character_name == self.character2_config['name']:
|
||
return self.character2_generator.get_character_info(character_name)
|
||
else:
|
||
return {}
|
||
|
||
def list_characters(self) -> List[str]:
|
||
"""列出两个角色名称"""
|
||
return [self.character1_config['name'], self.character2_config['name']]
|
||
|