LangChain进阶:本地应用搭建实战

文摘   2025-01-20 09:01   湖北  

 

LangChain进阶:本地应用搭建实战

引言

随着 AI 技术的快速发展,构建本地化的 AI 开发环境变得越来越重要。本文将详细介绍如何使用 LangChain 搭建一个功能完备的本地 AI 工作站,涵盖从环境配置到多模态应用开发的全过程。

1. 本地开发环境搭建

1.1 基础环境配置

首先创建一个独立的 Python 虚拟环境:

# 创建项目目录
mkdir langchain-workspace && cd langchain-workspace

# 创建虚拟环境
python -m venv venv
source venv/bin/activate  # Linux/Mac
# venv\Scripts\activate  # Windows

# 安装基础依赖
pip install -U pip
pip install langchain langchain-community langchain-core
pip install python-dotenv requests numpy pandas

1.2 本地模型部署

为了减少对云服务的依赖,我们可以使用 Ollama 部署本地模型:

# 安装 Ollama
curl https://ollama.ai/install.sh | sh

# 拉取模型
ollama pull llama2
ollama pull codellama
ollama pull mistral

# 安装 Python 接口
pip install ollama

1.3 向量数据库配置

选择 Chroma 作为本地向量数据库:

pip install chromadb

1.4 多模态支持

安装必要的多模态处理库:

pip install torch torchvision
pip install transformers
pip install Pillow
pip install pytesseract

2. 核心组件

2.1 环境配置管理

创建统一的环境配置文件 config.py

from pathlib import Path
from typing importDict
import os
from dotenv import load_dotenv

classWorkstationConfig:
    def__init__(self):
        load_dotenv()
        
        self.base_path = Path(__file__).parent
        self.data_path = self.base_path / "data"
        self.models_path = self.base_path / "models"
        
        # 创建必要的目录
        self.data_path.mkdir(exist_ok=True)
        self.models_path.mkdir(exist_ok=True)
        
        # 模型配置
        self.model_configs: Dict = {
            "local": {
                "llama2""ollama/llama2",
                "codellama""ollama/codellama",
                "mistral""ollama/mistral"
            },
            "embedding": {
                "local""local_embeddings",
                "remote""openai"
            }
        }
    
    defget_model_path(self, model_name: str) -> Path:
        returnself.models_path / model_name

config = WorkstationConfig()

2.2 模型管理器

创建统一的模型管理接口 model_manager.py

from langchain.llms import Ollama
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.callbacks.manager import CallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler

classModelManager:
    def__init__(self, config):
        self.config = config
        self.loaded_models = {}
        
    defget_llm(self, model_name: str, **kwargs):
        if model_name notinself.loaded_models:
            # 使用流式输出
            callback_manager = CallbackManager([StreamingStdOutCallbackHandler()])
            
            self.loaded_models[model_name] = Ollama(
                model=model_name,
                callback_manager=callback_manager,
                **kwargs
            )
        returnself.loaded_models[model_name]
    
    defget_embeddings(self, model_name: str = "all-MiniLM-L6-v2"):
        iff"embeddings_{model_name}"notinself.loaded_models:
            self.loaded_models[f"embeddings_{model_name}"] = HuggingFaceEmbeddings(
                model_name=f"sentence-transformers/{model_name}"
            )
        returnself.loaded_models[f"embeddings_{model_name}"]

3. 高级应用实践

3.1 智能提示词管理系统

创建结构化的提示词管理系统:

from langchain.prompts import PromptTemplate
from langchain.prompts import FewShotPromptTemplate
from typing importListDict
import json

