[logstash] Mysql DB에서 입력받아 Elasticsearch로 보내기

logstash는 실시간 파이프라인 기능을 가진 오픈소스 데이터 수집 엔진이다.

자세한 설명은 아래의 공식 문서를 참조하자.

https://www.elastic.co/guide/en/logstash/current/introduction.html

간단히 말해서 데이터를 수집 해서 원하는 형태로 가공하여 출력해주는 도구이다.

따라서 입력과 필터 그리고 출력을 어떤형태로 할것인지 기술해놓은 아래와 같은 설정파일이 필요하다.

# This is a comment. You should use comments to describe
# parts of your configuration.
input {
  ...
}

filter {
  ...
}

output {
  ...
}

위의 설정 중에 입력과 출력은 필수이며 필터는 옵션이다.

logstash는 다양한 입출력을 plugin을 통해 지원한다.

플러그인을 따로 설치해줄 필요는 없으며 설정파일에 기술하는 것으로 사용이 가능하다.

주로 아파치 같은 웹서버의 로그를 분석하는데에 많이 쓰이는 것 같다.

내 경우엔 Mysql DB에서 데이터를 뽑아 elasticsearch에 넣어줄 일이 생겨 필요한 설정들을 모아봤다.

우선 Input plugin으로 JDBC 플러그인 설정을 아래와 같이 해주었다.

input {
  jdbc {
    jdbc_driver_library => "{path_to_driver_file}/mysql-connector-java-5.1.46-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.driver"
    jdbc_connection_string => "jdbc:mysql://{database_host}/{database}"
    jdbc_user => "{username}"
    jdbc_password => "{password}"
    statement_filepath => "{path_to_sql}/song.sql"
  }
}

그 다음 Output plugin으로 elasticsearch 설정을 아래와 같이 해주었다.

output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "{index}"
    codec => "json"
    document_type => "{document_type}"
    document_id => "%{idx}"
  }
}

위의 설정들로 끝났으면 좋았겠지만 아쉽게도 문제가 있었다.

sql로 가져와야 하는 데이터는 join을 해서 가져오는 데이터였고 그렇기 때문에 elasticsearch에 하나의 index로 넣어주기 위해서는 데이터를 합쳐주는 작업이 필요했다.

즉 쿼리로 가져온 아래와 같은 데이터를

아래의 포맷으로 합쳐주는 작업이 필요했다.

 

해당 작업은 Filter plugin 중 하나인 aggregate plugin 을 사용하였다.

filter {
  aggregate {
    task_id => "%{idx}"
    code => "
      map['idx'] = event.get('idx')
      map['songName'] = event.get('song_name')
      map['singerName'] = event.get('singer_name')
      map['regTime'] = event.get('reg_time')

      map['songNames'] ||= []

      if !(map['songNames'].include? event.get('song_name') )
        map['songNames'] << event.get('song_name')
      end

      map['singerNames'] ||= []

      if !(map['singerNames'].include? event.get('singer_name') )
        map['singerNames'] << event.get('singer_name')
      end

      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 5
  }
  mutate {
    remove_field => ["songName","singerName"]
  }
}

어느 언어의 문법을 차용한 것인지는 모르겠다. 그냥 구글링해서 찾아 내 경우에 맞게 수정한 것이다.

참고로 ||= 이부분이 새 array를 만드는 문법인거같은데 ||    = 이렇게 두 문자 사이에 공백이 있으면 에러가 난다.

혹시 어디에서 나온 문법인지 아시는분 댓글좀 부탁드립니다. 꾸벅

그리하여 아래와 같은 최종 설정파일이 만들어졌다.

input {
  jdbc {
    jdbc_driver_library => "{path_to_driver_file}/mysql-connector-java-5.1.46-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.driver"
    jdbc_connection_string => "jdbc:mysql://{database_host}/{database}"
    jdbc_user => "{username}"
    jdbc_password => "{password}"
    statement_filepath => "{path_to_sql}/song.sql"
  }
}

filter {
  aggregate {
    task_id => "%{idx}"
    code => "
      map['idx'] = event.get('idx')
      map['songName'] = event.get('song_name')
      map['singerName'] = event.get('singer_name')
      map['regTime'] = event.get('reg_time')

      map['songNames'] ||= []

      if !(map['songNames'].include? event.get('song_name') )
        map['songNames'] << event.get('song_name')
      end

      map['singerNames'] ||= []

      if !(map['singerNames'].include? event.get('singer_name') )
        map['singerNames'] << event.get('singer_name')
      end

      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 5
  }
  mutate {
    remove_field => ["songName","singerName"]
  }
}

output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "{index}"
    codec => "json"
    document_type => "{document_type}"
    document_id => "%{idx}"
  }
}

위와 같이 작성된 설정파일을 가지고 logstash -f /config-dir-path/final.conf 를 실행해 주니 원하는 대로 데이터를 잘 옮겨 준다.

답글 남기기

이메일 주소는 공개되지 않습니다. 필수 필드는 *로 표시됩니다