Postgres通过称为MemoryContext的嵌套内存区域来管理内存。MemoryContext很方便,因为在大多数情况下,你不需要担心显式释放内存,因为在上下文中分配的内存将在上下文被释放时被释放。所以接下来你需要考虑的两件事是:1)某个特定对象应该在哪个MemoryContext中分配(即某个特定对象应该存活多长时间),2)我何时应该创建一个新的MemoryContext?
为了帮助了解我们如何使用MemoryContext,我们将做一些有点奇怪的事情。我们将在Postgres扩展内从头构建一个小型HTTP服务器和Web框架。这对于生产扩展来说(以及本文中做出的许多其他选择)都不是一个好主意。但它可能提供一个足够有趣的场景来探索MemoryContext的使用。
我们将能够设置服务器并进行一些自定义路由和处理,如下所示:
DROP EXTENSION IF EXISTS pgweb; CREATE EXTENSION pgweb; -- Create a handler for a route DROP FUNCTION IF EXISTS handle_hello_world; CREATE FUNCTION handle_hello_world(params JSON) RETURNS TEXT AS $$ BEGIN RETURN 'Hello, ' || (params->>'name') || E'!\n'; END; $$ LANGUAGE plpgsql; -- Register the handler with the route SELECT pgweb.register_get('/hello', 'handle_hello_world'); -- Start the server SELECT pgweb.serve('127.0.0.1', 9003);
这将是大约500行代码,你可以在 GitHub[1] 上看到完整的项目。
我将假设你在一台Debian机器上,以便我可以给出一些精确的说明,但除了我要求你安装的包名外,这应该在任何Linux发行版或Mac上都能工作。
先决条件:获取并构建Postgres
安装几个包:1)构建Postgres和2)查找内存泄漏:
$ sudo sh -c 'echo deb http://cloudfront.debian.net/debian sid main >> /etc/apt/sources.list' $ sudo apt-get install -y bpfcc-tools libbpfcc libbpfcc-dev linux-headers-$(uname -r) build-essential git bison flex libcurl4-openssl-dev
现在是Postgres:
$ cd ~ $ git clone https://github.com/postgres/postgres $ cd postgres $ git checkout REL_16_STABLE $ ./configure --prefix=$(pwd)/build --libdir=$(pwd)/build/lib --enable-cassert --enable-debug --without-icu --without-readline --without-zlib $ make -j8 $ make install $ export PATH="$PATH:$(pwd)/build/bin"
现在我们可以设置基本的扩展样板。
Postgres扩展样板
为扩展创建一个新目录:
$ cd ~ $ mkdir pgweb $ cd pgweb
以下所有文件都将在pgweb目录中。
在Makefile中写:
MODULES = pgweb EXTENSION = pgweb DATA = pgweb--0.0.1.sql PG_CONFIG = pg_config PGXS := $(shell $(PG_CONFIG) --pgxs) include $(PGXS)
在pgweb.control中写:
default_version = '0.0.1' module_pathname = '$libdir/pgweb' relocatable = true
好了,现在我们需要考虑要公开的SQL接口。
SQL接口
我们需要用户能够注册路由。并且需要他们能够启动服务器。
所以在pgweb--0.0.1.sql中写:
-- pgweb--0.0.1.sql -- Create the extension schema CREATE SCHEMA pgweb; -- Registers a route with a Postgres function handler. -- Example: -- DROP FUNCTION IF EXISTS handle_hello_world -- CREATE FUNCTION handle_hello_world(params JSON) RETURNS TEXT AS $$ -- BEGIN -- RETURN 'Hello, ' || (params->>'name') || E'!\n'; -- END; -- $$ LANGUAGE plpgsql; -- -- SELECT pgweb.register_get('/hello', 'handle_hello_world'); CREATE OR REPLACE FUNCTION pgweb.register_get(TEXT, TEXT) RETURNS VOID AS 'pgweb', 'pgweb_register_get' LANGUAGE C STRICT; GRANT EXECUTE ON FUNCTION pgweb.register_get(TEXT, TEXT) TO PUBLIC; -- Starts the web server at the address and port. -- Example: -- SELECT pgweb.serve('127.0.0.1', 9090); CREATE OR REPLACE FUNCTION pgweb.serve(TEXT, INT) RETURNS VOID AS 'pgweb', 'pgweb_serve' LANGUAGE C STRICT; GRANT EXECUTE ON FUNCTION pgweb.serve(TEXT, INT) TO PUBLIC; -- Example: -- $ curl 127.0.0.1:9090/hello?name=Phil -- Hello, Phil!
这大部分只是注释,用于说明我们将如何使用这个扩展。再次强调,这实际上只是公开了两个C函数(我们很快就会编写)作为可以从SQL调用的函数。
实现
首先,让我们设置一个顶级MemoryContext,称为PGWServerContext,它将保存所有应该存活于服务器生命周期的对象。在pgweb.c中:
#include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <time.h> #include <unistd.h> #include "postgres.h" #include "catalog/pg_type_d.h" #include "fmgr.h" #include "nodes/pg_list.h" #include "parser/parse_func.h" #include "utils/builtins.h" #include "utils/datum.h" #include "utils/json.h" #include "utils/memutils.h" #include "utils/regproc.h" PG_MODULE_MAGIC; typedef struct PGWHandler PGWHandler; typedef enum PGWRequestMethod PGWRequestMethod; typedef struct PGWRequest PGWRequest; static bool pgweb_handle_connection(int client_fd); static void pgweb_handle_request(PGWRequest *request, PGWHandler *handler, char **errmsg); static Datum pgweb_request_params_to_json(PGWRequest *request); static void pgweb_parse_request_url(PGWRequest *r, int buflen, int *bufp, char **errmsg); static PGWRequestMethod pgweb_parse_request_method(PGWRequest *r, int buflen, int *bufp, char **errmsg); static void pgweb_parse_request(PGWRequest *request, char *buf, int buflen, char **errmsg); static void pgweb_send_response(PGWRequest *request, int code, char *status, char *body); static MemoryContext PGWServerContext = NULL;
在MemoryContext中分配
要在当前内存上下文(一个名为CurrentMemoryContext的全局变量)中分配内存,我们不能使用malloc(同样也不能使用free)。我们必须使用Postgres的CurrentMemoryContext感知方法,如 palloc[2] 、 pstrdup[3] 和 psprintf[4] 。这些方法可能在底层使用malloc,但如果我们直接使用malloc,我们将失去MemoryContext基础架构的所有好处。
我们可以使用 MemoryContextSwitchTo[5] 在MemoryContext之间切换。我们可以使用 MemoryContextReset[6] 释放MemoryContext中的所有内存。
_PG_init
现在我们可以编写一个**_PG_init()方法(由Postgres在加载扩展时调用),它将初始化这个内存。我们将让PGWServerContext成为TopMemoryContext**(Postgres中内置的顶级MemoryContext,存在于进程的整个生命周期)内的一个嵌套区域。
void _PG_init(void) { PGWServerContext = AllocSetContextCreate(TopMemoryContext, "PGWServerContext", ALLOCSET_DEFAULT_SIZES); }
接下来让我们处理注册路由。
注册路由
我们需要将路径映射到路由处理程序方法。为了保持简单,我们将使用关联列表。
typedef struct PGWHandler { char *route; char *funcname; } PGWHandler; static List /* PGWHandler * */ *handlers;
我们将处理程序列表存储在 PGWServerContext 中。为此,我们必须在分配之前切换到 PGWServerContext。为了成为一个好的公民,我们在完成分配后将切换回之前的上下文。
PG_FUNCTION_INFO_V1(pgweb_register_get); Datum pgweb_register_get(PG_FUNCTION_ARGS) { MemoryContext oldctx; PGWHandler *handler; oldctx = MemoryContextSwitchTo(PGWServerContext); handler = palloc(sizeof(PGWHandler)); handler->route = TextDatumGetCString(PG_GETARG_DATUM(0)); handler->funcname = TextDatumGetCString(PG_GETARG_DATUM(1)); handlers = lappend(handlers, handler); MemoryContextSwitchTo(oldctx); PG_RETURN_VOID(); }
服务路由
接下来,我们需要在端口上监听并接受新连接。这是 UNIX API 的一个非常基本的部分,所以我不会详细描述,如果您想了解更多,请参考 Beej 的网络编程指南中的 相关章节[7] 。
基本上,我们将一次接受一个 TCP 连接,并阻塞直到处理连接并使用适当的 HTTP 响应进行响应。像本文的许多部分一样,这对于生产 Web 服务器来说并不是一个好主意。但它使代码对于我们想要的示例用法保持简单和可用。
与 pgweb_register_get 方法类似,我们将在逻辑的主要部分之前和之后切换到 PGWServerContext,以便任何分配都发生在此上下文中。
PG_FUNCTION_INFO_V1(pgweb_serve); Datum pgweb_serve(PG_FUNCTION_ARGS) { char *address; int32 port = PG_GETARG_INT32(1); int server_fd; struct sockaddr_in server_addr; MemoryContextSwitchTo(PGWServerContext); address = TextDatumGetCString(PG_GETARG_DATUM(0)); memset(&server_addr, 0, sizeof(server_addr)); server_addr.sin_family = AF_INET; server_addr.sin_addr.s_addr = inet_addr(address); server_addr.sin_port = htons(port); server_fd = socket(AF_INET, SOCK_STREAM, 0); if (server_fd == -1) { int e = errno; elog(ERROR, "Could not create socket: %s.", strerror(e)); } if (bind(server_fd, (struct sockaddr *) &server_addr, sizeof(server_addr)) == -1) { int e = errno; elog(ERROR, "Could not bind to %s:%d: %s.", address, port, strerror(e)); } if (listen(server_fd, 10 /* Listen backlog. */) == -1) { int e = errno; elog(ERROR, "Could not listen to %s:%d: %s.", address, port, strerror(e)); } elog(INFO, "Listening on %s:%d.", address, port); while (1) { struct sockaddr_in peer_addr; socklen_t peer_addr_size; int client_fd = accept(server_fd, (struct sockaddr *) &peer_addr, &peer_addr_size); bool stayalive; if (client_fd == -1) { int e = errno; elog(ERROR, "Could not accept connection: %s.", strerror(e)); } stayalive = pgweb_handle_connection(client_fd); Assert(CurrentMemoryContext == PGWServerContext); close(client_fd); if (!stayalive) { elog(INFO, "Shutting down."); break; } } close(server_fd); MemoryContextReset(PGWServerContext); PG_RETURN_VOID(); }
另请注意 pgweb_handle_connection 调用后的 Assert(CurrentMemoryContext == PGWServerContext);(我们将接下来编写)。这个断言是为了帮助我们在开发过程中确保 pgweb_handle_connection 总是切换回调用它的内存上下文(如果它曾经切换away)。
处理连接
在连接内部,我们将分配一个新的 MemoryContext 来处理仅在请求持续期间需要的所有分配。
static MemoryContext PGWRequestContext = NULL;
现在让我们为 HTTP 连接本身勾勒出类型。我们将仅尝试解析请求方法(GET 或 POST)和请求 URL。尽管我们将把 URL 分解为其路径和参数组件。
typedef enum PGWRequestMethod { PGW_REQUEST_METHOD_GET, PGW_REQUEST_METHOD_POST, } PGWRequestMethod ; typedef struct PGWRequestParam { char *key; char *value; } PGWRequestParam; typedef struct PGWRequest { int conn_fd; /* Where to read/write */ char *buf; /* Bytes we have already read */ PGWRequestMethod method; char *url; /* The entire requested URL. */ char *path; /* Only the path portion of the URL, excluding URL parameters. */ List /* PGWRequestParam * */ *params; /* All keyword parameters in the URL. */ } PGWRequest;
如果我们要处理多个并发请求,我们会希望为每个请求分配一个新的 MemoryContext。并且我们会将每个请求的 MemoryContext 存储在这个 PGWRequest 结构体本身上。我们将摆脱全局 PGWRequestContext。例如:
typedef struct PGWRequest { MemoryContext context; /* Where all request memory is allocated */ int conn_fd; /* Where to read/write */ char *buf; /* Bytes we have already read */ PGWRequestMethod method; char *url; /* The entire requested URL. */ char *path; /* Only the path portion of the URL, excluding URL parameters. */ List /* PGWRequestParam * */ *params; /* All keyword parameters in the URL. */ } PGWRequest;
但由于我们的代码一次只处理单个连接和请求,我们可以简化并使用上面的全局 PGWRequestContext。
处理连接
要处理连接,我们需要从网络读取字节,将字节解析为请求,找到路径的适当处理程序,调用处理程序,否则如果没有路径的处理程序则返回 404。
所有这些代码将在 PGWRequestContext 中分配。因此我们需要确保它已初始化,并且我们需要为方法体切换到此上下文。当方法结束时,我们将"重置" MemoryContext 以一次性释放在上下文中分配的所有内存。
static bool pgweb_handle_connection(int client_fd) { char *buf; ssize_t n; char *errmsg = NULL; ListCell *lc; bool handler_found = false; int errcode = 500; MemoryContext oldctx; clock_t start = clock(); clock_t stop; bool stayalive = true; PGWRequest request; memset(&request, 0, sizeof(request)); if (PGWRequestContext == NULL) PGWRequestContext = AllocSetContextCreate(PGWServerContext, "PGWRequestContext", ALLOCSET_DEFAULT_SIZES); oldctx = MemoryContextSwitchTo(PGWRequestContext); buf = palloc(4096); n = recv(client_fd, buf, 4096, 0); // Let's just not support longer requests. if (n == 4096) { errmsg = "Request is too long."; goto done; } pgweb_parse_request(&request, buf, n, &errmsg); if (errmsg != NULL) goto done; request.conn_fd = client_fd; /* Special case for */ if (strcmp(request.url, "/_exit") == 0) { stayalive = false; goto done; } foreach (lc, handlers) { PGWHandler *handler = lfirst(lc); if (strcmp(handler->route, request.path) == 0) { pgweb_handle_request(&request, handler, &errmsg); handler_found = true; break; } } if (!handler_found) { errcode = 404; errmsg = "Not found"; } done: if (errmsg) pgweb_send_response(&request, errcode, errcode == 404 ? "Not Found" : "Internal Server Error", errmsg); stop = clock(); elog(INFO, "[%fs] %s %s", (double)(stop - start) / CLOCKS_PER_SEC, request.method == PGW_REQUEST_METHOD_GET ? "GET" : "POST", request.url); Assert(CurrentMemoryContext == PGWRequestContext); MemoryContextReset(PGWRequestContext); MemoryContextSwitchTo(oldctx); return stayalive; }
由于我们在单线程/单进程上无限循环接受一次连接,我们黑客式地加入这个特殊的 /_exit 路径以允许我们干净地关闭服务器。
从这里,我们需要实现三个辅助方法:
- pgweb_parse_request
- pgweb_handle_request
- pgweb_send_response
发送响应很简单,所以让我们先做这个。
发送 HTTP 响应
根据 RFC9112[8] 的 HTTP 规定,我们必须写出一个 状态行[9] 来发送 HTTP 响应。我们还将在发送 HTTP 响应体之前发送 Content-Length 和 Content-Type 标头。
static void pgweb_send_response(PGWRequest *request, int code, char *status, char *body) { char *buf = psprintf("HTTP/1.1 %d %s\r\n" "Content-Length: %lu\r\n" "Content-Type: text/plain\r\n" "\r\n" "%s", code, status, strlen(body), body); ssize_t n = send(request->conn_fd, buf, strlen(buf), 0); Assert(CurrentMemoryContext == PGWRequestContext); if (n != strlen(buf)) { int e = errno; elog(ERROR, "Failed to send response to client: %s.", strerror(e)); } }
我们可以在此方法末尾 pfree(buf)。我们还可以随时创建动态和短暂的 MemoryContext。例如:
static void pgweb_send_response(PGWRequest *request, int code, char *status, char *body) { char *buf; ssize_t n; MemoryContext ctx = AllocSetContextCreate(CurrentMemoryContext, "SendResponseContext", ALLOCSET_DEFAULT_SIZES); Assert(CurrentMemoryContext == PGWRequestContext); MemoryContextSwitchTo(ctx); buf = psprintf("HTTP/1.1 %d %s\r\n" "Content-Length: %lu\r\n" "Content-Type: text/plain\r\n" "\r\n" "%s", code, status, strlen(body), body); MemoryContextReset(ctx); MemoryContextSwitchTo(PGWRequestContext); n = send(request->conn_fd, buf, strlen(buf), 0); if (n != strlen(buf)) { int e = errno; elog(ERROR, "Failed to send response to client: %s.", strerror(e)); } }
如果我们一次进行大量分配,这可能很有用,这些分配可以很快被释放,并且我们不接近 CurrentMemoryContext 将被重置的点。然而,发送响应发生在 PGWRequestContext 生命周期的末尾,因此积极且显式地释放我们分配用于发送响应的内容没有太多价值。
解析 HTTP 请求
接下来让我们处理解析请求。在 request-line[10] 中,我们基本上只关注 HTTP 方法和请求的 URL。我们将完全忽略可能存在或不存在的任何标头和请求正文。
static void pgweb_parse_request(PGWRequest *request, char *buf, int buflen, char **errmsg) { int bufp = 0; Assert(CurrentMemoryContext == PGWRequestContext); request->buf = buf; request->method = pgweb_parse_request_method(request, buflen, &bufp, errmsg); if (request->method == -1) { /* pgweb_parse_request_method should handle setting the errmsg in this case. */ Assert(errmsg != NULL); return; } Assert(request->buf[bufp] == ' '); bufp++; pgweb_parse_request_url(request, buflen, &bufp, errmsg); }
如果 HTTP 方法不是 GET 或 PUT,我们将报错。
static PGWRequestMethod pgweb_parse_request_method(PGWRequest *r, int buflen, int *bufp, char **errmsg) { int bufp_original = *bufp; int len; Assert(CurrentMemoryContext == PGWRequestContext); while (*bufp < buflen && r->buf[*bufp] != ' ') (*bufp)++; if (*bufp == buflen) { *errmsg = psprintf("Incomplete request: '%s'", pnstrdup(r->buf, buflen)); return -1; } len = *bufp - bufp_original; if (len == 3 && strncmp(r->buf + bufp_original, "GET", len) == 0) return PGW_REQUEST_METHOD_GET; if (len == 4 && strncmp(r->buf + bufp_original, "POST", len) == 0) return PGW_REQUEST_METHOD_POST; *errmsg = psprintf("Unsupported method: '%s'", pnstrdup(r->buf + bufp_original, len)); return -1; }
最后我们将解析请求参数。
static void
pgweb_parse_request_url(PGWRequest *r, int buflen, int *bufp, char **errmsg)
{
int bufp_original = *bufp;
int bufp_tmp = *bufp;
int len = 0;
char *key = NULL;
char *value = NULL;
PGWRequestParam *param = NULL;
bool path_found = false;
Assert(CurrentMemoryContext == PGWRequestContext);
r->params = NIL;
while (*bufp < buflen && r->buf[*bufp] != ' ')
{
len = *bufp - bufp_tmp;
if (r->buf[*bufp] == '?')
{
r->path = pnstrdup(r->buf + bufp_tmp, len);
path_found = true;
(*bufp)++;
bufp_tmp = *bufp;
continue;
}
if (r->buf[*bufp] == '=')
{
key = pnstrdup(r->buf + bufp_tmp, len);
(*bufp)++;
bufp_tmp = *bufp;
continue;
}
if (r->buf[*bufp] == '&')
{
value = pnstrdup(r->buf + bufp_tmp, len);
(*bufp)++;
bufp_tmp = *bufp;
param = palloc0(sizeof(PGWRequestParam));
param->key = key;
param->value = value;
r->params = lappend(r->params, param);
continue;
}
(*bufp)++;
}
len = *bufp - bufp_original;
if (!path_found)
r->path = pnstrdup(r->buf + bufp_original, len);
else if (key != NULL && strlen(key) > 0)
{
param = palloc0(sizeof(PGWRequestParam));
param->key = key;
param->value = pnstrdup(r->buf + bufp_tmp, *bufp - bufp_tmp);
*bufp += len;
r->params = lappend(r->params, param);
}
r->url = pnstrdup(r->buf + bufp_original, len);
}
重要的是,我们确保在 PGWRequestContext 中分配请求参数,以便在请求结束时释放。
最后,我们需要实现路由处理!
调用路由处理程序
为了增添趣味,并展示对 MemoryContext 分配的控制,我们不仅仅是查找已注册路由的处理程序并调用它。我们还将缓存处理程序返回的响应。如果我们再次看到相同的 URL,如果它在我们的缓存中,我们将立即返回缓存的结果,并且永远不再调用处理程序。缓存将在 PGWServerContext 中分配,因为它必须在整个进程期间存在,而不仅仅是单个请求的持续时间。
typedef struct PGWResponseCache { char *url; char *response; } PGWResponseCache; static List /* PGWResponseCache * */ *response_cache; static void pgweb_handle_request(PGWRequest *request, PGWHandler *handler, char **errmsg) { ListCell *lc; char *msg = NULL; PGWResponseCache *cached = NULL; /* If there's a cached response, use it. */ foreach (lc, response_cache) { cached = lfirst(lc); if (strcmp(cached->url, request->url) == 0) { msg = cached->response; elog(INFO, "Cached request."); break; } } /* No cached response, run the route handler! */ if (!msg) { List *func_name_list = stringToQualifiedNameList(handler->funcname, NULL); Oid argtypes[] = {JSONOID}; Oid func_oid = LookupFuncName(func_name_list, sizeof(argtypes) / sizeof(Oid), argtypes, false); FmgrInfo func_info; Datum params = pgweb_request_params_to_json(request); Datum result; fmgr_info(func_oid, &func_info); result = FunctionCall1(&func_info, params); msg = TextDatumGetCString(result); /* Cache this response for the future. */ Assert(CurrentMemoryContext == PGWRequestContext); MemoryContextSwitchTo(PGWServerContext); cached = palloc0(sizeof(*cached)); cached->url = pstrdup(request->url); cached->response = pstrdup(msg); response_cache = lappend(response_cache, cached); MemoryContextSwitchTo(PGWRequestContext); } pgweb_send_response(request, 200, "OK", msg); }
现在我们只需要实现最后一个辅助函数 pgweb_request_params_to_json,它将把我们的请求参数关联列表转换为 Postgres JSON Datum,以便 SQL 处理程序可以方便地访问参数映射。
我们将通过构建请求参数映射的 JSON 字符串,然后将该字符串转换为 Postgres 的 JSON 类型来完成此操作。
static Datum pgweb_request_params_to_json(PGWRequest *request) { ListCell *lc; StringInfoData json_string; Assert(CurrentMemoryContext == PGWRequestContext); initStringInfo(&json_string); appendStringInfoString(&json_string, "{"); foreach (lc, request->params) { PGWRequestParam *param = lfirst(lc); if (json_string.len > 1) appendStringInfoString(&json_string, ", "); /* We're just going to assume there's no quotes in key or value. */ appendStringInfo(&json_string, "\"%s\": \"%s\"", param->key, param->value); } appendStringInfoString(&json_string, "}"); return DirectFunctionCall1(json_in, CStringGetDatum(json_string.data)); }
这是另一个中间分配的地方,其中一些分配可以在函数结束时释放。例如,我们可以在返回之前 pfree(json_string.data)。或者我们可以再次创建一个新的临时 MemoryContext 并在返回之前重置它。类似于我们在 pgweb_send_response 中讨论的可能性。
然而,在当前的实现中,这些分配将保留到请求结束。
就是这样!让我们试试看。
整合一切
我们已经安装了 Postgres 和 pg_config,所有其他 Postgres 二进制文件都应该在我们的 $PATH 中。因此在这个扩展目录中运行:
$ make $ make install
现在让我们启动一个 Postgres 数据库来尝试这个扩展:
$ initdb testdata The files belonging to this database system will be owned by user "phil". This user must also own the server process. The database cluster will be initialized with locale "en_US.UTF-8". The default database encoding has accordingly been set to "UTF8". The default text search configuration will be set to "english". Data page checksums are disabled. Transparent data encryption is disabled. creating directory testdata ... ok creating subdirectories ... ok selecting dynamic shared memory implementation ... posix selecting default max_connections ... 100 selecting default shared_buffers ... 128MB selecting default time zone ... America/New_York creating configuration files ... ok running bootstrap script ... ok performing post-bootstrap initialization ... ok syncing data to disk ... ok initdb: warning: enabling "trust" authentication for local connections initdb: hint: You can change this by editing pg_hba.conf or using the option -A, or --auth-local and --auth-host, the next time you run initdb. Success. You can now start the database server using: pg_ctl -D testdata -l logfile start $ pg_ctl -D testdata -l logfile start waiting for server to start.... done server started
现在记住我们的测试 SQL 脚本:
$ cat test.sql DROP EXTENSION IF EXISTS pgweb; CREATE EXTENSION pgweb; DROP FUNCTION IF EXISTS handle_hello_world; CREATE FUNCTION handle_hello_world(params JSON) RETURNS TEXT AS $$ BEGIN RETURN 'Hello, ' || (params->>'name') || E'!\n'; END; $$ LANGUAGE plpgsql; -- For debugging memory usage. SELECT pg_backend_pid(); SELECT pgweb.register_get('/hello', 'handle_hello_world'); SELECT pgweb.serve('127.0.0.1', 9003);
让我们运行它:
$ psql postgres -f test.sql psql:test.sql:1: NOTICE: extension "pgweb" does not exist, skipping DROP EXTENSION CREATE EXTENSION psql:test.sql:4: NOTICE: function handle_hello_world() does not exist, skipping DROP FUNCTION CREATE FUNCTION pg_backend_pid ---------------- 52001 (1 row) register_get -------------- (1 row) psql:test.sql:16: INFO: Listening on 127.0.0.1:9003.
它现在正在阻塞等待请求。所以在一个新的终端中 curl 服务器!
$ curl '127.0.0.1:9003/hello?name=Tim' Hello, Tim!
我们如何处理未注册的路径?
$ curl 127.0.0.1:9003 Not found
很好。要关闭吗?
$ curl '127.0.0.1:9003/_exit' curl: (52) Empty reply from server
太棒了!上面的 psql postgres -f test.sql 命令现在应该已经正常退出。我们可以随时再次运行该 SQL 脚本来启动服务器。
查找内存泄漏
起初我说过我会假设你使用 Debian 以使安装说明更简单。但现在我将假设你使用的是 Linux(即不是 macOS),以便我们可以运行来自 bcc 工具仓库[11] 的 memleak[10] 程序。
让我们关闭每次重置内存上下文:
$ git diff diff --git a/pgweb.c b/pgweb.c index b7eaa98..2c760f3 100644 --- a/pgweb.c +++ b/pgweb.c @@ -373,7 +373,7 @@ done: request.url); Assert(CurrentMemoryContext == PGWRequestContext); - MemoryContextReset(PGWRequestContext); + //MemoryContextReset(PGWRequestContext); MemoryContextSwitchTo(oldctx); return stayalive; @@ -440,6 +440,6 @@ pgweb_serve(PG_FUNCTION_ARGS) } close(server_fd); - MemoryContextReset(PGWServerContext); + //MemoryContextReset(PGWServerContext); PG_RETURN_VOID(); }
现在重新构建并重新安装扩展。
$ make $ make install
销毁并重建 Postgres 数据库:
$ pg_ctl -D testdata -l logfile stop $ rm -rf logfile initdb $ initdb testdata The files belonging to this database system will be owned by user "server". This user must also own the server process. The database cluster will be initialized with locale "en_US.UTF-8". The default database encoding has accordingly been set to "UTF8". The default text search configuration will be set to "english". Data page checksums are disabled. creating directory testdata ... ok creating subdirectories ... ok selecting dynamic shared memory implementation ... posix selecting default max_connections ... 100 selecting default shared_buffers ... 128MB selecting default time zone ... US/Eastern creating configuration files ... ok running bootstrap script ... ok performing post-bootstrap initialization ... ok syncing data to disk ... ok initdb: warning: enabling "trust" authentication for local connections initdb: hint: You can change this by editing pg_hba.conf or using the option -A, or --auth-local and --auth-host, the next time you run initdb. Success. You can now start the database server using: pg_ctl -D testdata -l logfile start $ pg_ctl -D testdata -l logfile start waiting for server to start.... done server started
并运行 SQL 测试脚本:
$ psql postgres -f test.sql psql:test.sql:1: NOTICE: extension "pgweb" does not exist, skipping DROP EXTENSION CREATE EXTENSION psql:test.sql:4: NOTICE: function handle_hello_world() does not exist, skipping DROP FUNCTION CREATE FUNCTION pg_backend_pid ---------------- 66893 (1 row) register_get -------------- (1 row) psql:test.sql:16: INFO: Listening on 127.0.0.1:9003.
注意 pg_backend_pid 是 66893。由于服务器运行在执行 pgweb.serve() 函数的同一进程中,因此在此函数中发生的内存泄漏将显示在此进程中。
在一个新的终端中,针对此进程启动 memleak 程序。
$ sudo memleak-bpfcc -p 66893 Attaching to pid 66893, Ctrl+C to quit.
在另一个终端中,让我们对服务器发出大量 curl 请求(10,000 个)以测试路径和分配。
$ cat test.sh #!/bin/bash for ((i=0;i<10000;i++)); do curl "127.0.0.1:9003/hello?name=Test$i" done $ bash test.sh Hello, Test0! … omitted … Hello, Test9999!
并像这样检查 memleak 报告的泄漏:
66895872 bytes in 12 allocations from stack 0x0000ffffbbf0c4cc [unknown] [libc.so.6] 0x0000ffffbbf0d254 [unknown] [libc.so.6] 0x0000ffffbbf0e50c [unknown] [libc.so.6] 0x0000ffffbbf0f0cc malloc+0x21c [libc.so.6] 0x0000fffffffff000 [unknown] [[uprobes]] 0x0000aaaac416a708 palloc+0x68 [postgres] 0x0000ffffbbcf1598 pgweb_serve+0x148 [pgweb.so] 0x0000aaaac3e250d4 ExecInterpExpr+0x1620 [postgres] 0x0000aaaac3e5e204 ExecResult+0x114 [postgres] 0x0000aaaac3e28f8c standard_ExecutorRun+0x14c [postgres] 0x0000aaaac3ff5b34 PortalRunSelect+0x194 [postgres] 0x0000aaaac3ff7448 PortalRun+0x268 [postgres] 0x0000aaaac3ff1d3c exec_simple_query+0x37c [postgres] 0x0000aaaac3ff2928 PostgresMain+0x848 [postgres] 0x0000aaaac3f526d0 ServerLoop.isra.0+0x1c2c [postgres] 0x0000aaaac3f53824 PostmasterMain+0xec4 [postgres] 0x0000aaaac3c25f44 main+0x214 [postgres] 0x0000ffffbbea7740 [unknown] [libc.so.6] 0x0000ffffbbea7818 __libc_start_main+0x98 [libc.so.6] 0x0000aaaac3c26270 _start+0x30 [postgres]
关于这个工作负载泄露了 65328Kib 的数据。这仅仅是它发现的一个泄露。
(我不确定为什么它无法告诉我 palloc 的行号。但至少我们看到它来自 pgweb_serve 调用堆栈。)
现在通过运行以下命令优雅地退出服务器:
$ curl localhost:9003/_exit curl: (52) Empty reply from server
让我们撤销上面注释掉重置内存上下文的那个差异。即撤销这个:
$ git diff pgweb.c diff --git a/pgweb.c b/pgweb.c index 210656d..94b0888 100644 --- a/pgweb.c +++ b/pgweb.c @@ -375,7 +375,7 @@ done: request.url); Assert(CurrentMemoryContext == PGWRequestContext); - MemoryContextReset(PGWRequestContext); + //MemoryContextReset(PGWRequestContext); MemoryContextSwitchTo(oldctx); return stayalive; @@ -442,6 +442,6 @@ pgweb_serve(PG_FUNCTION_ARGS) } close(server_fd); - MemoryContextReset(PGWServerContext); + //MemoryContextReset(PGWServerContext); PG_RETURN_VOID(); }
撤销该差异后,我们将始终在完成时重置内存上下文。
重建并重新安装扩展。
$ make $ make install
重新运行测试 SQL 脚本以启动服务器:
$ psql postgres -f test.sql DROP EXTENSION CREATE EXTENSION DROP FUNCTION CREATE FUNCTION pg_backend_pid ---------------- 87001 (1 row) register_get -------------- (1 row) psql:test.sql:16: INFO: Listening on 127.0.0.1:9003.
在新终端中对新的后端 pid 运行 memleak:
$ sudo memleak-bpfcc -p 87001 Attaching to pid 87001, Ctrl+C to quit.
在新终端中运行 10,000 个请求的工作负载:
$ bash test.sh Hello, Test0! … omitted … Hello, Test9999!
并记录 memleak 中最大的未完成分配。
1052672 bytes in 1 allocations from stack 0x0000ffffbbf0c4cc [unknown] [libc.so.6] 0x0000ffffbbf0d254 [unknown] [libc.so.6] 0x0000ffffbbf0e50c [unknown] [libc.so.6] 0x0000ffffbbf0f0cc malloc+0x21c [libc.so.6] 0x0000fffffffff000 [unknown] [[uprobes]] 0x0000aaaac4169fbc MemoryContextAlloc+0x5c [postgres] 0x0000aaaac416b338 pstrdup+0x28 [postgres] 0x0000ffffbbcf1c10 pgweb_serve+0x770 [pgweb.so] 0x0000aaaac3e250d4 ExecInterpExpr+0x1620 [postgres] 0x0000aaaac3e5e204 ExecResult+0x114 [postgres] 0x0000aaaac3e28f8c standard_ExecutorRun+0x14c [postgres] 0x0000aaaac3ff5b34 PortalRunSelect+0x194 [postgres] 0x0000aaaac3ff7448 PortalRun+0x268 [postgres] 0x0000aaaac3ff1d3c exec_simple_query+0x37c [postgres] 0x0000aaaac3ff2928 PostgresMain+0x848 [postgres] 0x0000aaaac3f526d0 ServerLoop.isra.0+0x1c2c [postgres] 0x0000aaaac3f53824 PostmasterMain+0xec4 [postgres] 0x0000aaaac3c25f44 main+0x214 [postgres] 0x0000ffffbbea7740 [unknown] [libc.so.6] 0x0000ffffbbea7818 __libc_start_main+0x98 [libc.so.6] 0x0000aaaac3c26270 _start+0x30 [postgres]
仅 1028Kib 未完成!这几乎肯定来自于进程生命周期内存在的缓存。
如果现在 curl 退出路由(重置 PGWServerContext 并因此释放缓存):
$ curl localhost:9003/_exit curl: (52) Empty reply from server
并观察 memleak 的输出:
16384 bytes in 1 allocations from stack 0x0000aaaac4161f18 AllocSetAlloc+0x378 [postgres] 0x0000aaaac416a480 MemoryContextAllocExtended+0x70 [postgres] 0x0000aaaac4147570 element_alloc+0x50 [postgres] 0x0000aaaac41482d0 hash_search_with_hash_value+0x1a0 [postgres] 0x0000aaaac3d71b1c oper+0x1c8 [postgres] 0x0000aaaac3d71f50 make_op+0x70 [postgres] 0x0000aaaac3d6b964 transformAExprOp+0x74 [postgres] 0x0000aaaac3d6b954 transformAExprOp+0x64 [postgres] 0x0000aaaac3d6c204 transformExpr+0x20 [postgres] 0x0000aaaac3d78f5c transformTargetEntry+0xbc [postgres] 0x0000aaaac3d7afd4 transformTargetList+0x80 [postgres] 0x0000aaaac3d3c618 transformStmt+0xcb4 [postgres] 0x0000aaaac3d3de38 parse_analyze_withcb+0x54 [postgres] 0x0000aaaac3ff16c8 pg_analyze_and_rewrite_withcb+0x34 [postgres] 0x0000aaaac3e6a0c8 _SPI_prepare_plan+0x98 [postgres] 0x0000aaaac3e6aa68 SPI_prepare_extended+0x98 [postgres] 0x0000ffffb2b2cd00 exec_prepare_plan+0x40 [plpgsql.so] 0x0000ffffb2b2d56c exec_eval_expr+0x27c [plpgsql.so] 0x0000ffffb2b33118 exec_stmts+0x1a14 [plpgsql.so] 0x0000ffffb2b34a94 exec_stmt_block+0x544 [plpgsql.so] 0x0000ffffb2b34bcc exec_toplevel_block.constprop.0+0x58 [plpgsql.so] 0x0000ffffb2b354d0 plpgsql_exec_function+0x200 [plpgsql.so] 0x0000ffffb2b405e8 plpgsql_call_handler+0xf8 [plpgsql.so] 0x0000aaaac4142cdc FunctionCall1Coll+0x3c [postgres] 0x0000ffffbbcf1bd8 pgweb_serve+0x738 [pgweb.so] 0x0000aaaac3e250d4 ExecInterpExpr+0x1620 [postgres] 0x0000aaaac3e5e204 ExecResult+0x114 [postgres] 0x0000aaaac3e28f8c standard_ExecutorRun+0x14c [postgres] 0x0000aaaac3ff5b34 PortalRunSelect+0x194 [postgres] 0x0000aaaac3ff7448 PortalRun+0x268 [postgres] 0x0000aaaac3ff1d3c exec_simple_query+0x37c [postgres] 0x0000aaaac3ff2928 PostgresMain+0x848 [postgres] 0x0000aaaac3f526d0 ServerLoop.isra.0+0x1c2c [postgres] 0x0000aaaac3f53824 PostmasterMain+0xec4 [postgres] 0x0000aaaac3c25f44 main+0x214 [postgres] 0x0000ffffbbea7740 [unknown] [libc.so.6] 0x0000ffffbbea7818 __libc_start_main+0x98 [libc.so.6] 0x0000aaaac3c26270 _start+0x30 [postgres]
我们看到更小的未完成分配,这些 可能发生[12] 在我们无法控制的内存上下文中,尽管在我们的调用堆栈内。因为,再次强调,任何人都可以随时切换到任何 MemoryContext。
最后,让我们尝试验证我们的理论:当我们正确重置内存上下文时,主要的未完成分配是因为缓存。我们可以通过对同一请求重复 10,000 次来测试这一点,而不是对不同的请求重复 10,000 次。
$ git diff diff --git a/test.sh b/test.sh index 7db8f8b..95b6521 100644 --- a/test.sh +++ b/test.sh @@ -1,5 +1,5 @@ #!/bin/bash for ((i=0;i<10000;i++)); do - curl "127.0.0.1:9003/hello?name=Test$i" + curl "127.0.0.1:9003/hello?name=Test1" done
重新运行测试 SQL 脚本:
$ psql postgres -f test.sql DROP EXTENSION CREATE EXTENSION DROP FUNCTION CREATE FUNCTION pg_backend_pid ---------------- 97085 (1 row) register_get -------------- (1 row) psql:test.sql:16: INFO: Listening on 127.0.0.1:9003.
将 memleak 附加到这个新的后端进程:
$ sudo memleak-bpfcc -p 97085 Attaching to pid 97085, Ctrl+C to quit.
并运行这个新的工作负载:
$ bash test.sh Hello, Test1! … 9,998 similar log lines omitted … Hello, Test1!
并观察即使不退出服务器进程,最大的分配仍然保持不变:不是我们的分配:
16384 bytes in 1 allocations from stack 0x0000aaaac4161f18 AllocSetAlloc+0x378 [postgres] 0x0000aaaac416a480 MemoryContextAllocExtended+0x70 [postgres] 0x0000aaaac4147570 element_alloc+0x50 [postgres] 0x0000aaaac41482d0 hash_search_with_hash_value+0x1a0 [postgres] 0x0000aaaac3d71b1c oper+0x1c8 [postgres] 0x0000aaaac3d71f50 make_op+0x70 [postgres] 0x0000aaaac3d6b964 transformAExprOp+0x74 [postgres] 0x0000aaaac3d6b954 transformAExprOp+0x64 [postgres] 0x0000aaaac3d6c204 transformExpr+0x20 [postgres] 0x0000aaaac3d78f5c transformTargetEntry+0xbc [postgres] 0x0000aaaac3d7afd4 transformTargetList+0x80 [postgres] 0x0000aaaac3d3c618 transformStmt+0xcb4 [postgres] 0x0000aaaac3d3de38 parse_analyze_withcb+0x54 [postgres] 0x0000aaaac3ff16c8 pg_analyze_and_rewrite_withcb+0x34 [postgres] 0x0000aaaac3e6a0c8 _SPI_prepare_plan+0x98 [postgres] 0x0000aaaac3e6aa68 SPI_prepare_extended+0x98 [postgres] 0x0000ffffb2b2cd00 exec_prepare_plan+0x40 [plpgsql.so] 0x0000ffffb2b2d56c exec_eval_expr+0x27c [plpgsql.so] 0x0000ffffb2b33118 exec_stmts+0x1a14 [plpgsql.so] 0x0000ffffb2b34a94 exec_stmt_block+0x544 [plpgsql.so] 0x0000ffffb2b34bcc exec_toplevel_block.constprop.0+0x58 [plpgsql.so] 0x0000ffffb2b354d0 plpgsql_exec_function+0x200 [plpgsql.so] 0x0000ffffb2b405e8 plpgsql_call_handler+0xf8 [plpgsql.so] 0x0000aaaac4142cdc FunctionCall1Coll+0x3c [postgres] 0x0000ffffbbcf1bd8 pgweb_serve+0x738 [pgweb.so] 0x0000aaaac3e250d4 ExecInterpExpr+0x1620 [postgres] 0x0000aaaac3e5e204 ExecResult+0x114 [postgres] 0x0000aaaac3e28f8c standard_ExecutorRun+0x14c [postgres] 0x0000aaaac3ff5b34 PortalRunSelect+0x194 [postgres] 0x0000aaaac3ff7448 PortalRun+0x268 [postgres] 0x0000aaaac3ff1d3c exec_simple_query+0x37c [postgres] 0x0000aaaac3ff2928 PostgresMain+0x848 [postgres] 0x0000aaaac3f526d0 ServerLoop.isra.0+0x1c2c [postgres] 0x0000aaaac3f53824 PostmasterMain+0xec4 [postgres] 0x0000aaaac3c25f44 main+0x214 [postgres] 0x0000ffffbbea7740 [unknown] [libc.so.6] 0x0000ffffbbea7818 __libc_start_main+0x98 [libc.so.6] 0x0000aaaac3c26270 _start+0x30 [postgres]
所以我们确实能够解释所有分配并观察何时丢失/未释放它们。
希望这个小项目能帮助你建立在 Postgres 中使用 MemoryContext 编程的直觉!
参考链接
- GitHub: https://github.com/eatonphil/pgweb
- palloc: https://github.com/postgres/postgres/blob/REL_16_STABLE/src/backend/utils/mmgr/mcxt.c#L1226
- pstrdup: https://github.com/postgres/postgres/blob/REL_16_STABLE/src/backend/utils/mmgr/mcxt.c#L1643
- psprintf: https://github.com/postgres/postgres/blob/REL_16_STABLE/src/common/psprintf.c#L46
- MemoryContextSwitchTo: https://github.com/postgres/postgres/blob/REL_16_STABLE/src/include/utils/palloc.h#L138
- MemoryContextReset: https://github.com/postgres/postgres/blob/REL_16_STABLE/src/backend/utils/mmgr/mcxt.c#L325
- 相关章节: https://beej.us/guide/bgnet/html/#listen
- RFC9112: https://datatracker.ietf.org/doc/html/rfc9112
- 状态行: https://datatracker.ietf.org/doc/html/rfc9112#name-status-line
- request-line: https://datatracker.ietf.org/doc/html/rfc9112#name-request-line
- bcc 工具仓库: https://github.com/iovisor/bcc/blob/master/tools/memleak.py
- 可能发生: https://github.com/postgres/postgres/blob/REL_16_STABLE/src/backend/utils/hash/dynahash.c#L376