springboot第78集:队列,锁,并发,通信,框架,流计算

科技   2024-10-15 06:32   广东  

个人简介:我是哪吒

全栈架构师Java | 微服务集群方向

一个人可能走得更快,但一群人一定会走得更远

队列

PriorityBlockingQueue不会阻塞数据⽣产者(因为队列是⽆界的),⽽只会在没

有可消费的数据时,阻塞数据的消费者。因此使⽤的时候要特别注意,⽣产者⽣产

数据的速度绝对不能快于消费者消费数据的速度,否则时间⼀⻓,会最终耗尽所有

的可⽤堆内存空间。对于使⽤默认⼤⼩的LinkedBlockingQueue也是⼀样的

LinkedBlockingQueue

由链表结构组成的有界阻塞队列。内部结构是链表,具有链表的特性。默认队列的

⼤⼩是 Integer.MAX_VALUE ,也可以指定⼤⼩。此队列按照先进先出的原则对元素

进⾏排序。

SynchronousQueue

这个队列⽐较特殊,没有任何内部容量,甚⾄连⼀个队列的容量都没有。并且每个

put 必须等待⼀个 take,反之亦然。

需要区别容量为1的ArrayBlockingQueue、LinkedBlockingQueue。

以下⽅法的返回值,可以帮助理解这个队列:

iterator() 永远返回空,因为⾥⾯没有东⻄

peek() 永远返回null

put() 往queue放进去⼀个element以后就⼀直wait直到有其他thread进来把这个

element取⾛。

offer() 往queue⾥放⼀个element后⽴即返回,如果碰巧这个element被另⼀个

thread取⾛了,offer⽅法返回true,认为offer成功;否则返回false。

take() 取出并且remove掉queue⾥的element,取不到东⻄他会⼀直等。

poll() 取出并且remove掉queue⾥的element,只有到碰巧另外⼀个线程正在往

queue⾥offer数据或者put数据的时候,该⽅法才会取到东⻄。否则⽴即返回

null。

isEmpty() 永远返回true

remove()&removeAll() 永远返回false

注意

PriorityBlockingQueue不会阻塞数据⽣产者(因为队列是⽆界的),⽽只会在没

有可消费的数据时,阻塞数据的消费者。因此使⽤的时候要特别注意,⽣产者⽣产

数据的速度绝对不能快于消费者消费数据的速度,否则时间⼀⻓,会最终耗尽所有

的可⽤堆内存空间。对于使⽤默认⼤⼩的LinkedBlockingQueue也是⼀样的。

PriorityBlockingQueue

基于优先级的⽆界阻塞队列(优先级的判断通过构造函数传⼊的Compator对象来决定),内部控制线程同步

DelayQueue

该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素 。注⼊其中的元素必须实现

DelayQueue是⼀个没有⼤⼩限制的队列,因此往队列中插⼊数据的操作(⽣产者)永远不会被阻塞,⽽只

ArrayBlockingQueue

由数组结构组成的有界阻塞队列。内部结构是数组,故具有数组的特性。

可以初始化队列⼤⼩, 且⼀旦初始化不能改变。构造⽅法中的fair表示控制对象的

内部锁是否采⽤公平锁,默认是⾮公平锁

⼀⽅⾯消除了⽣产者类与消费者类之间的代码依赖性,

另⼀⽅⾯将⽣产数据的过程与使⽤数据的过程解耦简化负载。

BlockingQueue是Java util.concurrent包下重要的数据结构,区别于普通的队

列,BlockingQueue提供了线程安全的队列访问⽅式,并发包下很多⾼级同

步类的实现都是基于BlockingQueue实现的。

BlockingQueue⼀般⽤于⽣产者-消费者模式,⽣产者是往队列⾥添加元素的线程,

消费者是从队列⾥拿元素的线程。BlockingQueue就是存放元素的容器。

注意之处

不能往阻塞队列中插⼊null,会抛出空指针异常。

可以访问阻塞队列中的任意元素,调⽤remove(o)可以将队列之中的特定对象

移除,但并不⾼效,尽量避免使⽤。

BlockingQueue的操作⽅法

抛出异常:如果试图的操作⽆法⽴即执⾏,抛异常。当阻塞队列满时候,再往

队列⾥插⼊元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空

时,从队列⾥获取元素时会抛出NoSuchElementException异常 。

返回特殊值:如果试图的操作⽆法⽴即执⾏,返回⼀个特殊值,通常是true /

false。

