Skip to content

Latest commit

 

History

History
503 lines (361 loc) · 12.3 KB

12.消息整合驱动.md

File metadata and controls

503 lines (361 loc) · 12.3 KB

整合 Kafka

改造 user-service-client 消息发送源(Kafka 原生 API)

User 模型实现序列化接口

package com.springcloud.lesson12.domain;

import java.io.Serializable;

/**
 * Serializable user model carrying an identifier and a display name.
 *
 * <p>Both properties are nullable and start out as {@code null}.
 */
public class User implements Serializable {
    private static final long serialVersionUID = 7580121736769044612L;

    private Long id;
    private String name;

    /**
     * @return the user identifier, or {@code null} if none has been set
     */
    public Long getId() {
        return this.id;
    }

    /**
     * @param id the user identifier to store
     */
    public void setId(Long id) {
        this.id = id;
    }

    /**
     * @return the user name, or {@code null} if none has been set
     */
    public String getName() {
        return this.name;
    }

    /**
     * @param name the user name to store
     */
    public void setName(String name) {
        this.name = name;
    }

    /**
     * Renders the user as {@code User{id=<id>, name='<name>'}}.
     *
     * @return the textual form of this user
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("User{");
        text.append("id=").append(id);
        text.append(", name='").append(name).append('\'');
        text.append('}');
        return text.toString();
    }
}

增加 kafka 依赖

        <!-- 整合 Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

利用 KafkaTemplate 实现消息发送

package com.springcloud.lesson12.user.service.client.web.controller;

import com.springcloud.lesson12.api.UserService;
import com.springcloud.lesson12.domain.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;
import java.util.concurrent.ExecutionException;

/**
 * Client-side {@link RestController} that exposes the {@link UserService} contract.
 *
 * <p>Note: the official guidance is that client and server should not both implement the
 * Feign interface. This class does so for demonstration only; real code should prefer
 * composition over inheritance.
 *
 * @author <a href="mailto:[email protected]">Finen</a>
 */
@RestController
public class UserServiceController implements UserService {

    // Delegate for the inherited UserService operations.
    @Autowired
    private UserService userService;

    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Autowired
    public UserServiceController(KafkaTemplate<String, Object> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Publishes the given user to partition 0 of the {@code sc-users} topic and waits for
     * the send to complete.
     *
     * <p>The send is asynchronous, so checking {@code isDone()} right after submitting it
     * says nothing about delivery; the outcome is only known once the future completes.
     *
     * @param user the user to publish
     * @return {@code true} if the send completed successfully, {@code false} if it failed
     *         or the waiting thread was interrupted
     */
    @PostMapping("/user/save/message")
    public boolean saveUserByMessage(@RequestBody User user) {
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("sc-users", 0, "", user);
        try {
            future.get();
            return true;
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever owns this thread.
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            // The send failed; the failure is reported to the caller through the return value.
            return false;
        }
    }

    // URL mapping "/user/save" is inherited from the UserService method declaration.
    @Override
    public boolean saveUser(@RequestBody User user) {
        return userService.saveUser(user);
    }

    // URL mapping "/user/find/all" is inherited from the UserService method declaration.
    @Override
    public List<User> findAll() {
        return userService.findAll();
    }
}

实现 Kafka 序列化器:Java 序列化协议

package com.springcloud.lesson12.user.service.client.serializer;

import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.Map;

/**
 * Kafka {@link Serializer} that encodes any object using the Java serialization protocol.
 *
 * @author <a href="mailto:[email protected]">Finen</a>
 */
public class ObjectSerializer implements Serializer<Object> {

    /**
     * No configuration is required by this serializer.
     */
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Intentionally empty.
    }

    /**
     * Serializes {@code object} into a byte array with {@link ObjectOutputStream}.
     *
     * @param topic  the topic the record is being sent to (used for logging and error context)
     * @param object the value to serialize
     * @return the Java-serialized form of {@code object}
     * @throws RuntimeException if the object cannot be written; the original failure is the cause
     */
    @Override
    public byte[] serialize(String topic, Object object) {

        System.out.println("topic : " + topic + " , object : " + object);

        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

        // try-with-resources guarantees the object stream is closed on every path.
        try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream)) {
            objectOutputStream.writeObject(object);
            // Push any bytes still buffered by the object stream before reading the result.
            objectOutputStream.flush();
            return outputStream.toByteArray();
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize value for topic " + topic, e);
        }
    }

    /**
     * Nothing to release; this serializer holds no resources between calls.
     */
    @Override
    public void close() {
        // Intentionally empty.
    }
}

