Clickhouse源码分析-副本数据同步

1 总体流程

上图说明了一条insert语句最后如何被副本同步到的流程(图中ck集群为单shard,双副本)。

(1)从客户端发出,写入ck

(2)ck提交LogEntry到Keeper

(3)另外一个副本从Keeper拉取LogEntry到本地执行

2 参数说明

此部分介绍以下整个链路涉及的一些参数。

mergetree settings:

1.zookeeper_session_expiration_check_period

检查keeper session是否到期,每个以上参数的时间检查一次,默认为60S:

每个引擎为Replicated的MergeTree表在启动的时候,会运行以下任务来检查与keeper 之间的session是否过期。

创建复制表时,内核会启动这个复制表的引擎,

之后在ReplicatedMergeTreeRestartingThread::runImpl()中启动各种后台调度任务:

(1)background_operations_assignee:执行merge,fetch操作

(2)merge_selecting_task:主要功能为选择合并的part

(3)cleanup_thread:线程,清理过期part等

这些任务的调度有点任务内递归的感觉:

都是任务执行的最后在继续重复上个任务(只是任务的内容不一样)。

2.max_insert_delayed_streams_for_parallel_write

当part所在的存储系统支持并行写入时,这个参数默认为100,否则为0。

3.distributed_ddl_task_timeout

设置来自集群中所有主机的 DDL 查询响应的超时时间。如果某个 DDL 请求未能在所有主机上执行完成,响应中将包含一个 timeout 错误,并且该请求将以异步模式执行。负值表示无限超时时间。

3 示例表结构

db:

CREATE DATABASE replicated_db
ENGINE = Replicated('/clickhouse/databases/replicated_db', '{shard}', '{replica}')

table:

CREATE TABLE replicated_db.replicated_table
(
`id` UInt64,
`event_time` DateTime,
`event_type` String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/replicated_table', '{replica}')
PARTITION BY toYYYYMM(event_time)
ORDER BY (event_time, id)
SETTINGS index_granularity = 8192

1 单节点生成LogEntry

这里我们忽略掉词语法解析,优化器,计划生成层以及执行层的部分算子,直接来到写数据到磁盘以及提交LogEntry的算子 - ReplicatedMergeTreeSinkImpl。

这里的输入参数chunk就是插入的数据在内存中的组织结构。

在ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk & chunk)中,主要有以下步骤:

1.将插入的数据通过分区键拆成part,此过程通过MergeTreeDataWriter::splitBlockIntoParts完成

2.遍历每一个拆分出来的part

(1)通过writeNewTempPart将这个拆分出来的part写到临时目录中。

(2)在这个分支,提交写入的part到keeper中:

如果开启了并行写入,part会攒够一定的数量后,整体提交到Keeper上,这个默认数量为100。

2 提交LogEntry到Keeper

2.1 提交重试的参数控制

1.insert_keeper_max_retries 

insert_keeper_max_retries 参数控制向复制表(Replicated MergeTree)插入数据时,对 ClickHouse Keeper(或 ZooKeeper)请求的最大重试次数。默认值为20。

只有以下三种错误会触发重试:

(1)network error

(2)Keeper session timeout

(3)request timeout

2.insert_keeper_retry_initial_backoff_ms 

insert_keeper_retry_initial_backoff_ms 参数定义了在INSERT执行期间,对失败的Keeper(ZooKeeper)请求进行重试时的初始退避等待时间(毫秒)。默认值为100ms。

3.insert_keeper_retry_max_backoff_ms 

insert_keeper_retry_max_backoff_ms 参数设定了在 INSERT 查询执行期间,对失败的 Keeper/ZooKeeper 请求进行重试时的最大退避等待时间上限(毫秒)。默认值为10s。

2.2 提交流程

注意这里提交的并不是写入的数据,而是写入part的元信息。

提交主要通过ReplicatedMergeTreeSinkImpl<async_insert>::commitPart完成。

block_id_path

/clickhouse/tables/s1/replicated_table/blocks/202507_12141418273484448060_16681511374997932159

retries_ctl.retryLoop为提交的驱动:

提交的状态转化通过stage_switcher这个匿名函数完成:

初始时retry_context.stage为LOCK_AND_COMMIT,所以进入commit_new_part_stage:

commit_new_part_stage主要做了以下几件事:

(1)设置要提交part的基本信息,例如block_number,part 名。对于New Part来说,block_number在一个复制表引擎中是全局递增的。

(2)构造要在Keeper上执行的请求,例如

构造在Keeper上创建的LogEntry的请求,通过get_logs_ops完成。对于一个New Part来说,这个LogEntry的type为GET_PART,还包括其他的一些信息,例如:

  • create_time:创建时间
  • source_replica:哪个副本创建的这个part

  • new_part_name:part名

等等。最后将这个LogEntry封装为一个CreateRequest。

