Springboot通过redisTemplate实现发布订阅

要点:

RedisMessageListenerContainer:Redis 发布/订阅的消息监听容器,订阅频道与订阅者(监听器)的绑定都在这里配置;消息的发布则通过 RedisTemplate 完成

* addMessageListener(MessageListener, Topic) 新增订阅频道及订阅者(本文传入的是 MessageListenerAdapter 和 PatternTopic),订阅者必须有相应的方法处理收到的消息。

* setTopicSerializer(RedisSerializer<String>) 设置频道(topic)名称的序列化器;它只作用于频道名,不负责消息内容。消息内容的反序列化由 MessageListenerAdapter 的 setSerializer 决定(默认按字符串处理)

MessageListenerAdapter 监听适配器

  • MessageListenerAdapter(Object delegate, String defaultListenerMethod) 指定订阅者对象及其处理消息的方法名

RedisTemplate Redis 模板类

  • convertAndSend(String channel, Object message) 消息发布

RedisConfig核心类,实现了Redis连接,订阅以及发布配置

package com.example.demo.config;

import com.example.demo.project.MessageReceive1;
import com.example.demo.project.MessageReceive2;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * @author lzg
 * @date 2019/12/5 15:37
 */
@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        // 使用Jackson2JsonRedisSerialize 替换默认的jdkSerializeable序列化
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
        // 设置value的序列化规则和 key的序列化规则
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    /**
     * redis消息监听器容器
     * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
     * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
     * @param redisConnectionFactory
     * @param listenerAdapter1
     * @return
     */
    //序列化对象(特别注意:发布的时候需要设置序列化;订阅方也需要设置序列化)
    @Bean
    public RedisMessageListenerContainer container1(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter listenerAdapter1, MessageListenerAdapter listenerAdapter2) {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
        // 订阅多个频道
        redisMessageListenerContainer.addMessageListener(listenerAdapter1, new PatternTopic("test1"));
        redisMessageListenerContainer.addMessageListener(listenerAdapter1, new PatternTopic("test2"));
        //不同的订阅者
        redisMessageListenerContainer.addMessageListener(listenerAdapter2, new PatternTopic("test2"));

        //序列化对象(特别注意:发布的时候需要设置序列化;订阅方也需要设置序列化)
        Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        seria.setObjectMapper(objectMapper);
        redisMessageListenerContainer.setTopicSerializer(seria);
        return redisMessageListenerContainer;
    }


    //表示监听一个频道
    @Bean
    public MessageListenerAdapter listenerAdapter1(MessageReceive1 messageReceive1) {
        //这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“MessageReceive1 ”
        return new MessageListenerAdapter(messageReceive1, "getMessage");
    }

    //表示监听一个频道
    @Bean
    public MessageListenerAdapter listenerAdapter2(MessageReceive2 messageReceive2) {
        //这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“MessageReceive2 ”
        return new MessageListenerAdapter(messageReceive2, "getMessage");
    }
}

被消费的对象(即传输的数据)

/**
 * Payload object published over Redis and consumed by the subscribers.
 *
 * <p>NOTE(review): Lombok's generated {@code toString()} includes every non-static
 * field, so {@code password} and {@code pwdSalt} end up in anything that prints this
 * object - consider excluding them.
 *
 * @author lzg
 * @date 2019/12/5 15:46
 */
@Data
public class Person implements Serializable {
    // Must be static: Java serialization only honours a static final long serialVersionUID.
    // As an instance field it was also treated as ordinary data (included in toString()
    // and in the JSON payload).
    private static final long serialVersionUID = 1L;

    private String id;

    private String userName;

    private String memberName;

    private String password;

    private String email;

    private String status;

    private String pwdSalt;

}

客户端:

/**
 * Subscriber number one: receives the JSON payload as a string and turns it back
 * into a {@link Person}.
 */
@Component
public class MessageReceive1 {

    /**
     * Deserializer configured the same way as the publishing side; built once instead of
     * once per message.
     */
    private final Jackson2JsonRedisSerializer<Person> serializer = createSerializer();

