【动手学Apollo】8. Cyber RT通信机制

科技   2024-11-25 08:01   上海  

从零开始学Apollo(8)——Cyber RT通信机制

通信方式的总体介绍

在我们平时的开发中,遇到的通信场景大致可以分为以下三种场景:

  • 进程内通信:对象指针、函数等
  • 同主机进程间通信:共享内存、管道、消息队列等
  • 跨主机通信:RTPS、GRPC协议等

在Cyber RT中,一共有三种通信方式:

  • INTRA:函数、指针
  • SHM:共享内存
  • RTPS:实时发布订阅的网络通信协议

可以看到图中,有两台机器和三个进程。左边第一个进程中,有两个模块需要通信,可以通过函数指针的方式来实现进程内通信;左边第二个进程要和第一个进程通信,可以通过共享内存的方式来进行同主机的进程间通信;右边第三个进程要和左边第一个进程通信,可以通过RTPS协议来进行跨主机通信。

但是在实际的开发,可能我们并不知道对端的发布者或者订阅者是属于进程内、同主机进程还是不同主机上的进程间通信。而在Apollo中,还额外提供了一种通信方式,就是hybrid。这种通信方式是结合了以上三种通信模式,会动态根据对端信息和自身信息进行对比。如果发布者和订阅者IP不同,则使用RTPS协议进行通信,如果IP相同而进程ID不同,则使用共享内存的方式进行通信;如果进程ID也相同,则使用函数指针的方式来进行通信。

在Cyber RT中,也根据不同的场景和功能,将通信模式分为三种:

Writer/Reader 模式:发布订阅通信模式

Service/Clinet模式: 服务客户通信模式

Parameter Sevice-Client模式:参数服务客户通信模式。基于第二种扩展出的一种通信模式。

接下来将对这几种模式进行分别的讲解。

基于 Writer/Reader 的通信方式

Writer/Reader是基于共享内存的发布订阅通信模式

我们先来了解几个通信概念

Node:是整个数据拓扑网络中的基本单元,可以根据需求创建和管理若干个Writer、Reader、Service、Client。

Writer:是发布订阅者模式中的发布者。

Reader:是发布订阅者模式中的订阅者。

Channel:通信中的topic,通过相同channel连接发布者和订阅者。

Message:会定义通信的数据类型和数据结构(在Apollo中使用的是谷歌的protobuf)。

这种通信模式,一般会使用在以下两种通信场景:

单向通信,订阅者收到消息后不需要给发布者进行响应。 这种情况下,订阅者只需要定义好要监听哪个channel通道,以及接收到数据后要执行的回调函数。收到数据后也不需要给发布者返回响应数据。

要求高性能和低延迟的场景。这种通信方式是基于共享内存来实现的,不需要走系统调用,传输的效率是比较高的。

接下来,我们简单介绍一下protobuf。protobuf是谷歌公司开发的一种跨语言和跨平台序列化数据结构的方式,是一个灵活的、高效的用于序列化数据的协议。与XML和json格式相比,protobuf更小、更快、更便捷。它有以下4个优点:

1.性能效率高:序列化后字节占用空间比XML少3-10倍,序列化的时间效率比XML快20-100倍。

2.使用便捷:将对结构化数据的操作封装成一个类,提供一些set和get的方法,操作起来更加方便。

3.兼容性高:通信两方使用同一数据协议,当有一方修改了数据结构,不会影响另一方使用。

4.跨语言:支持Java、C++、Pyhton、Go等多种语言。

上面是proto的一个简单样例。proto2是一个proto的版本号,apollo中使用的proto是一个proto2版本的;package是生命这个数据结构所在的包,类似于C++的命名空间;使用message来定义一个消息,其中optional字段是一个可选字段,该字段可以不提供,也可以给该字段提供一个默认值;required字段是在使用时必须提供,否则该消息会被视为未初始化;repeated类似于动态数组,可以存储多个同类型的数据。

介绍完proto,我们介绍一个自动驾驶中使用Writer/Reader通信的通信案例。假如我们需要持续获取主车的实时速度,根据主车的实时速度进行规划处理,而且接收方不需要给发送方进行反馈。对于这种要求实时性高,且需要单向通信的场景,我们可以使用Writer/Reader这种通信模式来实现。

如图所示,两个模块要进行通信会各自创建一个node节点。发布者创建的节点名为talker,订阅者创建的节点名为listener。发布者为了实现诉说的功能,会创建一个Writer;订阅者为了实现聆听的功能,会创建一个Reader。要想两者通信在同一个通道上,可以定义同名的channel。这里的channel名字统一定义为car_speed。通信的消息类型为Car Message。Car Message中包含主车的速度。

