Issue
I am using Spring's spring-integration-mqtt and i can connect to a single Mqtt server and can receive messages on subscribed topics , and now i want to make application which can connect to multiple Mqtt Servers and can receive data from every connection and i want to manage it as dynamic where i can add more Mqtt servers from database or text file.
a simple bean for single Mqtt connection for subscription is as follow
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter2 =
new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.1:1883","mqtt_virtual_received_sus_2",
"DATA/#", "LD/#","CONF/#","CONFIG/#");
adapter2.setCompletionTimeout(0);
adapter2.setConverter(new DefaultPahoMessageConverter());
adapter2.setQos(2);
adapter2.setOutputChannel(mqttInputChannel());
return adapter2;
}
above code creates a connection for the mqtt server and can receive messages and if i copy paste the same code twice for second server with different Mqtt ip address i can connect to both Mqtt Server as follows
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter2 =
new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.1:1883","mqtt_virtual_received_sus_2",
"DATA/#", "LD/#","CONF/#","CONFIG/#");
adapter2.setCompletionTimeout(0);
adapter2.setConverter(new DefaultPahoMessageConverter());
adapter2.setQos(2);
adapter2.setOutputChannel(mqttInputChannel());
return adapter2;
}
@Bean
public MessageProducer inbound2() {
MqttPahoMessageDrivenChannelAdapter adapter2 =
new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.14:1883","mqtt_virtual_received_sus_1",
"DATA/#", "LD/#","CONF/#","CONFIG/#");
adapter2.setCompletionTimeout(0);
adapter2.setConverter(new DefaultPahoMessageConverter());
adapter2.setQos(2);
adapter2.setOutputChannel(mqttInputChannel());
return adapter2;
}
above code also works fine and i can receive message from both Mqtt Servers, but is there any way i can manage it dynamically like as follows, i change the bean's return type to list, but didn't worked:
@Bean
public List<MqttPahoMessageDrivenChannelAdapter> getAdapter () {
List<MqttPahoMessageDrivenChannelAdapter > logConfList=new ArrayList<MqttPahoMessageDrivenChannelAdapter>();
MqttPahoMessageDrivenChannelAdapter adapter2 =
new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.1:1883","mqtt_virtual_received_sus_2",
"DATA/#", "LD/#","CONF/#","CONFIG/#");
adapter2.setCompletionTimeout(0);
adapter2.setConverter(new DefaultPahoMessageConverter());
adapter2.setQos(2);
adapter2.setOutputChannel(mqttInputChannel() );
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.14:1883","mqtt_virtual_received_sus_1",
"DATA/#", "LD/#","CONF/#","CONFIG/#");
adapter.setCompletionTimeout(0);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(2);
adapter.setOutputChannel(mqttInputChannel() );
logConfList.add(adapter);
logConfList.add(adapter2);
return logConfList;
}
is there any way i can manage these beans dynamically, where i can fetch mqtt server details from text file and in a for loop or something i can manage multiple connections.
Solution
See Dynamic and runtime Integration Flows.
@Autowired
private IntegrationFlowContext flowContext;
private IntegrationFlowRegistration addAnAdapter(String uri, String clientId, MessageChannel channel,
String... topics) {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(uri, clientId, topics);
// more adapter configuration
IntegrationFlow flow = IntegrationFlows.from(adapter)
.channel(channel)
.get();
return this.flowContext.registration(flow).register();
}
private void removeAdapter(IntegrationFlowRegistration flowReg) {
this.flowContext.remove(flowReg.getId());
}
Answered By - Gary Russell
Answer Checked By - Candace Johnson (JavaFixing Volunteer)