多线程开发可以更好的发挥多核cpu性能,常用的多线程设计模式有:Future、Master-Worker、Guard
Susperionsion(保护性暂挂模式)、不变模式、生产者-消费者
模式;jdk除了定义了若干并发的数据结构,也内置了多线程框架和各种线程池;
锁(分为内部锁、重入锁、读写锁)、ThreadLocal、信号量等在并发控制中发挥着巨大的作用。
一、Future模型
1.什么是Future模型
该模型是将异步请求和代理模式联合的模型产物。见下图:
客户端发送一个长时间的请求,服务端不需等待该数据处理完成便立即返回一个伪造的代理数据(相当于商品订单,不是商品本身),用户也无需等待,先去执行其他的若干操作后,再去调用服务器已经完成组装的真实数据。该模型充分利用了等待的时间片段。
2.Future模式的核心结构:
Main:启动系统,调用Client发出请求;
Client:返回Data对象,理解返回FutureData(伪造的数据或未来数据),并开启ClientThread线程装配RealData(真实数据);
Data:返回数据的接口;
FutureData:Future数据,构造很快,但是是一个虚拟的数据,需要装配RealData;
RealData:真实数据,构造比较慢。
3.Future模式的代码实现:
(1)Main函数:
public class Main {
public static void main(String[] args){
Client client = new Client();
//理解返回一个FutureData
Data data = client.request("name");
System.out.println("请求完毕!");
try{
//处理其他业务
//这个过程中,真是数据RealData组装完成,重复利用等待时间
Thread.sleep(2000);
}catch (Exception e){
}
//真实数据
System.out.println("数据 = "+ data.getResult());
}
}
(2)Client的实现:
public class Client {
public Data request(final String queryStr){
final FutureData future = new FutureData();
//开启一个新的线程来构造真实数据
new Thread(){
public void run(){
RealData realData = new RealData(queryStr);
future.setRealData(realData); }
}.start();
return future;
}
}
(3)Data的实现:
public interface Data {
public String getResult();
}
(4)FutureData:
/**
* 是对RealData的一个包装
* @author limin
*
*/
public class FutureData implements Data {
protected RealData realData =null;
protected boolean isReady = false;
public synchronized void setRealData(RealData realData){
if(isReady){
return;
}
this.realData=realData;
isReady=true;
notifyAll();
}
@Override
public synchronized String getResult() {
while(!isReady){
try{
wait();
}catch (Exception e){
}
}
return realData.result;
}
}
(5)RealData实现:
public class RealData implements Data {
protected String result;
public RealData(String para){
//构造比较慢
StringBuffer sb= new StringBuffer();
for(int i=0;i<10;i++){
sb.append(para);
try{
Thread.sleep(1000);
}catch(Exception e){
}
result= sb.toString();
}
}
@Override
public String getResult() {
return result;
}
}
4.注意:
FutureData是对RealData的包装,是对真实数据的一个代理,封装了获取真实数据的等待过程。它们都实现了共同的接口,所以,针对客户端程序组是没有区别的;
客户端在调用的方法中,单独启用一个线程来完成真实数据的组织,这对调用客户端的main函数式封闭的;
因为咋FutureData中的notifyAll和wait函数,主程序会等待组装完成后再会继续主进程,也就是如果没有组装完成,main函数会一直等待。
二、Master-Worker模式
Master-Worker模式是常用的并行模式之一,它的核心思想是,系统有两个进程协作工作:Master进程,负责接收和分配任务;Worker进程,负责处理子任务。当Worker进程将子任务处理完成后,结果返回给Master进程,由Master进程做归纳汇总,最后得到最终的结果。
2.1什么是Master-Worker模式:
该模式的结构图:
结构图:
Worker:用于实际处理一个任务;
Master:任务的分配和最终结果的合成;
Main:启动程序,调度开启Master。
2.2代码实现:
下面的是一个简易的Master-Worker框架实现。
1
(1)Master部分:
[java] view plain copy
package MasterWorker;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
public class Master {
//任务队列
protected Queue<Object> workQueue= new ConcurrentLinkedQueue<Object>();
//Worker进程队列
protected Map<String ,Thread> threadMap= new HashMap<String ,Thread>();
//子任务处理结果集
protected Map<String ,Object> resultMap= new ConcurrentHashMap<String, Object>();
//是否所有的子任务都结束了
public boolean isComplete(){
for(Map.Entry<String , Thread> entry:threadMap.entrySet()){
if(entry.getValue().getState()!=Thread.State.TERMINATED){
return false;
}
}
return true ;
}
//Master的构造,需要一个Worker进程逻辑,和需要Worker进程数量
public Master(Worker worker,int countWorker){
worker.setWorkQueue(workQueue);
worker.setResultMap(resultMap);
for(int i=0;i<countWorker;i++){
threadMap.put(Integer.toString(i), new Thread(worker, Integer.toString(i)));
}
}
//提交一个任务
public void submit(Object job){
workQueue.add(job);
}
//返回子任务结果集
public Map<String ,Object> getResultMap(){
return resultMap;
}
//开始运行所有的Worker进程,进行处理
public void execute(){
for(Map.Entry<String , Thread> entry:threadMap.entrySet()){
entry.getValue().start();
}
}
}
(2)Worker进程实现:
package MasterWorker;
import java.util.Map;
import java.util.Queue;
public class Worker implements Runnable{
//任务队列,用于取得子任务
protected Queue<Object> workQueue;
//子任务处理结果集
protected Map<String ,Object> resultMap;
public void setWorkQueue(Queue<Object> workQueue){
this.workQueue= workQueue;
}
public void setResultMap(Map<String ,Object> resultMap){
this.resultMap=resultMap;
}
//子任务处理的逻辑,在子类中实现具体逻辑
public Object handle(Object input){
return input;
}
@Override
public void run() {
while(true){
//获取子任务
Object input= workQueue.poll();
if(input==null){
break;
}
//处理子任务
Object re = handle(input);
resultMap.put(Integer.toString(input.hashCode()), re);
}
}
}
(3)运用这个小框架计算1——100的立方和,PlusWorker的实现:
package MasterWorker;
public class PlusWorker extends Worker {
@Override
public Object handle(Object input) {
Integer i =(Integer)input;
return i*i*i;
}
}
(4)进行计算的Main函数:
package MasterWorker;
import java.util.Map;
import java.util.Set;
public class Main {
/**
* @param args
*/
public static void main(String[] args) {
//固定使用5个Worker,并指定Worker
Master m = new Master(new PlusWorker(), 5);
//提交100个子任务
for(int i=0;i<100;i++){
m.submit(i);
}
//开始计算
m.execute();
int re= 0;
//保存最终结算结果
Map<String ,Object> resultMap =m.getResultMap();
//不需要等待所有Worker都执行完成,即可开始计算最终结果
while(resultMap.size()>0 || !m.isComplete()){
Set<String> keys = resultMap.keySet();
String key =null;
for(String k:keys){
key=k;
break;
}
Integer i =null;
if(key!=null){
i=(Integer)resultMap.get(key);
}
if(i!=null){
//最终结果
re+=i;
}
if(key!=null){
//移除已经被计算过的项
resultMap.remove(key);
}
}
}
}
2.3总结:
Master-Worker模式是一种将串行任务并行化的方案,被分解的子任务在系统中可以被并行处理,同时,如果有需要,Master进程不需要等待所有子任务都完成计算,就可以根据已有的部分结果集计算最终结果集。
三、生产者-消费模式
生产者-消费模式,通常有两类线程,即若干个生产者线程和若干个消费者线程。生产者线程负责提交用户请求,消费者线程负责具体处理生产者提交的任务。两者之间通过共享内存缓冲去进行通信。
3.1.架构模式图:
类图:
生产者:提交用户请求,提取用户任务,并装入内存缓冲区;
消费者:在内存缓冲区中提取并处理任务;
内存缓冲区:缓存生产者提交的任务或数据,供消费者使用;
任务:生产者向内存缓冲区提交的数据结构;
Main:使用生产者和消费者的客户端。
3.2代码实现一个基于生产者-消费者模式的求整数平方的并行计算:
(1)Producer生产者线程:
package ProducerConsumer;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class Producer implements Runnable{
//Volatile修饰的成员变量在每次被线程访问时,都强迫从共享内存中重读该成员变量的值。
//而且,当成员变量发生变化时,强迫线程将变化值回写到共享内存。
//这样在任何时刻,两个不同的线程总是看到某个成员变量的同一个值。
private volatile boolean isRunning= true;
//内存缓冲区
private BlockingQueue<PCData> queue;
//总数,原子操作
private static AtomicInteger count = new AtomicInteger();
private static final int SLEEPTIME=1000;
public Producer(BlockingQueue<PCData> queue) {
this.queue = queue;
}
@Override
public void run() {
PCData data=null;
Random r = new Random();
System.out.println("start producer id = "+ Thread .currentThread().getId());
try{
while(isRunning){
Thread.sleep(r.nextInt(SLEEPTIME));
//构造任务数据
data= new PCData(count.incrementAndGet());
System.out.println("data is put into queue ");
//提交数据到缓冲区
if(!queue.offer(data,2,TimeUnit.SECONDS)){
System.out.println("faile to put data: "+ data);
}
}
}catch (InterruptedException e){
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
public void stop(){
isRunning=false;
}
}
(2)Consumer消费者线程:
package ProducerConsumer;
import java.text.MessageFormat;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable {
//缓冲区
private BlockingQueue<PCData> queue;
private static final int SLEEPTIME=1000;
public Consumer(BlockingQueue<PCData> queue) {
this.queue = queue;
}
@Override
public void run() {
System.out.println("start Consumer id= "+ Thread .currentThread().getId());
Random r = new Random();
try {
//提取任务
while(true){
PCData data= queue.take();
if(null!= data){
//计算平方
int re= data.getData()*data.getData();
System.out.println(MessageFormat.format("{0}*{1}={2}",
data.getData(),data.getData(),re
));
Thread.sleep(r.nextInt(SLEEPTIME));
}
}
} catch (InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}
(3)PCData共享数据模型:
package ProducerConsumer;
public final class PCData {
private final int intData;
public PCData(int d) {
intData=d;
}
public PCData(String d) {
intData=Integer.valueOf(d);
}
public int getData(){
return intData;
}
@Override
public String toString(){
return "data:"+ intData ;
}
}
(4)Main函数:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
public class Main {
/**
* @param args
*/
public static void main(String[] args) throws InterruptedException{
//建立缓冲区
BlockingQueue<PCData> queue = new LinkedBlockingDeque<PCData>(10);
//建立生产者
Producer producer1 = new Producer(queue);
Producer producer2 = new Producer(queue);
Producer producer3 = new Producer(queue);
//建立消费者
Consumer consumer1 = new Consumer(queue);
Consumer consumer2 = new Consumer(queue);
Consumer consumer3 = new Consumer(queue);
//建立线程池
ExecutorService service = Executors.newCachedThreadPool();
//运行生产者
service.execute(producer1);
service.execute(producer2);
service.execute(producer3);
//运行消费者
service.execute(consumer1);
service.execute(consumer2);
service.execute(consumer3);
Thread.sleep(10*1000);
//停止生产者
producer1.stop();
producer2.stop();
producer3.stop();
Thread.sleep(3000);
service.shutdown();
}
}
3.3注意:
volatile关键字:Volatile修饰的成员变量在每次被线程访问时,都强迫从共享内存中重读该成员变量的值。而且,当成员变量发生变化时,强迫线程将变化值回写到共享内存。这样在任何时刻,两个不同的线程总是看到某个成员变量的同一个值。
生产-消费模式的核心组件是共享内存缓冲区,是两者的通信桥梁,起到解耦作用,优化系统整体结构。
由于缓冲区的存在,生产者和消费者,无论谁在某一局部时间内速度相对较高,都可以使用缓冲区得到缓解,保证系统正常运行,这在一定程度上缓解了性能瓶颈对系统系能的影响。
四、不变模式
一个类的内部状态创建后,在整个生命期间都不会发生变化时,就是不变类
4.1 不变模式不需要同步
public final class Product {
//确保无子类
private final String no;
// 私有属性,不会被其他对象获取
private final String name;
//final保证属性不会被2次赋值
private final double price;
public Product(String no, String name, double price) {
//在创建对象时,必须指定数据
// super();
// 因为创建之后,无法进行修改
this.no = no;
this.name = name;
this.price = price;
}
public String getNo() {
return no;
}
public String getName() {
return name;
}
public double getPrice() {
return price;
}
}
4.2下面是JDK提供几种不变的模式
java.lang.String
java.lang.Boolean
java.lang.Byte
java.lang.Character
java.lang.Double
java.lang.Float
java.lang.Integer
java.lang.Long
java.lang.Short