定义完通信方、通信通道和通信格式,talker和listener就可以进行通信了。talker会通过Writer,将主车的实时速度存在Car Message中,然后经过car_speed这个通道;listener会从car_speed这个通道拿到car_message,从中获取到主车的实时速度,进行下一步的处理。

这种通信模式还适用于持续性通信的场景,如雷达信号,摄像头图像信息这类数据的传输。

基于 Service/Client 的通信方式

Service/Client是基于RTPS协议封装的一种C/S通信模式。

我们先来了解几个通信概念:

Service:是C/S模式下的服务端

Client:是C/S模式下的客户端

RTPS:是一种实时发布订阅协议。这种通信的优点主要在于实时性高,以及支持丰富的通信质量服务配置。包括消息的历史保留策略、可靠性、优先级等。

这种通信模式通常使用在以下两种场景:

  1. 通信双方不在一个机器上。这时必须通过网络来实现。
  2. 双向通信。客户端在发布完数据请求后,会等待服务端的响应数据,服务端在接收到数据后对数据处理,并将响应数据发送给客户端。

接下来,我们举一个自动驾驶中的通信案例:

假如我们需要从云端获取主车的车牌信息,车牌信息一般都不会变,我们只需要在更新的时候去获取就行。像这种通信双方不在一个机器上,且需要双向通信的场景,就可以使用Service/Client通信模式。

如图所示,两个模块要进行通信会各自创建一个node节点。客户端创建的节点名为Client,服务端创建的节点名为Service。客户端为了发送请求信息,并能接收响应数据,会创建一个Client,而服务端为了能够接收数据,并将处理后数据返回客户端,会创建一个Service。通信的消息为Car message, 其中包含主车的车牌信息。起初Client会sendRequest()请求获取车牌信息,请求会到RTPS中server-client的topic中,Service从这个topic获取请求进行处理,并sendResponse()将响应数据发送到这个topic中;客户端从这个topic拿到响应数据,获取到主车的车牌信息。

基于参数服务的通信方式

参数服务的通信方式,是在Service/Client通信模式拓展出来的,以用来实现全局配置参数的共享。

我们先了解几个通信概念:

ParameterService:是参数服务器,用来存储全局的配置参数。

parameterClient:是全局客户端,用来获取和设置全局配置参数

这种通信方式的使用场景:多个进程模块需要共享配置参数,某个参数能够影响多个进程运行。

接下来,我们举一个自动驾驶的通信案例:

假如,有模块会根据当前主车的最多人数,或者最高速度来进行不同逻辑处理。这个参数可能会影响多个模块,或者被多个模块进行修改,则可以使用参数服务器这种通信模式来实现。

如图所示,为了存储参数,我们可以创建一个ParameterServerNode,在这个节点上创建一个全局参数服务器ParameterServer,对于要获取或者需要修改这个配置参数的模块,可以根据参数服务器名称来创建ParameterClient,绑定要获取和修改的参数服务器。这里,因为它们是基于Service/Client通信模式实现的,所以也会通过RTPS协议来进行通信。

ParameterClient提供了3个接口:分别是获取指定名称的参数的值 、获取参数服务器上的所有参数、修改某一个参数的值。

实验案例

实验内容:

构建一个简单的通信流程,发送方通过Cyber的Node创建Writer,接收方通过Cyebr的Node创建Reader,发送方实时发送车辆速度,接收方将接收到的车辆速度进行打印。

实验目的:

通过Cyber RT的发布-订阅机制,实现一个节点发布消息,另一个节点订阅并接收该消息,了解Cyber的基础通信知识。

生成component模版及必要文件

在终端输入以下命令,生成模版

buildtool create --template component communication

生成完模版后,我们在communication目录下创建两个文件,分别是talker.cc和listener.cc

touch /apollo_workspace/communication/listener.cc
touch /apollo_workspace/communication/talker.cc

编写需要发送的数据结构

首先在 /communication/proto/communication.proto 中定义了一个Car的消息类型,这个类型中有车的型号、车牌号、速度等信息。具体代码为

syntax = "proto2";
package apollo.communication.proto;

//定义一个车的消息,车的型号,车主,车的车牌号,已跑公里数,车速
message Car{
optional string plate = 1;
optional string type = 2;
optional string owner = 3;
optional uint64 kilometers = 4;
optional uint64 speed = 5;
};

/communication/proto/BUILD在编译成功后,会根据.proto文件生成.cc以及.h的源文件和头文件(communication.pb.h,在源文件编写中会引用到),并生成.so的动态库。

这些文件存放在opt/apollo/neo/include/communication下。

编写发送方和接收方代码

编写发送方代码

首先先导入我们刚刚定义的消息类型Car,然后初始化cyber框架。

