承影映射器

# 承影映射器 v0.5 ## (一)、诞生缘由 创星编译器的最终目的,是为了实现创星集团产品前后端开发的无代码化。让公司从“微服务”走向“中台化”的过程更加顺畅,真正的实现“中台之后无代码”。 但在编译的过程中,存在服务与服务之间出入参对接与调度的难点: - 1. 技术难点: 有模板时的服务出入参接口匹配与无模板时服务出入参的接口匹配; - 2. 业务难点:业务使用过程中会因为某些字段的差异性导致无法精准匹配,需要一套智能解析的算法实现。 因此,我们打造了创星映射器这一内核技术平台,用来智能匹配服务与服务出入参之间的接口对接与调度。 ## (二)、实现效果 全面提升了开发效率,最坏情况提高倍,最好情况提高倍。 全面改善产品质量和运行效率,最坏情况提速10%,最好情况提速1000%以上。 帮助推进创新中台在公司内部的推广和使用,为中台提供核心竞争力。 创星映射器当前可以实现全方位的数据类型间映射,我们在应对数据类型对接的时候,自定义了一款基于JSON格式名为TGDL(创星通用描述性语言)的中间体语言 ## (三)、实现困难 在做智能匹配时,数据集样本的不足,导致相似度无法完整的按照深度学习模型去计算,但当前可采取一些简单的阈值回归得到近似值。 计划任务: 实现完全自动mapping 优化代码结构,增强代码可读性 全自动/半自动生成服务的单元测试 ## (四)、未来方向 - 1.承影0.5版本(流程建模阶段):有简单的前后端开发功能,直接生成代码 - 2.承影1.0版本(流程建模阶段):加入项目的概念,将前后端打通 - 3.承影1.5版本时:进行扩张尝试,与纯均, 鱼肠整合,在承影流程设计器中引入规则建模 - 4.承影2.0版本时:进行全面扩张,在承影流程设计器中再引入数据建模、权限建模 - 5.承影2.5版本时:化繁为简,全面进行整合,将承影设计器全面融合到全开发环节可视化建模中 层级关系:承影设计器-流程建模-映射器 ## (五)、架构简图 ## (六)、核心代码实现案例 - 1. 出入参的模板 ```python #入参模板 argument_template = { "type": "Argument", "value": { "type": "Normal", "serviceNodeId": "", "serviceKeyName": "" } } #出参模板 result_template = { "type": "Result", "value": { "type": "Normal", "serviceNodeId": "", "serviceKeyName": "" } } #map类型模板 map_template = {"type": "Map", "value": {}} #列表类型模板 list_template = {"type": "List", "value": []} ``` - 2. 结点数据的哈希化 ```python async def take_node_hash(node: Union[mapping_query.QueryNode, mapping_entity.Node]) -> int: #当结点类型为入口(Entry)时,对结点的值进行哈希化 if node.type == mapping_entity.NodeType.Entry: return hash(node.type) #当结点类型为服务(Service)时,对结点属性的服务(service) ID和接口(api) ID两个值进行哈希化 elif node.type == mapping_entity.NodeType.Service: return hash(node.properties.service.id + "-" + node.properties.api.id) #当结点类型为SQL时,对结点的SQL语句进行哈希化 elif node.type == mapping_entity.NodeType.SQL: return hash(node.properties.command) ``` - 3. 是否需要智能解析的判定 ```Python #服务结点间的映射函数实现 async def get_service_mapping( from_nodes: List[mapping_query.QueryNode], to_node: mapping_query.QueryNode ) -> Dict[str, Any]: # 判断是否需要智能解析 ###结点为SQL类型时获取服务(service_name)和接口(interface_name)对应的SQL语句 if to_node.type is NodeType.SQL: repo = await repository_lib.get_registered_sql_service() doc = repo["index"].get(to_node.properties.command, None) ###结点为服务类型时获取服务的服务ID和接口ID elif to_node.type is NodeType.Service: repo = await repository_lib.get_registered_service() doc = repo.get(to_node.properties.service.id, {}).get(to_node.properties.api.id, None) # 如果没有找到模版,执行智能解析 if doc is None: return {"Guess": True, "Arguments": await guess_service_mapping(from_nodes, to_node)} # 遍历模板里的mapping映射内容 for mapping in doc.Mappings: depends = mapping.depends requires: Dict[int, str] = {await take_node_hash(v): k for k, v in depends.items()} provides: Dict[int, mapping_query.QueryNode] = {await take_node_hash(x): x for x in from_nodes} #取哈希化后的结点组成一个集合 difference = set(requires.keys()).difference(set(provides.keys())) print(requires, provides) #如果结果为空,则将JSON模板转为python对象并返回 if not difference: template = Template(json.dumps(mapping.template)) template_mapping = {} for h in requires: template_mapping[requires[h]] = provides[h].id return {"Guess": "False", "Arguments": json.loads(template.substitute(template_mapping))} #如果有结果,则返回找不到匹配的映射 raise RuntimeException("找不到匹配的映射") ``` - 4. 获取JSON格式文档数据的路径 ```Python async def construct_result(path: List[str], argument: Dict, res: Any): # 当 path 为空时,停止递归 if not path: return # 若 path 不为空,则进行递归 elif len(path) >= 1: #删除路径里的第一个元素 curr_path = path.pop(0) # res[0]["Value"]["Code"] res["0"]["Value"]["Code"] # res "0:Int/Value/Code" # 0, #如果当前路径里包含了:且数据类型为int时 if ":" in curr_path and curr_path.split(":")[1].lower().startswith("int"): curr_path = int(curr_path.split(":")[0]) # TGDL: {"type":"Nothing"} -> None/null # TGDL: {"type":"Map/List/Integer", "value":"{}/[]/1" # 如果结果集里没有value值或者值的数据类型不为列表则按照列表模板进行更新 if res["type"] == "Nothing" or res["type"] != "List": res.update(list_template.copy()) else: # 如果结果集里没有value值或者值的数据类型不为字典则按照map模板进行更新 if "value" not in res or type(res["value"]) is not dict: res.update(map_template.copy()) #当pop后的路径的长度为0时,如果res的值为列表,则加到value里面,否则直接赋值给curr_path if len(path) == 0: if type(res["value"]) is list: res["value"].append(argument) else: res["value"][curr_path] = argument #再对路径更深层级里面的值进行递归 else: #如果值的类型为列表时,则用append加到该列表里去,再递归调用该方法 if type(res["value"]) is list: if len(res["value"]) < curr_path + 1: res["value"].append({}) await construct_result(path, argument, res["value"][-1]) #如果当前路径里没有值时,则将其初始化,再递归调用该方法 else: if curr_path not in res["value"]: res["value"][curr_path] = {} await construct_result(path, argument, res["value"][curr_path]) ``` - 5. 路径递归算法逻辑 ```Python #路径递归算法 async def get_available_keys(obj: Union[Dict[str, Any], List]) -> List[str]: # 如果对象为空或者对象里的type为Nothing,则返回空 if not obj or obj.get("type", None) == "Nothing": return [] async def check_primitive(curr_obj: Dict[str, Any]) -> bool: # 如果value不是一个字典,那么就代表不用递归下去了 obj_type = type(curr_obj.get("value", None)) # 返回对象类型为不是字典也不是列表 return obj_type is not dict and obj_type is not list async def get_available_keys_helper(curr_obj: Dict[str, Any], result: set = set(), path: str = "") -> MutableSet: # Base case: 当遇到Primitive就跳过 is_primitive = await check_primitive(curr_obj) if is_primitive: return set([path]) # 如果不是 primitive 那就继续检查 # 如果是一个map对象,则遍历它的键,递归返回path/key格式的路径,如:"data/Var1" if curr_obj["type"] == "Map": for key in curr_obj["value"]: next_obj = curr_obj["value"][key] if path: new_path = "{}/{}".format(path, key) else: new_path = key result = result.union(await get_available_keys_helper(next_obj, result.copy(), new_path)) # 如果是一个列表,则用enumerate组合为一个索引序列,遍历它的键,递归返回path/i 格式的路径,如:["Code", "Message", "Data/Var1", "Data/Var2"]中的"Data/Var2" elif curr_obj["type"] == "List": for i, next_obj in enumerate(curr_obj["value"]): if path: new_path = "{}/{}:int".format(path, i) else: new_path = path result = result.union(await get_available_keys_helper(next_obj, result.copy(), new_path)) return result #调用这个方法,用列表类型返回 keys = await get_available_keys_helper(obj) return list(keys) ``` - 6.模板智能匹配的相似度阈值选取 ```python async def guess_service_mapping(from_nodes: List[mapping_query.QueryNode], to_node: mapping_query.QueryNode) -> Dict[str, Any]: # 获取所有可以用的键 available_sources = {} # 遍历结点,返回入参和出参的路径 for node in from_nodes: if node.properties.arguments is not None: keys = await get_available_keys(node.properties.arguments) for key in keys: available_sources[key] = {"type": "Argument", "id": node.id} if node.properties.results is not None: keys = await get_available_keys(node.properties.results) for key in keys: available_sources[key] = {"type": "Result", "id": node.id} # 需要的键 required_keys = await get_available_keys(to_node.properties.arguments) # 拼接参数 argument = {} # resquired_keys 就是一连串的 路径, 比如 ["Code", "Message","Data/Var1", "Data/Var1"] # required_key 就是一个路径,比如 "Data/Var1" for required_key in required_keys: #调用difflib包,其作用是比对文本之间的差异,返回一个分数列表 scores = [ (difflib.SequenceMatcher(None, required_key.split("/")[-1].lower(), available_source.split("/")[-1].lower()).quick_ratio(), available_source) for available_source in available_sources] #类型默认的模板 template = {"type": "Nothing"} if len(scores) > 0: # 阈值 取中位数的1.3倍 threshold = max(0.5, statistics.median(map(lambda x: x[0], scores)) * 1.3) # 筛选出大于阈值的分数,返回一个列表 filtered_scores = list(filter(lambda x: x[0] > threshold, scores)) if len(filtered_scores) > 0: best = max(scores, key=lambda x: x[0])[1] source = available_sources[best] if source["type"] == "Result": template = result_template else: template = argument_template template["value"]["serviceNodeId"] = source["id"] template["value"]["serviceKeyName"] = best path = re.split(r"[/.]", required_key) argument[required_key] = copy.deepcopy(template) # 根据这个路径,去更新to_node.properties.arguments里面的内容 await construct_result(path, argument[required_key], to_node.properties.arguments) return to_node.properties.arguments ``` ## (七)、部分文件结构 ![image.png](https://cos.easydoc.net/92634618/files/kne7s3bm.png)