IT星球论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

新浪微博账号登陆

只需一步,快速开始

搜索
查看: 95|回复: 0

Java回顾之多线程同步

[复制链接]

1997

主题

1

好友

1万

积分

管理员

Rank: 9Rank: 9Rank: 9

优秀会员 助人为乐 辛勤工作 技术精英 多才多艺 优秀班竹 灌水天才 星球管理 宣传大使 灌水之王 财富勋章 版主勋章 动漫勋章 勤奋会员 论坛精英 PS高手 心 8 闪游皮肤 双鱼座 8★8➹ 志愿者 乖

发表于 2017-4-11 18:35:24 |显示全部楼层
Java回顾之多线程同步

  在这篇文章里,我们关注线程同步的话题。这是比多线程更复杂,稍不留意,我们就会“掉到坑里”,而且和单线程程序不同,多线程的错误是否每次都出现,也是不固定的,这给调试也带来了很大的挑战。
  在这篇文章里,我们首先阐述什么是同步,不同步有什么问题,然后讨论可以采取哪些措施控制同步,接下来我们会仿照回顾网络通信时那样,构建一个服务器端的“线程池”,JDK为我们提供了一个很大的concurrent工具包,最后我们会对里面的内容进行探索。
  为什么要线程同步?
  说到线程同步,大部分情况下, 我们是在针对“单对象多线程”的情况进行讨论,一般会将其分成两部分,一部分是关于“共享变量”,一部分关于“执行步骤”。
  共享变量
  当我们在线程对象(Runnable)中定义了全局变量,run方法会修改该变量时,如果有多个线程同时使用该线程对象,那么就会造成全局变量的值被同时修改,造成错误。我们来看下面的代码:
[url=][/url]
1 class MyRunner implements Runnable 2 { 3     public int sum = 0; 4      5     public void run() 6     { 7         System.out.println(Thread.currentThread().getName() + " Start."); 8         for (int i = 1; i <= 100; i++) 9         {10             sum += i;11         }12         try {13             Thread.sleep(500);14         } catch (InterruptedException e) {15             e.printStackTrace();16         }17         System.out.println(Thread.currentThread().getName() + " --- The value of sum is " + sum);18         System.out.println(Thread.currentThread().getName() + " End.");19     }20 }21 22 23 private static void sharedVaribleTest() throws InterruptedException24 {25     MyRunner runner = new MyRunner();26     Thread thread1 = new Thread(runner);27     Thread thread2 = new Thread(runner);28     thread1.setDaemon(true);29     thread2.setDaemon(true);30     thread1.start();31     thread2.start();32     thread1.join();33     thread2.join();34 }[url=][/url]


  这个示例中,线程用来计算1到100的和是多少,我们知道正确结果是5050(好像是高斯小时候玩过这个?),但是上述程序返回的结果是10100,原因是两个线程同时对sum进行操作。
  执行步骤
  我们在多个线程运行时,可能需要某些操作合在一起作为“原子操作”,即在这些操作可以看做是“单线程”的,例如我们可能希望输出结果的样子是这样的:
[url=][/url]
1 线程1:步骤12 线程1:步骤23 线程1:步骤34 线程2:步骤15 线程2:步骤26 线程2:步骤3[url=][/url]

  如果同步控制不好,出来的样子可能是这样的:
[url=][/url]
线程1:步骤1线程2:步骤1线程1:步骤2线程2:步骤2线程1:步骤3线程2:步骤3[url=][/url]

  这里我们也给出一个示例代码:
[url=][/url]
1 class MyNonSyncRunner implements Runnable 2 { 3     public void run() { 4         System.out.println(Thread.currentThread().getName() + " Start."); 5         for(int i = 1; i <= 5; i++) 6         { 7             System.out.println(Thread.currentThread().getName() + " Running step " + i); 8             try 9             {10                 Thread.sleep(50);11             }12             catch(InterruptedException ex)13             {14                 ex.printStackTrace();15             }16         }17         System.out.println(Thread.currentThread().getName() + " End.");18     }19 }20 21 22 private static void syncTest() throws InterruptedException23 {24     MyNonSyncRunner runner = new MyNonSyncRunner();25     Thread thread1 = new Thread(runner);26     Thread thread2 = new Thread(runner);27     thread1.setDaemon(true);28     thread2.setDaemon(true);29     thread1.start();30     thread2.start();31     thread1.join();32     thread2.join();33 }[url=][/url]


  如何控制线程同步
  既然线程同步有上述问题,那么我们应该如何去解决呢?针对不同原因造成的同步问题,我们可以采取不同的策略。
  控制共享变量
  我们可以采取3种方式来控制共享变量。
  将“单对象多线程”修改成“多对象多线程”  
  上文提及,同步问题一般发生在“单对象多线程”的场景中,那么最简单的处理方式就是将运行模型修改成“多对象多线程”的样子,针对上面示例中的同步问题,修改后的代码如下:
[url=][/url]
1 private static void sharedVaribleTest2() throws InterruptedException 2 { 3     Thread thread1 = new Thread(new MyRunner()); 4     Thread thread2 = new Thread(new MyRunner()); 5     thread1.setDaemon(true); 6     thread2.setDaemon(true); 7     thread1.start(); 8     thread2.start(); 9     thread1.join();10     thread2.join();11 }[url=][/url]


  我们可以看到,上述代码中两个线程使用了两个不同的Runnable实例,它们在运行过程中,就不会去访问同一个全局变量。
  将“全局变量”降级为“局部变量”
  既然是共享变量造成的问题,那么我们可以将共享变量改为“不共享”,即将其修改为局部变量。这样也可以解决问题,同样针对上面的示例,这种解决方式的代码如下:
[url=][/url]
1 class MyRunner2 implements Runnable 2 { 3     public void run() 4     { 5         System.out.println(Thread.currentThread().getName() + " Start."); 6         int sum = 0; 7         for (int i = 1; i <= 100; i++) 8         { 9             sum += i;10         }11         try {12             Thread.sleep(500);13         } catch (InterruptedException e) {14             e.printStackTrace();15         }16         System.out.println(Thread.currentThread().getName() + " --- The value of sum is " + sum);17         System.out.println(Thread.currentThread().getName() + " End.");18     }19 }20 21 22 private static void sharedVaribleTest3() throws InterruptedException23 {24     MyRunner2 runner = new MyRunner2();25     Thread thread1 = new Thread(runner);26     Thread thread2 = new Thread(runner);27     thread1.setDaemon(true);28     thread2.setDaemon(true);29     thread1.start();30     thread2.start();31     thread1.join();32     thread2.join();33 }[url=][/url]


  我们可以看出,sum变量已经由全局变量变为run方法内部的局部变量了。
  使用ThreadLocal机制
  ThreadLocal是JDK引入的一种机制,它用于解决线程间共享变量,使用ThreadLocal声明的变量,即使在线程中属于全局变量,针对每个线程来讲,这个变量也是独立的。
  我们可以用这种方式来改造上面的代码,如下所示:
[url=][/url]
1 class MyRunner3 implements Runnable 2 { 3     public ThreadLocal<Integer> tl = new ThreadLocal<Integer>(); 4      5     public void run() 6     { 7         System.out.println(Thread.currentThread().getName() + " Start."); 8         for (int i = 0; i <= 100; i++) 9         {10             if (tl.get() == null)11             {12                 tl.set(new Integer(0));13             }14             int sum = ((Integer)tl.get()).intValue();15             sum+= i;16             tl.set(new Integer(sum));17             try {18                 Thread.sleep(10);19             } catch (InterruptedException e) {20                 e.printStackTrace();21             }22         }23         24         System.out.println(Thread.currentThread().getName() + " --- The value of sum is " + ((Integer)tl.get()).intValue());25         System.out.println(Thread.currentThread().getName() + " End.");26     }27 }28 29 30 private static void sharedVaribleTest4() throws InterruptedException31 {32     MyRunner3 runner = new MyRunner3();33     Thread thread1 = new Thread(runner);34     Thread thread2 = new Thread(runner);35     thread1.setDaemon(true);36     thread2.setDaemon(true);37     thread1.start();38     thread2.start();39     thread1.join();40     thread2.join();41 }[url=][/url]


  综上三种方案,第一种方案会降低多线程执行的效率,因此,我们推荐使用第二种或者第三种方案。
  控制执行步骤
  说到执行步骤,我们可以使用synchronized关键字来解决它。
