feat: 增加通知系统、RabbitMQ集成及Docker一键部署脚本
All checks were successful
test/timeline-server/pipeline/head This commit looks good

1. 新增通知中心功能,支持好友请求、评论、点赞等多种通知类型的持久化与推送
2. 集成 RabbitMQ 用于异步处理动态日志,解耦动态服务与日志记录逻辑
3. 提供完整的 Docker Compose 部署方案及一键启动/停止脚本(Shell/Bat)
4. 优化文件服务,增加图片上传时的自动压缩处理以节省存储空间
5. 增强动态服务,支持通过 shareId 公开访问动态项及关键词搜索功能
6. 完善代码健壮性,在关键业务 Service 层增加 @Transactional 事务控制
This commit is contained in:
2026-02-11 14:28:27 +08:00
parent 35f3959474
commit 482c32a59c
77 changed files with 2396 additions and 646 deletions

View File

@@ -22,7 +22,7 @@ public class StompPrincipalInterceptor implements ChannelInterceptor {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (accessor != null && StompCommand.CONNECT.equals(accessor.getCommand())) {
// 从session attributes中获取userId
Map<String, Object> sessionAttrs = accessor.getSessionAttributes();
@@ -35,7 +35,7 @@ public class StompPrincipalInterceptor implements ChannelInterceptor {
accessor.setUser(principal);
}
}
// 如果还没有Principal尝试从请求头获取
if (accessor.getUser() == null) {
String userId = accessor.getFirstNativeHeader("X-User-Id");
@@ -45,7 +45,7 @@ public class StompPrincipalInterceptor implements ChannelInterceptor {
accessor.setUser(principal);
}
}
// 打印调试信息
Principal principal = accessor.getUser();
if (principal != null) {
@@ -54,8 +54,7 @@ public class StompPrincipalInterceptor implements ChannelInterceptor {
log.warn("STOMP CONNECT: 未设置PrincipalsessionAttributes: {}", sessionAttrs);
}
}
return message;
}
}

View File

