Hadoop日志采集综合案例
需求分析
 
- 
定时采集已滚动完毕日志文件,即类似access.log.1和access.log.2的文件,access.log文件不采集。 
- 
将待采集文件转移到临时目录,临时目录再上传到hdfs目录。 
- 
临时目录中的日志文件上传到hdfs目录后转移到备份目录,如此,方便直接服务器寻找日志文件,不用访问hdfs目录。 
代码实现
- 
idea创建一个maven工程,工程名为collect_log,pom文件加入以下依赖。 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39<dependencies> 
 <!-- 单元测试junit依赖-->
 <dependency>
 <groupId>junit</groupId>
 <artifactId>junit</artifactId>
 <version>RELEASE</version>
 </dependency>
 <!-- log4j 打印日志依赖-->
 <dependency>
 <groupId>org.apache.logging.log4j</groupId>
 <artifactId>log4j-core</artifactId>
 <version>2.8.2</version>
 </dependency>
 <!-- hadoop-common依赖,以下为maven远程仓库的所在地址,选择和安装的hadoop版本一致-->
 <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
 <dependency>
 <groupId>org.apache.hadoop</groupId>
 <artifactId>hadoop-common</artifactId>
 <version>2.9.2</version>
 </dependency>
 <!-- hadoop-client依赖,以下为maven远程仓库的所在地址,选择和安装的hadoop版本一致-->
 <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
 <dependency>
 <groupId>org.apache.hadoop</groupId>
 <artifactId>hadoop-client</artifactId>
 <version>2.9.2</version>
 </dependency>
 <!-- hadoop-hdfs依赖,以下为maven远程仓库的所在地址,选择和安装的hadoop版本一致-->
 <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-hdfs -->
 <dependency>
 <groupId>org.apache.hadoop</groupId>
 <artifactId>hadoop-hdfs</artifactId>
 <version>2.9.2</version>
 </dependency>
 </dependencies>
- 
创建LogCollector类,main方法执行定时器,定时调度采集任务。 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18package com.lagou.collect; 
 import java.util.Timer;
 public class LogCollector {
 /*
 - 定时采集已滚动完毕日志文件
 - 将待采集文件上传到临时目录
 - 备份日志文件
 */
 public static void main(String[] args) {
 //获取定时器对象
 Timer timer = new Timer();
 //定时采集任务的调度
 //task:采集的业务逻辑,delay:延迟时间,period:周期时间
 timer.schedule(new LogCollectorTask(), 0, 3600*1000);
 }
 }
- 
创建采集任务LogCollectorTask类继承定时任务TimerTask类。 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80package com.lagou.collect; 
 import com.lagou.common.Constant;
 import com.lagou.singlton.PropTool2;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.Properties;
 import java.util.TimerTask;
 public class LogCollectorTask extends TimerTask {
 public void run() {
 //获取resource下配置文件对象
 Properties prop = null;
 try {
 prop = PropTool2.getProp();
 } catch (IOException e) {
 e.printStackTrace();
 }
 //获取当前系统时间的年-月-日格式的字符串
 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
 String todayStr = sdf.format(new Date());
 //采集的业务逻辑
 // 1 扫描指定目录,找到待上传文件,原始日志目录
 File logsDir = new File(prop.getProperty(Constant.LOGS_DIR));
 final String log_prefix = prop.getProperty(Constant.LOG_PREFIX);
 File[] uploadFiles = logsDir.listFiles(new FilenameFilter() {
 public boolean accept(File dir, String name) {
 return name.startsWith(log_prefix);
 }
 });
 //2 把待上传文件转移到临时目录
 //判断临时目录是否存在,不存在就创建
 File tmpFile = new File(prop.getProperty(Constant.LOG_TMP_FOLDER));
 if (!tmpFile.exists()) {
 tmpFile.mkdirs();
 }
 //循环扫描到的文件集合,转移文件到临时目录
 for (File file : uploadFiles) {
 file.renameTo(new File(tmpFile.getPath() + "/" + file.getName()));
 }
 //3 使用hdfs api上传日志文件到指定目录
 //根据hadoop集群Configuration对象获取FileSystem对象
 Configuration conf = new Configuration();
 conf.set("fs.defaultFS", "hdfs://linux121:9000");
 FileSystem fs = null;
 try {
 fs = FileSystem.get(conf);
 //判断hdfs目标路径是否存在,备份目录是否存在
 Path path = new Path(prop.getProperty(Constant.HDFS_TARGET_FOLDER) + todayStr);
 if (!fs.exists(path)) {
 fs.mkdirs(path);
 }
 File bakFolder = new File(prop.getProperty(Constant.BAK_FOLDER) + todayStr);
 if (!bakFolder.exists()) {
 bakFolder.mkdirs();
 }
 //扫描临时目录,获取待上传文件集合
 File[] files = tmpFile.listFiles();
 for (File file : files) {
 //按照日期分门别列存放
 fs.copyFromLocalFile(new Path(file.getPath()), new Path(prop.getProperty(Constant.HDFS_TARGET_FOLDER) + todayStr + "/" + file.getName()));
 //4 上传后的临时目录中文件转移到备份目录中
 file.renameTo(new File(bakFolder.getPath() + "/" + file.getName()));
 }
 } catch (IOException e) {
 e.printStackTrace();
 }
 }
 }
