Commit 3cbd2818 authored by rico.liu's avatar rico.liu

init

parents
Pipeline #366 failed with stages
# Auto detect text files and perform LF normalization
* text=auto
# SimilarCharactor
基于音形码,EditDistance的字符串纠正相似度算法
音形码格式:【韵母,声母,结构,四角编码,笔画数】 共8位
音形码相似度算法 参考博客https://blog.csdn.net/chndata/article/details/41114771
TODO 字符串错误匹配算法 参考
结构、四角编码 抓取http://zidian.miaochaxun.com 数据
韵母、声母 使用pinyin包
笔画数抓取https://bihua.51240.com 数据
入口函数在string_similarity.py
繁简切换 Done
ongoing 相似度分值映射调整(sigmoid函数映射)
TODO 字符串包含关系
ongoing 相似度算法添加与调整(bm25)
TODO 字符串错位
# Maps each ASCII digit character to the Chinese numeral that stands in for it.
char_number_directionary = dict(zip('0123456789', '零一二三四五六七八九'))
\ No newline at end of file
def minEditDist(sm, sn):
    """Return the Levenshtein edit distance between two sequences.

    The distance is the minimum number of single-element insertions,
    deletions and substitutions needed to turn ``sm`` into ``sn``.

    :param sm: first sequence (any indexable sequence with ``len``)
    :param sn: second sequence
    :return: the edit distance as a non-negative int
    """
    m, n = len(sm) + 1, len(sn) + 1
    # matrix[i][j] holds the distance between sm[:i] and sn[:j].
    matrix = [[0] * n for _ in range(m)]
    # Turning a prefix into the empty sequence costs one deletion per element.
    for i in range(1, m):
        matrix[i][0] = i
    for j in range(1, n):
        matrix[0][j] = j
    for i in range(1, m):
        for j in range(1, n):
            cost = 0 if sm[i - 1] == sn[j - 1] else 1
            matrix[i][j] = min(
                matrix[i - 1][j] + 1,         # deletion
                matrix[i][j - 1] + 1,         # insertion
                matrix[i - 1][j - 1] + cost,  # match / substitution
            )
    # The original trailing "print matrix[i]" loop was a Python 2 debug
    # statement that is a no-op in Python 3, so it has been removed.
    return matrix[m - 1][n - 1]
