本地智能创作Agent实战
1.1 架构设计
1.1.1 系统组件规划
import functools
import logging
import traceback
import uuid
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional
# 定义内容类型枚举
class ContentType(Enum):
    """Kinds of content the agent can consume or produce.

    Member values are the lowercase strings used when content types are
    written as plain text (e.g. ``ContentType("image")``).
    """

    TEXT = "text"    # plain text
    IMAGE = "image"  # still images
    AUDIO = "audio"  # audio clips
    VIDEO = "video"  # video clips
# 定义任务状态
class TaskStatus(Enum):
    """Lifecycle states of a ``Task``."""

    PENDING = "pending"        # created, not yet picked up
    PROCESSING = "processing"  # currently being worked on
    COMPLETED = "completed"    # finished successfully
    FAILED = "failed"          # finished with an error
# 定义基础消息结构
@dataclass
class Message:
    """A unit of content passed between planner, scheduler and processors.

    Attributes:
        content: The payload. It is text for text content; the image step in
            this article stores a file path here.
        content_type: What kind of content ``content`` is. A raw enum value
            such as ``"text"`` is accepted and converted to ``ContentType``.
        metadata: Optional free-form extra information; ``None`` by default.
    """

    content: str
    content_type: ContentType
    metadata: Optional[dict] = None

    def __post_init__(self):
        # Planners parse content types from JSON, which yields plain strings.
        # Queues and processors are keyed by ContentType members, so normalise
        # here. An unknown value raises ValueError at construction time rather
        # than failing later with a confusing lookup error.
        if not isinstance(self.content_type, ContentType):
            self.content_type = ContentType(self.content_type)
# 定义任务结构
@dataclass
class Task:
    """One unit of planned work together with its execution state."""

    id: str                            # unique identifier of the task
    type: str                          # step type as produced by the planner
    input: Message                     # content the task operates on
    status: TaskStatus                 # current lifecycle state
    output: Optional[Message] = None   # result; None until output is produced
# 核心组件接口定义
class ContentProcessor:
    """Interface for components that turn one ``Message`` into another.

    Concrete processors must override :meth:`process`.
    """

    async def process(self, content: Message) -> Message:
        """Transform *content* and return the resulting message."""
        raise NotImplementedError
class ContentPlanner:
    """Interface for components that break a creative idea into tasks.

    Concrete planners must override :meth:`plan`.
    """

    async def plan(self, idea: str) -> List[Task]:
        """Return the list of tasks needed to realise *idea*."""
        raise NotImplementedError
class TaskScheduler:
    """Interface for components that execute a batch of planned tasks.

    Concrete schedulers must override :meth:`schedule`.
    """

    async def schedule(self, tasks: List[Task]) -> None:
        """Run *tasks*; results are expected to be written back onto them."""
        raise NotImplementedError
# 错误处理装饰器
def handle_errors(func):
    """Decorate an async function so any failure is logged before propagating.

    The exception is re-raised unchanged, so callers still see it.
    ``functools.wraps`` keeps the wrapped coroutine's ``__name__`` and
    ``__doc__``, so logs and introspection still refer to the original
    function rather than to ``wrapper``.
    """

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            # logging.exception also records the traceback, which plain
            # logging.error(message) dropped.
            logging.exception(f"Error in {func.__name__}: {str(e)}")
            raise

    return wrapper
