.onLoad <- function(lib, pkg) {
  if (Sys.getenv("RECEPTIVITI_URL") == "") Sys.setenv(RECEPTIVITI_URL = "https://api.receptiviti.com/")
}

#' Receptiviti API
#'
#' The main function to access the \href{https://www.receptiviti.com}{Receptiviti} API.
#'
#' @param text A character vector with text to be processed, a path to a directory containing files, or a vector of file paths.
#' If a single path to a directory, each file is collapsed to a single text. If a path to a file or files,
#' each line or row is treated as a separate text, unless \code{collapse_lines} is \code{TRUE} (in which case,
#' files will be read in as part of bundles at processing time, as is always the case when a directory is provided).
#' Use \code{files} to more reliably pass in files, or \code{dir} to more reliably specify a directory.
#' @param output Path to a \code{.csv} file to write results to. If this already exists, set \code{overwrite} to \code{TRUE}
#' to overwrite it.
#' @param id Vector of unique IDs the same length as \code{text}, to be included in the results.
#' @param text_column,id_column Column names of the text/ID columns, if \code{text} is a matrix-like object or a path to a csv file.
#' @param files A vector of file paths, as an alternative entry to \code{text}.
#' @param dir A directory to search for files in, as an alternative entry to \code{text}.
#' @param file_type File extension to search for, if \code{text} is the path to a directory containing files to be read in.
#' @param return_text Logical; if \code{TRUE}, \code{text} is included as the first column of the result.
#' @param api_args A list of additional arguments to pass to the API (e.g., \code{list(sallee_mode = "sparse")}). Defaults to the
#' \code{receptiviti.api_args} option.
#' @param frameworks A vector of frameworks to include results from. Texts are always scored with all available frameworks --
#' this just specifies which to return. Defaults to \code{all}, which returns all scored frameworks. Can be set by the
#' \code{receptiviti.frameworks} option (e.g., \code{options(receptiviti.frameworks = c("liwc", "sallee"))}).
#' @param framework_prefix Logical; if \code{FALSE}, the framework prefix is removed from column names, which may result in duplicates.
#' If not specified, and a single framework is selected or \code{as_list} is \code{TRUE}, defaults to removing prefixes.
#' @param as_list Logical; if \code{TRUE}, returns a list with frameworks in separate entries.
#' @param bundle_size Number of texts to include in each request; between 1 and 1,000.
#' @param bundle_byte_limit Memory limit (in bytes) of each bundle, under \code{1e7} (10 MB, which is the API's limit).
#' May need to be lower than the API's limit, depending on the system's requesting library.
#' @param collapse_lines Logical; if \code{TRUE}, and \code{text} contains paths to files, each file is treated as a single text.
#' @param retry_limit Maximum number of times each request can be retried after hitting a rate limit.
#' @param overwrite Logical; if \code{TRUE}, will overwrite an existing \code{output} file.
#' @param compress Logical; if \code{TRUE}, will save \code{output} as an \code{xz}-compressed file.
#' @param make_request Logical; if \code{FALSE}, no requests are made to the API. This could be useful if you want to
#' load results from one of the caches, but are not sure that all results exist there; it will throw an error if it
#' encounters texts for which it has no other source.
#' @param text_as_paths Logical; if \code{TRUE}, ensures \code{text} is treated as a vector of file paths. Otherwise, paths
#' are inferred if there are no \code{NA}s in \code{text} and every entry is under 500 characters long.
#' @param cache Path to a directory in which to save unique results for reuse; defaults to
#' \code{Sys.getenv("RECEPTIVITI_CACHE")}. See the Cache section for details.
#' @param cache_overwrite Logical; if \code{TRUE}, will write results to the cache without reading from it. This could be used
#' if you want fresh results to be cached without clearing the cache.
#' @param cache_format Format of the cache database; see \code{\link[arrow]{FileFormat}}.
#' Defaults to \code{Sys.getenv("RECEPTIVITI_CACHE_FORMAT")}.
#' @param clear_cache Logical; if \code{TRUE}, will clear any existing files in the cache. Use \code{cache_overwrite} if
#' you want fresh results without clearing or disabling the cache. Use \code{cache = FALSE} to disable the cache.
#' @param request_cache Logical; if \code{FALSE}, will always make a fresh request, rather than using the response
#' from a previous identical request.
#' @param cores Number of CPU cores to split bundles across, if there are multiple bundles. See the Parallelization section.
#' @param use_future Logical; if \code{TRUE}, uses a \code{future} back-end to process bundles, in which case
#' parallelization can be controlled with the \code{\link[future]{plan}} function (e.g., \code{plan("multisession")}
#' to use multiple cores); this is required to see progress bars when using multiple cores. See the Parallelization section.
#' @param in_memory Logical; if \code{FALSE}, will write bundles to temporary files, and only load them as they are being requested.
#' @param clear_scratch_cache Logical; if \code{FALSE}, will preserve the bundles written to temporary files when \code{in_memory}
#' is \code{FALSE}, after the request has been made.
#' @param verbose Logical; if \code{TRUE}, will show status messages.
#' @param key API Key; defaults to \code{Sys.getenv("RECEPTIVITI_KEY")}.
#' @param secret API Secret; defaults to \code{Sys.getenv("RECEPTIVITI_SECRET")}.
#' @param url API URL; defaults to \code{Sys.getenv("RECEPTIVITI_URL")}, which defaults to
#' \code{"https://api.receptiviti.com/"}.
#' @param version API version; defaults to \code{Sys.getenv("RECEPTIVITI_VERSION")}, which defaults to
#' \code{"v1"}.
#' @param endpoint API endpoint (path name after the version); defaults to \code{Sys.getenv("RECEPTIVITI_ENDPOINT")},
#' which defaults to \code{"framework"}.
#' @param include_headers Logical; if \code{TRUE}, \code{receptiviti_status}'s verbose message will include
#' the HTTP headers.
#'
#' @returns A \code{data.frame} with columns for \code{text} (if \code{return_text} is \code{TRUE}; the originally entered text),
#' \code{id} (if one was provided), \code{text_hash} (the MD5 hash of the text), a column each for relevant entries in \code{api_args},
#' and scores from each included framework (e.g., \code{summary.word_count} and \code{liwc.i}). If \code{as_list} is \code{TRUE},
#' returns a list with a named entry containing such a \code{data.frame} for each framework.
#'
#' @section Cache:
#' If the \code{cache} argument is specified, results for unique texts are saved in an
#' \href{https://arrow.apache.org}{Arrow} database in the cache location
#' (\code{Sys.getenv("RECEPTIVITI_CACHE")}), and are retrieved with subsequent requests.
#' This ensures that the exact same texts are not re-sent to the API.
#' This does, however, add some processing time and disc space usage.
#'
#' If \code{cache} is \code{TRUE}, a default directory (\code{receptiviti_cache}) will be looked for
#' in the system's temporary directory (which is usually the parent of \code{tempdir()}).
#' If this does not exist, you will be asked if it should be created.
#'
#' The primary cache is checked when each bundle is processed, and existing results are loaded at
#' that time. When processing many bundles in parallel, and many results have been cached,
#' this can cause the system to freeze and potentially crash.
#' To avoid this, limit the number of cores, or disable parallel processing.
#'
#' The \code{cache_format} argument (or the \code{RECEPTIVITI_CACHE_FORMAT} environment variable) can be used to adjust the format of the cache.
#'
#' You can use the cache independently with \code{arrow::open_dataset(Sys.getenv("RECEPTIVITI_CACHE"))}; see the final example.
95 |
|
#'
|
96 |
|
#' You can also set the \code{clear_cache} argument to \code{TRUE} to clear the cache before it is used again, which may be useful
|
97 |
|
#' if the cache has gotten big, or you know new results will be returned. Even if a cached result exists, it will be
|
98 |
|
#' reprocessed if it does not have all of the variables of new results, but this depends on there being at least 1 uncached
|
99 |
|
#' result. If, for instance, you add a framework to your account and want to reprocess a previously processed set of texts,
|
100 |
|
#' you would need to first clear the cache.
|
101 |
|
#'
|
102 |
|
#' Either way, duplicated texts within the same call will only be sent once.
|
103 |
|
#'
|
104 |
|
#' The \code{request_cache} argument controls a more temporary cache of each bundle request. This is cleared when the
|
105 |
|
#' R session ends. You might want to set this to \code{FALSE} if a new framework becomes available on your account
|
106 |
|
#' and you want to process a set of text you already processed in the current R session without restarting.
|
107 |
|
#'
|
108 |
|
#' Another temporary cache is made when \code{in_memory} is \code{FALSE}, which is the default when processing
|
109 |
|
#' in parallel (when \code{cores} is over \code{1} or \code{use_future} is \code{TRUE}). This contains
|
110 |
|
#' a file for each unique bundle, which is read in as needed by the parallel workers.
|
111 |
|
#'
|
#' @section Parallelization:
#' \code{text}s are split into bundles based on the \code{bundle_size} argument. Each bundle represents
#' a single request to the API, which is why they are limited to 1000 texts and a total size of 10 MB.
#' When there is more than one bundle and either \code{cores} is greater than 1 or \code{use_future} is \code{TRUE} (and you've
#' externally specified a \code{\link[future]{plan}}), bundles are processed by multiple cores.
#'
#' If you have texts spread across multiple files, they can be most efficiently processed in parallel
#' if each file contains a single text (potentially collapsed from multiple lines). If files contain
#' multiple texts (i.e., \code{collapse_lines = FALSE}), then texts need to be read in before bundling
#' in order to ensure bundles are under the length limit.
#'
#' Whether processing in serial or parallel, progress bars can be specified externally with
#' \code{\link[progressr]{handlers}}; see examples.
#' @examples
#' \dontrun{
#'
#' # check that the API is available, and your credentials work
#' receptiviti_status()
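#'
#' # credentials can also be set for the session rather than in .Renviron
#' ## the key and secret here are placeholders
#' Sys.setenv(
#'   RECEPTIVITI_KEY = "your-key",
#'   RECEPTIVITI_SECRET = "your-secret"
#' )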
#'
#' # score a single text
#' single <- receptiviti("a text to score")
#'
#' # score multiple texts, and write results to a file
#' multi <- receptiviti(c("first text to score", "second text"), "filename.csv")
#'
#' # score many texts in separate files
#' ## defaults to look for .txt files
#' file_results <- receptiviti(dir = "./path/to/txt_folder")
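#'
#' ## or pass file paths explicitly, collapsing each file to a single text
#' ## (the paths here are hypothetical)
#' file_results <- receptiviti(
#'   files = c("./path/to/file1.txt", "./path/to/file2.txt"),
#'   collapse_lines = TRUE
#' )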
#'
#' ## could be .csv
#' file_results <- receptiviti(
#'   dir = "./path/to/csv_folder",
#'   text_column = "text", file_type = "csv"
#' )
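#'
#' # pass additional API arguments, and return only selected frameworks
#' ## a sketch; which frameworks are scored depends on your account
#' liwc_sallee <- receptiviti(
#'   "a text to score",
#'   api_args = list(sallee_mode = "sparse"),
#'   frameworks = c("liwc", "sallee")
#' )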
#'
#' # score many texts from a file, with a progress bar
#' ## set up cores and progress bar (only necessary if you want the progress bar)
#' future::plan("multisession")
#' progressr::handlers(global = TRUE)
#' progressr::handlers("progress")
#'
#' ## make request
#' results <- receptiviti(
#'   "./path/to/largefile.csv",
#'   text_column = "text", use_future = TRUE
#' )
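#'
#' # read cached results back in directly
#' ## a minimal sketch, assuming results were scored with a cache directory
#' ## set in RECEPTIVITI_CACHE and the default parquet cache format
#' cached <- dplyr::collect(arrow::open_dataset(Sys.getenv("RECEPTIVITI_CACHE")))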
#' }
#' @importFrom curl new_handle curl_fetch_memory curl_fetch_disk
#' @importFrom jsonlite toJSON fromJSON read_json
#' @importFrom utils object.size
#' @importFrom digest digest
#' @importFrom parallel detectCores makeCluster clusterExport parLapplyLB parLapply stopCluster
#' @importFrom future.apply future_lapply
#' @importFrom progressr progressor
#' @importFrom arrow read_csv_arrow write_csv_arrow schema string write_dataset open_dataset
#' @importFrom dplyr filter compute collect select all_of
#' @export

receptiviti <- function(text, output = NULL, id = NULL, text_column = NULL, id_column = NULL, files = NULL, dir = NULL,
                        file_type = "txt", return_text = FALSE, api_args = getOption("receptiviti.api_args", list()),
                        frameworks = getOption("receptiviti.frameworks", "all"), framework_prefix = TRUE, as_list = FALSE,
                        bundle_size = 1000, bundle_byte_limit = 75e5, collapse_lines = FALSE, retry_limit = 50, clear_cache = FALSE,
                        clear_scratch_cache = TRUE, request_cache = TRUE, cores = detectCores() - 1, use_future = FALSE,
                        in_memory = TRUE, verbose = FALSE, overwrite = FALSE, compress = FALSE, make_request = TRUE,
                        text_as_paths = FALSE, cache = Sys.getenv("RECEPTIVITI_CACHE"), cache_overwrite = FALSE,
                        cache_format = Sys.getenv("RECEPTIVITI_CACHE_FORMAT", "parquet"), key = Sys.getenv("RECEPTIVITI_KEY"),
                        secret = Sys.getenv("RECEPTIVITI_SECRET"), url = Sys.getenv("RECEPTIVITI_URL"),
                        version = Sys.getenv("RECEPTIVITI_VERSION"), endpoint = Sys.getenv("RECEPTIVITI_ENDPOINT")) {
  # check input
  final_res <- text_hash <- bin <- NULL
  if (!is.null(output)) {
    if (!file.exists(output) && file.exists(paste0(output, ".xz"))) output <- paste0(output, ".xz")
    if (!overwrite && file.exists(output)) stop("output file already exists; use overwrite = TRUE to overwrite it", call. = FALSE)
  }
  if (isTRUE(cache)) {
    temp <- dirname(tempdir())
    if (basename(temp) == "working_dir") temp <- dirname(dirname(temp))
    cache <- paste0(temp, "/receptiviti_cache")
    if (!dir.exists(cache)) {
      if (interactive() && !isFALSE(getOption("receptiviti.cache_prompt")) &&
        grepl("^(?:[Yy1]|$)", readline("Do you want to establish a default cache? [Y/n] "))) {
        dir.create(cache, FALSE)
      } else {
        options(receptiviti.cache_prompt = FALSE)
        cache <- FALSE
      }
    }
  }
  st <- proc.time()[[3]]
  text_as_dir <- FALSE
  if (missing(text)) {
    if (!is.null(dir)) {
      if (!dir.exists(dir)) stop("entered dir does not exist", call. = FALSE)
      text <- dir
      text_as_dir <- TRUE
    } else if (!is.null(files)) {
      text <- files
      text_as_paths <- TRUE
    } else {
      stop("enter text as the first argument, or use files or dir", call. = FALSE)
    }
  }
  if (text_as_paths) {
    if (anyNA(text)) stop("NAs are not allowed in text when being treated as file paths", call. = FALSE)
    if (!all(file.exists(text))) stop("not all of the files in text exist", call. = FALSE)
  }
  read_in <- FALSE
  if (text_as_dir || text_as_paths || (is.character(text) && !anyNA(text) && all(nchar(text) < 500))) {
    if (text_as_dir || length(text) == 1 && dir.exists(text)) {
      if (verbose) message("reading in texts from directory: ", text, " (", round(proc.time()[[3]] - st, 4), ")")
      text_as_paths <- TRUE
      text <- normalizePath(list.files(text, file_type, full.names = TRUE), "/", FALSE)
    } else if (text_as_paths || all(file.exists(text))) {
      text_as_paths <- collapse_lines
      if (verbose) message("reading in texts from file list (", round(proc.time()[[3]] - st, 4), ")")
      if (!collapse_lines) {
        if (missing(id_column)) names(text) <- if (length(id) != length(text)) text else id
        if (all(grepl("\\.csv", text, TRUE))) {
          if (is.null(text_column)) stop("text appears to point to csv files, but text_column was not specified", call. = FALSE)
          read_in <- TRUE
          text <- unlist(lapply(text, function(f) {
            d <- tryCatch(
              read_csv_arrow(f, col_select = c(text_column, id_column)),
              error = function(e) NULL
            )
            if (is.null(d)) stop("failed to read in file ", f, call. = FALSE)
            if (!is.null(id_column) && id_column %in% colnames(d)) {
              structure(d[, text_column, drop = TRUE], names = d[, id_column, drop = TRUE])
            } else {
              d[, text_column, drop = TRUE]
            }
          }))
        } else {
          text <- unlist(lapply(text, function(f) {
            d <- readLines(f, warn = FALSE, skipNul = TRUE)
            if (collapse_lines) d <- paste(d, collapse = " ")
            d
          }))
        }
        id <- names(text)
      }
    } else if (length(text) == 1 && dirname(text) != "." && dir.exists(dirname(text))) {
      stop("text appears to be a directory, but it does not exist")
    }
    if (text_as_paths && missing(id)) {
      id <- text
      if (anyDuplicated(id)) id <- names(unlist(lapply(split(id, factor(id, unique(id))), seq_along)))
    }
  }
  if (is.null(dim(text))) {
    if (!read_in) {
      if (!text_as_paths && !is.null(text_column)) stop("text_column is specified, but text has no columns", call. = FALSE)
      if (!is.null(id_column)) stop("id_column is specified, but text has no columns", call. = FALSE)
    }
  } else {
    if (length(id) == 1 && id %in% colnames(text)) id_column <- id
    if (!is.null(id_column)) {
      if (id_column %in% colnames(text)) {
        id <- text[, id_column, drop = TRUE]
      } else {
        stop("id_column not found in text", call. = FALSE)
      }
    }
    if (!is.null(text_column)) {
      if (text_column %in% colnames(text)) {
        text <- text[, text_column, drop = TRUE]
      } else {
        if (!text_as_paths) stop("text_column not found in text", call. = FALSE)
      }
    }
    if (!is.null(dim(text))) {
      if (ncol(text) == 1) {
        text <- text[, 1, drop = TRUE]
      } else {
        stop("text has dimensions, but no text_column column", call. = FALSE)
      }
    }
  }
  if (!is.character(text)) text <- as.character(text)
  if (!length(text)) stop("no texts were found after resolving the text argument")
  if (length(id) && !is.character(id)) id <- as.character(id)
  provided_id <- FALSE
  if (length(id)) {
    if (length(id) != length(text)) stop("id is not the same length as text", call. = FALSE)
    if (anyDuplicated(id)) stop("id contains duplicate values", call. = FALSE)
    provided_id <- TRUE
  } else {
    id <- paste0("t", seq_along(text))
  }
  if (!is.numeric(retry_limit)) retry_limit <- 0
  url_parts <- unlist(strsplit(regmatches(
    url, gregexpr("/[Vv]\\d(?:/[^/]+)?", url)
  )[[1]], "/", fixed = TRUE))
  if (version == "") version <- if (length(url_parts) > 1) url_parts[[2]] else "v1"
  if (endpoint == "") {
    endpoint <- if (length(url_parts) > 2) {
      url_parts[[3]]
    } else {
      if (tolower(version) == "v1") "framework" else "taxonomies"
    }
  }
  url <- paste0(sub("(?:/v\\d+)?/+$", "", url), "/", version, "/")
  full_url <- paste0(url, endpoint, "/bulk")
  if (!is.list(api_args)) api_args <- as.list(api_args)
  args_hash <- digest::digest(jsonlite::toJSON(c(
    api_args,
    url = full_url, key = key, secret = secret
  ), auto_unbox = TRUE), serialize = FALSE)

  # ping API
  if (make_request) {
    if (verbose) message("pinging API (", round(proc.time()[[3]] - st, 4), ")")
    ping <- receptiviti_status(url, key, secret, verbose = FALSE)
    if (ping$status_code != 200) stop(ping$status_message, call. = FALSE)
  }

  # prepare text
  if (verbose) message("preparing text (", round(proc.time()[[3]] - st, 4), ")")
  data <- data.frame(text = text, id = id, stringsAsFactors = FALSE)
  text <- data[!is.na(data$text) & data$text != "" & !duplicated(data$text), ]
  if (!nrow(text)) stop("no valid texts to process", call. = FALSE)
  if (!is.numeric(bundle_size)) bundle_size <- 1000
  n_texts <- nrow(text)
  n <- ceiling(n_texts / min(1000, max(1, bundle_size)))
  bundles <- split(text, sort(rep_len(seq_len(n), nrow(text))))
  size_fun <- if (text_as_paths) function(b) sum(file.size(b$text)) else object.size
  for (i in rev(seq_along(bundles))) {
    size <- size_fun(bundles[[i]])
    if (size > bundle_byte_limit) {
      sizes <- vapply(seq_len(nrow(bundles[[i]])), function(r) as.numeric(size_fun(bundles[[i]][r, ])), 0)
      if (any(sizes > bundle_byte_limit)) {
        stop(
          "one of your texts is over the individual size limit (", bundle_byte_limit / 1e6, " MB)",
          call. = FALSE
        )
      }
      bins <- rep(1, length(sizes))
      bin_size <- 0
      bi <- 1
      for (ti in seq_along(bins)) {
        bin_size <- bin_size + sizes[ti]
        if (bin_size > bundle_byte_limit) {
          bin_size <- sizes[ti]
          bi <- bi + 1
        }
        bins[ti] <- bi
      }
      bundles <- c(bundles[-i], unname(split(bundles[[i]], paste0(i, ".", bins))))
    }
  }
  n_bundles <- length(bundles)
  bundle_ref <- if (n_bundles == 1) "bundle" else "bundles"
  if (verbose) message("prepared text in ", n_bundles, " ", bundle_ref, " (", round(proc.time()[[3]] - st, 4), ")")

  # prepare cache
  if (is.character(cache) && cache != "") {
    temp <- normalizePath(cache, "/", FALSE)
    cache <- TRUE
    if (clear_cache) unlink(temp, recursive = TRUE)
    dir.create(temp, FALSE)
  } else {
    temp <- NULL
    cache <- FALSE
  }

  check_cache <- cache && !cache_overwrite
  auth <- paste0(key, ":", secret)
  if (missing(in_memory) && (use_future || cores > 1) && n_bundles > cores) in_memory <- FALSE
  request_scratch <- NULL
  if (!in_memory) {
    if (verbose) message("writing ", bundle_ref, " to disc (", round(proc.time()[[3]] - st, 4), ")")
    request_scratch <- paste0(tempdir(), "/receptiviti_request_scratch/")
    dir.create(request_scratch, FALSE)
    if (clear_scratch_cache) on.exit(unlink(request_scratch, recursive = TRUE))
    bundles <- vapply(bundles, function(b) {
      scratch_bundle <- paste0(request_scratch, digest::digest(b), ".rds")
      if (!file.exists(scratch_bundle)) saveRDS(b, scratch_bundle, compress = FALSE)
      scratch_bundle
    }, "", USE.NAMES = FALSE)
  }

  # processes bundles with either a future back-end or a parallel cluster
  doprocess <- function(bundles, cores, future) {
    env <- parent.frame()
    if (future) {
      eval(expression(future.apply::future_lapply(bundles, process)), envir = env)
    } else {
      cl <- parallel::makeCluster(cores)
      parallel::clusterExport(cl, ls(envir = env), env)
      on.exit(parallel::stopCluster(cl))
      (if (length(bundles) > cores * 2) parallel::parLapplyLB else parallel::parLapply)(cl, bundles, process)
    }
  }

  # sends a single bundle to the API, retrying when rate limited
  request <- function(body, bin, ids, attempt = retry_limit) {
    unpack <- function(d) {
      if (is.list(d)) as.data.frame(lapply(d, unpack), optional = TRUE) else d
    }
    json <- jsonlite::toJSON(unname(body), auto_unbox = TRUE)
    temp_file <- paste0(tempdir(), "/", digest::digest(json, serialize = FALSE), ".json")
    if (!request_cache) unlink(temp_file)
    res <- NULL
    if (!file.exists(temp_file)) {
      if (make_request) {
        handler <- tryCatch(
          curl::new_handle(httpauth = 1, userpwd = auth, copypostfields = json),
          error = function(e) e$message
        )
        if (is.character(handler)) {
          stop(if (grepl("libcurl", handler, fixed = TRUE)) {
            "libcurl encountered an error; try setting the bundle_byte_limit argument to a smaller value"
          } else {
            paste("failed to create handler:", handler)
          }, call. = FALSE)
        }
        res <- curl::curl_fetch_disk(full_url, temp_file, handler)
      } else {
        stop("make_request is FALSE, but there are texts with no cached results", call. = FALSE)
      }
    }
    result <- if (file.exists(temp_file)) {
      if (is.null(res$type) || grepl("application/json", res$type, fixed = TRUE)) {
        tryCatch(
          jsonlite::read_json(temp_file, simplifyVector = TRUE),
          error = function(e) list(message = "invalid response format")
        )
      } else {
        list(message = "invalid response format")
      }
    } else {
      list(message = rawToChar(res$content))
    }
    if (!is.null(result$results) || is.null(result$message)) {
      if (!is.null(result$results)) result <- result$results
      if ("error" %in% names(result)) {
        su <- !is.na(result$error$code)
        errors <- result[su & !duplicated(result$error$code), "error"]
        warning(
          if (sum(su) > 1) "some texts were invalid: " else "a text was invalid: ",
          paste(
            do.call(paste0, data.frame("(", errors$code, ") ", errors$message, stringsAsFactors = FALSE)),
            collapse = "; "
          ),
          call. = FALSE
        )
      }
      result <- unpack(result[!names(result) %in% c("response_id", "language", "version", "error")])
      if (!is.null(result) && nrow(result)) {
        if (colnames(result)[1] == "request_id") {
          colnames(result)[1] <- "text_hash"
        } else {
          result <- cbind(text_hash = vapply(body, "[[", "", "request_id"), result)
        }
        cbind(id = ids, bin = bin, result)
      }
    } else {
      unlink(temp_file)
      if (length(result$message) == 1 && substr(result$message, 1, 1) == "{") {
        result <- jsonlite::fromJSON(result$message)
      }
      if (attempt > 0 && (length(result$code) == 1 && result$code == 1420) || (
        length(result$message) == 1 && result$message == "invalid response format"
      )) {
        wait_time <- as.numeric(regmatches(result$message, regexec("[0-9]+(?:\\.[0-9]+)?", result$message)))
        Sys.sleep(if (is.na(wait_time)) 1 else wait_time / 1e3)
        request(body, bin, ids, attempt - 1)
      } else {
        stop(paste0(if (length(result$code)) {
          paste0(
            if (is.null(res$status_code)) 200 else res$status_code, " (", result$code, "): "
          )
        }, result$message), call. = FALSE)
      }
    }
  }

  # scores a single bundle, drawing on and adding to the cached results as needed
  process <- function(bundle) {
    opts <- getOption("stringsAsFactors")
    options("stringsAsFactors" = FALSE)
    on.exit(options("stringsAsFactors" = opts))
    if (is.character(bundle)) bundle <- readRDS(bundle)
    text <- bundle$text
    bin <- NULL
    if (text_as_paths) {
      if (all(grepl("\\.csv", text, TRUE))) {
        if (is.null(text_column)) stop("files appear to be csv, but no text_column was specified", call. = FALSE)
        text <- vapply(text, function(f) paste(arrow::read_csv_arrow(f, col_select = all_of(text_column))[[1]], collapse = " "), "")
      } else {
        text <- vapply(text, function(f) paste(readLines(f, warn = FALSE, skipNul = TRUE), collapse = " "), "")
      }
    }
    bundle$hashes <- paste0(vapply(paste0(args_hash, text), digest::digest, "", serialize = FALSE))
    initial <- paste0("h", substr(bundle$hashes, 1, 1))
    set <- !is.na(text) & text != "" & text != "logical(0)" & !duplicated(bundle$hashes)
    res_cached <- res_fresh <- NULL
    if (check_cache && dir.exists(paste0(temp, "/bin=h"))) {
      db <- arrow::open_dataset(temp, partitioning = arrow::schema(bin = arrow::string()), format = cache_format)
      cached <- if (!is.null(db$schema$GetFieldByName("text_hash"))) {
        tryCatch(
          dplyr::compute(dplyr::filter(db, bin %in% unique(initial), text_hash %in% bundle$hashes)),
          error = function(e) matrix(integer(), 0)
        )
      } else {
        matrix(integer(), 0)
      }
      if (nrow(cached)) {
        cached <- as.data.frame(cached$to_data_frame())
        if (anyDuplicated(cached$text_hash)) cached <- cached[!duplicated(cached$text_hash), ]
        rownames(cached) <- cached$text_hash
        cached_set <- which(bundle$hashes %in% cached$text_hash)
        set[cached_set] <- FALSE
        res_cached <- cbind(id = bundle[cached_set, "id"], cached[bundle[cached_set, "hashes"], ])
      }
    }
    valid_options <- names(api_args)
    if (any(set)) {
      set <- which(set)
      res_fresh <- request(lapply(
        set, function(i) c(api_args, list(content = text[[i]], request_id = bundle[i, "hashes"]))
      ), initial[set], bundle[set, "id"])
      valid_options <- valid_options[valid_options %in% colnames(res_fresh)]
      if (length(valid_options)) {
        res_fresh <- res_fresh[, !colnames(res_fresh) %in% valid_options, drop = FALSE]
      }
      if (check_cache && !is.null(res_cached) && !all(colnames(res_cached) %in% colnames(res_fresh))) {
        res_cached <- NULL
        res_fresh <- rbind(
          res_fresh,
          request(lapply(
            cached_set, function(i) c(api_args, list(content = text[[i]], request_id = bundle[i, "hashes"]))
          ), initial[cached_set], bundle[cached_set, "id"])
        )
      }
    }
    res <- rbind(res_cached, res_fresh)
    if (length(valid_options)) for (n in valid_options) res[[n]] <- api_args[[n]]
    missing_ids <- !bundle$id %in% res$id
    if (any(missing_ids)) {
      varnames <- colnames(res)[colnames(res) != "id"]
      res <- rbind(res, cbind(
        id = bundle[missing_ids, "id"],
        as.data.frame(matrix(NA, sum(missing_ids), length(varnames), dimnames = list(NULL, varnames)))
      ))
      res$text_hash <- structure(bundle$hashes, names = bundle$id)[res$id]
    }
    prog(amount = nrow(res))
    res
  }

  # make request(s)
  cores <- if (is.numeric(cores)) max(1, min(n_bundles, cores)) else 1
  prog <- progressor(n_texts)
  results <- if (use_future || cores > 1) {
    call_env <- new.env(parent = globalenv())
    environment(doprocess) <- call_env
    environment(request) <- call_env
    environment(process) <- call_env
    for (name in c(
      "doprocess", "request", "process", "text_column", "prog", "make_request", "check_cache", "full_url",
      "temp", "use_future", "cores", "bundles", "cache_format", "request_cache", "auth",
      "text_as_paths", "retry_limit", "api_args", "args_hash"
    )) {
      call_env[[name]] <- get(name)
    }
    if (verbose) {
      message(
        "processing ", bundle_ref, " using ", if (use_future) "future backend" else paste(cores, "cores"),
        " (", round(proc.time()[[3]] - st, 4), ")"
      )
    }
    eval(expression(doprocess(bundles, cores, use_future)), envir = call_env)
  } else {
    if (verbose) message("processing ", bundle_ref, " sequentially (", round(proc.time()[[3]] - st, 4), ")")
    lapply(bundles, process)
  }
  if (verbose) message("done retrieving; preparing final results (", round(proc.time()[[3]] - st, 4), ")")
  final_res <- do.call(rbind, results)
  if (length(api_args)) {
    su <- !names(api_args) %in% colnames(final_res)
    if (any(su)) warning("unrecognized api_args: ", paste(names(api_args)[su], collapse = ", "), call. = FALSE)
  }

  # update cache
  if (!is.null(temp)) {
    if (verbose) message("checking cache (", round(proc.time()[[3]] - st, 4), ")")
    initialized <- dir.exists(paste0(temp, "/bin=h"))
    if (initialized) {
      db <- arrow::open_dataset(temp, partitioning = arrow::schema(bin = arrow::string()), format = cache_format)
      if (db$num_cols != (ncol(final_res) - 1)) {
        if (verbose) message("clearing existing cache since columns did not align (", round(proc.time()[[3]] - st, 4), ")")
        dir.create(temp, FALSE)
        initialized <- FALSE
      }
    }
    exclude <- c("id", names(api_args))
    if (!initialized) {
      su <- !colnames(final_res) %in% exclude
      if (sum(su) > 2) {
        initial <- final_res[1, su]
        initial$text_hash <- ""
        initial$bin <- "h"
        initial[, !colnames(initial) %in% c(
          "summary.word_count", "summary.sentence_count"
        ) & !vapply(initial, is.character, TRUE)] <- .1
        initial <- rbind(initial, final_res[, colnames(initial)])
        if (verbose) {
          message(
            "initializing cache with ", nrow(final_res), " result",
            if (nrow(final_res) > 1) "s", " (", round(proc.time()[[3]] - st, 4), ")"
          )
        }
        arrow::write_dataset(initial, temp, partitioning = "bin", format = cache_format)
      }
    } else {
      fresh <- final_res[
        !duplicated(final_res$text_hash), !colnames(final_res) %in% names(api_args),
        drop = FALSE
      ]
      cached <- dplyr::filter(db, bin %in% unique(fresh$bin), text_hash %in% fresh$text_hash)
      if (!any(dim(cached) == 0) || nrow(cached) != nrow(fresh)) {
        uncached_hashes <- if (nrow(cached)) {
          !fresh$text_hash %in% dplyr::collect(dplyr::select(cached, text_hash))[[1]]
        } else {
          rep(TRUE, nrow(fresh))
        }
        if (any(uncached_hashes)) {
          if (verbose) {
            message(
              "updating cache with ", sum(uncached_hashes), " result",
              if (sum(uncached_hashes) > 1) "s", " (", round(proc.time()[[3]] - st, 4), ")"
            )
          }
          arrow::write_dataset(
            fresh[uncached_hashes, !colnames(fresh) %in% exclude], temp,
            partitioning = "bin", format = cache_format
          )
        }
      }
    }
  }

  # prepare final results
  if (verbose) message("preparing output (", round(proc.time()[[3]] - st, 4), ")")
  rownames(final_res) <- final_res$id
  rownames(data) <- data$id
  data$text_hash <- structure(final_res$text_hash, names = data[final_res$id, "text"])[data$text]
  final_res <- cbind(
    data[, c(if (return_text) "text", if (provided_id) "id", "text_hash"), drop = FALSE],
    final_res[
      structure(final_res$id, names = final_res$text_hash)[data$text_hash],
      !colnames(final_res) %in% c("id", "bin", "text_hash", "custom"),
      drop = FALSE
    ]
  )
  row.names(final_res) <- NULL
  if (!is.null(output)) {
    if (!grepl("\\.csv", output, TRUE)) output <- paste0(output, ".csv")
    if (compress && !grepl(".xz", output, fixed = TRUE)) output <- paste0(output, ".xz")
    if (grepl(".xz", output, fixed = TRUE)) compress <- TRUE
    if (verbose) message("writing results to file: ", output, " (", round(proc.time()[[3]] - st, 4), ")")
    dir.create(dirname(output), FALSE, TRUE)
    if (overwrite) unlink(output)
    if (compress) output <- xzfile(output)
    write_csv_arrow(final_res, file = output)
  }

  if (is.character(frameworks) && frameworks[1] != "all") {
    if (verbose) message("selecting frameworks (", round(proc.time()[[3]] - st, 4), ")")
    vars <- colnames(final_res)
    sel <- grepl(paste0("^(?:", paste(tolower(frameworks), collapse = "|"), ")"), vars)
    if (any(sel)) {
      if (missing(framework_prefix) && (length(frameworks) == 1 && frameworks != "all")) framework_prefix <- FALSE
      sel <- unique(c("text", "id", "text_hash", names(api_args), vars[sel]))
      sel <- sel[sel %in% vars]
      final_res <- final_res[, sel]
    } else {
      warning("frameworks did not match any columns -- returning all", call. = FALSE)
    }
  }
  if (as_list) {
    if (missing(framework_prefix)) framework_prefix <- FALSE
    inall <- c("text", "id", "text_hash", names(api_args))
    cols <- colnames(final_res)
    inall <- inall[inall %in% cols]
    pre <- sub("\\..*$", "", cols)
    pre <- unique(pre[!pre %in% inall])
    final_res <- lapply(structure(pre, names = pre), function(f) {
      res <- final_res[, c(inall, grep(paste0("^", f), cols, value = TRUE))]
      if (!framework_prefix) colnames(res) <- sub("^.+\\.", "", colnames(res))
      res
    })
  } else if (!framework_prefix) colnames(final_res) <- sub("^.+\\.", "", colnames(final_res))
  if (verbose) message("done (", round(proc.time()[[3]] - st, 4), ")")
  invisible(final_res)
}