⼀直阻塞:如果试图的操作⽆法⽴即执⾏,则⼀直阻塞或者响应中断。

超时退出:如果试图的操作⽆法⽴即执⾏,该⽅法调⽤将会发⽣阻塞,直到能

够执⾏,但等待时间不会超过给定值。返回⼀个特定值以告知该操作是否成

功,通常是 true / false。

阻塞队列提供了四组不同的⽅法⽤于插⼊、移除、检查元素:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FixedThreadPoolExample {
    public static void main(String[] args) {
        // 创建一个固定大小为3的线程池
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);

        // 提交多个任务到线程池
        for (int i = 1; i <= 5; i++) {
            final int taskID = i;
            fixedThreadPool.execute(() -> {
                System.out.println("Task " + taskID + " is running by " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000); // 模拟任务执行的时间
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("Task " + taskID + " completed by " + Thread.currentThread().getName());
            });
        }

        // 关闭线程池
        fixedThreadPool.shutdown();
    }
}

可缓存的线程池 (CachedThreadPool)

CachedThreadPool 会根据需要创建新线程来处理任务,如果有空闲的线程则会重用。适合执行大量短期异步任务。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CachedThreadPoolExample {
    public static void main(String[] args) {
        // 创建一个可缓存的线程池
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

        // 提交多个任务到线程池
        for (int i = 1; i <= 5; i++) {
            final int taskID = i;
            cachedThreadPool.execute(() -> {
                System.out.println("Task " + taskID + " is running by " + Thread.currentThread().getName());
                try {
                    Thread.sleep(500); // 模拟任务执行的时间
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("Task " + taskID + " completed by " + Thread.currentThread().getName());
            });
        }

        // 关闭线程池
        cachedThreadPool.shutdown();
    }
}

定时调度的线程池 (ScheduledThreadPool)

ScheduledThreadPool 用于调度任务,可以延迟执行或周期性执行任务。它适用于定时任务或重复执行任务的场景。

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ScheduledThreadPoolExample {
    public static void main(String[] args) {
        // 创建一个具有2个线程的定时调度线程池
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);

        // 延迟3秒后执行任务
        scheduledThreadPool.schedule(() -> {
            System.out.println("Task 1 executed after 3 seconds delay.");
        }, 3, TimeUnit.SECONDS);

        // 延迟1秒后开始执行任务,并且每隔2秒重复执行一次
        scheduledThreadPool.scheduleAtFixedRate(() -> {
            System.out.println("Task 2 is executing periodically every 2 seconds.");
        }, 1, 2, TimeUnit.SECONDS);

        // 延迟1秒后开始执行任务,并且在上一个任务结束后延迟3秒再执行
        scheduledThreadPool.scheduleWithFixedDelay(() -> {
            System.out.println("Task 3 is executing with a fixed delay of 3 seconds.");
        }, 1, 3, TimeUnit.SECONDS);
    }
}

单一线程的线程池 (SingleThreadExecutor)