# 主Agent类
class CreativeAgent:
    """Top-level agent: plan tasks for an idea, schedule them, collect outputs.

    Collaborators may be injected, for tests or to plug in concrete
    implementations. When they are omitted, the same defaults as before are
    constructed.
    """

    def __init__(
        self,
        planner: Optional[ContentPlanner] = None,
        scheduler: Optional[TaskScheduler] = None,
        processors: Optional[dict] = None,
    ):
        """Wire up the agent.

        Args:
            planner: Object exposing ``async plan(idea) -> List[Task]``.
                Defaults to ``ContentPlanner()``.
            scheduler: Object exposing ``async schedule(tasks)``.
                Defaults to ``TaskScheduler()``.
            processors: Mapping from ``ContentType`` to a processor. Defaults
                to one processor instance per content type.
        """
        self.planner = planner if planner is not None else ContentPlanner()
        self.scheduler = scheduler if scheduler is not None else TaskScheduler()
        if processors is None:
            processors = {
                ContentType.TEXT: TextProcessor(),
                ContentType.IMAGE: ImageProcessor(),
                ContentType.AUDIO: AudioProcessor(),
                ContentType.VIDEO: VideoProcessor(),
            }
        self.processors = processors

    @handle_errors
    async def create_from_idea(self, idea: str) -> List[Message]:
        """Plan and execute *idea*, returning the outputs of the tasks that produced one.

        Failures are logged by ``handle_errors`` and re-raised.
        """
        # 1. Plan the tasks.
        tasks = await self.planner.plan(idea)
        # 2. Execute them; the scheduler writes results into task.output.
        await self.scheduler.schedule(tasks)
        # 3. Collect results. Task.output defaults to None, so tasks that
        #    produced nothing are skipped.
        return [task.output for task in tasks if task.output]
1.1.2 工作流程设计
主要工作流程包括：
1. 输入处理流程
• 用户输入解析 • 意图识别 • 任务分解
2. 内容生成流程
• 文本内容生成 • 多模态转换 • 质量控制
3. 资源调度流程
• 模型选择 • 资源分配 • 并行处理
1.2 核心模块开发
1.2.1 内容理解与规划模块
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
from typing importList, Dict
import json
class ContentPlanner:
    """Plan creative work by asking a local causal language model to decompose an idea.

    The model is prompted to answer with JSON of the form
    ``{"steps": [{"type": ..., "input": ..., "content_type": ...}]}``.
    Each step becomes a pending ``Task``.
    """

    def __init__(self, model_name: str = "gpt2"):
        """Load tokenizer and model.

        Args:
            model_name: Hugging Face model id or local path. Defaults to
                ``"gpt2"``, the previously hard-coded model.
        """
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForCausalLM.from_pretrained(model_name)

    async def analyze_idea(self, idea: str) -> Dict:
        """Ask the model to break *idea* into steps and parse its JSON answer.

        Raises:
            json.JSONDecodeError: if the model output contains no valid JSON.
        """
        # The parser below expects JSON, so the prompt must ask for it.
        prompt = (
            "Analyze the following creative idea and break it down into steps.\n"
            'Answer with JSON only, shaped like {"steps": [{"type": "...", '
            '"input": "...", "content_type": "text|image|audio|video"}]}.\n'
            f"Idea:\n{idea}"
        )
        inputs = self.tokenizer(prompt, return_tensors="pt")
        with torch.no_grad():
            outputs = self.model.generate(
                inputs["input_ids"],
                attention_mask=inputs["attention_mask"],
                # Budget for generated tokens only; max_length also counted the prompt.
                max_new_tokens=200,
                num_return_sequences=1,
                # temperature only takes effect when sampling is enabled.
                do_sample=True,
                temperature=0.7,
                # GPT-2 has no pad token; reuse EOS so generate() does not warn.
                pad_token_id=self.tokenizer.eos_token_id,
            )
        # Decoder-only models return prompt + continuation; decode just the new part.
        prompt_length = inputs["input_ids"].shape[-1]
        analysis = self.tokenizer.decode(
            outputs[0][prompt_length:], skip_special_tokens=True
        )
        return self._parse_analysis(analysis)

    @staticmethod
    def _parse_analysis(text: str) -> Dict:
        """Parse the JSON object embedded in *text*.

        Free-form model output often surrounds the JSON with extra prose, so
        the span from the first ``{`` to the last ``}`` is parsed.
        ``json.JSONDecodeError`` propagates if that span is not valid JSON.
        """
        start, end = text.find("{"), text.rfind("}")
        candidate = text[start:end + 1] if 0 <= start < end else text
        return json.loads(candidate)

    async def create_task_list(self, analysis: Dict) -> List[Task]:
        """Turn each entry of ``analysis["steps"]`` into a pending ``Task``."""
        import uuid  # local import keeps this snippet self-contained

        return [
            Task(
                id=str(uuid.uuid4()),
                type=step["type"],
                input=Message(
                    content=step["input"],
                    # JSON yields plain strings; Message expects a ContentType member.
                    content_type=ContentType(step["content_type"]),
                ),
                status=TaskStatus.PENDING,
            )
            for step in analysis["steps"]
        ]

    async def plan(self, idea: str) -> List[Task]:
        """Analyse *idea*, build its task list and return it in execution order."""
        # 1. Analyse the idea.
        analysis = await self.analyze_idea(idea)
        # 2. Build the task list.
        tasks = await self.create_task_list(analysis)
        # 3. Order the tasks.
        return self.optimize_task_order(tasks)

    def optimize_task_order(self, tasks: List[Task]) -> List[Task]:
        """Sort *tasks* by :meth:`get_task_priority`.

        This is the hook for real dependency analysis, e.g. a topological sort.
        """
        return sorted(tasks, key=self.get_task_priority)

    def get_task_priority(self, task: Task) -> int:
        """Default sort key: position of the task's content type in ``ContentType``.

        Text steps therefore come before image, audio and video steps.
        ``sorted`` is stable, so tasks of the same type keep the planner's
        order. Unknown types sort last.
        """
        order = list(ContentType)
        content_type = task.input.content_type
        return order.index(content_type) if content_type in order else len(order)
