Custom river plugin in elasticsearch

Elasticsearch has capability to create custom plugins by inheriting its core classes and functionality. We can implement java code as per our requirement easily plug with elasticsearch. Following are the steps to do that …

Create folder structure – Following is sample plugin file structure, its recommended by elasticsearch and its standard way of doing things. In following hermes bracelets structure, I have taken example of plugin name like “rabbitmqtoes” for our reference.

rabbittoes

 

Based on directory structure you have and idea that it is maven project. Let me provide to POM file for it so you can easily build it.

POM File : –

    

    4.0.0

    project
    RabbitMqToEs
    1.0-SNAPSHOT
    jar

    rabbitmqtoes
    http://maven.apache.org

    
        UTF-8
        1.3.4
    

    
        
            junit
            junit
            3.8.1
            test
        
        
            log4j
            log4j
            1.2.16
        
        
            com.rabbitmq
            amqp-client
            3.4.1
        
        
            org.elasticsearch
            elasticsearch
            ${elasticsearch.version}
        
    
    
        
            
                src/main/resources
                true
                
                    **/*.properties
                
            
        
        
            
                org.apache.maven.plugins
                maven-compiler-plugin
                2.3.2
                
                    1.6
                    1.6
                
            
            
                org.apache.maven.plugins
                maven-source-plugin
                2.1.2
                
                    
                        attach-sources
                        
                            jar
                        
                    
                
            
            
                maven-assembly-plugin
                2.3
                
                    false
                    ${project.build.directory}/releases/
                    
                        ${basedir}/src/main/assemblies/plugin.xml
                    
                
                
                    
                        package
                        
                            single
                        
                    
                
            
        
    


River Plugin Class

Create river plugin class in folder – “src/main/java/org/elasticsearch/plugin/river/rabbitmqtoes/RabbitMqToEsRiverPlugin.java” which is identify your plugin and register/load it during elasticsearch start.

package org.elasticsearch.plugin.river.rabbitmqtoes;

import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.plugins.AbstractPlugin;
import org.elasticsearch.river.RiversModule;
import org.elasticsearch.river.rabbitmqtoes.RabbitMqToEsRiverModule;

/**
 *
 */
public class RabbitMqToEsRiverPlugin extends AbstractPlugin {

    @Inject
    public RabbitMqToEsRiverPlugin() {
    }

    @Override
    public String name() {
        return "river-RabbitMqToEs";
    }

    @Override
    public String description() {
        return "River RabbitMqToEs Plugin";
    }

    public void onModule(RiversModule module) {
        module.registerRiver("RabbitMqToEs", RabbitMqToEsRiverModule.class);
    }
}

 River Module Class:-

Create river module class in this folder – “src/main/java/org/elasticsearch/river/rabbitmqtoes/RabbitMqToEsRiverModule.java” which will bind your business logic class.

package org.elasticsearch.river.rabbitmqtoes;

import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.river.River;

/**
 *
 */
public class RabbitMqToEsRiverModule extends AbstractModule {

    @Override
    protected void configure() {
        bind(River.class).to(RabbitMqToEsRiver.class).asEagerSingleton();
    }
}

 Main Business Logic Class

It should be created in folder – “src/main/java/org/elasticsearch/river/rabbitmqtoes/RabbitMqToEsRiver.java”. This class is abstracted from “AbstractRiverComponent” and it implements “River” class of elasticsearch. In this we are going to override river methods like start(),close().

package org.elasticsearch.river.rabbitmqtoes;
import org.elasticsearch.river.AbstractRiverComponent;
import org.elasticsearch.river.River;
import org.elasticsearch.river.RiverName;
import org.elasticsearch.river.RiverSettings;
import org.elasticsearch.threadpool.ThreadPool;

/**
 *
 */
public class RabbitMqToEsRiver extends AbstractRiverComponent implements River {
    @SuppressWarnings({"unchecked"})
    @Inject
    public RabbitMqToEsRiver(RiverName riverName, RiverSettings <a href="http://www.chemungcounty.com/index.asp?pageId=688">fake cartier bracelets
</a> settings, Client client, ThreadPool threadPool) {
        super(riverName, settings);
    }
    @Override
    public void start() {
         logger.info(" Starting RabbitMqRiver...");        
    }

    @Override
    public void close() {
        if (closed) {
            return;
        }
        logger.info(" Closing rabbitmq river");
        closed = true;
    }

 

After creating above file, build this project using maven and install cartier bracelet
this plugin as following command and restart elasticsearch.

/usr/share/elasticsearch/bin/plugin --url file:///rabbitmqtoes/target/releases/RabbitMqToEs-1.0-SNAPSHOT.zip --install river-rabbitmq

Now plugin is installed, if you want to run it we need to create river index on elasticsearch. e.g. as follows …

curl -XPUT 'localhost:9200/_river/RabbitQ/_meta' -d '{
    "type" : "RabbitMqToEs",
    "rabbitmq" : {
        "host" : "localhost",
        "port" : 5672,
        "user" : "guest",
        "pass" : "guest",
        "vhost" : "/",
        "queue" : "apps"
    },
    "index" : {
        "index" : "apps",
		"type" : "test"
    }
}'

If you want to access above params in your business logic java class you can do as following …

//This is constructor method
public RabbitMqToEsRiver(RiverName riverName, RiverSettings settings, Client client, ThreadPool threadPool) {
        super(riverName, settings);
        if (settings.settings().containsKey("rabbitmq")) {
            Map rabbitSettings = (Map) settings.settings().get("rabbitmq");
            rabbitHost     = XContentMapValues.nodeStringValue(rabbitSettings.get("host"), "guest");
            rabbitPort     = XContentMapValues.nodeIntegerValue(rabbitSettings.get("port"), 5672);
            rabbitUser     = XContentMapValues.nodeStringValue(rabbitSettings.get("user"), "guest");
        }
        else{
            rabbitHost     = "localhost";
            rabbitPort     = 5672;
            rabbitUser     = "guest";
        }
        
        if (settings.settings().containsKey("index")) {
            Map indexSettings = (Map) settings.settings().get("index");
            clusterName       = XContentMapValues.nodeStringValue(indexSettings.get("cluster"), riverName.name());
            indexName         = XContentMapValues.nodeStringValue(indexSettings.get("index"), riverName.name());
            typeName          = XContentMapValues.nodeStringValue(indexSettings.get("type"), "status");
        } else {
            clusterName       = "elasticsearch";
            indexName         = riverName.name();
            typeName          = "status";
        }
        
    }

 

 

Please note :- Latest elasticsearch has deprecated rivers – https://www.elastic.co/blog/deprecating-rivers

Categories: Elasticsearch