[url=][/url]
1 class MySyncRunner implements Runnable 2 { 3     public void run() { 4         synchronized(this) 5         { 6             System.out.println(Thread.currentThread().getName() + " Start."); 7             for(int i = 1; i <= 5; i++) 8             { 9                 System.out.println(Thread.currentThread().getName() + " Running step " + i);10                 try11                 {12                     Thread.sleep(50);13                 }14                 catch(InterruptedException ex)15                 {16                     ex.printStackTrace();17                 }18             }19             System.out.println(Thread.currentThread().getName() + " End.");20         }21     }22 }23 24 25 private static void syncTest2() throws InterruptedException26 {27     MySyncRunner runner = new MySyncRunner();28     Thread thread1 = new Thread(runner);29     Thread thread2 = new Thread(runner);30     thread1.setDaemon(true);31     thread2.setDaemon(true);32     thread1.start();33     thread2.start();34     thread1.join();35     thread2.join();36 }[url=][/url]


  在线程同步的话题上,synchronized是一个非常重要的关键字。它的原理和数据库中事务锁的原理类似。我们在使用过程中,应该尽量缩减synchronized覆盖的范围,原因有二:1)被它覆盖的范围是串行的,效率低;2)容易产生死锁。我们来看下面的示例:
[url=][/url]
1 private static void syncTest3() throws InterruptedException 2 { 3     final List<Integer> list = new ArrayList<Integer>(); 4      5     Thread thread1 = new Thread() 6     { 7         public void run() 8         { 9             System.out.println(Thread.currentThread().getName() + " Start.");10             Random r = new Random(100);11             synchronized(list)12             {13                 for (int i = 0; i < 5; i++)14                 {15                     list.add(new Integer(r.nextInt()));16                 }17                 System.out.println("The size of list is " + list.size());18             }19             try20             {21                 Thread.sleep(500);22             }23             catch(InterruptedException ex)24             {25                 ex.printStackTrace();26             }27             System.out.println(Thread.currentThread().getName() + " End.");28         }29     };30     31     Thread thread2 = new Thread()32     {33         public void run()34         {35             System.out.println(Thread.currentThread().getName() + " Start.");36             Random r = new Random(100);37             synchronized(list)38             {39                 for (int i = 0; i < 5; i++)40                 {41                     list.add(new Integer(r.nextInt()));42                 }43                 System.out.println("The size of list is " + list.size());44             }45             try46             {47                 Thread.sleep(500);48             }49             catch(InterruptedException ex)50             {51                 ex.printStackTrace();52             }53             System.out.println(Thread.currentThread().getName() + " End.");54         }55     };56     57     thread1.start();58     thread2.start();59     thread1.join();60     thread2.join();61 }[url=][/url]


  我们应该把需要同步的内容集中在一起,尽量不包含IT论坛不相关的、消耗大量资源的操作,示例中线程休眠的操作显然不应该包括在里面。
  构造线程池
  我们在Java回顾之网络通信中,已经构建了一个Socket连接池,这里我们在此基础上,构建一个线程池,完成基本的启动、休眠、唤醒、停止操作。
  基本思路还是以数组的形式保持一系列线程,通过Socket通信,客户端向服务器端发送命令,当服务器端接收到命令后,根据收到的命令对线程数组中的线程进行操作。
  Socket客户端的代码保持不变,依然采用构建Socket连接池时的代码,我们主要针对服务器端进行改造。
  首先,我们需要定义一个线程对象,它用来执行我们的业务操作,这里简化起见,只让线程进行休眠。
