Spring Boot集成Java DSL的实现代码

网友投稿 285 2023-07-12

Spring Boot集成Java DSL的实现代码

Spring Integration java DSL已经融合到Spring Integration Core 5.0,这是一个聪明而明显的举动,因为:

基于Java Config启动新Spring项目的每个人都使用它

SI Java DSL使您可以使用Lambdas等新的强大Java 8功能

您可以使用 基于IntegrationFlowBuilder的Builder模式构建流

让我们看看基于ActiveMQ JMS的示例如何使用它。

Maven依赖:

org.springframework.boot

spring-boot-starter-activemq

org.springframework.integration

spring-integration-core

org.springframework.integration

spring-integration-jms

org.springframework.boot

spring-boot-starter-test

test

org.apache.activemq

activemq-kahadb-store

org.springframework.integration

spring-integration-java-dsl

1.2.3.RELEASE

示例1:Jms入站网关

我们有以下ServiceActivator:

@Service

public class ActiveMQEndpoint {

@ServiceActivator(inputChannel = "inboundChannel")

public void processMessage(final String inboundPayload) {

System.out.println("Inbound message: "+inboundPayload);

}

}

如果您想使用SI Java DSL 将inboundPayload从Jms队列发送到Gateway风格的激活器,那么请使用DSLJms工厂:

@Bean

public DynamicDestinationResolver dynamicDestinationResolver() {

return new DynamicDestinationResolver();

}

@Bean

public ActiveMQConnectionFactory connectionFactory() {

return new ActiveMQConnectionFactory();

}

@Bean

public DefaultMessageListenerContainer listenerContainer() {

final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();

defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());

defaultMessageListenerContainer.setConnectionFactory(connectionFactory());

defaultMessageListenerContainer.setDestinationName("jms.activeMQ.Test");

return defaultMessageListenerContainer;

}

@Bean

public MessageChannel inboundChannel() {

return MessageChannels.direct("inboundChannel").get();

}

@Bean

public JmsInboundGateway dataEndpoint() {

return Jms.inboundGateway(listenerContainer())

.requestChannel(inboundChannel()).get();

}

通过dataEndpoint bean 返回JmsInboundGatewaySpec,您还可以向SI通道或Jms目标发送回复。查看文档。

示例2:Jms消息驱动的通道适配器

如果您正在寻找替换消息驱动通道适配器的XML JMS配置,那么JmsMessageDrivenChannelAdapter是一种适合您的方式:

@Bean

public DynamicDestinationResolver dynamicDestinationResolver() {

return new DynamicDestinationResolver();

}

@Bean

public ActiveMQConnectionFactory connectionFactory() {

return new ActiveMQConnectionFactory();

}

@Bean

public DefaultMessageListenerContainer listenerContainer() {

final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();

defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());

defaultMessageListenerContainer.setConnectionFactory(connectionFactory());

defaultMessageListenerContainer.setDestinationName("jms.activeMQ.Test");

return defaultMessageListenerContainer;

}

@Bean

public MessageChannel inboundChannel() {

return MessageChannels.direct("inboundChannel").get();

}

@Bean

public JmsMessageDrivenChannelAdapter dataEndpoint() {

final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =

new ChannelPublishingJmsMessageListener();

channelPublishingJmsMessageListener.setExpectReply(false);

final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter = new

JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener

);

messageDrivenChannelAdapter.setOutputChannel(inboundChannel());

return messageDrivenChannelAdapter;

}

与前面的示例一样,入站有效负载如样本1中一样发送给激活器。

示例3:使用JAXB的Jms消息驱动的通道适配器

在典型的场景中,您希望通过Jms接受XML作为文本消息,将其转换为JAXB存根并在服务激活器中处理它。我将向您展示如何使用SI Java DSL执行此操作,但首先让我们为xml处理添加两个依赖项:

org.springframework.integration

spring-integration-xml

org.springframework

spring-oxm

我们将通过JMS接受shiporders ,所以首先XSD命名为shiporder.xsd:

新增JAXB maven plugin 生成JAXB存根:

org.codehaus.mojo

jaxb2-maven-plugin

2.3.1

xjc-schema1

xjc

src/main/resources/xsds/shiporder.xsd

com.example.stubs

src/main/java

false

我们已经准备好了存根类和一切,现在使用Jaxb magic的Java DSL JMS消息驱动适配器:

/**

* Sample 3: Jms message driven adapter with JAXB

*/

@Bean

public JmsMessageDrivenChannelAdapter dataEndpoint() {

final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =

new ChannelPublishingJmsMessageListener();

channelPublishingJmsMessageListener.setExpectReply(false);

channelPublishingJmsMessageListener.setMessageConverter(new MarshallingMessageConverter(shipOrdersMarshaller()));

final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter = new

JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener

);

messageDrivenChannelAdapter.setOutputChannel(inboundChannel());

return messageDrivenChannelAdapter;

}

@Bean

public Jaxb2Marshaller shipOrdersMarshaller() {

Jaxb2Marshaller marshaller = new Jaxb2Marshaller();

marshaller.setContextPath("com.example.stubs");

return marshaller;

}

XML配置在Java中使用它可以为您提供如此强大的功能和灵活性。要完成此示例,inboundChannel的服务激活器将如下所示:

/**

* Sample 3

* @param shiporder

*/

@ServiceActivator(inputChannel = "inboundChannel")

public void processMessage(final Shiporder shiporder) {

System.out.println(shiporder.getOrderid());

System.out.println(shiporder.getOrderperson());

}

要测试流,您可以使用以下XML通过JConsole发送到JMS队列:

xmlns:xsi="http://w3.org/2001/XMLSchema-instance"

xsi:noNamespaceSchemaLocation="shiporder.xsd">

John Smith

Ola Nordmann

Langgt 23

xmlns:xsi="http://w3.org/2001/XMLSchema-instance"

xsi:noNamespaceSchemaLocation="shiporder.xsd">

John Smith

Ola Nordmann

4000 Stavanger

Norway

Special Edition

1

10.90

1

9.90

示例4:具有JAXB和有效负载根路由的Jms消息驱动的通道适配器

另一种典型情况是接受XML作为JMS文本消息,将其转换为JAXB存根并根据有效负载根类型将有效负载路由到某个服务激活器。当然SI Java DSL支持所有类型的路由,我将向您展示如何根据有效载荷类型进行路由。

首先,将以下XSD添加到shiporder.xsd所在的文件夹中,并将其命名为purchaseorder.xsd:

xmlns:tns="http://tempuri.org/PurchaseOrderSchema.xsd"

targetNamespace="http://tempuri.org/PurchaseOrderSchema.xsd"

elementFormDefault="qualified">

xmlns:tns="http://tempuri.org/PurchaseOrderSchema.xsd"

targetNamespace="http://tempuri.org/PurchaseOrderSchema.xsd"

elementFormDefault="qualified">

然后添加到jaxb maven插件配置:

org.codehaus.mojo

jaxb2-maven-plugin

2.3.1

xjc-schema1

xjc

src/main/resources/xsds/shiporder.xsd

src/main/resources/xsds/purchaseorder.xsd

com.example.stubs

src/main/java

false

运行mvn clean install以生成新XSD的JAXB存根。现在承诺有效负载根映射:

@Bean

public Jaxb2Marshaller ordersMarshaller() {

Jaxb2Marshaller marshaller = new Jaxb2Marshaller();

marshaller.setContextPath("com.example.stubs");

return marshaller;

}

/**

* Sample 4: Jms message driven adapter with Jaxb and Payload routing.

* @return

*/

@Bean

public JmsMessageDrivenChannelAdapter dataEndpoint() {

final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =

new ChannelPublishingJmsMessageListener();

channelPublishingJmsMessageListener.setMessageConverter(new MarshallingMessageConverter(ordersMarshaller()));

final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter = new

JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener

);

messageDrivenChannelAdapter.setOutputChannel(inboundChannel());

return messageDrivenChannelAdapter;

}

@Bean

public IntegrationFlow payloadRootMapping() {

return IntegrationFlows.from(inboundChannel()).>route(Object::getClass, m->m

.subFlowMapping(Shiporder.class, sf->sf.handle((MessageHandler) message -> {

final Shiporder shiporder = (Shiporder) message.getPayload();

System.out.println(shiporder.getOrderperson());

System.out.println(shiporder.getOrderid());

}))

.subFlowMapping(PurchaseOrder.class, sf->sf.handle((MessageHandler) message -> {

final PurchaseOrder purchaseOrderType = (PurchaseOrder) message.getPayload();

System.out.println(purchaseOrderType.getBillTo().getName());

}))

).get();

}