classPromptManager:
    def__init__(self, prompt_path: str = "prompts"):
        self.prompt_path = Path(prompt_path)
        self.prompt_path.mkdir(exist_ok=True)
        self.prompts: Dict[str, PromptTemplate] = {}
        self.load_prompts()
    
    defload_prompts(self):
        for prompt_file inself.prompt_path.glob("*.json"):
            withopen(prompt_file, "r", encoding="utf-8"as f:
                prompt_data = json.load(f)
                self.prompts[prompt_file.stem] = PromptTemplate(
                    template=prompt_data["template"],
                    input_variables=prompt_data["input_variables"]
                )
    
    defcreate_few_shot_prompt(
        self,
        prefix: str,
        examples: List[Dict],
        suffix: str,
        input_variables: List[str],
        example_template: str
    
):
        example_prompt = PromptTemplate(
            input_variables=["input""output"],
            template=example_template
        )
        
        return FewShotPromptTemplate(
            examples=examples,
            example_prompt=example_prompt,
            prefix=prefix,
            suffix=suffix,
            input_variables=input_variables
        )

3.2 高性能向量检索

实现高效的向量检索系统:

from langchain.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from typing importListOptional

classVectorStore:
    def__init__(
        self,
        embedding_function,
        persist_directory: str = "vectorstore",
        collection_name: str = "documents"
    
):
        self.embedding_function = embedding_function
        self.persist_directory = persist_directory
        self.collection_name = collection_name
        
        self.vector_store = Chroma(
            persist_directory=persist_directory,
            embedding_function=embedding_function,
            collection_name=collection_name
        )
        
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200,
            length_function=len
        )
    
    defadd_texts(self, texts: List[str], metadatas: Optional[List[dict]] = None):
        documents = self.text_splitter.create_documents(texts, metadatas)
        self.vector_store.add_documents(documents)
        
    defsimilarity_search(
        self,
        query: str,
        k: int = 4,
        filterOptional[dict] = None
    
):
        returnself.vector_store.similarity_search(
            query,
            k=k,
            filter=filter
        )

3.3 多模态处理系统

实现图像和文本的统一处理:

from PIL import Image
import torch
from transformers import AutoProcessor, AutoModelForVision2Seq
from typing importUnionList

classMultiModalProcessor:
    def__init__(self):
        self.device = "cuda"if torch.cuda.is_available() else"cpu"
        
        # 加载图像理解模型
        self.processor = AutoProcessor.from_pretrained(
            "microsoft/git-base-coco"
        )
        self.model = AutoModelForVision2Seq.from_pretrained(
            "microsoft/git-base-coco"
        ).to(self.device)
    
    defprocess_image(
        self,
        image: Union[str, Image.Image],
        max_length: int = 50
    
) -> str:
        ifisinstance(image, str):
            image = Image.open(image)
        
        inputs = self.processor(
            images=image,
            return_tensors="pt"
        ).to(self.device)
        
        outputs = self.model.generate(
            **inputs,
            max_length=max_length,
            num_beams=4
        )
        
        returnself.processor.batch_decode(
            outputs,
            skip_special_tokens=True
        )[0]
    
    defbatch_process_images(
        self,
        images: List[Union[str, Image.Image]],
        max_length: int = 50
    
) -> List[str]:
        return [
            self.process_image(image, max_length)
            for image in images
        ]

4. 优化与最佳实践

4.1 性能优化

  1. 1. 模型加载优化:
class ModelCache:
    def__init__(self, max_size: int = 3):
        self.max_size = max_size
        self.cache = {}
        self.access_count = {}
    
    defget(self, key: str):
        if key inself.cache:
            self.access_count[key] += 1
            returnself.cache[key]
        returnNone
    
    defput(self, key: str, value: any):
        iflen(self.cache) >= self.max_size:
            # 移除最少使用的模型
            min_key = min(
                self.access_count.items(),
                key=lambda x: x[1]
            )[0]
            delself.cache[min_key]
            delself.access_count[min_key]
        
        self.cache[key] = value
        self.access_count[key] = 1
  1. 2. 批处理优化:
from typing importListDict
import asyncio

classBatchProcessor:
    def__init__(self, model_manager):
        self.model_manager = model_manager
    
    asyncdefprocess_batch(
        self,
        inputs: List[str],
        batch_size: int = 4
    
) -> List[str]:
        results = []
        for i inrange(0len(inputs), batch_size):
            batch = inputs[i:i + batch_size]
            tasks = [
                self.model_manager.get_llm().apredict(text)
                for text in batch
            ]
            batch_results = await asyncio.gather(*tasks)
            results.extend(batch_results)
        return results

4.2 提示工程最佳实践

创建结构化的提示模板:

# 任务分解模板
TASK_DECOMPOSITION_TEMPLATE = """
请将以下复杂任务分解为可执行的子任务:
任务描述:{task_description}

请按照以下格式输出:
1. 子任务1:[具体描述]
   输入:[需要的输入]
   输出:[预期输出]
2. 子任务2:[具体描述]
   ...

现在开始分析:
"""


# 结构化输出模板
STRUCTURED_OUTPUT_TEMPLATE = """
请按照以下JSON格式输出结果:
{format_description}

输入内容:{input_text}

请保证输出是有效的JSON格式。
"""


