WebSocket协议定义了两种消息类型(文本消息和二进制消息),但是其内容未定义。 该协议定义了一种机制,供客户端和服务器协商用于在WebSocket之上使用的子协议(即高级消息协议),以定义每种协议可以发送的消息类型,格式,内容。 每个消息,依此类推。 子协议的使用是可选的,但无论哪种方式,客户端和服务器都需要就定义消息内容的某种协议达成共识。
STOMP(面向简单文本的消息传递协议)最初是为脚本语言(例如Ruby,Python和Perl)创建的,以连接到企业消息代理。 它旨在解决常用消息传递模式的最小子集。 STOMP可以在任何可靠的双向流网络协议上使用,例如TCP和WebSocket。 尽管STOMP是面向文本的协议,但是消息有效负载可以是文本或二进制。
STOMP是基于帧的协议,其帧以HTTP为模型。 以下清单显示了STOMP帧的结构:
COMMAND
header1:value1
header2:value2
Body^@
客户端可以使用SEND或SUBSCRIBE命令发送或订阅消息,以及描述消息的内容和接收者的目的地标头。这启用了一种简单的发布-订阅机制,您可以使用该机制通过代理将消息发送到其他连接的客户端,或者将消息发送到服务器以请求执行某些工作。
使用Spring的STOMP支持时,Spring WebSocket应用程序将充当客户端的STOMP代理。消息被路由到@Controller消息处理方法或简单的内存代理,该代理跟踪订阅并向订阅的用户广播消息。您还可以将Spring配置为与专用的STOMP代理(例如RabbitMQ,ActiveMQ等)一起使用,以实际广播消息。在那种情况下,Spring维护与代理的TCP连接,将消息中继到该代理,并将消息从该代理向下传递到已连接的WebSocket客户端。因此,Spring Web应用程序可以依靠基于HTTP的统一安全性,通用验证以及用于消息处理的熟悉的编程模型。
以下示例显示了一个订阅以接收股票报价的客户端,服务器可能会定期发出该股票报价(例如,通过计划任务,该任务通过SimpMessagingTemplate向代理发送消息):
SUBSCRIBE
id:sub-1
destination:/topic/price.stock.*
^@
以下示例显示了一个发送交易请求的客户端,服务器可以通过@MessageMapping方法处理该请求:
SEND
destination:/queue/trade
content-type:application/json
content-length:44
{"action":"BUY","ticker":"MMM","shares",44}^@
执行后,服务器可以向客户广播交易确认消息和详细信息。
在STOMP规范中,目的地的含义是故意不透明的。 它可以是任何字符串,并且完全由STOMP服务器定义它们所支持的目的地的语义和语法。 但是,目的地通常是类似路径的字符串,其中/ topic / ..表示发布-订阅(一对多),而/ queue /表示点对点(一对一)消息 交流。
STOMP服务器可以使用MESSAGE命令向所有订户广播消息。 以下示例显示了服务器向订阅的客户端发送股票报价的服务器:
MESSAGE
message-id:nxahklf6-1
subscription:sub-1
destination:/topic/price.stock.MMM
{"ticker":"MMM","price":129.45}^@
服务器无法发送未经请求的消息。 来自服务器的所有消息都必须响应特定的客户端订阅,并且服务器消息的subscription-id标头必须与客户端订阅的id标头匹配。
前面的概述旨在提供对STOMP协议的最基本的了解。 我们建议您全面阅读协议规范。
与使用原始WebSocket相比,使用STOMP作为子协议可以使Spring Framework和Spring Security提供更丰富的编程模型。 关于HTTP与原始TCP以及它如何使Spring MVC和其他Web框架提供丰富的功能,可以得出相同的观点。 以下是好处列表:
spring-messaging和spring-websocket模块中提供了STOMP over WebSocket支持。 一旦有了这些依赖关系,就可以使用SockJS Fallback通过WebSocket公开STOMP端点,如以下示例所示:
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/portfolio").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.setApplicationDestinationPrefixes("/app");
config.enableSimpleBroker("/topic", "/queue");
}
}
下面的示例显示与前面的示例等效的XML配置:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:message-broker application-destination-prefix="/app">
<websocket:stomp-endpoint path="/portfolio">
<websocket:sockjs/>
</websocket:stomp-endpoint>
<websocket:simple-broker prefix="/topic, /queue"/>
</websocket:message-broker>
</beans>
对于内置的简单代理,/ topic和/ queue前缀没有任何特殊含义。 它们仅是区分发布订阅消息传递和点对点消息传递的约定(即,许多订户与一个消费者)。 使用外部代理时,请检查代理的STOMP页面以了解其支持哪种STOMP目标和前缀。
要从浏览器进行连接,对于SockJS,您可以使用sockjs-client。 对于STOMP,许多应用程序都使用了jmesnil / stomp-websocket库(也称为stomp.js),该库功能齐全,已在生产中使用多年,但不再维护。 目前,JSteunou / webstomp-client是该库中最活跃且发展最快的后继程序。 以下示例代码基于此:
var socket = new SockJS("/spring-websocket-portfolio/portfolio");
var stompClient = webstomp.over(socket);
stompClient.connect({}, function(frame) {
}
另外,如果您通过WebSocket(没有SockJS)进行连接,则可以使用以下代码:
var socket = new WebSocket("/spring-websocket-portfolio/portfolio");
var stompClient = Stomp.over(socket);
stompClient.connect({}, function(frame) {
}
请注意,前面示例中的stompClient不需要指定登录名和密码标头。 即使这样做,它们也会在服务器端被忽略(或更确切地说,被覆盖)。 有关身份验证的更多信息,请参见连接到代理和身份验证。
要配置基础的WebSocket服务器,请应用“服务器配置”中的信息。 对于Jetty,但是您需要通过StompEndpointRegistry设置HandshakeHandler和WebSocketPolicy:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/portfolio").setHandshakeHandler(handshakeHandler());
}
@Bean
public DefaultHandshakeHandler handshakeHandler() {
WebSocketPolicy policy = new WebSocketPolicy(WebSocketBehavior.SERVER);
policy.setInputBufferSize(8192);
policy.setIdleTimeout(600000);
return new DefaultHandshakeHandler(
new JettyRequestUpgradeStrategy(new WebSocketServerFactory(policy)));
}
}
公开STOMP端点后,Spring应用程序将成为已连接客户端的STOMP代理。本节描述服务器端的消息流。
spring-messaging模块包含对起源于Spring Integration的消息传递应用程序的基础支持,后来被提取并合并到Spring Framework中,以便在许多Spring项目和应用程序场景中更广泛地使用。下面的列表简要描述了一些可用的消息传递抽象:
Java配置(即@EnableWebSocketMessageBroker)和XML名称空间配置(即<websocket:message-broker>)都使用前面的组件来组装消息工作流。下图显示了启用简单内置消息代理时使用的组件:
上图显示了三个消息通道:
下图显示了将外部代理(例如RabbitMQ)配置为用于管理订阅和广播消息时使用的组件:
前面两个图之间的主要区别是使用“代理中继”将消息通过TCP传递到外部STOMP代理,以及将消息从代理向下传递给订阅的客户端。
当从WebSocket连接接收到消息时,消息将被解码为STOMP帧,转换为Spring消息表示形式,然后发送到clientInboundChannel进行进一步处理。例如,目标标头以/ app开头的STOMP消息可以路由到带注解的控制器中的@MessageMapping方法,而/ topic和/ queue消息可以直接路由到消息代理。
处理来自客户端的STOMP消息的带注解的@Controller可以通过brokerChannel将消息发送到消息代理,并且代理通过clientOutboundChannel将消息广播给匹配的订户。相同的控制器还可以响应HTTP请求执行相同的操作,因此客户端可以执行HTTP POST,然后@PostMapping方法可以将消息发送到消息代理,以广播到订阅的客户端。
我们可以通过一个简单的示例跟踪流程。考虑以下示例,该示例设置了服务器:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/portfolio");
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/app");
registry.enableSimpleBroker("/topic");
}
}
@Controller
public class GreetingController {
@MessageMapping("/greeting") {
public String handle(String greeting) {
return "[" + getTimestamp() + ": " + greeting;
}
}
前面的示例支持以下流程:
下一节将提供有关带注解方法的更多详细信息,包括支持的参数类型和返回值。
应用程序可以使用带注解的@Controller类来处理来自客户端的消息。 这样的类可以声明@ MessageMapping,@ SubscribeMapping和@ExceptionHandler方法,如以下主题所述:
默认情况下,映射值是Ant样式的路径模式(例如/ thing ,/ thing / *),包括对模板变量的支持(例如/ thing / {id})。 可以通过@DestinationVariable方法参数引用这些值。 应用程序还可以切换到以点分隔的映射的目标约定,如“作为分隔符的点”中所述。
支持的方法参数
下表描述了方法参数:
Method argument | Description |
---|---|
Message | 用于访问完整的消息。 |
MessageHeaders | 用于访问中的标头Message。 |
MessageHeaderAccessor,SimpMessageHeaderAccessor和StompHeaderAccessor | 用于通过类型化访问器方法访问标头。 |
@Payload | 为了访问消息的有效负载,由configure转换(例如,从JSON转换) MessageConverter。不需要此注解,因为默认情况下会假定没有其他自变量匹配。您可以使用@javax.validation.Valid或Spring的注解有效负载参数@Validated,以使有效负载参数被自动验证。 |
@Header | 用于访问特定的标头值- org.springframework.core.convert.converter.Converter如有必要,还可以使用进行类型转换 。 |
@Headers | 用于访问消息中的所有标题。此参数必须可分配给 java.util.Map。 |
@DestinationVariable | 用于访问从消息目标中提取的模板变量。根据需要将值转换为声明的方法参数类型。 |
java.security.Principal | 反映在WebSocket HTTP握手时登录的用户。 |
Return Values
默认情况下,@ MessageMapping方法的返回值通过匹配的MessageConverter序列化为有效负载,并作为消息发送到brokerChannel,从该Channel广播到订阅者。出站邮件的目的地与入站邮件的目的地相同,但以/ topic为前缀。
您可以使用@SendTo和@SendToUser批注来自定义输出消息的目标。 @SendTo用于自定义目标位置或指定多个目标。 @SendToUser用于将输出消息定向到仅与输入消息关联的用户。请参阅用户目标。
您可以在同一方法上同时使用@SendTo和@SendToUser,并且它们在类级别都受支持,在这种情况下,它们充当类中方法的默认值。但是,请记住,任何方法级别的@SendTo或@SendToUser批注都会在类级别覆盖所有此类批注。
消息可以异步处理,@ MessageMapping方法可以返回ListenableFuture,CompletableFuture或CompletionStage。
请注意,@ SendTo和@SendToUser只是一种便利,等同于使用SimpMessagingTemplate发送消息。如有必要,对于更高级的方案,可以直接使用SimpMessagingTemplate来使用@MessageMapping方法。这可以代替返回值,也可以附加于返回值。请参阅发送消息。
@SubscribeMapping
@SubscribeMapping与@MessageMapping相似,但是将映射范围缩小到仅订阅消息。它支持相同的@ MessageMappingclientOutboundChannelbrokerChannel @ SendTo @ SendToUser
什么时候有用?假定代理映射到/ topic和/ queue,而应用程序控制器映射到/ app。在此设置中,代理将所有打算重复广播的/ topic和/ queue订阅都存储起来,并且不需要应用程序参与其中。客户端还可以订阅某个/ app目的地,并且控制器可以响应该订阅而返回一个值,而无需经纪人,而无需再次存储或使用该订阅(实际上是一次请求-答复交换)。一个用例是在启动时用初始数据填充UI。
什么时候没有用?不要尝试将代理和控制器映射到相同的目标前缀,除非出于某种原因您希望两者都独立处理消息(包括订阅)。入站消息是并行处理的。无法保证经纪人还是控制者首先处理给定的消息。如果要在存储预订并准备好广播时通知目标,则客户端应请求服务器是否支持收据(简单代理不支持)。例如,对于Java STOMP客户端,您可以执行以下操作添加收据:
@Autowired
private TaskScheduler messageBrokerTaskScheduler;
// During initialization..
stompClient.setTaskScheduler(this.messageBrokerTaskScheduler);
// When subscribing..
StompHeaders headers = new StompHeaders();
headers.setDestination("/topic/...");
headers.setReceipt("r1");
FrameHandler handler = ...;
stompSession.subscribe(headers, handler).addReceiptTask(() -> {
// Subscription ready...
});
服务器端的选项是在brokerChannel上注册ExecutorChannelInterceptor并实现afterMessageHandled方法,该方法在处理包括订阅在内的消息之后被调用。
@MessageExceptionHandler
应用程序可以使用@MessageExceptionHandler方法来处理@MessageMapping方法中的异常。 如果要访问异常实例,则可以在批注本身中声明异常,也可以通过方法参数声明异常。 以下示例通过方法参数声明异常:
@Controller
public class MyController {
// ...
@MessageExceptionHandler
public ApplicationError handleException(MyException exception) {
// ...
return appError;
}
}
@MessageExceptionHandler方法支持灵活的方法签名,并支持与@MessageMapping方法相同的方法参数类型和返回值。
通常,@ MessageExceptionHandler方法适用于声明它们的@Controller类(或类层次结构)。 如果希望此类方法在全局范围内(跨控制器)应用,则可以在标有@ControllerAdvice的类中声明它们。 这与Spring MVC中可用的类似支持相当。
如果要从应用程序的任何部分向连接的客户端发送消息怎么办? 任何应用程序组件都可以将消息发送到brokerChannel。 最简单的方法是注入SimpMessagingTemplate并使用它发送消息。 通常,您将按类型注入它,如以下示例所示:
@Controller
public class GreetingController {
private SimpMessagingTemplate template;
@Autowired
public GreetingController(SimpMessagingTemplate template) {
this.template = template;
}
@RequestMapping(path="/greetings", method=POST)
public void greet(String greeting) {
String text = "[" + getTimestamp() + "]:" + greeting;
this.template.convertAndSend("/topic/greetings", text);
}
}
但是,如果存在另一个相同类型的bean,也可以通过其名称(brokerMessagingTemplate)对其进行限定。
内置的简单消息代理处理来自客户端的订阅请求,将其存储在内存中,并将消息广播到具有匹配目标的已连接客户端。 该代理支持类似路径的目标,包括对Ant样式目标模式的订阅。
应用程序还可以使用点分隔(而不是斜杠分隔)目标。 请参见点作为分隔符。
如果配置了任务调度程序,则简单代理支持STOMP心跳。 为此,您可以声明自己的调度程序,也可以使用内部自动声明和使用的调度程序。 以下示例显示如何声明自己的调度程序:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
private TaskScheduler messageBrokerTaskScheduler;
@Autowired
public void setMessageBrokerTaskScheduler(TaskScheduler taskScheduler) {
this.messageBrokerTaskScheduler = taskScheduler;
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/queue/", "/topic/")
.setHeartbeatValue(new long[] {10000, 20000})
.setTaskScheduler(this.messageBrokerTaskScheduler);
// ...
}
}
简单代理非常适合入门,但是仅支持STOMP命令的一个子集(它不支持ack,回执和其他一些功能),它依赖于简单的消息发送循环,并且不适合于集群。 或者,您可以升级应用程序以使用功能齐全的消息代理。
请参阅STOMP文档以了解您选择的消息代理(例如RabbitMQ,ActiveMQ和其他),安装代理,并在启用STOMP支持的情况下运行它。 然后,您可以在Spring配置中启用STOMP代理中继(而不是简单代理)。
以下示例配置启用了功能齐全的代理:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/portfolio").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableStompBrokerRelay("/topic", "/queue");
registry.setApplicationDestinationPrefixes("/app");
}
}
下面的示例显示与前面的示例等效的XML配置:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:message-broker application-destination-prefix="/app">
<websocket:stomp-endpoint path="/portfolio" />
<websocket:sockjs/>
</websocket:stomp-endpoint>
<websocket:stomp-broker-relay prefix="/topic,/queue" />
</websocket:message-broker>
</beans>
先前配置中的STOMP代理中继是Spring MessageHandler,它通过将消息转发到外部消息代理来处理消息。 为此,它建立到代理的TCP连接,将所有消息转发给它,然后通过它们的WebSocket会话将从代理收到的所有消息转发给客户端。 本质上,它充当双向转发消息的“中继”。
将io.projectreactor.netty:reactor-netty和io.netty:netty-all所有依赖项添加到项目中以进行TCP连接管理。
此外,应用程序组件(例如HTTP请求处理方法,业务服务等)也可以将消息发送到代理中继,如“发送消息”中所述,以将消息广播到订阅的WebSocket客户端。
实际上,代理中继可实现健壮且可伸缩的消息广播。
STOMP代理中继器维护与代理的单个“系统” TCP连接。此连接仅用于源自服务器端应用程序的消息,而不用于接收消息。您可以为此连接配置STOMP凭据(即STOMP框架登录名和密码标头)。这在XML名称空间和Java配置中都以systemLogin和systemPasscode属性(默认值为guest和guest)公开。
STOMP代理中继还为每个连接的WebSocket客户端创建一个单独的TCP连接。您可以配置用于代表客户端创建的所有TCP连接的STOMP凭据。这在XML名称空间和Java配置中都显示为具有默认值guest和guest的clientLogin和clientPasscode属性。
STOMP代理中继始终在代表客户端转发给代理的每个CONNECT帧上设置登录和密码标头。因此,WebSocket客户端无需设置这些标头。他们被忽略。如“身份验证”部分所述,WebSocket客户端应改为依靠HTTP身份验证来保护WebSocket端点并建立客户端身份。
STOMP代理中继还通过“系统” TCP连接向消息代理发送和从消息代理接收心跳。您可以配置发送和接收心跳的间隔(默认情况下,每个间隔为10秒)。如果与代理的连接断开,则代理中继每5秒继续尝试重新连接,直到成功。
当与代理的“系统”连接丢失并重新建立时,任何Spring bean都可以实现ApplicationListener <BrokerAvailabilityEvent>来接收通知。例如,当没有活动的“系统”连接时,广播股票报价的股票报价服务可以停止尝试发送消息。
默认情况下,STOMP代理中继始终连接到同一主机和端口,如果连接断开,则根据需要重新连接。如果希望提供多个地址,则在每次尝试连接时,都可以配置地址供应商,而不是固定的主机和端口。以下示例显示了如何执行此操作:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
// ...
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableStompBrokerRelay("/queue/", "/topic/").setTcpClient(createTcpClient());
registry.setApplicationDestinationPrefixes("/app");
}
private ReactorNettyTcpClient<byte[]> createTcpClient() {
return new ReactorNettyTcpClient<>(
client -> client.addressSupplier(() -> ... ),
new StompReactorNettyCodec());
}
}
您还可以使用virtualHost属性配置STOMP代理中继。 此属性的值设置为每个CONNECT帧的主机标头,并且很有用(例如,在建立TCP连接的实际主机与提供基于云的STOMP服务的主机不同的云环境中) )。
将消息路由到@MessageMapping方法时,它们将与AntPathMatcher匹配。 默认情况下,模式应使用斜杠(/)作为分隔符。 这是Web应用程序中的一个良好约定,类似于HTTP URL。 但是,如果您更习惯于消息传递约定,则可以切换为使用点(.)作为分隔符。
以下示例显示了如何在Java配置中执行此操作:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
// ...
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setPathMatcher(new AntPathMatcher("."));
registry.enableStompBrokerRelay("/queue", "/topic");
registry.setApplicationDestinationPrefixes("/app");
}
}
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:message-broker application-destination-prefix="/app" path-matcher="pathMatcher">
<websocket:stomp-endpoint path="/stomp"/>
<websocket:stomp-broker-relay prefix="/topic,/queue" />
</websocket:message-broker>
<bean id="pathMatcher" class="org.springframework.util.AntPathMatcher">
<constructor-arg index="0" value="."/>
</bean>
</beans>
之后,控制器可以使用点(.)作为@MessageMapping方法中的分隔符,如以下示例所示:
@Controller
@MessageMapping("red")
public class RedController {
@MessageMapping("blue.{green}")
public void handleGreen(@DestinationVariable String green) {
// ...
}
}
客户端现在可以将消息发送到/app/red.blue.green123。
在前面的示例中,我们没有更改“代理中继”上的前缀,因为这些前缀完全取决于外部消息代理。 有关您使用的代理的信息,请参见STOMP文档页面,以查看其对目标标头支持的约定。
另一方面,“简单代理”确实依赖于已配置的PathMatcher,因此,如果切换分隔符,该更改也将应用于代理,以及代理将目标从消息匹配到订阅中的模式的方式。
每个通过WebSocket进行的STOMP消息传递会话均以HTTP请求开头。这可以是升级到WebSockets的请求(即WebSocket握手),或者在SockJS后备的情况下,可以是一系列SockJS HTTP传输请求。
许多Web应用程序已经具有身份验证和授权来保护HTTP请求。通常,使用某种机制(例如登录页面,HTTP基本认证或其他方式)通过Spring Security对用户进行认证。经过身份验证的用户的安全上下文保存在HTTP会话中,并与同一基于cookie的会话中的后续请求关联。
因此,对于WebSocket握手或SockJS HTTP传输请求,通常已经有一个可以通过HttpServletRequest#getUserPrincipal()访问的经过身份验证的用户。 Spring会自动将该用户与为其创建的WebSocket或SockJS会话相关联,并随后与该会话中通过用户标头传输的所有STOMP消息相关联。
简而言之,典型的Web应用程序除了已经为安全起见,就不需要采取任何其他措施。通过基于cookie的HTTP会话(然后与为该用户创建的WebSocket或SockJS会话相关联)维护的安全上下文在HTTP请求级别对用户进行身份验证,并导致在每个消息流上标记用户头通过应用程序。
请注意,STOMP协议在CONNECT帧上确实具有登录名和密码标头。这些最初是设计用于并且仍然需要的,例如,基于TCP的STOMP。但是,对于默认情况下,对于基于WebSocket的STOMP,Spring会在STOMP协议级别忽略授权标头,并假定该用户已经在HTTP传输级别进行了身份验证,并期望WebSocket或SockJS会话包含已通过身份验证的用户。
Spring Security提供了WebSocket子协议授权,该授权使用ChannelInterceptor来基于消息中的用户头对消息进行授权。另外,Spring Session提供了WebSocket集成,以确保当WebSocket会话仍处于活动状态时,用户HTTP会话不会过期。
Spring Security OAuth支持基于令牌的安全性,包括JSON Web令牌(JWT)。您可以将其用作Web应用程序中的身份验证机制,包括上一节中所述的WebSocket交互中的STOMP(即,通过基于cookie的会话维护身份)。
同时,基于cookie的会话并非总是最合适的(例如,在不维护服务器端会话的应用程序中或在通常使用标头进行身份验证的移动应用程序中)。
WebSocket协议RFC 6455“没有规定服务器可以在WebSocket握手期间对客户端进行身份验证的任何特定方式。”但是,实际上,浏览器客户端只能使用标准身份验证标头(即基本HTTP身份验证)或cookie,并且不能使用(例如)提供自定义标题。同样,SockJS JavaScript客户端也不提供通过SockJS传输请求发送HTTP标头的方法。请参阅sockjs-client发行号196。相反,它确实允许发送可用于发送令牌的查询参数,但是有其自身的缺点(例如,令牌可能会无意中与服务器日志中的URL一起记录)。
前面的限制适用于基于浏览器的客户端,不适用于基于Spring Java的STOMP客户端,该客户端支持通过WebSocket和SockJS请求发送标头。
因此,希望避免使用cookie的应用程序可能没有在HTTP协议级别进行身份验证的任何好的替代方法。他们可能更喜欢在STOMP消息传递协议级别使用标头进行身份验证,而不是使用cookie。这样做需要两个简单的步骤:
下一个示例使用服务器端配置来注册自定义身份验证拦截器。请注意,拦截器仅需要认证并在CONNECT消息上设置用户标头。 Spring记录并保存经过身份验证的用户,并将其与同一会话上的后续STOMP消息相关联。以下示例显示了如何注册自定义身份验证拦截器:
@Configuration
@EnableWebSocketMessageBroker
public class MyConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor =
MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (StompCommand.CONNECT.equals(accessor.getCommand())) {
Authentication user = ... ; // access authentication header(s)
accessor.setUser(user);
}
return message;
}
});
}
}
此外,请注意,目前,当您使用Spring Security的消息授权时,需要确保在Spring Security之前订购了ChannelInterceptor身份验证配置。 最好通过在标有@Order(Ordered.HIGHEST_PRECEDENCE + 99)的WebSocketMessageBrokerConfigurer的实现中声明自定义拦截器来最好地完成此操作。
应用程序可以发送针对特定用户的消息,并且Spring的STOMP支持可以识别以/ user /为前缀的目标。例如,客户端可能订阅了/ user / queue / position-updates目标。该目标由UserDestinationMessageHandler处理,并转换为用户会话唯一的目标(例如/ queue / position-updates-user123)。这提供了订阅通用命名目的地的便利,同时确保与预订同一目的地的其他用户不发生冲突,以便每个用户都可以接收唯一的库存头寸更新。
在发送方,可以将消息发送到一个目的地,例如/ user / {username} / queue / position-updates,然后由UserDestinationMessageHandler将其转换为一个或多个目的地,每个与该用户关联的会话一个目的地。这使应用程序中的任何组件都可以发送针对特定用户的消息,而不必知道他们的姓名和通用目的地。注解和消息传递模板也支持此功能。
消息处理方法可以通过@SendToUser批注将消息发送给与正在处理的消息相关联的用户(在类级别上也支持共享公共目标),如以下示例所示:
@Controller
public class PortfolioController {
@MessageMapping("/trade")
@SendToUser("/queue/position-updates")
public TradeResult executeTrade(Trade trade, Principal principal) {
// ...
return tradeResult;
}
}
如果用户具有多个会话,则默认情况下,所有订阅给定目标的会话都是目标。 但是,有时可能仅需要将发送正在处理的消息的会话作为目标。 您可以通过将broadcast属性设置为false来做到这一点,如以下示例所示:
@Controller
public class MyController {
@MessageMapping("/action")
public void handleAction() throws Exception{
// raise MyBusinessException here
}
@MessageExceptionHandler
@SendToUser(destinations="/queue/errors", broadcast=false)
public ApplicationError handleException(MyBusinessException exception) {
// ...
return appError;
}
}
尽管用户目的地通常暗指经过身份验证的用户,但这并不是严格要求的。 不与已认证用户关联的WebSocket会话可以订阅用户目的地。 在这种情况下,@ SendToUser批注的行为与broadcast = false完全相同(也就是说,仅针对发送正在处理的消息的会话)。
您可以从任何应用程序组件向用户目标发送消息,例如,注入由Java配置或XML名称空间创建的SimpMessagingTemplate。 (如果使用@Qualifier进行资格鉴定,则bean名称为“ brokerMessagingTemplate”。)下面的示例演示如何执行此操作:
@Service
public class TradeServiceImpl implements TradeService {
private final SimpMessagingTemplate messagingTemplate;
@Autowired
public TradeServiceImpl(SimpMessagingTemplate messagingTemplate) {
this.messagingTemplate = messagingTemplate;
}
// ...
public void afterTradeExecuted(Trade trade) {
this.messagingTemplate.convertAndSendToUser(
trade.getUserName(), "/queue/position-updates", trade.getResult());
}
}
当将用户目标与外部消息代理一起使用时,应检查代理文档以了解如何管理非活动队列,以便在用户会话结束时,所有唯一的用户队列都将被删除。 例如,当您使用诸如/exchange/amq.direct/position-updates之类的目标时,RabbitMQ会创建自动删除队列。 因此,在这种情况下,客户端可以订阅/user/exchange/amq.direct/position-updates。 同样,ActiveMQ具有用于清除非活动目标的配置选项。
在多应用程序服务器方案中,由于用户连接到其他服务器,因此用户目的地可能无法解析。 在这种情况下,您可以配置目标以广播未解决的消息,以便其他服务器可以尝试。 这可以通过Java配置中MessageBrokerRegistry的userDestinationBroadcast属性以及XML中message-broker元素的user-destination-broadcast属性来完成。
来自代理的消息被发布到clientOutboundChannel,从那里被写入WebSocket会话。 由于该通道由ThreadPoolExecutor支持,因此消息在不同的线程中处理,并且客户端接收到的结果序列可能与发布的确切顺序不匹配。
如果这是一个问题,请启用setPreservePublishOrder标志,如以下示例所示:
@Configuration
@EnableWebSocketMessageBroker
public class MyConfig implements WebSocketMessageBrokerConfigurer {
@Override
protected void configureMessageBroker(MessageBrokerRegistry registry) {
// ...
registry.setPreservePublishOrder(true);
}
}
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:message-broker preserve-publish-order="true">
<!-- ... -->
</websocket:message-broker>
</beans>
设置该标志后,同一客户端会话中的消息将一次发布到clientOutboundChannel,这样可以保证发布顺序。 请注意,这会产生很小的性能开销,因此,仅在需要时才应启用它。
通过实施Spring的ApplicationListener接口,可以发布并接收多个ApplicationContext事件:
当您使用功能齐全的代理时,如果代理暂时不可用,则STOMP“代理中继”会自动重新连接“系统”连接。但是,客户端连接不会自动重新连接。假设启用了心跳,客户端通常会注意到代理在10秒内没有响应。客户端需要实现自己的重新连接逻辑。
事件为STOMP连接的生命周期提供通知,但不是为每条客户端消息提供通知。 应用程序还可以注册一个ChannelInterceptor来拦截处理链中任何消息。 以下示例显示如何拦截来自客户端的入站消息:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new MyChannelInterceptor());
}
}
定制ChannelInterceptor可以使用StompHeaderAccessor或SimpMessageHeaderAccessor来访问有关消息的信息,如以下示例所示:
public class MyChannelInterceptor implements ChannelInterceptor {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
StompCommand command = accessor.getStompCommand();
// ...
return message;
}
}
应用程序还可以实现ExecutorChannelInterceptor,它是ChannelInterceptor的子接口,在处理消息的线程中具有回调。 虽然针对发送到通道的每个消息调用一次ChannelInterceptor,但ExecutorChannelInterceptor在订阅到通道消息的每个MessageHandler线程中提供了挂钩。
请注意,与前面介绍的SesionDisconnectEvent一样,DISCONNECT消息可以来自客户端,也可以在关闭WebSocket会话时自动生成。 在某些情况下,对于每个会话,拦截器可能会多次拦截此消息。 关于多个断开事件,组件应该是幂等的。
Spring提供了一个基于WebSocket的STOMP客户端和一个基于TCP的STOMP客户端。
首先,您可以创建和配置WebSocketStompClient,如以下示例所示:
WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setMessageConverter(new StringMessageConverter());
stompClient.setTaskScheduler(taskScheduler); // for heartbeats
在前面的示例中,您可以用SockJsClient替换StandardWebSocketClient,因为这也是WebSocketClient的实现。 SockJsClient可以使用基于WebSocket或HTTP的传输作为后备。 有关更多详细信息,请参见SockJsClient。
接下来,您可以建立连接并为STOMP会话提供处理程序,如以下示例所示:
String url = "ws://127.0.0.1:8080/endpoint";
StompSessionHandler sessionHandler = new MyStompSessionHandler();
stompClient.connect(url, sessionHandler);
会话准备就绪后,将通知处理程序,如以下示例所示:
public class MyStompSessionHandler extends StompSessionHandlerAdapter {
@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
// ...
}
}
一旦建立会话,就可以发送任何有效负载并使用配置的MessageConverter对其进行序列化,如以下示例所示:
session.send("/topic/something", "payload");
您还可以订阅目的地。 subscription方法需要一个用于订阅中消息的处理程序,并返回可用于取消订阅的Subscription句柄。 对于每个收到的消息,处理程序可以指定有效负载应反序列化的目标对象类型,如以下示例所示:
session.subscribe("/topic/something", new StompFrameHandler() {
@Override
public Type getPayloadType(StompHeaders headers) {
return String.class;
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
// ...
}
});
要启用STOMP心跳,您可以使用TaskScheduler配置WebSocketStompClient并有选择地自定义心跳间隔(写非活动状态为10秒,导致发送心跳,读非活动状态为10秒,关闭连接)。
当您使用WebSocketStompClient进行性能测试以模拟同一台计算机上的数千个客户端时,请考虑关闭心跳,因为每个连接都计划自己的心跳任务,并且并未针对在同一台计算机上运行的大量客户端进行优化。
STOMP协议还支持回执,在发送或订阅处理完毕后,客户端必须在其中添加一个回执标头,服务器以RECEIPT帧响应该回执标头。为此,StompSession提供了setAutoReceipt(boolean),它使收据头添加到每个后续的send或subscription事件上。或者,您也可以手动将收据标头添加到StompHeaders。发送和订阅都返回一个Receiptable实例,您可以使用该实例注册接收成功和失败的回调。要使用此功能,必须为客户端配置TaskScheduler以及收据过期之前的时间(默认为15秒)。
请注意,StompSessionHandler本身是一个StompFrameHandler,它除了处理来自消息处理的异常的handleException回调和处理包含ConnectionLostException的传输级错误的handleTransportError之外,还可以处理ERROR帧。
每个WebSocket会话都有一个属性映射。 该映射作为标头附加到入站客户端消息,可以通过控制器方法进行访问,如以下示例所示:
@Controller
public class MyController {
@MessageMapping("/action")
public void handle(SimpMessageHeaderAccessor headerAccessor) {
Map<String, Object> attrs = headerAccessor.getSessionAttributes();
// ...
}
}
您可以在websocket范围内声明一个Spring托管的bean。 您可以将WebSocket作用域的bean注入控制器以及在clientInboundChannel上注册的所有通道拦截器。 这些通常是单例,并且比任何单独的WebSocket会话寿命更长。 因此,您需要对作用域WebSocket的bean使用作用域代理模式,如以下示例所示:
@Component
@Scope(scopeName = "websocket", proxyMode = ScopedProxyMode.TARGET_CLASS)
public class MyBean {
@PostConstruct
public void init() {
// Invoked after dependencies injected
}
// ...
@PreDestroy
public void destroy() {
// Invoked when the WebSocket session ends
}
}
@Controller
public class MyController {
private final MyBean myBean;
@Autowired
public MyController(MyBean myBean) {
this.myBean = myBean;
}
@MessageMapping("/action")
public void handle() {
// this.myBean from the current WebSocket session
}
}
与任何自定义范围一样,Spring首次在控制器中对其进行访问时会初始化一个新的MyBean实例,并将该实例存储在WebSocket会话属性中。 随后将返回相同的实例,直到会话结束。 WebSocket范围的bean调用了所有Spring生命周期方法,如前面的示例所示。
关于性能,没有灵丹妙药。影响它的因素很多,包括消息的大小和数量,应用程序方法是否执行需要阻止的工作以及外部因素(例如网络速度和其他问题)。本部分的目的是提供可用配置选项的概述,以及有关如何进行扩展的一些想法。
在消息传递应用程序中,消息通过通道传递以进行异步执行,并由线程池支持。配置这样的应用程序需要对通道和消息流有充分的了解。因此,建议查看消息流。
最明显的开始是配置支持clientInboundChannel和clientOutboundChannel的线程池。默认情况下,两者都配置为可用处理器数量的两倍。
如果带注解的方法中的消息处理主要是受CPU限制的,则clientInboundChannel的线程数应保持接近处理器数。如果他们所做的工作更多地受到IO限制,并且需要阻塞或等待数据库或其他外部系统,则可能需要增加线程池大小。
ThreadPoolExecutor具有三个重要属性:核心线程池大小,最大线程池大小,以及队列存储没有可用线程的任务的容量。
常见的混淆点是,配置核心池大小(例如10)和最大池大小(例如20)会导致线程池具有10到20个线程。实际上,如果将容量保留为其默认值Integer.MAX_VALUE,则由于所有其他任务都已排队,因此线程池永远不会超过核心池的大小。
请参阅ThreadPoolExecutor的javadoc,以了解这些属性如何工作并了解各种排队策略。
在clientOutboundChannel方面,所有关于向WebSocket客户端发送消息。如果客户端在快速网络上,则线程数应保持接近可用处理器数。如果它们很慢或带宽很低,它们将花费更长的时间来消耗消息并给线程池增加负担。因此,必须增加线程池的大小。
尽管clientInboundChannel的工作负载可以预测-毕竟,它是基于应用程序的工作-但是,如何配置“ clientOutboundChannel”却比较困难,因为它基于应用程序无法控制的因素。因此,还有两个与消息发送有关的属性:sendTimeLimit和sendBufferSizeLimit。您可以使用这些方法来配置发送消息到客户端时允许发送多长时间以及可以缓冲多少数据。
通常的想法是,在任何给定时间,只能使用单个线程将其发送给客户端。同时,所有其他消息都将被缓冲,并且您可以使用这些属性来决定允许发送消息花费多长时间以及在此期间可以缓冲多少数据。有关其他重要信息,请参见XML模式的javadoc和文档。
以下示例显示了可能的配置:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
registration.setSendTimeLimit(15 * 1000).setSendBufferSizeLimit(512 * 1024);
}
// ...
}
下面的示例显示与前面的示例等效的XML配置:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:message-broker>
<websocket:transport send-timeout="15000" send-buffer-size="524288" />
<!-- ... -->
</websocket:message-broker>
</beans>
您还可以使用前面显示的WebSocket传输配置来配置传入STOMP消息的最大允许大小。 从理论上讲,WebSocket消息的大小几乎是无限的。 实际上,WebSocket服务器施加了限制,例如Tomcat 8K和Jetty 64K。 因此,STOMP客户端(例如JavaScript webstomp-client等)在16K边界处拆分较大的STOMP消息,并将其作为多个WebSocket消息发送,这需要服务器进行缓冲和重新组装。
Spring的STOMP-over-WebSocket支持可以做到这一点,因此应用程序可以为STOMP消息配置最大大小,而与WebSocket服务器特定的消息大小无关。 请记住,如有必要,将自动调整WebSocket消息的大小,以确保它们最多可以承载16K WebSocket消息。
以下示例显示了一种可能的配置:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
registration.setMessageSizeLimit(128 * 1024);
}
// ...
}
下面的示例显示与前面的示例等效的XML配置:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:websocket="http://www.springframework.org/schema/websocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/websocket
https://www.springframework.org/schema/websocket/spring-websocket.xsd">
<websocket:message-broker>
<websocket:transport message-size="131072" />
<!-- ... -->
</websocket:message-broker>
</beans>
关于扩展的重要一点涉及使用多个应用程序实例。 当前,您无法使用简单代理执行此操作。 但是,当您使用功能齐全的代理(例如RabbitMQ)时,每个应用程序实例都连接到代理,并且从一个应用程序实例广播的消息可以通过代理广播到通过任何其他应用程序实例连接的WebSocket客户端。
使用Spring的STOMP-over-WebSocket支持时,有两种主要的方法来测试应用程序。首先是编写服务器端测试以验证控制器的功能及其带注解的消息处理方法。第二个是编写涉及运行客户端和服务器的完整的端到端测试。
两种方法不是互斥的。相反,每个人在整体测试策略中都有自己的位置。服务器端测试更加集中,更易于编写和维护。另一方面,端到端集成测试更完整,测试更多,但编写和维护也更加复杂。
服务器端测试的最简单形式是编写控制器单元测试。但是,这还不够有用,因为控制器所做的很多事情都取决于其注解。纯单元测试根本无法测试。
理想情况下,应该像在运行时那样调用被测控制器,就像使用Spring MVC Test框架测试处理HTTP请求的控制器的方法一样,即不运行Servlet容器而是依靠Spring框架来调用被测控制器。带注解的控制器。与Spring MVC Test一样,您有两种可能的选择,要么使用“基于上下文的”设置,要么使用“独立的”设置:
在股票投资组合样本应用程序的测试中演示了这两种设置方案。
第二种方法是创建端到端集成测试。为此,您需要以嵌入式模式运行WebSocket服务器,并将其作为WebSocket客户端连接到该服务器,该客户端发送包含STOMP帧的WebSocket消息。针对股票样本应用程序的测试还通过将Tomcat用作嵌入式WebSocket服务器和用于测试目的的简单STOMP客户端,演示了此方法。