T O P

[资源分享]     mysql-DuplicateUpdate和java的threadpool的"死锁"

  • By - 楼主

  • 2022-12-02 18:02:08
  • 大家千万不要被文章的标题给迷惑了,他两在本篇文章是没有关系的, 今天给大家讲讲最近2个有意思的issue,分享一下我学到的

    • mysql DuplicateUpdate的用法要注意的点
    • java的threadpool使用不当会造成“死锁”问题

    mysql DuplicateUpdate的用法要注意的点

    有个issue说遇到了一个这样的问题, image

    这个朋友使用我开源的job调度框架 https://github.com/yuzd/Hangfire.HttpJob

    存储用的是mysql,采用的实现是 https://github.com/arnoldasgudas/Hangfire.MySqlStorage

    set表的id是自增主键,正常理解 都是慢慢自增上去的,但是发现是大幅度跳跃式的自增, 真相是什么?

    首先针对这个问题,首先我们搞清楚在hangfire中和storage相关的部分如下

    image
    image
    • hangfire server调度依赖storage
    • storage抽象出来一层api(解耦)
    • 第三方扩展(不关心具体的storage实现)
    • 不同的storage具体实现(比如mysql,sqlserver等)

    Hangfire.Httpjob其实只是依赖了storage api那一层,也没有能力去直接写sql去执行, 只能用api去操作hangfire的那几张表(比如set表)

    那么问题肯定不是在扩展层,而是得去看看mysqlstorage的实现源码,针对set表的处理逻辑

    https://github.com/arnoldasgudas/Hangfire.MySqlStorage/blob/0bd1016f715c8c6617ce22fb7b2ce5b6c328d2fb/Hangfire.MySql/MySqlWriteOnlyTransaction.cs#L155

    
      public override void AddToSet(string key, string value, double score)
        {
            Logger.TraceFormat("AddToSet key={0} value={1}", key, value);
    
            AcquireSetLock();
            QueueCommand(x => x.Execute(
                $"INSERT INTO `{_storageOptions.TablesPrefix}Set` (`Key`, `Value`, `Score`) " +
                "VALUES (@Key, @Value, @Score) " +
                "ON DUPLICATE KEY UPDATE `Score` = @Score",
                new { key, value, score }));
        }

    这里是用了ON DUPLICATE KEY UPDATE 的语句

    这个语法是在mysql 4.1(2005)引入的,意思是 insert的时候遇到主键已存在 就执行后面 的update

    但是就是这个功能 会造成自增主键成跳跃式增长,增长跨度和SQL的执行次数成正比

    根据朋友提供的截图

    image
    image

    虽说是会跳跃,但是这个增长也太夸张了

    打上断点调试发现

    是hangfire server 不断的在调用,目的是把下一次执行时间(秒级别的时间戳)写到set表中

    image
    image
    image
    image
    image
    image

    打上日志可以看到有非常多相同值的调用,这仅仅是一个job,这个自增速度得再乘以job的个数,难怪了

    既然找到原因了,就提个PR 修改下

    
     public override void AddToSet(string key, string value, double score)
            {
                Logger.TraceFormat("AddToSet key={0} value={1}", key, value);
            
                AcquireSetLock();
                QueueCommand(x =>
                {
                    var sql = "";
                    if (key == "recurring-jobs") // 只发现这个key存在这个问题
                    {
                         // key+value是uniq 改成先update 如果没有成功 再insert
                        sql = $"UPDATE `{_storageOptions.TablesPrefix}Set` set `Score` = @score where `Key` = @key and `Value` = @value";
                        var updateRt = x.Execute(sql, new { score = score, key = key, value = value });
                        if (updateRt < 1)
                        {
                            sql = $"INSERT INTO `{_storageOptions.TablesPrefix}Set` (`Key`, `Value`, `Score`) " +
                                  "VALUES (@Key, @Value, @Score) ";
                            x.Execute(
                                sql,
                                new { key, value, score });
                        }
                    }
                    else
                    {
                        sql = $"INSERT INTO `{_storageOptions.TablesPrefix}Set` (`Key`, `Value`, `Score`) " +
                              "VALUES (@Key, @Value, @Score) " +
                              "ON DUPLICATE KEY UPDATE `Score` = @Score";
                       x.Execute(
                           sql,
                           new { key, value, score });
                    }
            
                    //Console.WriteLine(sql + " ==> " + key + "@" + value + "@" + score);
                });
            }

    改完之后测试,id自增一切正常:

    image
    image

    java的threadpool使用不当会造成“死锁”问题

    image
    image

    这个原因先说出来: threadpool的线程被占用完后,再来的task会往queue里面丢,如果这个时候在这个pool的线程里面 future.get()的话会导致task runner(执行器)被堵住,没人从队列里面取任务了~

    (简单来说就是 线程在wait future返回,而这个future在queue里面苦苦等待新释放的线程去执行,就像死锁一样,我在等你的结果,而结果在等待着被执行)

    好家伙,这个场景有点熟悉,因为我在项目中也用过Future.get()// 虽说有设置timeout

    但是这个问题的重要一点是,这种花式“死锁” jvm是检测不出来的,下面有测试

    模拟一下这个场景:

    我搞了2个线程池,分别是nio线程池和业务线程池,模拟并发20个请求, 注意看process2方法里的注释,如果去掉那里的代码的话 就不会有这个死锁问题

    
    /**
     * @author yuzd
     */
    public class PoolTest {
    
        // 模拟nio线程池
        static ThreadPoolExecutor nioExecutor = new ThreadPoolExecutor(20, 20, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100),
                new CustomerNamedThreadFactory("nio", false),
                new ThreadPoolExecutor.AbortPolicy());
    
        // 业务线程池
        static ThreadPoolExecutor buExecutor = new ThreadPoolExecutor(20, 20, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100),
                new CustomerNamedThreadFactory("bu", true),
                new ThreadPoolExecutor.AbortPolicy());
    
        public static void main(String[] args) {
    
            // 模拟是http请求并发20个
            IntStream.rangeClosed(1, 20).parallel().forEach((index) -> {
                // 交给nio线程池处理            
                nioExecutor.execute(() -> {
                    try {
                        httpHandler(index);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            });
        }
    
    
       
        static void httpHandler(int index) throws ExecutionException, InterruptedException {
            System.out.println(Thread.currentThread().getName() + " request index :" + index + " staring");
            // 交给业务线程池处理     
           
            Future<String> parentFuture = buExecutor.submit(() -> process1(index));
            String p1Rt = parentFuture.get();  // nio线程在wait
            System.out.println(Thread.currentThread().getName() + " request index :" + p1Rt + " ending");
        }
    
        // future1
        static String process1(int index) throws ExecutionException, InterruptedException {
            System.out.println( Thread.currentThread().getName() + " process1 index :" + index + " staring");
            Future<String> childFuture = buExecutor.submit(() -> process2(index));
            String p2Rt = childFuture.get();  // 这里是bu线程在wait   这里会发生死锁
            
            System.out.println(Thread.currentThread().getName() + " process1 index :" + index + " ending");
            return p2Rt;
        }
    
        // future2
        static String process2(int index) throws InterruptedException, ExecutionException {
            System.out.println(Thread.currentThread().getName() + " process2 index :" + index + " staring");
            // 加上就会死锁 
            // 只要不一下子产生足够数量的task(把core全部占掉)就不会死锁 加了这里就会把core全部占据 导致task进入到queue,core线程在wait future.get 无法被释放, 而queue的任务在等待它释放产生新的线程
            Future<String> submit = buExecutor.submit(() -> {
                try {
                    Thread.sleep(1000);
                    return String.valueOf(index);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
            submit.get(); 
            System.out.println(Thread.currentThread().getName() + " process2 index :" + index + " ending");
            return String.valueOf(index);
        }
    }

    用visualvm分析线程dump,很难直接发现有异常,异步的很难检测,排查起来比较复杂,只看到是在wait

    image
    image

    用jstack没有发现deadlock

    image
    image

    在实际项目中我也看到过一个项目中共用一个线程池,线程池被封装成一个util方法,要执行异步的都用它,这个场景尤其要注意这个场景,也建议大家用带有超时的方式 Future.get(xxxx)

     

     

     

    Image

    本帖子中包含资源

    您需要 登录 才可以下载,没有帐号?立即注册