4o-realtime相关的类主要有以下三个:
RealtimeAPI Class、RealtimeConversation Class、RealtimeClient Class,调用逻辑示意图如下:
调用流程示例:
示例:用户查询订单状态
客户端发送音频数据:
用户通过客户端应用发送语音查询:“我的订单状态是什么?”
客户端将音频数据传递给
RealtimeClient
的append_input_audio
方法。
await realtime_client.append_input_audio(audio_chunk)
2. RealtimeClient处理音频数据:
RealtimeClient
调用RealtimeAPI
的send
方法,将音频数据通过WebSocket协议发送到服务器。
await self.realtime.send("input_audio_buffer.append", {"audio": array_buffer_to_base64(np.array(audio_chunk))})
3. 服务器处理请求并返回响应:
RealtimeAPI
通过WebSocket协议接收来自服务器的消息。服务器处理音频数据,识别用户的查询,并生成响应。
RealtimeAPI分发事件:
RealtimeAPI
接收到服务器的响应消息后,分发事件到RealtimeClient
。
async for message in self.ws:
event = json.loads(message)
self.dispatch(f"server.{event['type']}", event)
self.dispatch("server.*", event)
5. RealtimeClient处理服务器响应:
RealtimeClient
调用RealtimeConversation
的方法来处理事件并更新对话状态。
item, delta = self.conversation.process_event(event)
if item:
self.dispatch("conversation.updated", {"item": item, "delta": delta})
6. RealtimeConversation管理对话状态:
RealtimeConversation
处理不同类型的事件(如消息创建、转录完成、音频流等),并更新对话状态。
def process_event(self, event, *args):
event_processor = self.EventProcessors.get(event['type'])
if not event_processor:
raise Exception(f"Missing conversation event processor for {event['type']}")
return event_processor(self, event, *args)
7. 客户端接收并播放响应:
RealtimeClient
将处理后的响应发送回客户端。客户端接收响应并播放音频或显示文本。
await cl.context.emitter.send_audio_chunk(cl.OutputAudioChunk(mimeType="pcm16", data=audio, track=cl.user_session.get("track_id")))
使用协议说明:
WebSocket协议:
WebSocket是一种在单个TCP连接上进行全双工通信的协议。
在本项目中,
RealtimeAPI
类使用WebSocket协议与OpenAI的实时API进行通信。WebSocket连接的建立和维护由
RealtimeAPI
类负责,确保低延迟和高效的数据传输。通过WebSocket协议,
RealtimeAPI
类能够实时发送和接收音频数据和文本消息,确保对话的流畅性和自然性。
示例:用户通过文本查询订单状态
客户端发送文本数据:
用户通过客户端应用输入文本查询:“我的订单状态是什么?”
客户端将文本数据传递给
RealtimeClient
的send_user_message_content
方法。
await realtime_client.send_user_message_content([{"type": "text", "text": "我的订单状态是什么?"}])
2. RealtimeClient处理文本数据:
RealtimeClient
调用RealtimeAPI
的send
方法,将文本数据通过WebSocket协议发送到服务器。
await self.realtime.send("conversation.item.create", {
"item": {
"type": "message",
"role": "user",
"content": [{"type": "text", "text": "我的订单状态是什么?"}]
}
})
3. 服务器处理请求并返回响应:
RealtimeAPI
通过WebSocket协议接收来自服务器的消息。服务器处理文本数据,识别用户的查询,并生成响应。
RealtimeAPI分发事件:
RealtimeAPI
接收到服务器的响应消息后,分发事件到RealtimeClient
。
async for message in self.ws:
event = json.loads(message)
self.dispatch(f"server.{event['type']}", event)
self.dispatch("server.*", event)
5. RealtimeClient处理服务器响应:
RealtimeClient
调用RealtimeConversation
的方法来处理事件并更新对话状态。
item, delta = self.conversation.process_event(event)
if item:
self.dispatch("conversation.updated", {"item": item, "delta": delta})
6. RealtimeConversation管理对话状态:
RealtimeConversation
处理不同类型的事件(如消息创建、转录完成、音频流等),并更新对话状态。
def process_event(self, event, *args):
event_processor = self.EventProcessors.get(event['type'])
if not event_processor:
raise Exception(f"Missing conversation event processor for {event['type']}")
return event_processor(self, event, *args)
7. 客户端接收并显示响应:
RealtimeClient
将处理后的响应发送回客户端。客户端接收响应并显示文本。
await cl.context.emitter.send_text_message(cl.OutputTextMessage(content=response_text))
使用协议说明:
WebSocket协议:
WebSocket是一种在单个TCP连接上进行全双工通信的协议。
在本项目中,
RealtimeAPI
类使用WebSocket协议与OpenAI的实时API进行通信。WebSocket连接的建立和维护由
RealtimeAPI
类负责,确保低延迟和高效的数据传输。通过WebSocket协议,
RealtimeAPI
类能够实时发送和接收文本消息,确保对话的流畅性和自然性。