信号量 Semaphore
信号量 Semaphore
信号量(Semaphore)是一种同步机制,用于控制多个线程或进程对共享资源的访问,避免出现竞争条件或死锁问题。它主要在多线程或多进程编程中使用,尤其在并发编程中非常重要。
进程间通信采用共享内存的时候,通常需要信号量 semaphore 或者互斥体 mutex 进行进程间的并发协调。
看到共享内存的时候,我感觉可以直接操作内存真的好爽啊。可以搞出很多具有想象力的玩儿法。共享内存就是,很有想象力。
信号量的工作原理
- 信号量的定义
信号量是一个整型计数器,表示可以被访问的资源数量。- 当信号量的值大于零时,线程或进程可以访问资源,并使信号量的值减一。
- 当信号量的值等于零时,线程或进程会阻塞,直到信号量的值大于零。
- 当线程或进程释放资源时,信号量的值加一。
- 信号量的两种类型
- 计数信号量(Counting Semaphore):信号量的值可以大于 1,表示可以有多个线程同时访问资源。例如,限制某个资源的并发访问量为 N。
- 二进制信号量(Binary Semaphore):信号量的值只能是 0 或 1,类似于互斥锁(Mutex),用于实现互斥访问。
- 基本操作
信号量的主要操作包括:- Wait(P 操作):尝试获取信号量。如果信号量的值大于零,则减一并继续执行;否则,阻塞直到信号量的值大于零。
- Signal(V 操作):释放信号量,将信号量的值加一。如果有线程阻塞在该信号量上,则唤醒其中一个线程。
这里的 P 和 V 是荷兰语操作名的缩写,分别代表 "Proberen"(测试)和 "Verhogen"(增加)。
- 多线程环境中的工作流程
- 初始化信号量:设置初始值,通常等于可用资源的数量。
- 线程访问资源:线程在访问共享资源前执行 Wait 操作,减少信号量值。
- 线程释放资源:线程完成资源访问后执行 Signal 操作,增加信号量值。
信号量的实际应用
- 线程同步
信号量可以用于控制线程的执行顺序。例如,线程 A 必须在线程 B 之后执行,可以通过信号量阻塞线程 A,直到线程 B 释放信号量。 - 资源限制
信号量可以用于限制资源的访问量。例如,一个线程池中,信号量可以用来限制线程数。 - 生产者 - 消费者问题
信号量常用于解决经典的生产者 - 消费者问题:- 一个信号量表示缓冲区中的可用空间数量。
- 另一个信号量表示缓冲区中可用的产品数量。
信号量的优缺点
优点:
- 简单高效,适用于解决资源共享问题。
- 通过计数信号量,可以轻松实现多资源的管理。
缺点: - 编程复杂度较高,容易出错。
- 如果管理不当,可能导致死锁、资源泄漏等问题。
简单实践一下
当我们想要实现自己的线程锁的时候,可以参 semaphore 的设计
本来想自己写代码实践一下,结果发现,停止线程和启动线程要么得用 Object 的 wait 和 notify 方法,要么得用 Lock 的 Condition 方法,
Synchronized 关键字写法
class ProcessExample {
private static final Object lock = new Object();
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
synchronized (lock) {
try {
System.out.println("Thread t1: Waiting...");
lock.wait(); // 阻塞当前线程
System.out.println("Thread t1: Resumed.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread t2 = new Thread(() -> {
synchronized (lock) {
System.out.println("Thread t2: Notifying...");
lock.notify(); // 唤醒线程 t1
}
});
t1.start();
try {
Thread.sleep(1000); // 确保 t1 先进入 wait 状态
} catch (InterruptedException e) {
e.printStackTrace();
}
t2.start();
}
}
Condition 写法
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class ProcessExampleWithCondition {
private static final Lock lock = new ReentrantLock();
private static final Condition condition = lock.newCondition();
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
lock.lock();
try {
System.out.println("Thread t1: Waiting...");
condition.await(); // 阻塞当前线程
System.out.println("Thread t1: Resumed.");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
});
Thread t2 = new Thread(() -> {
lock.lock();
try {
System.out.println("Thread t2: Signaling...");
condition.signal(); // 唤醒线程 t1
} finally {
lock.unlock();
}
});
t1.start();
try {
Thread.sleep(1000); // 确保 t1 先进入 await 状态
} catch (InterruptedException e) {
e.printStackTrace();
}
t2.start();
}
}
但是无论是哪种写法,都必须先获取锁,然后才能使用,而获取排他锁,本来就是 semaphore 应该干的事情,因此实际上实现不了,从这个角度看,AQS 的实现就很牛逼,不依赖 synchronized 关键字,从 0 实现了锁机制。
JDK 自带的 semaphore 实现
在 Java 中,可以使用标准库提供的 java.util.concurrent.Semaphore
类来实现信号量(Semaphore)。它是用于控制同时访问某个资源的线程数量的经典工具,底层基于 AQS(AbstractQueuedSynchronizer)实现。
示例:简单信号量用法(模拟限流)
import java.util.concurrent.Semaphore;
public class SemaphoreExample {
// 创建一个最多允许3个线程同时访问的信号量
private static final Semaphore semaphore = new Semaphore(3);
public static void main(String[] args) {
// 创建10个线程模拟并发访问
for (int i = 0; i < 10; i++) {
final int threadNum = i;
new Thread(() -> {
try {
System.out.println("线程 " + threadNum + " 尝试获取许可...");
semaphore.acquire(); // 获取许可(阻塞)
System.out.println("线程 " + threadNum + " 获取到许可,开始执行...");
Thread.sleep(2000); // 模拟执行任务
System.out.println("线程 " + threadNum + " 执行完毕,释放许可。");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(); // 释放许可
}
}).start();
}
}
}
关键方法说明:
方法 | 作用 |
---|---|
acquire() |
获取一个许可,如果没有就阻塞等待 |
release() |
释放一个许可 |
tryAcquire() |
尝试获取一个许可,获取不到就立即返回 false |
availablePermits() |
返回当前可用许可数 |
应用场景举例:
- 限制访问某个资源(如数据库连接池)
- 控制并发线程数(如限流、并发爬虫)
- 线程间同步(更低级别的控制)
进阶用法:构造公平信号量
Semaphore semaphore = new Semaphore(3, true); // 第二个参数 true 表示公平信号量
公平信号量会按照线程请求许可的顺序依次授予(FIFO),非公平信号量可能后来的线程先获得许可(性能更好,但可能不公平)。
semaphore 的一些坑
semaphore 的 release 方法会增加 permit,而调用 release 之前不需要调用 acquire 方法
举个例子
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(2);
semaphore.release();
semaphore.release();
System.out.println(semaphore.availablePermits());
semaphore.acquire();
semaphore.acquire();
System.out.println(semaphore.availablePermits());
semaphore.acquire();
semaphore.acquire();
System.out.println(semaphore.availablePermits());
}
输出
4
2
0
本来我们创建了一个 permit 为 2 的信号量,希望两次 acquire 之后,第三次 acquire 就阻塞。
但是
semaphore.release();
会增加 permit 的值,甚至超过初始值- 调用
semaphore.release();
之前不需要持有 permit,也就是说不需要先调用semaphore.acquire();
在这两个效果的叠加下,虽然我初始化的是一个 permit 为 2 的 semaphore,但是我可以通过调用两次semaphore.release();
将 permit 增加到 4,然后调用 4 次semaphore.acquire();
,这个 semaphore 实际上就失效了。
所以,我们需要用一个线程变量记录一下当前线程是否已经调用了semaphore.acquire();
,然后在调用semaphore.release();
之前检查,如果没有,则不执行semaphore.release();
。
代码如下:
import java.util.concurrent.Semaphore;
public class TrackedSemaphore {
private final Semaphore semaphore;
// 每个线程的持有许可数量
private final ThreadLocal<Integer> heldPermits = ThreadLocal.withInitial(() -> 0);
public TrackedSemaphore(int permits) {
this.semaphore = new Semaphore(permits);
}
public void acquire() throws InterruptedException {
semaphore.acquire();
heldPermits.set(heldPermits.get() + 1);
}
public boolean tryAcquire() {
boolean acquired = semaphore.tryAcquire();
if (acquired) {
heldPermits.set(heldPermits.get() + 1);
}
return acquired;
}
public void release() {
int held = heldPermits.get();
if (held <= 0) {
throw new IllegalStateException("当前线程未持有许可,不能释放!");
}
heldPermits.set(held - 1);
semaphore.release();
}
public int availablePermits() {
return semaphore.availablePermits();
}
// 可选:当前线程持有许可数
public int getHeldPermits() {
return heldPermits.get();
}
}
为什么 JDK 中的 Semaphore
不做这种检查?
- 性能考虑:加锁检查每个线程是否 acquire 过,成本高。
- 灵活性:有些场景
acquire
和release
发生在线程 A 和线程 B 中,这种跨线程行为是合法的(例如生产者消费者模型)。