# One-character code for the leading consonant of a pinyin syllable.
# Despite the name, the keys are consonant onsets (b, p, m, ..., zh, ch, sh, y, w).
# Pairs that share a code are treated as the same sound by this table:
# n/l -> '7', z/zh -> 'E', c/ch -> 'F', s/sh -> 'G'.
# The key '0' maps to '0'; presumably the placeholder for "no leading consonant"
# -- verify against the caller that builds the lookup key.
final_code_dictionary={
'b':'1',
'p':'2',
'm':'3',
'f':'4',
'd':'5',
't':'6',
'n':'7',
'l':'7',
'g':'8',
'k':'9',
'h':'A',
'j':'B',
'q':'C',
'x':'D',
'zh':'E',
'ch':'F',
'sh':'G',
'r':'H',
'z':'E',
'c':'F',
's':'G',
'y':'I',
'w':'J',
'0':'0'
}
\ No newline at end of file
# One-character code for the vowel part (rime) of a pinyin syllable.
# Despite the name, the keys are rimes (a, o, e, ai, ang, iong, ...).
# Rimes that share a code are treated as the same sound by this table:
# ai/ei -> '7', an/ang -> 'F', en/eng -> 'G', in/ing -> 'H'.
# NOTE(review): 'uai' and 'iang' both map to 'O', and 'uan' and 'ue' both map
# to 'P'; these pairs do not sound alike, so the collisions may be unintended.
# NOTE(review): 'ong' maps to lowercase 'k' while every other letter code is
# uppercase -- confirm whether 'K' was meant.
# The empty-string key maps to '0' (a syllable with no rime left after the consonant).
initial_code_dictionary = {
'a':'1',
'o':'2',
'e':'3',
'i':'4',
'u':'5',
'v':'6',
'ai':'7',
'ei':'7',
'ui':'8',
'ao':'9',
'ou':'A',
'iu':'B',
'ie':'C',
've':'D',
'er':'E',
'an':'F',
'en':'G',
'in':'H',
'un':'I',
'ven':'J',
'ang':'F',
'eng':'G',
'ing':'H',
'ong':'k',
'uo':'L',
'ian':'M',
'iao':'N',
'uai':'O',
'uan':'P',
'uang':'Q',
'ua':'R',
'iong':'S',
'ia':'T',
'iang':'O',
'ue':'P',
'':'0'
}
\ No newline at end of file
# Pinyin sounds that are easily confused with each other, stored in both
# directions so a lookup works from either member of a pair.
similar_pronunciation_dictionary = dict(zip(
    ('n', 'l', 'an', 'ang', 'en', 'eng', 'in', 'ing', 'z', 'c', 's', 'zh', 'ch', 'sh'),
    ('l', 'n', 'ang', 'an', 'eng', 'en', 'ing', 'in', 'zh', 'ch', 'sh', 'z', 'c', 's'),
))
\ No newline at end of file
from SimilarCharactor.string_util import string2code,traditional2simplified
from SimilarCharactor.edit_distance import minEditDist
import difflib
import Levenshtein
def similarity_cn(string1,string2):
    """Score how similar two Chinese strings are, based on their sound-shape codes.

    Both strings are converted to simplified Chinese, encoded with
    ``string2code``, and compared with ``minEditDist``; the distance is
    normalised by the length of the longer code string.

    :param string1: first string
    :param string2: second string
    :return: ``1 - distance / longest_code_length``
    """
    code_string1 = string2code(traditional2simplified(string1))
    code_string2 = string2code(traditional2simplified(string2))
    longest = max(len(code_string1), len(code_string2))
    if longest == 0:
        # Neither input produced any code (empty input, or only characters
        # without a code), so the ratio below would divide by zero. Fall back
        # to exact comparison of the raw inputs.
        return 1.0 if string1 == string2 else 0.0
    distance = minEditDist(code_string1, code_string2)
    return 1 - distance / longest
def similarity_en(string1,string2):
    """Score two strings as the equal-weight mean of two similarity ratios.

    The two ratios are ``Levenshtein.ratio`` and the ``quick_ratio`` of a
    ``difflib.SequenceMatcher`` built without a junk filter.

    :param string1: first string
    :param string2: second string
    :return: the averaged similarity score
    """
    levenshtein_score = Levenshtein.ratio(string1, string2)
    matcher = difflib.SequenceMatcher(None, string1, string2)
    sequence_score = matcher.quick_ratio()
    return levenshtein_score*0.5+sequence_score*0.5
from pypinyin import pinyin,Style,lazy_pinyin
from SimilarCharactor.quadrilateral_code_dictionary import quadrilateral_code_dictionary as qcd
from SimilarCharactor.structure_code_dictionary import structure_code_dictionary as scd
from SimilarCharactor.initial_code_dictionary import initial_code_dictionary as icd
from SimilarCharactor.final_code_dictionary import final_code_dictionary as fcd
from SimilarCharactor.write_number_dictionary import write_number_dictionary as wnd
from SimilarCharactor.character import symbol_lst
from SimilarCharactor.code_directionary import code_directionary as cd
from SimilarCharactor.char_number_directionary import char_number_directionary as cnd
from zhconv import convert
def extract_initial_and_final(pinyin_string):
    """Split a pinyin syllable into its leading consonant and the remainder.

    The return order is ``(remainder, consonant)``:

    * a syllable starting with ``zh``/``ch``/``sh`` yields that two-letter
      consonant and everything after it;
    * a syllable starting with one of the single-letter consonants yields
      that letter and everything after it;
    * anything else (including the empty string) yields the whole input as
      the remainder and the placeholder ``'0'`` as the consonant.

    :param pinyin_string: toneless pinyin for one syllable, e.g. ``'zhong'``
    :return: tuple ``(remainder, consonant)``
    """
    single_letter_consonants = frozenset('bpmfdtnlgkhjqxrzcsyw')
    if pinyin_string.startswith(('zh', 'ch', 'sh')):
        consonant_length = 2
    elif pinyin_string[:1] in single_letter_consonants:
        # Slicing (not indexing) keeps the empty string from raising IndexError.
        consonant_length = 1
    else:
        return pinyin_string, '0'
    return pinyin_string[consonant_length:], pinyin_string[:consonant_length]