SingleThreadExecutor 只会创建一个线程来执行任务,任务按顺序执行,适用于需要顺序执行任务的场景。

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleThreadExecutorExample {
    public static void main(String[] args) {
        // 创建一个单线程的线程池
        ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();

        // 提交多个任务到线程池
        for (int i = 1; i <= 5; i++) {
            final int taskID = i;
            singleThreadExecutor.execute(() -> {
                System.out.println("Task " + taskID + " is running by " + Thread.currentThread().getName());
                try {
                    Thread.sleep(1000); // 模拟任务执行的时间
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("Task " + taskID + " completed by " + Thread.currentThread().getName());
            });
        }

        // 关闭线程池
        singleThreadExecutor.shutdown();
    }
}

在高级 Java 编程中,线程池与阻塞队列的结合使用非常常见,尤其是在生产者-消费者模型中。阻塞队列能够有效地协调线程之间的资源共享,并且线程池能够高效管理线程的调度。我们可以通过 ThreadPoolExecutor 和阻塞队列(如 LinkedBlockingQueue)结合,实现生产者-消费者模式。

下面是一个使用线程池阻塞队列的高级 Java 逻辑代码示例,展示了如何通过 ThreadPoolExecutor 来执行任务,并且通过 LinkedBlockingQueue 实现任务的协调与控制:

import java.util.concurrent.*;
import java.util.Random;

// 生产者任务
class Producer implements Runnable {
    private final BlockingQueue<Integer> queue;
    private final Random random = new Random();

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                int producedElement = random.nextInt(100); // 生成随机数
                System.out.println(Thread.currentThread().getName() + " 生产了: " + producedElement);
                queue.put(producedElement); // 将元素放入阻塞队列
                Thread.sleep(500); // 模拟生产过程
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// 消费者任务
class Consumer implements Runnable {
    private final BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                int consumedElement = queue.take(); // 从阻塞队列中获取元素
                System.out.println(Thread.currentThread().getName() + " 消费了: " + consumedElement);
                Thread.sleep(1000); // 模拟消费过程
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

public class ThreadPoolBlockingQueueExample {
    public static void main(String[] args) {
        // 创建一个线程安全的阻塞队列,容量为10
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);

        // 创建自定义线程池,核心线程数为2,最大线程数为4,存活时间为60秒,使用阻塞队列
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2, 4, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()
        );

        // 提交生产者任务到线程池
        executor.submit(new Producer(queue));
        executor.submit(new Producer(queue));

        // 提交消费者任务到线程池
        executor.submit(new Consumer(queue));
        executor.submit(new Consumer(queue));

        // 运行一段时间后手动关闭线程池
        try {
            Thread.sleep(10000); // 模拟10秒后停止生产和消费
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        // 关闭线程池
        executor.shutdown();
        try {
            // 等待线程池中的任务执行完毕
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // 强制关闭
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
        }
    }
}

代码详解

  1. 阻塞队列 (BlockingQueue)

  • 代码使用了 LinkedBlockingQueue<Integer> 来作为生产者和消费者之间共享的阻塞队列,容量设置为 10,表示最多允许 10 个元素同时存在于队列中。
  • 当队列满时,生产者会阻塞,直到队列中有空位;当队列为空时,消费者会阻塞,直到有新元素被放入队列。
  • 生产者 (Producer)

    • 生产者在无限循环中随机生成整数,并尝试将其放入队列。如果队列已满,则 put() 方法会阻塞,直到队列中有空位。
    • 模拟了每隔 500 毫秒生产一个元素的过程。
  • 消费者 (Consumer)

    • 消费者从队列中获取元素(使用 take(),该方法会阻塞直到有元素可用)。每次获取一个元素后,模拟处理该元素的过程,处理时间为 1000 毫秒。
  • 线程池 (ThreadPoolExecutor)

    • 使用了自定义线程池 ThreadPoolExecutor,它的核心线程数为 2,最大线程数为 4,空闲线程存活时间为 60 秒,且使用 LinkedBlockingQueue 作为工作队列来存放待执行的任务。
    • 两个生产者任务和两个消费者任务分别提交到线程池中,线程池会自动调度这些任务,保证生产者和消费者在并发环境中高效运行。
  • 线程池的关闭

    • 主线程等待了 10 秒后,调用 executor.shutdown() 以优雅地关闭线程池,等待当前所有已提交的任务完成。
    • 如果线程池在 60 秒内没有完成所有任务,调用 executor.shutdownNow() 强制关闭线程池。

    线程池与阻塞队列结合的优势

    • 线程池管理任务执行:通过线程池管理生产者和消费者的任务执行,不必手动管理线程的创建、调度与回收,提升了并发控制的效率。
    • 阻塞队列保证线程同步BlockingQueue 在生产者和消费者之间充当了线程安全的桥梁,确保多线程环境下的数据安全性与操作协调性,避免繁琐的锁机制。
    • 线程复用:线程池有效地重用了线程,减少了频繁创建和销毁线程的开销,提升了系统性能。

    总结

    此代码结合了线程池和阻塞队列,展示了如何在生产者-消费者模型中使用高级 Java 并发工具。ThreadPoolExecutor 提供了灵活的线程管理,而 LinkedBlockingQueue 则保证了线程之间安全的资源共享。

    自旋锁概述

    自旋锁是一种轻量级锁机制,在线程尝试获取锁时,如果锁被其他线程占用,当前线程不会进入阻塞状态,而是通过循环不断地检查锁的状态(自旋)直到获取到锁。这种方式避免了线程上下文切换的开销,因此适合锁持有时间较短的场景。自旋锁适用于并发冲突少、锁定时间短的情况下,否则会浪费大量 CPU 资源。

    自旋锁的工作原理

    1. 一个线程尝试获取锁时,检查锁是否被其他线程持有。
    2. 如果锁空闲,则获取锁并进入临界区。
    3. 如果锁已被占用,线程进入自旋状态,即不断重复检查锁是否释放。
    4. 一旦锁释放,线程会立即尝试再次获取锁。
    5. 当任务完成后,线程释放锁。

    自旋锁的优势与劣势

    • 优势

      • 避免了线程进入阻塞状态,从而减少了线程上下文切换的开销。
      • 适合短时间内锁竞争的场景,尤其是在多核 CPU 环境下,频繁的上下文切换代价较高的情况下。
    • 劣势

      • 如果锁占用时间较长,自旋锁会占用大量 CPU 时间,导致资源浪费。
      • 自旋锁并不会阻塞线程,因此在长时间持有锁时会导致 CPU 空转。

    自旋锁实现的高级 Java 代码

    下面的代码展示了一个使用 AtomicReference 实现自旋锁的高级 Java 代码。自旋锁通过 compareAndSet 方法来确保线程能够安全地获取锁并进入临界区。

    import java.util.concurrent.atomic.AtomicReference;

    // 自定义自旋锁实现
    class SpinLock {
        // 使用 AtomicReference 来保存当前持有锁的线程
        private final AtomicReference<Thread> owner = new AtomicReference<>();

        // 自旋锁的加锁方法
        public void lock() {
            Thread currentThread = Thread.currentThread();
            // 当锁已经被其他线程占用时,当前线程自旋等待
            while (!owner.compareAndSet(null, currentThread)) {
                // 自旋等待,什么都不做,只是不断检查锁的状态
            }
            System.out.println(currentThread.getName() + " 获取到了锁");
        }

        // 自旋锁的解锁方法
        public void unlock() {
            Thread currentThread = Thread.currentThread();
            // 只有持有锁的线程才能解锁
            if (owner.get() == currentThread) {
                owner.set(null); // 释放锁
                System.out.println(currentThread.getName() + " 释放了锁");
            }
        }
    }

    // 自旋锁测试类
    public class SpinLockExample {
        public static void main(String[] args) {
            final SpinLock spinLock = new SpinLock();

            // 创建两个线程模拟锁的竞争
            Runnable task = () -> {
                spinLock.lock();
                try {
                    // 模拟临界区中的任务执行
                    System.out.println(Thread.currentThread().getName() + " 正在执行临界区任务");
                    Thread.sleep(1000); // 模拟任务耗时
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    // 任务执行完毕后解锁
                    spinLock.unlock();
                }
            };

            // 启动两个线程,演示自旋锁的工作
            Thread thread1 = new Thread(task, "线程1");
            Thread thread2 = new Thread(task, "线程2");

            thread1.start();
            thread2.start();
        }
    }

    偏向锁概述

    偏向锁(Biased Locking)是 Java 虚拟机(JVM)中的一种优化锁机制,旨在减少无竞争情况下的锁开销。在偏向锁模式下,锁会偏向于第一个获取它的线程,之后这个线程可以再次进入同步块而无需任何同步操作。只有当其他线程尝试获取该锁时,偏向锁才会被撤销,从而转变为轻量级锁或重量级锁。

    偏向锁的设计假设是:大多数锁在生命周期内都是由同一个线程获得的,因此偏向锁可以显著减少单线程多次获取同一锁时的性能开销。

    偏向锁的工作原理

    1. 初次获取锁:当某个线程首次获取锁时,锁会偏向于该线程。此时,JVM 会将锁的偏向标记设为该线程,表示该线程已经获得锁。
    2. 后续获取锁:如果同一线程再次尝试获取锁,JVM 会直接让该线程进入同步块,而不需要执行任何锁相关的同步操作。
    3. 偏向锁撤销:当另一个线程尝试获取偏向锁时,JVM 会撤销偏向锁,将其升级为轻量级锁或重量级锁。此时可能会发生线程之间的竞争。

    偏向锁的优点

    • 偏向锁大幅度减少了在无竞争情况下的同步开销,因为同一个线程反复获取锁时不会涉及到复杂的同步机制。
    • 适用于锁竞争较少、单个线程频繁加锁和解锁的场景。

    偏向锁的缺点

    • 如果锁在多个线程之间频繁竞争,偏向锁的撤销会带来额外的性能损耗。

    JVM 默认开启偏向锁

    • 偏向锁在 Java 6 及以后的 JVM 中是默认启用的,但可以通过 JVM 参数来禁用它:-XX:-UseBiasedLocking
    • 如果要调整偏向锁启动的延迟,可以使用参数 -XX:BiasedLockingStartupDelay=0 来立即启用偏向锁。
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class CustomThreadPoolExample {
        public static void main(String[] args) {
            // 创建自定义的线程池
            ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,                        // 核心线程数
                4,                        // 最大线程数
                60,                       // 空闲线程存活时间
                TimeUnit.SECONDS,          // 存活时间的单位
                new LinkedBlockingQueue<Runnable>() // 任务队列
            );
            
            // 向线程池提交任务
            for (int i = 0; i < 10; i++) {
                executor.execute(() -> {
                    try {
                        System.out.println(Thread.currentThread().getName() + " 正在执行任务");
                        Thread.sleep(2000); // 模拟任务耗时
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }
            
            // 关闭线程池
            executor.shutdown();
        }
    }

    线程池参数说明

    1. 核心线程数 (Core Pool Size) :

    • 这是线程池中保持活跃的最小线程数。即使线程处于空闲状态,核心线程也不会被回收。
    • 设置合适的核心线程数可以确保在任务压力较小时,避免频繁地创建和销毁线程带来的性能开销。
  • 最大线程数 (Maximum Pool Size) :

    • 这是线程池能够容纳的最大线程数量。当任务数超过核心线程数,且任务队列已满时,线程池会创建新线程直到达到最大线程数。
    • 如果任务执行时间较长,应考虑使用较小的最大线程数,避免过多的线程导致系统资源不足。
  • 线程存活时间 (Keep Alive Time) :

    • 当线程池中的线程数量超过核心线程数时,空闲线程的最大存活时间。超过该时间后,多余的线程会被销毁,直到线程数降至核心线程数。
    • 适用于负载波动较大的场景,可以灵活地调整线程池大小。
  • 任务队列 (Task Queue) :

    • LinkedBlockingQueue(无界队列) :常用于当任务量大时,允许队列无限增长(直到内存耗尽),避免频繁创建线程。
    • SynchronousQueue(直接提交队列) :每次提交任务都必须有空闲线程执行,不存储任何等待任务,适用于需要立即处理任务的场景。

    线程池工作过程概述

    1. 核心线程数 (Core Pool Size)
      当任务提交时,线程池会优先创建核心线程来处理任务。如果当前活跃线程数小于核心线程数,即使任务队列为空,也会创建新的线程来执行任务。
    2. 任务队列 (Task Queue)
      当线程池的活跃线程数已经达到了核心线程数,新的任务会被放入任务队列,等待空闲线程处理。如果使用的是LinkedBlockingQueue(无界队列),队列可以无限增长;如果使用的是SynchronousQueue,则不存储任务,任务会直接提交给空闲线程执行。
    3. 最大线程数 (Maximum Pool Size)
      当任务队列已满,且仍有新的任务提交时,线程池会尝试创建非核心线程,直到线程总数达到Maximum Pool Size。这些额外的线程在任务执行完成后,若处于空闲状态,将会根据Keep Alive Time设定的时间被销毁。
    4. 线程存活时间 (Keep Alive Time)
      线程池中的非核心线程如果空闲时间超过Keep Alive Time,将会被销毁,直到线程数减小到核心线程数。通过设置这个时间,可以使线程池在任务少时自动减少线程数量,节约资源。
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class ThreadPoolLifecycle {
        public static void main(String[] args) {
            // 创建自定义的线程池
            ThreadPoolExecutor executor = new ThreadPoolExecutor(
                2,                        // 核心线程数
                4,                        // 最大线程数
                60,                       // 空闲线程存活时间
                TimeUnit.SECONDS,          // 存活时间的单位
                new LinkedBlockingQueue<>(5)  // 有界任务队列,容量为5
            );
            
            // 提交10个任务
            for (int i = 0; i < 10; i++) {
                final int taskNumber = i;
                executor.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + " 正在执行任务 " + taskNumber);
                    try {
                        // 模拟任务执行
                        Thread.sleep(2000); // 每个任务执行2秒
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }
            
            // 关闭线程池
            executor.shutdown();
        }
    }

    过程描述

    1. 前2个任务:直接由线程池创建的2个核心线程处理。
    2. 第3到第5个任务:被放入任务队列,等待核心线程空闲。
    3. 第6到第7个任务:任务队列满了,线程池创建非核心线程来处理。
    4. 第8到第10个任务:超过最大线程数,新的任务无法提交,可能抛出异常。

    加群联系作者vx:xiaoda0423

    仓库地址:https://github.com/webVueBlog/JavaGuideInterview

    算法猫叔
    程序员:进一寸有一寸的欢喜
     最新文章