步骤 2 : 模仿和排错 步骤 3 : 14万条数据 步骤 4 : 关于数据库 步骤 5 : Product.java 步骤 6 : ProductUtil 步骤 7 : TestElasticSearch4J
老规矩,先下载右上角的可运行项目,配置运行起来,确认可用之后,再学习做了哪些步骤以达到这样的效果。
运行 TestElasticSearch4J, 然后通过Kibana-查询操作 看到到如图所示,总共有147939条数据被批量增加到了服务器
在确保可运行项目能够正确无误地运行之后,再严格照着教程的步骤,对代码模仿一遍。
模仿过程难免代码有出入,导致无法得到期望的运行结果,此时此刻通过比较正确答案 ( 可运行项目 ) 和自己的代码,来定位问题所在。 采用这种方式,学习有效果,排错有效率,可以较为明显地提升学习速度,跨过学习路上的各个槛。 推荐使用diffmerge软件,进行文件夹比较。把你自己做的项目文件夹,和我的可运行项目文件夹进行比较。 这个软件很牛逼的,可以知道文件夹里哪两个文件不对,并且很明显地标记出来 这里提供了绿色安装和使用教程:diffmerge 下载和使用教程
所以为了模仿真实环境,花了很多精力,四处搜刮来了14万条天猫的产品数据,接下来我们就会把这14万条记录批量增加到 Elastic Search,然后观察搜索效果。
这14万条记录放在右上角 140k_products.rar,其解析办法在后续会讲解。 下载解压之后放在项目目录下。
本来应该先把这14万条记录保存进数据库,然后再从数据库中取出来的,不过考虑到不是每个同学都有JDBC基础,以及放进数据库的繁琐,和14万条数据从数据库里读取出来的时间消耗,就改成直接从文件里读取出来,然后转换为泛型是Product的集合的形式,相当于从数据库里读取出来了,不过会快很多。
修改Product.java,使得其有更多的字段。 除此之外,还增加了一个toMap()方法,使得其返回为一个Map对象,方便后续的批量增加。
package com.how2java;
import java.util.HashMap;
import java.util.Map;
public class Product {
int id;
String name;
String category;
float price;
String place;
String code;
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getCategory() {
return category;
}
public void setCategory(String category) {
this.category = category;
}
public float getPrice() {
return price;
}
public void setPrice(float price) {
this.price = price;
}
public String getPlace() {
return place;
}
public void setPlace(String place) {
this.place = place;
}
public String getCode() {
return code;
}
public void setCode(String code) {
this.code = code;
}
@Override
public String toString() {
return "Product [id=" + id + ", name=" + name + ", category=" + category + ", price=" + price + ", place="
+ place + ", code=" + code + "]";
}
public Map toMap() {
HashMap<String,Object> map = new HashMap<>();
map.put("name", name);
map.put("category", category);
map.put("code", code);
map.put("place", place);
map.put("price", price);
return map;
}
}
准备工具类,把文本文件 140k_products.txt,转换为泛型是Product的集合。
package com.how2java;
import java.awt.AWTException;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.io.FileUtils;
public class ProductUtil {
public static void main(String[] args) throws IOException, InterruptedException, AWTException {
String fileName = "140k_products.txt";
List<Product> products = file2list(fileName);
System.out.println(products.size());
}
public static List<Product> file2list(String fileName) throws IOException {
File f = new File(fileName);
List<String> lines = FileUtils.readLines(f,"UTF-8");
List<Product> products = new ArrayList<>();
for (String line : lines) {
Product p = line2product(line);
products.add(p);
}
return products;
}
private static Product line2product(String line) {
Product p = new Product();
String[] fields = line.split(",");
p.setId(Integer.parseInt(fields[0]));
p.setName(fields[1]);
p.setCategory(fields[2]);
p.setPrice(Float.parseFloat(fields[3]));
p.setPlace(fields[4]);
p.setCode(fields[5]);
return p;
}
}
通过ProductUtil 获取到14万条记录的List集合,然后把这个集合通过 BulkRequest 批量地提交到Elastic Search服务器。
package com.how2java;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
public class TestElasticSearch4J {
private static RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("localhost", 9200, "http")
));
private static String indexName = "how2java";
public static void main(String[] args) throws IOException {
//确保索引存在
if(!checkExistIndex(indexName)){
createIndex(indexName);
}
//14万准备数据
List<Product> products = ProductUtil.file2list("140k_products.txt");
System.out.println("准备数据,总计"+products.size()+"条");
batchInsert(products);
client.close();
}
private static void batchInsert(List<Product> products) throws IOException {
// TODO Auto-generated method stub
BulkRequest request = new BulkRequest();
for (Product product : products) {
Map<String,Object> m = product.toMap();
IndexRequest indexRequest= new IndexRequest(indexName, "product", String.valueOf(product.getId())).source(m);
request.add(indexRequest);
}
client.bulk(request);
System.out.println("批量插入完成");
}
private static void deleteDocument(int id) throws IOException {
DeleteRequest deleteRequest = new DeleteRequest (indexName,"product", String.valueOf(id));
client.delete(deleteRequest);
System.out.println("已经从ElasticSearch服务器上删除id="+id+"的文档");
}
private static void updateDocument(Product product) throws IOException {
UpdateRequest updateRequest = new UpdateRequest (indexName, "product", String.valueOf(product.getId()))
.doc("name",product.getName());
client.update(updateRequest);
System.out.println("已经在ElasticSearch服务器修改产品为:"+product);
}
private static void getDocument(int id) throws IOException {
// TODO Auto-generated method stub
GetRequest request = new GetRequest(
indexName,
"product",
String.valueOf(id));
GetResponse response = client.get(request);
if(!response.isExists()){
System.out.println("检查到服务器上 "+"id="+id+ "的文档不存在");
}
else{
String source = response.getSourceAsString();
System.out.print("获取到服务器上 "+"id="+id+ "的文档内容是:");
System.out.println(source);
}
}
private static void addDocument(Product product) throws IOException {
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("name", product.getName());
IndexRequest indexRequest = new IndexRequest(indexName, "product", String.valueOf(product.getId()))
.source(jsonMap);
client.index(indexRequest);
System.out.println("已经向ElasticSearch服务器增加产品:"+product);
}
private static boolean checkExistIndex(String indexName) throws IOException {
boolean result =true;
try {
OpenIndexRequest openIndexRequest = new OpenIndexRequest(indexName);
client.indices().open(openIndexRequest).isAcknowledged();
} catch (ElasticsearchStatusException ex) {
String m = "Elasticsearch exception [type=index_not_found_exception, reason=no such index]";
if (m.equals(ex.getMessage())) {
result = false;
}
}
if(result)
System.out.println("索引:" +indexName + " 是存在的");
else
System.out.println("索引:" +indexName + " 不存在");
return result;
}
private static void deleteIndex(String indexName) throws IOException {
DeleteIndexRequest request = new DeleteIndexRequest(indexName);
client.indices().delete(request);
System.out.println("删除了索引:"+indexName);
}
private static void createIndex(String indexName) throws IOException {
// TODO Auto-generated method stub
CreateIndexRequest request = new CreateIndexRequest(indexName);
client.indices().create(request);
System.out.println("创建了索引:"+indexName);
}
}
HOW2J公众号,关注后实时获知最新的教程和优惠活动,谢谢。
问答区域
2019-02-24
亲爱的站长,数据库数据如何同步导入ES?
2019-02-19
超出内存了?
回答已经提交成功,正在审核。 请于 我的回答 处查看回答记录,谢谢
提问之前请登陆
提问已经提交成功,正在审核。 请于 我的提问 处查看提问记录,谢谢
|