Minding the zoo[keeper] with R

December 16, 2016
By

(This article was first published on R – rud.is, and kindly contributed to R-bloggers)

I’ve been drafting a new R package — sergeant — to work with Apache Drill and have found it much easier to manage having Drill operating in a single node cluster vs drill-embedded mode (esp when I need to add a couple more nodes for additional capacity). That means running Apache Zookeeper and I’ve had the occasional need to ping the Zookeeper process to see various stats (especially when I add some systems to the cluster).

Yes, it’s very easy+straightforward to nc hostname 2181 from the command line to issue commands (or use the Zookeeper CLIs) but when I’m in R I like to stay in R. To that end, I made a small reference class that makes it super-easy to query Zookeeper from within R. Said class will eventually be in a sister package to sergeant (and, a more detailed post on sergeant is forthcoming), but there may be others who want/need to poke at Zookeeper processes with 4-letter words and this will help you stay within R. The only command not available is stmk since I don’t have the pressing need to set the trace mask.

After you source the self-contained reference class you connect and then issue commands as so:

zk <- zookeeper$new(host="drill.local")

zk$ruok()

zk$conf()
zk$stat()

Drop a note in the comments (since this isn’t on github, yet) with any questions/issues.

zookeeper <- setRefClass(

  Class="zookeeper",

  fields=list(
    host="character",
    port="integer",
    timeout="integer"
  ),

  methods=list(

    initialize=function(..., host="localhost", port=2181L, timeout=30L) {
      host <<- host ; port <<- port ; timeout <<- timeout; callSuper(...)
    },

    available_commands=function() {
      return(sort(c("conf", "envi", "stat", "srvr", "whcs", "wchc", "wchp", "mntr",
                    "cons", "crst", "srst", "dump", "ruok", "dirs", "isro", "gtmk")))
    },

    connect=function(host="localhost", port=2181L, timeout=30L) {
      .self$host <- host ; .self$port <- port ; .self$timeout <- timeout
    },

    conf=function() {
      res <- .self$send_cmd("conf")
      res <- stringi::stri_split_fixed(res, "=", 2, simplify=TRUE)
      as.list(setNames(res[,2], res[,1]))
    },

    envi=function() {
      res <- .self$send_cmd("envi")
      res <- stringi::stri_split_fixed(res[-1], "=", 2, simplify=TRUE)
      as.list(setNames(res[,2], res[,1]))
    },

    stat=function() {
      res <- .self$send_cmd("stat")
      version <- stri_replace_first_regex(res[1], "^Zoo.*: ", "")
      res <- res[-(1:2)]
      sep <- which(res=="")
      clients <- stri_trim(res[1:(sep-1)])
      zstats <- stringi::stri_split_fixed(res[(sep+1):length(res)], ": ", 2, simplify=TRUE)
      zstats <- as.list(setNames(zstats[,2], zstats[,1]))
      list(version=version, clients=clients, stats=zstats)
    },

    srvr=function() {
      res <- .self$send_cmd("srvr")
      zstats <- stringi::stri_split_fixed(res, ": ", 2, simplify=TRUE)
      as.list(setNames(zstats[,2], zstats[,1]))
    },

    wchs=function() {
      res <- .self$send_cmd("wchs")
      conn_path <- stri_match_first_regex(res[1], "^([[:digit:]]+) connections watching ([[:digit:]]+) paths")
      tot_watch <- stri_match_first_regex(res[2], "Total watches:([[:digit:]]+)")
      list(connections_watching=conn_path[,2], paths=conn_path[,3], total_watches=tot_watch[,2])
    },

    wchc=function() {
      stri_trim(.self$send_cmd("wchc")) %>% discard(`==`, "") -> res
      setNames(list(res[2:length(res)]), res[1])
    },

    wchp=function() {
      .self$send_cmd("wchp") %>% stri_trim() %>% discard(`==`, "") -> res
      data.frame(
        path=qq[seq(1, length(qq), 2)],
        address=qq[seq(2, length(qq), 2)],
        stringsAsFactors=FALSE
      )
    },

    mntr=function() {
      res <- .self$send_cmd("mntr")
      res <- stringi::stri_split_fixed(res, "\t", 2, simplify=TRUE)
      as.list(setNames(res[,2], res[,1]))
    },

    cons=function() { list(clients=stri_trim(.self$send_cmd("cons") %>% discard(`==`, ""))) },
    crst=function() { message(.self$send_cmd("crst")) ; invisible() },
    srst=function() { message(.self$send_cmd("srst")) ; invisible() },
    dump=function() { paste0(.self$send_cmd("dump"), collapse="\n") },
    ruok=function() { .self$send_cmd("ruok") == "imok" },
    dirs=function() { .self$send_cmd("dirs") },
    isro=function() { .self$send_cmd("isro") },
    gtmk=function() { R.utils::intToBin(as.integer(.self$send_cmd("gtmk"))) },

    send_cmd=function(cmd) {
      require(purrr)
      require(stringi)
      require(R.utils)
      sock <- purrr::safely(socketConnection)
      con <- sock(host=.self$host, port=.self$port, blocking=TRUE, open="r+", timeout=.self$timeout)
      if (!is.null(con$result)) {
        con <- con$result
        cat(cmd, file=con)
        response <- readLines(con, warn=FALSE)
        a <- try(close(con))
        purrr::flatten_chr(stringi::stri_split_lines(response))
      } else {
        warning(sprintf("Error connecting to [%s:%s]", host, port))
      }
    }

  )

)

To leave a comment for the author, please follow the link and comment on their blog: R – rud.is.

R-bloggers.com offers daily e-mail updates about R news and tutorials on topics such as: Data science, Big Data, R jobs, visualization (ggplot2, Boxplots, maps, animation), programming (RStudio, Sweave, LaTeX, SQL, Eclipse, git, hadoop, Web Scraping) statistics (regression, PCA, time series, trading) and more...



If you got this far, why not subscribe for updates from the site? Choose your flavor: e-mail, twitter, RSS, or facebook...

Comments are closed.

Sponsors

Never miss an update!
Subscribe to R-bloggers to receive
e-mails with the latest R posts.
(You will not see this message again.)

Click here to close (This popup will not appear again)