springboot集成socket服务
1,先在配置文件中配置Socket监听的端口,由于JDK1.8中已经整合了Socket服务到java.net包中,因此不需要引入其他依赖了
在application.yml中配置下
#Socket配置
socket:
port: 8503
2,配置Socket连接类,这一步类似配置一个控制器(Controller),主要用于接收客户端的连接请求,我这里将它写在了控制器类中
package com.linzhuo.app.controller;
import com.linzhuo.app.service.IBusinessSocketService;
import org.springframework.beans.factory.annotation.Value;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author: jss
* Date: 2023/7/25 上午10:58
*/
@Slf4j
@Component
public class SocketServers {
//注入被开放的端口
@Value("${socket.port}")
private Integer port;
//这个是业务处理的接口
@Resource
private IBusinessSocketService socketService;
@PostConstruct
public void socketStart(){
//直接另起一个线程挂起Socket服务
new Thread(this::socketServer).start();
}
private void socketServer() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
try (ServerSocket serverSocket = new ServerSocket(port)) {
log.info("socket端口在:{}中开启并持续监听=====>", port);
while (Boolean.TRUE) {
Socket clientSocket = serverSocket.accept();
//设置流读取的超时时间,这里设置为10s。超时后自动断开连接
clientSocket.setSoTimeout(10000);
//是否与客户端保持持续连接,这行代码在本示例中,并没有作用,因为后面的逻辑处理完成后,会自动断开连接.
clientSocket.setKeepAlive(Boolean.TRUE);
log.info("发现客户端连接Socket:{}:{}===========>", clientSocket.getInetAddress().getHostAddress(),
clientSocket.getPort());
//这里通过线程池启动ClientHandler方法中的run方法.
executorService.execute(new ClientHandler(clientSocket, socketService));
}
} catch (Exception e) {
log.error("Socket服务启动异常!", e);
}
}
public static void main(String[] args) throws Exception {
requestInfoToSocketServer();
}
private static void requestInfoToSocketServer() {
try (Socket socket = new Socket("127.0.0.1", 8503);
PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {
// Object phone = "15210161743";
out.write("15210161743");
out.flush();
//记得调用shutdownOutput()方法,否则服务端的流读取会一直等待
socket.shutdownOutput();
//开始接收服务端的消息
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
System.out.println("接收到服务端的回复:" + in.readLine());
} catch (Exception e) {
System.out.println("Socket传输数据异常!" + e.getMessage());
}
}
}
3,配置自定义客户端类,这里也将写在了控制器类中,也可以自己去写在其他文件夹里
package com.linzhuo.app.controller;
import com.linzhuo.app.service.IBusinessSocketService;
import com.linzhuo.app.util.SnowFlakeUtil;
import com.linzhuo.app.util.StringUtil;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StringUtils;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.concurrent.TimeUnit;
/**
* @author: jss
* Date: 2023/7/25 上午11:06
*/
@Slf4j
public class ClientHandler implements Runnable {
private final Socket clientSocket;
private final IBusinessSocketService socketService;
public ClientHandler(Socket clientSocket, IBusinessSocketService socketService) {
this.clientSocket = clientSocket;
this.socketService = socketService;
}
@Override
@SneakyThrows
public void run() {
//SnowFlakeUtil 雪花ID生成工具类,后面会统一给出
String logId = SnowFlakeUtil.getId();
String hostIp = clientSocket.getInetAddress().getHostAddress();
String port = String.valueOf(clientSocket.getPort());
try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), Boolean.TRUE)) {
//这里的StringUtil是自己写的工具类,后面会统一给出
String requestInfo = StringUtil.readToStreamToString(clientSocket.getInputStream());
if (requestInfo != null) {
log.info("监听到客户端消息:{},监听日志ID为:{}", requestInfo, logId);
socketService.executeBusinessCode(requestInfo, logId, out);
clientSocket.shutdownOutput();
TimeUnit.SECONDS.sleep(1L);
}
} catch (IOException e) {
log.error("与客户端:[{}:{}]通信异常!错误信息:{}", hostIp, port, e.getMessage());
} finally {
log.info("客户端:[{}:{}]Socket连接已关闭,日志ID为:{}========>", hostIp, port, logId);
}
}
}
4,定义业务接口,service类
package com.linzhuo.app.service;
import java.io.PrintWriter;
/**
* @author: jss
* Date: 2023/7/25 上午11:11
*/
public interface IBusinessSocketService {
/**
* 从Socket中接受消息并处理
*
* @param requestInfo 请求报文
* @param logId 日志ID
* @param writer 回写给客户端消息的回写类(Socket自带)
*/
void executeBusinessCode(String requestInfo, String logId, PrintWriter writer);
}
5,定义业务实现类,serviceImpl
package com.linzhuo.app.service.impl;
import com.linzhuo.app.service.IBusinessSocketService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.io.PrintWriter;
/**
* @author: jss
* Date: 2023/7/25 上午11:12
*/
@Slf4j
@Service
public class IBusinessSocketServiceImpl implements IBusinessSocketService {
@Resource
LzLoginServiceImpl loginService;
@Override
public void executeBusinessCode(String requestInfo, String logId, PrintWriter writer) {
Object responseMsg;
boolean isSuccess = Boolean.TRUE;
try {
if (StringUtils.isEmpty(requestInfo)) {
return;
}
//执行你的业务操作
log.info("接收到的手机为:{}", requestInfo);
// responseMsg = "回填给客户端的信息,可以是任何格式的对象34";
//这里调用了自己的其他接口,接收客户端发送的手机号 ,进行发送验证码
Object phoneCode = loginService.getPhoneCode(requestInfo, "1");
responseMsg = phoneCode;
} catch (Exception e) {
isSuccess = Boolean.FALSE;
e.printStackTrace();
responseMsg = "回填给客户端的信息(业务处理错误的情况下)";
}
try {
//将响应报文通过PrintWriter回写给客户端
writer.println(responseMsg);
} catch (Exception e) {
log.error("Socket客户端数据返回异常!当前日志ID:[{}],异常信息:{}", logId, e);
}
}
}
6,两个相关的工具类
第一个SnowFlakeUtil
package com.linzhuo.app.util;
/**************************************************
* copyright (c) 2022
* created by jss
* date: 2022-9-18
* description: 雪花算法工具类
*
**************************************************/
public class SnowFlakeUtil {
private long workerId;
private long datacenterId;
private long sequence = 0L;
private long twepoch = 1288834974657L; // Thu, 04 Nov 2010 01:42:54 GMT 标记时间 用来计算偏移量,距离当前时间不同,得到的数据的位数也不同
private long workerIdBits = 5L; // 物理节点ID长度
private long datacenterIdBits = 5L; // 数据中心ID长度
private long maxWorkerId = -1L ^ (-1L << workerIdBits); // 最大支持机器节点数0~31,一共32个
private long maxDatacenterId = -1L ^ (-1L << datacenterIdBits); // 最大支持数据中心节点数0~31,一共32个
private long sequenceBits = 12L; // 序列号12位, 4095,同毫秒内生成不同id的最大个数
private long workerIdShift = sequenceBits; // 机器节点左移12位
private long datacenterIdShift = sequenceBits + workerIdBits; // 数据中心节点左移17位
private long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits; // 时间毫秒数左移22位
private long sequenceMask = -1L ^ (-1L << sequenceBits); // 用于和当前时间戳做比较,以获取最新时间
private long lastTimestamp = -1L;
//成员类,SnowFlakeUtil的实例对象的保存域
private static class IdGenHolder {
private static final SnowFlakeUtil instance = new SnowFlakeUtil();
}
//外部调用获取SnowFlakeUtil的实例对象,确保不可变
public static SnowFlakeUtil get() {
return IdGenHolder.instance;
}
//初始化构造,无参构造有参函数,默认节点都是0
public SnowFlakeUtil() {
this(0L, 0L);
}
//设置机器节点和数据中心节点数,都是 0-31
public SnowFlakeUtil(long workerId, long datacenterId) {
if (workerId > maxWorkerId || workerId < 0) {
throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
}
if (datacenterId > maxDatacenterId || datacenterId < 0) {
throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));
}
this.workerId = workerId;
this.datacenterId = datacenterId;
}
//线程安全的id生成方法
@SuppressWarnings("all")
public synchronized long nextId() {
//获取当前毫秒数
long timestamp = timeGen();
//如果服务器时间有问题(时钟后退) 报错。
if (timestamp < lastTimestamp) {
throw new RuntimeException(String.format(
"Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
}
//如果上次生成时间和当前时间相同,在同一毫秒内
if (lastTimestamp == timestamp) {
//sequence自增,因为sequence只有12bit,所以和sequenceMask相与一下,去掉高位
sequence = (sequence + 1) & sequenceMask;
//判断是否溢出,也就是每毫秒内超过4095,当为4096时,与sequenceMask相与,sequence就等于0
if (sequence == 0) {
//自旋等待到下一毫秒
timestamp = tilNextMillis(lastTimestamp);
}
} else {
//如果和上次生成时间不同,重置sequence,就是下一毫秒开始,sequence计数重新从0开始累加,每个毫秒时间内,都是从0开始计数,最大4095
sequence = 0L;
}
lastTimestamp = timestamp;
// 最后按照规则拼出ID 64位
// 000000000000000000000000000000000000000000 00000 00000 000000000000
//1位固定整数 time datacenterId workerId sequence
return ((timestamp - twepoch) << timestampLeftShift) | (datacenterId << datacenterIdShift)
| (workerId << workerIdShift) | sequence;
}
//比较当前时间和过去时间,防止时钟回退(机器问题),保证给的都是最新时间/最大时间
protected long tilNextMillis(long lastTimestamp) {
long timestamp = timeGen();
while (timestamp <= lastTimestamp) {
timestamp = timeGen();
}
return timestamp;
}
//获取当前的时间戳(毫秒)
protected long timeGen() {
return System.currentTimeMillis();
}
/**
* 获取全局唯一编码
*/
public static String getId() {
long id = SnowFlakeUtil.get().nextId();
return Long.toString(id);
}
}
第二个工具类为StringUtil
package com.linzhuo.app.util;
import lombok.SneakyThrows;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
/**
* <Description> ******
*
* @author jss
* @version 1.0
* @date 2022/04/17
*/
public class StringUtil {
@SneakyThrows
public static String readToStreamToString(InputStream is) {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
byte[] buffer = new byte[2048];
int bytesRead;
/*
这一步可能会卡住,因为客户端如果传输完数据以后,
如果没有调用socket.shutdownOutput()方法,会导致服务端不知道流是否已传输完毕,
等待我们之前设置的10S流读取时间后,连接就会被自动关掉
如果出现这种情况,服务端可以通过其它方式判断。例如换行符或者特殊字符等,只需要在
while条件中加一个&&判断即可.例如我这里的业务结束标记是字符: ">",那么判断逻辑如下
若客户端调用了shutdownOutput()方法,则不需要这个判断
*/
while (!bos.toString().contains(">") && (bytesRead = is.read(buffer)) != -1) {
bos.write(buffer, 0, bytesRead);
}
return bos.toString();
}
}
7,最后测试一下,用Main方法或者另外一个Springboot项目
public static void main(String[] args) throws Exception {
requestInfoToSocketServer();
}
private static void requestInfoToSocketServer() {
try (Socket socket = new Socket("127.0.0.1", 8503);
PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {
// Object phone = "15210161743";
out.write("15210161743");
out.flush();
//记得调用shutdownOutput()方法,否则服务端的流读取会一直等待
socket.shutdownOutput();
//开始接收服务端的消息
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
System.out.println("接收到服务端的回复:" + in.readLine());
} catch (Exception e) {
System.out.println("Socket传输数据异常!" + e.getMessage());
}
}
out.flush();这里需要手动刷新下,才会发送到服务器端,自动刷新不生效 需要手动刷新
自己测试了可以使用