欢迎关注公众号: 数据库技术研究,接下来我们就来聊聊关于时序数据库influxdb场景 时序数据库Influx-IOx源码学习五?以下内容大家不妨参考一二希望能帮到您!

时序数据库influxdb场景 时序数据库Influx-IOx源码学习五

时序数据库influxdb场景 时序数据库Influx-IOx源码学习五

欢迎关注公众号: 数据库技术研究

上篇介绍到:InfluxDB-IOx的Run命令启动过程,详情见:https://my.oschina.net/u/3374539/blog/5021654

这章记录一下Database create命令的执行过程。


在第三章命令行中介绍了,所有的子命令都有一个独立的参数或配置称为subcommand。

enum Command { Convert { // 省略 ...}, Meta {// 省略 ...}, Database(commands::database::Config), Run(Box<commands::run::Config>), Stats(commands::stats::Config), Server(commands::server::Config), Writer(commands::writer::Config), Operation(commands::operations::Config), }

这章我们打开看一眼commands::database下的config包含了什么。

pub struct Config { #[structopt(subcommand)] command: Command, } //见名知意,基本猜测一下就行了,慢慢使用到再回来看 enum Command { Create(Create), List(List), Get(Get), Write(Write), Query(Query), Chunk(chunk::Config), Partition(partition::Config), }

先来看一下create命令的执行。

Command::Create(command) => { //创建一个grpc的client let mut client = management::Client::new(connection); //设置基本的配置项 let rules = DatabaseRules { //数据库名字 name: command.name, //内存的各种配置,包含缓存大小,时间等等 lifecycle_rules: Some(LifecycleRules { //省略。。 }), //设置分区的策略 partition_template: Some(PartitionTemplate { //省略。。 }), //其它都填充default ..Default::default() }; //使用配置信息创建数据库,这里是生成了一个CreateDatabaserequest去调用了远程服务器的方法 client.create_database(rules).await?; println!("Ok"); }

在上一章中提到了grpc的启动,这里就涉及到了之前提到的grpc的框架tonic,在tonic中使用#[tonic::async_trait]了标记一个服务器端的实现开始。我在ide中搜索,可以在src/influxdb_ioxd/rpc/management.rs:50行中找到ManagementService相关的实现。

有关tonic更多的资料请阅读:https://github.com/hyperium/tonic

#[tonic::async_trait] impl<M> management_service_server::ManagementService for ManagementService<M> where M: ConnectionManager Send Sync Debug 'static, { //省略其它方法。。。 async fn create_database( &self, //这里就是接收CreateDatabaseRequest的请求 request: Request<CreateDatabaseRequest>, ) -> Result<Response<CreateDatabaseResponse>, Status> { //对数据进行一下校验,然后获得在上面配置的rules规则 let rules: DatabaseRules = request .into_inner() .rules .ok_or_else(|| FieldViolation::required("")) .and_then(TryInto::try_into) .map_err(|e| e.scope("rules"))?; //这里就是在第三章中提到的server_id,如果没配置就会报错了 let server_id = match self.server.require_id().ok() { Some(id) => id, None => return Err(NotFound::default().into()), }; //这里就是真正的去创建,在下面继续跟踪 match self.server.create_database(rules, server_id).await { Ok(_) => Ok(Response::new(CreateDatabaseResponse {})), Err(Error::DatabaseAlreadyExists { db_name }) => { return Err(AlreadyExists { resource_type: "database".to_string(), resource_name: db_name, ..Default::default() } .into()) } Err(e) => Err(default_server_error_handler(e)), } } }

接下来要继续查看数据库真正的被创建出来,我读到这里存在一个问题,文件格式是什么样子的?

pub async fn create_database(&self, rules: DatabaseRules, server_id: NonZeroU32) -> Result<()> { //检查server_id self.require_id()?; //把数据库名字存储到内存中,最终保存到一个btreemap中 let db_reservation = self.config.create_db(rules)?; //对数据进行持久化保存 self.persist_database_rules(db_reservation.rules().clone()) .await?; //启动数据库后台线程,在内存中写入数据库状态 db_reservation.commit(server_id, Arc::clone(&self.store), Arc::clone(&self.exec)); Ok(()) }

来解答上面的疑问,文件是怎样持久化、格式是什么样子的。

pub async fn persist_database_rules<'a>(&self, rules: DatabaseRules) -> Result<()> { //生成一个新的数据库路径 let location = object_store_path_for_database_config(&self.root_path()?, &rules.name); //序列化DatabaseRules这个pb到byte流 let mut data = BytesMut::new(); rules.encode(&mut data).context(ErrorSerializing)?; let len = data.len(); let stream_data = std::io::Result::Ok(data.freeze()); //将pb的内容进行存储 self.store .put( &location, futures::stream::once(async move { stream_data }), Some(len), ) .await .context(StoreError)?; Ok(()) }

这里调用了rules.encode()转换到pb的格式,这里是rust语言的一个方法,实现了From特性的,就得到了一个into的方法,如:impl From<DatabaseRules> for management::DatabaseRules.

到这里数据库的一个描述文件rules.pb就被写入到磁盘中了,路径是启动命令中指定的--data-dir参数路径 --writer-id 数据库名字。

例如,我的启动和创建命令为:

./influxdb_iox run --writer-id 1 --object-store file --data-dir ~/influxtest/ ./influxdb_iox database create test

那么得到的路径就为:~/influxtest/1/test/rules.pb. 之后可以运行一个pb的脚本来反查rules.pb中的数据内容,如下:

$ ./scripts/prototxt decode influxdata.iox.management.v1.DatabaseRules \ < ~/influxtest/1/test/rules.pb influxdata/iox/management/v1/service.proto:6:1: warning: Import google/protobuf/field_mask.proto is unused. name: "test" partition_template { parts { time: "%Y-%m-%d %H:00:00" } } lifecycle_rules { mutable_linger_seconds: 300 mutable_size_threshold: 10485760 buffer_size_soft: 52428800 buffer_size_hard: 104857600 sort_order { order: ORDER_ASC created_at_time { } } }

看到这里已经知道整个生成过程及文件内容。

祝玩儿的开心。

,