Java - 多线程基础

260

多任务:

真正的多任务是在多核 CPU 之后, 在此之前所有多任务都是伪多任务

高并发: 多个任务抢占少量资源

高并发三大要素:

  • 可见性
  • 原子性
  • 排序性
    • Java 代码, 底层最后都会编译成汇编指令, 汇编指令做优化时代码执行顺序可能改变, 这样可能导致双重检查锁失效

时间片: 每个任务运行的时间, 时间片耗尽就必须退出进入等待队列

优先级别调度:

操作系统底层的多任务使用的就是时间片轮换机制, 配合优先级别调度完成

  • 多进程编程
  • 多线程编程
  • 协程编程

进程: process

  • 操作系统管理的基本单位, 一个进程可以看成一个软件, 一个端口, 直接申请独立的内存
  • 一个软件可以由多个进程组成
  • 进程包含线程
  • 进程之间相互独立, 一个进程挂掉不会影响其他进程

线程: thread

  • 线程是最小量级的进程, 依赖于进程
  • 线程是独立的, 每个线程就是一个栈
  • 线程占有的是每一个进程的内存空间
  • 一个线程挂掉可能影响整个进程, 不稳定

协程: coroutine

Java 提供了四种创建多线程的方案:

  1. 基础 Thread
  2. 实现 Runnable 接口
  3. 实现 Callable 接口
  4. 线程池实现多线程

如何创建线程:

继承 Thread 类, 重写 run 方法

写在 run 方法里的就是线程方法, 用 start() 开启线程

使用继承 Thread 类的方式创建多线程时, 如果需要访问当前线程,则无需使用 Thread.currentThread() 方法,直接使用 this 即可获得当前线程

main 函数是主线程

class Test extends Thread {
    public static void main(String[] args) {
        Test test = new Test();
        test.start();
        for (int i = 0; i < 100; i++) {
            System.out.println("主线程也在运行----" + i);
        }
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            System.out.println("一个单独的线程运行了------" + i);
        }
    }
}

实现 Runnable 接口:

使用 Runnable 接口实现的多线程类没有 start() 方法, 启动线程必须借助 Thread 类的构造函数

Thread.currentThread().getName() 获取进程名

class Test implements Runnable {

    public static void main(String[] args) {
        Test a1 = new Test();
        Test a2 = new Test();
        new Thread(a1).start();
        new Thread(a2).start();
    }

    @Override
    public void run() {
        for (int i = 0; i < 100; i++) {
            System.out.println(Thread.currentThread().getName() + "一个单独的线程运行了------" + i);
        }
    }
}

实现 Callable 接口:

可以有返回值

开启线程需要借助 FutureTask, 再借助 Thread

class Test implements Callable<Integer> {
    public static void main(String[] args) {
        Test test = new Test();
        FutureTask<? extends Integer> futureTask = new FutureTask<>(test);
        new Thread(futureTask).start();
        try {
            Integer a = futureTask.get(); // 使用Callable接口实现的多线程对象, 最后能够得到线程方法返回的结果
            System.out.println(a);
        } catch (Exception e) {
            e.printStackTrace();
        }
        for (int i = 0; i < 1000; i++) {
            System.out.println(Thread.currentThread().getName() + "主线程开始运行");
        }
    }

    @Override
    public Integer call() {
        int sum = 0;
        for (int i = 0; i < 1000; i++) {
            sum += 1;
            System.out.println("使用Callable接口创建的子线程" + i);
        }
        return sum;
    }
}

线程池:

池化模式: 将大量需要的对象提前创建好, 放在一个池中

对象提前创建完成, 也不需要销毁对象, 所以说使用效率比较好

优点: 使用效率比较好, 避免对象的重复创建和销毁

缺点: 内存占用高, 池的数量难以把控

池的数量把控问题是最关键的

Java 线程池是 1.5 提供的, 底层是 CallableFuture 接口

根据场景创建不同的线程池

Executors:

使用这个类根据需要创建线程池

ExecutorService 继承了 Executors, 一般用ExecutorService

单例线程池(SingleThreadExecutor):

所有任务都保存队列 LinkedBlockingQueue 中, 核心线程数为 1,线程空闲时间为 0 等待唯一的单线程来执行任务, 并保证所有任务按照指定顺序(FIFO 或优先级)执行

