PostgreSQL数据库复制——walsender后端启动

网友投稿 254 2022-09-19

PostgreSQL数据库复制——walsender后端启动

walsender类似于常规后端,连接和walsender process是一对一的关系,但它主要流程是处理一组特殊的复制模式命令,而不是处理SQL查询(代码在src/backend/postmaster/postmaster.c/ProcessStartupPacket(Port*, bool)函数:当strcmp(valptr, “database”)==0时,设置am_walsender为true,在src/backend/tcop/postgres.c/PostgresMain函数处理case Q时进入exec_replication_command函数)。

启动流程

postmaster守护进程执行ServerLoop来为新的客户端连接fork后端子进程(如下图所示,其最重要的函数就是创建Port结构体之后调用的BackendStartup)。与设置后端子进程为walsender相关的函数调用栈如下所示:ServerLoop --> BackendStartup --> BackendInitialize --> ProcessStartupPacket。postmaster守护进程在执行BackendStartup函数中会fork子进程,子进程会调用BackendInitialize函数进行额外的初始化并收集启动数据包。

在BackendInitialize函数对于标识后端子进程为walsender相关主要有两个地方:一是处理启动数据包ProcessStartupPacket,二是对进程ps进行标识。BackendInitialize函数调用处理启动数据包的代码如下所示:​​status = ProcessStartupPacket(port, false, false);​​。ProcessStartupPacket函数会读取客户端的启动包,并根据它做一些事情。 返回 STATUS_OK 或 STATUS_ERROR,或者可能调用 ereport(FATAL) 并且根本不返回(请注意,ereport(FATAL) 内容已发送到客户端,因此仅在您想要的情况下使用它。如果您不想向客户端发送任何内容,则返回 STATUS_ERROR,如果我们检测到通信故障,这通常是合适的 .)。当加密层(当前为 TLS 或 GSSAPI)的协商完成时设置 ssl_done 和/或 gss_done。 任一加密层的成功协商都会设置两个标志,但被拒绝的协商只会设置该层的标志,因为客户端可能希望尝试另一个。 我们不应该在这里假设客户可能提出请求的顺序。与am_walsender和am_db_walsender相关的代码如下所示:如果replication参数可以是布尔值或者是字符串database。如果是database,则设置am_walsender和am_db_walsender都为true。如果是布尔值,会设置am_walsender为true,am_db_walsender依旧为false。

普通的 walsender 后端(例如对于流式复制)不连接到特定数据库。 但是用于逻辑复制的 walsender 需要连接到特定的数据库。 即使连接到数据库,我们也允许发出流式复制命令,因为首先进行基本备份然后从那里开始流式更改是有意义的。

对于walsender进程ps进行标识的格式如下:​​postgres: walsender ​​。其执行代码如下所示:

/* To achieve that, we pass "walsender" as username and username as dbname * to init_ps_display(). XXX: should add a new variant of * init_ps_display() to avoid abusing the parameters like this. */ if (am_walsender) init_ps_display(pgstat_get_backend_desc(B_WAL_SENDER), port->user_name, remote_ps_data, update_process_title ? "authentication" : ""); else init_ps_display(port->user_name, port->database_name, remote_ps_data, update_process_title ? "authentication" : "");

walsender后端初始化

walsender后端初始化和常规后端postgres相似,不同之处会有am_walsender进行分支判断(通过am_walsender可以看出初始化不同)。不同之处如下:信号函数初始化WalSndSignals、InitPostgres函数、walsender特殊初始化InitWalSender、错误处理函数WalSndErrorCleanup。接下来我们将从这些不同之处开始讲起。

信号函数初始化WalSndSignals函数位于src/backend/replication/walsender.c文件中,用于给SIGHUP、SIGINT、SIGTERM、SIGQUIT、SIGPIPE、SIGUSR1、SIGUSR2、SIGCHLD和SIGALRM信号设置信号处理函数。

void WalSndSignals(void) { /* Set up signal handlers */ pqsignal(SIGHUP, PostgresSigHupHandler); /* set flag to read config file */ pqsignal(SIGINT, StatementCancelHandler); /* query cancel */ pqsignal(SIGTERM, die); /* request shutdown */ pqsignal(SIGQUIT, quickdie); /* hard crash time */ InitializeTimeouts(); /* establishes SIGALRM handler */ pqsignal(SIGPIPE, SIG_IGN); pqsignal(SIGUSR1, procsignal_sigusr1_handler); pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and shutdown */ pqsignal(SIGCHLD, SIG_DFL); /* Reset some signals that are accepted by postmaster but not here */}void InitializeTimeouts(void) { int i; disable_alarm(); /* Initialize, or re-initialize, all local state */ num_active_timeouts = 0; for (i = 0; i < MAX_TIMEOUTS; i++) { all_timeouts[i].index = i; all_timeouts[i].indicator = false; all_timeouts[i].timeout_handler = NULL; all_timeouts[i].start_time = 0; all_timeouts[i].fin_time = 0; } all_timeouts_initialized = true; pqsignal(SIGALRM, handle_sig_alarm); /* Now establish the signal handler */}

InitPostgres函数有am_walsender分支如下: InitPostgres --> PerformAuthentication --> am_walsender分支 InitPostgres --> am_walsender分支

PerformAuthentication函数用于认证远端客户端,其am_walsender分支代码如下所示,用于组装log消息。

if (Log_connections) { StringInfoData logmsg; initStringInfo(&logmsg); if (am_walsender) appendStringInfo(&logmsg, _("replication connection authorized: user=%s"), port->user_name); else appendStringInfo(&logmsg, _("connection authorized: user=%s"), port->user_name); if (!am_walsender) appendStringInfo(&logmsg, _(" database=%s"), port->database_name); if (port->application_name != NULL) appendStringInfo(&logmsg, _(" application_name=%s"), port->application_name);

在InitPostgres函数后期,有4个地方处理了am_walsender分支。第一地方说明在数据库尝试关闭过程中不允许新的replication连接;第二个地方说明最后几个连接槽是为超级用户保留的(replication连接从使用 max_wal_senders个保留的插槽,不受 max_connections 或 superuser_reserved_connections 的限制) ;第三个地方检查replication连接的权限。

// 检查replication连接的权限 if (!superuser() && !has_rolreplication(GetUserId())) ereport(FATAL,(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser or replication role to start walsender")));

第四个地方说明如果是一个仅支持物理复制的普通 walsender,我们不想连接到任何特定的数据库。 只需通过处理启动数据包中的任何选项来完成后端启动,我们就完成了。

if (am_walsender && !am_db_walsender) { /* process any options passed in the startup packet */ if (MyProcPort != NULL) process_startup_options(MyProcPort, am_superuser); /* Apply PostAuthDelay as soon as we've read all options */ if (PostAuthDelay > 0) pg_usleep(PostAuthDelay * 1000000L); InitializeClientEncoding(); /* initialize client encoding */ pgstat_bestart(); /* report this backend in the PgBackendStatus array */ CommitTransactionCommand(); /* close the transaction we started above */ return; }

void InitWalSender(void) { am_cascading_walsender = RecoveryInProgress(); InitWalSenderSlot(); /* Create a per-walsender data structure in shared memory */ MarkPostmasterChildWalSender(); SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE); /* Initialize empty timestamp buffer for lag tracking. */ lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));}void MarkPostmasterChildWalSender(void) { int slot = MyPMChildSlot; slot--; PMSignalState->PMChildFlags[slot] = PM_CHILD_WALSENDER;}

InitWalSenderSlot函数为每个walsender进程初始化数据结构。首先找到一个空闲walsender槽并占有它。 由于事先在 InitProcess() 中为空闲的 WAL sender进行了检查,所以不能找不要空闲walsender槽。

/* Initialize a per-walsender data structure for this walsender process */static void InitWalSenderSlot(void) { int i; /* Find a free walsender slot and reserve it. This must not fail due to the prior check for free WAL senders in InitProcess(). */ for (i = 0; i < max_wal_senders; i++) { WalSnd *walsnd = &WalSndCtl->walsnds[i]; SpinLockAcquire(&walsnd->mutex); if (walsnd->pid != 0) { SpinLockRelease(&walsnd->mutex); continue; } else { /* Found a free slot. Reserve it for us. */ walsnd->pid = MyProcPid; walsnd->state = WALSNDSTATE_STARTUP; walsnd->sentPtr = InvalidXLogRecPtr; walsnd->needreload = false; walsnd->write = InvalidXLogRecPtr; walsnd->flush = InvalidXLogRecPtr; walsnd->apply = InvalidXLogRecPtr; walsnd->writeLag = -1; walsnd->flushLag = -1; walsnd->applyLag = -1; walsnd->sync_standby_priority = 0; walsnd->latch = &MyProc->procLatch; walsnd->replyTime = 0; SpinLockRelease(&walsnd->mutex); /* don't need the lock anymore */ MyWalSnd = (WalSnd *) walsnd; break; } } /* Arrange to clean up at walsender exit */ on_shmem_exit(WalSndKill, 0);}

WalSndKill函数为walsender进程清理数据结构,和InitWalSenderSlot相反的功能。

/* Destroy the per-walsender data structure for this walsender process */static void WalSndKill(int code, Datum arg) { WalSnd *walsnd = MyWalSnd; MyWalSnd = NULL; SpinLockAcquire(&walsnd->mutex); walsnd->latch = NULL; /* clear latch while holding the spinlock, so it can safely be read */ walsnd->pid = 0; /* Mark WalSnd struct as no longer being in use. */ SpinLockRelease(&walsnd->mutex);}

错误处理函数WalSndErrorCleanup用于在错误处理时进行清理工作。WAL 发送者进程不像常规后端那样使用事务。 此函数在 WAL 发送者进程中执行错误后所需的任何清理,类似于事务中止在常规后端执行的操作。从下面代码可以看出walsender需要占用LWLock、ConditionVariable、pgstat、ReplicationSlot和sendFile资源。

void WalSndErrorCleanup(void) { LWLockReleaseAll(); ConditionVariableCancelSleep(); pgstat_report_wait_end(); if (sendFile >= 0) { close(sendFile); sendFile = -1; } if (MyReplicationSlot != NULL) ReplicationSlotRelease(); ReplicationSlotCleanup(); replication_active = false; if (got_STOPPING || got_SIGUSR2) proc_exit(0); // 直接退出 WalSndSetState(WALSNDSTATE_STARTUP); /* Revert back to startup state */ // 回退到wal sender启动状态}

共享内存中的数据结构

WalSndCtlData结构体在共享内存中申请,包含了同步复制队列(每种请求类型一个队列)、队列头的当前位置(所有waiters都应该有一个跟随此值的 waitLSN。 受 SyncRepLock 保护。)、是否定义了任何同步备用(等待后端无法安全地重新加载配置文件,因此检查点会根据需要更新此值。 受 SyncRepLock 保护)、WalSnd槽位 。

extern WalSnd *MyWalSnd;/* There is one WalSndCtl struct for the whole database cluster */typedef struct { /* Synchronous replication queue with one queue per request type. Protected by SyncRepLock. */ SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE]; /* Current location of the head of the queue. All waiters should have a waitLSN that follows this value. Protected by SyncRepLock. */ XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODE]; /* Are any sync standbys defined? Waiting backends can't reload the config file safely, so checkpointer updates this value as needed. Protected by SyncRepLock. */ bool sync_standbys_defined; WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER];} WalSndCtlData;extern WalSndCtlData *WalSndCtl;

每个 walsender 在共享内存中都有一个 WalSnd 结构。 该结构受其“互斥”自旋锁字段保护,除了某些成员仅由 walsender 进程本身写入。因此该进程可以自由读取这些成员而无需持有自旋锁。 pid 和needreload 始终要求为所有访问保留自旋锁。

typedef struct WalSnd { pid_t pid; /* this walsender's PID, or 0 if not active */ WalSndState state; /* this walsender's state */ XLogRecPtr sentPtr; /* WAL has been sent up to this point */ bool needreload; /* does currently-open file need to be reloaded? */ /* The xlog locations that have been written, flushed, and applied by standby-side. These may be invalid if the standby-side has not offered values yet. */ XLogRecPtr write; XLogRecPtr flush; XLogRecPtr apply; /* Measured lag times, or -1 for unknown/none. */ TimeOffset writeLag; TimeOffset flushLag; TimeOffset applyLag; slock_t mutex; /* Protects shared variables shown above (and sync_standby_priority). */ int sync_standby_priority; /* The priority order of the standby managed by this WALSender, as listed in synchronous_standby_names, or 0 if not-listed. */ TimestampTz replyTime; /* Timestamp of the last message received from standby. */ /* Pointer to the walsender's latch. Used by backends to wake up this walsender when it has work to do. NULL if the walsender isn't active. */ Latch *latch;} WalSnd;

从上面成员可以看出walsender具备的功能由lag tracking和SyncRepQueue功能。

WalSndShmemInit函数用于为WalSndCtlData申请共享内存。WalSndShmemInit函数由postmaster的CreateSharedMemoryAndSemaphores函数调用(src/backend/storage/ipc/ipci.c)。首先调用ShmemInitStruct函数为WALSndCtlData申请内存,如果是第一次申请,则调用SHMQueueInit初始化SyncRepQueue成员槽,然后调用SpinLockInit函数初始化walsnd结构体的mutex成员。

void WalSndShmemInit(void) { bool found; int i; WalSndCtl = (WalSndCtlData *) ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found); if (!found) { MemSet(WalSndCtl, 0, WalSndShmemSize()); /* First time through, so initialize */ for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++) SHMQueueInit(&(WalSndCtl->SyncRepQueue[i])); for (i = 0; i < max_wal_senders; i++) { WalSnd *walsnd = &WalSndCtl->walsnds[i]; SpinLockInit(&walsnd->mutex); } }}Size WalSndShmemSize(void) { Size size = 0; size = offsetof(WalSndCtlData, walsnds); size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd))); return size;}

pg_stat_replication是定义在src/backend/catalog/system_views.sql中的视图,其中关联的两个函数是pg_stat_get_activity和pg_stat_get_wal_senders。其中pg_stat_get_wal_senders函数定义在src/backend/replication/walsender.c中,该函数后续分析,但是我们需要知道其返回 walsender 的活动情况,包括发送到备用服务器的 walsender的pid 和 xlog 位置。其信息也是通过遍历WalSndCtl指向的结构体数组获取的。

CREATE VIEW pg_stat_replication AS SELECT S.pid, S.usesysid, U.rolname AS usename, S.application_name, S.client_addr, S.client_hostname, S.client_port, S.backend_start, S.backend_xmin, W.state, W.sent_lsn, W.write_lsn, W.flush_lsn, W.replay_lsn, W.write_lag, W.flush_lag, W.replay_lag, W.sync_priority, W.sync_state, W.reply_time FROM pg_stat_get_activity(NULL) AS S JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid) LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);

walsender状态

walsender进程保持如下可能状态定义在src/include/replication/walsender_private.h中,对于WALSNDSTATE_BACKUP暂时不会分析(在分析pg_basebackup工具时分析学习)。

