[logstash] Mysql DB에서 입력받아 Elasticsearch로 보내기
Posted on: 2018년 5월 15일 /
logstash는 실시간 파이프라인 기능을 가진 오픈소스 데이터 수집 엔진이다.
자세한 설명은 아래의 공식 문서를 참조하자.
https://www.elastic.co/guide/en/logstash/current/introduction.html
간단히 말해서 데이터를 수집 해서 원하는 형태로 가공하여 출력해주는 도구이다.
따라서 입력과 필터 그리고 출력을 어떤형태로 할것인지 기술해놓은 아래와 같은 설정파일이 필요하다.
# This is a comment. You should use comments to describe
# parts of your configuration.
input {
...
}
filter {
...
}
output {
...
}
위의 설정 중에 입력과 출력은 필수이며 필터는 옵션이다.
logstash는 다양한 입출력을 plugin을 통해 지원한다.
플러그인을 따로 설치해줄 필요는 없으며 설정파일에 기술하는 것으로 사용이 가능하다.
주로 아파치 같은 웹서버의 로그를 분석하는데에 많이 쓰이는 것 같다.
내 경우엔 Mysql DB에서 데이터를 뽑아 elasticsearch에 넣어줄 일이 생겨 필요한 설정들을 모아봤다.
우선 Input plugin으로 JDBC 플러그인 설정을 아래와 같이 해주었다.
input {
jdbc {
jdbc_driver_library => "{path_to_driver_file}/mysql-connector-java-5.1.46-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.driver"
jdbc_connection_string => "jdbc:mysql://{database_host}/{database}"
jdbc_user => "{username}"
jdbc_password => "{password}"
statement_filepath => "{path_to_sql}/song.sql"
}
}
그 다음 Output plugin으로 elasticsearch 설정을 아래와 같이 해주었다.
output {
elasticsearch {
hosts => "localhost:9200"
index => "{index}"
codec => "json"
document_type => "{document_type}"
document_id => "%{idx}"
}
}
위의 설정들로 끝났으면 좋았겠지만 아쉽게도 문제가 있었다.
sql로 가져와야 하는 데이터는 join을 해서 가져오는 데이터였고 그렇기 때문에 elasticsearch에 하나의 index로 넣어주기 위해서는 데이터를 합쳐주는 작업이 필요했다.
즉 쿼리로 가져온 아래와 같은 데이터를
아래의 포맷으로 합쳐주는 작업이 필요했다.
해당 작업은 Filter plugin 중 하나인 aggregate plugin 을 사용하였다.
filter {
aggregate {
task_id => "%{idx}"
code => "
map['idx'] = event.get('idx')
map['songName'] = event.get('song_name')
map['singerName'] = event.get('singer_name')
map['regTime'] = event.get('reg_time')
map['songNames'] ||= []
if !(map['songNames'].include? event.get('song_name') )
map['songNames'] << event.get('song_name')
end
map['singerNames'] ||= []
if !(map['singerNames'].include? event.get('singer_name') )
map['singerNames'] << event.get('singer_name')
end
event.cancel()
"
push_previous_map_as_event => true
timeout => 5
}
mutate {
remove_field => ["songName","singerName"]
}
}
어느 언어의 문법을 차용한 것인지는 모르겠다. 그냥 구글링해서 찾아 내 경우에 맞게 수정한 것이다.
참고로 ||= 이부분이 새 array를 만드는 문법인거같은데 || = 이렇게 두 문자 사이에 공백이 있으면 에러가 난다.
혹시 어디에서 나온 문법인지 아시는분 댓글좀 부탁드립니다. 꾸벅
그리하여 아래와 같은 최종 설정파일이 만들어졌다.
input {
jdbc {
jdbc_driver_library => "{path_to_driver_file}/mysql-connector-java-5.1.46-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.driver"
jdbc_connection_string => "jdbc:mysql://{database_host}/{database}"
jdbc_user => "{username}"
jdbc_password => "{password}"
statement_filepath => "{path_to_sql}/song.sql"
}
}
filter {
aggregate {
task_id => "%{idx}"
code => "
map['idx'] = event.get('idx')
map['songName'] = event.get('song_name')
map['singerName'] = event.get('singer_name')
map['regTime'] = event.get('reg_time')
map['songNames'] ||= []
if !(map['songNames'].include? event.get('song_name') )
map['songNames'] << event.get('song_name')
end
map['singerNames'] ||= []
if !(map['singerNames'].include? event.get('singer_name') )
map['singerNames'] << event.get('singer_name')
end
event.cancel()
"
push_previous_map_as_event => true
timeout => 5
}
mutate {
remove_field => ["songName","singerName"]
}
}
output {
elasticsearch {
hosts => "localhost:9200"
index => "{index}"
codec => "json"
document_type => "{document_type}"
document_id => "%{idx}"
}
}
위와 같이 작성된 설정파일을 가지고 logstash -f /config-dir-path/final.conf 를 실행해 주니 원하는 대로 데이터를 잘 옮겨 준다.