PostgreSQL 的 pg_advisory_lock 函数
pg_advisory_lock 是 PostgreSQL 提供的一种应用级锁机制,它不锁定具体的数据库对象(如表或行),而是通过数字键值来协调应用间的并发控制。
锁的基本概念
PostgreSQL 提供两种咨询锁(advisory lock):
- 会话级咨询锁:锁在会话结束时自动释放
- 事务级咨询锁:锁在事务结束时自动释放
主要函数列表
函数 | 描述 |
---|---|
pg_advisory_lock(key) | 获取会话级咨询锁(阻塞) |
pg_try_advisory_lock(key) | 尝试获取会话级咨询锁(非阻塞) |
pg_advisory_xact_lock(key) | 获取事务级咨询锁(阻塞) |
pg_try_advisory_xact_lock(key) | 尝试获取事务级咨询锁(非阻塞) |
pg_advisory_unlock(key) | 释放会话级咨询锁 |
pg_advisory_unlock_all() | 释放当前会话持有的所有咨询锁 |
函数详解
1 pg_advisory_lock(key bigint)
功能:获取一个会话级别的咨询锁(如果锁已被其他会话持有,则阻塞等待)
参数:
key
:64位整数锁标识
示例:
SELECT pg_advisory_lock(123456);
-- 执行需要同步的操作
SELECT pg_advisory_unlock(123456);
2 pg_try_advisory_lock(key bigint)
功能:尝试获取会话级咨询锁(非阻塞,立即返回成功与否)
返回值:boolean(true表示获取成功)
示例:
DO $$
BEGIN
IF pg_try_advisory_lock(123456) THEN
RAISE NOTICE 'Lock acquired, performing work...';
-- 执行受保护的操作
PERFORM pg_advisory_unlock(123456);
ELSE
RAISE NOTICE 'Could not acquire lock, skipping...';
END IF;
END $$;
3 pg_advisory_xact_lock(key bigint)
功能:获取事务级咨询锁(锁在事务结束时自动释放)
示例:
BEGIN;
SELECT pg_advisory_xact_lock(123456);
-- 执行需要同步的操作
COMMIT; -- 锁自动释放
4 pg_try_advisory_xact_lock(key bigint)
功能:尝试获取事务级咨询锁(非阻塞)
示例:
BEGIN;
SELECT pg_try_advisory_xact_lock(123456);
-- 无论是否获取成功都继续执行
COMMIT;
锁的键值设计
咨询锁使用64位整数作为键值,有两种使用方式:
单键模式:使用一个64位整数
SELECT pg_advisory_lock(123456789);
双键模式:使用两个32位整数组合
SELECT pg_advisory_lock(123, 456);
实际应用场景
场景1:防止定时任务重复执行
-- 在定时任务开始时检查锁
DO $$
BEGIN
IF NOT pg_try_advisory_xact_lock(987654) THEN
RAISE NOTICE 'Task is already running in another process';
RETURN;
END IF;
RAISE NOTICE 'Starting scheduled task...';
-- 执行定时任务逻辑
-- ...
COMMIT; -- 锁自动释放
END $$;
场景2:应用级分布式锁
-- 应用1获取锁
SELECT pg_advisory_lock(555555) FROM my_table WHERE id = 1;
-- 应用2尝试获取同样的锁
SELECT pg_try_advisory_lock(555555); -- 返回false
-- 应用1释放锁
SELECT pg_advisory_unlock(555555);
场景3:确保单实例初始化
-- 系统初始化时确保只执行一次
DO $$
BEGIN
-- 尝试获取锁,等待最多5秒
FOR i IN 1..5 LOOP
IF pg_try_advisory_lock(1357924680) THEN
-- 检查是否已经初始化
IF NOT EXISTS (SELECT 1 FROM system_status WHERE initialized = true) THEN
-- 执行初始化
INSERT INTO system_status(initialized) VALUES (true);
RAISE NOTICE 'System initialized successfully';
ELSE
RAISE NOTICE 'System already initialized';
END IF;
-- 显式释放锁(虽然会话结束会自动释放)
PERFORM pg_advisory_unlock(1357924680);
RETURN;
END IF;
PERFORM pg_sleep(1); -- 等待1秒
END LOOP;
RAISE EXCEPTION 'Could not acquire initialization lock after 5 seconds';
END $$;
监控咨询锁
查看当前持有的咨询锁
SELECT locktype, classid, objid, objsubid, mode, granted
FROM pg_locks
WHERE locktype = 'advisory';
查看所有咨询锁(包括已授予和等待的)
SELECT pid, locktype, mode, granted, fastpath, virtualtransaction
FROM pg_locks
WHERE locktype = 'advisory';
注意事项
锁释放:
- 会话级锁必须显式释放或会话结束自动释放
- 事务级锁在事务结束时自动释放
死锁风险:
- 按固定顺序获取多个咨询锁以避免死锁
- 使用 pg_try_advisory_lock 可以降低死锁风险
性能影响:
- 咨询锁比表锁/行锁更轻量级
- 大量使用仍可能影响性能
集群环境:
- 咨询锁只在单个PostgreSQL实例内有效
- 不适用于跨多个数据库实例的协调
锁标识管理:
- 建议在应用中集中管理锁标识
- 使用有意义的常量而非魔法数字
高级用法
超时获取锁
DO $$
DECLARE
lock_acquired BOOLEAN := false;
timeout INTERVAL := '5 seconds';
start_time TIMESTAMP := clock_timestamp();
BEGIN
WHILE (clock_timestamp() - start_time) < timeout LOOP
IF pg_try_advisory_lock(424242) THEN
lock_acquired := true;
EXIT;
END IF;
PERFORM pg_sleep(0.1); -- 等待100ms
END LOOP;
IF lock_acquired THEN
RAISE NOTICE 'Lock acquired after %', clock_timestamp() - start_time;
-- 执行受保护的操作
PERFORM pg_advisory_unlock(424242);
ELSE
RAISE EXCEPTION 'Could not acquire lock within timeout';
END IF;
END $$;
使用咨询锁实现队列
-- 生产者
SELECT pg_advisory_lock(987); -- 全局写锁
-- 插入队列项
INSERT INTO job_queue(job_data) VALUES ('some data');
SELECT pg_advisory_unlock(987);
-- 消费者
SELECT pg_advisory_lock(988); -- 全局读锁
-- 获取并锁定一个作业
UPDATE job_queue
SET status = 'processing',
worker_id = pg_backend_pid(),
claimed_at = NOW()
WHERE id = (
SELECT id
FROM job_queue
WHERE status = 'pending'
ORDER BY created_at
LIMIT 1
)
RETURNING *;
SELECT pg_advisory_unlock(988);
pg_advisory_lock 是 PostgreSQL 强大的应用级同步机制,合理使用可以解决复杂的并发控制问题,但需要谨慎设计以避免死锁和性能问题。