hw2-impl
请你在大二开发的E-BookStore系统的基础上,完成下列任务:
A. 针对现有E-BookStore中下订单功能,进行事务控制,具体要求为:
- i. 假设你的代码中在下订单的Service中,需要完成两件事情,在数据库的Order表中插入一条记录,在OrderItem表中插入多条记录。例如,如果一个订单包含3种不同的书,那么在Order表中会插入一条记录,在OrderItem表中会插入3条记录;
- ii. 按照上述操作,Service需要调用OrderDao和OrderItemDao两个对象的响应方法来实现下订单,而且必须保证两个表的数据要么都插入成功,要么都不插入,即这两个表的插入操作必须在一个事务中完成;
- iii. 如果你之前的系统不是这样实现的,那么就按照上述要求重构你的代码以满足要求;
- iv. 请你参照上课讲解的transfer样例,通过声明方式实现上述事务管理功能,并按照课件中第25页所列表格通过修改@Transactional中的事务传播属性值来观察不同的执行结果,在文档中记录下结果,并解释为什么会出现这样的结果,将文档与代码一起上传。
B. 参照上课给的样例,编写通过Kafka消息中间件处理订单的功能的前端,具体要求为:
- i. 你可以选择用以下三种方式中的任意一种实现下订单结果在前端呈现:
- 在前端工程中,使用JavaScript监听订单处理结果消息发送到的Topic,然后刷新页面,可以参照https://www.jianshu.com/p/d6b803fa808a 上给出的代码;
- 在前端发送Ajax请求获取订单的最新状态,后端接收到请求后将订单状态返回给前端去显示;
- 采用WebSocket方式,后端的消息监听器类监听到消息处理结果Topic中的消息后,通过WebSocket发送给前端。
- ii. 你应该将上述功能集成到你的E-Book系统中,如果你无法将上述功能集成到你的E-Book系统中,可以单独建立工程实现,但是会适当扣分。
- iii. 请将你编写的相关代码整体压缩后上传,请勿压缩整个工程提交。
- iv. 请你编写一个文档,解释你的程序设计方案,包括消息格式、Topic配置项、订单处理结果在前端呈现的方式,并且要对3种前端呈现方式的优缺点做出你的分析。将文档与代码一起上传。
partA
事务注解如图
我选择了使用@Transactional(propagation = Propagetion.REQUIRED)的事务执行策略
可以看到, 我在前端尝试下单, 而在后端用1/0故意触发错误后, 触发回滚,前端购物车没有更改,订单列表也没有更新, 后端抛出错误后依然正常运行.
此处是开启了一个包含了 书籍库存变化-创建orDERITEM项-创建ORDER项-写回返回到KAFKA的整个流程的事务, 使用Required的注解, 创建了一个新事务, 如果存在上层事务则合并
将整个大事务修改为REQUIRES_NEW和Nested时:由于只有一个大事务, 所以结果没有什么不同. 修改其中书籍部分为requires_new时, 由于书籍部分在独立的新事务完成, 当在书籍部分出现错误(例如库存不足)时, 整体并没有回滚;在书籍部分完成之后订单部分出现错误时, 书籍部分也不再回滚, 出现了状态的不一致和破坏. 将其中书籍部分为nested时, 订单部分错误导致了整体的回滚,但当只有书籍库存部分出错时, 整体依旧提交,只有书籍库存部分回滚, 也是出现了不一致.
partB
我的整体设计是采用websocket + sockjs的方案
后端核心代码见下
配置部分:
package com.example.ebookstorebackend.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
// "/topic" is for broadcasting to all subscribers
// "/user" is for private messaging
config.enableSimpleBroker("/topic", "/user"); // Enables a simple in-memory broker
config.setApplicationDestinationPrefixes("/app");
config.setUserDestinationPrefix("/user");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws").setAllowedOrigins("http://localhost:5173").withSockJS();
}
}
Controller部分:
package com.example.ebookstorebackend.controller;
import com.example.ebookstorebackend.config.WebSocketConfig;
import com.example.ebookstorebackend.dto.OrderDTO;
import com.example.ebookstorebackend.dto.WebSocketDTO;
import com.example.ebookstorebackend.utils.Constant;
import jakarta.websocket.OnClose;
import jakarta.websocket.OnMessage;
import jakarta.websocket.OnOpen;
import jakarta.websocket.Session;
import jakarta.websocket.server.PathParam;
import jakarta.websocket.server.ServerEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Controller;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import static com.example.ebookstorebackend.utils.Time.now;
@Controller
public class WsOrderController {
@Autowired
private SimpMessagingTemplate simpMessagingTemplate;
static final private Logger logger = LoggerFactory.getLogger(WsOrderController.class);
@MessageMapping("/broadcast") // app/broadcast
@SendTo("/topic/public")
public WebSocketDTO.Message receivePublicMessage(@Payload WebSocketDTO.Message message) {
return message;
}
@MessageMapping("/echo")
public void echo(@Payload WebSocketDTO.Message message) {
simpMessagingTemplate.convertAndSend("/topic/public", message);
}
@MessageMapping("/hello") // app/hello
public WebSocketDTO.Message greeting(@Payload WebSocketDTO.Message message) {
// /user/{receiver}/queue/chats
// config.setUserDestinationPrefix("/user");
// send
logger.warn("Received message: {}", message);
simpMessagingTemplate.convertAndSendToUser(String.valueOf(message.getReceiverId()), "/queue/chats", message);
// return not send
return message;
}
// 10s heartbeat
@Scheduled(fixedRate = 10000)
public WebSocketDTO.Message heartbeat() {
System.out.println("heartbeat");
simpMessagingTemplate.convertAndSend("/topic/heartbeats", new WebSocketDTO.Message("heartbeat", 0L, -1L, now(), WebSocketDTO.Status.MESSAGE));
return new WebSocketDTO.Message("heartbeat", 0L, -1L, now(), WebSocketDTO.Status.MESSAGE);
}
// /user/{customerId}/queue/notifications
@KafkaListener(topics = Constant.RESULT_TOPIC, groupId = Constant.GROUP_ID)
public void notifyOrderFinished(OrderDTO.AsyncOrderCreateResponse response) {
logger.warn("Received response: {}", response);
WebSocketDTO.Message message = new WebSocketDTO.Message(response.getMessage(), response.getCustomerId(),
response.getCustomerId(), now(), WebSocketDTO.Status.MESSAGE);
logger.warn("Send to user "+ response.getCustomerId());
simpMessagingTemplate.convertAndSendToUser(String.valueOf(response.getCustomerId()),
"/queue/notifications", message);
}
}
消息体部分
package com.example.ebookstorebackend.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
public class WebSocketDTO {
public enum Status {
JOIN, MESSAGE, LEAVE
}
@Data
@ToString
@NoArgsConstructor
@AllArgsConstructor
public static class Message {
private String content;
private Long senderId;
private Long receiverId;
private String date;
private Status status;
}
}
其中, 后端将订单确认放在了/user/{customerId}/queue/notifications
这个user topic, 将这个Controller的方法作为@KafkaListener
以接受kafka消息, 并使用
simpMessagingTemplate.convertAndSendToUser(String.valueOf(response.getCustomerId()),
"/queue/notifications", message);
转发到前端
前端订阅的核心代码是这个hook
import {
useState,
useEffect,
useRef,
useCallback,
SetStateAction,
} from "react";
import SockJS from "sockjs-client/dist/sockjs";
import { CompatClient, Stomp } from "@stomp/stompjs";
type PrivateTopicParam = {
topics: string[];
user_id: number;
};
type WebSocketClientProps = {
topics: string[];
url: string;
users?: PrivateTopicParam;
};
type WebSocketMessage = {
senderId: number;
receiverId?: number;
content: any;
date?: string;
};
const useWebSocketClient = ({ topics, url, users }: WebSocketClientProps) => {
const [messages, setMessages] = useState<string[]>([]);
const [isConnected, setIsConnect] = useState(false);
const [error, setError] = useState<string | null>(null);
const stompClientRef = useRef<CompatClient | null>(null);
const connect = useCallback(() => {
stompClientRef.current = Stomp.over(new SockJS(url));
stompClientRef.current.connect(
{},
() => {
setIsConnect(true);
setError(null);
topics.forEach((topic) =>
stompClientRef.current?.subscribe(`/topic/${topic}`, (message) => {
console.log(`Subscribe /topic/${topic}`);
setMessages((prev) => [...prev, message.body]);
})
);
if (users) {
users.topics.forEach((topic) =>
stompClientRef.current?.subscribe(
`/user/${users.user_id}/${topic}`,
(message) => {
console.log(`Subscribe /user/${users.user_id}/${topic}`);
setMessages((prev) => [...prev, message.body]);
}
)
);
}
},
(error: SetStateAction<string | null>) => {
setIsConnect(false);
setError(error);
}
);
}, [topics, url, users]);
const disconnect = useCallback(() => {
if (stompClientRef.current && stompClientRef.current.connected) {
stompClientRef.current.disconnect(() => {
setIsConnect(false);
});
}
}, []);
const sendMessage = useCallback(
(topic: string, message: WebSocketMessage) => {
if (stompClientRef.current && stompClientRef.current.connected) {
stompClientRef.current.send(
`/app/${topic}`,
{},
JSON.stringify(message)
);
}
},
[]
);
useEffect(() => {
connect();
return () => disconnect();
}, [connect, disconnect]);
return { isConnected, messages, error, sendMessage };
};
export default useWebSocketClient;
export type { WebSocketClientProps, PrivateTopicParam, WebSocketMessage };
使用了sockjs
库和后端的withSockJS();
相匹配, 将链接和订阅, 发送消息包装成一个hook向上提供, 在上层的component就可以使用其拿到websocket的message数据, 其后再根据WebSocketMessage
进行解析
sockJS为浏览器提供了回退的选项, 先尝试http, 能连通再升级为websocket, 当不支持websocket的时候自动降级为http
而在需要使用websocket的地方, 我的选择是
- 全局生存期的一个不可见组件
GlobalWebSocketReponseProvider
来订阅和连接, 接受消息 - Provider下的
WebSocketAdapter
接受消息后解析成Notification放入全局状态管理 - 其余业务逻辑通过全局状态管理的hook监听是否有Notification, 并处理
WebSocketAdapter
import { useEffect, useState } from "react";
import useWebSocketClient, {
WebSocketClientProps,
WebSocketMessage,
} from "../hooks/useWs";
import useNotificationStore from "../stores/useNotificationStore";
const WebSocketAdapter = (props: WebSocketClientProps) => {
console.log(
`re-rendering WebSocketAdapter with user_id: ${props.users?.user_id}`
);
const { messages } = useWebSocketClient(props);
const pushNotification = useNotificationStore((s) => s.pushNotification);
const [offset, setOffset] = useState(0);
useEffect(() => {
if (messages.length > offset) {
messages.slice(offset).forEach((element) => {
const message: WebSocketMessage = JSON.parse(element);
pushNotification({ content: message.content, level: "info" });
});
console.log("total messages " + messages.length);
setOffset(messages.length);
}
}, [messages, pushNotification]);
return <></>;
};
export default WebSocketAdapter;
全局通知状态管理
import { create } from "zustand";
type Notifaction = {
content: string;
level: "info" | "warning" | "error";
time?: number;
}
interface NotificationStore {
message_queue: Notifaction[];
pushNotification: (notifaction: Notifaction) => void;
popNotification: () => void;
}
const useNotificationStore = create<NotificationStore>((set) => ({
message_queue: [],
pushNotification: (notifaction: Notifaction) =>
set((state) => ({ message_queue: [...state.message_queue, notifaction] })),
popNotification: () =>
set((state) => ({ message_queue: state.message_queue.slice(1) })),
}));
export default useNotificationStore;
export type { Notifaction, NotificationStore };
Provider
import { useEffect, useState } from "react";
import { useMe } from "../hooks/useUsers";
import { SOCK_WS_CONN_URL } from "../common/common";
import useNotificationStore from "../stores/useNotificationStore";
import WebSocketAdapter from "./WebSocketAdapter";
const GlobalWebSocketReponseProvider = () => {
const { data: me, isLoading, isError } = useMe();
// {customerId}/queue/notifications
const user_topics = ["queue/notifications"];
const [props, setProps] = useState({
topics: [],
url: SOCK_WS_CONN_URL,
users: { user_id: me?.id || 0, topics: user_topics },
});
// ATTENTION: 新建一个props对象避免shallow copy 传给下面不adapter不重新渲染
useEffect(() => {
if (me?.id) {
console.log("update props");
const new_user = { user_id: me?.id || 0, topics: user_topics };
const new_props = { url: SOCK_WS_CONN_URL, topics: [], users: new_user };
setProps(new_props);
}
}, [me?.id]);
const pushNotification = useNotificationStore((s) => s.pushNotification);
useEffect(() => {
pushNotification({
content: `Connected to the server on user ${me?.id}`,
level: "info",
});
}, [me]);
return <WebSocketAdapter {...props} />;
};
export default GlobalWebSocketReponseProvider;
全局通知
import React, { useEffect } from "react";
import { message } from "antd";
import useNotificationStore from "../stores/useNotificationStore";
const GlobalNotification = () => {
const [messageApi, contextHolder] = message.useMessage();
const message_queue = useNotificationStore((s) => s.message_queue);
const pop = useNotificationStore((s) => s.popNotification);
useEffect(() => {
if (message_queue.length > 0) {
const element = message_queue[0];
const exist_time = element.time ? element.time : 1000;
setTimeout(() => {
if (element.level === "error") {
messageApi.error(element.content);
}
if (element.level === "info") {
messageApi.info(element.content);
}
if (element.level === "warning") {
messageApi.warning(element.content);
}
pop();
}, exist_time);
}
}, [message_queue, messageApi, pop]);
return <>{contextHolder}</>;
};
export default GlobalNotification;
在全局的Layout下挂上Notification和Provider
return (
<Layout>
<GlobalNotification />
<GlobalWebSocketReponseProvider />
//...
最后逻辑形成闭环
前端下订单 -> 后端拿到, 交给Kafka, 并返回给前端初步结果(处理中) -> 后端处理完毕, 通过websocket推送给前端 -> 前端从订阅的user私有topic接受 -> 存入全局通知队列 -> react渲染逻辑发现通知队列不空, 取出显示
三种方法的对比分析
方法一:使用 JavaScript 监听 Topic 并刷新页面
- 优点: 实现简单,无需复杂的客户端技术。 如果订单处理结果不需要实时更新,这种方法足够。
- 缺点: 效率低下,用户体验差。 页面刷新会中断用户当前操作,并导致页面重新加载所有资源,造成不必要的等待时间。 它是一种轮询机制的变种,只是轮询的触发条件变成了消息的到达,而不是定时器。 这种方法不适合需要实时更新的场景,例如需要立即显示订单状态变化的情况。 此外,它依赖于浏览器对消息的监听,如果浏览器关闭或网络中断,则无法收到消息。
方法二:使用 Ajax 轮询订单状态
- 优点: 比方法一效率更高,因为不需要刷新整个页面。 前端可以定期发送 Ajax 请求到后端获取订单状态,后端返回最新的状态信息。
- 缺点: 仍然存在一定的延迟,因为需要定期发送请求。 请求频率过高会增加服务器负载,频率过低则会增加延迟。 它是一种典型的轮询机制,需要前端不断地向后端请求数据,效率和资源 消耗都比 WebSocket 差。 同样,如果网络中断,则无法获取最新的订单状态。
方法三:使用 WebSocket
- 优点: 实时性高,效率高,用户体验好。 WebSocket 建立持久连接,后端可以立即将订单处理结果推送到前端,无需前端主动请求。 这显著提高了响应速度和用户体验。 资源消耗也相对较低,因为只需要建立一个连接,而不是不断地发送请求。
- 缺点: 实现相对复杂,需要前端和后端都支持 WebSocket 协议。 如果 WebSocket 连接断开,需要有重连机制来保证连接的稳定性。 需要考虑连接的并发数和服务器的负载能力。
对于需要实时更新订单状态的应用场景,我认为方法三 (WebSocket)是最佳选择,因为它提供了最高的实时性和最佳的用户体验。 方法二 (Ajax 轮询) 作为一种折中方案,适用于对实时性要求不高的情况。 方法一 (监听 Topic + 页面刷新) 效率最低,只适用于对实时性要求极低且页面刷新不会对用户造成太大干扰的场景。 选择哪种方法取决于具体的应用场景和对实时性、效率和开发成本的要求。在ebookstore这种情况下, 暂时没有高并发数, 所以我选择使用websocket来获取足够好的用户体验