小工具      在线工具  汉语词典  css  js  c++  java

简单的延时消费队列

Java 额外说明

收录于:97天前

1.一个简单的延迟消费队列单元测试,添加2个任务到队列然后消费。代码如下:

package com.robinboot.facade;

import com.robinboot.service.task.DelayRunnable;
import com.robinboot.service.task.DelayRunnable;
import com.robinboot.service.task.TaskManager;
import org.apache.log4j.Logger;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @auther: TF12778
 * @date: 2020/11/26 11:22
 * @description:简单的延时消费队列单元测试,将2个任务添加到队列中,然后消费
 */
public class TaskTest extends BaseTest{

    public static final Logger logger=Logger.getLogger(TaskTest.class);

    ExecutorService singlePooling = Executors.newSingleThreadExecutor();
    DelayRunnable delayTask = new DelayRunnable();

    @Before
    public void tastttt1() {
        singlePooling.submit(delayTask);
    }

    @Test
    public void tastttt() {

        delayTask.put(new Runnable() {
            @Override
            public void run() {
                System.out.println("delayTask------1");
            }
        }, 1);

        delayTask.put(new Runnable() {
            @Override
            public void run() {
                System.out.println("delayTask------2");
            }
        }, 5);
    }
}

结果:

2、DelayRunnable主要功能:

声明一个延迟任务队列,将任务添加到任务队列中,每隔100ms取出一个任务执行,并将任务提交到线程池

package com.robinboot.service.task;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.*;

/**
 * @auther: TF12778
 * @date: 2020/11/26 11:02
 * @description:声明了一个延时任务队列,将任务添加到任务队列中,每100ms取出一个任务去执行, 任务提交到线程池
 */
public class DelayRunnable implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(DelayRunnable.class);

    private ExecutorService pooling  = new ThreadPoolExecutor(2, 3, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10));

    private DelayQueue<DelayedTask<Runnable>> dQueue = new DelayQueue<>();
    private volatile boolean started = true;

    public void put(Runnable r, long delay){
        dQueue.add(new DelayedTask<Runnable>(r, delay));
    }

    @Override
    public void run() {
        DelayedTask<Runnable> task = null;
        while (started){
            try {
                task = dQueue.poll(100, TimeUnit.MILLISECONDS); // 每100ms取出一个任务去执行
            }catch (InterruptedException e){
                logger.warn("dQueue thread is interrupted.",e);
            }
            if(task != null){
                logger.info("tasks in delayQueue: " + dQueue.size());
                fireTask(task.getItem());
                task = null;
            }
        }
    }

    public void stop(){
        started = false;
    }


    private void fireTask(Runnable r){
        this.pooling.submit(r);  // 任务提交到线程池
    }
}

package com.robinboot.service.task;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * @auther: sww
 * @date: 2020/11/26 10:57
 * @description:
 */
public class DelayedTask<T> implements Delayed {

    /**
     * Sequence number to break scheduling ties, and in turn to guarantee FIFO order among tied
     * entries.
     */
    private static final AtomicLong SEQUENCER = new AtomicLong(0);
    /** The time the task is enabled to execute in nanoTime units */
    private long time;
    /** Sequence number to break ties FIFO */
    private final long SEQUENCE_NUMBER;
    /** Base of nanosecond timings, to avoid wrapping */
    private static final long NANO_ORIGIN = System.nanoTime();
    private final T item;

    public DelayedTask(T submit, long timeout) {
        this.time = now() + TimeUnit.NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS);
        this.item = submit;
        this.SEQUENCE_NUMBER = SEQUENCER.getAndIncrement();
    }

    public T getItem() {
        return this.item;
    }

    final static long now() {
        return System.nanoTime() - NANO_ORIGIN;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(time - now(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        if(o == this){
            return 0;
        }
        if(o instanceof DelayedTask){
            DelayedTask t = (DelayedTask) o;
            long diff = this.time - t.time;
            if(diff < 0){
                return -1;
            }else if(diff > 0){
                return 1;
            }else if(SEQUENCE_NUMBER < t.SEQUENCE_NUMBER){
                return -1;
            }else {
                return 1;
            }
        }
        long d = (getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS));
        return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
    }

    @Override
    public boolean equals(Object object){
        if(object ==null || !(object instanceof DelayedTask)){
            return false;
        }
        return this.getItem().equals(((DelayedTask<T>)object).getItem());
    }
}

