Java并发工具类ThreadLocal、ConcurrentHashMap和CopyOnWrite的正确用法

后端 潘老师 5个月前 (12-01) 119 ℃ (0) 扫码查看

本文主要讲解关于Java并发工具类的正确用法相关内容,涉及ThreadLocal、ConcurrentHashMap和CopyOnWriteArrayList的正确用法,让我们来一起学习下吧!

我们来看看在使用并发工具时,最常遇到哪些坑,以及如何解决、避免这些坑吧。

线程重用导致用户信息错乱

之前在生产上遇到一个诡异的bug,埋点数据的用户信息错乱,有时候获取到的信息是其他人的。看了代码之后发现是使用了ThreadLocal来缓存的用户个人信息。

我们知道,ThreadLocal适用于变量在线程间隔离,而在方法或类间共享的场景。如果用户信息的获取比较昂贵(比如从数据库查询用户信息),那么在ThreadLocal中缓存数据是比较合适的做法。但,这么做为什么会出现用户信息错乱的Bug呢?

废话不多说,我们来模拟具体场景

private static final ThreadLocal<Integer> CURRENT_USER = ThreadLocal.withInitial(() -> null);

    @GetMapping("currentUser")
    public Map<String, String> currentUser(@RequestParam("userId") Integer userId) {
        // 在设置用户信息之前先查询用户信息
        String before = Thread.currentThread().getName() + "-->" + CURRENT_USER.get();

        // 设置用户信息
        CURRENT_USER.set(userId);

        // 再次获取用户信息
        String after = Thread.currentThread().getName() + "-->" + CURRENT_USER.get();

        // 对比两次结果
        Map<String, String> result = new HashMap<>();
        result.put("before", before);
        result.put("after", after);
        return result;

    }

请求一下GET http://localhost:8080/currentUser?userId=1

我们来看一下结果,很正常哈

{
  "before": "http-nio-8080-exec-1-->null",
  "after": "http-nio-8080-exec-1-->1"
}

按理说,在设置用户信息之前第一次获取的值始终应该是null,但,程序运行在Tomcat中,执行程序的线程是Tomcat的工作线程,而Tomcat的工作线程是基于线程池的。

顾名思义,线程池会重用固定的几个线程,一旦线程重用,那么很可能首次从ThreadLocal获取的值是之前其他用户的请求遗留的值。这时,ThreadLocal中的用户信息就是其他用户的信息。

为了更快地重现这个问题,建议在配置文件中设置一下Tomcat的参数,把工作线程池最大线程数设置为1,这样始终是同一个线程在处理请求:

server.tomcat.max-threads=1

然后,用户2来了GET http://localhost:8080/currentUser?userId=2

{
  "before": "http-nio-8080-exec-1-->1",
  "after": "http-nio-8080-exec-1-->2"
}

完了,完犊子了,不一样了,很明显的拿到了第一个用户的信息

我们来反思一下哈

这个例子告诉我们,在写业务代码时,首先要理解代码会跑在什么线程上:

  • 我们可能会抱怨学多线程没用,因为代码里没有开启使用多线程。但其实,可能只是我们没有意识到,在Tomcat这种Web服务器下跑的业务代码,本来就运行在一个多线程环境(否则接口也不可能支持这么高的并发), 并不能认为没有显式开启多线程就不会有线程安全问题
  • 线程的创建比较昂贵,所以Web服务器一般会使用线程池来处理请求,这就意味着线程会被重用。这时, 使用类似ThreadLocal工具来存放一些数据时,需要特别注意在代码运行完后,显式地去清空设置的数据。如果在代码中使用了自定义的线程池,也同样会遇到这个问题。

所以我们一定要记住,在代码的finally代码块中,显式清除ThreadLocal中的数据。这样一来,新的请求过来即使使用了之前的线程也不会获取到错误的用户信息了。