Spring Cloud Stream 整合

改造 user-service-provider 消息接收器(Sink)

引入 spring-cloud-stream-binder-kafka

        <!-- 依赖 Spring Cloud Stream Binder Kafka -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>

用户消息 Stream 接口定义

package com.springcloud.lesson12.user.service.stream;

import com.springcloud.lesson12.domain.User;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

/**
 * Stream binding interface for incoming {@link User} messages.
 *
 * @author <a href="mailto:[email protected]">Finen</a>
 */
public interface UserMessage {

    /**
     * Name of the inbound channel. It equals the method name {@code input}, so the binding
     * name is the same as when {@code @Input} is used without an explicit value, and it can
     * be referenced from listener annotations.
     */
    String INPUT = "input";

    /**
     * @return the subscribable channel on which user messages arrive
     */
    @Input(INPUT)
    SubscribableChannel input();

}

激活用户消息 Stream 接口

package com.springcloud.lesson12;

import com.springcloud.lesson12.user.service.stream.UserMessage;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.netflix.hystrix.EnableHystrix;
import org.springframework.cloud.stream.annotation.EnableBinding;


/**
 * Bootstrap class of the user service provider application.
 */
@SpringBootApplication
@EnableHystrix
@EnableDiscoveryClient
@EnableBinding(UserMessage.class) // Activate the Stream binding declared by UserMessage
public class UserServiceProviderApplication {
    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments passed through to {@link SpringApplication#run}
     */
    public static void main(String[] args) {
        SpringApplication.run(UserServiceProviderApplication.class, args);
    }
}

配置 Kafka 以及 Stream Destination

## Spring Cloud Stream binding configuration
### Kafka connection settings
# This application consumes messages, so the broker list is set with the common property
# rather than the producer-only one.
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.group-id=group-1
spring.kafka.consumer.client-id=user-service-provider
## destination: the Kafka topic bound to the "input" channel
spring.cloud.stream.bindings.input.destination=sc-users

添加 User 消息监听器

SubscribableChannel 实现
package com.springcloud.lesson12.user.service.provider.service;

import com.springcloud.lesson12.api.UserService;
import com.springcloud.lesson12.domain.User;
import com.springcloud.lesson12.user.stream.UserMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;

import static com.springcloud.lesson12.user.stream.UserMessage.INPUT;


/**
 * User messaging service: receives serialized {@link User} payloads from the stream input
 * channel and stores them through {@link UserService}.
 *
 * @author <a href="mailto:[email protected]">Finen</a>
 */
@Service
public class MessagingUserService {

    @Autowired
    private UserMessage userMessage;

    @Autowired
    @Qualifier("inMemoryUserService")
    UserService userService;