1.2.2 多模态生成调度器
import asyncio
from typing importDict, List
import torch
from diffusers import StableDiffusionPipeline
from transformers import pipeline
class MultiModalScheduler:
    """Queue-based scheduler that runs text and image generation tasks.

    Each content type in ``task_queues`` has an ``asyncio.Queue``. Workers,
    started by :meth:`start_processing`, exist for TEXT and IMAGE only. They
    pull tasks, run the matching model and write the result into
    ``task.output``. VIDEO tasks can be queued but have no worker yet.
    """

    def __init__(self, device: str = "cuda", image_dir: str = "output/images"):
        """Load the models and create the task queues.

        Args:
            device: Torch device for the Stable Diffusion pipeline
                (previously hard-coded to ``"cuda"``).
            image_dir: Directory that generated images are written to.
        """
        self.text_generator = pipeline("text-generation")
        self.image_generator = StableDiffusionPipeline.from_pretrained(
            "stabilityai/stable-diffusion-2-1"
        ).to(device)
        self.image_dir = image_dir
        # One queue per content type that can be scheduled.
        self.task_queues: Dict[ContentType, asyncio.Queue] = {
            ContentType.TEXT: asyncio.Queue(),
            ContentType.IMAGE: asyncio.Queue(),
            ContentType.VIDEO: asyncio.Queue(),
        }

    async def schedule_task(self, task: Task):
        """Put *task* on the queue for its input content type.

        Raises:
            KeyError: if there is no queue for the content type (e.g. AUDIO).
        """
        content_type = task.input.content_type
        if content_type not in self.task_queues:
            raise KeyError(f"No task queue for content type: {content_type!r}")
        await self.task_queues[content_type].put(task)

    async def process_text_task(self, task: Task) -> Message:
        """Generate text from the task's input prompt."""
        loop = asyncio.get_running_loop()
        # The HF pipeline is synchronous. Run it in a worker thread so the
        # event loop, and the other queue workers, keep running.
        outputs = await loop.run_in_executor(
            None,
            lambda: self.text_generator(
                task.input.content,
                max_length=200,
                num_return_sequences=1,
            ),
        )
        return Message(
            content=outputs[0]["generated_text"],
            content_type=ContentType.TEXT,
        )

    async def process_image_task(self, task: Task) -> Message:
        """Generate an image from the task's prompt, save it as PNG and return its path."""
        import os  # local import keeps this snippet self-contained

        def _generate():
            # Grad mode is thread-local, so no_grad must be entered inside the
            # worker thread that runs the pipeline.
            with torch.no_grad():
                return self.image_generator(
                    prompt=task.input.content,
                    num_inference_steps=50,
                ).images[0]

        loop = asyncio.get_running_loop()
        image = await loop.run_in_executor(None, _generate)
        # image.save() fails if the directory does not exist yet.
        os.makedirs(self.image_dir, exist_ok=True)
        image_path = os.path.join(self.image_dir, f"{task.id}.png")
        image.save(image_path)
        return Message(
            content=image_path,
            content_type=ContentType.IMAGE,
        )

    async def start_processing(self):
        """Run the text and image queue workers concurrently until cancelled."""
        await asyncio.gather(
            self.process_text_queue(),
            self.process_image_queue(),
        )

    async def process_text_queue(self):
        """Consume the TEXT queue forever."""
        await self._consume_queue(ContentType.TEXT, self.process_text_task, "text")

    async def process_image_queue(self):
        """Consume the IMAGE queue forever."""
        await self._consume_queue(ContentType.IMAGE, self.process_image_task, "image")

    async def _consume_queue(self, content_type, handler, label):
        """Worker loop: run *handler* on each queued task and record the outcome."""
        queue = self.task_queues[content_type]
        while True:
            task = await queue.get()
            task.status = TaskStatus.PROCESSING
            try:
                task.output = await handler(task)
                task.status = TaskStatus.COMPLETED
            except Exception as e:
                # Mark the task failed but keep the worker alive for later tasks.
                task.status = TaskStatus.FAILED
                logging.error(
                    f"Error processing {label} task {task.id}: {str(e)}",
                    exc_info=True,
                )
            finally:
                queue.task_done()
