OceanBase 数据链路 Concept

OceanBase 数据链路提供了从用户端到数据库端的最佳数据链路访问功能,屏蔽用户对分布式数据库的感知,保障分布式数据库的最高性能服务。数据链路包含两个组件:数据库代理和数据库驱动。

数据库代理

OceanBase 数据库代理 ODP(OceanBase Database Proxy,又称 OBProxy)是 OceanBase 专用的代理服务器,OceanBase 用户的数据会以多副本的形式存放在各个 OBServer 上,ODP 则负责接收用户发过来的 SQL 请求,转发用户 SQL 请求到最佳目标 OBServer 上,并将执行结果返回给客户。

作为 OceanBase 数据库的关键组件,ODP 具有以下特性:

  • 高性能转发:ODP 完整兼容 MySQL 协议,并支持 OceanBase 自研协议,采用多线程异步框架和透明流式转发的设计,保证了数据的高性能转发,同时确保了自身对机器资源的最小消耗。
  • 最佳路由:ODP 会充分考虑用户请求涉及的副本位置、用户配置的读写分离路由策略、OceanBase 多地部署的最优链路,以及 OceanBase 各机器的状态及负载情况,将用户的请求路由到最佳的 OBServer,最大程度的保证了 OceanBase 整体的高性能运转。
  • 连接管理:针对一个客户端的物理连接,ODP 维持自身到后端多个 OBServer 的连接,采用基于版本号的增量同步方案维持了每个 OBServer 连接的会话状态,保证了客户端高效访问各个 OBServer。
  • 专有协议:ODP 与 OBServer 默认采用了 OceanBase 专有协议,如增加报文的 CRC 校验保证与 OBServer 链路的正确性,增强传输协议以支持 Oracle 兼容性的数据类型和交互模型
  • 易运维:ODP 本身无状态支持无限水平扩展,支持同时访问多个 OceanBase 集群。可以通过丰富的内部命令实现对自身状态的实时监控,提供极大的运维便利性

数据库驱动

在 OceanBase MySQL 模式下,用户可以直接使用 MySQL 官方提供的 Connector 来使用 OceanBase 数据库(暂不支持 8.0 的驱动),在 OceanBase Oracle 模式下,需要使用 OceanBase 自研的数据库驱动。OceanBase 数据库驱动同时支持 OceanBase 的 MySQL/Oracle 两种协议,在使用时可以自动识别 OceanBase 的运行模式是 MySQL 还是 Oracle,无需额外设置。OceanBase 支持各类语言的数据库驱动,下面介绍几种常见的语言及标准。

OCI 驱动

OceanBase 提供了基于 OCI 接口的数据库驱动,兼容了 Oracle 数据库 OCI 接口的方法名和函数名,对于试用 OCI 接口编程的应用程序在进行移植时,无需做大量业务改造即可非常方便的适配到 OceanBase 数据库。

