Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@

package org.springframework.integration.redis.dsl;

import java.util.function.Function;

import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.expression.Expression;
import org.springframework.integration.expression.FunctionExpression;
import org.springframework.messaging.Message;

/**
* Factory class for Redis components.
Expand Down Expand Up @@ -45,6 +50,54 @@ public static RedisOutboundChannelAdapterSpec outboundChannelAdapter(RedisConnec
return new RedisOutboundChannelAdapterSpec(connectionFactory);
}

/**
* The factory to produce a {@link RedisQueueInboundChannelAdapterSpec}.
* @param queueName The queueName of the Redis list to build on
* @param connectionFactory the {@link RedisConnectionFactory} to build on
* @return the {@link RedisQueueInboundChannelAdapterSpec} instance
*/
public static RedisQueueInboundChannelAdapterSpec queueInboundChannelAdapter(String queueName,
RedisConnectionFactory connectionFactory) {

return new RedisQueueInboundChannelAdapterSpec(queueName, connectionFactory);
}

/**
* The factory to produce a {@link RedisQueueOutboundChannelAdapterSpec}.
* @param queueName The queueName of the Redis list to build on
* @param connectionFactory the {@link RedisConnectionFactory} to build on
* @return the {@link RedisQueueOutboundChannelAdapterSpec} instance
*/
public static RedisQueueOutboundChannelAdapterSpec queueOutboundChannelAdapter(String queueName,
RedisConnectionFactory connectionFactory) {

return new RedisQueueOutboundChannelAdapterSpec(queueName, connectionFactory);
}

/**
* The factory to produce a {@link RedisQueueOutboundChannelAdapterSpec}.
* @param queueExpression The queueExpression of the Redis list to build on
* @param connectionFactory the {@link RedisConnectionFactory} to build on
* @return the {@link RedisQueueOutboundChannelAdapterSpec} instance
*/
public static RedisQueueOutboundChannelAdapterSpec queueOutboundChannelAdapter(Expression queueExpression,
RedisConnectionFactory connectionFactory) {

return new RedisQueueOutboundChannelAdapterSpec(queueExpression, connectionFactory);
}

/**
* The factory to produce a {@link RedisQueueOutboundChannelAdapterSpec}.
* @param queueFunction The queueExpression of the Redis list to build on
* @param connectionFactory the {@link RedisConnectionFactory} to build on
* @return the {@link RedisQueueOutboundChannelAdapterSpec} instance
*/
public static RedisQueueOutboundChannelAdapterSpec queueOutboundChannelAdapter(
Function<Message<?>, String> queueFunction, RedisConnectionFactory connectionFactory) {

return new RedisQueueOutboundChannelAdapterSpec(new FunctionExpression<>(queueFunction), connectionFactory);
}

private Redis() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,21 @@
import org.springframework.messaging.converter.MessageConverter;

