Yesterday I had to setup a Logstash pipeline reading data from a MariaDB database and inserting it into Elasticsearch. Since it took a couple of hours to figure this out, here is the configuration I found to got it working.
First, get the latest JDBC mariadb connectorJ libraries from mariadb.org. Once its downloaded, move the file to /usr/share/logstash/logstash-core/lib/jars. This is the most important step, because apparently the only way for logstash to actually load the jar, is when its in the same folder as its other jars.
wget https://downloads.mariadb.com/Connectors/java/connector-java-2.6.0/mariadb-java-client-2.6.0.jar mv mariadb-java-client-2.6.0.jar /usr/share/logstash/logstash-core/lib/jars chown logstash:logstash /usr/share/logstash/logstash-core/lib/jars/mariadb-java-client-2.6.0.jar
Second, add the a new pipeline configuration file in /etc/logstash/config.d.
input { jdbc { # database configuration jdbc_connection_string => "jdbc:mariadb://localhost:3306/database" jdbc_user => "mariadb_readonly_user" jdbc_password => "mariadb_readonly_password" jdbc_driver_library => "" jdbc_driver_class => "Java::org.mariadb.jdbc.Driver" jdbc_validate_connection => true jdbc_default_timezone => "America/Toronto" # query every 1 minutes schedule => "*/1 * * * *" statement => "SELECT oid, [...] FROM [...] WHERE date_changed > :sql_last_value LIMIT 10000" use_column_value => true tracking_column => "date_changed" tracking_column_type => "timestamp" record_last_run => true last_run_metadata_path => "/var/lib/logstash/last_run/mariadb_last_run" } } output { # debugging only # stdout { codec => json_lines } elasticsearch { hosts => ["localhost:9200"] index => "index-name" document_id => "%{oid}" } }
Since we moved the jar to the logstash core folder the jdbc_driver_library needs to be empty. Other than this, most of this file is pretty standard. The only other notable changes are the jdbc_default_timezone which is needed to make the “last run” timestamp work correctly. Also, the document_id in the output section is needed to make it possible to update an existing document in Elasticsearch if the data from the database has changed.