跳转至

使用数据库

Conventions

  • 每一个代码块的顶部都有它所属的文件(相对)路径,如果代码块属于某个函数(方法),那么顶部会有函数(方法)的声明。
  • 代码块中的对象,如果很重要,会在行尾补充该对象的声明。
  • 我使用的MySQL 版本是 8.0.41。你看到这篇时,可能有了更新的版本,比如 8.0.42,区别不大的。

我们已经成功创建了 test1 数据库,现在该使用它了。

sql/sql_parse.cc
    rc = thd->get_protocol()->get_command(&com_data, &command);

我们快速回忆一下,服务端现在正在等待客户端的下一条命令,阻塞于上面这行代码。

sql/sql_parse.cc
  return_value = dispatch_command(thd, &com_data, command);

接着,我们在客户端中输入 use test1; 并回车后,服务端被唤醒,走到了 dispatch_command 方法。

sql/sql_parse.cc
  switch (command) {
    case COM_INIT_DB: {
      MYSQL_NOTIFY_STATEMENT_QUERY_ATTRIBUTES(thd->m_statement_psi, false);
      LEX_STRING tmp;
      thd->status_var.com_stat[SQLCOM_CHANGE_DB]++;
      thd->convert_string(&tmp, system_charset_info,
                          com_data->com_init_db.db_name,
                          com_data->com_init_db.length, thd->charset());

      LEX_CSTRING tmp_cstr = {tmp.str, tmp.length};
      if (!mysql_change_db(thd, tmp_cstr, false)) {
        query_logger.general_log_write(thd, command, thd->db().str,
                                       thd->db().length);
        my_ok(thd);
      }
      break;
    }
  // ignore
  }

进入 dispatch_command 方法后,在第 1814 行可以看到一个 switch 分支语句,根据客户端发送的命令,走进不同的分支。我们刚刚发送的 use test1; 命令是 COM_INIT_DB 类型,正好是 switch 分支语句的第一个分支。

Tip

代码中的 LEX_STRING 和 LEX_CSTRING 是结构,分别表示字符串和只读字符串,如下所示:

typedef struct MYSQL_LEX_STRING LEX_STRING;
typedef struct MYSQL_LEX_CSTRING LEX_CSTRING;
struct MYSQL_LEX_STRING {
  char *str;
  size_t length;
};

struct MYSQL_LEX_CSTRING {
  const char *str;
  size_t length;
};
用结构定义了有长度属性的字符串,毕竟 C 没有像 Java 在标准库中内置 String 类型,可以直接用,所以通常会自己定义。

第 1817 行,声明了一个字符串,对象名 tmp。

第 1819~1821行,对数据库的名字 com_data->com_init_db.db_name 进行字符集的转换(从 thd->charset()system_charset_info),并将转换后的结果保存到 tmp 对象中。

Tip

源字符集 thd->charset() 对应官方文档中介绍的系统变量 character_set_client

目标字符集 system_charset_info 对应官方文档中介绍的系统变量 character_set_system,值永远为 utf8mb3。

第 1823 行,声明了一个只读字符串,对象名 tmp_cstr,并将 tmp 对象中保存的数据库名字和长度保存进去。

在这个分支里,最重要的就是第 1824 行,调用 mysql_change_db 方法,第一个参数是线程上下文,第二个参数是数据库名和长度,第三个参数表示是否强制切换。

sql/sql_db.cc
  @details The function checks that the database name corresponds to a
  valid and existent database, checks access rights and changes the current
  database with database attributes (@@collation_database session variable,
  THD::db_access).

mysql_change_db 方法的注释有说明这个方法的细节,如上,它说这个方法校验数据库名字合不合法存不存在、用户访问权限,并切换当前数据库。