    /**
     * Deserializes a Java-serialized {@link User} and saves it.
     *
     * @param data the serialized user
     * @throws RuntimeException if the bytes cannot be read or the class cannot be resolved;
     *                          the original failure is the cause
     */
    private void saveUser(byte[] data) {
        // NOTE(review): this performs Java native deserialization of data received from the
        // broker. That is only safe when every producer on the topic is trusted; consider a
        // JSON payload or an ObjectInputFilter otherwise.
        try (ObjectInputStream objectInputStream = new ObjectInputStream(new ByteArrayInputStream(data))) {
            User user = (User) objectInputStream.readObject();
            userService.saveUser(user);
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Subscribes to the input channel once the bean's dependencies have been injected.
     */
    @PostConstruct
    public void init() {
        SubscribableChannel subscribableChannel = userMessage.input();
        subscribableChannel.subscribe(message -> {
            // The message body is a byte stream.
            byte[] body = (byte[]) message.getPayload();

            saveUser(body);
        });
    }

}
@ServiceActivator 实现
    /**
     * Handles payloads from the {@code INPUT} channel using {@code @ServiceActivator}:
     * prints a marker line and passes the bytes to {@code saveUser}.
     *
     * @param data the message payload as bytes, handed unchanged to {@code saveUser}
     */
    @ServiceActivator(inputChannel = INPUT)
    public void listen(byte[] data) {
        System.out.println("Subscribe by @ServiceActivator");
        saveUser(data);
    }
@StreamListener 实现
    /**
     * Handles payloads from the {@code INPUT} channel using {@code @StreamListener}:
     * prints a marker line and passes the bytes to {@code saveUser}.
     *
     * @param data the message payload as bytes, handed unchanged to {@code saveUser}
     */
    @StreamListener(INPUT)
    public void onMessage(byte[] data) {
        System.out.println("Subscribe by @StreamListener");
        saveUser(data);
    }

改造 user-service-client 消息发送源( Stream Binder : Rabbit MQ)

增加 spring-cloud-stream-binder-rabbit 依赖

        <!-- 整合 Spring Cloud Stream binder Rabbitmq -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>

配置发送源管道

添加用户消息接口

package com.springcloud.lesson12.user.service.client.stream;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * Stream binding interface for outgoing user messages.
 *
 * @author <a href="mailto:[email protected]">Finen</a>
 */
public interface UserMessage {

    /**
     * @return the output channel named {@code user-message-out}
     */
    @Output("user-message-out")
    MessageChannel output();

}

激活用户消息接口

package com.springcloud.lesson12;

import com.netflix.loadbalancer.IPing;
import com.netflix.loadbalancer.IRule;
import com.springcloud.lesson12.api.UserService;
import com.springcloud.lesson12.user.service.client.ping.MyPing;
import com.springcloud.lesson12.user.service.client.rule.MyRule;
import com.springcloud.lesson12.user.service.client.stream.UserMessage;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.cloud.netflix.ribbon.RibbonClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;

/**
 * Bootstrap and bean configuration class of the user service client application.
 */
@SpringBootApplication
@RibbonClient("user-service-provider") // Name of the target application
@EnableCircuitBreaker // Enable circuit breaking
@EnableFeignClients(clients = UserService.class) // Declare UserService as a Feign client
@EnableDiscoveryClient // Activate service discovery
@EnableBinding(UserMessage.class) // Activate the Stream binding declared by UserMessage
public class UserServiceClientApplication {
    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments passed through to {@link SpringApplication#run}
     */
    public static void main(String[] args) {
        SpringApplication.run(UserServiceClientApplication.class, args);
    }

    /**
     * Exposes {@link MyRule} as a {@link Bean}.
     * @return {@link MyRule}
     */
    @Bean
    public IRule myRule() {
        return new MyRule();
    }

    /**
     * Exposes {@link MyPing} as a {@link Bean}.
     * @return {@link MyPing}
     */
    @Bean
    public IPing myPing() {
        return new MyPing();
    }

    /**
     * Declares a load-balanced {@link RestTemplate}.
     * @return a new {@link RestTemplate} instance
     */
    @Bean
    @LoadBalanced
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
}

实现消息发送到 RabbitMQ

    /**
     * Sends the given user as a JSON message on the {@code userMessage} output channel.
     *
     * @param user the user to publish
     * @return the result reported by {@code MessageChannel.send}
     * @throws JsonProcessingException if the user cannot be written as JSON
     */
    @PostMapping("/user/save/message/rabbit")
    public boolean saveUserByRabbitMessage(@RequestBody User user) throws JsonProcessingException {
        MessageChannel channel = userMessage.output();

        // Wrap the JSON form of the user in a message and send it.
        GenericMessage<String> jsonMessage = new GenericMessage<>(objectMapper.writeValueAsString(user));
        return channel.send(jsonMessage);
    }

启动 Rabbit MQ

改造 user-service-provider 消息接收器( Stream Binder : Rabbit MQ)

替换依赖

        <!-- 依赖 Spring Cloud Stream Binder Kafka -->
        <!--<dependency>-->
            <!--<groupId>org.springframework.cloud</groupId>-->
            <!--<artifactId>spring-cloud-stream-binder-kafka</artifactId>-->
        <!--</dependency>-->

        <!-- 整合 Spring Cloud Stream Binder Rabbit MQ -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>