# 错误处理模板
ERROR_HANDLING_TEMPLATE = """
如果遇到以下情况,请按照相应方式处理:
1. 输入不完整:请说明缺少哪些必要信息
2. 格式错误:请指出具体的格式问题
3. 内容不合规:请说明不合规的原因

输入内容:{input_text}
"""

4.3 异常处理与日志

实现完善的错误处理系统:

import logging
from functools import wraps
import traceback

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('workstation.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

deferror_handler(func):
    @wraps(func)
    defwrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            error_msg = f"Error in {func.__name__}{str(e)}\n{traceback.format_exc()}"
            logger.error(error_msg)
            raise
    return wrapper

5. 示例应用

5.1 多模态文档分析器

class DocumentAnalyzer:
    def__init__(self, model_manager, vector_store):
        self.model_manager = model_manager
        self.vector_store = vector_store
        self.multimodal = MultiModalProcessor()
    
    asyncdefanalyze_document(
        self,
        text_content: str,
        images: List[Image.Image]
    
) -> Dict:
        # 1. 处理图像
        image_descriptions = self.multimodal.batch_process_images(images)
        
        # 2. 向量化存储
        self.vector_store.add_texts(
            [text_content] + image_descriptions
        )
        
        # 3. 生成分析报告
        analysis_prompt = f"""
        基于以下内容生成分析报告:
        
        文本内容:{text_content}
        
        图像描述:
        {'\n'.join(image_descriptions)}
        
        请提供:
        1. 文档主题概述
        2. 关键信息提取
        3. 图文关联分析
        """

        
        llm = self.model_manager.get_llm("llama2")
        analysis = await llm.apredict(analysis_prompt)
        
        return {
            "text_content": text_content,
            "image_descriptions": image_descriptions,
            "analysis": analysis
        }

5.2 智能助手系统

from langchain.memory import ConversationBufferWindowMemory
from langchain.chains import ConversationChain

classAIAssistant:
    def__init__(self, model_manager):
        self.model_manager = model_manager
        self.memory = ConversationBufferWindowMemory(k=5)
        
        self.conversation = ConversationChain(
            llm=self.model_manager.get_llm("llama2"),
            memory=self.memory,
            verbose=True
        )
        
        # 工具集成
        self.tools = {
            "document_search"self.document_search,
            "code_generation"self.code_generation,
            "math_calculation"self.math_calculation
        }
    
    asyncdefprocess_message(self, message: str) -> str:
        # 1. 分析是否需要使用工具
        tool_prompt = f"""
        基于用户的输入,判断是否需要使用以下工具:
        - document_search: 搜索文档
        - code_generation: 生成代码
        - math_calculation: 数学计算
        
        用户输入: {message}
        
        如果需要使用工具,输出工具名称;如果不需要,输出 "none"
        """

        
        tool_decision = awaitself.model_manager.get_llm("llama2").apredict(tool_prompt)
        
        # 2. 使用工具或直接回复
        if tool_decision.strip().lower() != "none":
            tool_name = tool_decision.strip()
            if tool_name inself.tools:
                result = awaitself.tools[tool_name](message)
                self.memory.save_context({"input": message}, {"output": result})
                return result
        
        # 直接对话
        returnself.conversation.predict(input=message)
    
    asyncdefdocument_search(self, query: str) -> str:
        # 实现文档搜索逻辑
        pass
    
    asyncdefcode_generation(self, requirements: str) -> str:
        # 使用 CodeLlama 生成代码
        code_llm = self.model_manager.get_llm("codellama")
        returnawait code_llm.apredict(requirements)
    
    asyncdefmath_calculation(self, problem: str) -> str:
        # 实现数学计算逻辑
        pass

5.3 知识库管理系统

from typing importOptionalListDict
from datetime import datetime

classKnowledgeBase:
    def__init__(self, vector_store, model_manager):
        self.vector_store = vector_store
        self.model_manager = model_manager
        self.metadata = {}
    
    asyncdefadd_document(
        self,
        content: str,
        doc_type: str,
        tags: List[str],
        source: Optional[str] = None
    
):
        # 1. 生成文档摘要
        summary_prompt = f"""
        请为以下内容生成简短的摘要:
        
        {content[:1000]}  # 只使用前1000字符生成摘要
        """

        
        summary = awaitself.model_manager.get_llm("llama2").apredict(summary_prompt)
        
        # 2. 创建元数据
        metadata = {
            "doc_type": doc_type,
            "tags": tags,
            "source": source,
            "summary": summary,
            "created_at": datetime.now().isoformat(),
            "last_updated": datetime.now().isoformat()
        }
        
        # 3. 存储文档
        self.vector_store.add_texts(
            texts=[content],
            metadatas=[metadata]
        )
        
        return metadata
    
    asyncdefsearch(
        self,
        query: str,
        doc_type: Optional[str] = None,
        tags: Optional[List[str]] = None,
        k: int = 4
    
) -> List[Dict]:
        # 构建过滤器
        filter_dict = {}
        if doc_type:
            filter_dict["doc_type"] = doc_type
        if tags:
            filter_dict["tags"] = {"$in": tags}
        
        # 执行搜索
        results = self.vector_store.similarity_search(
            query,
            k=k,
            filter=filter_dict if filter_dict elseNone
        )
        
        # 生成答案
        context = "\n".join([doc.page_content for doc in results])
        answer_prompt = f"""
        基于以下上下文回答问题:
        
        上下文:
        {context}
        
        问题:{query}
        """

        
        answer = awaitself.model_manager.get_llm("llama2").apredict(answer_prompt)
        
        return {
            "answer": answer,
            "sources": [doc.metadata for doc in results]
        }

6. 部署与监控

6.1 Docker 部署

创建 Dockerfile:

FROM python:3.10-slim

# 安装系统依赖
RUN apt-get update && apt-get install -y \
    build-essential \
    curl \
    git


# 安装 Ollama
RUN curl https://ollama.ai/install.sh | sh

# 设置工作目录
WORKDIR /app

# 复制项目文件
COPY requirements.txt .
COPY . .

# 安装 Python 依赖
RUN pip install --no-cache-dir -r requirements.txt

# 启动脚本
CMD ["python""main.py"]

6.2 监控系统

import psutil
import GPUtil
from datetime import datetime
import json

classSystemMonitor:
    def__init__(self, log_interval: int = 60):
        self.log_interval = log_interval
        self.logs = []
    
    defget_system_metrics(self) -> Dict:
        # CPU 使用率
        cpu_percent = psutil.cpu_percent(interval=1)
        
        # 内存使用率
        memory = psutil.virtual_memory()
        memory_percent = memory.percent
        
        # GPU 使用率(如果可用)
        gpu_metrics = []
        try:
            gpus = GPUtil.getGPUs()
            for gpu in gpus:
                gpu_metrics.append({
                    "id": gpu.id,
                    "load": gpu.load * 100,
                    "memory_used": gpu.memoryUsed,
                    "memory_total": gpu.memoryTotal
                })
        except:
            pass
        
        # 磁盘使用率
        disk = psutil.disk_usage('/')
        disk_percent = disk.percent
        
        return {
            "timestamp": datetime.now().isoformat(),
            "cpu_percent": cpu_percent,
            "memory_percent": memory_percent,
            "disk_percent": disk_percent,
            "gpu_metrics": gpu_metrics
        }
    
    deflog_metrics(self):
        metrics = self.get_system_metrics()
        self.logs.append(metrics)
        
        # 保存日志
        withopen('system_metrics.json''a'as f:
            json.dump(metrics, f)
            f.write('\n')

7. 未来展望与优化方向

  1. 1. 模型优化
  • • 模型量化与压缩
  • • 模型蒸馏
  • • 自动模型选择
  • 2. 系统扩展
    • • 分布式处理支持
    • • 微服务架构演进
    • • API 网关集成
  • 3. 功能增强
    • • 更多模态支持
    • • 自定义工具开发
    • • 知识图谱集成
  • 4. 性能提升
    • • 缓存策略优化
    • • 并行处理增强
    • • 资源调度改进

    总结

    本文详细介绍了如何构建一个完整的本地 AI 开发工作站,涵盖了从基础环境搭建到高级应用开发的各个方面。通过合理的架构设计和优化实践,我们可以构建出一个高效、可靠的本地 AI 开发环境。随着技术的不断发展,这套系统也将持续演进和优化,为 AI 应用开发提供更好的支持。

    参考资源

    1. 1. LangChain 官方文档
    2. 2. Ollama 项目文档
    3. 3. HuggingFace Transformers 文档
    4. 4. Python 异步编程指南
    5. 5. Docker 容器化最佳实践

     


    前端道萌
    魔界如,佛界如,一如,无二如。
     最新文章