1.3 创作流程实现
1.3.1 文本内容生成
实现一个完整的文本生成工作流（注意：`ChatOpenAI` 调用的是 OpenAI 云端接口；若要完全本地运行，可替换为本地部署的模型）：
from langchain import PromptTemplate, LLMChain
from langchain.chat_models import ChatOpenAI
from langchain.prompts.chat import (
ChatPromptTemplate,
SystemMessagePromptTemplate,
HumanMessagePromptTemplate,
)
class TextGenerationFlow:
    """LLM-backed text generation: outlines, section content and polishing."""

    def __init__(self):
        self.llm = ChatOpenAI(temperature=0.7)

    def create_system_prompt(self) -> str:
        """Return the system prompt used by :meth:`generate_content`."""
        return (
            "You are a creative content generator. Your task is to:\n"
            "1. Analyze the given topic\n"
            "2. Generate detailed content\n"
            "3. Optimize the content structure\n"
            "4. Ensure high quality and coherence"
        )

    async def generate_outline(self, topic: str, max_sections: int = 5) -> list:
        """Ask the LLM for an outline of *topic* and return the section titles.

        Args:
            topic: Article topic.
            max_sections: Upper bound on the number of titles returned.

        Returns:
            A list of at most *max_sections* non-empty titles, in LLM order.
        """
        outline_template = (
            "Create an outline for an article about the following topic:\n"
            "{topic}\n"
            "Return at most {max_sections} section titles, one per line, "
            "without any other text."
        )
        prompt = PromptTemplate(
            input_variables=["topic", "max_sections"],
            template=outline_template,
        )
        chain = LLMChain(llm=self.llm, prompt=prompt)
        raw = await chain.arun(topic=topic, max_sections=max_sections)
        return self._parse_outline(raw, max_sections)

    @staticmethod
    def _parse_outline(raw: str, max_sections: int) -> list:
        """Split LLM output into clean titles, dropping bullets and numbering."""
        import re  # local import keeps this snippet self-contained

        titles = []
        for line in raw.splitlines():
            title = line.strip().lstrip("-*•#").strip()
            # Drop a leading enumeration such as "1." or "2)".
            title = re.sub(r"^\d+[.)]\s*", "", title)
            if title:
                titles.append(title)
        return titles[:max_sections]

    async def generate_content(self, topic: str) -> str:
        """Generate content for *topic* using the system prompt."""
        # Build the chat prompt: system instructions plus the topic as the user turn.
        system_message_prompt = SystemMessagePromptTemplate.from_template(
            self.create_system_prompt()
        )
        human_message_prompt = HumanMessagePromptTemplate.from_template("{topic}")
        chat_prompt = ChatPromptTemplate.from_messages([
            system_message_prompt,
            human_message_prompt,
        ])
        chain = LLMChain(llm=self.llm, prompt=chat_prompt)
        return await chain.arun(topic=topic)

    async def optimize_content(self, content: str) -> str:
        """Ask the LLM to polish *content* for clarity, style and flow."""
        optimize_template = (
            "Please optimize the following content:\n"
            "{content}\n"
            "Focus on:\n"
            "1. Clarity and coherence\n"
            "2. Engaging style\n"
            "3. Natural flow\n"
        )
        prompt = PromptTemplate(
            input_variables=["content"],
            template=optimize_template,
        )
        chain = LLMChain(llm=self.llm, prompt=prompt)
        return await chain.arun(content=content)
