Issue
I am using ActiveMQ Artemis 2.19.1. I created producer and consumer apps using Spring Boot. I need multiple instances of the consumer to receive all the messages (multicast). I configured a Last Value Queue like this (broker.xml):
<address-settings>
  <address-setting match="quote.#">
    <max-size-bytes>1000000000</max-size-bytes> <!-- 1GB -->
    <address-full-policy>BLOCK</address-full-policy>
    <default-last-value-key>symbol</default-last-value-key>
    <default-last-value-queue>true</default-last-value-queue>
    <default-non-destructive>true</default-non-destructive>
  </address-setting>
  ...
</address-settings>
Sending looks like this and appears to work correctly. "symbol" is the LVQ key.
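import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

@Service
public class DispatcherService {
    @Autowired
    JmsTemplate jmsTemplate;

    // jmsQueue is the destination name; its declaration is not shown here.
    public void sendMessageA(String message) {
        jmsTemplate.convertAndSend(jmsQueue, message, m -> {
            // "symbol" is the last-value key configured in broker.xml.
            m.setStringProperty("symbol", "ABC");
            return m;
        });
    }
}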
If Spring Boot's application.properties has:
spring.jms.pub-sub-domain=true
...then all clients receive all messages when published (good). However, the most recent message is not published to new clients when they start and subscribe to the topic.
If instead using:
spring.jms.pub-sub-domain=false
I can see the last message remains in the Last Value Queue (good) and connecting consumers get the last msg. However as messages are published they're distributed round-robin (anycast), not all messages to all consumers.
How can I make sure clients connecting to a LVQ receive the most recent message then all future messages, not just a round-robin distribution of future messages?
EDIT:
Doing this works. Just leave spring.jms.pub-sub-domain=true and set retroactive-message-count to more than the number of symbols that may be encountered; otherwise some will not be retained:
<address-setting match="quotes">
  <retroactive-message-count>100000</retroactive-message-count>
</address-setting>
<address-setting match="*.*.*.quotes.*.retro">
  <default-last-value-key>symbol</default-last-value-key>
</address-setting>
Solution
It sounds to me like everything is working as designed. I believe your expectations are being thwarted because you're using pub/sub (i.e. JMS topics).
Let me provide a bit of background. When a JMS client creates a subscription on a topic the broker responds by creating a multicast queue on the address with the same name. The queue is named according to the kind of subscription it is. If it is a non-durable subscription then the queue is named with a UUID. If it is a durable subscription then the queue is named according to the subscription name provided by the client and the client ID (if available). When a message is sent to the address it is put in all the multicast queues bound to that address.
Therefore, when a new non-durable subscription is created a new queue for that subscription is also created which means that the subscriber will receive none of the messages sent to the topic prior to the creation of the subscription. This is the expected behavior for JMS topics (i.e. normal pub/sub semantics). Also, since the queue for a non-durable subscription is only available while the subscriber is connected that means there's no way to enforce LVQ semantics since any message which arrives in the queue will be immediately dispatched to the consumer. In short, LVQ with JMS topics doesn't make a lot of sense.
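To make the routing described above concrete, here is a minimal in-memory sketch in plain Java. It is a toy model, not the Artemis API: the class and method names are hypothetical, and the durable queue name format (client ID, a dot, then the subscription name) is an assumption used only for illustration. It shows that a subscriber only sees messages sent after its own queue was created.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;

// Toy in-memory model of a multicast address; NOT the Artemis API.
class MulticastAddressModel {
    // Subscription queue name -> messages waiting in that queue.
    private final Map<String, Queue<String>> queues = new LinkedHashMap<>();

    // Non-durable subscription: the queue gets a generated UUID name.
    String subscribeNonDurable() {
        String name = UUID.randomUUID().toString();
        queues.put(name, new ArrayDeque<>());
        return name;
    }

    // Durable subscription: the name is built from client ID and subscription name
    // (the exact format here is an assumption for illustration).
    String subscribeDurable(String clientId, String subscriptionName) {
        String name = clientId + "." + subscriptionName;
        queues.putIfAbsent(name, new ArrayDeque<>());
        return name;
    }

    // A message sent to the address is copied into every queue bound at that moment.
    void send(String message) {
        for (Queue<String> q : queues.values()) {
            q.add(message);
        }
    }

    // Drain everything currently waiting in one subscription queue.
    List<String> receiveAll(String queueName) {
        List<String> out = new ArrayList<>();
        Queue<String> q = queues.get(queueName);
        while (q != null && !q.isEmpty()) {
            out.add(q.poll());
        }
        return out;
    }
}

class MulticastAddressDemo {
    public static void main(String[] args) {
        MulticastAddressModel topic = new MulticastAddressModel();
        String early = topic.subscribeNonDurable();
        topic.send("ABC=10");
        String late = topic.subscribeNonDurable(); // created after the first send
        topic.send("ABC=11");
        System.out.println("early: " + topic.receiveAll(early)); // early: [ABC=10, ABC=11]
        System.out.println("late:  " + topic.receiveAll(late));  // late:  [ABC=11]
    }
}
```

The late subscriber never sees ABC=10 because its queue did not exist when that message was routed, which is the behavior you observed with spring.jms.pub-sub-domain=true.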
The behavior changes when you use a JMS queue because the queue is always there to receive messages. Consumers can come and go as they please while the broker enforces LVQ semantics.
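As a rough illustration of why a standing queue can enforce last-value semantics, here is a small sketch in plain Java. Again this is a toy model with hypothetical names, not how Artemis implements LVQs: it only keeps the newest message per key and lets consumers read without removing anything, which mirrors the last-value-key and non-destructive settings.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a non-destructive last-value queue; NOT the Artemis implementation.
class LastValueQueueModel {
    // Last-value key (e.g. the "symbol" property) -> most recent message body.
    private final Map<String, String> latest = new LinkedHashMap<>();

    // A new message replaces any older message that has the same key.
    void put(String symbol, String body) {
        latest.put(symbol, body);
    }

    // Non-destructive read: the messages stay in the queue for the next consumer.
    List<String> browse() {
        return new ArrayList<>(latest.values());
    }
}

class LastValueQueueDemo {
    public static void main(String[] args) {
        LastValueQueueModel lvq = new LastValueQueueModel();
        lvq.put("ABC", "ABC=10");
        lvq.put("XYZ", "XYZ=5");
        lvq.put("ABC", "ABC=11"); // replaces ABC=10
        System.out.println(lvq.browse()); // [ABC=11, XYZ=5]
        System.out.println(lvq.browse()); // [ABC=11, XYZ=5] (nothing was consumed)
    }
}
```

The queue exists whether or not a consumer is attached, so the replacement by key can happen while messages wait. A non-durable subscription queue has no such waiting period.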
One possible solution would be to create a special "initialization" queue where consumers could initially connect to get the latest information and after that they could subscribe to the JMS topic to get the pub/sub semantics you need. You could use a divert to make this transparent for the applications sending the messages so they can continue to just send to the JMS topic. Here's sample configuration:
<configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
  <core xmlns="urn:activemq:core">
    ...
    <diverts>
      <divert name="myDivert">
        <address>myTopic</address>
        <forwarding-address>initQueue</forwarding-address>
        <exclusive>false</exclusive>
      </divert>
    </diverts>
    ...
    <addresses>
      <address name="myTopic">
        <multicast/>
      </address>
      <address name="initQueue">
        <anycast>
          <queue name="initQueue" last-value-key="symbol" non-destructive="true" />
        </anycast>
      </address>
      ...
    </addresses>
  </core>
</configuration>
Using this configuration, every message sent to the JMS topic myTopic will transparently be sent to initQueue as well. This queue will keep only the most up-to-date messages since it is using last-value semantics. Also, those up-to-date messages will stay in the queue for any subsequent consumer since the queue is non-destructive.
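The overall flow can be sketched with another toy model in plain Java. Nothing here is Artemis or JMS API and all names are hypothetical: the map stands in for initQueue, the lists stand in for the subscription queues on myTopic, and sendToTopic imitates the non-exclusive divert by writing to both.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of the divert setup above; NOT the Artemis API.
class DivertedTopicModel {
    // Stands in for initQueue: last value per symbol, never removed on read.
    private final Map<String, String> initQueue = new LinkedHashMap<>();
    // Stands in for the subscription queues on myTopic, one inbox per live subscriber.
    private final List<List<String>> subscribers = new ArrayList<>();

    // Sending to myTopic: the non-exclusive divert copies the message to initQueue
    // and the original is still routed to every current subscriber.
    void sendToTopic(String symbol, String body) {
        initQueue.put(symbol, body);
        for (List<String> inbox : subscribers) {
            inbox.add(body);
        }
    }

    // Step 1 for a new consumer: read the current snapshot from initQueue.
    List<String> readSnapshot() {
        return new ArrayList<>(initQueue.values());
    }

    // Step 2: subscribe to the topic for everything that follows.
    List<String> subscribe() {
        List<String> inbox = new ArrayList<>();
        subscribers.add(inbox);
        return inbox;
    }
}

class DivertedTopicDemo {
    public static void main(String[] args) {
        DivertedTopicModel broker = new DivertedTopicModel();
        broker.sendToTopic("ABC", "ABC=10");
        broker.sendToTopic("XYZ", "XYZ=5");
        broker.sendToTopic("ABC", "ABC=11");

        // A consumer that starts late first catches up, then subscribes.
        System.out.println(broker.readSnapshot()); // [ABC=11, XYZ=5]
        List<String> inbox = broker.subscribe();
        broker.sendToTopic("ABC", "ABC=12");
        System.out.println(inbox); // [ABC=12]
    }
}
```

One thing the model makes visible: a message sent between reading the snapshot and subscribing would be missed by that consumer. Subscribing first and reading initQueue second closes that gap, at the cost of possibly seeing the same value twice.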
The only difficulty I anticipate here is with Spring which may not provide you with the flexibility to create the initial queue consumer and then create a topic subscriber. If you used the JMS API directly this would be a relatively simple matter.
Another potential solution would be to use retroactive addresses. The main thing to do here would be to ensure the internal ring queues are LVQs. You can do that with the default-last-value-key address-setting. See the documentation for details on the match to use.
Answered By - Justin Bertram
Answer Checked By - David Marino (JavaFixing Volunteer)