Issue
I have been trying to implement the following solution:
My application is expected to consume message A from all-messages, do some business logic, and then produce message B back into all-messages.
I am using StreamBridge instead of a Function<A,B> because I want the producing side to work with an arbitrary number of produced messages; for the sake of this example I have simplified the scenario to just one.
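To make the "arbitrary number of produced messages" idea concrete, here is a minimal plain-Java sketch. It does not use Spring: `OutgoingSink` is a hypothetical stand-in that only mirrors the shape of `StreamBridge#send(String bindingName, Object data)`, and the comma-splitting business logic and the `B:` prefix are assumptions made purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the one StreamBridge call used here:
// StreamBridge#send(String bindingName, Object data), which returns a boolean.
interface OutgoingSink {
    boolean send(String bindingName, Object data);
}

class FanOutSketch {

    // Builds a consumer that emits as many outgoing messages as the
    // business logic decides, instead of exactly one result per input.
    static Consumer<String> incoming(OutgoingSink sink) {
        return payload -> {
            // Assumed business logic: one outgoing message per comma-separated item.
            for (String item : payload.split(",")) {
                sink.send("outgoing-out-0", "B:" + item.trim());
            }
        };
    }

    public static void main(String[] args) {
        List<Object> sent = new ArrayList<>();
        // Recording sink: collects every payload instead of publishing it.
        OutgoingSink recordingSink = (bindingName, data) -> sent.add(data);
        incoming(recordingSink).accept("x, y, z");
        System.out.println(sent);
    }
}
```

In a real application the sink would presumably be the injected StreamBridge and the consumer would be registered as a bean; both of those details are outside this sketch.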
Additionally, to avoid an infinite loop, there is a custom router function that dispatches each incoming message to the appropriate consumer, either incoming or discarded. It would be great to have a way to effectively discard messages.
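The routing decision itself can be sketched in plain Java, independent of how it is hooked into the framework (for example through a `MessageRoutingCallback` bean, whose exact signature varies between Spring Cloud Function versions). The `type` header and its values are assumptions for illustration; the post does not say how A and B are told apart.

```java
import java.util.Map;

class RoutingSketch {

    // Names of the two consumer functions mentioned in the question.
    static final String INCOMING = "incoming";
    static final String DISCARDED = "discarded";

    // Assumed convention: every message carries a "type" header.
    // Only type "A" is processed; everything else, including the "B"
    // messages this application produced itself, goes to the discarding
    // consumer, which is what breaks the loop on the shared topic.
    static String route(Map<String, Object> headers) {
        return "A".equals(headers.get("type")) ? INCOMING : DISCARDED;
    }

    public static void main(String[] args) {
        System.out.println(route(Map.of("type", "A")));
        System.out.println(route(Map.of("type", "B")));
        System.out.println(route(Map.of()));
    }
}
```

Under this assumption, discarding a message simply means routing it to a consumer that does nothing with it.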
That being said, I cannot quite get the implementation right using Spring Cloud Stream.
I'd like your help to understand what I am doing wrong and how to fix the current configuration so that the solution works as expected, specifically:
- Is this solution supported by Spring Cloud Stream?
- Is my application configuration correctly implementing the solution diagram above?
- Which bindings exactly should I use inside the app when sending/receiving messages?
The major headache comes from the bindings, so I tried to write a test with different combinations of incoming and outgoing bindings to see what is what, something along these lines:
class ScsProblemTests {

    /* ... */

    @ParameterizedTest
    @MethodSource("bindings")
    void consumeFromAndProduceIntoSharedTopic(String incomingBinding, String outgoingBinding) {
        givenNoOutgoingMessages();
        whenAnIncomingMessageArrives(incomingBinding);
        thenEventuallyAnOutgoingMessageIsProduced(outgoingBinding);
    }

    public static Stream<Arguments> bindings() {
        return Stream.of(
            Arguments.of(null, null),
            Arguments.of(null, "outgoing-out-0"),
            Arguments.of(null, "all-messages"),
            Arguments.of("incoming-in-0", null),
            Arguments.of("incoming-in-0", "outgoing-out-0"),
            Arguments.of("incoming-in-0", "all-messages"),
            Arguments.of("all-messages", null),
            Arguments.of("all-messages", "outgoing-out-0"),
            Arguments.of("all-messages", "all-messages")
        );
    }

    /* ... */
}
I am running this set of tests with an overrides Spring profile, where I set up the destination overrides as per the diagram. I am also running the same set with a different Spring profile with no overrides, as a control group to compare against. Only 2 tests from the no-overrides profile pass; the rest fail.
The no-overrides profile obviously does not match the design, but I was curious to see how the overrides were affecting the results. Specifically, the no-overrides tests that pass are the ones where:
- incomingBinding=null, outgoingBinding=null
- incomingBinding=null, outgoingBinding=outgoing-out-0
From my understanding of Spring Cloud Stream, even in this no-overrides case I'd expect the following to pass as well (they do not):
- incomingBinding=incoming-in-0, outgoingBinding=null
- incomingBinding=incoming-in-0, outgoingBinding=outgoing-out-0
At this point I am starting to think I misunderstood some concepts behind Spring Cloud Stream, but I really hope you can provide some useful advice.
I have shared my code into this repository for convenience.
Thanks in advance.
Solution
Ok here are the problems in my implementation and how to fix them:
- I had a typo in the routing configuration, so the function router was not enabled. This is how to enable the function router in Spring Cloud Stream:
spring:
  cloud:
    stream:
      function:
        routing:
          enabled: true
- Because a function router is involved, I no longer need to configure my incoming-in-0 binding; instead I need to configure a destination for the function router:
spring:
  cloud:
    stream:
      bindings:
        functionRouter-in-0:
          destination: all-messages
        outgoing-out-0:
          destination: all-messages
      source: outgoing
      # ...
- I misunderstood bindings/destinations and how to use the testing helpers provided by the framework, specifically InputDestination and OutputDestination. I was not sure which parameters to use to send or receive a message. The answer is that those components simulate the real binder (e.g. RabbitMQ, Kafka, etc.) and have no knowledge of a binding (which is a Spring Cloud Stream construct); they only know about a destination. So in my case that translated into something like this:
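import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.annotation.Import;

@SpringBootTest
@Import(TestChannelBinderConfiguration.class)
class ScsProblemTests {

    @Autowired
    private InputDestination input;

    @Autowired
    private OutputDestination output;

    /* ... */

    @Test
    void consumeFromAndProduceIntoSharedTopic() {
        // prepare message A ...

        // simulate message "A" arriving into "all-messages"
        input.send(messageA, "all-messages");

        // ...
        // application will pick up the message
        // the function router will dispatch the message to the right consumer
        // the consumer does some business logic
        // eventually a message "B" should be produced into "all-messages"

        // check if "all-messages" contains message "B"
        // NOTE: "all-messages" will contain both "A" and "B"
        var discard = output.receive(1000, "all-messages"); // message A
        var messageB = output.receive(1000, "all-messages");

        // assertions ...
    }

    /* ... */
}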
Note: as per the comments in the pseudo-code, the final state has both A and B in all-messages. In this case the OutputDestination is simply a window onto the shared channel, which will also contain the initial message we sent.
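The ordering described in the note can be pictured with a toy model in plain Java. This is not the test binder: a single FIFO queue plays the role of the all-messages destination, and the rule "A always yields B" is an assumed stand-in for the real business logic.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class SharedDestinationSketch {

    // Toy model: one FIFO queue stands in for the "all-messages" destination.
    private final Queue<String> allMessages = new ArrayDeque<>();

    // Simulates a message landing on the destination and the application
    // reacting to it by publishing back to the same destination.
    void send(String message) {
        allMessages.add(message);
        if (message.equals("A")) {
            allMessages.add("B"); // assumed business logic: A always yields B
        }
    }

    // Returns the next message on the destination, or null when it is empty.
    String receive() {
        return allMessages.poll();
    }

    public static void main(String[] args) {
        SharedDestinationSketch destination = new SharedDestinationSketch();
        destination.send("A");
        System.out.println(destination.receive()); // the message the test sent
        System.out.println(destination.receive()); // the message the application produced
    }
}
```

In this model the first receive has to be thrown away before the produced message becomes visible, which matches the two receive calls in the test.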
Hopefully this makes sense. I cleaned up and pushed a working version of the code to a fixed branch in the same repository so you can see the actual fixes.
Answered By - th3n3rd
Answer Checked By - David Marino (JavaFixing Volunteer)