Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CODEOWNERS
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
* @gvanrossum @gvanrossum-ms @umeshma @robgruen
209 changes: 188 additions & 21 deletions src/typeagent/emails/email_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,8 @@ def import_forwarded_email_string(


# Imports an email.message.Message object and returns an EmailMessage object
# If the message is a reply, returns only the latest response.
def import_email_message(msg: Message, max_chunk_length: int) -> EmailMessage:
# Extract metadata from
# Extract metadata
email_meta = EmailMessageMeta(
sender=msg.get("From", ""),
recipients=_import_address_headers(msg.get_all("To", [])),
Expand All @@ -69,20 +68,32 @@ def import_email_message(msg: Message, max_chunk_length: int) -> EmailMessage:
if timestamp_date is not None:
timestamp = parsedate_to_datetime(timestamp_date).isoformat()

# Get email body.
# If the email was a reply, then ensure we only pick up the latest response
# Get email body and parse into chunks with source attribution
body = _extract_email_body(msg)
if body is None:
body = ""
elif is_reply(msg):
body = get_last_response_in_thread(body)

# Prepend subject to body if available
if email_meta.subject is not None:
body = email_meta.subject + "\n\n" + body

body_chunks = _text_to_chunks(body, max_chunk_length)
# Parse into chunks with source attribution (handles inline replies)
parsed_chunks = parse_email_chunks(body)

# Apply max_chunk_length splitting while preserving source attribution
text_chunks: list[str] = []
chunk_sources: list[str | None] = []
for text, source in parsed_chunks:
sub_chunks = _text_to_chunks(text, max_chunk_length)
for sub_chunk in sub_chunks:
text_chunks.append(sub_chunk)
chunk_sources.append(source)

email: EmailMessage = EmailMessage(
metadata=email_meta, text_chunks=body_chunks, timestamp=timestamp
metadata=email_meta,
text_chunks=text_chunks,
chunk_sources=chunk_sources,
timestamp=timestamp,
)
return email

Expand Down Expand Up @@ -124,21 +135,177 @@ def get_forwarded_email_parts(email_text: str) -> list[str]:
# Precompiled regex for trailing line delimiters (underscores, dashes, equals, spaces)
_TRAILING_LINE_DELIMITERS = re.compile(r"[\r\n][_\-= ]+\s*$")

# Pattern to detect "On <date> <user> wrote:" header for inline replies
# Uses alternation to handle different date formats:
# 1. "On Mon, Dec 10, 2020 at 10:30 AM John Doe wrote:" (AM/PM format)
# 2. "On Mon, Dec 10, 2020 Someone wrote:" (year followed by name)
# 3. Fallback: last words before "wrote:"
# Groups 1, 2, or 3 will capture the person's name depending on format
_INLINE_REPLY_HEADER = re.compile(
r"^on\s+(?:.+[AP]M\s+(.+?)|.+,\s*\d{4}\s+(.+?)|.+\s+(.+?))\s+wrote:\s*$",
re.IGNORECASE | re.MULTILINE,
)

# Pattern to match quoted lines (starting with > possibly with leading whitespace)
_QUOTED_LINE = re.compile(r"^\s*>")

# Pattern to detect email signature markers
_SIGNATURE_MARKER = re.compile(r"^--\s*$", re.MULTILINE)


# Simple way to get the last response on an email thread in MIME format
def get_last_response_in_thread(email_text: str) -> str:
# Type alias for chunk with source info
ChunkWithSource = tuple[str, str | None] # (text, source: None=original, str=quoted)


def is_inline_reply(email_text: str) -> bool:
"""
Detect if an email contains inline replies (responses interspersed with quotes).

An inline reply has:
1. An "On ... wrote:" header
2. Quoted lines (starting with >) interspersed with non-quoted response lines
"""
if not email_text:
return ""
return False

# Must have the "On ... wrote:" header
header_match = _INLINE_REPLY_HEADER.search(email_text)
if not header_match:
return False

# Check content after the header for mixed quoted/non-quoted lines
content_after_header = email_text[header_match.end() :]
lines = content_after_header.split("\n")

has_quoted = False
has_non_quoted_after_quoted = False

for line in lines:
# Check for signature marker
if _SIGNATURE_MARKER.match(line):
break

stripped = line.strip()
if not stripped:
continue

if _QUOTED_LINE.match(line):
has_quoted = True
elif has_quoted:
# Non-quoted line after we've seen quoted lines = inline reply
has_non_quoted_after_quoted = True
break

return has_quoted and has_non_quoted_after_quoted


def parse_email_chunks(email_text: str) -> list[ChunkWithSource]:
"""
Parse email text into chunks with source attribution.

Returns a list of (text, source) tuples where:
- source is None for original (unquoted) content
- source is the quoted person's name for quoted content, or " " if unknown

This handles inline replies where the sender responds inline to quoted text,
preserving both the quoted and unquoted portions as separate chunks.
"""
if not email_text:
return []

# Find the "On ... wrote:" header
header_match = _INLINE_REPLY_HEADER.search(email_text)
if not header_match:
# No inline reply pattern, return as a single original chunk
text = _strip_trailing_delimiters(email_text)
if text:
return [(text, None)]
return []

# Extract quoted person from header (first non-None group from groups 1, 2, or 3)
quoted_person = (
header_match.group(1) or header_match.group(2) or header_match.group(3) or " "
)
quoted_person = quoted_person.strip() if quoted_person else " "
if not quoted_person:
quoted_person = " "

# Get preamble (content before the "On ... wrote:" header)
preamble = email_text[: header_match.start()].strip()

# Process content after header
content_after_header = email_text[header_match.end() :]
lines = content_after_header.split("\n")

result: list[ChunkWithSource] = []
if preamble:
result.append((preamble, None))

current_reply_lines: list[str] = []
current_quoted_lines: list[str] = []
in_signature = False

def flush_reply() -> None:
nonlocal current_reply_lines
if current_reply_lines:
text = "\n".join(current_reply_lines).strip()
if text:
result.append((text, None))
current_reply_lines = []

def flush_quoted() -> None:
nonlocal current_quoted_lines
if current_quoted_lines:
text = "\n".join(current_quoted_lines).strip()
if text:
result.append((text, quoted_person))
current_quoted_lines = []

for line in lines:
# Check for signature marker
if _SIGNATURE_MARKER.match(line):
in_signature = True
# Flush any pending content
flush_quoted()
flush_reply()
continue

if in_signature:
# Skip signature content
continue

if _QUOTED_LINE.match(line):
# This is a quoted line - flush any pending reply first
if current_reply_lines:
flush_reply()
# Strip the leading > and any space after it
unquoted = re.sub(r"^\s*>\s?", "", line)
current_quoted_lines.append(unquoted)
else:
# Non-quoted line - flush any pending quoted first
if current_quoted_lines:
flush_quoted()
# Only accumulate non-empty lines or preserve blank lines within a block
stripped = line.strip()
if stripped or current_reply_lines:
current_reply_lines.append(line.rstrip())

# Flush any remaining content
flush_quoted()
flush_reply()

# Strip trailing delimiters from the last chunk
if result:
last_text, last_source = result[-1]
result[-1] = (_strip_trailing_delimiters(last_text), last_source)

return result

match = _THREAD_DELIMITERS.search(email_text)
if match:
email_text = email_text[: match.start()]

email_text = email_text.strip()
# Remove trailing line delimiters (e.g. underscores, dashes, equals)
_TRAILING_LINE_DELIMITER_REGEX = _TRAILING_LINE_DELIMITERS
email_text = _TRAILING_LINE_DELIMITER_REGEX.sub("", email_text)
return email_text
def _strip_trailing_delimiters(text: str) -> str:
"""Remove trailing line delimiters (underscores, dashes, equals, spaces)."""
text = text.strip()
return _TRAILING_LINE_DELIMITERS.sub("", text)


# Extracts the plain text body from an email.message.Message object.
Expand Down Expand Up @@ -198,11 +365,11 @@ def _text_to_chunks(text: str, max_chunk_length: int) -> list[str]:
if len(text) < max_chunk_length:
return [text]

paragraphs = _splitIntoParagraphs(text)
paragraphs = _split_into_paragraphs(text)
return list(_merge_chunks(paragraphs, "\n\n", max_chunk_length))


def _splitIntoParagraphs(text: str) -> list[str]:
def _split_into_paragraphs(text: str) -> list[str]:
return _remove_empty_strings(re.split(r"\n{2,}", text))


Expand Down
6 changes: 6 additions & 0 deletions src/typeagent/emails/email_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@ def __init__(self, **data: Any) -> None:
super().__init__(**data)

text_chunks: list[str] = CamelCaseField("The text chunks of the email message")
# For each chunk: None means original content, str means quoted.
# If quoted, the string is the name of the person being quoted, or " " if unknown.
chunk_sources: list[str | None] = CamelCaseField(
"Source attribution for each chunk: None=original, str=quoted person or ' '",
default_factory=list,
)
metadata: EmailMessageMeta = CamelCaseField(
"Metadata associated with the email message"
)
Expand Down
Loading