Coverage for src/receptiviti/request.py: 78%
427 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-23 15:32 -0400
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-23 15:32 -0400
1"""Make requests to the API."""
3import hashlib
4import json
5import math
6import os
7import pickle
8import re
9import shutil
10import sys
11from glob import glob
12from multiprocessing import Process, Queue, cpu_count
13from tempfile import TemporaryDirectory, gettempdir
14from time import perf_counter, sleep, time
15from typing import List, Union
17import numpy
18import pandas
19import pyarrow
20import requests
21from pyarrow import compute, dataset
22from tqdm import tqdm
24from receptiviti.readin_env import readin_env
25from receptiviti.status import status
# Default location of the primary results cache (an Arrow dataset directory),
# placed in the system's temporary directory. The trailing slash is kept on
# purpose: paths are built from these constants by plain concatenation.
CACHE = gettempdir() + "/receptiviti_cache/"
# Directory holding the short-lived cache of raw bundle responses
# (one `<md5>.json` file per request body, plus a `log.txt` timestamp).
REQUEST_CACHE = gettempdir() + "/receptiviti_request_cache/"
def request(
    text: Union[str, List[str], pandas.DataFrame, None] = None,
    output: Union[str, None] = None,
    ids: Union[str, List[Union[str, int]], None] = None,
    text_column: Union[str, None] = None,
    id_column: Union[str, None] = None,
    files: Union[List[str], None] = None,
    directory: Union[str, None] = None,
    file_type: str = "txt",
    return_text=False,
    api_args: Union[dict, None] = None,
    frameworks: Union[str, List[str], None] = None,
    framework_prefix: Union[bool, None] = None,
    bundle_size=1000,
    bundle_byte_limit=75e5,
    collapse_lines=False,
    retry_limit=50,
    clear_cache=False,
    request_cache=True,
    cores=cpu_count() - 2,
    in_memory: Union[bool, None] = None,
    verbose=False,
    progress_bar=True,
    overwrite=False,
    make_request=True,
    text_as_paths=False,
    dotenv: Union[bool, str] = True,
    cache: Union[str, bool] = os.getenv("RECEPTIVITI_CACHE", ""),
    cache_overwrite=False,
    cache_format=os.getenv("RECEPTIVITI_CACHE_FORMAT", ""),
    key=os.getenv("RECEPTIVITI_KEY", ""),
    secret=os.getenv("RECEPTIVITI_SECRET", ""),
    url=os.getenv("RECEPTIVITI_URL", ""),
    version=os.getenv("RECEPTIVITI_VERSION", ""),
    endpoint=os.getenv("RECEPTIVITI_ENDPOINT", ""),
) -> pandas.DataFrame:
    """
    Send texts to be scored by the API.

    Args:
      text (str | list[str] | pandas.DataFrame): Text to be processed, as a string or vector of
        strings containing the text itself, or the path to a file from which to read in text.
        If a DataFrame, `text_column` is used to extract such a vector. A string may also
        represent a directory in which to search for files. To best ensure paths are not
        treated as texts, either set `text_as_paths` to `True`, or use `directory` to enter
        a directory path, or `files` to enter a vector of file paths.
      output (str): Path to a file to write results to.
      ids (str | list[str | int]): Vector of IDs for each `text`, or a column name in `text`
        containing IDs.
      text_column (str): Column name in `text` containing text.
      id_column (str): Column name in `text` containing IDs.
      files (list[str]): Vector of file paths, as alternate entry to `text`.
      directory (str): A directory path to search for files in, as alternate entry to `text`.
      file_type (str): Extension of the file(s) to be read in from a directory (`txt` or `csv`).
      return_text (bool): If `True`, will include a `text` column in the output with the
        original text.
      api_args (dict): Additional arguments to include in the request.
      frameworks (str | list): One or more names of frameworks to return.
      framework_prefix (bool): If `False`, will drop framework prefix from column names.
        If one framework is selected, will default to `False`.
      bundle_size (int): Maximum number of texts per bundle (capped at 1000).
      bundle_byte_limit (float): Maximum byte size of each bundle.
      collapse_lines (bool): If `True`, will treat files as containing single texts, and
        collapse multiple lines.
      retry_limit (int): Number of times to retry a failed request.
      clear_cache (bool): If `True`, will delete the `cache` before processing.
      request_cache (bool): If `False`, will not temporarily save raw requests for reuse
        within a day.
      cores (int): Number of CPU cores to use when processing multiple bundles.
      in_memory (bool | None): If `False`, will write bundles to disc, to be loaded when
        processed. Defaults to `False` when processing in parallel, and `True` otherwise.
      verbose (bool): If `True`, will print status messages and preserve the progress bar.
      progress_bar (bool): If `False`, will not display a progress bar.
      overwrite (bool): If `True`, will overwrite an existing `output` file.
      make_request (bool): If `False`, will not send requests to the API; texts without
        cached results then cause an error.
      text_as_paths (bool): If `True`, will explicitly mark `text` as a list of file paths.
        Otherwise, this will be detected.
      dotenv (bool | str): Path to a .env file to read environment variables from. By default,
        will look for a file in the current directory or `~/Documents`.
        Passed to `readin_env` as `path`.
      cache (bool | str): Path to a cache directory, or `True` to use the default directory.
      cache_overwrite (bool): If `True`, will not check the cache for previously cached texts,
        but will store results in the cache (unlike `cache = False`).
      cache_format (str): File format of the cache, of available Arrow formats.
      key (str): Your API key.
      secret (str): Your API secret.
      url (str): The URL of the API; defaults to `https://api.receptiviti.com`.
      version (str): Version of the API; defaults to `v1`.
      endpoint (str): Endpoint of the API; defaults to `framework`.

    Returns:
      Scores associated with each input text.

    Raises:
      RuntimeError: If `output` exists and `overwrite` is `False`, the API status check fails,
        no text is entered, `ids` are duplicated or mismatched in length, no valid texts remain,
        a text exceeds `bundle_byte_limit`, or no results are returned.
      IndexError: If `text_column` or `id_column` is not found in the entered data.

    Cache:
      If `cache` is specified, results for unique texts are saved in an Arrow database in the cache
      location (`os.getenv("RECEPTIVITI_CACHE")`), and are retrieved with subsequent requests.
      This ensures that the exact same texts are not re-sent to the API. This does, however,
      add some processing time and disc space usage.

      If `cache` is `True`, a default directory (`receptiviti_cache`) will be
      looked for in the system's temporary directory (`tempfile.gettempdir()`).

      The primary cache is checked when each bundle is processed, and existing results are loaded at
      that time. When processing many bundles in parallel, and many results have been cached,
      this can cause the system to freeze and potentially crash.
      To avoid this, limit the number of cores, or disable parallel processing.

      The `cache_format` argument (or the `RECEPTIVITI_CACHE_FORMAT` environment variable) can be
      used to adjust the format of the cache.

      You can use the cache independently with
      `pyarrow.dataset.dataset(os.getenv("RECEPTIVITI_CACHE"))`.

      You can also set the `clear_cache` argument to `True` to clear the cache before it is used
      again, which may be useful if the cache has gotten big, or you know new results will be
      returned.

      Even if a cached result exists, it will be reprocessed if it does not have all of the
      variables of new results, but this depends on there being at least 1 uncached result. If,
      for instance, you add a framework to your account and want to reprocess a previously
      processed set of texts, you would need to first clear the cache.

      Either way, duplicated texts within the same call will only be sent once.

      The `request_cache` argument controls a more temporary cache of each bundle request. This
      is cleared after a day. You might want to set this to `False` if a new framework becomes
      available on your account and you want to process a set of text you processed recently.

      Another temporary cache is made when `in_memory` is `False`, which is the default when
      processing in parallel (when there is more than 1 bundle and `cores` is over 1). This is a
      temporary directory that contains a file for each unique bundle, which is read in as needed
      by the parallel workers.

    Parallelization:
      `text`s are split into bundles based on the `bundle_size` argument. Each bundle represents
      a single request to the API, which is why they are limited to 1000 texts and a total size
      of 10 MB. When there is more than one bundle and `cores` is greater than 1, bundles are
      processed by multiple cores.

      If you have texts spread across multiple files, they can be most efficiently processed in
      parallel if each file contains a single text (potentially collapsed from multiple lines).
      If files contain multiple texts (i.e., `collapse_lines=False`), then texts need to be
      read in before bundling in order to ensure bundles are under the length limit.
    """
    if output is not None and os.path.isfile(output) and not overwrite:
        msg = "`output` file already exists; use `overwrite=True` to overwrite it"
        raise RuntimeError(msg)
    start_time = perf_counter()

    if request_cache:
        if verbose:
            print(f"preparing request cache ({perf_counter() - start_time:.4f})")
        _manage_request_cache()

    # resolve credentials and check status
    if dotenv:
        readin_env("." if isinstance(dotenv, bool) else dotenv)
    if not url:
        url = os.getenv("RECEPTIVITI_URL", "https://api.receptiviti.com")
    # a version and endpoint may be embedded in the URL (e.g., `.../v1/framework`)
    url_parts = re.search("/([Vv]\\d+)/?([^/]+)?", url)
    if url_parts:
        from_url = url_parts.groups()
        if not version and from_url[0] is not None:
            version = from_url[0]
        if not endpoint and from_url[1] is not None:
            endpoint = from_url[1]
    # reduce the URL to its base, and make sure it has a scheme
    url = ("https://" if re.match("http", url, re.I) is None else "") + re.sub(
        "/+[Vv]\\d+(?:/.*)?$|/+$", "", url
    )
    # defaults were read at import time, so re-check after `readin_env` may have set them
    if not key:
        key = os.getenv("RECEPTIVITI_KEY", "")
    if not secret:
        secret = os.getenv("RECEPTIVITI_SECRET", "")
    if not version:
        version = os.getenv("RECEPTIVITI_VERSION", "v1")
    if not endpoint:
        endpoint_default = "framework" if version.lower() == "v1" else "taxonomies"
        endpoint = os.getenv("RECEPTIVITI_ENDPOINT", endpoint_default)
    api_status = status(url, key, secret, dotenv, verbose=False)
    if not api_status or api_status.status_code != 200:
        msg = (
            f"API status failed: {api_status.status_code}: {api_status.reason}"
            if api_status
            else "URL is not reachable"
        )
        raise RuntimeError(msg)

    # resolve text and ids
    def readin(
        paths: List[str],
        text_cols=text_column,
        id_cols=id_column,
        collapse=collapse_lines,
    ) -> Union[List[str], pandas.DataFrame]:
        """Read texts from files; returns a list when collapsing, otherwise a DataFrame."""
        text = []
        ids = []
        sel = []
        if text_cols is not None:
            sel.append(text_cols)
        if id_cols is not None:
            sel.append(id_cols)
        if os.path.splitext(paths[0])[1] == ".txt" and not sel:
            if collapse:
                for file in paths:
                    with open(file, encoding="utf-8") as texts:
                        text.append(" ".join([line.rstrip() for line in texts]))
            else:
                for file in paths:
                    with open(file, encoding="utf-8") as texts:
                        lines = [line.rstrip() for line in texts]
                        text += lines
                        # IDs are the file path, suffixed with the line index if multi-line
                        ids += (
                            [file]
                            if len(lines) == 1
                            else [file + str(i) for i in range(len(lines))]
                        )
                return pandas.DataFrame({"text": text, "ids": ids})
        else:
            if collapse:
                for file in paths:
                    temp = pandas.read_csv(file, usecols=sel)
                    text.append(" ".join(temp[text_cols]))
            else:
                for file in paths:
                    temp = pandas.read_csv(file, usecols=sel)
                    if text_cols not in temp:
                        msg = f"`text_column` ({text_cols}) was not found in all files"
                        raise IndexError(msg)
                    text += temp[text_cols].to_list()
                    ids += (
                        temp[id_cols].to_list()
                        if id_cols is not None
                        else [file]
                        if len(temp) == 1
                        else [file + str(i) for i in range(len(temp))]
                    )
                return pandas.DataFrame({"text": text, "ids": ids})
        return text

    text_as_dir = False
    if text is None:
        if directory is not None:
            text = directory
            text_as_dir = True
        elif files is not None:
            text_as_paths = True
            text = files
        else:
            msg = "enter text as the first argument, or use the files or directory arguments"
            raise RuntimeError(msg)
    # short strings may be paths; 260 is used as the cutoff for path-like strings
    if isinstance(text, str) and (text_as_dir or text_as_paths or len(text) < 260):
        if not text_as_dir and os.path.isfile(text):
            if verbose:
                print(f"reading in texts from a file ({perf_counter() - start_time:.4f})")
            text = readin([text])
            if isinstance(text, pandas.DataFrame):
                id_column = "ids"
                text_column = "text"
            # the file has been read in, so `text` no longer holds paths
            text_as_paths = False
        elif os.path.isdir(text):
            text = glob(f"{text}/*{file_type}")
            text_as_paths = True
    if isinstance(text, pandas.DataFrame):
        # `ids` is documented to accept a column name in `text`
        if isinstance(ids, str) and id_column is None:
            id_column = ids
        if id_column is not None:
            if id_column in text:
                ids = text[id_column].to_list()
            else:
                msg = f"`id_column` ({id_column}) is not in `text`"
                raise IndexError(msg)
        if text_column is not None:
            if text_column in text:
                text = text[text_column].to_list()
            else:
                msg = f"`text_column` ({text_column}) is not in `text`"
                raise IndexError(msg)
        else:
            msg = "`text` is a DataFrame, but no `text_column` is specified"
            raise RuntimeError(msg)
    if isinstance(text, str):
        text = [text]
    # an empty input is never a list of paths (and `readin` needs at least one path)
    text_is_path = len(text) > 0 and all(
        isinstance(t, str) and (text_as_paths or len(t) < 260) and os.path.isfile(t) for t in text
    )
    if text_as_paths and not text_is_path:
        msg = "`text` treated as a list of files, but not all of the entries exist"
        raise RuntimeError(msg)
    if text_is_path and not collapse_lines:
        ids = text
        text = readin(text)
        if isinstance(text, pandas.DataFrame):
            if id_column is None:
                ids = text["ids"]
            elif id_column in text:
                ids = text[id_column].to_list()
            if text_column is None:
                text_column = "text"
            text = text[text_column].to_list()
        text_is_path = False
    if ids is None and text_is_path:
        ids = text

    id_specified = ids is not None
    if ids is None:
        ids = numpy.arange(1, len(text) + 1).tolist()
    elif len(ids) != len(text):
        msg = "`ids` is not the same length as `text`"
        raise RuntimeError(msg)
    original_ids = set(ids)
    if len(ids) != len(original_ids):
        msg = "`ids` contains duplicates"
        raise RuntimeError(msg)

    # prepare bundles
    if verbose:
        print(f"preparing text ({perf_counter() - start_time:.4f})")
    data = pandas.DataFrame({"text": text, "id": ids})
    n_original = len(data)
    # only unique, non-empty texts are sent
    data_subset = data[
        ~(data.duplicated(subset=["text"]) | (data["text"] == "") | data["text"].isna())
    ]
    n_texts = len(data_subset)
    if not n_texts:
        msg = "no valid texts to process"
        raise RuntimeError(msg)
    # clamp once, so both the bundle count and the group sizes respect the 1000-text cap
    bundle_size = max(1, min(1000, int(bundle_size)))
    n_bundles = math.ceil(n_texts / bundle_size)
    groups = data_subset.groupby(
        numpy.sort(numpy.tile(numpy.arange(n_bundles) + 1, bundle_size))[:n_texts],
        group_keys=False,
    )
    bundles = []
    for _, group in groups:
        if sys.getsizeof(group) > bundle_byte_limit:
            # split the group into consecutive slices that each fit under the byte limit
            start = current = 0
            for end, txt in enumerate(group["text"]):
                size = os.stat(txt).st_size if text_is_path else sys.getsizeof(txt)
                if size > bundle_byte_limit:
                    msg = (
                        "one of your texts is over the bundle size"
                        f" limit ({bundle_byte_limit / 1e6} MB)"
                    )
                    raise RuntimeError(msg)
                if (current + size) > bundle_byte_limit:
                    bundles.append(group[start:end])
                    # the text that did not fit starts the next slice
                    start = end
                    current = size
                else:
                    current += size
            bundles.append(group[start:])
        else:
            bundles.append(group)
    n_bundles = len(bundles)
    if verbose:
        print(
            f"prepared {n_texts} unique text{'s' if n_texts > 1 else ''} in "
            f"{n_bundles} {'bundles' if n_bundles > 1 else 'bundle'}",
            f"({perf_counter() - start_time:.4f})",
        )

    # process bundles
    if isinstance(cache, bool) and cache:
        # `True` selects the default cache directory
        cache = CACHE
    if isinstance(cache, str):
        if cache:
            if clear_cache and os.path.exists(cache):
                shutil.rmtree(cache, True)
            os.makedirs(cache, exist_ok=True)
            if not cache_format:
                cache_format = os.getenv("RECEPTIVITI_CACHE_FORMAT", "parquet")
        else:
            cache = False
    opts = {
        "url": f"{url}/{version}/{endpoint}/bulk".lower(),
        "auth": requests.auth.HTTPBasicAuth(key, secret),
        "retries": retry_limit,
        "add": {} if api_args is None else api_args,
        "are_paths": text_is_path,
        "request_cache": request_cache,
        # an empty cache path disables cache lookups in the workers
        "cache": "" if cache_overwrite or isinstance(cache, bool) and not cache else cache,
        "cache_format": cache_format,
        "make_request": make_request,
    }
    # texts are hashed together with this, so results depend on arguments and credentials
    opts["add_hash"] = hashlib.md5(
        json.dumps(
            {**opts["add"], "url": opts["url"], "key": key, "secret": secret},
            separators=(",", ":"),
        ).encode()
    ).hexdigest()
    use_pb = bool(progress_bar)
    parallel = n_bundles > 1 and cores > 1
    if in_memory is None:
        in_memory = not parallel
    with TemporaryDirectory() as scratch_cache:
        if not in_memory:
            if verbose:
                print(f"writing to scratch cache ({perf_counter() - start_time:.4f})")

            def write_to_scratch(i: int, bundle: pandas.DataFrame):
                temp = f"{scratch_cache}/{i}.json"
                with open(temp, "wb") as scratch:
                    pickle.dump(bundle, scratch)
                return temp

            bundles = [write_to_scratch(i, b) for i, b in enumerate(bundles)]
        if parallel:
            if verbose:
                print(f"requesting in parallel ({perf_counter() - start_time:.4f})")
            waiter: "Queue[pandas.DataFrame]" = Queue()
            queue: "Queue[tuple[int, pandas.DataFrame]]" = Queue()
            manager = Process(
                target=_queue_manager,
                args=(queue, waiter, n_texts, n_bundles, use_pb, verbose),
            )
            manager.start()
            # spread bundles evenly over no more workers than are needed
            nb = math.ceil(n_bundles / min(n_bundles, cores))
            cores = math.ceil(n_bundles / nb)
            procs = [
                Process(
                    target=_process,
                    args=(bundles[(i * nb) : min(n_bundles, (i + 1) * nb)], opts, queue),
                )
                for i in range(cores)
            ]
            for cl in procs:
                cl.start()
            for cl in procs:
                cl.join()
            res = waiter.get()
        else:
            if verbose:
                print(f"requesting serially ({perf_counter() - start_time:.4f})")
            pb = tqdm(total=n_texts, leave=verbose) if use_pb else None
            try:
                res = _process(bundles, opts, pb=pb)
            finally:
                if pb is not None:
                    pb.close()
        if verbose:
            print(f"done requesting ({perf_counter() - start_time:.4f})")

    # finalize
    if not res.shape[0]:
        msg = "no results"
        raise RuntimeError(msg)
    if isinstance(cache, str):
        # the names of additional API arguments are excluded from the cache
        _update_cache(res, cache, cache_format, verbose, start_time, list(opts["add"]))
    if verbose:
        print(f"preparing output ({perf_counter() - start_time:.4f})")
    data.set_index("id", inplace=True)
    res.set_index("id", inplace=True)
    if len(res) != n_original:
        # copy results to the IDs of duplicated texts, which were only sent once
        res = res.join(data["text"])
        data_absent = data.loc[list(set(data.index).difference(res.index))]
        data_absent = data_absent.loc[data_absent["text"].isin(res["text"])]
        if data.size:
            res = res.reset_index()
            res.set_index("text", inplace=True)
            data_dupes = res.loc[data_absent["text"]]
            data_dupes["id"] = data_absent.index.to_list()
            res = pandas.concat([res, data_dupes])
            res.reset_index(inplace=True, drop=True)
            res.set_index("id", inplace=True)
    # the outer join restores rows for texts that had no results (e.g., empty texts)
    res = res.join(data["text"], how="outer")
    if not return_text:
        res.drop("text", axis=1, inplace=True)
    res = res.reset_index()

    if output is not None:
        if verbose:
            print(f"writing results to file: {output} ({perf_counter() - start_time:.4f})")
        res.to_csv(output, index=False)

    drops = ["custom", "bin"]
    if not id_specified:
        drops.append("id")
    res.drop(
        list({*drops}.intersection(res.columns)),
        axis="columns",
        inplace=True,
    )
    if frameworks is not None:
        if verbose:
            print(f"selecting frameworks ({perf_counter() - start_time:.4f})")
        if isinstance(frameworks, str):
            frameworks = [frameworks]
        if len(frameworks) == 1 and framework_prefix is None:
            framework_prefix = False
        select = []
        if id_specified:
            select.append("id")
        if return_text:
            select.append("text")
        select.append("text_hash")
        res = res.filter(regex=f"^(?:{'|'.join(select + frameworks)})(?:$|\\.)")
    if isinstance(framework_prefix, bool) and not framework_prefix:
        prefix_pattern = re.compile("^[^.]+\\.")
        res.columns = pandas.Index([prefix_pattern.sub("", col) for col in res.columns])

    if verbose:
        print(f"done ({perf_counter() - start_time:.4f})")

    return res
def _queue_manager(
    queue: "Queue[tuple[int, Union[pandas.DataFrame, None]]]",
    waiter: "Queue[pandas.DataFrame]",
    n_texts: int,
    n_bundles: int,
    use_pb=True,
    verbose=False,
):
    """
    Collect results from worker processes and hand the combined results back.

    Reads `(size, chunk)` pairs from `queue` until `n_bundles` DataFrames have
    been received, or until a non-DataFrame chunk (a worker's failure signal)
    or a `None` sentinel arrives. Whatever was collected is then concatenated
    and put on `waiter`.

    Args:
      queue: Queue that workers put `(n_results, results)` pairs on.
      waiter: Queue the combined results are put on, exactly once.
      n_texts (int): Total number of texts, used as the progress bar total.
      n_bundles (int): Number of result chunks to expect.
      use_pb (bool): If `True`, displays a progress bar.
      verbose (bool): Passed to the progress bar as `leave`.
    """
    pb = tqdm(total=n_texts, leave=verbose) if use_pb else None
    res: List[pandas.DataFrame] = []
    try:
        for size, chunk in iter(queue.get, None):
            if not isinstance(chunk, pandas.DataFrame):
                # a worker signalled failure; stop waiting for more results
                break
            if pb is not None:
                pb.update(size)
            res.append(chunk)
            if len(res) >= n_bundles:
                break
    finally:
        if pb is not None:
            pb.close()
    # always put something: `pandas.concat` raises on an empty list, which
    # would leave the process waiting on `waiter` blocked forever
    waiter.put(pandas.concat(res, ignore_index=True, sort=False) if res else pandas.DataFrame())
def _process(
    bundles: list,
    opts: dict,
    queue: Union["Queue[tuple[int, Union[pandas.DataFrame, None]]]", None] = None,
    pb: Union[tqdm, None] = None,
) -> pandas.DataFrame:
    """
    Retrieve results for each bundle, from the cache where possible.

    Args:
      bundles (list): DataFrames with `text` and `id` columns, or paths to
        pickled versions of such DataFrames.
      opts (dict): Request options, as assembled in `request`.
      queue: If specified, `(n_results, results)` is put on it after each bundle,
        or `(0, None)` if the bundle failed.
      pb: Progress bar to update after each bundle when `queue` is not specified.

    Returns:
      Results for all bundles, with `text_hash` and `id` columns.

    Raises:
      RuntimeError: If results for a bundle could not be retrieved.
    """
    reses: List[pandas.DataFrame] = []
    for bundle in bundles:
        if isinstance(bundle, str):
            # NOTE: only files written by `request` to its scratch directory are expected here;
            # do not point this at pickles from untrusted sources
            with open(bundle, "rb") as scratch:
                bundle = pickle.load(scratch)
        else:
            # work on a copy so the caller's DataFrame does not gain a `text_hash` column
            bundle = bundle.copy()
        body = []
        bundle.insert(0, "text_hash", "")
        # NOTE(review): `opts["are_paths"]` is not consulted here, so when `text` holds
        # file paths, the paths themselves appear to be sent as content -- confirm
        for i, text in enumerate(bundle["text"]):
            text_hash = hashlib.md5((opts["add_hash"] + text).encode()).hexdigest()
            bundle.iat[i, 0] = text_hash
            body.append({"content": text, "request_id": text_hash, **opts["add"]})

        # look for previously cached results; `bin=h` only exists in an initialized cache
        cached = None
        if opts["cache"] and os.path.isdir(opts["cache"] + "/bin=h"):
            db = dataset.dataset(
                opts["cache"],
                partitioning=dataset.partitioning(
                    pyarrow.schema([pyarrow.field("bin", pyarrow.string())]), flavor="hive"
                ),
                format=opts["cache_format"],
            )
            if "text_hash" in db.schema.names:
                su = db.filter(compute.field("text_hash").isin(bundle["text_hash"]))
                if su.count_rows() > 0:
                    cached = su.to_table().to_pandas(split_blocks=True, self_destruct=True)

        res = "failed to retrieve results"
        if cached is None or len(cached) < len(bundle):
            if cached is None or not len(cached):
                res = _prepare_results(body, opts)
            else:
                # `fresh` marks the texts that are not in the cache
                fresh = ~compute.is_in(
                    bundle["text_hash"].to_list(), pyarrow.array(cached["text_hash"])
                ).to_pandas(split_blocks=True, self_destruct=True)
                res = _prepare_results([body[i] for i, ck in enumerate(fresh) if ck], opts)
            if not isinstance(res, str) and cached is not None:
                # NOTE(review): the row-count comparison re-requests cached texts whenever
                # the numbers of fresh and cached results differ -- confirm this is intended
                if len(res) != len(cached) or not all(cached.columns.isin(res.columns)):
                    cached = _prepare_results(
                        [body[i] for i, ck in enumerate(fresh) if not ck], opts
                    )
                if isinstance(cached, str):
                    # the re-request failed; report its message rather than
                    # passing a string to `pandas.concat`
                    res = cached
                else:
                    res = pandas.concat([res, cached])
        else:
            res = cached

        if not isinstance(res, str):
            res = res.merge(bundle.loc[:, ["text_hash", "id"]], on="text_hash")
            reses.append(res)
        if queue is not None:
            queue.put((0, None) if isinstance(res, str) else (len(res), res))
        elif pb is not None:
            pb.update(len(bundle))
        if isinstance(res, str):
            raise RuntimeError(res)
    if not reses:
        return pandas.DataFrame()
    return reses[0] if len(reses) == 1 else pandas.concat(reses, ignore_index=True, sort=False)
def _prepare_results(body: list, opts: dict):
    """
    Request scores for a bundle and shape the response into a DataFrame.

    Args:
      body (list): Request entries, each with a `request_id`.
      opts (dict): Request options (`url`, `auth`, `retries`, `request_cache`, `make_request`).

    Returns:
      A DataFrame of results with `text_hash` and `bin` columns, or the
      message string returned by `_request` when it did not produce results.
    """
    payload = json.dumps(body, separators=(",", ":"))
    # the request cache file is named by the hash of the serialized body
    if opts["request_cache"]:
        cache_file = REQUEST_CACHE + hashlib.md5(payload.encode()).hexdigest() + ".json"
    else:
        cache_file = ""
    raw = _request(
        payload,
        opts["url"],
        opts["auth"],
        opts["retries"],
        cache_file,
        opts["make_request"],
    )
    if isinstance(raw, str):
        return raw

    scores = pandas.json_normalize(raw)
    scores.rename(columns={"request_id": "text_hash"}, inplace=True)
    if "text_hash" not in scores:
        # fall back on the IDs that were sent
        scores.insert(0, "text_hash", [entry["request_id"] for entry in body])
    unwanted = {"response_id", "language", "version", "error"}.intersection(scores.columns)
    scores.drop(list(unwanted), axis="columns", inplace=True)
    # cache partitions are keyed by "h" plus the first character of the hash
    bins = ["h" + text_hash[0] for text_hash in scores["text_hash"]]
    scores.insert(scores.shape[1], "bin", bins)
    return scores
def _request(
    body: str,
    url: str,
    auth: "requests.auth.HTTPBasicAuth",
    retries: int,
    cache="",
    execute=True,
) -> Union[dict, str]:
    """
    Post a bundle to the API, or load its response from the request cache.

    Args:
      body (str): JSON-serialized request body.
      url (str): Full URL to post to.
      auth: Credentials passed to `requests.post`.
      retries (int): Number of remaining attempts after a failed request.
      cache (str): Path to this request's cache file; an empty string disables caching.
      execute (bool): If `False`, only a cached response is used.

    Returns:
      The response content (its `results` entry, if it has one), or a message
      string if no response could be retrieved.
    """
    if cache and os.path.isfile(cache):
        try:
            with open(cache, encoding="utf-8") as response:
                data = json.load(response)
        except (OSError, ValueError):
            # an unreadable or truncated cache file is treated as a miss
            try:
                os.remove(cache)
            except OSError:
                pass  # best effort; a fresh response will overwrite it
        else:
            return data["results"] if "results" in data else data

    if not execute:
        return "`make_request` is `False`, but there are texts with no cached results"

    res = requests.post(url, body, auth=auth, timeout=9999)
    if res.status_code == 200:
        data = res.json()
        if cache:
            with open(cache, "w", encoding="utf-8") as response:
                json.dump(data, response)
        data = dict(data[0] if isinstance(data, list) else data)
        return data["results"] if "results" in data else data

    if retries > 0:
        # look for a number in the error message, and wait that many milliseconds
        message = res.text
        if "application/json" in res.headers.get("Content-Type", ""):
            try:
                message = str(res.json().get("message", res.text))
            except (ValueError, AttributeError):
                message = res.text
        cd = re.search("[0-9]+(?:\\.[0-9]+)?", message)
        sleep(1 if cd is None else float(cd[0]) / 1e3)
        return _request(body, url, auth, retries - 1, cache, execute)
    return f"request failed, and have no retries: {res.status_code}: {res.reason}"
def _update_cache(
    res: pandas.DataFrame,
    cache: str,
    cache_format: str,
    verbose: bool,
    start_time: float,
    add_names: list,
):
    """
    Add results that are not yet in the primary cache to it.

    The cache is an Arrow dataset, hive-partitioned by `bin`. It is initialized
    if its `bin=h` directory is missing, and cleared and re-initialized if `res`
    has columns (other than `id` and `add_names`) that its schema lacks.

    Args:
      res (pandas.DataFrame): Results, with a `text_hash` column.
      cache (str): Path to the cache directory.
      cache_format (str): File format of the cache.
      verbose (bool): If `True`, prints status messages.
      start_time (float): Reference time for the times shown in status messages.
      add_names (list): Names of columns to leave out of the cache, in addition to `id`.
    """
    part: pyarrow.Partitioning = dataset.partitioning(
        pyarrow.schema([pyarrow.field("bin", pyarrow.string())]), flavor="hive"
    )
    exclude = {"id", *add_names}

    def write(frame: pandas.DataFrame):
        dataset.write_dataset(
            pyarrow.Table.from_pandas(frame),
            cache,
            partitioning=part,
            format=cache_format,
            existing_data_behavior="overwrite_or_ignore",
        )

    def initialize_cache():
        # a placeholder row (empty hash, in the `h` bin) establishes the schema
        initial = res.iloc[[0]].drop(list(exclude.intersection(res.columns)), axis="columns")
        initial["text_hash"] = ""
        initial["bin"] = "h"
        count_columns = initial.columns.isin(["summary.word_count", "summary.sentence_count"])
        non_object = (initial.dtypes != object).to_list()
        initial.loc[:, ~count_columns & non_object] = 0.1
        write(initial)

    def elapsed():
        return f"{perf_counter() - start_time:.4f}"

    if not os.path.isdir(cache + "/bin=h"):
        if verbose:
            print(f"initializing cache ({elapsed()})")
        initialize_cache()
    db = dataset.dataset(cache, partitioning=part, format=cache_format)

    schema_lacks_column = any(
        name not in exclude and name not in db.schema.names for name in res.columns.to_list()
    )
    if schema_lacks_column:
        if verbose:
            print(
                "clearing cache since it contains columns not in new results",
                f"({elapsed()})",
            )
        shutil.rmtree(cache, True)
        initialize_cache()
        db = dataset.dataset(cache, partitioning=part, format=cache_format)

    fresh = res[~res.duplicated(subset=["text_hash"])]
    existing = db.filter(compute.field("text_hash").isin(fresh["text_hash"]))
    if existing.count_rows() > 0:
        # keep only the results whose hashes are not already stored
        is_new = ~compute.is_in(
            fresh["text_hash"].to_list(),
            existing.scanner(columns=["text_hash"]).to_table()["text_hash"],
        ).to_pandas(split_blocks=True, self_destruct=True)
        if not any(is_new):
            return
        fresh = fresh[is_new.to_list()]

    n_new = len(fresh)
    if not n_new:
        return
    if verbose:
        print(
            f"adding {n_new} result{'' if n_new == 1 else 's'}",
            f"to cache ({elapsed()})",
        )
    write(fresh.drop(list(exclude.intersection(fresh.columns)), axis="columns"))
def _manage_request_cache():
    """
    Create the request cache directory, and clear it when it is over a day old.

    The time of the last refresh is stored in `log.txt` within the directory.
    When it is more than 86400 seconds old (or cannot be parsed), all cached
    `.json` responses are removed and the time is reset.

    Raises:
      RuntimeWarning: If the cache could not be managed (kept as the raised
        type for callers that already catch it).
    """
    os.makedirs(REQUEST_CACHE, exist_ok=True)
    try:
        now = time()
        refreshed = now
        log_file = REQUEST_CACHE + "log.txt"
        write_log = True
        if os.path.exists(log_file):
            write_log = False
            with open(log_file, encoding="utf-8") as log:
                logged = log.readline()
            try:
                refreshed = float(logged)
            except ValueError:
                # an unparsable log is treated as expired, so it gets rewritten
                refreshed = 0.0
        if now - refreshed > 86400:
            for cached_request in glob(REQUEST_CACHE + "*.json"):
                os.remove(cached_request)
            # reset the time, or every later call would clear the cache again
            write_log = True
        if write_log:
            with open(log_file, "w", encoding="utf-8") as log:
                log.write(str(now))
    except Exception as exc:
        msg = "failed to manage request cache"
        raise RuntimeWarning(msg) from exc