公司里目前用的是RocketMQ,用的过程中遇到一些问题,逐渐将一些业务转到 kafka 上,正好目前项目是spring boot项目,所以就来试试 spring cloud stream,本地环境有 rabbitmq,所以使用它了。
和接收一样,需要加上
但上面接收和发送在一个项目里面启动的时候会失败,spring会把
依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
定义接收器
public interface MyReceiver {
@Input("my-channel")
SubscribableChannel input();
}
定义发送器
public interface MySender {
@Output("my-channle")
MessageChannel output();
}
上面的 @Input()
和 @Output()
里的参数就是绑定的名字,需要使用一致的名字。接受发送消息
@EnableBinding(value = {MyReceiver.class, MySender.class})
@RestController
@Slf4j
public class MessageController {
@Autowired
private MySender sender;
/**
* 发送
*/
@PostMapping("/send")
public void send(@RequestParam("message") String message) {
sender.output().send(MessageBuilder.withPayload(message).build());
}
/**
* 接收
*/
@StreamListener("my-channel")
public void receive(String message) {
log.info("Received: {}", message);
}
}
必须加上 @EnableBinding(value = {MyReceiver.class})
,使接收生效,在接收方法上加上 @StreamListener("my-channel")
。和接收一样,需要加上
@EnableBinding(value = {MySender.class})
使发送生效(应该是创建代理类吧)。但上面接收和发送在一个项目里面启动的时候会失败,spring会把
@Input
和 @Output
绑定的名字作为Bean的名字创建Bean,所以就重复了,解决方法是在代码里设置成不同的名字,然后在配置文件里指定 destination
。spring:
cloud:
stream:
bindings:
input:
destination: my-channel
output:
destination: my-channel
最终的代码是
public interface MyReceiver {
@Input(Sink.INPUT)
SubscribableChannel input();
}
public interface MySender {
@Output(Source.OUTPUT)
MessageChannel output();
}
@EnableBinding(value = {MyReceiver.class, MySender.class})
@RestController
@Slf4j
public class MessageController {
@Autowired
private MySender sender;
@PostMapping("/send")
public void send(@RequestParam("message") String message) {
sender.output().send(MessageBuilder.withPayload(message).build());
}
@StreamListener(Sink.INPUT)
public void receive(String message) {
log.info("Received: {}", message);
}
}
调用接口可以看到控制台的打印curl -XPOST 'localhost:8080/send?message=hello-world'