摘要

这篇文章介绍四种实现MSSQL Server审计日志功能的方法探索,即解析数据库事务日志、SQL Profiler、SQL Audit以及Extended Event。详细介绍了这四种方法的具体实现,以及通过优缺点的对比和总结,最终得出结论,使用Extended Event实现审计日志是最好的选择,为产品化选型提供参考。

审计日志需求分析

对于关系型数据库来而言,在生产环境SQL Server数据库实例中,审计日志是一个非常重要且必须的强需求功能,主要体现在以下几个方面。

  • 安全审计

  • 问题排查

  • 性能调优

安全审计

在一些存取敏感信息的产品环境数据库SQL Server实例中(比如:财务系统、设计到国家安全层面的数据库系统),对数据操作要求十分谨慎,安全要求等级十分严密,需要对每一条数据操作语句进行审计,以便做到每次数据变动或查看均可追溯。在这个场景中,对敏感信息操作的审计是基于数据安全性的要求。

问题排查

在日常生产系统管理维护过程中,我们经常会遇到类似的场景和疑问:能否找到是谁在哪个时间点执行了什么语句把数据XXX给删除(更新)了呢?笔者在从事DBA行业的几年工作经历过程中,无数次被问及到类似的问题。要解决这个场景中的问题,审计日志功能是不二选择。

性能调优

利用审计日志对数据库系统进行性能调优是审计日志非常重要的功能和用途。比如,以下是几个审计日志典型的应用场景:

  • 找出某段时间内哪些语句导致了系统性能消耗严重(比如:CPU、IOPS等)

  • 找出某段时间内的TOP CPU SQL语句

  • 找出某段时间内的TOP IO SQL语句

  • 找出某段时间内的TOP Time Cost SQL语句

  • 找出某段时间内哪个用户使用的数据库系统资源最多

  • 找出某段时间内哪个应用使用的数据库系统资源最多

……

实现审计日志的方法

基于以上对审计日志的需求分析,我们了解到审计日志的功能是关系型数据至关重要的强需求,让我们来看看SQL Server数据库系统有哪些实现审计日志功能的方法和具体实现,以及这些方法的优缺点对比。

数据库日志分析

在SQL Server数据库事务日志中,记录了每个事务的数据变更操作的详细信息,包含谁在哪个时间点做了什么操作。所以,我们可以基于SQL Server数据库事务日志的分析,来获取数据变更的详细审计日志信息。使用这个方法来实现审计日志功能的,有一家叫着ApexSQL的公司产品做的很不错,产品ApexSQL Log就是通过数据库事务日志来实现审计日志功能的产品,详情参见:ApexSQL Log。附一张来自ApexSQL官网的截图: 01.png

但是,由于SQL Server本身是微软的闭源产品,对于事务日志格式外界很难知道,所以这个方法的实现门槛很高,实现难度极大。加之,有可能不同版本的SQL Server事务日志格式存在差异,必须要对每个版本的事务日志解析做相应的适配,导致维护成本极高,产品功能延续性存在极大风险和挑战。

SQL Profiler

SQL Profiler是微软从SQL Server 2000开始引入的数据库引擎跟踪工具,具有使用界面操作的接口、使用SQL语句创建接口以及使用SMO编程创建接口。使用SQL Profiler,可以实现非常多的功能,比如:

  • 图形化监控数据库引擎执行的SQL语句(也可以将执行语句保存到表中)

  • 查看执行语句实时的执行计划

  • 数据库引擎错误信息排查

  • 数据库性能分析

  • 阻塞,锁等待、锁升级及死锁跟踪

  • 后台收集查询语句信息

……

所以,从功能完整性角度来说,我们完全可以使用SQL Profiler来实现就数据库实例级别的审计日志的功能。那么接下来让我们看看如何使用SQL Profiler实现审计日志的功能。

图形化创建

开始 => 运行 => 键入“Profiler” => 回车,打开Profiler工具后,点击“New Trace” => Server Name => Authentication => Connect,如下图所示: 02.png

然后,选择General => Save to table => 选择要保留到的实例名、数据库名、架构名和表名 => OK 03.png

