Coverage for src/receptiviti/manage_request.py: 80%

399 statements  

coverage.py v7.6.1, created at 2024-12-01 10:33 -0500

1"""Make requests to the API.""" 

2 

3import hashlib 

4import json 

5import math 

6import os 

7import pickle 

8import re 

9import sys 

10import urllib.parse 

11import warnings 

12from glob import glob 

13from multiprocessing import Process, Queue, current_process 

14from tempfile import TemporaryDirectory, gettempdir 

15from time import perf_counter, sleep, time 

16from typing import List, Union 

17 

18import numpy 

19import pandas 

20import pyarrow 

21import pyarrow.compute 

22import pyarrow.dataset 

23import pyarrow.feather 

24import pyarrow.parquet 

25import requests 

26from chardet.universaldetector import UniversalDetector 

27from tqdm import tqdm 

28 

29from receptiviti.status import _resolve_request_def, status 

30 

31CACHE = gettempdir() + "/receptiviti_cache/" 

32REQUEST_CACHE = gettempdir() + "/receptiviti_request_cache/" 

33 

34 

def _manage_request(
    text: Union[str, List[str], pandas.DataFrame, None] = None,
    ids: Union[str, List[str], List[int], None] = None,
    text_column: Union[str, None] = None,
    id_column: Union[str, None] = None,
    files: Union[List[str], None] = None,
    directory: Union[str, None] = None,
    file_type="txt",
    encoding: Union[str, None] = None,
    context="written",
    api_args: Union[dict, None] = None,
    bundle_size=1000,
    bundle_byte_limit=75e5,
    collapse_lines=False,
    retry_limit=50,
    request_cache=True,
    cores=1,
    collect_results=True,
    in_memory: Union[bool, None] = None,
    verbose=False,
    progress_bar: Union[str, bool] = os.getenv("RECEPTIVITI_PB", "True"),
    make_request=True,
    text_as_paths=False,
    dotenv: Union[bool, str] = True,
    cache=os.getenv("RECEPTIVITI_CACHE", ""),
    cache_overwrite=False,
    cache_format=os.getenv("RECEPTIVITI_CACHE_FORMAT", "parquet"),
    key=os.getenv("RECEPTIVITI_KEY", ""),
    secret=os.getenv("RECEPTIVITI_SECRET", ""),
    url=os.getenv("RECEPTIVITI_URL", ""),
    version=os.getenv("RECEPTIVITI_VERSION", ""),
    endpoint=os.getenv("RECEPTIVITI_ENDPOINT", ""),
    to_norming=False,
) -> tuple[pandas.DataFrame, Union[pandas.DataFrame, None], bool]:
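    """Resolve inputs and credentials, bundle texts, and submit each bundle to the API.

    Returns the prepared input data, the combined results (or None when results are not
    collected), and whether IDs were specified.
    """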

    if cores > 1 and current_process().name != "MainProcess":
        return (pandas.DataFrame(), None, False)
    start_time = perf_counter()

    if request_cache:
        if verbose:
            print(f"preparing request cache ({perf_counter() - start_time:.4f})")
        _manage_request_cache()

    # resolve credentials and check status
    full_url, url, key, secret = _resolve_request_def(url, key, secret, dotenv)
    url_parts = re.search("/([Vv]\\d+)/?([^/]+)?", full_url)
    if url_parts:
        from_url = url_parts.groups()
        if not version and from_url[0] is not None:
            version = from_url[0]
        if not endpoint and from_url[1] is not None:
            endpoint = from_url[1]
    if to_norming:
        version = "v2"
        endpoint = "norming"
        request_cache = False
    else:
        if not version:
            version = os.getenv("RECEPTIVITI_VERSION", "v1")
        version = version.lower()
        if not version or not re.search("^v\\d+$", version):
            msg = f"invalid version: {version}"
            raise RuntimeError(msg)
        if not endpoint:
            endpoint_default = "framework" if version == "v1" else "analyze"
            endpoint = os.getenv("RECEPTIVITI_ENDPOINT", endpoint_default)
        endpoint = re.sub("^.*/", "", endpoint).lower()
        if not endpoint or re.search("[^a-z]", endpoint):
            msg = f"invalid endpoint: {endpoint}"
            raise RuntimeError(msg)
    api_status = status(url, key, secret, dotenv, verbose=False)
    if api_status is None or api_status.status_code != 200:
        msg = (
            "URL is not reachable"
            if api_status is None
            else f"API status failed: {api_status.status_code}: {api_status.reason}"
        )
        raise RuntimeError(msg)

    # resolve text and ids
    text_as_dir = False
    if text is None:
        if directory is not None:
            text = directory
            text_as_dir = True
        elif files is not None:
            text_as_paths = True
            text = files
        else:
            msg = "enter text as the first argument, or use the `files` or `directory` arguments"
            raise RuntimeError(msg)
    if isinstance(text, str) and (text_as_dir or text_as_paths or len(text) < 260):
        if not text_as_dir and os.path.isfile(text):
            if verbose:
                print(f"reading in texts from a file ({perf_counter() - start_time:.4f})")
            text = _readin([text], text_column, id_column, collapse_lines, encoding)
            if isinstance(text, pandas.DataFrame):
                id_column = "ids"
                text_column = "text"
            text_as_paths = False
        elif os.path.isdir(text):
            text = glob(f"{text}/*{file_type}")
            text_as_paths = True
        elif os.path.isdir(os.path.dirname(text)):
            msg = f"`text` appears to point to a directory, but it does not exist: {text}"
            raise RuntimeError(msg)
    if isinstance(text, pandas.DataFrame):
        if id_column is not None:
            if id_column in text:
                ids = text[id_column].to_list()
            else:
                msg = f"`id_column` ({id_column}) is not in `text`"
                raise IndexError(msg)
        if text_column is not None:
            if text_column in text:
                text = text[text_column].to_list()
            else:
                msg = f"`text_column` ({text_column}) is not in `text`"
                raise IndexError(msg)
        else:
            msg = "`text` is a DataFrame, but no `text_column` is specified"
            raise RuntimeError(msg)
    if isinstance(text, str):
        text = [text]
    text_is_path = all(isinstance(t, str) and (text_as_paths or len(t) < 260) and os.path.isfile(t) for t in text)
    if text_as_paths and not text_is_path:
        msg = "`text` treated as a list of files, but not all of the entries exist"
        raise RuntimeError(msg)
    if text_is_path and not collapse_lines:
        ids = text
        text = _readin(text, text_column, id_column, collapse_lines, encoding)
        if isinstance(text, pandas.DataFrame):
            if id_column is None:
                ids = text["ids"].to_list()
            elif id_column in text:
                ids = text[id_column].to_list()
            if text_column is None:
                text_column = "text"
            text = text[text_column].to_list()
            text_is_path = False
    if ids is None and text_is_path:
        ids = text

    id_specified = ids is not None
    if ids is None:
        ids = numpy.arange(1, len(text) + 1).tolist()
    elif len(ids) != len(text):
        msg = "`ids` is not the same length as `text`"
        raise RuntimeError(msg)
    original_ids = set(ids)
    if len(ids) != len(original_ids):
        msg = "`ids` contains duplicates"
        raise RuntimeError(msg)

    # prepare bundles
    if verbose:
        print(f"preparing text ({perf_counter() - start_time:.4f})")
    data = pandas.DataFrame({"text": text, "id": ids})
    data_subset = data[~(data.duplicated(subset=["text"]) | (data["text"] == "") | data["text"].isna())]
    n_texts = len(data_subset)
    if not n_texts:
        msg = "no valid texts to process"
        raise RuntimeError(msg)
    bundle_size = max(1, bundle_size)
    n_bundles = math.ceil(n_texts / min(1000, bundle_size))
    groups = data_subset.groupby(
        numpy.sort(numpy.tile(numpy.arange(n_bundles) + 1, bundle_size))[:n_texts],
        group_keys=False,
    )
    bundles = []
    for _, group in groups:
        if sys.getsizeof(group) > bundle_byte_limit:
            start = current = end = 0
            for txt in group["text"]:
                size = os.stat(txt).st_size if text_is_path else sys.getsizeof(txt)
                if size > bundle_byte_limit:
                    msg = f"one of your texts is over the bundle size limit ({bundle_byte_limit / 1e6} MB)"
                    raise RuntimeError(msg)
                if (current + size) > bundle_byte_limit:
                    bundles.append(group[start:end])
                    start = end = end + 1
                    current = size
                else:
                    end += 1
                    current += size
            bundles.append(group[start:])
        else:
            bundles.append(group)
    n_bundles = len(bundles)
    if verbose:
        print(
            f"prepared {n_texts} unique text{'s' if n_texts > 1 else ''} in "
            f"{n_bundles} {'bundles' if n_bundles > 1 else 'bundle'}",
            f"({perf_counter() - start_time:.4f})",
        )

    # process bundles
    opts = {
        "url": (
            full_url
            if to_norming
            else (
                f"{url}/{version}/{endpoint}/bulk" if version == "v1" else f"{url}/{version}/{endpoint}/{context}"
            ).lower()
        ),
        "version": version,
        "auth": requests.auth.HTTPBasicAuth(key, secret),
        "retries": retry_limit,
        "add": {} if api_args is None else api_args,
        "request_cache": request_cache,
        "cache": cache,
        "cache_overwrite": cache_overwrite,
        "cache_format": cache_format,
        "to_norming": to_norming,
        "make_request": make_request,
        "text_is_path": text_is_path,
        "text_column": text_column,
        "id_column": id_column,
        "collapse_lines": collapse_lines,
        "encoding": encoding,
        "collect_results": collect_results,
    }
    if version != "v1" and api_args:
        opts["url"] += "?" + urllib.parse.urlencode(api_args)
    opts["add_hash"] = hashlib.md5(
        json.dumps(
            {**opts["add"], "url": opts["url"], "key": key, "secret": secret},
            separators=(",", ":"),
        ).encode()
    ).hexdigest()
    if isinstance(progress_bar, str):
        progress_bar = progress_bar == "True"
    use_pb = (verbose and progress_bar) or progress_bar
    parallel = n_bundles > 1 and cores > 1
    if in_memory is None:
        in_memory = not parallel
    with TemporaryDirectory() as scratch_cache:
        if not in_memory:
            if verbose:
                print(f"writing to scratch cache ({perf_counter() - start_time:.4f})")

            def write_to_scratch(i: int, bundle: pandas.DataFrame):
                temp = f"{scratch_cache}/{i}.json"
                with open(temp, "wb") as scratch:
                    pickle.dump(bundle, scratch)
                return temp

            bundles = [write_to_scratch(i, b) for i, b in enumerate(bundles)]
        if parallel:
            if verbose:
                print(f"requesting in parallel ({perf_counter() - start_time:.4f})")
            waiter: "Queue[List[Union[pandas.DataFrame, None]]]" = Queue()
            queue: "Queue[tuple[int, Union[pandas.DataFrame, None]]]" = Queue()
            manager = Process(
                target=_queue_manager,
                args=(queue, waiter, n_texts, n_bundles, use_pb, verbose),
            )
            manager.start()
            nb = math.ceil(n_bundles / min(n_bundles, cores))
            cores = math.ceil(n_bundles / nb)
            procs = [
                Process(
                    target=_process,
                    args=(bundles[(i * nb) : min(n_bundles, (i + 1) * nb)], opts, queue),
                )
                for i in range(cores)
            ]
            for cl in procs:
                cl.start()
            res = waiter.get()
        else:
            if verbose:
                print(f"requesting serially ({perf_counter() - start_time:.4f})")
            pb = tqdm(total=n_texts, leave=verbose) if use_pb else None
            res = _process(bundles, opts, pb=pb)
            if pb is not None:
                pb.close()
        if verbose:
            print(f"done requesting ({perf_counter() - start_time:.4f})")

    return (data, pandas.concat(res, ignore_index=True, sort=False) if opts["collect_results"] else None, id_specified)

def _queue_manager(
    queue: "Queue[tuple[int, Union[pandas.DataFrame, None]]]",
    waiter: "Queue[List[Union[pandas.DataFrame, None]]]",
    n_texts: int,
    n_bundles: int,
    use_pb=True,
    verbose=False,
):
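    """Collect per-bundle results from worker processes, updating the progress bar until all bundles arrive."""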

    if use_pb:
        pb = tqdm(total=n_texts, leave=verbose)
    res: List[Union[pandas.DataFrame, None]] = []
    for size, chunk in iter(queue.get, None):
        if size:
            if use_pb:
                pb.update(size)
            res.append(chunk)
            if len(res) >= n_bundles:
                break
        else:
            break
    waiter.put(res)

def _process(
    bundles: List[pandas.DataFrame],
    opts: dict,
    queue: Union["Queue[tuple[int, Union[pandas.DataFrame, None]]]", None] = None,
    pb: Union[tqdm, None] = None,
) -> List[Union[pandas.DataFrame, None]]:
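    """Process a list of bundles: check the results cache, request fresh scores, and update the cache.

    Reports progress through `queue` (parallel) or `pb` (serial), and returns per-bundle results.
    """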

    reses: List[Union[pandas.DataFrame, None]] = []
    for bundle in bundles:
        if isinstance(bundle, str):
            with open(bundle, "rb") as scratch:
                bundle = pickle.load(scratch)
        body = []
        bundle.insert(0, "text_hash", "")
        if opts["text_is_path"]:
            bundle["text"] = _readin(
                bundle["text"].to_list(),
                opts["text_column"],
                opts["id_column"],
                opts["collapse_lines"],
                opts["encoding"],
            )
        for i, text in enumerate(bundle["text"]):
            text_hash = hashlib.md5((opts["add_hash"] + text).encode()).hexdigest()
            bundle.iat[i, 0] = text_hash
            if opts["version"] == "v1":
                body.append({"content": text, "request_id": text_hash, **opts["add"]})
            else:
                body.append({"text": text, "request_id": text_hash})
        ncached = 0
        cached: Union[pandas.DataFrame, None] = None
        cached_cols: List[str] = []
        if not opts["cache_overwrite"] and opts["cache"] and os.listdir(opts["cache"]):
            db = pyarrow.dataset.dataset(
                opts["cache"],
                partitioning=pyarrow.dataset.partitioning(
                    pyarrow.schema([pyarrow.field("bin", pyarrow.string())]), flavor="hive"
                ),
                format=opts["cache_format"],
            )
            cached_cols = db.schema.names
            if "text_hash" in cached_cols:
                su = db.filter(pyarrow.compute.field("text_hash").isin(bundle["text_hash"]))
                ncached = su.count_rows()
                if ncached > 0:
                    cached = (
                        su.to_table().to_pandas(split_blocks=True, self_destruct=True)
                        if opts["collect_results"]
                        else su.scanner(["text_hash"]).to_table().to_pandas(split_blocks=True, self_destruct=True)
                    )
        res: Union[str, pandas.DataFrame] = "failed to retrieve results"
        json_body = json.dumps(body, separators=(",", ":"))
        bundle_hash = hashlib.md5(json_body.encode()).hexdigest()
        if cached is None or ncached < len(bundle):
            if cached is None:
                res = _prepare_results(json_body, bundle_hash, opts)
            else:
                fresh = ~bundle["text_hash"].isin(cached["text_hash"])
                json_body = json.dumps([body[i] for i, ck in enumerate(fresh) if ck], separators=(",", ":"))
                res = _prepare_results(json_body, hashlib.md5(json_body.encode()).hexdigest(), opts)
            if not isinstance(res, str):
                if ncached:
                    if res.ndim != len(cached_cols) or not pandas.Series(cached_cols).isin(res.columns).all():
                        json_body = json.dumps([body[i] for i, ck in enumerate(fresh) if ck], separators=(",", ":"))
                        cached = _prepare_results(json_body, hashlib.md5(json_body.encode()).hexdigest(), opts)
                    if cached is not None and opts["collect_results"]:
                        res = pandas.concat([res, cached])
                if opts["cache"]:
                    writer = _get_writer(opts["cache_format"])
                    schema = pyarrow.schema(
                        (
                            col,
                            (
                                pyarrow.string()
                                if res[col].dtype == "O"
                                else (
                                    pyarrow.int32()
                                    if col in ["summary.word_count", "summary.sentence_count"]
                                    else pyarrow.float32()
                                )
                            ),
                        )
                        for col in res.columns
                        if col not in ["id", "bin", *(opts["add"].keys() if opts["add"] else [])]
                    )
                    for id_bin, d in res.groupby("bin"):
                        bin_dir = f"{opts['cache']}/bin={id_bin}"
                        os.makedirs(bin_dir, exist_ok=True)
                        writer(
                            pyarrow.Table.from_pandas(d, schema, preserve_index=False),
                            f"{bin_dir}/fragment-{bundle_hash}-0.{opts['cache_format']}",
                        )
        else:
            res = cached
        nres = len(res)
        if not opts["collect_results"]:
            reses.append(None)
        elif not isinstance(res, str):
            if "text_hash" in res:
                res = res.merge(bundle[["text_hash", "id"]], on="text_hash")
            reses.append(res)
        if queue is not None:
            queue.put((0, None) if isinstance(res, str) else (nres + ncached, res))
        elif pb is not None:
            pb.update(len(bundle))
        if isinstance(res, str):
            raise RuntimeError(res)
    return reses

def _prepare_results(body: str, bundle_hash: str, opts: dict):
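    """Send a bundle to the API and normalize the response into a DataFrame with `text_hash` and `bin` columns."""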

    raw_res = _request(
        body,
        opts["url"],
        opts["auth"],
        opts["retries"],
        REQUEST_CACHE + bundle_hash + ".json" if opts["request_cache"] else "",
        opts["to_norming"],
        opts["make_request"],
    )
    if isinstance(raw_res, str):
        return raw_res
    res = pandas.json_normalize(raw_res)
    if "request_id" in res:
        res.rename(columns={"request_id": "text_hash"}, inplace=True)
        res.drop(
            list({"response_id", "language", "version", "error"}.intersection(res.columns)),
            axis="columns",
            inplace=True,
        )
        res.insert(res.ndim, "bin", ["h" + h[0] for h in res["text_hash"]])
    return res

def _request(
    body: str,
    url: str,
    auth: requests.auth.HTTPBasicAuth,
    retries: int,
    cache="",
    to_norming=False,
    execute=True,
) -> Union[dict, str]:
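    """Make a single API request, or read it from the request cache, retrying when rate-limited."""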

    if not os.path.isfile(cache):
        if not execute:
            return "`make_request` is `False`, but there are texts with no cached results"
        if to_norming:
            res = requests.patch(url, body, auth=auth, timeout=9999)
        else:
            res = requests.post(url, body, auth=auth, timeout=9999)
        if cache and res.status_code == 200:
            with open(cache, "w", encoding="utf-8") as response:
                json.dump(res.json(), response)
    else:
        with open(cache, encoding="utf-8") as response:
            data = json.load(response)
        return data["results"] if "results" in data else data
    data = res.json()
    if res.status_code == 200:
        data = dict(data[0] if isinstance(data, list) else data)
        return data["results"] if "results" in data else data
    if os.path.isfile(cache):
        os.remove(cache)
    if retries > 0 and "code" in data and data["code"] == 1420:
        cd = re.search(
            "[0-9]+(?:\\.[0-9]+)?",
            (res.json()["message"] if res.headers["Content-Type"] == "application/json" else res.text),
        )
        sleep(1 if cd is None else float(cd[0]) / 1e3)
        return _request(body, url, auth, retries - 1, cache, to_norming)
    return f"request failed, and have no retries: {res.status_code}: {data['error'] if 'error' in data else res.reason}"

def _manage_request_cache():
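    """Ensure the request cache directory exists, and clear cached responses older than a day."""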

    os.makedirs(REQUEST_CACHE, exist_ok=True)
    try:
        refreshed = time()
        log_file = REQUEST_CACHE + "log.txt"
        if os.path.exists(log_file):
            with open(log_file, encoding="utf-8") as log:
                logged = log.readline()
                if isinstance(logged, list):
                    logged = logged[0]
                refreshed = float(logged)
        else:
            with open(log_file, "w", encoding="utf-8") as log:
                log.write(str(time()))
        if time() - refreshed > 86400:
            for cached_request in glob(REQUEST_CACHE + "*.json"):
                os.remove(cached_request)
    except Exception as exc:
        warnings.warn(UserWarning(f"failed to manage request cache: {exc}"), stacklevel=2)

def _readin(
    paths: List[str],
    text_column: Union[str, None],
    id_column: Union[str, None],
    collapse_lines: bool,
    encoding: Union[str, None],
) -> Union[List[str], pandas.DataFrame]:
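    """Read texts from plain-text or CSV files, detecting encoding when not specified.

    Returns a list of texts when lines are collapsed, or a DataFrame of texts and IDs otherwise.
    """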

    text = []
    ids = []
    sel = []
    if text_column is not None:
        sel.append(text_column)
    if id_column is not None:
        sel.append(id_column)
    enc = encoding
    predict_encoding = enc is None
    if predict_encoding:
        detect = UniversalDetector()

        def handle_encoding(file: str):
            detect.reset()
            with open(file, "rb") as text:
                while True:
                    chunk = text.read(1024)
                    if not chunk:
                        break
                    detect.feed(chunk)
                    if detect.done:
                        break
            detected = detect.close()["encoding"]
            if detected is None:
                msg = "failed to detect encoding; please specify with the `encoding` argument"
                raise RuntimeError(msg)
            return detected

    if os.path.splitext(paths[0])[1] == ".txt" and not sel:
        if collapse_lines:
            for file in paths:
                if predict_encoding:
                    enc = handle_encoding(file)
                with open(file, encoding=enc, errors="ignore") as texts:
                    text.append(" ".join([line.rstrip() for line in texts]))
        else:
            for file in paths:
                if predict_encoding:
                    enc = handle_encoding(file)
                with open(file, encoding=enc, errors="ignore") as texts:
                    lines = [line.rstrip() for line in texts]
                    text += lines
                    ids += [file] if len(lines) == 1 else [file + str(i + 1) for i in range(len(lines))]
            return pandas.DataFrame({"text": text, "ids": ids})
    elif collapse_lines:
        for file in paths:
            if predict_encoding:
                enc = handle_encoding(file)
            temp = pandas.read_csv(file, encoding=enc, usecols=sel)
            text.append(" ".join(temp[text_column]))
    else:
        for file in paths:
            if predict_encoding:
                enc = handle_encoding(file)
            temp = pandas.read_csv(file, encoding=enc, usecols=sel)
            if text_column not in temp:
                msg = f"`text_column` ({text_column}) was not found in all files"
                raise IndexError(msg)
            text += temp[text_column].to_list()
            ids += (
                temp[id_column].to_list()
                if id_column is not None
                else [file] if len(temp) == 1 else [file + str(i + 1) for i in range(len(temp))]
            )
        return pandas.DataFrame({"text": text, "ids": ids})
    return text

def _get_writer(write_format: str):
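    """Return the Arrow writer function for the requested cache format (parquet or feather)."""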

    if write_format == "parquet":
        return pyarrow.parquet.write_table
    if write_format == "feather":
        return pyarrow.feather.write_feather