Class MockedReceiver

java.lang.Object
fr.ght1pc9kc.testy.beat.messaging.MockedReceiver

public final class MockedReceiver extends Object
Receiver injectable by the extension WithRabbitMock.

Usage:

 @Test
 void my_test(MockedReceiver mockedReceiver, Channel channel) {
     final Flux<Delivery> receivedMessages = mockedReceiver.consumeOne().on("my-queue").start();

     // Send message
     final String message = "test-message";
     channel.basicPublish("", "my-queue", null, message.getBytes());

     // Obtain the messages sent on the queue
     final String actualMessage = receivedMessages.map(d -> new String(d.getBody()))
             .single()
             .block();
     Assertions.assertThat(actualMessage).isEqualTo(message);
 }
 

The number of consumed messages can be parameterized. The Flux returned by MockedReceiver.MockedConsumerBuilder.start() only completes when the expected number of messages is reached.

 @Test
 void my_test(MockedReceiver mockedReceiver, Channel channel) throws IOException {
     final String[] messages = {"test-message-1", "test-message-2"};
     final Flux<Delivery> receivedMessages = mockedReceiver.consume(2).on("my-queue").start();

     // Send message
     channel.basicPublish("", "my-queue", null, messages[0].getBytes());
     channel.basicPublish("", "my-queue", null, messages[1].getBytes());

     // Obtain the messages sent on the queue
     final List<String> actualMessages = receivedMessages.map(d -> new String(d.getBody()))
             .collectList()
             .block();
     Assertions.assertThat(actualMessages).containsExactly(messages);
 }
 

Responses of the receiver can be defined with model AmqpMessage. If there are more responses than requests, the last response is replied indefinitely.

 @Test
 void my_test(MockedReceiver mockedReceiver, Channel channel) throws IOException {
     final String expectedRequest = "test-request";
     final String expectedResponse = "test-response";

     final Flux<Delivery> receivedMessages = mockedReceiver.consumeOne().on("my-queue")
             .thenRespond(AmqpMessage.of(expectedResponse.getBytes()))
             .start();

     final RpcClient rpcClient = new RpcClient(Mono.just(channel), "my-exchange", "", () -> UUID.randomUUID().toString());

     // Send RPC request
     final Delivery response = rpcClient.rpc(Mono.just(new RpcClient.RpcRequest(expectedRequest.getBytes())))
             .block();
     Assertions.assertThat(response).isNotNull();
     Assertions.assertThat(response.getBody()).isNotEmpty();
     Assertions.assertThat(new String(response.getBody())).isEqualTo(expectedResponse);

     // Obtain the messages sent on the queue
     final String actualRequest = receivedMessages.map(d -> new String(d.getBody()))
             .single()
             .block();
     Assertions.assertThat(actualRequest).isEqualTo(expectedRequest);
 }