sql/sql_db.cc
bool mysql_change_db(THD *thd, const LEX_CSTRING &new_db_name,
                     bool force_switch) {
  LEX_STRING new_db_file_name;
  LEX_CSTRING new_db_file_name_cstr;

  Security_context *sctx = thd->security_context();
  ulong db_access = sctx->current_db_access();
  const CHARSET_INFO *db_default_cl = nullptr;

  // We must make sure the schema is released and unlocked in the right order.
  dd::Schema_MDL_locker mdl_handler(thd);
  dd::cache::Dictionary_client::Auto_releaser releaser(thd->dd_client());
  const dd::Schema *schema = nullptr;
  new_db_file_name.str = my_strndup(key_memory_THD_db, new_db_name.str, // 1406
                                    new_db_name.length, MYF(MY_WME));   // 1407
  new_db_file_name.length = new_db_name.length;                         // 1408
  if (check_and_convert_db_name(&new_db_file_name, false) !=            // 1421
      Ident_name_check::OK) {                                           // 1422
    // ignore
  }
  new_db_file_name_cstr.str = new_db_file_name.str;                     // 1429
  new_db_file_name_cstr.length = new_db_file_name.length;               // 1430
  if (sctx->get_active_roles()->size() == 0) {
    db_access =                                                         // 1434
        sctx->check_access(DB_OP_ACLS, new_db_file_name.str)            // 1435
            ? DB_OP_ACLS                                                // 1436
            : acl_get(thd, sctx->host().str, sctx->ip().str,
                      sctx->priv_user().str, new_db_file_name.str, false) |
                  sctx->master_access(new_db_file_name.str);
  } else {
    db_access = sctx->db_acl(new_db_file_name_cstr) |
                sctx->master_access(new_db_file_name.str);
  }
  if (mdl_handler.ensure_locked(new_db_file_name.str) ||
      thd->dd_client()->acquire(new_db_file_name.str, &schema)) {       // 1457
    my_free(new_db_file_name.str);                                      // 1458
    return true;                                                        // 1459
  }
  if (get_default_db_collation(*schema, &db_default_cl)) {              // 1490
    my_free(new_db_file_name.str);
    assert(thd->is_error() || thd->killed);
    return true;
  }
  mysql_change_db_impl(thd, new_db_file_name_cstr, db_access, db_default_cl); // 1501

done:
  /*
    Check if current database tracker is enabled. If so, set the 'changed' flag.
  */
  if (thd->session_tracker.get_tracker(CURRENT_SCHEMA_TRACKER)->is_enabled()) {
    thd->session_tracker.get_tracker(CURRENT_SCHEMA_TRACKER)
        ->mark_as_changed(thd, {});
  }
  if (thd->session_tracker.get_tracker(SESSION_STATE_CHANGE_TRACKER)
          ->is_enabled())
    thd->session_tracker.get_tracker(SESSION_STATE_CHANGE_TRACKER)
        ->mark_as_changed(thd, {});
  return false;
}

第 1421 行 check_and_convert_db_name 方法会结合系统配置 lower_case_table_names 决定是否转换成小写,然后再判断数据库名称是否合法。

第 1434 行 db_access = DB_OP_ACLS,DB_OP_ACLS 在数值上等于十进制233257023。

第 1457 行 thd->dd_client()->acquire(new_db_file_name.str, &schema) 从数据字典中获取数据库对象,如果不存在则 1458 行释放内存并在 1459 行返回 true。

第 1490 行从数据字典数据库对象中获取默认的字符集。

第 1501 行调用 mysql_change_db_impl 方法,下面是它的代码。

sql/sql_db.cc
static void mysql_change_db_impl(THD *thd, const LEX_CSTRING &new_db_name,
                                 ulong new_db_access,
                                 const CHARSET_INFO *new_db_charset) {
  /* 1. Change current database in THD. */

  if (new_db_name.str == nullptr) {
    /*
      THD::set_db() does all the job -- it frees previous database name and
      sets the new one.
    */

    thd->set_db(NULL_CSTR);
  } else if (!strcmp(new_db_name.str, INFORMATION_SCHEMA_NAME.str)) {
    /*
      Here we must use THD::set_db(), because we want to copy
      INFORMATION_SCHEMA_NAME constant.
    */

    thd->set_db(INFORMATION_SCHEMA_NAME);
  } else {
    /*
      Here we already have a copy of database name to be used in THD. So,
      we just call THD::reset_db(). Since THD::reset_db() does not releases
      the previous database name, we should do it explicitly.
    */
    mysql_mutex_lock(&thd->LOCK_thd_data);
    if (thd->db().str) my_free(const_cast<char *>(thd->db().str));
    DEBUG_SYNC(thd, "after_freeing_thd_db");
    thd->reset_db(new_db_name);
    mysql_mutex_unlock(&thd->LOCK_thd_data);
  }

  /* 2. Update security context. */

  /* Cache the effective schema level privilege with roles applied */
  thd->security_context()->cache_current_db_access(new_db_access);

  /* 3. Update db-charset environment variables. */

  thd->db_charset = new_db_charset;
  thd->variables.collation_database = new_db_charset;
}

