Postgres-xl GTM(全局事务管理器 Globale Transaction Manager)快照管理

网友投稿 236 2022-12-02

Postgres-xl GTM(全局事务管理器 Globale Transaction Manager)快照管理

src/gtm/main/gtm_snap.c

快照请求通信处理函数

ProcessGetSnapshotCommand --> MSG_SNAPSHOT_GET

ProcessGetSnapshotCommand和ProcessGetSnapshotCommandMulti是处理客户端或GTM Proxy请求快照通信的函数。用于解析网络消息,然后调用GTM_GetTransactionSnapshot构建快照。

void ProcessGetSnapshotCommand(Port *myport, StringInfo message, bool get_gxid) { GTM_Snapshot snapshot; MemoryContext oldContext; int status; int txn_count; int sn_xcnt;

先从message消息体中获取事务数量,确保为1个事务,再从消息中获取该事务的gxid。通过GTM_GXIDToHandle(GXID -> handle) 函数将gxid转换为事务句柄。可参考上一篇博客。

const char *data = NULL; GTM_TransactionHandle txn; GlobalTransactionId gxid; txn_count = pq_getmsgint(message, sizeof (int)); Assert(txn_count == 1); data = pq_getmsgbytes(message, sizeof (gxid)); if (data == NULL) ereport(ERROR,(EPROTO,errmsg("Message does not contain valid GXID"))); memcpy(&gxid, data, sizeof(gxid)); elog(INFO, "Received transaction ID %d for snapshot obtention", gxid); txn = GTM_GXIDToHandle(gxid); pq_getmsgend(message);

如果ProcessGetSnapshotCommand指定了需要获取gxid,则不再需要消息中传输过来的gxid,通过事务句柄,GTM_GetGlobalTransactionId函数返回目前事务管理器该事务使用的gxid。

if (get_gxid) { gxid = GTM_GetGlobalTransactionId(txn); if (gxid == InvalidGlobalTransactionId) ereport(ERROR, (EINVAL, errmsg("Failed to get a new transaction id"))); }

切换到TopMostMemoryContext内存上下文,通过调用GTM_GetTransactionSnapshot函数获取最新快照。

= MemoryContextSwitchTo(TopMostMemoryContext); if ((snapshot = GTM_GetTransactionSnapshot(&txn, 1, &status)) == NULL) ereport(ERROR,(EINVAL,errmsg("Failed to get a snapshot"))); MemoryContextSwitchTo(oldContext);

向客户端回送消息,包含的消息内容如下所示。

; pq_beginmessage(&buf, 'S'); pq_sendint(&buf, get_gxid ? SNAPSHOT_GXID_GET_RESULT : SNAPSHOT_GET_RESULT, 4); if (myport->remote_type == GTM_NODE_GTM_PROXY){ GTM_ProxyMsgHeader proxyhdr; proxyhdr.ph_conid = myport->conn_id; pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); } pq_sendbytes(&buf, (char *)&gxid, sizeof (GlobalTransactionId)); pq_sendbytes(&buf, (char *)&txn_count, sizeof(txn_count)); pq_sendbytes(&buf, (char *)&status, sizeof(int) * txn_count); pq_sendbytes(&buf, (char *)&snapshot->sn_snapid, sizeof (uint64)); pq_sendbytes(&buf, (char *)&snapshot->sn_xmin, sizeof (GlobalTransactionId)); pq_sendbytes(&buf, (char *)&snapshot->sn_xmax, sizeof (GlobalTransactionId)); /* Read once */ sn_xcnt = snapshot->sn_xcnt; pq_sendint(&buf, sn_xcnt, sizeof (int)); pq_sendbytes(&buf, (char *)snapshot->sn_xip, sizeof(GlobalTransactionId) * sn_xcnt); pq_endmessage(myport, &buf); if (myport->remote_type != GTM_NODE_GTM_PROXY) pq_flush(myport); return;}

ProcessGetSnapshotCommandMulti --> MSG_SNAPSHOT_GET_MULTI

和上一函数相比,事务句柄和全局事务xid局部变量都换成了数组,对请求的处理也是基于请求数据包中的事务数来进行循环处理。

void ProcessGetSnapshotCommandMulti(Port *myport, StringInfo message) { StringInfoData buf; GTM_TransactionHandle txn[GTM_MAX_GLOBAL_TRANSACTIONS]; GlobalTransactionId gxid[GTM_MAX_GLOBAL_TRANSACTIONS]; GTM_Snapshot snapshot; MemoryContext oldContext; int txn_count; int ii; int status[GTM_MAX_GLOBAL_TRANSACTIONS]; int sn_xcnt; txn_count = pq_getmsgint(message, sizeof (int)); for (ii = 0; ii < txn_count; ii++){ const char *data = pq_getmsgbytes(message, sizeof (gxid[ii])); if (data == NULL) ereport(ERROR,(EPROTO,errmsg("Message does not contain valid GXID"))); memcpy(&gxid[ii], data, sizeof (gxid[ii])); txn[ii] = GTM_GXIDToHandle(gxid[ii]); } pq_getmsgend(message);

切换到TopMostMemoryContext内存上下文,通过调用GTM_GetTransactionSnapshot函数获取最新快照。

= MemoryContextSwitchTo(TopMostMemoryContext); if ((snapshot = GTM_GetTransactionSnapshot(txn, txn_count, status)) == NULL) ereport(ERROR,(EINVAL,errmsg("Failed to get a snapshot"))); MemoryContextSwitchTo(oldContext);

回送消息填充

pq_beginmessage(&buf, 'S'); pq_sendint(&buf, SNAPSHOT_GET_MULTI_RESULT, 4); if (myport->remote_type == GTM_NODE_GTM_PROXY){ GTM_ProxyMsgHeader proxyhdr; proxyhdr.ph_conid = myport->conn_id; pq_sendbytes(&buf, (char *)&proxyhdr, sizeof (GTM_ProxyMsgHeader)); } pq_sendbytes(&buf, (char *)&txn_count, sizeof(txn_count)); pq_sendbytes(&buf, (char *)status, sizeof(int) * txn_count); pq_sendbytes(&buf, (char *)&snapshot->sn_snapid, sizeof (uint64)); pq_sendbytes(&buf, (char *)&snapshot->sn_xmin, sizeof (GlobalTransactionId)); pq_sendbytes(&buf, (char *)&snapshot->sn_xmax, sizeof (GlobalTransactionId)); /* Read once */ sn_xcnt = snapshot->sn_xcnt; pq_sendint(&buf, sn_xcnt, sizeof (int)); pq_sendbytes(&buf, (char *)snapshot->sn_xip,sizeof(GlobalTransactionId) * sn_xcnt); pq_endmessage(myport, &buf); if (myport->remote_type != GTM_NODE_GTM_PROXY) pq_flush(myport); return;}

GTM_GetTransactionSnapshot函数构建快照并将其存储在GTMTransactions array中以跟踪所有正在处理的事务。如果GTM Proxy请求多个快照,快照管理器只计算一次,将该快照用于请求中的所有事务。

static GTM_Snapshot GTM_GetTransactionSnapshot(GTM_TransactionHandle handle[], int txn_count, int *status) { GlobalTransactionId xmin; GlobalTransactionId xmax; GlobalTransactionId globalxmin; int count = 0; gtm_ListCell *elem = NULL; int ii; /* Instead of allocating memory for a snapshot, we use the snapshot of the * first transaction in the given array. The same snapshot will later be * copied to other transaction info structures. */ GTM_TransactionInfo *mygtm_txninfo = NULL; GTM_Snapshot snapshot = NULL; memset(status, 0, sizeof (int) * txn_count); for (ii = 0; ii < txn_count; ii++){ /* Even if the request does not contain a valid GXID, we still send down a snapshot, but mark the status field acoordingly */ if (handle[ii] != InvalidTransactionHandle) mygtm_txninfo = GTM_HandleToTransactionInfo(handle[ii]); else status[ii] = STATUS_NOT_FOUND; /* If the transaction does not exist, just mark the status field with * a STATUS_ERROR code * FIXME This comment seems to be misplaced/stale - we're not checking * if a transaction exists (we've already done that above and set the * status to STATUS_NOT_FOUND). */ if ((mygtm_txninfo != NULL) && (snapshot == NULL)) snapshot = &mygtm_txninfo->gti_current_snapshot; } /* If no valid transaction exists in the array, we record the snapshot in a * thread-specific structure. This allows us to avoid repeated * allocation/freeing of the structure. * Note that we must use a thread-specific variable and not a global * variable because a concurrent thread might compute a new snapshot and * overwrite the snapshot information while we are still sending this copy * to the client. Using a thread-specific storage avoids that problem. */ if (snapshot == NULL) snapshot = &GetMyThreadInfo->thr_snapshot; Assert(snapshot != NULL); /* This can only happen when using a snapshot from GTMTransactions, as the * thread-specific sn_xip array is allocated statically as part of GTM_ThreadInfo. */ if (snapshot->sn_xip == NULL){ /* First call for this snapshot */ snapshot->sn_xip = (GlobalTransactionId *)palloc(GTM_MAX_GLOBAL_TRANSACTIONS * sizeof(GlobalTransactionId)); if (snapshot->sn_xip == NULL) ereport(ERROR,(ENOMEM,errmsg("out of memory"))); } /* It is sufficient to get shared lock on ProcArrayLock, even if we are going to set MyProc->xmin. */ GTM_RWLockAcquire(>MTransactions.gt_TransArrayLock, GTM_LOCKMODE_READ); /* xmax is always latestCompletedXid + 1 */ xmax = GTMTransactions.gt_latestCompletedXid; Assert(GlobalTransactionIdIsNormal(xmax)); GlobalTransactionIdAdvance(xmax); /* Get the snapshot id */ snapshot->sn_snapid = GTMTransactions.gt_snapid; /* initialize xmin calculation with xmax */ globalxmin = xmin = xmax; /* Spin over transaction list checking xid, xmin, and subxids. The goal is to * gather all active xids and find the lowest xmin */ gtm_foreach(elem, GTMTransactions.gt_open_transactions){ volatile GTM_TransactionInfo *gtm_txninfo = (GTM_TransactionInfo *)gtm_lfirst(elem); GlobalTransactionId xid; /* Don't take into account LAZY VACUUMs */ if (gtm_txninfo->gti_vacuum) continue; /* Update globalxmin to be the smallest valid xmin */ xid = gtm_txninfo->gti_xmin; /* fetch just once */ if (GlobalTransactionIdIsNormal(xid) && GlobalTransactionIdPrecedes(xid, globalxmin)) globalxmin = xid; /* Fetch xid just once - see GetNewTransactionId */ xid = gtm_txninfo->gti_gxid; /* * If the transaction has been assigned an xid < xmax we add it to the * snapshot, and update xmin if necessary. There's no need to store * XIDs >= xmax, since we'll treat them as running anyway. We don't * bother to examine their subxids either. * * We don't include our own XID (if any) in the snapshot, but we must * include it into xmin. */ if (GlobalTransactionIdIsNormal(xid)){ /* * Unlike Postgres, we include the GXID of the current transaction * as well in the snapshot. This is necessary because the same * snapshot is shared by multiple backends through GTM proxy and * the GXID will vary for each backend. * * XXX We should confirm that this does not have any adverse effect * on the MVCC visibility and check if any changes are related to * the MVCC checks because of the change */ if (GlobalTransactionIdFollowsOrEquals(xid, xmax)) continue; if (GlobalTransactionIdPrecedes(xid, xmin)) xmin = xid; snapshot->sn_xip[count++] = xid; } } /* Update globalxmin to include actual process xids. This is a slightly * different way of computing it than GetOldestXmin uses, but should give * the same result. */ if (GlobalTransactionIdPrecedes(xmin, globalxmin)) globalxmin = xmin; snapshot->sn_xmin = xmin; snapshot->sn_xmax = xmax; snapshot->sn_xcnt = count; /* Now, before the proc array lock is released, set the xmin in the txninfo * structures of all the transactions. */ for (ii = 0; ii < txn_count; ii++){ GTM_Snapshot mysnap = NULL; /* We have already gone through all the transaction handles above and * marked the invalid handles with STATUS_ERROR */ if ((status[ii] == STATUS_ERROR) || (status[ii] == STATUS_NOT_FOUND)) continue; mygtm_txninfo = GTM_HandleToTransactionInfo(handle[ii]); mysnap = &mygtm_txninfo->gti_current_snapshot; if (GTM_IsTransSerializable(mygtm_txninfo)){ if ((mygtm_txninfo->gti_snapshot_set) && (txn_count > 1)){ GTM_RWLockRelease(>MTransactions.gt_TransArrayLock); elog(ERROR, "Grouped snapshot can only include first snapshot in Serializable transaction"); } if (!mygtm_txninfo->gti_snapshot_set) { /* For the first transaction in the array, the snapshot is already set. */ if (snapshot != mysnap){ if (mysnap->sn_xip == NULL){ /* First call for this snapshot */ mysnap->sn_xip = (GlobalTransactionId *)palloc(GTM_MAX_GLOBAL_TRANSACTIONS * sizeof(GlobalTransactionId)); if (mysnap->sn_xip == NULL) ereport(ERROR, (ENOMEM, errmsg("out of memory"))); } mysnap->sn_xmin = snapshot->sn_xmin; mysnap->sn_xmax = snapshot->sn_xmax; mysnap->sn_xcnt = snapshot->sn_xcnt; memcpy(mysnap->sn_xip, snapshot->sn_xip,sizeof (GlobalTransactionId) * snapshot->sn_xcnt); } mygtm_txninfo->gti_snapshot_set = true; } } else if (snapshot != mysnap){ if (mysnap->sn_xip == NULL){ /* First call for this snapshot */ mysnap->sn_xip = (GlobalTransactionId *)palloc(GTM_MAX_GLOBAL_TRANSACTIONS * sizeof(GlobalTransactionId)); if (mysnap->sn_xip == NULL) { GTM_RWLockRelease(>MTransactions.gt_TransArrayLock); ereport(ERROR, (ENOMEM, errmsg("out of memory"))); } } mysnap->sn_xmin = snapshot->sn_xmin; mysnap->sn_xmax = snapshot->sn_xmax; mysnap->sn_xcnt = snapshot->sn_xcnt; memcpy(mysnap->sn_xip, snapshot->sn_xip, sizeof (GlobalTransactionId) * snapshot->sn_xcnt); } if ((mygtm_txninfo != NULL) && (!GlobalTransactionIdIsValid(mygtm_txninfo->gti_xmin))) mygtm_txninfo->gti_xmin = xmin; } GTM_RWLockRelease(>MTransactions.gt_TransArrayLock); elog(DEBUG1, "GTM_GetTransactionSnapshot: (%u:%u:%u)",snapshot->sn_xmin, snapshot->sn_xmax,snapshot->sn_xcnt); return snapshot;}

快照在TopMostMemoryContext中分配,不在进程内存上下文中分配的原因是,事务可能快客户端连接,比如prepared transactions。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Java与Scala创建List与Map的实现方式
下一篇:PostgreSQL数据库事务系统——Push\Pop快照
相关文章

 发表评论

暂时没有评论,来抢沙发吧~