The code comes from link 2, with my own modifications. My skill is limited, so please point out anything that is wrong. Environment: Hadoop 3.2.1 on CentOS 7; the code is written on Windows, then packaged and submitted to the Hadoop cluster on CentOS.
Approach: put the images on HDFS, then write the path of every image to be processed into a txt file, one path per line. When running the MR job, this txt file is passed in as the input; the job looks up each image via its path from the file, which is how the images get processed. Because the input is plain text, each map call receives exactly one image path. Path of the txt file: /sst/mr/input/imgpath/srcimgpathcolor.txt, whose contents look like this:
hdfs://your-namenode-host-ip:8020/sst/mr/image/srcimgcolor/1.jpg
hdfs://your-namenode-host-ip:8020/sst/mr/image/srcimgcolor/2.jpg
hdfs://your-namenode-host-ip:8020/sst/mr/image/srcimgcolor/3.jpg
The idea of the code: first convert the images (read via their paths) into a SequenceFile, then read that sequence file to operate on the images.
Pasting the code directly:
package com.createSequenceFile;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

/**
 * The job contains only a map phase: each map call reads one file and writes it
 * out through SequenceFileOutputFormat. It uses a FileSystem object to open the
 * HDFS file and an FSDataInputStream to read from it. The byte array is written
 * to the context as a BytesWritable. Since the job's output format is
 * SequenceFileOutputFormat, the map output ends up in a sequence file.
 *
 * Target: generate a sequence file (key: file path, value: BytesWritable)
 */
public class BinaryFilesToHadoopSequenceFile {

    // NOTE: the repost truncated this class after the mapper declaration; the
    // bodies below are reconstructed from the imports and the second job's structure.
    //private static Logger logger = Logger.getLogger(BinaryFilesToHadoopSequenceFile.class);
    private static Configuration conf = new Configuration();
    private static URI urifs;
    private static FileSystem fs;
    private static String hdfsUri = "hdfs://your-namenode-host-ip:8020";

    // Map input: key = line offset, value = one image path per line.
    // Map output: key = image path, value = image bytes.
    public static class BinaryFilesToHadoopSequenceFileMapper
            extends Mapper<Object, Text, Text, BytesWritable> {

        @Override
        protected void setup(Context context) throws IOException {
            // Obtain a FileSystem instance for the cluster.
            try {
                urifs = new URI(hdfsUri);
            } catch (URISyntaxException e) {
                e.printStackTrace();
            }
            try {
                fs = FileSystem.get(urifs, conf, "root"); // "root": user for the FileSystem instance
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line holds the HDFS path of one image.
            String imgPath = value.toString();
            FSDataInputStream in = null;
            try {
                in = fs.open(new Path(imgPath));
                ByteArrayOutputStream bout = new ByteArrayOutputStream();
                byte[] buffer = new byte[1024 * 1024];
                int bytesRead;
                while ((bytesRead = in.read(buffer)) > 0) {
                    bout.write(buffer, 0, bytesRead);
                }
                // key: image path, value: image bytes
                context.write(value, new BytesWritable(bout.toByteArray()));
            } finally {
                IOUtils.closeStream(in);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        Job job = Job.getInstance(conf, "BinaryFilesToHadoopSequenceFile");
        job.setJarByClass(BinaryFilesToHadoopSequenceFile.class);
        job.setInputFormatClass(TextInputFormat.class);            // input: the txt file of image paths
        job.setOutputFormatClass(SequenceFileOutputFormat.class);  // output: a sequence file
        job.setMapperClass(BinaryFilesToHadoopSequenceFileMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Package the code and submit it to the Hadoop cluster to run.
ImageProj-1.0-SNAPSHOT.jar: the project jar built from the IDE. Upload it to Linux; the command below is run from the directory containing the jar. The jar is built with maven-shade-plugin configured in the pom, which specifies BinaryFilesToHadoopSequenceFile as the main class.
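For reference, the shade-plugin section of the pom might look like the sketch below; the plugin version is illustrative, and only the mainClass value is specific to this step (for the second job it would be swapped for ReadeSequence2Img):

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.2.1</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>com.createSequenceFile.BinaryFilesToHadoopSequenceFile</mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>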
/sst/mr/input/imgpath/srcimgpathcolor.txt: HDFS path of the image-path txt file (the input path)
/sst/mr/result_1: path of the generated sequence file (the output path)
Note: the result_1 directory must not exist beforehand, otherwise the job will fail!

hadoop jar ImageProj-1.0-SNAPSHOT.jar /sst/mr/input/imgpath/srcimgpathcolor.txt /sst/mr/result_1
Run the following command to see the output files:
hdfs dfs -ls /sst/mr/result_1
The file part-r-00000 is the result we need.
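Before running the second job, it can be reassuring to verify what the sequence file actually contains. This small standalone reader is a minimal sketch (not from the original post; the class name InspectSequenceFile is mine) that prints each key and the byte length of its value:

package com.createSequenceFile;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

// Hypothetical helper: lists the (image path, byte count) pairs in a sequence file.
public class InspectSequenceFile {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path(args[0]); // e.g. /sst/mr/result_1/part-r-00000
        try (SequenceFile.Reader reader =
                     new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
            Text key = new Text();
            BytesWritable value = new BytesWritable();
            while (reader.next(key, value)) {
                System.out.println(key + " : " + value.getLength() + " bytes");
            }
        }
    }
}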
package com.readsuquencefile;

import com.utils.ChangeImgFormt;
import com.utils.ImageUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.awt.image.BufferedImage;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;

public class ReadeSequence2Img {

    private static Configuration conf = new Configuration();
    private static URI uri;
    private static FileSystem fs;
    private static OutputStream out;
    private static String hdfsUri = "hdfs://your-namenode-host-ip:8020";

    // Map input (read from the sequence file) -- key: image path, value: image bytes.
    public static class Sequence2ImgMapper extends Mapper<Text, BytesWritable, Text, NullWritable> {

        @Override
        protected void setup(Context context) throws IOException {
            // Obtain a FileSystem instance.
            try {
                uri = new URI(hdfsUri);
            } catch (URISyntaxException e) {
                e.printStackTrace();
            }
            try {
                fs = FileSystem.get(uri, conf, "root"); // "root": user for the FileSystem instance
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        protected void map(Text key, BytesWritable value, Context context)
                throws IOException, InterruptedException {
            // Get the image bytes. getBytes() returns the padded backing array,
            // so copy only the valid portion.
            byte[] b = Arrays.copyOf(value.getBytes(), value.getLength());

            // Add a text watermark to every image.
            BufferedImage srcBuff = ChangeImgFormt.bytes2bufImg(b);
            BufferedImage waterBuff = ImageUtils.waterMarkImage(srcBuff, "let me down");
            byte[] byteWater = ChangeImgFormt.bufImg2Bytes(waterBuff);

            // Derive the name of the image currently being processed from its path.
            String imguri = key.toString();
            String imgname = imguri.substring(imguri.lastIndexOf("/") + 1, imguri.lastIndexOf("."));

            // Store the image on HDFS in jpg format.
            String file = hdfsUri + "/img3/" + imgname + "_result" + ".jpg";
            out = fs.create(new Path(file));
            out.write(byteWater, 0, byteWater.length);
            out.close();

            // Map output: the path where the processed image was stored.
            context.write(new Text(file), NullWritable.get());
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(conf);
        job.setJarByClass(ReadeSequence2Img.class);
        job.setInputFormatClass(SequenceFileInputFormat.class); // the map input is a SequenceFile
        job.setMapperClass(Sequence2ImgMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
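The helper classes com.utils.ChangeImgFormt and com.utils.ImageUtils are imported above but never shown in the post. Below is a minimal sketch of what they might look like, assuming the standard javax.imageio / java.awt.Graphics2D APIs; the method names match the usages above, but the bodies (JPEG encoding, font, opacity, text placement) are my assumptions:

// File: com/utils/ChangeImgFormt.java (hypothetical reconstruction)
package com.utils;

import javax.imageio.ImageIO;
import java.awt.image.BufferedImage;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class ChangeImgFormt {

    // Decode image bytes (e.g. a JPEG read from the sequence file) into a BufferedImage.
    public static BufferedImage bytes2bufImg(byte[] b) throws IOException {
        return ImageIO.read(new ByteArrayInputStream(b));
    }

    // Encode a BufferedImage back into JPEG bytes.
    public static byte[] bufImg2Bytes(BufferedImage img) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        ImageIO.write(img, "jpg", out);
        return out.toByteArray();
    }
}

// File: com/utils/ImageUtils.java (hypothetical reconstruction)
package com.utils;

import java.awt.AlphaComposite;
import java.awt.Color;
import java.awt.Font;
import java.awt.Graphics2D;
import java.awt.image.BufferedImage;

public class ImageUtils {

    // Draw a semi-transparent text watermark onto a copy of the source image.
    public static BufferedImage waterMarkImage(BufferedImage src, String text) {
        int w = src.getWidth();
        int h = src.getHeight();
        BufferedImage result = new BufferedImage(w, h, BufferedImage.TYPE_INT_RGB);
        Graphics2D g = result.createGraphics();
        g.drawImage(src, 0, 0, null);  // copy the original pixels
        g.setComposite(AlphaComposite.getInstance(AlphaComposite.SRC_OVER, 0.5f));
        g.setColor(Color.WHITE);
        g.setFont(new Font(Font.SANS_SERIF, Font.BOLD, Math.max(h / 10, 12)));
        g.drawString(text, w / 10, h / 2);  // watermark position: left-middle
        g.dispose();
        return result;
    }
}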
ImageProj-1.0-SNAPSHOT.jar: the main class differs from the jar in the previous step; this jar's main class is ReadeSequence2Img, again specified via maven-shade-plugin during packaging.
/sst/mr/result_1/part-r-00000: path of the sequence file generated in the previous step
/sst/mr/readseqfile2image: the output path (it stores the paths of the processed images)

hadoop jar ImageProj-1.0-SNAPSHOT.jar /sst/mr/result_1/part-r-00000 /sst/mr/readseqfile2image
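Once the job completes, the watermarked images should sit under /img3 (the directory hard-coded in the mapper above) and can be listed with:

hdfs dfs -ls /img3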
Reposted from: http://gslqf.baihongyu.com/