注意payloadRootMapping bean,让我们解释一下重要的部分:

> route - 表示来自inboundChannel的输入将是Object,并且将根据Class <?>执行路由

subFlowMapping(Shiporder.class.. - ShipOders的处理。

subFlowMapping(PurchaseOrder.class ... - 处理PurchaseOrders。

要测试ShipOrder有效负载,请使用示例3中的XML,以测试PurchaseOrder有效负载,使用以下XML:

name1

street1

city1

state1

1

name2

street2

city2

state2

-79228162514264337593543950335

name1

street1

city1

state1

1

应根据subflow 子流Map路由两个有效载荷。

示例5:IntegrationFlowAdapter

除了企业集成模式的其他实现(check them out)),我需要提到IntegrationFlowAdapter。通过扩展此类并实现buildFlow方法,如:

[url=https://bitbucket.org/Component/]@Component[/url]

public class MyFlowAdapter extends IntegrationFlowAdapter {

@Autowired

private ConnectionFactory rabbitConnectionFactory;

@Override

protected IntegrationFlowDefinition> buildFlow() {

return from(Amqp.inboundAdapter(this.rabbitConnectionFactory, "myQueue"))

.transform(String::toLowerCase)

.channel(c -> c.queue("myFlowAdapterOutput"));

}

你可以将bean的重复声明包装成一个组件并给它们所需的流量。然后可以配置这样的组件并将其作为一个类实例提供给调用代码!

因此,让我们举例说明这个repo中的示例3更短一些,并为所有JmsEndpoints定义基类,并在其中定义重复bean:

public class JmsEndpoint extends IntegrationFlowAdapter {

private String queueName;

private String channelName;

private String contextPath;

/**

* @param queueName

* @param channelName

* @param contextPath

*/

public JmsEndpoint(String queueName, String channelName, String contextPath) {

this.queueName = queueName;

this.channelName = channelName;

this.contextPath = contextPath;

}

@Override

protected IntegrationFlowDefinition> buildFlow() {

return from(Jms.messageDrivenChannelAdapter(listenerContainer())

.jmsMessageConverter(new MarshallingMessageConverter(shipOrdersMarshaller()))

).channel(channelName);

}

@Bean

public Jaxb2Marshaller shipOrdersMarshaller() {

Jaxb2Marshaller marshaller = new Jaxb2Marshaller();

marshaller.setContextPath(contextPath);

return marshaller;

}

@Bean

public DynamicDestinationResolver dynamicDestinationResolver() {

return new DynamicDestinationResolver();

}

@Bean

public ActiveMQConnectionFactory connectionFactory() {

return new ActiveMQConnectionFactory();

}

@Bean

public DefaultMessageListenerContainer listenerContainer() {

final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();

defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());

defaultMessageListenerContainer.setConnectionFactory(connectionFactory());

defaultMessageListenerContainer.setDestinationName(queueName);

return defaultMessageListenerContainer;

}

@Bean

public MessageChannel inboundChannel() {

return MessageChannels.direct(channelName).get();

}

}

现在声明特定队列的Jms端点很容易:

@Bean

public JmsEndpoint jmsEndpoint() {

return new JmsEndpoint("jms.activeMQ.Test", "inboundChannel", "com.example.stubs");

}

inboundChannel的服务激活器:

/**

* Sample 3, 5

* @param shiporder

*/

@ServiceActivator(inputChannel = "inboundChannel")

public void processMessage(final Shiporder shiporder) {

System.out.println(shiporder.getOrderid());

System.out.println(shiporder.getOrderperson());

}

您不应该错过在项目中使用IntegrationFlowAdapter。我喜欢它的概念。

我最近在Embedit的新的基于Spring Boot的项目中开始使用Spring Integration Java DSL 。即使有一些配置,我发现它非常有用。

它很容易调试。不添加像wiretap这样的配置。

阅读起来要容易得多。是的,即使是lambdas!

它很强大。在Java配置中,您现在有很多选择。

源码地址:https://bitbucket.org/tomask79/spring-integration-java-dsl

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:在Spring中自动装配Bean的属性
下一篇:关于Spring中Bean的创建进行更多方面的控制
相关文章

 发表评论

暂时没有评论,来抢沙发吧~