Installation: Hadoop 3.2.1 Cluster Setup Guide

Tags: big data environment, hadoop, big data

1. Prerequisites

1.1 Linux basics

  1. Linux kernel basics; installing Linux in VMware
  2. CentOS: a stable distribution
  3. Commonly used directories: /bin, /usr, /etc
  4. Xshell: use Xshell to operate CentOS remotely
  5. Text editor: vi/vim

1.2 Common commands

  1. Help: man
  2. Directories: mkdir, rmdir, mv, ls, rm -rf, cd
  3. Files: touch/vi, cat, cp, rm, more, grep
  4. Search: which, whereis, find
  5. Time: date, date -s
  6. User and group management: useradd…, groupadd…
  7. Processes: ps -ef, kill -9 <pid>, pkill -P <ppid> or pkill -f <process name>
  8. Network: netstat -aux
  9. Disk: df
  10. Compression and extraction: zip, unzip, tar
  • tar -zcvf: compress
  • tar -zxvf: extract
  11. Software: yum
    • yum list
    • yum install
    • yum remove
    • rpm -ivh, rpm -evh: for reference
  12. Upload and download (lrzsz): rz, sz
  13. Scheduled tasks: crontab -e
    • field order: minute, hour, day of month, month, day of week (e.g. 0 2 * * * /path/to/script.sh runs every day at 02:00)
    • crontab -l: list the current user's jobs
    • crontab -r: remove them

1.3 Shell scripting

  1. Variables:

    x=value, referenced as $x

  2. Arithmetic:

    $[3+6]

  3. Conditionals:

    if [ condition ]; then ... fi

  4. Loops:

    for (( ... )) or for x in list

    do

    done

    while [ condition ]

    do

    done

  5. Functions:

    function fun(){ ... } ; call it with fun

2. Hadoop setup on Windows

  1. Extract the Hadoop archive

  2. Set HADOOP_HOME

  3. Add the bin and sbin directories to PATH

  4. Test:

    hadoop version

3. Building a Hadoop cluster on Linux

Cluster members:

Host     HDFS                          YARN
master   namenode, secondarynamenode   resourcemanager
slave1   datanode                      nodemanager
slave2   datanode                      nodemanager

3.1 Install JDK 8 and Hadoop 3.2.1

  1. Upload the archives and extract them (to /usr)

  2. Set environment variables (/etc/profile)

    export JAVA_HOME=/usr/jdk8
    export HADOOP_HOME=/usr/hadoop321
    export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

  3. Reload the profile

    . /etc/profile

  4. Test:

    hadoop version

3.2 HDFS configuration

  1. core-site.xml

    <property>
      <name>fs.defaultFS</name>
      <value>hdfs://master:9000</value>
    </property>
  2. hdfs-site.xml

    <property>
      <name>dfs.replication</name>
      <value>2</value>
    </property>
    <property>
      <name>dfs.http.address</name>
      <value>0.0.0.0:5700</value>
    </property>
    <property>
      <name>dfs.namenode.name.dir</name>
      <value>file:///root/hadoop/dfs/namenode</value>
    </property>
    <property>
      <name>dfs.datanode.data.dir</name>
      <value>file:///root/hadoop/dfs/datanode</value>
    </property>
    <property>
      <name>dfs.webhdfs.enabled</name>
      <value>true</value>
    </property>
  3. Format the NameNode

    hdfs namenode -format

  4. start-dfs.sh, stop-dfs.sh

    # set the run-as users
    HDFS_NAMENODE_USER=root
    HDFS_DATANODE_USER=root
    HDFS_SECONDARYNAMENODE_USER=root

  5. hadoop-env.sh

    export JAVA_HOME=/usr/jdk8

3.3 Cluster member configuration

  1. Bind hostnames to IP addresses (/etc/hosts)

    192.168.85.129 master
    192.168.85.130 slave1
    192.168.85.131 slave2

  2. Configure the worker nodes (/usr/hadoop321/etc/hadoop/workers)

    slave1

    slave2

  3. Set the replication factor (number of data nodes) (hdfs-site.xml)

    <property>
      <name>dfs.replication</name>
      <value>2</value>
    </property>

3.4 YARN configuration

  1. yarn-site.xml

    <property>
      <name>yarn.nodemanager.aux-services</name>
      <value>mapreduce_shuffle</value>
    </property>
    <property>
      <name>yarn.resourcemanager.hostname</name>
      <value>master</value>
    </property>
    <property>
      <name>yarn.resourcemanager.webapp.address</name>
      <value>master:8088</value>
    </property>
    <property>
      <name>yarn.application.classpath</name>
      <value>/usr/hadoop321/etc/hadoop:/usr/hadoop321/share/hadoop/common/lib/*:/usr/hadoop321/share/hadoop/common/*:/usr/hadoop321/share/hadoop/hdfs:/usr/hadoop321/share/hadoop/hdfs/lib/*:/usr/hadoop321/share/hadoop/hdfs/*:/usr/hadoop321/share/hadoop/mapreduce/lib/*:/usr/hadoop321/share/hadoop/mapreduce/*:/usr/hadoop321/share/hadoop/yarn:/usr/hadoop321/share/hadoop/yarn/lib/*:/usr/hadoop321/share/hadoop/yarn/*</value>
    </property>
  2. mapred-site.xml

    <property>
      <name>mapreduce.framework.name</name>
      <value>yarn</value>
    </property>
  3. start-yarn.sh, stop-yarn.sh

    YARN_RESOURCEMANAGER_USER=root
    YARN_NODEMANAGER_USER=root

3.5 Cloning the CentOS VM

  1. Change the hostname

    hostnamectl set-hostname <hostname>

  2. Delete the files under /tmp so the data nodes show up in the web UI (note: the firewall must be off)

    systemctl disable firewalld   # do not start the firewall on boot

3.6 Passwordless SSH from master to the slaves

  1. Generate a key pair in root's home directory:

    ssh-keygen

  2. Copy authorized_keys to the slaves

cat id_rsa.pub >> authorized_keys

Use scp to copy it into each slave's .ssh directory

scp authorized_keys root@slave1:/root/.ssh   (repeat for slave2)

3.7 Start the Hadoop cluster

  1. Start everything from master

    start-all.sh

  2. Test

    jps

  3. Check the data nodes

    hdfs dfsadmin -report

4. MapReduce examples

4.1 Word count (getting started)

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * English word count
 */
public class WordCounter {
    
    // Mapper: split each line into words
    public static class MyMapper extends Mapper<LongWritable, Text,Text, IntWritable>{
    
        public static Text text = new Text();
        public static IntWritable intWritable = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
            String v = value.toString();
            String[] words = v.split(" ");
            for (String word : words) {
    
                text.set(word);
                context.write(text,intWritable);
            }
        }
    }

    // Reducer: sum the counts for each word
    public static class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
    
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    
            int count = 0;
            for (IntWritable value : values) {
    
                count+=value.get();
            }
            context.write(key,new IntWritable(count));
        }
    }

    public static void main(String[] args) {
    
        Configuration conf = new Configuration();
        try {
    
            // create and configure the job
            Job job = Job.getInstance(conf);
            job.setJobName("firstJob");
            job.setJarByClass(WordCounter.class);
            // set the mapper and reducer
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            // set the output key/value types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            // set the input and output paths
            FileInputFormat.setInputPaths(job,"data6");
            FileOutputFormat.setOutputPath(job,new Path("dTemp"));
            // run the job and close it
            job.waitForCompletion(true);
            job.close();
        } catch (Exception e) {
    
            e.printStackTrace();
        }
    }
}

4.2 Chinese word segmentation count (IK Analyzer)

/**
 * Chinese word count
 */
public class CNWordCounter {
    
    // Mapper: segment Chinese text with the IK segmenter
    public static class MyMapper extends Mapper<LongWritable, Text,Text, IntWritable>{
    

        public static Text text = new Text();
        public static IntWritable intWritable = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
            byte[] bytes = value.toString().getBytes();
            ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
            InputStreamReader isReader = new InputStreamReader(bis);
            IKSegmenter ikSegmenter = new IKSegmenter(isReader, true);

            Lexeme lexeme=null;
            while ((lexeme=ikSegmenter.next())!=null){
    
                String word = lexeme.getLexemeText();
                text.set(word);
                context.write(text,intWritable);
            }
        }
    }

    // Reducer: sum the counts
    public static class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
    

        public static Text text = new Text();
        public static List<Record> list =new ArrayList<Record>();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    
            int count=0;
            for (IntWritable value : values) {
    
                count+=value.get();
            }
//            context.write(key,new IntWritable(count));
            Record record = new Record(key.toString(), count);
            list.add(record);
        }
        // sort the collected results in cleanup
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
    
            Collections.sort(list);
            Collections.reverse(list);
            for (Record record : list) {
    
                text.set(record.getWord());
                context.write(text,new IntWritable(record.getCount()));
            }
        }
    }

    
    public static void main(String[] args) {
    
        Configuration conf = new Configuration();
        try {
    
            Job job = Job.getInstance(conf);
            job.setJobName("secondJob");
            job.setJarByClass(CNWordCounter.class);
            // set the mapper and reducer
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            // set the output types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            // set the input and output paths
            FileInputFormat.setInputPaths(job,"/test99/data2");
            FileOutputFormat.setOutputPath(job,new Path("/test99/out"));
            // run the job and close it
            job.waitForCompletion(true);
            job.close();
        } catch (Exception e) {
    
            e.printStackTrace();
        }

    }
}
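
The reducer above collects each word and its count into a Record object and sorts the list in cleanup(), but that Record helper is not shown in the post (and it is unrelated to the Writable Record used for joins in section 4.7). A minimal sketch of what it has to provide for getWord(), getCount() and Collections.sort() to work:

// Plain helper assumed by CNWordCounter's reducer (a sketch, not from the original post).
class Record implements Comparable<Record> {

    private final String word;
    private final int count;

    public Record(String word, int count) {
        this.word = word;
        this.count = count;
    }

    public String getWord() { return word; }
    public int getCount() { return count; }

    @Override
    public int compareTo(Record o) {
        // ascending by count; the reducer calls Collections.reverse to get a descending list
        return Integer.compare(this.count, o.count);
    }
}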

4.3 Data cleansing (deduplication, removing empty and invalid records)

/**
 * Data cleansing: remove empty, duplicate, and invalid records
 */
public class DataClear {
    

    public static void main(String[] args) {
    
        Configuration conf = new Configuration();
        try {
    
            Job job = Job.getInstance(conf);
            job.setJobName("clearJob");
            job.setJarByClass(DataClear.class);
            // mapper
            job.setMapperClass(RemoveReplyMapper.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            // input and output paths
            FileInputFormat.setInputPaths(job,"data4");
            FileOutputFormat.setOutputPath(job,new Path("out"));
            job.waitForCompletion(true);
            job.close();
        } catch (Exception e) {
    
            e.printStackTrace();
        }
    }
}

/**
 * Mapper that drops records containing empty fields
 */
class RemoveNullMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
    
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
        String v = value.toString();
        boolean flag = isEmpty(v);
        // write the record only if it is not empty
        if (!flag){
    
            context.write(value,NullWritable.get());
        }
    }
    // a record counts as empty if any of its fields is empty
    private boolean isEmpty(String v) {
    
        String[] split = v.split("  ");
        for (String field : split) {
    
            if (field==null||field.equals("  ")||field.equals("")){
    
                return true;
            }
        }
        return false;
    }
}

/**
 * Deduplication mapper: uses a Set to drop duplicate records
 */
class RemoveReplyMapper extends Mapper<LongWritable,Text,Text,NullWritable>{
    

    public static Set<String> names = new HashSet<>();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
        String v = value.toString();
        boolean flag = isRely(v);
        // write the record only if it is not a duplicate
        if (!flag){
    
            context.write(value,NullWritable.get());
        }
    }

    // a record is a duplicate if its name field has been seen before
    private boolean isRely(String v) {
    
        String[] split = v.split("  ");
        String name =split[0];
        // duplicate
        if (names.contains(name)){
    
            return true;
        }
        // not a duplicate; remember the name
        names.add(name);
        return false;
    }
}

/**
 * Mapper that drops invalid (out-of-range) records
 */
class RemoveIllegalMapper extends Mapper<LongWritable,Text,Text,NullWritable>{
    

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
        String v = value.toString();
        boolean flag = isIllegal(v);
        // write the record only if it is valid
        if (!flag){
    
            context.write(value,NullWritable.get());
        }
    }
    // a record is invalid if any score is > 100 or < 0
    private boolean isIllegal(String v) {
    
        String[] split = v.split("\\s+");
        for (int i = 1; i < split.length; i++) {
    
            int score = Integer.parseInt(split[i]);
            if (score>100 || score<0){
    
                return true;
            }
        }
        return false;
    }
}

4.4 Serialization and deserialization (Writable)

/**
 * Phone-bill Writable: base charge and data (traffic) charge
 */
public class PhoneWritable implements Writable {
    

    private String num;
    private Double base;
    private Double flow;

    public PhoneWritable() {
    
    }

    @Override
    public String toString() {
    
        return "PhoneWritable{" +
                "base=" + base +
                ", flow=" + flow +
                '}';
    }

    public PhoneWritable(Double base, Double flow) {
    
        this.base = base;
        this.flow = flow;
    }


    public String getNum() {
    
        return num;
    }

    public void setNum(String num) {
    
        this.num = num;
    }

    public Double getBase() {
    
        return base;
    }

    public void setBase(Double base) {
    
        this.base = base;
    }

    public Double getFlow() {
    
        return flow;
    }

    public void setFlow(Double flow) {
    
        this.flow = flow;
    }

    // serialize
    @Override
    public void write(DataOutput out) throws IOException {
    
        out.writeDouble(base);
        out.writeDouble(flow);
    }
    // deserialize
    @Override
    public void readFields(DataInput in) throws IOException {
    
        this.base=in.readDouble();
        this.flow=in.readDouble();
    }
}
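
The post only shows the Writable itself; a hedged usage sketch follows (PhoneMapper, PhoneReducer and the input layout "<number> <base> <flow>" are assumptions, and the Hadoop imports are the same as in section 4.1). The class serves as the map output value, so the driver would also call job.setMapOutputValueClass(PhoneWritable.class). Note that write()/readFields() above only serialize base and flow, so num does not survive the shuffle.

// Sketch: sum the charges per phone number using PhoneWritable as the value type.
class PhoneMapper extends Mapper<LongWritable, Text, Text, PhoneWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // assumed line format: "<number> <base> <flow>"
        String[] fields = value.toString().split("\\s+");
        PhoneWritable bill = new PhoneWritable(Double.parseDouble(fields[1]),
                                               Double.parseDouble(fields[2]));
        context.write(new Text(fields[0]), bill);
    }
}

class PhoneReducer extends Reducer<Text, PhoneWritable, Text, PhoneWritable> {

    @Override
    protected void reduce(Text key, Iterable<PhoneWritable> values, Context context)
            throws IOException, InterruptedException {
        double base = 0, flow = 0;
        for (PhoneWritable v : values) {
            base += v.getBase();
            flow += v.getFlow();
        }
        // total base charge and data charge for this phone number
        context.write(key, new PhoneWritable(base, flow));
    }
}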

4.5 Sorting data (WritableComparable)

@Data
@NoArgsConstructor
@AllArgsConstructor
public class SortRecord implements WritableComparable<SortRecord> {
    

    private String key;
    private Integer value;

    @Override
    public String toString() {
    
        return key+"  "+value;
    }


    @Override
    public int compareTo(SortRecord o) {
    
        // descending order by value (compareTo avoids the overflow risk of subtraction)
        return o.getValue().compareTo(this.getValue());
    }

    @Override
    public void write(DataOutput out) throws IOException {
    
        out.writeUTF(key);
        out.writeInt(value);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
    
        this.key=in.readUTF();
        this.value=in.readInt();
    }
}
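
A hedged sketch of how such a key is typically used (SortMapper and the "word count" input layout are assumptions; imports as in section 4.1): emit SortRecord as the map output key so the shuffle phase orders records by the compareTo() above, and register it in the driver with job.setMapOutputKeyClass(SortRecord.class).

// Sketch: the shuffle sorts these keys in descending order of value.
class SortMapper extends Mapper<LongWritable, Text, SortRecord, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // assumed line format: "<word> <count>"
        String[] fields = value.toString().split("\\s+");
        context.write(new SortRecord(fields[0], Integer.parseInt(fields[1])),
                      NullWritable.get());
    }
}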

4.6 Data compression (map and reduce output)

// compress the intermediate map output
conf.setBoolean("mapreduce.map.output.compress",true);
conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
// compress the final (reduce) output
FileOutputFormat.setCompressOutput(job,true);
FileOutputFormat.setOutputCompressorClass(job,BZip2Codec.class);

4.7 Join queries (map-side join, reduce-side join)

  1. Reduce-side join
class JoinMapper extends Mapper<LongWritable, Text,Text,Record>{
    

    Record record=new Record();
    Text text =new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
        InputSplit inputSplit = context.getInputSplit();
        FileSplit fileSplit = (FileSplit) inputSplit;
        String name = fileSplit.getPath().getName();
        String[] split = value.toString().split("\\s+");
        String pid=null;
        if (name.startsWith("order")){
    
            pid=split[1];
            record.setOrderid(split[0]);
            record.setPid(split[1]);
            record.setNum(Integer.parseInt(split[2]));
            record.setPname("");
        }else {
    
            pid=split[0];
            record.setOrderid("");
            record.setPid(split[0]);
            record.setPname(split[1]);
            record.setNum(0);
        }
        text.set(pid);
        context.write(text,record);
    }
}

class JoinReducer extends Reducer<Text,Record,Text, NullWritable>{
    

    Text text=new Text();

    @Override
    protected void reduce(Text key, Iterable<Record> values, Context context) throws IOException, InterruptedException {
    
        List<Record> list =new ArrayList<>();
        Record pd =new Record();
        for (Record record : values) {
    
            if (StringUtils.isEmpty(record.getPname())){
    
                Record record1 = new Record();
                // order record: copy and collect it
                try {
    
                    BeanUtils.copyProperties(record1,record);
                } catch (Exception e) {
    
                    e.printStackTrace();
                }
                list.add(record1);
            }else {
    
                pd.setPname(record.getPname());
            }
        }
        for (Record re : list) {
    
            String res =re.getOrderid()+" "+pd.getPname()+" "+re.getNum();
            text.set(res);
            context.write(text,NullWritable.get());
        }
    }
}
public class ReduceJoin {
    
    public static void main(String[] args) {
    
        Configuration conf = new Configuration();
        try {
    
            Job job = Job.getInstance(conf, "reduceJoin");
            job.setJarByClass(ReduceJoin.class);
            job.setMapperClass(JoinMapper.class);
            job.setReducerClass(JoinReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Record.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);

            FileInputFormat.setInputPaths(job,"data");
            FileOutputFormat.setOutputPath(job,new Path("out"));
            job.waitForCompletion(true);
            job.close();
        } catch (Exception e) {
    
            e.printStackTrace();
        }
    }
}
  2. Map-side join

    • Custom Writable class

      @Data
      @NoArgsConstructor
      @AllArgsConstructor
      public class Record implements Writable {
              
          @Override
          public void write(DataOutput out) throws IOException {
              
              out.writeUTF(orderid);
              out.writeUTF(pid);
              out.writeUTF(pname);
              out.writeInt(num);
          }
      
          @Override
          public void readFields(DataInput in) throws IOException {
              
             orderid=in.readUTF();
             pid=in.readUTF();
             pname=in.readUTF();
             num=in.readInt();
          }
      
          private String orderid;
          private String pid;
          private String pname;
          private Integer num;
      }
      
    • Map task (a cluster-deployment note on the distributed cache follows the code below)

      class MyMapper extends Mapper<LongWritable, Text,Text, NullWritable>{
              
          // product lookup table loaded in setup(): id -> name
          Map<String,String> map =new HashMap<>();
      
          @Override
          protected void setup(Context context) throws IOException, InterruptedException {
              
              FileInputStream fileInputStream = new FileInputStream("data/pd.txt");
              BufferedReader reader=new BufferedReader(new InputStreamReader(fileInputStream));
              String str=null;
              while ((str=reader.readLine())!=null){
              
                  String[] split = str.split("\\s+");
                  map.put(split[0],split[1]);
              }
          }
      
          @Override
          protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
              
              InputSplit inputSplit = context.getInputSplit();
              FileSplit fileSplit= (FileSplit) inputSplit;
              String name = fileSplit.getPath().getName();
              if (name.startsWith("order")){
              
                  String[] split = value.toString().split("\\s+");
                  String res =split[0]+" "+map.get(split[1])+" "+split[2];
                  context.write(new Text(res),NullWritable.get());
              }
          }
      }
      
      public class MapJoin {
              
          public static void main(String[] args) {
              
              Configuration conf = new Configuration();
              try {
              
                  Job job = Job.getInstance(conf, "mapJoin");
                  job.setMapperClass(MyMapper.class);
                  job.setOutputKeyClass(Text.class);
                  job.setOutputValueClass(NullWritable.class);
                  FileInputFormat.setInputPaths(job,"data");
                  FileOutputFormat.setOutputPath(job,new Path("out"));
                  job.waitForCompletion(true);
                  job.close();
              } catch (Exception e) {
              
                  e.printStackTrace();
              }
          }
      }
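
      A note for running this map-side join on a cluster (an addition, not part of the original code): the setup() above reads data/pd.txt from the local file system, which only works when the job runs locally. On a cluster the lookup file is usually shipped to every map task through the distributed cache, and the reduce phase can be skipped entirely. A sketch, assuming the file has been uploaded to HDFS at /data/pd.txt:

      // Driver side (the HDFS path is an example):
      job.addCacheFile(new URI("/data/pd.txt"));   // java.net.URI
      job.setNumReduceTasks(0);                    // map-only job

      // Mapper side: a cluster-friendly setup() reading the cached copy, which is
      // symlinked into the task working directory under its base name.
      @Override
      protected void setup(Context context) throws IOException, InterruptedException {
          URI[] cacheFiles = context.getCacheFiles();
          String localName = new Path(cacheFiles[0].getPath()).getName();
          try (BufferedReader reader = new BufferedReader(new FileReader(localName))) {
              String str;
              while ((str = reader.readLine()) != null) {
                  String[] split = str.split("\\s+");
                  map.put(split[0], split[1]);
              }
          }
      }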
      
      

4.8 Custom partitioner (Partitioner)

  1. Custom partitioner class

    class MyPartition extends Partitioner<Text,Record>{

        @Override
        public int getPartition(Text text, Record record, int i) {

            // partition numbers must lie in the range [0, numReduceTasks)
            String key= text.toString();
            switch (key){

                case "01":
                    return 0;
                case "02":
                    return 1;
                case "03":
                    return 2;
            }
            return 0;
        }
    }
    
    
  2. Register it on the job

    job.setPartitionerClass(MyPartition.class);
    job.setNumReduceTasks(3);
    
    

5. Hadoop tuning

  1. Use high-performance machines
  2. Pre-process before the map phase: merge small files into larger ones
  3. Map phase: use a combiner for local aggregation (see the sketch after this list)
  4. Reduce phase: tune the reduce-side buffer parameters
  5. Data skew
    • custom partitioner
    • map-side join
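
A sketch of points 3 and 4 above, assuming the WordCounter job and its conf/job objects from section 4.1 (the buffer value is only an example):

// Combiner: reuse the summing reducer for map-side local aggregation.
// Safe here because word-count addition is associative and commutative.
job.setCombinerClass(WordCounter.MyReducer.class);

// Reduce side: example of tuning the shuffle input buffer fraction.
conf.setFloat("mapreduce.reduce.shuffle.input.buffer.percent", 0.70f);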

6. Using ZooKeeper

  1. Download and extract the archive (to /usr)

  2. Configure environment variables (/etc/profile)

    export ZK_HOME=/usr/zk
    export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$ZK_HOME/bin

  3. Configure the runtime parameters (/usr/zk/conf/zoo.cfg)

    dataDir=/root/zk/data
    dataLogDir=/root/zk/log

  4. Start the ZooKeeper server

    zkServer.sh start

  5. Open a ZooKeeper client

    zkCli.sh

  6. Stop the ZooKeeper server

    zkServer.sh stop

7. Advanced MapReduce examples

7.1 Running multiple MR jobs in sequence

public static void main(String[] args) {
    
    Mr1.execMr1();
    Mr2.execMr2();
}
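
Mr1 and Mr2 are not shown in the post; one possible shape (the class and job names here are assumptions) is to have each step return the result of waitForCompletion(), so the caller can stop the chain as soon as a step fails:

// Sketch of a step class; Mr2 would follow the same pattern, usually reading Mr1's output directory.
public class Mr1 {

    public static boolean execMr1() throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "mr1");
        job.setJarByClass(Mr1.class);
        // ... set mapper/reducer classes, output types, input and output paths ...
        return job.waitForCompletion(true);   // true once the job has finished successfully
    }
}

// Driver: if (Mr1.execMr1()) { Mr2.execMr2(); }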

7.2 Chained MapReduce execution

// map chain: MyMapper1 then MyMapper2 run inside the same map task
ChainMapper.addMapper(job,MyMapper1.class,LongWritable.class,Text.class,Text.class,IntWritable.class,cfg);
ChainMapper.addMapper(job,MyMapper2.class,Text.class,IntWritable.class,Text.class,IntWritable.class,cfg);
//reducer
ChainReducer.setReducer(job,MyReducer.class,Text.class,IntWritable.class,Text.class,IntWritable.class,cfg);
//mapper3: a mapper that runs after the reducer is added via ChainReducer.addMapper
ChainReducer.addMapper(job,MyMapper3.class,Text.class,IntWritable.class,Text.class,IntWritable.class,cfg);
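
For context, a fuller driver around these calls might look like the following sketch (MyMapper1/2/3 and MyReducer are the author's chain stages, not shown in the post; ChainMapper and ChainReducer come from org.apache.hadoop.mapreduce.lib.chain):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ChainJobDriver {

    public static void main(String[] args) throws Exception {
        Configuration cfg = new Configuration();
        Job job = Job.getInstance(cfg, "chainJob");
        job.setJarByClass(ChainJobDriver.class);

        // map chain: MyMapper1 -> MyMapper2 run inside the same map task
        ChainMapper.addMapper(job, MyMapper1.class, LongWritable.class, Text.class,
                Text.class, IntWritable.class, cfg);
        ChainMapper.addMapper(job, MyMapper2.class, Text.class, IntWritable.class,
                Text.class, IntWritable.class, cfg);
        // one reducer, followed by a post-reduce mapper
        ChainReducer.setReducer(job, MyReducer.class, Text.class, IntWritable.class,
                Text.class, IntWritable.class, cfg);
        ChainReducer.addMapper(job, MyMapper3.class, Text.class, IntWritable.class,
                Text.class, IntWritable.class, cfg);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, "data");
        FileOutputFormat.setOutputPath(job, new Path("out"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}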

7.3 Temperature analysis

  1. Custom weather class

    /**
     * Weather record: year, month, day, temperature
     */
    @Data@NoArgsConstructor@AllArgsConstructor
    public class TianQi implements WritableComparable<TianQi> {
          
        private Integer year;
        private Integer month;
        private Integer day;
        private Integer wd;
    
        @Override
        public String toString() {
          
            return year+"\t"+month+"\t"+day+"\t"+wd+"c";
        }
    
        @Override
        public int compareTo(TianQi o) {
          
            // sort by year asc, month asc, temperature desc, day asc
            int yAsc = Integer.compare(this.getYear(),o.getYear());
            if (yAsc==0){
          
                int mAsc = Integer.compare(this.getMonth(), o.getMonth());
                if (mAsc==0){
          
                    int wdDesc = Integer.compare(o.getWd(), this.getWd());
                    if (wdDesc==0){
          
                        int dAsc = Integer.compare(this.getDay(), o.getDay());
                        return dAsc;
                    }
                    return wdDesc;
                }
                return mAsc;
            }
            return yAsc;
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
          
            out.writeInt(year);
            out.writeInt(month);
            out.writeInt(day);
            out.writeInt(wd);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
          
            year=in.readInt();
            month=in.readInt();
            day=in.readInt();
            wd=in.readInt();
        }
    }
    
    
  2. Custom grouping comparator

    /**
     * Group records by year and month
     */
    public class TianQiGroupComparator extends WritableComparator {
          
        public TianQiGroupComparator() {
          
            super(TianQi.class,true);
        }
    
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
          
            TianQi aa= (TianQi) a;
            TianQi bb = (TianQi) b;
            int y =aa.getYear()-bb.getYear();
            if (y==0){
          
                return aa.getMonth()-bb.getMonth();
            }
            return y;
        }
    }
    
    
  3. The MR driver program

    /**
     * Find the two hottest days of each month
     */
    public class TianQiClient {
          
        /**
         * Map: wrap each input line into a TianQi object
         */
        public static class TianQiMapper extends Mapper<LongWritable, Text,TianQi, NullWritable>{
          
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
          
                String[] split = value.toString().split("\\s+");
                String time =split[0]+" "+split[1];
                int wd = Integer.parseInt(split[2].substring(0, split[2].length()-1));
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                try {
          
                    Date date = simpleDateFormat.parse(time);
                    Calendar calendar = Calendar.getInstance();
                    calendar.setTime(date);
                    int year =calendar.get(Calendar.YEAR);
                    int month =calendar.get(Calendar.MONTH)+1;
                    int day =calendar.get(Calendar.DAY_OF_MONTH);
                    TianQi tianQi = new TianQi(year, month, day, wd);
                    context.write(tianQi,NullWritable.get());
                } catch (ParseException e) {
          
                    e.printStackTrace();
                }
            }
        }
        public static class TianQiPartitioner extends Partitioner<TianQi, NullWritable>{
          
    
            @Override
            public int getPartition(TianQi tianQi, NullWritable nullWritable, int numPartitions) {
          
                return (tianQi.getYear()&Integer.MAX_VALUE)%numPartitions;
            }
        }
    
        /**
         * Emit the two hottest days (on different dates)
         */
        public static class TianQiReducer extends Reducer<TianQi,NullWritable,TianQi,NullWritable>{
          
            @Override
            protected void reduce(TianQi key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
          
                int flag = 0;
                int day = 0;
                for (NullWritable nullWritable : values) {
          
                    // emit the hottest day
                    if(flag == 0){
          
                        context.write(key, NullWritable.get());
                        flag ++;
                        // remember its day of month
                        day = key.getDay();
                    }
                    // emit the second-hottest day (must be a different date)
                    if(key.getDay() != day){
          
                        context.write(key, NullWritable.get());
                        break;
                    }
                }
            }
        }
    
    
        public static void main(String[] args) {
          
            Configuration cfg = new Configuration();
            try {
          
                Job job = Job.getInstance(cfg,"tianqi");
                job.setMapperClass(TianQiMapper.class);
                job.setReducerClass(TianQiReducer.class);
    
                job.setMapOutputKeyClass(TianQi.class);
                job.setMapOutputValueClass(NullWritable.class);
                job.setOutputKeyClass(TianQi.class);
                job.setOutputValueClass(NullWritable.class);
    
                FileInputFormat.setInputPaths(job,"data2");
                FileOutputFormat.setOutputPath(job,new Path("out"));
    
                job.setPartitionerClass(TianQiPartitioner.class);
                job.setNumReduceTasks(3);
                job.setGroupingComparatorClass(TianQiGroupComparator.class);
    
                job.waitForCompletion(true);
                job.close();
            } catch (Exception e) {
          
                e.printStackTrace();
            }
        }
    }
    
    

7.4 Friend recommendation

/**
 * Friend recommendation: suggest potential friends
 */
public class FriendClient {
    
    /**
     * Direct friends are tagged 0, indirect (potential) friends 1
     */
    public static class FriendMapper extends Mapper<LongWritable, Text,Text, IntWritable>{
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
            String[] split = value.toString().split(":");
            String left =split[0];
            String[] rights = split[1].split("\\s+");
            for (int i = 0; i < rights.length; i++) {
    
                // direct friend pair
                context.write(new Text(unit(left,rights[i])),new IntWritable(0));
                for (int j = i+1; j < rights.length; j++) {
    
                    // indirect (potential) friend pair
                    context.write(new Text(unit(rights[i],rights[j])),new IntWritable(1));
                }
            }
        }
    }
    public static class FriendReducer extends Reducer<Text, IntWritable,Text, IntWritable> {
    
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    
            int count = 0;
            for (IntWritable value : values) {
    
                // skip pairs that are already direct friends
                if (value.get() == 0) {
    
                    return;
                }
                count++;
            }
            context.write(key, new IntWritable(count));
        }
    }
    // order the pair canonically so (a,b) and (b,a) map to the same key
    private static String unit(String left, String right) {
    
        return left.compareTo(right)>0?left+":"+right:right+":"+left;
    }


    public static void main(String[] args) {
    
        Configuration cfg = new Configuration();
        try {
    
            Job job = Job.getInstance(cfg,"fried");
            job.setMapperClass(FriendMapper.class);
            job.setReducerClass(FriendReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            FileInputFormat.setInputPaths(job,"data4");
            Path out = new Path("out");
            FileSystem fs = out.getFileSystem(cfg);
            if (fs.exists(out)){
    
                fs.delete(out,true);
            }
            FileOutputFormat.setOutputPath(job,out);
            job.waitForCompletion(true);
            job.close();
        } catch (Exception e) {
    
            e.printStackTrace();
        }
    }
}

8. Deploying a highly available Hadoop cluster

Cluster members:

Host     HDFS                  YARN
master   namenode              resourcemanager
slave1   namenode, datanode    resourcemanager, nodemanager
slave2   datanode              nodemanager

8.1 Install ZooKeeper on master

8.2 Edit core-site.xml

<property>
      <name>fs.defaultFS</name>
      <value>hdfs://cluster</value>
</property>
<property>
      <name>hadoop.tmp.dir</name>
      <value>file:/root/hadoop/tmp</value>
</property>
<property>
      <name>ha.zookeeper.quorum</name>
      <value>master:2181</value>
</property>

8.3 Edit hdfs-site.xml

<configuration>
<!-- Logical name (nameservice) of the HDFS cluster; must match fs.defaultFS in core-site.xml -->
<property>
      <name>dfs.nameservices</name>
      <value>cluster</value>
</property>
<!-- Two NameNodes under this nameservice: here master and slave1 -->
<property>
      <name>dfs.ha.namenodes.cluster</name>
      <value>master,slave1</value>
</property>
<!-- RPC address of the NameNode on master -->
<property>
      <name>dfs.namenode.rpc-address.cluster.master</name>
      <value>master:9000</value>
</property>

<!-- HTTP address of the NameNode on master -->
<property>
      <name>dfs.namenode.http-address.cluster.master</name>
      <value>master:50070</value>
</property>

<!-- RPC address of the NameNode on slave1 -->
<property>
      <name>dfs.namenode.rpc-address.cluster.slave1</name>
      <value>slave1:9000</value>
</property>

<!-- HTTP address of the NameNode on slave1 -->
<property>
      <name>dfs.namenode.http-address.cluster.slave1</name>
      <value>slave1:50070</value>
</property>

<!-- Shared location (on the JournalNodes) of the NameNode edit log -->
<property>
      <name>dfs.namenode.shared.edits.dir</name>
      <value>qjournal://master:8485;slave1:8485;slave2:8485/cluster</value>
</property>

<!-- Local directory where each JournalNode stores its data -->
<property>
      <name>dfs.journalnode.edits.dir</name>
      <value>/root/hadoop/journalData</value>
</property>

<!-- Enable automatic failover between NameNodes -->
<property>
      <name>dfs.ha.automatic-failover.enabled</name>
      <value>true</value>
</property>

<!-- Client failover proxy provider (how clients locate the active NameNode) -->
<property>
      <name>dfs.client.failover.proxy.provider.myNameService1</name>
      <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>

<!-- Fencing methods used after a failover to keep the old active NameNode from serving; list one method per line -->
<property>
      <name>dfs.ha.fencing.methods</name>
      <value>
              sshfence
              shell(/bin/true)
      </value>
</property>

<!-- sshfence requires passwordless SSH; point this at your own private key -->
<property>
      <name>dfs.ha.fencing.ssh.private-key-files</name>
      <value>/root/.ssh/id_rsa</value>
</property>

<!-- Timeout (ms) for the sshfence method -->
<property>
      <name>dfs.ha.fencing.ssh.connect-timeout</name>
      <value>30000</value>
</property>
<property>
      <name>dfs.replication</name>
      <value>2</value>
</property>
<property>
      <name>dfs.namenode.name.dir</name>
      <value>/root/hadoop/dfs/namenode</value>
</property>
<property>
      <name>dfs.datanode.data.dir</name>
      <value>/root/hadoop/dfs/datanode</value>
</property>
<property>
       <name>dfs.webhdfs.enabled</name>
       <value>true</value>
</property>
<property>
      <name>dfs.permissions</name>
      <value>false</value>
</property>
<property> 
       <name>dfs.client.failover.proxy.provider.cluster</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
</configuration>

8.4 Edit mapred-site.xml

<configuration>
        <!-- Run MapReduce on YARN -->
        <property>
                <name>mapreduce.framework.name</name>
                <value>yarn</value>
        </property>
        <!-- MapReduce JobHistory Server address (default port 10020) -->
        <property>
                <name>mapreduce.jobhistory.address</name>
                <value>master:10020</value>
        </property>
        <!-- MapReduce JobHistory Server web UI address (default port 19888) -->
        <property>
                <name>mapreduce.jobhistory.webapp.address</name>
                <value>master:19888</value>
        </property>
        <property>
                <name>mapreduce.application.classpath</name>
                <value>/usr/hadoop321/share/hadoop/mapreduce/*,/usr/hadoop321/share/hadoop/mapreduce/lib/*</value>
        </property>
</configuration>

8.5 Edit yarn-site.xml

<configuration>
        <!-- Enable ResourceManager HA -->
        <property>
                <name>yarn.resourcemanager.ha.enabled</name>
                <value>true</value>
        </property>
        <!-- ResourceManager cluster id -->
        <property>
                <name>yarn.resourcemanager.cluster-id</name>
                <value>yrc</value>
        </property>
        <!-- Logical ids of the ResourceManagers -->
        <property>
                <name>yarn.resourcemanager.ha.rm-ids</name>
                <value>rm1,rm2</value>
        </property>
        <!-- Hostname of each ResourceManager -->
        <property>
                <name>yarn.resourcemanager.hostname.rm1</name>
                <value>master</value>
        </property>
        <property>
                <name>yarn.resourcemanager.hostname.rm2</name>
                <value>slave1</value>
        </property>
        <!-- Web UI address of each ResourceManager, used to view cluster information in a browser -->
        <property>
                <name>yarn.resourcemanager.webapp.address.rm1</name>
                <value>master:8088</value>
        </property>
        <property>
                <name>yarn.resourcemanager.webapp.address.rm2</name>
                <value>slave1:8088</value>
        </property>
        <!-- ZooKeeper quorum address -->
        <property>
                <name>yarn.resourcemanager.zk-address</name>
                <value>master:2181</value>
        </property>
        <property>
                <name>yarn.nodemanager.aux-services</name>
                <value>mapreduce_shuffle</value>
        </property>
        <property>
                <name>yarn.application.classpath</name>
                <value>/usr/hadoop321/etc/hadoop:/usr/hadoop321/share/hadoop/common/lib/*:/usr/hadoop321/share/hadoop/common/*:/usr/hadoop321/share/hadoop/hdfs:/usr/hadoop321/share/hadoop/hdfs/lib/*:/usr/hadoop321/share/hadoop/hdfs/*:/usr/hadoop321/share/hadoop/mapreduce/lib/*:/usr/hadoop321/share/hadoop/mapreduce/*:/usr/hadoop321/share/hadoop/yarn:/usr/hadoop321/share/hadoop/yarn/lib/*:/usr/hadoop321/share/hadoop/yarn/*</value>
        </property>
</configuration>

8.6 Edit hadoop-env.sh

export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_ZKFC_USER=root
export HDFS_JOURNALNODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root

8.7 Set up mutual passwordless SSH between all machines

  1. Generate a key pair on each machine

    ssh-keygen

  2. Append every machine's public key to the authorized_keys file

    cat id_rsa.pub>>authorized_keys

    cat id_rsa.pub.s1>>authorized_keys

    cat id_rsa.pub.s2>>authorized_keys

  3. Send it to every machine

    scp authorized_keys root@slave1:/root/.ssh

    scp authorized_keys root@slave2:/root/.ssh

8.8 Startup steps

  1. Start a JournalNode on all three machines

    hdfs --daemon start journalnode

  2. On master:

    hdfs namenode -format                  # format the namenode
    zkServer.sh start                      # start ZooKeeper
    hdfs zkfc -formatZK                    # initialize the failover controller state in ZooKeeper
    scp -r /root/hadoop root@slave1:/root  # sync the metadata to the standby namenode
    start-all.sh                           # start all services

  3. Check the processes with jps and open the NameNode web UI in a browser

Copyright notice: this is an original article by the blogger, released under the CC 4.0 BY-SA license. Please include the original source link and this notice when reposting.
Original link: https://blog.csdn.net/qq_41200768/article/details/108233146
