从Milvus迁移DashVector

DashVector · · 221 次点击 · · 开始浏览    

<font style="color:rgb(24, 24, 24);">本文档演示如何从Milvus将Collection数据全量导出,并适配迁移至DashVector。方案的主要流程包括:</font> 1. 首先,升级Milvus版本,目前Milvus只有在最新版本(v.2.3.x)中支持全量导出 2. 其次,将Milvus Collection的Schema信息和数据信息导出到具体的文件中 3. 最后,以导出的文件作为输入来构建DashVector Collection并数据导入 下面,将详细阐述迁移方案的具体操作细节。 <h1 id="Jgv8L">Milvus升级2.3.x版本</h1> 本文中,我们将借助Milvus的[query_iterator](https://milvus.io/docs/with_iterators.md)来全量导出数据([query](https://milvus.io/docs/query.md)接口无法导出完整数据),由于该接口目前只在v2.3.x版本中支持,所以在导出数据前,需要先将Milvus版本升级到该版本。Milvus版本升级的详细操作参考[Milvus用户文档](https://milvus.io/docs/upgrade_milvus_standalone-operator.md)。 :::info 注意:在进行Milvus Upgrade时需要注意数据的备份安全问题。 ::: <h1 id="oRmlt">Milvus全量数据导出</h1> 数据的导出包含Schema以及数据记录,Schema主要用于完备地定义Collection,数据记录对应于每个Partition下的全量数据,这两部分涵盖了需要导出的全部数据。下文展示如何将单个Milvus Collection全量导出。 <h2 id="bb950">Schema导出</h2> DashVector和Milvus在Schema的设计上有一些区别,DashVector向用户透出的接口非常简单,Milvus则更加详尽。从Milvus迁移DashVector时会涉及到部分Schema参数的删除(例如Collection的index_param参数),只会保留DashVector构建Collection的必要参数,以下为一个Schema转换的简单示例(其中,Collection已有的数据参考[Milvus示例代码](https://raw.githubusercontent.com/zilliztech/milvus-backup/main/example/prepare_data.py)写入)。 ```python from pymilvus import ( connections, utility, Collection, DataType ) import os import json from pathlib import Path fmt = "\n=== {:30} ===\n" print(fmt.format("start connecting to Milvus")) host = os.environ.get('MILVUS_HOST', "localhost") print(fmt.format(f"Milvus host: {host}")) connections.connect("default", host=host, port="19530") metrics_map = { 'COSINE': 'cosine', 'L2': 'euclidean', 'IP': 'dotproduct', } dtype_map = { DataType.BOOL: 'bool', DataType.INT8: 'int', DataType.INT16: 'int', DataType.INT32: 'int', DataType.INT64: 'int', DataType.FLOAT: 'float', DataType.DOUBLE: 'float', DataType.STRING: 'str', DataType.VARCHAR: 'str', } def load_collection(collection_name: str) -> Collection: has = utility.has_collection(collection_name) print(f"Does collection hello_milvus exist in Milvus: {has}") if not has: return None collection = Collection(collection_name) collection.load() return collection def export_collection_schema(collection, file: str): schema = collection.schema.to_dict() index = collection.indexes[0].to_dict() export_schema = dict() milvus_metric_type = index['index_param']['metric_type'] try: export_schema['metrics'] = metrics_map[milvus_metric_type] except: raise Exception(f"milvus metrics_type{milvus_metric_type} not supported") export_schema['fields_schema'] = {} for field in schema['fields']: if 'is_primary' in field and field['is_primary']: continue if field['name'] == index['field']: # vector if field['type'] == DataType.FLOAT_VECTOR: export_schema['dtype'] = 'float' export_schema['dimension'] = field['params']['dim'] else: raise Exception(f"milvus dtype{field['type']} not supported yet") else: try: # non-vector export_schema['fields_schema'][field['name']] = dtype_map[field['type']] except: raise Exception(f"milvus dtype{field['type']} not supported yet") with open(file, 'w') as file: json.dump(export_schema, file, indent=4) if __name__ == "__main__": collection_name = "YOUR_MILVUS_COLLECTION_NAME" collection = load_collection(collection_name) dump_path_str = collection_name+'.dump' dump_path = Path(dump_path_str) dump_path.mkdir(parents=True, exist_ok=True) schema_file = dump_path_str + "/schema.json" export_collection_schema(collection, schema_file) ``` 以下是一个可用于创建DashVector Collection的schema文件示例。 ```json { "metrics": "euclidean", "fields_schema": { "random": "float", "var": "str" }, "dtype": "float", "dimension": 8 } ``` <h2 id="XBXQa">Data导出</h2> DashVector和Milvus在设计上都有Partition的概念,所以向量以及其他数据进行导出时,需要注意按照Partition粒度进行导出。此外,DashVector的主键类型为str,而Milvus设计其为自定义类型,所以在导出时需要考虑主键类型的转换。以下为一个基于[query_iterator](https://milvus.io/docs/with_iterators.md)接口导出的简单代码示例: ```python from pymilvus import ( connections, utility, Collection, DataType ) import os import json import numpy as np from pathlib import Path fmt = "\n=== {:30} ===\n" print(fmt.format("start connecting to Milvus")) host = os.environ.get('MILVUS_HOST', "localhost") print(fmt.format(f"Milvus host: {host}")) connections.connect("default", host=host, port="19530") pk = "pk" vector_field_name = "vector" def load_collection(collection_name: str) -> Collection: has = utility.has_collection(collection_name) print(f"Does collection hello_milvus exist in Milvus: {has}") if not has: return None collection = Collection(collection_name) collection.load() return collection def export_partition_data(collection, partition_name, file: str): batch_size = 10 output_fields=["pk", "random", "var", "embeddings"] query_iter = collection.query_iterator( batch_size=batch_size, output_fields = output_fields, partition_names=[partition_name] ) export_file = open(file, 'w') while True: docs = query_iter.next() if len(docs) == 0: # close the iterator query_iter.close() break for doc in docs: new_doc = {} new_doc_fields = {} for k, v in doc.items(): if k == pk: # primary key new_doc['pk'] = str(v) elif k == vector_field_name: new_doc['vector'] = [float(k) for k in v] else: new_doc_fields[k] = v new_doc['fields'] = new_doc_fields json.dump(new_doc, export_file) export_file.write('\n') export_file.close() if __name__ == "__main__": collection_name = "YOUR_MILVUS_COLLECTION_NAME" collection = load_collection(collection_name) pk = collection.schema.primary_field.name vector_field_name = collection.indexes[0].field_name dump_path_str = collection_name+'.dump' dump_path = Path(dump_path_str) dump_path.mkdir(parents=True, exist_ok=True) for partition in collection.partitions: partition_name = partition.name if partition_name == '_default': export_path = dump_path_str + '/default.txt' else: export_path = dump_path_str + '/' + partition_name + ".txt" export_partition_data(collection, partition_name, export_path) ``` 上述示例代码会将Milvus Collection的各个Partition分别进行数据导出,导出后的文件结构如下图所示: ```latex # collection_name = hello_milvus hello_milvus.dump/ ├── default.txt └── schema.json ``` <h1 id="xtXQm">将数据导入DashVector</h1> <h2 id="Z0YPA">创建Cluster</h2> 参考DashVector官方[用户手册](https://help.aliyun.com/document_detail/2631966.html?spm=a2c4g.2631965.0.0.33485425aqhYvz)构建Cluster。 <h2 id="VEdmA">创建Collection</h2> 根据2.1章节中导出的Schema信息以及参考Dashvector官方[用户手册](https://help.aliyun.com/document_detail/2568085.html?spm=a2c4g.2631966.0.0.153c1afcNYc6rW)来创建Collection。下面的示例代码会根据2.1章节中导出的schema.json来创建一个DashVector的Collection。 ```python from dashvector import Client, DashVectorException from pydantic import BaseModel from typing import Dict, Type import json dtype_convert = { 'int': int, 'float': float, 'bool': bool, 'str': str } class Schema(BaseModel): metrics: str dtype: Type dimension: int fields_schema: Dict[str, Type] @classmethod def from_dict(cls, json_data): metrics = json_data['metrics'] dtype = dtype_convert[json_data['dtype']] dimension = json_data['dimension'] fields_schema = {k: dtype_convert[v] for k, v in json_data['fields_schema'].items()} return cls(metrics=metrics, dtype=dtype, dimension=dimension, fields_schema=fields_schema) def read_schema(schema_path) -> Schema: with open(schema_path) as file: json_data = json.loads(file.read()) return Schema.from_dict(json_data) if __name__ == "__main__": milvus_dump_path = f"{YOUR_MILVUS_COLLECTION_NAME}.dump" milvus_dump_scheme_path = milvus_dump_path + "/schema.json" schema = read_schema(milvus_dump_scheme_path) client = dashvector.Client( api_key='YOUR_API_KEY', endpoint='YOUR_CLUSTER_ENDPOINT' ) # create collection rsp = client.create(name="YOUR_DASHVECTOR_COLLECTION_NAME", dimension=schema.dimension, metric=schema.metrics, dtype=schema.dtype, fields_schema=schema.fields_schema) if not rsp: raise DashVectorException(rsp.code, reason=rsp.message) ``` <h2 id="PiA4M">导入Data</h2> 根据2.2章节中导出的数据以及参考DashVector官方[用户手册](https://help.aliyun.com/document_detail/2510249.html?spm=a2c4g.2510248.0.0.49ef7738NuI0kM#aa59e950508ld)来批量插入Doc。下面的示例代码会依次解析各个Partition导出的数据,然后依次创建DashVector下的Partition并导入数据。 ```python from dashvector import Client, DashVectorException, Doc from pydantic import BaseModel from typing import Dict, Type import json import glob from pathlib import Path def insert_data(collection, partition_name, partition_file): if partition_name != 'default': rsp = collection.create_partition(partition_name) if not rsp: raise DashVectorException(rsp.code, reason=rsp.message) with open(partition_file) as f: for line in f: if line.strip(): json_data = json.loads(line) rsp = collection.insert( [ Doc(id=json_data['pk'], vector=json_data['vector'], fields=json_data['fields']) ] ) if not rsp: raise DashVectorException(rsp.code, reason=rsp.message) if __name__ == "__main__": milvus_dump_path = f"{YOUR_MILVUS_COLLECTION_NAME}.dump" client = dashvector.Client( api_key='YOUR_API_KEY', endpoint='YOUR_CLUSTER_ENDPOINT' ) # create collection collection = client.get("YOUR_DASHVECTOR_COLLECTION_NAME") partition_files = glob.glob(milvus_dump_path+'/*.txt', recursive=False) for partition_file in partition_files: # create partition partition_name = Path(partition_file).stem insert_data(collection, partition_name, partition_file) ```

有疑问加站长微信联系(非本文作者))

入群交流(和以上内容无关):加入Go大咖交流群,或添加微信:liuxiaoyan-s 备注:入群;或加QQ群:692541889

221 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传