# CentOS7安装LogStash

Logstash是一个开源的服务器端数据处理管道,可以同时从多个数据源获取数据,并对其进行转换,然后将其发送到你最喜欢的“存储”。(当然,我们最喜欢的是Elasticsearch) 在这里插入图片描述

# 1 下载logstash和mysql驱动包

下载地址

# 2 拷贝到指定目录,并解压

tar -xzvf logstash-6.4.3.tar.gz
1

# 3 安装 jdbc 和 elasticsearch 插件

#进入到logstash的bin目录下面执行安装插件 执行成功如下图
./logstash-plugin install logstash-input-jdbc
./logstash-plugin install logstash-output-elasticsearch
1
2
3

在这里插入图片描述

# 4 简单测试logstash-6.4.3

#进入到logstash的bin 命令行启动测试 启动成功如下图
./logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'

#接着屏幕就等着你输入了,比如输入一个Hello World,会出现以下的提示语句。
1
2
3
4

在这里插入图片描述

# 5 拷贝mysql包到config目录下

由于logstash是ruby开发的,所以这里要下载mysql的连接库jar包,将下载好的mysql-connector-java-5.1.48.jar,放至logstash/config/目录下。

# 6 创建mysql配置文件

在config目录下,创建配置文件(logstash-mysql-es.conf):

input {
  jdbc {
    # mysql相关jdbc配置
    jdbc_connection_string => "jdbc:mysql://10.112.76.30:3306/jack_test?useUnicode=true&characterEncoding=utf-8&useSSL=false"
    jdbc_user => "root"
    jdbc_password => "123456"

    # jdbc连接mysql驱动的文件目录,可去官网下载:https://dev.mysql.com/downloads/connector/j/
    jdbc_driver_library => "./config/mysql-connector-java-5.1.48.jar"
    # mysql的驱动类名称
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => true
    jdbc_page_size => "50000"
	# 默认的时区
    jdbc_default_timezone =>"Asia/Shanghai"

    # mysql文件, 也可以直接写SQL语句在此处,如下:
    # statement => "select * from t_order where update_time >= :sql_last_value;"
	# 专门写sql语句的文件 配置时请注意路径 注:statement和statement_filepath不能同时放开
    statement_filepath => "./config/jdbc.sql"

    # 这里类似crontab,可以定制定时操作,比如每分钟执行一次同步(分 时 天 月 年)
    schedule => "* * * * *"
    #type => "jdbc"

    # 是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
    #record_last_run => true

    # 是否需要记录某个column 的值,如果record_last_run为真,可以自定义我们需要 track 的 column 名称,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.
    use_column_value => true

    # 如果 use_column_value 为真,需配置此参数. track 的数据库 column 名,该 column 必须是递增的. 一般是mysql主键
    tracking_column => "update_time"
    
    tracking_column_type => "timestamp"

    last_run_metadata_path => "./logstash_capital_bill_last_id"

    # 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
    clean_run => false

    #是否将 字段(column) 名称转小写
    lowercase_column_names => false
  }
}

# hosts-->elasticsearch的主机地址
#index-->索引名称, document_id文档ID,一般对应mysql数据表的主键id字段

output {
  elasticsearch {
    hosts => "192.168.0.100:9200"
    index => "mysql_order"
    document_id => "%{id}"
    template_overwrite => true
  }

  # 这里输出调试,正式运行时可以注释掉
  stdout {
      codec => json_lines
  } 
}


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64

# 7 配置文件注意项

这里有几个注意点:
(1)jdbc_driver_library
    mysql-connector-java-5.1.48.jar的存放目录,这个一定要配置正确,支持全路径和相对路径。如果配置不对,将会报“can ”错误。
(2)sql_last_value
    标志目前logstash同步的位置信息(类似offset)。比如id、updatetime。logstash通过这个标志,可以判断目前同步到哪一条数据。
(3)statement、statement_filepath
    statement:执行同步的sql语句,可以同步部分数据。
    statement_filepath:存储执行同步的sql语句。不和statement同时使用。
(4)schedule
     定时器,表示每隔多长时间同步一次数据。格式类似crontab。
(5)tracking_column、tracking_column_type
    tracking_column:表示表中哪一列用于判断logstash同步的位置信息。与sql_last_value比较判断是否需要同步这条数据。
    tracking_column_type:racking_column指定列的类型。支持两种类型:numeric(默认)、timestamp。注意:如果列是时间字段(比如updateTime),一定要指定这个类型为timestamp。我就踩了这个大坑。。。一直同步不成功!!!
(6)last_run_metadata_path
     存储sql_last_value值的文件名称及位置。
(7)document_id
     生成elasticsearch的文档值,尽量使用同步的数据中已有的唯一标识。比如同步订单数据,可以使用订单号。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 8 启动logstash

#在根目录下,执行命令:
nohup bin/logstash -f config/logstash-mysql-es.conf > logs/logstash.out &
1
2