天增的博客
首页
博客
  • 分布式解决方案
  • Java并发工具包
  • redis
  • LeetCode
  • 系统设计
  • JVM体系
Github (opens new window)
Rss (opens new window)
  • zh-CN
  • en-US
首页
博客
  • 分布式解决方案
  • Java并发工具包
  • redis
  • LeetCode
  • 系统设计
  • JVM体系
Github (opens new window)
Rss (opens new window)
  • zh-CN
  • en-US
  • Java并发工具包
  • 并发基础
    • 线程基础
      • Thread的状态
      • 进程与线程
      • 正确停止线程的方式
      • Thread的实现方式
      • waitnotifynotifyAll
      • 生产者消费者模型
    • 线程安全
      • 线程不安全
      • 线程安全
      • 需要注意线程安全问题的情况
  • 并发工具
    • 线程协作
      • Semaphore信号量
      • CountDownLatch详解
      • 使用CompletableFuture解决旅游平台问题
        • 旅游平台问题介绍
        • 使用线程池实现
        • CountDownLatch
        • CompletableFuture
      • 使用CyclicBarrier解决团建问题
    • Future
      • Future主要功能
      • FutureTask源码分析
    • ThreadLocal
      • ThreadLocal内存泄漏
      • ThreadLocal使用场景
    • 原子类
      • 原子类的作用概览
      • 原子类的性能分析
    • 阻塞队列
      • 常见的阻塞队列
      • 阻塞队列的常用方法
      • 什么是阻塞队列
    • 并发容器
      • HashMap
      • CopyOnWriteArrayList
      • ConcurrentHashMap详解
    • 线程池
      • 为什么多线程会带来性能问题
      • 线程池的优势
      • 创建线程池的参数
        • 如何设置线程数
      • 线程池线程复用原理
      • ForkJoin框架
    • 各种锁
      • 锁的种类和特点
        • 公平锁非公平锁
        • 自旋锁非自旋锁
        • 共享锁独占锁
        • 乐观锁和悲观锁
      • JVM锁优化
      • synchronized和Lock的对比
      • lock的常用方法
  • 底层原理
    • CAS原理
    • AQS框架
    • 伪共享
    • java内存模型
      • Java内存模型介绍
      • happens-before规则
  • topic
  • Java并发工具包
  • 并发工具
  • 线程协作
  • 使用CompletableFuture解决旅游平台问题
2022-04-21
目录

使用CompletableFuture解决旅游平台问题

# 使用CompletableFuture解决旅游平台问题

# 旅游平台问题介绍

如果要搭建一个旅游平台,经常会有这样的需求,就是用户想同时获取多个航空公司的航班信息。

比如: 北京到上海的票价。由于有很多公司都有这样的航班信息,所以应该获取到所有航空公司的信息,然后聚合输出。

CompletableFuture

# 串行获取

最传统的解决方案

串行获取示意图

我们获取价格的时候,先去访问国航等国航有响应之后,再去访问下一个航空公司,如果航空公司较多,每个响应都需要1s的话,十几个航空公司就是几十秒,用户肯定等不及。

# 并行获取

并行获取

如果我们换成并行的去请求各个网站信息,效果就能好很多。

只需要规定一个超时的总时长,比如3s,3s之后返回的响应一概不管,只把前3s获取到的结果进行返回,即使数据有所丢失,但是也比一直等待的强。

# 使用线程池实现

public class PriceDemo {
    ExecutorService threadPool = Executors.newFixedThreadPool(3);
    public static void main(String[] args) throws InterruptedException {
        PriceDemo priceDemo = new PriceDemo();
        System.out.println(priceDemo.getPrices());
    }
    private Set<String> getPrices() throws InterruptedException {
        Set<String> prices = Collections.synchronizedSet(new HashSet<>());
        threadPool.submit(new Task("国行", prices));
        threadPool.submit(new Task("海航", prices));
        threadPool.submit(new Task("东航", prices));
        Thread.sleep(3000);
        return prices;
    }
    /**
     * 获取价格
     */
    private class Task implements Runnable {
        private String name;
        private Set<String> prices;

