`
乡里伢崽
  • 浏览: 108900 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

hdfs 的分布式缓存

    博客分类:
  • hdfs
阅读更多
DistributedCache是Hadoop提供的文件缓存工具,它能够自动将指定的文件分发到各个节点上,缓存到本地,供用户程序读取使用。。它具有以下几个特点:缓存的文件是只读的,修改这些文件内容没有意义;用户可以调整文件可见范围(比如只能用户自己使用,所有用户都可以使用等),进而防止重复拷贝现象;按需拷贝,文件是通过HDFS作为共享数据中心分发到各节点的,且只发给任务被调度到的节点。

DistributeCache的命令方式:
  •   (1)-files:将指定的本地/hdfs文件分发到各个Task的工作目录下,不对文件进行任何处理;
  •   (2)-archives:将指定文件分发到各个Task的工作目录下,并对名称后缀为“.jar”、“.zip”,“.tar.gz”、“.tgz”的文件自动解压,默认情况下,解压后的内容存放到工作目录下名称为解压前文件名的目录中,比如压缩包为dict.zip,则解压后内容存放到目录dict.zip中。为此,你可以给文件起个别名/软链接,比如dict.zip#dict,这样,压  缩包会被解压到目录dict中。
  •   (3)-libjars:指定待分发的jar包,Hadoop将这些jar包分发到各个节点上后,会将其自动添加到任务的CLASSPATH环境变量中。


自己在写MAR/REDUCE代码时,遇到了一个问题,一个大数据文件和一个小数据文件匹配计算,但是小数据文件太小,所以想采用HIVE的MAP JOIN的方式,把小数据文件放到直接大数据文件map的datanode的内存中,这样少了MR代码的1对N的数据文件关联。

实现这个的最佳方案就是利用distributed cache。HIVE的MAP JOIN也是利用这个技术。

首先简要介绍一下distributed cache是如何使用的,然后总结下自己在使用distributed cache遇到的问题,这些问题网上也有人遇到,但是没有给出明确的解释。希望能够帮助同样遇到此类问题的朋友。

distributed cache至少有如下的两类类应用:

  • MAP、REDUCE本身和之间共享的较大数据量的数据
  • 布置第三方JAR包,可以避免集群的删减导致部分依赖的机器的JAR包的丢失



distributed cache使用的流程总结如下:

  • 1.在HDFS上准备好要共享的数据(text、archive、jar)
  • 2.在distributed cache中添加文件
  • 3.在mapper或者reducer类中获取数据
  • 4.在map或者reduce函数中使用数据


1.数据本来就在HDFS上,所以省去了流程中的第一步
可以使用hadoop fs -copyFromLocal把本地文件cp到HDFS上
2.在distributed cache中添加文件

  public static void disCache(String dimDir, JobConf conf) throws IOException {  
           FileSystem fs = FileSystem.get(URI.create(dimDir), conf);  
           FileStatus[] fileDir = fs.listStatus(new Path(dimDir));  
           for (FileStatus file : fileDir) {  
                DistributedCache.addCacheFile(URI.create(file.getPath().toString()), conf);  
           }  
}  


因为我利用的数据是HIVE脚本生成的,所以无法指定具体的文件路径,采用这种方式把一张HIVE表的所有数据都加载到cache中。如果能直接明确知道文件名称就简单很多了,例如:

DistributedCache.addCacheFile(URI.create(“/mytestfile/file.txt”), conf);


3.在mapper或者reducer中获取数据

  public void configure(JobConf job) {  
          try {  
               urlFile = DistributedCache.getLocalCacheFiles(job);  
               seoUrlFile = "file://" + urlFile[0].toString();  
               allUrlFile = "file://" + urlFile[1].toString();  
               FileReader reader = new FileReader(seoUrlFile );  
               BufferedReader br = new BufferedReader(reader);  
               System.out.println("this is OK");  
               String s1 = null;  
      
               int i=0;  
  
               while((s1 = br.readLine())!=null){  
               String[] word = s1.split("\\|");  
  
               //do something you want  
               }  
    }  
  
    br.close();  
    reader.close();                            
          } catch (IOException e) {  
                     // TODO Auto-generated catch block  
               e.printStackTrace();  
           }  
}  


   我在流程2中添加了两个文件,通过DistributedCache.getLocalCacheFiles(job)获取Path[],针对不同文件调用getData,把文件数据放在不同的List中seoUrlDim,allUrlDim
     如此一来,distributed cache的过程就结束了,接下来就在map()或者reduce()中使用这些数据就OK了。

4.在map或者reduce函数中使用数据
     一定要区分流程3和4,3是获取数据,4是使用数据。我就是在前期没弄明白这个的差异,导致内存溢出。

虽说好像挺简单的,但是在实现这个代码的过程中有如下几个问题困扰了我好久,网上也没找到很好的解决方案,后来在兄弟的帮助下搞定

  • 1.FileNotFoundException
  • 2.java.lang.OutOfMemoryError: GC overhead limit exceeded


1.FileNotFoundException
     这个问题涉及到DistributedCache.getLocalCacheFiles(job) 这个函数,此函数返回的Path[]是执行map或者reduce的datanode的本地文件系统中路径,但是我在getData中利用的SequenceFile.Reader的默认filesystem是hdfs,这就导致获取数据时是从hdfs上找文件,但是这个文件是在本地文件系统中,所以就会报出这个错误FileNotFoundException。
     例如,DistributedCache.getLocalCacheFiles(job)返回的PATH路径是:/home/dwapp/hugh.wangp/mytestfile/file.txt,在默认文件系统是hdfs时,获取数据时会读hdfs://hdpnn:9000/home/dwapp/hugh.wangp/mytestfile/file.txt,但是我们是在本地文件系统,为了避免数据获取函数选择"错误"的文件系统,我们在/home/dwapp/hugh.wangp/mytestfile/file.txt前加上"file://",这样就会从本地文件系统中读取数据了。就像我例子中的seoUrlFile = "file://" + urlFile[0].toString();

2.java.lang.OutOfMemoryError: GC overhead limit exceeded
     这个问题是我在没搞明白如何使用distributed cache时犯下的错误,也就是我没弄明白流程3和4的区别。我把流程3中说的获取数据的过程放在map函数中,而在map函数中其实是使用数据的过程。这个错误因为使每用一个map就要获取一下数据,也就是初始化一个list容器,使一个datanode上起N个map,就要获取N个list容器,内容溢出也就是自然而然的事情了。
     我们一定要把获取数据的过程放在mapper或reducer类的configure()函数中,这样对应一个datanode就只有一份数据,N个map可以共享着一份数据。


内容单薄,望志同道合之士互相学习。

原文:http://hugh-wangp.iteye.com/blog/1468989
分享到:
评论

相关推荐

    Fourinone分布式计算框架

    开发者可以自由控制调度过程,比如按照“农民工”的数量将源数据切分成多少份,然后远程分配给“农民工”节点进行计算处理,它处理完的中间结果数据不限制保存在hdfs里,而可以自由控制保存在分布式缓存、数据库、...

    Hadoop 分布式存储系统 HDFS的实例详解

    HDFS是Hadoop Distribute File System 的简称,也就是Hadoop的一个分布式文件系统。这篇文章主要介绍了Hadoop 分布式存储系统 HDFS,需要的朋友可以参考下

    网站架构技术

    分布式缓存 缓存产品 redis 业界主流 memcached 解决问题 数据库访问 使用应用服务器集群改善网站的并发处理能力 问题: 负载均衡情况下session状态的保持? 解决方案: 基于...

    论文研究-HDFS数据节点本地缓存的设计与实现 .pdf

    HDFS数据节点本地缓存的设计与实现,赵婧,王洪波,随着互联网应用的不断丰富和网络数据的急剧增长,海量数据的处理与存储已成为当前互联网应用中的最主要问题之一。Hadoop分布式文件

    Fourinone分布式并行计算四合一框架

    开发者可以自由控制调度过程,比如按照“农民工”的数量将源数据切分成多少份,然后远程分配给“农民工”节点进行计算处理,它处理完的中间结果数据不限制保存在hdfs里,而可以自由控制保存在分布式缓存、数据库、...

    存储/缓存技术中的HDFS 的多安全级数据销毁机制设计(二)

    3 HDFS 多安全级数据销毁机制  针对HDFS 不能彻底删除用户存储的数据的缺陷,文中设计了HDFS 的多安全级数据销毁机制,以达到数据的安全销毁。  3. 1 数据销毁技术  目前的数据销毁技术大致可以分两类:硬销毁...

    x01-lang-java

    lang-java【技术体系】/ JAVA核心: 多线程并发编程、网络...大数据相关: Zookeeper集群(协调服务)、Kafka(高速数据管道)、HBase(列式数据库-列簇 & rowkey)、Hadoop(HDFS分布式文件存储)app-samplesjust sa

    使用扩展的HDFS框架在Hadoop中合并小文件的系统方法-研究论文

    它包含两个主要组件:HDFS(Hadoop分布式文件系统)和Map reduce,用于处理大量数据。 各种平台都可以生成大量的小文件。 处理小文件非常困难,因为从分布式系统中的块中搜索文件时会花费大量的查找和跳转。 本文的...

    Apache Flume-Hadoop分布式日志收集_第二版

    深入介绍Flume众多更加有用的组件的细节信息,包括用于即时数据记录持久化的重要的文件通道、用于缓存并将数据写到HDFS中的HDFS接收器,以及Hadoop分布式文件系统。对于Flume各个架构组件(源、通道、接收器、通道...

    fourinone-3.04.25

    开发者可以自由控制调度过程,比如按照“农民工”的数量将源数据切分成多少份,然后远程分配给“农民工”节点进行计算处理,它处理完的中间结果数据不限制保存在hdfs里,而可以自由控制保存在分布式缓存、数据库、...

    大型分布式网站架构与实践

     分布式缓存memcache的使用及分布式策略,包括Hash算法的选择。  常见的分布式系统存储解决方案,包括MySQL的分布式扩展、HBase的API及使用场景、Redis的使用等。  如何使用分布式消息系统ActiveMQ来降低系统之间...

    大数据开源框架集锦.pdf

    Redis 开源的⽀持⽹络,基于内存可持久化⽇志,key-value数据库,可⽤于 数据库 缓存 消息中间件 Neo4j 开源⾼性能的NoSQL图形数据库 7 数据处理 MapReduce 分布式离线的计算框架 批处理 ⽇渐被spark和flink取代 ...

    hdp2WordCountOozie

    一个工作示例,展示了如何执行字数统计,使用YARN分布式缓存使用关键字过滤进行字数统计以及作为Oozie工作流进行编排。 ##样本数据集:MusixMatch(〜13.5mb) #以正常字数运行 ##独立MR使用说明: ###### 1)...

    存储/缓存技术中的基于HADOOP的数据挖掘平台分析与设计

    在HADOOP平台上,采用了HDFS(分布式文件系统)来实现超大文件的存储和容错,而使用了MapReduce的编程模式来进行计算。  一、数据挖掘技术概述  作为一门快速发展的技术,数据挖掘引起了信息产业界和社会的广泛...

    44 3 MyCat分布式架构

    3、一个可以视为MySQL集群的企业级数据库,用来替代昂贵的Oracle集群 4、一个融合内存缓存技术、NoSQL技术、HDFS大数据的新型SQL Server 5、结合传统数据库和新型分布式数据仓库的新一代企业级数据库产品 ...

    论文研究-基于集群规模调整的节能存储策略.pdf

    通过研究集群数据块访问规律,提出一种基于集群规模调整的Hadoop分布式文件系统(HDFS)节能存储策略,实现HDFS高效节能存储。策略主要在集群区域划分、数据块迁移策略优化、缓存机制等方面作出了改进。实验结果表明...

    flink入门到精通视频教程

    4.Flink 广播变量,分布式缓存,累加器 5.Flink Datastream开发 6.Flink Window操作 7.Flink watermark与侧道输出 8.Flink状态计算 9.Flink容错checkpoint与一致性语义 10.Flink进阶 异步IO,背压,内存管理 11....

    基于NoSQL的海量航空物流小文件分布式多级存储方法

    充分考虑到数据的时效性、本地性、操作的并发性以及文件之间的相关性,先根据相关性将文件合并,然后采用分布式多级存储,使用内存式Redis数据库做缓存,HDFS做数据的持久化存储,其过程采用预取机制。实验结果表明...

    HDFS及优化综述_朱紫钰_20192622781

    摘要:DHFS作为Hadoop分布式存储系统,具有高容错,高吞吐量,一致性的特点,但是对于小文件存储、缓存和通信方面还有许多可优化之处。小文件存储;缓存;通信优

Global site tag (gtag.js) - Google Analytics