一次Part的提交会带着很多其他的请求:

RemoveRequest有:

其他的CreateRequest有:

get_quorum_ops只有在副本大于2时,会有携带请求。

getCommitPartOps中的CreateRequest:

(3)开启事务,在提交LogEntry到Keeper失败时,回滚,进行状态的恢复

(4)将LogEntry发送到Keeper上

由于是多个请求,所以会调用ZooKeeper::multiImpl

此处流程,可用下图表示(如果是写请求达到了follower,follower会转发给leader): 

非阻塞等待异步操作结果,最大等待时间为args.operation_timeout_ms毫秒

操作超时时间的参数,Coordination/CoordinationSettings.cpp

默认值为10S,Common/ZooKeeper/ZooKeeperConstants.h

3 副本拉取LogEntry

3.1 问题记录

问题1:创建表报Session was killed

这个问题可以跳过,暂时采用另一种方法解决,在此保留,以后有时间了继续追。

创建表时报错:Coordination::Exception: Session was killed

原因时,长时间未操作,ch-client与Keeper之间的session断开。

但是这有一个问题是:虽然创建表失败,但是建表的元信息可能会提交到Keeper上。

此时你会发现,虽然这个库并没有这个表,但是无法创建:

再次创建表报错如下:

此时可以使用以下语句剔除在keeper上的元信息:

SYSTEM DROP REPLICA 'r1' FROM ZKPATH '/clickhouse/tables/s1/replicated_table';

剔除在keeper上的元信息后,再次创建表,会发现此时会卡在创建这里:

之后翻看副本2的日志,发现副本2之前已经拉到了replicated_table这张表,并为它创建了数据目录。

解决:去副本2上删除对应得表目录

此时,会发现replicated_table表已经成功创建。

删除表同样有这个问题:


最终解决需要调整session超时时间。根因不是这个参数。以下继续分析:

其中code为:

下一步调试Keeper看为什么会有这个错误码。

这个错误码的设置位置:

(1)KeeperStateMachine<Storage>::processReconfiguration

(2)各个预处理不同请求的地方,preprocess

TODO:比较怀疑是不是我的两个ck使用的是不同版本的问题

这个问题最后没追下去,暂时只知道报错大概位置。

问题2:关于副本同步part失败的问题记录

此时在副本r1上的replicated_table有一个part为202507_0_9_3。

在副本2在同步这个part的过程中,虽然它从keeper上取到了这个LogEnetry:

但是一直报错,并且从num_tries可以得知,这个任务已经重试了22次了。

日志中的报错提示:

没有配置这个参数interserver_http_host

keeper上存副本1的replicated_table这个表的part的地方:/clickhouse/tables/s1/replicated_table/replicas/r1/parts

调整完之后,两个副本的parts目录下内容一致:

3.2 拉取LogEntry任务启动

一段核心注释:(Storages\StorageReplicatedMergeTree.h)

/** The replicated tables have a common log (/log/log-...).

  * Log - a sequence of entries (LogEntry) about what to do.

  * Each entry is one of:

  * - normal data insertion (GET),

  * - data insertion with a possible attach from local data (ATTACH),

  * - merge (MERGE),

  * - delete the partition (DROP).

  *

  * Each replica copies (queueUpdatingTask, pullLogsToQueue) entries from the log to its queue (/replicas/replica_name/queue/queue-...)

  *  and then executes them (queueTask).

  * Despite the name of the "queue", execution can be reordered, if necessary (shouldExecuteLogEntry, executeLogEntry).

  * In addition, the records in the queue can be generated independently (not from the log), in the following cases:

  * - when creating a new replica, actions are put on GET from other replicas (createReplica);

  * - if the part is corrupt (removePartAndEnqueueFetch) or absent during the check

  *   (at start - checkParts, while running - searchForMissingPart), actions are put on GET from other replicas;

  *

  * The replica to which INSERT was made in the queue will also have an entry of the GET of this data.

  * Such an entry is considered to be executed as soon as the queue handler sees it.

  *

  * The log entry has a creation time. This time is generated by the clock of server that created entry

  * - the one on which the corresponding INSERT or ALTER query came.

  *

  * For the entries in the queue that the replica made for itself,

  * as the time will take the time of creation the appropriate part on any of the replicas.

  */

解释如下:

所有副本共享一个日志目录 /log/log-...,每个日志项(LogEntry)描述一项操作。

  • 这个“日志”是指 ZooKeeper 中的节点 /log/log-0000001, /log/log-0000002 等。

  • 所有的副本会从这个共享日志中拉取操作(如插入、合并、删除等)。

日志项类型包括:(定义在Storages\MergeTree\ReplicatedMergeTreeLogEntry.h)

  • GET:常规插入数据;

  • ATTACH:插入数据时可能会使用本地已有的数据;

  • MERGE:后台合并多个 part;

  • DROP:删除某个分区的数据。

