人妖在线一区,国产日韩欧美一区二区综合在线,国产啪精品视频网站免费,欧美内射深插日本少妇

新聞動(dòng)態(tài)

PostgreSQL事務(wù)回卷實(shí)戰(zhàn)案例詳析

發(fā)布日期:2022-07-15 19:10 | 文章來源:站長(zhǎng)之家

背景

前陣子某個(gè)客戶反饋他的RDS PostgreSQL無法寫入,報(bào)錯(cuò)信息如下:

postgres=# select * from test;
id
----
(0 rows)

postgres=# insert into test select 1;
ERROR: database is not accepting commands to avoid wraparound data loss in database "xxxx"
HINT: Stop the postmaster and vacuum that database in single-user mode.
You might also need to commit or roll back old prepared transactions.

隨后RDS工程師介入處理以后,該問題立馬得到了解決。

XID基礎(chǔ)原理

XID 定義

XID(Transaction ID)是 PostgreSQL 內(nèi)部的事務(wù)編號(hào),每個(gè)事務(wù)都會(huì)分配一個(gè)XID,依次遞增。PostgreSQL 數(shù)據(jù)中每個(gè)元組頭部都會(huì)保存著 插入 或者 刪除 這條元組的XID(Transaction ID),然后內(nèi)核通過這個(gè) XID 構(gòu)造數(shù)據(jù)庫的一致性讀。在事務(wù)隔離級(jí)別是 可重復(fù)讀 的情況下,假設(shè)如有兩個(gè)事務(wù),xid1=200,xid2=201,那么 xid1 中只能看到 t_xmin <= 200 的元組,看不到 t_xmin > 200 的元組。

typedef uint32 TransactionId;  /* 事務(wù)號(hào)定義,32位無符號(hào)整數(shù) */
typedef struct HeapTupleFields
{
  TransactionId t_xmin;    /* 插入該元組的事務(wù)號(hào) */
  TransactionId t_xmax;    /* 刪除或鎖定該元組的事務(wù)號(hào) */
    /*** 其它屬性省略 ***/
} HeapTupleFields;
struct HeapTupleHeaderData
{
  union
  {
    HeapTupleFields t_heap;
    DatumTupleFields t_datum;
  }      t_choice;
    /*** 其它屬性省略 ***/
};

XID 發(fā)行機(jī)制

從上面結(jié)構(gòu)中我們可以看到,XID 是一個(gè)32位無符號(hào)整數(shù),也就是 XID 的范圍是 0到2^32-1;那么超過了 2^32-1的事務(wù)怎么辦呢?其實(shí) XID 是一個(gè)環(huán),超過了 2^32-1 之后又會(huì)從頭開始分配。通過源代碼也證明了上述結(jié)論:

// 無效事務(wù)號(hào)
#define InvalidTransactionId    ((TransactionId) 0)
// 引導(dǎo)事務(wù)號(hào),在數(shù)據(jù)庫初始化過程(BKI執(zhí)行)中使用
#define BootstrapTransactionId    ((TransactionId) 1)
// 凍結(jié)事務(wù)號(hào)用于表示非常陳舊的元組,它們比所有正常事務(wù)號(hào)都要早(也就是可見)
#define FrozenTransactionId      ((TransactionId) 2)
// 第一個(gè)正常事務(wù)號(hào)
#define FirstNormalTransactionId  ((TransactionId) 3)
// 把 FullTransactionId 的低32位作為無符號(hào)整數(shù)生成 xid
#define XidFromFullTransactionId(x)    ((uint32) (x).value)
static inline void
FullTransactionIdAdvance(FullTransactionId *dest)
{
  dest->value++;
  while (XidFromFullTransactionId(*dest) < FirstNormalTransactionId)
    dest->value++;
}
FullTransactionId
GetNewTransactionId(bool isSubXact)
{
    /*** 省略 ***/
  full_xid = ShmemVariableCache->nextFullXid;
  xid = XidFromFullTransactionId(full_xid);
    /*** 省略 ***/
  FullTransactionIdAdvance(&ShmemVariableCache->nextFullXid);
    /*** 省略 ***
  return full_xid;
}
static void
AssignTransactionId(TransactionState s)
{
    /*** 省略 ***/
  s->fullTransactionId = GetNewTransactionId(isSubXact);
  if (!isSubXact)
    XactTopFullTransactionId = s->fullTransactionId;
    /*** 省略 ***/
}
TransactionId
GetTopTransactionId(void)
{
  if (!FullTransactionIdIsValid(XactTopFullTransactionId))
    AssignTransactionId(&TopTransactionStateData);
  return XidFromFullTransactionId(XactTopFullTransactionId);
}

