`

java多线程编程

    博客分类:
  • j2se
 
阅读更多

线程和进程:

1) 每个进程都有自己独立的代码和数据空间,进程之间切换有较大的开销

2)线程可以看成是轻量的进程,同一类的线程共享程序代码和数据空间,每个线程都有独立的运行栈和程序计数器(PC),线程之间切换的开销比较小

3)进程和线程的根本区别在于:进程是资源分配的基本单位,线程是程序调度和执行的基本单位

4)进程:操作系统中能够同时执行多个任务(进程)

5) 线程:在一个应用程序中有的多条顺序执行的路线

6)没有线程的进程可以称之为单一的线程,如果有多个线程的进程,程序的执行不是一条线(线程),而是有多条线(线程)共同完成的

7)系统在执行进程的时候,为不同进程分配不同的内存区域;而不会为不同的线程分配内存资源(共享所属进程的内存资源),也就是说,除了CPU 资源之外,计算机中的软硬件和线程无关

传统多线程:

1) 传统创建线程的方式有两种:继承Thread类和实现Runnable接口

 

/**
 * 传统创建线程的2种方式
 * @author 
 *
 */
public class TraditionThreadTest {

	public static void main(String[] args){
		/**
		 * Thread 的子类
		 */
		Thread t1 = new Thread(){
			public void run(){
				while(true){
					try {
						Thread.sleep(500);
						System.out.println("1:"+Thread.currentThread().getName());
						System.out.println("1:"+this.getName()+":***");
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
		};
		t1.start();
		
		/**
		 *实现Runnable接口
		 */
		Thread t2 = new Thread(new Runnable(){
			@Override
			public void run(){
				while(true){
					try {
						Thread.sleep(500);
						System.out.println("2:"+Thread.currentThread().getName());
						//System.out.println("1:"+this.getName()+":***");
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
		});
		t2.start();
		
		/**
		 * 运行的是Thread子类中的run()方法
		 * 而不是Runnable中的run()方法
		 */
		new Thread(new Runnable(){
			@Override
			public void run() {
				while(true){
					try {
						Thread.sleep(500);
						System.out.println("Runnable:"+Thread.currentThread().getName());
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}}){
			public void run(){
				while(true){
					try {
						Thread.sleep(500);
						System.out.println("Thread:"+Thread.currentThread().getName());
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			};
		}.start();
	}
}

2)线程同步synchronized

 

/**
 * 关于传统线程互斥  使用synchronized 关键字
 * 要进行互斥效果的时候,必须使用同一个锁对象
 * @author 
 *
 */
public class SynchronizedTraditional {
//	private static int a = 0; 
	public static void main(String[] args){
		new SynchronizedTraditional().init();
	}
	
	public void init(){
		 final Outputer outputer = new Outputer(); 
		//outputer.output1("");
		new Thread(new Runnable(){
			@Override
			public void run(){
				while(true){
					try {
						Thread.sleep(10);
						outputer.output1("Jacking");
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
				
			}
		}).start();
		new Thread(new Runnable(){
			@Override
			public void run(){
				while(true){
					try {
						Thread.sleep(10);
						//outputer.output2("liujing");
						outputer.output3("liujing");
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}
		}).start();
	}
	/**
	 * 非内部类 可以调用外部类 的所有 非静态成员
	 * @author 
	 *
	 */
	 static class Outputer {
		/**
		 * 方式一
		 * 代码段上加入synchronized 
		 * 如果不加synchronized 就会出现“
		 * 混乱 的结果
		 * @param name
		 */
		private void output1(String name){
			//System.out.println(a);
			int length = name.length();
			synchronized (/*this*/Outputer.class) {
				for(int i=0;i<length;i++){
					System.out.print(name.charAt(i));
				}
				System.out.println();
			}
		}
		
		/**
		 * 方法二
		 * 方法前加入synchronized ,和方法一可以互斥,
		 * 因为是使用的都是 同一把锁(this 对象)
		 * @param name
		 */
		synchronized private void output2(String name){
			int length = name.length();
				for(int i=0;i<length;i++){
					System.out.print(name.charAt(i));
					System.out.println();
			}
		}
		
		/**
		 * 方法三
		 * 静态方法前加入synchronized ,和方法一可以互斥,
		 * 因为是使用的都是 同一把锁(Outputer.class  字节码对象)
		 * @param name
		 */
		 static synchronized private void output3(String name){
			int length = name.length();
				for(int i=0;i<length;i++){
					System.out.print(name.charAt(i));
				}
				System.out.println();
		}
	}
}

3)线程通信notify()

 

/**
 * 实现主线程运行50次 执行50次,
 * 子线程运行20次 执行20次,之后在交替执行
 * 一共交替执行
 * @author 
 *
 */
public class TraditionanCommunication {

	public static void main(String[] args)
	{
		final Bussiness bussiness = new Bussiness();
		/*
		 * 子线程
		 */
		new Thread(new Runnable(){
			@Override
			public void run(){
				for(int i=1;i<20;i++){
					bussiness.sub(i);
				}
			}
		}).start();
		
		/*
		 * 主线程
		 */
		for(int i=1;i<=50;i++){
			bussiness.main(i);
		}
	}
}

/**
 * 业务类
 * @author 
 *
 */
class Bussiness{
	private boolean beShouldSub = true;
	synchronized public void sub(int i){
		//使用while,替代if 可以避免被假唤醒的情况
		while(!beShouldSub){
			try {
				this.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		for(int j=1;j<=20;j++){
			System.out.println("sub "+i+"的sequence" +j);
		}
		beShouldSub = false;
		this.notify();
	}
	synchronized public void main(int i){
		//使用while,替代if 可以避免被假唤醒的情况
		while(beShouldSub){
			try {
				this.wait();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		for(int j=1;j<=100;j++){
			System.out.println("main "+i+"的sequence" +j);
		}
		beShouldSub = true;
		this.notify();
	}
}

4)共享变量

 

/**
 * 我的线程共享变量测试
 * 关于一道面试题 
 * 设计两个线程,一个变量 j ,一个线程变量对j  加  1 ,
 * 一个线程对变量  j 减 1
 * @author
 *
 */
public class MyThreadSVTest {
	
	private static MyShareData data = new MyShareData();
	private int j=10;//线程共享数据
	public static void main(String[] args){
//		new Thread(new Runnable(){
//			public void run(){
//				data.dec();
//			}
//		}).start();
//		new Thread(new Runnable(){
//			public void run(){
//				data.inc();
//			}
//		}).start();
		MyThreadSVTest mt =  new MyThreadSVTest();
		new Thread(mt.new DecThread()).start();
		new Thread(mt.new IncThread()).start();
	}
	
	public synchronized void inc(){
		for(int i=0;i<10;i++){
			j++;
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println(Thread.currentThread().getName()+" "+j);
		}
	}
	
	public synchronized void dec(){
		for(int i=0;i<10;i++){
			j--;
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println(Thread.currentThread().getName()+" "+j);
		}
	}
	
	/**
	 * 实现加 1 操作的线程
	 * @author 
	 *
	 */
	class IncThread implements Runnable{

		@Override
		public void run() {
			inc();
		}
		
	}
	
	/**
	 * 实现减1 操作的线程
	 * @author 
	 */
	class DecThread implements Runnable{

		@Override
		public void run() {
			dec();
		}
		
	}

}

/**
 * 作为线程共享的外部类
 * @author 
 *
 */
class MyShareData{
	int j =9;;
	
	/**
	 * 对j 加 1 的操作
	 */
	public synchronized void inc(){
		while(true){
			j--;
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println(Thread.currentThread().getName()+" "+ j);
		}
	}
	
	/**
	 *对j 减 1 的操作
	 */
	public void dec(){
		while(true){
			j++;
			try {
				Thread.sleep(2000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			System.out.println(Thread.currentThread().getName()+" "+ j);
		}
	}
}

5)定时器的使用

 

/**
 * 传统的定时器
 * @author 
 *
 */
public class TimmerTraditionTest {
	private static int count =0;
	public static void main(String[] args){
		/**
		 * 实现 1秒之后炸,之后,每隔2秒炸一次
		 */
//		new Timer("第一个定时器").schedule(new TimerTask() {
//			@Override
//			public void run() {
//				System.out.println("bomb……");
//			}
//		}, 1000,2000);	
		/**
		 * 实现 2秒之后炸,之后,4秒炸一次
		 * 之后在循环以上的操作
		 */
		class MyTimerTask extends TimerTask{
			@Override
			public void run() {
				count =(count+1)%2;
				System.out.println("bomb……");
				new Timer().schedule(new MyTimerTask(), 2000+2000*count);
			}			
		}
		new Timer("第二个定时器").schedule(new MyTimerTask(),2000);
		while(true){
			System.out.println(new Date().getSeconds());
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

 补充:使用spring中的quartz实现复杂的任务调度,比如:实现实现,周一到周五收发邮件。而周六、周日休息这样复杂的任务调度逻辑

java.util.concurrent.* 包多线程高级编程:

1) 锁(Lock),读写锁(ReentrantReadWriteLock)

Lock 比传统的synchronized 更加面向对象,和我们生活中的锁的概念一致,是一个对象;所以,两个线程要实现互斥的效果,必须是由同一个锁对象去控制;对于读写锁,多个读锁不互斥,读锁和写锁互斥,写锁和写锁互斥,是由JVM 控制的。我们只需要应用好相应的锁就可以。下面利用读写锁,写一个缓存系统。

 

/**
 * 利用读写锁,设计一套缓存系统
 * @author 
 *
 */
public class CacheTest {
	private Map<String,Object>  map = new HashMap<String,Object>();
	private ReadWriteLock  rwl = new ReentrantReadWriteLock(); //读写锁
	public static void main(String[] args){
		
	}
	
	public Object getData(String key ){
		rwl.readLock().lock();
		try{
			Object value = null;
			if(value == null){
				rwl.readLock().unlock();
				rwl.writeLock().lock();
				try{
						if(value == null){
							value ="data"; //写人数据
							rwl.writeLock().unlock();
						}

				}finally{
					rwl.writeLock().unlock();
					rwl.readLock().lock();
				}
			}
		}finally{
			rwl.readLock().unlock();
		}
		return null;
	}
}

 

 在锁中,存在一个Condition ,去控制线程的状态,比如:在传统线程中通过 this.wait(),this.notify(),在Condition中,通过condition.await(),condtion.signal(),下面是利用Condition和读写锁,实现一套缓存系统:

 

/**
 * 缓存区中的共享变量
 * @author 
 *
 */
public class BoundBuffer {
	private static final Lock lock = new ReentrantLock();
	private static final Condition notFull = lock.newCondition();
	private static final Condition notEmpty = lock.newCondition();
	
	private static final Object[] items =  new Object[100];
	int putptr,takeptr,count;
	/**
	 * 向缓存区中 放入数据
	 * @param obj
	 * @throws InterruptedException
	 */
	public void put(Object obj) throws InterruptedException{
		lock.lock();
		try{
			while(count == items.length){
				notFull.await();
			}
			items[putptr]=obj;
			if(++putptr==items.length){
				putptr=0;
			}
			++count;
			notEmpty.signal();
		}finally{
			lock.unlock();
		}
	}
	/**
	 * 从共享区中取得数据
	 * @return
	 * @throws InterruptedException
	 */
	public Object get() throws InterruptedException{
		lock.lock();
		try{
			if(count == 0){
				notEmpty.await();
			}
			Object obj = items[takeptr];
			if(++takeptr == items.length){
				takeptr = 0;
			}
			--count;
			notFull.signal();
			return obj;
		}finally{
			lock.unlock();
		}
	}
}

2)线程池ExecutorService,Callable ,Future,CompleteService

线程池可以控制线程的数量

Callable:采用ExecutorService中的submit 提交该对象 ,Callable对象的call 方法放回一Future对象

Future:Future中取得的结果,必须和Callable放回的结果一致,通过泛型实现

CompleteService: 用于提交一组Callable对象,其take方法,返回完成任务Callable对应的Future对象,CompleteServicre的应用场景如:可以取得麦田中首先成熟的麦子

下面是对以上几个对象应用的程序代码:

 

/**
 * 线程池、Callable 、Future、CompleteService 的使用
 * @author 
 *
 */
public class CallableAndFutureTest {
	
	public static void main(String[] args){
		ExecutorService theadPool = Executors.newFixedThreadPool(10) ;
		Future<Object> future = theadPool.submit(new Callable<Object>(){
			@Override
			public Object call(){
				try {
					Thread.sleep(new Random().nextInt(5000));
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				return "hello";
			}
		});
		System.out.println("准备拿结果……");
		try {
			System.out.println(future.get());
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		}
		
		CompletionService<Object> completionServcie = new ExecutorCompletionService<Object>(theadPool);
		for(int i=1;i<=10;i++){
			final int j = i;
			completionServcie.submit(new Callable<Object>(){
				@Override
				public Object call(){
					try {
						Thread.sleep(new Random().nextInt(1000));
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					return "任务返回的结果是:"+j;
				}
			});
		}
		for(int i=1;i<=10;i++){
			;
			try {
				System.out.println(completionServcie.take().get());
			} catch (InterruptedException e) {
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
		}
		theadPool.shutdown();
	}
}

 

3)Semaphore

    使用Semaphore可以控制同时访问资源的线程的个数,并实现线程之间的同步。例如:我们可以实现对文件操作的线程数的控制

 

public class SemaphoreTest {
	public static void main(String[] args) {
		ExecutorService service = Executors.newCachedThreadPool();
		final  Semaphore sp = new Semaphore(3);
		for(int i=0;i<10;i++){
			Runnable runnable = new Runnable(){
					public void run(){
					try {
						sp.acquire();
					} catch (InterruptedException e1) {
						e1.printStackTrace();
					}
					System.out.println("线程" + Thread.currentThread().getName() + 
							"进入,当前已有" + (3-sp.availablePermits()) + "个并发");
					try {
						Thread.sleep((long)(Math.random()*10000));
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					System.out.println("线程" + Thread.currentThread().getName() + 
							"即将离开");					
					sp.release();
					//下面代码有时候执行不准确,因为其没有和上面的代码合成原子单元
					System.out.println("线程" + Thread.currentThread().getName() + 
							"已离开,当前已有" + (3-sp.availablePermits()) + "个并发");					
				}
			};
			service.execute(runnable);			
		}
	}
}

 4)其他工具类CyclicBarrier、CountDownLatch、Exchanger

CyclicBarrier:

表示大家到达集合点之后,先等待,直到全部到达了集合点之后在进行下一步的操作,分散的活动又在指定的地点集合再继续执行。描述场景:大家去郊游,等大家都到了,在上车出发去目的地,然后大家分散游玩,之后,在集合吃饭,在坐车回去

 

public class CyclicBarrierterTest {

	public static void main(String[] args) {
		ExecutorService service = Executors.newCachedThreadPool();
		final  CyclicBarrier cb = new CyclicBarrier(3);
		for(int i=0;i<3;i++){
			Runnable runnable = new Runnable(){
					public void run(){
					try {
						Thread.sleep((long)(Math.random()*10000));	
						System.out.println("线程" + Thread.currentThread().getName() + 
								"即将到达集合地点1,当前已有" + (cb.getNumberWaiting()+1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));						
						cb.await();
						
						Thread.sleep((long)(Math.random()*10000));	
						System.out.println("线程" + Thread.currentThread().getName() + 
								"即将到达集合地点2,当前已有" + (cb.getNumberWaiting()+1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));
						cb.await();	
						Thread.sleep((long)(Math.random()*10000));	
						System.out.println("线程" + Thread.currentThread().getName() + 
								"即将到达集合地点3,当前已有" + (cb.getNumberWaiting() + 1) + "个已经到达," + (cb.getNumberWaiting()==2?"都到齐了,继续走啊":"正在等候"));						
						cb.await();						
					} catch (Exception e) {
						e.printStackTrace();
					}				
				}
			};
			service.execute(runnable);
		}
		service.shutdown();
	}
}
 

 

CountDownLatch:

 

犹如倒计时计数器,调用CountDownLatch对象的countDown方法就将计数器减1,当计数到达0时,

则所有等待者或单个等待者开始执行。这直接通过代码来说明CountDownLatch的作用,理解效果更直接。

可以实现一个人(也可以是多个人)等待其他所有人都来通知他,

可以实现一个人通知多个人的效果,类似裁判一声口令,运动员同时开始奔跑,

或者所有运动员都跑到终点后裁判才可以公布结果,用这个功能做百米赛跑的游戏程序不错哦!

还可以实现一个计划需要多个领导都签字后才能继续向下实施的情况。

 

public class CountDownLatchTest {

	public static void main(String[] args) {
		ExecutorService service = Executors.newCachedThreadPool();
		final CountDownLatch cdOrder = new CountDownLatch(1);
		final CountDownLatch cdAnswer = new CountDownLatch(3);		
		for(int i=0;i<3;i++){
			Runnable runnable = new Runnable(){
					public void run(){
					try {
						System.out.println("线程" + Thread.currentThread().getName() + 
								"正准备接受命令");						
						cdOrder.await();
						System.out.println("线程" + Thread.currentThread().getName() + 
						"已接受命令");								
						Thread.sleep((long)(Math.random()*10000));	
						System.out.println("线程" + Thread.currentThread().getName() + 
								"回应命令处理结果");						
						cdAnswer.countDown();						
					} catch (Exception e) {
						e.printStackTrace();
					}				
				}
			};
			service.execute(runnable);
		}		
		try {
			Thread.sleep((long)(Math.random()*10000));
		
			System.out.println("线程" + Thread.currentThread().getName() + 
					"即将发布命令");						
			cdOrder.countDown();
			System.out.println("线程" + Thread.currentThread().getName() + 
			"已发送命令,正在等待结果");	
			cdAnswer.await();
			System.out.println("线程" + Thread.currentThread().getName() + 
			"已收到所有响应结果");	
		} catch (Exception e) {
			e.printStackTrace();
		}				
		service.shutdown();
	}
}

 

 

Exchanger:

实现两线程之间数据交换

 

public class ExchangerTest {

	public static void main(String[] args) {
		ExecutorService service = Executors.newCachedThreadPool();
		final Exchanger exchanger = new Exchanger();
		service.execute(new Runnable(){
			public void run() {
				try {				

					String data1 = "zxx";
					System.out.println("线程" + Thread.currentThread().getName() + 
					"正在把数据" + data1 +"换出去");
					Thread.sleep((long)(Math.random()*10000));
					String data2 = (String)exchanger.exchange(data1);
					System.out.println("线程" + Thread.currentThread().getName() + 
					"换回的数据为" + data2);
				}catch(Exception e){
					
				}
			}	
		});
		service.execute(new Runnable(){
			public void run() {
				try {				

					String data1 = "lhm";
					System.out.println("线程" + Thread.currentThread().getName() + 
					"正在把数据" + data1 +"换出去");
					Thread.sleep((long)(Math.random()*10000));					
					String data2 = (String)exchanger.exchange(data1);
					System.out.println("线程" + Thread.currentThread().getName() + 
					"换回的数据为" + data2);
				}catch(Exception e){
					
				}				
			}	
		});		
	}
}

 

 5)可阻塞队列和同步集合

 

1. 什么是可阻塞队列,阻塞队列的作用与实际应用,阻塞队列的实现原理。

阻塞队列与Semaphore有些相似,但也不同,阻塞队列是一方存放数据,

另一方释放数据,Semaphore通常则是由同一方设置和释放信号量。

ArrayBlockingQueue

只有put方法和take方法才具有阻塞功能

用3个空间的队列来演示阻塞队列的功能和效果。

用两个具有1个空间的队列来实现同步通知的功能。

2.同步集合

传统方式下的Collection在迭代集合时,不允许对集合进行修改。

Java5中提供了如下一些同步集合类: 通过看java.util.concurrent包下的介绍可以知道有哪些并发集合 ConcurrentHashMap CopyOnWriteArrayList CopyOnWriteArraySet

 

 

分享到:
评论
1 楼 zuo_huai 2015-02-14  
赞一个!!!

相关推荐

Global site tag (gtag.js) - Google Analytics