阻塞队列BlockingQueue

阻塞队列BlockingQueue

池技术,如线程池

  • ArrayBlockingQueue
  • LinkedBlockingQueue
  • PriorityBlockingQueue

  • 抛出异常 add/remove
  • 不会抛出异常 offer/poll
  • 阻塞等待 put/take
  • 超时等待 offer/poll
package com.wdg.concurrent;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * @description: BlockingQueue 阻塞队列实例
 * @author: WuDG/1490727316@qq.com
 * @date: 2021/3/17 16:11
 */
public class BlockingQueueDemo {
    public static void main(String[] args) {
//        test1();
//        test2();
//        test3();
        test4();
    }

    /**
     * add/remove 抛出异常
     */
    public static void test1(){
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);

        for (int i = 0; i < 5; i++) {
            final String str = String.valueOf(i);
            new Thread(() -> {
                boolean add = queue.add(str);
                System.out.println(add);
            }).start();
        }
        System.out.println("队首元素:"+queue.element());
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                String remove = queue.remove();
                System.out.println("remove:"+remove);
            }).start();
        }
    }

    /**
     * offer/poll 不抛出异常
     */
    public static void test2(){
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);

        for (int i = 0; i < 9; i++) {
            final String str = String.valueOf(i);
            new Thread(() -> {
                boolean offer = queue.offer(str);
                System.out.println("offer:"+offer);
            }).start();
        }
        
        for (int i = 0; i < 9; i++) {
            System.out.println("队首元素:"+queue.peek());
            new Thread(() -> {
                String poll = queue.poll();
                System.out.println("poll:"+poll);
            }).start();
        }
    }

    /**
     * put/take 阻塞等待
     */
    public static void test3(){
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);

        for (int i = 0; i < 9; i++) {
            final String str = String.valueOf(i);
            new Thread(() -> {
                try {
                    queue.put(str);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }

        for (int i = 0; i < 9; i++) {
            new Thread(() -> {
                String poll = null;
                try {
                    poll = queue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("poll:"+poll);
            }).start();
        }
    }

    /**
     * offer/poll 超时等待
     */
    public static void test4(){
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);

        for (int i = 0; i < 9; i++) {
            final String str = String.valueOf(i);
            new Thread(() -> {
                try {
                    boolean offer = queue.offer(str, 1, TimeUnit.SECONDS);
                    System.out.println("offer:"+offer);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }

        for (int i = 0; i < 9; i++) {
            new Thread(() -> {
                String poll = null;
                try {
//                    TimeUnit.SECONDS.sleep(2);
                    poll = queue.poll(1, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("poll:"+poll);
            }).start();
        }
    }
}
# java  并发 

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×