        public Task(String name, Set<String> prices) {
            this.name = name;
            this.prices = prices;
        }
        @Override
        public void run() {
            try {
                int price = (int) (Math.random() * 4000);
                Thread.sleep(price);
                prices.add(name + ": " + price);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

在代码中,新建了一个线程安全的set,用于存储各个航空公司的价格信息。

然后通过向线程池提交获取价格的任务,最后线程睡3s,模拟用户等待的时间,最后在3s之内获取到的结果进行返回。

这就是使用线程池去实现的最基础的方案。

# CountDownLatch

我们可以采用CountDownLatch去对上面的代码进行一个优化。

上面的代码,最大的问题是无论如何都需要等待3秒,假如网络特别好,处理速度特别快,可能几百毫秒就返回了,所以就会白白等待一段时间。

所以需要改进一下:

public class PriceDemo {
    ExecutorService threadPool = Executors.newFixedThreadPool(3);
    public static void main(String[] args) throws InterruptedException {
        PriceDemo priceDemo = new PriceDemo();
        System.out.println(priceDemo.getPrices());
    }
    private Set<String> getPrices() throws InterruptedException {
        Set<String> prices = Collections.synchronizedSet(new HashSet<>());
        CountDownLatch countDownLatch = new CountDownLatch(3);
        threadPool.submit(new Task("国行", prices, countDownLatch));
        threadPool.submit(new Task("海航", prices, countDownLatch));
        threadPool.submit(new Task("东航", prices, countDownLatch));
        countDownLatch.await(3, TimeUnit.SECONDS);
        return prices;
    }

    /**
     * 获取价格
     */
    private class Task implements Runnable {
        private String name;
        private Set<String> prices;
        private CountDownLatch countDownLatch;
        public Task(String name, Set<String> prices, CountDownLatch countDownLatch) {
            this.name = name;
            this.prices = prices;
            this.countDownLatch = countDownLatch;
        }
        @Override
        public void run() {
            try {
                int price = (int) (Math.random() * 4000);
                Thread.sleep(price);
                prices.add(name + ": " + price);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown();
            }
        }
    }
}

使用CoutDownLatch,来进行任务执行的统计,每完成一个任务,CoutDownLatch的数量则减一。

然后使用await进行等待,await方法会阻塞当前线程,只有任务执行完,或者在规定时间内没有响应才会往下走。

这就可以保证,总用时会永远小于等于3s.

# CompletableFuture

CompletableFuture 提供了简单快速的方法让我们去实现上面使用CountDownLatch实现的代码逻辑。

public static void main(String[] args) throws InterruptedException {
        PriceDemo priceDemo = new PriceDemo();
        System.out.println(priceDemo.getPrices());
    }

    private Set<String> getPrices() throws InterruptedException {
        Set<String> prices = Collections.synchronizedSet(new HashSet<>());
        CompletableFuture<Void> task1 = CompletableFuture.runAsync(new Task("国行", prices));
        CompletableFuture<Void> task2 = CompletableFuture.runAsync(new Task("海航", prices));
        CompletableFuture<Void> task3 = CompletableFuture.runAsync(new Task("东航", prices));
        CompletableFuture<Void> allTask = CompletableFuture.allOf(task1, task2, task3);
        try {
            allTask.get(3,TimeUnit.SECONDS);
        } catch (ExecutionException | TimeoutException e) {
            e.printStackTrace();
        }
        return prices;
    }
    /**
     * 获取价格
     */
    private class Task implements Runnable {
        private String name;
        private Set<String> prices;
        public Task(String name, Set<String> prices) {
            this.name = name;
            this.prices = prices;
        }
        @Override
        public void run() {
            try {
                int price = (int) (Math.random() * 4000);
                Thread.sleep(price);
                prices.add(name + ": " + price);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

我们可以从代码中看到,CompletableFuture对Task进行了包装,然后通过allOf将所有任务聚合起来,最后通过allTask.get()阻塞线程,如果任务超时会进入异常中,我们可以根据异常再去做对应的异常处理,相对于自己使用CountDownLatch实现的,功能性会多一些。

最近更新
01
以 root 身份启动 transmission-daemon
12-13
02
Debian系统安装qbittorrent-nox
12-09
03
LXC Debain12安装zerotier并实现局域网自动nat转发
07-29
更多文章>
Theme by Vdoing | Copyright © 2015-2024 天增 | 苏ICP备16037388号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式