Implementing a lightweight message queue with redisTemplate in a Spring Boot project

Reader submission, 2023-01-09



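Background

A project at my company required the front end to upload an Excel file and the back end to read the data, process it, and return the rows that failed. The simplest approach is synchronous processing: the client uploads the file and blocks until the response arrives. That makes for a poor user experience, because processing can take a long time and nobody wants to sit and wait. The project does not yet use a message-queue middleware such as ActiveMQ, and Redis's lpush and rpop work well as a lightweight message queue, so I used them for this feature.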
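To make the queue semantics concrete, here is a minimal sketch that uses an in-memory java.util.Deque as a stand-in for a Redis list. It is not a Redis client, and the class and method names are made up for illustration. Pushing on the left and popping on the right gives first-in, first-out order, which is what lets lpush and rpop behave like a queue.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// In-memory stand-in for a Redis list (illustration only, not a Redis client).
class ListQueueSketch {
    private final Deque<String> list = new ArrayDeque<>();

    // Mirrors LPUSH: insert at the head (left end).
    void leftPush(String value) {
        list.addFirst(value);
    }

    // Mirrors RPOP: remove from the tail (right end); returns null when empty.
    String rightPop() {
        return list.pollLast();
    }

    public static void main(String[] args) {
        ListQueueSketch queue = new ListQueueSketch();
        queue.leftPush("file-1");
        queue.leftPush("file-2");
        // The oldest message comes out first.
        System.out.println(queue.rightPop());
        System.out.println(queue.rightPop());
    }
}
```

Running main prints file-1 and then file-2: the message pushed first is consumed first.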

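I. Topics covered

Excel reading and writing: Alibaba easyexcel SDK

File upload and download: Tencent Cloud Object Storage (COS)

Remote service calls: restTemplate

Producer and consumer: redisTemplate leftPush and rightPop operations

Asynchronous data processing: Executors thread pools

Reading a file stream over the network: HttpClient

User authentication through a custom annotation: JWT token authentication, with an interceptor that intercepts request entry points annotated with @LoginRequired

All implemented in Java, of course

That is a lot of topics, and each one could be studied on its own. This article presents the complete implementation; I plan to split it into separate posts later.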

II. Project directory structure

Note: the database DAO layer lives in a separate module and is not the focus of this article.

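III. Main Maven dependencies

1. easyexcel

<properties>
    <easyexcel-latestVersion>1.1.2-beta4</easyexcel-latestVersion>
</properties>

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>easyexcel</artifactId>
    <version>${easyexcel-latestVersion}</version>
</dependency>

2. JWT

<dependency>
    <groupId>io.jsonwebtoken</groupId>
    <artifactId>jjwt</artifactId>
    <version>0.7.0</version>
</dependency>

3. redis

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-redis</artifactId>
    <version>1.3.5.RELEASE</version>
</dependency>

4. Tencent COS

<dependency>
    <groupId>com.qcloud</groupId>
    <artifactId>cos_api</artifactId>
    <version>5.4.5</version>
</dependency>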

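IV. Flow

The user uploads a file

The file is stored in Tencent COS

The id of the uploaded file and an upload record are saved to the database

Redis produces an import message, that is, the file id is pushed to Redis

The request ends and returns a "processing" status

Redis consumes the message

The file is read from COS and the data is processed asynchronously

The rows with errors are uploaded to COS as an Excel file for the user to download, and the status is updated to "processing complete"

The client polls for the processing status and can download the error file

End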
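The status handling in this flow can be sketched as a tiny state holder. This is only an illustration under assumptions: the article keeps the record in a database, while the sketch uses an in-memory map, and the names ImportStatusSketch, submit, markDone and query are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// In-memory stand-in for the import-record table (illustration only).
class ImportStatusSketch {
    enum Status { PROCESSING, DONE }

    private final Map<String, Status> records = new ConcurrentHashMap<>();

    // The upload request saves a record and returns "processing" right away.
    Status submit(String fileId) {
        records.put(fileId, Status.PROCESSING);
        return Status.PROCESSING;
    }

    // The background worker marks the record once the error file is uploaded.
    void markDone(String fileId) {
        records.replace(fileId, Status.DONE);
    }

    // The client polls this until it sees DONE; null means an unknown file id.
    Status query(String fileId) {
        return records.get(fileId);
    }
}
```

The point of the design is that the upload request never waits for the data processing; it only writes the record and the queue message, and the client learns the outcome by polling.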

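V. Results

Uploading a file

Import records in the database

The imported data

Downloading the error file

Error messages for the bad rows

Querying the import records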

VI. Code

1. Excel import controller

@LoginRequired
@RequestMapping(value = "doImport", method = RequestMethod.POST)
public JsonResponse doImport(@RequestParam("file") MultipartFile file, HttpServletRequest request) {
    PLUser user = getUser(request);
    return orderImportService.doImport(file, user.getId());
}

2. Service layer

@Override
public JsonResponse doImport(MultipartFile file, Integer userId) {
    if (null == file || file.isEmpty()) {
        throw new ServiceException("The file must not be empty");
    }
    String filename = file.getOriginalFilename();
    if (!checkFileSuffix(filename)) {
        throw new ServiceException("Only .xlsx Excel files are currently supported");
    }
    // Store the file
    String fileId = saveToOss(file);
    if (StringUtils.isBlank(fileId)) {
        throw new ServiceException("File upload failed, please try again later");
    }
    // Save the upload record to the database
    saveRecordToDB(userId, fileId, filename);
    // Produce an order-import message
    redisProducer.produce(RedisKey.orderImportKey, fileId);
    return JsonResponse.ok("Import succeeded, processing...");
}

/**
 * Checks the file format.
 *
 * @param fileName the original file name
 * @return true if the name ends with .xlsx (case-insensitive)
 */
private static boolean checkFileSuffix(String fileName) {
    if (StringUtils.isBlank(fileName) || fileName.lastIndexOf(".") <= 0) {
        return false;
    }
    int pointIndex = fileName.lastIndexOf(".");
    String suffix = fileName.substring(pointIndex).toLowerCase();
    return ".xlsx".equals(suffix);
}

/**
 * Stores the file in Tencent COS through the upload endpoint.
 *
 * @param file the uploaded file
 * @return the stored file id, or null if the upload failed
 */
private String saveToOss(MultipartFile file) {
    InputStream ins;
    try {
        ins = file.getInputStream();
    } catch (IOException e) {
        e.printStackTrace();
        return null; // without a stream there is nothing to upload
    }
    String fileId;
    File f = null;
    try {
        // Temporary local copy, created in the working directory
        f = new File(file.getOriginalFilename());
        inputStreamToFile(ins, f);
        FileSystemResource resource = new FileSystemResource(f);
        MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
        param.add("file", resource);
        ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
        fileId = (String) responseResult.getData();
    } catch (Exception e) {
        fileId = null;
    } finally {
        if (null != f && f.exists()) { // delete the temporary copy
            f.delete();
        }
    }
    return fileId;
}

3. Redis producer

@Service
public class RedisProducerImpl implements RedisProducer {

    @Autowired
    private RedisTemplate redisTemplate;

    @Override
    public JsonResponse produce(String key, String msg) {
        // Wrap the file id in a map so the consumer can read it back as JSON
        Map<String, String> map = Maps.newHashMap();
        map.put("fileId", msg);
        redisTemplate.opsForList().leftPush(key, map);
        return JsonResponse.ok();
    }
}

4. Redis consumer

@Service
public class RedisConsumer {

    @Autowired
    public RedisTemplate redisTemplate;

    @Value("${txOssFileUrl}")
    private String txOssFileUrl;

    @Value("${txOssUploadUrl}")
    private String txOssUploadUrl;

    @PostConstruct
    public void init() {
        processOrderImport();
    }

    /**
     * Processes order imports.
     */
    private void processOrderImport() {
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> {
            while (true) {
                // Blocking pop: wait up to 1 second for a message
                Object object = redisTemplate.opsForList().rightPop(RedisKey.orderImportKey, 1, TimeUnit.SECONDS);
                if (null == object) {
                    continue;
                }
                String msg = JSON.toJSONString(object);
                executorService.execute(new OrderImportTask(msg, txOssFileUrl, txOssUploadUrl));
            }
        });
    }
}
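The consumer above loops forever, so it has no way to stop cleanly. As a hedged sketch of one alternative, the following uses a stop flag and separate polling and worker pools. A LinkedBlockingDeque stands in for the Redis list, and every name here (StoppableConsumerSketch, handled, stop) is an assumption for illustration rather than part of the project.

```java
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

class StoppableConsumerSketch {
    // In-memory stand-in for the Redis list (illustration only).
    final BlockingDeque<String> queue = new LinkedBlockingDeque<>();
    // Records handled messages; stands in for the real import task.
    final List<String> handled = new CopyOnWriteArrayList<>();

    private final ExecutorService poller = Executors.newSingleThreadExecutor();
    private final ExecutorService workers = Executors.newFixedThreadPool(4);
    private volatile boolean running = true;

    void start() {
        poller.execute(() -> {
            // Keep going until stop() was called and the queue is drained.
            while (running || !queue.isEmpty()) {
                try {
                    // Like rightPop(key, 1, SECONDS): wait up to 1s, null on timeout.
                    String msg = queue.pollLast(1, TimeUnit.SECONDS);
                    if (msg != null) {
                        workers.execute(() -> handled.add(msg));
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
    }

    void stop() {
        running = false;
        awaitShutdown(poller);  // the poller exits once the queue is empty
        awaitShutdown(workers); // then let already dispatched work finish
    }

    private static void awaitShutdown(ExecutorService pool) {
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In a Spring service, stop() could be called from a @PreDestroy method so that the polling thread ends when the application shuts down. The bounded worker pool is a design choice of the sketch; the article itself uses a cached pool.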

5. Task class that processes an import

public class OrderImportTask implements Runnable {

    private final String msg;
    private final String txOssFileUrl;
    private final String txOssUploadUrl;

    // Looked up from the Spring context in autowireBean(), because this class is not a Spring bean
    private RestTemplate restTemplate;
    private TransactionTemplate transactionTemplate;
    private OrderImportService orderImportService;

    public OrderImportTask(String msg, String txOssFileUrl, String txOssUploadUrl) {
        this.msg = msg;
        this.txOssFileUrl = txOssFileUrl;
        this.txOssUploadUrl = txOssUploadUrl;
    }

    /**
     * Looks up the beans this task needs.
     */
    private void autowireBean() {
        this.restTemplate = BeanContext.getApplicationContext().getBean(RestTemplate.class);
        this.transactionTemplate = BeanContext.getApplicationContext().getBean(TransactionTemplate.class);
        this.orderImportService = BeanContext.getApplicationContext().getBean(OrderImportService.class);
    }

    @Override
    public void run() {
        // Look up the beans
        autowireBean();
        JSONObject jsonObject = JSON.parseObject(msg);
        String fileId = jsonObject.getString("fileId");
        // Ask the file service for the download URL of this file id
        MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
        param.add("id", fileId);
        ResponseResult responseResult = restTemplate.postForObject(txOssFileUrl, param, ResponseResult.class);
        String fileUrl = (String) responseResult.getData();
        if (StringUtils.isBlank(fileUrl)) {
            return;
        }
        InputStream inputStream = HttpClientUtil.readFileFromURL(fileUrl);
        List<Object> list = ExcelUtil.read(inputStream);
        process(list, fileId); // business logic, omitted in this article
    }

    /**
     * Uploads a file to COS.
     *
     * @param file the file to upload
     * @return the stored file id, or null if the upload failed
     */
    private String saveToOss(File file) {
        String fileId;
        try {
            FileSystemResource resource = new FileSystemResource(file);
            MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
            param.add("file", resource);
            ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
            fileId = (String) responseResult.getData();
        } catch (Exception e) {
            fileId = null;
        }
        return fileId;
    }
}

Note: the business logic that processes the data is not shown here.

6. Uploading the file to COS

@RequestMapping("/txOssUpload")
@ResponseBody
public ResponseResult txOssUpload(@RequestParam("file") MultipartFile file) throws UnsupportedEncodingException {
    if (null == file || file.isEmpty()) {
        return ResponseResult.fail("The file must not be empty");
    }
    String originalFilename = file.getOriginalFilename();
    originalFilename = MimeUtility.decodeText(originalFilename); // fixes garbled Chinese file names
    String contentType = getContentType(originalFilename);
    String key;
    InputStream ins = null;
    File f = null;
    try {
        ins = file.getInputStream();
        f = new File(originalFilename);
        inputStreamToFile(ins, f);
        key = iFileStorageClient.txOssUpload(new FileInputStream(f), originalFilename, contentType);
    } catch (Exception e) {
        return ResponseResult.fail(e.getMessage());
    } finally {
        if (null != ins) {
            try {
                ins.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (null != f && f.exists()) { // delete the temporary file
            f.delete();
        }
    }
    return ResponseResult.ok(key);
}

public static void inputStreamToFile(InputStream ins, File file) {
    // try-with-resources closes both streams even if the copy fails
    try (InputStream in = ins; OutputStream os = new FileOutputStream(file)) {
        int bytesRead;
        byte[] buffer = new byte[8192];
        while ((bytesRead = in.read(buffer, 0, 8192)) != -1) {
            os.write(buffer, 0, bytesRead);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
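The stream-to-file copy can also be written with java.nio.file.Files. The sketch below is one possible variant, not the project's code: it writes to a temp file from Files.createTempFile instead of a file named after the upload, and the class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

class StreamToFileSketch {
    // Copies the stream into a fresh temp file and always closes the stream.
    static Path copyToTempFile(InputStream ins, String suffix) {
        try (InputStream in = ins) {
            Path target = Files.createTempFile("upload-", suffix);
            // createTempFile already created the file, so it must be replaced.
            Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
            return target;
        } catch (IOException e) {
            // Surface the failure instead of printing it and carrying on.
            throw new UncheckedIOException(e);
        }
    }
}
```

A temp file in the system temp directory avoids name clashes when two users upload files with the same name at the same time. The caller is still responsible for deleting it after the upload.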

public String txOssUpload(FileInputStream inputStream, String key, String contentType) {
    // Prefix the name with a UUID so object keys do not collide
    key = Uuid.getUuid() + "-" + key;
    OSSUtil.txOssUpload(inputStream, key, contentType);
    try {
        if (null != inputStream) {
            inputStream.close();
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
    return key;
}

public static void txOssUpload(FileInputStream inputStream, String key, String contentType) {
    ObjectMetadata objectMetadata = new ObjectMetadata();
    try {
        // available() is only an estimate of the bytes that can be read without blocking
        int length = inputStream.available();
        objectMetadata.setContentLength(length);
    } catch (Exception e) {
        logger.info(e.getMessage());
    }
    objectMetadata.setContentType(contentType);
    cosclient.putObject(txbucketName, key, inputStream, objectMetadata);
}

7. Downloading a file

/**
 * Downloads a file from Tencent Cloud.
 *
 * @param response the HTTP response to write to
 * @param id the file id
 * @return always null; the file is written directly to the response
 */
@RequestMapping("/txOssDownload")
public Object txOssDownload(HttpServletResponse response, String id) {
    COSObjectInputStream cosObjectInputStream = iFileStorageClient.txOssDownload(id, response);
    String contentType = getContentType(id);
    FileUtil.txOssDownload(response, contentType, cosObjectInputStream, id);
    return null;
}

public static void txOssDownload(HttpServletResponse response, String contentType, InputStream fileStream, String fileName) {
    response.reset();
    OutputStream os = null;
    try {
        response.setContentType(contentType + "; charset=utf-8");
        if (!contentType.equals(PlConstans.FileContentType.image)) {
            try {
                // Re-encode the name so non-ASCII characters survive in the header
                response.setHeader("Content-Disposition", "attachment; filename=" + new String(fileName.getBytes("UTF-8"), "ISO8859-1"));
            } catch (UnsupportedEncodingException e) {
                response.setHeader("Content-Disposition", "attachment; filename=" + fileName);
                logger.error("encoding file name failed", e);
            }
        }
        os = response.getOutputStream();
        byte[] b = new byte[1024 * 1024];
        int len;
        while ((len = fileStream.read(b)) > 0) {
            os.write(b, 0, len);
            os.flush();
        }
    } catch (IOException e) {
        logger.error("writing the download failed", e);
    } finally {
        IOUtils.closeQuietly(os);
        IOUtils.closeQuietly(fileStream);
    }
}
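The download code passes a non-ASCII file name by re-encoding its UTF-8 bytes as ISO-8859-1. Another option, sketched here as an assumption rather than as the project's approach, is the filename* parameter defined by RFC 6266, which carries a percent-encoded UTF-8 name. The helper name attachmentHeader is made up, and the sketch does not handle every reserved character (for example, URLEncoder leaves `*` unescaped).

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

class ContentDispositionSketch {
    // Builds a Content-Disposition value with a percent-encoded UTF-8 file name.
    static String attachmentHeader(String fileName) {
        try {
            // URLEncoder targets form encoding and turns spaces into "+",
            // so convert those to "%20" for use in a header value.
            String encoded = URLEncoder.encode(fileName, "UTF-8").replace("+", "%20");
            return "attachment; filename*=UTF-8''" + encoded;
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }
}
```

For example, attachmentHeader("a b.xlsx") returns `attachment; filename*=UTF-8''a%20b.xlsx`, which could be passed to response.setHeader("Content-Disposition", ...).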

8. Reading a file stream over the network

/**
 * Reads a file stream over the network.
 *
 * @param url the file URL
 * @return the response body stream, or null if the URL is blank or the request fails
 */
public static InputStream readFileFromURL(String url) {
    if (StringUtils.isBlank(url)) {
        return null;
    }
    HttpClient httpClient = new DefaultHttpClient();
    HttpGet methodGet = new HttpGet(url);
    try {
        HttpResponse response = httpClient.execute(methodGet);
        if (response.getStatusLine().getStatusCode() == 200) {
            HttpEntity entity = response.getEntity();
            return entity.getContent();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return null;
}

9. ExcelUtil

/**
 * Reads an Excel file.
 *
 * @param inputStream the file input stream
 * @return the rows as a list
 */
public static List<Object> read(InputStream inputStream) {
    return EasyExcelFactory.read(inputStream, new Sheet(1, 1));
}

/**
 * Writes an Excel file.
 *
 * @param data the rows to write
 * @param clazz the row model class
 * @param saveFilePath the path to save the file to
 * @throws IOException if the file cannot be written
 */
public static void write(List<? extends BaseRowModel> data, Class<? extends BaseRowModel> clazz, String saveFilePath) throws IOException {
    File tempFile = new File(saveFilePath);
    // try-with-resources closes the file even if writing fails
    try (OutputStream out = new FileOutputStream(tempFile)) {
        ExcelWriter writer = EasyExcelFactory.getWriter(out);
        Sheet sheet = new Sheet(1, 3, clazz, "Sheet1", null);
        writer.write(data, sheet);
        writer.finish();
    }
}

Note: that completes the whole flow. The code for the remaining topics is included below for reference.

VII. Miscellaneous

1. The @LoginRequired annotation

/**
 * Put this annotation on controller methods that require a logged-in user.
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface LoginRequired {
}
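The annotation carries no data; it only marks a method. A minimal sketch of how such a marker can be detected with reflection follows. DemoController and requiresLogin are hypothetical names, and the annotation is redeclared inside the sketch only so that the example is self-contained.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class LoginRequiredSketch {
    // Same shape as the article's annotation; RUNTIME retention makes it
    // visible to reflection.
    @Target({ElementType.METHOD})
    @Retention(RetentionPolicy.RUNTIME)
    @interface LoginRequired {
    }

    // Hypothetical controller with one protected and one open method.
    static class DemoController {
        @LoginRequired
        public void doImport() {
        }

        public void health() {
        }
    }

    // Returns true when the named no-argument method carries @LoginRequired.
    static boolean requiresLogin(Class<?> type, String methodName) {
        try {
            Method method = type.getDeclaredMethod(methodName);
            return method.getAnnotation(LoginRequired.class) != null;
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("no such method: " + methodName, e);
        }
    }
}
```

With RetentionPolicy.SOURCE or CLASS the lookup would return null, which is why the annotation is declared with RUNTIME retention.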

2. MyControllerAdvice

@ControllerAdvice
public class MyControllerAdvice {

    @ResponseBody
    @ExceptionHandler(TokenValidationException.class)
    public JsonResponse tokenValidationExceptionHandler() {
        return JsonResponse.loginInvalid();
    }

    @ResponseBody
    @ExceptionHandler(ServiceException.class)
    public JsonResponse serviceExceptionHandler(ServiceException se) {
        return JsonResponse.fail(se.getMsg());
    }

    @ResponseBody
    @ExceptionHandler(Exception.class)
    public JsonResponse exceptionHandler(Exception e) {
        e.printStackTrace();
        return JsonResponse.fail(e.getMessage());
    }
}

3. AuthenticationInterceptor

public class AuthenticationInterceptor implements HandlerInterceptor {

    private static final String CURRENT_USER = "user";

    @Autowired
    private UserService userService;

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        // Let the request through if the handler is not a controller method
        if (!(handler instanceof HandlerMethod)) {
            return true;
        }
        HandlerMethod handlerMethod = (HandlerMethod) handler;
        Method method = handlerMethod.getMethod();
        // Methods annotated with @LoginRequired need a logged-in user
        LoginRequired methodAnnotation = method.getAnnotation(LoginRequired.class);
        if (methodAnnotation != null) {
            // Verify the token
            Integer userId = JwtUtil.verifyToken(request);
            PLUser plUser = userService.selectByPrimaryKey(userId);
            if (null == plUser) {
                throw new RuntimeException("User does not exist, please log in again");
            }
            request.setAttribute(CURRENT_USER, plUser);
        }
        return true;
    }

    @Override
    public void postHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, ModelAndView modelAndView) throws Exception {
    }

    @Override
    public void afterCompletion(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, Exception e) throws Exception {
    }
}

4. JwtUtil

public static final long EXPIRATION_TIME = 2592_000_000L; // valid for 30 days
public static final String SECRET = "pl_token_secret";
public static final String HEADER = "token";
public static final String USER_ID = "userId";

/**
 * Generates a token for the given userId.
 *
 * @param userId the user id to store in the token
 * @return the signed JWT
 */
public static String generateToken(String userId) {
    HashMap<String, Object> map = new HashMap<>();
    map.put(USER_ID, userId);
    String jwt = Jwts.builder()
            .setClaims(map)
            .setExpiration(new Date(System.currentTimeMillis() + EXPIRATION_TIME))
            .signWith(SignatureAlgorithm.HS512, SECRET)
            .compact();
    return jwt;
}

/**
 * Verifies the token.
 *
 * @param request the current request, which carries the token in a header
 * @return the userId if verification succeeds
 */
public static Integer verifyToken(HttpServletRequest request) {
    String token = request.getHeader(HEADER);
    if (token != null) {
        try {
            Map<String, Object> body = Jwts.parser()
                    .setSigningKey(SECRET)
                    .parseClaimsJws(token)
                    .getBody();
            for (Map.Entry<String, Object> entry : body.entrySet()) {
                Object key = entry.getKey();
                Object value = entry.getValue();
                if (key.toString().equals(USER_ID)) {
                    return Integer.valueOf(value.toString()); // userId
                }
            }
            return null;
        } catch (Exception e) {
            logger.error(e.getMessage());
            throw new TokenValidationException("unauthorized");
        }
    } else {
        throw new TokenValidationException("missing token");
    }
}
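The 30-day expiration is 30 × 24 × 3600 × 1000 = 2,592,000,000 milliseconds, which is larger than Integer.MAX_VALUE (2,147,483,647). That is why the constant needs the L suffix. The sketch below, with made-up names, shows the value computed with java.time.Duration and the overflow that pure int arithmetic would produce.

```java
import java.time.Duration;

class ExpirationSketch {
    // 30 days in milliseconds, computed instead of hand-written.
    static final long THIRTY_DAYS_MS = Duration.ofDays(30).toMillis();

    // Pitfall: every operand is an int, so the product wraps around
    // before it is widened to long.
    static final long OVERFLOWED = 30 * 24 * 3600 * 1000;

    // One long operand keeps the whole product in 64-bit arithmetic.
    static final long BY_HAND = 30L * 24 * 3600 * 1000;

    public static void main(String[] args) {
        System.out.println(THIRTY_DAYS_MS); // 2592000000
        System.out.println(OVERFLOWED);     // -1702967296
        System.out.println(BY_HAND);        // 2592000000
    }
}
```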

Closing: OK, done. Off to bed, I'm exhausted.