private static final ThreadLocal<Integer> CURRENT_USER = ThreadLocal.withInitial(() -> null);

    @GetMapping("currentUser")
    public Map<String, String> currentUser(@RequestParam("userId") Integer userId) {
        try {
            // 在设置用户信息之前先查询用户信息
            String before = Thread.currentThread().getName() + "-->" + CURRENT_USER.get();

            // 设置用户信息
            CURRENT_USER.set(userId);

            // 再次获取用户信息
            String after = Thread.currentThread().getName() + "-->" + CURRENT_USER.get();

            // 对比两次结果
            Map<String, String> result = new HashMap<>();
            result.put("before", before);
            result.put("after", after);
            return result;
        } finally {
            CURRENT_USER.remove();
        }
    }

使用线程安全的并发工具真的就万无一失了吗?

众所周知,ConcurrentHashMap,是一个高性能的线程安全的哈希表容器。“线程安全”这四个字特别容易让人误解,因为 ConcurrentHashMap只能保证提供的原子性读写操作是线程安全的。

   // 线程个数
    private static int THREAD_COUNT = 10;
    // 总元素数量
    private static int SIZE_COUNT = 1000;

    //获得一个指定元素数量模拟数据的ConcurrentHashMap
    private ConcurrentHashMap<String, Long> mock(int count) {
        return LongStream.rangeClosed(1, count)
                .boxed()
                .collect(Collectors.toConcurrentMap(i -> UUID.randomUUID().toString(), Function.identity(),
                        (o1, o2) -> o1, ConcurrentHashMap::new));
    }

    @GetMapping("error")
    public String error() throws InterruptedException {
        ConcurrentHashMap<String, Long> concurrentHashMap = mock(SIZE_COUNT - 100);
        //初始900个元素
        log.info("init size:{}", concurrentHashMap.size());

        ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
        //使用线程池并发处理逻辑
        forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> {
            //查询还差多少个元素
            int gap = SIZE_COUNT - concurrentHashMap.size();
            log.info("gap size:{}", gap);
            //填充元素
            concurrentHashMap.putAll(mock(gap));
        }));
        //等待所有任务完成
        forkJoinPool.shutdown();
        forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
        //最后元素个数会是多少呢?
        log.info("finish size:{}", concurrentHashMap.size());
        return "Error";
    }

看看结果,大跌眼镜喽

2023-11-30 15:20:00 INFO  c.j.e.c.ConcurrentHashMapError - init size:900
2023-11-30 15:20:00 INFO  c.j.e.c.ConcurrentHashMapError - gap size:100
2023-11-30 15:20:00 INFO  c.j.e.c.ConcurrentHashMapError - gap size:100
2023-11-30 15:20:00 INFO  c.j.e.c.ConcurrentHashMapError - gap size:100
2023-11-30 15:20:00 INFO  c.j.e.c.ConcurrentHashMapError - gap size:100
2023-11-30 15:20:00 INFO  c.j.e.c.ConcurrentHashMapError - gap size:100
2023-11-30 15:20:00 INFO  c.j.e.c.ConcurrentHashMapError - gap size:100
2023-11-30 15:20:00 INFO  c.j.e.c.ConcurrentHashMapError - gap size:100
2023-11-30 15:20:00 INFO  c.j.e.c.ConcurrentHashMapError - gap size:100
2023-11-30 15:20:00 INFO  c.j.e.c.ConcurrentHashMapError - gap size:100
2023-11-30 15:20:00 INFO  c.j.e.c.ConcurrentHashMapError - gap size:100
2023-11-30 15:20:00 INFO  c.j.e.c.ConcurrentHashMapError - finish size:1900

明显可以看出已经过度填充了。

回到ConcurrentHashMap,我们需要注意 ConcurrentHashMap对外提供的方法或能力的限制

  • 使用了ConcurrentHashMap,不代表对它的多个操作之间的状态是一致的,是没有其他线程在操作它的,如果需要确保需要手动加锁。
  • 诸如size、isEmpty和containsValue等聚合方法,在并发情况下可能会反映ConcurrentHashMap的中间状态。因此在并发情况下,这些方法的返回值只能用作参考,而不能用于流程控制。显然,利用size方法计算差异值,是一个流程控制。
  • 诸如putAll这样的聚合方法也不能确保原子性,在putAll的过程中去获取数据可能会获取到部分数据。

