extraction.py
import datetime
import logging
from typing import Optional
import bz2
import lzma
import json
import zstandard
from config import DATA_DIR
from helpers import (
infer_extension,
is_relevant_ln,
count_and_log,
create_ln_str_with_json_boilerplate,
)
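
# Note: the helpers imported above are assumed (based on how they are used in
# extract_from_dump below) to behave roughly as follows; see helpers.py for the
# actual implementations:
#   - infer_extension(prefix, year, month): extension of the dump archive ("bz2", "xz" or "zst")
#   - is_relevant_ln(ln, subreddit): True if the raw dump line belongs to the requested subreddit
#   - create_ln_str_with_json_boilerplate(ln, n): wraps the line so the output file forms a valid JSON array
#   - count_and_log(n): increments the running line counter and periodically logs progress
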
def extract_from_dump(prefix: str, year: int, month: int, subreddit: str, force: bool = False) -> None:
    """Extract JSON objects for a specific subreddit for a given year and month into a single monthly file,
    assuming the necessary dump files were downloaded beforehand"""
    in_dn = DATA_DIR / "compressed"
    out_dn = DATA_DIR / f"extracted/monthly/{subreddit}"
    out_dn.mkdir(parents=True, exist_ok=True)
    ext = infer_extension(prefix, year, month)
    n = 0
    if prefix == "RC":
        kind = "comments"
    elif prefix == "RS":
        kind = "submissions"
    else:
        raise ValueError(f"Unexpected prefix '{prefix}': expected 'RC' (comments) or 'RS' (submissions)")
    subreddit = subreddit.lower()
    date_str = f"{year}-{str(month).zfill(2)}"
    out_fp = out_dn / f"{prefix}_{subreddit}_{date_str}.json"
    if force is True or not out_fp.is_file():
        ext_start = datetime.datetime.utcnow()
        fn = f"{prefix}_{date_str}.{ext}"
        files = [in_dn / fn]
        for fp in files:
            if fp.is_file():
                logging.info(f"Extracting {kind} for subreddit '{subreddit}' from {fp} to {out_fp}")
                with open(out_fp, mode="w", encoding="utf-8") as h_out:
                    n = 0
                    if ext == "bz2":
                        with bz2.BZ2File(fp) as h_in:
                            for ln in h_in:
                                ln = ln.decode("utf-8")
                                if is_relevant_ln(ln, subreddit) is True:
                                    ln_str = create_ln_str_with_json_boilerplate(ln, n)
                                    h_out.write(ln_str)
                                    n = count_and_log(n)
                        if n > 0:  # write final ]
                            h_out.write("\n]")
                    elif ext == "xz":
                        with lzma.LZMAFile(fp) as h_in:
                            for ln in h_in:
                                ln = ln.decode("utf-8")
                                if is_relevant_ln(ln, subreddit) is True:
                                    ln_str = create_ln_str_with_json_boilerplate(ln, n)
                                    h_out.write(ln_str)
                                    n = count_and_log(n)
                        if n > 0:  # write final ]
                            h_out.write("\n]")
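                    # For zst dumps the stream is read in fixed-size chunks, so a line may be split
                    # across two chunks; prev_ln carries the partial last line of each chunk into the
                    # next iteration so it can be completed before parsing.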
elif ext == "zst":
chunksize = (
2 ** 23
) # 8MB per chunk to reduce the immpact of "unexpected end of data" errors until fixed
with open(fp, "rb") as h_in:
decomp = zstandard.ZstdDecompressor(max_window_size=2147483648)
with decomp.stream_reader(h_in) as reader:
prev_ln = ""
while True:
chunk = reader.read(chunksize)
if not chunk:
break
try:
lines = chunk.decode("utf-8").split("\n")
except UnicodeDecodeError as e:
logging.warning(e)
# Attempt to ignore that segment
chunk = chunk[: e.start] + chunk[e.end :]
try:
lines = chunk.decode("utf-8").split("\n")
except UnicodeDecodeError as e:
logging.error(e)
lines = []
for i, ln in enumerate(lines[:-1]):
if i == 0:
ln = f"{prev_ln}{ln}"
ln = ln.strip()
if is_relevant_ln(ln, subreddit) is True:
ln_str = create_ln_str_with_json_boilerplate(ln, n)
h_out.write(ln_str)
n = count_and_log(n)
prev_ln = lines[-1]
if n > 0: # write final ]
h_out.write("\n]")
                if n > 0:
                    logging.info(f"Saved {n:,} lines to {out_fp.name}")
                else:
                    try:
                        out_fp.unlink()
                    except FileNotFoundError:
                        pass
            else:
                logging.warning(f"File {fp.name} not found for extraction")
        duration = str(datetime.datetime.utcnow() - ext_start).split(".")[0].zfill(8)
        logging.info(f"Extraction process of {n:,} lines completed after {duration}")
    else:
        logging.info(
            f"Skipping extraction to {out_fp.name} because the file already exists (--force=True to override this)"
        )
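

# Minimal usage sketch: the subreddit, year and month here are illustrative placeholders;
# extract_from_dump expects the matching RC_/RS_ dump archive for that month to already be
# present under DATA_DIR / "compressed".
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    # Example: extract all comments posted to r/learnpython in January 2020.
    extract_from_dump(prefix="RC", year=2020, month=1, subreddit="learnpython")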