Odoo 集成 ElasticSearch
本期作者
王泽宇
数据开发工程师
擅长 java 开发
Step 1 安装分词器
找到 ElasticSearch 安装目录的 bin 目录,在该目录下执行以下命令,安装中文 ik 分词器。
这里注意需要 切换 ElasticSearch 启动用户。
注意:如果是集群安装 ElasticSearch,则要给每个节点的 ElasticSearch 都安装 ik 分词器。
su es
./elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.6.0/elasticsearch-analysis-ik-7.6.0.zip
安装好后需要 kill -9 杀死 ElasticSearch 进程,再进行重启。
Step 2 测试分词器
首先创建一个测试索引,测试分词器到底安装成功了没有。
PUT test
POST test/_analyze
{
"analyzer": "ik_smart",
"text": "测试语句分词,测一测ik分词器到底行不行"
}
最终输出的分词结果应该是:
{
"tokens" : [
{
"token" : "测试",
"start_offset" : 0,
"end_offset" : 2,
"type" : "CN_WORD",
"position" : 0
},
{
"token" : "语句",
"start_offset" : 2,
"end_offset" : 4,
"type" : "CN_WORD",
"position" : 1
},
{
"token" : "分词",
"start_offset" : 4,
"end_offset" : 6,
"type" : "CN_WORD",
"position" : 2
},
{
"token" : "测",
"start_offset" : 7,
"end_offset" : 8,
"type" : "CN_CHAR",
"position" : 3
},
{
"token" : "一测",
"start_offset" : 8,
"end_offset" : 10,
"type" : "CN_WORD",
"position" : 4
},
{
"token" : "ik",
"start_offset" : 10,
"end_offset" : 12,
"type" : "ENGLISH",
"position" : 5
},
{
"token" : "分词器",
"start_offset" : 12,
"end_offset" : 15,
"type" : "CN_WORD",
"position" : 6
},
{
"token" : "到底",
"start_offset" : 15,
"end_offset" : 17,
"type" : "CN_WORD",
"position" : 7
},
{
"token" : "行不行",
"start_offset" : 17,
"end_offset" : 20,
"type" : "CN_WORD",
"position" : 8
}
]
}
ElasticSearch 安装好了以后,
剩下的任务就是,
如何将 Odoo 的数据
和 ElasticSearch 进行同步
选择操作包
首先我们要 选择操作包,python 操作 ElasticSearch 的包是 elasticsearch。
因为我安装的版本是 elasticsear7.6,所以选择 elasticsear7 这个包。
pip3 install elasticsearch7
字段映射
在业务设计中,经常出现这样的情形,两个不同的模型出现同名的字段。
如果要将这些字段全部放在同一个 ElasticSearch 索引下,就会导致 字段重名的问题,进而产生各种错误。
为了解决这个问题,可以利用 字段映射来解决。
用模型名和字段名做 key,去映射ElasticSearch 里面的一个唯一字段。
index_filed_list = {
("product", "product_name"): "product_name",
("product", "comment"): "product_comment",
("product", "solution"): "product_solution",
("company", "company_name"): "company_name",
("company", "founder"): "company_founder",
("company", "comment"): "company_comment",
("sales", "name"): "sale_names",
}
获取配置信息
我们在 odoo.conf 中配置好 es 的 ip、端口和索引,再在代码中去获取这个配置信息。
import odoo.tools
host = odoo.tools.config['es_host']
port = odoo.tools.config['es_port']
index = odoo.tools.config['index_name']
实现 ES 工具类
接下来就要去实现 ElasticSearch 的工具类了。
首先创建一个 ElasticSearch 的相关模型,并且禁止它去 PostgreSQL 中创建表。
# -*- coding: utf-8 -*-
from odoo import models, api
from elasticsearch7 import Elasticsearch
import odoo.tools
host = odoo.tools.config['es_host']
port = odoo.tools.config['es_port']
index = odoo.tools.config['index_name']
index_filed_list = {
("product", "product_name"): "product_name",
("product", "comment"): "product_comment",
("product", "solution"): "product_solution",
("company", "company_name"): "company_name",
("company", "founder"): "company_founder",
("company", "comment"): "company_comment",
("sales", "name"): "sale_names",
}
class ElasticSearchUtil(models.AbstractModel):
_description = 'es工具类'
_name = 'prod.util.elasticsearch'
_auto = False
实现一个静态的获取 ElasticSearch 实例的方法。
def get_es_instance():
return Elasticsearch([{'host': host, 'port': port}], http_auth=())
考虑到索引有可能是未创建的状态,因此需要先检查一下索引是否存在,如果不存在则创建一个新索引
@api.model
def create_index(self, es):
# 判断索引是否存在,不存在就创建一个
if not es.indices.exists(index=index):
es.indices.create(index=index)
在某些特殊情况,我们需要删掉索引强制重建
@api.model
def delete_index(self):
# 存在就给他删了
es = self.get_es_instance()
if es.indices.exists(index=index):
es.indices.delete(index=index)
ElasticSearch 默认的分词器是不支持中文的。
因此在遇到新的字段时,必须显式的创建 mapping 并支持中文分词。
这里我们使用 ik 分词器。
def get_fileds_mapping_or_create(self, es, fileds):
# 判断这些字段的mapping是否存在,不存在就创建
mapping_dict = es.indices.get_mapping(index=index).get(index).get('mappings')
if mapping_dict.get('properties'):
mapping_list = mapping_dict.get('properties').keys()
else:
mapping_list = []
body = {
"properties": {
}
}
update_mapping = False
for filed in fileds:
if filed not in mapping_list:
update_mapping = True
body['properties'][filed] = {
"type": "text",
"analyzer": "ik_max_word",
"search_analyzer": "ik_smart"
}
if update_mapping:
es.indices.put_mapping(index=index, body=body)
在实际的业务过程中,新插入一条数据,可以直接在 ElasticSearch 中创建一个文档。
而如果有对数据的修改,ElasticSearch 的包可不支持直接的修改,需要写一个修改的 json。
我们这里把修改字段内容的方法封装一下。
def update_filed_by_id(self, es, id, fileds: dict):
# 更新一条数据
update_str = ';'.join([f"ctx._source.{key}='{value}'" for key, value in fileds.items()])
body = {
"script": {
"source": update_str
}
}
es.update(index=index, id=id, body=body)
对于数据的删除,有两种情况:
一种情况是该数据是一对多存储的,删除这条数据,只会删除索引中的其中一部分,属于文档数据修改。
另一种情况是该数据是一对一存储,应该直接删除该字段。
def delete_filed(self, ids, model_name):
# 删除某些字段
fileds = []
for key in index_filed_list.keys():
if key[0] == model_name:
fileds.append(index_filed_list.get(key))
es = self.get_es_instance()
delete_str = ';'.join([f"ctx._source.remove('{item}')" for item in fileds])
for id in ids:
body = {
"query": {
"term": {
"_id": f"{id}"
}
},
"script": {
"source": delete_str
}
}
es.update_by_query(index=index, body=body)
es.close()
前置的方法已经基本完成,现在开始实现同步PostgreSQL 字段和 ElasticSearch 索引的方法。
@api.model
def insert_or_update_filed(self, ids, fileds: dict, model_name):
# 检查索引是否存在
es = self.get_es_instance()
self.create_index(es)
body = {}
update_value = False
for key, value in fileds.items():
# 只有字符类型的才建索引
if (model_name, key) in index_filed_list.keys():
update_value = True
body[index_filed_list.get((model_name, key))] = value
if update_value:
self.get_fileds_mapping_or_create(es, body.keys())
# 检查该id的记录是否存在
for id in ids:
if not es.exists(index=index, id=id):
es.index(index=index, doc_type='_doc', id=id, refresh=True, body=body)
else:
self.update_filed_by_id(es, id, body)
es.close()
至此,ElasticSearch 工具类基本实现。
在其他的模型中,我们需要重写 Odoo 的增删改方法,进而实现在对模型进行增删改操作时,ElasticSearch 的索引能够同步进行修改。
# 重写模型创建方法,创建模型时同步创建es相关文档
@api.model
def create(self, vals):
result = super().write(vals)
self.env['prod.util.elasticsearch'].insert_or_update_filed(result.ids, vals, "company")
return result
# 重写模型修改方法,创建模型时同步修改es相关文档
def write(self, vals):
self.env['prod.util.elasticsearch'].insert_or_update_filed(self.ids, vals, "company")
return super().write(vals)
# 重写模型删除方法,创建模型时同步删除es相关文档
def unlink(self):
self.env['prod.util.elasticsearch'].delete_filed(self.ids, "company")
return super().unlink()
注意,这样操作之后,每次修改都必须使用 Odoo 的 self.create/write/unlink 方法。
如果直接执行 PostgreSQL 的话,依旧会跳过 ElasticSearch 的索引同步,导致 ElasticSearch 索引和 PostgreSQL 中索引不一致。
面对这种情况,我们需要 重写一个强制完全重建索引的方法,来保证索引的一致性。
这里强制重建索引的代码和业务相关性比较大,需要根据自己的业务需求自行调整。
@api.model
def rebuild_elasticsearch_index(self):
# 强制重建索引方法
self.env['prod.util.elasticsearch'].delete_index()
prod_ids = self.search([('id', '!=', None)]).ids
for prod_id in prod_ids:
prod = self.search_read([('id', '=', prod_id)])[0]
# 判断是否有选择公司
if prod.get('company'):
company = self.env['prod.company'].search_read([('id', '=', prod.get('company')[0])])
self.env['prod.util.elasticsearch'].insert_or_update_filed([prod.get('id')], company[0], "company")
# 判断是否选择了销售
if prod.get('sales_emp'):
sale_ids = prod.get('sales_emp')
sales = self.env['prod.sales'].search([('id', 'in', sale_ids)])
sale_names = []
for s in sales:
sale_names.append(s.name)
self.env['prod.util.elasticsearch'].insert_or_update_filed([prod.get('id')],{'name': ','.join(sale_names)}, "sales")
# 更新本体的索引字段
self.env['prod.util.elasticsearch'].insert_or_update_filed([prod.get('id')], prod, "product")
千万不要忘了,我们使用 ElasticSearch 做第三方的索引,目的就是进行全文搜索,现在来实现一个全文搜索的方法。
需要对 ElasticSearch 的语法有一定的了解。
使用 bool:{should:{}} 来进行多个条件的或运算
使用 match 的方法,可以对字段进行分词索引。
如果不需要分词的字段,可以使用 term:{value:filed.keyword} 的方式进行匹配。
匹配的结果记得要加上 highlight,确定具体匹配到了哪些词语。
最后再从 PostgreSQL 中取出不在索引内的字段,把整体返回结果封装一下就可以了。
@api.model
def search_info_by_keyword(self, keyword, size, page):
es = self.get_es_instance()
values = index_filed_list.values()
match_query_list = [{"match": {value: keyword}} for value in values]
highlight_query_dict = {value: {} for value in values}
body = {
"query": {
"bool": {
"should": match_query_list
}
},
"highlight": {
"fields": highlight_query_dict
},
"size": size,
"from": (page - 1) * size
}
result = {
"info": [],
'total': 0
}
hits = es.search(index=index, body=body)['hits']
search_result = hits['hits']
for info in search_result:
fileds = info['_source']
info_highlight = info['highlight']
if "company_name" in info_highlight.keys():
company_name = info_highlight['company_name'][0]
del info_highlight['company_name']
else:
company_name = fileds.get('company_name')
if "product_name" in info_highlight.keys():
product_name = info_highlight['product_name'][0]
del info_highlight['product_name']
else:
product_name = fileds.get('product_name')
if len(info_highlight) > 0:
highlight = '...'.join(value[0] for value in info_highlight.values())
else:
if "product_comment" in fileds:
highlight = fileds.get('product_comment')
elif "company_name" in fileds:
highlight = fileds.get('company_name')
elif "product_solution" in fileds:
highlight = fileds.get('product_solution')
else:
highlight = None
prod = self.env['prod.product'].search([('id', '=', info['_id'])])
logo = prod.img
logo_name = prod.file_name
logo = self.env['prod.picture'].picture_add_base64_head({'logo': logo, 'file_name': logo_name},
{'logo': 'file_name'})
result['info'].append({
"id": info['_id'],
"company_name": company_name,
"product_name": product_name,
"highlight": highlight,
"logo": logo.get('logo'),
})
result['total'] = hits['total']['value']
return result
看一下搜索的效果如何,
是否符合预期呢?
/ 搜索结果