每个副本会把这些日志项复制到自己的执行队列中/replicas/<replica_name>/queue/queue-00000...),通过:

  • queueUpdatingTask(周期性任务)

  • pullLogsToQueue()(从 /log/ 拉取 log 到 /queue/

副本随后会启动后台线程执行队列里的任务(queueTask())。

虽然叫“队列”,但实际上执行顺序可以根据依赖进行重排(由 shouldExecuteLogEntry() 控制依赖,决定某 entry 是否可执行)。

举个例子,如果 MERGE 依赖的 part 还没拉取完成,就会延后执行。

某些队列任务并非从日志而来,而是副本本地生成的,比如:

  • 创建新副本时,会向队列中加入从其他副本 GET 所有已有 part 的任务;

如果发现某个 part 损坏(removePartAndEnqueueFetch)或缺失(启动时用 checkParts(),运行时用 searchForMissingPart()),
也会生成 GET 请求从其他副本拉取缺失的 part

即使某个副本自己是写入的目标,它也会有一个 GET 类型的 entry 表示这次插入。
这类 entry 在队列中会立即视为“已完成”,因为本地已经有数据,不需要再拉取。

日志项有创建时间戳,这个时间由“发起该写入的server”产生(即 INSERT / ALTER 语句在哪个副本执行)。

对于某个副本自己给自己生成的队列项(比如 GET 缺失 part),会使用已有副本上该 part 的创建时间作为时间戳。


正如前文提到的当创建一个引擎为Replicated族的表时,内核会启动这个复制表引擎,之后在ReplicatedMergeTreeRestartingThread::runImpl()中启动各种后台任务,拉取LogEntry的任务也在这个地方调度:

这个任务的主要内容如下所示:(核心为queue.pullLogsToQueue)

void StorageReplicatedMergeTree::queueUpdatingTask()

{

    if (!queue_update_in_progress)

    {

        last_queue_update_start_time.store(time(nullptr));

        queue_update_in_progress = true;

    }

    try

    {

        auto zookeeper = getZooKeeperAndAssertNotStaticStorage();

        if (is_readonly)

        {

            /// Note that we need to check shutdown_prepared_called, not shutdown_called, since the table will be marked as readonly

            /// after calling StorageReplicatedMergeTree::flushAndPrepareForShutdown().

            if (shutdown_prepared_called)

                return;

            throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is in readonly mode (replica path: {}), cannot update queue", replica_path);

        }

        queue.pullLogsToQueue(zookeeper, queue_updating_task->getWatchCallback(), ReplicatedMergeTreeQueue::UPDATE);

        last_queue_update_finish_time.store(time(nullptr));

        queue_update_in_progress = false;

    }

       ......

}

3.3 日志同步位点(log-pointer)

首先创建一个复制表之后,它在Keeper上的元数据都有哪些呢?

例如:

CREATE TABLE my_db.my_table ( ... ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/my_table', '{replica}') ORDER BY ...

其中:

{shard}   = s1
{replica} = r1

所以表的 ZooKeeper 路径会解析为:/clickhouse/tables/s1/my_table

副本路径为:/clickhouse/tables/s1/my_table/replicas/r1

ZooKeeper 路径结构图:

/clickhouse/
└── tables/
└── s1/                      ← shard = s1
└── my_table/           ← 表名
├── log/            ← 主日志目录(所有副本共享)
│   ├── log-0000000000
│   ├── log-0000000001
│   └── ...
├── replicas/
│   ├── r1/         ← 当前副本,replica = r1
│   │   ├── queue/              ← 待处理的日志操作队列
│   │   │   ├── queue-0000000000
│   │   │   └── ...
│   │   ├── log_pointer         ← 当前副本已同步日志的游标
│   │   ├── host                ← 当前副本的主机地址信息
│   │   ├── is_active           ← 当前副本是否存活
│   │   └── ...
│   ├── r2/         ← 其他副本(如果有)
│   └── ...
├── mutations/     ← 所有的 mutation 操作
├── block_numbers/ ← 每个分区的最大 block number
├── temp/          ← 临时节点
└── ...

在 ClickHouse Keeper中,log_pointer每个副本(replica)维护的一个游标(cursor),它的作用是在分布式表(如 ReplicatedMergeTree)中 记录该副本已经处理到哪个日志 entry。

3.4 拉取LogEntry流程

明白了log-pointer(日志同步位点)之后,再看看Keeper是如何具体拉取LogEntry的。

流程如下:

1.主表路径: /clickhouse/tables/{shard}/{table}/log/ 存放主日志(所有副本共享)。

2.每个副本路径: /clickhouse/tables/{shard}/{table}/replicas/{replica}/log_pointer 存储该副本处理进度。

3.副本执行拉取任务:

  • 获取当前副本的 log_pointer

  • 读取 /log 目录下的所有日志节点

  • 过滤日志列表,删除所有索引小于当前 log_pointer 指向的日志条目

  • 如果过滤后log_entries不为空,先sort

  • for循环逻辑:

    • 批次划分,以 current_multi_batch_size(初始较小)为批大小,从 log_entries 中取一段连续日志作为本批处理目标。last 指向本批最后一个日志条目。

    • 更新循环索引和批大小。entry_idx 指向下批次起点,批大小指数级增长(最多增长到 MAX_MULTI_OPS),加速同步过程。

    • 生成 ZooKeeper 路径列表,批量读取日志数据

      for (auto it = begin; it != end; ++it)get_paths.emplace_back(fs::path(zookeeper_path) / "log" / *it);
      auto get_results = zookeeper->get(get_paths);
      
    • 构造 ZooKeeper 操作列表,准备批量写入 queue 和更新指针

      for (size_t i = 0; i < get_num; ++i)
      {// 解析日志数据,构造 LogEntry 对象copied_entries.emplace_back(LogEntry::parse(res.data, res.stat, format_version));// 创建 queue 节点的请求(持久顺序节点)ops.emplace_back(zkutil::makeCreateRequest(fs::path(replica_path) / "queue/queue-", res.data, zkutil::CreateMode::PersistentSequential));// 处理 create_time,更新 min_unprocessed_insert_time(用于后续处理优先级等)
      }
      
    • 更新 log_pointer 和 min_unprocessed_insert_time 的请求。更新本副本的日志处理进度指针,指向最后处理的日志后一个索引。如果有最早插入时间更新,同步写入。

    • 使用 ZooKeeper multi() 提交以上所有操作

      auto responses = zookeeper->multi(ops, /* check_session_valid */ true);
    • 将LogEntry加到复制表queue中

      insertUnlocked(copied_entries[copied_entry_idx], unused, state_lock);
  • 唤醒表的后台任务执行线程去执行LogEntry任务

    storage.background_operations_assignee.trigger();

注意点:

//这只是读到所有的任务的名字,不读具体的内容

Strings log_entries = zookeeper->getChildrenWatch(fs::path(zookeeper_path) / "log", nullptr, watch_callback);

//读到log的具体内容

auto get_results = zookeeper->get(get_paths);

4 副本执行LogEntry

拉取到LogEntry到queue中后,最后会通过storage.background_operations_assignee.trigger()调度执行LogEntry的线程。

调度任务的内容为:

bool StorageReplicatedMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assignee)
{cleanup_thread.wakeupEarlierIfNeeded();/// If replication queue is stopped exit immediately as we successfully executed the taskif (queue.actions_blocker.isCancelled())return false;/// This object will mark the element of the queue as running.ReplicatedMergeTreeQueue::SelectedEntryPtr selected_entry = selectQueueEntry();if (!selected_entry)return false;auto job_type = selected_entry->log_entry->type;/// Depending on entry type execute in fetches (small) pool or big merge_mutate poolif (job_type == LogEntry::GET_PART || job_type == LogEntry::ATTACH_PART){assignee.scheduleFetchTask(std::make_shared<ExecutableLambdaAdapter>([this, selected_entry] () mutable{return processQueueEntry(selected_entry);}, common_assignee_trigger, getStorageID()));return true;}if (job_type == LogEntry::MERGE_PARTS){auto task = std::make_shared<MergeFromLogEntryTask>(selected_entry, *this, common_assignee_trigger);assignee.scheduleMergeMutateTask(task);return true;}if (job_type == LogEntry::MUTATE_PART){auto task = std::make_shared<MutateFromLogEntryTask>(selected_entry, *this, common_assignee_trigger);assignee.scheduleMergeMutateTask(task);return true;}assignee.scheduleCommonTask(std::make_shared<ExecutableLambdaAdapter>([this, selected_entry]() mutable { return processQueueEntry(selected_entry); }, common_assignee_trigger, getStorageID()),/* need_trigger */ true);return true;
}

这里主要说明任务的选择和执行:

1.从队列中选择一个待处理任务

ReplicatedMergeTreeQueue::SelectedEntryPtr selected_entry = selectQueueEntry();
if (!selected_entry)return false;

2.根据任务类型选择线程池调度

  •  类型:GET_PART / ATTACH_PART
if (job_type == LogEntry::GET_PART || job_type == LogEntry::ATTACH_PART)
{assignee.scheduleFetchTask(...);return true;
}
  • 类型:MERGE_PARTS

if (job_type == LogEntry::MERGE_PARTS)
{auto task = std::make_shared<MergeFromLogEntryTask>(...);assignee.scheduleMergeMutateTask(task);return true;
}

等等。

以下我们聚焦于GET_PART任务的执行逻辑:

processQueueEntry        ->

        executeLogEntry        ->

                executeFetch

        的核心流程为:

1.找到拥有 entry.new_part_name 或覆盖它的 part 的 其它副本(replica)​​​​​​

    /// Looking for covering part. After that entry.actual_new_part_name may be filled.String replica = findReplicaHavingCoveringPart(entry, true);
  • 获取所有副本名,并随机打乱(防止偏好某个副本)
    • Strings replicas = zookeeper->getChildren(...);
      std::shuffle(replicas.begin(), replicas.end(), thread_local_rng);
      
  • 遍历所有副本,跳过自身和不活跃副本
    • if (replica == replica_name) continue;
      if (active && !zookeeper->exists(.../replica/is_active)) continue;
      
  • 获取该副本上的所有 part,并检查是否包含所需 part 或其覆盖 part
    • 如果找到完全一致的 part,直接接受;
    • 如果是覆盖的 part,则选覆盖面最大的那个(如 all_0_0_10 优于 all_0_0_3);
    • MergeTreePartInfo::contains 判断某个 part 是否逻辑上包含另一个。
    • Strings parts = zookeeper->getChildren(.../replica/parts);for (const String & part_on_replica : parts)
      {if (part_on_replica == part_name || MergeTreePartInfo::contains(part_on_replica, part_name, format_version)){if (largest_part_found.empty() || MergeTreePartInfo::contains(part_on_replica, largest_part_found, format_version)){largest_part_found = part_on_replica;}}
      }
      
  • 如果找到了覆盖的 part,还要做一个额外检查-queue.addFuturePartIfNotCoveredByThem,这个函数暂未细看

2.确定 fetch 的 part 名称

String part_name = entry.actual_new_part_name.empty() ? entry.new_part_name : entry.actual_new_part_name;if (!entry.actual_new_part_name.empty())LOG_DEBUG(log, "Will fetch part {} instead of {}", entry.actual_new_part_name, entry.new_part_name);
  • 如果 findReplicaHavingCoveringPart 选中的 replica 拥有 更大的覆盖 part,比如:你需要的是 part_0_1_1, 它有的是 part_0_3_1,则 entry.actual_new_part_name 会被设置成那个覆盖的部分。

  • 然后将其作为 fetch 的目标

3.拼接 source_replica 的 ZooKeeper 路径

String source_replica_path = fs::path(zookeeper_path) / "replicas" / replica;

构造这个副本在 ZooKeeper 中的路径,例如:

/clickhouse/tables/s1/my_table/replicas/r2

4.执行 fetchPart

该函数会尝试将 part 拉取到本地,执行以下操作:

1. 前置检查与准备工作

  • 如果是静态只读表,禁止 fetch 操作。
​if (isStaticStorage())throw Exception(ErrorCodes::TABLE_IS_READ_ONLY, "Table is in readonly mode due to static storage");​
  • 如果不是 fetch 到 detached 目录,先检查是否已有旧的同名 part(可能是上次拉取失败的残留),如有,则触发后台清理线程。
if (!to_detached) {if (auto part = getPartIfExists(...)) {cleanup_thread.wakeup();return false;}
}
  • 检查是否有其它线程正在拉取这个 part。​​​​​
std::lock_guard lock(currently_fetching_parts_mutex);
if (!currently_fetching_parts.insert(part_name).second) {return false; // 已在拉取中,避免重复工作
}

2. 日志记录,可以看到副本拉取过来的part,对应的类型为DOWNLOAD_PART

    /// LoggingStopwatch stopwatch;MutableDataPartPtr part;DataPartsVector replaced_parts;ProfileEventsScope profile_events_scope;auto write_part_log = [&] (const ExecutionStatus & execution_status){writePartLog(PartLogElement::DOWNLOAD_PART, execution_status, stopwatch.elapsed(),part_name, part, replaced_parts, nullptr,profile_events_scope.getSnapshot());};

3.决定拉取方式:clone or fetch

如果目标 part 是一个 part mutation 的结果,尝试查找其 source part,并将其 checksums 与目标 part 的 checksums 进行比较。如果两者一致,则可以直接 clone 本地的 part。

    DataPartPtr part_to_clone;{/// If the desired part is a result of a part mutation, try to find the source part and compare/// its checksums to the checksums of the desired part. If they match, we can just clone the local part./// If we have the source part, its part_info will contain covered_part_info.auto covered_part_info = part_info;covered_part_info.mutation = 0;auto source_part = getActiveContainingPart(covered_part_info);/// Fetch for zero-copy replication is cheap and straightforward, so we don't use local clone hereif (source_part && !is_zero_copy_part(source_part)){auto source_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(source_part->getColumns(), source_part->checksums);String part_path = fs::path(source_replica_path) / "parts" / part_name;String part_znode = zookeeper->get(part_path);std::optional<ReplicatedMergeTreePartHeader> desired_part_header;if (!part_znode.empty()){desired_part_header = ReplicatedMergeTreePartHeader::fromString(part_znode);}else{String columns_str;String checksums_str;if (zookeeper->tryGet(fs::path(part_path) / "columns", columns_str) &&zookeeper->tryGet(fs::path(part_path) / "checksums", checksums_str)){desired_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksumsZNodes(columns_str, checksums_str);}else{LOG_INFO(log, "Not checking checksums of part {} with replica {}:{} because part was removed from ZooKeeper",part_name, source_zookeeper_name, source_replica_path);}}/// Checking both checksums and columns hash. For example we can have empty part/// with same checksums but different columns. And we attaching it exception will/// be thrown.if (desired_part_header&& source_part_header.getColumnsHash() == desired_part_header->getColumnsHash()&& source_part_header.getChecksums() == desired_part_header->getChecksums()){LOG_TRACE(log, "Found local part {} with the same checksums and columns hash as {}", source_part->name, part_name);part_to_clone = source_part;}}}

远程 fetch:获取源副本的 host 地址和端口信息,准备 HTTP 拉取所需的认证信息和参数,构造 get_part(),使用 fetcher.fetchSelectedPart()

接下来看一下远程拉取,在fetchSelectedPart中:

数据在构造HttpReadBuffer中已经获取到

主体流程如下:

1.准备临时下载目录(如 tmp-fetch_<part_name>),用于避免写入中直接影响数据目录,后续成功后才正式提交。

    static const String TMP_PREFIX = "tmp-fetch_";String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;

2.传统 Fetch 分支 - downloadPartToDisk

downloadPartToDisk中调用downloadBaseOrProjectionPartToDisk来取Part或者是Projection的Part:

    try{for (size_t i = 0; i < projections; ++i){String projection_name;readStringBinary(projection_name, in);MergeTreeData::DataPart::Checksums projection_checksum;auto projection_part_storage = part_storage_for_loading->getProjection(projection_name + ".proj");projection_part_storage->createDirectories();downloadBaseOrProjectionPartToDisk(replica_path, projection_part_storage, in, output_buffer_getter, projection_checksum, throttler, sync);data_checksums.addFile(projection_name + ".proj", projection_checksum.getTotalSizeOnDisk(), projection_checksum.getTotalChecksumUInt128());}downloadBaseOrProjectionPartToDisk(replica_path, part_storage_for_loading, in, output_buffer_getter, data_checksums, throttler, sync);}

downloadBaseOrProjectionPartToDisk中,遍历part中的每一个文件,例如.bin , .mrk等等

    for (size_t i = 0; i < files; ++i){String file_name;UInt64 file_size;readStringBinary(file_name, in);readBinary(file_size, in);/// File must be inside "absolute_part_path" directory./// Otherwise malicious ClickHouse replica may force us to write to arbitrary path.String absolute_file_path = fs::weakly_canonical(fs::path(data_part_storage->getRelativePath()) / file_name);if (!startsWith(absolute_file_path, fs::weakly_canonical(data_part_storage->getRelativePath()).string()))throw Exception(ErrorCodes::INSECURE_PATH,"File path ({}) doesn't appear to be inside part path ({}). ""This may happen if we are trying to download part from malicious replica or logical error.",absolute_file_path, data_part_storage->getRelativePath());written_files.emplace_back(output_buffer_getter(*data_part_storage, file_name, file_size));HashingWriteBuffer hashing_out(*written_files.back());copyDataWithThrottler(in, hashing_out, file_size, blocker.getCounter(), throttler);hashing_out.finalize();if (blocker.isCancelled()){/// NOTE The is_cancelled flag also makes sense to check every time you read over the network,/// performing a poll with a not very large timeout./// And now we check it only between read chunks (in the `copyData` function).throw Exception(ErrorCodes::ABORTED, "Fetching of part was cancelled");}MergeTreeDataPartChecksum::uint128 expected_hash;readPODBinary(expected_hash, in);if (expected_hash != hashing_out.getHash())throw Exception(ErrorCodes::CHECKSUM_DOESNT_MATCH,"Checksum mismatch for file {} transferred from {} (0x{} vs 0x{})",(fs::path(data_part_storage->getFullPath()) / file_name).string(),replica_path,getHexUIntLowercase(expected_hash),getHexUIntLowercase(hashing_out.getHash()));if (file_name != "checksums.txt" &&file_name != "columns.txt" &&file_name != IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME &&file_name != IMergeTreeDataPart::METADATA_VERSION_FILE_NAME)checksums.addFile(file_name, file_size, expected_hash);}

之后将Part涉及的文件写到磁盘:

    /// Call fsync for all files at once in attempt to decrease the latencyfor (auto & file : written_files){file->finalize();if (sync)file->sync();}

5 扩展

如何判断一个Part是否包含另一个Part通过这个函数完成:

    bool contains(const MergeTreePartInfo & rhs) const{/// Containing part may have equal level iff block numbers are equal (unless level is MAX_LEVEL)/// (e.g. all_0_5_2 does not contain all_0_4_2, but all_0_5_3 or all_0_4_2_9 do)bool strictly_contains_block_range = (min_block == rhs.min_block && max_block == rhs.max_block) || level > rhs.level|| level == MAX_LEVEL || level == LEGACY_MAX_LEVEL;return partition_id == rhs.partition_id        /// Parts for different partitions are not merged&& min_block <= rhs.min_block&& max_block >= rhs.max_block&& level >= rhs.level&& mutation >= rhs.mutation&& strictly_contains_block_range;}

同步删除表:

DROP DATABASE IF EXISTS my_database SYNC;

删database目录的信息:

system drop database replica '分片名|副本名' from database db_name;

删replica下信息:

system drop replica '副本名' from database db_name;

剔除表的元信息:

SYSTEM DROP REPLICA 'r1' FROM ZKPATH '/clickhouse/tables/s1/replicated_table5';

在集群中创建表:

CREATE TABLE replicated_db.replicated_table ON CLUSTER my_cluster
(
`id` UInt64,
`event_time` DateTime,
`event_type` String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/replicated_table', '{replica}')
PARTITION BY toYYYYMM(event_time)
ORDER BY (event_time, id)
SETTINGS index_granularity = 8192

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/916058.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/916058.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring AI 系列之二十四 - ModerationModel

之前做个几个大模型的应用&#xff0c;都是使用Python语言&#xff0c;后来有一个项目使用了Java&#xff0c;并使用了Spring AI框架。随着Spring AI不断地完善&#xff0c;最近它发布了1.0正式版&#xff0c;意味着它已经能很好的作为企业级生产环境的使用。对于Java开发者来说…

在 macOS 上 安装最新 Python 和 pip

文章目录方法一&#xff1a;使用 Homebrew&#xff08;推荐&#xff09;方法二&#xff1a;使用 pyenv&#xff08;管理多个 Python 版本&#xff09;方法三&#xff1a;从官网下载安装包升级 pip验证安装方法一&#xff1a;使用 Homebrew&#xff08;推荐&#xff09; 1. 安装…

新能源电池厂自动化应用:Modbus TCP转DeviceNet实践

一、项目背景在新能源电池厂的生产过程中&#xff0c;提升自动化水平对提高生产效率和产品质量至关重要。我们的生产线上&#xff0c;施耐德PLC负责整体的生产流程控制&#xff0c;采用Modbus TCP协议进行数据传输&#xff0c;它基于以太网&#xff0c;传输速度快、稳定性高&am…

Java进阶3:Java集合框架、ArrayList、LinkedList、HashSet、HashMap和他们的迭代器

Java集合框架 集合框架被设计成的目标&#xff1a;高性能、高效 允许不同类型的结合&#xff0c;以类似的方式进行工作&#xff0c;有高度的互操作性 对一个集合的扩展和适应必须是简单的两种容器&#xff1a;集合Collection、图Map 集合接口被分为了三种子类型&#xff1a;Lis…

笔记/使用Excel进行财务预测

文章目录金融预测的决策与数据收集决定财务问题收集财务数据清理与合并财务数据解释与应用预测结果使用excel进行财务回归分析回归预测的步骤解释回归结果在 Excel 中执行预测财务分析指标财务分析常用指标一览表财务指标的相关性对竞争对手进行基准测试财务指标的趋势分析持续…

力扣1287:有序数组中出现次数超过25%的元素

力扣1287:有序数组中出现次数超过25%的元素题目思路代码题目 给你一个非递减的 有序 整数数组&#xff0c;已知这个数组中恰好有一个整数&#xff0c;它的出现次数超过数组元素总数的 25%。 请你找到并返回这个整数 思路 哈希表秒了 代码 class Solution { public:int fi…

如何用 Z.ai 生成PPT,一句话生成整套演示文档

大家好&#xff0c;这里是K姐。 一个帮你追踪最新AI应用的女子。 最近朋友给我分享了一个好玩的页面截图。 一眼看过去&#xff0c;就感觉这PPT的文字排版很有人工味。 我立马就去试了一下&#xff0c;才发现它根本不是传统的 PPT&#xff0c;而是一种网页式的 Slides 。 做…

C/C++ 编程:掌握静态库与动态库的编译

在 C/C 项目开发中&#xff0c;理解并掌握如何编译和使用库文件是至关重要的一环。库允许你将常用的函数和代码模块化&#xff0c;从而提高代码重用性、简化项目管理并缩短编译时间。最常见的两种库类型是静态库 (.a) 和动态库 (.so)。它们各有优缺点&#xff0c;适用于不同的开…

汽车安全 | 汽车安全入门

引言 汽车安全不仅仅是对汽车/车辆进行物理入侵。这只是很小且简单的一部分。当你以攻击者/对手的思维去看待一辆联网汽车时&#xff0c;你关注的是整个车辆生态系统。这不仅包括它如何与外部实体通信&#xff0c;也包括它在车内如何运作。 汽车是主要的交通工具&#xff0c;…

CLIP与SIGLIP对比浅析

CLIP 和 SIGLIP 的核心区别在于损失函数的设计&#xff1a;CLIP 使用基于 softmax 的对比损失&#xff08;InfoNCE&#xff09;&#xff0c;强制正样本在全局对比中压倒所有负样本&#xff0c;计算成本高且受限于负样本数量&#xff1b;SIGLIP 改用基于 sigmoid 的二元分类损失…

移动管家手机控车便捷性如何

移动管家4G手机控车-全面升级一键启动、无钥匙进入、手机启动、手机开关锁、手机开尾箱、手机寻车、车辆诊断、GPS北斗定位、电子围栏、车辆授权、车辆防盗抢、胎压检测、预约启动、车窗控制、车况提醒等功&#xff1b;移动管家手机控车系统&#xff08;以“移动管家控车APP”为…

MySQL 8.4.4详细下载安装配置

1、下载mysql8.4.4文件&#xff0c;取zip文件 mysql8.4.4下载路径 MySQL 5.7.31详细下载安装配置 2、配置环境变量 1.系统—>高级系统设置—>环境变量—>系统变量 在系统变量中点击新建&#xff0c;变量名为量名为&#xff1a;MYSQL_HOME&#xff0c;添加你的mys…

在 Linux 上安装 `pgvector`(这是一个 PostgreSQL 的向量类型扩展,常用于处理嵌入向量,便于进行向量相似度搜索)

全文 4000 字&#xff0c;配图配码&#xff0c;已在多家企业落地验证。阅读完如有收获&#xff0c;文末投票告诉我你最关注的方向&#xff0c;我会在下一篇文章里继续深入。 0. pgvector 简介 pgvector 是一款 PostgreSQL 原生向量数据类型扩展&#xff0c;核心能力&#xff1…

【项目实战】——深度学习.全连接神经网络

目录 1.使用全连接网络训练和验证MNIST数据集 2.使用全连接网络训练和验证CIFAR10数据集 1.使用全连接网络训练和验证MNIST数据集 import torch from torch import nn from torchvision import datasets, transforms from torch.utils.data import DataLoader from torch im…

嵌入式学习的第三十四天-进程间通信-TCP

一、TCPTCP : 传输控制协议 传输层1. TCP特点(1).面向连接,避免部分数据丢失 (2).安全、可靠 (3).面向字节流 (4).占用资源开销大2.TCP安全可靠机制三次握手:指建立tcp连接时&#xff0c;需要客户端和服务端总共发送三次报文确认连接。确保双方均已做好 收发…

【爬虫】06 - 自动化爬虫selenium

自动化爬虫selenium 文章目录自动化爬虫selenium一&#xff1a;Selenium简介1&#xff1a;什么是selenium2&#xff1a;安装准备二&#xff1a;元素定位1&#xff1a;id 定位2&#xff1a;name 定位3&#xff1a;class 定位4&#xff1a;tag 定位5&#xff1a;xpath 定位(最常用…

2025年中国移动鸿鹄大数据实训营(大数据方向)kafka讲解及实践-第2次作业指导

书接上回&#xff0c;第二次作业比较容易解决&#xff0c;我问了ai&#xff0c;让他对我进行指导&#xff0c;按照它提供的步骤&#xff0c;我完成了本次实验&#xff0c;接下来我会标注出需要注意的细节&#xff0c;指导大家完成此次任务。 &#x1f3af; 一、作业目标 ✔️…

三十七、【高级特性篇】定时任务:基于 APScheduler 实现测试计划的灵活调度

三十七、【高级特性篇】定时任务:基于 APScheduler 实现测试计划的灵活调度 前言 准备工作 第一部分:后端实现 - `APScheduler` 集成与任务调度 1. 安装 `django-apscheduler` 2. 配置 `django-apscheduler` 3. 数据库迁移 4. 创建调度触发函数 5. 启动 APScheduler 调度器 6…

RabbitMQ--消息顺序性

看本章之前强烈建议先去看博主的这篇博客 RabbitMQ--消费端单线程与多线程-CSDN博客 一、消息顺序性概念 消息顺序性是指消息在生产者发送的顺序和消费者接收处理的顺序保持一致。 二、RabbitMQ 顺序性保证机制 情况顺序保证情况备注单队列&#xff0c;单消费者消息严格按发送顺…

.net core接收对方传递的body体里的json并反序列化

1、首先我在通用程序里有一个可以接收对象型和数组型json串的反序列化方法public static async Task<Dictionary<string, string>> AllParameters(this HttpRequest request){Dictionary<string, string> parameters QueryParameters(request);request.Enab…