class Test {
    void test() {
        // 创建一个单例的线程池
        // 单例线程池: 只有一个线程, 固定不会变化的一个线程
        // 使用场景: 只有一个线程时, 建议使用这种线程池
        ExecutorService executor = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
// 如果想要将外部的变量传递到匿名内部类中, 可以做成常量
            final int finalI = i;
            executor.execute(() -> System.out.println(Thread.currentThread().getName() + "--->" + finalI));
        }
    }
}
固定大小线程池(FixedThreadPool):

指定线程池的固定大小, 对于超出的线程会在 LinkedBlockingQueue 队列中等待

核心线程数可以指定, 线程空闲时间为 0

class Test {
    void test() {
        // 创建一个固定大小的线程池
        // 适用场景: 并发量不会发生变化(非常小)
        ExecutorService executor = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 10; i++) {
// 如果想要将外部的变量传递到匿名内部类中, 可以做成常量
            final int finalI = i;
            executor.submit(() -> System.out.println(Thread.currentThread().getName() + "--->" + finalI));
        }
    }
}
缓存线程池(CachedThreadPool):

缓存无界线程池测试

当线程池中的线程空闲时间超过 60s 则会自动回收该线程, 核心线程数为 0 当任务超过线程池的线程数则创建新线程, 线程池的大小上限为 Integer.MAX_VALUE

