分布式文件系统05-生产级中间件的Java网络通信技术深度优化

发布于:2025-08-04 ⋅ 阅读:(12) ⋅ 点赞:(0)

生产级中间件的Java网络通信技术深度优化

134_完成读取本地磁盘文件以及发送给客户端的代码逻辑

public class DataNodeNIOServer extends Thread {

    private Map<String, String> waitReadingFiles = new ConcurrentHashMap<String, String>();

	/**
	 * 将文件发送到客户端去
	 * @param channel
	 * @param key
	 * @throws Exception
	 */
	private void sendFileToClient(SocketChannel channel, SelectionKey key) throws Exception {
		// 构建针对本地文件的输入流
		if(!channel.isOpen()) {
			channel.close();
			return;
		}
		
		String remoteAddr = channel.getRemoteAddress().toString(); 
		String filename = waitReadingFiles.get(remoteAddr);

		File file = new File(filename);
		Long fileLength = file.length();
		
		FileInputStream imageIn = new FileInputStream(filename);    
    	FileChannel imageChannel = imageIn.getChannel();
		
		// 循环不断的从channel里读取数据,并写入磁盘文件
    	ByteBuffer buffer = ByteBuffer.allocate(
    			Integer.parseInt(String.valueOf(fileLength)) * 2);
    	long hasReadImageLength = 0L;
		int len = -1;
        while((len = imageChannel.read(buffer)) > 0) {
        	hasReadImageLength += len;
        	System.out.println("已经从本地磁盘文件读取了" + hasReadImageLength + "字节的数据");  
        	buffer.flip();
        	channel.write(buffer);
        	buffer.clear();
        }
        
        imageChannel.close();
        imageIn.close();
        
        // 判断一下,如果已经读取完毕,就返回一个成功给客户端
        if(hasReadImageLength == fileLength) {
        	System.out.println("文件发送完毕,给客户端: " + remoteAddr);
        }
	}

}

按理说,上面数据写完以后,需要把waitReadingFiles中的数据删除

135_在NIO中处理完一次读写请求之后应该如何处理事件的监听?

上面的实现方式,是把Read事件和Write事件,分开作两个流程进行处理,实际上,也不用那么麻烦,直接在处理Read事件时,就从磁盘中读出文件并通过channel写回到客户端即可。

waitReadingFiles的逻辑,也都就不需要了

/**
 * 数据节点的NIOServer
 */
public class DataNodeNIOServer extends Thread {

    public static final Integer SEND_FILE = 1;
    public static final Integer READ_FILE = 2;

    // NIO的selector,负责多路复用监听多个连接的请求
    private Selector selector;
    // 内存队列,无界队列
    private List<LinkedBlockingQueue<SelectionKey>> queues =
            new ArrayList<LinkedBlockingQueue<SelectionKey>>();
    // 缓存的没读取完的文件数据
    private Map<String, CachedImage> cachedImages = new ConcurrentHashMap<String, CachedImage>();
    // 与NameNode进行通信的客户端
    private NameNodeRpcClient namenodeRpcClient;