第 1227 行 thd->reset_db(new_db_name); 设置线程上下文的成员变量,将当前数据库的名字,即 test1,保存起来。后面会贴出这个方法的代码。

第 1234 行缓存用户对数据库 test1 的访问权限。

第 1238、1239 行设置线程上下文的成员变量,将当前数据库 test1 的字符集保存起来。

sql/sql_class.h
  /**
    Set the current database; use shallow copy of C-string.

    @param new_db     the new database name.

    @note This operation just sets {db, db_length}. Switching the current
    database usually involves other actions, like switching other database
    attributes including security context. In the future, this operation
    will be made private and more convenient interface will be provided.
  */
  void reset_db(const LEX_CSTRING &new_db) {
    m_db.str = new_db.str;
    m_db.length = new_db.length;
#ifdef HAVE_PSI_THREAD_INTERFACE
    PSI_THREAD_CALL(set_thread_db)(new_db.str, static_cast<int>(new_db.length));
#endif
  }

reset_db 方法将数据库名字和长度保存到线程上下文的成员变量 m_db 中。reset_db 方法结束后回到 mysql_change_db_impl,mysql_change_db_impl 方法结束后回到 mysql_change_db 方法,mysql_change_db 方法结束后回到了我们开始讲的 switch 分支中。

sql/sql_parse.cc
  switch (command) {
    case COM_INIT_DB: {
      MYSQL_NOTIFY_STATEMENT_QUERY_ATTRIBUTES(thd->m_statement_psi, false);
      LEX_STRING tmp;
      thd->status_var.com_stat[SQLCOM_CHANGE_DB]++;
      thd->convert_string(&tmp, system_charset_info,
                          com_data->com_init_db.db_name,
                          com_data->com_init_db.length, thd->charset());

      LEX_CSTRING tmp_cstr = {tmp.str, tmp.length};
      if (!mysql_change_db(thd, tmp_cstr, false)) {
        query_logger.general_log_write(thd, command, thd->db().str,
                                       thd->db().length);
        my_ok(thd);
      }
      break;
    }
  // ignore
  }

第 1825 行,写 general_log,由系统变量 general_log 决定是否写,默认不写。

第 1826 行设置当前执行语句的执行结果为 ok。

第 1829 行 break 结束 switch 语句,走到下面的代码。

sql/sql_parse.cc
done:
  assert(thd->open_tables == nullptr ||
         (thd->locked_tables_mode == LTM_LOCK_TABLES));

  /* Finalize server status flags after executing a command. */
  thd->update_slow_query_status();
  if (thd->killed) thd->send_kill_message();
  thd->send_statement_status();

2453 行更新慢查询状态,如果语句执行时间超过系统变量 long_query_time,则更新线程上下文中的 server_status 变量,将位 11 置 1,代码如下所示。

sql/sql_class.cc
void THD::update_slow_query_status() {
  if (my_micro_time() > start_utime + variables.long_query_time)
    server_status |= SERVER_QUERY_WAS_SLOW;
}

因为 SERVER_QUERY_WAS_SLOW = 2048,即 2 的 11 次方,所以按位或操作将变量 server_status 的位 11 置 1 了。

2455 行 thd->send_statement_status(); 发送语句的执行结果给客户端,发完后做些清理工作就结束了,等待客户端的下一条命令。