Project02/AIGC/main.py

212 lines
7.8 KiB
Python
Raw Normal View History

2025-06-06 10:28:34 +08:00
import argparse
2025-06-05 19:55:37 +08:00
import asyncio
import json
import logging
import re
import threading
import time
from typing import List, Tuple
from fastapi import FastAPI, Request, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.websockets import WebSocketState
from h11 import ConnectionClosed
import uvicorn
from Utils.AIGCLog import AIGCLog
from ollama import Client, ResponseError
app = FastAPI(title = "AI 通信服务")
logger = AIGCLog(name = "TestLog", log_file = "aigc.log")
2025-06-06 10:28:34 +08:00
parser = argparse.ArgumentParser()
parser.add_argument('--model', type=str, default='deepseek-r1:1.5b',
help='Ollama模型名称')
args = parser.parse_args()
logger.log(logging.INFO, f"使用的模型是 {args.model}")
2025-06-05 19:55:37 +08:00
#初始化ollama客户端
ollamaClient = Client(host='http://localhost:11434')
async def heartbeat(websocket: WebSocket):
while True:
await asyncio.sleep(30) # 每30秒发送一次心跳
try:
await senddata(websocket, 0, [])
except ConnectionClosed:
break # 连接已关闭时退出循环
async def senddata(websocket: WebSocket, statusCode: int, messages: List[str]):
# 将AI响应发送回UE5
if websocket.client_state == WebSocketState.CONNECTED:
data = {
"statuscode": statusCode,
"messages": messages
}
json_string = json.dumps(data, ensure_ascii=False)
await websocket.send_text(json_string)
# WebSocket路由处理
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
await websocket.accept()
logger.log(logging.INFO, f"UE5客户端 {client_id} 已连接")
# 添加心跳任务
asyncio.create_task(heartbeat(websocket))
try:
while True:
logger.log(logging.INFO, "websocket_endpoint ")
# 接收UE5发来的消息
data = await websocket.receive_text()
logger.log(logging.INFO, f"收到UE5消息 [{client_id}]: {data}")
success, prompt = process_prompt(data)
# 调用AI生成响应
if(success):
asyncio.create_task(generateAIChat(prompt, websocket))
await senddata(websocket, 0, [])
else:
await senddata(websocket, -1, [])
except WebSocketDisconnect:
#manager.disconnect(client_id)
logger.log(logging.WARNING, f"UE5客户端主动断开 [{client_id}]")
except Exception as e:
#manager.disconnect(client_id)
logger.log(logging.ERROR, f"WebSocket异常 [{client_id}]: {str(e)}")
def process_prompt(promptFromUE: str) -> Tuple[bool, str]:
try:
data = json.loads(promptFromUE)
# 提取数据
dialog_scene = data["dialogContent"]["dialogScene"]
persons = data["persons"]
assert len(persons) == 2
for person in persons:
print(f" 姓名: {person['name']}, 职业: {person['job']}")
prompt = f"""
你是一个游戏NPC对话生成器请严格按以下要求生成两个路人NPC{persons[0]["name"]}{persons[1]["name"]}的日常对话
1. 生成2轮完整对话每轮包含双方各一次发言共4句
2. 对话场景中世纪奇幻小镇的日常场景如市场/酒馆/街道
3. 角色设定
{persons[0]["name"]}{persons[0]["job"]}
{persons[1]["name"]}{persons[1]["job"]}
4. 对话要求
* 每轮对话需自然衔接体现生活细节
* 避免任务指引或玩家交互内容
* 结尾保持对话未完成感
5. 输出格式
必须确保输出内容的第一行是三个连字符`---`最后一行也是三个连字符`---`
中间内容严格按以下顺序排列禁止添加任何额外说明或换行
---
{persons[0]["name"]}[第一轮发言]
{persons[1]["name"]}[第一轮回应]
{persons[0]["name"]}[第二轮发言]
{persons[1]["name"]}[第二轮回应]
---
6.重要若未按此格式输出请重新生成直至完全符合
"""
return True, prompt
except json.JSONDecodeError as e:
print(f"JSON解析错误: {e}")
return False, ""
except KeyError as e:
print(f"缺少必要字段: {e}")
def run_webserver():
logger.log(logging.INFO, "启动web服务器 ")
#启动服务器
uvicorn.run(
app,
host="0.0.0.0",
port=8000,
timeout_keep_alive=3000,
log_level="info"
)
async def generateAIChat(prompt: str, websocket: WebSocket):
logger.log(logging.INFO, "prompt:" + prompt)
starttime = time.time()
receivemessage=[
{"role": "system", "content": prompt}
]
try:
response = ollamaClient.chat(
2025-06-06 10:28:34 +08:00
model = args.model,
stream = False,
2025-06-05 19:55:37 +08:00
messages = receivemessage,
options={
"temperature": 0.8,
"repeat_penalty": 1.2, # 抑制重复
"top_p": 0.9,
"num_ctx": 4096, # 上下文长度
"seed": 42
}
)
except ResponseError as e:
if e.status_code == 503:
print("🔄 服务不可用5秒后重试...")
return await senddata(websocket, -1, messages={"ollama 服务不可用"})
except Exception as e:
print(f"🔥 未预料错误: {str(e)}")
return await senddata(websocket, -1, messages={"未预料错误"})
logger.log(logging.INFO, "接口调用耗时 :" + str(time.time() - starttime))
logger.log(logging.INFO, "AI生成" + response['message']['content'])
#处理ai输出内容
think_remove_text = re.sub(r'<think>.*?</think>', '', response['message']['content'], flags=re.DOTALL)
pattern = r".*---(.*?)---" # .* 吞掉前面所有字符,定位最后一组
match = re.search(pattern, think_remove_text, re.DOTALL)
if not match:
await senddata(websocket, -1, messages={"ai生成格式不正确"})
else:
core_dialog = match.group(1).strip()
logger.log(logging.INFO, "AI内容处理" + core_dialog)
await senddata(websocket, 1, core_dialog.split('\n'))
if __name__ == "__main__":
# 创建并启动服务器线程
server_thread = threading.Thread(target=run_webserver)
server_thread.daemon = True # 设为守护线程(主程序退出时自动终止)
server_thread.start()
## Test
# generateAIChat(f"""
# 你是一个游戏NPC对话生成器。请严格按以下要求生成两个路人NPCA和B的日常对话
# 1. 生成【2轮完整对话】每轮包含双方各一次发言共4句
# 2. 对话场景:中世纪奇幻小镇的日常场景(如市场/酒馆/街道)
# 3. 角色设定:
# - NPC A随机职业铁匠/农夫/商人/卫兵等)
# - NPC B随机职业不同于A
# 4. 对话要求:
# * 每轮对话需自然衔接,体现生活细节
# * 避免任务指引或玩家交互内容
# * 结尾保持对话未完成感
# 5. 输出格式(严格遵循,
# ---
# A[第一轮发言]
# B[第一轮回应]
# A[第二轮发言]
# B[第二轮回应]
# ---
# """
# )
try:
# 主线程永久挂起(监听退出信号)
while True:
time.sleep(3600) # 每1小时唤醒一次避免CPU占用
except KeyboardInterrupt:
logger.log(logging.WARNING, "接收到中断信号,程序退出 ")