-
Notifications
You must be signed in to change notification settings - Fork 2
/
corpus.py
163 lines (130 loc) · 4.76 KB
/
corpus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import gc
from collections import defaultdict
import logging
from sentence import Sentence
class Corpus:
def __init__(self, backup, int_tokens=False):
self._corpus = []
self.create_index()
self._backup = backup
self._int_tokens = int_tokens
if self._int_tokens:
self._tokmap = {}
def __len__(self):
    """Number of sentences currently stored in the corpus."""
    return len(self._corpus)
def __iter__(self):
    """Iterate over the stored Sentence objects in insertion order."""
    return iter(self._corpus)
def __reversed__(self):
    """Iterate over the stored sentences in reverse insertion order."""
    return reversed(self._corpus)
def __getitem__(self, key):
    """Return the sentence(s) at *key* (int index or slice)."""
    return self._corpus[key]
def __setitem__(self, key, value):
    """Replace the sentence at *key*.

    NOTE(review): bypasses the token index — self._index is not updated
    here, so it goes stale until create_index() is called again.
    """
    self._corpus[key] = value
def __delitem__(self, key):
    """Delete the sentence at *key*.

    NOTE(review): shifts the positions of all later sentences, which
    invalidates the sentence indices stored in self._index; rebuild
    with create_index() afterwards.
    """
    del self._corpus[key]
def append(self, item):
    """List-style alias for add_sentence(), so Corpus can be used
    like a list of sentences."""
    self.add_sentence(item)
def add_sentence(self, sen):
    """Store *sen* as a new Sentence and register its tokens in the index.

    Tokens are interned to ints first when the corpus runs in int-token
    mode; stopwords (if configured) are stripped before indexing so the
    index never references filtered tokens.
    """
    if self._int_tokens:
        sen = self.tokens_to_ints(sen)
    sentence = Sentence(sen)
    self._corpus.append(sentence)
    # stopword filtering must happen before indexing, otherwise removed
    # tokens would leave dangling entries in self._index
    if hasattr(self, "_stopwords"):
        sentence.remove_toks(self._stopwords, self._backup)
    position = len(self._corpus) - 1
    for token in sentence:
        self._index[token].add(position)
def create_index(self):
self._index = defaultdict(set)
for i, sen in enumerate(self._corpus):
for tok in sen:
self._index[tok].add(i)
def ngram_index(self, ngram):
if self._int_tokens:
ngram = self.tokens_to_ints(ngram)
if ngram[0] not in self._index:
return set()
occ = set(self._index[ngram[0]])
if len(ngram) == 1:
return occ
for tok in ngram[1:]:
occ = occ & self._index[tok]
valid_occ = set()
for sen_i in occ:
sen = self._corpus[sen_i]
if sen.ngram_positions(ngram):
valid_occ.add(sen_i)
return valid_occ
def ngram_neighbours(self, ngram, indices=None, top_n=20):
if indices is None:
indices = self.ngram_index(ngram)
if self._int_tokens:
ngram = self.tokens_to_ints(ngram)
neighbours = defaultdict(int)
for sen_i in indices:
sen = self._corpus[sen_i]
for pos in sen.ngram_positions(ngram):
if pos > 0:
neighbours[(sen[pos - 1], -1)] += 1
if pos + len(ngram) < len(sen):
neighbours[(sen[pos + len(ngram)], 1)] += 1
return sorted(filter(lambda x: x[1] > 1, neighbours.items()), key=lambda x: x[1], reverse=True)[:top_n]
def remove_ngram(self, ngram, ind=None, backup=False):
ngram = self.tokens_to_ints(ngram)
if ind is None:
ind = self.ngram_index(ngram)
for sen_i in ind:
sen = self._corpus[sen_i]
sen.remove_ngram(ngram, backup)
# maintaining index
#for tok in ngram:
#if not tok in sen:
#self._index[tok].remove(sen_i)
#if len(self._index[tok]) == 0:
#del self._index[tok]
def tokens_to_ints(self, tokens):
# sometimes tokens are already changed
if type(tokens[0]) == int:
return tokens
ints = []
for tok in tokens:
if not tok in self._tokmap:
self._tokmap[tok] = len(self._tokmap)
ints.append(self._tokmap[tok])
return ints
def ints_to_tokens(self, ints):
# first check, if there is a reverse dict
if not hasattr(self, "_reverse_tokmap"):
logging.info("Creating reverse tokmap for corpus...")
gc.disable()
self._reverse_tokmap = dict((v,k) for k,v in self._tokmap.iteritems())
gc.enable()
logging.info("Creating reverse tokmap for corpus done")
tokens = []
for i in ints:
# normal tokens
if type(i) == int:
tokens.append(self._reverse_tokmap[i])
# removed tokens in backup mode
else:
tokens.append("[{0}]".format(self._reverse_tokmap[i[0]]))
return tokens
def set_stopwords(self, stopwords):
if len(stopwords) != 0:
if self._int_tokens:
self._stopwords = set(self.tokens_to_ints(list(stopwords)))
else:
self._stopwords = set(stopwords)
def clean_multiple_hapax_sentences(self):
    """Placeholder; intentionally a no-op for now."""
    # TODO implement if needed
    pass
@staticmethod
def read_from_file(f):
    """Build a Corpus from an open file, one whitespace-tokenized
    sentence per line.

    @param f: iterable of lines (an open text file).
    @return: the populated Corpus.
    """
    # BUGFIX: Corpus() crashed — backup was a required argument; pass
    # the non-backup default explicitly
    c = Corpus(False)
    for l in f:
        le = l.strip().split()
        c.append(Sentence(le))
    # BUGFIX: the built corpus was never returned
    return c