小工具      在线工具  汉语词典  css  js  c++  java

使用Testconainers来进行JAVA测试

JAVA编程,java,开发语言 额外说明

收录于:112天前

在JAVA项目中,我们需要对代码进行单元测试和集成测试。在测试中,我们需要用到一些外部系统如Kafka,数据库等,常用的做法是Mock这些服务或者搭建测试环境。Mock服务不能完全模拟外部系统,而搭建测试环境的工作量又比较大,另外有些时候测试环境会被多个测试项目共享,互相之间会有干扰。

Testcontainers是一个开源项目,提供了通过加载镜像的服务来提供测试所需要的环境。在官网上我们可以看到现在支持了很多常见的服务,这里我也测试了一下如何用Testcontainers来做单元测试。

假设我们有以下的类,其提供了一个方法,根据输入的消息主题,开始时间戳和结束时间戳,获取Kafka消息的相关分区以及实际的截至时间戳,从而获取指定时间范围内的消息。代码如下:

public class CheckKafkaMsgTimestamp {
    private static final Logger LOG = LoggerFactory.getLogger(CheckKafkaMsgTimestamp.class);
   
    public static KafkaResult getTimestamp(String bootstrapServer, String topic, long startTimestamp, long stopTimestamp) {
        long max_timestamp = stopTimestamp;
        long max_records = 5L;
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServer);
        props.setProperty("group.id", "test");
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "1000");
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Get all the partitions of the topic
        int partition_num = consumer.partitionsFor(topic).size();
        HashMap<TopicPartition, Long> search_map = new HashMap<>();
        ArrayList<TopicPartition> tp = new ArrayList<>();
        for (int i=0;i<partition_num;i++) {
            search_map.put(new TopicPartition(topic, i), stopTimestamp);
            tp.add(new TopicPartition(topic, i));
        }
        // Check if message exist with timestamp greater than search timestamp
        Boolean flag = true;
        ArrayList<TopicPartition> selected_tp = new ArrayList<>();
        Map<TopicPartition, OffsetAndTimestamp> results = consumer.offsetsForTimes(search_map);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : results.entrySet()) {
            OffsetAndTimestamp value = entry.getValue();
            if (value==null) {   //there is at least one partition don't have timestamp greater or equal to the stopTime
                flag = false;
                break;
            }
        }
        // Get the latest timestamp of all partitions if the above check result is false
        // Note the timestamp is the earliest of all the partitions. 
        if (!flag) {
            max_timestamp = 0L;
            consumer.assign(tp);
            Map<TopicPartition, Long> endoffsets = consumer.endOffsets(tp);
            for (Map.Entry<TopicPartition, Long> entry : endoffsets.entrySet()) {
                Long temp_timestamp = 0L;
                int record_count = 0;
                TopicPartition t = entry.getKey();
                long offset = entry.getValue();
                if (offset < 1) {
                    LOG.warn("Can not get max_timestamp as partition has no record!");
                    continue;
                }
                consumer.assign(Arrays.asList(t));
                consumer.seek(t, offset>max_records?offset-5:0);
            
                Iterator<ConsumerRecord<String, String>> records = consumer.poll(Duration.ofSeconds(2)).iterator();
                while (records.hasNext()) {
                    record_count++;
                    ConsumerRecord<String, String> record = records.next();
                    LOG.info("Topic: {}, Record Timestamp: {}, recordcount: {}", t, record.timestamp(), record_count);
                    if (temp_timestamp == 0L || record.timestamp() > temp_timestamp) {
                        temp_timestamp = record.timestamp();
                    }
                }
                if (temp_timestamp > 0L && temp_timestamp > startTimestamp) {
                    if (max_timestamp == 0L || max_timestamp > temp_timestamp) {
                        max_timestamp = temp_timestamp;
                    }
                    selected_tp.add(t);
                }
            }
        } else {
            selected_tp = tp;
        }
        consumer.close();
        LOG.info("Max Timestamp: {}", max_timestamp);
        return new KafkaResult(max_timestamp, selected_tp);
    }
}

现在我们要对这个功能做单元测试,可以用Testcontainers来起一个Kafka环境。首先我们在pom.xml里面增加以下依赖

<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>testcontainers</artifactId>
  <version>1.19.1</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>kafka</artifactId>
  <version>1.19.1</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>junit</groupId>
  <artifactId>junit</artifactId>
  <version>4.13.2</version>
</dependency>
<dependency>
  <groupId>org.hamcrest</groupId>
  <artifactId>hamcrest-all</artifactId>
  <version>1.3</version>
  <scope>test</scope>
</dependency>

然后编写以下的测试代码:

@RunWith(JUnit4.class)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class CheckKafkaMsgTimestampTest {
    private String bootstrapServer;
    private Producer<String, String> producer;

    @Rule
    public KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));

    @Before
    public void setUp() {
        kafka.start();
        bootstrapServer = kafka.getBootstrapServers();
        // Create a test topic with 3 partitions
        Properties adminProps = new Properties();
        adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        Admin admin = Admin.create(adminProps);
        int partitions = 3;
        short replicationFactor = 1;
        NewTopic newTopic = new NewTopic("test", partitions, replicationFactor);
        CreateTopicsResult result = admin.createTopics(
            Collections.singleton(newTopic)
        );
        try {
            KafkaFuture<Void> future = result.values().get("test");
            future.get();
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }

        // Create a producer
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServer);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("linger.ms", 1);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    @After
    public void tearDown() {
        producer.close();
        kafka.stop();
    }

    @Test
    public void testGetTimestamp() {
        // Prepare 6 messages to send to test topic
        // Each partition will receive 2 message
        long ts = System.currentTimeMillis()-5000L;
        producer.send(new ProducerRecord<String, String>("test", 0, ts+100, "", "test message"));
        producer.send(new ProducerRecord<String, String>("test", 0, ts+300, "", "test message"));
        producer.send(new ProducerRecord<String, String>("test", 1, ts+110, "", "test message"));
        producer.send(new ProducerRecord<String, String>("test", 1, ts+200, "", "test message"));
        producer.send(new ProducerRecord<String, String>("test", 2, ts+105, "", "test message"));
        producer.send(new ProducerRecord<String, String>("test", 2, ts+250, "", "test message"));

        KafkaResult result = CheckKafkaMsgTimestamp.getTimestamp(bootstrapServer, "test", ts, System.currentTimeMillis());
        assertEquals(ts+200, result.max_timestamp);
        assertEquals(3, result.selected_tp.size());

        result = CheckKafkaMsgTimestamp.getTimestamp(bootstrapServer, "test", ts+500, System.currentTimeMillis());
        assertEquals(0, result.max_timestamp);
        assertEquals(0, result.selected_tp.size());

        result = CheckKafkaMsgTimestamp.getTimestamp(bootstrapServer, "test", ts+205, System.currentTimeMillis());
        assertEquals(ts+250, result.max_timestamp);
        assertEquals(2, result.selected_tp.size());
    }
}

解释一下代码,在Rule里面我们用Testcontainer加载了一个Kafka的镜像,然后在Before里面我们启动了Kafka服务,并获取bootstrapserver的地址,然后我们就可以在Kafka里面创建一个包含3个分区的消息主题。

在testGetTimestamp方法中,我们先发送了几条消息到不同的消息分区,然后就可以调用CheckKafkaMsgTimestamp的方法来进行测试和验证了。

如果我们有多个测试要用到同一个Kafka,那么还可以创建一个抽象类,把Kafka的加载定义在这个抽象类中,其他测试类继承这个抽象类。例如

public abstract class AbstractContainerBaseTest {
    public static final KafkaContainer KAFKA_CONTAINER;

    static {
        KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));;
        KAFKA_CONTAINER.start();
    }
}

然后改写一下刚才我们的测试类

public class CheckKafkaMsgTimestampTest extends AbstractContainerBaseTest {
    //@Rule
    //public KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));

    @Before
    public void setUp() {
        //kafka.start();
        //bootstrapServer = kafka.getBootstrapServers();
        bootstrapServer = KAFKA_CONTAINER.getBootstrapServers();

 

. . .

相关推荐

额外说明

Android中级-PackageManagerService和Intent

PackageManagerService和Intent PackageManagerService Intent PackageManagerService 从SystemService的main()方法开始执行 public static void

额外说明

Android图表MPandroidChart曲线绘制教程

前言: 本文介绍MpAndroidChart对曲线图绘制的用法,包括引用库的配置及语法使用,帮你从零做一款曲线图或者折线图。 一.MPandroidChart的简介和配置: 1.简介: MPandroidChart是一款稳定实用的绘图库,可以绘制折线图、

额外说明

C# 20.弹出对话框

1. 提示 MessageBox.Show("数据不能为0","提示"); 2. 选择 DialogResult result = MessageBox.Show("确定要删除吗?","删除",MessageBoxButtons.YesNo,Messa

额外说明

day15---(07)课程支付接口开发(准备)

1、在service模块下创建子模块service_order 2、在service_order引入相关依赖 <dependencies> <dependency> <groupId>com.github.wxpay</group

额外说明

Unity中Shader的深度偏移Offset

文章目录 前言 一、深度偏移一般用于什么时候 1、深度偏移一般用于两个模型 重合在同一平面时,在其中一个模型上使用深度偏移后,就能区别出两个模型的深度,从而消除闪动 2、虽然,可以让两个模型在深度上错开一点点,来解决这个闪动的物体,但是,在需求就是需要在

额外说明

数据结构——二分查找法

二分查找法(Binary Search)是一种高效的查找算法,通常用于在已排序的数组或列表中查找特定的目标值。这个算法的基本思想是不断将查找范围缩小为原来的一半,直到找到目标值或确定目标值不存在。 二分查找是一种在每次比较之后将查找空间一分为二的算法。每

额外说明

C++解决win7下文件拖拽无效的问题

        近日测试那边提出这样的bug:在测试文件传输模块时,发现程序在win7下面出现文件拖放失效的问题。刚看到这样的问题,估计是与win7的安全级别有关系,但具体该如何处理却没有多少头绪。于是,带着相关的疑问到网上查阅了一下,找到了相关的处理办

额外说明

React的性能优化(useMemo和useCallback)的使用

一、业务场景 React是一个用于构建用户界面的javascript的库,主要负责将数据转换为视图,保证数据和视图的统一,react在每次数据更新的时候都会重新render来保证数据和视图的统一,但是当父组件内部数据的变化,在父组件下挂载的所有子组件也会

额外说明

Power BI 傻瓜入门 6. 从动态数据源获取数据

本章内容将介绍 发现如何从关系数据库和非关系数据库中提取数据 学习如何使用Power BI使用在线和实时数据源 跨多个数据源应用分析服务 使用Power BI通过静态和动态数据解决纠正措施 数据有时可能有点复杂。诚然,上传一个包含几个电子表格的文件,或者

额外说明

kubernetes VS OpenShift浅析

Kubernetes vs OpenShift浅析 古语有云:“知彼知己,百战不殆。不知彼而知己,一胜一负。不知彼,不知己,每战必殆。” 这句话同样也适用于技术体系。无论我们在落地,还是在学习、实践某一项技术,对提供相同功能的体系框架的对比学习,可以使得

ads via 小工具