0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

并发情况如何实现加锁来保证数据一致性

OSC开源社区 来源:OSCHINA 社区 2023-12-21 10:22 次阅读

作者:京东云开发者-京东科技 焦泽斌

单体架构下锁的实现方案

1. ReentrantLock 全局锁

ReentrantLock(可重入锁),指的是一个线程再次对已持有的锁保护的临界资源时,重入请求将会成功。 简单的与我们常用的 Synchronized 进行比较:

ReentrantLock Synchronized
锁实现机制 依赖 AQS 监视器模式
灵活性 支持响应超时、中断、尝试获取锁 不灵活
释放形式 必须显示调用 unlock () 释放锁 自动释放监视器
锁类型 公平锁 & 非公平锁 非公平锁
条件队列 可关联多个条件队列 关联一个条件队列
可重入性 可重入 可重入

AQS 机制:如果被请求的共享资源空闲,那么就当前请求资源的线程设置为有效的工作线程,将共享资源通过 CAScompareAndSetState设置为锁定状态;如果共享资源被占用,就采用一定的阻塞等待唤醒机制(CLH 变体的 FIFO 双端队列)来保证锁分配。 可重入性:无论是公平锁还是非公平锁的情况,加锁过程会利用一个 state 值

private volatile int state

state 值初始化的时候为 0,表示没有任何线程持有锁

当有线程来请求该锁时,state 值会自增 1,同一个线程多次获取锁,就会多次 + 1,这就是可重入的概念

解锁也是对 state 值自减 1,一直到 0,此线程对锁释放。

public class LockExample {

