4o-realtime API调用分析

文摘   2024-10-31 23:56   天津  

GPT-4o-Realtime应用场景

Voice-Azure Bing Search

4o-realtime相关的类主要有以下三个:

RealtimeAPI Class、RealtimeConversation Class、RealtimeClient Class,调用逻辑示意图如下:

调用流程示例:

 

示例:用户查询订单状态

 

  1. 客户端发送音频数据

  • 用户通过客户端应用发送语音查询:“我的订单状态是什么?”

  • 客户端将音频数据传递给RealtimeClientappend_input_audio方法。

await realtime_client.append_input_audio(audio_chunk)

 
2. RealtimeClient处理音频数据

  • RealtimeClient调用RealtimeAPIsend方法,将音频数据通过WebSocket协议发送到服务器。

await self.realtime.send("input_audio_buffer.append", {"audio": array_buffer_to_base64(np.array(audio_chunk))})

 
3. 服务器处理请求并返回响应

  • RealtimeAPI通过WebSocket协议接收来自服务器的消息。

  • 服务器处理音频数据,识别用户的查询,并生成响应。

  1. RealtimeAPI分发事件

  • RealtimeAPI接收到服务器的响应消息后,分发事件到RealtimeClient

async for message in self.ws:
event = json.loads(message)
self.dispatch(f"server.{event['type']}", event)
self.dispatch("server.*", event)

 
5. RealtimeClient处理服务器响应

  • RealtimeClient调用RealtimeConversation的方法来处理事件并更新对话状态。

item, delta = self.conversation.process_event(event)
if item:
self.dispatch("conversation.updated", {"item": item, "delta": delta})

 
6. RealtimeConversation管理对话状态

  • RealtimeConversation处理不同类型的事件(如消息创建、转录完成、音频流等),并更新对话状态。

def process_event(self, event, *args):
event_processor = self.EventProcessors.get(event['type'])
if not event_processor:
raise Exception(f"Missing conversation event processor for {event['type']}")
return event_processor(self, event, *args)

 
7. 客户端接收并播放响应

  • RealtimeClient将处理后的响应发送回客户端。

  • 客户端接收响应并播放音频或显示文本。

await cl.context.emitter.send_audio_chunk(cl.OutputAudioChunk(mimeType="pcm16", data=audio, track=cl.user_session.get("track_id")))

 

使用协议说明:

 

  • WebSocket协议

    • WebSocket是一种在单个TCP连接上进行全双工通信的协议。

    • 在本项目中,RealtimeAPI类使用WebSocket协议与OpenAI的实时API进行通信。

    • WebSocket连接的建立和维护由RealtimeAPI类负责,确保低延迟和高效的数据传输。

    • 通过WebSocket协议,RealtimeAPI类能够实时发送和接收音频数据和文本消息,确保对话的流畅性和自然性。


示例:用户通过文本查询订单状态

 

  1. 客户端发送文本数据

  • 用户通过客户端应用输入文本查询:“我的订单状态是什么?”

  • 客户端将文本数据传递给RealtimeClientsend_user_message_content方法。

await realtime_client.send_user_message_content([{"type": "text", "text": "我的订单状态是什么?"}])

 
2. RealtimeClient处理文本数据

  • RealtimeClient调用RealtimeAPIsend方法,将文本数据通过WebSocket协议发送到服务器。

await self.realtime.send("conversation.item.create", {
"item": {
"type": "message",
"role": "user",
"content": [{"type": "text", "text": "我的订单状态是什么?"}]
}
})

 
3. 服务器处理请求并返回响应

  • RealtimeAPI通过WebSocket协议接收来自服务器的消息。

  • 服务器处理文本数据,识别用户的查询,并生成响应。

  1. RealtimeAPI分发事件

  • RealtimeAPI接收到服务器的响应消息后,分发事件到RealtimeClient

async for message in self.ws:
event = json.loads(message)
self.dispatch(f"server.{event['type']}", event)
self.dispatch("server.*", event)

 
5. RealtimeClient处理服务器响应

  • RealtimeClient调用RealtimeConversation的方法来处理事件并更新对话状态。

item, delta = self.conversation.process_event(event)
if item:
self.dispatch("conversation.updated", {"item": item, "delta": delta})

 
6. RealtimeConversation管理对话状态

  • RealtimeConversation处理不同类型的事件(如消息创建、转录完成、音频流等),并更新对话状态。

def process_event(self, event, *args):
event_processor = self.EventProcessors.get(event['type'])
if not event_processor:
raise Exception(f"Missing conversation event processor for {event['type']}")
return event_processor(self, event, *args)

 
7. 客户端接收并显示响应

  • RealtimeClient将处理后的响应发送回客户端。

  • 客户端接收响应并显示文本。

await cl.context.emitter.send_text_message(cl.OutputTextMessage(content=response_text))

 

使用协议说明:

 

  • WebSocket协议

    • WebSocket是一种在单个TCP连接上进行全双工通信的协议。

    • 在本项目中,RealtimeAPI类使用WebSocket协议与OpenAI的实时API进行通信。

    • WebSocket连接的建立和维护由RealtimeAPI类负责,确保低延迟和高效的数据传输。

    • 通过WebSocket协议,RealtimeAPI类能够实时发送和接收文本消息,确保对话的流畅性和自然性。


大魏分享
https://github.com/davidsajare/david-share.git
 最新文章