Coverage for src/receptiviti/request.py: 86%
138 statements
coverage.py v7.6.1, created at 2025-01-08 11:14 -0500
1"""Make requests to the API."""
3import os
4import re
5import shutil
6from glob import glob
7from importlib.util import find_spec
8from math import ceil
9from multiprocessing import current_process
10from tempfile import gettempdir
11from time import perf_counter, time
12from typing import List, Union
14import pandas
15import pyarrow.dataset
17from receptiviti.frameworks import frameworks as get_frameworks
18from receptiviti.manage_request import _get_writer, _manage_request
19from receptiviti.norming import norming
20from receptiviti.readin_env import readin_env
22CACHE = gettempdir() + "/receptiviti_cache/"
23REQUEST_CACHE = gettempdir() + "/receptiviti_request_cache/"
def request(
    text: Union[str, List[str], pandas.DataFrame, None] = None,
    output: Union[str, None] = None,
    ids: Union[str, List[str], List[int], None] = None,
    text_column: Union[str, None] = None,
    id_column: Union[str, None] = None,
    files: Union[List[str], None] = None,
    directory: Union[str, None] = None,
    file_type: str = "txt",
    encoding: Union[str, None] = None,
    return_text=False,
    context="written",
    custom_context: Union[str, bool] = False,
    api_args: Union[dict, None] = None,
    frameworks: Union[str, List[str], None] = None,
    framework_prefix: Union[bool, None] = None,
    bundle_size=1000,
    bundle_byte_limit=75e5,
    collapse_lines=False,
    retry_limit=50,
    clear_cache=False,
    request_cache=True,
    cores=1,
    collect_results=True,
    in_memory: Union[bool, None] = None,
    verbose=False,
    progress_bar: Union[str, bool] = os.getenv("RECEPTIVITI_PB", "True"),
    overwrite=False,
    make_request=True,
    text_as_paths=False,
    dotenv: Union[bool, str] = True,
    cache: Union[str, bool] = os.getenv("RECEPTIVITI_CACHE", ""),
    cache_degragment=True,
    cache_overwrite=False,
    cache_format=os.getenv("RECEPTIVITI_CACHE_FORMAT", ""),
    key=os.getenv("RECEPTIVITI_KEY", ""),
    secret=os.getenv("RECEPTIVITI_SECRET", ""),
    url=os.getenv("RECEPTIVITI_URL", ""),
    version=os.getenv("RECEPTIVITI_VERSION", ""),
    endpoint=os.getenv("RECEPTIVITI_ENDPOINT", ""),
) -> pandas.DataFrame | None:
67 """
68 Send texts to be scored by the API.
70 Args:
71 text (str | list[str] | pandas.DataFrame): Text to be processed, as a string or vector of
72 strings containing the text itself, or the path to a file from which to read in text.
73 If a DataFrame, `text_column` is used to extract such a vector. A string may also
74 represent a directory in which to search for files. To best ensure paths are not
75 treated as texts, either set `text_as_path` to `True`, or use `directory` to enter
76 a directory path, or `files` to enter a vector of file paths.
77 output (str): Path to a file to write results to.
78 ids (str | list[str | int]): Vector of IDs for each `text`, or a column name in `text`
79 containing IDs.
80 text_column (str): Column name in `text` containing text.
81 id_column (str): Column name in `text` containing IDs.
82 files (list[str]): Vector of file paths, as alternate entry to `text`.
83 directory (str): A directory path to search for files in, as alternate entry to `text`.
84 file_type (str): Extension of the file(s) to be read in from a directory (`txt` or `csv`).
85 encoding (str | None): Encoding of file(s) to be read in; one of the
86 [standard encodings](https://docs.python.org/3/library/codecs.html#standard-encodings).
87 If this is `None` (default), encoding will be predicted for each file, but this can
88 potentially fail, resulting in mis-encoded characters. For best (and fastest) results,
89 specify encoding.
90 return_text (bool): If `True`, will include a `text` column in the output with the
91 original text.
92 context (str): Name of the analysis context.
93 custom_context (str | bool): Name of a custom context (as listed by `receptiviti.norming`),
94 or `True` if `context` is the name of a custom context.
95 api_args (dict): Additional arguments to include in the request.
96 frameworks (str | list): One or more names of frameworks to request. Note that this
97 changes the results from the API, so it will invalidate any cached results
98 without the same set of frameworks.
99 framework_prefix (bool): If `False`, will drop framework prefix from column names.
100 If one framework is selected, will default to `False`.
101 bundle_size (int): Maximum number of texts per bundle.
102 bundle_byte_limit (float): Maximum byte size of each bundle.
103 collapse_lines (bool): If `True`, will treat files as containing single texts, and
104 collapse multiple lines.
105 retry_limit (int): Number of times to retry a failed request.
106 clear_cache (bool): If `True`, will delete the `cache` before processing.
107 request_cache (bool): If `False`, will not temporarily save raw requests for reuse
108 within a day.
109 cores (int): Number of CPU cores to use when processing multiple bundles.
110 collect_results (bool): If `False`, will not retain bundle results in memory for return.
        in_memory (bool | None): If `False`, will write bundles to disc, to be loaded when
            processed. Defaults to `False` when processing in parallel.
        verbose (bool): If `True`, will print status messages and preserve the progress bar.
        progress_bar (str | bool): If `False`, will not display a progress bar.
        overwrite (bool): If `True`, will overwrite an existing `output` file.
        text_as_paths (bool): If `True`, will explicitly mark `text` as a list of file paths.
            Otherwise, this will be detected.
        dotenv (bool | str): Path to a .env file to read environment variables from. By default,
            will look for a file in the current directory or `~/Documents`.
            Passed to `readin_env` as `path`.
        cache (bool | str): Path to a cache directory, or `True` to use the default directory.
            The cache is an Arrow dataset, and so requires the `pyarrow` package.
        cache_degragment (bool): If `False`, will not defragment the cache after writing new
            results to it.
        cache_overwrite (bool): If `True`, will not check the cache for previously cached texts,
            but will store results in the cache (unlike `cache = False`).
        cache_format (str): File format of the cache; one of the available Arrow formats.
        key (str): Your API key.
        secret (str): Your API secret.
        url (str): The URL of the API; defaults to `https://api.receptiviti.com`.
        version (str): Version of the API; defaults to `v1`.
        endpoint (str): Endpoint of the API; defaults to `framework`.

    Returns:
        Scores associated with each input text.

    Examples:
        ```python
        # score a single text
        single = receptiviti.request("a text to score")

        # score multiple texts, and write results to a file
        multi = receptiviti.request(["first text to score", "second text"], "filename.csv")

        # score texts in separate files
        ## defaults to look for .txt files
        file_results = receptiviti.request(directory = "./path/to/txt_folder")

        ## could be .csv
        file_results = receptiviti.request(
            directory = "./path/to/csv_folder",
            text_column = "text", file_type = "csv"
        )

        # score texts in a single file
        results = receptiviti.request("./path/to/file.csv", text_column = "text")
        ```
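        A cached request that keeps only one framework's columns might look like the following
        sketch (the file and cache paths are hypothetical, and it assumes the `summary`
        framework is available to your account):
        ```python
        # score texts from a CSV file, cache results locally, and return
        # only summary scores without the framework prefix
        results = receptiviti.request(
            "./texts.csv", text_column = "text", id_column = "id",
            frameworks = "summary", framework_prefix = False,
            cache = "./receptiviti_cache"
        )
        ```
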
    Request Process:
        This function (along with the internal `_manage_request` function) handles texts
        and results in several steps:

        1. Prepare bundles (split `text` into bundles of at most `bundle_size` texts
            and `bundle_byte_limit` bytes each).
            1. If `text` points to a directory or list of files, these will be read in later.
            2. If `in_memory` is `False`, bundles are written to a temporary location,
                and read back in when the request is made.
        2. Get scores for texts within each bundle.
            1. If texts are paths, or `in_memory` is `False`, will load texts.
            2. If `cache` is set, will skip any texts with cached scores.
            3. If `request_cache` is `True`, will check for a cached request.
            4. If any texts need scoring and `make_request` is `True`, will send unscored texts to the API.
        3. If a request was made and `request_cache` is set, will cache the response.
        4. If `cache` is set, will write bundle scores to the cache.
        5. After requests are made, if `cache` is set, will defragment the cache
            (combine bundle results within partitions).
        6. If `collect_results` is `True`, will prepare results (see the sketch after this list):
            1. Will realign results with `text` (and `ids` if provided).
            2. If `output` is specified, will write realigned results to it.
            3. Will drop additional columns (such as `custom` and `id` if not provided).
            4. If `frameworks` is specified, will use it to select columns of the results.
            5. Returns results.
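        For very large corpora, you might skip collection and rely on the cache alone;
        a minimal sketch (the paths are hypothetical; see the Cache section for reading
        results back in):
        ```python
        # score a folder of .txt files without holding results in memory
        receptiviti.request(
            directory = "./path/to/txt_folder",
            cache = "./receptiviti_cache",
            collect_results = False
        )
        ```
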
    Cache:
        If `cache` is specified, results for unique texts are saved in an Arrow database
        in the cache location (`os.getenv("RECEPTIVITI_CACHE")`), and are retrieved with
        subsequent requests. This ensures that the exact same texts are not re-sent to the API.
        This does, however, add some processing time and disc space usage.

        If `cache` is `True`, a default directory (`receptiviti_cache`) will be
        looked for in the system's temporary directory (`tempfile.gettempdir()`).

        The primary cache is checked when each bundle is processed, and existing results are
        loaded at that time. When processing many bundles in parallel, and many results have
        been cached, this can cause the system to freeze and potentially crash.
        To avoid this, limit the number of cores, or disable parallel processing.

        The `cache_format` argument (or the `RECEPTIVITI_CACHE_FORMAT` environment variable) can be
        used to adjust the format of the cache.

        You can use the cache independently with
        `pyarrow.dataset.dataset(os.getenv("RECEPTIVITI_CACHE"))`.
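        For example, a minimal sketch of loading cached results yourself (assuming the
        cache location is set in `RECEPTIVITI_CACHE` and uses the default `parquet` format):
        ```python
        import os
        import pyarrow.dataset

        # read the cache as an Arrow dataset and convert it to a pandas DataFrame
        cached = pyarrow.dataset.dataset(os.getenv("RECEPTIVITI_CACHE")).to_table().to_pandas()
        ```
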
        You can also set the `clear_cache` argument to `True` to clear the cache before it is used
        again, which may be useful if the cache has gotten big, or you know new results will be
        returned.

        Even if a cached result exists, it will be reprocessed if it does not have all of the
        variables of new results, but this depends on there being at least 1 uncached result. If,
        for instance, you add a framework to your account and want to reprocess a previously
        processed set of texts, you would need to first clear the cache.

        Either way, duplicated texts within the same call will only be sent once.

        The `request_cache` argument controls a more temporary cache of each bundle request. This
        is cleared after a day. You might want to set this to `False` if a new framework becomes
        available on your account and you want to re-process a set of texts you processed recently.

        Another temporary cache is made when `in_memory` is `False`, which is the default when
        processing in parallel (when there is more than 1 bundle and `cores` is over 1). This is a
        temporary directory that contains a file for each unique bundle, which is read in as needed
        by the parallel workers.

    Parallelization:
        `text`s are split into bundles based on the `bundle_size` argument. Each bundle represents
        a single request to the API, which is why they are limited to 1000 texts and a total size
        of 10 MB. When there is more than one bundle and `cores` is greater than 1, bundles are
        processed by multiple cores.

        If you have texts spread across multiple files, they can be most efficiently processed in
        parallel if each file contains a single text (potentially collapsed from multiple lines).
        If files contain multiple texts (i.e., `collapse_lines=False`), then texts need to be
        read in before bundling in order to ensure bundles are under the length limit.

        If you are calling this function from a script, parallelization will involve rerunning
        that script in each process, so anything you don't want rerun should be protected by
        a check that `__name__` equals `"__main__"`
        (placed within an `if __name__ == "__main__":` clause).
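        For example, a minimal sketch of a script that is safe to run with multiple cores
        (the directory path is hypothetical):
        ```python
        import receptiviti

        if __name__ == "__main__":
            # only the main process makes the request; worker processes re-import
            # this script, so the call is guarded by the __main__ check
            results = receptiviti.request(directory = "./path/to/txt_folder", cores = 4)
            if results is not None:
                print(results.head())
        ```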
237 """
    if cores > 1 and current_process().name != "MainProcess":
        return None
    if output is not None and os.path.isfile(output) and not overwrite:
        msg = "`output` file already exists; use `overwrite=True` to overwrite it"
        raise RuntimeError(msg)
    start_time = perf_counter()

    if dotenv:
        readin_env(dotenv if isinstance(dotenv, str) else ".")
        dotenv = False

    # check norming context
    if isinstance(custom_context, str):
        context = custom_context
        custom_context = True
    if context != "written":
        if verbose:
            print(f"retrieving norming contexts ({perf_counter() - start_time:.4f})")
        available_contexts: List[str] = norming(name_only=True, url=url, key=key, secret=secret, verbose=False)
        if ("custom/" + context if custom_context else context) not in available_contexts:
            msg = f"norming context {context} is not on record or is not completed"
            raise RuntimeError(msg)

    # check frameworks
    if frameworks and version and "2" in version:
        if not api_args:
            api_args = {}
        if isinstance(frameworks, str):
            frameworks = [frameworks]
        api_args["frameworks"] = [f for f in frameworks if f != "summary"]
    if api_args and "frameworks" in api_args:
        arg_frameworks: List[str] = (
            api_args["frameworks"].split(",") if isinstance(api_args["frameworks"], str) else api_args["frameworks"]
        )
        available_frameworks = get_frameworks(url=url, key=key, secret=secret)
        for f in arg_frameworks:
            if f not in available_frameworks:
                msg = f"requested framework is not available to your account: {f}"
                raise RuntimeError(msg)
        if isinstance(api_args["frameworks"], list):
            api_args["frameworks"] = ",".join(api_args["frameworks"])

    if isinstance(cache, str) and cache:
        if find_spec("pyarrow") is None:
            msg = "install the `pyarrow` package to use the cache"
            raise RuntimeError(msg)
        if clear_cache and os.path.exists(cache):
            shutil.rmtree(cache, True)
        os.makedirs(cache, exist_ok=True)
        if not cache_format:
            cache_format = os.getenv("RECEPTIVITI_CACHE_FORMAT", "parquet")
        if cache_format not in ["parquet", "feather"]:
            msg = "`cache_format` must be `parquet` or `feather`"
            raise RuntimeError(msg)
    else:
        cache = ""

    data, res, id_specified = _manage_request(
        text=text,
        ids=ids,
        text_column=text_column,
        id_column=id_column,
        files=files,
        directory=directory,
        file_type=file_type,
        encoding=encoding,
        context=f"custom/{context}" if custom_context else context,
        api_args=api_args,
        bundle_size=bundle_size,
        bundle_byte_limit=bundle_byte_limit,
        collapse_lines=collapse_lines,
        retry_limit=retry_limit,
        request_cache=request_cache,
        cores=cores,
        collect_results=collect_results,
        in_memory=in_memory,
        verbose=verbose,
        progress_bar=progress_bar,
        make_request=make_request,
        text_as_paths=text_as_paths,
        dotenv=dotenv,
        cache=cache,
        cache_overwrite=cache_overwrite,
        cache_format=cache_format,
        key=key,
        secret=secret,
        url=url,
        version=version,
        endpoint=endpoint,
    )

    # finalize
    if collect_results and (res is None or not res.shape[0]):
        msg = "no results"
        raise RuntimeError(msg)
    if cache and cache_degragment:
        writer = _get_writer(cache_format)
        for bin_dir in glob(cache + "/bin=*/"):
            _defragment_bin(bin_dir, cache_format, writer)
    if not collect_results:
        if verbose:
            print(f"done ({perf_counter() - start_time:.4f})")
        return None
    if verbose:
        print(f"preparing output ({perf_counter() - start_time:.4f})")
    data.set_index("id", inplace=True)
    res.set_index("id", inplace=True)
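    # if duplicate texts were collapsed before scoring, `res` will have fewer rows than
    # `data`; copy scores from the scored rows back to the IDs of their duplicates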
    if len(res) != len(data):
        res = res.join(data["text"])
        data_absent = data.loc[list(set(data.index).difference(res.index))]
        data_absent = data_absent.loc[data_absent["text"].isin(res["text"])]
        if data.size:
            res = res.reset_index()
            res.set_index("text", inplace=True)
            data_dupes = res.loc[data_absent["text"]]
            data_dupes["id"] = data_absent.index.to_list()
            res = pandas.concat([res, data_dupes])
            res.reset_index(inplace=True, drop=True)
            res.set_index("id", inplace=True)
    res = res.join(data["text"], how="right")
    if not return_text:
        res.drop("text", axis=1, inplace=True)
    res = res.reset_index()

    if output is not None:
        if verbose:
            print(f"writing results to file: {output} ({perf_counter() - start_time:.4f})")
        res.to_csv(output, index=False)

    drops = ["custom", "bin"]
    if not id_specified:
        drops.append("id")
    res.drop(
        list({*drops}.intersection(res.columns)),
        axis="columns",
        inplace=True,
    )
    if frameworks is not None:
        if verbose:
            print(f"selecting frameworks ({perf_counter() - start_time:.4f})")
        if isinstance(frameworks, str):
            frameworks = [frameworks]
        if len(frameworks) == 1 and framework_prefix is None:
            framework_prefix = False
        select = []
        if id_specified:
            select.append("id")
        if return_text:
            select.append("text")
        select.append("text_hash")
        res = res.filter(regex=f"^(?:{'|'.join(select + frameworks)})(?:$|\\.)")
    if isinstance(framework_prefix, bool) and not framework_prefix:
        prefix_pattern = re.compile("^[^.]+\\.")
        res.columns = pandas.Index([prefix_pattern.sub("", col) for col in res.columns])

    if verbose:
        print(f"done ({perf_counter() - start_time:.4f})")

    return res
def _defragment_bin(bin_dir: str, write_format: str, writer):
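    """Combine fragment files within a single cache bin into consolidated part files."""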
    fragments = glob(f"{bin_dir}/*.{write_format}")
    if len(fragments) > 1:
        data = pyarrow.dataset.dataset(fragments, format=write_format, exclude_invalid_files=True).to_table()
        nrows = data.num_rows
        n_chunks = max(1, ceil(nrows / 1e9))
        rows_per_chunk = max(1, ceil(nrows / n_chunks))
        time_id = str(ceil(time()))
        for chunk in range(0, nrows, rows_per_chunk):
            writer(data[chunk : (chunk + rows_per_chunk)], f"{bin_dir}/part-{time_id}-{chunk}.{write_format}")
        for fragment in fragments:
            os.unlink(fragment)