企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
多线程开发可以更好的发挥多核cpu性能,常用的多线程设计模式有:Future、Master-Worker、Guard Susperionsion(保护性暂挂模式)、不变模式、生产者-消费者 模式;jdk除了定义了若干并发的数据结构,也内置了多线程框架和各种线程池; 锁(分为内部锁、重入锁、读写锁)、ThreadLocal、信号量等在并发控制中发挥着巨大的作用。 一、Future模型 1.什么是Future模型 该模型是将异步请求和代理模式联合的模型产物。见下图: 客户端发送一个长时间的请求,服务端不需等待该数据处理完成便立即返回一个伪造的代理数据(相当于商品订单,不是商品本身),用户也无需等待,先去执行其他的若干操作后,再去调用服务器已经完成组装的真实数据。该模型充分利用了等待的时间片段。 2.Future模式的核心结构: Main:启动系统,调用Client发出请求; Client:返回Data对象,理解返回FutureData(伪造的数据或未来数据),并开启ClientThread线程装配RealData(真实数据); Data:返回数据的接口; FutureData:Future数据,构造很快,但是是一个虚拟的数据,需要装配RealData; RealData:真实数据,构造比较慢。 3.Future模式的代码实现: (1)Main函数: public class Main { public static void main(String[] args){ Client client = new Client(); //理解返回一个FutureData Data data = client.request("name"); System.out.println("请求完毕!"); try{ //处理其他业务 //这个过程中,真是数据RealData组装完成,重复利用等待时间 Thread.sleep(2000); }catch (Exception e){ } //真实数据 System.out.println("数据 = "+ data.getResult()); } } (2)Client的实现: public class Client { public Data request(final String queryStr){ final FutureData future = new FutureData(); //开启一个新的线程来构造真实数据 new Thread(){ public void run(){ RealData realData = new RealData(queryStr); future.setRealData(realData); } }.start(); return future; } } (3)Data的实现: public interface Data { public String getResult(); } (4)FutureData: /** * 是对RealData的一个包装 * @author limin * */ public class FutureData implements Data { protected RealData realData =null; protected boolean isReady = false; public synchronized void setRealData(RealData realData){ if(isReady){ return; } this.realData=realData; isReady=true; notifyAll(); } @Override public synchronized String getResult() { while(!isReady){ try{ wait(); }catch (Exception e){ } } return realData.result; } } (5)RealData实现: public class RealData implements Data { protected String result; public RealData(String para){ //构造比较慢 StringBuffer sb= new StringBuffer(); for(int i=0;i<10;i++){ sb.append(para); try{ Thread.sleep(1000); }catch(Exception e){ } result= sb.toString(); } } @Override public String getResult() { return result; } } 4.注意: FutureData是对RealData的包装,是对真实数据的一个代理,封装了获取真实数据的等待过程。它们都实现了共同的接口,所以,针对客户端程序组是没有区别的; 客户端在调用的方法中,单独启用一个线程来完成真实数据的组织,这对调用客户端的main函数式封闭的; 因为咋FutureData中的notifyAll和wait函数,主程序会等待组装完成后再会继续主进程,也就是如果没有组装完成,main函数会一直等待。 二、Master-Worker模式 Master-Worker模式是常用的并行模式之一,它的核心思想是,系统有两个进程协作工作:Master进程,负责接收和分配任务;Worker进程,负责处理子任务。当Worker进程将子任务处理完成后,结果返回给Master进程,由Master进程做归纳汇总,最后得到最终的结果。 2.1什么是Master-Worker模式: 该模式的结构图: 结构图: Worker:用于实际处理一个任务; Master:任务的分配和最终结果的合成; Main:启动程序,调度开启Master。 2.2代码实现: 下面的是一个简易的Master-Worker框架实现。 1 (1)Master部分: [java] view plain copy package MasterWorker; import java.util.HashMap; import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; public class Master { //任务队列 protected Queue<Object> workQueue= new ConcurrentLinkedQueue<Object>(); //Worker进程队列 protected Map<String ,Thread> threadMap= new HashMap<String ,Thread>(); //子任务处理结果集 protected Map<String ,Object> resultMap= new ConcurrentHashMap<String, Object>(); //是否所有的子任务都结束了 public boolean isComplete(){ for(Map.Entry<String , Thread> entry:threadMap.entrySet()){ if(entry.getValue().getState()!=Thread.State.TERMINATED){ return false; } } return true ; } //Master的构造,需要一个Worker进程逻辑,和需要Worker进程数量 public Master(Worker worker,int countWorker){ worker.setWorkQueue(workQueue); worker.setResultMap(resultMap); for(int i=0;i<countWorker;i++){ threadMap.put(Integer.toString(i), new Thread(worker, Integer.toString(i))); } } //提交一个任务 public void submit(Object job){ workQueue.add(job); } //返回子任务结果集 public Map<String ,Object> getResultMap(){ return resultMap; } //开始运行所有的Worker进程,进行处理 public void execute(){ for(Map.Entry<String , Thread> entry:threadMap.entrySet()){ entry.getValue().start(); } } } (2)Worker进程实现: package MasterWorker; import java.util.Map; import java.util.Queue; public class Worker implements Runnable{ //任务队列,用于取得子任务 protected Queue<Object> workQueue; //子任务处理结果集 protected Map<String ,Object> resultMap; public void setWorkQueue(Queue<Object> workQueue){ this.workQueue= workQueue; } public void setResultMap(Map<String ,Object> resultMap){ this.resultMap=resultMap; } //子任务处理的逻辑,在子类中实现具体逻辑 public Object handle(Object input){ return input; } @Override public void run() { while(true){ //获取子任务 Object input= workQueue.poll(); if(input==null){ break; } //处理子任务 Object re = handle(input); resultMap.put(Integer.toString(input.hashCode()), re); } } } (3)运用这个小框架计算1——100的立方和,PlusWorker的实现: package MasterWorker; public class PlusWorker extends Worker { @Override public Object handle(Object input) { Integer i =(Integer)input; return i*i*i; } } (4)进行计算的Main函数: package MasterWorker; import java.util.Map; import java.util.Set; public class Main { /** * @param args */ public static void main(String[] args) { //固定使用5个Worker,并指定Worker Master m = new Master(new PlusWorker(), 5); //提交100个子任务 for(int i=0;i<100;i++){ m.submit(i); } //开始计算 m.execute(); int re= 0; //保存最终结算结果 Map<String ,Object> resultMap =m.getResultMap(); //不需要等待所有Worker都执行完成,即可开始计算最终结果 while(resultMap.size()>0 || !m.isComplete()){ Set<String> keys = resultMap.keySet(); String key =null; for(String k:keys){ key=k; break; } Integer i =null; if(key!=null){ i=(Integer)resultMap.get(key); } if(i!=null){ //最终结果 re+=i; } if(key!=null){ //移除已经被计算过的项 resultMap.remove(key); } } } } 2.3总结: Master-Worker模式是一种将串行任务并行化的方案,被分解的子任务在系统中可以被并行处理,同时,如果有需要,Master进程不需要等待所有子任务都完成计算,就可以根据已有的部分结果集计算最终结果集。 三、生产者-消费模式 生产者-消费模式,通常有两类线程,即若干个生产者线程和若干个消费者线程。生产者线程负责提交用户请求,消费者线程负责具体处理生产者提交的任务。两者之间通过共享内存缓冲去进行通信。 3.1.架构模式图: 类图: 生产者:提交用户请求,提取用户任务,并装入内存缓冲区; 消费者:在内存缓冲区中提取并处理任务; 内存缓冲区:缓存生产者提交的任务或数据,供消费者使用; 任务:生产者向内存缓冲区提交的数据结构; Main:使用生产者和消费者的客户端。 3.2代码实现一个基于生产者-消费者模式的求整数平方的并行计算: (1)Producer生产者线程: package ProducerConsumer; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; public class Producer implements Runnable{ //Volatile修饰的成员变量在每次被线程访问时,都强迫从共享内存中重读该成员变量的值。 //而且,当成员变量发生变化时,强迫线程将变化值回写到共享内存。 //这样在任何时刻,两个不同的线程总是看到某个成员变量的同一个值。 private volatile boolean isRunning= true; //内存缓冲区 private BlockingQueue<PCData> queue; //总数,原子操作 private static AtomicInteger count = new AtomicInteger(); private static final int SLEEPTIME=1000; public Producer(BlockingQueue<PCData> queue) { this.queue = queue; } @Override public void run() { PCData data=null; Random r = new Random(); System.out.println("start producer id = "+ Thread .currentThread().getId()); try{ while(isRunning){ Thread.sleep(r.nextInt(SLEEPTIME)); //构造任务数据 data= new PCData(count.incrementAndGet()); System.out.println("data is put into queue "); //提交数据到缓冲区 if(!queue.offer(data,2,TimeUnit.SECONDS)){ System.out.println("faile to put data: "+ data); } } }catch (InterruptedException e){ e.printStackTrace(); Thread.currentThread().interrupt(); } } public void stop(){ isRunning=false; } } (2)Consumer消费者线程: package ProducerConsumer; import java.text.MessageFormat; import java.util.Random; import java.util.concurrent.BlockingQueue; public class Consumer implements Runnable { //缓冲区 private BlockingQueue<PCData> queue; private static final int SLEEPTIME=1000; public Consumer(BlockingQueue<PCData> queue) { this.queue = queue; } @Override public void run() { System.out.println("start Consumer id= "+ Thread .currentThread().getId()); Random r = new Random(); try { //提取任务 while(true){ PCData data= queue.take(); if(null!= data){ //计算平方 int re= data.getData()*data.getData(); System.out.println(MessageFormat.format("{0}*{1}={2}", data.getData(),data.getData(),re )); Thread.sleep(r.nextInt(SLEEPTIME)); } } } catch (InterruptedException e) { e.printStackTrace(); Thread.currentThread().interrupt(); } } } (3)PCData共享数据模型: package ProducerConsumer; public final class PCData { private final int intData; public PCData(int d) { intData=d; } public PCData(String d) { intData=Integer.valueOf(d); } public int getData(){ return intData; } @Override public String toString(){ return "data:"+ intData ; } } (4)Main函数: import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; public class Main { /** * @param args */ public static void main(String[] args) throws InterruptedException{ //建立缓冲区 BlockingQueue<PCData> queue = new LinkedBlockingDeque<PCData>(10); //建立生产者 Producer producer1 = new Producer(queue); Producer producer2 = new Producer(queue); Producer producer3 = new Producer(queue); //建立消费者 Consumer consumer1 = new Consumer(queue); Consumer consumer2 = new Consumer(queue); Consumer consumer3 = new Consumer(queue); //建立线程池 ExecutorService service = Executors.newCachedThreadPool(); //运行生产者 service.execute(producer1); service.execute(producer2); service.execute(producer3); //运行消费者 service.execute(consumer1); service.execute(consumer2); service.execute(consumer3); Thread.sleep(10*1000); //停止生产者 producer1.stop(); producer2.stop(); producer3.stop(); Thread.sleep(3000); service.shutdown(); } } 3.3注意: volatile关键字:Volatile修饰的成员变量在每次被线程访问时,都强迫从共享内存中重读该成员变量的值。而且,当成员变量发生变化时,强迫线程将变化值回写到共享内存。这样在任何时刻,两个不同的线程总是看到某个成员变量的同一个值。 生产-消费模式的核心组件是共享内存缓冲区,是两者的通信桥梁,起到解耦作用,优化系统整体结构。 由于缓冲区的存在,生产者和消费者,无论谁在某一局部时间内速度相对较高,都可以使用缓冲区得到缓解,保证系统正常运行,这在一定程度上缓解了性能瓶颈对系统系能的影响。 四、不变模式 一个类的内部状态创建后,在整个生命期间都不会发生变化时,就是不变类 4.1 不变模式不需要同步 public final class Product { //确保无子类 private final String no; // 私有属性,不会被其他对象获取 private final String name; //final保证属性不会被2次赋值 private final double price; public Product(String no, String name, double price) { //在创建对象时,必须指定数据 // super(); // 因为创建之后,无法进行修改 this.no = no; this.name = name; this.price = price; } public String getNo() { return no; } public String getName() { return name; } public double getPrice() { return price; } } 4.2下面是JDK提供几种不变的模式 java.lang.String java.lang.Boolean java.lang.Byte java.lang.Character java.lang.Double java.lang.Float java.lang.Integer java.lang.Long java.lang.Short