使用Java ApI 实现Hadoop文件上传

发布于:2025-04-01 ⋅ 阅读:(20) ⋅ 点赞:(0)

目录

 

文件传输步骤

windows的本机文件传输

linux的虚拟机文件传输


 

 

文件传输步骤

建立连接
在connect2HDFS()方法中,通过设置Configuration对象来指定HDFS的URI(在这个例子中为hdfs://192.168.12.133:9000),并初始化一个FileSystem实例fs,用于后续的所有HDFS操作。
关闭连接
close()方法用于在完成所有HDFS操作后关闭与HDFS的连接,确保资源被正确释放。
上传文件并分类 (uploadAndClassify(File file)):方法接收一个本地文件作为参数。
将本地文件上传到HDFS上的相应目录中。
业务逻辑
该方法接受一个字符串类型的目录路径作为参数,将其转换为Path对象,并检查该路径是否已存在。如果不存在,则创建新的目录。
主函数执行
首先调用connect2HDFS()方法与HDFS建立连接。指定一个本地目录(在这个例子中是/home/covid_data),然后遍历这个目录下的所有.json文件。对每个符合条件的文件调用uploadAndClassify(File file)方法进行处理。

package hdfs.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.File;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class HDFSApiDemo {
    private static FileSystem fs = null;

    // 用于和HDFS建立连接
    public static void connect2HDFS() throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS","hdfs://192.168.12.133:9000");
        // 显式指定本地文件系统的实现类
        conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
        System.out.println("111");
        fs = FileSystem.get(conf);
    }

    // 关闭客户端和HDFS的连接
    public static void close(){
        if(fs != null){
            try{
                fs.close();
            } catch (IOException e){
                e.printStackTrace();
            }
        }
    }


    /**
     * 上传文件并分类
     * @param file
     * @throws IOException
     */
    public static void uploadAndClassify(File file) throws IOException {
        // 提取所有汉字作为省份名称
        Pattern pattern = Pattern.compile("([\\p{IsHan}]+).*\\.json");
        // 匹配所有汉字
        Matcher matcher = pattern.matcher(file.getName());

        // 匹配汉字成功
        if (matcher.find()) {
            String province = matcher.group(1);  // 获取所有汉字组成的字符串
            String targetDir = "/covid_data/" + province + "/";  // 使用全部汉字作为目录名
            String fileName = file.getName();

            System.out.println("Processing file: " + fileName);

            // 创建省份目录(如果不存在)
            mkdir(targetDir);

            // HDFS目的路径
            Path dst = new Path(targetDir + fileName);

            // 上传文件
            fs.copyFromLocalFile(new Path(file.getAbsolutePath()), dst);
            System.out.println("Uploaded: " + fileName + " to " + targetDir);
        } else {
            System.out.println("File does not match the expected pattern: " + file.getName());
        }
    }

    // 重载mkdir()方法,支持String类型参数
    public static void mkdir(String dir) throws IOException {
        Path path = new Path(dir);
        if (!fs.exists(path)) {
            fs.mkdirs(path); // 创建目录
        }
    }



    public static void main(String[] args) throws IOException {
        try {
            connect2HDFS();

            // 虚拟机上的本地目录
            File localDir = new File("/home/covid_data");

            // 遍历目录下的所有文件
            for (File file : localDir.listFiles()) {
                if (file.isFile() && file.getName().endsWith(".json")) {
                    uploadAndClassify(file);
                }
            }

            close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}






















windows的本机文件传输

对于上述代码只需要更改上传路径即可
遭遇问题1:端口无法访问

// 第一步 cd /usr/local/hadoop(安装路径
// 第二步  vi ./etc/hadoop/hdfs-site.xml
// 第三步 加入下列配置 目的:让NameNode监听所有网络接口上的9000端口
<property>
    <name>dfs.namenode.rpc-bind-host</name>
    <value>0.0.0.0</value>
</property>
// 第四步 让配置生效 
   先关闭HDFS命令      ./sbin/stop-dfs.sh
   在重启HDFS命令      ./sbin/start-dfs.sh

遭遇问题2:用户权限不足

// 改变hadoop用户权限 这会将/data目录的权限设置为rwxrwxrwx,允许所有用户读写执行
// hdfs dfs -chmod 777 /data

linux的虚拟机文件传输

遭遇问题1:无法找到主类

<!--  添加maven-shade-plugin插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>hdfs.demo.HDFSApiDemo</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
// 这段Maven配置的作用是使用maven-shade-plugin插件在打包阶段创建一个包含所有依赖的可执行JAR文件,并指定hdfs.demo.HDFSApiDemo作为JAR文件的主类(即包含main方法的入口类)

遭遇问题1:Hadoop尝试使用file:// URI方案时找不到对应的文件系统实现

// 显式指定本地文件系统的实现类
conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");