热线电话:13121318867

登录
首页大数据时代大数据分析师教程-1.4 Hadoop安装与HDFS、MapReduce实验:HDFS存取代码分析与Word Count程序代码分析
大数据分析师教程-1.4 Hadoop安装与HDFS、MapReduce实验:HDFS存取代码分析与Word Count程序代码分析
2024-10-17
收藏

Hadoop安装与HDFS、MapReduce实验:HDFS存取代码分析与Word Count程序代码分析


Hadoop大数据分析——HDFS存取代码分析

启动HDFS和YARN

rm -rf /opt/linuxsir/hadoop/logs/*.*
ssh root@192.168.31.132 rm -rf /opt/linuxsir/hadoop/logs/*.*
ssh root@192.168.31.133 rm -rf /opt/linuxsir/hadoop/logs/*.*

clear
cd /opt/linuxsir/hadoop/sbin
./start-dfs.sh
./start-yarn.sh
 
clear
jps
ssh root@192.168.31.132 jps
ssh root@192.168.31.133 jps

在eclipse编写和运行代码

在eclipse里面操作如下:

  1. 先自定义一个项目 New-Java Project,名称自定义即可,如 java-prj
  2. 接着在项目里面新建一个包New-Package,名称自定义为com.pai.hdfs_demo
  3. 在包里新建一个类 New-Class,名称自定义为ReadWriteHDFSExample
package com.pai.hdfs_demo;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.*;
import java.nio.charset.StandardCharsets;

public class ReadWriteHDFSExample {
 // main 新建一个类ReadWriteHDFSExample,编写main函数如下。main函数调用其它函数,创建目录,写入数据,添加数据,然后再读取数据
 public static void main(String[] args) throws IOException {
  // ReadWriteHDFSExample.checkExists();
  ReadWriteHDFSExample.createDirectory();
  ReadWriteHDFSExample.writeFileToHDFS();
  ReadWriteHDFSExample.appendToHDFSFile();
  ReadWriteHDFSExample.readFileFromHDFS();
 }

 // readFileFromHDFS 该函数读取文件内容,以字符串形式显示出来
 public static void readFileFromHDFS() throws IOException {
  Configuration configuration = new Configuration();
  configuration.set("fs.defaultFS""hdfs://192.168.31.131:9000");
  FileSystem fileSystem = FileSystem.get(configuration);

  // Create a path
  String fileName = "read_write_hdfs_example.txt";
  Path hdfsReadPath = new Path("/javareadwriteexample/" + fileName);
  // initialize input stream
  FSDataInputStream inputStream = fileSystem.open(hdfsReadPath);
  // Classical input stream usage
  String out = IOUtils.toString(inputStream, "UTF-8");
  System.out.println(out);
  // BufferedReader bufferedReader = new BufferedReader(
  // new InputStreamReader(inputStream, StandardCharsets.UTF_8));
  // String line = null;
  // while ((line=bufferedReader.readLine())!=null){
  // System.out.println(line);
  // }
  inputStream.close();
  fileSystem.close();
 }

 // writeFileToHDFS writeFileToHDFS函数打开文件,写入一行文本

public static void writeFileToHDFS() throws IOException {
    Configuration configuration = new Configuration();
    configuration.set("fs.defaultFS""hdfs://192.168.31.131:9000");
    FileSystem fileSystem = FileSystem.get(configuration);
    // Create a path
    String fileName = "read_write_hdfs_example.txt";
    Path hdfsWritePath = new Path("/javareadwriteexample/" + fileName);
    FSDataOutputStream fsDataOutputStream = fileSystem.create(hdfsWritePath, true);
    BufferedWriter bufferedWriter = new BufferedWriter(
            new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8));
    bufferedWriter.write("Java API to write data in HDFS");
    bufferedWriter.newLine();
    bufferedWriter.close();
    fileSystem.close();
}

 // appendToHDFSFile 函数打开文件,添加一行文本。需要注意的是,需要对Configuration类的对象configuration进行适当设置,否则出错

public static void appendToHDFSFile() throws IOException {
    Configuration configuration = new Configuration();
    configuration.set("fs.defaultFS""hdfs://192.168.31.131:9000");
    //configuration.setBoolean("dfs.client.block.write.replace-datanode-on-failure.enabled", true);
    configuration.set("dfs.client.block.write.replace-datanode-on-failure.policy","NEVER");
    configuration.set("dfs.client.block.write.replace-datanode-on-failure.enable","true"); 
    FileSystem fileSystem = FileSystem.get(configuration);
    // Create a path
    String fileName = "read_write_hdfs_example.txt";
    Path hdfsWritePath = new Path("/javareadwriteexample/" + fileName);
    FSDataOutputStream fsDataOutputStream = fileSystem.append(hdfsWritePath);
    BufferedWriter bufferedWriter = new BufferedWriter(
        new OutputStreamWriter(fsDataOutputStream, StandardCharsets.UTF_8));
    bufferedWriter.write("Java API to append data in HDFS file");
    bufferedWriter.newLine();
    bufferedWriter.close();
    fileSystem.close();
}

 // createDirectory 函数创建一个目录
public static void createDirectory() throws IOException {
    Configuration configuration = new Configuration();
    configuration.set("fs.defaultFS""hdfs://192.168.31.131:9000");
    FileSystem fileSystem = FileSystem.get(configuration);
    String directoryName = "/javareadwriteexample";
    Path path = new Path(directoryName);
    fileSystem.mkdirs(path);
}

 // checkExists checkExists检查目录或者文件是否存在。注意如下代码的最后一个括号是ReadWriteHDFSExample类的结束括号
public static void checkExists() throws IOException {
    Configuration configuration = new Configuration();
    configuration.set("fs.defaultFS""hdfs://192.168.31.131:9000");
    FileSystem fileSystem = FileSystem.get(configuration);
    String directoryName = "/javareadwriteexample";
    Path path = new Path(directoryName);
    if (fileSystem.exists(path)) {
        System.out.println("File/Folder Exists : " + path.getName());
    } else {
        System.out.println("File/Folder does not Exists : " + path.getName());
    }
}

}

为了编译通过上述Java代码,需要把如下目录下的jar包导入Eclipse项目的Build Path 操作序列为 右键点击Eclipse里的Java项目→PropertiesJava Build PathLibrariesAdd External Jars

# 添加如下路径的包
D:hadoop-2.7.3sharehadoopcommonlib
D:hadoop-2.7.3sharehadoopcommon

D:hadoop-2.7.3sharehadoophdfs
D:hadoop-2.7.3sharehadoophdfslib


D:hadoop-2.7.3sharehadoopmapreducelib
D:hadoop-2.7.3sharehadoopmapreduce

D:hadoop-2.7.3sharehadoopyarnlib
D:hadoop-2.7.3sharehadoopyarn

在hd-master主机上检查已经写入的文件

就可以愉快地执行了,执行完毕上述代码后,在hd-master主机上可以通过如下命令,检查已经写入的文件

[root@hd-master bin]# cd /opt/linuxsir/hadoop/bin
[root@hd-master bin]# ./hdfs dfs -ls /javareadwriteexample/read_write_hdfs_example.txt
-rw-r--r--   3 root supergroup         70 2024-10-10 04:47 /javareadwriteexample/read_write_hdfs_example.txt

[root@hd-master bin]# ./hdfs dfs -cat /javareadwriteexample/read_write_hdfs_example.txt
Java API to write data in HDFS
Java API to append data in HDFS file

为了多次进行实验(或者为了调试代码),可以把HDFS文件删除,然后再执行或者调试Java代码,否则一经存在该目录,执行创建目录的代码就会出错

cd /opt/linuxsir/hadoop/bin
./hdfs dfs -rm /javareadwriteexample/*
./hdfs dfs -rmdir /javareadwriteexample

运行完后停止YARN和HDFS

cd /opt/linuxsir/hadoop/sbin
./stop-yarn.sh
./stop-dfs.sh
 
jps
ssh root@192.168.31.132 jps
ssh root@192.168.31.133 jps


Hadoop大数据分析——Word Count程序代码分析

在eclipse编写和运行代码

package mywordcount;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {
    //定义WordCount类的内部类TokenizerMapper 该类实现了map函数,把从文件读取的每个word变成一个形式为<word,1>的Key Value对,输出到map函数的参数context对象,由执行引擎完成Shuffle
 public static class TokenizerMapper extends Mapper<ObjectTextTextIntWritable{

  private final static IntWritable one = new IntWritable(1);
  private Text word = new Text();

  public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
   StringTokenizer itr = new StringTokenizer(value.toString());
   while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    context.write(word, one);
   }
  }
 }
    //定义WordCount类的内部类IntSumReducer    该类实现了reduce函数,它收拢所有相同key的、形式为<word,1>的Key-Value对,对Value部分进行累加,输出一个计数
 public static class IntSumReducer extends Reducer<TextIntWritableTextIntWritable{
  private IntWritable result = new IntWritable();

  public void reduce(Text key, Iterable<IntWritable> values, Context context)
    throws IOException, InterruptedException 
{
   int sum = 0;
   for (IntWritable val : values) {
    sum += val.get();
   }
   result.set(sum);
   context.write(key, result);
   String thekey = key.toString();
   int thevalue = sum;
  }
 }
    // WordCount类的main函数,负责配置Job的若干关键的参数,并且启动这个Job。在main函数中,conf对象包含了一个属性即“fs.defaultFS”,它的值为“hdfs://192.168.31.131:9000”,使得WordCount程序知道如何存取HDFS

 public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  if (otherArgs.length != 2) {
   System.err.println("Usage: wordcount <in> <out>");
   System.exit(2);
  }
  conf.set("fs.defaultFS""hdfs://192.168.31.131:9000");
  Job job = new Job(conf, "word count");
  job.setJarByClass(WordCount.class);
  job.setMapperClass(TokenizerMapper.class);
  job.setCombinerClass(IntSumReducer.class);
  job.setReducerClass(IntSumReducer.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);
  FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  System.exit(job.waitForCompletion(true) ? 0 : 1);
 }

}


在hd-master主机上检查已经写入的文件

[root@hd-master bin]# ./hdfs dfs -ls /output1
Found 2 items
-rw-r--r--   3 root supergroup          0 2024-10-10 05:17 /output1/_SUCCESS
-rw-r--r--   3 root supergroup         89 2024-10-10 05:17 /output1/part-r-00000
 [root@hd-master bin]# ./hdfs dfs -cat /output1/part-r-00000
I       1
apache  1
cloudera        1
google  1
hadoop  8
hortonworks     1
ibm     1
intel   1
like    1
microsoft       1

数据分析咨询请扫描二维码

最新资讯
更多
客服在线
立即咨询