[url=][/url]
1 enum ThreadStatus 2 { 3     Initial, 4     Running, 5     Sleeping, 6     Stopped 7 } 8 9 enum ThreadTask10 {11     Start,12     Stop,13     Sleep,14     Wakeup15 }16 17 18 class MyThread extends Thread19 {20     public ThreadStatus status = ThreadStatus.Initial;21     public ThreadTask task;22     public void run()23     {24         status = ThreadStatus.Running;25         while(true)26         {27             try {28                 Thread.sleep(3000);29                 if (status == ThreadStatus.Sleeping)30                 {31                     System.out.println(Thread.currentThread().getName() + " 进入休眠状态。");32                     this.wait();33                 }34             } catch (InterruptedException e) {35                 System.out.println(Thread.currentThread().getName() + " 运行过程中出现错误。");36                 status = ThreadStatus.Stopped;37             }38         }39     }40 }[url=][/url]


  然后,我们需要定义一个线程管理器,它用来对线程池中的线程进行管理,代码如下:
[url=][/url]
1 class MyThreadManager 2 { 3     public static void manageThread(MyThread[] threads, ThreadTask task) 4     { 5         for (int i = 0; i < threads.length; i++) 6         { 7             synchronized(threads) 8             { 9                 manageThread(threads, task);10             }11         }12         System.out.println(getThreadStatus(threads));13     }14     15     public static void manageThread(MyThread thread, ThreadTask task)16     {17         if (task == ThreadTask.Start)18         {19             if (thread.status == ThreadStatus.Running)20             {21                 return;22             }23             if (thread.status == ThreadStatus.Stopped)24             {25                 thread = new MyThread();26             }27             thread.status = ThreadStatus.Running;28             thread.start();29             30         }31         else if (task == ThreadTask.Stop)32         {33             if (thread.status != ThreadStatus.Stopped)34             {35                 thread.interrupt();36                 thread.status = ThreadStatus.Stopped;37             }38         }39         else if (task == ThreadTask.Sleep)40         {41             thread.status = ThreadStatus.Sleeping;42         }43         else if (task == ThreadTask.Wakeup)44         {45             thread.notify();46             thread.status = ThreadStatus.Running;47         }48     }49     50     public static String getThreadStatus(MyThread[] threads)51     {52         StringBuffer sb = new StringBuffer();53         for (int i = 0; i < threads.length; i++)54         {55             sb.append(threads.getName() + "的状态:" + threads.status).append("\r\n");56         }57         return sb.tostring();58     }59 }[url=][/url]


  最后,是我们的服务器端,它不断接受客户端的请求,每收到一个连接请求,服务器端会新开一个线程,来处理后续客户端发来的各种操作指令。
[url=][/url]
1 public class MyThreadPool { 2 3     public static void main(String[] args) throws IOException 4     { 5         MyThreadPool pool = new MyThreadPool(5); 6     } 7      8     private int threadCount; 9     private MyThread[] threads = null;10     11     12     public MyThreadPool(int count) throws IOException13     {14         this.threadCount = count;15         threads = new MyThread[count];16         for (int i = 0; i < threads.length; i++)17         {18             threads = new MyThread();19             threads.start();20         }21         Init();22     }23     24     private void Init() throws IOException25     {26         ServerSocket serverSocket = new ServerSocket(5678);27         while(true)28         {29             final Socket socket = serverSocket.accept();30             Thread thread = new Thread()31             {32                 public void run()33                 {34                     try35                     {36                         System.out.println("检测到一个新的Socket连接。");37                         BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()));38                         PrintStream ps = new PrintStream(socket.getOutputStream());39                         String line = null;40                         while((line = br.readLine()) != null)41                         {42                             System.out.println(line);43                             if (line.equals("Count"))44                             {45                                 System.out.println("线程池中有5个线程");46                             }47                             else if (line.equals("Status"))48                             {49                                 String status = MyThreadManager.getThreadStatus(threads);50                                 System.out.println(status);51                             }52                             else if (line.equals("StartAll"))53                             {54                                 MyThreadManager.manageThread(threads, ThreadTask.Start);55                             }56                             else if (line.equals("StopAll"))57                             {58                                 MyThreadManager.manageThread(threads, ThreadTask.Stop);59                             }60                             else if (line.equals("SleepAll"))61                             {62                                 MyThreadManager.manageThread(threads, ThreadTask.Sleep);63                             }64                             else if (line.equals("WakeupAll"))65                             {66                                 MyThreadManager.manageThread(threads, ThreadTask.Wakeup);67                             }68                             else if (line.equals("End"))69                             {70                                 break;71                             }72                             else73                             {74                                 System.out.println("Command:" + line);75                             }76                             ps.println("OK");77                             ps.flush();78                         }79                     }80                     catch(Exception ex)81                     {82                         ex.printStackTrace();83                     }84                 }85             };86             thread.start();87         }88     }89 }[url=][/url]


  探索JDK中的concurrent工具包
  为了简化开发人员在进行多线程开发时的工作量,并减少程序中的bug,JDK提供了一套concurrent工具包,我们可以用它来方便的开发多线程程序。
  线程池  
  我们在上面实现了一个非常“简陋”的线程池,concurrent工具包中也提供了线程池,而且使用非常方便。
  concurrent工具包中的线程池分为3类:ScheduledThreadPool、FixedThreadPool和CachedThreadPool。
  首先我们来定义一个Runnable的对象