可以看到,新事務(wù)號(hào)保存在共享變量緩存中:ShmemVariableCache->nextFullXid,每發(fā)行一個(gè)事務(wù)號(hào)后,向上調(diào)整它的值,并跳過上述三個(gè)特殊值。三個(gè)特殊仠分別為0、1和2,作用可以看上面代碼注釋。

XID 回卷機(jī)制

前面說到,XID 是一個(gè)環(huán),分配到 2^32-1 之后又從 3 開始,那么內(nèi)核是怎么比較兩個(gè)事務(wù)的大小的呢?比如 xid 經(jīng)歷了這樣一個(gè)過程 3-> 2^32-1 -> 5,那么內(nèi)核怎么樣知道 5 這個(gè)事務(wù)在 2^32-1 后面呢?我們?cè)倏匆幌麓a:

/*
 * TransactionIdPrecedes --- is id1 logically < id2?
 */
bool
TransactionIdPrecedes(TransactionId id1, TransactionId id2)
{
  /*
   * If either ID is a permanent XID then we can just do unsigned
   * comparison.  If both are normal, do a modulo-2^32 comparison.
   */
  int32    diff;
  if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2))
    return (id1 < id2);
  diff = (int32) (id1 - id2);
  return (diff < 0);
}

可以看到,內(nèi)核使用了一個(gè)比較取巧的方法:(int32) (id1 - id2) < 0,32位有符號(hào)整數(shù)的取值范圍是 -2^31 到 231-1,5-(232-1) 得到的值比 2^31-1 大,所以轉(zhuǎn)換成 int32 會(huì)變成負(fù)數(shù)。但是這里面有一個(gè)問題,「最新事務(wù)號(hào)-最老事務(wù)號(hào)」 必須小于 2^31,一旦大于就會(huì)出現(xiàn)回卷,導(dǎo)致老事務(wù)產(chǎn)生的數(shù)據(jù)對(duì)新事務(wù)不可見。

XID 回卷預(yù)防

前面講到,「最新事務(wù)號(hào)-最老事務(wù)號(hào)」 必須小于 2^31,否則會(huì)發(fā)生回卷導(dǎo)致老事務(wù)產(chǎn)生的數(shù)據(jù)對(duì)新事務(wù)不可見,那內(nèi)核是怎么避免這個(gè)問題的呢??jī)?nèi)核是這樣處理的:通過定期把老事務(wù)產(chǎn)生的元組的 XID 更新為 FrozenTransactionId,即更新為2,來回收 XID,而 XID 為2 的元組對(duì)所有的事務(wù)可見,這個(gè)過程稱為 XID 凍結(jié),通過這個(gè)方式可以回收 XID 來保證 |最新事務(wù)號(hào)-最老事務(wù)號(hào)| < 2^31。
除了內(nèi)核自動(dòng)凍結(jié)回收XID,我們也可以通過命令或者 sql 的方式手動(dòng)進(jìn)行 xid 凍結(jié)回收

  • 查詢數(shù)據(jù)庫或表的年齡,數(shù)據(jù)庫年齡指的是:「最新事務(wù)號(hào)-數(shù)據(jù)庫中最老事務(wù)號(hào)」,表年齡指的是:「最新事務(wù)號(hào)-表中最老事務(wù)號(hào)」
# 查看每個(gè)庫的年齡
SELECT datname, age(datfrozenxid) FROM pg_database;
# 1個(gè)庫每個(gè)表的年齡排序
SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by age desc; 
# 查看1個(gè)表的年齡
select oid::regclass,age(relfrozenxid) from pg_class where oid='schema名稱.表名稱'::regclass::oid;