. . .

相关推荐

额外说明

人工智能领域:面试常见问题超全(深度学习基础、卷积模型、对抗神经网络、预训练模型、计算机视觉、自然语言处理、推荐系统、模型压缩、强化学习、元学习)

【深度学习入门到进阶】必看系列,含激活函数、优化策略、损失函数、模型调优、归一化算法、卷积模型、序列模型、预训练模型、对抗神经网络等 专栏详细介绍:【深度学习入门到进阶】必看系列,含激活函数、优化策略、损失函数、模型调优、归一化算法、卷积模型、序列模型、

额外说明

设计模式 | 适配器模式及典型应用

适配器模式 适配器模式(Adapter Pattern):将一个接口转换成客户希望的另一个接口,使接口不兼容的那些类可以一起工作,其别名为包装器(Wrapper)。适配器模式既可以作为类结构型模式,也可以作为对象结构型模式。 在适配器模式中,我们通过增加

额外说明

如何 DIY 一款属于自己的【3D 重力感应 动态壁纸 】,看完这篇文章你也可以学会

文章目录 -前言 -Unity实战篇 | unity制作动态壁纸,一款支持 DIY的【重力感应 的 3D动态壁纸】 制作过程 ❤️新建一个Unity工程,导入壁纸插件 -导入Unity的模型素材 -配置导入的素材包 -加入一个切换对象组的UI -使用Un

额外说明

C++程序设计:输出一行字符“This is a C++ program.”

【问题描述】 输出一行字符:“This is a C++ program.”。   【输入形式】 无 【输出形式】 This is a C++ program.   #include <iostream> using namespace std; int

额外说明

Windows下编写Lua程序

Lua是一个非常小巧的嵌入式编程语言,经常会被嵌入到Redis、Nginx等系统中,对原有系统做一些功能的灵活扩展。一般来讲,Lua脚本极大程度都会运行在Linux下,也因此官方只提供了Linux下的安装包。但是在学习、开发和测试阶段,我们使用的都是Wi

额外说明

SpringSecurity - 用户动态认证

一、SpringSecurity Spring Security 是 Spring 家族中的成员,基于 Spring 框架,提供了一套 Web 应用安全性的完整解决方案。 两个主要的策略是“认证”和“授权”(或者访问控制),一般来说,Web 应用的安全性

额外说明

SpringBoot框架(邮件发送Mail|持久层框架JPA|Extra前后端分离跨域处理|接口管理Swagger)这一篇就够了(超详细)

-作者简介:练习时长两年半的Java up主 -个人主页:老茶icon - ps:点赞-是免费的,却可以让写博客的作者开兴好久好久- -系列专栏:Java全栈,计算机系列(火速更新中) - 格言:种一棵树最好的时间是十年前,其次是现在 -动动小手,点个关

额外说明

前后端分离博客项目

文章目录 一、博客项目概述 二、Java后端接口开发 (一)技术栈 1、Spring Boot 2、MyBatis-Plus 3、Shiro 4、Lombok 5、Redis 6、Hibernate Validator 7、JWT (二)开发步骤 1、新

额外说明

Pinpoint【部署 02】Pinpoint Agent 安装启动及监控 SpringBoot 项目案例分享(添加快速测试math-game.jar包)

本文主要是介绍 Pinpoint 环境的部署,小伙伴儿们也可以参考 Pinpoint 《官网》的《快速入门》手册,最新版本v2.3.3组件可到官方《GitHub仓库》进行下载,使用到的文件列表: hbase-1.7.1-bin.tar.gz hbase-

额外说明

IDEA实现Maven项目创建并连接Tomcat

Maven简介 Maven是Apache下纯用Java开发的开源项目。它是一个项目管理工具,使用Maven来构建和管理Java项目的依赖关系。 项目构建是一个项目从编写源码到编译、测试、运行、打包、部署的过程。 Maven项目的依赖管理所依赖的jar包不

ads via 小工具