场景描述:
分布式系统多台服务器产生日志
需要对日志进行实时分析 分析电商流程 下单转化
处理方案:
单台处理:
读取文件 逐行读取 处理, 记录已经处理的行数 下次跳过不用处理,处理剩余日志
缺点:每次处理都需要从头读取文件效率无法保证
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27func runAnalysisLog(taskId int, fileName string) error{
task, err := models.TaskGetById(taskId)
if err != nil {
return err
}
operatedNum := task.OperatedNum
file, _ := os.Open(fileName)
fileScanner := bufio.NewScanner(file)
lineCount := 1
errCount := 0
for fileScanner.Scan(){
if lineCount > operatedNum{
//业务逻辑...
}
lineCount++
}
defer file.Close()
if errCount > 0 {
return fmt.Errorf("exec job errorCount :" + string(errCount))
}
task.OperatedNum = lineCount
task.Update()
return nil
}
读取文件 读取完成 转入另一个文件 每次只处理新进入的日志信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69func runAnalysisLogTemp(taskId int, fileName string) error {
twoFileName := strings.Split(fileName, "#@#")
originalFileName := twoFileName[0]
targetFileName := twoFileName[1]
f, err := os.Open(originalFileName)
if err != nil {
return err
}
buf := bufio.NewReader(f)
errCount := 0
for {
line, err := buf.ReadString('\n')
line = strings.TrimSpace(line)
//处理业务逻辑
}
copyAndAdd(originalFileName, targetFileName) //将源文件内容copy到新的文件中
truncate(originalFileName, 0) //清除原文件
return nil
}
func copyAndAdd(originalFileName string, targetFileName string) {
// 打开原始文件
bytes, err := ioutil.ReadFile(originalFileName)
if err != nil {
log.Fatal(err)
}
// 创建新的文件作为目标文件
var newFile *os.File
fi, _ := os.Stat(targetFileName)
if fi == nil {
newFile, _ = os.Create(targetFileName) // 文件不存在就创建
} else {
newFile, _ = os.OpenFile(targetFileName, os.O_WRONLY|os.O_APPEND, 0666) // 文件存在就打开
newFile.Write([]byte("\n"))
}
//defer newFile.Close()
// 从源中复制字节到目标文件
_, error := newFile.Write(bytes)
if error != nil {
log.Fatal(error)
}
// 将文件内容flush到硬盘中
err = newFile.Sync()
if err != nil {
log.Fatal(err)
}
}
func truncate(fileName string, size int64) {
// 裁剪一个文件到100个字节。
// 如果文件本来就少于100个字节,则文件中原始内容得以保留,剩余的字节以null字节填充。
// 如果文件本来超过100个字节,则超过的字节会被抛弃。
// 这样我们总是得到精确的100个字节的文件。
// 传入0则会清空文件。
err := os.Truncate(fileName, size)
if err != nil {
log.Fatal(err)
}
}
分布式处理:
将日志信息存入redis中 (java处理 使用filter将需要的日志信息 输出到.log文件中 不再赘述)
go脚本 定时执行处理(比如一分钟一次)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32func runRedisAnalysisLog(taskId int) error{
c, err := redis.Dial("tcp", "redis地址:端口")
if err != nil {
fmt.Println("Connect to redis error", err)
return err
}
if _, err := c.Do("AUTH", "密码(如果有的话)"); err != nil {
c.Close()
return err
}
l, err := redis.Int(c.Do("LLEN", "key"))
if err != nil {
fmt.Println("redis set failed:", err)
}
for i := 0; i < l; i += 1 {
logText, err := redis.Values(c.Do("BRPOP", "key", 100))
if err != nil {
fmt.Println("redis set failed:", err)
}
split := strings.Split(string(logText[1].([]byte)), "#@#")
//处理业务逻辑...
}
defer c.Close()
return nil
}
有不足之处还望不吝赐教 欢迎关注
未经作者允许 请勿转载,谢谢 :)