Auditing data transformation
Auditing data transformation can be simply described as gathering metadata about the transformation process. The most basic metadata would be a timestamp, an atomic transformation description, the data volume on input, the data volume on output, and the time elapsed.
If you work with R only interactively, you may see this as little more than a fancy tool. On the other hand, for automated scheduled R jobs it can be quite helpful to have traceability at a lower grain of processing than just a binary success or failure after the script is executed, for example by logging each query against the data.
Similar features have been available in ETL tools for decades.
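To make this concrete, a single audit record could look like the following. This is only a minimal sketch; the column names are chosen for illustration and roughly mirror the dtq log shown later.

# one hypothetical audit record for a single transformation step
data.frame(
  timestamp   = Sys.time(),                  # when the step ran
  description = "aggregate sales by group",  # atomic transformation description
  in_rows     = 5e6,                          # data volume on input
  out_rows    = 195,                          # data volume on output
  elapsed     = 3.44                          # seconds elapsed
)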
Implementation
I've addressed this in my dtq package.
It silently eavesdrops on every call to [ for any data.table.
It can be seen as a preconfigured and faster version of base::trace, tailored for data transformation auditing.
I believe it should be possible to port the idea to magrittr's %>% call if somebody needs auditing there.
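For context, a rough hand-rolled analogue of that idea using base::trace might look like the sketch below. This is not how dtq works internally; it only illustrates the kind of eavesdropping on [ calls that base::trace allows.

# NOT dtq internals -- just a base::trace illustration of eavesdropping on `[` calls
library(data.table)
trace(
  "[.data.table",
  tracer = quote(message("query started:  ", format(Sys.time()))),
  exit   = quote(message("query finished: ", format(Sys.time()))),
  print  = FALSE,
  where  = asNamespace("data.table")
)
DT <- data.table(a = 1:3)
DT[a > 1L]
untrace("[.data.table", where = asNamespace("data.table"))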
Examples
Populate example sales data.
library(dtq)
set.seed(1)
DT <- data.table(
  user = 1:10,
  group = letters[1:5],
  time = as.POSIXct(seq(from = 1.4e9L + 1L, to = 1.5e9L, by = 1e3), origin = "1970-01-01"),
  sales = rnorm(10*5*1e5, 5)
)
knitr::kable(head(DT, 2))
| user | group | time | sales |
|---|---|---|---|
| 1 | a | 2014-05-13 17:53:21 | 4.373546 |
| 2 | b | 2014-05-13 18:10:01 | 5.183643 |
Basic example
Order data, various aggregations.
## just to clarify below usage of unnamed args in `[`
# DT[ i, j, by, keyby ]
# first sale by user in each month
DT[order(time), head(.SD, 1), .(user, year(time), month(time))]
# total sales by user in each month
DT[, .(sales = sum(sales)), .(user, year(time), month(time))]
# total sales by group of users in each month, returning ordered and keyed data
DT[, .(sales = sum(sales)),, .(group, year(time), month(time))]
Let's preview the transformation metadata recorded by dtq.
knitr::kable(dtl(print = TRUE))
| seq | dtq_id | dtq_seq | src | query | timestamp | env | elapsed | in_rows | out_rows |
|---|---|---|---|---|---|---|---|---|---|
| 1 | 1 | 1 | DT | [i = order(time), j = head(.SD, 1), by = .(user, year(time), month(time))] | 2015-06-04 00:17:05 | R_GlobalEnv | 5.026816 | 5e+06 | 390 |
| 2 | 2 | 1 | DT | [j = .(sales = sum(sales)), by = .(user, year(time), month(time))] | 2015-06-04 00:17:09 | R_GlobalEnv | 3.543426 | 5e+06 | 390 |
| 3 | 3 | 1 | DT | [j = .(sales = sum(sales)), keyby = .(group, year(time), month(time))] | 2015-06-04 00:17:12 | R_GlobalEnv | 3.439573 | 5e+06 | 195 |
Complex example
Populate budget data to combine with the sales data.
BUDGET <- CJ(
  group = letters[1:5],
  year = 2014:2016,
  month = 1:12
) # cross of dimensions
BUDGET[, budget := rnorm(3*12*5, 12*1e4, 12*1e3)] # budget value measure
For more complex processing I've found it very useful to organize a single conceptual query on the data into a single chain of [ calls.
In the example below I will calculate year-to-date sales and budget values, and the ratio of current year-to-date sales to last year's year-to-date sales.
# column names
measures <- c("sales","budget")
ytd_measures <- paste("ytd", measures, sep="_")
ly_ytd_measures <- paste("ly", ytd_measures, sep="_")
# query
DT[, .(sales = sum(sales)),, .(group, year = year(time), month = month(time))
][BUDGET, `:=`(budget = i.budget, budget_ratio = sales / i.budget)
][, c(ytd_measures) := lapply(.SD, cumsum), .(group, year), .SDcols = measures
][, c(ly_ytd_measures) := shift(.SD), .(group, month), .SDcols = ytd_measures
][, ytd_sales_vs_ly_ytd_sales := ytd_sales / ly_ytd_sales
]
Check the dtq logs again.
knitr::kable(dtl(print = TRUE))
| seq | dtq_id | dtq_seq | src | query | timestamp | env | elapsed | in_rows | out_rows |
|---|---|---|---|---|---|---|---|---|---|
| 1 | 1 | 1 | DT | [i = order(time), j = head(.SD, 1), by = .(user, year(time), month(time))] | 2015-06-04 00:17:05 | R_GlobalEnv | 5.0268161 | 5000000 | 390 |
| 2 | 2 | 1 | DT | [j = .(sales = sum(sales)), by = .(user, year(time), month(time))] | 2015-06-04 00:17:09 | R_GlobalEnv | 3.5434259 | 5000000 | 390 |
| 3 | 3 | 1 | DT | [j = .(sales = sum(sales)), keyby = .(group, year(time), month(time))] | 2015-06-04 00:17:12 | R_GlobalEnv | 3.4395734 | 5000000 | 195 |
| 4 | 4 | 1 | BUDGET | [j = :=(budget, rnorm(3 * 12 * 5, 12 * 10000, 12 * 1000))] | 2015-06-04 00:17:12 | R_GlobalEnv | 0.0015612 | 180 | 180 |
| 5 | 5 | 1 | DT | [j = .(sales = sum(sales)), keyby = .(group, year = year(time), month = month(time))] | 2015-06-04 00:17:16 | R_GlobalEnv | 3.3816404 | 5000000 | 195 |
| 6 | 5 | 2 | DT | [i = BUDGET, j = :=(budget = i.budget, budget_ratio = sales/i.budget)] | 2015-06-04 00:17:16 | R_GlobalEnv | 0.0026196 | 195 | 195 |
| 7 | 5 | 3 | DT | [j = :=(c(ytd_measures), lapply(.SD, cumsum)), by = .(group, year), .SDcols = measures] | 2015-06-04 00:17:16 | R_GlobalEnv | 0.0018303 | 195 | 195 |
| 8 | 5 | 4 | DT | [j = :=(c(ly_ytd_measures), shift(.SD)), by = .(group, month), .SDcols = ytd_measures] | 2015-06-04 00:17:16 | R_GlobalEnv | 0.0044990 | 195 | 195 |
| 9 | 5 | 5 | DT | [j = :=(ytd_sales_vs_ly_ytd_sales, ytd_sales/ly_ytd_sales)] | 2015-06-04 00:17:16 | R_GlobalEnv | 0.0012220 | 195 | 195 |
Minimal overhead
The time added to a transformation by dtq logging is minimal.
For time-consuming queries it is unnoticeable, roughly 0% overhead.
For the most pessimistic, unrealistic query, for(i in seq_len(1e3)) data.table(a=1L)[,.(a)], it adds around 30% overhead.
Processes evaluated by dtq on each data.table [ call (a rough sketch of this bookkeeping follows below):
- gathering information:
  - the current call
  - the top environment of the current call
  - nrow on input and output
  - proc.time on start and end, or get_nanotime if available
- appending the gathered info to the log storage:
  - the c function is used to append the current log to the existing logs
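The sketch below illustrates the steps listed above with a hypothetical helper; it is not dtq's internal code, just the same bookkeeping written out by hand.

library(data.table)
# hypothetical helper: takes a quoted data.table query and records the metadata
audit_call <- function(qexpr) {
  src     <- deparse(qexpr[[2L]])                        # name of the queried object, e.g. "DT"
  in_rows <- nrow(eval(qexpr[[2L]], envir = parent.frame()))
  start   <- proc.time()[["elapsed"]]
  res     <- eval(qexpr, envir = parent.frame())         # run the actual query
  data.table(src = src, timestamp = Sys.time(),
             elapsed = proc.time()[["elapsed"]] - start,
             in_rows = in_rows, out_rows = nrow(res))
}
audit_call(quote(DT[, .(sales = sum(sales)), by = group]))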
All other information (sources, sequences, queries) is extracted from the dtq logs when you access them via dtl(), so it does not add any overhead during data transformation.
dtl() is a function to query the R6-class log storage.
Using in a package
It is common to organize your scripts into a package, so you may want to track data transformations from within your package.
This is handled by default; you don't need anything more than library(dtq) on script init.
You can also restrict logging to a particular package or the global environment, which is useful when some of your dependencies also use data.table and would otherwise be logged together with your own queries.
Example use cases
Storing dtq logs in rds/csv/db
Traceability is good, but if you close the R session the dtq logs will be gone.
The simplest approach would be write.table with append=TRUE at the end of the script.
Use dtl(print = TRUE) to get rid of the list-type columns which store the R6 or call objects.
If you want to keep the R data types, use saveRDS on dtl() instead.
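For example, a minimal end-of-script snippet along those lines (hypothetical file names, assuming dtq is attached):

first_write <- !file.exists("dtq_logs.csv")   # write the header only once
logs <- dtl(print = TRUE)                     # flattens list-type columns to character
write.table(logs, "dtq_logs.csv", sep = ",", row.names = FALSE,
            col.names = first_write, append = !first_write)
# or, to keep native R types, serialize the full log instead
saveRDS(dtl(), file = "dtq_logs.rds")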
Performance tuning
You can easily analyse your logs to detect the most time-consuming sub-queries, compare their timings to the volume of input/output rows, etc.
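For instance, a minimal sketch of such an analysis on the log table shown above (assuming dtq is attached and some queries have been logged):

library(data.table)
logs <- as.data.table(dtl(print = TRUE))
# slowest sub-queries first, with input/output volumes for context
head(logs[order(-elapsed), .(src, query, elapsed, in_rows, out_rows)], 5)
# total elapsed time and number of queries per source object
logs[, .(queries = .N, total_elapsed = sum(elapsed)), by = src]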
Apply business rules on metadata
Your currency exchange source system is committed to providing daily exchange rates at 5-minute intervals.
- You expect exactly 288 rows on input. If that is not true, you catch it and send an email directly from R to the source system support.
- In your high-level processing you expect 5 different KPIs on output. If that is not true, you catch it and send an email to yourself (a check like this is sketched below).
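A minimal sketch of such a check on the most recent log record, assuming dtl(print = TRUE) returns a table with the columns shown above; the email step is only indicated by a comment:

library(data.table)
logs <- as.data.table(dtl(print = TRUE))
last_load <- logs[.N]   # most recently logged query
if (last_load$in_rows != 288L) {
  # here you would send an email to the source system support,
  # e.g. with a mail-sending package of your choice
  warning("expected 288 input rows, got ", last_load$in_rows)
}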
PS.
I would appreciate it if someone could recommend a commenting service for GitHub Jekyll blogs without the kind of tracking found in the Disqus service. Self-hosted options are of course welcome.