hw3-impl
请你在大二开发的E-BookStore系统的基础上,完成下列任务:
A. 在上次作业编写的通过Kafka消息中间件处理订单的功能的基础上,迭代更新采用WebSocket方式将订单处理的结果发送回客户端,具体要求为:
-
i. 参考课程给出的React前端和Spring后端采用WebSocket方式通信的样例,按照WebSocketTransfer的方式:
- 在后端的消息监听器类监听到消息处理结果Topic中的消息后,通过WebSocket发送给前端;
- 或者,直接在Service中处理好订单后,通过WebSocket发送给前端。
-
ii. 你应该参考WebSocketTransfer样例中的方式,正确地将订单处理结果返回给下订单的客户端,不能发送给所有客户端。即,你需要对返回给前端的用户进行筛选。
-
iii. 参考课程给出的样例,在你代码中维护WebSocket的客户端Session时,选择线程安全的集合数据结构。
-
iv. 你应该将上述功能集成到你的E-Book系统中,如果你无法将上述功能集成到你的E-Book系统中,可以单独建立工程实现,但是会适当扣分。
-
v. 请你编写一个文档,解释你的程序设计方案,包括WebSocket的消息格式设计、筛选客户端的方式设计,以及回答为什么要选择线程安全的集合类型来维护客户端Session,而你选择的类型为什么是线程安全的。将文档与代码一起上传。
B. 在数据库事务管理中,持久性是指事务提交后即使数据库产生故障,事务提交的结果仍然可以在数据库中访问到,不会丢失。请编写文档回答下面的问题:
- i. 如果数据库系统在事务执行过程中不断地将事务操作的结果执行落盘操作,会带来什么潜在问题?可以如何处理?
- ii. 如果数据库系统在事务执行提交后再将事务操作的结果执行落盘操作,会带来什么潜在问题?可以如何处理?
由于在上次作业我使用的是WebSocketMessageBrokerConfigurer + SimpMessagingTemplate的形式
这次我依然使用它们进行迭代
WebSocket消息格式
public class WebSocketDTO {
public enum Status {
JOIN, MESSAGE, LEAVE
}
@Data
@ToString
@NoArgsConstructor
@AllArgsConstructor
public static class Message {
private String content;
private Long senderId;
private Long receiverId;
private String date;
private Status status;
}
}
Status 用于标识特殊消息(加入和离开)
senderId和receiverId用于"路由", 这也是我设计的筛选客户端的方法的一部分
筛选客户端方式设计
client可以选择订阅广播的public topic或者给发送/接受一对一的user私人频道, 而请求一对一的私人频道就由Id来路由
通过@MessageMapping 和 convertAndSend 来控制发送对象, 通过WebSocketMessageBrokerConfigurer来自动维护topic群组的概念, 应用程序只需要向某个topic发送或者接受消息即可
广播heartbeat和私发通知的实例代码如下
// 10s heartbeat
@Scheduled(fixedRate = 10000)
public WebSocketDTO.Message heartbeat() {
System.out.println("heartbeat");
simpMessagingTemplate.convertAndSend("/topic/heartbeats", new WebSocketDTO.Message("heartbeat", 0L, -1L, now(), WebSocketDTO.Status.MESSAGE));
return new WebSocketDTO.Message("heartbeat", 0L, -1L, now(), WebSocketDTO.Status.MESSAGE);
}
// /user/{customerId}/queue/notifications
@KafkaListener(topics = Constant.RESULT_TOPIC, groupId = Constant.GROUP_ID)
public void notifyOrderFinished(OrderDTO.AsyncOrderCreateResponse response) {
logger.warn("Received response: {}", response);
WebSocketDTO.Message message = new WebSocketDTO.Message(response.getMessage(), response.getCustomerId(),
response.getCustomerId(), now(), WebSocketDTO.Status.MESSAGE);
logger.warn("Send to user "+ response.getCustomerId());
simpMessagingTemplate.convertAndSendToUser(String.valueOf(response.getCustomerId()),
"/queue/notifications", message);
}
通过getCustomerId()
组装进路由实现单发
为什么要选择并发安全的集合类型
因为常见的集合结构例如HashMap, ArrayList是并发不安全的, 在并发请求后端websocket服务时, 会导致请求的session不能正常得到维护, 发生错误或资源泄漏等
我选用的并发数据结构是ConcurrentMap
在
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer
这个集合类的底层实现里面
public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
// ...
private final Map<String, SessionInfo> sessions = new ConcurrentHashMap(); // 这里
@Nullable
private ScheduledFuture<?> heartbeatFuture;
public SimpleBrokerMessageHandler(SubscribableChannel clientInboundChannel, MessageChannel clientOutboundChannel, SubscribableChannel brokerChannel, Collection<String> destinationPrefixes) {
super(clientInboundChannel, clientOutboundChannel, brokerChannel, destinationPrefixes);
}
为什么ConcurrentMap是线程安全的
因为它底层使用数组 + 链表/红黑树的形式
在数组的单个节点上有一把可重入锁, 需要获取锁才能对链表/红黑树之中的元素进行操作, 因而是线程安全的
如果数据库系统在事务执行过程中不断地将事务操作的结果执行落盘操作,会带来什么潜在问题?可以如何处理?
脏读, 不可重复读等事务隔离性问题, 回滚等事务原子性问题; 可以用MVCC和锁(取决于事务隔离级别)来控制并发事务相互的可见性, 从而控制执行中事务对其他事务的落盘数据的可见性; 可以在落盘之前先写undo log来让落盘数据允许回滚.