因为cyber的通信是基于节点上的,这里创建一个名为talker_node的节点;然后在该节点上创建一个Writer,消息类型Car以类模版的形式传入,再将channel的名称”car_speed”以参数形式传入。

最后再while循环中,没1s中将车速+5,然后传入到channel中。

#include "communication/proto/communication.pb.h" 
#include "cyber/cyber.h"
#include "cyber/time/rate.h"

//car数据定义的引用,可以看出其定义来源于一个proto
using apollo::communication::proto::Car;

int main(int argc, char *argv[]) {
  // 初始化一个cyber框架
  apollo::cyber::Init(argv[0]);
  // 创建talker节点
  auto talker_node = apollo::cyber::CreateNode("talker"); 
  // 从节点创建一个Topic,来实现对车速的查看
  auto talker = talker_node->CreateWriter<Car>("car_speed");
  AINFO << "I'll start telling you the current speed of the car.";
    
  //设置初始速度为0,然后速度每秒增加5km/h
  uint64_t speed = 0;
  while (apollo::cyber::OK()) {
      auto msg = std::make_shared<Car>();
      msg->set_speed(speed);
      //假设车速持续增加
      speed += 5;
      talker->Write(msg);
      sleep(1);
  }
  return 0;
}

编写接收方代码

首先同样先导入要接收的消息类型Car,初始化cyber框架,并定义一个listener节点。

在节点上创建一个Reader,创建Reader要给出接收的消息类型,订阅的channel通道以及接收到数据后要执行的回调函数。这里创建的Reader会接受Car的消息类型,订阅car_speed这个channel通道,回调函数就是会打印接收到的速度信息。

#include "communication/proto/communication.pb.h"
#include "cyber/cyber.h"

using apollo::communication::proto::Car;

//接收到消息后的响应函数
void message_callback(
        const std::shared_ptr<Car>& msg) {
    AINFO << "now speed is: " << msg->speed();
}

int main(int argc, char* argv[]) {
    //初始化cyber框架
    apollo::cyber::Init(argv[0]);
    //创建监听节点
    auto listener_node = apollo::cyber::CreateNode("listener");
    //创建监听响应进行消息读取
    auto listener = listener_node->CreateReader<Car>(
            "car_speed", message_callback);
    apollo::cyber::WaitForShutdown();
    return 0;
}

编译

打开communication下的BUILD文件,写入代码为

load("//tools:apollo_package.bzl""apollo_cc_library""apollo_cc_binary""apollo_package""apollo_component")
load("//tools:cpplint.bzl""cpplint")

package(default_visibility = ["//visibility:public"])

apollo_cc_binary(
    name = "talker",
    srcs = ["talker.cc"],
    deps = [
        "//cyber",
        "//communication/proto:communication_proto",
    ],
    linkstatic = True,
)
apollo_cc_binary(
    name = "listener",
    srcs = ["listener.cc"],
    deps = [
        "//cyber",
        "//communication/proto:communication_proto",
    ],
    linkstatic = True,
)

apollo_package()

cpplint()

回到./apollo_workspace的目录下编译

buildtool build -p communication

编译成功后终端中会提示如下信息

执行查看运行结果

为了方便查看结果,在vscode中同时打开两个终端。同时,为了让cyber打印信息至终端,输入代码设置环境变量,并分别进入bin文件目录下执行可执行文件

export GLOG_alsologtostderr=1
// 编译产生的可执行文件在 /opt/apollo/neo/bin/
cd /opt/apollo/neo/bin/
// 执行listener
./listener

在另外一个终端中输入

export GLOG_alsologtostderr=1
// 编译产生的可执行文件在 /opt/apollo/neo/bin/
cd /opt/apollo/neo/bin/
// 执行talker
./talker

最后talker会一直给listener发送实时速度信息,而listener收到后会将速度打印到终端中。

总结

在这一部分中,我们学习了Cyber RT的通信框架,包括进程内函数指针、进程间的共享内存、主机间的RTPS网络通信。由于不能明确对端具体是什么,Apollo提供了hybrid通信模式能够自动识别这三种不同的通信。

根据功能场景分,可以分为Writer/Reader(用于进程间、接收端无需响应的情况,能高效传输数据),Service/Client(用于主机间、或进程间接收端需要响应的情况)以及 参数服务(用于多个进程共享并配置全局参数的情况)。

最后,在Writer/Reader的模式下,进行了实验,模拟在发送节点获取车辆速度通过Writer发送到channel中,再由接收节点从channel中获取数据的过程。重点在于在proto文件中编写我们需要的数据结构,并在其他代码中引用头文件并声明数据结构类型


自动驾驶小白说
输出专业自动驾驶算法教程的开发者社区. 🦈 官网: https://www.helloxiaobai.cn
 最新文章