    static int count = 0;
    static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) throws InterruptedException {

        Runnable runnable = new Runnable() {
            @Override
            public void run() {

                try {
                    // 加锁
                    lock.lock();
                    for (int i = 0; i < 10000; i++) {
                        count++;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                finally {
                    // 解锁,放在finally子句中,保证锁的释放
                    lock.unlock();
                }
            }
        };

        Thread thread1 = new Thread(runnable);
        Thread thread2 = new Thread(runnable);
        thread1.start();
        thread2.start();
        thread1.join();
        thread2.join();
        System.out.println("count: " + count);
    }
}

/**
 * 输出
 * count: 20000
 */

2. Mysql 行锁、乐观锁 乐观锁即是无锁思想,一般都是基于 CAS 思想实现的,而在 MySQL 中通过 version 版本号 + CAS 无锁形式实现乐观锁;例如 T1,T2 两个事务一起并发执行时,当 T2 事务执行成功提交后,会对 version+1,所以 T1 事务执行的 version 条件就无法成立了。 对 sql 语句进行加锁以及状态机的操作,也可以避免不同线程同时对 count 值访问导致的数据不一致问题。
// 乐观锁 + 状态机
update
    table_name
set
    version = version + 1,
    count = count + 1
where
    id = id AND version = version AND count = [修改前的count值];

// 行锁 + 状态机
 update
    table_name
set
    count = count + 1
where
    id = id AND count = [修改前的count值]
for update;

3. 细粒度的 ReetrantLock 锁 如果我们直接采用 ReentrantLock 全局加锁,那么这种情况是一条线程获取到锁,整个程序全部的线程来到这里都会阻塞;但是我们在项目里面想要针对每个用户在操作的时候实现互斥逻辑,所以我们需要更加细粒度的锁。
public class LockExample {
    private static Map lockMap = new ConcurrentHashMap<>();
    
    public static void lock(String userId) {
        // Map中添加细粒度的锁资源
        lockMap.putIfAbsent(userId, new ReentrantLock());
        // 从容器中拿锁并实现加锁
        lockMap.get(userId).lock();
    }
    public static void unlock(String userId) {
        // 先从容器中拿锁,确保锁的存在
        Lock locak = lockMap.get(userId);
        // 释放锁
        lock.unlock();
    }
}

弊端:如果每一个用户请求共享资源,就会加锁一次,后续该用户就没有在登录过平台,但是锁对象会一直存在于内存中,这等价于发生了内存泄漏,所以锁的超时和淘汰机制机制需要实现。

4. 细粒度的 Synchronized 全局锁

上面的加锁机制使用到了锁容器ConcurrentHashMap,该容易为了线程安全的情况,多以底层还是会用到Synchronized机制,所以有些情况,使用 lockMap 需要加上两层锁。 那么我们是不是可以直接使用Synchronized来实现细粒度的锁机制

public class LockExample {
    public static void syncFunc1(Long accountId) {
        String lock = new String(accountId + "").intern();

        synchronized (lock) {

            System.out.println(Thread.currentThread().getName() + "拿到锁了");
            // 模拟业务耗时
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            System.out.println(Thread.currentThread().getName() + "释放锁了");
        }
    }

    public static void syncFunc2(Long accountId) {
        String lock = new String(accountId + "").intern();

        synchronized (lock) {

            System.out.println(Thread.currentThread().getName() + "拿到锁了");
            // 模拟业务耗时
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            System.out.println(Thread.currentThread().getName() + "释放锁了");
        }
    }

    // 使用 Synchronized 来实现更加细粒度的锁
    public static void main(String[] args) {
        new Thread(()-> syncFunc1(123456L), "Thread-1").start();
        new Thread(()-> syncFunc2(123456L), "Thread-2").start();
    }
}

/**
 * 打印
 * Thread-1拿到锁了
 * Thread-1释放锁了
 * Thread-2拿到锁了
 * Thread-2释放锁了
 */

从代码中我们发现实现加锁的对象其实就是一个与用户 ID 相关的一个字符串对象,这里可能会有疑问,我每一个新的线程进来,new 的都是一个新的字符串对象,只不过字符串内容一样,怎么能够保证可以安全的锁住共享资源呢;

这其实需要归功于后面的intern()函数的功能;

intern()函数用于在运行时将字符串添加到堆空间中的字符串常量池中,如果字符串已经存在,返回字符串常量池中的引用。

分布式架构下锁的实现方案

核心问题:我们需要找到一个多个进程之间所有线程可见的区域来定义这个互斥量。 一个优秀的分布式锁的实现方案应该满足如下几个特性:

分布式环境下,可以保证不同进程之间的线程互斥

同一时刻,同时只允许一条线程成功获取到锁资源

保证互斥量的地方需要保证高可用性

要保证可以高性能的获取锁和释放锁

可以支持同一线程的锁重入性

具备合理的阻塞机制,竞争锁失败的线程要有相应的处理方案

支持非阻塞式的获取锁。获取锁失败的线程可以直接返回

具备合理的锁失效机制,如超时失效等,可以确保避免死锁情况出现

Redis 实现分布式锁

redis 属于中间件,可独立部署;

对于不同的 Java 进程来说都是可见的,同时性能也非常可观

依赖与 redis 本身提供的指令setnx key value来实现分布式锁;区别于普通set指令的是只有当 key 不存在时才会设置成功,key 存在时会返回设置失败

代码实例:

// 扣库存接口
@RequestMapping("/minusInventory")
public String minusInventory(Inventory inventory) {
    // 获取锁
    String lockKey = "lock-" + inventory.getInventoryId();
    int timeOut = 100;
    Boolean flag = stringRedisTemplate.opsForValue()
            .setIfAbsent(lockKey, "竹子-熊猫",timeOut,TimeUnit.SECONDS);
    // 加上过期时间,可以保证死锁也会在一定时间内释放锁
    stringRedisTemplate.expire(lockKey,timeOut,TimeUnit.SECONDS);
    
    if(!flag){
        // 非阻塞式实现
        return "服务器繁忙...请稍后重试!!!";
    }
    
    // ----只有获取锁成功才能执行下述的减库存业务----        
    try{
        // 查询库存信息
        Inventory inventoryResult =
            inventoryService.selectByPrimaryKey(inventory.getInventoryId());
        
        if (inventoryResult.getShopCount() <= 0) {
            return "库存不足,请联系卖家....";
        }
        
        // 扣减库存
        inventoryResult.setShopCount(inventoryResult.getShopCount() - 1);
        int n = inventoryService.updateByPrimaryKeySelective(inventoryResult);
    } catch (Exception e) { // 确保业务出现异常也可以释放锁,避免死锁
        // 释放锁
        stringRedisTemplate.delete(lockKey);
    }
    
    if (n > 0)
        return "端口-" + port + ",库存扣减成功!!!";
    return "端口-" + port + ",库存扣减失败!!!";
}

作者:竹子爱熊猫
链接:https://juejin.cn/post/7038473714970656775

过期时间的合理性分析:

因为对于不同的业务,我们设置的过期时间的长短都会不一样,太长了不合适,太短了也不合适;

所以我们想到的解决方案是设置一条子线程,给当前锁资源续命。具体实现是,子线程间隔 2-3s 去查询一次 key 是否过期,如果还没有过期则代表业务线程还在执行业务,那么则为该 key 的过期时间加上 5s。

但是为了避免主线程意外死亡后,子线程会一直为其续命,造成 “长生锁” 的现象,所以将子线程变为主(业务)线程的守护线程,这样子线程就会跟着主线程一起死亡。

// 续命子线程
public class GuardThread extends Thread { 
    private static boolean flag = true;

    public GuardThread(String lockKey, 
        int timeOut, StringRedisTemplate stringRedisTemplate){
        ……
    }

    @Override
    public void run() {
        // 开启循环续命
        while (flag){
            try {
                // 先休眠一半的时间
                Thread.sleep(timeOut / 2 * 1000);
            }catch (Exception e){
                e.printStackTrace();
            }
            // 时间过了一半之后再去续命
            // 先查看key是否过期
            Long expire = stringRedisTemplate.getExpire(
                lockKey, TimeUnit.SECONDS);
            // 如果过期了,代表主线程释放了锁
            if (expire <= 0){
                // 停止循环
                flag = false;
            }
            // 如果还未过期
            // 再为则续命一半的时间
            stringRedisTemplate.expire(lockKey,expire
                + timeOut/2,TimeUnit.SECONDS);
        }
    }
}


// 创建子线程为锁续命
GuardThread guardThread = new GuardThread(lockKey,timeOut,stringRedisTemplate);
// 设置为当前 业务线程 的守护线程
guardThread.setDaemon(true);
guardThread.start();

作者:竹子爱熊猫 
链接:https://juejin.cn/post/7038473714970656775

Redis 主从架构下锁失效的问题 为了在开发过程保证 Redis 的高可用,会采用主从复制架构做读写分离,从而提升 Redis 的吞吐量以及可用性。但是如果一条线程在 redis 主节点上获取锁成功之后,主节点还没有来得及复制给从节点就宕机了,此时另一条线程访问 redis 就会在从节点上面访问,同时也获取锁成功,这时候临界资源的访问就会出现安全性问题了。 解决办法:

红锁算法(官方提出的解决方案):多台独立的 Redis 同时写入数据,在锁失效时间之内,一半以上的机器写成功则返回获取锁成功,失败的时候释放掉那些成功的机器上的锁。但这种做法缺点是成本高需要独立部署多台 Redis 节点。

额外记录锁状态:再额外通过其他独立部署的中间件(比如 DB)来记录锁状态,在新线程获取锁之前需要先查询 DB 中的锁持有记录,只要当锁状态为未持有时再尝试获取分布式锁。但是这种情况缺点显而易见,获取锁的过程实现难度复杂,性能开销也非常大;另外还需要配合定时器功能更新 DB 中的锁状态,保证锁的合理失效机制。

使用 Zookepper 实现

Zookeeper 实现分布式锁

Zookeeper 数据区别于 redis 的数据,数据是实时同步的,主节点写入后需要一半以上的节点都写入才会返回成功。所以如果像电商、教育等类型的项目追求高性能,可以放弃一定的稳定性,推荐使用 redis 实现;例如像金融、银行、政府等类型的项目,追求高稳定性,可以牺牲一部分性能,推荐使用 Zookeeper 实现。

分布式锁性能优化

上面加锁确实解决了并发情况下线程安全的问题,但是我们面对 100w 个用户同时去抢购 1000 个商品的场景该如何解决呢? db2a9dc2-9f28-11ee-8b88-92fbcf53809c.png

可与将共享资源做一下提前预热,分段分散存储一份。抢购时间为下午 15:00,提前再 14:30 左右将商品数量分成 10 份,并将每一块数据进行分别加锁,来防止并发异常。

另外也需要在 redis 中写入 10 个 key,每一个新的线程进来先随机的分配一把锁,然后进行后面的减库存逻辑,完成之后释放锁,以便之后的线程使用。

这种分布式锁的思想就是,将原先一把锁就可以实现的多线程同步访问共享资源的功能,为了提高瞬时情况下多线程的访问速度,还需要保证并发安全的情况下一种实现方式。

审核编辑:汤梓红
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 状态机
    +关注

    关注

    2

    文章

    492

    浏览量

    27522
  • MySQL
    +关注

    关注

    1

    文章

    804

    浏览量

    26524
  • 线程
    +关注

    关注

    0

    文章

    504

    浏览量

    19674
  • 数据一致性
    +关注

    关注

    0

    文章

    5

    浏览量

    1420

原文标题:并发情况如何实现加锁来保证数据一致性

文章出处:【微信号:OSC开源社区,微信公众号:OSC开源社区】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    串行数据一致性测试和验证测量基础知识

    小弟这次给大家带来了串行数据一致性测试和验证测量基础知识其中提到了些高速串行信号的测试测量方法和简单的原理性介绍,适合初学者使用。PS。这其中提到的些测量设备现在已经升级为最新的仪器设备,但是测试的原理和技术还是可以让大家闲
    发表于 04-16 16:17

    Redis缓存和MySQL数据一致原因和解决方案

    并发架构系列:Redis缓存和MySQL数据一致性方案详解
    发表于 03-27 15:55

    如何解决stm32 H7 DMA串口发送数据一致性问题?

    如何解决stm32 H7 DMA串口发送数据一致性问题?
    发表于 12-06 06:05

    顺序一致性和TSO一致性分别是什么?SC和TSO到底哪个好?

    保证某个内核的fence指令前的所有memory访问能够在fence指令后的memory访问之前完成。实际上fence指令(或者说是memory barrier)在后续我们即将介绍的弱序内存一致性
    发表于 07-19 14:54

    VxWorks中主备数据一致性功能组件的设计与实现

    数据一致性是主备用系统必须解决的问题。目前主备系统的一致性都采用手工编程实现。导致代码结构繁杂,且效率不高。利用VxWorks的异常处理机制,结合RISC CPU的特性.设
    发表于 12-16 14:21 5次下载

    一致性规划研究

    针对一致性规划的高度求解复杂度,分析主流一致性规划器的求解策略,给出影响一致性规划器性能的主要因素:启发信息的有效,信念状态表示方法的紧凑
    发表于 04-06 08:43 12次下载

    VxWorks中主备数据一致性功能组件的设计与实现

    数据一致性是主备用系统必须解决的问题。目前主备系统的一致性都采用手工编程实现,导致代码结构繁杂,且效率不高。利用VxWorks 的异常处理机制,结合RISC CPU 的特性,设计实
    发表于 09-22 11:32 8次下载

    VxWorks中主备数据一致性功能组件的设计与实现

    数据一致性是主备用系统必须解决的问题。目前主备系统的一致性都采用手工编程实现。导致代码结构繁杂,且效率不高。利用VxWorks的异常处理机制,结合RISC CPU的特性.设计
    发表于 11-28 16:47 11次下载

    P2P平台上的数据一致性研究

    P2P网络是个自组织的动态网络,对等点可以随意的加入或者离开网络,因此如何控制数据一致性成了P2P网络平台应用扩展应用的关键点,本文引入数据一致性算法到P2P网络平台中
    发表于 02-25 16:06 15次下载

    串行数据一致性及验证基础指南

    本基础指南旨在帮助您了解串行数据传输的般方面,并介绍适用于这些新兴串行技术的模拟和数字测量要求。 串行数据一致性测试和验证测量基础知识本手册将帮助您理解串行
    发表于 08-05 15:14 32次下载

    电能质量监测数据一致性定义及检测方法_邱丽羚

    电能质量监测数据一致性定义及检测方法_邱丽羚
    发表于 01-08 11:07 0次下载

    分布式系统的CAP和数据一致性模型

    CAP理论的核心思想是任何基于网络的数据共享系统最多只能满足数据一致性(Consistency)、可用(Availability)和网络分区容忍(Partition Tolerance)三个特性中的两个。
    的头像 发表于 05-05 23:20 2265次阅读

    如何保障MySQL和Redis的数据一致性

    我直接先抛下结论:在满足实时的条件下,不存在两者完全保存一致的方案,只有最终一致性方案。根据网上的众多解决方案,总结出 6 种,直接看目录。
    的头像 发表于 03-14 16:48 823次阅读

    如何保证缓存一致性

    “ 本文的参考文章是2022年HOT 34上Intel Rob Blakenship关于CXL缓存一致性篇介绍。”
    的头像 发表于 10-19 17:42 1086次阅读
    如何<b class='flag-5'>保证</b>缓存<b class='flag-5'>一致性</b>

    深入理解数据备份的关键原则:应用一致性与崩溃一致性的区别

    深入理解数据备份的关键原则:应用一致性与崩溃一致性的区别 在数字化时代,数据备份成为了企业信息安全的核心环节。但在备份过程中,两个关键概念——应用
    的头像 发表于 03-11 11:29 895次阅读
    深入理解<b class='flag-5'>数据</b>备份的关键原则:应用<b class='flag-5'>一致性</b>与崩溃<b class='flag-5'>一致性</b>的区别