Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@

package org.springframework.integration.redis.dsl;

import java.util.function.Function;

import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.expression.Expression;
import org.springframework.integration.expression.FunctionExpression;
import org.springframework.messaging.Message;

/**
* Factory class for Redis components.
Expand Down Expand Up @@ -45,6 +50,46 @@ public static RedisOutboundChannelAdapterSpec outboundChannelAdapter(RedisConnec
return new RedisOutboundChannelAdapterSpec(connectionFactory);
}

/**
* The factory to produce a {@link RedisQueueInboundChannelAdapterSpec}.
* @param queueName The queueName of the Redis list to build on
* @param connectionFactory the {@link RedisConnectionFactory} to build on
* @return the {@link RedisQueueInboundChannelAdapterSpec} instance
*/
public static RedisQueueInboundChannelAdapterSpec queueInboundChannelAdapter(String queueName, RedisConnectionFactory connectionFactory) {
return new RedisQueueInboundChannelAdapterSpec(queueName, connectionFactory);
}

/**
* The factory to produce a {@link RedisQueueOutboundChannelAdapterSpec}.
* @param queueName The queueName of the Redis list to build on
* @param connectionFactory the {@link RedisConnectionFactory} to build on
* @return the {@link RedisQueueOutboundChannelAdapterSpec} instance
*/
public static RedisQueueOutboundChannelAdapterSpec queueOutboundChannelAdapter(String queueName, RedisConnectionFactory connectionFactory) {
return new RedisQueueOutboundChannelAdapterSpec(queueName, connectionFactory);
}

/**
* The factory to produce a {@link RedisQueueOutboundChannelAdapterSpec}.
* @param queueExpression The queueExpression of the Redis list to build on
* @param connectionFactory the {@link RedisConnectionFactory} to build on
* @return the {@link RedisQueueOutboundChannelAdapterSpec} instance
*/
public static RedisQueueOutboundChannelAdapterSpec queueOutboundChannelAdapter(Expression queueExpression, RedisConnectionFactory connectionFactory) {
return new RedisQueueOutboundChannelAdapterSpec(queueExpression, connectionFactory);
}

/**
* The factory to produce a {@link RedisQueueOutboundChannelAdapterSpec}.
* @param queueFunction The queueExpression of the Redis list to build on
* @param connectionFactory the {@link RedisConnectionFactory} to build on
* @return the {@link RedisQueueOutboundChannelAdapterSpec} instance
*/
public static RedisQueueOutboundChannelAdapterSpec queueOutboundChannelAdapter(Function<Message<?>, String> queueFunction, RedisConnectionFactory connectionFactory) {
return new RedisQueueOutboundChannelAdapterSpec(new FunctionExpression<>(queueFunction), connectionFactory);
}

private Redis() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.springframework.messaging.converter.MessageConverter;

