线程的共享
synchronized内置锁
Java 支持多个线程同时访问一个对象或者对象的成员变量,关键字synchronized 可以修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性,又称为内置锁机制。 对象锁和类锁: 对象锁是用于对象实例方法,或者一个对象实例上的,类锁是用于类的静态方法或者一个类的 class 对象上的。我们知道,类的对象实例可以有很多个,但是每个类只有一个 class 对象,所以不同对象实例的对象锁是互不干扰的,但是每个类只有一个类锁。 但是有一点必须注意的是,其实类锁只是一个概念上的东西,并不是真实存在的,类锁其实锁的是每个类的对应的 class 对象。类锁和对象锁之间也是互不干扰的。
代码示例:
*类说明:synchronized关键字的使用方法
*/
public class SynTest {
private long count =0;
private Object obj = new Object();//作为一个锁
public long getCount() {
return count;
}
public void setCount(long count) {
this.count = count;
}
/*用在同步块上*/
public void incCount(){
synchronized (obj){
count++;
}
}
/*用在方法上*/
public synchronized void incCount2(){
count++;
}
/*用在同步块上,但是锁的是当前类的对象实例*/
public void incCount3(){
synchronized (this){
count++;
}
}
//线程
private static class Count extends Thread{
private SynTest simplOper;
public Count(SynTest simplOper) {
this.simplOper = simplOper;
}
@Override
public void run() {
for(int i=0;i<10000;i++){
simplOper.incCount();//count = count+10000
}
}
}
public static void main(String[] args) throws InterruptedException {
SynTest simplOper = new SynTest();
//启动两个线程
Count count1 = new Count(simplOper);
Count count2 = new Count(simplOper);
count1.start();
count2.start();
Thread.sleep(50);
System.out.println(simplOper.count);//20000
}
}
/**
*类说明:锁的实例不一样,也是可以并行的
*/
public class DiffInstance {
private static class InstanceSyn implements Runnable{
private DiffInstance diffInstance;
public InstanceSyn(DiffInstance diffInstance) {
this.diffInstance = diffInstance;
}
@Override
public void run() {
System.out.println("TestInstance is running..."+ diffInstance);
diffInstance.instance();
}
}
private static class Instance2Syn implements Runnable{
private DiffInstance diffInstance;
public Instance2Syn(DiffInstance diffInstance) {
this.diffInstance = diffInstance;
}
@Override
public void run() {
System.out.println("TestInstance2 is running..."+ diffInstance);
diffInstance.instance2();
}
}
private synchronized void instance(){
SleepTools.second(3);
System.out.println("synInstance is going..."+this.toString());
SleepTools.second(3);
System.out.println("synInstance ended "+this.toString());
}
private synchronized void instance2(){
SleepTools.second(3);
System.out.println("synInstance2 is going..."+this.toString());
SleepTools.second(3);
System.out.println("synInstance2 ended "+this.toString());
}
public static void main(String[] args) {
DiffInstance instance1 = new DiffInstance();
Thread t3 = new Thread(new Instance2Syn(instance1));
DiffInstance instance2 = new DiffInstance();
Thread t4 = new Thread(new InstanceSyn(instance1));
//先执行完一个才会执行另外一个
t3.start();
t4.start();
SleepTools.second(1);
}
}
/**
*类说明:演示实例锁和类锁是不同的,两者可以并行
*/
public class InstanceAndClass {
private static class SynClass extends Thread{
@Override
public void run() {
System.out.println("TestClass is running...");
synClass();
}
}
private static class InstanceSyn implements Runnable{
private InstanceAndClass SynClassAndInstance;
public InstanceSyn(InstanceAndClass SynClassAndInstance) {
this.SynClassAndInstance = SynClassAndInstance;
}
@Override
public void run() {
System.out.println("TestInstance is running..."+SynClassAndInstance);
SynClassAndInstance.instance();
}
}
private synchronized void instance(){
SleepTools.second(1);
System.out.println("synInstance is going..."+this.toString());
SleepTools.second(1);
System.out.println("synInstance ended "+this.toString());
}
private static synchronized void synClass(){
SleepTools.second(1);
System.out.println("synClass going...");
SleepTools.second(1);
System.out.println("synClass end");
}
public static void main(String[] args) {
InstanceAndClass synClassAndInstance = new InstanceAndClass();
Thread t1 = new SynClass();
Thread t2 = new Thread(new InstanceSyn(synClassAndInstance));
t2.start();
SleepTools.second(1);
t1.start();
}
}
/**
*类说明:类锁和锁static变量也是不同的 可以并行
*/
public class StaticAndClass {
private static class SynClass extends Thread{
@Override
public void run() {
System.out.println(currentThread().getName()
+":SynClass is running...");
synClass();
}
}
private static class SynStatic extends Thread{
@Override
public void run() {
System.out.println(currentThread().getName()
+"SynStatic is running...");
synStatic();
}
}
private static synchronized void synClass(){
System.out.println(Thread.currentThread().getName()
+"synClass going...");
SleepTools.second(1);
System.out.println(Thread.currentThread().getName()
+"synClass end");
}
private static Object obj = new Object();
private static void synStatic(){
synchronized (obj){
System.out.println(Thread.currentThread().getName()
+"synStatic going...");
SleepTools.second(1);
System.out.println(Thread.currentThread().getName()
+"synStatic end");
}
}
public static void main(String[] args) {
StaticAndClass synClassAndInstance = new StaticAndClass();
Thread t1 = new SynClass();
//Thread t2 = new SynStatic();
Thread t2 = new SynClass();
t2.start();
SleepTools.second(1);
t1.start();
}
}
错误的加锁和原因分析
原因:虽然我们对 i 进行了加锁,但是
但是当我们反编译这个类的 class 文件后,可以看到 i++实际是, 本质上是返回了一个新的 Integer 对象。也就是每个线程实际加锁的是不同的 Integer 对象。
volatile,最轻量的同步机制
volatile 保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。
不加 volatile 时,子线程无法感知主线程修改了 ready 的值,从而不会退出循环,而加了 volatile 后,子线程可以感知主线程修改了 ready 的值,迅速退出循环。
/**
* 类说明:演示Volatile的提供的可见性
*/
public class VolatileCase {
private volatile static boolean ready;
private static int number;
//
private static class PrintThread extends Thread{
@Override
public void run() {
System.out.println("PrintThread is running.......");
while(!ready);//无限循环
System.out.println("number = "+number);
}
}
public static void main(String[] args) {
new PrintThread().start();
SleepTools.second(1);
number = 51;//如果没有加volatile关键字则主线程都结束了也没有打印number的值,加了关键值后打印出来的值就是主线程修改的值
ready = true;
SleepTools.second(5);
System.out.println("main is ended!");
}
}
但是 volatile 不能保证数据在多个线程下同时写时的线程安全。
/**
* 类说明:
*/
public class NotSafe {
private volatile long count =0;
public long getCount() {
return count;
}
public void setCount(long count) {
this.count = count;
}
//count进行累加
public void incCount(){
count++;
}
//线程
private static class Count extends Thread{
private NotSafe simplOper;
public Count(NotSafe simplOper) {
this.simplOper = simplOper;
}
@Override
public void run() {
for(int i=0;i<10000;i++){
simplOper.incCount();
}
}
}
public static void main(String[] args) throws InterruptedException {
NotSafe simplOper = new NotSafe();
//启动两个线程
Count count1 = new Count(simplOper);
Count count2 = new Count(simplOper);
count1.start();
count2.start();
Thread.sleep(50);
System.out.println(simplOper.count);//20000?
}
}
volatile 最适用的场景:一个线程写,多个线程读。
线程间的协作
线程之间相互配合,完成某项工作,比如:一个线程修改了一个对象的值,而另一个线程感知到了变化,然后进行相应的操作,整个过程开始于一个线程,而最终执行又是另一个线程。前者是生产者,后者就是消费者,这种模式隔离了“做什么”(what)和“怎么做”(How),简单的办法是让消费者线程不断地循环检查变量是否符合预期在 while 循环中设置不满足的条件,如果条件满足则退出 while 循环,从而完成消费者的工作。却存在如下问题: 1) 难以确保及时性。 2)难以降低开销。如果降低睡眠的时间,比如休眠 1 毫秒,这样消费者能更加迅速地发现条件变化,但是却可能消耗更多的处理器资源,造成了无端的浪费。
等待/通知机制
是指一个线程 A 调用了对象 O 的 wait()方法进入等待状态,而另一个线程 B调用了对象 O 的 notify()或者 notifyAll()方法,线程 A 收到通知后从对象 O 的 wait()方法返回,进而执行后续操作。上述两个线程通过对象 O 来完成交互,而对象上的 wait()和notify/notifyAll()的关系就如同开关信号一样,用来完成等待方和通知方之间的交互工作。
notify(): 通知一个在对象上等待的线程,使其从 wait 方法返回,而返回的前提是该线程获取到了对象的锁,没有获得锁的线程重新进入 WAITING 状态。
notifyAll(): 通知所有等待在该对象上的线程
wait() 调用该方法的线程进入 WAITING 状态,只有等待另外线程的通知或被中断才会返回.需要注意,调用 wait()方法后,会释放对象的锁
wait(long) 超时等待一段时间,这里的参数时间是毫秒,也就是等待长达 n 毫秒,如果没有通知就超时返回
wait (long,int) 对于超时时间更细粒度的控制,可以达到纳秒
等待和通知的标准范式
等待方遵循如下原则。
1)获取对象的锁。
2)如果条件不满足,那么调用对象的 wait()方法,被通知后仍要检查条件。
3)条件满足则执行对应的逻辑。
通知方遵循如下原则。
1)获得对象的锁。
2)改变条件。
3)通知所有等待在对象上的线程。
在调用 wait()、notify()系列方法之前,线程必须要获得该对象的对象级别锁,即只能在同步方法或同步块中调用 wait()方法、notify()系列方法,进入 wait()方法后,当前线程释放锁,在从 wait()返回前,线程与其他线程竞争重新获得锁,执行 notify()系列方法的线程退出调用了 notifyAll 的 synchronized代码块的时候后,他们就会去竞争。如果其中一个线程获得了该对象锁,它就会继续往下执行,在它退出 synchronized 代码块,释放锁后,其他的已经被唤醒的线程将会继续竞争获取该锁,一直进行下去,直到所有被唤醒的线程都执行完毕。
notify 和 notifyAll 应该用谁
尽可能用 notifyall(),谨慎使用 notify(),因为 notify()只会唤醒一个线程,我们无法确保被唤醒的这个线程一定就是我们需要唤醒的线程
代码示例:
/**
*类说明:快递实体类
*/
public class Express {
public final static String CITY = "ShangHai";
private int km;/*快递运输里程数*/
private String site;/*快递到达地点*/
public Express() {
}
public Express(int km, String site) {
this.km = km;
this.site = site;
}
/* 变化公里数,然后通知处于wait状态并需要处理公里数的线程进行业务处理*/
public synchronized void changeKm(){
this.km = 101;
notify();
}
/* 变化地点,然后通知处于wait状态并需要处理地点的线程进行业务处理*/
public synchronized void changeSite(){
this.site = "BeiJing";
notifyAll();
}
/*线程等待公里的变化*/
public synchronized void waitKm(){
while(this.km<100){
try {
wait();
System.out.println("Check Site thread["
+Thread.currentThread().getId()
+"] is be notified");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("the Km is "+this.km+",I will change db");
}
/*线程等待目的地的变化*/
public synchronized void waitSite(){
while(this.site.equals(CITY)){//快递到达目的地
try {
wait();
System.out.println("Check Site thread["+Thread.currentThread().getId()
+"] is be notified");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("the site is "+this.site+",I will call user");
}
}
/**
*类说明:测试wait/notify/notifyAll
*/
public class TestWN {
private static Express express = new Express(0,Express.CITY);
/*检查里程数变化的线程,不满足条件,线程一直等待*/
private static class CheckKm extends Thread{
@Override
public void run() {
express.waitKm();
}
}
/*检查地点变化的线程,不满足条件,线程一直等待*/
private static class CheckSite extends Thread{
@Override
public void run() {
express.waitSite();
}
}
public static void main(String[] args) throws InterruptedException {
for(int i=0;i<3;i++){
new CheckSite().start();
}
for(int i=0;i<3;i++){
new CheckKm().start();
}
Thread.sleep(1000);
express.changeKm();//快递地点变化
}
}
等待超时模式实现一个连接池
调用场景:调用一个方法时等待一段时间(一般来说是给定一个时间段),如果该方法能够在给定的时间段之内得到结果,那么将结果立刻返回,反之,超时返回默认结果 假设等待时间段是 T,那么可以推断出在当前时间 now+T 之后就会超时 等待持续时间:REMAINING=T。 超时时间:FUTURE=now+T。
/**
*类说明:连接池的实现
*/
public class DBPool {
/*容器,存放连接*/
private static LinkedList<Connection> pool = new LinkedList<Connection>();
/*限制了池的大小=20*/
public DBPool(int initialSize) {
if (initialSize > 0) {
for (int i = 0; i < initialSize; i++) {
pool.addLast(SqlConnectImpl.fetchConnection());
}
}
}
/*释放连接,通知其他的等待连接的线程*/
public void releaseConnection(Connection connection) {
if (connection != null) {
synchronized (pool){
pool.addLast(connection);
//通知其他等待连接的线程
pool.notifyAll();
}
}
}
/*获取*/
// 在mills内无法获取到连接,将会返回null 1S
public Connection fetchConnection(long mills)
throws InterruptedException {
synchronized (pool){
//永不超时
if(mills<=0){
while(pool.isEmpty()){
pool.wait();
}
return pool.removeFirst();
}else{
/*超时时刻*/
long future = System.currentTimeMillis()+mills;
/*等待时长*/
long remaining = mills;
while(pool.isEmpty()&&remaining>0){
pool.wait(remaining);
/*唤醒一次,重新计算等待时长*/
remaining = future-System.currentTimeMillis();
}
Connection connection = null;
if(!pool.isEmpty()){
connection = pool.removeFirst();
}
return connection;
}
}
}
}
/**
*类说明:
*/
public class DBPoolTest {
static DBPool pool = new DBPool(10);
// 控制器:控制main线程将会等待所有Woker结束后才能继续执行
static CountDownLatch end;
public static void main(String[] args) throws Exception {
// 线程数量
int threadCount = 50;
end = new CountDownLatch(threadCount);
int count = 20;//每个线程的操作次数
AtomicInteger got = new AtomicInteger();//计数器:统计可以拿到连接的线程
AtomicInteger notGot = new AtomicInteger();//计数器:统计没有拿到连接的线程
for (int i = 0; i < threadCount; i++) {
Thread thread = new Thread(new Worker(count, got, notGot),
"worker_"+i);
thread.start();
}
end.await();// main线程在此处等待
System.out.println("总共尝试了: " + (threadCount * count));
System.out.println("拿到连接的次数: " + got);
System.out.println("没能连接的次数: " + notGot);
}
static class Worker implements Runnable {
int count;
AtomicInteger got;
AtomicInteger notGot;
public Worker(int count, AtomicInteger got,
AtomicInteger notGot) {
this.count = count;
this.got = got;
this.notGot = notGot;
}
public void run() {
while (count > 0) {
try {
// 从线程池中获取连接,如果1000ms内无法获取到,将会返回null
// 分别统计连接获取的数量got和未获取到的数量notGot
Connection connection = pool.fetchConnection(1000);
if (connection != null) {
try {
connection.createStatement();
// PreparedStatement preparedStatement
// = connection.prepareStatement("");
// preparedStatement.execute();
connection.commit();
} finally {
pool.releaseConnection(connection);
got.incrementAndGet();
}
} else {
notGot.incrementAndGet();
System.out.println(Thread.currentThread().getName()
+"等待超时!");
}
} catch (Exception ex) {
} finally {
count--;
}
}
end.countDown();
}
}
}
/**
*类说明:
*/
public class SqlConnectImpl implements Connection{
/*拿一个数据库连接*/
public static final Connection fetchConnection(){
return new SqlConnectImpl();
}
.........
客户端获取连接的过程被设定为等待超时的模式,也就是在 1000 毫秒内如果无法获取到可用连接,将会返回给客户端一个 null。设定连接池的大小为 10个,然后通过调节客户端的线程数来模拟无法获取连接的场景。 它通过构造函数初始化连接的最大上限,通过一个双向队列来维护连接,调用方需要先调用 fetchConnection(long)方法来指定在多少毫秒内超时获取连接,当连接使用完成后,需要调用 releaseConnection(Connection)方法将连接放回线程池