PostgreSQL事務(wù)回卷實(shí)戰(zhàn)案例詳析
背景
前陣子某個(gè)客戶反饋他的RDS PostgreSQL無法寫入,報(bào)錯(cuò)信息如下:
postgres=# select * from test;
id
----
(0 rows)postgres=# insert into test select 1;
ERROR: database is not accepting commands to avoid wraparound data loss in database "xxxx"
HINT: Stop the postmaster and vacuum that database in single-user mode.
You might also need to commit or roll back old prepared transactions.
隨后RDS工程師介入處理以后,該問題立馬得到了解決。
XID基礎(chǔ)原理
XID 定義
XID(Transaction ID)是 PostgreSQL 內(nèi)部的事務(wù)編號(hào),每個(gè)事務(wù)都會(huì)分配一個(gè)XID,依次遞增。PostgreSQL 數(shù)據(jù)中每個(gè)元組頭部都會(huì)保存著 插入 或者 刪除 這條元組的XID(Transaction ID),然后內(nèi)核通過這個(gè) XID 構(gòu)造數(shù)據(jù)庫的一致性讀。在事務(wù)隔離級(jí)別是 可重復(fù)讀 的情況下,假設(shè)如有兩個(gè)事務(wù),xid1=200,xid2=201,那么 xid1 中只能看到 t_xmin <= 200 的元組,看不到 t_xmin > 200 的元組。
typedef uint32 TransactionId; /* 事務(wù)號(hào)定義,32位無符號(hào)整數(shù) */ typedef struct HeapTupleFields { TransactionId t_xmin; /* 插入該元組的事務(wù)號(hào) */ TransactionId t_xmax; /* 刪除或鎖定該元組的事務(wù)號(hào) */ /*** 其它屬性省略 ***/ } HeapTupleFields; struct HeapTupleHeaderData { union { HeapTupleFields t_heap; DatumTupleFields t_datum; } t_choice; /*** 其它屬性省略 ***/ };
XID 發(fā)行機(jī)制
從上面結(jié)構(gòu)中我們可以看到,XID 是一個(gè)32位無符號(hào)整數(shù),也就是 XID 的范圍是 0到2^32-1;那么超過了 2^32-1的事務(wù)怎么辦呢?其實(shí) XID 是一個(gè)環(huán),超過了 2^32-1 之后又會(huì)從頭開始分配。通過源代碼也證明了上述結(jié)論:
// 無效事務(wù)號(hào) #define InvalidTransactionId ((TransactionId) 0) // 引導(dǎo)事務(wù)號(hào),在數(shù)據(jù)庫初始化過程(BKI執(zhí)行)中使用 #define BootstrapTransactionId ((TransactionId) 1) // 凍結(jié)事務(wù)號(hào)用于表示非常陳舊的元組,它們比所有正常事務(wù)號(hào)都要早(也就是可見) #define FrozenTransactionId ((TransactionId) 2) // 第一個(gè)正常事務(wù)號(hào) #define FirstNormalTransactionId ((TransactionId) 3) // 把 FullTransactionId 的低32位作為無符號(hào)整數(shù)生成 xid #define XidFromFullTransactionId(x) ((uint32) (x).value) static inline void FullTransactionIdAdvance(FullTransactionId *dest) { dest->value++; while (XidFromFullTransactionId(*dest) < FirstNormalTransactionId) dest->value++; } FullTransactionId GetNewTransactionId(bool isSubXact) { /*** 省略 ***/ full_xid = ShmemVariableCache->nextFullXid; xid = XidFromFullTransactionId(full_xid); /*** 省略 ***/ FullTransactionIdAdvance(&ShmemVariableCache->nextFullXid); /*** 省略 *** return full_xid; } static void AssignTransactionId(TransactionState s) { /*** 省略 ***/ s->fullTransactionId = GetNewTransactionId(isSubXact); if (!isSubXact) XactTopFullTransactionId = s->fullTransactionId; /*** 省略 ***/ } TransactionId GetTopTransactionId(void) { if (!FullTransactionIdIsValid(XactTopFullTransactionId)) AssignTransactionId(&TopTransactionStateData); return XidFromFullTransactionId(XactTopFullTransactionId); }
可以看到,新事務(wù)號(hào)保存在共享變量緩存中:ShmemVariableCache->nextFullXid,每發(fā)行一個(gè)事務(wù)號(hào)后,向上調(diào)整它的值,并跳過上述三個(gè)特殊值。三個(gè)特殊仠分別為0、1和2,作用可以看上面代碼注釋。
XID 回卷機(jī)制
前面說到,XID 是一個(gè)環(huán),分配到 2^32-1 之后又從 3 開始,那么內(nèi)核是怎么比較兩個(gè)事務(wù)的大小的呢?比如 xid 經(jīng)歷了這樣一個(gè)過程 3-> 2^32-1 -> 5,那么內(nèi)核怎么樣知道 5 這個(gè)事務(wù)在 2^32-1 后面呢?我們?cè)倏匆幌麓a:
/* * TransactionIdPrecedes --- is id1 logically < id2? */ bool TransactionIdPrecedes(TransactionId id1, TransactionId id2) { /* * If either ID is a permanent XID then we can just do unsigned * comparison. If both are normal, do a modulo-2^32 comparison. */ int32 diff; if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2)) return (id1 < id2); diff = (int32) (id1 - id2); return (diff < 0); }
可以看到,內(nèi)核使用了一個(gè)比較取巧的方法:(int32) (id1 - id2) < 0,32位有符號(hào)整數(shù)的取值范圍是 -2^31 到 231-1,5-(232-1) 得到的值比 2^31-1 大,所以轉(zhuǎn)換成 int32 會(huì)變成負(fù)數(shù)。但是這里面有一個(gè)問題,「最新事務(wù)號(hào)-最老事務(wù)號(hào)」 必須小于 2^31,一旦大于就會(huì)出現(xiàn)回卷,導(dǎo)致老事務(wù)產(chǎn)生的數(shù)據(jù)對(duì)新事務(wù)不可見。
XID 回卷預(yù)防
前面講到,「最新事務(wù)號(hào)-最老事務(wù)號(hào)」 必須小于 2^31,否則會(huì)發(fā)生回卷導(dǎo)致老事務(wù)產(chǎn)生的數(shù)據(jù)對(duì)新事務(wù)不可見,那內(nèi)核是怎么避免這個(gè)問題的呢??jī)?nèi)核是這樣處理的:通過定期把老事務(wù)產(chǎn)生的元組的 XID 更新為 FrozenTransactionId,即更新為2,來回收 XID,而 XID 為2 的元組對(duì)所有的事務(wù)可見,這個(gè)過程稱為 XID 凍結(jié),通過這個(gè)方式可以回收 XID 來保證 |最新事務(wù)號(hào)-最老事務(wù)號(hào)| < 2^31。
除了內(nèi)核自動(dòng)凍結(jié)回收XID,我們也可以通過命令或者 sql 的方式手動(dòng)進(jìn)行 xid 凍結(jié)回收
- 查詢數(shù)據(jù)庫或表的年齡,數(shù)據(jù)庫年齡指的是:「最新事務(wù)號(hào)-數(shù)據(jù)庫中最老事務(wù)號(hào)」,表年齡指的是:「最新事務(wù)號(hào)-表中最老事務(wù)號(hào)」
# 查看每個(gè)庫的年齡 SELECT datname, age(datfrozenxid) FROM pg_database; # 1個(gè)庫每個(gè)表的年齡排序 SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by age desc; # 查看1個(gè)表的年齡 select oid::regclass,age(relfrozenxid) from pg_class where oid='schema名稱.表名稱'::regclass::oid;
手動(dòng)凍結(jié)回收一張表的元組的 xid 的sql:
vacuum freeze 表名;
手動(dòng)凍結(jié)回收一個(gè)庫里面的所有表 xid 的命令:
vacuumdb -d 庫名 --freeze --jobs=30 -h 連接串 -p 端口號(hào) -U 庫Owner
凍結(jié)回收過程是一個(gè)重 IO 的操作,這個(gè)過程內(nèi)核會(huì)描述表的所有頁面,然后把符合要求的元組的 t_xmin 字段更新為 2,所以這個(gè)過程需要在業(yè)務(wù)低峰進(jìn)行,避免影響業(yè)務(wù)。
與凍結(jié)回收相關(guān)的內(nèi)核參數(shù)有三個(gè):vacuum_freeze_min_age、vacuum_freeze_table_age和autovacuum_freeze_max_age,由于筆者對(duì)于這三個(gè)參數(shù)理解不深,就不在這里班門弄斧了,感興趣的同學(xué)可以自行找資料了解一下。
解決方案
問題分析
基于上面的原理分析,我們知道,「最新事務(wù)號(hào)-最老事務(wù)號(hào)」 = 2^31-1000000,即當(dāng)前可用的 xid 僅剩下一百萬的時(shí)候,內(nèi)核就會(huì)禁止實(shí)例寫入并報(bào)錯(cuò):database is not accepting commands to avoid wraparound data loss in database, 這個(gè)時(shí)候必須連到提示中的 "xxxx" 對(duì)表進(jìn)行 freeze 回收更多的 XID。
void SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid) { TransactionId xidVacLimit; TransactionId xidWarnLimit; TransactionId xidStopLimit; TransactionId xidWrapLimit; TransactionId curXid; Assert(TransactionIdIsNormal(oldest_datfrozenxid)); /* * xidWrapLimit = 最老的事務(wù)號(hào) + 0x7FFFFFFF,當(dāng)前事務(wù)號(hào)一旦到達(dá)xidWrapLimit將發(fā)生回卷 */ xidWrapLimit = oldest_datfrozenxid + (MaxTransactionId >> 1); if (xidWrapLimit < FirstNormalTransactionId) xidWrapLimit += FirstNormalTransactionId; /* * 一旦當(dāng)前事務(wù)號(hào)到達(dá)xidStopLimit,實(shí)例將不可寫入,保留 1000000 的xid用于vacuum * 每 vacuum 一張表需要占用一個(gè)xid */ xidStopLimit = xidWrapLimit - 1000000; if (xidStopLimit < FirstNormalTransactionId) xidStopLimit -= FirstNormalTransactionId; /* * 一旦當(dāng)前事務(wù)號(hào)到達(dá)xidWarnLimit,將不停地收到 * WARNING: database "xxxx" must be vacuumed within 2740112 transactions */ xidWarnLimit = xidStopLimit - 10000000; if (xidWarnLimit < FirstNormalTransactionId) xidWarnLimit -= FirstNormalTransactionId; /* * 一旦當(dāng)前事務(wù)號(hào)到達(dá)xidVacLimit將觸發(fā)force autovacuums */ xidVacLimit = oldest_datfrozenxid + autovacuum_freeze_max_age; if (xidVacLimit < FirstNormalTransactionId) xidVacLimit += FirstNormalTransactionId; /* Grab lock for just long enough to set the new limit values */ LWLockAcquire(XidGenLock, LW_EXCLUSIVE); ShmemVariableCache->oldestXid = oldest_datfrozenxid; ShmemVariableCache->xidVacLimit = xidVacLimit; ShmemVariableCache->xidWarnLimit = xidWarnLimit; ShmemVariableCache->xidStopLimit = xidStopLimit; ShmemVariableCache->xidWrapLimit = xidWrapLimit; ShmemVariableCache->oldestXidDB = oldest_datoid; curXid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid); LWLockRelease(XidGenLock); /* Log the info */ ereport(DEBUG1, (errmsg("transaction ID wrap limit is %u, limited by database with OID %u", xidWrapLimit, oldest_datoid))); /* * 如果 當(dāng)前事務(wù)號(hào)>=最老事務(wù)號(hào)+autovacuum_freeze_max_age * 觸發(fā) autovacuum 對(duì)年齡最老的數(shù)據(jù)庫進(jìn)行清理,如果有多個(gè)數(shù)據(jù)庫達(dá)到要求,按年齡最老的順序依次清理 * 通過設(shè)置標(biāo)志位標(biāo)記當(dāng)前 autovacuum 結(jié)束之后再來一次 autovacuum */ if (TransactionIdFollowsOrEquals(curXid, xidVacLimit) && IsUnderPostmaster && !InRecovery) SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_LAUNCHER); /* Give an immediate warning if past the wrap warn point */ if (TransactionIdFollowsOrEquals(curXid, xidWarnLimit) && !InRecovery) { char *oldest_datname; if (IsTransactionState()) oldest_datname = get_database_name(oldest_datoid); else oldest_datname = NULL; if (oldest_datname) ereport(WARNING, (errmsg("database \"%s\" must be vacuumed within %u transactions", oldest_datname, xidWrapLimit - curXid), errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n" "You might also need to commit or roll back old prepared transactions, or drop stale replication slots."))); else ereport(WARNING, (errmsg("database with OID %u must be vacuumed within %u transactions", oldest_datoid, xidWrapLimit - curXid), errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n" "You might also need to commit or roll back old prepared transactions, or drop stale replication slots."))); } } bool TransactionIdFollowsOrEquals(TransactionId id1, TransactionId id2) { int32 diff; if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2)) return (id1 >= id2); diff = (int32) (id1 - id2); return (diff >= 0); } FullTransactionId GetNewTransactionId(bool isSubXact) { /*** 省略 ***/ full_xid = ShmemVariableCache->nextFullXid; xid = XidFromFullTransactionId(full_xid); if (TransactionIdFollowsOrEquals(xid, ShmemVariableCache->xidVacLimit)) { TransactionId xidWarnLimit = ShmemVariableCache->xidWarnLimit; TransactionId xidStopLimit = ShmemVariableCache->xidStopLimit; TransactionId xidWrapLimit = ShmemVariableCache->xidWrapLimit; Oid oldest_datoid = ShmemVariableCache->oldestXidDB; /*** 省略 ***/ if (IsUnderPostmaster && TransactionIdFollowsOrEquals(xid, xidStopLimit)) { char *oldest_datname = get_database_name(oldest_datoid); /* complain even if that DB has disappeared */ if (oldest_datname) ereport(ERROR, (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg("database is not accepting commands to avoid wraparound data loss in database \"%s\"", oldest_datname), errhint("Stop the postmaster and vacuum that database in single-user mode.\n" "You might also need to commit or roll back old prepared transactions, or drop stale replication slots."))); /*** 省略 ***/ } /*** 省略 ***/ } /*** 省略 ***/ }
問題定位
# 查看每個(gè)庫的年齡 SELECT datname, age(datfrozenxid) FROM pg_database; # 1個(gè)庫每個(gè)表的年齡排序 SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by age desc; # 查看1個(gè)表的年齡 select oid::regclass,age(relfrozenxid) from pg_class where oid='schema名稱.表名稱'::regclass::oid;
問題解決
- 通過上面的第一個(gè) sql,查找年齡最大的數(shù)據(jù)庫,數(shù)據(jù)庫年齡指的是:|最新事務(wù)號(hào)-數(shù)據(jù)庫中最老事務(wù)號(hào)|
- 通過上面第二個(gè) sql,查找年齡最大的表,然后對(duì)表依次執(zhí)行:vacuum freeze 表名,把表中的老事務(wù)號(hào)凍結(jié)回收,表年齡指的是:|最新事務(wù)號(hào)-表中最老事務(wù)號(hào)|
- 運(yùn)維腳本
單進(jìn)程 Shell 腳本
# 對(duì)指定數(shù)據(jù)庫中年齡最大的前 50 張表進(jìn)行 vacuum freeze for cmd in `psql -U用戶名 -p端口號(hào) -h連接串 -d數(shù)據(jù)庫名 -c "SELECT 'vacuum freeze '||c.oid::regclass||';' as vacuum_cmd FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by greatest(age(c.relfrozenxid),age(t.relfrozenxid)) desc offset 50 limit 50;" | grep -v vacuum_cmd | grep -v row | grep vacuum`; do psql -U用戶名 -p端口號(hào) -h連接串 -d數(shù)據(jù)庫名 -c "$cmd" done
多進(jìn)程 Python 腳本
from multiprocessing import Pool import psycopg2 args = dict(host='pgm-bp10xxxx.pg.rds.aliyuncs.com', port=5432, dbname='數(shù)據(jù)庫名', user='用戶名', password='密碼') def vacuum_handler(sql): sql_str = "SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by age desc limit 10; " try: conn = psycopg2.connect(**args) cur = conn.cursor() cur.execute(sql) conn.commit() cur = conn.cursor() cur.execute(sql_str) print cur.fetchall() conn.close() except Exception as e: print str(e) # 對(duì)指定數(shù)據(jù)庫中年齡最大的前 1000 張表進(jìn)行 vacuum freeze,32 個(gè)進(jìn)程并發(fā)執(zhí)行 def multi_vacuum(): pool = Pool(processes=32) sql_str = "SELECT 'vacuum freeze '||c.oid::regclass||';' as vacuum_cmd FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by greatest(age(c.relfrozenxid),age(t.relfrozenxid)) desc limit 1000;"; try: conn = psycopg2.connect(**args) cur = conn.cursor() cur.execute(sql_str) rows = cur.fetchall() for row in rows: cmd = row['vacuum_cmd'] pool.apply_async(vacuum_handler, (cmd, )) conn.close() pool.close() pool.join() except Exception as e: print str(e) multi_vacuum()
友情提示
vacuum freeze 會(huì)掃描表的所有頁面并更新,是一個(gè)重 IO 的操作,操作過程中一定要控制好并發(fā)數(shù),否則非常容易把實(shí)例打掛。
作者信息
謝桂起(花名:淵渱) 2020年畢業(yè)后加入阿里云,一直從事RDS PostgreSQL相關(guān)工作,善于解決線上各類RDS PostgreSQL運(yùn)維管控相關(guān)問題。
總結(jié)
到此這篇關(guān)于PostgreSQL事務(wù)回卷的文章就介紹到這了,更多相關(guān)PostgreSQL事務(wù)回卷內(nèi)容請(qǐng)搜索本站以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持本站!
版權(quán)聲明:本站文章來源標(biāo)注為YINGSOO的內(nèi)容版權(quán)均為本站所有,歡迎引用、轉(zhuǎn)載,請(qǐng)保持原文完整并注明來源及原文鏈接。禁止復(fù)制或仿造本網(wǎng)站,禁止在非www.sddonglingsh.com所屬的服務(wù)器上建立鏡像,否則將依法追究法律責(zé)任。本站部分內(nèi)容來源于網(wǎng)友推薦、互聯(lián)網(wǎng)收集整理而來,僅供學(xué)習(xí)參考,不代表本站立場(chǎng),如有內(nèi)容涉嫌侵權(quán),請(qǐng)聯(lián)系alex-e#qq.com處理。