侧边栏壁纸
博主头像
再见理想博主等级

只争朝夕,不负韶华

  • 累计撰写 112 篇文章
  • 累计创建 64 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

多线程怎么做事务管理

再见理想
2023-07-17 / 0 评论 / 0 点赞 / 1,807 阅读 / 1,487 字

一,场景

假设我们有一个大数据系统,每天指定时间,需要从大数据系统中拉取 50w 条数据,对数据进行一个清洗操作,然后把数据保存到我们业务系统的数据库中。对于业务系统而言,这 50w 条数据,必须全部落库,要么就是一条都不插入。

二,方案分析

如果使用for循环去处理,需要几十万次IO,显然是效率太低。对于大批量数据库操作,如果能有办法实现批量更新那是最好,减少IO的次数,能满足需求情况下建议使用此方案。
如果要追求更短的处理时间,只能搬出多线程,开5个线程,每个线程处理10w数据。随即而来的问题是,Spring里面,事务的隔离性是通过ThreadLocal实现,每个线程都有自己的ThreadLocal,线程间互不干扰。每个线程对应一个事务,如果其中一个线程出现异常,怎么让所有线程都回滚?
能想到编程式事务,这问题就解决一半,通过编程式事务,我们就能完全掌控事务的开启和提交或者回滚操作。首先我们有一个全局变量为 Boolean 类型,默认为可以提交。在子线程里面,我们可以先通过编程式事务开启事务,然后插入 10w 条数据后,但是不提交,同时告诉主线程是否出现异常状态。主线程收集所有子线程的状态,判断是提交或者回滚。

三,代码实现

我们可以创建两个CountDownLatch锁,一个CountDownLatch锁(代码中mainMonitor)用于阻塞子线程提交或回滚操作,另一个CountDownLatch锁(代码中childMonitor)用于子线程的计数。当childMonitor计数为0时,主线程修改IS_OK是否可以提交状态,这时mainMonitor计数器归0,子线程根据IS_OK状态判断是提交或者回滚各自的事务。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class ThreadTransactionDemo {
    //是否可以提交
    public static volatile boolean IS_OK = true;

    public static void main(String[] args) {
        //子线程等待主线程通知
        CountDownLatch mainMonitor = new CountDownLatch(1);
        int threadCount = 5;
        CountDownLatch childMonitor = new CountDownLatch(threadCount);
        //子线程运行结果
        List<Boolean> childResponse = new ArrayList<Boolean>();
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < threadCount; i++) {
            int finalI = i;
            executor.execute(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + ":开始执行");
                    /*if (finalI == 4) {
                    	// 模拟异常
                        throw new Exception("出现异常");
                    }*/
                    TimeUnit.MILLISECONDS.sleep(ThreadLocalRandom.current().nextInt(1000));
                    childResponse.add(Boolean.TRUE);
                    childMonitor.countDown();
                    System.out.println(Thread.currentThread().getName() + ":准备就绪,等待其他线程结果,判断是否事务提交");
                    mainMonitor.await();
                    if (IS_OK) {
                        System.out.println(Thread.currentThread().getName() + ":事务提交");
                    } else {
                        System.out.println(Thread.currentThread().getName() + ":事务回滚");
                    }
                } catch (Exception e) {
                    childResponse.add(Boolean.FALSE);
                    childMonitor.countDown();
                    System.out.println(Thread.currentThread().getName() + ":出现异常,开始事务回滚");
                }
            });
        }
        //主线程等待所有子线程执行response
        try {
            childMonitor.await();
            for (Boolean resp : childResponse) {
                if (!resp) {
                    //如果有一个子线程执行失败了,则改变mainResult,让所有子线程回滚
                    System.out.println(Thread.currentThread().getName()+":有线程执行失败,标志位设置为false");
                    IS_OK = false;
                    break;
                }
            }
            //主线程获取结果成功,让子线程开始根据主线程的结果执行(提交或回滚)
            mainMonitor.countDown();
            //为了让主线程阻塞,让子线程执行。
            Thread.currentThread().join();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

所有子线程均正常情况下,打印如下:

如果有一个子线程出现异常,打印如下:

需要注意的是,这里启动的线程数不是越大越好,同时需要保证子线程数少于核心线程数,否则就会有任务进入队列,核心线程会一直阻塞住,等待主线程唤醒,而主线程在等待所有线程的运行结果,所以它会一直等待下去。

四,二阶段提交?

按照上面实现,一不留神实现了一个类似于两阶段提交(2PC)的一致性协议,这个实现方式实际上就是编程式事务配合二阶段提交(2PC)使用。前面说到,能想到编程式事务,这问题就解决一半,另一半就是二阶段提交。
也就是可能出现问题的地方,二阶段提交可能出现的问题再这里也可能会出现,比如协调者的单点问题,可以参考文章:《分布式事务-两阶段提交》。
其实当我们把一个个子线程理解为微服务中的一个个子系统的时候,这就是一个分布式事务的场景了。而我们拿出来的解决方案,并不是一个完美的解决方案。虽然,从某种角度上,我们绕开了事务的隔离性,但是有一定概率出现数据一致性问题。但由于都在同一个方法里面,没有网络的问题,要么都提交,要么都回滚,出现数据不一致的情况概率较低。
多线程事务换个角度想,可以理解为分布式事务,可以借助这个案例去了解分布式事务。但是解决分布式事务的最好的方法就是:不要有分布式事务!而解决分布式事务的绝大部分落地方案都是:最终一致性。

0

评论区