diff --git a/spring-integration-redis/src/main/java/org/springframework/integration/redis/dsl/Redis.java b/spring-integration-redis/src/main/java/org/springframework/integration/redis/dsl/Redis.java index 198fb5c442..8e92ef879a 100644 --- a/spring-integration-redis/src/main/java/org/springframework/integration/redis/dsl/Redis.java +++ b/spring-integration-redis/src/main/java/org/springframework/integration/redis/dsl/Redis.java @@ -16,7 +16,12 @@ package org.springframework.integration.redis.dsl; +import java.util.function.Function; + import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.expression.Expression; +import org.springframework.integration.expression.FunctionExpression; +import org.springframework.messaging.Message; /** * Factory class for Redis components. @@ -45,6 +50,54 @@ public static RedisOutboundChannelAdapterSpec outboundChannelAdapter(RedisConnec return new RedisOutboundChannelAdapterSpec(connectionFactory); } + /** + * The factory to produce a {@link RedisQueueInboundChannelAdapterSpec}. + * @param queueName The queueName of the Redis list to build on + * @param connectionFactory the {@link RedisConnectionFactory} to build on + * @return the {@link RedisQueueInboundChannelAdapterSpec} instance + */ + public static RedisQueueInboundChannelAdapterSpec queueInboundChannelAdapter(String queueName, + RedisConnectionFactory connectionFactory) { + + return new RedisQueueInboundChannelAdapterSpec(queueName, connectionFactory); + } + + /** + * The factory to produce a {@link RedisQueueOutboundChannelAdapterSpec}. + * @param queueName The queueName of the Redis list to build on + * @param connectionFactory the {@link RedisConnectionFactory} to build on + * @return the {@link RedisQueueOutboundChannelAdapterSpec} instance + */ + public static RedisQueueOutboundChannelAdapterSpec queueOutboundChannelAdapter(String queueName, + RedisConnectionFactory connectionFactory) { + + return new RedisQueueOutboundChannelAdapterSpec(queueName, connectionFactory); + } + + /** + * The factory to produce a {@link RedisQueueOutboundChannelAdapterSpec}. + * @param queueExpression The queueExpression of the Redis list to build on + * @param connectionFactory the {@link RedisConnectionFactory} to build on + * @return the {@link RedisQueueOutboundChannelAdapterSpec} instance + */ + public static RedisQueueOutboundChannelAdapterSpec queueOutboundChannelAdapter(Expression queueExpression, + RedisConnectionFactory connectionFactory) { + + return new RedisQueueOutboundChannelAdapterSpec(queueExpression, connectionFactory); + } + + /** + * The factory to produce a {@link RedisQueueOutboundChannelAdapterSpec}. + * @param queueFunction The queueExpression of the Redis list to build on + * @param connectionFactory the {@link RedisConnectionFactory} to build on + * @return the {@link RedisQueueOutboundChannelAdapterSpec} instance + */ + public static RedisQueueOutboundChannelAdapterSpec queueOutboundChannelAdapter( + Function, String> queueFunction, RedisConnectionFactory connectionFactory) { + + return new RedisQueueOutboundChannelAdapterSpec(new FunctionExpression<>(queueFunction), connectionFactory); + } + private Redis() { } diff --git a/spring-integration-redis/src/main/java/org/springframework/integration/redis/dsl/RedisInboundChannelAdapterSpec.java b/spring-integration-redis/src/main/java/org/springframework/integration/redis/dsl/RedisInboundChannelAdapterSpec.java index 8dfcca59dc..7ec9d905a4 100644 --- a/spring-integration-redis/src/main/java/org/springframework/integration/redis/dsl/RedisInboundChannelAdapterSpec.java +++ b/spring-integration-redis/src/main/java/org/springframework/integration/redis/dsl/RedisInboundChannelAdapterSpec.java @@ -25,19 +25,21 @@ import org.springframework.messaging.converter.MessageConverter; /** - * A {@link MessageProducerSpec} for a {@link RedisInboundChannelAdapterSpec}. + * A {@link MessageProducerSpec} for a {@link RedisInboundChannelAdapter}. * * @author Jiandong Ma * * @since 7.1 */ -public class RedisInboundChannelAdapterSpec extends MessageProducerSpec { +public class RedisInboundChannelAdapterSpec extends + MessageProducerSpec { protected RedisInboundChannelAdapterSpec(RedisConnectionFactory connectionFactory) { this.target = new RedisInboundChannelAdapter(connectionFactory); } /** + * Specify the RedisSerializer to deserialize the body of Redis messages. * @param serializer the serializer * @return the spec * @see RedisInboundChannelAdapter#setSerializer(RedisSerializer) @@ -48,6 +50,7 @@ public RedisInboundChannelAdapterSpec serializer(RedisSerializer serializer) } /** + * Specify the topics to subscribe. * @param topics the topics * @return the spec * @see RedisInboundChannelAdapter#setTopics(String...) @@ -58,6 +61,7 @@ public RedisInboundChannelAdapterSpec topics(String... topics) { } /** + * Specify the topicPatterns to subscribe. * @param topicPatterns the topicPatterns * @return the spec * @see RedisInboundChannelAdapter#setTopicPatterns(String...) @@ -68,6 +72,7 @@ public RedisInboundChannelAdapterSpec topicPatterns(String... topicPatterns) { } /** + * Specify the messageConverter to convert between Redis messages and Spring message payloads. * @param messageConverter the messageConverter * @return the spec * @see RedisInboundChannelAdapter#setMessageConverter(MessageConverter) @@ -78,6 +83,7 @@ public RedisInboundChannelAdapterSpec messageConverter(MessageConverter messageC } /** + * Specify an {@link Executor} for running the message listeners when messages are received. * @param taskExecutor the taskExecutor * @return the spec * @see RedisInboundChannelAdapter#setTaskExecutor(Executor) diff --git a/spring-integration-redis/src/main/java/org/springframework/integration/redis/dsl/RedisOutboundChannelAdapterSpec.java b/spring-integration-redis/src/main/java/org/springframework/integration/redis/dsl/RedisOutboundChannelAdapterSpec.java index a5ace759da..bb6346c586 100644 --- a/spring-integration-redis/src/main/java/org/springframework/integration/redis/dsl/RedisOutboundChannelAdapterSpec.java +++ b/spring-integration-redis/src/main/java/org/springframework/integration/redis/dsl/RedisOutboundChannelAdapterSpec.java @@ -28,19 +28,21 @@ import org.springframework.messaging.converter.MessageConverter; /** - * A {@link MessageHandlerSpec} for a {@link RedisOutboundChannelAdapterSpec}. + * A {@link MessageHandlerSpec} for a {@link RedisPublishingMessageHandler}. * * @author Jiandong Ma * * @since 7.1 */ -public class RedisOutboundChannelAdapterSpec extends MessageHandlerSpec { +public class RedisOutboundChannelAdapterSpec extends + MessageHandlerSpec { protected RedisOutboundChannelAdapterSpec(RedisConnectionFactory connectionFactory) { this.target = new RedisPublishingMessageHandler(connectionFactory); } /** + * Specify the RedisSerializer to serialize data before sending to the Redis. * @param serializer the serializer * @return the spec * @see RedisPublishingMessageHandler#setSerializer(RedisSerializer) @@ -51,6 +53,7 @@ public RedisOutboundChannelAdapterSpec serializer(RedisSerializer serializer) } /** + * Specify the messageConverter to convert between Redis messages and Spring message payloads. * @param messageConverter the messageConverter * @return the spec * @see RedisPublishingMessageHandler#setMessageConverter(MessageConverter) @@ -61,6 +64,7 @@ public RedisOutboundChannelAdapterSpec messageConverter(MessageConverter message } /** + * Specify the topic to publish messages. * @param topic the topic * @return the spec * @see RedisPublishingMessageHandler#setTopic(String) @@ -71,6 +75,7 @@ public RedisOutboundChannelAdapterSpec topic(String topic) { } /** + * Specify the topicExpression to determine the topic. * @param topicExpression the topicExpression * @return the spec * @see RedisPublishingMessageHandler#setTopicExpression(Expression) diff --git a/spring-integration-redis/src/main/java/org/springframework/integration/redis/dsl/RedisQueueInboundChannelAdapterSpec.java b/spring-integration-redis/src/main/java/org/springframework/integration/redis/dsl/RedisQueueInboundChannelAdapterSpec.java new file mode 100644 index 0000000000..9b14593bdf --- /dev/null +++ b/spring-integration-redis/src/main/java/org/springframework/integration/redis/dsl/RedisQueueInboundChannelAdapterSpec.java @@ -0,0 +1,119 @@ +/* + * Copyright 2026-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.redis.dsl; + +import java.time.Duration; +import java.util.concurrent.Executor; + +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.serializer.RedisSerializer; +import org.springframework.integration.dsl.MessageProducerSpec; +import org.springframework.integration.redis.inbound.RedisQueueMessageDrivenEndpoint; + +/** + * A {@link MessageProducerSpec} for a {@link RedisQueueMessageDrivenEndpoint}. + * + * @author Jiandong Ma + * + * @since 7.1 + */ +public class RedisQueueInboundChannelAdapterSpec extends + MessageProducerSpec { + + protected RedisQueueInboundChannelAdapterSpec(String queueName, RedisConnectionFactory connectionFactory) { + this.target = new RedisQueueMessageDrivenEndpoint(queueName, connectionFactory); + } + + /** + * Specify the RedisSerializer to deserialize the body of Redis messages. + * @param serializer the serializer + * @return the spec + * @see RedisQueueMessageDrivenEndpoint#setSerializer(RedisSerializer) + */ + public RedisQueueInboundChannelAdapterSpec serializer(RedisSerializer serializer) { + this.target.setSerializer(serializer); + return this; + } + + /** + * Specify whether expects data from the Redis queue to contain entire Message instances. + * @param expectMessage the expectMessage + * @return the spec + * @see RedisQueueMessageDrivenEndpoint#setExpectMessage(boolean) + */ + public RedisQueueInboundChannelAdapterSpec expectMessage(boolean expectMessage) { + this.target.setExpectMessage(expectMessage); + return this; + } + + /** + * Specify the timeout for 'pop' operation to wait for a Redis message from the Redis Queue. + * @param receiveTimeout the receiveTimeout + * @return the spec + * @see RedisQueueMessageDrivenEndpoint#setReceiveDuration(Duration) + */ + public RedisQueueInboundChannelAdapterSpec receiveDuration(Duration receiveTimeout) { + this.target.setReceiveDuration(receiveTimeout); + return this; + } + + /** + * Specify the timeout for 'pop' operation to wait for a Redis message from the Redis Queue. + * @param receiveTimeout the receiveTimeout + * @return the spec + * @see RedisQueueMessageDrivenEndpoint#setReceiveTimeout(long) + */ + public RedisQueueInboundChannelAdapterSpec receiveTimeout(long receiveTimeout) { + this.target.setReceiveTimeout(receiveTimeout); + return this; + } + + /** + * Specify an {@link Executor} for the underlying listening task. + * @param taskExecutor the taskExecutor + * @return the spec + * @see RedisQueueMessageDrivenEndpoint#setTaskExecutor(Executor) + */ + public RedisQueueInboundChannelAdapterSpec taskExecutor(Executor taskExecutor) { + this.target.setTaskExecutor(taskExecutor); + return this; + } + + /** + * Specify the time in milliseconds for the underlying listening task should sleep + * after exceptions on the 'pop' operation. + * @param recoveryInterval the recoveryInterval + * @return the spec + * @see RedisQueueMessageDrivenEndpoint#setRecoveryInterval(long) + */ + public RedisQueueInboundChannelAdapterSpec recoveryInterval(long recoveryInterval) { + this.target.setRecoveryInterval(recoveryInterval); + return this; + } + + /** + * Specify use "right pop" or "left pop" to read messages from the Redis Queue. + * @param rightPop the rightPop + * @return the spec + * @see RedisQueueMessageDrivenEndpoint#setRightPop(boolean) + */ + public RedisQueueInboundChannelAdapterSpec rightPop(boolean rightPop) { + this.target.setRightPop(rightPop); + return this; + } + +} diff --git a/spring-integration-redis/src/main/java/org/springframework/integration/redis/dsl/RedisQueueOutboundChannelAdapterSpec.java b/spring-integration-redis/src/main/java/org/springframework/integration/redis/dsl/RedisQueueOutboundChannelAdapterSpec.java new file mode 100644 index 0000000000..597eaf6e41 --- /dev/null +++ b/spring-integration-redis/src/main/java/org/springframework/integration/redis/dsl/RedisQueueOutboundChannelAdapterSpec.java @@ -0,0 +1,76 @@ +/* + * Copyright 2026-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.redis.dsl; + +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.serializer.RedisSerializer; +import org.springframework.expression.Expression; +import org.springframework.integration.dsl.MessageHandlerSpec; +import org.springframework.integration.redis.outbound.RedisQueueOutboundChannelAdapter; + +/** + * A {@link MessageHandlerSpec} for a {@link RedisQueueOutboundChannelAdapter}. + * + * @author Jiandong Ma + * + * @since 7.1 + */ +public class RedisQueueOutboundChannelAdapterSpec extends + MessageHandlerSpec { + + protected RedisQueueOutboundChannelAdapterSpec(String queueName, RedisConnectionFactory connectionFactory) { + this.target = new RedisQueueOutboundChannelAdapter(queueName, connectionFactory); + } + + protected RedisQueueOutboundChannelAdapterSpec(Expression queueExpression, RedisConnectionFactory connectionFactory) { + this.target = new RedisQueueOutboundChannelAdapter(queueExpression, connectionFactory); + } + + /** + * Specify send only the payload or the entire Message to the Redis queue. + * @param extractPayload the extractPayload + * @return the spec + * @see RedisQueueOutboundChannelAdapter#setExtractPayload(boolean) + */ + public RedisQueueOutboundChannelAdapterSpec extractPayload(boolean extractPayload) { + this.target.setExtractPayload(extractPayload); + return this; + } + + /** + * Specify the RedisSerializer to serialize data before sending to the Redis Queue. + * @param serializer the serializer + * @return the spec + * @see RedisQueueOutboundChannelAdapter#setSerializer(RedisSerializer) + */ + public RedisQueueOutboundChannelAdapterSpec serializer(RedisSerializer serializer) { + this.target.setSerializer(serializer); + return this; + } + + /** + * Specify use "left push" or "right push" to write messages to the Redis Queue. + * @param leftPush the leftPush + * @return the spec + * @see RedisQueueOutboundChannelAdapter#setLeftPush(boolean) + */ + public RedisQueueOutboundChannelAdapterSpec leftPush(boolean leftPush) { + this.target.setLeftPush(leftPush); + return this; + } + +} diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/dsl/RedisTests.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/dsl/RedisTests.java index 571aa60f58..d241d17ab7 100644 --- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/dsl/RedisTests.java +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/dsl/RedisTests.java @@ -65,6 +65,10 @@ class RedisTests implements RedisContainerTest { static final String TOPIC_FOR_OUTBOUND_CHANNEL_ADAPTER = "dslOutboundChannelAdapterTopic"; + static final String QUEUE_NAME_FOR_QUEUE_INBOUND_CHANNEL_ADAPTER = "dslQueueInboundChannelAdapter"; + + static final String QUEUE_NAME_FOR_QUEUE_OUTBOUND_CHANNEL_ADAPTER = "dslQueueOutboundChannelAdapter"; + @Autowired RedisConnectionFactory connectionFactory; @@ -78,6 +82,13 @@ class RedisTests implements RedisContainerTest { @Qualifier("outboundChannelAdapterFlow.input") MessageChannel outboundChannelAdapterInputChannel; + @Autowired + QueueChannel queueInboundChannelAdapterOutputChannel; + + @Autowired + @Qualifier("queueOutboundChannelAdapterFlow.input") + MessageChannel queueOutboundChannelAdapterInputChannel; + @Test void testInboundChannelAdapterFlow() throws Exception { StringRedisTemplate redisTemplate = new StringRedisTemplate(connectionFactory); @@ -147,6 +158,46 @@ public void onMessage(org.springframework.data.redis.connection.Message message, container.stop(); } + @Test + void testQueueInboundChannelAdapterFlow() { + // Given + int numToTest = 10; + StringRedisTemplate redisTemplate = new StringRedisTemplate(connectionFactory); + var listOps = redisTemplate.boundListOps(QUEUE_NAME_FOR_QUEUE_INBOUND_CHANNEL_ADAPTER); + for (int i = 0; i < numToTest; i++) { + listOps.leftPush("queue-inbound-message-" + i); + } + + for (int i = 0; i < numToTest; i++) { + // When + Message message = queueInboundChannelAdapterOutputChannel.receive(10000); + // Then + assertThat(message).isNotNull() + .extracting(Message::getPayload) + .isEqualTo("queue-inbound-message-" + i); + } + } + + @Test + void testQueueOutboundChannelAdapterFlow() { + // Given + int numToTest = 10; + for (int i = 0; i < numToTest; i++) { + queueOutboundChannelAdapterInputChannel.send(MessageBuilder.withPayload("queue-outbound-message-" + i).build()); + } + + StringRedisTemplate redisTemplate = new StringRedisTemplate(connectionFactory); + var listOps = redisTemplate.boundListOps(QUEUE_NAME_FOR_QUEUE_OUTBOUND_CHANNEL_ADAPTER); + for (int i = 0; i < numToTest; i++) { + // When + String msg = listOps.rightPop(); + // Then + assertThat(msg) + .isNotNull() + .isEqualTo("queue-outbound-message-" + i); + } + } + @Configuration(proxyBeanMethods = false) @EnableIntegration static class Config { @@ -173,6 +224,24 @@ IntegrationFlow outboundChannelAdapterFlow(RedisConnectionFactory redisConnectio .topicExpression(new LiteralExpression(TOPIC_FOR_OUTBOUND_CHANNEL_ADAPTER))); } + @Bean + IntegrationFlow queueInboundChannelAdapterFlow(RedisConnectionFactory redisConnectionFactory) { + return IntegrationFlow.from(Redis + .queueInboundChannelAdapter(QUEUE_NAME_FOR_QUEUE_INBOUND_CHANNEL_ADAPTER, redisConnectionFactory) + .serializer(RedisSerializer.string()) + ) + .channel(c -> c.queue("queueInboundChannelAdapterOutputChannel")) + .get(); + } + + @Bean + IntegrationFlow queueOutboundChannelAdapterFlow(RedisConnectionFactory redisConnectionFactory) { + return flow -> flow + .handle(Redis.queueOutboundChannelAdapter(QUEUE_NAME_FOR_QUEUE_OUTBOUND_CHANNEL_ADAPTER, redisConnectionFactory) + .serializer(RedisSerializer.string()) + ); + } + } } diff --git a/src/reference/antora/modules/ROOT/pages/redis.adoc b/src/reference/antora/modules/ROOT/pages/redis.adoc index d325574c54..abcfc027a1 100644 --- a/src/reference/antora/modules/ROOT/pages/redis.adoc +++ b/src/reference/antora/modules/ROOT/pages/redis.adoc @@ -148,13 +148,17 @@ Java DSL:: RedisConnectionFactory connectionFactory() { return RedisContainerTest.connectionFactory(); } - +@Bean +MessageConverter testConverter() { + return new SimpleMessageConverter(); +} @Bean IntegrationFlow inboundChannelAdapterFlow(RedisConnectionFactory redisConnectionFactory) { return IntegrationFlow.from(Redis .inboundChannelAdapter(redisConnectionFactory) - .topics(TOPIC_FOR_INBOUND_CHANNEL_ADAPTER)) + .topics(TOPIC_FOR_INBOUND_CHANNEL_ADAPTER) + .messageConverter(testConverter())) .channel(c -> c.queue("inboundChannelAdapterQueueChannel")) .get(); } @@ -169,12 +173,16 @@ Java:: RedisConnectionFactory connectionFactory() { return RedisContainerTest.connectionFactory(); } - +@Bean +MessageConverter testConverter() { + return new SimpleMessageConverter(); +} @Bean RedisInboundChannelAdapter inboundChannelAdapter(RedisConnectionFactory connectionFactory) { var adapter = new RedisInboundChannelAdapter(connectionFactory); adapter.setTopics(redisChannelName); adapter.setOutputChannel(channel); + adapter.setMessageConverter(testConverter()); return adapter; } ---- @@ -240,11 +248,17 @@ RedisConnectionFactory connectionFactory() { return RedisContainerTest.connectionFactory(); } +@Bean +MessageConverter testConverter() { + return new SimpleMessageConverter(); +} + @Bean IntegrationFlow outboundChannelAdapterFlow(RedisConnectionFactory redisConnectionFactory) { return flow -> flow .handle(Redis.outboundChannelAdapter(redisConnectionFactory) - .topicExpression(new LiteralExpression(TOPIC_FOR_OUTBOUND_CHANNEL_ADAPTER))); + .topicExpression(new LiteralExpression(TOPIC_FOR_OUTBOUND_CHANNEL_ADAPTER)) + .messageConverter(testConverter())); } ---- @@ -257,10 +271,16 @@ RedisConnectionFactory connectionFactory() { return RedisContainerTest.connectionFactory(); } +@Bean +MessageConverter testConverter() { + return new SimpleMessageConverter(); +} + @Bean RedisPublishingMessageHandler outboundChannelAdapter(RedisConnectionFactory connectionFactory) { var handler = new RedisPublishingMessageHandler(connectionFactory); handler.setTopicExpression(new LiteralExpression(topic)); + handler.setMessageConverter(testConverter()); return handler; } ---- @@ -301,7 +321,62 @@ It uses an internal listener thread and does not use a poller. The following listing shows all the available attributes for `queue-inbound-channel-adapter`: -[source,xml] +[tabs] +====== +Java DSL:: ++ +[source, java, role="primary"] +---- +@Bean +IntegrationFlow queueInboundChannelAdapterFlow(RedisConnectionFactory redisConnectionFactory) { + return IntegrationFlow.from(Redis + .queueInboundChannelAdapter( + queueName, <6> + redisConnectionFactory) <5> + .id() <1> + .outputChannel() <2> + .autoStartup() <3> + .phase() <4> + .errorChannel() <7> + .serializer() <8> + .receiveTimeout() <9> + .recoveryInterval() <10> + .expectMessage() <11> + .taskExecutor() <12> + .rightPop() <13> + ) + .get(); +} +---- + +Java:: ++ +[source, java, role="secondary"] +---- +@Bean +RedisQueueMessageDrivenEndpoint queueInboundChannelAdapter() { + var adapter = new RedisQueueMessageDrivenEndpoint( + queueName, <6> + redisConnectionFactory <5> + ); + adapter.setBeanName(); <1> + adapter.setOutputChannel(); <2> + adapter.setAutoStartup(); <3> + adapter.setPhase(); <4> + adapter.setErrorChannel(); <7> + adapter.setSerializer(); <8> + adapter.setReceiveTimeout(); <9> + adapter.setRecoveryInterval(); <10> + adapter.setExpectMessage(); <11> + adapter.setTaskExecutor(); <12> + adapter.setRightPop(); <13> + return adapter; +} +---- + +XML:: ++ +[source,xml, role="secondary"] ---- channel="" <2> @@ -319,6 +394,8 @@ The following listing shows all the available attributes for `queue-inbound-chan ---- +====== + <1> The component bean name. If `channel` attribute not provided, a `DirectChannel` is created and registered in the application context with this `id` attribute as the bean name. In this case, the endpoint itself is registered with the bean name `id` plus `.adapter`. @@ -363,7 +440,50 @@ Spring Integration 3.0 introduced a queue outbound channel adapter to "`push`" t By default, it uses "`left push`", but "`right push`" can be configured instead. The following listing shows all the available attributes for a Redis `queue-outbound-channel-adapter`: -[source,xml] +[tabs] +====== +Java DSL:: ++ +[source, java, role="primary"] +---- +@Bean +IntegrationFlow queueOutboundChannelAdapterFlow(RedisConnectionFactory redisConnectionFactory) { + return IntegrationFlow.from("inputChannel") <2> + .handle(Redis.queueOutboundChannelAdapter( + queueName <4> + or queueExpression, <5> + redisConnectionFactory) <3> + .serializer() <6> + .extractPayload() <7> + .leftPush() <8> + , e->e.id()) <1> + .get(); +} +---- + +Java:: ++ +[source, Java, role="secondary"] +---- +@Bean +@ServiceActivator(inputChannel = "channel") <2> +RedisQueueOutboundChannelAdapter queueOutboundChannelAdapter() { + var adapter = new RedisQueueOutboundChannelAdapter( + queueName <4> + or queueExpression, <5> + redisConnectionFactory <3> + ); + adapter.setBeanName(); <1> + adapter.setSerializer(); <6> + adapter.setExtractPayload(); <7> + adapter.setLeftPush(); <8> + return adapter; +} +---- + +XML:: ++ +[source, xml, role="secondary"] ---- channel="" <2> @@ -376,6 +496,8 @@ The following listing shows all the available attributes for a Redis `queue-outb ---- +====== + <1> The component bean name. If the `channel` attribute is not provided, a `DirectChannel` is created and registered in the application context with this `id` attribute as the bean name. In this case, the endpoint is registered with a bean name of `id` plus `.adapter`.