Minding the zoo[keeper] with R

December 16, 2016

[This article was first published on R – rud.is, and kindly contributed to R-bloggers.]

I’ve been drafting a new R package, sergeant, to work with Apache Drill, and I’ve found Drill much easier to manage as a single-node cluster than in drill-embedded mode (especially when I need to add a couple more nodes for additional capacity). That means running Apache Zookeeper, and I occasionally need to ping the Zookeeper process to see various stats (especially after adding systems to the cluster).

Yes, it’s easy and straightforward to run nc hostname 2181 from the command line and issue commands (or to use the Zookeeper CLIs), but when I’m in R I like to stay in R. To that end, I made a small reference class that makes it super-easy to query Zookeeper from within R. The class will eventually live in a sister package to sergeant (a more detailed post on sergeant is forthcoming), but others may want or need to poke at Zookeeper processes with 4-letter words, and this will help you stay within R. The only command not available is stmk, since I have no pressing need to set the trace mask.
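For readers who want to see what the nc round trip amounts to, here is a minimal base-R sketch of sending a single 4-letter word over a socket. The function name zk_four_letter and its argument defaults are made up for illustration and are not part of the class in this post; it assumes the server replies and then closes the connection, which is what the class's own send_cmd relies on as well.

```r
# Hypothetical helper (not part of the class in this post): send one
# four-letter word to a Zookeeper node and return the raw response lines.
zk_four_letter <- function(cmd, host = "localhost", port = 2181L, timeout = 30L) {
  # Four-letter words are exactly four characters, e.g. "ruok" or "stat".
  stopifnot(is.character(cmd), length(cmd) == 1L, nchar(cmd) == 4L)
  con <- socketConnection(host = host, port = port, blocking = TRUE,
                          open = "r+", timeout = timeout)
  on.exit(close(con), add = TRUE)  # always release the socket
  cat(cmd, file = con)             # write the command with no trailing newline
  readLines(con, warn = FALSE)     # read until the server closes the connection
}

# Usage (needs a reachable Zookeeper node; "drill.local" is the host used in this post):
# zk_four_letter("ruok", host = "drill.local")
```

The input check runs before any connection is opened, so a mistyped command fails fast without touching the network.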

The class relies on the purrr, stringi, and R.utils packages, so install those first. After you source the otherwise self-contained reference class, you connect and then issue commands like so:

zk <- zookeeper$new(host="drill.local")

zk$ruok()

zk$conf()
zk$stat()

Drop a note in the comments (since this isn’t on github, yet) with any questions/issues.

zookeeper <- setRefClass(

  Class="zookeeper",

  fields=list(
    host="character",
    port="integer",
    timeout="integer"
  ),

  methods=list(

    initialize=function(..., host="localhost", port=2181L, timeout=30L) {
      host <<- host ; port <<- port ; timeout <<- timeout; callSuper(...)
    },

    available_commands=function() {
      return(sort(c("conf", "envi", "stat", "srvr", "wchs", "wchc", "wchp", "mntr",
                    "cons", "crst", "srst", "dump", "ruok", "dirs", "isro", "gtmk")))
    },

    connect=function(host="localhost", port=2181L, timeout=30L) {
      .self$host <- host ; .self$port <- port ; .self$timeout <- timeout
    },

    conf=function() {
      res <- .self$send_cmd("conf")
      res <- stringi::stri_split_fixed(res, "=", 2, simplify=TRUE)
      as.list(setNames(res[,2], res[,1]))
    },

    envi=function() {
      res <- .self$send_cmd("envi")
      res <- stringi::stri_split_fixed(res[-1], "=", 2, simplify=TRUE)
      as.list(setNames(res[,2], res[,1]))
    },

    stat=function() {
      res <- .self$send_cmd("stat")
      version <- stri_replace_first_regex(res[1], "^Zoo.*: ", "")
      res <- res[-(1:2)]
      sep <- which(res=="")
      clients <- stri_trim(res[1:(sep-1)])
      zstats <- stringi::stri_split_fixed(res[(sep+1):length(res)], ": ", 2, simplify=TRUE)
      zstats <- as.list(setNames(zstats[,2], zstats[,1]))
      list(version=version, clients=clients, stats=zstats)
    },

    srvr=function() {
      res <- .self$send_cmd("srvr")
      zstats <- stringi::stri_split_fixed(res, ": ", 2, simplify=TRUE)
      as.list(setNames(zstats[,2], zstats[,1]))
    },

    wchs=function() {
      res <- .self$send_cmd("wchs")
      conn_path <- stri_match_first_regex(res[1], "^([[:digit:]]+) connections watching ([[:digit:]]+) paths")
      tot_watch <- stri_match_first_regex(res[2], "Total watches:([[:digit:]]+)")
      list(connections_watching=conn_path[,2], paths=conn_path[,3], total_watches=tot_watch[,2])
    },

    wchc=function() {
      stri_trim(.self$send_cmd("wchc")) %>% discard(`==`, "") -> res
      setNames(list(res[2:length(res)]), res[1])
    },

    wchp=function() {
      .self$send_cmd("wchp") %>% stri_trim() %>% discard(`==`, "") -> res
      data.frame(
        path=res[seq(1, length(res), 2)],
        address=res[seq(2, length(res), 2)],
        stringsAsFactors=FALSE
      )
    },

    mntr=function() {
      res <- .self$send_cmd("mntr")
      res <- stringi::stri_split_fixed(res, "\t", 2, simplify=TRUE)
      as.list(setNames(res[,2], res[,1]))
    },

    cons=function() { list(clients=stri_trim(.self$send_cmd("cons") %>% discard(`==`, ""))) },
    crst=function() { message(.self$send_cmd("crst")) ; invisible() },
    srst=function() { message(.self$send_cmd("srst")) ; invisible() },
    dump=function() { paste0(.self$send_cmd("dump"), collapse="\n") },
    ruok=function() { .self$send_cmd("ruok") == "imok" },
    dirs=function() { .self$send_cmd("dirs") },
    isro=function() { .self$send_cmd("isro") },
    gtmk=function() { R.utils::intToBin(as.integer(.self$send_cmd("gtmk"))) },

    send_cmd=function(cmd) {
      require(purrr)
      require(stringi)
      require(R.utils)
      sock <- purrr::safely(socketConnection)
      con <- sock(host=.self$host, port=.self$port, blocking=TRUE, open="r+", timeout=.self$timeout)
      if (!is.null(con$result)) {
        con <- con$result
        cat(cmd, file=con)
        response <- readLines(con, warn=FALSE)
        a <- try(close(con))
        purrr::flatten_chr(stringi::stri_split_lines(response))
      } else {
        warning(sprintf("Error connecting to [%s:%s]", host, port))
      }
    }

  )

)
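
Several of the methods above (conf, envi, srvr, mntr) share one idea: split each response line at the first separator only and turn the pieces into a named list. The sketch below tries that idea offline. It is a base-R stand-in for the stringi::stri_split_fixed(..., 2, simplify=TRUE) call used in the class, not the class's own code; parse_kv is a hypothetical name and the sample lines are made up, not real server output.

```r
# Hypothetical helper: split "key<sep>value" lines at the FIRST separator only,
# so values that themselves contain the separator stay intact.
parse_kv <- function(lines, sep = "=") {
  lines <- lines[nzchar(lines)]             # drop empty lines
  pos <- regexpr(sep, lines, fixed = TRUE)  # position of the first separator
  keep <- pos > 0                           # skip lines that have no separator
  lines <- lines[keep]
  pos <- pos[keep]
  keys <- substr(lines, 1, pos - 1)
  vals <- substr(lines, pos + nchar(sep), nchar(lines))
  as.list(setNames(vals, keys))
}

# Made-up sample lines in the general shape of a conf response.
sample_conf <- c("clientPort=2181", "dataDir=/tmp/zk", "opts=a=b")
str(parse_kv(sample_conf))

# The same helper handles the ": " separator that srvr-style lines use.
str(parse_kv("Mode: standalone", sep = ": "))
```

Every value comes back as a character string, as in the class, so numeric settings still need an explicit as.integer() or as.numeric() before use.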
