import os
from corpkit.lazyprop import lazy_property
from corpkit.interrogation import Results
from corpkit.process import classname
import collections.abc
class Corpus(collections.abc.MutableSequence):
"""
Model a parsed or unparsed corpus with arbitrary depth of subfolders
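
    Example (an illustrative sketch; the corpus path is hypothetical):

    >>> corpus = Corpus('data/example-parsed')
    >>> corpus.files[:3]  # the first three File objects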
"""
    def __new__(cls, path_or_data, **kwargs):
if isinstance(path_or_data, str) and path_or_data.endswith('.h'):
import pandas as pd
store = pd.HDFStore(path_or_data)
data = store[os.path.basename(path_or_data)[:-2]]
store.close()
return LoadedCorpus(data=data, path=path_or_data)
else:
            return super().__new__(cls)
def __init__(self, path_or_data, **kwargs):
# in here we'll put the corpus files
fs = []
# defaults
self.singlefile = False
self.path = False
self.datatype = 'conll'
self._trees = {}
self._subcorpus = kwargs.get('in_subcorpus', False)
self._sliced = kwargs.get('sliced', False)
self._id = 0
self._hdf5 = False
# these make it possible to cut the corpus short quickly
self.just = kwargs.pop('just', None)
self.skip = kwargs.pop('skip', None)
if not len(path_or_data):
raise ValueError("Corpus cannot be empty.")
# if path wasn't passed in as a kwarg, this must be it
if not kwargs.get('path'):
self.path = path_or_data
else:
            # the path kwarg is known to exist in this branch, so pop directly
            self.path = kwargs.pop('path')
# for when files are passed in (i.e. slicing)
fs = path_or_data
self.name = os.path.basename(self.path)
# get singlefile, datatype
if isinstance(self.path, str):
self.singlefile = os.path.isfile(self.path)
if self.singlefile:
                if self.path.endswith(('conll', 'conllu')):
self.datatype = 'conll'
else:
self.datatype = 'plaintext'
else:
if self.path.endswith('-parsed'):
self.datatype = 'conll'
else:
self.datatype = 'plaintext'
# prepend 'data' if corpus can't be found at path
if isinstance(self.path, str) and \
not os.path.isdir(self.path) and \
not self.singlefile:
self.path = os.path.join('data', self.path)
# if the user gave a path to a corpus, make a list of File objs
if not fs and not self.singlefile:
for root, dirnames, filenames in os.walk(self.path):
for filename in filenames:
if not filename.endswith(('conll', 'conllu', 'txt')):
continue
subc = os.path.basename(root)
subc = False if subc == self.name else subc
fs.append(File(os.path.join(root, filename), corpus_name=self.name, in_subcorpus=subc))
self.list = list()
self.extend(fs)
if not os.path.exists(self.path):
raise FileNotFoundError("Corpus not found at %s" % self.path)
def __len__(self):
return len(self.list)
def __getitem__(self, i):
import re
# get from subcorpora, unless there are none, in which case, get from files
#to_iter = self.subcorpora if os.path.isdir(self.path) and len(self.subcorpora) else self.list
to_iter = self.list
if isinstance(i, str):
# dict style lookup of files when there are no subcorpora
return next((s for s in to_iter if s.name.split('.', 1)[0] == i), None)
# allow user to pass in a regular expression and get all matching names
        elif isinstance(i, re.Pattern):
return Corpus([s for s in to_iter if re.search(i, s.name.split('.', 1)[0])], path=self.path, sliced=True)
# normal indexing and slicing
elif isinstance(i, slice):
return Corpus(to_iter[i], path=self.path, sliced=True)
else:
return to_iter[i]
def __delitem__(self, i):
del self.list[i]
def __setitem__(self, i, v):
self.list[i] = v
    def insert(self, i, v):
self.list.insert(i, v)
def __getattr__(self, name):
name = name.split('.', 1)[0]
return self.__getitem__(name)
@lazy_property
def files(self):
"""
A list of File objects within the corpus
"""
        return list(self.list)
@lazy_property
def filepaths(self):
"""
A list of filepaths within the corpus
"""
return [i.path for i in self.list]
    def store_as_hdf(self, **kwargs):
"""
Store a corpus in an HDF5 file for faster loading
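
        Example (a sketch; once stored, constructing a Corpus from the
        resulting `.h` file returns a LoadedCorpus directly):

        >>> corpus.store_as_hdf()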
"""
from corpkit.process import store_as_hdf
return store_as_hdf(self, **kwargs)
@lazy_property
def subcorpora(self):
"""
A list of immediate subcorpora, also modelled as Corpus objects.
"""
subs = [Corpus(os.path.join(self.path, x))
for x in os.listdir(self.path)
if os.path.isdir(os.path.join(self.path, x))]
return subs
    def load(self, multiprocess=False, load_trees=True, **kwargs):
"""
Load corpus into memory (i.e. create one large `pd.DataFrame`)
Keyword args:
multiprocess (int): how many threads to use
load_trees (bool): Parse constituency trees if present
add_gov (bool): pre-load each token's governor
cols (list): list of columns to be loaded (can improve performance)
just (dict): restrict load to lines with feature key matching
regex value (case insensitive)
skip (dict): the inverse of `just`
Returns:
:class:`corpkit.corpus.LoadedCorpus`
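
        Example (a sketch; column and metadata names are illustrative):

        >>> df = corpus.load(multiprocess=2, cols=['w', 'l', 'p'],
        ...                  just={'speaker': 'tony'})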
"""
from corpkit.other import _load
return _load(self, multiprocess=multiprocess, load_trees=load_trees,
path=self.path, name=self.name, **kwargs)
    def search(self, target, query, **kwargs):
"""
Search a corpus for some linguistic or metadata feature
Args:
target (str): The name of the column or feature to search
- `'w'`: words
- `'l'`: lemmas
- `'x'`: XPOS
- `'p'`: POS
- `'f'`: dependency function
                - `'year'`, `'speaker'`, etc.: arbitrary metadata categories
                - `'t'`: Constituency trees via TGrep2 syntax
- `'d'`: Dependency graphs via depgrep
            query (str/list): a regular expression, TGrep2/depgrep string to
                match, or a list of strings to match against
Keyword Args:
inverse (bool): get non-matches
multiprocess (int): number of parallel threads to start
no_store (bool): do not store reference corpus in Results object
just_index (bool): return only pointers to matches, not actual data
cols (list): list of columns to be loaded (can improve performance)
just (dict): restrict load to lines with feature key matching
regex value (case insensitive)
skip (dict): the inverse of `just`
Returns:
:class:`corpkit.interrogation.Results`: search result
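
        Example (a sketch; the queries are illustrative):

        >>> nouns = corpus.search('p', r'^NN')  # POS starting with NN
        >>> subjs = corpus.search('f', 'nsubj')  # nominal subjects
        >>> trees = corpus.search('t', '/NNS?/ >> NP')  # a TGrep2 query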
"""
from corpkit.interrogator import interrogator
return interrogator(self, target, query, **kwargs)
    def trees(self, query, **kwargs):
"""Equivalent to `.search('t', query)`"""
return self.search('t', query, **kwargs)
    def deps(self, query, **kwargs):
"""Equivalent to `.search('d', query)`"""
return self.search('d', query, **kwargs)
    def cql(self, query, **kwargs):
"""Equivalent to `.search('c', query)`"""
return self.search('c', query, **kwargs)
    def words(self, query, **kwargs):
"""Equivalent to `.search('w', query)`"""
return self.search('w', query, **kwargs)
    def lemmas(self, query, **kwargs):
"""Equivalent to `.search('l', query)`"""
return self.search('l', query, **kwargs)
    def pos(self, query, **kwargs):
        """Equivalent to `.search('p', query)`"""
        return self.search('p', query, **kwargs)
    def xpos(self, query, **kwargs):
        """Equivalent to `.search('x', query)`"""
        return self.search('x', query, **kwargs)
    def functions(self, query, **kwargs):
"""Equivalent to `.search('f', query)`"""
return self.search('f', query, **kwargs)
    def parse(self, parser='corenlp', lang='english',
multiprocess=False, **kwargs):
"""
Parse a plaintext corpus
Keyword Args:
parser (str): name of the parser (only 'corenlp' accepted so far)
lang (str): language for parser (`english`, `arabic`, `chinese`,
`german`, `french` or `spanish`)
multiprocess (int): number of parallel threads to start
memory_mb (int): megabytes of memory to use per thread (default 2024)
Returns:
:class:`corpkit.corpus.Corpus`: parsed corpus
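
        Example (a sketch; assumes CoreNLP is available locally):

        >>> parsed = corpus.parse(multiprocess=2)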
"""
if self.name.endswith('-parsed'):
raise ValueError("Corpus is already parsed.")
from corpkit.parse import Parser
parser = Parser(parser=parser, lang=lang)
return parser.run(self, multiprocess=multiprocess, **kwargs)
# for backward compatibility only
    def interrogate(self, search, **kwargs):
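        """
        Backward-compatible wrapper around :func:`search`: `search` here is a
        one-item dict mapping target to query, e.g. `{'w': r'^fork'}`
        """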
target, query = search.popitem()
return self.search(target=target, query=query, **kwargs)
    def __str__(self):
        return str(self.files)
def __repr__(self):
# not always nice, should use abspath sometimes ...
        relat = os.path.relpath(self.path).split(os.sep)[1:]
usr = next((i for i, x in enumerate(relat) if x.lower() == 'users'), 0)
path = ' -> '.join(relat[usr:])
return '<%s: %s --- %ss/%sf>' % (self.__class__.__name__,
path, format(len(self.subcorpora), ','), format(len(self), ','))
    def fsi(self, ix):
"""
Get a slice of a corpus as a DataFrame
Args:
ix (iterable): if len(ix) == 1, filename to get
if len(ix) == 2, get sent from filename
if len(ix) == 3, get token from sent from filename
Return:
pd.DataFrame
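
        Example (a sketch; the filename is hypothetical):

        >>> corpus.fsi(('chapter1', 2))     # sentence 2 of chapter1
        >>> corpus.fsi(('chapter1', 2, 5))  # token 5 of that sentence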
"""
if not isinstance(ix, (list, tuple)):
ix = [ix]
        # allow the user to pass in something nested
if isinstance(ix[0], (list, tuple)):
import pandas as pd
dfs = [self.fsi(i) for i in ix]
return pd.concat(dfs)
ix = list(ix)
if isinstance(ix[0], int):
ix[0] = self.files[ix[0]].name.split('.', 1)[0]
if len(ix) == 1:
f, s, i = ix[0], None, None
elif len(ix) == 2:
f, s, i = ix[0], ix[1], None
elif len(ix) == 3:
f, s, i = ix[0], ix[1], ix[2]
else:
raise NotImplementedError
# if the user deliberately gave no filename, we have to load all
if f is None:
df = self.load()
else:
            fobj = next((x for x in self.files if x.name.split('.', 1)[0] == f), None)
if not fobj:
                raise ValueError("File not found.")
df = fobj.document
if s is None and i is None:
return df
elif s is None and i is not None:
df = df.xs(i, level=['i'], drop_level=False)
elif i is not None:
df = df.xs((s, i), level=['s', 'i'], drop_level=False)
else:
df = df.xs([s], level=['s'], drop_level=False)
return df
    def features(self, subcorpora=False):
"""
Generate and show basic stats from the corpus, including number of
sentences, clauses, process types, etc.
Example:
        >>> corpus.features()
SB Characters Tokens Words Closed class words Open class words
01 26873 8513 7308 4809 3704
02 25844 7933 6920 4313 3620
03 18376 5683 4877 3067 2616
04 20066 6354 5366 3587 2767
"""
if 'features' in self.metadata:
import pandas as pd
feat = pd.read_json(self.metadata['features'])
#feat = feat.set_index(['file', 's'])
#todo: make into interrogation object?
if subcorpora:
return feat.pivot_table(index=subcorpora, aggfunc=sum)
else:
return feat
from corpkit.interrogator import interrogator
from corpkit.process import add_df_to_dotfile
feat = interrogator(self, 'features')
if not self._sliced:
add_df_to_dotfile(self.path, feat, typ='features', subcorpora=False)
else:
print("Corpus is a slice, and thus will not be added to metadata")
if subcorpora:
return feat.table(subcorpora=subcorpora)
else:
return feat
@lazy_property
def wordclasses(self):
"""
        Generate and show the frequency of each word class (noun, verb, etc.)
        in the corpus, per subcorpus
Example:
>>> corpus.wordclasses
SB Verb Noun Preposition Determiner ...
01 26873 8513 7308 5508 ...
02 25844 7933 6920 3323 ...
03 18376 5683 4877 3137 ...
04 20066 6354 5366 4336 ...
"""
from corpkit.features import _get_postags_and_wordclasses
postags, wordclasses = _get_postags_and_wordclasses(self)
return wordclasses
@lazy_property
def postags(self):
"""
        Generate and show the frequency of each POS tag in the corpus,
        per subcorpus
Example:
>>> corpus.postags
SB NN VB JJ IN DT
01 26873 8513 7308 4809 3704 ...
02 25844 7933 6920 4313 3620 ...
03 18376 5683 4877 3067 2616 ...
04 20066 6354 5366 3587 2767 ...
"""
from corpkit.features import _get_postags_and_wordclasses
postags, wordclasses = _get_postags_and_wordclasses(self)
return postags
@lazy_property
    def lexicon(self):
"""
Get a lexicon/frequency distribution from a corpus. Store it in metadata.
Returns:
a `DataFrame` of tokens and counts
"""
from corpkit.features import lexicon
return lexicon(self)
    def sample(self, n, level='f'):
"""
Get a sample of the corpus
        :param n: the size of the sample. If an ``int``, get `n` files;
            if a ``float``, get that proportion of the corpus
:type n: ``int``/``float``
:param level: sample subcorpora (``s``) or files (``f``)
:type level: ``str``
Returns:
a Corpus object
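
        Example (a sketch):

        >>> corpus.sample(5)             # five random files
        >>> corpus.sample(0.1)           # roughly a tenth of the files
        >>> corpus.sample(2, level='s')  # two random subcorpora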
"""
        import random
        if level.startswith('f'):
            if isinstance(n, float):
                # a float is read as a proportion of the corpus
                n = int(n * len(self))
            return Corpus(random.sample(list(self), n), path=self.path)
        else:
            if isinstance(n, float):
                n = int(n * len(self.subcorpora))
            return Corpus(random.sample(list(self.subcorpora), n), path=self.path)
@lazy_property
def metadata(self):
"""
Get metadata for a corpus
"""
if isinstance(self, File):
self = self.mainpath
#if not self.name.endswith('-parsed'):
# return {}
from corpkit.process import get_corpus_metadata
absp = os.path.abspath(self.path)
return get_corpus_metadata(absp, generate=True)
    def tokenise(self, postag=True, lemmatise=True, *args, **kwargs):
"""
Tokenise a plaintext corpus, saving to disk
Returns:
The newly created :class:`corpkit.corpus.Corpus`
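
        Example (a sketch; assumes a plaintext corpus):

        >>> tokenised = corpus.tokenise(postag=True, lemmatise=False)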
"""
from corpkit.make import make_corpus
#from corpkit.process import determine_datatype
#dtype, singlefile = determine_datatype(self.path)
if self.datatype != 'plaintext':
            raise ValueError(
                'tokenise method can only be used on plaintext corpora.')
kwargs.pop('parse', None)
kwargs.pop('tokenise', None)
c = make_corpus(self.path,
parse=False,
tokenise=True,
postag=postag,
lemmatise=lemmatise,
*args,
**kwargs)
return Corpus(c)
    def annotate(self, interro, annotation, dry_run=True):
"""
Annotate a corpus
Args:
interro (:class:`corpkit.Interrogation`): Search matches
annotation (str/dict): a tag or field: value dict. If a dict, the key
                is the name of the annotation field, and the value is the
                annotation content. If the value string matches one of the
                column names seen when concordancing, the content of that
                column will be used. If
the value is a list, the middle column will be formatted, as per
the `show` arguments for Interrogation.table() and
Interrogation.conc().
dry_run (bool): Show the annotations to be made, but don't do them
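
        Example (a sketch; the field name and value are hypothetical):

        >>> res = corpus.search('w', r'^fork')
        >>> corpus.annotate(res, {'cutlery': 'm'}, dry_run=False)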
"""
conc = self.concordance
from corpkit.annotate import annotator
annotator(conc, annotation, dry_run=dry_run)
        # regenerate metadata afterward (could be slow on large corpora)
if not dry_run:
self.delete_metadata()
from corpkit.process import make_dotfile
make_dotfile(self)
    def unannotate(self, annotation, dry_run=True):
"""
Delete annotation from a corpus
Args:
annotation (str/dict): just as in `corpus.annotate()`.
dry_run (bool): Show the changes to be made, but don't do them
"""
from corpkit.annotate import annotator
annotator(self, annotation, dry_run=dry_run, deletemode=True)
class File(Corpus):
"""
Models a corpus file for reading, interrogating, concordancing.
    Methods for interrogation, concordancing and configuration are the same as
    for :class:`corpkit.corpus.Corpus`, plus methods for accessing the file
    contents directly as a `str` or as a pandas DataFrame.
"""
def __init__(self, path, **kwargs):
super(File, self).__init__(path)
self.corpus_name = kwargs.get('corpus_name')
self._subcorpus = kwargs.get('in_subcorpus', False)
def __repr__(self):
return "<%s instance: %s>" % (classname(self), self.name)
def __str__(self):
return self.path
def __bool__(self):
return True
    def read(self, **kwargs):
"""
Get contents of file as string
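
        Example (a sketch):

        >>> f = corpus.files[0]
        >>> print(f.read()[:500])  # the first 500 characters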
"""
from corpkit.constants import OPENER
with OPENER(self.path, 'r', **kwargs) as fo:
return fo.read()
@lazy_property
def document(self):
"""
`pd.DataFrame` containing tokens and their attributes and metadata
"""
if self.datatype == 'conll':
from corpkit.conll import path_to_df
return path_to_df(self.path, corpus_name=self.corpus_name)
else:
from corpkit.process import saferead
return saferead(self.path)[0]
@lazy_property
def trees(self):
"""
Get list of parse trees in a File
"""
from nltk.tree import ParentedTree
        dat = self.read().splitlines()
        dat = [i.split('=', 1)[1].strip() for i in dat if i.startswith(('# parse =', '# parse='))]
        return [ParentedTree.fromstring(t) for t in dat]
@lazy_property
def plain(self):
"""
Show the sentences in a File as plaintext
"""
        dat = self.read().splitlines()
        dat = [i.split('=', 1)[1].strip() for i in dat if i.startswith(('# text =', '# text='))]
if not dat:
return
else:
return dat
class LoadedCorpus(Results):
"""
Store a corpus in memory as a DataFrame.
This class has all the same methods as a Results object. The only real
difference is that slicing it will do some reindexing to speed up searches.
"""
@property
def _constructor(self):
import pandas as pd
pd.options.mode.chained_assignment = None
#self.loc[:,'_n'] = list(range(len(self)))
return LoadedCorpus
def __init__(self, data, path=False):
super(LoadedCorpus, self).__init__(data, path=path)