# Code layout: [rime, leading consonant, structure, four-corner code, stroke count], 8 characters in total
def string2code(string):
    """Concatenate the sound-shape code of every character in ``string``.

    ASCII digits are first replaced through ``cnd``; each character is then
    looked up in ``cd``. Characters that have no entry in ``cd`` contribute
    nothing to the result.

    :param string: text to encode
    :return: the concatenated code string
    """
    digit_chars = ('1', '2', '3', '4', '5', '6', '7', '8', '9', '0')
    pieces = []
    for char in string:
        lookup_key = cnd[char] if char in digit_chars else char
        pieces.append(cd.get(lookup_key, ''))
    return ''.join(pieces)
# Compute the sound-shape code of every Chinese character
def get_code():
    """Write one ``'char':'code',`` line per character to ``音型码.txt``.

    For every character returned by ``symbol_lst()`` the code is the
    concatenation of five table lookups: ``icd`` and ``fcd`` (keyed by the
    two parts of the character's pinyin) followed by ``scd``, ``qcd`` and
    ``wnd`` (keyed by the character itself).

    The file is opened with an explicit UTF-8 encoding and inside a ``with``
    block, so the output does not depend on the platform default and the
    handle is closed even if a lookup raises ``KeyError``.
    """
    with open('./SimilarCharactor/音型码.txt', 'w', encoding='utf-8') as file:
        for char in symbol_lst():
            pinyin_char = lazy_pinyin(char)[0]
            initial, final = extract_initial_and_final(pinyin_char)
            code_string = icd[initial] + fcd[final] + scd[char] + qcd[char] + wnd[char]
            file.write("'" + char + "':'" + code_string + "',\n")
def traditional2simplified(string):
    """Convert ``string`` with ``zhconv.convert`` using the ``'zh-cn'`` target.

    :param string: text to convert
    :return: whatever ``convert`` returns for the ``'zh-cn'`` locale
    """
    simplified = convert(string, 'zh-cn')
    return simplified
# NOTE(review): this call runs at module level, so every import of this module
# regenerates 音型码.txt as a side effect. Confirm whether it should sit behind
# an `if __name__ == '__main__':` guard.
get_code()
\ No newline at end of file
# -*-coding:utf-8-*-
# 此模块用于爬取汉字结构字典
import requests
from bs4 import BeautifulSoup
import re
from tqdm import tqdm
def get_url(output_path='C:/Users/fooww/Desktop/cv/Word_Structure_Dict.txt'):
    """Scrape the character-structure pages and write a ``'char':'tag',`` table.

    One page is fetched from zidian.miaochaxun.com for each structure keyword.
    Every ``<a>`` inside a ``<p class="zi">`` element (after its ``<span>``
    children have been removed) is written as one line, tagged with the
    position of the keyword in the list.

    :param output_path: file to (over)write; defaults to the path the script
        has always used, so existing callers are unaffected
    """
    key_word_lst = ['danyi', 'zuoyou', 'shangxia', 'zuozhongyou', 'shangzhongxia', 'youshangbaowei', 'zuoshangbaowei', 'zuoxiabaowei','shangsanbaowei','xiasanbaowei','zuosanbaowei','quanbaowei','xiangqian','pinzi']
    header = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                            'Chrome/63.0.3239.132 Safari/537.36'}
    # Open once in 'w' mode: same result as the former truncate-then-append
    # sequence, and the handle is closed even when a request fails.
    with open(output_path, 'w', encoding='utf-8') as file1:
        for index, key_word in enumerate(key_word_lst):
            # Tags are 0-9 for the first ten keywords, then 'A', 'B', ...
            # (chr(55 + 10) == 'A').
            tag = index if index < 10 else chr(55 + index)
            url = 'http://zidian.miaochaxun.com/'+key_word+'.html'
            print(url)
            res1 = requests.get(url, headers=header)
            res1.encoding = 'utf-8'
            soup1 = BeautifulSoup(res1.text, 'html.parser')
            zi_list = soup1.find_all('p', class_='zi')
            # Remove <span> children first so only the character links remain.
            for s in zi_list:
                for span in s.find_all('span'):
                    span.extract()
            for s in zi_list:
                for word in s.find_all('a'):
                    file1.write("'{0}':'{1}',\n".format(word.get_text(), tag))