样例代码:

  1. #include <stdio.h>
  2. #include <string.h>
  3. #include <stdlib.h>
  4. #include <malloc.h>
  5. #include "oci.h"
  6. /*声明句柄*/
  7. OCIEnv *envhp; /*环境句柄*/
  8. OCISvcCtx *svchp; /*服务环境句柄*/
  9. OCIServer *srvhp; /*服务器句柄*/
  10. OCISession *authp; /*会话句柄*/
  11. OCIStmt *stmthp; /*语句句柄*/
  12. OCIDescribe *dschp; /*描述句柄*/
  13. OCIError *errhp; /*错误句柄*/
  14. OCIDefine *defhp[3]; /*定义句柄*/
  15. OCIBind *bidhp[4]; /*绑定句柄*/
  16. sb2 ind[3]; /*指示符变量*/
  17. /*绑定select结果集的参数*/
  18. text szpersonid[9]; /*存储personid列*/
  19. text szsex[2]; /*存储sex列*/
  20. text szname[51]; /*存储name列*/
  21. text szemail[51]; /*存储mail列*/
  22. text szphone[26]; /*存储phone列*/
  23. char sql[256]; /*存储执行的sql语句*/
  24. int main(int argc, char *argv[])
  25. {
  26. char strServerName[50];
  27. char strUserName[50];
  28. char strPassword[50];
  29. /*设置服务器,用户名和密码*/
  30. strcpy(strServerName, "host:port/db");
  31. strcpy(strUserName, "user");
  32. strcpy(strPassword, "pwd");
  33. /*初始化OCI应用环境*/
  34. OCIInitialize(OCI_DEFAULT, NULL, NULL, NULL, NULL);
  35. /*初始化环境句柄*/
  36. OCIEnvInit(&envhp, OCI_DEFAULT, 0, 0);
  37. /*分配句柄*/
  38. OCIHandleAlloc(envhp, (dvoid **)&svchp, OCI_HTYPE_SVCCTX, 0, 0);
  39. /*服务器环境句柄*/
  40. OCIHandleAlloc(envhp, (dvoid **)&srvhp, OCI_HTYPE_SERVER, 0, 0);
  41. /*服务器句柄*/
  42. OCIHandleAlloc(envhp, (dvoid **)&authp, OCI_HTYPE_SESSION, 0, 0);
  43. /*会话句柄*/
  44. OCIHandleAlloc(envhp, (dvoid **)&errhp, OCI_HTYPE_ERROR, 0, 0);
  45. /*错误句柄*/
  46. OCIHandleAlloc(envhp, (dvoid **)&dschp, OCI_HTYPE_DESCRIBE, 0, 0);
  47. /*描述符句柄*/
  48. /*连接服务器*/
  49. OCIServerAttach(srvhp, errhp, (text *)strServerName,
  50. (sb4)strlen(strServerName), OCI_DEFAULT);
  51. /*设置用户名和密码*/
  52. OCIAttrSet(authp, OCI_HTYPE_SESSION, (text *)strUserName,
  53. (ub4)strlen(strUserName), OCI_ATTR_USERNAME, errhp);
  54. OCIAttrSet(authp, OCI_HTYPE_SESSION, (text *)strPassword,
  55. (ub4)strlen(strPassword), OCI_ATTR_PASSWORD, errhp);
  56. /*设置服务器环境句柄属性*/
  57. OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX,
  58. (dvoid *)srvhp, (ub4)0, OCI_ATTR_SERVER, errhp);
  59. OCIAttrSet(svchp, OCI_HTYPE_SVCCTX, (dvoid *)authp,
  60. 0, OCI_ATTR_SESSION, errhp);
  61. /*创建并开始一个用户会话*/
  62. OCISessionBegin(svchp, errhp, authp, OCI_CRED_RDBMS, OCI_DEFAULT);
  63. OCIHandleAlloc(envhp, (dvoid **)&stmthp, OCI_HTYPE_STMT, 0, 0);
  64. /*语句句柄*/
  65. /************************************************************************/
  66. /*查询person表*/
  67. /************************************************************************/
  68. strcpy(sql, "select personid ,name,phone from person");
  69. /*准备SQL语句*/
  70. OCIStmtPrepare(stmthp, errhp, (text *)sql, strlen(sql), OCI_NTV_SYNTAX, OCI_DEFAULT);
  71. /*绑定输出列*/
  72. OCIDefineByPos(stmthp, &defhp[0], errhp, 1, (ub1 *)szpersonid,
  73. sizeof(szpersonid), SQLT_STR, &ind[0], 0, 0, OCI_DEFAULT);
  74. OCIDefineByPos(stmthp, &defhp[1], errhp, 2, (ub1 *)szname,
  75. sizeof(szname), SQLT_STR, &ind[1], 0, 0, OCI_DEFAULT);
  76. OCIDefineByPos(stmthp, &defhp[2], errhp, 3, (ub1 *)szphone,
  77. sizeof(szphone), SQLT_STR, &ind[2], 0, 0, OCI_DEFAULT);
  78. /*执行SQL语句*/
  79. OCIStmtExecute(svchp, stmthp, errhp, (ub4)0, 0, NULL, NULL,
  80. OCI_DEFAULT);
  81. printf("%-10s%-10s%-10s\n", "PERSONID", "NAME", "PHONE");
  82. while ((OCIStmtFetch(stmthp,
  83. errhp, 1, OCI_FETCH_NEXT, OCI_DEFAULT)) != OCI_NO_DATA)
  84. {
  85. printf("%-10s, %-10s,%-10s\n", szpersonid, szname, szphone);
  86. }
  87. //结束会话
  88. OCISessionEnd(svchp, errhp, authp, (ub4)0);
  89. //断开与数据库的连接
  90. OCIServerDetach(srvhp, errhp, OCI_DEFAULT);
  91. //释放OCI句柄
  92. OCIHandleFree((dvoid *)dschp, OCI_HTYPE_DESCRIBE);
  93. OCIHandleFree((dvoid *)stmthp, OCI_HTYPE_STMT);
  94. OCIHandleFree((dvoid *)errhp, OCI_HTYPE_ERROR);
  95. OCIHandleFree((dvoid *)authp, OCI_HTYPE_SESSION);
  96. OCIHandleFree((dvoid *)svchp, OCI_HTYPE_SVCCTX);
  97. OCIHandleFree((dvoid *)srvhp, OCI_HTYPE_SERVER);
  98. return 0;
  99. }

JDBC 驱动

