信号量 Semaphore

信号量 Semaphore

信号量(Semaphore)是一种同步机制,用于控制多个线程或进程对共享资源的访问,避免出现竞争条件或死锁问题。它主要在多线程或多进程编程中使用,尤其在并发编程中非常重要。

进程间通信采用共享内存的时候,通常需要信号量 semaphore 或者互斥体 mutex 进行进程间的并发协调。
看到共享内存的时候,我感觉可以直接操作内存真的好爽啊。可以搞出很多具有想象力的玩儿法。共享内存就是,很有想象力。

信号量的工作原理

  1. 信号量的定义
    信号量是一个整型计数器,表示可以被访问的资源数量。
    • 当信号量的值大于零时,线程或进程可以访问资源,并使信号量的值减一。
    • 当信号量的值等于零时,线程或进程会阻塞,直到信号量的值大于零。
    • 当线程或进程释放资源时,信号量的值加一。
  2. 信号量的两种类型
    • 计数信号量(Counting Semaphore):信号量的值可以大于 1,表示可以有多个线程同时访问资源。例如,限制某个资源的并发访问量为 N。
    • 二进制信号量(Binary Semaphore):信号量的值只能是 0 或 1,类似于互斥锁(Mutex),用于实现互斥访问。
  3. 基本操作
    信号量的主要操作包括:
    • Wait(P 操作):尝试获取信号量。如果信号量的值大于零,则减一并继续执行;否则,阻塞直到信号量的值大于零。
    • Signal(V 操作):释放信号量,将信号量的值加一。如果有线程阻塞在该信号量上,则唤醒其中一个线程。

    这里的 PV 是荷兰语操作名的缩写,分别代表 "Proberen"(测试)和 "Verhogen"(增加)。

  4. 多线程环境中的工作流程
    • 初始化信号量:设置初始值,通常等于可用资源的数量。
    • 线程访问资源:线程在访问共享资源前执行 Wait 操作,减少信号量值。
    • 线程释放资源:线程完成资源访问后执行 Signal 操作,增加信号量值。

信号量的实际应用

  1. 线程同步
    信号量可以用于控制线程的执行顺序。例如,线程 A 必须在线程 B 之后执行,可以通过信号量阻塞线程 A,直到线程 B 释放信号量。
  2. 资源限制
    信号量可以用于限制资源的访问量。例如,一个线程池中,信号量可以用来限制线程数。
  3. 生产者 - 消费者问题
    信号量常用于解决经典的生产者 - 消费者问题:
    • 一个信号量表示缓冲区中的可用空间数量。
    • 另一个信号量表示缓冲区中可用的产品数量。

信号量的优缺点

优点:

简单实践一下

当我们想要实现自己的线程锁的时候,可以参 semaphore 的设计
本来想自己写代码实践一下,结果发现,停止线程和启动线程要么得用 Object 的 wait 和 notify 方法,要么得用 Lock 的 Condition 方法,
Synchronized 关键字写法

