PostgreSQL事务回卷实战案例详析
背景
前阵子某个客户反馈他的RDS PostgreSQL没法写入,报错信息以下:
postgres=# select * from test;
id
—-
(0 rows)postgres=# insert into test select 1;
ERROR: database is not accepting commands to avoid wraparound data loss in database "xxxx"
HINT: Stop the postmaster and vacuum that database in single-user mode.
You might also need to commit or roll back old prepared transactions.
随后RDS工程师参与处理以后,该问题立马得到了解决。
XID基础原理
XID 定义
XID(Transaction ID)是 PostgreSQL 内部的事务编号,每一个事务都会分配一个XID,顺次递增。PostgreSQL 数据中每一个元组头部都会保存着 插入 或 删除 这条元组的XID(Transaction ID),然后内核通过这个 XID 构造数据库的一致性读。在事务隔离级别是 可重复读 的情况下,假定如有两个事务,xid1=200,xid2=201,那末 xid1 中只能看到 t_xmin <= 200 的元组,看不到 t_xmin > 200 的元组。
typedef struct HeapTupleFields
{
TransactionId t_xmin; /* 插入该元组的事务号 */
TransactionId t_xmax; /* 删除或锁定该元组的事务号 */
/*** 其它属性省略 ***/
} HeapTupleFields;
struct HeapTupleHeaderData
{
union
{
HeapTupleFields t_heap;
DatumTupleFields t_datum;
} t_choice;
/*** 其它属性省略 ***/
};
XID 发行机制
从上面结构中我们可以看到,XID 是一个32位无符号整数,也就是 XID 的范围是 0到2^32⑴;那末超过了 2^32⑴的事务怎样办呢?其实 XID 是一个环,超过了 2^32⑴ 以后又会从头开始分配。通过源代码也证明了上述结论:
#define InvalidTransactionId ((TransactionId) 0)
// 引导事务号,在数据库初始化进程(BKI履行)中使用
#define BootstrapTransactionId ((TransactionId) 1)
// 冻结事务号用于表示非常陈腐的元组,它们比所有正常事务号都要早(也就是可见)
#define FrozenTransactionId ((TransactionId) 2)
// 第一个正常事务号
#define FirstNormalTransactionId ((TransactionId) 3)
// 把 FullTransactionId 的低32位作为无符号整数生成 xid
#define XidFromFullTransactionId(x) ((uint32) (x).value)
static inline void
FullTransactionIdAdvance(FullTransactionId *dest)
{
dest->value++;
while (XidFromFullTransactionId(*dest) < FirstNormalTransactionId)
dest->value++;
}
FullTransactionId
GetNewTransactionId(bool isSubXact)
{
/*** 省略 ***/
full_xid = ShmemVariableCache->nextFullXid;
xid = XidFromFullTransactionId(full_xid);
/*** 省略 ***/
FullTransactionIdAdvance(&ShmemVariableCache->nextFullXid);
/*** 省略 ***
return full_xid;
}
static void
AssignTransactionId(TransactionState s)
{
/*** 省略 ***/
s->fullTransactionId = GetNewTransactionId(isSubXact);
if (!isSubXact)
XactTopFullTransactionId = s->fullTransactionId;
/*** 省略 ***/
}
TransactionId
GetTopTransactionId(void)
{
if (!FullTransactionIdIsValid(XactTopFullTransactionId))
AssignTransactionId(&TopTransactionStateData);
return XidFromFullTransactionId(XactTopFullTransactionId);
}
可以看到,新事务号保存在同享变量缓存中:ShmemVariableCache->nextFullXid,每发行一个事务号后,向上调剂它的值,并跳过上述三个特殊值。三个特殊仠分别为0、1和2,作用可以看上面代码注释。
XID 回卷机制
前面说到,XID 是一个环,分配到 2^32⑴ 以后又从 3 开始,那末内核是怎样比较两个事务的大小的呢?比如 xid 经历了这样一个进程 3-> 2^32⑴ -> 5,那末内核怎样样知道 5 这个事务在 2^32⑴ 后面呢?我们再看一下代码:
* TransactionIdPrecedes — is id1 logically < id2?
*/
bool
TransactionIdPrecedes(TransactionId id1, TransactionId id2)
{
/*
* If either ID is a permanent XID then we can just do unsigned
* comparison. If both are normal, do a modulo⑵^32 comparison.
*/
int32 diff;
if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2))
return (id1 < id2);
diff = (int32) (id1 – id2);
return (diff < 0);
}
可以看到,内核使用了一个比较取巧的方法:(int32) (id1 – id2) < 0,32位有符号整数的取值范围是 ⑵^31 到 231⑴,5-(232⑴) 得到的值比 2^31⑴ 大,所以转换成 int32 会变成负数。但是这里面有一个问题,「最新事务号-最老事务号」 一定要小于 2^31,一旦大于就会出现回卷,致使老事务产生的数据对新事务不可见。
XID 回卷预防
前面讲到,「最新事务号-最老事务号」 一定要小于 2^31,否则会产生回卷致使老事务产生的数据对新事务不可见,那内核是怎样避免这个问题的呢?内核是这样处理的:通过定期把老事务产生的元组的 XID 更新为 FrozenTransactionId,即更新为2,来回收 XID,而 XID 为2 的元组对所有的事务可见,这个进程称为 XID 冻结,通过这个方式可以回收 XID 来保证 |最新事务号-最老事务号| < 2^31。
除内核自动冻结回收XID,我们也能够通过命令或 sql 的方式手动进行 xid 冻结回收
- 查询数据库或表的年龄,数据库年龄指的是:「最新事务号-数据库中最老事务号」,表年龄指的是:「最新事务号-表中最老事务号」
SELECT datname, age(datfrozenxid) FROM pg_database;
# 1个库每一个表的年龄排序
SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN (‘r’, ‘m’) order by age desc;
# 查看1个表的年龄
select oid::regclass,age(relfrozenxid) from pg_class where oid=’schema名称.表名称’::regclass::oid;
手动冻结回收一张表的元组的 xid 的sql:
手动冻结回收一个库里面的所有表 xid 的命令:
冻结回收进程是一个重 IO 的操作,这个进程内核会描写表的所有页面,然后把符合要求的元组的 t_xmin 字段更新为 2,所以这个进程需要在业务低峰进行,避免影响业务。
与冻结回收相关的内核参数有三个:vacuum_freeze_min_age、vacuum_freeze_table_age和autovacuum_freeze_max_age,由于笔者对这三个参数理解不深,就不在这里班门弄斧了,感兴趣的同学可以自行找资料了解一下。
解决方案
问题分析
基于上面的原理分析,我们知道,「最新事务号-最老事务号」 = 2^31⑴000000,即当前可用的 xid 仅剩下一百万的时候,内核就会制止实例写入并报错:database is not accepting commands to avoid wraparound data loss in database, 这个时候一定要连到提示中的 "xxxx" 对表进行 freeze 回收更多的 XID。
SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
{
TransactionId xidVacLimit;
TransactionId xidWarnLimit;
TransactionId xidStopLimit;
TransactionId xidWrapLimit;
TransactionId curXid;
Assert(TransactionIdIsNormal(oldest_datfrozenxid));
/*
* xidWrapLimit = 最老的事务号 + 0x7FFFFFFF,当前事务号一旦到达xidWrapLimit将产生回卷
*/
xidWrapLimit = oldest_datfrozenxid + (MaxTransactionId >> 1);
if (xidWrapLimit < FirstNormalTransactionId)
xidWrapLimit += FirstNormalTransactionId;
/*
* 一旦当前事务号到达xidStopLimit,实例将不可写入,保存 1000000 的xid用于vacuum
* 每 vacuum 一张表需要占用一个xid
*/
xidStopLimit = xidWrapLimit – 1000000;
if (xidStopLimit < FirstNormalTransactionId)
xidStopLimit -= FirstNormalTransactionId;
/*
* 一旦当前事务号到达xidWarnLimit,将不停地收到
* WARNING: database “xxxx” must be vacuumed within 2740112 transactions
*/
xidWarnLimit = xidStopLimit – 10000000;
if (xidWarnLimit < FirstNormalTransactionId)
xidWarnLimit -= FirstNormalTransactionId;
/*
* 一旦当前事务号到达xidVacLimit将触发force autovacuums
*/
xidVacLimit = oldest_datfrozenxid + autovacuum_freeze_max_age;
if (xidVacLimit < FirstNormalTransactionId)
xidVacLimit += FirstNormalTransactionId;
/* Grab lock for just long enough to set the new limit values */
LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
ShmemVariableCache->oldestXid = oldest_datfrozenxid;
ShmemVariableCache->xidVacLimit = xidVacLimit;
ShmemVariableCache->xidWarnLimit = xidWarnLimit;
ShmemVariableCache->xidStopLimit = xidStopLimit;
ShmemVariableCache->xidWrapLimit = xidWrapLimit;
ShmemVariableCache->oldestXidDB = oldest_datoid;
curXid = XidFromFullTransactionId(ShmemVariableCache->nextFullXid);
LWLockRelease(XidGenLock);
/* Log the info */
ereport(DEBUG1,
(errmsg(“transaction ID wrap limit is %u, limited by database with OID %u”,
xidWrapLimit, oldest_datoid)));
/*
* 如果 当前事务号>=最老事务号+autovacuum_freeze_max_age
* 触发 autovacuum 对年龄最老的数据库进行清算,如果有多个数据库到达要求,按年龄最老的顺序顺次清算
* 通过设置标志位标记当前 autovacuum 结束以后再来一次 autovacuum
*/
if (TransactionIdFollowsOrEquals(curXid, xidVacLimit) &&
IsUnderPostmaster && !InRecovery)
SendPostmasterSignal(PMSIGNAL_START_AUTOVAC_LAUNCHER);
/* Give an immediate warning if past the wrap warn point */
if (TransactionIdFollowsOrEquals(curXid, xidWarnLimit) && !InRecovery)
{
char *oldest_datname;
if (IsTransactionState())
oldest_datname = get_database_name(oldest_datoid);
else
oldest_datname = NULL;
if (oldest_datname)
ereport(WARNING,
(errmsg(“database \”%s\” must be vacuumed within %u transactions”,
oldest_datname,
xidWrapLimit – curXid),
errhint(“To avoid a database shutdown, execute a database-wide VACUUM in that database.\n”
“You might also need to commit or roll back old prepared transactions, or drop stale replication slots.”)));
else
ereport(WARNING,
(errmsg(“database with OID %u must be vacuumed within %u transactions”,
oldest_datoid,
xidWrapLimit – curXid),
errhint(“To avoid a database shutdown, execute a database-wide VACUUM in that database.\n”
“You might also need to commit or roll back old prepared transactions, or drop stale replication slots.”)));
}
}
bool
TransactionIdFollowsOrEquals(TransactionId id1, TransactionId id2)
{
int32 diff;
if (!TransactionIdIsNormal(id1) || !TransactionIdIsNormal(id2))
return (id1 >= id2);
diff = (int32) (id1 – id2);
return (diff >= 0);
}
FullTransactionId
GetNewTransactionId(bool isSubXact)
{
/*** 省略 ***/
full_xid = ShmemVariableCache->nextFullXid;
xid = XidFromFullTransactionId(full_xid);
if (TransactionIdFollowsOrEquals(xid, ShmemVariableCache->xidVacLimit))
{
TransactionId xidWarnLimit = ShmemVariableCache->xidWarnLimit;
TransactionId xidStopLimit = ShmemVariableCache->xidStopLimit;
TransactionId xidWrapLimit = ShmemVariableCache->xidWrapLimit;
Oid oldest_datoid = ShmemVariableCache->oldestXidDB;
/*** 省略 ***/
if (IsUnderPostmaster &&
TransactionIdFollowsOrEquals(xid, xidStopLimit))
{
char *oldest_datname = get_database_name(oldest_datoid);
/* complain even if that DB has disappeared */
if (oldest_datname)
ereport(ERROR,
(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
errmsg(“database is not accepting commands to avoid wraparound data loss in database \”%s\””,
oldest_datname),
errhint(“Stop the postmaster and vacuum that database in single-user mode.\n”
“You might also need to commit or roll back old prepared transactions, or drop stale replication slots.”)));
/*** 省略 ***/
}
/*** 省略 ***/
}
/*** 省略 ***/
}
问题定位
SELECT datname, age(datfrozenxid) FROM pg_database;
# 1个库每一个表的年龄排序
SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN (‘r’, ‘m’) order by age desc;
# 查看1个表的年龄
select oid::regclass,age(relfrozenxid) from pg_class where oid=’schema名称.表名称’::regclass::oid;
问题解决
- 通过上面的第一个 sql,查找年龄最大的数据库,数据库年龄指的是:|最新事务号-数据库中最老事务号|
- 通过上面第二个 sql,查找年龄最大的表,然后对表顺次履行:vacuum freeze 表名,把表中的老事务号冻结回收,表年龄指的是:|最新事务号-表中最老事务号|
- 运维脚本
单进程 Shell 脚本
for cmd in `psql -U用户名 -p端口号 -h连接串 -d数据库名 -c “SELECT ‘vacuum freeze ‘||c.oid::regclass||’;’ as vacuum_cmd FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN (‘r’, ‘m’) order by greatest(age(c.relfrozenxid),age(t.relfrozenxid)) desc offset 50 limit 50;” | grep -v vacuum_cmd | grep -v row | grep vacuum`; do
psql -U用户名 -p端口号 -h连接串 -d数据库名 -c “$cmd”
done
多进程 Python 脚本
import psycopg2
args = dict(host=’pgm-bp10xxxx.pg.rds.aliyuncs.com’, port=5432, dbname=’数据库名’,
user=’用户名’, password=’密码’)
def vacuum_handler(sql):
sql_str = “SELECT c.oid::regclass as table_name, greatest(age(c.relfrozenxid),age(t.relfrozenxid)) as age FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN (‘r’, ‘m’) order by age desc limit 10; ”
try:
conn = psycopg2.connect(**args)
cur = conn.cursor()
cur.execute(sql)
conn.commit()
cur = conn.cursor()
cur.execute(sql_str)
print cur.fetchall()
conn.close()
except Exception as e:
print str(e)
# 对指定数据库中年龄最大的前 1000 张表进行 vacuum freeze,32 个进程并发履行
def multi_vacuum():
pool = Pool(processes=32)
sql_str = “SELECT ‘vacuum freeze ‘||c.oid::regclass||’;’ as vacuum_cmd FROM pg_class c LEFT JOIN pg_class t ON c.reltoastrelid = t.oid WHERE c.relkind IN (‘r’, ‘m’) order by greatest(age(c.relfrozenxid),age(t.relfrozenxid)) desc limit 1000;”;
try:
conn = psycopg2.connect(**args)
cur = conn.cursor()
cur.execute(sql_str)
rows = cur.fetchall()
for row in rows:
cmd = row[‘vacuum_cmd’]
pool.apply_async(vacuum_handler, (cmd, ))
conn.close()
pool.close()
pool.join()
except Exception as e:
print str(e)
multi_vacuum()
友谊提示
vacuum freeze 会扫描表的所有页面并更新,是一个重 IO 的操作,操作进程中一定要控制好并发数,否则非常容易把实例打挂。
作者信息
谢桂起(花名:渊渱) 2020年毕业后加入阿里云,一直从事RDS PostgreSQL相关工作,善于解决线上各类RDS PostgreSQL运维管控相关问题。
总结
到此这篇关于PostgreSQL事务回卷的文章就介绍到这了,更多相关PostgreSQL事务回卷内容请搜索之前的文章或继续浏览下面的相关文章希望大家以后多多支持!
文章来源:丸子建站
文章标题:PostgreSQL事务回卷实战案例详析
https://www.wanzijz.com/view/63807.html