1.3.2 多模态内容生成
以下是完整的多模态转换流程：
# 1. Text-to-image flow
async def text_to_image(text: str) -> str:
    """Render *text* as an image and return what ``save_image`` returns.

    The backend is pluggable (e.g. DALL-E or Stable Diffusion). The return
    value is presumably the saved file path; this depends on ``save_image``.
    """
    return save_image(await generate_image(optimize_prompt_for_image(text)))


# 2. Image-to-video flow
async def image_to_video(image_path: str) -> str:
    """Animate the image at *image_path* (e.g. via Stable Video Diffusion).

    Returns whatever ``save_video`` returns, presumably the video's path.
    """
    return save_video(await generate_video_from_image(image_path))
1.4 Workflow模板开发
1.4.1 文章创作流程
class ArticleWorkflow:
    """Produce an illustrated article: outline, section text, one image per section."""

    def __init__(self):
        self.text_gen = TextGenerationFlow()
        self.image_gen = ImageGenerationFlow()

    async def create_article(self, topic: str) -> Dict:
        """Build ``{"topic", "sections", "images"}`` for *topic*.

        ``sections`` is a list of ``{"title", "content"}`` dicts. ``images``
        holds one entry per section, in the same order.
        """
        # 1. Generate the outline.
        outline = await self.text_gen.generate_outline(topic)
        # 2. Expand each section.
        sections = []
        for section in outline:
            content = await self.text_gen.generate_content(section)
            sections.append({
                "title": section,
                "content": content,
            })
        # 3. Generate one illustration per section.
        images = []
        for section in sections:
            image = await self.image_gen.generate_image(
                section["title"],
                section["content"],
            )
            images.append(image)
        # 4. Assemble the article.
        return {
            "topic": topic,
            "sections": sections,
            "images": images,
        }

    async def export_article(self, article: Dict, format: str = "markdown"):
        """Render *article* as ``"markdown"`` or ``"html"``.

        Raises:
            ValueError: for any other format.
        """
        if format == "markdown":
            return self.export_markdown(article)
        elif format == "html":
            return self.export_html(article)
        else:
            raise ValueError(f"Unsupported format: {format}")

    @staticmethod
    def _section_images(article: Dict) -> list:
        """Return one image entry (or None) per section, padding missing ones."""
        sections = article["sections"]
        images = list(article.get("images") or [])
        return [images[i] if i < len(images) else None for i in range(len(sections))]

    def export_markdown(self, article: Dict) -> str:
        """Render *article* as Markdown.

        Each section becomes an H2 heading, followed by its image and text.
        NOTE(review): image entries are assumed to be paths or URLs
        (ImageGenerationFlow is not shown); they are rendered via ``str()``.
        """
        lines = [f"# {article['topic']}", ""]
        for section, image in zip(article["sections"], self._section_images(article)):
            lines += [f"## {section['title']}", ""]
            if image is not None:
                lines += [f"![{section['title']}]({image})", ""]
            lines += [section["content"], ""]
        return "\n".join(lines)

    def export_html(self, article: Dict) -> str:
        """Render *article* as an HTML fragment.

        All generated text is HTML-escaped, because LLM output is untrusted.
        Blank-line-separated paragraphs become ``<p>`` elements.
        """
        import html  # local import keeps this snippet self-contained

        parts = [f"<h1>{html.escape(str(article['topic']))}</h1>"]
        for section, image in zip(article["sections"], self._section_images(article)):
            title = html.escape(str(section["title"]))
            parts.append(f"<h2>{title}</h2>")
            if image is not None:
                parts.append(f'<img src="{html.escape(str(image))}" alt="{title}">')
            for paragraph in str(section["content"]).split("\n\n"):
                if paragraph.strip():
                    parts.append(f"<p>{html.escape(paragraph.strip())}</p>")
        return "\n".join(parts)
