更多AI技术,关注我的github,欢迎点亮星星:
https://github.com/xinyuwei-david/david-share.git
本demo代码使用Chainlit框架。
Chainlit是一个开源的Python框架,用于构建生产级的对话式人工智能(Conversational AI)应用。它允许开发者快速地创建和部署对话式AI或代理应用,支持多平台和数据持久化。
Chainlit的一些关键特点包括:
快速构建:可以轻松集成到现有代码库,或者从头开始构建
多平台支持:一次编写逻辑,可以在多个平台上使用2
数据持久化:收集、监控和分析用户数据2
可视化多步推理:理解生成输出的中间步骤2
与流行库和框架的集成:支持OpenAI、Mistral AI、Llama Index、LangChain、Autogen、Haystack等。
我们分析代码的主程序:app.py。
代码中,Chainlit 框架封装了所有的接口,使得您需要直接处理底层的网络通信、事件分发或接口实现。
详细说明
1. Chainlit 提供的装饰器封装了事件处理
代码中使用了许多由 Chainlit 提供的装饰器,这些装饰器封装了与前端交互的接口,简化了编程过程。例如:
@cl.on_chat_start
:当聊天会话开始时触发。@cl.on_message
:当收到用户文本消息时触发。@cl.on_audio_start
:当用户开始音频输入时触发。@cl.on_audio_chunk
:当收到音频数据块时触发。@cl.on_audio_end
、@cl.on_chat_end
、@cl.on_stop
:当音频输入结束、聊天结束或会话停止时触发。
这些装饰器使您能够专注于业务逻辑的实现,而不必关心底层的事件监听和处理机制。
2. Chainlit 封装了前后端的通信
在传统的应用程序中,您可能需要自己编写代码来处理前端和后端之间的通信,例如:
监听前端的请求或事件。
解析前端发送的数据。
将处理结果返回给前端。
然而,Chainlit 为您处理了这些细节,它封装了前端与后端之间的通信,使得后端代码可以直接处理事件。
3. Chainlit 提供了上下文和会话管理
通过 cl.user_session
,您可以在用户会话中存储和获取数据。这使您能够:
为每个用户维护独立的会话状态。
存储用户相关的信息,如
track_id
、openai_realtime
实例等。在不同的事件处理函数之间共享数据。
4. Chainlit 集成了与 OpenAI API 的交互
在您的代码中,您使用了 AsyncAzureOpenAI
客户端与 Azure OpenAI 服务进行交互。Chainlit 可以与这些服务无缝集成,简化了与大型语言模型的交互过程。
5. Chainlit 封装了错误处理和日志记录
错误处理:通过提供
@cl.on_error
等装饰器,Chainlit 允许您处理在应用程序中发生的异常和错误。日志记录:通过
chainlit.logger
模块,您可以方便地记录日志信息,帮助调试和监控应用程序。
这样设计的好处
简化开发:您不需要处理复杂的事件循环、网络通信或异步 I/O 操作,只需专注于实现业务逻辑。
提高效率:使用 Chainlit 的封装,可以大大减少样板代码(boilerplate code)的数量,使代码更加简洁明了。
代码可读性:通过装饰器和上下文管理,代码的结构更加清晰,逻辑更加直观。
可扩展性:Chainlit 提供了丰富的功能和插件,方便您在未来扩展应用程序的功能。
总结
Chainlit 封装了应用程序与前端的交互接口,包括事件监听、消息传递、音频数据处理等。
您的代码主要使用 Chainlit 提供的装饰器和上下文管理来处理各种事件和会话数据。
这种封装使得您可以专注于业务逻辑的实现,而不必关心底层的通信和接口细节。
导入模块
import os
import asyncio
from openai import AsyncAzureOpenAI
import chainlit as cl
from uuid import uuid4
from chainlit.logger import logger
from realtime import RealtimeClient
from realtime.tools import tools
解释:
os:用于与操作系统交互,主要用来获取环境变量。
asyncio:Python 标准库,用于编写异步代码,支持异步 I/O 操作。
AsyncAzureOpenAI:从 openai 包中导入,异步的 Azure OpenAI 客户端,用于与 Azure OpenAI 服务进行异步交互。
chainlit:一个用于创建聊天机器人的框架,简化了聊天机器人的开发流程。
uuid4:用于生成随机的 UUID(通用唯一标识符)。
logger:从 chainlit.logger 导入,用于日志记录,方便调试和监控。
RealtimeClient 和 tools:从自定义的 realtime 模块中导入,用于实时通信和处理。
配置 Azure OpenAI 客户端
client = AsyncAzureOpenAI(
api_key=os.environ["AZURE_OPENAI_API_KEY"],
azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
azure_deployment=os.environ["AZURE_OPENAI_DEPLOYMENT"],
api_version="2024-10-01-preview"
)
解释:
这里创建了一个异步的 Azure OpenAI 客户端
client
,需要从环境变量中获取 API 密钥、终端、部署名称。这些信息通常在 Azure 上注册服务时获得。api_version
指定了使用的 API 版本。
设置 OpenAI 实时客户端
async def setup_openai_realtime(system_prompt: str):
"""Instantiate and configure the OpenAI Realtime Client"""
openai_realtime = RealtimeClient(system_prompt=system_prompt)
cl.user_session.set("track_id", str(uuid4()))
# 以下省略事件处理函数的定义和配置
解释:
定义了一个异步函数
setup_openai_realtime
,用于实例化和配置 OpenAI 实时客户端。system_prompt
:系统提示,用于指导模型的行为(我们不需要详细分析这个部分)。openai_realtime
:创建了一个实时客户端实例,传入了系统提示。cl.user_session.set("track_id", str(uuid4()))
:为用户会话设置一个唯一的track_id
,用于跟踪音频流或其他会话相关的信息。
事件处理函数
在 setup_openai_realtime
函数内,定义了一系列异步的事件处理函数,用于处理不同的事件。这些事件处理函数会在特定的事件发生时被调用。
1. handle_conversation_updated
async def handle_conversation_updated(event):
item = event.get("item")
delta = event.get("delta")
"""Currently used to stream audio back to the client."""
if delta:
# Only one of the following will be populated for any given event
if 'audio' in delta:
audio = delta['audio'] # Int16Array, audio added
await cl.context.emitter.send_audio_chunk(cl.OutputAudioChunk(mimeType="pcm16", data=audio, track=cl.user_session.get("track_id")))
if 'transcript' in delta:
transcript = delta['transcript'] # string, transcript added
print(transcript)
if 'arguments' in delta:
arguments = delta['arguments'] # string, function arguments added
pass
解释:
当对话更新时调用此函数。
event
:包含事件的数据。delta
:表示与上一次事件的差异,可能包含audio
、transcript
或arguments
。
处理逻辑:如果
delta
包含audio
:获取音频数据
audio
。使用
cl.context.emitter.send_audio_chunk
将音频数据发送回客户端,用于音频流的实时传输。如果
delta
包含transcript
:获取转录文本并打印出来。这可能是语音识别后的文本。
如果
delta
包含arguments
:获取函数参数。这可能与功能调用或命令有关。
2. handle_item_completed
async def handle_item_completed(item):
"""Used to populate the chat context with transcription once an item is completed."""
# print(item) # TODO
pass
解释:
当某个项目(可能是一次对话或消息)完成时调用。
当前函数内容为空,仅包含一个
pass
,表示未来可能会添加处理逻辑。
3. handle_conversation_interrupt
async def handle_conversation_interrupt(event):
"""Used to cancel the client previous audio playback."""
cl.user_session.set("track_id", str(uuid4()))
await cl.context.emitter.send_audio_interrupt()
解释:
当对话被中断时调用,例如用户停止了音频播放或有新的音频需要播放。
更新用户会话的
track_id
,以标记新的音频流。调用
send_audio_interrupt
通知客户端停止当前的音频播放。
4. handle_error
async def handle_error(event): logger.error(event)
解释:
当发生错误时调用。
使用日志记录器
logger
记录错误信息,方便调试。
注册事件处理函数
openai_realtime.on('conversation.updated', handle_conversation_updated)
openai_realtime.on('conversation.item.completed', handle_item_completed)
openai_realtime.on('conversation.interrupted', handle_conversation_interrupt)
openai_realtime.on('error', handle_error)
解释:
为
openai_realtime
客户端注册事件处理函数,当特定事件发生时,调用相应的处理函数。
添加工具并启动
cl.user_session.set("openai_realtime", openai_realtime)
coros = [openai_realtime.add_tool(tool_def, tool_handler) for tool_def, tool_handler in tools]
await asyncio.gather(*coros)
解释:
将
openai_realtime
客户端实例保存到用户会话中,方便在其他地方访问。使用列表推导式,为每个工具定义和处理器添加到
openai_realtime
中。await asyncio.gather(*coros)
:异步地运行所有添加工具的协程。
Chainlit 事件处理装饰器
接下来,代码使用了 @cl.on_*
的装饰器,定义了各种事件处理函数,用于响应聊天和音频相关的事件。
1. @cl.on_chat_start
@cl.on_chat_start
async def start():
await cl.Message(
content="Hi, Welcome to Xinyu Company. How can I help you?. Press `P` to talk!"
).send()
await setup_openai_realtime(system_prompt=system_prompt + "\n\n Customer ID: 333")
解释:
当聊天会话开始时调用。
发送一条欢迎消息给用户,提示可以按
P
说话。调用
setup_openai_realtime
函数,传入system_prompt
(系统提示,加上了客户 ID)。
2. @cl.on_message
@cl.on_message
async def on_message(message: cl.Message):
openai_realtime: RealtimeClient = cl.user_session.get("openai_realtime")
if openai_realtime and openai_realtime.is_connected():
await openai_realtime.send_user_message_content([{ "type": 'input_text', "text": message.content}])
else:
await cl.Message(content="Please activate voice mode before sending messages!").send()
解释:
当收到用户的文本消息时调用。
从用户会话中获取
openai_realtime
客户端实例。如果实时客户端存在且已连接:
调用
send_user_message_content
将用户的文本消息发送给实时客户端,类型为input_text
。否则:
提示用户需要激活语音模式才能发送消息。
3. @cl.on_audio_start
@cl.on_audio_start
async def on_audio_start():
try:
openai_realtime: RealtimeClient = cl.user_session.get("openai_realtime")
# TODO: might want to recreate items to restore context
# openai_realtime.create_conversation_item(item)
await openai_realtime.connect()
logger.info("Connected to OpenAI realtime")
return True
except Exception as e:
await cl.ErrorMessage(content=f"Failed to connect to OpenAI realtime: {e}").send()
return False
解释:
当音频输入开始时调用,即用户开始说话或录音。
尝试连接到
openai_realtime
客户端。如果成功,记录连接成功的信息,并返回
True
。如果发生异常,发送错误消息给用户,并返回
False
。
4. @cl.on_audio_chunk
@cl.on_audio_chunk
async def on_audio_chunk(chunk: cl.InputAudioChunk):
openai_realtime: RealtimeClient = cl.user_session.get("openai_realtime")
if openai_realtime:
if openai_realtime.is_connected():
await openai_realtime.append_input_audio(chunk.data)
else:
logger.info("RealtimeClient is not connected")
解释:
当收到音频数据块时调用。
从用户会话中获取
openai_realtime
客户端实例。如果客户端存在且已连接:
将音频数据块添加到实时客户端的输入中,可能用于语音识别或处理。
否则:
记录客户端未连接的信息。
5. @cl.on_audio_end
, @cl.on_chat_end
, @cl.on_stop
@cl.on_audio_end
@cl.on_chat_end
@cl.on_stop
async def on_end():
openai_realtime: RealtimeClient = cl.user_session.get("openai_realtime")
if openai_realtime and openai_realtime.is_connected():
await openai_realtime.disconnect()
解释:
当音频输入结束、聊天结束或会话停止时调用。
从用户会话中获取
openai_realtime
客户端实例。如果客户端存在且已连接:
调用
disconnect
方法,与实时服务断开连接。
总体流程总结
会话开始:
用户进入聊天,会触发
@cl.on_chat_start
,发送欢迎消息,并调用setup_openai_realtime
初始化实时客户端。
用户发送消息:
如果用户发送文本消息,触发
@cl.on_message
。检查实时客户端是否连接,如果已连接,将消息发送给实时客户端处理。
语音交互:
用户按下
P
开始语音输入,触发@cl.on_audio_start
。尝试连接实时客户端,准备接收音频数据。
用户的语音被分割成音频数据块,每个数据块触发
@cl.on_audio_chunk
,将音频数据发送给实时客户端。用户结束语音输入,触发
@cl.on_audio_end
,断开与实时客户端的连接。
实时处理:
实时客户端将音频数据进行处理,可能是语音识别、生成回复等。
处理结果通过事件(如
conversation.updated
)回调,调用相应的处理函数handle_conversation_updated
。将生成的音频或文本回复发送回用户。
会话结束:
用户退出或会话结束,触发
@cl.on_chat_end
或@cl.on_stop
,确保断开与实时客户端的连接,释放资源。
结论
这段代码实现了一个基于 Chainlit 框架的聊天机器人,支持文本和语音交互。它使用了 Azure OpenAI 的服务,通过 RealtimeClient
与 OpenAI 的实时接口交互,处理用户的输入,并将生成的回复(文本或音频)发送给用户。
主要的逻辑包括:
初始化和配置实时客户端,设置事件处理函数。
处理用户的文本消息和语音输入,将其发送给实时客户端。
接收并处理实时客户端的回调事件,将回复发送给用户。
管理会话的开始和结束,确保资源的正确分配和释放。
源码地址:
https://github.com/monuminu/AOAI_Samples/tree/main/realtime-assistant-support