class Test {
    void test() {
        // 创建一个可变的线程池
        // 基于缓存, 创建一个缓存的线程池, 若线程数不够则新建线程
        // 并发量变化比较明显的建议使用这种线程池
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i = 0; i < 20; i++) {
// 如果想要将外部的变量传递到匿名内部类中, 可以做成常量
            final int finalI = i;
            executor.submit(() -> System.out.println(Thread.currentThread().getName() + "--->" + finalI));
        }
    }
}
延迟任务线程池(ScheduledThreadPool):
  • schedule(Runnable command, long delay, TimeUnit unit)

    延迟一定时间后执行 Runnable 任务

  • schedule(Callable callable, long delay, TimeUnit unit)`

    延迟一定时间后执行 Callable 任务

  • scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)

    延迟一定时间后, 以间隔 period 时间的频率周期性地执行任务

  • scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,TimeUnit unit)

    scheduleAtFixedRate() 方法很类似, 但是不同的是scheduleWithFixedDelay() 方法的周期时间间隔是以上一个任务执行结束到下一个任务开始执行的间隔, 而 scheduleAtFixedRate() 方法的周期时间间隔是以上一个任务开始执行到下一个任务开始执行的间隔, 也就是这一些任务系列的触发时间都是可预知的

ScheduledExecutorService 功能强大, 对于定时执行的任务, 建议多采用该方法

class Test {
    void test() throws InterruptedException {
        // 创建的线程具有延时执行的特点
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
        for (int i = 0; i < 20; i++) {
// 如果想要将外部的变量传递到匿名内部类中, 可以做成常量
            final int finalI = i;
            // 如果用execute()方法则没有延迟
            // 第一个参数: 需要执行的任务
            // 第二个参数: 延迟时长
            // 第三个参数: 时间单位
            executor.schedule(() -> System.out.println(Thread.currentThread().getName() + "--->" + finalI), 5, TimeUnit.SECONDS);
            TimeUnit.SECONDS.sleep(5);
        }
    }
}
class Test {
    void test() {
        // 创建指定核心线程数,但最大线程数是Integer.MAX_VALUE的可定时执行或周期执行任务的线程池
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);

        // 定时执行一次的任务,延迟1s后执行
        executorService.schedule(new Runnable() {
            @Override
            public void run() {
                print("scheduleThreadPool");
                System.out.println(Thread.currentThread().getName() + ", delay 1s");
            }
        }, 1, TimeUnit.SECONDS);

        // 周期性地执行任务,延迟2s后,每3s一次地周期性执行任务
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + ", every 3s");
            }
        }, 2, 3, TimeUnit.SECONDS);


        executorService.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                long start = new Date().getTime();
                System.out.println("scheduleWithFixedDelay 开始执行时间:" +
                        DateFormat.getTimeInstance().format(new Date()));
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                long end = new Date().getTime();
                System.out.println("scheduleWithFixedDelay执行花费时间=" + (end - start) / 1000 + "m");
                System.out.println("scheduleWithFixedDelay执行完成时间:"
                        + DateFormat.getTimeInstance().format(new Date()));
                System.out.println("======================================");
            }
        }, 1, 2, TimeUnit.SECONDS);

    }
}
抢占线程池(WorkStealingPool):

它会通过工作窃取的方式, 使得多核的 CPU 不会闲置, 总会有活着的线程让 CPU 去运行

基于工作窃取算法, 其中任务可以生成其他较小的任务, 这些任务将添加到并行处理线程的队列中

如果一个线程完成了工作并且无事可做, 则可以从另一线程的队列中“窃取”工作

class Test {
    void test() throws InterruptedException {
        System.out.println("获得JAVA虚拟机可用的最大CPU处理器数量:" + Runtime.getRuntime().availableProcessors());
        ExecutorService executorService = Executors.newWorkStealingPool();
        /**
         * call方法存在返回值futureTask的get方法可以获取这个返回值。
         * 使用此种方法实现线程的好处是当你创建的任务的结果不是立即就要时,
         * 你可以提交一个线程在后台执行,而你的程序仍可以正常运行下去,
         * 在需要执行结果时使用futureTask去获取即可。
         */
        List<Callable<String>> callableList = IntStream.range(0, 20).boxed().map(i -> (Callable<String>) () -> {
            TimeUnit.SECONDS.sleep(3);
            System.out.println(String.format("当前【%s】线程正在执行>>>", Thread.currentThread().getName()));
            return "callable type thread task:" + i;
        }).collect(Collectors.toList());

        // 执行给定的任务,返回持有他们的状态和结果的所有完成的期待列表。
        executorService.invokeAll(callableList).stream().map(futureTask -> {
            try {
                return futureTask.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            return null;
        }).forEach(System.out::println);
    }
}
class Test {
    void test() {
        // 创建线程池
        ExecutorService threadPool = Executors.newWorkStealingPool();
        // 执行任务
        for (int i = 0; i < 10; i++) {
            final int index = i;
            threadPool.execute(() -> {
                System.out.println(index + " 被执行,线程名:" + Thread.currentThread().getName());
            });
        }
        // 确保任务执行完成
        while (!threadPool.isTerminated()) {
        }
    }
}
单例延迟线程池(SingleThreadScheduledExecutor):
class Test {
    void test() {
        // 创建线程池
        ScheduledExecutorService threadPool = Executors.newSingleThreadScheduledExecutor();
        // 添加定时执行任务(2s 后执行)
        System.out.println("添加任务,时间:" + new Date());
        threadPool.schedule(() -> {
            System.out.println("任务被执行,时间:" + new Date());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
            }
        }, 2, TimeUnit.SECONDS);
    }
}
原始线程池(ThreadPoolExecutor):

ThreadPoolExecutor 可以设置 7 个参数, 如下:

public ThreadPoolExecutor (int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)


// 前 5 个参数为必填参数

// 1: 核心线程数, 线程中始终存活的线程数 >= 0
// 2: 最大线程数, 线程池中允许的最大线程数 >= 1
// 3: 最大线程数可以存活的时间, 当线程中没有任务执行时, 最大线程就会销毁一部分最终保持核心线程数量的线程 >= 0
// 4: 时间单位
// 5: 一个阻塞队列, 用来存储线程池等待执行的任务, 均为线程安全
// 6: 线程工厂, 主要用来创建线程, 默认为正常优先级非守护线程
// 7: 线程饱和策略/拒绝策略, 拒绝处理任务时的策略

详解:

  • corePoolSize (线程池基本大小):

    当向线程池提交一个任务时, 若线程池已创建的线程数小于 corePoolSize, 即便此时存在空闲线程, 也会通过创建一个新线程来执行该任务, 直到已创建的线程数大于或等于 corePoolSize 时 才会根据是否存在空闲线程, 来决定是否需要创建新的线程

    除了利用提交新任务来创建和启动线程(按需构造, 也可以通过
    prestartCoreThread()prestartAllCoreThreads() 方法来提前启动线程池中的基本线程。

  • maximumPoolSize (线程池最大大小):

    线程池所允许的最大线程个数, 当队列满了, 且已创建的线程数小于 maximumPoolSize, 则线程池会创建新的线程来执行任务, 另外, 对于无界队列, 可忽略该参数

  • keepAliveTime (线程存活保持时间):

    默认情况下, 当线程池的线程个数多于 corePoolSize 时, 线程的空闲时间超过 keepAliveTime 则会终止, 但只要 keepAliveTime 大于 0,allowCoreThreadTimeOut(boolean) 方法也可将此超时策略应用于核心线程, 另外,也可以使用 setKeepAliveTime() 动态地更改参数。

  • unit (存活时间的单位):

    时间单位, 分为7类, 从细到粗顺序:NANOSECONDS (纳秒), MICROSECONDS (微秒), MILLISECONDS (毫秒), SECONDS (秒), MINUTES (分), HOURS (小时), DAYS (天)

  • workQueue(任务队列):

    用于传输和保存等待执行任务的阻塞队列, 可以使用此队列与线程池进行交互:

    • 如果运行的线程数少于 corePoolSize, 则 Executor 始终首选添加新的线程, 而不进行排队
    • 如果运行的线程数等于或多于 corePoolSize, 则 Executor 始终首选将请求加入队列, 而不添加新的线程
    • 如果无法将请求加入队列, 则创建新的线程, 除非创建此线程超出 maximumPoolSize, 在这种情况下, 任务将被拒绝
  • threadFactory (线程工厂):

    用于创建新线程, 由同一个 threadFactory 创建的线程, 属于同一个 ThreadGroup, 创建的线程优先级都为 Thread.NORM_PRIORITY, 以及是非守护进程状态
    threadFactory创建的线程也是采用 new Thread() 方式, threadFactory创建的线程名都具有统一的风格: pool-m-thread-n (m 为线程池的编号,n 为线程池内的线程编号);

  • handler (线程饱和策略):

    当线程池和队列都满了, 则表明该线程池已达饱和状态

    • ThreadPoolExecutor.AbortPolicy: 处理程序遭到拒绝, 则直接抛出运行时异常 RejectedExecutionException (默认策略)

    • ThreadPoolExecutor.CallerRunsPolicy: 调用者所在线程来运行该任务, 此策略提供简单的反馈控制机制, 能够减缓新任务的提交速度

    • ThreadPoolExecutor.DiscardPolicy: 无法执行的任务将被删除。

    • ThreadPoolExecutor.DiscardOldestPolicy: 如果执行程序尚未关闭, 则位于工作队列头部的任务将被删除, 然后重新尝试执行任务 (如果再次失败,则重复此过程)

队列排队详解

  • 直接提交

    工作队列的默认选项是 SynchronousQueue, 它将任务直接提交给线程而不保持它们

    在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程

    此策略可以避免在处理可能具有内部依赖性的请求集时出现锁, 直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务

    当命令以超过队列所能处理的平均数连续到达时, 此策略允许无界线程具有增长的可能性

  • 无界队列

    使用无界队列 (例如: 不具有预定义容量的 LinkedBlockingQueue) 将导致在所有 corePoolSize 线程都忙时新任务在队列中等待

    这样, 创建的线程就不会超过 corePoolSize (因此, maximumPoolSize 的值也就无效了)

    当每个任务完全独立于其他任务, 即任务执行互不影响时, 适合于使用无界队列

    例如: 在 Web 页服务器中, 这种排队可用于处理瞬态突发请求, 当命令以超过队列所能处理的平均数连续到达时, 此策略允许无界线程具有增长的可能性

  • 有界队列

    当使用有限的 maximumPoolSizes 时, 有界队列 (如 ArrayBlockingQueue) 有助于防止资源耗尽, 但是可能较难调整和控制

    队列大小和最大池大小可能需要相互折衷: 使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销, 但是可能导致人工降低吞吐量

    如果任务频繁阻塞 (例如, 如果它们是 I/O 边界), 则系统可能为超过您许可的更多线程安排时间, 使用小型队列通常要求较大的池大小, CPU 使用率较高, 但是可能遇到不可接受的调度开销, 这样也会降低吞吐量。

工作队列 BlockingQueue 详解

BlockingQueue的插入 / 移除 / 检查这些方法, 对于不能立即满足但可能在将来某一时刻可以满足的操作, 共有4种不同的处理方式: 第一种是抛出一个异常, 第二种是返回一个特殊值 (null
false, 具体取决于操作), 第三种是在操作可以成功前, 无限期地阻塞当前线程, 第四种是在放弃前只在给定的最大时间限制内阻塞

如下表格:

操作抛出异常特殊值阻塞超时
插入add(e)offer(e)put(e)offer(e, time, unit)
移除remove()poll()take()poll(time, unit)
检查element()peek()不可用不可用

实现 BlockingQueue 接口的常见类如下:

  • ArrayBlockingQueue: 基于数组的有界阻塞队列

    队列按 FIFO 原则对元素进行排序, 队列头部是在队列中存活时间最长的元素, 队尾则是存在时间最短的元素

    新元素插入到队列的尾部, 队列获取操作则是从队列头部开始获得元素

    这是一个典型的 "有界缓存区", 固定大小的数组在其中保持生产者插入的元素和使用者提取的元素

    一旦创建了这样的缓存区, 就不能再增加其容量, 试图向已满队列中放入元素会导致操作受阻塞

    ArrayBlockingQueue 构造方法可通过设置 fairness 参数来选择是否采用公平策略, 公平性通常会降低吞吐量, 但也减少了可变性和避免了 "不平衡性", 可根据情况来决策

  • LinkedBlockingQueue: 基于链表的无界阻塞队列

    ArrayBlockingQueue 一样采用 FIFO 原则对元素进行排序, 基于链表的队列吞吐量通常要高于基于数组的队列

  • SynchronousQueue: 同步的阻塞队列

    其中每个插入操作必须等待另一个线程的对应移除操作, 等待过程一直处于阻塞状态, 同理, 每一个移除操作必须等到另一个线程的对应插入操作.

    SynchronousQueue 没有任何容量, 不能在同步队列上进行 peek, 因为仅在试图要移除元素时, 该元素才存在, 除非另一个线程试图移除某个元素, 否则也不能 (使用任何方法) 插入元素; 也不能迭代队列, 因为其中没有元素可用于迭代 Executors.newCachedThreadPool 使用了该队列

  • PriorityBlockingQueue: 基于优先级的无界阻塞队列。

    优先级队列的元素按照其自然顺序进行排序,或者根据构造队列时提供的 Comparator 进行排序, 具体取决于所使用的构造方法

    优先级队列不允许使用 null 元素, 依靠自然顺序的优先级队列还不允许插入不可比较的对象 (这样做可能导致 ClassCastException). 虽然此队列逻辑上是无界的,但是资源被耗尽时试图执行 add 操作也将失败 (导致 OutOfMemoryError)

线程池关闭

调用线程池的 shutdown()shutdownNow() 方法来关闭线程池

  • shutdown原理:将线程池状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程。
  • shutdownNow原理:将线程池的状态设置成 STOP 状态,然后中断所有任务(包括正在执行的)的线程,并返回等待执行任务的列表。

中断采用 interrupt 方法, 所以无法响应中断的任务可能永远无法终止, 但调用上述的两个关闭之一, isShutdown() 方法返回值为 true, 当所有任务都已关闭, 表示线程池关闭完成, 则isTerminated() 方法返回值为 true. 当需要立刻中断所有的线程, 不一定需要执行完任务,可直接调用 shutdownNow()方法

线程池流程:

img

  1. 判断核心线程池是否已满, 即已创建线程数是否小于 corePoolSize? 没满则创建一个新的工作线程来执行任务. 已满则进入下个流程。
  2. 判断工作队列是否已满? 没满则将新提交的任务添加在工作队列, 等待执行。已满则进入下个流程
  3. 判断整个线程池是否已满,即已创建线程数是否小于 maximumPoolSize? 没满则创建一个新的工作线程来执行任务, 已满则交给饱和策略来处理这个任务

合理配置线程池

  • 需要针对具体情况而具体处理, 不同的任务类别应采用不同规模的线程池, 任务类别可划分为 CPU 密集型任务、IO 密集型任务和混合型任务。

  • 对于CPU密集型任务: 线程池中线程个数应尽量少, 不应大于 CPU 核心数

  • 对于IO密集型任务: 由于 IO 操作速度远低于 CPU 速度, 那么在运行这类任务时, CPU 绝大多数时间处于空闲状态, 那么线程池可以配置尽量多些的线程, 以提高 CPU 利用率

  • 对于混合型任务: 可以拆分为 CPU 密集型任务和 IO 密集型任务, 当这两类任务执行时间相差无几时, 通过拆分再执行的吞吐率高于串行执行的吞吐率, 但若这两类任务执行时间有数据级的差距, 那么没有拆分的意义

线程池监控:

利用线程池提供的参数进行监控, 参数如下:

  • taskCount: 线程池需要执行的任务数量

  • completedTaskCount: 线程池在运行过程中已完成的任务数量. 小于或等于taskCount

  • largestPoolSize: 线程池曾经创建过的最大线程数量. 通过这个数据可以知道线程池是否满过, 如等于线程池的最大大小, 则表示线程池曾经满了

  • getPoolSize: 线程池的线程数量. 如果线程池不销毁的话, 池里的线程不会自动销毁, 所以这个大小只增不减

  • getActiveCount: 获取活动的线程数

通过扩展线程池进行监控: 继承线程池并重写线程池的 beforeExecute(), afterExecute()terminated() 方法, 可以在任务执行前、后和线程池关闭前自定义行为

如监控任务的平均执行时间, 最大执行时间和最小执行时间等

ThreadLocal 对象:

线程对象提供了副本对象, 特点是, 每一个线程都独立拥有 ThreadLocal 对象

多线程情况下, 一般比较喜欢使用 ThreadLocal, 多线程情况下, 将值保存到 ThreadLocal 中, 多线程之间都是各自拥有各自的值, 不会发生冲突

添加的值最后一定要移除, 否则容易出现内存溢出

set() 在自身线程中添加值

get() 获取自身线程中存储在 ThreadLocal 中的值

remove() 移除值

Thread 类方法:

可以用 Thread(String name) 构造函数设置线程名

方法描述
getName()获取线程名
getId()获取线程ID
getPriority()或者Thread.currentThread().getPriority(Thread thread)获取线程权重
getState()获取线程状态
interrupt()抛出一个终端信号, 线程不会立即中断
isAlive()判断线程是否活着
stop()进程终止, 已过时
isDaemon()是否是守护线程
join()其他线程不会和此线程进行资源抢占直至结束
setDaemon(Boolean flag)flagtrue 则是设置守护进程
Thread.sleep()休眠线程, sleep(0) 可以用于重新分配时间片, 让线程进入阻塞状态
Thread.yield()暗示当前线程放弃一次抢占, 当前线程不一定会放弃抢占

继承Thread实现线程和实现Runnable接口实现线程的区别:

  • Thread 实现的不会共享内存
  • 如果使用不同的 Thread 对象启动同一个 Runnable 线程对象则会共享内存, 会引发线程安全问题

结束线程的正确方式:

Java 中合理结束一个进程的执行(常用)

public class ThreadTest10 {
    public static void main(String[] args) {
        MyRunnable4 r = new MyRunnable4();
        Thread t = new Thread(r);
        t.setName("t");
        t.start();
        // 模拟5秒
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 终止线程
        // 想要什么时候终止t的执行, 把标记修改为false, 就结束了.
        r.run = false;
    }
}

class MyRunnable4 implements Runnable {
    // 打一个布尔标记
    boolean run = true;

    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            if (run) {
                System.out.println(Thread.currentThread().getName() + "--->" + i);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                return;
            }
        }
    }
}

解决线程安全问题:

由于代码并不是一次就执行完, 例如自增转换成汇编需要三步, 在这三步中如果没有加锁, 那么其他的线程可能会干扰从而使值不合我们的预期

通过加锁:

synchronized 关键字:

JDK1.0 提供

非公平锁, 同步锁

不会产生死锁现象, JVM 自动关闭

java 提供了同步锁, synchronized, 该锁有三种使用方式:

  • 同步块

    • 同步块加锁是用代码块将需要加锁的代码包含住, 需要传一个 Object 类型的 key(钥匙)
  • 方法加锁

    • 如果某个方法的所有代码都可能出现线程安全问题, 建议加方法锁, 该方法充当该锁的 key
  • 将静态方法加锁

    • 本质是类的字节码加锁, 类的字节码充当 key
class Test {
    final Object key = new Object();

    void test() {
        synchronized (key) {
            //
        }
    }
}

加锁时范围应该尽量小

JDK5 之前默认调用系统锁(重量级锁), 在之后 JUC 包提供了一种新锁 ---Lock锁

Lock 锁(接口):

默认是非公平锁, 可以充当公平锁

此锁用完必须手动解开释放锁, 否则会出现死锁导致程序卡死, 解锁代码一定放在 finally

实现类:

  • ReentrantLock(重入锁)
ReentrantLock:

ReentrantLock() 构造函数

ReentrantLock(boolean fair) 构造函数, fairtrue可以改成公平锁

public class TestLock extends Thread {

    private int count;
    private Lock lock = new ReentrantLock();

    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            try {
                // 加锁
                lock.lock();
                count++;

            } finally {
                // 一定要记得释放锁!!
                // 建议将释放锁的代码一定要放在finally中!!!
                lock.unlock();
            }
        }
        System.out.println(Thread.currentThread().getName() + ":count = " + count);
    }

    public static void main(String[] args) {
        TestLock task = new TestLock();
        new Thread(task).start();
        new Thread(task).start();

    }
}

synchronized 锁升级:

JDK7 之后, 三个阶段一个处理不了就到下一个阶段

偏向锁(无锁): 通过贴标签, 解决没有并发的场景 ---> 自旋锁(CSA): 通过循环等待, 无法解决 ABA 问题 ---> 重量级锁(系统锁)

乐观锁和悲观锁:

乐观锁: 默认不加锁, 添加一个 version 字段, 被修改之后版本号就往上加, 版本不匹配就不能修改, 以此保证线程安全, 效率高

悲观锁: 不管需不需要加锁都会直接加锁, 比如 synchronizedLock

单例模式:

饿汉式:

饱汉式:

饱汉式线程安全问题使用 DCL(Double Check Lock) 双重检查锁, 同时需要禁止指令重排序

public class Singleton {

    private volatile static Singleton singleton = null;

    // 构造函数私有化
    private Singleton() {
    }

    public static Sington newInstance() {
        if (sington == null) {
            synchronized (Sington.class) {
                if (sington == null) {
                    sington = new Singleton();
                }
            }
        }
        return singleton;
    }

}

volatile 关键字:

作用:

  • volatile 修饰的变量是线程可见的
  • 禁止指令重排序

线程的生命周期:

5891660535498_.pic_hd

5901660535499_.pic_hd

5911660535500_.pic_hd

  • 新建状态:

    使用 new 关键字和 Thread 类或其子类建立一个线程对象后, 该线程对象就处于新建状态. 它保持这个状态直到程序 start() 这个线程.

  • 就绪状态:

    当线程对象调用了 start() 方法之后, 该线程就进入就绪状态. 就绪状态的线程处于就绪队列中, 要等待 JVM 里线程调度器的调度.

  • 运行状态:

    如果就绪状态的线程获取 CPU 资源, 就可以执行 run(), 此时线程便处于运行状态. 处于运行状态的线程最为复杂, 它可以变为阻塞状态、就绪状态和死亡状态.

  • 阻塞状态:

    如果一个线程执行了 sleep(睡眠)、suspend(挂起) 等方法,失去所占用资源之后, 该线程就从运行状态进入阻塞状态. 在睡眠时间已到或获得设备资源后可以重新进入就绪状态.

    可以分为三种:

    • 等待阻塞: 运行状态中的线程执行 wait() 方法,使线程进入到等待阻塞状态.
    • 同步阻塞: 线程在获取 synchronized 同步锁失败(因为同步锁被其他线程占用).
    • 其他阻塞: 通过调用线程的 sleep()join() 发出了 I/O 请求时,线程就会进入到阻塞状态. 当 sleep() 状态超时, join() 等待线程终止或超时,或者 I/O 处理完毕,线程重新转入就绪状态.
  • 死亡状态:

    一个运行状态的线程完成任务或者其他终止条件发生时,该线程就切换到终止状态.

线程的优先级

每一个 Java 线程都有一个优先级,这样有助于操作系统确定线程的调度顺序.

Java 线程的优先级是一个整数,其取值范围是 1 (Thread.MIN_PRIORITY) - 10 (Thread.MAX_PRIORITY )

默认情况下, 每一个线程都会分配一个优先级 NORM_PRIORITY(5)

具有较高优先级的线程对程序更重要,并且应该在低优先级的线程之前分配处理器资源, 但是, 线程优先级不能保证线程执行的顺序, 而且非常依赖于平台

死锁(deadLock):

两个线程之间, 进行资源竞争时, 彼此都拿着对方需要的资源, 但是因为线程的竞争性(请求保持), 没有释放锁就会形成相互等待释放资源的现象

此现象应该尽量避免, 但只要存在锁就不能完全避免

死锁的必要条件:

  • 互斥
    • 都有相同请求
  • 请求保持
  • 环路等待
  • 不可剥夺性

打破条件其一就可以解除死锁

线程的同步的问题:

协步同调. 使用 Lock 实现或者同步锁配合唤醒机制

notify() 唤醒同步锁对象上已处于 wait 状态的对象

notifyAll() 唤醒同步锁对象上所有处于 wait 状态的对象

wait() 让此对象在同步锁对象上进入等待状态

同步锁配合唤醒机制实现同步:

  • 必须有一个公有对象
  • 该共有对象就是同步锁的钥匙

生产者和消费者模式:

供需的平衡问题:

1:

假设只有一个生产者生产食物, 此时只有一个消费者, 生产者生产食物后装进盘子, 消费者吃掉这个盘子的食物

import java.util.Random;

class Customer extends Thread {
    private final Disk disk;

    public Customer(Disk disk) {
        this.disk = disk;
    }

    @Override
    public void run() {
        synchronized (this.disk) {
            while (true) {
                if (this.disk.isFull()) {
                    System.out.println(Thread.currentThread().getName() + "吃了" + this.disk.getFood());
                    this.disk.setFood(null);
                    this.disk.setFull(false);
                    // 唤醒生产者
                    this.disk.notify();

                    // 让生产者进入等待
                    try {
                        this.disk.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

// 共有对象
class Disk {
    private boolean full;
    private String food;

    public boolean isFull() {
        return this.full;
    }

    public void setFull(boolean full) {
        this.full = full;
    }

    public String getFood() {
        return food;
    }

    public void setFood(String food) {
        this.food = food;
    }
}

class Producer extends Thread {
    private final String[] foods;
    private final Disk disk;
    private final Random random;

    public Producer(Disk disk) {
        this.foods = new String[]{"apple", "eeg", "beef", "rice", "vegetables"};
        this.disk = disk;
        this.random = new Random();
    }

    @Override
    public void run() {
        makeFood();
    }

    private void makeFood() {
        synchronized (this.disk) {
            while (true) {
                if (!this.disk.isFull()) {
                    String food = this.foods[this.random.nextInt(this.foods.length)];
                    System.out.println("创造了一个食物: " + food);
                    this.disk.setFood(food);
                    this.disk.setFull(true);

                    // 先唤醒消费者
                    this.disk.notify();

                    // 让生产者进入等待
                    try {
                        this.disk.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

class Test {
    public static void main(String[] args) {
        Disk disk = new Disk();
        Producer producer = new Producer(disk);
        Customer customer = new Customer(disk);
        producer.start();
        customer.start();
    }
}

2:

假设现在同样只有一个生产者, 但是有多个消费者且盘子只有一个, 让消费者自行抢夺食物

/**
 * @author Erzbir
 * @Date: 2023/6/24 17:02
 */

import java.util.Random;

class Customer extends Thread {
    private final Disk disk;

    public Customer(Disk disk) {
        this.disk = disk;
    }

    @Override
    public void run() {
        while (true) {
            synchronized (this.disk) {
                while (!this.disk.isFull()) {
                    try {
                        this.disk.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println(Thread.currentThread().getName() + "吃了" + this.disk.getFood());
                this.disk.setFood(null);
                this.disk.setFull(false);
                this.disk.notifyAll();
            }
        }
    }
}

// 共有对象
class Disk {
    private boolean full;
    private String food;

    public boolean isFull() {
        return this.full;
    }

    public void setFull(boolean full) {
        this.full = full;
    }

    public String getFood() {
        return food;
    }

    public void setFood(String food) {
        this.food = food;
    }
}

class Producer extends Thread {
    private final String[] foods;
    private final Disk disk;
    private final Random random;

    public Producer(Disk disk) {
        this.foods = new String[]{"apple", "eeg", "beef", "rice", "vegetables", "A", "B", "C", "D", "E", "F", "G"};
        this.disk = disk;
        this.random = new Random();
    }

    @Override
    public void run() {
        while (true) {
            synchronized (this.disk) {
                while (this.disk.isFull()) {
                    try {
                        this.disk.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                String food = this.foods[this.random.nextInt(this.foods.length)];
                System.out.println("创造了一个食物: " + food);
                this.disk.setFood(food);
                this.disk.setFull(true);
                this.disk.notifyAll();
            }
        }
    }
}

public class Test {
    public static void main(String[] args) {
        Disk disk = new Disk();
        Producer producer = new Producer(disk);

        Customer customer1 = new Customer(disk);
        Customer customer2 = new Customer(disk);
        Customer customer3 = new Customer(disk);
        Customer customer4 = new Customer(disk);
        Customer customer5 = new Customer(disk);
        Customer customer6 = new Customer(disk);

        producer.start();

        customer1.setName("-------客户1-------");
        customer2.setName("-------客户2-------");
        customer3.setName("-------客户3-------");
        customer4.setName("-------客户4-------");
        customer5.setName("-------客户5-------");
        customer6.setName("-------客户6-------");

        customer1.start();
        customer2.start();
        customer3.start();
        customer4.start();
        customer5.start();
        customer6.start();
    }
}