    /**
     * Handles one received message.
     *
     * @param object the message payload as text (JSON)
     */
    public void getMessage(String object) {
        // Encode explicitly as UTF-8: the no-argument getBytes() uses the platform default
        // charset, which corrupts non-ASCII payloads on machines whose default is not UTF-8.
        Person user = serializer.deserialize(object.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        // String concatenation prints "null" instead of throwing if nothing was deserialized.
        System.out.println("消息客户端1号:" + user);
    }

    /** Builds the Person deserializer: all fields visible, default typing for non-final types. */
    private static Jackson2JsonRedisSerializer<Person> createSerializer() {
        Jackson2JsonRedisSerializer<Person> seria = new Jackson2JsonRedisSerializer<>(Person.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        seria.setObjectMapper(objectMapper);
        return seria;
    }
}
/**
 * Subscriber number two: receives the JSON payload as a string and turns it back
 * into a {@link Person}.
 */
@Component
public class MessageReceive2 {

    /**
     * Deserializer configured the same way as the publishing side; built once instead of
     * once per message.
     */
    private final Jackson2JsonRedisSerializer<Person> serializer = createSerializer();

    /**
     * Handles one received message.
     *
     * @param object the message payload as text (JSON)
     */
    public void getMessage(String object) {
        // Encode explicitly as UTF-8: the no-argument getBytes() uses the platform default
        // charset, which corrupts non-ASCII payloads on machines whose default is not UTF-8.
        Person user = serializer.deserialize(object.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        System.out.println("消息客户端2号:" + user);
    }

    /** Builds the Person deserializer: all fields visible, default typing for non-final types. */
    private static Jackson2JsonRedisSerializer<Person> createSerializer() {
        Jackson2JsonRedisSerializer<Person> seria = new Jackson2JsonRedisSerializer<>(Person.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        seria.setObjectMapper(objectMapper);
        return seria;
    }
}

测试类:

/**
 * Publishes one {@link Person} to each of the channels {@code test1} and {@code test2}.
 *
 * <p>NOTE(review): the test makes no assertions and delivery to the subscribers is
 * asynchronous, so it only demonstrates publishing - confirm the output on the console.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestPack {
    // Parameterized instead of raw; @Resource resolves the bean by the field name.
    @Resource
    private RedisTemplate<Object, Object> redisTemplate;

    @Test
    public void test() {
        Person person1 = newPerson("001", "一号");
        Person person2 = newPerson("002", "二号");
        redisTemplate.convertAndSend("test1", person1);
        redisTemplate.convertAndSend("test2", person2);
    }

    /** Creates a Person with only the id and user name set. */
    private static Person newPerson(String id, String userName) {
        Person person = new Person();
        person.setId(id);
        person.setUserName(userName);
        return person;
    }
}

第二种(简单版):

配置类:

/**
 * Minimal pub/sub configuration: one publishing template, one listener container and
 * one listener adapter subscribed to the channel {@code test1}.
 */
@Configuration
public class MyRedisConf {

    /**
     * Creates the template used to publish messages.
     *
     * <p>Keys are serialized as plain strings and values as JSON, replacing the
     * default JDK serialization.
     *
     * @param redisConnectionFactory connection factory the template talks to
     * @return the fully initialised template
     */
    @Bean
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        // JSON value serializer: every field visible, default typing for non-final types.
        Jackson2JsonRedisSerializer<Object> valueSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper mapper = new ObjectMapper();
        mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        valueSerializer.setObjectMapper(mapper);

        RedisTemplate<Object, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(valueSerializer);
        template.afterPropertiesSet();
        return template;
    }

    /**
     * Creates the listener container and subscribes the adapter to {@code test1}.
     *
     * @param connectionFactory connection factory used for the subscription
     * @param listenerAdapter adapter that receives the messages
     * @return the configured container
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                                   MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, new PatternTopic("test1"));
        return container;
    }

    /**
     * Binds the receiver to its handler method {@code receiveMessage}.
     *
     * <p>This bean must exist because {@code container} takes a
     * {@link MessageListenerAdapter} as a parameter; without it the context fails to start.
     *
     * @return the adapter wrapping a new {@code Receiver}
     */
    @Bean
    public MessageListenerAdapter listenerAdapter() {
        return new MessageListenerAdapter(new Receiver(), "receiveMessage");
    }
}

/**
 * Message receiver for the simple variant; its {@code receiveMessage} method is the
 * handler named in the listener adapter.
 */
@Slf4j
class Receiver {
    /**
     * Handles one received message by logging it.
     *
     * @param message the message payload as text
     */
    public void receiveMessage(String message) {
        // Use the logger that @Slf4j generates instead of writing to System.out directly.
        log.info("{}", message);
    }
}

测试类:

/**
 * Publishes ten text messages to the channel {@code test1}.
 *
 * <p>NOTE(review): the test makes no assertions and delivery to the receiver is
 * asynchronous, so it only demonstrates publishing - confirm the output on the console.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestPack {
    // Parameterized instead of raw; @Resource resolves the bean by the field name.
    @Resource
    private RedisTemplate<Object, Object> redisTemplate;

    @Test
    public void test() {
        for (int i = 0; i < 10; i++) {
            System.out.println(i);
            redisTemplate.convertAndSend("test1", "这是我发送的第" + i + "个消息");
        }
    }

}