OceanBase 提供了基于 Java JDBC 标准的数据库驱动,JDBC(Java Database Connectivity) 是 Java 应用程序访问数据库的标准 API(应用程序编程接口),数据库驱动的实现会将 JDBC 标准编程接口转换成对应数据库厂商的 SQL 实现。OceanBase JDBC 驱动兼容 JDBC 4.0、4.1、4.2 标准。

JDBC 连接串的前置为 jdbc:oceanbase,其他使用和标准 JDBC 方式保持一致。样例代码:

  1. String url = "jdbc:oceanbase://host:port/SYS?useUnicode=true&characterEncoding=utf-8";
  2. String username = "SYS@oracle";
  3. String password = "";
  4. Connection conn = null;
  5. try {
  6. Class.forName("com.alipay.oceanbase.jdbc.Driver");
  7. conn = DriverManager.getConnection(url, username, password);
  8. PreparedStatement ps = conn.prepareStatement("select to_char(sysdate,'yyyy-MM-dd HH24:mi:ss') from dual;");
  9. ResultSet rs = ps.executeQuery();
  10. rs.next();
  11. System.out.println("sysdate is:" + rs.getString(1));
  12. rs.close();
  13. ps.close();
  14. } catch (Exception e) {
  15. e.printStackTrace();
  16. } finally {
  17. if (null != conn) {
  18. conn.close();
  19. }
  20. }

ODBC 驱动

OceanBase 提供了基于 ODBC 标准的数据库驱动,ODBC (Open Database Connectivity)和 JDBC 标准一样,提供了一种标准的API(应用程序编程接口)方法来访问数据库管理系统,目前已经被业界广泛接受。

目前 OceanBase 提供了 Linux 版本和 Windows 版本的 ODBC 驱动,样例代码:

  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <string.h>
  4. #include "sql.h"
  5. #include "sqlext.h"
  6. #define MAX_NAME_LEN 255
  7. typedef struct tagODBCHandler {
  8. SQLHENV henv;
  9. SQLHDBC hdbc;
  10. SQLHSTMT hstmt;
  11. }ODBCHandler;
  12. int IS_SUCC(SQLRETURN retcode) {
  13. if (retcode == SQL_SUCCESS || retcode == SQL_SUCCESS_WITH_INFO) return 1;
  14. return 0;
  15. }
  16. int main(int argc, char** argv) {
  17. ODBCHandler handler;
  18. SQLRETURN retcode;
  19. SQLCHAR connOut[MAX_NAME_LEN+1];
  20. SQLSMALLINT len;
  21. //Allocate environment handle
  22. retcode = SQLAllocHandle(SQL_HANDLE_ENV, SQL_NULL_HANDLE, &handler.henv);
  23. // Set the ODBC version environment attribute
  24. if (!IS_SUCC(retcode)) {
  25. return -1;
  26. }
  27. retcode = SQLSetEnvAttr(handler.henv, SQL_ATTR_ODBC_VERSION, (SQLPOINTER)SQL_OV_ODBC3_80, 0);
  28. // Allocate connection handle
  29. if (!IS_SUCC(retcode)) {
  30. return -1;
  31. }
  32. retcode = SQLAllocHandle(SQL_HANDLE_DBC, handler.henv, &handler.hdbc);
  33. if (!IS_SUCC(retcode)) {
  34. return -1;
  35. }
  36. // Set login timeout to 5 seconds
  37. SQLSetConnectAttr(handler.hdbc, SQL_LOGIN_TIMEOUT, (SQLPOINTER)5, 0);
  38. // Connect to data source
  39. retcode = SQLDriverConnect(handler.hdbc, NULL, (SQLCHAR*)"DSN=odbctest", SQL_NTS, connOut, MAX_NAME_LEN, &len,SQL_DRIVER_NOPROMPT);
  40. if (!IS_SUCC(retcode)) {
  41. return -1;
  42. }
  43. retcode = SQLAllocHandle(SQL_HANDLE_STMT, handler.hdbc, &handler.hstmt);
  44. if (!IS_SUCC(retcode)) {
  45. return -1;
  46. }
  47. {
  48. //insert
  49. retcode = SQLPrepare(handler.hstmt, (SQLCHAR*)"insert into PERSON values(?,?,'test')", SQL_NTS);
  50. if (!IS_SUCC(retcode)) {
  51. return -1;
  52. }
  53. SQLINTEGER id = 0;
  54. SQLINTEGER num = 0;
  55. retcode = SQLBindParameter(handler.hstmt, 1, SQL_PARAM_INPUT, SQL_C_LONG, SQL_INTEGER, 0, 0, &id, 0, NULL);
  56. if (!IS_SUCC(retcode)) {
  57. return -1;
  58. }
  59. retcode = SQLBindParameter(handler.hstmt, 2, SQL_PARAM_INPUT, SQL_C_LONG, SQL_INTEGER, 0, 0, &num, 0, NULL);
  60. if (!IS_SUCC(retcode)) {
  61. return -1;
  62. }
  63. retcode = SQLExecute(handler.hstmt);
  64. if (!IS_SUCC(retcode)) {
  65. return -1;
  66. }
  67. }
  68. // clean handle
  69. SQLFreeHandle(SQL_HANDLE_STMT, handler.hstmt);
  70. SQLDisconnect(handler.hdbc);
  71. SQLFreeHandle(SQL_HANDLE_DBC, handler.hdbc);
  72. SQLFreeHandle(SQL_HANDLE_ENV, handler.henv);
  73. return 0;
  74. }