if __name__ == '__main__':
    get_url()
def pinyin_2_hanzi(pinyinList):
    """Convert a pinyin list with Pinyin2Hanzi's ``dag`` and print each result.

    For every item returned by ``dag`` the item's ``score`` and ``path`` (the
    conversion result) are printed; nothing is returned.

    :param pinyinList: list of pinyin strings handed to ``dag``
    """
    from Pinyin2Hanzi import DefaultDagParams
    from Pinyin2Hanzi import dag

    # path_num is the number of candidate results (per the original note);
    # only 1 is requested here.
    candidates = dag(DefaultDagParams(), pinyinList, path_num=1, log=True)
    for candidate in candidates:
        print(candidate.score, candidate.path)


pinyin_2_hanzi(['hao kai xin'])
\ No newline at end of file
# coding=utf-8
import os
import pygame
import character
# 此程序用于将汉字转图片输出,以便利用opencv进行相似度识别
# Render every character from character.symbol_lst() to "<index>.png" in
# chinese_dir, black text on a white background.
chinese_dir = 'D:/py/chinese/'
# makedirs also creates missing parent directories and tolerates an existing one.
os.makedirs(chinese_dir, exist_ok=True)

pygame.init()
# Raw string: "\W", "\F" and "\m" are not valid escape sequences, so the
# non-raw literal only worked by accident (and warns on recent Pythons).
# The font is loaded once instead of once per character.
# NOTE(review): the original comment mentions msyh.ttc while the code loads
# msyh.ttf -- confirm which file actually exists on the target machine.
font = pygame.font.Font(r"C:\Windows\Fonts\msyh.ttf", 100)
for i, word in enumerate(character.symbol_lst()):
    rtext = font.render(word, True, (0, 0, 0), (255, 255, 255))
    pygame.image.save(rtext, os.path.join(chinese_dir, str(i) + ".png"))
import requests
from bs4 import BeautifulSoup
import re
import character
from tqdm import tqdm
def transutf8(symbol):
    """Return the UTF-8 encoding of ``symbol`` as lowercase hex digits.

    For a character that encodes to three bytes (e.g. ``'一'`` ->
    ``'e4b880'``) this matches the previous implementation, which sliced
    fixed positions out of the ``repr`` of the bytes object. Encoding the
    bytes directly also gives the full, correct result for characters that
    encode to a different number of bytes.

    :param symbol: the character to encode
    :return: hex string, two digits per encoded byte
    """
    return symbol.encode('utf-8').hex()
def writenum(symbol):
    """Fetch the stroke count of ``symbol`` from bihua.51240.com.

    The page URL is built from the hex UTF-8 encoding of the character. The
    stroke count is the text of the first ``<td>`` matching one or two digits
    in the same row as the ``<td>`` whose text matches ``笔画数``.

    :param symbol: the character to look up
    :return: the stroke count as the string found on the page
    :raises AttributeError: if the expected table cells are not on the page
    """
    url = 'https://bihua.51240.com/' + transutf8(symbol) + '__bihuachaxun/'
    res = requests.get(url)
    soup = BeautifulSoup(res.text, 'lxml')
    label_pattern = re.compile('笔画数')
    # Raw string: '\d' in a normal string literal is an invalid escape sequence.
    count_pattern = re.compile(r'\d{1,2}')
    label_cell = soup.find('td', text=label_pattern)
    write_num = label_cell.parent.find('td', text=count_pattern).get_text()
    return write_num
def get_dict():
    """Build a ``{character: stroke count}`` dict for ``character.symbol_lst()``.

    Each character is looked up with ``writenum``; iteration is wrapped in
    ``tqdm`` so progress is displayed while the lookups run.

    :return: dict mapping every character to the value ``writenum`` returned
    """
    return {
        char_one: writenum(char_one)
        for char_one in tqdm(character.symbol_lst())
    }