- 
创建Constant类和collector.properties文件,存储常量配置。 1 
 2
 3
 4
 5
 6
 7
 8
 9package com.lagou.common; 
 public class Constant {
 public static final String LOGS_DIR="LOGS.DIR";
 public static final String LOG_PREFIX="LOG.PREFIX";
 public static final String LOG_TMP_FOLDER="LOG.TMP.FOLDER";
 public static final String HDFS_TARGET_FOLDER="HDFS.TARGET.FOLDER";
 public static final String BAK_FOLDER="BAK.FOLDER";
 }1 
 2
 3
 4
 5LOGS.DIR=e:/logs/ 
 LOG.PREFIX=access.log.
 LOG.TMP.FOLDER=e:/log_tmp/
 HDFS.TARGET.FOLDER=/collect_log/
 BAK.FOLDER=e:/log_bak/
- 
创建工具类PropTool和PropTool2,读取.properties配置文件。 - 工具类PropTool使用饿汉式加载获取配置文件对象,不管是否需要读取配置文件。
 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25package com.lagou.singlton; 
 import com.lagou.collect.LogCollectorTask;
 import java.io.IOException;
 import java.util.Properties;
 public class PropTool {
 //类加载时初始化执行一次即可
 //使用静态代码块实现 饿汉式加载
 private static Properties prop = null;
 //不管是否使用,类加载时都会执行静态代码块
 static {
 prop=new Properties();
 try {
 prop.load(LogCollectorTask.class.getClassLoader().getResourceAsStream("collector.properties"));
 } catch (IOException e) {
 e.printStackTrace();
 }
 }
 public static Properties getProp(){
 return prop;
 }
 }- 需要读取配置文件时,使用工具类PropTool2的getProp方法获取配置文件对象。
 1 
 2
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28package com.lagou.singlton; 
 import com.lagou.collect.LogCollectorTask;
 import java.io.IOException;
 import java.util.Properties;
 public class PropTool2 {
 //volatile关键字是java中禁止指令重排序的关键字,保证有序性和可见性
 private static volatile Properties prop = null;
 //出现线程安全问题
 public static Properties getProp() throws IOException {
 //判断Properties是否已被创建
 if(prop == null){
 //多线程调用,判断同步锁是否已释放
 synchronized ("lock"){
 //同步锁已释放,判断Properties是否已被创建
 if(prop == null){
 prop = new Properties();
 prop.load(LogCollectorTask.class.getClassLoader()
 .getResourceAsStream("collector.properties"));
 }
 }
 }
 return prop;
 }
 }
- 
案例项目源码文件目录架构   
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 WeiJia_Rao!