[url=][/url]
1 class MyRunner implements Runnable 2 { 3     public void run() { 4         System.out.println(Thread.currentThread().getName() + "运行开始"); 5         for(int i = 0; i < 1; i++) 6         { 7             try 8             { 9                 System.out.println(Thread.currentThread().getName() + "正在运行");10                 Thread.sleep(200);11             }12             catch(Exception ex)13             {14                 ex.printStackTrace();15             }16         }17         System.out.println(Thread.currentThread().getName() + "运行结束");18     }19 }[url=][/url]


  可以看出,它的功能非常简单,只是输出了线程的执行过程。
  ScheduledThreadPool
  这和我们平时使用的ScheduledTask比较类似,或者说很像Timer,它可以使得一个线程在指定的一段时间内开始运行,并且在间隔另外一段时间后再次运行,直到线程池关闭。
  示例代码如下:
[url=][/url]
1 private static void scheduledThreadPoolTest() 2 { 3     final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3); 4      5     MyRunner runner = new MyRunner(); 6      7     final ScheduledFuture<?> handler1 = scheduler.scheduleAtFixedRate(runner, 1, 10, TimeUnit.SECONDS); 8     final ScheduledFuture<?> handler2 = scheduler.scheduleWithFixedDelay(runner, 2, 10, TimeUnit.SECONDS); 9     10     scheduler.schedule(new Runnable()11     {12         public void run()13         {14             handler1.cancel(true);15             handler2.cancel(true);16             scheduler.shutdown();17         }18     }, 30, TimeUnit.SECONDS19     );20 }[url=][/url]


  FixedThreadPool
  这是一个指定容量的线程池,即我们可以指定在同一时间,线程池中最多有多个线程在运行,超出的线程,需要等线程池中有空闲线程时,才能有机会运行。
  来看下面的代码:
[url=][/url]
1 private static void fixedThreadPoolTest() 2 { 3     ExecutorService exec = Executors.neWFixedThreadPool(3); 4     for(int i = 0; i < 5; i++) 5     { 6         MyRunner runner = new MyRunner(); 7         exec.execute(runner); 8     } 9     exec.shutdown();10 }[url=][/url]


  注意它的输出结果:
[url=][/url]
pool-1-thread-1运行开始pool-1-thread-1正在运行pool-1-thread-2运行开始pool-1-thread-2正在运行pool-1-thread-3运行开始pool-1-thread-3正在运行pool-1-thread-1运行结束pool-1-thread-1运行开始pool-1-thread-1正在运行pool-1-thread-2运行结束pool-1-thread-2运行开始pool-1-thread-2正在运行pool-1-thread-3运行结束pool-1-thread-1运行结束pool-1-thread-2运行结束[url=][/url]

  可以看到从始至终,最多有3个线程在同时运行。
  CachedThreadPool
  这是另外一种线程池,它不需要指定容量,只要有需要,它就会创建新的线程。
  它的使用方式和FixedThreadPool非常像,来看下面的代码:
CachedThreadPool示例
  它的执行结果如下:
[url=][/url]
pool-1-thread-1运行开始pool-1-thread-1正在运行pool-1-thread-2运行开始pool-1-thread-2正在运行pool-1-thread-3运行开始pool-1-thread-3正在运行pool-1-thread-4运行开始pool-1-thread-4正在运行pool-1-thread-5运行开始pool-1-thread-5正在运行pool-1-thread-1运行结束pool-1-thread-2运行结束pool-1-thread-3运行结束pool-1-thread-4运行结束pool-1-thread-5运行结束[url=][/url]

  可以看到,它创建了5个线程。
  处理线程返回值
  在有些情况下,我们需要使用线程的返回值,在上述的所有代码中,线程这是执行了某些操作,没有任何返回值。
  如何做到这一点呢?我们可以使用JDK中的Callable<T>和CompletionService<T>,前者返回单个线程的结果,后者返回一组线程的结果。
  返回单个线程的结果
  还是直接看代码吧:
[url=][/url]
1 private static void callableTest() throws InterruptedException, ExecutionException 2 { 3     ExecutorService exec = Executors.newFixedThreadPool(1); 4     Callable<String> call = new Callable<String>() 5     { 6         public String call() 7         { 8             return "Hello World."; 9         }10     };11     Future<String> result = exec.submit(call);12     System.out.println("线程的返回值是" + result.get());13     exec.shutdown();14 }[url=][/url]


  执行结果如下:
线程的返回值是Hello World.
  返回线程池中每个线程的结果
  这里需要使用CompletionService<T>,代码如下:
[url=][/url]
1 private static void completionServiceTest() throws InterruptedException, ExecutionException 2 { 3     ExecutorService exec = Executors.newFixedThreadPool(10); 4     CompletionService<String> service = new ExecutorCompletionService<String>(exec); 5     for (int i = 0; i < 10; i++) 6     { 7         Callable<String> call = new Callable<String>() 8         { 9             public String call() throws InterruptedException10             {11                 return Thread.currentThread().getName();12             }13         };14         service.submit(call);15     }16     17     Thread.sleep(1000);18     for(int i = 0; i < 10; i++)19     {20         Future<String> result = service.take();21         System.out.println("线程的返回值是" + result.get());22     }23     exec.shutdown();24 }[url=][/url]


  执行结果如下:
[url=][/url]
线程的返回值是pool-2-thread-1线程的返回值是pool-2-thread-2线程的返回值是pool-2-thread-3线程的返回值是pool-2-thread-5线程的返回值是pool-2-thread-4线程的返回值是pool-2-thread-6线程的返回值是pool-2-thread-8线程的返回值是pool-2-thread-7线程的返回值是pool-2-thread-9线程的返回值是pool-2-thread-10[url=][/url]

  实现生产者-消费者模型
  对于生产者-消费者模型来说,我们应该都不会陌生,通常我们都会使用某种数据结构来实现它。在concurrent工具包中,我们可以使用BlockingQueue来实现生产者-消费者模型,如下:
[url=][/url]
1 public class BlockingQueueSample { 2 3     public static void main(String[] args) 4     { 5         blockingQueueTest(); 6     } 7      8     private static void blockingQueueTest() 9     {10         final BlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();11         final int maxSleepTimeForSetter = 10;12         final int maxSleepTimerForGetter = 10;13         14         Runnable setter = new Runnable()15         {16             public void run()17             {18                 Random r = new Random();19                 while(true)20                 {21                     int value = r.nextInt(100);22                     try23                     {24                         queue.put(new Integer(value));25                         System.out.println(Thread.currentThread().getName() + "---向队列中插入值" + value);26                         Thread.sleep(r.nextInt(maxSleepTimeForSetter) * 1000);27                     }28                     catch(Exception ex)29                     {30                         ex.printStackTrace();31                     }32                 }33             }34         };35         36         Runnable getter = new Runnable()37         {38             public void run()39             {40                 Random r = new Random();41                 while(true)42                 {43                     try44                     {45                         if (queue.size() == 0)46                         {47                             System.out.println(Thread.currentThread().getName() + "---队列为空");48                         }49                         else50                         {51                             int value = queue.take().intValue();52                             System.out.println(Thread.currentThread().getName() + "---从队列中获取值" + value);53                         }54                         Thread.sleep(r.nextInt(maxSleepTimerForGetter) * 1000);55                     }56                     catch(Exception ex)57                     {58                         ex.printStackTrace();59                     }60                 }61             }62         };63         64         ExecutorService exec = Executors.newFixedThreadPool(2);65         exec.execute(setter);66         exec.execute(getter);67     }68 }[url=][/url]


  我们定义了两个线程,一个线程向Queue中添加数据,一个线程从Queue中取数据。我们可以通过控制maxSleepTimeForSetter和maxSleepTimerForGetter的值,来使得程序得出不同的结果。
  可能的执行结果如下:
[url=][/url]
pool-1-thread-1---向队列中插入值88pool-1-thread-2---从队列中获取值88pool-1-thread-1---向队列中插入值75pool-1-thread-2---从队列中获取值75pool-1-thread-2---队列为空pool-1-thread-2---队列为空pool-1-thread-2---队列为空pool-1-thread-1---向队列中插入值50pool-1-thread-2---从队列中获取值50pool-1-thread-2---队列为空pool-1-thread-2---队列为空pool-1-thread-2---队列为空pool-1-thread-2---队列为空pool-1-thread-2---队列为空pool-1-thread-1---向队列中插入值51pool-1-thread-1---向队列中插入值92pool-1-thread-2---从队列中获取值51pool-1-thread-2---从队列中获取值92[url=][/url]

  因为Queue中的值和Thread的休眠时间都是随机的,所以执行结果也不是固定的。
  使用信号量来控制线程
  JDK提供了Semaphore来实现“信号量”的功能,它提供了两个方法分别用于获取和释放信号量:acquire和release,示例代码如下:
[url=][/url]
1 private static void semaphoreTest() 2 { 3     ExecutorService exec = Executors.newFixedThreadPool(10); 4     final Semaphore semp = new Semaphore(2); 5      6     for (int i = 0; i < 10; i++) 7     { 8         Runnable runner = new Runnable() 9         {10             public void run()11             {12                 try13                 {14                     semp.acquire();15                     System.out.println(new Date() + " " + Thread.currentThread().getName() + "正在执行。");16                     Thread.sleep(5000);17                     semp.release();18                 }19                 catch(Exception ex)20                 {21                     ex.printStackTrace();22                 }23             }24         };25         exec.execute(runner);26     }27     28     exec.shutdown();29 }[url=][/url]


  执行结果如下:
[url=][/url]
Tue May 07 11:22:11 CST 2013 pool-1-thread-1正在执行。Tue May 07 11:22:11 CST 2013 pool-1-thread-2正在执行。Tue May 07 11:22:17 CST 2013 pool-1-thread-3正在执行。Tue May 07 11:22:17 CST 2013 pool-1-thread-4正在执行。Tue May 07 11:22:22 CST 2013 pool-1-thread-5正在执行。Tue May 07 11:22:22 CST 2013 pool-1-thread-6正在执行。Tue May 07 11:22:27 CST 2013 pool-1-thread-7正在执行。Tue May 07 11:22:27 CST 2013 pool-1-thread-8正在执行。Tue May 07 11:22:32 CST 2013 pool-1-thread-10正在执行。Tue May 07 11:22:32 CST 2013 pool-1-thread-9正在执行。[url=][/url]

  可以看出,尽管线程池中创建了10个线程,但是同时运行的,只有2个线程。
  控制线程池中所有线程的执行步骤
  在前面,我们已经提到,可以用synchronized关键字来控制单个线程中的执行步骤,那么如果我们想要对线程池中的所有线程的执行步骤进行控制的话,应该如何实现呢?
  我们有两种方式,一种是使用CyclicBarrier,一种是使用CountDownLatch。
  CyclicBarrier使用了类似于Object.wait的机制,它的构造函数中需要接收一个整型数字,用来说明它需要控制的线程数目,当在线程的run方法中调用它的await方法时,它会保证所有的线程都执行到这一步,才会继续执行后面的步骤。
  示例代码如下:
[url=][/url]
1 class MyRunner2 implements Runnable 2 { 3     private CyclicBarrier barrier = null; 4     public MyRunner2(CyclicBarrier barrier) 5     { 6         this.barrier = barrier; 7     } 8      9     public void run() {10         Random r = new Random();11         try12         {13             for (int i = 0; i < 3; i++)14             {15                 Thread.sleep(r.nextInt(10) * 1000);16                 System.out.println(new Date() + "--" + Thread.currentThread().getName() + "--第" + (i + 1) + "次等待。");17                 barrier.await();18             }19         }20         catch(Exception ex)21         {22             ex.printStackTrace();23         }24     }25     26 }27 28 private static void cyclicBarrierTest()29 {30     CyclicBarrier barrier = new CyclicBarrier(3);31     32     ExecutorService exec = Executors.newFixedThreadPool(3);33     for (int i = 0; i < 3; i++)34     {35         exec.execute(new MyRunner2(barrier));36     }37     exec.shutdown();38 }[url=][/url]


  执行结果如下:
[url=][/url]
Tue May 07 11:31:20 CST 2013--pool-1-thread-2--第1次等待。Tue May 07 11:31:21 CST 2013--pool-1-thread-3--第1次等待。Tue May 07 11:31:24 CST 2013--pool-1-thread-1--第1次等待。Tue May 07 11:31:24 CST 2013--pool-1-thread-1--第2次等待。Tue May 07 11:31:26 CST 2013--pool-1-thread-3--第2次等待。Tue May 07 11:31:30 CST 2013--pool-1-thread-2--第2次等待。Tue May 07 11:31:32 CST 2013--pool-1-thread-1--第3次等待。Tue May 07 11:31:33 CST 2013--pool-1-thread-3--第3次等待。Tue May 07 11:31:33 CST 2013--pool-1-thread-2--第3次等待。[url=][/url]

  可以看出,thread-2到第1次等待点时,一直等到thread-1到达后才继续执行。
  CountDownLatch则是采取类似”倒计时计数器”的机制来控制线程池中的线程,它有CountDown和Await两个方法。示例代码如下:
[url=][/url]
1 private static void countdownLatchTest() throws InterruptedException 2 { 3     final CountDownLatch begin = new CountDownLatch(1); 4     final CountDownLatch end = new CountDownLatch(5); 5     ExecutorService exec = Executors.newFixedThreadPool(5); 6     for (int i = 0; i < 5; i++) 7     { 8         Runnable runner = new Runnable() 9         {10             public void run()11             {12                 Random r = new Random();13                 try14                 {15                     begin.await();16                     System.out.println(Thread.currentThread().getName() + "运行开始");17                     Thread.sleep(r.nextInt(10)*1000);18                     System.out.println(Thread.currentThread().getName() + "运行结束");19                 }20                 catch(Exception ex)21                 {22                     ex.printStackTrace();23                 }24                 finally25                 {26                     end.countDown();27                 }28             }29         };30         exec.execute(runner);31     }32     begin.countDown();33     end.await();34     System.out.println(Thread.currentThread().getName() + "运行结束");35     exec.shutdown();36 }[url=][/url]


  执行结果如下:
[url=][/url]
pool-1-thread-1运行开始pool-1-thread-5运行开始pool-1-thread-2运行开始pool-1-thread-3运行开始pool-1-thread-4运行开始pool-1-thread-2运行结束pool-1-thread-1运行结束pool-1-thread-3运行结束pool-1-thread-5运行结束pool-1-thread-4运行结束main运行结束[url=][/url]



    作者:李胜攀
    出处:http://wing011203.cnblogs.com/

Java回顾之多线程同步
该会员没有填写今日想说内容.
您需要登录后才可以回帖 登录 | 立即注册 新浪微博账号登陆

回顶部