Project02/AITrain/npc_dialogue_generator.py

571 lines
22 KiB
Python
Raw Normal View History

2025-08-14 07:17:50 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
游戏NPC角色对话生成器
基于微调后的LoRA模型生成角色对话
2025-08-15 17:58:11 +08:00
支持双模型对话系统每个模型扮演一个角色
2025-08-14 07:17:50 +08:00
'''
import torch
import json
import random
from peft import PeftModel
from transformers import AutoModelForCausalLM, AutoTokenizer
2025-08-15 17:58:11 +08:00
from typing import Dict, List, Optional, Tuple
2025-08-14 07:17:50 +08:00
import platform
2025-08-15 17:58:11 +08:00
import os
2025-08-14 07:17:50 +08:00
# Windows multiprocessing兼容性修复
if platform.system() == "Windows":
import multiprocessing
multiprocessing.set_start_method('spawn', force=True)
class NPCDialogueGenerator:
2025-08-15 14:42:13 +08:00
def __init__(self, base_model_path: str, lora_model_path: Optional[str] = None, external_character_data: Optional[Dict] = None):
2025-08-14 07:17:50 +08:00
"""
初始化NPC对话生成器
Args:
base_model_path: 基础模型路径
lora_model_path: LoRA模型路径可选
2025-08-15 14:42:13 +08:00
external_character_data: 外部角色数据可选优先使用
2025-08-14 07:17:50 +08:00
"""
self.base_model_path = base_model_path
self.lora_model_path = lora_model_path
self.model = None
self.tokenizer = None
2025-08-15 14:42:13 +08:00
# 优先使用外部角色数据,如果没有则使用内置数据
if external_character_data:
self.character_profiles = self._process_external_character_data(external_character_data)
print(f"✓ 使用外部角色数据: {list(self.character_profiles.keys())}")
2025-08-14 07:17:50 +08:00
self._load_model()
2025-08-15 14:42:13 +08:00
def _process_external_character_data(self, external_data: Dict) -> Dict:
"""
处理外部角色数据转换为对话生成器可用的格式
Args:
external_data: 来自knowledge_base的角色数据
Returns:
处理后的角色数据字典
"""
processed_profiles = {}
for char_name, char_data in external_data.items():
# 提取基本信息
basic_info = char_data.get('basic_info', {})
personality = char_data.get('personality', {})
background = char_data.get('background', {})
skills = char_data.get('skills_and_abilities', {})
speech_patterns = char_data.get('speech_patterns', {})
# 构建角色画像
profile = {
"name": char_data.get('character_name', char_name),
"title": basic_info.get('occupation', '未知'),
"personality": personality.get('core_traits', []) + personality.get('strengths', []),
"background": background.get('childhood', '') + ' ' + background.get('education', ''),
"speech_patterns": speech_patterns.get('vocabulary', []) + speech_patterns.get('tone', []),
"sample_dialogues": self._generate_sample_dialogues(char_data),
# 保存完整数据供高级功能使用
"full_data": char_data
}
processed_profiles[char_name] = profile
return processed_profiles
def _generate_sample_dialogues(self, char_data: Dict) -> List[str]:
"""
基于角色数据生成示例对话
Args:
char_data: 角色数据
Returns:
示例对话列表
"""
# 这里可以根据角色的性格、背景等生成更合适的示例对话
# 暂时返回一些通用的示例
basic_info = char_data.get('basic_info', {})
occupation = basic_info.get('occupation', '角色')
if '侦探' in occupation or '调查员' in occupation:
return [
"我需要仔细分析这个案件。",
"每个细节都可能很重要。",
"让我重新梳理一下线索。"
]
elif '教授' in occupation or '博士' in occupation:
return [
"根据我的研究,这个现象很特殊。",
"我们需要更谨慎地处理这个问题。",
"知识就是力量,但也要小心使用。"
]
else:
return [
"我遇到了一些困难。",
"请帮帮我。",
"这太奇怪了。"
]
2025-08-14 07:17:50 +08:00
def _load_model(self):
"""加载模型和分词器"""
print(f"Loading tokenizer from: {self.base_model_path}")
self.tokenizer = AutoTokenizer.from_pretrained(
self.base_model_path,
use_fast=False,
trust_remote_code=True
)
if self.tokenizer.pad_token is None:
self.tokenizer.pad_token = self.tokenizer.eos_token
print(f"Loading base model from: {self.base_model_path}")
self.model = AutoModelForCausalLM.from_pretrained(
self.base_model_path,
device_map="auto",
torch_dtype=torch.bfloat16,
trust_remote_code=True
)
# 如果有LoRA模型则加载
if self.lora_model_path:
print(f"Loading LoRA weights from: {self.lora_model_path}")
self.model = PeftModel.from_pretrained(self.model, self.lora_model_path)
2025-08-18 15:08:13 +08:00
2025-08-14 07:17:50 +08:00
def generate_character_dialogue(
self,
character_name: str,
context: str = "",
temperature: float = 0.8,
max_new_tokens: int = 150,
2025-08-18 14:32:55 +08:00
top_p: float = 0.9,
dialogue_history: List[Dict] = None,
history_context_count: int = 3
2025-08-14 07:17:50 +08:00
) -> str:
"""
生成指定角色的对话
Args:
character_name: 角色名称
context: 对话上下文
user_input: 用户输入/触发内容
temperature: 采样温度
max_new_tokens: 最大生成token数
top_p: 核采样参数
2025-08-18 14:32:55 +08:00
dialogue_history: 对话历史记录列表每个元素包含speaker和content
history_context_count: 使用的历史对话轮数默认3轮
2025-08-14 07:17:50 +08:00
Returns:
生成的对话内容
"""
if character_name not in self.character_profiles:
raise ValueError(f"Unknown character: {character_name}")
profile = self.character_profiles[character_name]
2025-08-18 14:32:55 +08:00
# 构建系统提示,包含历史对话数据
system_prompt = self._build_system_prompt(profile, context, dialogue_history, history_context_count)
2025-08-14 07:17:50 +08:00
# 构建用户输入
2025-08-18 14:32:55 +08:00
user_input = "请说一段符合你角色设定的话。"
2025-08-14 07:17:50 +08:00
# 准备消息
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_input}
]
# 应用对话模板
inputs = self.tokenizer.apply_chat_template(
messages,
add_generation_prompt=True,
tokenize=True,
return_tensors="pt",
return_dict=True,
enable_thinking=False
)
# 移动到设备
inputs = {k: v.to(self.model.device) for k, v in inputs.items()}
2025-08-18 15:08:13 +08:00
# 计算input token数并与模型最大token数比较
input_token_count = inputs['input_ids'].shape[1]
try:
max_model_tokens = self.model.config.max_position_embeddings
except AttributeError:
max_model_tokens = 2048
if input_token_count + max_new_tokens > max_model_tokens:
print(f"警告当前输入token数({input_token_count})加上最大生成token数({max_new_tokens})超过模型最大token数({max_model_tokens}),可能导致生成结果不完整或报错。")
2025-08-14 07:17:50 +08:00
2025-08-18 15:08:13 +08:00
2025-08-14 07:17:50 +08:00
# 生成对话
with torch.no_grad():
outputs = self.model.generate(
**inputs,
max_new_tokens=max_new_tokens,
do_sample=True,
temperature=temperature,
top_p=top_p,
pad_token_id=self.tokenizer.eos_token_id,
repetition_penalty=1.1
)
# 解码输出
response = outputs[0][inputs['input_ids'].shape[1]:]
dialogue = self.tokenizer.decode(response, skip_special_tokens=True).strip()
return dialogue
2025-08-18 14:32:55 +08:00
def _build_system_prompt(self, profile: Dict, context: str = "", dialogue_history: List[Dict] = None, history_context_count: int = 3) -> str:
"""构建系统提示
Args:
profile: 角色配置信息
context: 当前情境
dialogue_history: 对话历史记录列表每个元素包含speaker和content
history_context_count: 使用的历史对话轮数默认3轮
"""
2025-08-14 07:17:50 +08:00
personality_str = "".join(profile["personality"])
speech_pattern_str = "".join(profile["speech_patterns"])
system_prompt = f"""你是游戏中的NPC角色{profile["name"]}{profile["title"]})。
角色背景{profile["background"]}
性格特点{personality_str}
说话风格{speech_pattern_str}
请严格按照这个角色的设定来回应保持角色的一致性和独特性"""
2025-08-18 14:32:55 +08:00
# 添加当前情境
2025-08-14 07:17:50 +08:00
if context:
system_prompt += f"\n\n当前情境:{context}"
2025-08-18 14:32:55 +08:00
# 添加历史对话数据参考generate_character_prompt的实现
if dialogue_history:
system_prompt += "\n\n最近的对话:"
# 使用参数控制历史对话轮数
history_to_use = dialogue_history[-history_context_count:] if history_context_count > 0 else []
for turn in history_to_use:
system_prompt += f"{turn.speaker}: {turn.content}"
# speaker = turn.get('speaker', '未知')
# content = turn.get('content', '')
# if content:
# system_prompt += f"\n{speaker}: {content}"
return system_prompt
2025-08-14 07:17:50 +08:00
def generate_dialogue_conversation(self, character1: str, character2: str, topic: str, turns: int = 4) -> List[Dict]:
"""生成两个角色之间的对话
Args:
character1: 第一个角色
character2: 第二个角色
topic: 对话主题
turns: 对话轮数
Returns:
对话列表每个元素包含speaker和dialogue
"""
conversation = []
context = f"现在{character1}{character2}在讨论关于{topic}的话题。"
for turn in range(turns):
if turn % 2 == 0:
# character1 说话
speaker = character1
if turn == 0:
user_input = f"开始和{character2}讨论{topic}这个话题。"
else:
# 基于上一轮对话内容
last_dialogue = conversation[-1]["dialogue"]
user_input = f"{character2}刚才说:\"{last_dialogue}\"。请回应。"
else:
# character2 说话
speaker = character2
last_dialogue = conversation[-1]["dialogue"]
user_input = f"{character1}刚才说:\"{last_dialogue}\"。请回应。"
dialogue = self.generate_character_dialogue(
speaker, context, user_input, temperature=0.8
)
conversation.append({
"speaker": speaker,
"dialogue": dialogue
})
return conversation
def get_character_info(self, character_name: str) -> Dict:
"""获取角色信息"""
return self.character_profiles.get(character_name, {})
def list_available_characters(self) -> List[str]:
"""列出所有可用角色"""
return list(self.character_profiles.keys())
2025-08-15 17:58:11 +08:00
class DualModelDialogueGenerator:
"""双模型对话生成器 - 每个模型扮演一个角色"""
def __init__(self,
base_model_path: str,
character1_config: Dict,
character2_config: Dict,
lora_model_path: Optional[str] = None):
"""
初始化双模型对话生成器
Args:
base_model_path: 基础模型路径
character1_config: 角色1配置 {"name": "角色名", "lora_path": "LoRA路径", "character_data": 角色数据}
character2_config: 角色2配置 {"name": "角色名", "lora_path": "LoRA路径", "character_data": 角色数据}
lora_model_path: 通用LoRA模型路径可选
"""
self.base_model_path = base_model_path
self.character1_config = character1_config
self.character2_config = character2_config
# 为每个角色创建独立的模型实例
self.character1_generator = None
self.character2_generator = None
2025-08-14 07:17:50 +08:00
2025-08-15 17:58:11 +08:00
self._initialize_character_models()
def _initialize_character_models(self):
"""初始化两个角色的模型"""
print("=== 初始化双模型对话系统 ===")
# 初始化角色1的模型
print(f"\n初始化角色1: {self.character1_config['name']}")
char1_lora_path = self.character1_config.get('lora_path') or self.character1_config.get('lora_model_path')
self.character1_generator = NPCDialogueGenerator(
self.base_model_path,
char1_lora_path,
{self.character1_config['name']: self.character1_config['character_data']}
)
# 初始化角色2的模型
print(f"\n初始化角色2: {self.character2_config['name']}")
char2_lora_path = self.character2_config.get('lora_path') or self.character2_config.get('lora_model_path')
self.character2_generator = NPCDialogueGenerator(
self.base_model_path,
char2_lora_path,
{self.character2_config['name']: self.character2_config['character_data']}
)
print("✓ 双模型对话系统初始化完成")
def generate_dual_character_dialogue(self,
character_name: str,
context: str = "",
temperature: float = 0.8,
2025-08-18 14:32:55 +08:00
max_new_tokens: int = 150,
dialogue_history: str = "",
history_context_count: int = 3) -> str:
2025-08-15 17:58:11 +08:00
"""
生成指定角色的对话使用对应的模型
Args:
character_name: 角色名称
context: 对话上下文
user_input: 用户输入
temperature: 采样温度
max_new_tokens: 最大生成token数
2025-08-14 07:17:50 +08:00
2025-08-15 17:58:11 +08:00
Returns:
生成的对话内容
"""
if character_name == self.character1_config['name']:
return self.character1_generator.generate_character_dialogue(
2025-08-18 14:32:55 +08:00
character_name, context, temperature, max_new_tokens, dialogue_history = dialogue_history, history_context_count=history_context_count,
2025-08-15 17:58:11 +08:00
)
elif character_name == self.character2_config['name']:
return self.character2_generator.generate_character_dialogue(
2025-08-18 14:32:55 +08:00
character_name, context, temperature, max_new_tokens, dialogue_history = dialogue_history, history_context_count=history_context_count,
2025-08-14 07:17:50 +08:00
)
2025-08-15 17:58:11 +08:00
else:
raise ValueError(f"Unknown character: {character_name}")
def run_dual_character_conversation(self,
topic: str = "",
2025-08-18 14:32:55 +08:00
turn_index: int = 4,
2025-08-15 17:58:11 +08:00
context: str = "",
2025-08-18 14:32:55 +08:00
dialogue_history: str = "",
history_context_count: int = 3,
2025-08-15 17:58:11 +08:00
temperature: float = 0.8,
max_new_tokens: int = 150) -> List[Dict]:
"""
运行双角色对话
2025-08-14 07:17:50 +08:00
2025-08-15 17:58:11 +08:00
Args:
topic: 对话主题
turns: 对话轮数
context: 额外上下文
temperature: 采样温度
max_new_tokens: 最大生成token数
Returns:
对话列表
"""
conversation = []
char1_name = self.character1_config['name']
char2_name = self.character2_config['name']
2025-08-14 07:17:50 +08:00
2025-08-15 17:58:11 +08:00
# 构建完整上下文
full_context = f"现在{char1_name}{char2_name}在讨论关于{topic}的话题。{context}"
2025-08-18 14:32:55 +08:00
# print(f"\n=== 开始双角色对话 ===")
# print(f"主题: {topic}")
# print(f"角色: {char1_name} vs {char2_name}")
# print("-" * 50)
2025-08-14 07:17:50 +08:00
2025-08-18 14:32:55 +08:00
if turn_index % 2 == 0:
# 角色1说话
speaker = char1_name
# if turn_index == 0:
# user_input = f"开始和{char2_name}讨论{topic}这个话题。"
# else:
# last_dialogue = conversation[-1]["dialogue"]
# user_input = f"{char2_name}刚才说:\"{last_dialogue}\"。请回应。"
else:
# 角色2说话
speaker = char2_name
# last_dialogue = conversation[-1]["dialogue"]
# user_input = f"{char1_name}刚才说:\"{last_dialogue}\"。请回应。"
print(f"\n[第{turn_index+1}轮] {speaker}正在思考...")
# 使用对应角色的模型生成对话
dialogue = self.generate_dual_character_dialogue(
speaker, full_context, temperature, max_new_tokens, dialogue_history, history_context_count
)
conversation.append({
"turn": turn_index + 1,
"speaker": speaker,
"dialogue": dialogue,
"context_used": full_context[:100] + "..." if len(full_context) > 100 else full_context
})
print(f"{speaker}: {dialogue}")
2025-08-15 17:58:11 +08:00
2025-08-18 14:32:55 +08:00
2025-08-15 17:58:11 +08:00
return conversation
def get_character_info(self, character_name: str) -> Dict:
"""获取角色信息"""
if character_name == self.character1_config['name']:
return self.character1_generator.get_character_info(character_name)
elif character_name == self.character2_config['name']:
return self.character2_generator.get_character_info(character_name)
else:
return {}
def list_characters(self) -> List[str]:
"""列出两个角色名称"""
return [self.character1_config['name'], self.character2_config['name']]
2025-08-18 09:55:18 +08:00
# def main():
# """测试对话生成器"""
# # 配置路径
# base_model_path = '/mnt/g/Project02/AITrain/Qwen/Qwen3-8B-AWQ'
# lora_model_path = './output/NPC_Dialogue_LoRA/final_model' # 如果没有训练LoRA设为None
2025-08-15 17:58:11 +08:00
2025-08-18 09:55:18 +08:00
# # 检查LoRA模型是否存在
# if not os.path.exists(lora_model_path):
# print("LoRA模型不存在使用基础模型")
# lora_model_path = None
2025-08-15 17:58:11 +08:00
2025-08-18 09:55:18 +08:00
# # 创建对话生成器
# generator = NPCDialogueGenerator(base_model_path, lora_model_path)
2025-08-15 17:58:11 +08:00
2025-08-18 09:55:18 +08:00
# print("=== 游戏NPC角色对话生成器 ===")
# print(f"可用角色:{', '.join(generator.list_available_characters())}")
2025-08-15 17:58:11 +08:00
2025-08-18 09:55:18 +08:00
# # 测试单个角色对话生成
# print("\n=== 单角色对话测试 ===")
# test_scenarios = [
# {
# "character": "克莱恩",
# "context": "玩家向你咨询神秘学知识",
# "input": "请告诉我一些关于灵界的注意事项。"
# },
# {
# "character": "阿兹克",
# "context": "学生遇到了修炼瓶颈",
# "input": "导师,我在修炼中遇到了困难。"
# },
# {
# "character": "塔利姆",
# "context": "在俱乐部偶遇老朋友",
# "input": "好久不见,最近怎么样?"
# }
# ]
2025-08-15 17:58:11 +08:00
2025-08-18 09:55:18 +08:00
# for scenario in test_scenarios:
# print(f"\n--- {scenario['character']} ---")
# print(f"情境:{scenario['context']}")
# print(f"输入:{scenario['input']}")
# dialogue = generator.generate_character_dialogue(
# scenario["character"],
# scenario["context"],
# scenario["input"]
# )
# print(f"回复:{dialogue}")
2025-08-15 17:58:11 +08:00
2025-08-18 09:55:18 +08:00
# # 测试角色间对话
# print("\n=== 角色间对话测试 ===")
# conversation = generator.generate_dialogue_conversation(
# "克莱恩", "塔利姆", "最近遇到的神秘事件", turns=4
# )
2025-08-15 17:58:11 +08:00
2025-08-18 09:55:18 +08:00
# for turn in conversation:
# print(f"{turn['speaker']}{turn['dialogue']}")
2025-08-15 17:58:11 +08:00
2025-08-18 09:55:18 +08:00
# # 交互式对话模式
# print("\n=== 交互式对话模式 ===")
# print("输入格式:角色名 上下文 用户输入")
# print("例如:克莱恩 在俱乐部 请给我一些建议")
# print("输入'quit'退出")
2025-08-15 17:58:11 +08:00
2025-08-18 09:55:18 +08:00
# while True:
# try:
# user_command = input("\n请输入指令: ").strip()
# if user_command.lower() == 'quit':
# break
2025-08-15 17:58:11 +08:00
2025-08-18 09:55:18 +08:00
# parts = user_command.split(' ', 2)
# if len(parts) < 2:
# print("格式错误,请使用:角色名 上下文 [用户输入]")
# continue
2025-08-15 17:58:11 +08:00
2025-08-18 09:55:18 +08:00
# character = parts[0]
# context = parts[1]
# user_input = parts[2] if len(parts) > 2 else ""
2025-08-15 17:58:11 +08:00
2025-08-18 09:55:18 +08:00
# if character not in generator.list_available_characters():
# print(f"未知角色:{character}")
# print(f"可用角色:{', '.join(generator.list_available_characters())}")
# continue
2025-08-15 17:58:11 +08:00
2025-08-18 09:55:18 +08:00
# dialogue = generator.generate_character_dialogue(
# character, context, user_input
# )
# print(f"\n{character}{dialogue}")
2025-08-15 17:58:11 +08:00
2025-08-18 09:55:18 +08:00
# except KeyboardInterrupt:
# break
# except Exception as e:
# print(f"生成对话时出错:{e}")
2025-08-15 17:58:11 +08:00
2025-08-18 09:55:18 +08:00
# print("\n对话生成器已退出")
2025-08-14 07:17:50 +08:00
2025-08-18 09:55:18 +08:00
# if __name__ == '__main__':
# main()