4o-realtime构建客服系统-Chainlit

文摘   2024-11-02 15:34   韩国  
先看效果

更多AI技术,关注我的github,欢迎点亮星星:

https://github.com/xinyuwei-david/david-share.git

本demo代码使用Chainlit框架。

Chainlit是一个开源的Python框架,用于构建生产级的对话式人工智能(Conversational AI)应用。它允许开发者快速地创建和部署对话式AI或代理应用,支持多平台和数据持久化。

Chainlit的一些关键特点包括:

  • 快速构建:可以轻松集成到现有代码库,或者从头开始构建

  • 多平台支持:一次编写逻辑,可以在多个平台上使用2

  • 数据持久化:收集、监控和分析用户数据2

  • 可视化多步推理:理解生成输出的中间步骤2

  • 与流行库和框架的集成:支持OpenAI、Mistral AI、Llama Index、LangChain、Autogen、Haystack等。


我们分析代码的主程序:app.py

代码中,Chainlit 框架封装了所有的接口,使得您需要直接处理底层的网络通信、事件分发或接口实现。

详细说明

 

1. Chainlit 提供的装饰器封装了事件处理

 
代码中使用了许多由 Chainlit 提供的装饰器,这些装饰器封装了与前端交互的接口,简化了编程过程。例如:

  • @cl.on_chat_start:当聊天会话开始时触发。

  • @cl.on_message:当收到用户文本消息时触发。

  • @cl.on_audio_start:当用户开始音频输入时触发。

  • @cl.on_audio_chunk:当收到音频数据块时触发。

  • @cl.on_audio_end@cl.on_chat_end@cl.on_stop:当音频输入结束、聊天结束或会话停止时触发。

    这些装饰器使您能够专注于业务逻辑的实现,而不必关心底层的事件监听和处理机制。

2. Chainlit 封装了前后端的通信

 
在传统的应用程序中,您可能需要自己编写代码来处理前端和后端之间的通信,例如:

  • 监听前端的请求或事件。

  • 解析前端发送的数据。

  • 将处理结果返回给前端。

    然而,Chainlit 为您处理了这些细节,它封装了前端与后端之间的通信,使得后端代码可以直接处理事件。

3. Chainlit 提供了上下文和会话管理

 
通过 
cl.user_session,您可以在用户会话中存储和获取数据。这使您能够:

  • 为每个用户维护独立的会话状态。

  • 存储用户相关的信息,如 track_idopenai_realtime 实例等。

  • 在不同的事件处理函数之间共享数据。

4. Chainlit 集成了与 OpenAI API 的交互

 
在您的代码中,您使用了 
AsyncAzureOpenAI 客户端与 Azure OpenAI 服务进行交互。Chainlit 可以与这些服务无缝集成,简化了与大型语言模型的交互过程。

5. Chainlit 封装了错误处理和日志记录

 

  • 错误处理:通过提供 @cl.on_error 等装饰器,Chainlit 允许您处理在应用程序中发生的异常和错误。

  • 日志记录:通过 chainlit.logger 模块,您可以方便地记录日志信息,帮助调试和监控应用程序。

这样设计的好处

 

  1. 简化开发:您不需要处理复杂的事件循环、网络通信或异步 I/O 操作,只需专注于实现业务逻辑。

  2. 提高效率:使用 Chainlit 的封装,可以大大减少样板代码(boilerplate code)的数量,使代码更加简洁明了。

  3. 代码可读性:通过装饰器和上下文管理,代码的结构更加清晰,逻辑更加直观。

  4. 可扩展性:Chainlit 提供了丰富的功能和插件,方便您在未来扩展应用程序的功能。

总结

 

  • Chainlit 封装了应用程序与前端的交互接口,包括事件监听、消息传递、音频数据处理等。

  • 您的代码主要使用 Chainlit 提供的装饰器和上下文管理来处理各种事件和会话数据。

  • 这种封装使得您可以专注于业务逻辑的实现,而不必关心底层的通信和接口细节。

 

导入模块

 

import os
import asyncio
from openai import AsyncAzureOpenAI

import chainlit as cl
from uuid import uuid4
from chainlit.logger import logger

from realtime import RealtimeClient
from realtime.tools import tools

 
解释

  • os:用于与操作系统交互,主要用来获取环境变量。

  • asyncio:Python 标准库,用于编写异步代码,支持异步 I/O 操作。

  • AsyncAzureOpenAI:从 openai 包中导入,异步的 Azure OpenAI 客户端,用于与 Azure OpenAI 服务进行异步交互。

  • chainlit:一个用于创建聊天机器人的框架,简化了聊天机器人的开发流程。

  • uuid4:用于生成随机的 UUID(通用唯一标识符)。

  • logger:从 chainlit.logger 导入,用于日志记录,方便调试和监控。

  • RealtimeClient 和 tools:从自定义的 realtime 模块中导入,用于实时通信和处理。

配置 Azure OpenAI 客户端

 

