i am getting data in an ordered sequence of buffers one at a time. an example is reading a data pipe 4096 bytes at a time. i want to count the number of lines in this whole data based on the line ending sequence of the platform it is running on (os.sep). when the line ending sequence is longer than 1 character (len(os.sep)>1) it is possible for a line ending sequence to be split between buffers. does anyone know a good way to accurately count the line endings when getting or accessing these buffers one at a time without big memory usage (do not collect the whole data sequence all at once)?
an alternate goal is to count lines based on each line ending in any valid line ending (not necessarily the same as other lines).
I think you are mistaking os.sep for os.linesep. Anyway, here is my attempt (to be tested):
import os
__version__ = '0.0.1'

def count_linesep(ibuffer, linesep=os.linesep):
    """Count the number of line separators in a sequence of buffers

    Arguments
        ibuffer: a sequence of strings
        linesep: an optional string representing the line separator.
    Return
        The number of line separators in the (virtually concatenated)
        sequence of buffers
    """
    rest = ''
    w = len(linesep)
    result = 0
    for buffer in ibuffer:
        buffer = rest + buffer
        if (idx := buffer.rfind(linesep)) < 0:
            rest = buffer[-w:]
        else:
            rest = buffer[max(idx + w, len(buffer) - w):]
            result += 1 + buffer.count(linesep, 0, idx)
    return result
ah, yes, os.sep is the file path separator. oops on me. and := is a new thing i am still unfamiliar with. so, this code does not make sense to me. it is unclear to me how this code compares in 2 buffers at the same time.
i am assuming that a linesep is never spread across more than 2 buffers. but if a read can return a buffer of length 1 and a linesep could be longer than 2 characters (not the case today), that could be a (rare) issue.
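On the := question: the walrus operator assigns and tests in one expression, so "if (idx := buffer.rfind(linesep)) < 0:" does the same thing as assigning idx = buffer.rfind(linesep) on one line and testing idx < 0 on the next. The code also never looks at two buffers at once. It keeps the last few characters of the previous buffer and prepends them to the next one. Below is a minimal sketch of that carry-over idea, written without the walrus. The name count_with_carry is made up for this illustration, and the sketch assumes a separator that cannot overlap itself (true for "\r\n" and "\n").

```python
def count_with_carry(buffers, linesep="\r\n"):
    """Count separators across buffers by carrying a short tail forward."""
    w = len(linesep)
    keep = w - 1          # at most w - 1 trailing chars can begin a split separator
    carry = ""
    total = 0
    for buf in buffers:
        s = carry + buf                # the carried tail meets the new buffer here
        idx = s.rfind(linesep)         # two-step form of `(idx := ...)`
        if idx < 0:
            unmatched = s              # no complete separator yet
        else:
            total += s.count(linesep, 0, idx + w)
            unmatched = s[idx + w:]    # text after the last complete separator
        carry = unmatched[-keep:] if keep else ""
    return total
```

With the buffers "a\r" and "\nb\r\n", the "\r" at the end of the first buffer is carried into the second, so the split "\r\n" is still found.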
(Oct-04-2022, 10:13 PM) Skaperen wrote: i am assuming that a linesep is never spread across more than 2 buffers.
I think my code works even if the linesep is spread across more than 2 buffers, but it would be good to write tests for this.
such tests would have to simulate os.linesep being 3 characters or more, and very small buffers.
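A sketch of such a test follows. The separator "<n>" is made up to stand in for a 3-character os.linesep, and chunks() is a hypothetical helper that cuts the text into small buffers. count_linesep is copied from the post above so the block runs on its own. The reference value is str.count() on the whole text, which is only a valid oracle because this separator cannot overlap itself.

```python
import os

def count_linesep(ibuffer, linesep=os.linesep):
    # Copy of the function posted above, repeated so this block is self-contained.
    rest = ''
    w = len(linesep)
    result = 0
    for buffer in ibuffer:
        buffer = rest + buffer
        if (idx := buffer.rfind(linesep)) < 0:
            rest = buffer[-w:]
        else:
            rest = buffer[max(idx + w, len(buffer) - w):]
            result += 1 + buffer.count(linesep, 0, idx)
    return result

def chunks(text, size):
    """Hypothetical helper: split text into buffers of at most `size` chars."""
    return [text[i:i + size] for i in range(0, len(text), size)]

SEP = "<n>"                            # made-up 3-character separator
TEXT = "spam<n>ham<n><n>eggs<n>tail"   # includes two adjacent separators

def check_all_sizes():
    """Compare the streamed count with the whole-text count for every buffer size."""
    expected = TEXT.count(SEP)
    for size in range(1, len(TEXT) + 1):
        got = count_linesep(chunks(TEXT, size), SEP)
        assert got == expected, (size, got, expected)
    return expected
```

Buffer sizes 1 and 2 are the interesting ones here, since they force the separator to be spread over three and two buffers respectively.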
Here is a much more sophisticated version that allows overlapping line separators and uses re.finditer() to find the last match of the line separator in a string. The strategy is explained in the documentation; tell me if it is understandable.
__version__ = '0.0.4'
from collections import deque
import os
import re

class Summary:
    """Class that summarizes the beginning of a text.

    The summary consists of
        self.count : the number of non overlapping sequences
            of line separators that have been met so far in the text.
        self.tail : the end of the unterminated last line of the text.
            Only the last n-1 characters of that line are kept where
            n is the length of the line separator because only these
            characters could participate in an occurrence of the line
            separator if the text is continued.

    Constructor arguments:
        ibuffer : a sequence of strings containing the beginning
            of the text (defaults to an empty sequence)
        linesep : the line separator string (defaults to os.linesep)

    The Summary is extendible, meaning that more text can be added by
    calling either self.append(buffer) or self.extend(ibuffer). The
    members .count and .tail are updated accordingly.

    The method .line_count() returns the number of lines in the text,
    which is equal to .count or .count + 1 if the last unterminated
    line is not empty.

    The class allows line separators that could overlap such as
    "abab", for example the text 'spam abababa ham' would have two
    lines in that case and 'spam abababab ham' would have three lines.
    """

    def __init__(self, ibuffer=(), linesep=os.linesep):
        if not linesep:
            raise ValueError("Empty line separator is not supported")
        self._regex = re.compile(re.escape(linesep))
        self._bound = max(1, len(linesep) - 1)
        self.count = 0
        self.tail = ''
        self.extend(ibuffer)

    def append(self, buffer):
        s = self.tail + buffer
        d = deque(
            enumerate(self._regex.finditer(s), start=self.count + 1),
            maxlen=1)
        if d:
            self.count, match = d[-1]
            tail = s[match.end():]
        else:
            tail = s
        self.tail = tail[-self._bound:]

    def extend(self, ibuffer):
        for buffer in ibuffer:
            self.append(buffer)

    def line_count(self):
        return self.count + bool(self.tail)

def count_lines(ibuffer, linesep=os.linesep):
    return Summary(ibuffer, linesep).line_count()
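A note on the deque(..., maxlen=1) idiom used in append(): a deque with maxlen=1 consumes the whole iterator but retains only its final item, so wrapping enumerate() around finditer() gives both the running count and the last match in a single pass. Here is a small standalone illustration. The helper name last_match_and_count is made up for this example and is not part of the class above.

```python
from collections import deque
import re

def last_match_and_count(pattern, text, start_count=0):
    """Return (count, last_match); last_match is None when nothing matched."""
    last = deque(
        enumerate(pattern.finditer(text), start=start_count + 1),
        maxlen=1)                      # only the final (count, match) pair survives
    if last:
        return last[-1]
    return start_count, None

TEXT = "one\r\ntwo\r\nthr"
regex = re.compile(re.escape("\r\n"))
count, match = last_match_and_count(regex, TEXT)
remainder = TEXT[match.end():]         # unterminated text after the last separator
```

In this example count is the number of separators found and remainder is the part of the text that would be trimmed and kept as the tail.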
this doc and code is understandable to me.
i'm now thinking that i may be reading data that has line separators that may not match what the platform normally uses, such as Windows or Mac files transferred in binary to Linux (it happens). the way i have dealt with this back in my days of C programming is to allow any mix of CR LF VT FF in as many as 4 bytes to mean a new line (plus whatever else based on what is there). but where anything repeats, it means another line (so, "foo\r\n\r\vbar" would be separated by at least one blank line). it might be a little more complicated to process but should cover almost all real life cases.
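One possible reading of that rule, sketched as a streaming counter for text strings. The assumptions are labeled here because the post does not spell them out: a line break is a run of distinct characters drawn from CR, LF, VT, FF (so at most 4), any repeated character inside the run starts a new break, and any other character ends the run. The names MixedBreakCounter and count_mixed_breaks are hypothetical.

```python
TERMINATORS = frozenset("\r\n\v\f")    # CR, LF, VT, FF

class MixedBreakCounter:
    """Count line breaks where any mix of distinct CR/LF/VT/FF is one break."""

    def __init__(self):
        self.count = 0
        self._seen = set()   # terminators already in the break being read

    def feed(self, buffer):
        for ch in buffer:
            if ch not in TERMINATORS:
                self._seen.clear()     # ordinary text ends the current break
            elif not self._seen or ch in self._seen:
                self.count += 1        # first terminator, or a repeat: new break
                self._seen = {ch}
            else:
                self._seen.add(ch)     # a distinct terminator joins the same break

def count_mixed_breaks(buffers):
    """Feed buffers one at a time; state carries across buffer boundaries."""
    counter = MixedBreakCounter()
    for buffer in buffers:
        counter.feed(buffer)
    return counter.count
```

Because the only state is the small set of terminators seen in the current run, a break split between buffers is handled the same way as one inside a single buffer. Under these assumptions "foo\r\n\r\vbar" counts as two breaks, which is one blank line between foo and bar.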