2. 优化与扩展
2.1 性能优化策略
1. 模型优化
• 模型量化 • 批处理优化 • 缓存管理
2. 资源优化
• GPU内存管理 • 并行处理 • 负载均衡
3. 响应优化
• 异步处理 • 流式输出 • 预加载机制
2.2 可扩展性设计
# 1. Plugin system
class PluginManager:
    """Registry of plugin objects addressed by name."""

    def __init__(self):
        self.plugins = {}

    def register_plugin(self, name: str, plugin: object):
        """Store *plugin* under *name*, replacing any earlier registration."""
        self.plugins.update({name: plugin})

    def get_plugin(self, name: str) -> object:
        """Return the plugin registered as *name*, or ``None`` if there is none."""
        try:
            return self.plugins[name]
        except KeyError:
            return None


# 2. Model registration
class ModelRegistry:
    """Registry mapping a task type to the model that serves it."""

    def __init__(self):
        self.models = {}

    def register_model(self, task_type: str, model: object):
        """Bind *model* to *task_type*, replacing any earlier binding."""
        self.models.update({task_type: model})

    def get_model(self, task_type: str) -> object:
        """Return the model bound to *task_type*, or ``None`` if there is none."""
        return self.models[task_type] if task_type in self.models else None
2.3 自定义能力扩展
# 1. Custom processor: subclass ContentProcessor and fill in process().
class CustomProcessor(ContentProcessor):
    """Template processor configured by a plain dict."""

    def __init__(self, config: Dict):
        self.config = config

    async def process(self, content: Message) -> Message:
        """Apply the custom transformation to *content*.

        Placeholder: returns *content* unchanged, so a workflow containing
        this step still passes its data through. The previous stub returned
        ``None`` and broke every later step.
        """
        # TODO: implement custom processing logic.
        return content


class WorkflowExecutionError(Exception):
    """Raised by ``CustomWorkflow.execute`` when a step's processor fails.

    Attributes:
        step_name: Name of the step that failed.
        message: String form of the underlying exception.
    """

    def __init__(self, step_name: str, message: str):
        super().__init__(f"Step '{step_name}' failed: {message}")
        self.step_name = step_name
        self.message = message


# 2. Custom workflow: an ordered pipeline of named processor steps.
class CustomWorkflow:
    """Run named ``ContentProcessor`` steps in order, feeding each step's output to the next."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.steps = []       # step names in execution order
        self.processors = {}  # step name -> processor

    def add_step(self, step_name: str, processor: ContentProcessor):
        """Append a step.

        Re-adding an existing name replaces its processor in place. Appending
        the name twice would desynchronise ``steps`` and ``processors``.
        """
        if step_name not in self.processors:
            self.steps.append(step_name)
        self.processors[step_name] = processor

    def remove_step(self, step_name: str):
        """Remove a step; unknown names are ignored."""
        if step_name in self.processors:
            self.steps.remove(step_name)
            del self.processors[step_name]

    async def execute(self, input_data: Any) -> Any:
        """Run every step in order and return the final result.

        Raises:
            WorkflowExecutionError: if a step raises. The original exception
                is chained as ``__cause__``.
        """
        result = input_data
        for step_name in self.steps:
            processor = self.processors[step_name]
            try:
                result = await processor.process(result)
            except Exception as e:
                logging.error(f"Error in step {step_name}: {str(e)}", exc_info=True)
                raise WorkflowExecutionError(step_name, str(e)) from e
        return result

    def export_workflow(self) -> Dict:
        """Return a serialisable description: config plus step names and processor class names."""
        return {
            "config": self.config,
            "steps": [
                {
                    "name": step,
                    "processor": self.processors[step].__class__.__name__,
                }
                for step in self.steps
            ],
        }

    @classmethod
    def load_workflow(
        cls,
        workflow_config: Dict,
        processor_registry: Optional[Dict[str, type]] = None,
    ) -> "CustomWorkflow":
        """Rebuild a workflow from :meth:`export_workflow` output.

        Args:
            workflow_config: Dict with ``"config"`` and ``"steps"``.
            processor_registry: Optional mapping from class name to processor
                class. Names not found there are looked up among the
                subclasses of ``ContentProcessor``.

        Raises:
            ValueError: if a processor class name cannot be resolved.
        """
        workflow = cls(workflow_config["config"])
        for step in workflow_config["steps"]:
            processor_class = cls._resolve_processor_class(step["processor"], processor_registry)
            workflow.add_step(step["name"], processor_class())
        return workflow

    @staticmethod
    def _resolve_processor_class(name: str, registry: Optional[Dict[str, type]] = None) -> type:
        """Find the processor class called *name* in *registry* or the ContentProcessor hierarchy."""
        if registry is not None and name in registry:
            return registry[name]
        pending = list(ContentProcessor.__subclasses__())
        while pending:
            candidate = pending.pop()
            if candidate.__name__ == name:
                return candidate
            pending.extend(candidate.__subclasses__())
        raise ValueError(f"Unknown processor class: {name!r}")
2.4 工作流程定制
以下是一些常用的工作流程定制示例：
2.4.1 营销内容生成流程
class MarketingContentWorkflow(CustomWorkflow):
    """Five-step pipeline that turns a marketing brief into campaign assets."""

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.setup_workflow()

    def setup_workflow(self):
        """Register the steps in order: research, plan, copy, images, social adaptation."""
        self.add_step("market_research", MarketResearchProcessor())
        self.add_step("content_planning", MarketingContentPlanner())
        self.add_step("copy_generation", CopywritingProcessor())
        self.add_step("image_creation", MarketingImageGenerator())
        self.add_step("social_media_adaptation", SocialMediaAdapter())

    async def generate_marketing_campaign(self, brief: str) -> Dict[str, Any]:
        """Run the pipeline for *brief* and assemble the campaign dict.

        Raises:
            WorkflowExecutionError: if a step fails; it is logged, then re-raised.
        """
        payload = {"brief": brief, "timestamp": datetime.now().isoformat()}
        try:
            outcome = await self.execute(payload)
            campaign = {
                "campaign_id": str(uuid.uuid4()),
                "brief": brief,
                "content": outcome["content"],
                "images": outcome["images"],
                "social_media_posts": outcome["social_posts"],
                "metrics": outcome["performance_metrics"],
            }
        except WorkflowExecutionError as err:
            logging.error(f"Marketing campaign generation failed: {str(err)}")
            raise
        return campaign
class MarketResearchProcessor(ContentProcessor):
    """Workflow step that attaches market research for the brief."""

    async def process(self, input_data: Dict) -> Dict:
        """Return a copy of *input_data* extended with a ``market_research`` entry."""
        research = await self.analyze_market(input_data["brief"])
        enriched = dict(input_data)
        enriched["market_research"] = research
        return enriched

    async def analyze_market(self, brief: str) -> Dict:
        """Analyse the market for *brief*. Placeholder: currently returns ``None``."""
        # TODO: implement the market analysis.
        return None
2.4.2 教育内容开发流程
class EducationalContentWorkflow(CustomWorkflow):
    """Five-step pipeline that turns subject, level and duration into course material."""

    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.setup_workflow()

    def setup_workflow(self):
        """Register the steps in order: curriculum, objectives, content, assessments, packaging."""
        self.add_step("curriculum_analysis", CurriculumAnalyzer())
        self.add_step("learning_objectives", LearningObjectivesGenerator())
        self.add_step("content_creation", EducationalContentCreator())
        self.add_step("assessment_generation", AssessmentGenerator())
        self.add_step("material_packaging", MaterialPackager())

    async def create_course_content(self,
                                    subject: str,
                                    level: str,
                                    duration: str) -> Dict[str, Any]:
        """Run the pipeline for a course and assemble the course package.

        Raises:
            WorkflowExecutionError: if a step fails; it is logged, then re-raised.
        """
        course_info = {"subject": subject, "level": level, "duration": duration}
        try:
            produced = await self.execute(dict(course_info))
            course = {
                "course_id": str(uuid.uuid4()),
                "metadata": course_info,
                "learning_objectives": produced["objectives"],
                "content_modules": produced["modules"],
                "assessments": produced["assessments"],
                "resources": produced["resources"],
            }
        except WorkflowExecutionError as err:
            logging.error(f"Course content creation failed: {str(err)}")
            raise
        return course
class LearningObjectivesGenerator(ContentProcessor):
    """Workflow step that adds learning objectives for the subject and level."""

    async def process(self, input_data: Dict) -> Dict:
        """Return *input_data* merged with an ``objectives`` entry."""
        subject, level = input_data["subject"], input_data["level"]
        goals = await self.generate_objectives(subject, level)
        return {**input_data, "objectives": goals}

    async def generate_objectives(self, subject: str, level: str) -> List[str]:
        """Generate objectives for *subject* at *level*. Placeholder: returns ``None``."""
        # TODO: implement learning-objective generation.
        return None
2.5 系统监控与维护
为了确保系统稳定运行，我们需要实现完善的监控与维护机制：
# 1. Performance monitoring
class PerformanceMonitor:
    """In-memory store of timestamped metric samples, keyed by metric name."""

    def __init__(self):
        self.metrics = {}  # name -> list of {"value", "timestamp"} dicts

    async def record_metric(self, name: str, value: float):
        """Append a sample for *name*, stamped with naive local ``datetime.now()``."""
        self.metrics.setdefault(name, []).append({
            "value": value,
            "timestamp": datetime.now(),
        })

    async def get_metrics(self, name: str,
                          start_time: Optional[datetime] = None) -> List[Dict]:
        """Return samples for *name*, optionally only those at or after *start_time*.

        Always returns a new list, so callers cannot mutate the stored
        history. *start_time* must be naive, like the recorded timestamps.
        """
        samples = self.metrics.get(name, [])
        if start_time is None:
            return list(samples)
        return [m for m in samples if m["timestamp"] >= start_time]


# 2. Error tracking
class ErrorTracker:
    """In-memory log of exceptions with context and traceback."""

    def __init__(self):
        self.errors = []

    def track_error(self, error: Exception, context: Dict):
        """Record *error* with *context*.

        The traceback is taken from the exception object itself, so this is
        correct even when called outside an ``except`` block. In that case
        ``traceback.format_exc()`` would have returned "NoneType: None".
        """
        self.errors.append({
            "error": str(error),
            "type": error.__class__.__name__,
            "context": context,
            "timestamp": datetime.now(),
            "traceback": "".join(
                traceback.format_exception(type(error), error, error.__traceback__)
            ),
        })

    def get_recent_errors(self, count: int = 10) -> List[Dict]:
        """Return up to *count* most recent errors, newest first (empty if count <= 0)."""
        if count <= 0:
            # A negative slice bound would silently drop entries from the end.
            return []
        return sorted(
            self.errors,
            key=lambda x: x["timestamp"],
            reverse=True,
        )[:count]


# 3. Resource management
class ResourceManager:
    """Counts of available resources per type, with allocate/release bookkeeping."""

    def __init__(self, capacity: Optional[Dict[str, int]] = None):
        """Start with *capacity* (copied); empty, i.e. nothing available, by default."""
        self.resources = dict(capacity or {})

    @staticmethod
    def _check_amount(amount: int):
        """Reject negative amounts, which would invert allocate/release."""
        if amount < 0:
            raise ValueError(f"amount must be non-negative, got {amount}")

    async def allocate_resource(self,
                                resource_type: str,
                                amount: int) -> bool:
        """Take *amount* units of *resource_type* if available.

        Returns:
            True if allocated, False if not enough units (nothing is changed).

        Raises:
            ValueError: if *amount* is negative.
        """
        self._check_amount(amount)
        # No await between read and write, so other coroutines on the same
        # event loop cannot interleave here.
        available = self.resources.get(resource_type, 0)
        if available < amount:
            return False
        self.resources[resource_type] = available - amount
        return True

    async def release_resource(self,
                               resource_type: str,
                               amount: int):
        """Return *amount* units of *resource_type* to the pool.

        Raises:
            ValueError: if *amount* is negative.
        """
        self._check_amount(amount)
        self.resources[resource_type] = self.resources.get(resource_type, 0) + amount