文档迁移
- 1.为什么要进行 reindex 操作
- 2.Reindex 操作的本质
- 3.实际案例
- 3.1 同集群索引之间的全量数据迁移
- 3.2 同集群索引之间基于特定条件的数据迁移
- 3.2.1 源索引设置检索条件
- 3.2.2 基于 script 脚本的索引迁移
- 3.2.3 基于预处理管道的数据迁移
- 3.3 不同集群之间的索引迁移
- 3.4 查看及取消 reindex 任务
- 3.4.1 查看 reindex 任务
- 3.4.2 取消 reindex 任务
- 4.注意事项
1.为什么要进行 reindex 操作
Reindex 是 Elasticsearch 中一种将数据从一个索引复制到另一个索引的操作,主要用途包括:
- 索引结构变更:当需要修改映射设置,但无法直接更新现有索引时。
- 数据迁移:将数据从一个索引/集群迁移到另一个索引/集群。
- 数据转换:在迁移过程中对数据进行修改或过滤。
- 分片优化:调整分片数量或分片策略。
- 版本升级:跨大版本升级时重建索引。
2.Reindex 操作的本质
Reindex 本质上是 Elasticsearch 内部的一个 数据复制 过程,它:
- 从源索引读取文档。
- 可选地对文档进行转换。
- 将文档写入目标索引。
- 不是简单的文件复制,而是重新索引文档的过程。
3.实际案例
3.1 同集群索引之间的全量数据迁移
场景:将 old_index
的所有数据迁移到新建的 new_index
,因为需要修改分片数量。
生成测试数据。
// 创建源索引 old_index
PUT old_index
{"mappings": {"properties": {"name": { "type": "text" },"age": { "type": "integer" },"email": { "type": "keyword" }}}
}// 批量插入测试数据
POST old_index/_bulk
{"index":{}}
{"name":"John Doe","age":28,"email":"john@example.com"}
{"index":{}}
{"name":"Jane Smith","age":32,"email":"jane@example.com"}
{"index":{}}
{"name":"Bob Johnson","age":45,"email":"bob@example.com"}
{"index":{}}
{"name":"Alice Brown","age":23,"email":"alice@example.com"}
{"index":{}}
{"name":"Tom Wilson","age":37,"email":"tom@example.com"}
执行迁移操作。
POST _reindex
{"source": {"index": "old_index"},"dest": {"index": "new_index"}
}
说明:这是最基本的 reindex 操作,将源索引所有文档复制到目标索引。
3.2 同集群索引之间基于特定条件的数据迁移
3.2.1 源索引设置检索条件
场景:只迁移 old_index
中 status
字段为 active
的文档。
生成测试数据。
// 创建带 status 字段的索引
PUT status_index
{"mappings": {"properties": {"name": { "type": "text" },"status": { "type": "keyword" },"value": { "type": "integer" }}}
}// 批量插入测试数据
POST status_index/_bulk
{"index":{}}
{"name":"Item 1","status":"active","value":100}
{"index":{}}
{"name":"Item 2","status":"inactive","value":200}
{"index":{}}
{"name":"Item 3","status":"active","value":150}
{"index":{}}
{"name":"Item 4","status":"pending","value":300}
{"index":{}}
{"name":"Item 5","status":"active","value":250}
执行迁移操作。
POST _reindex
{"source": {"index": "status_index","query": {"term": {"status": "active"}}},"dest": {"index": "status_index_new"}
}
3.2.2 基于 script 脚本的索引迁移
场景:迁移时修改字段,例如将 price
字段值增加 10 % 10\% 10%。
生成测试数据。
// 创建产品索引
PUT products
{"mappings": {"properties": {"name": { "type": "text" },"price": { "type": "double" },"category": { "type": "keyword" }}}
}// 批量插入测试数据
POST products/_bulk
{"index":{}}
{"name":"Laptop","price":1000.00,"category":"electronics"}
{"index":{}}
{"name":"Smartphone","price":700.00,"category":"electronics"}
{"index":{}}
{"name":"Desk Chair","price":150.00,"category":"furniture"}
{"index":{}}
{"name":"Coffee Mug","price":10.00,"category":"kitchen"}
{"index":{}}
{"name":"Notebook","price":5.00,"category":"stationery"}
执行迁移操作。
POST _reindex
{"source": {"index": "products"},"dest": {"index": "products_new"},"script": {"source": "ctx._source.price *= 1.10"}
}
3.2.3 基于预处理管道的数据迁移
场景:在迁移过程中使用预处理管道处理数据,例如添加时间戳。
生成测试数据。
// 创建原始日志索引
PUT raw_logs
{"mappings": {"properties": {"message": { "type": "text" },"level": { "type": "keyword" },"source": { "type": "keyword" }}}
}// 批量插入测试日志数据
POST raw_logs/_bulk
{"index":{}}
{"message":"User logged in","level":"INFO","source":"auth-service"}
{"index":{}}
{"message":"Failed authentication attempt","level":"WARN","source":"auth-service"}
{"index":{}}
{"message":"Database connection lost","level":"ERROR","source":"db-service"}
{"index":{}}
{"message":"Cache refreshed successfully","level":"INFO","source":"cache-service"}
{"index":{}}
{"message":"High memory usage detected","level":"WARN","source":"monitoring-service"}
执行迁移操作。
PUT _ingest/pipeline/add_timestamp
{"description": "Adds a timestamp to documents","processors": [{"set": {"field": "@timestamp","value": "{{_ingest.timestamp}}"}}]
}POST _reindex
{"source": {"index": "raw_logs"},"dest": {"index": "raw_logs_new","pipeline": "add_timestamp"}
}
3.3 不同集群之间的索引迁移
场景:将集群 A 的 cluster_a_index
迁移到集群 B 的 cluster_b_index
。
生成测试数据。
// 创建模拟跨集群迁移的源索引
PUT cluster_a_index
{"mappings": {"properties": {"title": { "type": "text" },"views": { "type": "integer" },"published": { "type": "date" }}}
}// 批量插入测试数据
POST cluster_a_index/_bulk
{"index":{}}
{"title":"Introduction to Elasticsearch","views":1500,"published":"2023-01-15"}
{"index":{}}
{"title":"Kibana Dashboard Tutorial","views":3200,"published":"2023-02-20"}
{"index":{}}
{"title":"Advanced Logstash Pipelines","views":875,"published":"2023-03-10"}
{"index":{}}
{"title":"Elasticsearch Performance Tuning","views":2100,"published":"2023-04-05"}
{"index":{}}
{"title":"Machine Learning with ELK","views":1800,"published":"2023-05-12"}
在集群 B 上执行迁移操作。
// 在集群 B 上执行
POST _reindex
{"source": {"remote": {"host": "http://clusterA:9200","username": "admin","password": "xxxxxx"},"index": "cluster_a_index"},"dest": {"index": "cluster_b_index"}
}
注意事项(非常重要):
- ⭐ 需要配置远程集群白名单。
elasticsearch.yml
中的reindex.remote.whitelist
配置项。
- ⭐ 网络连接必须畅通,比如:
- 是否在同一 VPC 下;
- 以何种方式访问,HTTP 还是 HTTPS(如果是 HTTPS,可能需要配置证书)。
- ⭐ 大数据量迁移建议使用快照/恢复方式更高效。
3.4 查看及取消 reindex 任务
生成测试数据。
// 创建一个大索引用于测试长时间运行的 reindex 任务
PUT large_source_index
{"mappings": {"properties": {"data": { "type": "text" },"counter": { "type": "integer" }}}
}// 批量插入大量测试数据(1000条)
POST _scripts/generate_large_data
{"script": {"lang": "painless","source": """def bulk = new StringBuilder();for (int i = 0; i < 1000; i++) {bulk.append('{"index":{}}\n');bulk.append('{"data":"Test data ' + i + '","counter":' + i + '}\n');}return bulk.toString();"""}
}POST large_source_index/_bulk
{ "script": { "id": "generate_large_data" } }
3.4.1 查看 reindex 任务
GET _tasks?detailed=true&actions=*reindex
3.4.2 取消 reindex 任务
POST _tasks/{task_id}/_cancel
示例:
// 先查看任务
GET _tasks?detailed=true&actions=*reindex// 返回结果中获取任务ID后取消
POST _tasks/oTUltX4IQMOUUVeiohTt8A:12345/_cancel
4.注意事项
- Reindex 会占用大量资源,建议在低峰期执行
- 大数据量 reindex 建议使用
slices
并行处理 - 可以使用
wait_for_completion=false
异步执行 - 监控任务进度,避免影响集群性能
- 考虑版本兼容性问题,特别是跨大版本迁移时
Reindex 是 Elasticsearch 数据管理的重要工具,合理使用可以解决许多数据迁移和结构调整问题。