物理锁、逻辑锁,物理锁管理器、逻辑锁管理器区别与联系,用spring batch批处理中锁的使用

发布于:2025-03-26 ⋅ 阅读:(23) ⋅ 点赞:(0)

以下是物理锁、逻辑锁及其管理器的区别与联系,结合数据库系统和并发控制场景进行说明:


1. 物理锁(Physical Lock)

定义
直接作用于具体资源(如数据行、表、文件等)的锁,用于控制并发访问,确保数据一致性。
特点

  • 直接关联资源:每个物理锁对应一个具体的资源实例(如数据库表的行锁)。
  • 原子操作:通过硬件指令(如CAS)或系统调用实现原子性。
  • 低层实现:通常由操作系统或数据库内核直接管理。

示例

  • MySQL的行锁(Row Lock)、表锁(Table Lock)。
  • 文件系统中的文件锁(如flock)。

2. 逻辑锁(Logical Lock)

定义
基于协议或规则的抽象锁机制,不直接绑定物理资源,而是通过规则控制访问顺序或条件。
特点

  • 协议驱动:依赖锁协议(如两阶段锁协议、乐观锁协议)。
  • 无资源绑定:不直接管理资源,而是管理访问逻辑。
  • 高层策略:用于协调并发流程或事务顺序。

示例

  • 乐观锁(CAS操作、版本号控制)。
  • 分布式锁中的逻辑规则(如ZooKeeper的顺序节点)。

3. 物理锁管理器(Physical Lock Manager)

职责

  • 锁的分配与释放:为资源分配物理锁,记录锁状态(如持有者、类型)。
  • 冲突检测:检查锁请求是否与现有锁冲突(如写锁请求与现有读锁冲突)。
  • 死锁检测与解决:通过超时或资源优先级策略处理死锁。
  • 锁表维护:维护全局锁表(Lock Table),记录所有锁的分配状态。

实现

  • 数据库中的锁管理模块(如Oracle的锁管理器)。
  • 操作系统中的互斥锁(Mutex)管理器。

4. 逻辑锁管理器(Logical Lock Manager)

职责

  • 策略制定:定义锁的使用规则(如两阶段锁协议)。
  • 事务协调:确保事务遵循锁协议(如事务必须先获取读锁再升级为写锁)。
  • 异常处理:处理违反逻辑规则的场景(如事务回滚)。
  • 资源调度:通过逻辑规则优化资源分配顺序,减少冲突。

实现

  • 分布式系统中的协调器(如Raft协议的领导者选举逻辑)。
  • 事务管理器中的并发控制模块。

区别与联系

对比维度 物理锁/管理器 逻辑锁/管理器
核心目标 直接控制资源访问 通过规则协调访问顺序
实现层级 底层(硬件/操作系统/数据库内核) 高层(协议/策略/应用逻辑)
资源绑定 直接绑定具体资源(如文件、数据行) 不绑定资源,仅绑定规则
冲突解决 检测物理资源冲突并阻塞/释放锁 通过协议确保逻辑顺序,避免冲突
典型场景 数据库事务的行级锁、文件系统锁 乐观锁、分布式锁协议、事务两阶段提交

协同工作示例

以数据库事务并发控制为例:

  1. 逻辑锁管理器

    • 根据两阶段锁协议,规定事务必须先获取所有锁再释放。
    • 决定使用读锁(S锁)或写锁(X锁)。
  2. 物理锁管理器

    • 为事务分配具体资源的锁(如表A的X锁)。
    • 检测锁冲突(如另一个事务请求表A的S锁时,若当前存在X锁则阻塞)。
  3. 协作流程

    • 逻辑锁管理器制定策略 → 物理锁管理器执行锁操作 → 两者共同避免数据不一致和死锁。

总结

  • 物理锁是“工具”,直接控制资源访问;逻辑锁是“规则”,确保访问顺序正确。
  • 物理锁管理器负责底层资源控制;逻辑锁管理器负责高层策略制定。
  • 二者共同作用,实现系统并发控制,平衡性能与数据一致性。

以下是物理锁、逻辑锁及其管理器的区别与联系,以及在Spring Batch中的Java代码示例:


Spring Batch中锁的Java代码示例

1. 物理锁示例:使用数据库表实现作业互斥
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;

@Component
public class LockExampleJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    // 物理锁:通过数据库表实现作业互斥
    public Job lockJob() {
        return jobBuilderFactory.get("lockJob")
                .start(lockStep())
                .build();
    }

    private Step lockStep() {
        return stepBuilderFactory.get("lockStep")
                .tasklet((contribution, chunkContext) -> {
                    try {
                        // 尝试获取物理锁(数据库表锁)
                        try (Connection conn = dataSource.getConnection()) {
                            conn.createStatement().execute("SELECT GET_LOCK('BATCH_LOCK', 5000)");
                            System.out.println("成功获取物理锁,开始执行任务...");
                            // 执行核心业务逻辑
                            processBusinessLogic();
                            // 释放锁
                            conn.createStatement().execute("SELECT RELEASE_LOCK('BATCH_LOCK')");
                        }
                    } catch (SQLException e) {
                        throw new RuntimeException("获取物理锁失败", e);
                    }
                    return null;
                })
                .build();
    }

    private void processBusinessLogic() {
        // 模拟业务处理
        System.out.println("正在处理业务逻辑...");
    }
}

注释说明

  1. 物理锁实现
    • 使用MySQL的GET_LOCKRELEASE_LOCK函数实现数据库级别的锁。
    • 通过try-with-resources确保锁的释放。
    • 如果锁未在5秒内获取,抛出异常终止作业。

2. 逻辑锁示例:使用乐观锁(版本号校验)
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Component;

import javax.sql.DataSource;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

@Component
public class LogicalLockExampleJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private DataSource dataSource;

    // 逻辑锁:通过版本号实现乐观锁
    public Job logicalLockJob() {
        return jobBuilderFactory.get("logicalLockJob")
                .start(logicalLockStep())
                .build();
    }

    private Step logicalLockStep() {
        return stepBuilderFactory.get("logicalLockStep")
                .<DataItem, DataItem>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }

    // 读取数据(包含版本号字段)
    private JdbcPagingItemReader<DataItem> reader() {
        return new JdbcPagingItemReaderBuilder<DataItem>()
                .dataSource(dataSource)
                .queryProvider(() -> {
                    String sql = "SELECT id, data, version FROM items";
                    return new MySqlPagingQueryProvider(sql);
                })
                .rowMapper(new RowMapper<DataItem>() {
                    @Override
                    public DataItem mapRow(ResultSet rs, int rowNum) throws SQLException {
                        return new DataItem(
                                rs.getLong("id"),
                                rs.getString("data"),
                                rs.getInt("version")
                        );
                    }
                })
                .name("dataItemReader")
                .build();
    }

    // 处理数据时检查版本号(逻辑锁)
    private ItemProcessor<DataItem, DataItem> processor() {
        return item -> {
            // 假设版本号在读取后被其他事务修改,此处抛出异常
            if (item.getVersion() != expectedVersion) {
                throw new OptimisticLockingFailureException("版本号不匹配,数据已修改");
            }
            return item;
        };
    }

    // 写入数据时更新版本号
    private ItemWriter<DataItem> writer() {
        return items -> {
            for (DataItem item : items) {
                String sql = "UPDATE items SET data = ?, version = version + 1 WHERE id = ? AND version = ?";
                dataSource.getConnection().prepareStatement(sql)
                        .setString(1, item.getData())
                        .setLong(2, item.getId())
                        .setInt(3, item.getVersion())
                        .executeUpdate();
            }
        };
    }
}

// 数据实体类(包含版本号字段)
class DataItem {
    private Long id;
    private String data;
    private int version;

    // 构造函数、getter/setter
}

注释说明

  1. 逻辑锁实现
    • 使用版本号(version字段)实现乐观锁:
      • 读取数据时记录当前版本号。
      • 更新时检查版本号是否匹配,若不匹配则拒绝更新。
    • 通过SQL的WHERE version = ?条件确保并发安全。
    • 如果版本号不匹配,抛出OptimisticLockingFailureException终止处理。