class ProcessExample {
    private static final Object lock = new Object();

    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            synchronized (lock) {
                try {
                    System.out.println("Thread t1: Waiting...");
                    lock.wait(); // 阻塞当前线程
                    System.out.println("Thread t1: Resumed.");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        Thread t2 = new Thread(() -> {
            synchronized (lock) {
                System.out.println("Thread t2: Notifying...");
                lock.notify(); // 唤醒线程 t1
            }
        });

        t1.start();

        try {
            Thread.sleep(1000); // 确保 t1 先进入 wait 状态
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        t2.start();
    }
}

Condition 写法

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ProcessExampleWithCondition {
    private static final Lock lock = new ReentrantLock();
    private static final Condition condition = lock.newCondition();

    public static void main(String[] args) {
        Thread t1 = new Thread(() -> {
            lock.lock();
            try {
                System.out.println("Thread t1: Waiting...");
                condition.await(); // 阻塞当前线程
                System.out.println("Thread t1: Resumed.");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        });

        Thread t2 = new Thread(() -> {
            lock.lock();
            try {
                System.out.println("Thread t2: Signaling...");
                condition.signal(); // 唤醒线程 t1
            } finally {
                lock.unlock();
            }
        });

        t1.start();

        try {
            Thread.sleep(1000); // 确保 t1 先进入 await 状态
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        t2.start();
    }
}

但是无论是哪种写法,都必须先获取锁,然后才能使用,而获取排他锁,本来就是 semaphore 应该干的事情,因此实际上实现不了,从这个角度看,AQS 的实现就很牛逼,不依赖 synchronized 关键字,从 0 实现了锁机制。

JDK 自带的 semaphore 实现

在 Java 中,可以使用标准库提供的 java.util.concurrent.Semaphore 类来实现信号量(Semaphore)。它是用于控制同时访问某个资源的线程数量的经典工具,底层基于 AQS(AbstractQueuedSynchronizer)实现。

示例:简单信号量用法(模拟限流)

import java.util.concurrent.Semaphore;

public class SemaphoreExample {
    // 创建一个最多允许3个线程同时访问的信号量
    private static final Semaphore semaphore = new Semaphore(3);

    public static void main(String[] args) {
        // 创建10个线程模拟并发访问
        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            new Thread(() -> {
                try {
                    System.out.println("线程 " + threadNum + " 尝试获取许可...");
                    semaphore.acquire(); // 获取许可(阻塞)
                    System.out.println("线程 " + threadNum + " 获取到许可,开始执行...");
                    Thread.sleep(2000); // 模拟执行任务
                    System.out.println("线程 " + threadNum + " 执行完毕,释放许可。");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release(); // 释放许可
                }
            }).start();
        }
    }
}

关键方法说明:

方法 作用
acquire() 获取一个许可,如果没有就阻塞等待
release() 释放一个许可
tryAcquire() 尝试获取一个许可,获取不到就立即返回 false
availablePermits() 返回当前可用许可数

应用场景举例:

  1. 限制访问某个资源(如数据库连接池)
  2. 控制并发线程数(如限流、并发爬虫)
  3. 线程间同步(更低级别的控制)

进阶用法:构造公平信号量

Semaphore semaphore = new Semaphore(3, true); // 第二个参数 true 表示公平信号量

公平信号量会按照线程请求许可的顺序依次授予(FIFO),非公平信号量可能后来的线程先获得许可(性能更好,但可能不公平)。

semaphore 的一些坑

semaphore 的 release 方法会增加 permit,而调用 release 之前不需要调用 acquire 方法

举个例子

public static void main(String[] args) throws InterruptedException {  
    Semaphore semaphore = new Semaphore(2);  
    semaphore.release();  
    semaphore.release();  
    System.out.println(semaphore.availablePermits());  
    semaphore.acquire();  
    semaphore.acquire();  
    System.out.println(semaphore.availablePermits());  
    semaphore.acquire();  
    semaphore.acquire();  
    System.out.println(semaphore.availablePermits());  
}

输出

4
2
0

本来我们创建了一个 permit 为 2 的信号量,希望两次 acquire 之后,第三次 acquire 就阻塞。
但是

import java.util.concurrent.Semaphore;

public class TrackedSemaphore {
    private final Semaphore semaphore;

    // 每个线程的持有许可数量
    private final ThreadLocal<Integer> heldPermits = ThreadLocal.withInitial(() -> 0);

    public TrackedSemaphore(int permits) {
        this.semaphore = new Semaphore(permits);
    }

    public void acquire() throws InterruptedException {
        semaphore.acquire();
        heldPermits.set(heldPermits.get() + 1);
    }

    public boolean tryAcquire() {
        boolean acquired = semaphore.tryAcquire();
        if (acquired) {
            heldPermits.set(heldPermits.get() + 1);
        }
        return acquired;
    }

    public void release() {
        int held = heldPermits.get();
        if (held <= 0) {
            throw new IllegalStateException("当前线程未持有许可,不能释放!");
        }
        heldPermits.set(held - 1);
        semaphore.release();
    }

    public int availablePermits() {
        return semaphore.availablePermits();
    }

    // 可选:当前线程持有许可数
    public int getHeldPermits() {
        return heldPermits.get();
    }
}

为什么 JDK 中的 Semaphore 不做这种检查?