手動(dòng)凍結(jié)回收一張表的元組的 xid 的sql:

vacuum freeze 表名;

手動(dòng)凍結(jié)回收一個(gè)庫里面的所有表 xid 的命令:

vacuumdb -d 庫名 --freeze --jobs=30 -h 連接串 -p 端口號(hào) -U 庫Owner

凍結(jié)回收過程是一個(gè)重 IO 的操作,這個(gè)過程內(nèi)核會(huì)描述表的所有頁面,然后把符合要求的元組的 t_xmin 字段更新為 2,所以這個(gè)過程需要在業(yè)務(wù)低峰進(jìn)行,避免影響業(yè)務(wù)。

與凍結(jié)回收相關(guān)的內(nèi)核參數(shù)有三個(gè):vacuum_freeze_min_age、vacuum_freeze_table_age和autovacuum_freeze_max_age,由于筆者對(duì)于這三個(gè)參數(shù)理解不深,就不在這里班門弄斧了,感興趣的同學(xué)可以自行找資料了解一下。

解決方案

問題分析

基于上面的原理分析,我們知道,「最新事務(wù)號(hào)-最老事務(wù)號(hào)」 = 2^31-1000000,即當(dāng)前可用的 xid 僅剩下一百萬的時(shí)候,內(nèi)核就會(huì)禁止實(shí)例寫入并報(bào)錯(cuò):database is not accepting commands to avoid wraparound data loss in database, 這個(gè)時(shí)候必須連到提示中的 "xxxx" 對(duì)表進(jìn)行 freeze 回收更多的 XID。

