Elasticsearch Data Analysis with R

Hello!

So here’s my problem - I have an Elasticsearch server running which has data updated every few seconds.

I downloaded the ‘elastic’ package for R in the hope of extracting this data and performing analytics with R. However, in my limited experience, this does not seem feasible.

Any suggestions on whether this is possible? Or should I simply give local data from CSV files as the input to R instead of data from the Elasticsearch server?

Thanks for the question. Why isn’t it feasible? Can you tell me what you’ve tried?

How exactly does one import a full index into R for analysis? From what I’ve seen, only one document within an index can be imported at a time. My index keeps getting updated with multiple documents every few seconds, hence not feasible. Am I right, or is there a workaround?

It sounds like you want to stream data in, is that right? That is, continuously poll Elasticsearch instead of just calling a function once to get some data.

If you have a lot of data to pull in, I’d recommend using scroll(); see ?scroll. If you don’t have a lot of data, you can just use Search() or Search_uri() without scrolling.
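
For example, here’s a minimal sketch of the scrolling pattern - the index name is just a placeholder for your own:

library("elastic")
connect()
# start a scrolled search; "my_index" is a placeholder for your index
res <- Search(index = "my_index", scroll = "5m", size = 1000)
hits <- res$hits$hits
# keep requesting pages until no more hits come back
repeat {
  res <- scroll(scroll_id = res$`_scroll_id`)
  if (length(res$hits$hits) == 0) break
  hits <- c(hits, res$hits$hits)
}
length(hits)  # total number of documents retrieved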

Thanks!

Do you know if I can perform statistical analysis and charts on this Elasticsearch index in the same way as I do with a normal CSV uploaded file? Or are there limitations to this?

If you’re familiar with Elasticsearch (ES), it lets you do summary aggregations and calculate various things - let me know if you’re not familiar with those options.
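
For instance, here’s a rough sketch of asking ES for an aggregation through Search()’s body argument - the index and field names here are made up, and the aggregation itself is standard Elasticsearch query DSL:

library("elastic")
connect()
# "my_index" and "duration" are placeholders for your own index and field
body <- '{
  "size": 0,
  "aggs": {
    "avg_duration": { "avg": { "field": "duration" } }
  }
}'
res <- Search(index = "my_index", body = body)
res$aggregations$avg_duration$value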

Within R, we give you back an R list from Search() and similar functions. Because data can be so varied across people’s indices, we don’t attempt to turn those into data.frames, but you can do that yourself; then you’re all set to do any analysis/visualization.

I’ve worked with Elasticsearch in the past but don’t know about these aggregation options.

Can we convert the indices into data frames, preferably in real time? If so, how do we go about doing it?

For getting a data.frame, assuming you just want the _source element back, you can do something like this

library("elastic")
library("data.table")
connect()
res <- Search("shakespeare")
(df <- setDF(data.table::rbindlist(
  lapply(res$hits$hits, "[[", "_source"),
  fill = TRUE, use.names = TRUE
)))
#>    line_id play_name speech_number line_number       speaker                                       text_entry
#> 1        5  Henry IV             1       1.1.2 KING HENRY IV       Find we a time for frighted peace to pant,
#> 2       10  Henry IV             1       1.1.7 KING HENRY IV Nor more shall trenching war channel her fields,
#> 3       12  Henry IV             1       1.1.9 KING HENRY IV            Of hostile paces: those opposed eyes,
#> 4       17  Henry IV             1      1.1.14 KING HENRY IV       Shall now, in mutual well-beseeming ranks,
#> 5       24  Henry IV             1      1.1.21 KING HENRY IV           We are impressed and engaged to fight,
#> 6       29  Henry IV             1      1.1.26 KING HENRY IV      Which fourteen hundred years ago were naild
#> 7       31  Henry IV             1      1.1.28 KING HENRY IV    But this our purpose now is twelve month old,
#> 8       36  Henry IV             1      1.1.33 KING HENRY IV              In forwarding this dear expedience.
#> 9       43  Henry IV             2      1.1.40  WESTMORELAND        Against the irregular and wild Glendower,
#> 10      48  Henry IV             2      1.1.45  WESTMORELAND           By those Welshwomen done as may not be

data.table::rbindlist() is really fast. dplyr::bind_rows() does something similar, although you’d need to have each element as a data.frame already before combining them.
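
For example, a quick sketch of the bind_rows route, assuming each `_source` is a flat list of scalar fields (reusing res from the block above):

library("dplyr")
# turn each hit's `_source` into a one-row data.frame, then combine them
src <- lapply(res$hits$hits, "[[", "_source")
df2 <- bind_rows(lapply(src, function(x) as.data.frame(x, stringsAsFactors = FALSE)))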

Alternatively, you can have jsonlite (the internal JSON parser) attempt to parse the response into R structures for you: use asdf = TRUE, which is passed down to jsonlite::fromJSON(). For example:

Search("shakespeare", asdf = TRUE)$hits$hits$`_source`
#>    line_id play_name speech_number line_number       speaker                                       text_entry
#> 1        5  Henry IV             1       1.1.2 KING HENRY IV       Find we a time for frighted peace to pant,
#> 2       10  Henry IV             1       1.1.7 KING HENRY IV Nor more shall trenching war channel her fields,
#> 3       12  Henry IV             1       1.1.9 KING HENRY IV            Of hostile paces: those opposed eyes,
#> 4       17  Henry IV             1      1.1.14 KING HENRY IV       Shall now, in mutual well-beseeming ranks,
#> 5       24  Henry IV             1      1.1.21 KING HENRY IV           We are impressed and engaged to fight,
#> 6       29  Henry IV             1      1.1.26 KING HENRY IV      Which fourteen hundred years ago were naild
#> 7       31  Henry IV             1      1.1.28 KING HENRY IV    But this our purpose now is twelve month old,
#> 8       36  Henry IV             1      1.1.33 KING HENRY IV              In forwarding this dear expedience.
#> 9       43  Henry IV             2      1.1.40  WESTMORELAND        Against the irregular and wild Glendower,
#> 10      48  Henry IV             2      1.1.45  WESTMORELAND           By those Welshwomen done as may not be

For the streaming part - there’s nothing in elastic for that. I’d suggest running an R process that queries your ES server on a schedule (e.g., every second, every few seconds, or longer) and then proceeds with the data wrangling.
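
As a very rough sketch of that kind of polling - the function name, index name, and interval below are just placeholders, not anything built into elastic:

library("elastic")
connect()
# query every `interval` seconds and hand each batch to your own wrangling code
poll_index <- function(index, interval = 5) {
  repeat {
    res <- Search(index, asdf = TRUE)
    batch <- res$hits$hits$`_source`
    # ... summarise / plot `batch` here ...
    Sys.sleep(interval)
  }
}
# poll_index("my_index")  # runs until you interrupt it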

Unfortunately, ES doesn’t have the concept of a changes feed like CouchDB has, but there is an open issue for it https://github.com/elastic/elasticsearch/issues/1242 and there is apparently a plugin https://github.com/jurgc11/es-change-feed-plugin

Hey,

I noticed that despite my index having 20,000-odd entries, only 10 of them got copied into the variable with this method. Can’t I just import all of them, like happens with CSV files?

Do read the documentation: the default for the size parameter is 10.
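
For example:

# ask for up to 100 documents instead of the default 10
res <- Search("shakespeare", size = 100)
length(res$hits$hits)  # up to 100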

So I did change the size parameter to a really high value, since I have tens of thousands of logs in each index that I wish to import into a data frame. Two problems -

  1. There is a limit on the size parameter. It isn’t accepting values beyond a certain number.
  2. Even within the limits, it isn’t successfully importing bulk data. For instance, I set size to 10000 and close to 1000 of them failed to import.

This seems inefficient. Do I go ahead with the Elastic tool or just try my project with CSV as an input, which is more effort but clearly the better option?

What was the limit for you for size?

What do you mean by “failed to import”?

I think we can get this to work for you, just let me know the answers to the questions above. Also note that, as I imagine I can’t access your ES instance, I can only help so much without being able to inspect for myself what’s going on.

Hi, sorry for the delayed reply - I did not notice the activity on this thread.

  1. I changed the default limit from 10 to about 40,000 since that’s roughly the number of logs I have.

  2. By “failed to import”, I mean that the data.frame created by my command to import the data from ES contained only a small percentage of my logs.

Any suggestions?

The default maximum for the combination of size and from is 10,000; see https://www.elastic.co/guide/en/elasticsearch/reference/2.3/breaking_21_search_changes.html#_from_size_limits You can, however, set the maximum to a higher number, as those docs say. They suggest not doing that, though, and instead using a scrolling search for very large requests, as I suggested above. I’m updating the docs to make sure this is clear.
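
If you do decide to raise that limit anyway, here’s a rough sketch of changing the index setting over the REST API with httr - the host, index name, and new limit are placeholders, and I’m not going through elastic itself here:

library("httr")
# raise index.max_result_window for one index; adjust host/index/limit to your setup
PUT(
  "http://localhost:9200/my_index/_settings",
  body = '{"index": {"max_result_window": 50000}}',
  content_type_json()
)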

Another thing to be aware of that could have helped here is using more verbose errors. When you use connect() you can set the level of error verbosity, e.g.,

connect(errors = "complete")

Then when we set the size to > 10,000, we get

Search(index = "geonames", size = 40000L)
#> Error: 500 - all shards failed
#> ES stack trace:
#>
#>  type: query_phase_execution_exception
#>  reason: Result window is too large, from + size must be less than or equal 
#>    to: [10000] but was [40000]. See the scroll api for a more efficient way 
#>    to request large data sets. This limit can be set by changing the [index.max_result_window] 
#>    index level parameter.

P.S. Regarding notifications from this or other threads: you may want to check the settings in your account here to make sure you get email notifications.

Hello Scott…
I am using scroll because I have a lot of documents in my index, and I would like them stored in a data.frame. So I was hoping to use the asdf = TRUE option, but I am not getting the right results.

library("elastic")
connect()
res <- Search(index = 'client-events-new', scroll="5m",size=10000)
print(paste('number of hits ',res$hits$total))
out <- list()
hits <- 1
while(hits != 0){
  #res1 <- scroll(scroll_id = res$`_scroll_id`, asdf=TRUE)$hits$hits$`_source` # this does not work
  res1 <- scroll(scroll_id = res$`_scroll_id`, asdf=TRUE)
  hits <- length(res$hits$hits)
  if(hits > 0){
    out <- c(out, res$hits$hits)
    print(hits)
  }
}

In the above code, scroll() returns a response where res1$hits$hits is a list; I would have expected that to be a data.frame. As a workaround I am doing the following, and it seems to work.

res <- Search(index = 'client-events-new', scroll="5m",size=10000)
res1 <- scroll(scroll_id = res$`_scroll_id`,raw = TRUE)
dfr=data.frame(jsonlite::fromJSON(res1, TRUE, flatten = TRUE))

What am I doing wrong here?

Thanks for your help.

best

Rajesh


Hi @rajpaz

As you know I can’t see your data, so I will only be able to guess at the solution.

The internal bit that handles the asdf parameter is jsonlite::fromJSON(res, asdf, flatten = TRUE) - so you can play around with that and see what might be going wrong.

One problem I see in your first block of code is that you need to assign the output of scroll(scroll_id = res$`_scroll_id`, asdf = TRUE) to res, not res1 - since you’re in a while loop, you need the new scroll id each time the loop continues.
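
Something along these lines, as an untested sketch (I can’t run it against your index):

library("elastic")
connect()
res <- Search(index = 'client-events-new', scroll = "5m", size = 10000)
out <- list()
hits <- 1
while (hits != 0) {
  # collect the current page's `_source` elements
  out <- c(out, lapply(res$hits$hits, "[[", "_source"))
  # reassign to `res` so the next iteration uses the fresh scroll response
  res <- scroll(scroll_id = res$`_scroll_id`)
  hits <- length(res$hits$hits)
}
# then combine into one data.frame, as earlier in the thread
df <- data.table::setDF(data.table::rbindlist(out, fill = TRUE, use.names = TRUE))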

Note also that we’ve added, at least in the dev version, the ability to stream output to disk in case you have too much data to fit in memory. See the stream_opts parameter.

Best, Scott

Thanks for the reply.
To try out your suggestion about playing with fromJSON:
jsonlite::fromJSON(res, asdf, flatten = TRUE)

how do I get res in text format? I know how to do it using raw = TRUE; that does seem to work (as I sent in an earlier mail).

That’s what I meant: use the raw parameter, then play with the raw JSON as a character string.

Hi Scott. I am working on a database which is in PostgreSQL. I want to extract the data using Elasticsearch and then send it to R for analysis. I have successfully connected PostgreSQL with Elasticsearch and extracted the data. Now I have connected Elasticsearch with R and imported the data into R. As I had 370,000+ entries, I defined the exact size as follows:

Now the first issue is: the data contains 372,864 entries of ‘Year’ and ‘Number of births’. Isn’t there any way to avoid importing every entry and instead import them in chunks by ‘Year’? For example, calculate a total of births per year and just import that data (the year range is 1969-84), since it would take less time to import?
The second issue is: how do I convert this into a data frame that can then be used to make plots and graphs?
Please help me out.