client = AsyncAzureOpenAI(
api_key=os.environ["AZURE_OPENAI_API_KEY"],
azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT"],
api_version="2024-10-01-preview"
)

 
解释

  • 这里创建了一个异步的 Azure OpenAI 客户端 client,需要从环境变量中获取 API 密钥、终端、部署名称。这些信息通常在 Azure 上注册服务时获得。

  • api_version 指定了使用的 API 版本。

设置 OpenAI 实时客户端

 

async def setup_openai_realtime(system_prompt: str):
"""Instantiate and configure the OpenAI Realtime Client"""
openai_realtime = RealtimeClient(system_prompt=system_prompt)
cl.user_session.set("track_id", str(uuid4()))
# 以下省略事件处理函数的定义和配置

 
解释

  • 定义了一个异步函数 setup_openai_realtime,用于实例化和配置 OpenAI 实时客户端。

  • system_prompt:系统提示,用于指导模型的行为(我们不需要详细分析这个部分)。

  • openai_realtime:创建了一个实时客户端实例,传入了系统提示。

  • cl.user_session.set("track_id", str(uuid4())):为用户会话设置一个唯一的 track_id,用于跟踪音频流或其他会话相关的信息。

事件处理函数

 
在 
setup_openai_realtime 函数内,定义了一系列异步的事件处理函数,用于处理不同的事件。这些事件处理函数会在特定的事件发生时被调用。

1. handle_conversation_updated

 

async def handle_conversation_updated(event):
item = event.get("item")
delta = event.get("delta")
"""Currently used to stream audio back to the client."""
if delta:
# Only one of the following will be populated for any given event
if 'audio' in delta:
audio = delta['audio'] # Int16Array, audio added
await cl.context.emitter.send_audio_chunk(cl.OutputAudioChunk(mimeType="pcm16", data=audio, track=cl.user_session.get("track_id")))
if 'transcript' in delta:
transcript = delta['transcript'] # string, transcript added
print(transcript)
if 'arguments' in delta:
arguments = delta['arguments'] # string, function arguments added
pass

 
解释

  • 当对话更新时调用此函数。

  • event:包含事件的数据。

  • delta:表示与上一次事件的差异,可能包含 audiotranscript 或 arguments

    处理逻辑:

  • 如果 delta 包含 audio

    • 获取音频数据 audio

    • 使用 cl.context.emitter.send_audio_chunk 将音频数据发送回客户端,用于音频流的实时传输。

  • 如果 delta 包含 transcript

    • 获取转录文本并打印出来。这可能是语音识别后的文本。

  • 如果 delta 包含 arguments

    • 获取函数参数。这可能与功能调用或命令有关。

2. handle_item_completed

 

async def handle_item_completed(item):
"""Used to populate the chat context with transcription once an item is completed."""
# print(item) # TODO
pass

 
解释

  • 当某个项目(可能是一次对话或消息)完成时调用。

  • 当前函数内容为空,仅包含一个 pass,表示未来可能会添加处理逻辑。

3. handle_conversation_interrupt

 

async def handle_conversation_interrupt(event):
"""Used to cancel the client previous audio playback."""
cl.user_session.set("track_id", str(uuid4()))
await cl.context.emitter.send_audio_interrupt()

 
解释

  • 当对话被中断时调用,例如用户停止了音频播放或有新的音频需要播放。

  • 更新用户会话的 track_id,以标记新的音频流。

  • 调用 send_audio_interrupt 通知客户端停止当前的音频播放。

4. handle_error

async def handle_error(event): logger.error(event)

解释

  • 当发生错误时调用。

  • 使用日志记录器 logger 记录错误信息,方便调试。

注册事件处理函数

openai_realtime.on('conversation.updated', handle_conversation_updated)
openai_realtime.on('conversation.item.completed', handle_item_completed)
openai_realtime.on('conversation.interrupted', handle_conversation_interrupt)
openai_realtime.on('error', handle_error)

 
解释

  • 为 openai_realtime 客户端注册事件处理函数,当特定事件发生时,调用相应的处理函数。

添加工具并启动

cl.user_session.set("openai_realtime", openai_realtime)
coros = [openai_realtime.add_tool(tool_def, tool_handler) for tool_def, tool_handler in tools]
await asyncio.gather(*coros)

 
解释

  • 将 openai_realtime 客户端实例保存到用户会话中,方便在其他地方访问。

  • 使用列表推导式,为每个工具定义和处理器添加到 openai_realtime 中。

  • await asyncio.gather(*coros):异步地运行所有添加工具的协程。

Chainlit 事件处理装饰器

 
接下来,代码使用了 
@cl.on_* 的装饰器,定义了各种事件处理函数,用于响应聊天和音频相关的事件。

1. @cl.on_chat_start

@cl.on_chat_start
async def start():
await cl.Message(
content="Hi, Welcome to Xinyu Company. How can I help you?. Press `P` to talk!"
).send()
await setup_openai_realtime(system_prompt=system_prompt + "\n\n Customer ID: 333")

 
解释

  • 当聊天会话开始时调用。

  • 发送一条欢迎消息给用户,提示可以按 P 说话。

  • 调用 setup_openai_realtime 函数,传入 system_prompt(系统提示,加上了客户 ID)。

2. @cl.on_message

@cl.on_message
async def on_message(message: cl.Message):
openai_realtime: RealtimeClient = cl.user_session.get("openai_realtime")
if openai_realtime and openai_realtime.is_connected():
await openai_realtime.send_user_message_content([{ "type": 'input_text', "text": message.content}])
else:
await cl.Message(content="Please activate voice mode before sending messages!").send()

 
解释

  • 当收到用户的文本消息时调用。

  • 从用户会话中获取 openai_realtime 客户端实例。

  • 如果实时客户端存在且已连接:

    • 调用 send_user_message_content 将用户的文本消息发送给实时客户端,类型为 input_text

  • 否则:

    • 提示用户需要激活语音模式才能发送消息。

3. @cl.on_audio_start

 

@cl.on_audio_start
async def on_audio_start():
try:
openai_realtime: RealtimeClient = cl.user_session.get("openai_realtime")
# TODO: might want to recreate items to restore context
# openai_realtime.create_conversation_item(item)
await openai_realtime.connect()
logger.info("Connected to OpenAI realtime")
return True
except Exception as e:
await cl.ErrorMessage(content=f"Failed to connect to OpenAI realtime: {e}").send()
return False

 
解释

  • 当音频输入开始时调用,即用户开始说话或录音。

  • 尝试连接到 openai_realtime 客户端。

  • 如果成功,记录连接成功的信息,并返回 True

  • 如果发生异常,发送错误消息给用户,并返回 False

4. @cl.on_audio_chunk

 

@cl.on_audio_chunk
async def on_audio_chunk(chunk: cl.InputAudioChunk):
openai_realtime: RealtimeClient = cl.user_session.get("openai_realtime")
if openai_realtime:
if openai_realtime.is_connected():
await openai_realtime.append_input_audio(chunk.data)
else:
logger.info("RealtimeClient is not connected")

 
解释

  • 当收到音频数据块时调用。

  • 从用户会话中获取 openai_realtime 客户端实例。

  • 如果客户端存在且已连接:

    • 将音频数据块添加到实时客户端的输入中,可能用于语音识别或处理。

  • 否则:

    • 记录客户端未连接的信息。

5. @cl.on_audio_end@cl.on_chat_end@cl.on_stop

@cl.on_audio_end
@cl.on_chat_end
@cl.on_stop
async def on_end():
openai_realtime: RealtimeClient = cl.user_session.get("openai_realtime")
if openai_realtime and openai_realtime.is_connected():
await openai_realtime.disconnect()

 
解释

  • 当音频输入结束、聊天结束或会话停止时调用。

  • 从用户会话中获取 openai_realtime 客户端实例。

  • 如果客户端存在且已连接:

    • 调用 disconnect 方法,与实时服务断开连接。

总体流程总结

 

  1. 会话开始

  • 用户进入聊天,会触发 @cl.on_chat_start,发送欢迎消息,并调用 setup_openai_realtime 初始化实时客户端。

  • 用户发送消息

    • 如果用户发送文本消息,触发 @cl.on_message

    • 检查实时客户端是否连接,如果已连接,将消息发送给实时客户端处理。

  • 语音交互

    • 用户按下 P 开始语音输入,触发 @cl.on_audio_start

    • 尝试连接实时客户端,准备接收音频数据。

    • 用户的语音被分割成音频数据块,每个数据块触发 @cl.on_audio_chunk,将音频数据发送给实时客户端。

    • 用户结束语音输入,触发 @cl.on_audio_end,断开与实时客户端的连接。

  • 实时处理

    • 实时客户端将音频数据进行处理,可能是语音识别、生成回复等。

    • 处理结果通过事件(如 conversation.updated)回调,调用相应的处理函数 handle_conversation_updated

    • 将生成的音频或文本回复发送回用户。

  • 会话结束

    • 用户退出或会话结束,触发 @cl.on_chat_end 或 @cl.on_stop,确保断开与实时客户端的连接,释放资源。

    结论

     
    这段代码实现了一个基于 Chainlit 框架的聊天机器人,支持文本和语音交互。它使用了 Azure OpenAI 的服务,通过 
    RealtimeClient 与 OpenAI 的实时接口交互,处理用户的输入,并将生成的回复(文本或音频)发送给用户。

    主要的逻辑包括:

    • 初始化和配置实时客户端,设置事件处理函数。

    • 处理用户的文本消息和语音输入,将其发送给实时客户端。

    • 接收并处理实时客户端的回调事件,将回复发送给用户。

    • 管理会话的开始和结束,确保资源的正确分配和释放。



    源码地址:

    https://github.com/monuminu/AOAI_Samples/tree/main/realtime-assistant-support

    大魏分享
    https://github.com/davidsajare/david-share.git
     最新文章