04 线程间通信.txt

UP 返回
★本节对应视频29-44

1 wait和notify
  1.1 代码:
	public class Demo {
		private volatile int signal;
		public void set(int value) {
			this.signal = value;
		}
		public int get() {
			return signal;
		}
	
		public static void main(String[] args) {
			Demo demo = new Demo();
			new Thread(new Runnable() {
				@Override
				public void run() {
					synchronized (demo) {				// notify方法需要放到同步代码块中才可以
						System.out.println("修改状态的线程执行...");
						try {
							Thread.sleep(5000);
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
						demo.set(1);
						System.out.println("状态修改成功...");
						demo.notify();// 如果不加调用的对象,直接使用notify则默认调用的实例是new Thread的
					}
				}
			}).start();
	
			new Thread(new Runnable() {
				@Override
				public void run() {
					synchronized (demo) {				// wait方法需要放到同步代码块中才可以
						while (demo.get() != 1) {
							try {
								demo.wait();
							} catch (InterruptedException e) {
								// TODO Auto-generated catch block
								e.printStackTrace();
							}
						}
						System.out.println("模拟代码执行...");
					}
				}
			}).start();
		}
	}

  1.2 现在将设置和获取signal值的线程单独摘出来:
	■Demo2类
	public class Demo2 {
		private volatile int signal;
	
		public synchronized void set() {
			signal = 1;
			notifyAll();// ▶唤醒所有的处于wait的线程争夺时间片。
			// ▶notify会随机唤醒一个处于wait的线程
	
			//▶ 唤醒方法会等当前方法完全执行完毕释放锁以后,其他等待的线程才能拿到锁去执行
			System.out.println("开始唤醒休眠的线程...");
			try {
				Thread.sleep(3000);
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	
		public synchronized int get() {
			System.out.println(Thread.currentThread().getName() + " 执行了... ");
			if (signal != 1) {
				try {
					wait();// ▶调用wait方法会释放锁
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			System.out.println(Thread.currentThread().getName() + " 方法执行完毕... ");
			return signal;
		}
	
		public static void main(String[] args) {
			Demo2 demo2 = new Demo2();
			Target1 target1 = new Target1(demo2);
			Target2 target2 = new Target2(demo2);
	
			new Thread(target2).start();
			new Thread(target2).start();
			new Thread(target2).start();
			new Thread(target2).start();// ▶这四个线程都会进入get方法。因为他们都卡在wait处而释放锁,但是并不会继续往下执行
	
			try {
				TimeUnit.SECONDS.sleep(1);// ▶▶▶同样可以实现线程休眠1s
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			new Thread(target1).start();
		}
	}

	■设置值线程
	public class Target1 implements Runnable {
		private Demo2 demo;
	
		public Target1(Demo2 demo) {
			this.demo = demo;
		}
	
		public void run() {
			demo.set();
		}
	}

	■获取值线程
	public class Target2 implements Runnable {
		private Demo2 demo;
	
		public Target2(Demo2 demo) {
			this.demo = demo;
		}
	
		public void run() {
			demo.get();
		}
	}

2 Condition  
  2.1 使用代码:
	public class Demo4 {
	
		private int signal;
	
		Lock lock = new ReentrantLock();
		Condition a = lock.newCondition();  //Lock的内部类
		Condition b = lock.newCondition();
		Condition c = lock.newCondition();
	
		public static void main(String[] args) {
	
			Demo4 demo = new Demo4();
	
			A a = new A(demo);
			B b = new B(demo);
			C c = new C(demo);
	
			new Thread(a).start();
			new Thread(b).start();
			new Thread(c).start();
	
		}
	
		public void a() {
			lock.lock();
			while (signal != 0) {
				try {
					a.await();
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
			System.out.println("a");
			signal++;
			b.signal();
			lock.unlock();
		}
	
		public void b() {
			lock.lock();
			while (signal != 1) {
				try {
					b.await();
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
			System.out.println("b");
			signal++;
			c.signal();
			lock.unlock();
		}
	
		public void c() {
			lock.lock();
			while (signal != 2) {
				try {
					c.await();
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
			System.out.println("c");
			signal = 0;
			a.signal();
			lock.unlock();
		}
	
	}
	
	class A implements Runnable {
	
		private Demo4 demo;
	
		public A(Demo4 demo) {
			this.demo = demo;
		}
	
		@Override
		public void run() {
			while (true) {
				demo.a();
				try {
					Thread.sleep(1000);
				} catch (Exception e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}
	}
	
	class B implements Runnable {
	
		private Demo4 demo;
	
		public B(Demo4 demo) {
			this.demo = demo;
		}
	
		@Override
		public void run() {
			while (true) {
				demo.b();
				try {
					Thread.sleep(1000);
				} catch (Exception e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}
	}
	
	class C implements Runnable {
	
		private Demo4 demo;
	
		public C(Demo4 demo) {
			this.demo = demo;
		}
	
		@Override
		public void run() {
			while (true) {
				demo.c();
				try {
					Thread.sleep(1000);
				} catch (Exception e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}
	}

	wait和notify相当于一只在维护一个等待队列,而每一个condition都会有自己的等待队列,所以这点要比wait

2-1 自定义数据库连接池
	public class MyDataSource {
	
		private LinkedList<Connection> pool = new LinkedList<Connection>();
	
		private static final int MAX_CONNECTION = 10;
	
		private static final String DRIVER_CLASS = "";
	
		private static final String USER = "";
	
		private static final String PASSWORD = "";
	
		private static final String URL = "";
	
		private Lock lock = new ReentrantLock();
		private Condition c1 = lock.newCondition();
	
		static {
			try {
				Class.forName("com.mysql.jdbc.Driver");
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	
		public MyDataSource() {
			try {
				Connection connection = DriverManager.getConnection(URL, USER, PASSWORD);
				pool.addLast(connection);
			} catch (SQLException e) {
				e.printStackTrace();
			}
	
		}
	
		public Connection getConnect() {
			Connection conn = null;
			lock.lock();
			try {
				// synchronized (pool) {
				while (pool.size() <= 0) {
					try {
						// wait();
						c1.await();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
				if (pool.isEmpty()) {
					conn = pool.removeFirst();
				}
				// }
				return conn;
			} finally {
				lock.unlock();
			}
		}
	
		public void release(Connection connection) {
			if (connection != null) {
				lock.lock();
				// synchronized (pool) {
				try {
					pool.addLast(connection);
					// notifyAll();
					c1.signal();
				} finally {
					lock.unlock();
				}
				// }
			}
		}
	}

3 线程的加塞
  3.1 代码     目的是让加塞线程先执行完再执行调用线程
	public class Demo {
	
		public void a(Thread joinThread) {
			System.out.println("方法a执行...");
			joinThread.start();
			try {
				joinThread.join();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println("方法a执行完毕");
		}
	
		public void b() {
			System.out.println("加塞线程执行...");
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println("加塞线程执行完毕");
		}
	
		public static void main(String[] args) {
			Demo demo = new Demo();
			Thread joinThread = new Thread(new Runnable() {
				@Override
				public void run() {
					demo.b();
				}
			});
			new Thread(new Runnable() {
				@Override
				public void run() {
					demo.a(joinThread);
				}
			}).start();
		}
	}

  3.2 实现原理
	join方法里面有这样一段代码:
	if (millis == 0) {					//join()直接调用的join(0),则此处无限等待
            while (isAlive()) {			//等待的条件就是  只要加塞线程还活着(加塞线程开始后只要没死亡都是活着的)
                wait(0);					//wait方法锁的的是synchronized的对象,故哪个线程调用了该方法锁的就是这个线程,于是就使调用线程一直等待
            }
        } 
	加塞线程执行完毕以后会调用自身的notifyAll方法来唤醒调用线程

4 ThreadLocal		使每一个线程都保有一个自身维护的变量
	public class TestThreadLocal {
	
		private ThreadLocal<Integer> count = new ThreadLocal<Integer>() {		//每一个线程都会有一个count属性。后面那个是用来初始化的方法
			@Override
			protected Integer initialValue() {
				return new Integer(0);
			}
		};
	
		public int getNext() {
			Integer value = count.get();
			value++;
			count.set(value);
			return value;
		}
	
		public static void main(String[] args) {							//两个线程中的value是互不影响的
			TestThreadLocal local = new TestThreadLocal();
			new Thread(new Runnable() {
				@Override
				public void run() {
					while (true) {
						System.out.println(Thread.currentThread().getName() + " " + local.getNext());
						try {
							Thread.sleep(1000);
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
					}
				}
			}).start();
	
			new Thread(new Runnable() {
				@Override
				public void run() {
					while (true) {
						System.out.println(Thread.currentThread().getName() + " " + local.getNext());
						try {
							Thread.sleep(1000);
						} catch (InterruptedException e) {
							e.printStackTrace();
						}
					}
				}
			}).start();
		}
	}
	
5 并发工具类
  5.1  CountDownLatch		共享锁
	以下代码是模拟读取文件,每一行分别用一个线程计算数字的和并放入数组,等所有行的线程计算结束以后再把最终的总和算出来
	public class TestCountDownLatch {
	
		private int[] nums;
	
		public TestCountDownLatch(int line) {
			nums = new int[line];
		}
	
		public void calc(String line, int index, CountDownLatch latch) {
			String[] nus = line.split(",");
			int total = 0;
			for (String num : nus) {
				total += Integer.parseInt(num);
			}
			nums[index] = total;
			System.out.println(Thread.currentThread().getName() + " 执行计算任务 " + line + " 结果为 " + total);
			latch.countDown();				//■ 每执行一次计数器减1
		}
	
		public void sum() {
			System.out.println("汇总线程执行..." + Thread.currentThread().getName());
			int total = 0;
			for (int i = 0; i < nums.length; i++) {
				total += nums[i];
			}
			System.out.println("最终结果为 : " + total);
		}
	
		public static void main(String[] args) {
			List<String> contents = readFile();
			int lineCount = contents.size();
	
			CountDownLatch latch = new CountDownLatch(lineCount);			// ■初始化计数器,要给定自减的数
	
			TestCountDownLatch test = new TestCountDownLatch(lineCount);
			for (int i = 0; i < lineCount; i++) {
				final int j = i;
				new Thread(new Runnable() {
					@Override
					public void run() {
						test.calc(contents.get(j), j, latch);
					}
				}).start();
			}
	
			try {
				latch.await();						//■ 计数器直到为0时才会执行后面的语句
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
	
			// 如果把CountDownLatch去掉,也可以使用下面这种方法来达到目的。但是如果执行过程非常复杂,这里的自旋就很耗性能
	//		while (Thread.activeCount() > 1) {
	//			// 如果线程个数超过1就一直自旋等待
	//		}
			test.sum();
		}
	
		private static List<String> readFile() {// 模拟读文件
			List<String> contents = new ArrayList<>();
			contents.add("20,30,68,9,51");
			contents.add("17,2,-9");
			contents.add("29,77,56,998,223,0,2");
			contents.add("1,2,3,4,5,6,7,8,9");
			return contents;
		}
	}

  5.2 CyclicBarrier			
 	以下代码模拟的是10个人开会,先来的人就await等待,直到10个人来齐了则会议开始
	public class TestCyclicBarrier {
	
		Random random = new Random();
	
		public void meeting(CyclicBarrier barrier) {
			try {
				Thread.sleep(random.nextInt(4000));
			} catch (InterruptedException e1) {
				e1.printStackTrace();
			}
			System.out.println(Thread.currentThread().getName() + " 到达会议室,等待开会");
			// ★如果在await()方法之前抛出异常,到不了这一句的话,那CyclicBarrier中指定的线程也不会到达执行
			//如果其中一个线程在这里调用了interrupt,那么会传递给所有的线程(待理解,参视频41)
			//barrier.reset()方法需要慎用,防止有些线程可能永远等待中而无法被唤醒(即reset以后又有线程进入了await方法,参视频41)
			try {
				barrier.await();			//■每当一个线程await,计数就加1
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	
		public static void main(String[] args) {
			TestCyclicBarrier test = new TestCyclicBarrier();
			CyclicBarrier barrier = new CyclicBarrier(12, new Runnable() {
				@Override
				public void run() {
					//■ 当await的线程数量达到指定的数字10时,就会执行CyclicBarrier指定的线程
					System.out.println("准备就绪,开始开会...");
				}
			});
			for (int i = 0; i < 10; i++) {
				new Thread(new Runnable() {
					@Override
					public void run() {
						test.meeting(barrier);
					}
				}).start();
			}
		}
	}

  5.3  Semaphore			共享锁
	以下代码模拟的是不断创建线程,但是同时最多只允许10个线程执行,以减缓cpu分配给他的资源
	public class TestSemaphore {
		public void method(Semaphore semaphore) {
			try {
				semaphore.acquire();			//■获取。每获取一次计数加1
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println(Thread.currentThread().getName() + " is run...");
			try {
				Thread.sleep(2000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			semaphore.release();				//■释放。每释放一次计数减1,即相当于一个线程执行完毕
		}
	
		public static void main(String[] args) {
			TestSemaphore test = new TestSemaphore();
			Semaphore semaphore = new Semaphore(10);		//■初始化一个Semaphore,表示同时只可以有10个线程执行
			while (true) {						//不断创建线程
				new Thread(new Runnable() {
					@Override
					public void run() {
						test.method(semaphore);
						try {
							Thread.sleep(1000);
						} catch (InterruptedException e) {
							// TODO Auto-generated catch block
							e.printStackTrace();
						}
					}
				}).start();
			}
		}
	}

  5.4 Exchanger
	以下代码模拟的是,a方法在抓取数据,a抓取成功以后经过处理将结果res进行交换;b方法也在抓取数据,同时会一直等到exch.exchange方法拿到了其他线程传递的结果,再将结果同自己处理的数据作比较。这样就达到了两个线程都完成了某一件事才继续进行其他操作的目的
	public class TestExchanger {
	
		public void a(Exchanger<String> exch) {
			System.out.println("a方法执行...");
			try {
				Thread.sleep(2000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			String res = "12345";
			try {
				System.out.println("等待对比结果...");
				exch.exchange(res);				//■Exchanger可以将参数res返回给别的线程以供使用
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	
		public void b(Exchanger<String> exch) {
			System.out.println("b方法执行...");
			try {
				Thread.sleep(4000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			String res = "12345";
			try {
				String value = exch.exchange(res);	//■通过Exchanger获取其他线程传递的数据
				System.out.println("开始进行比对...");
				System.out.println("比对结果为 :" + value.equals(res));
				exch.exchange(res);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	
		public static void main(String[] args) {
			TestExchanger test = new TestExchanger();
			Exchanger<String> exch = new Exchanger<>();
			new Thread(new Runnable() {
				@Override
				public void run() {
					test.a(exch);
				}
			}).start();
			new Thread(new Runnable() {
				@Override
				public void run() {
					test.b(exch);
				}
			}).start();
		}
	}

6 Future
	public class Demo {
		public static void main(String[] args) throws Exception {
			Callable<Integer> call = new Callable<Integer>() {
				@Override
				public Integer call() throws Exception {
					System.out.println("正在计算结果...");
					Thread.sleep(1000);
					return 1;
				}
			};
	
			FutureTask<Integer> task = new FutureTask<Integer>(call);
	
			Thread thread = new Thread(task);
			thread.start();
	
			System.out.println("do something");
			Integer result = task.get();
			System.out.println("结果为" + result);
		}
	}
	
	Callable和Runnable的区别:Runnable的run方法是被线程调用的,在run方法是异步执行的;Callable的call方法不是异步执行的,是由Future的run方法调用的


















DOWN 返回