代码的修改方案很简单,整段逻辑加锁即可:

@GetMapping("ok")
    public String ok() throws InterruptedException {
        ConcurrentHashMap<String, Long> concurrentHashMap = mock(SIZE_COUNT - 100);
        //初始900个元素
        log.info("init size:{}", concurrentHashMap.size());

        ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
        //使用线程池并发处理逻辑
        forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> {
            synchronized (concurrentHashMap){
                //查询还需要补充多少个元素
                int gap = SIZE_COUNT - concurrentHashMap.size();
                log.info("gap size:{}", gap);
                //补充元素
                concurrentHashMap.putAll(mock(gap));
            }

        }));
        //等待所有任务完成
        forkJoinPool.shutdown();
        forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
        //最后元素个数会是1000吗?
        log.info("finish size:{}", concurrentHashMap.size());
        return "OK";
    }

可以看到结果是正确的

2023-11-30 15:33:48 [http-nio-8080-exec-1] INFO  c.j.e.c.ConcurrentHashMapError - init size:900
2023-11-30 15:33:48 [ForkJoinPool-1-worker-13] INFO  c.j.e.c.ConcurrentHashMapError - gap size:100
2023-11-30 15:33:48 [ForkJoinPool-1-worker-6] INFO  c.j.e.c.ConcurrentHashMapError - gap size:0
2023-11-30 15:33:48 [ForkJoinPool-1-worker-11] INFO  c.j.e.c.ConcurrentHashMapError - gap size:0
2023-11-30 15:33:48 [ForkJoinPool-1-worker-10] INFO  c.j.e.c.ConcurrentHashMapError - gap size:0
2023-11-30 15:33:48 [ForkJoinPool-1-worker-1] INFO  c.j.e.c.ConcurrentHashMapError - gap size:0
2023-11-30 15:33:48 [ForkJoinPool-1-worker-2] INFO  c.j.e.c.ConcurrentHashMapError - gap size:0
2023-11-30 15:33:48 [ForkJoinPool-1-worker-4] INFO  c.j.e.c.ConcurrentHashMapError - gap size:0
2023-11-30 15:33:48 [ForkJoinPool-1-worker-8] INFO  c.j.e.c.ConcurrentHashMapError - gap size:0
2023-11-30 15:33:48 [ForkJoinPool-1-worker-15] INFO  c.j.e.c.ConcurrentHashMapError - gap size:0
2023-11-30 15:33:48 [ForkJoinPool-1-worker-9] INFO  c.j.e.c.ConcurrentHashMapError - gap size:0
2023-11-30 15:33:48 [http-nio-8080-exec-1] INFO  c.j.e.c.ConcurrentHashMapError - finish size:1000

了解并正确的使用并发工具,不会导致性能问题

CopyOnWrite是一个时髦的技术,不管是Linux还是Redis都会用到。 在Java中,CopyOnWriteArrayList虽然是一个线程安全的ArrayList,但因为其实现方式是,每次修改数据时都会复制一份数据出来,所以有明显的适用场景,即读多写少或者说希望无锁读的场景。

如果我们要使用CopyOnWriteArrayList,那一定是因为场景需要而不是因为足够酷炫。如果读写比例均衡或者有大量写操作的话,使用CopyOnWriteArrayList的性能会非常糟糕。

我们写一段测试代码,来比较下使用CopyOnWriteArrayList和普通加锁方式ArrayList的读写性能吧。在这段代码中我们针对并发读和并发写分别写了一个测试方法,测试两者一定次数的写或读操作的耗时。