def main():
    """Write the stroke-count table to ``D:/py/write_num.txt``.

    Each entry is written as ``"<character> <stroke count>"`` on its own
    line. The original wrote the entries back to back with no separator,
    which ran all records together on a single line.
    """
    # Explicit encoding so the output does not depend on the platform default;
    # the `with` block closes the file, so no separate close() is needed.
    with open("D:/py/write_num.txt", 'w', encoding='utf-8') as f:
        for i, j in get_dict().items():
            f.write(i + ' ' + j + '\n')


if __name__ == '__main__':
    main()
\ No newline at end of file
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Mar 23 23:36:40 2021
@author: rico
"""
import requests
def zgc_api(func,data):
    """POST ``data`` plus the API key as JSON to the ``func`` endpoint.

    :param func: endpoint name, inserted into the URL path
    :param data: dict of request fields; it is copied, so the caller's dict is
        no longer modified by adding the ``'key'`` entry
    :return: the decoded JSON body of the response
    """
    headers = {
        'Connection': 'Keep-Alive'
    }
    # NOTE(review): the API key and host are hard-coded in source; consider
    # loading them from configuration or the environment.
    key = 'eRo1#ZFHY5N&GEzV'
    api = f"http://59.110.219.171:8000/{func}/"
    print(api)
    payload = {**data, 'key': key}
    # NOTE(review): requests timeouts are in seconds, so 6000 is 100 minutes
    # -- confirm this is intended.
    with requests.session() as session:
        result = session.post(api,json=payload,headers=headers,timeout=6000).json()
    return result
# Progress bar
class Index(object):
    """Render a one-line text progress bar such as ``[#####     ] 50.0``."""

    def __init__(self, number=50, decimal=2):
        """
        :param number: how many ``#`` marks make up a full bar
        :param decimal: number of decimal places kept in the percentage
        """
        self.decimal = decimal
        self.number = number
        self.a = 100/number  # percentage points represented by one '#'

    def __call__(self, now, total):
        """
        Build the progress line for ``now`` out of ``total``.

        :param now: current position
        :param total: position that corresponds to 100%
        :return: ``"\\r<bar> <percentage>"``; the leading carriage return lets
            successive prints overwrite the same terminal line
        """
        percentage = self.percentage_number(now, total)
        well_num = int(percentage / self.a)
        progress_bar_num = self.progress_bar(well_num)
        return "\r%s %s" % (progress_bar_num, percentage)

    def percentage_number(self, now, total):
        """
        Compute the completed percentage.

        A ``total`` of 0 is treated as 1 instead of raising
        ``ZeroDivisionError``; this is the same fallback the callers in this
        project apply themselves when the division fails.

        :param now: current position
        :param total: position that corresponds to 100%
        :return: percentage rounded to ``self.decimal`` places
        """
        if total == 0:
            total = 1
        return round(now / total * 100, self.decimal)

    def progress_bar(self, num):
        """
        Draw the bar itself.

        :param num: number of ``#`` marks to show
        :return: ``num`` marks padded with spaces to ``self.number`` characters,
            wrapped in square brackets
        """
        well_num = "#" * num
        space_num = " " * (self.number - num)
        return '[%s%s]' % (well_num, space_num)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Feb 24 17:21:29 2021
@author: rico
"""
import pandas as pd
import pymssql
from public import Index
# Upload supplementary data-dictionary values
def ComplicatedDataDict(path):
    """Apply the ``stdvalue``/``simplevalue`` columns of a spreadsheet to ShuJuZiDian_Cfg.

    Every row of the Excel file at ``path`` issues one ``UPDATE`` against
    ``ShuJuZiDian_Cfg`` keyed by the row's ``id`` column, and a text progress
    bar is printed while the rows are processed.

    :param path: Excel file with ``id``, ``stdvalue`` and ``simplevalue`` columns
    """
    # NOTE(review): database host and credentials are hard-coded in source;
    # consider moving them to configuration.
    conn_zi_new = pymssql.connect(host='123.56.115.207', user='zgcprice3311',password='zgcprice20200628',database= 'ZI_NEW',autocommit=True)
    try:
        cursor_zi_new = conn_zi_new.cursor()
        df = pd.read_excel(path)
        index_ = Index()
        last_position = len(df) - 1
        for process_index, (_, row) in enumerate(df.iterrows()):
            try:
                print(index_(process_index, last_position), end='%')
            except ZeroDivisionError:
                # Single-row sheet: last_position is 0.
                print(index_(process_index, 1), end='%')
            # Parameterised query: values containing quotes no longer break
            # (or inject into) the statement. str() keeps the text the former
            # f-string interpolation produced; int() assumes ids are numeric
            # -- TODO confirm.
            cursor_zi_new.execute(
                "update ShuJuZiDian_Cfg set stdvalue = %s,simplevalue = %s where id = %s",
                (str(row['stdvalue']), str(row['simplevalue']), int(row['id'])),
            )
    finally:
        # Close the connection even when reading the file or an update fails.
        conn_zi_new.close()


path = '/Users/rico/Downloads/台式机数据字典补充(1).xlsx'
ComplicatedDataDict(path)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Feb 23 00:36:24 2021
@author: rico
"""
import pandas as pd
import pymssql
from public import Index
def _open_connections(host):
    """Open the ZI_NEW and zdindex connections on ``host`` and return them as a pair."""
    # NOTE(review): database credentials are hard-coded in source; consider
    # moving them to configuration.
    common = dict(host=host, user='zgcprice3311', password='zgcprice20200628', autocommit=True)
    conn_zi_new = pymssql.connect(database='ZI_NEW', **common)
    try:
        conn_zdindex = pymssql.connect(database='zdindex', **common)
    except Exception:
        conn_zi_new.close()
        raise
    return conn_zi_new, conn_zdindex


def _fetch_frame(cursor, query, params=None):
    """Run ``query`` on ``cursor`` and return all rows as a DataFrame with the result's column names."""
    if params is None:
        cursor.execute(query)
    else:
        cursor.execute(query, params)
    rows = cursor.fetchall()
    return pd.DataFrame(rows, columns=[column[0] for column in cursor.description])


def _export_category(cursor, cursor_zdindex, writer, category_code, category_name, category_sheet_name):
    """Write the parameter sheet and the price/link sheet for one category id."""
    print(f"开始提取{category_name}参数数据")
    # category_code comes from the p_category query, not from user input, so
    # it is interpolated as before.
    # SKU-level attribute values of the category's products.
    df_sku = _fetch_frame(
        cursor,
        "select a.*,h.name as father_brand_name,d.name as brand_name,g.name as attr_name,f.value from p_sku a "
        "left join p_spu b "
        "on a.spuid = b.id "
        "left join p_category c "
        "on b.categoryid = c.id "
        "left join p_brand d "
        "on b.brandid = d.id "
        "left join p_skuvaluemap e "
        "on a.id = e.skuid "
        "left join p_skuvalue f "
        "on e.valueid = f.id "
        "left join p_skusubtitle g "
        "on f.subtitleid = g.id "
        "left join p_brand h "
        "on d.pid = h.id "
        f"where b.categoryid = {category_code} and a.state in (1,2,4)",
    )
    # SPU-level attribute values, used when a SKU has no value of its own.
    df_spu = _fetch_frame(
        cursor,
        "select a.*,d.name as brand_name,g.name as attr_name,f.value from p_sku a "
        "left join p_spu b "
        "on a.spuid = b.id "
        "left join p_category c "
        "on b.categoryid = c.id "
        "left join p_brand d "
        "on b.brandid = d.id "
        "left join p_valuemap e "
        "on b.id = e.spuid "
        "left join p_value f "
        "on e.valueid = f.id "
        "left join p_subtitle g "
        "on f.subtitleid = g.id "
        f"where b.categoryid = {category_code} and a.state in (1,2,4)",
    )

    # Parameter definitions; characters 0 and 2 of `identy` are used as the
    # "needed" and "standard" flags.
    params_df = _fetch_frame(cursor, f"select * from vw_property where categoryid = {category_code}")
    params_df['needed_param'] = params_df['identy'].apply(lambda x: x[0])
    params_df['standard_param'] = params_df['identy'].apply(lambda x: x[2])
    # .copy() so the column assignment below does not write into a slice.
    params_df = params_df[(params_df['needed_param'] != '0') | (params_df['skuorspu'] == 'spu')].copy()
    params_df['subtitle'] = params_df['subtitle'].apply(lambda x: x.strip())

    # One value list per parameter, keyed by the parameter name itself. This
    # replaces the exec()-created "<name>_list" locals, which could collide
    # after name sanitising and rely on locals() behaviour that changed in
    # Python 3.13.
    param_values = {param: [] for param in params_df['subtitle'].tolist()}

    id_list = []
    state_list = []
    product_name_list = []
    father_brand_list = []
    brand_list = []
    category_list = []

    product_ids = list(df_sku['sku'].unique())
    last_position = len(product_ids) - 1
    progress = Index()
    for position, product_id in enumerate(product_ids):
        try:
            print(progress(position, last_position), end='%')
        except ZeroDivisionError:
            # Exactly one product: last_position is 0.
            print(progress(position, 1), end='%')

        sku_rows = df_sku[df_sku['sku'] == product_id]
        spu_rows = df_spu[df_spu['sku'] == product_id]
        id_list.append(product_id)
        state_list.append(sku_rows['state'].tolist()[0])
        product_name_list.append(sku_rows['skuname'].tolist()[0])
        father_brand_list.append(sku_rows['father_brand_name'].tolist()[0])
        brand_list.append(sku_rows['brand_name'].tolist()[0])
        category_list.append(category_name)

        for param, values in param_values.items():
            # Prefer the SKU-level value, then the SPU-level value, then the
            # "missing" marker.
            found = sku_rows[sku_rows['attr_name'] == param]['value'].tolist()
            if not found:
                found = spu_rows[spu_rows['attr_name'] == param]['value'].tolist()
            values.append(found[0] if found else '无参数,需补充')

    res = pd.DataFrame()
    res['产品编码'] = id_list
    res['产品状态'] = state_list
    res['产品名称'] = product_name_list
    res['产品父品牌'] = father_brand_list
    res['产品品牌'] = brand_list
    res['产品类别'] = category_list
    for _, row in params_df.iterrows():
        param = row['subtitle']
        if param == '产品名称':
            # Already exported as a fixed column above.
            continue
        # Standard parameters are marked with a leading '*' in the header.
        is_standard = row['needed_param'] == '1' and row['standard_param'] == '1'
        column_name = '*' + param if is_standard else param
        res[column_name] = param_values[param]
    res.to_excel(writer, sheet_name=f"{category_sheet_name}参数数据")

    price_df = _fetch_frame(
        cursor_zdindex,
        f"select goods_id,goods_name,platform_id,goods_url,index_price_wave from zd_week_price where periods in (select top 1 max(periods) from zd_entry_goods_price) and sub_category_code = {category_code} ",
    )
    price_df.to_excel(writer, sheet_name=f"{category_sheet_name}价格链接数据")
    print(f"{category_name}数据导出完毕!")


def get_point_category_params_data(category):
    """Export a category's product parameters and price links to an Excel workbook.

    The workbook is named ``"<category>参数确认.xlsx"`` (with ``/`` replaced by
    ``_``). For every leaf ``p_category`` row whose name equals ``category``
    it receives one sheet of product parameters and one sheet of price/link
    rows.

    The internal database host is tried first; if connecting fails, the
    public host is used instead.

    :param category: category name to export
    """
    category_sheet_name = category.replace("/","_")
    try:
        conn_zi_new, conn_zdindex = _open_connections('172.17.9.129')
    except Exception:
        conn_zi_new, conn_zdindex = _open_connections('123.56.115.207')
    try:
        cursor = conn_zi_new.cursor()
        cursor_zdindex = conn_zdindex.cursor()
        # `category` is caller-supplied, so it is passed as a bound parameter
        # rather than interpolated into the SQL text.
        export_category = _fetch_frame(
            cursor,
            "select id,name from p_category where id not in (select distinct pid from p_category) and name in (%s)",
            (category,),
        )
        writer = pd.ExcelWriter(f"{category_sheet_name}参数确认.xlsx")
        for category_code, category_name in zip(export_category['id'].tolist(), export_category['name'].tolist()):
            _export_category(cursor, cursor_zdindex, writer, category_code, category_name, category_sheet_name)
        # close() writes the workbook; ExcelWriter.save() no longer exists in pandas 2.
        writer.close()
    finally:
        conn_zi_new.close()
        conn_zdindex.close()


category_list = ['笔记本']
for category in category_list:
    get_point_category_params_data(category)
\ No newline at end of file
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment