Java线程间的共享和协作,详细过程如下文,一眼懂

发布于:2022-12-19 ⋅ 阅读:(514) ⋅ 点赞:(0)

线程的共享

synchronized内置锁

Java 支持多个线程同时访问一个对象或者对象的成员变量,关键字synchronized 可以修饰方法或者以同步块的形式来进行使用,它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中,它保证了线程对变量访问的可见性和排他性,又称为内置锁机制。 对象锁和类锁: 对象锁是用于对象实例方法,或者一个对象实例上的,类锁是用于类的静态方法或者一个类的 class 对象上的。我们知道,类的对象实例可以有很多个,但是每个类只有一个 class 对象,所以不同对象实例的对象锁是互不干扰的,但是每个类只有一个类锁。 但是有一点必须注意的是,其实类锁只是一个概念上的东西,并不是真实存在的,类锁其实锁的是每个类的对应的 class 对象。类锁和对象锁之间也是互不干扰的。

代码示例:

 *类说明:synchronized关键字的使用方法
 */
public class SynTest {

	private long count =0;
	private Object obj = new Object();//作为一个锁

	public long getCount() {
		return count;
	}

	public void setCount(long count) {
		this.count = count;
	}

	/*用在同步块上*/
	public void incCount(){
		synchronized (obj){
			count++;
		}
	}

	/*用在方法上*/
	public synchronized void incCount2(){
			count++;
	}

	/*用在同步块上,但是锁的是当前类的对象实例*/
	public void incCount3(){
		synchronized (this){
			count++;
		}
	}

	//线程
	private static class Count extends Thread{

		private SynTest simplOper;

		public Count(SynTest simplOper) {
			this.simplOper = simplOper;
		}

		@Override
		public void run() {
			for(int i=0;i<10000;i++){
				simplOper.incCount();//count = count+10000
			}
		}
	}

	public static void main(String[] args) throws InterruptedException {
		SynTest simplOper = new SynTest();
		//启动两个线程
		Count count1 = new Count(simplOper);
		Count count2 = new Count(simplOper);
		count1.start();
		count2.start();
		Thread.sleep(50);
		System.out.println(simplOper.count);//20000
	}
}
/**
 *类说明:锁的实例不一样,也是可以并行的
 */
public class DiffInstance {
	
    private static class InstanceSyn implements Runnable{
        private DiffInstance diffInstance;

        public InstanceSyn(DiffInstance diffInstance) {
            this.diffInstance = diffInstance;
        }

        @Override
        public void run() {
            System.out.println("TestInstance is running..."+ diffInstance);
            diffInstance.instance();
        }
    }

    private static class Instance2Syn implements Runnable{
        private DiffInstance diffInstance;

        public Instance2Syn(DiffInstance diffInstance) {
            this.diffInstance = diffInstance;
        }
        @Override
        public void run() {
            System.out.println("TestInstance2 is running..."+ diffInstance);
            diffInstance.instance2();
        }
    }

    private synchronized void instance(){
        SleepTools.second(3);
        System.out.println("synInstance is going..."+this.toString());
        SleepTools.second(3);
        System.out.println("synInstance ended "+this.toString());
    }

    private synchronized void instance2(){
        SleepTools.second(3);
        System.out.println("synInstance2 is going..."+this.toString());
        SleepTools.second(3);
        System.out.println("synInstance2 ended "+this.toString());
    }

    public static void main(String[] args) {
        DiffInstance instance1 = new DiffInstance();
        Thread t3 = new Thread(new Instance2Syn(instance1));
        DiffInstance instance2 = new DiffInstance();
        Thread t4 = new Thread(new InstanceSyn(instance1));
        //先执行完一个才会执行另外一个
        t3.start();
        t4.start();
        SleepTools.second(1);
    }
}

/**
 *类说明:演示实例锁和类锁是不同的,两者可以并行
 */
public class InstanceAndClass {
	
    private static class SynClass extends Thread{
        @Override
        public void run() {
            System.out.println("TestClass is running...");
            synClass();
        }
    }

    private static class InstanceSyn implements Runnable{
        private InstanceAndClass SynClassAndInstance;

        public InstanceSyn(InstanceAndClass SynClassAndInstance) {
            this.SynClassAndInstance = SynClassAndInstance;
        }

        @Override
        public void run() {
            System.out.println("TestInstance is running..."+SynClassAndInstance);
            SynClassAndInstance.instance();
        }
    }

    private synchronized void instance(){
        SleepTools.second(1);
        System.out.println("synInstance is going..."+this.toString());
        SleepTools.second(1);
        System.out.println("synInstance ended "+this.toString());
    }

    private static synchronized void synClass(){
        SleepTools.second(1);
        System.out.println("synClass going...");
        SleepTools.second(1);
        System.out.println("synClass end");
    }

    public static void main(String[] args) {
        InstanceAndClass synClassAndInstance = new InstanceAndClass();
        Thread t1 = new SynClass();
        Thread t2 = new Thread(new InstanceSyn(synClassAndInstance));
        t2.start();
        SleepTools.second(1);
        t1.start();
    }
}

/**
 *类说明:类锁和锁static变量也是不同的 可以并行 
 */
public class StaticAndClass {
	
    private static class SynClass extends Thread{
        @Override
        public void run() {
            System.out.println(currentThread().getName()
                    +":SynClass is running...");
            synClass();
        }
    }

    private static class SynStatic extends Thread{
        @Override
        public void run() {
            System.out.println(currentThread().getName()
                    +"SynStatic is running...");
            synStatic();
        }
    }

    private static synchronized void synClass(){
        System.out.println(Thread.currentThread().getName()
                +"synClass going...");
        SleepTools.second(1);
        System.out.println(Thread.currentThread().getName()
                +"synClass end");
    }

    private static Object obj = new Object();
    private static void synStatic(){
        synchronized (obj){
            System.out.println(Thread.currentThread().getName()
                    +"synStatic going...");
            SleepTools.second(1);
            System.out.println(Thread.currentThread().getName()
                    +"synStatic end");
        }
    }

    public static void main(String[] args) {
        StaticAndClass synClassAndInstance = new StaticAndClass();
        Thread t1 = new SynClass();
        //Thread t2 = new SynStatic();
        Thread t2 = new SynClass();
        t2.start();
        SleepTools.second(1);
        t1.start();
    }
}

错误的加锁和原因分析

原因:虽然我们对 i 进行了加锁,但是

但是当我们反编译这个类的 class 文件后,可以看到 i++实际是, 本质上是返回了一个新的 Integer 对象。也就是每个线程实际加锁的是不同的 Integer 对象。

volatile,最轻量的同步机制

volatile 保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。

不加 volatile 时,子线程无法感知主线程修改了 ready 的值,从而不会退出循环,而加了 volatile 后,子线程可以感知主线程修改了 ready 的值,迅速退出循环。

/**
 * 类说明:演示Volatile的提供的可见性
 */
public class VolatileCase {
    private volatile static boolean ready;
    private static int number;

    //
    private static class PrintThread extends Thread{
        @Override
        public void run() {
            System.out.println("PrintThread is running.......");
            while(!ready);//无限循环
            System.out.println("number = "+number);
        }
    }

    public static void main(String[] args) {
        new PrintThread().start();
        SleepTools.second(1);
        number = 51;//如果没有加volatile关键字则主线程都结束了也没有打印number的值,加了关键值后打印出来的值就是主线程修改的值
        ready = true;
        SleepTools.second(5);
        System.out.println("main is ended!");
    }
}

但是 volatile 不能保证数据在多个线程下同时写时的线程安全。

/**
 * 类说明:
 */
public class NotSafe {
    private volatile long count =0;

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }

    //count进行累加
    public void incCount(){
        count++;
    }

    //线程
    private static class Count extends Thread{

        private NotSafe simplOper;

        public Count(NotSafe simplOper) {
            this.simplOper = simplOper;
        }

        @Override
        public void run() {
            for(int i=0;i<10000;i++){
                simplOper.incCount();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        NotSafe simplOper = new NotSafe();
        //启动两个线程
        Count count1 = new Count(simplOper);
        Count count2 = new Count(simplOper);
        count1.start();
        count2.start();
        Thread.sleep(50);
        System.out.println(simplOper.count);//20000?
    }
}

volatile 最适用的场景:一个线程写,多个线程读。

 

线程间的协作

线程之间相互配合,完成某项工作,比如:一个线程修改了一个对象的值,而另一个线程感知到了变化,然后进行相应的操作,整个过程开始于一个线程,而最终执行又是另一个线程。前者是生产者,后者就是消费者,这种模式隔离了“做什么”(what)和“怎么做”(How),简单的办法是让消费者线程不断地循环检查变量是否符合预期在 while 循环中设置不满足的条件,如果条件满足则退出 while 循环,从而完成消费者的工作。却存在如下问题: 1) 难以确保及时性。 2)难以降低开销。如果降低睡眠的时间,比如休眠 1 毫秒,这样消费者能更加迅速地发现条件变化,但是却可能消耗更多的处理器资源,造成了无端的浪费。

等待/通知机制

是指一个线程 A 调用了对象 O 的 wait()方法进入等待状态,而另一个线程 B调用了对象 O 的 notify()或者 notifyAll()方法,线程 A 收到通知后从对象 O 的 wait()方法返回,进而执行后续操作。上述两个线程通过对象 O 来完成交互,而对象上的 wait()和notify/notifyAll()的关系就如同开关信号一样,用来完成等待方和通知方之间的交互工作。

notify(): 通知一个在对象上等待的线程,使其从 wait 方法返回,而返回的前提是该线程获取到了对象的锁,没有获得锁的线程重新进入 WAITING 状态。

notifyAll(): 通知所有等待在该对象上的线程

wait() 调用该方法的线程进入 WAITING 状态,只有等待另外线程的通知或被中断才会返回.需要注意,调用 wait()方法后,会释放对象的锁

wait(long) 超时等待一段时间,这里的参数时间是毫秒,也就是等待长达 n 毫秒,如果没有通知就超时返回

wait (long,int) 对于超时时间更细粒度的控制,可以达到纳秒

等待和通知的标准范式

等待方遵循如下原则。

1)获取对象的锁。

2)如果条件不满足,那么调用对象的 wait()方法,被通知后仍要检查条件。

3)条件满足则执行对应的逻辑。

通知方遵循如下原则。

1)获得对象的锁。

2)改变条件。

3)通知所有等待在对象上的线程。

在调用 wait()、notify()系列方法之前,线程必须要获得该对象的对象级别锁,即只能在同步方法或同步块中调用 wait()方法、notify()系列方法,进入 wait()方法后,当前线程释放锁,在从 wait()返回前,线程与其他线程竞争重新获得锁,执行 notify()系列方法的线程退出调用了 notifyAll 的 synchronized代码块的时候后,他们就会去竞争。如果其中一个线程获得了该对象锁,它就会继续往下执行,在它退出 synchronized 代码块,释放锁后,其他的已经被唤醒的线程将会继续竞争获取该锁,一直进行下去,直到所有被唤醒的线程都执行完毕。

notify 和 notifyAll 应该用谁

尽可能用 notifyall(),谨慎使用 notify(),因为 notify()只会唤醒一个线程,我们无法确保被唤醒的这个线程一定就是我们需要唤醒的线程

代码示例:

/**
 *类说明:快递实体类
 */
public class Express {
    public final static String CITY = "ShangHai";
    private int km;/*快递运输里程数*/
    private String site;/*快递到达地点*/

    public Express() {
    }

    public Express(int km, String site) {
        this.km = km;
        this.site = site;
    }

    /* 变化公里数,然后通知处于wait状态并需要处理公里数的线程进行业务处理*/
    public synchronized void changeKm(){
        this.km = 101;
        notify();
    }

    /* 变化地点,然后通知处于wait状态并需要处理地点的线程进行业务处理*/
    public  synchronized  void changeSite(){
        this.site = "BeiJing";
        notifyAll();
    }