//测试并发写的性能
    @GetMapping("write")
    public Map testWrite() {
        CopyOnWriteArrayList<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
        List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());

        StopWatch stopWatch = new StopWatch();
        int loopCount = 100000;
        stopWatch.start("copyOnWriteArrayList");
        // 循环并发向copyOnWriteArrayList写入元素
        IntStream.rangeClosed(1, loopCount).parallel().forEach(i -> copyOnWriteArrayList.add(ThreadLocalRandom.current().nextInt(loopCount)));
        stopWatch.stop();

        stopWatch.start("synchronizedList");
        // 循环并发向synchronizedList写入元素
        IntStream.rangeClosed(1, loopCount).parallel().forEach(i -> synchronizedList.add(ThreadLocalRandom.current().nextInt(loopCount)));
        stopWatch.stop();
        log.info(stopWatch.prettyPrint());

        Map<String, Integer> sizeMap = new HashMap();
        sizeMap.put("copyOnWriteArrayList", copyOnWriteArrayList.size());
        sizeMap.put("synchronizedList", synchronizedList.size());
        return sizeMap;
    }

来看一下结果,在十万数据场景下,CopyOnWriteArray几乎比普通加锁方式ArrayList的写性能差了一百倍!!!

---------------------------------------------
ns         %     Task name
---------------------------------------------
3083770209  099%  copyOnWriteArrayList
039863541  001%  synchronizedList

//填充List
    private void addAll(List<Integer> list) {
        list.addAll(IntStream.rangeClosed(1, 1000000).boxed().collect(Collectors.toList()));
    }

    //测试并发读的性能
    @GetMapping("read")
    public Map testRead() {
        //创建CopyOnWriteArrayList和synchronizedList
        List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
        List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
        //填充数据
        addAll(copyOnWriteArrayList);
        addAll(synchronizedList);
        StopWatch stopWatch = new StopWatch();
        int loopCount = 1000000;
        int count = copyOnWriteArrayList.size();
        stopWatch.start("Read:copyOnWriteArrayList");
        //循环1000000次并发从CopyOnWriteArrayList随机查询元素
        IntStream.rangeClosed(1, loopCount).parallel().forEach(i -> copyOnWriteArrayList.get(ThreadLocalRandom.current().nextInt(count)));
        stopWatch.stop();
        stopWatch.start("Read:synchronizedList");
        //循环1000000次并发从加锁的ArrayList随机查询元素
        IntStream.range(0, loopCount).parallel().forEach(i -> synchronizedList.get(ThreadLocalRandom.current().nextInt(count)));
        stopWatch.stop();
        log.info(stopWatch.prettyPrint());
        Map<String, Integer> sizeMap = new HashMap<>();
        sizeMap.put("copyOnWriteArrayList", copyOnWriteArrayList.size());
        sizeMap.put("synchronizedList", synchronizedList.size());
        return sizeMap;
    }

而在大量读的场景下(100万次get操作),CopyOnWriteArray又比同步的ArrayList快八倍以上:

---------------------------------------------
ns         %     Task name
---------------------------------------------
022652125  011%  Read:copyOnWriteArrayList
186287000  089%  Read:synchronizedList

你可能会问,为何在大量写的场景下,CopyOnWriteArrayList会这么慢呢?

答案就在源码中。以add方法为例,每次add时,都会用Arrays.copyOf创建一个新数组,频繁add时内存的申请释放消耗会很大:

        public boolean add(E e) {
        synchronized (lock) {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        }
    }

以上就是关于Java并发工具类ThreadLocal、ConcurrentHashMap和CopyOnWrite的正确用法相关的全部内容,希望对你有帮助。欢迎持续关注潘子夜个人博客(www.panziye.com),学习愉快哦!


版权声明:本站文章,如无说明,均为本站原创,转载请注明文章来源。如有侵权,请联系博主删除。
本文链接:https://www.panziye.com/back/12051.html
喜欢 (0)
请潘老师喝杯Coffee吧!】
分享 (0)
用户头像
发表我的评论
取消评论
表情 贴图 签到 代码

Hi,您需要填写昵称和邮箱!

  • 昵称【必填】
  • 邮箱【必填】
  • 网址【可选】