void
SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
{
  TransactionId xidVacLimit;
  TransactionId xidWarnLimit;
  TransactionId xidStopLimit;
  TransactionId xidWrapLimit;
  TransactionId curXid;
  Assert(TransactionIdIsNormal(oldest_datfrozenxid));
  /*
     * xidWrapLimit = 最老的事務(wù)號(hào) + 0x7FFFFFFF,當(dāng)前事務(wù)號(hào)一旦到達(dá)xidWrapLimit將發(fā)生回卷
   */
  xidWrapLimit = oldest_datfrozenxid + (MaxTransactionId >> 1);
  if (xidWrapLimit < FirstNormalTransactionId)
    xidWrapLimit += FirstNormalTransactionId;
  /*
     * 一旦當(dāng)前事務(wù)號(hào)到達(dá)xidStopLimit,實(shí)例將不可寫入,保留 1000000 的xid用于vacuum
     * 每 vacuum 一張表需要占用一個(gè)xid
   */
  xidStopLimit = xidWrapLimit - 1000000;
  if (xidStopLimit < FirstNormalTransactionId)
    xidStopLimit -= FirstNormalTransactionId;
  /*
     * 一旦當(dāng)前事務(wù)號(hào)到達(dá)xidWarnLimit,將不停地收到
     * WARNING:  database "xxxx" must be vacuumed within 2740112 transactions
   */
  xidWarnLimit = xidStopLimit - 10000000;
  if (xidWarnLimit < FirstNormalTransactionId)
    xidWarnLimit -= FirstNormalTransactionId;
  /*
     * 一旦當(dāng)前事務(wù)號(hào)到達(dá)xidVacLimit將觸發(fā)force autovacuums
   */
  xidVacLimit = oldest_datfrozenxid + autovacuum_freeze_max_age;
  if (xidVacLimit < FirstNormalTransactionId)
    xidVacLimit += FirstNormalTransactionId;
  /* Grab lock for just long enough to set the new limit values */
  LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
  ShmemVariableCache->oldestXid = oldest_datfrozenxid;
  ShmemVariableCache->xidVacLimit = xidVacLimit;
  ShmemVariableCache->xidWarnLimit = xidWarnLimit;
  ShmemVariableCache->xidStopLimit = xidStopLimit;
  ShmemVariableCache->xidWrapLimit = xidWrapLimit;
  ShmemVariableCache->oldestXidDB = oldest_datoid;
  curXid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
  LWLockRelease(XidGenLock);
  /* Log the info */
  ereport(DEBUG1,
      (errmsg("transaction ID wrap limit is %u, limited by database with OID %u",
          xidWrapLimit, oldest_datoid)));
  /*
     * 如果 當(dāng)前事務(wù)號(hào)>=最老事務(wù)號(hào)+autovacuum_freeze_max_age
     * 觸發(fā) autovacuum 對(duì)年齡最老的數(shù)據(jù)庫進(jìn)行清理,如果有多個(gè)數(shù)據(jù)庫達(dá)到要求,按年齡最老的順序依次清理
   * 通過設(shè)置標(biāo)志位標(biāo)記當(dāng)前 autovacuum 結(jié)束之后再來一次 autovacuum
     */
  if (TransactionIdFollowsOrEquals(curXid, xidVacLimit) &&
    IsUnderPostmaster && !InRecovery)
    SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_LAUNCHER);
  /* Give an immediate warning if past the wrap warn point */
  if (TransactionIdFollowsOrEquals(curXid, xidWarnLimit) && !InRecovery)
  {
    char     *oldest_datname;
    if (IsTransactionState())
      oldest_datname = get_database_name(oldest_datoid);
    else
      oldest_datname = NULL;
    if (oldest_datname)
      ereport(WARNING,
          (errmsg("database \"%s\" must be vacuumed within %u transactions",
              oldest_datname,
              xidWrapLimit - curXid),
           errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
               "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
    else
      ereport(WARNING,
          (errmsg("database with OID %u must be vacuumed within %u transactions",
              oldest_datoid,
              xidWrapLimit - curXid),
           errhint("To avoid a database shutdown, execute a database-wide VACUUM in that database.\n"
               "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
  }
}
bool
TransactionIdFollowsOrEquals(TransactionId id1, TransactionId id2)
{
  int32    diff;
  if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2))
    return (id1 >= id2);
  diff = (int32) (id1 - id2);
  return (diff >= 0);
}
FullTransactionId
GetNewTransactionId(bool isSubXact)
{
    /*** 省略 ***/
  full_xid = ShmemVariableCache->nextFullXid;
  xid = XidFromFullTransactionId(full_xid);
  if (TransactionIdFollowsOrEquals(xid, ShmemVariableCache->xidVacLimit))
  {
    TransactionId xidWarnLimit = ShmemVariableCache->xidWarnLimit;
    TransactionId xidStopLimit = ShmemVariableCache->xidStopLimit;
    TransactionId xidWrapLimit = ShmemVariableCache->xidWrapLimit;
    Oid      oldest_datoid = ShmemVariableCache->oldestXidDB;
        /*** 省略 ***/
    if (IsUnderPostmaster &&
      TransactionIdFollowsOrEquals(xid, xidStopLimit))
    {
      char     *oldest_datname = get_database_name(oldest_datoid);
      /* complain even if that DB has disappeared */
      if (oldest_datname)
        ereport(ERROR,
            (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
             errmsg("database is not accepting commands to avoid wraparound data loss in database \"%s\"",
                oldest_datname),
             errhint("Stop the postmaster and vacuum that database in single-user mode.\n"
                 "You might also need to commit or roll back old prepared transactions, or drop stale replication slots.")));
            /*** 省略 ***/
    }
        /*** 省略 ***/
  }
    /*** 省略 ***/
}

問題定位

# 查看每個(gè)庫的年齡
SELECT datname, age(datfrozenxid) FROM pg_database;
# 1個(gè)庫每個(gè)表的年齡排序
SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by age desc; 
# 查看1個(gè)表的年齡
select oid::regclass,age(relfrozenxid) from pg_class where oid='schema名稱.表名稱'::regclass::oid;

問題解決

  1. 通過上面的第一個(gè) sql,查找年齡最大的數(shù)據(jù)庫,數(shù)據(jù)庫年齡指的是:|最新事務(wù)號(hào)-數(shù)據(jù)庫中最老事務(wù)號(hào)|
  2. 通過上面第二個(gè) sql,查找年齡最大的表,然后對(duì)表依次執(zhí)行:vacuum freeze 表名,把表中的老事務(wù)號(hào)凍結(jié)回收,表年齡指的是:|最新事務(wù)號(hào)-表中最老事務(wù)號(hào)|
  3. 運(yùn)維腳本

單進(jìn)程 Shell 腳本

# 對(duì)指定數(shù)據(jù)庫中年齡最大的前 50 張表進(jìn)行 vacuum freeze
for cmd in `psql -U用戶名 -p端口號(hào) -h連接串 -d數(shù)據(jù)庫名 -c "SELECT 'vacuum freeze '||c.oid::regclass||';' as vacuum_cmd FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by greatest(age(c.relfrozenxid),age(t.relfrozenxid)) desc offset 50 limit 50;" | grep -v vacuum_cmd  | grep -v row | grep vacuum`; do
    psql -U用戶名 -p端口號(hào) -h連接串 -d數(shù)據(jù)庫名 -c "$cmd"
done

多進(jìn)程 Python 腳本

from multiprocessing import Pool
import psycopg2
args = dict(host='pgm-bp10xxxx.pg.rds.aliyuncs.com', port=5432, dbname='數(shù)據(jù)庫名',
            user='用戶名', password='密碼')
def vacuum_handler(sql):
    sql_str = "SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by age desc limit 10; "
    try:
        conn = psycopg2.connect(**args)
        cur = conn.cursor()
        cur.execute(sql)
        conn.commit()
        cur = conn.cursor()
        cur.execute(sql_str)
        print cur.fetchall()
        conn.close()
    except Exception as e:
        print str(e)
# 對(duì)指定數(shù)據(jù)庫中年齡最大的前 1000 張表進(jìn)行 vacuum freeze,32 個(gè)進(jìn)程并發(fā)執(zhí)行
def multi_vacuum():
    pool = Pool(processes=32)
    sql_str = "SELECT 'vacuum freeze '||c.oid::regclass||';' as vacuum_cmd FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN ('r', 'm') order by greatest(age(c.relfrozenxid),age(t.relfrozenxid)) desc limit 1000;";
    try:
        conn = psycopg2.connect(**args)
        cur = conn.cursor()
        cur.execute(sql_str)
        rows = cur.fetchall()
        for row in rows:
            cmd = row['vacuum_cmd']
            pool.apply_async(vacuum_handler, (cmd, ))
        conn.close()
        pool.close()
        pool.join()
    except Exception as e:
        print str(e)

multi_vacuum()

友情提示

vacuum freeze 會(huì)掃描表的所有頁面并更新,是一個(gè)重 IO 的操作,操作過程中一定要控制好并發(fā)數(shù),否則非常容易把實(shí)例打掛。

作者信息

謝桂起(花名:淵渱) 2020年畢業(yè)后加入阿里云,一直從事RDS PostgreSQL相關(guān)工作,善于解決線上各類RDS PostgreSQL運(yùn)維管控相關(guān)問題。

總結(jié)

到此這篇關(guān)于PostgreSQL事務(wù)回卷的文章就介紹到這了,更多相關(guān)PostgreSQL事務(wù)回卷內(nèi)容請(qǐng)搜索本站以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持本站!

美國(guó)服務(wù)器租用

版權(quán)聲明:本站文章來源標(biāo)注為YINGSOO的內(nèi)容版權(quán)均為本站所有,歡迎引用、轉(zhuǎn)載,請(qǐng)保持原文完整并注明來源及原文鏈接。禁止復(fù)制或仿造本網(wǎng)站,禁止在非www.sddonglingsh.com所屬的服務(wù)器上建立鏡像,否則將依法追究法律責(zé)任。本站部分內(nèi)容來源于網(wǎng)友推薦、互聯(lián)網(wǎng)收集整理而來,僅供學(xué)習(xí)參考,不代表本站立場(chǎng),如有內(nèi)容涉嫌侵權(quán),請(qǐng)聯(lián)系alex-e#qq.com處理。

相關(guān)文章

實(shí)時(shí)開通

自選配置、實(shí)時(shí)開通

免備案

全球線路精選!

全天候客戶服務(wù)

7x24全年不間斷在線

專屬顧問服務(wù)

1對(duì)1客戶咨詢顧問

在線
客服

在線客服:7*24小時(shí)在線

客服
熱線

400-630-3752
7*24小時(shí)客服服務(wù)熱線

關(guān)注
微信

關(guān)注官方微信
頂部