typedef enum WalSndState { WALSNDSTATE_STARTUP = 0, // 从启动walsender到握手结束 WALSNDSTATE_BACKUP, // 为备份工具(如pg_basebackup_utility)发送整个数据库集群的文件时 WALSNDSTATE_CATCHUP, // 在追赶阶段 WALSNDSTATE_STREAMING, // 流媒体复制正在工作 WALSNDSTATE_STOPPING} WalSndState;

walsender运行流程

exec_replication_command函数执行传入的复制命令。如果 cmd_string 被识别为 WalSender 命令,则返回 true,否则返回 false,从下图可以看出其继续执行了exec_simple_query函数。

exec_replication_command函数的执行流程如下所示,主要关注SnapBuildClearExportedSnapshot函数和相关的事务处理函数(后续进行解析)。真正的协议处理逻辑根据cmd_node的type成员调用相应的函数进行相应的协议处理。

bool exec_replication_command(const char *cmd_string) { int parse_rc; Node *cmd_node; MemoryContext cmd_context, old_context; // 如果 WAL 发送者被告知关闭即将关闭,则相应地切换其状态以正确处理下一个复制命令。 if (got_STOPPING) WalSndSetState(WALSNDSTATE_STOPPING); // 如果处于停止模式,则抛出错误。 我们需要防止在写入关闭检查点时可能生成 WAL 的命令。 为了安全起见,我们只是禁止所有新命令。 if (MyWalSnd->state == WALSNDSTATE_STOPPING) ereport(ERROR, (errmsg("cannot execute new commands while WAL sender is in stopping mode"))); // CREATE_REPLICATION_SLOT ... LOGICAL 导出快照,直到下一个命令到达。 如果有什么东西,清理旧的东西。 SnapBuildClearExportedSnapshot(); CHECK_FOR_INTERRUPTS(); cmd_context = AllocSetContextCreate(CurrentMemoryContext, "Replication command context", ALLOCSET_DEFAULT_SIZES); old_context = MemoryContextSwitchTo(cmd_context); // 解析命令 replication_scanner_init(cmd_string); parse_rc = replication_yyparse(); if (parse_rc != 0) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), (errmsg_internal("replication command parser returned %d", parse_rc)))); cmd_node = replication_parse_result; // 如果启用了 log_replication_commands,则记录复制命令。 即使它被禁用,也要使用 DEBUG1 级别记录命令以实现向后兼容性。 请注意,此处不记录 SQL 命令,如果启用 log_statement,稍后将记录。 if (cmd_node->type != T_SQLCmd) ereport(log_replication_commands ? LOG : DEBUG1, (errmsg("received replication command: %s", cmd_string))); // CREATE_REPLICATION_SLOT ... LOGICAL 导出快照。 如果在事务之外调用它,则应在此处清除快照。 if (!IsTransactionBlock()) SnapBuildClearExportedSnapshot(); // 对于中止的事务,除了纯 SQL 之外,不允许任何操作,exec_simple_query() 将正确处理它。 if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd)) ereport(ERROR,(errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION), errmsg("current transaction is aborted, commands ignored until end of transaction block"))); CHECK_FOR_INTERRUPTS(); // 分配将用于每个传出和传入消息的缓冲区。 我们每个命令只执行一次以减少 palloc 开销。 initStringInfo(&output_message); initStringInfo(&reply_message); initStringInfo(&tmpbuf); // 向 pgstat 报告此进程正在运行 pgstat_report_activity(STATE_RUNNING, NULL); switch (cmd_node->type) // 真正的协议处理逻辑 MemoryContextSwitchTo(old_context); MemoryContextDelete(cmd_context); /* done */ EndCommand("SELECT", DestRemote); /* Send CommandComplete message */ pgstat_report_activity(STATE_IDLE, NULL); /* Report to pgstat that this process is now idle */ return true;}

walsender状态机

walsender物理复制设置walsender状态的函数调用流程如下所示: exec_replication_command --> WalSndSetState(WALSNDSTATE_STOPPING) StartReplication --> WalSndSetState(WALSNDSTATE_CATCHUP)/WalSndSetState(WALSNDSTATE_STARTUP) WalSndLoop --> WalSndSetState(WALSNDSTATE_STREAMING) XLogSendPhysical --> WalSndSetState(WALSNDSTATE_STOPPING) WalSndErrorCleanup --> WalSndSetState(WALSNDSTATE_STARTUP) local_sigjmp_buf处理异常时walsender进程调用该函数清理walsender状态

walsender逻辑复制设置walsender状态的函数调用流程如下所示: exec_replication_command --> WalSndSetState(WALSNDSTATE_STOPPING) StartLogicalReplication --> WalSndSetState(WALSNDSTATE_CATCHUP)/WalSndSetState(WALSNDSTATE_STARTUP) WalSndLoop --> WalSndSetState(WALSNDSTATE_STREAMING) XLogSendPhysical --> WalSndSetState(WALSNDSTATE_STOPPING) WalSndErrorCleanup --> WalSndSetState(WALSNDSTATE_STARTUP) local_sigjmp_buf处理异常时walsender进程调用该函数清理walsender状态

正常流程:walsender进程启动时状态是WALSNDSTATE_STARTUP,如果walsender进程收到START_REPLICATION命令,则调用StartReplication函数(以StartReplication为例)。StartReplication函数在没有要流式传输的内容时,甚至不要进入 COPY 模式,当我们第一次开始复制时,备用数据库将位于主数据库之后(对于某些应用程序,例如同步复制,对于这种初始追赶模式来说有一个清晰的状态是很重要的,这样我们就可以在以后改变流状态时触发动作。 我们可能会长时间处于这种状态,这正是我们希望能够监控我们是否还在这里的原因),设置状态为WALSNDSTATE_CATCHUP。StartReplication函数再调用WalSndLoop(XLogSendPhysical)函数,WalSndLoop函数是通过 Copy 消息流式传输 WAL 的 walsender 进程的主循环,如果当前没有需要发送的xlog,且处于WALSNDSTATE_CATCHUP,则设置walsender进程状态为WALSNDSTATE_STREAMING。最后再设置walsender进程状态为WALSNDSTATE_STARTUP。

协议

从 PostgreSQL 9.4 开始,支持以下命令: • ​​​IDENTIFY_SYSTEM​​​:这需要服务器识别自己。服务器回复四个字段(systemid、timeline、xlogpos、dbname)。 • ​​​TIMELINE_HISTORY tli​​​:这请求服务器发送给定时间线的时间线历史文件。响应由文件名和内容组成。 • ​​​CREATE_REPLICATION_SLOT slot_name {PHYSICAL | LOGICAL output_plugin}​​​:这将创建一个复制槽(物理或逻辑)。在逻辑复制槽的情况下,用于格式化复制槽返回的数据的输出插件是强制性的。 • ​​​START_REPLICATION [SLOT slot_name][PHYSICAL] xxx/xxx [TIMELINE tli]​​​:这告诉服务器在特定时间线的特定位置为给定复制槽启动 WAL 流。 • ​​​START_REPLICATION SLOT slot_name LOGICALXXX/XXX [ ( option_name [option_value] [, ... ] ) ]​​​:从某个位置开始逻辑流式传输。 • ​​​DROP_REPLICATION_SLOT slot_name​​​:这会删除一个复制槽。 • ​​​BASE_BACKUP [LABEL 'label'] [PROGRESS][FAST] [WAL] [NOWAIT] [MAX_RATE rate]​​:这将执行基本备份,给定某些可选参数。

START_REPLICATION 命令用于开始向客户端发送WAL。当流传输时,walsender保持从磁盘读取XLOG记录,并通过COPY 协议将他们发送到备用服务器,直到两端通过退出COPY模式结束复制或直接关闭连接。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:PostgreSQL数据库FDW——WIP PostgreSQL Sharding
下一篇:爬虫实战(一)-新版知乎网页分析获取登录url
相关文章

 发表评论

暂时没有评论,来抢沙发吧~