接下来选择要跟踪的事件,Events Selection => SQL:StmtCompleted => Column Filters => LoginName => Not Like %sa% => OK => Run 04.png

使用SQL语句创建

使用图形化界面创建SQL Profiler实现审计日志功能,简单易用,很容易上手。但是,过程繁琐、效率不高,难于自动化。这个时候,就需要使用SQL语句来创建SQL Profiler功能,实现一键创建的方法了。

  1. use master
  2. GO
  3. set nocount on
  4. declare
  5. @trace_folder nvarchar(256)
  6. ,@trace_file nvarchar(256)
  7. ,@max_files_size bigint
  8. ,@stop_time datetime
  9. ,@file_count int
  10. ,@int_filter_cpu int
  11. ,@int_filter_duration bigint
  12. ,@int_filter_spid int
  13. ,@set_trace_status int
  14. ;
  15. select
  16. @trace_folder=N'C:\Temp\perfmon'
  17. ,@max_files_size = 50 --max file size for each trace file
  18. ,@file_count = 10 --max file count
  19. ,@stop_time = '6/13/2017 10:50' --null: stop trace manully; specify time (stop at the specify time)
  20. ,@int_filter_cpu = NULL -- >= @int_filter_cpu ms will be traced. or else, skipped.
  21. --NULL: ignore this filter
  22. ,@int_filter_duration = NULL --execution duration filter: millisecond
  23. --NULL: ignore this filter
  24. --,@int_filter_spid = 151 --integer: specify a spid to trace
  25. --
  26. ,@set_trace_status = 1 --0: Stops the specified trace.;
  27. --1: Starts the specified trace.;
  28. --2: Closes the specified trace and deletes its definition from the server.;
  29. ;
  30. /*
  31. select * from sys.traces
  32. */
  33. --private variables
  34. declare
  35. @trace_id int
  36. ,@do int
  37. ,@loop int
  38. ,@trace_event_id int
  39. ,@trace_column_id int
  40. ,@return_code tinyint
  41. ,@return_decription varchar(200)
  42. ,@field_separator char(1)
  43. ;
  44. select
  45. @field_separator = ',' --trace columns list separator
  46. ;
  47. IF right(ltrim(rtrim(@trace_folder)), 1 ) <> '\'
  48. BEGIN
  49. SELECT
  50. @trace_folder = ltrim(rtrim(@trace_folder)) + N'\'
  51. ;
  52. exec sys.xp_create_subdir @trace_folder
  53. END
  54. ;
  55. select
  56. @trace_file = @trace_folder + REPLACE(@@SERVERNAME, N'\', N'')
  57. ;
  58. IF @int_filter_spid IS NOT NULL
  59. BEGIN
  60. select
  61. @trace_file = @trace_file + cast(@int_filter_spid as varchar)
  62. ;
  63. END
  64. --select @trace_file
  65. select top 1
  66. @trace_id = id
  67. from sys.traces
  68. where path like @trace_file + N'%'
  69. if @trace_id is not null
  70. begin
  71. -- Start Trace (status 1 = start)
  72. EXEC sys.sp_trace_setstatus @trace_id, @set_trace_status
  73. return
  74. end
  75. if OBJECT_ID('tempdb..#trace_event','u') is not null
  76. drop table #trace_event
  77. create table #trace_event
  78. (
  79. id int identity(1,1) not null primary key
  80. ,trace_event_id int not null
  81. ,trace_column_id int not null
  82. ,event_name sysname null
  83. ,trace_column_name sysname null
  84. )
  85. ;with trace_event
  86. as
  87. ( --select * from sys.trace_events order by trace_event_id
  88. select
  89. is_trace = 1 , event_name = 'SQL:StmtCompleted'
  90. ,trace_column_list = 'NestLevel,ClientProcessID,EndTime,DatabaseID,GroupID,ServerName,SPID,DatabaseName,NTUserName,IntegerData2,RequestID,EventClass,SessionLoginName,NTDomainName,TextData,XactSequence,CPU,ApplicationName,Offset,LoginSid,TransactionID,IntegerData,Duration,SourceDatabaseID,LineNumber,ObjectID,Reads,RowCounts,Writes,IsSystem,ObjectName,LoginName,ObjectType,StartTime,HostName,EventSequence,'
  91. ),
  92. trace_column
  93. as(
  94. select
  95. *
  96. ,trace_column_list_xml =
  97. CAST(
  98. '<V><![CDATA['
  99. + REPLACE(
  100. REPLACE(
  101. REPLACE(
  102. trace_column_list,CHAR(10),']]></V><V><![CDATA['
  103. ),@field_separator,']]></V><V><![CDATA['
  104. ),CHAR(13),']]></V><V><![CDATA['
  105. )
  106. + ']]></V>'
  107. as xml
  108. )
  109. from trace_event
  110. where is_trace = 1
  111. )
  112. ,data
  113. as(
  114. select
  115. trace_column = T.C.value('(./text())[1]','sysname')
  116. ,event_name
  117. from trace_column AS a
  118. CROSS APPLY trace_column_list_xml.nodes('./V') AS T(C)
  119. )
  120. INSERT INTO #trace_event
  121. select
  122. trace_event_id = ev.trace_event_id
  123. ,trace_column_id = col.trace_column_id
  124. ,a.event_name
  125. ,trace_column_name = a.trace_column
  126. from data as a
  127. inner join sys.trace_columns as col
  128. on a.trace_column = col.name
  129. inner join sys.trace_events as ev
  130. on a.event_name = ev.name
  131. where col.trace_column_id is not null
  132. order by ev.trace_event_id
  133. ;
  134. --select * from #trace_event
  135. ---private variables
  136. select
  137. @trace_id = 0
  138. ,@do = 1
  139. ,@loop = @@ROWCOUNT
  140. ,@trace_event_id = 0
  141. ,@trace_column_id = 0
  142. ,@return_code = 0
  143. ,@return_decription = ''
  144. ;
  145. --create trace
  146. exec @return_code = sys.sp_trace_create @traceid = @trace_id OUTPUT
  147. , @options = 2
  148. , @tracefile = @trace_file
  149. , @maxfilesize = @max_files_size
  150. , @stoptime = @stop_time
  151. , @filecount = @file_count
  152. ;
  153. select
  154. trace_id = @trace_id
  155. ,[current_time] = getdate()
  156. ,[stop_time] = @stop_time
  157. ;
  158. set
  159. @return_decription = case @return_code
  160. when 0 then 'No error.'
  161. when 1 then 'Unknown error.'
  162. when 10 then 'Invalid options. Returned when options specified are incompatible.'
  163. when 12 then 'File not created.'
  164. when 13 then 'Out of memory. Returned when there is not enough memory to perform the specified action.'
  165. when 14 then 'Invalid stop time. Returned when the stop time specified has already happened.'
  166. when 15 then 'Invalid parameters. Returned when the user supplied incompatible parameters.'
  167. else ''
  168. end
  169. ;
  170. raiserror('Trace create with:
  171. %s',10,1,@return_decription) with nowait
  172. --loop set trace event & event column
  173. while @do <= @loop
  174. begin
  175. select top 1
  176. @trace_event_id = trace_event_id
  177. ,@trace_column_id = trace_column_id
  178. from #trace_event
  179. where id = @do
  180. ;
  181. --set trace event
  182. exec sys.sp_trace_setevent @trace_id, @trace_event_id, @trace_column_id, 1
  183. raiserror('exec sys.sp_trace_setevent @trace_id, %d, %d, 1',10,1,@trace_event_id,@trace_column_id) with nowait
  184. set @do = @do + 1;
  185. end
  186. --CPU >= 500/ cpu columnid = 18
  187. IF @int_filter_cpu IS NOT NULL
  188. EXEC sys.sp_trace_setfilter @trace_id, 18, 0, 4, @int_filter_cpu
  189. --duration filter/ duration columnid=13
  190. IF @int_filter_duration IS NOT NULL
  191. EXEC sys.sp_trace_setfilter @trace_id, 13, 0, 4, @int_filter_duration
  192. --spid filter/ spid columnid=12
  193. IF @int_filter_spid IS NOT NULL
  194. exec sys.sp_trace_setfilter @trace_id, 12, 0, 0, @int_filter_spid
  195. --applicationName not like 'SQL Server Profiler%'
  196. EXEC sys.sp_trace_setfilter @trace_id, 10, 0, 7, N'SQL Server Profiler%'
  197. -- Start Trace (status 1 = start)
  198. EXEC sys.sp_trace_setstatus @trace_id, @set_trace_status
  199. GO

其中输入参数表达的含义解释如下:

@trace_folder:Trace文件存放的位置

@max_files_size:每一个Trace文件大小

@file_count:Trace滚动最多的文件数量

@stop_time:Trace停止的时间

@int_filter_cpu:CPU过滤阈值,CPU使用率超过这个值会被记录下来,单位毫秒

@int_filter_duration:执行时间过滤阈值,执行时间超过这个值会被记录,单位毫秒

@set_trace_status:Trace的状态:0停止;1启动;2删除

SMO编程创建

SQL Profiler除了使用图形化界面创建,使用系统存储过程创建两种方法以外,还可以使用SMO编程方法来创建。

SQL Audit

使用SQL Audit实现SQL Server审计日志功能需要以下三个步骤来完成:

  • 创建实例级别的Audit并启动

  • 创建数据库级别的Audit Specification

  • 读取审计日志文件

创建实例级别Audit

使用Create Server Audit语句创建实例级别的Audit,方法如下:

  1. USE [master]
  2. GO
  3. CREATE SERVER AUDIT [Audit_Svr_User_Defined_for_Testing]
  4. TO FILE
  5. ( FILEPATH = N'C:\Temp\Audit'
  6. ,MAXSIZE = 10 MB
  7. ,MAX_ROLLOVER_FILES = 10
  8. ,RESERVE_DISK_SPACE = OFF
  9. )
  10. WITH
  11. ( QUEUE_DELAY = 1000
  12. ,ON_FAILURE = CONTINUE
  13. )
  14. GO

启动实例级别的Audit,代码如下

  1. USE [master]
  2. GO
  3. ALTER SERVER AUDIT [Audit_Svr_User_Defined_for_Testing] WITH(STATE=ON)
  4. ;
  5. GO

创建数据库级别Audit Specification

实例级别Audit创建完毕后,接下来是对需要审计的数据库建立对于的Audit Specification,方法如下:

  1. USE [testdb]
  2. GO
  3. CREATE DATABASE AUDIT SPECIFICATION [Audit_Spec_for_TestDB]
  4. FOR SERVER AUDIT [Audit_Svr_User_Defined_for_Testing]
  5. ADD (SELECT, INSERT, UPDATE, DELETE, EXECUTE ON DATABASE::[testdb] BY [public])
  6. WITH (STATE = ON);
  7. GO

由于SQL Audit Specification是基于数据库级别的,所以存在以下场景的维护性复杂度增加:

  • 用户需要审计实例中某些或者所有数据库,必须在每个需要审计的数据库下创建对象

  • 用户实例有新数据库创建,并需要审计日志功能时,必须在新的数据库下创建对象

读取审计日志文件

最后,我们需要将审计日志文件中存放的内容读取出来,使用SQL Server提供的系统函数sys.fn_get_audit_file,方法如下:

  1. DECLARE
  2. @AuditFilePath sysname
  3. ;
  4. SELECT
  5. @AuditFilePath = audit_file_path
  6. FROM sys.dm_server_audit_status
  7. WHERE name = 'Audit_Svr_User_Defined_for_Testing'
  8. SELECT statement,*
  9. FROM sys.fn_get_audit_file(@AuditFilePath,default,default)
  10. ;

Extended Event

微软SQL Server产品长期的规划是逐渐使用Extended Event来替换SQL Profiler工具,因为Extended Event更加轻量级,性能消耗比SQL Profiler大幅降低,因此对用户系统性能影响也大幅减轻。在审计日志的应用场景中,只需要在实例级别创建一个Extended Event Session对象,然后启用即可。既满足了功能性的需求,又能够做到很好后期维护,不需要为某一个数据库创建相应对象,对实例的性能消耗大幅降低到5%左右。

创建Extended Event Session

使用Create Event Session On Server语句创建基于实例级别的Extended Event。语句如下:

  1. USE master
  2. GO
  3. CREATE EVENT SESSION [svrXEvent_User_Define_Testing] ON SERVER
  4. ADD EVENT sqlserver.sql_statement_completed
  5. (
  6. ACTION
  7. (
  8. sqlserver.database_id,
  9. sqlserver.database_name,
  10. sqlserver.session_id,
  11. sqlserver.username,
  12. sqlserver.client_hostname,
  13. sqlserver.client_app_name,
  14. sqlserver.sql_text,
  15. sqlserver.query_hash,
  16. sqlserver.query_plan_hash,
  17. sqlserver.plan_handle,
  18. sqlserver.tsql_stack,
  19. sqlserver.is_system,
  20. package0.collect_system_time
  21. )
  22. WHERE sqlserver.username <> N'NT AUTHORITY\SYSTEM'
  23. AND sqlserver.username <> 'sa'
  24. AND (NOT sqlserver.like_i_sql_unicode_string(sqlserver.client_app_name, '%IntelliSense'))
  25. AND sqlserver.is_system = 0
  26. )
  27. ADD TARGET package0.asynchronous_file_target
  28. (
  29. SET
  30. FILENAME = N'C:\Temp\svrXEvent_User_Define_Testing.xel',
  31. MAX_FILE_SIZE = 10,
  32. MAX_ROLLOVER_FILES = 100
  33. )
  34. WITH (
  35. EVENT_RETENTION_MODE = NO_EVENT_LOSS,
  36. MAX_DISPATCH_LATENCY = 5 SECONDS
  37. );
  38. GO

启用Extended Event Session

Extended Event Session对象创建完毕后,需要启动这个session对象,方法如下:

  1. USE master
  2. GO
  3. -- We need to enable event session to capture event and event data
  4. ALTER EVENT SESSION [svrXEvent_User_Define_Testing]
  5. ON SERVER STATE = START;
  6. GO

读取审计日志文件

Extend Event生成审计日志文件以后,我们可以使用sys.fn_xe_file_target_read_file系统函数来读取,然后分析event_data列所记录的详细信息。

  1. USE master
  2. GO
  3. SELECT *
  4. FROM sys.fn_xe_file_target_read_file('C:\Temp\svrXEvent_User_Define_Testing*.xel', null, null, null)

方案对比

根据前面章节“实现审计日志的方法”部分的介绍,我们从可靠性、对象级别、可维护性、开销和对数据库系统的影响五个方面来总结这四种技术的优缺点。

  • 可靠性:这四种实现审计日志的方法可靠性都有保障,如果使用数字化衡量可维护性,得满分100分

  • 对象级别:SQL Profiler和Extended Event是基于实例级别的技术方案;解析事务日志解析和SQL Audit方法是基于数据库级别的技术,一旦有数据库创建或者删除操作,需要做相应的适配,所以维护成本也相对高。基于数据库级别的方案得分为0,基于实例级别得分为100

  • 维护性:基于实例级别的实现方法可维护性(得分100)显然优于基于数据库级别(得分为0)的实现方式

  • 开销:SQL Profiler对数据库系统开销很大,大概20%左右(得分100 - 20 = 80),其他三种开销较小5%左右(得分100 - 5 = 95)

  • 影响:开销大的技术方案自然影响就大,反之亦然。得分与开销部分类似。

四种技术方案优缺点汇总如下表所示: 05.png

以下是对四种实现审计日志方法五个维度打分,得分统计汇总如下表所示: 06.png

将汇总得分做成雷达图,如下图所示: 07.png

从雷达图我们可以很清楚的看到,综合考虑可靠性、可维护性、系统开销和影响来看,使用Extended Event实现审计日志的方法是最优的选择。

最后总结

本期分享了SQL Server实现审计日志功能的四种技术方案和详细实现,并从可靠性、可维护性、对象级别、系统开销和影响五个维度分析了四种方案各自的优缺点,最后的结论是使用Extended Event实现审计日志方法是最优选择,以此来为我们的产品化做出正确的选择。