@@ -11,6 +11,8 @@ import java.util.Map;
@Slf4j
public class UserIdPrincipalHandshakeHandler extends DefaultHandshakeHandler {
private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(UserIdPrincipalHandshakeHandler.class);
@Override
protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) {
// 首先尝试从attributes中获取userId由XUserIdHandshakeInterceptor放入

View File

@@ -20,6 +20,8 @@ import java.util.Map;
@Slf4j
public class XUserIdHandshakeInterceptor implements HandshakeInterceptor {
private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(XUserIdHandshakeInterceptor.class);
// 从配置中获取JWT密钥
@Value("${jwt.secret:timelineSecretKey}")
private String jwtSecret = "timelineSecretKey";

View File

@@ -61,13 +61,14 @@ public class AuthController {
String username = jwtUtils.getUsernameFromToken(refreshToken);
String newAccess = jwtUtils.generateAccessToken(userId, username);
String newRefresh = jwtUtils.generateRefreshToken(userId, username);
LoginResponse resp = new LoginResponse(newAccess, newRefresh, jwtUtils.getAccessExpirationSeconds(), userId, username);
LoginResponse resp = new LoginResponse(newAccess, newRefresh, jwtUtils.getAccessExpirationSeconds(), userId,
username);
return ResponseEntity.success(resp);
}
@PostMapping("/logout")
public ResponseEntity<String> logout(@RequestHeader(value = "Authorization", required = false) String authHeader,
@RequestBody(required = false) RefreshRequest request) {
@RequestBody(required = false) RefreshRequest request) {
String accessToken = extractToken(authHeader);
String refreshToken = request != null ? request.getRefreshToken() : null;

View File

@@ -1,6 +1,8 @@
package com.timeline.user.controller;
import com.timeline.common.response.ResponseEntity;
import com.timeline.user.dto.NotificationPayload;
import com.timeline.user.dto.NotificationType;
import com.timeline.user.service.UserMessageService;
import com.timeline.user.ws.WsNotifyService;
import lombok.Data;
@@ -8,17 +10,18 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@RestController
@RequestMapping("/user/message")
@RequestMapping("/user/message/push")
public class MessagePushController {
@Autowired
private WsNotifyService wsNotifyService;
@Autowired
private UserMessageService userMessageService;
@@ -35,7 +38,7 @@ public class MessagePushController {
@RequestParam String toUserId,
@RequestParam String destination,
@RequestBody String message) {
Map<String, Object> payload = new HashMap<>();
payload.put("message", message);
payload.put("timestamp", System.currentTimeMillis());
@@ -43,7 +46,7 @@ public class MessagePushController {
wsNotifyService.pushMessageToUser(toUserId, destination, payload);
log.info("向用户 {} 推送消息到 {}: {}", toUserId, destination, message);
return ResponseEntity.success("消息已推送");
}
@@ -58,29 +61,38 @@ public class MessagePushController {
public ResponseEntity<String> sendNotificationToUser(
@RequestParam String toUserId,
@RequestBody NotificationRequest request) {
NotificationPayload payload = new NotificationPayload();
payload.setTitle(request.getTitle());
payload.setContent(request.getContent());
payload.setType(request.getType());
payload.setTimestamp(System.currentTimeMillis());
NotificationType type = NotificationType.SYSTEM_MESSAGE;
try {
if (request.getType() != null) {
type = NotificationType.valueOf(request.getType());
}
} catch (Exception e) {
log.warn("Invalid notification type: {}", request.getType());
}
NotificationPayload payload = NotificationPayload.builder()
.title(request.getTitle())
.content(request.getContent())
.type(type)
.createTime(LocalDateTime.now())
.build();
wsNotifyService.sendNotificationToUser(toUserId, payload);
// 同时存储为未读消息
userMessageService.addUnreadMessage(toUserId, Map.of(
"type", "notification",
"title", request.getTitle(),
"content", request.getContent(),
"notificationType", request.getType(),
"timestamp", System.currentTimeMillis()
));
"type", "notification",
"title", request.getTitle(),
"content", request.getContent(),
"notificationType", request.getType(),
"timestamp", System.currentTimeMillis()));
log.info("向用户 {} 发送通知: {}", toUserId, request.getTitle());
return ResponseEntity.success("通知已发送");
}
/**
* 向指定用户添加未读消息(不会立即推送,只在下次连接时推送)
*
@@ -92,16 +104,15 @@ public class MessagePushController {
public ResponseEntity<String> addUnreadMessage(
@RequestParam String toUserId,
@RequestBody UnreadMessageRequest request) {
userMessageService.addUnreadMessage(toUserId, Map.of(
"type", request.getType(),
"title", request.getTitle(),
"content", request.getContent(),
"timestamp", System.currentTimeMillis()
));
"type", request.getType(),
"title", request.getTitle(),
"content", request.getContent(),
"timestamp", System.currentTimeMillis()));
log.info("为用户 {} 添加未读消息: {}", toUserId, request.getTitle());
return ResponseEntity.success("未读消息已添加");
}
@@ -110,20 +121,60 @@ public class MessagePushController {
private String title;
private String content;
private String type; // info, warning, error
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
}
@Data
public static class NotificationPayload {
private String title;
private String content;
private String type;
private long timestamp;
}
@Data
public static class UnreadMessageRequest {
private String title;
private String content;
private String type;
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
}
}

View File

@@ -0,0 +1,31 @@
package com.timeline.user.controller;
import com.timeline.user.entity.Notification;
import com.timeline.user.service.NotificationService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.security.Principal;
import java.util.List;
@RestController
@RequestMapping("/notifications")
public class NotificationController {
@Autowired
private NotificationService notificationService;
@GetMapping("/unread")
public ResponseEntity<List<Notification>> getUnreadNotifications(Principal principal) {
String currentUserId = principal.getName(); // 从安全上下文中获取用户ID
List<Notification> notifications = notificationService.getUnreadNotifications(currentUserId);
return ResponseEntity.ok(notifications);
}
@PostMapping("/read")
public ResponseEntity<Void> markNotificationsAsRead(@RequestBody List<Long> notificationIds) {
notificationService.markAsRead(notificationIds);
return ResponseEntity.ok().build();
}
}

View File

@@ -1,12 +1,15 @@
package com.timeline.user.controller;
import com.timeline.common.response.ResponseEntity;
import com.timeline.user.dto.NotificationPayload;
import com.timeline.user.dto.NotificationType;
import com.timeline.user.ws.WsNotifyService;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
@@ -30,16 +33,15 @@ public class TestNotificationPayload {
@PostMapping("/friend")
public ResponseEntity<String> sendFriendTestMessage(@RequestParam String toUserId) {
Map<String, Object> payload = buildMessagePayload(
"test_friend_notification",
"这是一条测试好友通知"
);
"test_friend_notification",
"这是一条测试好友通知");
wsNotifyService.sendFriendNotify(toUserId, payload);
log.info("已发送测试好友通知给用户: {}", toUserId);
return ResponseEntity.success("测试好友通知已发送");
}
/**
* 发送测试好友通知到所有频道
*
@@ -49,9 +51,8 @@ public class TestNotificationPayload {
@PostMapping("/friend-all")
public ResponseEntity<String> sendFriendTestMessageToAllChannels(@RequestParam String toUserId) {
Map<String, Object> payload = buildMessagePayload(
"test_friend_notification_all",
"这是一条发送到所有频道的测试好友通知"
);
"test_friend_notification_all",
"这是一条发送到所有频道的测试好友通知");
wsNotifyService.sendFriendNotifyToAllChannels(toUserId, payload);
log.info("已发送测试好友通知到所有频道给用户: {}", toUserId);
@@ -68,9 +69,8 @@ public class TestNotificationPayload {
@PostMapping("/chat")
public ResponseEntity<String> sendChatTestMessage(@RequestParam String toUserId) {
Map<String, Object> payload = buildMessagePayload(
"test_chat_message",
"这是一条测试聊天消息"
);
"test_chat_message",
"这是一条测试聊天消息");
wsNotifyService.sendChatMessage(toUserId, payload);
log.info("已发送测试聊天消息给用户: {}", toUserId);
@@ -86,13 +86,12 @@ public class TestNotificationPayload {
*/
@PostMapping("/notification")
public ResponseEntity<String> sendNotificationTestMessage(@RequestParam String toUserId) {
Map<String, Object> payload = buildMessagePayload(
"test_notification",
"这是一条测试通知消息"
);
// 添加通知特有的字段
payload.put("title", "测试通知");
payload.put("type", "info");
NotificationPayload payload = NotificationPayload.builder()
.title("测试通知")
.content("这是一条测试通知消息")
.type(NotificationType.SYSTEM_MESSAGE)
.createTime(LocalDateTime.now())
.build();
wsNotifyService.sendNotificationToUser(toUserId, payload);
log.info("已发送测试通知给用户: {}", toUserId);
@@ -103,7 +102,7 @@ public class TestNotificationPayload {
/**
* 构建通用消息负载
*
* @param type 消息类型
* @param type 消息类型
* @param message 消息内容
* @return 消息负载Map
*/

View File

@@ -14,7 +14,6 @@ import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
@Slf4j
@RestController
@RequestMapping("/user")
@@ -28,8 +27,10 @@ public class UserController {
User user = userService.getCurrentUser();
return ResponseEntity.success(user);
}
/**
* 查询指定用户信息
*
* @param userId
* @return
*/
@@ -38,12 +39,13 @@ public class UserController {
User user = userService.getUserByUserId(userId);
return ResponseEntity.success(user);
}
@GetMapping("/search")
public ResponseEntity<List<User>> getMethodName(User user) {
log.info(user.toString());
return ResponseEntity.success(userService.searchUsers(user));
}
@DeleteMapping
public ResponseEntity<String> deleteUser() {
String userId = UserContextUtils.getCurrentUserId();

View File

@@ -70,7 +70,9 @@ public class UserMessageController {
dto.setToUserId(fn.getToUserId());
dto.setTitle("好友通知");
dto.setContent(fn.getContent());
dto.setTimestamp(fn.getCreateTime() != null ? fn.getCreateTime().atZone(java.time.ZoneId.systemDefault()).toInstant().toEpochMilli() : now);
dto.setTimestamp(fn.getCreateTime() != null
? fn.getCreateTime().atZone(java.time.ZoneId.systemDefault()).toInstant().toEpochMilli()
: now);
dto.setStatus(fn.getStatus());
result.add(dto);
}
@@ -78,8 +80,7 @@ public class UserMessageController {
// 简单按时间倒序
result.sort((a, b) -> Long.compare(
b.getTimestamp() != null ? b.getTimestamp() : 0L,
a.getTimestamp() != null ? a.getTimestamp() : 0L
));
a.getTimestamp() != null ? a.getTimestamp() : 0L));
log.info("用户 {} 未读消息数量: {}", userId, result.size());
return ResponseEntity.success(result);
@@ -152,11 +153,8 @@ public class UserMessageController {
result.sort((a, b) -> Long.compare(
b.getTimestamp() != null ? b.getTimestamp() : 0L,
a.getTimestamp() != null ? a.getTimestamp() : 0L
));
a.getTimestamp() != null ? a.getTimestamp() : 0L));
log.info("用户 {} 好友通知历史数量: {}", userId, result.size());
return ResponseEntity.success(result);
}
}

View File

@@ -3,7 +3,11 @@ package com.timeline.user.controller;
import com.timeline.common.response.ResponseEntity;
import com.timeline.common.utils.UserContextUtils;
import com.timeline.user.ws.WsNotifyService;
import lombok.extern.slf4j.Slf4j;
import com.timeline.user.dto.NotificationPayload;
import com.timeline.user.dto.NotificationType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@@ -14,7 +18,7 @@ import java.util.Map;
*/
@Slf4j
@RestController
@RequestMapping("/user/ws/test")
@RequestMapping("/user/ws-test")
public class WebSocketTestController {
@Autowired
@@ -29,16 +33,21 @@ public class WebSocketTestController {
if (userId == null || userId.isEmpty()) {
return ResponseEntity.error(401, "未获取到用户身份");
}
log.info("测试推送通知给用户: {}", userId);
// 确保消息包含必要字段
if (!message.containsKey("timestamp")) {
message.put("timestamp", System.currentTimeMillis());
NotificationPayload payload = new NotificationPayload();
payload.setContent((String) message.getOrDefault("content", "Test Message"));
payload.setCreateTime(java.time.LocalDateTime.now());
String typeStr = (String) message.getOrDefault("type", "SYSTEM_MESSAGE");
try {
payload.setType(NotificationType.valueOf(typeStr));
} catch (IllegalArgumentException e) {
payload.setType(NotificationType.SYSTEM_MESSAGE);
}
if (!message.containsKey("type")) {
message.put("type", "test");
}
wsNotifyService.sendNotificationToUser(userId, message);
wsNotifyService.sendNotificationToUser(userId, payload);
return ResponseEntity.success("通知已推送给用户: " + userId);
}
@@ -50,7 +59,19 @@ public class WebSocketTestController {
@PathVariable String targetUserId,
@RequestBody Map<String, Object> message) {
log.info("测试推送通知给用户: {}", targetUserId);
wsNotifyService.sendNotificationToUser(targetUserId, message);
NotificationPayload payload = new NotificationPayload();
payload.setContent((String) message.getOrDefault("content", "Test Message"));
payload.setCreateTime(java.time.LocalDateTime.now());
String typeStr = (String) message.getOrDefault("type", "SYSTEM_MESSAGE");
try {
payload.setType(NotificationType.valueOf(typeStr));
} catch (IllegalArgumentException e) {
payload.setType(NotificationType.SYSTEM_MESSAGE);
}
wsNotifyService.sendNotificationToUser(targetUserId, payload);
return ResponseEntity.success("通知已推送");
}
@@ -78,4 +99,3 @@ public class WebSocketTestController {
return ResponseEntity.success("聊天消息已推送");
}
}

View File

@@ -0,0 +1,20 @@
package com.timeline.user.dao;
import com.timeline.user.entity.Notification;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
@Mapper
public interface NotificationMapper {
void insert(Notification notification);
List<Notification> selectUnreadByRecipientId(String recipientId);
void markAsRead(@Param("ids") List<Long> ids);
List<Notification> findByRecipientId(String recipientId);
long countUnreadByRecipientId(String recipientId);
}

View File

@@ -29,4 +29,3 @@ public class ChatMessage {
*/
private Long timestamp;
}

View File

@@ -0,0 +1,28 @@
package com.timeline.user.dto;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
import java.util.Map;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class NotificationPayload {
private Long id;
private String senderId;
private String senderName;
private String senderAvatar;
private NotificationType type;
private String title;
private String content;
private String targetId;
private String targetType;
private Map<String, Object> payload;
private LocalDateTime createTime;
}

View File

@@ -0,0 +1,11 @@
package com.timeline.user.dto;
public enum NotificationType {
FRIEND_REQUEST, // 好友请求
FRIEND_ACCEPT, // 好友接受
NEW_COMMENT, // 新评论
NEW_LIKE, // 新点赞
SYSTEM_MESSAGE // 系统消息
, FRIEND_ACCEPTED, FRIEND_REJECTED
}

View File

@@ -12,4 +12,21 @@ public class UpdateUser {
private String description;
private String location;
private String tag;
public String getUsername() { return username; }
public void setUsername(String username) { this.username = username; }
public String getNickname() { return nickname; }
public void setNickname(String nickname) { this.nickname = nickname; }
public String getEmail() { return email; }
public void setEmail(String email) { this.email = email; }
public String getPhone() { return phone; }
public void setPhone(String phone) { this.phone = phone; }
public String getAvatar() { return avatar; }
public void setAvatar(String avatar) { this.avatar = avatar; }
public String getDescription() { return description; }
public void setDescription(String description) { this.description = description; }
public String getLocation() { return location; }
public void setLocation(String location) { this.location = location; }
public String getTag() { return tag; }
public void setTag(String tag) { this.tag = tag; }
}

View File

@@ -2,55 +2,15 @@ package com.timeline.user.dto;
import lombok.Data;
/**
* 通用用户消息 DTO用于 WebSocket 推送和历史记录查询
*/
@Data
public class UserMessageDto {
/**
* 数据库主键(如果来源于持久化表,可以为 null
*/
private Long id;
/**
* 消息大类system / notification / friend / chat 等
*/
private String category;
/**
* 业务类型:如 friend_request / friend_accepted / friend_rejected / connection_established 等
*/
private String type;
/**
* 发送方用户 ID
*/
private String fromUserId;
/**
* 接收方用户 ID
*/
private String toUserId;
/**
* 可选标题(用于通知类消息)
*/
private String title;
/**
* 文本内容
*/
private String content;
/**
* 发送时间戳(毫秒)
*/
private Long timestamp;
/**
* 状态unread / read 等
*/
private String status;
}

View File

@@ -9,10 +9,8 @@ public class FriendNotify {
private Long id;
private String fromUserId;
private String toUserId;
private String type; // request / accept / reject
private String status; // unread / read
private String type;
private String content;
private LocalDateTime createTime;
private LocalDateTime readTime;
private String status;
}

View File

@@ -0,0 +1,20 @@
package com.timeline.user.entity;
import lombok.Data;
import java.time.LocalDateTime;
@Data
public class Notification {
private Long id;
private String recipientId; // 接收者ID
private String senderId; // 发送者ID
private String senderName; // 发送者名称
private String senderAvatar; // 发送者头像
private String type; // 通知类型 (e.g., FRIEND_REQUEST, NEW_COMMENT)
private String content; // 通知内容
private String targetId; // 相关实体的ID (e.g., 动态ID)
private String targetType; // 相关实体的类型 (e.g., STORY_ITEM)
private boolean read; // 是否已读
private LocalDateTime createTime;
}

View File

@@ -0,0 +1,17 @@
package com.timeline.user.service;
import com.timeline.user.dto.NotificationType;
import com.timeline.user.entity.Notification;
import java.util.List;
import java.util.Map;
public interface NotificationService {
void createAndSendNotification(String recipientId, NotificationType type, String title, String content,
Map<String, Object> payload);
List<Notification> getUnreadNotifications(String recipientId);
void markAsRead(List<Long> notificationIds);
}

View File

@@ -5,26 +5,26 @@ import com.timeline.common.exception.CustomException;
import com.timeline.common.response.ResponseEnum;
import com.timeline.common.utils.UserContextUtils;
import com.timeline.user.dao.FriendMapper;
import com.timeline.user.dao.FriendNotifyMapper;
import com.timeline.user.dao.UserMapper;
import com.timeline.user.dto.FriendUserDto;
import com.timeline.user.dto.FriendNotifyPayload;
import com.timeline.user.dto.NotificationType;
import com.timeline.user.entity.Friend;
import com.timeline.user.entity.FriendNotify;
import com.timeline.user.entity.Friendship;
import com.timeline.user.entity.User;
import com.timeline.user.service.FriendService;
import com.timeline.user.service.UserMessageService;
import com.timeline.user.service.NotificationService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
@Slf4j
@Service
@Slf4j
public class FriendServiceImpl implements FriendService {
@Autowired
@@ -32,11 +32,7 @@ public class FriendServiceImpl implements FriendService {
@Autowired
private UserMapper userMapper;
@Autowired
private FriendNotifyMapper friendNotifyMapper;
@Autowired
private com.timeline.user.ws.WsNotifyService wsNotifyService;
@Autowired
private UserMessageService userMessageService;
private NotificationService notificationService;
private String currentUser() {
String uid = UserContextUtils.getCurrentUserId();
@@ -45,6 +41,7 @@ public class FriendServiceImpl implements FriendService {
}
return uid;
}
private String currentUsername() {
String username = UserContextUtils.getCurrentUsername();
if (username == null || username.isEmpty()) {
@@ -54,6 +51,7 @@ public class FriendServiceImpl implements FriendService {
}
@Override
@Transactional(rollbackFor = Exception.class)
public void requestFriend(String targetUserId) {
String uid = currentUser();
log.info("用户 {} 向用户 {} 发送好友请求", uid, targetUserId);
@@ -81,39 +79,18 @@ public class FriendServiceImpl implements FriendService {
} else {
friendMapper.updateStatus(uid, targetUserId, CommonConstants.FRIENDSHIP_PENDING);
}
FriendNotify notify = new FriendNotify();
notify.setFromUserId(uid);
notify.setToUserId(targetUserId);
notify.setType("request");
notify.setStatus("unread");
notify.setCreateTime(now);
friendNotifyMapper.insert(notify);
FriendNotifyPayload payload = new FriendNotifyPayload();
payload.setType("request");
payload.setFromUserId(uid);
payload.setFromUsername(currentUsername());
payload.setContent("向你发送了好友请求");
payload.setTimestamp(System.currentTimeMillis());
log.info("准备发送好友请求通知给用户: {}", targetUserId);
wsNotifyService.sendFriendNotifyToAllChannels(targetUserId, payload);
// 存储未读消息,以便用户下次连接时能收到
userMessageService.addUnreadMessage(targetUserId, Map.of(
"category", "friend",
"type", "friend_request",
"fromUserId", uid,
"fromUsername", currentUsername(),
"toUserId", targetUserId,
"title", "好友请求",
"content", "您收到了一个好友请求",
"timestamp", System.currentTimeMillis(),
"status", "unread"
));
notificationService.createAndSendNotification(
targetUserId,
NotificationType.FRIEND_REQUEST,
"好友请求",
String.format("用户 %s 向您发送了好友请求", currentUsername()),
Map.<String, Object>of("fromUserId", uid, "fromUsername", currentUsername()));
log.info("好友请求已处理完毕");
}
@Override
@Transactional(rollbackFor = Exception.class)
public void acceptFriend(String targetUserId) {
String uid = currentUser();
log.info("用户 {} 接受了用户 {} 的好友请求", uid, targetUserId);
@@ -134,71 +111,29 @@ public class FriendServiceImpl implements FriendService {
friendMapper.updateStatus(uid, targetUserId, CommonConstants.FRIENDSHIP_ACCEPTED);
}
FriendNotify notify = new FriendNotify();
notify.setFromUserId(uid);
notify.setToUserId(targetUserId);
notify.setType("accept");
notify.setStatus("unread");
notify.setCreateTime(now);
friendNotifyMapper.insert(notify);
FriendNotifyPayload payload = new FriendNotifyPayload();
payload.setType("accept");
payload.setFromUserId(uid);
payload.setFromUsername(currentUsername());
payload.setContent("接受了你的好友请求");
payload.setTimestamp(System.currentTimeMillis());
log.info("准备发送好友接受通知给用户: {}", targetUserId);
wsNotifyService.sendFriendNotifyToAllChannels(targetUserId, payload);
// 存储未读消息,以便用户下次连接时能收到
userMessageService.addUnreadMessage(targetUserId, Map.of(
"category", "friend",
"type", "friend_accepted",
"fromUserId", uid,
"fromUsername", currentUsername(),
"toUserId", targetUserId,
"title", "好友请求已通过",
"content", "您的好友请求已被接受",
"timestamp", System.currentTimeMillis(),
"status", "unread"
));
notificationService.createAndSendNotification(
targetUserId,
NotificationType.FRIEND_ACCEPTED,
"好友请求已通过",
String.format("用户 %s 接受了您的好友请求", currentUsername()),
Map.<String, Object>of("fromUserId", uid, "fromUsername", currentUsername()));
log.info("好友接受已处理完毕");
}
@Override
@Transactional(rollbackFor = Exception.class)
public void rejectFriend(String targetUserId) {
String uid = currentUser();
log.info("用户 {} 拒绝了用户 {} 的好友请求", uid, targetUserId);
friendMapper.updateStatus(targetUserId, uid, CommonConstants.FRIENDSHIP_REJECTED);
FriendNotify notify = new FriendNotify();
notify.setFromUserId(uid);
notify.setToUserId(targetUserId);
notify.setType("reject");
notify.setStatus("unread");
notify.setCreateTime(LocalDateTime.now());
friendNotifyMapper.insert(notify);
FriendNotifyPayload payload = new FriendNotifyPayload();
payload.setType("reject");
payload.setFromUserId(uid);
payload.setContent("拒绝了你的好友请求");
payload.setTimestamp(System.currentTimeMillis());
log.info("准备发送好友拒绝通知给用户: {}", targetUserId);
wsNotifyService.sendFriendNotifyToAllChannels(targetUserId, payload);
// 存储未读消息,以便用户下次连接时能收到
userMessageService.addUnreadMessage(targetUserId, Map.of(
"category", "friend",
"type", "friend_rejected",
"fromUserId", uid,
"toUserId", targetUserId,
"title", "好友请求被拒绝",
"content", "您的好友请求已被拒绝",
"timestamp", System.currentTimeMillis(),
"status", "unread"
));
notificationService.createAndSendNotification(
targetUserId,
NotificationType.FRIEND_REJECTED,
"好友请求被拒绝",
String.format("用户 %s 拒绝了您的好友请求", currentUsername()),
Map.<String, Object>of("fromUserId", uid, "fromUsername", currentUsername()));
log.info("好友拒绝已处理完毕");
}
@@ -212,14 +147,16 @@ public class FriendServiceImpl implements FriendService {
return friendMapper.selectPending(currentUser());
}
@Override
public List<FriendNotify> listUnreadNotify() {
return friendNotifyMapper.selectUnread(currentUser());
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'listUnreadNotify'");
}
@Override
public void markNotifyRead(Long id) {
friendNotifyMapper.markRead(id);
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'markNotifyRead'");
}
}

View File

@@ -0,0 +1,92 @@
package com.timeline.user.service.impl;
import com.timeline.user.dao.NotificationMapper;
import com.timeline.user.dao.UserMapper;
import com.timeline.user.dto.NotificationPayload;
import com.timeline.user.dto.NotificationType;
import com.timeline.user.entity.Notification;
import com.timeline.user.entity.User;
import com.timeline.user.service.NotificationService;
import com.timeline.user.ws.WsNotifyService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
@Service
@Slf4j
public class NotificationServiceImpl implements NotificationService {
@Autowired
private NotificationMapper notificationMapper;
@Autowired
private UserMapper userMapper;
@Autowired
private WsNotifyService wsNotifyService;
@Override
@Transactional(rollbackFor = Exception.class)
public void createAndSendNotification(String recipientId, NotificationType type, String title, String content,
Map<String, Object> payload) {
String senderId = (String) payload.get("fromUserId");
User sender = userMapper.selectByUserId(senderId);
if (sender == null) {
log.error("Sender with id {} not found.", senderId);
return;
}
// 1. 持久化通知
Notification notification = new Notification();
notification.setRecipientId(recipientId);
notification.setSenderId(senderId);
notification.setSenderName(sender.getUsername());
notification.setSenderAvatar(sender.getAvatar());
notification.setType(type.name());
notification.setContent(content);
if (payload != null) {
notification.setTargetId((String) payload.get("targetId"));
notification.setTargetType((String) payload.get("targetType"));
}
notification.setRead(false);
notification.setCreateTime(LocalDateTime.now());
notificationMapper.insert(notification);
// 2. 构建推送载体
NotificationPayload pushPayload = NotificationPayload.builder()
.id(notification.getId())
.senderId(sender.getUserId())
.senderName(sender.getUsername())
.senderAvatar(sender.getAvatar())
.type(type)
.title(title)
.content(content)
.payload(payload)
.createTime(notification.getCreateTime())
.build();
// 3. 通过 WebSocket 推送
wsNotifyService.sendNotificationToUser(recipientId, pushPayload);
log.info("Sent notification {} to user {}.", type, recipientId);
}
@Override
public List<Notification> getUnreadNotifications(String recipientId) {
return notificationMapper.selectUnreadByRecipientId(recipientId);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void markAsRead(List<Long> notificationIds) {
if (notificationIds == null || notificationIds.isEmpty()) {
return;
}
notificationMapper.markAsRead(notificationIds);
}
}

View File

@@ -5,6 +5,7 @@ import java.time.LocalDateTime;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.timeline.common.constants.CommonConstants;
import com.timeline.common.exception.CustomException;
@@ -35,7 +36,7 @@ public class UserAuthServiceImpl implements UserAuthService {
@Autowired
private RedisUtils redisUtils;
@SuppressWarnings("null")
@Override
public LoginResponse login(LoginRequest loginRequest) {
@@ -56,7 +57,8 @@ public class UserAuthServiceImpl implements UserAuthService {
String accessToken = jwtUtils.generateAccessToken(user.getUserId(), user.getUsername());
String refreshToken = jwtUtils.generateRefreshToken(user.getUserId(), user.getUsername());
redisUtils.set(loginRequest.getUsername(), refreshToken, jwtUtils.getAccessExpirationSeconds());
return new LoginResponse(accessToken, refreshToken, jwtUtils.getAccessExpirationSeconds(), user.getUserId(), user.getUsername());
return new LoginResponse(accessToken, refreshToken, jwtUtils.getAccessExpirationSeconds(), user.getUserId(),
user.getUsername());
} catch (CustomException e) {
throw e;
} catch (Exception e) {
@@ -66,6 +68,7 @@ public class UserAuthServiceImpl implements UserAuthService {
}
@Override
@Transactional(rollbackFor = Exception.class)
public User register(RegisterRequest registerRequest) {
try {
// 检查用户名是否已存在
@@ -96,5 +99,4 @@ public class UserAuthServiceImpl implements UserAuthService {
}
}
}

View File

@@ -11,19 +11,21 @@ import com.timeline.user.ws.WsNotifyService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
@Service
public class UserServiceImpl implements UserService {
private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(UserServiceImpl.class);
@Autowired
private UserMapper userMapper;
@Autowired
private WsNotifyService wsNotifyService;
@@ -34,23 +36,27 @@ public class UserServiceImpl implements UserService {
}
return uid;
}
@Override
public User getUserByUserId(String userId) {
return userMapper.selectByUserId(userId);
}
@Override
public User getCurrentUser() {
String userId = getCurrentUserId();
log.info("获取当前用户信息: {}",userId);
return userMapper.selectByUserId(userId);
}
@Override
public User getUserInfo(String userId) {
log.info("获取指定用户信息: {}",userId);
log.info("获取当前用户信息: {}", userId);
return userMapper.selectByUserId(userId);
}
@Override
public User getUserInfo(String userId) {
log.info("获取指定用户信息: {}", userId);
return userMapper.selectByUserId(userId);
}
@Override
@Transactional(rollbackFor = Exception.class)
public User updateUserInfo(UpdateUser updateUser) {
try {
String userId = getCurrentUserId();
@@ -69,14 +75,14 @@ public class UserServiceImpl implements UserService {
user.setUpdateTime(LocalDateTime.now());
userMapper.update(user);
// 用户信息更新后通过WebSocket推送通知
Map<String, Object> notification = new HashMap<>();
notification.put("type", "user_profile_updated");
notification.put("message", "您的个人信息已成功更新");
notification.put("timestamp", System.currentTimeMillis());
wsNotifyService.sendNotificationToUser(userId, notification);
com.timeline.user.dto.NotificationPayload payload = new com.timeline.user.dto.NotificationPayload();
payload.setType(com.timeline.user.dto.NotificationType.SYSTEM_MESSAGE);
payload.setContent("您的个人信息已成功更新");
payload.setCreateTime(LocalDateTime.now());
wsNotifyService.sendNotificationToUser(userId, payload);
return user;
} catch (CustomException e) {
throw e;
@@ -87,6 +93,7 @@ public class UserServiceImpl implements UserService {
}
@Override
@Transactional(rollbackFor = Exception.class)
public void deleteUser(String userId) {
try {
User user = userMapper.selectByUserId(userId);

View File

@@ -1,34 +0,0 @@
package com.timeline.user.ws;
import com.timeline.common.utils.UserContextUtils;
import com.timeline.user.dto.ChatMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Controller;
import java.security.Principal;
@Slf4j
@Controller
public class ChatWsController {
@Autowired
private WsNotifyService wsNotifyService;
@MessageMapping("/chat/send")
public void send(@Payload ChatMessage msg, Principal principal) {
if (principal == null) {
log.warn("未认证用户尝试发送聊天消息");
return;
}
String fromUserId = principal.getName();
msg.setFromUserId(fromUserId);
msg.setFromUsername(UserContextUtils.getCurrentUsername());
msg.setTimestamp(System.currentTimeMillis());
log.info("用户 {}({}) 向 {} 发送消息: {}", fromUserId, msg.getFromUsername(), msg.getToUserId(), msg.getContent());
wsNotifyService.sendChatMessage(msg.getToUserId(), msg);
}
}

View File

@@ -1,5 +1,7 @@
package com.timeline.user.ws;
import com.timeline.user.dto.NotificationPayload;
import com.timeline.user.dto.NotificationType;
import com.timeline.user.service.UserMessageService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -12,16 +14,18 @@ import org.springframework.web.socket.messaging.SessionDisconnectEvent;
import org.springframework.web.socket.messaging.SessionSubscribeEvent;
import org.springframework.messaging.simp.stomp.StompCommand;
import java.time.LocalDateTime;
import java.security.Principal;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class WebSocketEventListener {
private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(WebSocketEventListener.class);
@Autowired
@SuppressWarnings("unused")
private SimpMessagingTemplate messagingTemplate;
@@ -41,18 +45,18 @@ public class WebSocketEventListener {
Principal principal = headerAccessor.getUser();
String sessionId = headerAccessor.getSessionId();
StompCommand command = headerAccessor.getCommand();
log.info("WebSocket 连接建立事件会话ID: {}STOMP命令: {}Principal: {}",
sessionId, command, principal != null ? principal.getName() : "null");
log.info("WebSocket 连接建立事件会话ID: {}STOMP命令: {}Principal: {}",
sessionId, command, principal != null ? principal.getName() : "null");
// 打印所有session attributes
Map<String, Object> sessionAttrs = headerAccessor.getSessionAttributes();
log.info("会话属性: {}", sessionAttrs);
if (principal != null) {
String userId = principal.getName();
log.info("WebSocket 连接建立会话ID: {}用户ID: {}", sessionId, userId);
// 检查用户是否在注册表中
org.springframework.messaging.simp.user.SimpUser user = userRegistry.getUser(userId);
if (user != null) {
@@ -73,7 +77,7 @@ public class WebSocketEventListener {
StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());
Principal principal = headerAccessor.getUser();
String sessionId = headerAccessor.getSessionId();
if (principal != null) {
String userId = principal.getName();
log.info("WebSocket 连接断开用户ID: {}会话ID: {}", userId, sessionId);
@@ -99,22 +103,22 @@ public class WebSocketEventListener {
Principal principal = headerAccessor.getUser();
String destination = headerAccessor.getDestination();
String sessionId = headerAccessor.getSessionId();
if (principal != null) {
String userId = principal.getName();
log.info("用户 {} 订阅了频道: {}会话ID: {}", userId, destination, sessionId);
// 检查用户是否在注册表中
org.springframework.messaging.simp.user.SimpUser user = userRegistry.getUser(userId);
if (user != null) {
log.info("用户 {} 已在注册表中,会话数: {}", userId, user.getSessions().size());
} else {
log.warn("用户 {} 不在注册表中!当前注册用户: {}", userId,
userRegistry.getUsers().stream()
.map(org.springframework.messaging.simp.user.SimpUser::getName)
.toList());
log.warn("用户 {} 不在注册表中!当前注册用户: {}", userId,
userRegistry.getUsers().stream()
.map(org.springframework.messaging.simp.user.SimpUser::getName)
.toList());
}
// 如果是第一次订阅(任意频道),推送初始消息
if (!initialMessageSent.contains(userId)) {
initialMessageSent.add(userId);
@@ -123,7 +127,7 @@ public class WebSocketEventListener {
pushInitialMessages(userId);
});
}
// 如果订阅的是通知频道,立即推送未读消息
if (destination != null && destination.contains("/queue/notification")) {
CompletableFuture.delayedExecutor(300, TimeUnit.MILLISECONDS).execute(() -> {
@@ -131,8 +135,8 @@ public class WebSocketEventListener {
});
}
} else {
log.warn("订阅事件中未获取到用户身份会话ID: {},目标: {}sessionAttributes: {}",
sessionId, destination, headerAccessor.getSessionAttributes());
log.warn("订阅事件中未获取到用户身份会话ID: {},目标: {}sessionAttributes: {}",
sessionId, destination, headerAccessor.getSessionAttributes());
}
}
@@ -145,22 +149,18 @@ public class WebSocketEventListener {
long now = System.currentTimeMillis();
// 1. 推送连接成功的欢迎消息(统一结构)
Map<String, Object> welcomeMsg = Map.of(
"category", "system",
"type", "connection_established",
"fromUserId", "system",
"toUserId", userId,
"title", "连接成功",
"content", "WebSocket 连接已建立",
"timestamp", now,
"status", "unread"
);
wsNotifyService.sendNotificationToUser(userId, welcomeMsg);
NotificationPayload welcomePayload = NotificationPayload.builder()
.title("连接成功")
.content("WebSocket 连接已建立")
.type(NotificationType.SYSTEM_MESSAGE)
.createTime(LocalDateTime.now())
.build();
wsNotifyService.sendNotificationToUser(userId, welcomePayload);
log.info("已推送欢迎消息给用户: {}", userId);
// 2. 推送未读通知
pushUnreadNotifications(userId);
} catch (Exception e) {
log.error("推送初始消息失败用户ID: {}", userId, e);
}
@@ -173,12 +173,27 @@ public class WebSocketEventListener {
try {
List<Map<String, Object>> unreadMessages = userMessageService.getUnreadMessages(userId);
log.info("用户 {} 有 {} 条未读消息", userId, unreadMessages.size());
for (Map<String, Object> message : unreadMessages) {
wsNotifyService.sendNotificationToUser(userId, message);
NotificationPayload payload = NotificationPayload.builder()
.title((String) message.get("title"))
.content((String) message.get("content"))
.type(NotificationType.SYSTEM_MESSAGE)
.createTime(LocalDateTime.now())
.build();
if (message.get("notificationType") != null) {
try {
payload.setType(NotificationType.valueOf((String) message.get("notificationType")));
} catch (Exception e) {
// ignore
}
}
wsNotifyService.sendNotificationToUser(userId, payload);
log.debug("已推送未读消息给用户 {}: {}", userId, message);
}
// 推送完成后清除未读消息
if (!unreadMessages.isEmpty()) {
userMessageService.clearUnreadMessages(userId);
@@ -189,4 +204,3 @@ public class WebSocketEventListener {
}
}
}

View File

@@ -55,4 +55,3 @@ public class WebSocketSessionRegistry {
users.forEach(userId -> log.info("在线用户: {}", userId));
}
}

View File

@@ -6,6 +6,8 @@ import org.springframework.messaging.simp.user.SimpUser;
import org.springframework.messaging.simp.user.SimpUserRegistry;
import org.springframework.stereotype.Service;
import com.timeline.user.dto.NotificationPayload;
import lombok.extern.slf4j.Slf4j;
@Service
@@ -31,45 +33,48 @@ public class WsNotifyService {
messagingTemplate.convertAndSendToUser(toUserId, "/queue/chat", payload);
log.info("聊天消息已发送到目标路径: /user/{}/queue/chat", toUserId);
}
/**
* 向指定用户推送任意消息
* @param toUserId 目标用户ID
*
* @param toUserId 目标用户ID
* @param destination 消息目的地
* @param payload 消息内容
* @param payload 消息内容
*/
public void pushMessageToUser(String toUserId, String destination, Object payload) {
log.info("推送消息给用户:{},目的地:{},内容:{}", toUserId, destination, payload);
messagingTemplate.convertAndSendToUser(toUserId, destination, payload);
log.debug("消息已推送");
}
/**
* 向指定用户推送通知消息
*
* @param toUserId 目标用户ID
* @param payload 消息内容
* @param payload 消息内容
*/
public void sendNotificationToUser(String toUserId, Object payload) {
public void sendNotificationToUser(String toUserId, NotificationPayload payload) {
log.info("发送通知给用户:{},内容:{}", toUserId, payload);
checkUserOnline(toUserId);
messagingTemplate.convertAndSendToUser(toUserId, "/queue/notification", payload);
log.info("通知已发送到目标路径: /user/{}/queue/notification", toUserId);
}
/**
* 向指定用户推送好友相关通知到所有可能的频道
*
* @param toUserId 目标用户ID
* @param payload 消息内容
* @param payload 消息内容
*/
public void sendFriendNotifyToAllChannels(String toUserId, Object payload) {
log.info("向用户 {} 的所有频道发送好友通知,内容:{}", toUserId, payload);
// 发送到好友通知频道
messagingTemplate.convertAndSendToUser(toUserId, "/queue/friend", payload);
// 发送到通知频道
messagingTemplate.convertAndSendToUser(toUserId, "/queue/notification", payload);
log.info("好友通知已发送到所有频道");
}

View File

@@ -0,0 +1,33 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.timeline.user.dao.NotificationMapper">
<insert id="insert" useGeneratedKeys="true" keyProperty="id">
INSERT INTO notification (recipient_id, sender_id, sender_name, sender_avatar, type, content, target_id, target_type, `read`, create_time)
VALUES (#{recipientId}, #{senderId}, #{senderName}, #{senderAvatar}, #{type}, #{content}, #{targetId}, #{targetType}, #{read}, #{createTime})
</insert>
<select id="findByRecipientId" resultType="com.timeline.user.entity.Notification">
SELECT id, recipient_id, sender_id, sender_name, sender_avatar, type, content, target_id, target_type, `read`, create_time
FROM notification
WHERE recipient_id = #{recipientId}
ORDER BY create_time DESC
</select>
<select id="countUnreadByRecipientId" resultType="long">
SELECT count(*)
FROM notification
WHERE recipient_id = #{recipientId} AND `read` = false
</select>
<update id="markAsRead">
UPDATE notification
SET `read` = true
WHERE recipient_id = #{recipientId} AND id IN
<foreach item="id" collection="ids" open="(" separator="," close=")">
#{id}
</foreach>
</update>
</mapper>

View File

@@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="com.timeline.user.dao.NotificationMapper">
<insert id="insert" parameterType="com.timeline.user.entity.Notification" useGeneratedKeys="true" keyProperty="id">
INSERT INTO notification (recipient_id, sender_id, sender_name, sender_avatar, type, content, target_id, target_type, `read`, create_time)
VALUES (#{recipientId}, #{senderId}, #{senderName}, #{senderAvatar}, #{type}, #{content}, #{targetId}, #{targetType}, #{read}, #{createTime})
</insert>
<select id="selectUnreadByRecipientId" resultType="com.timeline.user.entity.Notification">
SELECT * FROM notification
WHERE recipient_id = #{recipientId} AND `read` = false
ORDER BY create_time DESC
</select>
<update id="markAsRead">
UPDATE notification
SET `read` = true
WHERE id IN
<foreach item="id" collection="ids" open="(" separator="," close=")">
#{id}
</foreach>
</update>
</mapper>