首页 » 未分类 » 正文

生产者-消费者问题

管道流

package study.java;
import java.io.*;

public class Main {
    public static void main(String[] args) {        // 创建管道输出流
        PipedInputStream pis = new PipedInputStream();
        PipedOutputStream pos = new PipedOutputStream();
        try{
            pos.connect(pis);
        }catch (IOException e){
            e.printStackTrace();
        }
        Producer producer = new Producer(pos);
        Consumer consumer = new Consumer(pis);
        producer.start();
        consumer.start();
    }
}

class Producer extends Thread {
    int i;
    PipedOutputStream pos=null;
    Producer(PipedOutputStream pos){
        this.pos=pos;
        i=1;
    }

     synchronized public void run(){
        try{
            while(true){
                Thread.sleep(100);
                DataOutputStream out = new DataOutputStream(pos);
                out.writeInt(i);
                i++;
            }
        }catch (IOException|InterruptedException e){
            e.printStackTrace();
        }
    }
}

class Consumer extends Thread{
    PipedInputStream pis=null;
    Consumer(PipedInputStream pis){
        this.pis=pis;
    }
    public void run(){
        try{
            while(true){
                DataInputStream in = new DataInputStream(pis);
                System.out.println(in.readInt());
            }
        }catch (IOException e){
            e.printStackTrace();
        }
    }
}

BlockingQueue的做法:

package thread;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
 * Created by Zen9 on 2016/3/16.
 */
public class BlockingQueueTest {
    public static void main(String[] args) {        // 创建容量为1的BlockingQueue
        BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(1);        // 启动3个生产者线程
        new Producer(blockingQueue).start();
        new Producer(blockingQueue).start();
        new Producer(blockingQueue).start();        // 启动1个消费者线程
        new Consumer(blockingQueue).start();
    }
}
class Producer extends Thread{
    int i;
    private BlockingQueue<String> blockingQueue;
    public Producer(BlockingQueue<String> blockingQueue){
        this.blockingQueue = blockingQueue;
        i=1;
    }
    @Override
    public void run() {
        while(true){
            try{
                blockingQueue.put("生产"+i);
                i++;
            }catch (InterruptedException e){

            }
        }
    }
}
class Consumer extends Thread{
    private BlockingQueue<String> blockingQueue;
    public Consumer(BlockingQueue<String> blockingQueue){
        this.blockingQueue = blockingQueue;
    }
    @Override
    public void run() {
        while(true){
            try{
                Thread.sleep(1100);
                System.out.println(blockingQueue.take());
            }catch (InterruptedException e){

            }
        }
    }
}

wait和notify

import static java.lang.System.*;
public class CalendarDate8 {
    public static void main(String args[])throws Exception{
        CubbyHole cubbyHole=new CubbyHole();
        Producer producer=new Producer(cubbyHole,1);
        Producer producer1=new Producer(cubbyHole,2);
        Consumer consumer=new Consumer(cubbyHole,1);
        Consumer consumer1=new Consumer(cubbyHole, 2);
        producer.start();
        consumer.start();
    }
}
class Producer extends Thread{
    private CubbyHole cubbyHoles;
    private int number;
    public Producer(CubbyHole c, int number){
        cubbyHoles=c;
        this.number=number;
    }
    public void run(){
        for(int i=0; i<10; i++){
            cubbyHoles.put(i);
            out.println("Producer # "+this.number + " put"+i);
            try{
                sleep((int)(Math.random()*100));
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
    }
}
class Consumer extends Thread{
    private CubbyHole cubbyHole;
    private int num;
    public Consumer(CubbyHole c, int number){
        cubbyHole=c;
        this.num=number;
    }
    public void run(){
        int value=0;
        for(int i=0; i<10; i++){
            value=cubbyHole.get();
            out.println("Comsumer # "+this.num + " got:"+value);
        }
    }
}
class CubbyHole{
    private int seq;
    private boolean available=false;
    public synchronized int get(){
        while(available == false){
            try{
                wait();
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
        available=false;
        notify();
        return seq;
    }
    public synchronized void put(int value){
        while(available==true){
            try{
                wait();
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
        seq=value;
        available=true;
        notify();
    }
}

发表评论