日志处理

场景描述:

分布式系统多台服务器产生日志

需要对日志进行实时分析 分析电商流程 下单转化

处理方案:

单台处理:

  1. 读取文件 逐行读取 处理, 记录已经处理的行数 下次跳过不用处理,处理剩余日志

    缺点:每次处理都需要从头读取文件效率无法保证

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    func runAnalysisLog(taskId int, fileName string) error{
    task, err := models.TaskGetById(taskId)
    if err != nil {
    return err
    }

    operatedNum := task.OperatedNum

    file, _ := os.Open(fileName)
    fileScanner := bufio.NewScanner(file)
    lineCount := 1
    errCount := 0
    for fileScanner.Scan(){
    if lineCount > operatedNum{
    //业务逻辑...
    }
    lineCount++
    }

    defer file.Close()
    if errCount > 0 {
    return fmt.Errorf("exec job errorCount :" + string(errCount))
    }
    task.OperatedNum = lineCount
    task.Update()
    return nil
    }

  2. 读取文件 读取完成 转入另一个文件 每次只处理新进入的日志信息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    func runAnalysisLogTemp(taskId int, fileName string) error {

    twoFileName := strings.Split(fileName, "#@#")
    originalFileName := twoFileName[0]
    targetFileName := twoFileName[1]

    f, err := os.Open(originalFileName)
    if err != nil {
    return err
    }

    buf := bufio.NewReader(f)
    errCount := 0
    for {
    line, err := buf.ReadString('\n')
    line = strings.TrimSpace(line)
    //处理业务逻辑

    }


    copyAndAdd(originalFileName, targetFileName) //将源文件内容copy到新的文件中
    truncate(originalFileName, 0) //清除原文件

    return nil
    }

    func copyAndAdd(originalFileName string, targetFileName string) {
    // 打开原始文件
    bytes, err := ioutil.ReadFile(originalFileName)
    if err != nil {
    log.Fatal(err)
    }

    // 创建新的文件作为目标文件
    var newFile *os.File
    fi, _ := os.Stat(targetFileName)
    if fi == nil {
    newFile, _ = os.Create(targetFileName) // 文件不存在就创建
    } else {
    newFile, _ = os.OpenFile(targetFileName, os.O_WRONLY|os.O_APPEND, 0666) // 文件存在就打开
    newFile.Write([]byte("\n"))
    }

    //defer newFile.Close()
    // 从源中复制字节到目标文件
    _, error := newFile.Write(bytes)
    if error != nil {
    log.Fatal(error)
    }

    // 将文件内容flush到硬盘中
    err = newFile.Sync()
    if err != nil {
    log.Fatal(err)
    }
    }

    func truncate(fileName string, size int64) {
    // 裁剪一个文件到100个字节。
    // 如果文件本来就少于100个字节,则文件中原始内容得以保留,剩余的字节以null字节填充。
    // 如果文件本来超过100个字节,则超过的字节会被抛弃。
    // 这样我们总是得到精确的100个字节的文件。
    // 传入0则会清空文件。
    err := os.Truncate(fileName, size)
    if err != nil {
    log.Fatal(err)
    }
    }

分布式处理:

  1. 将日志信息存入redis中 (java处理 使用filter将需要的日志信息 输出到.log文件中 不再赘述)

  2. go脚本 定时执行处理(比如一分钟一次)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    func runRedisAnalysisLog(taskId int) error{
    c, err := redis.Dial("tcp", "redis地址:端口")
    if err != nil {
    fmt.Println("Connect to redis error", err)
    return err
    }

    if _, err := c.Do("AUTH", "密码(如果有的话)"); err != nil {
    c.Close()
    return err
    }

    l, err := redis.Int(c.Do("LLEN", "key"))
    if err != nil {
    fmt.Println("redis set failed:", err)
    }

    for i := 0; i < l; i += 1 {
    logText, err := redis.Values(c.Do("BRPOP", "key", 100))
    if err != nil {
    fmt.Println("redis set failed:", err)
    }

    split := strings.Split(string(logText[1].([]byte)), "#@#")

    //处理业务逻辑...

    }

    defer c.Close()
    return nil
    }

有不足之处还望不吝赐教 欢迎关注

未经作者允许 请勿转载,谢谢 :)

lemon wechat
欢迎大家关注我的订阅号 SeeMoonUp
写的不错?鼓励一下?不差钱?