IT博客汇
  • 首页
  • 精华
  • 技术
  • 设计
  • 资讯
  • 扯淡
  • 权利声明
  • 登录 注册

    ElasticSearch-2.0.0集群安装配置与API使用实践

    Yanjun发表于 2015-11-28 02:06:38
    love 0

    ElasticSearch是基于全文搜索引擎库Lucene构建的分布式搜索引擎,我们可以直接使用ElasticSearch实现分布式搜索系统的搭建与使用,都知道,Lucene只是一个搜索框架,它提供了搜索引擎操作的基本API,如果要实现一个能够使用的搜索引擎系统,还需要自己基于Lucene的API去实现,工作量很大,而且还需要很好地掌握Lucene的底层实现原理。
    ElasticSearch是一个完整的分布式搜索引擎系统,它的一些基本特性包括如下:

    • 全文检索
    • 提供插件机制,可以共享重用插件的功能
    • 分布式文件存储
    • 分布式实时索引和搜索
    • 实时统计分析
    • 可以横向扩展,支持大规模数据的搜索
    • 简单易用的RESTful API
    • 基于Replication实现了数据的高可用特性
    • 与其他系统的集成
    • 支持结构化和非结构化数据
    • 灵活的Schema设计(Mappings)
    • 支持多编程语言客户端

    我个人感觉,ElasticSearch尽量屏蔽底层Lucene相关的技术细节,让你根本无从感觉底层Lucene相关的内容,这样你可以省去了了解Lucene 的成本,学习曲线比较平缓,不像Solr,如果想要构造负责的查询(Query),还是要对Lucene有所了解的。另外,在分布式设计方面,ElasticSearch更轻量一些,用起来更简单,而使用Solr的分布式分片功能需要使用SolrCloud,它基于ZooKeeper来实现配置管理,以及Replication功能,而且Solr需要使用Web容器来部署,相对来说有点复杂一些(我个人之前使用的SolrCloud版本大概是3.1~3.5左右,比较早,现在可能更加完善了)。

    基本概念

    我们熟悉一下ElasticSearch中涉及到的一些基本概念:

    • 索引(Index)

    索引(Index)是文档的集合,它是根据实际业务逻辑进行划分的,通常会把相对独立且具有相似结构或者性质的数据作为文档,放在一起,形成一个索引,比如,用户相关信息可以作为一个索引,交易相关信息也可应作为另一个索引。

    • 类型(Type)

    类型(Type)是索引内部的一个逻辑划分,在一个索引内部可以定义多个类型(Type),类型将一个索引在逻辑上划分为多个集合,每个类型包含多个属性(字段)。比如,我们基于手机客户端应用App,创建一个了用户相关信息的索引,然后再在这个索引内部定义多个类型:基本信息类型、设备信息类型、行为信息类型,基本信息类型中包含用户编号、证件号码、名称、手机号码、年龄、出生日期,设备信息类型包括设备类型、设备名称、App版本号、渠道来源、系统版本、IMEI、mac地址,用户行为信息包含用户编号、事件编号、事件类型、时间、浏览页面代码、地区编码,这样有3个类型在一个索引当中。ElasticSearch中类型,与HBase中列簇(Column Family)的概念很相似。

    • 文档(Document)

    文档(Document)是索引的基本单元,它与关系数据库中的一条记录相类似,包含了一组属性信息,同时包含一个唯一标识这一组属性值的ID,通过该ID可以更新一个文档,也可以删除一个文档。

    • 分片(Shards)&副本(Replicas)

    一个索引是很多文档的集合,将一个索引进行分割,分成多个片段(一个索引的子集),每一个片段称为一个分片(Shard),这样划分可以很好地管理索引,跨节点存储,为分布式存储于搜索提供了便利。副本(Replica)是为了保证一个分片(Shard)的可用性,冗余复制存储,当一个分片对应的数据无法读取时,可以读取其副本,正常提供搜索服务。

    集群安装配置

    ElasticSearch集群安装配置非常容易,安装可以执行如下命令行:

    wget https://download.elasticsearch.org/elasticsearch/release/org/elasticsearch/distribution/zip/elasticsearch/2.0.0/elasticsearch-2.0.0.zip
    unzip elasticsearch-2.0.0.zip
    

    拿出集群的一个节点的进行配置,修改配置文件config/elasticsearch.yml的内容,如下所示:

    # ======================== Elasticsearch Configuration =========================
    #
    # NOTE: Elasticsearch comes with reasonable defaults for most settings.
    #       Before you set out to tweak and tune the configuration, make sure you
    #       understand what are you trying to accomplish and the consequences.
    #
    # The primary way of configuring a node is via this file. This template lists
    # the most important settings you may want to configure for a production cluster.
    #
    # Please see the documentation for further information on configuration options:
    # <http://www.elastic.co/guide/en/elasticsearch/reference/current/setup-configuration.html>
    #
    # ---------------------------------- Cluster -----------------------------------
    #
    # Use a descriptive name for your cluster:
    #
    cluster.name: dw_search_engine
    #
    # ------------------------------------ Node ------------------------------------
    #
    # Use a descriptive name for the node:
    #
    node.name: esnode-01
    #
    # Add custom attributes to the node:
    #
    # node.rack: r1
    #
    # ----------------------------------- Paths ------------------------------------
    #
    # Path to directory where to store the data (separate multiple locations by comma):
    #
    path.data: /data/dw_search_storage
    #
    # Path to log files:
    #
    path.logs: /tmp/es/logs
    #
    # ----------------------------------- Memory -----------------------------------
    #
    # Lock the memory on startup:
    #
    # bootstrap.mlockall: true
    #
    # Make sure that the `ES_HEAP_SIZE` environment variable is set to about half the memory
    # available on the system and that the owner of the process is allowed to use this limit.
    #
    # Elasticsearch performs poorly when the system is swapping the memory.
    #
    # ---------------------------------- Network -----------------------------------
    #
    # Set the bind adress to a specific IP (IPv4 or IPv6):
    #
    network.host: 10.10.2.62
    #
    # Set a custom port for HTTP:
    #
    http.port: 9200
    #
    # For more information, see the documentation at:
    # <http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-network.html>
    #
    # ---------------------------------- Gateway -----------------------------------
    #
    # Block initial recovery after a full cluster restart until N nodes are started:
    #
    # gateway.recover_after_nodes: 3
    #
    # For more information, see the documentation at:
    # <http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-gateway.html>
    #
    # --------------------------------- Discovery ----------------------------------
    #
    # Elasticsearch nodes will find each other via unicast, by default.
    #
    # Pass an initial list of hosts to perform discovery when new node is started:
    # The default list of hosts is ["127.0.0.1", "[::1]"]
    #
    discovery.zen.ping.unicast.hosts: ["es-01", "es-02"]
    #
    # Prevent the "split brain" by configuring the majority of nodes (total number of nodes / 2 + 1):
    #
    # discovery.zen.minimum_master_nodes: 3
    #
    # For more information, see the documentation at:
    # <http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-discovery.html>
    #
    # ---------------------------------- Various -----------------------------------
    #
    # Disable starting multiple nodes on a single system:
    #
    # node.max_local_storage_nodes: 1
    #
    # Require explicit names when deleting indices:
    #
    # action.destructive_requires_name: true
    

    其它节点的配置,在保证基本存储目录相同的前提下,可以根据需要修改如下几个参数:

    node.name
    network.host
    http.port
    

    最后,在每个节点上分别启动ElasticSearch,执行如下命令:

    cd elasticsearch-2.0.0
    bin/elasticsearch -d
    

    然后可以查看Web管理界面,需要安装插件elasticsearch-head,后面会介绍,Web管理界面,如下所示:
    es-cluster-plugin-head
    上图中,我们已经创建了一个索引,可以看到节点的状态,及其分片(Shard)的情况。

    RESTful API基本操作

    尤其是在进行搜索的时候,为了使得其他系统能够与ElasticSearch搜索系统很好地解耦合,使用ElasticSearch提供的RESTful API是一种不错的选择。下面,我们介绍RESTful API的基本操作。

    • 插件管理

    插件的存放目录为elasticsearch-2.0.0/plugins/,插件都是基于该存储目录进行操作的。
    安装插件:

    bin/plugin install analysis-icu
    bin/plugin install mobz/elasticsearch-head
    

    可以从不同的位置安装插件,上面第一个称为Core Elasticsearch plugin,它是Elasticsearch提供的,会从Elasticsearch上下载并安装;上面第一个是从Github上自动下载安装。还有其他的方式安装,如从特定的文件系统等进行安装。
    列出插件:

    bin/plugin list
    

    删除插件:

    bin/plugin remove analysis-icu
    

    安装完一个插件,我们可以查看,例如查看elasticsearch_head插件,查看如下链接:

    
    http://10.10.2.62:9200/_plugin/head/
    
    
    • 创建索引
    curl -XPUT 'http://10.10.2.62:9200/basis_device_info/'
    

    创建的索引名称为basis_device_info,我们也可以不指定一个索引对应的Mappings,而是在索引的时候自动生成Mappings,所以如果没有指定一个索引的Mappings,则这个索引可以支持任何的Mappings。同样可知,一个索引可以自动地增加不同的type,非常灵活。
    也可以指定索引的基本配置,如分片(Shard)数目、副本(Replica)数目,如下所示:

    curl -XPUT 'http://10.10.2.62:9200/basis_device_info /' -d '{
        "settings" : {
            "index" : {
                "number_of_shards" : 10,
                "number_of_replicas" : 1
            }
        }
    }'
    

    默认是5个分片,不进行复制,上面配置表示索引basis_device_info有10个分片,每个分片1个副本。
    下面在创建索引的时候,指定设计的schema,即配置mappings,如下所示:

    curl -XPUT 'http://10.10.2.62:9200/basis_device_info/' -d '
    {
      "mappings": {
        "user": {
          "_all":       { "enabled": false  },
          "properties": {
            "installid":    { "type": "string"  },
            "appid":    { "type": "string"  },
            "channel":  { "type":   "string", "index":  "analyzed" },
            "version":    { "type": "string"  },
            "osversion":    { "type": "string"  },
            "device_name":    { "type": "string", "index":  "analyzed"   },
            "producer":    { "type": "string"  },
            "device_type":    { "type": "string"  },
            "resolution":    { "type": "string", "index":  "analyzed"  },
            "screen_size":    { "type": "string", "index":  "analyzed"  },
            "mac":    { "type": "string", "index":  "not_analyzed"  },
            "idfa":    { "type": "string"  },
            "idfv":    { "type": "string", "index":  "not_analyzed"  },
            "imei":    { "type": "string", "index":  "not_analyzed"  },
            "create_time":  {
              "type":   "date",
              "format": "yyyy-MM-dd HH:mm:ss",
           "index":  "not_analyzed"
            }
          }
        }
      }
    }'
    

    上面创建了索引basis_device_info,同时type为user,有了mappings,我们就知道需要索引的数据的格式了。

    • 删除索引
    curl -XDELETE 'http://10.10.2.62:9200/basis_device_info/'
    

    删除索引basis_device_info。

    • 索引文档
    curl -PUT 'http://10.10.2.62:9200/basis_device_info/user/CC49E748588490D41BFB89584007B0FA' -d '{
            "installid":    "0000000L",
            "appid":    "0",
            "udid":     "CC49E748588490D41BFB89584007B0FA",
            "channel":  "wulei1",
            "version":    "3.1.2",
            "osversion":    "8.1",
            "device_name":    "iPhone Retina4 Simulator",
            "producer":    "apple",
            "device_type":    "1",
            "resolution":    "640*1136",
            "screen_size":    "320*568",
            "mac":    "600308A20C5E",
            "idfa":    "dbbbs-fdsfa-fafda-321saf",
            "idfv":    "4283FAE1-19EB-4FA9-B739-8148F76BC8C3",
            "imei":    "af-sfd0fdsa-fad-ff",
            "create_time":  "2015-01-14 20:32:05"
    }'
    

    基于我们前面创建的type为user的索引,索引一个文档,文档_id为CC49E748588490D41BFB89584007B0FA,文档内容为一个用户设备信息,使用JSON格式表示。

    • 批量索引

    批量索引,可以根据自己熟悉的编程语言或者脚本来实现,ElasticSearch也提供了一些客户端库。下面我们首先根据数据文件,构造成ElasticSearch索引支持的JSON格式,导出文件,然后通过curl工具去进行批量索引,实际上使用的是ElasticSearch提供的bulk API来实现的。
    首先处理原始带索引数据,代码如下所示:

    package org.shirdrn.es;
    
    import java.io.BufferedReader;
    import java.io.BufferedWriter;
    import java.io.Closeable;
    import java.io.File;
    import java.io.FileReader;
    import java.io.FileWriter;
    
    import net.sf.json.JSONObject;
    
    import com.google.common.base.Throwables;
    
    public class EsIndexingClient {
    
         public static void closeQuietly(Closeable... closeables) {
              if(closeables != null) {
                   for(Closeable closeable : closeables) {
                        try {
                             closeable.close();
                        } catch (Exception e) { }
                   }
              }
         }
        
         public static void main(String[] args) {
              String f = "C:\\Users\\yanjun\\Desktop\\basis_device_info.txt";
              String out = "C:\\Users\\yanjun\\Desktop\\basis_device_info.json";
              File in = new File(f);
              BufferedReader reader = null;
              BufferedWriter writer = null;
              try {
                   writer = new BufferedWriter(new FileWriter(out));
                   reader = new BufferedReader(new FileReader(in.getAbsoluteFile()));
                   String line = null;
                   while((line = reader.readLine()) != null) {
                        String[] a = line.split("\t", -1);
                        if(a.length == 16) {
                             String udid = a[2];
                            
                             JSONObject c = new JSONObject();
                             c.put("_index", "basis_device_info");
                             c.put("_type", "user");
                             c.put("_id", udid);
                            
                             JSONObject index = new JSONObject();
                             index.put("index", c);
                            
                             JSONObject doc = new JSONObject();
                             doc.put("installid", a[0]);
                             doc.put("appid", a[1]);
                             doc.put("udid", a[2]);
                             doc.put("channel", a[3]);
                             doc.put("version", a[4]);
                             doc.put("osversion", a[5]);
                             doc.put("device_name", a[6]);
                             doc.put("producer", a[7]);
                             doc.put("device_type", a[8]);
                             doc.put("resolution", a[9]);
                             doc.put("screen_size", a[10]);
                             doc.put("mac", a[11]);
                             doc.put("idfa", a[12]);
                             doc.put("idfv", a[13]);
                             doc.put("imei", a[14]);
                             doc.put("create_time", a[15]);
                            
                             writer.write(index.toString() + "\n");
                             writer.write(doc.toString() + "\n");
                        }
                   }
                  
              } catch (Exception e) {
                   throw Throwables.propagate(e);
              } finally {
                   closeQuietly(reader, writer);
              }
    
         }
    }
    

    运行代码,输出的数据文件为basis_device_info.json,该文件的格式了,示例如下所示:

    {"index":{"_index":"basis_device_info","_type":"user","_id":"1c207122a4b2c9632212ab86bac10f60"}}
    {"installid":"00000002","appid":"0","udid":"1c207122a4b2c9632212ab86bac10f60","channel":"itings","version":"3.1.1","osversion":"4.1.2","device_name":"Lenovo P770","producer":"Lenovo","device_type":"0","resolution":"540*960","screen_size":"4.59","mac":"d4:22:3f:83:17:06","idfa":"","idfv":"","imei":"861166023335745","create_time":"2015-01-14 19:39:35"}
    {"index":{"_index":"basis_device_info","_type":"user","_id":"FA6B1B98E6FF4E6994A1505A996F6102"}}
    {"installid":"00000003","appid":"0","udid":"FA6B1B98E6FF4E6994A1505A996F6102","channel":"appstore","version":"3.1.1","osversion":"8.1.2","device_name":"iPhone 6Plus","producer":"apple","device_type":"1","resolution":"640*1136","screen_size":"320*568","mac":"020000000000","idfa":"84018625-A3C9-47A8-88D0-C57C12F80520","idfv":"9D1E2514-9DC8-47A8-ABD0-129FC0FB3171","imei":"","create_time":"2015-01-14 19:41:21"}
    {"index":{"_index":"basis_device_info","_type":"user","_id":"8c5fe70b2408f184abcbe4f34b8f23c3"}}
    {"installid":"00000004","appid":"0","udid":"8c5fe70b2408f184abcbe4f34b8f23c3","channel":"itings","version":"3.1.1.014","osversion":"4.2.2","device_name":"2014011","producer":"Xiaomi","device_type":"0","resolution":"720*1280","screen_size":"4.59","mac":"0c:1d:af:4f:48:9f","idfa":"","idfv":"","imei":"865763025472173","create_time":"2015-01-14 19:46:37"}
    

    奇数编号行的内容为索引的指令信息,包括索引名称(_index)、类型(_type)、唯一标识(_id),偶数编号行的内容为实际待索引的文档数据。
    然后,通过curl命令来进行批量索引,执行如下命令:

    curl -s -XPOST http://10.10.2.62:9200/basis_device_info/_bulk --data-binary "@basis_device_info.json"
    
    • 搜索文档

    简单的搜索,可以通过GET方式搜索,如下所示:

    
    http://10.10.2.62:9200/basis_device_info/user/CC49E748588490D41BFB89584007B0FA
    
    
    http://10.10.2.62:9200/basis_device_info/user/_search?q=channel:B-hicloud
    
    

    上面第一个根据唯一的_id进行搜索,结果返回0个或者1个文档;第二个通过指定GET方式参数,其中_search和q是ElasticSearch内置的接口关键字,通过指定字段名称和搜索关键词的方式进行搜索,结果以JSON格式返回。

    • Request Body搜索

    可以设置请求的body内容,能够支持更加复杂的查询条件然后请求搜索,如下所示:

    curl -XGET 'http://10.10.2.245:9200/basis_device_info/user/_search' -d '{
        "query" : {
            "term" : { "udid": "bc0af2ca66a96725b8b0e0056d4213b6" }
        }
    }'
    

    结果示例,如下所示:

    {"took":11,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":1,"max_score":9.45967,"hits":[{"_index":"basis_device_info","_type":"user","_id":"bc0af2ca66a96725b8b0e0056d4213b6","_score":9.45967,"_source":{"installid":"00000FPq","appid":"0","udid":"bc0af2ca66a96725b8b0e0056d4213b6","channel":"B-hicloud","version":"3.1.1","osversion":"4.4.2","device_name":"H60-L02","producer":"HUAWEI","device_type":"0","resolution":"720*1184","screen_size":"4.64","mac":"ec:cb:30:c4:93:e3","idfa":"","idfv":"","imei":"864103021536104","create_time":"2015-01-18 01:29:16"}}]}}
    
    • 基于Lucene查询语法搜索

    如果熟悉Lucene查询(Query),可以构造通过构造复杂的Term关系字符串来进行搜索,示例如下所示:

    curl -XGET 'http://10.10.2.62:9200/basis_device_info/user/_search' -d '
    {
      "query": {
        "query_string": { "query": "(channel:baidu OR device_name:HUAWEI)" }
       }
    }'
    

    查询query字符串的含义是:从channel字段搜索baidu,从device_name字段搜索HUAWEI,然后两者取并集,这实际上一个布尔查询,返回最终结果。

    • 使用multi_match搜索

    ElasticSearch支持给定搜索关键词,从多个字段中进行搜索,示例如下所示:

    curl -XGET 'http://10.10.2.62:9200/basis_device_info/user/_search' -d '
    {
        "query": {
            "multi_match" : {
                "query":    "HTC",
                "fields": [ "channel", "device_name" ]
            }
        }
    }'
    

    这样,只要在channel和device_name两个字段中出现关键词HTC,则都返回结果,结果应该是两个字段匹配上的文档集合的并集。

    • 支持Filter搜索

    可以在制定Filter进行搜索。例如下面是一个按照时间范围进行过滤,得到搜索结果的查询:

    curl -XGET 'http://10.10.2.62:9200/basis_device_info/user/_search' -d '
    {
      "query": {
        "filtered": {
                "query": { "match_all": {} },
                "filter" : {
                    "range" : {
                        "create_time" : { "from" : "2015-01-16 00:00:00", "to" : "2015-01-16 23:59:59" }
                     }
                }
          }
        }
    }'
    
    • 分页搜索

    ElasticSearch支持分页搜索,可以通过在RESTful连接中指定size和from参数,来进行分页搜索,如下所示:

    curl -XGET 'http://10.10.2.62:9200/basis_device_info/user/_search?size=10&from=20' -d '
    {
      "query": {
        "filtered": {
                "query": { "match_all": {} },
                "filter" : {
                    "range" : {
                        "create_time" : { "from" : "2015-01-16 00:00:00", "to" : "2015-01-16 23:59:59" }
                     }
                }
          }
        }
    }'
    

    上面搜索的含义是:按照时间范围搜索,从第20个文档开始,返回10个文档,相当于一页取10个文档。

    Java客户端

    如果熟悉Java语言,而不想使用脚本等其他方式操作ElasticSearch搜索集群,则可以使用ElasticSearch提供的Java客户端API来编码实现,能够更加灵活地控制。ElasticSearch提供的Java客户端支持全部常用操作,如更新索引、索引文档、搜索文档、删除索引等等操作,而且还支持其他一些功能,如同步异步模式、explain查询等,下面我们通过代码来了解一下。
    如果使用Maven管理Java代码,可以在pom.xml文件中加入如下依赖:

              <dependency>
                   <groupId>org.elasticsearch</groupId>
                   <artifactId>elasticsearch</artifactId>
                   <version>2.0.0</version>
              </dependency>
    

    创建一个ElasticSearch客户端,代码如下所示:

              // create & configure client
              Settings settings = Settings.settingsBuilder()
                        .put("cluster.name", "dw_search_engine")
                        .put("client.transport.sniff", true)
                        .build();
              final Client client = TransportClient.builder().settings(settings).build()
                        .addTransportAddress(newAddress("es-01", 9300))
                        .addTransportAddress(newAddress("es-02", 9300));
    

    可以将你的ElasticSearch集群的节点通过上面的addTransportAddress方法,都与Client对象关联起来,这样在操作ElasticSearch集群中的索引/更新/删除/搜索文档的时候,就能够自动感知。上面newAddress方法如下:

         private static InetSocketTransportAddress newAddress(String host, int port) throws UnknownHostException {
              return new InetSocketTransportAddress(InetAddress.getByName(host), port);
         }
    

    另外,也可以通过在配置文件elasticsearch.yml中指定相关配置,例如:

    cluster.name: dw_search_engine
    client.transport.sniff: true
    client.transport.ping_timeout: 10s
    client.transport.nodes_sampler_interval: 10s
    

    那么,创建客户端需要从配置文件中读取配置内容,具体可以查看官方文档。

    • 准备工作

    索引的时候,我们是从一个本地文件中读取数据,并构建索引文档需要的格式,然后请求ElasticSearch集群执行索引操作,下面代码是一些基本准备工作:

              final String index = "basis_device_info";
              final String type = "user";
             
              // index documents
              String f = "C:\\Users\\yanjun\\Desktop\\basis_device_info.txt";
              File in = new File(f);
    

    从文件中,每次读取一行记录,然后构建一个JSON格式字符串,通过XContentBuilder来表示,代码如下所示:

         protected static XContentBuilder createSource(String[] a) throws IOException {
              return jsonBuilder()
                      .startObject()
                          .field("installid", a[0])
                             .field("appid", a[1])
                             .field("udid", a[2])
                             .field("channel", a[3])
                             .field("version", a[4])
                             .field("osversion", a[5])
                             .field("device_name", a[6])
                             .field("producer", a[7])
                             .field("device_type", a[8])
                             .field("resolution", a[9])
                             .field("screen_size", a[10])
                             .field("mac", a[11])
                             .field("idfa", a[12])
                             .field("idfv", a[13])
                             .field("imei", a[14])
                             .field("create_time", a[15])
                      .endObject();
         }
    

    下面我们从API的功能入手,分别详细说明,并附加代码展示用法。

    • 创建索引

    可以直接通过Java客户端库来创建索引,代码如下所示:

         protected static void createIndex(final Client client, String index) {
              Map<String, Object> indexSettings = Maps.newHashMap();
              indexSettings.put("number_of_shards", "4");
              indexSettings.put("number_of_replicas", "1");
              CreateIndexRequest createIndexRequest = new CreateIndexRequest(
                        index, Settings.settingsBuilder().put(indexSettings).build());
              CreateIndexResponse createIndexResponse = client.admin().indices().create(createIndexRequest).actionGet();
              System.out.println(createIndexResponse);
         }
    
    • 创建Mappings

    通过Java客户端创建Mappings,相对比较复杂一点,需要拼接对应的JSON字符串,实现代码如下所示:

         protected static void createMappings(final Client client, String index) throws IOException, InterruptedException, ExecutionException {
              XContentBuilder basisInfoMapping = jsonBuilder()
                        .startObject()
                             .startObject("_all")
                                  .field("enabled", "false")
                             .endObject()
                             .startObject("properties")
                                  .startObject("id")
                                       .field("type", "string")
                                  .endObject()
                                  .startObject("name")
                                       .field("type", "string")
                                       .field("index", "analyzed")
                                  .endObject()
                                  .startObject("age")
                                       .field("type", "int")
                                  .endObject()
                                  .startObject("birthday")
                                       .field("type", "date")
                                       .field("format", "yyyy-MM-dd HH:mm:ss")
                                       .field("index", "not_analyzed")
                                  .endObject()
                             .endObject()
                        .endObject();
             
              XContentBuilder deviceInfoMapping = jsonBuilder()
                        .startObject()
                             .startObject("_all")
                                  .field("enabled", "false")
                             .endObject()
                             .startObject("properties")
                                  .startObject("udid")
                                       .field("type", "string")
                                  .endObject()
                                  .startObject("device_name")
                                       .field("type", "string")
                                       .field("index", "analyzed")
                                  .endObject()
                                  .startObject("privoder")
                                       .field("type", "string")
                                       .field("index", "analyzed")
                                  .endObject()
                                  .startObject("os_version")
                                       .field("type", "string")
                                  .endObject()
                             .endObject()
                        .endObject();
             
              PutMappingRequest putMappingRequest = Requests.putMappingRequest(index)
                   .type("basic_info")
                   .source(basisInfoMapping)
                  .type("device_info")
                  .source(deviceInfoMapping);
             
              System.out.println(putMappingRequest.indicesOptions());
             
              PutMappingResponse putMappingResponse = client.admin().indices().putMapping(putMappingRequest).get();
              System.out.println(putMappingResponse);
         }
    

    上面代码创建了一个名称为app_user_info的索引,该索引具有basic_info和device_info这2个type,可以通过elasticsearch_head插件,在Web管理页面上查看对应的索引信息。

    • 索引单个文档

    从文件中读取数据,一条记录构造一个文档,然后执行索引,代码如下所示:

         protected static void indexDocs(final Client client, final String index, final String type, File in) {
              BufferedReader reader = null;
              try {
                   reader = new BufferedReader(new FileReader(in.getAbsoluteFile()));
                   String line = null;
                   while((line = reader.readLine()) != null) {
                        String[] a = line.split("\t", -1);
                        if(a.length == 16) {
                             String udid = a[2];
                             IndexResponse response =
                                       client
                                       .prepareIndex(index, type, udid)
                                       .setSource(createSource(a))
                                       .get();
                             System.out.println(response.toString());
                        }
                   }
                  
              } catch (Exception e) {
                   throw Throwables.propagate(e);
              } finally {
                   closeQuietly(reader);
              }
         }
    
    • 批量索引

    批量索引有多种方式,首先,通过Bulk API进行索引,我们自己控制每一个batch的大小,代码如下所示:

         protected static void indexBulk(final Client client, final String index, final String type, File in) {
              BulkRequestBuilder bulkRequest = client.prepareBulk();
              final int batchSize = 100;
              int counter = 0;
              BufferedReader reader = null;
              try {
                   reader = new BufferedReader(new FileReader(in.getAbsoluteFile()));
                   String line = null;
                   while((line = reader.readLine()) != null) {
                        String[] a = line.split("\t", -1);
                        if(a.length == 16) {
                             String udid = a[2];
                             IndexRequestBuilder indexRequestBuilder =
                                       client
                                       .prepareIndex(index, type, udid)
                                       .setSource(createSource(a));
                             bulkRequest.add(indexRequestBuilder);
                             if(++counter >= batchSize) {
                                  System.out.println(!bulkRequest.get().hasFailures());
                                  counter = 0;
                                  bulkRequest = client.prepareBulk();
                             }
                        }
                   }
                  
              } catch (Exception e) {
                   throw Throwables.propagate(e);
              } finally {
                   System.out.println(!bulkRequest.get().hasFailures());
                   closeQuietly(reader);
              }
         }
    

    另一种方式,是根据ElasticSearch提供的Bulk Processor来实现,只需要设置相关参数,就可以实现批量索引,这种方式更加灵活,示例如下所示:

         protected static void indexUsingBulkProcessor(final Client client, final String index, final String type, File in) throws InterruptedException {
              String name = "device_info_processor";
              int bulkActions = 1000;
              ByteSizeValue bulkSize = new ByteSizeValue(100, ByteSizeUnit.MB);
              TimeValue flushInterval = TimeValue.timeValueSeconds(60);
              int concurrentRequests = 12;
             
              // create bulk processor
              final BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
    
                   public void afterBulk(long id, BulkRequest req, BulkResponse resp) {
                        System.out.println("id=" + id + ", resp=" + resp);
                   }
    
                   public void afterBulk(long id, BulkRequest req, Throwable cause) {
                        System.out.println("id=" + id + ", req=" + req + ", cause=" + cause);              
                   }
    
                   public void beforeBulk(long id, BulkRequest req) {
                        System.out.println("id=" + id + ", req=" + req);              
                   }
                  
              })
              .setName(name)
              .setBulkActions(bulkActions)
              .setBulkSize(bulkSize)
              .setFlushInterval(flushInterval)
              .setConcurrentRequests(concurrentRequests)
              .build();
             
              // index documents
              BufferedReader reader = null;
              try {
                   reader = new BufferedReader(new FileReader(in.getAbsoluteFile()));
                   String line = null;
                   while((line = reader.readLine()) != null) {
                        String[] a = line.split("\t", -1);
                        if(a.length == 16) {
                             String udid = a[2];
                             bulkProcessor.add(new IndexRequest(index, type, udid).source(createSource(a)));
                        }
                   }
                  
              } catch (Exception e) {
                   throw Throwables.propagate(e);
              } finally {
                   closeQuietly(reader);
                  
                   // close bulk processor
                   bulkProcessor.awaitClose(60, TimeUnit.SECONDS);
              }
         }
    

    可以通过实现自定义的BulkProcessor.Listener,它提供了Hook的功能,比如,索引某个文档失败的话,可以在Hook方法中增加处理,实现重试的功能;再比如,如果索引成功,给其他系统服务一个回调,等等。

    • 更新文档

    更新文档中的某些字段,需要指定id的值,以及需要更新的字段的值,代码如下所示:

         protected static void updateDoc(final Client client, final String index, final String type) throws IOException, InterruptedException, ExecutionException {
              String id = "60e90ddcb1a61622028b8d92112a646c";
              UpdateRequest updateRequest = new UpdateRequest(index, type, id);
              updateRequest.doc(jsonBuilder()
                        .startObject()
                          .field("channel", "h-google")
                          .field("appid", "1")
                      .endObject());
              UpdateResponse response = client.update(updateRequest).get();
              System.out.println(response);
         }
    

    如果更新文档的时候,文档不存在,则需要先执行索引操作,再进行更新操作,将这两个操作合并到一起,使用upsert操作,代码如下所示:

         protected static void upsertDoc(final Client client, final String index, final String type) throws IOException, InterruptedException, ExecutionException {
              String id = "fdd5ff7f56b613f0acb2c20a1ebc35e4";
              IndexRequest indexRequest = new IndexRequest(index, type, id).source(jsonBuilder()
                          .startObject()
                              .field("installid", "00000BSe")
                              .field("appid", "0")
                              .field("udid", "fdd5ff7f56b613f0acb2c20a1ebc35e4")
                              .field("channel", "A-wandoujia")
                              .field("version", "3.1.1")
                              .field("resolution", "960*540")
                              .field("mac", "00:08:22:be:1b:b7")
                              .field("device_type", "0")
                              .field("device_name", "HTC")
                              .field("producer", "alps")
                              .field("create_time", "2015-01-17 17:15:36")
                          .endObject());
             
              UpdateRequest updateRequest = new UpdateRequest(index, type, id).doc(jsonBuilder()
                          .startObject()
                              .field("resolution", "540*960")
                              .field("channel", "h-baidu")
                              .field("version", "3.1.1")
                              .field("imei", "861622010000056")
                          .endObject())
                      .upsert(indexRequest);             
              UpdateResponse response = client.update(updateRequest).get();
              System.out.println(response);
         }
    
    • 删除文档

    删除文档,需要指定文档的id的值,代码如下所示:

         protected static void deleteDoc(final Client client, final String index, final String type) {
              String id = "60e90ddcb1a61622028b8d92112a646c";
              DeleteResponse response = client.prepareDelete(index, type, id).get();
              System.out.println(response);
         }
    
    • 搜索文档

    搜索文档,可以根据需要构造指定的查询(Query),可以设置过滤器等等,然后提交搜索,示例代码如下所示:

         protected static void searchDocs(final Client client, final String index, final String type) {
              SearchResponse response = client
                   .prepareSearch(index)
                   .setTypes(type)
                   .setQuery(QueryBuilders.termQuery("device_name", "xiaomi"))
                   .setPostFilter(QueryBuilders.rangeQuery("create_time").from("2015-01-16 00:00:00").to("2015-01-16 23:59:59"))
                   .setFrom(30).setSize(10).setExplain(true)
                   .execute()
                   .actionGet();
              System.out.println(response);
         }
    

    查询(Query)的构造有很多的方式,比如构造布尔查询,指定与、或、非关系,然后提交搜索。执行搜索,可以设置搜索文档的起始偏移位置以及每次取多少个结果文档,这便能实现分页功能。

    其他话题

    ElasticSearch最经典的软件栈组合就是ELK(ElasticSearch Logstash Kibana),其中ElasticSearch提供了实时查询分析数据的功能,是一个非常通用的搜索引擎系统,而Logstash是一个日志管理工具,能够收集日志,对日志进行管理,Kibana是一个基于页面的前端展示工具,非常方便地使ElasticSearch中的数据可视化,具体使用起来如何,如果感兴趣可以尝试一下。
    另外,ElasticSearch也被好多开源大数据系统所拥抱,比如Cloudera的CDH也整合了ElasticSearch作为搜索系统,ElasticSearch也可以和其他系统,如Hadoop、HBase等进行整合,使用领域比较广泛。

    参考链接

    • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/index.html
    • https://www.elastic.co/downloads/elasticsearch
    • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/_basic_concepts.html
    • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/setup.html
    • https://www.elastic.co/guide/en/elasticsearch/plugins/2.0/index.html
    • https://www.elastic.co/guide/en/elasticsearch/plugins/2.0/installation.html
    • https://www.elastic.co/guide/en/elasticsearch/plugins/2.0/analysis-icu.html
    • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/indices.html
    • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/indices-create-index.html
    • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/indices-delete-index.html
    • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/index-modules.html
    • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/mapping.html
    • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/mapping-params.html
    • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/search.html
    • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/search-search.html
    • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/search-uri-request.html
    • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/search-request-body.html
    • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/docs.html
    • https://www.elastic.co/guide/en/elasticsearch/plugins/2.0/integrations.html
    • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/docs-bulk.html
    • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/modules-discovery.html
    • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/misc-cluster.html
    • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/query-filter-context.html
    • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/query-dsl-match-all-query.html
    • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/query-dsl-dis-max-query.html
    • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/query-dsl-bool-query.html
    • https://www.elastic.co/guide/en/elasticsearch/reference/2.0/full-text-queries.html
    • https://github.com/mobz/elasticsearch-head
    • https://www.elastic.co/blog/found-java-clients-for-elasticsearch
    • https://www.elastic.co/guide/en/elasticsearch/client/java-api/2.0/client.html
    • https://www.elastic.co/guide/en/elasticsearch/client/java-api/2.0/node-client.html
    • https://www.elastic.co/guide/en/elasticsearch/client/java-api/2.0/java-docs.html
    • https://www.elastic.co/guide/en/elasticsearch/client/java-api/2.0/java-docs-bulk-processor.html


沪ICP备19023445号-2号
友情链接