    /**
     * NIOServer的初始化,监听端口、队列初始化、线程初始化
     */
    public DataNodeNIOServer(NameNodeRpcClient namenodeRpcClient) {
        ServerSocketChannel serverSocketChannel = null;

        try {
            this.namenodeRpcClient = namenodeRpcClient;

            selector = Selector.open();

            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().bind(new InetSocketAddress(NIO_PORT), 100);
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            for (int i = 0; i < 3; i++) {
                queues.add(new LinkedBlockingQueue<SelectionKey>());
            }

            for (int i = 0; i < 3; i++) {
                new Worker(queues.get(i)).start();
            }

            System.out.println("NIOServer已经启动,开始监听端口:" + NIO_PORT);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void run() {
        /**
         * 无限循环,等待IO多路复用方式监听请求
         */
        while (true) {
            try {
                selector.select();
                Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();

                while (keysIterator.hasNext()) {
                    SelectionKey key = keysIterator.next();
                    keysIterator.remove();
                    handleEvents(key);
                }
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }
    }

    /**
     * 处理请求分发
     *
     * @param key
     * @throws IOException
     * @throws ClosedChannelException
     */
    private void handleEvents(SelectionKey key) throws IOException {
        SocketChannel channel = null;

        try {
            if (key.isAcceptable()) {
                ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                channel = serverSocketChannel.accept();
                if (channel != null) {
                    channel.configureBlocking(false);
                    channel.register(selector, SelectionKey.OP_READ);
                }
            } else if (key.isReadable()) {
                channel = (SocketChannel) key.channel();
                String remoteAddr = channel.getRemoteAddress().toString();
                int queueIndex = remoteAddr.hashCode() % queues.size();
                queues.get(queueIndex).put(key);
            }
        } catch (Throwable t) {
            t.printStackTrace();
            if (channel != null) {
                channel.close();
            }
        }
    }

    /**
     * 处理请求的工作线程
     */
    class Worker extends Thread {

        private LinkedBlockingQueue<SelectionKey> queue;

        public Worker(LinkedBlockingQueue<SelectionKey> queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            while (true) {
                
                SocketChannel channel = null;

                try {
                    
                    SelectionKey key = queue.take();
                    
                    channel = (SocketChannel) key.channel();
                    
                    handleRequest(channel, key);
                    
                } catch (Exception e) {
                    e.printStackTrace();
                    if (channel != null) {
                        try {
                            channel.close();
                        } catch (IOException e1) {
                            e1.printStackTrace();
                        }
                    }
                }
            }
        }

    }

    /**
     * 处理客户端发送过来的请求
     *
     * @param channel
     * @param key
     * @throws Exception
     */
    private void handleRequest(SocketChannel channel, SelectionKey key) throws Exception {
        
        // 假如说你这个一次读取的数据里包含了多个文件的话
        // 这个时候我们会先读取文件名,然后根据文件的大小去读取这么多的数据
        String remoteAddr = channel.getRemoteAddress().toString();
        System.out.println("接收到客户端的请求:" + remoteAddr);

        // 需要先提取出来这次请求是什么类型:1 发送文件;2 读取文件
        if (cachedImages.containsKey(remoteAddr)) {
            handleSendFileRequest(channel, key);
        } else {
            // 但是此时channel的position肯定也变为了4
            Integer requestType = getRequestType(channel); 
            
            if (SEND_FILE.equals(requestType)) {
                handleSendFileRequest(channel, key);
            } else if (READ_FILE.equals(requestType)) {
                handleReadFileRequest(channel, key);
            }
        }
    }

    /**
     * 发送文件
     */
    private void handleSendFileRequest(SocketChannel channel, SelectionKey key) throws Exception {
        String remoteAddr = channel.getRemoteAddress().toString();

        Filename filename = getFilename(channel);
        System.out.println("从网络请求中解析出来文件名:" + filename);
        if (filename == null) {
            channel.close();
            return;
        }
        // 从请求中解析文件大小
        long imageLength = getImageLength(channel);
        System.out.println("从网络请求中解析出来文件大小:" + imageLength);
        // 定义已经读取的文件大小
        long hasReadImageLength = getHasReadImageLength(channel);
        System.out.println("初始化已经读取的文件大小:" + hasReadImageLength);

        // 构建针对本地文件的输出流
        FileOutputStream imageOut = new FileOutputStream(filename.absoluteFilename);
        FileChannel imageChannel = imageOut.getChannel();
        imageChannel.position(imageChannel.size());

        // 循环不断的从channel里读取数据,并写入磁盘文件
        ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
        int len = -1;
        while ((len = channel.read(buffer)) > 0) {
            hasReadImageLength += len;
            System.out.println("已经向本地磁盘文件写入了" + hasReadImageLength + "字节的数据");
            buffer.flip();
            imageChannel.write(buffer);
            buffer.clear();
        }

        imageChannel.close();
        imageOut.close();

        // 判断一下,如果已经读取完毕,就返回一个成功给客户端
        if (hasReadImageLength == imageLength) {
            ByteBuffer outBuffer = ByteBuffer.wrap("SUCCESS".getBytes());
            channel.write(outBuffer);
            cachedImages.remove(remoteAddr);
            System.out.println("文件读取完毕,返回响应给客户端: " + remoteAddr);

            // 增量上报Master节点自己接收到了一个文件的副本
            // /image/product/iphone.jpg
            namenodeRpcClient.informReplicaReceived(filename.relativeFilename);
            System.out.println("增量上报收到的文件副本给NameNode节点......");

            key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
        }
        // 如果一个文件没有读完,缓存起来,等待下一次读取
        else {
            CachedImage cachedImage = new CachedImage(filename, imageLength, hasReadImageLength);
            cachedImages.put(remoteAddr, cachedImage);
            System.out.println("文件没有读取完毕,等待下一次OP_READ请求,缓存文件:" + cachedImage);
        }
    }

    /**
     * 读取文件
     */
    private void handleReadFileRequest(SocketChannel channel, SelectionKey key) throws Exception {
        String remoteAddr = channel.getRemoteAddress().toString();

        // 从请求中解析文件名
        // 已经是:F:\\development\\tmp1\\image\\product\\iphone.jpg
        Filename filename = getFilename(channel);
        System.out.println("从网络请求中解析出来文件名:" + filename);
        if (filename == null) {
            channel.close();
            return;
        }

        File file = new File(filename.absoluteFilename);
        Long fileLength = file.length();

        FileInputStream imageIn = new FileInputStream(filename.absoluteFilename);
        FileChannel imageChannel = imageIn.getChannel();

        // 循环不断的从channel里读取数据,并写入磁盘文件
        ByteBuffer buffer = ByteBuffer.allocate(
                Integer.parseInt(String.valueOf(fileLength)) * 2);
        long hasReadImageLength = 0L;
        int len = -1;
        while ((len = imageChannel.read(buffer)) > 0) {
            hasReadImageLength += len;
            System.out.println("已经从本地磁盘文件读取了" + hasReadImageLength + "字节的数据");
            buffer.flip();
            channel.write(buffer);
            buffer.clear();
        }

        imageChannel.close();
        imageIn.close();

        // 判断一下,如果已经读取完毕,就返回一个成功给客户端
        if (hasReadImageLength == fileLength) {
            System.out.println("文件发送完毕,给客户端: " + remoteAddr);
            
            // 把文件给客户端发送回去后,就删除要关注的Read事件
            // 表示这个连接的事情就处理完了,等待客户端主动关闭这个连接了,因为客户端用的是短连接
            key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
        }
    }

    /**
     * 获取本次请求的类型
     */
    public Integer getRequestType(SocketChannel channel) throws Exception {
        ByteBuffer requestType = ByteBuffer.allocate(4);
        channel.read(requestType);  // 此时requestType ByteBuffer,position跟limit都是4,remaining是0
        if (!requestType.hasRemaining()) {
            // 已经读取出来了4个字节,可以提取出来requestType了
            requestType.rewind(); // 将position变为0,limit还是维持着4
            return requestType.getInt();
        }
        return -1;
    }

}

主要就是加了,取消针对当前通道的Read事件的监听

136_在客户端实现从数据节点接收发送过来的图片数据

137_工业级NIO通信组件:请求头的拆包问题应该如何解决?

读取请求类型

第237行的remove动作不可少

读取文件名

	/**
	 * 获取相对路径的文件名
	 * @param channel
	 * @return
	 */
	private String getRelativeFilename(SocketChannel channel) throws Exception {
		String client = channel.getRemoteAddress().toString();
		
		Integer filenameLength = null;
		String filename = null;
		
		// 读取文件名的大小
		if(!filenameByClient.containsKey(client)) {
			ByteBuffer filenameLengthBuffer = null;
			if(filenameLengthByClient.containsKey(client)) {
				filenameLengthBuffer = filenameLengthByClient.get(client);
			} else {
				filenameLengthBuffer = ByteBuffer.allocate(4);
			}
					
			channel.read(filenameLengthBuffer); 
			
			if(!filenameLengthBuffer.hasRemaining()) { 
				filenameLengthBuffer.rewind();
				filenameLength = filenameLengthBuffer.getInt();
				filenameLengthByClient.remove(client);
	 		} else {
	 			filenameLengthByClient.put(client, filenameLengthBuffer);
	 		}
		}
		
		// 读取文件名
		ByteBuffer filenameBuffer = null;
		if(filenameByClient.containsKey(client)) {
			filenameBuffer = filenameByClient.get(client);
		} else {
			filenameBuffer = ByteBuffer.allocate(filenameLength);
		}
		
		channel.read(filenameBuffer);
		
		if(!filenameBuffer.hasRemaining()) {
			filenameBuffer.rewind();
			filename = new String(filenameBuffer.array());  
			filenameByClient.remove(client);
		} else {
			filenameByClient.put(client, filenameBuffer);
		}
		
		return filename;
	}

读取文件长度

获取已经读取的文件大小

当客户端发送文件过来时,可能一个文件过大,不可避免的整个文件体就会被拆成多个包,表现在代码层面,就是会有一轮又一轮的Read事件达到,在最后一轮Read事件达到之前的每一轮,我们都需要记录已经读取了文件大小是多少。只有当已经读取的文件大小,等于请求头中的文件完整长度时,才表示这个文件全部接收完毕了

138_工业级NIO通信组件:多个数据文件的粘包问题应该如何解决?

某个客户端在一个连接中,连续上传了多个文件,才可能会出现粘包问题。虽然,我们这里使用的是短连接,一个请求一个连接,所以是不会出现粘包问题的

针对拆包问题,我们只要把已经读到的一部分包缓存起来,并保持对OP_READ事件的继续关注即可

针对粘包问题,我们就是一次只能读取一个文件的内容,如果一个文件读完了,channel中还有数据,那么就只能再走上面的拆包并缓存数据的逻辑,来重复处理第二个文件的数据包

这里是一个文件内容,发生拆包时的处理逻辑。如果要处理粘包问题,那么也还是在这段代码中进行修改,但是定义的fileBuffer也还是fileLength的大小

当前一个文件数据包处理完之后,继续while循环,从channel中读取数据,如果没有读取到数据就结束本次OP_READ事件处理,如果从channel中读取到了数据,则缓存起来,进行下一个包的的处理,依然是先获取4个字节的文件名长度的请求头或者8个字节文件长度的请求头,后续的处理逻辑又是拆包的处理逻辑

139_工业级NIO通信组件:数据文件的拆包问题应该如何解决?

上传文件

下载文件

140_工业级NIO通信组件:客户端读取文件的拆包问题如何解决?

141_工业级NIO通信组件:客户端读取多文件的粘包问题如何解决?

如果客户端通过一个连接,发送了多个文件的下载请求,服务端可能同时把多个文件包发过来,从而产生了粘包问题。我们这里使用的是短连接,当然对应客户端下载文件来说,也不存在粘包问题

通过定义的这个buffer,保证了每次最多只读完一个文件,不会读到下一个文件去。所以,如果发生粘包问题,我们依然可以通过给增加一个while循环,继续从channel中读取下一个文件包的内容即可。当然了,读取下一个文件包时,又有可能发生拆包问题


网站公告

今日签到

点亮在社区的每一天
去签到