/**
* A {@link MessageProducerSpec} for a {@link RedisInboundChannelAdapterSpec}.
* A {@link MessageProducerSpec} for a {@link RedisInboundChannelAdapter}.
*
* @author Jiandong Ma
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.springframework.messaging.converter.MessageConverter;

/**
* A {@link MessageHandlerSpec} for a {@link RedisOutboundChannelAdapterSpec}.
* A {@link MessageHandlerSpec} for a {@link RedisPublishingMessageHandler}.
*
* @author Jiandong Ma
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright 2026-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.redis.dsl;

import java.time.Duration;
import java.util.concurrent.Executor;

import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.integration.dsl.MessageProducerSpec;
import org.springframework.integration.redis.inbound.RedisQueueMessageDrivenEndpoint;

/**
* A {@link MessageProducerSpec} for a {@link RedisQueueMessageDrivenEndpoint}.
*
* @author Jiandong Ma
*
* @since 7.1
*/
public class RedisQueueInboundChannelAdapterSpec extends MessageProducerSpec<RedisQueueInboundChannelAdapterSpec, RedisQueueMessageDrivenEndpoint> {

protected RedisQueueInboundChannelAdapterSpec(String queueName, RedisConnectionFactory connectionFactory) {
this.target = new RedisQueueMessageDrivenEndpoint(queueName, connectionFactory);
}

/**
* @param serializer the serializer
* @return the spec
* @see RedisQueueMessageDrivenEndpoint#setSerializer(RedisSerializer)
*/
public RedisQueueInboundChannelAdapterSpec serializer(RedisSerializer<?> serializer) {
this.target.setSerializer(serializer);
return this;
}

/**
* @param expectMessage the expectMessage
* @return the spec
* @see RedisQueueMessageDrivenEndpoint#setExpectMessage(boolean)
*/
public RedisQueueInboundChannelAdapterSpec expectMessage(boolean expectMessage) {
this.target.setExpectMessage(expectMessage);
return this;
}

/**
* @param receiveTimeout the receiveTimeout
* @return the spec
* @see RedisQueueMessageDrivenEndpoint#setReceiveDuration(Duration)
*/
public RedisQueueInboundChannelAdapterSpec receiveDuration(Duration receiveTimeout) {
this.target.setReceiveDuration(receiveTimeout);
return this;
}

/**
* @param receiveTimeout the receiveTimeout
* @return the spec
* @see RedisQueueMessageDrivenEndpoint#setReceiveTimeout(long)
*/
public RedisQueueInboundChannelAdapterSpec receiveTimeout(long receiveTimeout) {
this.target.setReceiveTimeout(receiveTimeout);
return this;
}

/**
* @param taskExecutor the taskExecutor
* @return the spec
* @see RedisQueueMessageDrivenEndpoint#setTaskExecutor(Executor)
*/
public RedisQueueInboundChannelAdapterSpec taskExecutor(Executor taskExecutor) {
this.target.setTaskExecutor(taskExecutor);
return this;
}

/**
* @param recoveryInterval the recoveryInterval
* @return the spec
* @see RedisQueueMessageDrivenEndpoint#setRecoveryInterval(long)
*/
public RedisQueueInboundChannelAdapterSpec recoveryInterval(long recoveryInterval) {
this.target.setRecoveryInterval(recoveryInterval);
return this;
}

/**
* @param rightPop the rightPop
* @return the spec
* @see RedisQueueMessageDrivenEndpoint#setRightPop(boolean)
*/
public RedisQueueInboundChannelAdapterSpec rightPop(boolean rightPop) {
this.target.setRightPop(rightPop);
return this;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2026-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.redis.dsl;

import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.expression.Expression;
import org.springframework.integration.dsl.MessageHandlerSpec;
import org.springframework.integration.redis.outbound.RedisQueueOutboundChannelAdapter;

/**
* A {@link MessageHandlerSpec} for a {@link RedisQueueOutboundChannelAdapter}.
*
* @author Jiandong Ma
*
* @since 7.1
*/
public class RedisQueueOutboundChannelAdapterSpec extends MessageHandlerSpec<RedisQueueOutboundChannelAdapterSpec, RedisQueueOutboundChannelAdapter> {

protected RedisQueueOutboundChannelAdapterSpec(String queueName, RedisConnectionFactory connectionFactory) {
this.target = new RedisQueueOutboundChannelAdapter(queueName, connectionFactory);
}

protected RedisQueueOutboundChannelAdapterSpec(Expression queueExpression, RedisConnectionFactory connectionFactory) {
this.target = new RedisQueueOutboundChannelAdapter(queueExpression, connectionFactory);
}

/**
* @param extractPayload the extractPayload
* @return the spec
* @see RedisQueueOutboundChannelAdapter#setExtractPayload(boolean)
*/
public RedisQueueOutboundChannelAdapterSpec extractPayload(boolean extractPayload) {
this.target.setExtractPayload(extractPayload);
return this;
}

/**
* @param serializer the serializer
* @return the spec
* @see RedisQueueOutboundChannelAdapter#setSerializer(RedisSerializer)
*/
public RedisQueueOutboundChannelAdapterSpec serializer(RedisSerializer<?> serializer) {
this.target.setSerializer(serializer);
return this;
}

/**
* @param leftPush the leftPush
* @return the spec
* @see RedisQueueOutboundChannelAdapter#setLeftPush(boolean)
*/
public RedisQueueOutboundChannelAdapterSpec leftPush(boolean leftPush) {
this.target.setLeftPush(leftPush);
return this;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ class RedisTests implements RedisContainerTest {

static final String TOPIC_FOR_OUTBOUND_CHANNEL_ADAPTER = "dslOutboundChannelAdapterTopic";

static final String QUEUE_NAME_FOR_QUEUE_INBOUND_CHANNEL_ADAPTER = "dslQueueInboundChannelAdapter";

static final String QUEUE_NAME_FOR_QUEUE_OUTBOUND_CHANNEL_ADAPTER = "dslQueueOutboundChannelAdapter";

@Autowired
RedisConnectionFactory connectionFactory;

Expand All @@ -78,6 +82,13 @@ class RedisTests implements RedisContainerTest {
@Qualifier("outboundChannelAdapterFlow.input")
MessageChannel outboundChannelAdapterInputChannel;

@Autowired
QueueChannel queueInboundChannelAdapterOutputChannel;

@Autowired
@Qualifier("queueOutboundChannelAdapterFlow.input")
MessageChannel queueOutboundChannelAdapterInputChannel;

@Test
void testInboundChannelAdapterFlow() throws Exception {
StringRedisTemplate redisTemplate = new StringRedisTemplate(connectionFactory);
Expand Down Expand Up @@ -147,6 +158,46 @@ public void onMessage(org.springframework.data.redis.connection.Message message,
container.stop();
}

@Test
void testQueueInboundChannelAdapterFlow() {
// Given
int numToTest = 10;
StringRedisTemplate redisTemplate = new StringRedisTemplate(connectionFactory);
var listOps = redisTemplate.boundListOps(QUEUE_NAME_FOR_QUEUE_INBOUND_CHANNEL_ADAPTER);
for (int i = 0; i < numToTest; i++) {
listOps.leftPush("queue-inbound-message-" + i);
}

for (int i = 0; i < numToTest; i++) {
// When
Message<?> message = queueInboundChannelAdapterOutputChannel.receive(10000);
// Then
assertThat(message).isNotNull()
.extracting(Message::getPayload)
.isEqualTo("queue-inbound-message-" + i);
}
}

@Test
void testQueueOutboundChannelAdapterFlow() {
// Given
int numToTest = 10;
for (int i = 0; i < numToTest; i++) {
queueOutboundChannelAdapterInputChannel.send(MessageBuilder.withPayload("queue-outbound-message-" + i).build());
}

StringRedisTemplate redisTemplate = new StringRedisTemplate(connectionFactory);
var listOps = redisTemplate.boundListOps(QUEUE_NAME_FOR_QUEUE_OUTBOUND_CHANNEL_ADAPTER);
for (int i = 0; i < numToTest; i++) {
// When
String msg = listOps.rightPop();
// Then
assertThat(msg)
.isNotNull()
.isEqualTo("queue-outbound-message-" + i);
}
}

@Configuration(proxyBeanMethods = false)
@EnableIntegration
static class Config {
Expand All @@ -173,6 +224,24 @@ IntegrationFlow outboundChannelAdapterFlow(RedisConnectionFactory redisConnectio
.topicExpression(new LiteralExpression(TOPIC_FOR_OUTBOUND_CHANNEL_ADAPTER)));
}

@Bean
IntegrationFlow queueInboundChannelAdapterFlow(RedisConnectionFactory redisConnectionFactory) {
return IntegrationFlow.from(Redis
.queueInboundChannelAdapter(QUEUE_NAME_FOR_QUEUE_INBOUND_CHANNEL_ADAPTER, redisConnectionFactory)
.serializer(RedisSerializer.string())
)
.channel(c -> c.queue("queueInboundChannelAdapterOutputChannel"))
.get();
}

@Bean
IntegrationFlow queueOutboundChannelAdapterFlow(RedisConnectionFactory redisConnectionFactory) {
return flow -> flow
.handle(Redis.queueOutboundChannelAdapter(QUEUE_NAME_FOR_QUEUE_OUTBOUND_CHANNEL_ADAPTER, redisConnectionFactory)
.serializer(RedisSerializer.string())
);
}

}

}
Loading