.NET 驱动

OceanBase 提供了基于 .NET 的数据库驱动,驱动提供了对 Entity Framework Core 和 Entity Framework 6.x 的兼容能力,用户可以使用 OceanBase .NET 驱动结合 Entity Framework 快速开发应用程序。.NET 驱动兼容 .NET Framework 4.5、.NET Framework 4.6、.NET Core 2.0 版本。

样例代码:

  1. using OceanBase.Data.OceanBaseClient;
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Data;
  5. using System.Linq;
  6. using System.Text;
  7. using System.Threading.Tasks;
  8. namespace ObOracleDemo
  9. {
  10. class Program
  11. {
  12. static void Main(string[] args)
  13. {
  14. using (MySqlConnection c = new MySqlConnection(
  15. "server=100.88.105.219;port=63035;user id=SYS@tt_ww_oracle;database=SYS;Connection timeout=5;" +
  16. "pooling=false;includesecurityasserts=false;characterset=utf8"))
  17. {
  18. DateTime start = DateTime.Now;
  19. try
  20. {
  21. c.Open();
  22. Console.WriteLine("connect ob oracle successfully!");
  23. MySqlCommand cmd = new MySqlCommand("drop table test_oracle", c);
  24. try
  25. {
  26. cmd.ExecuteNonQuery();
  27. }
  28. catch (Exception ex)
  29. {
  30. Console.WriteLine("exception happened, error is");
  31. Console.WriteLine(ex.Message);
  32. Console.WriteLine(ex.StackTrace);
  33. }
  34. cmd.Dispose();
  35. cmd = new MySqlCommand("create table test_oracle (c1 int primary key, c2 varchar(256), c3 timestamp(9))", c);
  36. cmd.ExecuteNonQuery();
  37. cmd.Dispose();
  38. IDbCommand psCmd = c.CreateCommand();
  39. psCmd.CommandText = "insert into test_oracle (c1, c2, c3) values (?p1,?p2, ?p3)";
  40. IDbDataParameter p1 = psCmd.CreateParameter();
  41. p1.ParameterName = "?p1";
  42. p1.DbType = DbType.Int32;
  43. p1.Precision = (byte)10;
  44. p1.Scale = (byte)0;
  45. p1.Size = 4;
  46. psCmd.Parameters.Add(p1);
  47. p1.Value = 10;
  48. IDbDataParameter p2 = psCmd.CreateParameter();
  49. p2.ParameterName = "?p2";
  50. p2.DbType = DbType.String;
  51. psCmd.Parameters.Add(p2);
  52. p2.Value = "Hello C#";
  53. MySqlParameter p3 = new MySqlParameter();
  54. p3.ParameterName = "?p3";
  55. p3.MySqlDbType = MySqlDbType.DateTime;
  56. p3.Value = "2020-03-09 21:23:22.878879";
  57. psCmd.Parameters.Add(p3);
  58. psCmd.Prepare();
  59. psCmd.ExecuteNonQuery();
  60. psCmd.Dispose();
  61. psCmd = c.CreateCommand();
  62. psCmd.CommandText = "select c1,c2,c3 from test_oracle";
  63. psCmd.Prepare();
  64. MySqlDataReader reader = (MySqlDataReader)psCmd.ExecuteReader();
  65. reader.Read();
  66. Console.WriteLine("c1=" + reader.GetInt32(0));
  67. Console.WriteLine("c2=" + reader.GetString(1));
  68. Console.WriteLine("c3=" + reader.GetDateTime(2));
  69. psCmd.Dispose();
  70. }
  71. catch (Exception ex)
  72. {
  73. Console.WriteLine("exception happened, error is");
  74. Console.WriteLine(ex.Message);
  75. Console.WriteLine(ex.StackTrace);
  76. }
  77. finally
  78. {
  79. Console.WriteLine("Press Any Key To Continue...");
  80. Console.ReadKey();
  81. }
  82. }
  83. }
  84. }
  85. }