Source code for pressagio.tokenizer

"""
Several classes to tokenize text.

"""
import abc
import collections
import re
import typing

import pressagio.character


class Tokenizer(object, metaclass=abc.ABCMeta):
    """
    Base class for all tokenizers.
    """
    def __init__(
        self,
        text,
        blankspaces=pressagio.character.blankspaces,
        separators=pressagio.character.separators,
    ):
        """
        Constructor of the Tokenizer base class.

        Parameters
        ----------
        text : str
            The text to tokenize.
        blankspaces : str
            The characters that represent empty spaces.
        separators : str
            The characters that separate token units (e.g. word boundaries).
        """
        self.separators = separators
        self.blankspaces = blankspaces
        self.text = text
        self.lowercase = False

        self.offbeg = 0
        self.offset = None
        self.offend = None
    def is_blankspace(self, char):
        """
        Test if a character is a blankspace.

        Parameters
        ----------
        char : str
            The character to test.

        Returns
        -------
        ret : bool
            True if character is a blankspace, False otherwise.
        """
        if len(char) > 1:
            raise TypeError("Expected a char.")
        if char in self.blankspaces:
            return True
        return False
    def is_separator(self, char):
        """
        Test if a character is a separator.

        Parameters
        ----------
        char : str
            The character to test.

        Returns
        -------
        ret : bool
            True if character is a separator, False otherwise.
        """
        if len(char) > 1:
            raise TypeError("Expected a char.")
        if char in self.separators:
            return True
        return False
    @abc.abstractmethod
    def count_characters(self):
        raise NotImplementedError("Method must be implemented")

    @abc.abstractmethod
    def reset_stream(self):
        raise NotImplementedError("Method must be implemented")

    @abc.abstractmethod
    def count_tokens(self):
        raise NotImplementedError("Method must be implemented")

    @abc.abstractmethod
    def has_more_tokens(self):
        raise NotImplementedError("Method must be implemented")

    @abc.abstractmethod
    def next_token(self):
        raise NotImplementedError("Method must be implemented")

    @abc.abstractmethod
    def progress(self):
        raise NotImplementedError("Method must be implemented")
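
# Illustrative sketch, not part of the original module: the helpers
# is_blankspace() and is_separator() accept exactly one character and raise a
# TypeError otherwise. A concrete subclass (ForwardTokenizer, defined below)
# is used here because Tokenizer itself is abstract; the expected results
# assume the default blankspaces include a plain space.
def _example_character_tests():
    tokenizer = ForwardTokenizer("a b")
    assert tokenizer.is_blankspace(" ")      # a space counts as blankspace
    assert not tokenizer.is_blankspace("a")  # letters do not
    try:
        tokenizer.is_blankspace("ab")        # more than one character
    except TypeError:
        pass                                 # raises "Expected a char."
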
class ForwardTokenizer(Tokenizer):
    def __init__(
        self,
        text,
        blankspaces=pressagio.character.blankspaces,
        separators=pressagio.character.separators,
    ):
        Tokenizer.__init__(self, text, blankspaces, separators)
        self.offend = self.count_characters() - 1
        self.reset_stream()
    def count_tokens(self):
        count = 0
        while self.has_more_tokens():
            count += 1
            self.next_token()
        self.reset_stream()
        return count
    def count_characters(self):
        """
        Count the number of unicode characters in the text.
        """
        return len(self.text)
    def __next__(self):
        if self.has_more_tokens():
            token = self.next_token()
            if token != "":
                return token
        raise StopIteration

    def __iter__(self):
        return self

    def has_more_tokens(self):
        if self.offset < self.offend:
            return True
        return False

    def next_token(self):
        current = self.text[self.offset]
        self.offset += 1
        token = ""

        if self.offset <= self.offend:
            while (
                self.is_blankspace(current) or self.is_separator(current)
            ) and self.offset < self.offend:
                current = self.text[self.offset]
                self.offset += 1

            while (
                not self.is_blankspace(current)
                and not self.is_separator(current)
                and self.offset <= self.offend
            ):
                if self.lowercase:
                    current = current.lower()
                token += current
                current = self.text[self.offset]
                self.offset += 1

                if self.offset > self.offend:
                    current = self.text[-1]
                    if not self.is_blankspace(current) and not self.is_separator(
                        current
                    ):
                        token += current

        return token

    def progress(self):
        return float(self.offset) / self.offend

    def reset_stream(self):
        self.offset = 0
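
# Illustrative usage sketch, not part of the original module: ForwardTokenizer
# implements the iterator protocol, so tokens can be consumed with a plain
# loop or list(). The sample text is made up; the exact tokens depend on the
# configured blankspaces and separators.
def _example_forward_tokenizer():
    tokenizer = ForwardTokenizer("Hello, wonderful world!")
    tokenizer.lowercase = True
    # Expected to yield something like ['hello', 'wonderful', 'world'].
    return list(tokenizer)
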
class ReverseTokenizer(Tokenizer):
    def __init__(
        self,
        text,
        blankspaces=pressagio.character.blankspaces,
        separators=pressagio.character.separators,
    ):
        Tokenizer.__init__(self, text, blankspaces, separators)
        self.offend = self.count_characters() - 1
        self.offset = self.offend
    def count_tokens(self):
        curroff = self.offset
        self.offset = self.offend
        count = 0
        while self.has_more_tokens():
            self.next_token()
            count += 1
        self.offset = curroff
        return count
    def count_characters(self):
        """
        Count the number of unicode characters in the text.
        """
        return len(self.text)
    def has_more_tokens(self):
        if self.offbeg <= self.offset:
            return True
        else:
            return False

    def next_token(self):
        token = ""

        while (self.offbeg <= self.offset) and len(token) == 0:
            current = self.text[self.offset]

            if (self.offset == self.offend) and (
                self.is_separator(current) or self.is_blankspace(current)
            ):
                self.offset -= 1
                return token

            while (
                self.is_blankspace(current) or self.is_separator(current)
            ) and self.offbeg < self.offset:
                self.offset -= 1
                if self.offbeg <= self.offset:
                    current = self.text[self.offset]

            while (
                not self.is_blankspace(current)
                and not self.is_separator(current)
                and self.offbeg <= self.offset
            ):
                if self.lowercase:
                    current = current.lower()
                token = current + token
                self.offset -= 1
                if self.offbeg <= self.offset:
                    current = self.text[self.offset]

        return token

    def progress(self):
        return float(self.offend - self.offset) / (self.offend - self.offbeg)

    def reset_stream(self):
        self.offset = self.offend
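
# Illustrative usage sketch, not part of the original module: ReverseTokenizer
# walks the text from the end towards the beginning, which is handy for
# recovering the most recent tokens of a context. Assuming the default
# blankspaces include a plain space, the call below should return something
# like ['fox', 'brown'].
def _example_reverse_tokenizer():
    tokenizer = ReverseTokenizer("the quick brown fox")
    last_two = []
    while tokenizer.has_more_tokens() and len(last_two) < 2:
        token = tokenizer.next_token()
        if token != "":
            last_two.append(token)
    return last_two
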
def preprocess(text):
    re_wordbeg = re.compile(r"(?<=\s)[-']")
    re_wordbeg2 = re.compile(r"(?<=\s\")[-']")
    re_wordend = re.compile(r"[-'](?=\s)")
    re_wordend2 = re.compile(r"[-'](?=\"\s)")
    text = re_wordbeg.sub("", text)
    text = re_wordbeg2.sub("", text)
    text = re_wordend.sub("", text)
    text = re_wordend2.sub("", text)
    return text
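
# Illustrative sketch, not part of the original module: preprocess() strips
# hyphens and apostrophes that dangle at word boundaries but keeps them when
# they are word-internal. The sample string is made up for demonstration.
def _example_preprocess():
    # The apostrophe in "don't" is kept, the dangling hyphens around "word"
    # are removed; expected result: "don't touch word here".
    return preprocess("don't touch -word- here")
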
class NgramMap:
    """
    A memory efficient store for ngrams.

    This class is optimized for memory consumption; it might be slower than
    other ngram stores. It is also optimized for a three step process:

    1) Add all ngrams.
    2) Perform a cutoff operation (optional).
    3) Read the list of ngrams.

    It might not perform well for other use cases.
    """
    def __init__(self):
        """Initialize internal data stores."""
        self._strings = dict()
        self.ngrams = collections.defaultdict(int)
        self.next_index = 0
    def add_token(self, token):
        """
        Add a token to the internal string store.

        This will only add the token to the internal string store. It will
        return an index that you can use to create your ngram. The ngrams
        are represented as strings of the indices, so we return a string
        here so that the consumer does not have to do the conversion.

        Parameters
        ----------
        token : str
            The token to add to the string store.

        Returns
        -------
        str
            The index of the token as a string.
        """
        if token in self._strings:
            return str(self._strings[token])
        else:
            self._strings[token] = self.next_index
            old_index = self.next_index
            self.next_index += 1
            return str(old_index)
    def add(self, ngram_indices):
        """
        Add an ngram to the store.

        This will add a list of strings as an ngram to the ngram store. In
        our standard use case the strings are the indices of the tokens,
        which you can get from the `add_token()` method.

        Parameters
        ----------
        ngram_indices : list of str
            The indices of the ngram's tokens, as strings.
        """
        self.ngrams["\t".join(ngram_indices)] += 1
    def cutoff(self, cutoff):
        """
        Perform a cutoff on the ngram store.

        This will remove all ngrams that have a frequency equal to or lower
        than the given cutoff.

        Parameters
        ----------
        cutoff : int
            The cutoff value; we will remove all items with a frequency of
            the cutoff or lower.
        """
        delete_keys = []
        for k, count in self.ngrams.items():
            if count <= cutoff:
                delete_keys.append(k)
        for k in delete_keys:
            del self.ngrams[k]
    def __len__(self):
        """Return the number of ngrams in the store."""
        return len(self.ngrams)
    def items(self):
        """
        Get the ngrams from the store.

        Returns
        -------
        iterable of (tokens, count)
            The tokens are a list of strings, the real tokens that you added
            to the store via `add_token()`. The count is the count value for
            that ngram.
        """
        strings = {v: k for k, v in self._strings.items()}
        for token_indices, count in self.ngrams.items():
            tokens = [strings[int(idx)] for idx in token_indices.split("\t")]
            yield tokens, count
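
# Illustrative sketch, not part of the original module: the intended
# three-step workflow for NgramMap - add all ngrams, optionally cut off rare
# ones, then read them back with items(). The bigrams below are made up;
# with them the result should be [(['to', 'be'], 2)].
def _example_ngram_map():
    ngram_map = NgramMap()
    for bigram in [("to", "be"), ("or", "not"), ("to", "be")]:
        indices = [ngram_map.add_token(token) for token in bigram]
        ngram_map.add(indices)
    ngram_map.cutoff(1)  # drop every ngram seen only once
    return list(ngram_map.items())
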
def forward_tokenize_files(
    infiles: typing.List[str], ngram_size: int, lowercase: bool = False, cutoff: int = 0
):
    """
    Tokenize a list of files and return an ngram store.

    Parameters
    ----------
    infiles : list of str
        The files to parse.
    ngram_size : int
        The size of the ngrams to generate.
    lowercase : bool
        Whether or not to lowercase all tokens.
    cutoff : int
        Perform a cutoff after parsing. We will only return ngrams that have
        a frequency higher than the cutoff.

    Returns
    -------
    NgramMap
        The ngram map that allows you to iterate over the ngrams.
    """
    ngram_map = NgramMap()
    for infile in infiles:
        ngram_map = forward_tokenize_file(
            infile, ngram_size, lowercase=lowercase, ngram_map=ngram_map
        )
    if cutoff > 0:
        ngram_map.cutoff(cutoff)
    return ngram_map
def forward_tokenize_file(
    infile: str,
    ngram_size: int,
    lowercase: bool = False,
    cutoff: int = 0,
    ngram_map: NgramMap = None,
):
    """
    Tokenize a file and return an ngram store.

    Parameters
    ----------
    infile : str
        The file to parse.
    ngram_size : int
        The size of the ngrams to generate.
    lowercase : bool
        Whether or not to lowercase all tokens.
    cutoff : int
        Perform a cutoff after parsing. We will only return ngrams that have
        a frequency higher than the cutoff.
    ngram_map : NgramMap
        Pass an existing NgramMap if you want to add the ngrams of the given
        file to that store. A new NgramMap will be created if this is `None`.

    Returns
    -------
    NgramMap
        The ngram map that allows you to iterate over the ngrams.
    """
    if ngram_map is None:
        ngram_map = NgramMap()

    with open(infile, "r", encoding="utf-8") as f:
        for i, line in enumerate(f):
            line = preprocess(line)
            ngram_list = []
            tokenizer = ForwardTokenizer(line)
            tokenizer.lowercase = lowercase

            # Read the first ngram_size - 1 non-empty tokens of the line.
            while len(ngram_list) < ngram_size - 1 and tokenizer.has_more_tokens():
                token = tokenizer.next_token()
                if token != "":
                    token_idx = ngram_map.add_token(token)
                    ngram_list.append(token_idx)

            if len(ngram_list) < ngram_size - 1:
                continue  # line too short to form a full ngram

            tokenizer.reset_stream()
            while tokenizer.has_more_tokens():
                token = tokenizer.next_token()
                if token != "":
                    # Append the token index, record the current window as an
                    # ngram, then drop the oldest index.
                    token_idx = ngram_map.add_token(token)
                    ngram_list.append(token_idx)
                    ngram_map.add(ngram_list)
                    ngram_list.pop(0)

    if cutoff > 0:
        ngram_map.cutoff(cutoff)

    return ngram_map
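
# Illustrative usage sketch, not part of the original module: building a
# trigram store from a corpus file and printing the counts. The path
# "corpus.txt" is a hypothetical placeholder.
def _example_forward_tokenize_file():
    ngram_map = forward_tokenize_file(
        "corpus.txt", ngram_size=3, lowercase=True, cutoff=1
    )
    for tokens, count in ngram_map.items():
        print(tokens, count)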