[logstash] Mysql DB에서 입력받아 Elasticsearch로 보내기
logstash는 실시간 파이프라인 기능을 가진 오픈소스 데이터 수집 엔진이다.
자세한 설명은 아래의 공식 문서를 참조하자.
https://www.elastic.co/guide/en/logstash/current/introduction.html
간단히 말해서 데이터를 수집 해서 원하는 형태로 가공하여 출력해주는 도구이다.
따라서 입력과 필터 그리고 출력을 어떤형태로 할것인지 기술해놓은 아래와 같은 설정파일이 필요하다.
# This is a comment. You should use comments to describe # parts of your configuration. input { ... } filter { ... } output { ... }
위의 설정 중에 입력과 출력은 필수이며 필터는 옵션이다.
logstash는 다양한 입출력을 plugin을 통해 지원한다.
플러그인을 따로 설치해줄 필요는 없으며 설정파일에 기술하는 것으로 사용이 가능하다.
주로 아파치 같은 웹서버의 로그를 분석하는데에 많이 쓰이는 것 같다.
내 경우엔 Mysql DB에서 데이터를 뽑아 elasticsearch에 넣어줄 일이 생겨 필요한 설정들을 모아봤다.
우선 Input plugin으로 JDBC 플러그인 설정을 아래와 같이 해주었다.
input { jdbc { jdbc_driver_library => "{path_to_driver_file}/mysql-connector-java-5.1.46-bin.jar" jdbc_driver_class => "com.mysql.jdbc.driver" jdbc_connection_string => "jdbc:mysql://{database_host}/{database}" jdbc_user => "{username}" jdbc_password => "{password}" statement_filepath => "{path_to_sql}/song.sql" } }
그 다음 Output plugin으로 elasticsearch 설정을 아래와 같이 해주었다.
output { elasticsearch { hosts => "localhost:9200" index => "{index}" codec => "json" document_type => "{document_type}" document_id => "%{idx}" } }
위의 설정들로 끝났으면 좋았겠지만 아쉽게도 문제가 있었다.
sql로 가져와야 하는 데이터는 join을 해서 가져오는 데이터였고 그렇기 때문에 elasticsearch에 하나의 index로 넣어주기 위해서는 데이터를 합쳐주는 작업이 필요했다.
즉 쿼리로 가져온 아래와 같은 데이터를
아래의 포맷으로 합쳐주는 작업이 필요했다.
해당 작업은 Filter plugin 중 하나인 aggregate plugin 을 사용하였다.
filter { aggregate { task_id => "%{idx}" code => " map['idx'] = event.get('idx') map['songName'] = event.get('song_name') map['singerName'] = event.get('singer_name') map['regTime'] = event.get('reg_time') map['songNames'] ||= [] if !(map['songNames'].include? event.get('song_name') ) map['songNames'] << event.get('song_name') end map['singerNames'] ||= [] if !(map['singerNames'].include? event.get('singer_name') ) map['singerNames'] << event.get('singer_name') end event.cancel() " push_previous_map_as_event => true timeout => 5 } mutate { remove_field => ["songName","singerName"] } }
어느 언어의 문법을 차용한 것인지는 모르겠다. 그냥 구글링해서 찾아 내 경우에 맞게 수정한 것이다.
참고로 ||= 이부분이 새 array를 만드는 문법인거같은데 || = 이렇게 두 문자 사이에 공백이 있으면 에러가 난다.
혹시 어디에서 나온 문법인지 아시는분 댓글좀 부탁드립니다. 꾸벅
그리하여 아래와 같은 최종 설정파일이 만들어졌다.
input { jdbc { jdbc_driver_library => "{path_to_driver_file}/mysql-connector-java-5.1.46-bin.jar" jdbc_driver_class => "com.mysql.jdbc.driver" jdbc_connection_string => "jdbc:mysql://{database_host}/{database}" jdbc_user => "{username}" jdbc_password => "{password}" statement_filepath => "{path_to_sql}/song.sql" } } filter { aggregate { task_id => "%{idx}" code => " map['idx'] = event.get('idx') map['songName'] = event.get('song_name') map['singerName'] = event.get('singer_name') map['regTime'] = event.get('reg_time') map['songNames'] ||= [] if !(map['songNames'].include? event.get('song_name') ) map['songNames'] << event.get('song_name') end map['singerNames'] ||= [] if !(map['singerNames'].include? event.get('singer_name') ) map['singerNames'] << event.get('singer_name') end event.cancel() " push_previous_map_as_event => true timeout => 5 } mutate { remove_field => ["songName","singerName"] } } output { elasticsearch { hosts => "localhost:9200" index => "{index}" codec => "json" document_type => "{document_type}" document_id => "%{idx}" } }
위와 같이 작성된 설정파일을 가지고 logstash -f /config-dir-path/final.conf 를 실행해 주니 원하는 대로 데이터를 잘 옮겨 준다.