/**
* A {@link MessageProducerSpec} for a {@link RedisInboundChannelAdapterSpec}.
* A {@link MessageProducerSpec} for a {@link RedisInboundChannelAdapter}.
*
* @author Jiandong Ma
*
* @since 7.1
*/
public class RedisInboundChannelAdapterSpec extends MessageProducerSpec<RedisInboundChannelAdapterSpec, RedisInboundChannelAdapter> {
public class RedisInboundChannelAdapterSpec extends
MessageProducerSpec<RedisInboundChannelAdapterSpec, RedisInboundChannelAdapter> {

protected RedisInboundChannelAdapterSpec(RedisConnectionFactory connectionFactory) {
this.target = new RedisInboundChannelAdapter(connectionFactory);
}

/**
* Specify the RedisSerializer to deserialize the body of Redis messages.
* @param serializer the serializer
* @return the spec
* @see RedisInboundChannelAdapter#setSerializer(RedisSerializer)
Expand All @@ -48,6 +50,7 @@ public RedisInboundChannelAdapterSpec serializer(RedisSerializer<?> serializer)
}

/**
* Specify the topics to subscribe.
* @param topics the topics
* @return the spec
* @see RedisInboundChannelAdapter#setTopics(String...)
Expand All @@ -58,6 +61,7 @@ public RedisInboundChannelAdapterSpec topics(String... topics) {
}

/**
* Specify the topicPatterns to subscribe.
* @param topicPatterns the topicPatterns
* @return the spec
* @see RedisInboundChannelAdapter#setTopicPatterns(String...)
Expand All @@ -68,6 +72,7 @@ public RedisInboundChannelAdapterSpec topicPatterns(String... topicPatterns) {
}

/**
* Specify the messageConverter to convert between Redis messages and Spring message payloads.
* @param messageConverter the messageConverter
* @return the spec
* @see RedisInboundChannelAdapter#setMessageConverter(MessageConverter)
Expand All @@ -78,6 +83,7 @@ public RedisInboundChannelAdapterSpec messageConverter(MessageConverter messageC
}

/**
* Specify an {@link Executor} for running the message listeners when messages are received.
* @param taskExecutor the taskExecutor
* @return the spec
* @see RedisInboundChannelAdapter#setTaskExecutor(Executor)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,21 @@
import org.springframework.messaging.converter.MessageConverter;

/**
* A {@link MessageHandlerSpec} for a {@link RedisOutboundChannelAdapterSpec}.
* A {@link MessageHandlerSpec} for a {@link RedisPublishingMessageHandler}.
*
* @author Jiandong Ma
*
* @since 7.1
*/
public class RedisOutboundChannelAdapterSpec extends MessageHandlerSpec<RedisOutboundChannelAdapterSpec, RedisPublishingMessageHandler> {
public class RedisOutboundChannelAdapterSpec extends
MessageHandlerSpec<RedisOutboundChannelAdapterSpec, RedisPublishingMessageHandler> {

protected RedisOutboundChannelAdapterSpec(RedisConnectionFactory connectionFactory) {
this.target = new RedisPublishingMessageHandler(connectionFactory);
}

/**
* Specify the RedisSerializer to serialize data before sending to the Redis.
* @param serializer the serializer
* @return the spec
* @see RedisPublishingMessageHandler#setSerializer(RedisSerializer)
Expand All @@ -51,6 +53,7 @@ public RedisOutboundChannelAdapterSpec serializer(RedisSerializer<?> serializer)
}

/**
* Specify the messageConverter to convert between Redis messages and Spring message payloads.
* @param messageConverter the messageConverter
* @return the spec
* @see RedisPublishingMessageHandler#setMessageConverter(MessageConverter)
Expand All @@ -61,6 +64,7 @@ public RedisOutboundChannelAdapterSpec messageConverter(MessageConverter message
}

/**
* Specify the topic to publish messages.
* @param topic the topic
* @return the spec
* @see RedisPublishingMessageHandler#setTopic(String)
Expand All @@ -71,6 +75,7 @@ public RedisOutboundChannelAdapterSpec topic(String topic) {
}

/**
* Specify the topicExpression to determine the topic.
* @param topicExpression the topicExpression
* @return the spec
* @see RedisPublishingMessageHandler#setTopicExpression(Expression)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright 2026-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.redis.dsl;

import java.time.Duration;
import java.util.concurrent.Executor;

import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.integration.dsl.MessageProducerSpec;
import org.springframework.integration.redis.inbound.RedisQueueMessageDrivenEndpoint;

/**
* A {@link MessageProducerSpec} for a {@link RedisQueueMessageDrivenEndpoint}.
*
* @author Jiandong Ma
*
* @since 7.1
*/
public class RedisQueueInboundChannelAdapterSpec extends
MessageProducerSpec<RedisQueueInboundChannelAdapterSpec, RedisQueueMessageDrivenEndpoint> {

protected RedisQueueInboundChannelAdapterSpec(String queueName, RedisConnectionFactory connectionFactory) {
this.target = new RedisQueueMessageDrivenEndpoint(queueName, connectionFactory);
}

/**
* Specify the RedisSerializer to deserialize the body of Redis messages.
* @param serializer the serializer
* @return the spec
* @see RedisQueueMessageDrivenEndpoint#setSerializer(RedisSerializer)
*/
public RedisQueueInboundChannelAdapterSpec serializer(RedisSerializer<?> serializer) {
this.target.setSerializer(serializer);
return this;
}

/**
* Specify whether expects data from the Redis queue to contain entire Message instances.
* @param expectMessage the expectMessage
* @return the spec
* @see RedisQueueMessageDrivenEndpoint#setExpectMessage(boolean)
*/
public RedisQueueInboundChannelAdapterSpec expectMessage(boolean expectMessage) {
this.target.setExpectMessage(expectMessage);
return this;
}

/**
* Specify the timeout for 'pop' operation to wait for a Redis message from the Redis Queue.
* @param receiveTimeout the receiveTimeout
* @return the spec
* @see RedisQueueMessageDrivenEndpoint#setReceiveDuration(Duration)
*/
public RedisQueueInboundChannelAdapterSpec receiveDuration(Duration receiveTimeout) {
this.target.setReceiveDuration(receiveTimeout);
return this;
}

/**
* Specify the timeout for 'pop' operation to wait for a Redis message from the Redis Queue.
* @param receiveTimeout the receiveTimeout
* @return the spec
* @see RedisQueueMessageDrivenEndpoint#setReceiveTimeout(long)
*/
public RedisQueueInboundChannelAdapterSpec receiveTimeout(long receiveTimeout) {
this.target.setReceiveTimeout(receiveTimeout);
return this;
}

/**
* Specify an {@link Executor} for the underlying listening task.
* @param taskExecutor the taskExecutor
* @return the spec
* @see RedisQueueMessageDrivenEndpoint#setTaskExecutor(Executor)
*/
public RedisQueueInboundChannelAdapterSpec taskExecutor(Executor taskExecutor) {
this.target.setTaskExecutor(taskExecutor);
return this;
}

/**
* Specify the time in milliseconds for the underlying listening task should sleep
* after exceptions on the 'pop' operation.
* @param recoveryInterval the recoveryInterval
* @return the spec
* @see RedisQueueMessageDrivenEndpoint#setRecoveryInterval(long)
*/
public RedisQueueInboundChannelAdapterSpec recoveryInterval(long recoveryInterval) {
this.target.setRecoveryInterval(recoveryInterval);
return this;
}

/**
* Specify use "right pop" or "left pop" to read messages from the Redis Queue.
* @param rightPop the rightPop
* @return the spec
* @see RedisQueueMessageDrivenEndpoint#setRightPop(boolean)
*/
public RedisQueueInboundChannelAdapterSpec rightPop(boolean rightPop) {
this.target.setRightPop(rightPop);
return this;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2026-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.integration.redis.dsl;

import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.expression.Expression;
import org.springframework.integration.dsl.MessageHandlerSpec;
import org.springframework.integration.redis.outbound.RedisQueueOutboundChannelAdapter;

/**
* A {@link MessageHandlerSpec} for a {@link RedisQueueOutboundChannelAdapter}.
*
* @author Jiandong Ma
*
* @since 7.1
*/
public class RedisQueueOutboundChannelAdapterSpec extends
MessageHandlerSpec<RedisQueueOutboundChannelAdapterSpec, RedisQueueOutboundChannelAdapter> {

protected RedisQueueOutboundChannelAdapterSpec(String queueName, RedisConnectionFactory connectionFactory) {
this.target = new RedisQueueOutboundChannelAdapter(queueName, connectionFactory);
}

protected RedisQueueOutboundChannelAdapterSpec(Expression queueExpression, RedisConnectionFactory connectionFactory) {
this.target = new RedisQueueOutboundChannelAdapter(queueExpression, connectionFactory);
}

/**
* Specify send only the payload or the entire Message to the Redis queue.
* @param extractPayload the extractPayload
* @return the spec
* @see RedisQueueOutboundChannelAdapter#setExtractPayload(boolean)
*/
public RedisQueueOutboundChannelAdapterSpec extractPayload(boolean extractPayload) {
this.target.setExtractPayload(extractPayload);
return this;
}

/**
* Specify the RedisSerializer to serialize data before sending to the Redis Queue.
* @param serializer the serializer
* @return the spec
* @see RedisQueueOutboundChannelAdapter#setSerializer(RedisSerializer)
*/
public RedisQueueOutboundChannelAdapterSpec serializer(RedisSerializer<?> serializer) {
this.target.setSerializer(serializer);
return this;
}

/**
* Specify use "left push" or "right push" to write messages to the Redis Queue.
* @param leftPush the leftPush
* @return the spec
* @see RedisQueueOutboundChannelAdapter#setLeftPush(boolean)
*/
public RedisQueueOutboundChannelAdapterSpec leftPush(boolean leftPush) {
this.target.setLeftPush(leftPush);
return this;
}

}
Loading