    /*线程等待公里的变化*/
    public synchronized void waitKm(){
        while(this.km<100){
            try {
                wait();
                System.out.println("Check Site thread["
                                +Thread.currentThread().getId()
                        +"] is be notified");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("the Km is "+this.km+",I will change db");
    }

    /*线程等待目的地的变化*/
    public synchronized void waitSite(){
        while(this.site.equals(CITY)){//快递到达目的地
            try {
                wait();
                System.out.println("Check Site thread["+Thread.currentThread().getId()
                		+"] is be notified");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("the site is "+this.site+",I will call user");
    }
}
/**
 *类说明:测试wait/notify/notifyAll
 */
public class TestWN {
    private static Express express = new Express(0,Express.CITY);

    /*检查里程数变化的线程,不满足条件,线程一直等待*/
    private static class CheckKm extends Thread{
        @Override
        public void run() {
        	express.waitKm();
        }
    }

    /*检查地点变化的线程,不满足条件,线程一直等待*/
    private static class CheckSite extends Thread{
        @Override
        public void run() {
        	express.waitSite();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for(int i=0;i<3;i++){
            new CheckSite().start();
        }
        for(int i=0;i<3;i++){
            new CheckKm().start();
        }

        Thread.sleep(1000);
        express.changeKm();//快递地点变化
    }
}

等待超时模式实现一个连接池

调用场景:调用一个方法时等待一段时间(一般来说是给定一个时间段),如果该方法能够在给定的时间段之内得到结果,那么将结果立刻返回,反之,超时返回默认结果 假设等待时间段是 T,那么可以推断出在当前时间 now+T 之后就会超时 等待持续时间:REMAINING=T。 超时时间:FUTURE=now+T。

/**
 *类说明:连接池的实现
 */
public class DBPool {

    /*容器,存放连接*/
    private static LinkedList<Connection> pool = new LinkedList<Connection>();

    /*限制了池的大小=20*/
    public DBPool(int initialSize) {
        if (initialSize > 0) {
            for (int i = 0; i < initialSize; i++) {
                pool.addLast(SqlConnectImpl.fetchConnection());
            }
        }
    }

    /*释放连接,通知其他的等待连接的线程*/
    public void releaseConnection(Connection connection) {
        if (connection != null) {
            synchronized (pool){
                pool.addLast(connection);
                //通知其他等待连接的线程
                pool.notifyAll();
            }
        }
    }

    /*获取*/
    // 在mills内无法获取到连接,将会返回null 1S
    public Connection fetchConnection(long mills)
            throws InterruptedException {
        synchronized (pool){
            //永不超时
            if(mills<=0){
                while(pool.isEmpty()){
                    pool.wait();
                }
                return pool.removeFirst();
            }else{
                /*超时时刻*/
                long future = System.currentTimeMillis()+mills;
                /*等待时长*/
                long remaining = mills;
                while(pool.isEmpty()&&remaining>0){
                    pool.wait(remaining);
                    /*唤醒一次,重新计算等待时长*/
                    remaining = future-System.currentTimeMillis();
                }
                Connection connection = null;
                if(!pool.isEmpty()){
                    connection = pool.removeFirst();
                }
                return connection;
            }
        }

    }
}
/**
 *类说明:
 */
public class DBPoolTest {
    static DBPool pool  = new DBPool(10);
    // 控制器:控制main线程将会等待所有Woker结束后才能继续执行
    static CountDownLatch end;

    public static void main(String[] args) throws Exception {
    	// 线程数量
        int threadCount = 50;
        end = new CountDownLatch(threadCount);
        int count = 20;//每个线程的操作次数
        AtomicInteger got = new AtomicInteger();//计数器:统计可以拿到连接的线程
        AtomicInteger notGot = new AtomicInteger();//计数器:统计没有拿到连接的线程
        for (int i = 0; i < threadCount; i++) {
            Thread thread = new Thread(new Worker(count, got, notGot), 
            		"worker_"+i);
            thread.start();
        }
        end.await();// main线程在此处等待
        System.out.println("总共尝试了: " + (threadCount * count));
        System.out.println("拿到连接的次数:  " + got);
        System.out.println("没能连接的次数: " + notGot);
    }

    static class Worker implements Runnable {
        int           count;
        AtomicInteger got;
        AtomicInteger notGot;

        public Worker(int count, AtomicInteger got,
                               AtomicInteger notGot) {
            this.count = count;
            this.got = got;
            this.notGot = notGot;
        }

        public void run() {
            while (count > 0) {
                try {
                    // 从线程池中获取连接,如果1000ms内无法获取到,将会返回null
                    // 分别统计连接获取的数量got和未获取到的数量notGot
                    Connection connection = pool.fetchConnection(1000);
                    if (connection != null) {
                        try {
                            connection.createStatement();
//                            PreparedStatement preparedStatement
//                                    = connection.prepareStatement("");
//                            preparedStatement.execute();
                            connection.commit();
                        } finally {
                            pool.releaseConnection(connection);
                            got.incrementAndGet();
                        }
                    } else {
                        notGot.incrementAndGet();
                        System.out.println(Thread.currentThread().getName()
                        		+"等待超时!");
                    }
                } catch (Exception ex) {
                } finally {
                    count--;
                }
            }
            end.countDown();
        }
    }
}
/**
 *类说明:
 */
public class SqlConnectImpl implements Connection{
	
	/*拿一个数据库连接*/
    public static final Connection fetchConnection(){
        return new SqlConnectImpl();
    }
    .........

客户端获取连接的过程被设定为等待超时的模式,也就是在 1000 毫秒内如果无法获取到可用连接,将会返回给客户端一个 null。设定连接池的大小为 10个,然后通过调节客户端的线程数来模拟无法获取连接的场景。 它通过构造函数初始化连接的最大上限,通过一个双向队列来维护连接,调用方需要先调用 fetchConnection(long)方法来指定在多少毫秒内超时获取连接,当连接使用完成后,需要调用 releaseConnection(Connection)方法将连接放回线程池


网站公告

今日签到

点亮在社区的每一天
去签到