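Integrating Kafka with Spring Cloud Stream
This walkthrough uses Spring Boot and Spring Cloud Stream with Kafka to build a simple real-time streaming system.
Add dependencies
You can create a Maven project based on Spring Boot at https://start.spring.io.
The main dependencies to add are spring-cloud-stream and spring-cloud-starter-stream-kafka (spring-boot-starter-actuator is included as well), as follows: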
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
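Add dependencyManagement (the ${spring-boot.version} and ${spring-cloud-stream.version} properties must be defined in the <properties> section of the POM):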
<dependencyManagement>
    <dependencies>
        <dependency>
            <!-- Import dependency management from Spring Boot -->
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring-boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-dependencies</artifactId>
            <version>${spring-cloud-stream.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
Add the following inside the <repositories> element:
<repository>
    <id>spring-milestones</id>
    <name>Spring Milestones</name>
    <url>http://repo.spring.io/libs-milestone</url>
    <snapshots>
        <enabled>false</enabled>
    </snapshots>
</repository>
Define the Kafka streams
package demo.streamkafka.stream;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface GreetingsStreams {
    // Channel names; they must match the binding names in application.yaml
    String INPUT = "greetings-in";
    String OUTPUT = "greetings-out";

    // Inbound channel: messages consumed from Kafka arrive here
    @Input(INPUT)
    SubscribableChannel inboundGreetings();

    // Outbound channel: messages sent here are written to Kafka
    @Output(OUTPUT)
    MessageChannel outboundGreetings();
}
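This defines one output stream and one input stream:
- outboundGreetings: the output stream, used to write messages to a Kafka topic
- inboundGreetings: the input stream, used to consume messages from Kafka
At runtime, Spring creates a Java proxy that implements the GreetingsStreams interface.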
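To make the idea of a proxy-implemented interface concrete, here is a minimal sketch of the general JDK dynamic-proxy technique. It is not Spring's actual implementation: SimpleChannel, SimpleStreams and ProxySketch are hypothetical names introduced only for this illustration, and the "channel" just records payloads in a list.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a message channel; not a Spring type.
interface SimpleChannel {
    boolean send(String payload);
}

// Hypothetical counterpart of GreetingsStreams, reduced to one method.
interface SimpleStreams {
    SimpleChannel outboundGreetings();
}

class ProxySketch {
    // Records what was "sent" so the effect of the proxy is observable.
    static final List<String> SENT = new ArrayList<>();

    static SimpleStreams createProxy() {
        SimpleChannel channel = payload -> SENT.add(payload);
        // The handler is invoked for every method called on the proxy.
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getName().equals("outboundGreetings")) {
                return channel;
            }
            throw new UnsupportedOperationException(method.getName());
        };
        // No class implements SimpleStreams by hand; the JDK generates one.
        return (SimpleStreams) Proxy.newProxyInstance(
                SimpleStreams.class.getClassLoader(),
                new Class<?>[] {SimpleStreams.class},
                handler);
    }

    public static void main(String[] args) {
        SimpleStreams streams = createProxy();
        streams.outboundGreetings().send("hello");
        System.out.println(SENT); // prints [hello]
    }
}
```

The point of the sketch is only that an interface with no hand-written implementation can still be called, because the runtime supplies the object behind it.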
Configure Spring Cloud Stream
Next, configure Spring Cloud Stream to bind the streams declared in GreetingsStreams:
package demo.streamkafka.config;

import demo.streamkafka.stream.GreetingsStreams;
import org.springframework.cloud.stream.annotation.EnableBinding;

@EnableBinding(GreetingsStreams.class)
public class StreamsConfig {
}
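The @EnableBinding annotation enables the binding.
Configure the Kafka properties
Spring Boot's default configuration file is src/main/resources/application.properties; an application.yaml file can be used instead.
The application.yaml configuration is as follows: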
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
      bindings:
        greetings-in:
          destination: greetings
          contentType: application/json
        greetings-out:
          destination: greetings
          contentType: application/json
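The configuration sets:
- the address of the Kafka broker to connect to
- greetings as the Kafka topic, for both the inbound and the outbound binding
- JSON as the contentType of the messages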
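For projects that keep the default application.properties file, the same settings could be written in flat form. This is a sketch that only flattens the YAML keys shown above and adds nothing new:

```properties
spring.cloud.stream.kafka.binder.brokers=localhost:9092
spring.cloud.stream.bindings.greetings-in.destination=greetings
spring.cloud.stream.bindings.greetings-in.contentType=application/json
spring.cloud.stream.bindings.greetings-out.destination=greetings
spring.cloud.stream.bindings.greetings-out.contentType=application/json
```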
Create the message model
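package demo.streamkafka.model;

public class Greetings {
    private long timestamp;
    private String message;

    // Getters and setters, used for JSON conversion and by the controller
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    public String getMessage() { return message; }
    public void setMessage(String message) { this.message = message; }
}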
Write messages to Kafka in the service layer
package demo.streamkafka.service;

import demo.streamkafka.model.Greetings;
import demo.streamkafka.stream.GreetingsStreams;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;

@Service
public class GreetingsService {
    private final GreetingsStreams greetingsStreams;

    public GreetingsService(GreetingsStreams greetingsStreams) {
        this.greetingsStreams = greetingsStreams;
    }

    public void sendGreeting(final Greetings greetings) {
        // Wrap the payload in a message with a JSON content type and send it
        MessageChannel messageChannel = greetingsStreams.outboundGreetings();
        messageChannel.send(MessageBuilder
                .withPayload(greetings)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build());
    }
}
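The sendGreeting() method receives a Greetings object and sends it through the messageChannel.
A REST API to trigger message writes
Use Spring MVC to create a RestController that exposes a REST API for triggering a message write to Kafka. GreetingsController calls GreetingsService to send the message.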
package demo.streamkafka.web;

import demo.streamkafka.model.Greetings;
import demo.streamkafka.service.GreetingsService;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class GreetingsController {
    private final GreetingsService greetingsService;

    public GreetingsController(GreetingsService greetingsService) {
        this.greetingsService = greetingsService;
    }

    // GET /greetings?message=... publishes the message and returns 202 Accepted
    @GetMapping("/greetings")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void greetings(@RequestParam("message") String message) {
        // Build the payload with the setters of the Greetings model
        Greetings greetings = new Greetings();
        greetings.setMessage(message);
        greetings.setTimestamp(System.currentTimeMillis());
        greetingsService.sendGreeting(greetings);
    }
}
Listen to the Kafka topic
GreetingsListener is a listener that watches the Kafka topic for incoming messages.
package demo.streamkafka.service;

import demo.streamkafka.model.Greetings;
import demo.streamkafka.stream.GreetingsStreams;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class GreetingsListener {
    // Called for every message that arrives on the inbound channel
    @StreamListener(GreetingsStreams.INPUT)
    public void handleGreetings(@Payload Greetings greetings) {
        System.out.println("Received greetings: " + greetings.getMessage());
    }
}
The method is annotated with @StreamListener(GreetingsStreams.INPUT), so whenever a message arrives on the greetings topic, GreetingsListener receives it and handleGreetings() consumes it.
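The listener pattern described here can be illustrated without Kafka or Spring. The following is a simplified in-memory sketch, assuming a hypothetical InMemoryChannel class that hands every payload to its subscribers; it only mimics the dispatch step and says nothing about how the real binder delivers messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified channel: each payload sent is passed to all subscribers.
class InMemoryChannel<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    void subscribe(Consumer<T> handler) {
        subscribers.add(handler);
    }

    void send(T payload) {
        for (Consumer<T> handler : subscribers) {
            handler.accept(payload);
        }
    }
}

class ListenerSketch {
    // Sends the given messages through a channel and returns what the listener produced.
    static List<String> run(String... messages) {
        List<String> received = new ArrayList<>();
        InMemoryChannel<String> channel = new InMemoryChannel<>();
        // Plays the role of handleGreetings(): invoked once per incoming message.
        channel.subscribe(msg -> received.add("Received greetings: " + msg));
        for (String message : messages) {
            channel.send(message);
        }
        return received;
    }

    public static void main(String[] args) {
        run("hello", "world").forEach(System.out::println);
    }
}
```

In the real application the subscription is set up by the framework from the @StreamListener annotation, so no explicit subscribe() call appears in the listener class.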
Start the application
Finally, implement a Spring Boot Application class to start the application.
package demo.streamkafka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class StreamKafkaApplication {
    public static void main(String[] args) {
        SpringApplication.run(StreamKafkaApplication.class, args);
    }
}
This completes a simple Kafka message-stream pipeline. Messages can be sent by calling the REST API.
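One way to call the endpoint is a small client built on the JDK's java.net.http.HttpClient (Java 11 or later). This is a sketch under assumptions: GreetingsClient is a hypothetical class name, and the base URL http://localhost:8080 assumes the application runs locally on Spring Boot's default port with a Kafka broker available.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

class GreetingsClient {
    // Builds the request URI; the message is URL-encoded so spaces and symbols survive.
    static URI greetingsUri(String baseUrl, String message) {
        String encoded = URLEncoder.encode(message, StandardCharsets.UTF_8);
        return URI.create(baseUrl + "/greetings?message=" + encoded);
    }

    public static void main(String[] args) throws Exception {
        // Assumption: the application listens on localhost:8080.
        URI uri = greetingsUri("http://localhost:8080", "hello world");
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // The controller is annotated with HttpStatus.ACCEPTED, so 202 is expected.
        System.out.println("HTTP status: " + response.statusCode());
    }
}
```

If the request succeeds, the application console should show the line printed by GreetingsListener for the same message.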