Skip to content

Commit

Permalink
Add function for detecting git changes to text files
Browse files Browse the repository at this point in the history
  • Loading branch information
cottsay committed Jul 26, 2024
1 parent 85638ea commit e3872a5
Show file tree
Hide file tree
Showing 4 changed files with 220 additions and 0 deletions.
94 changes: 94 additions & 0 deletions rosdistro_reviewer/git_lines.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# Copyright 2024 Open Source Robotics Foundation, Inc.
# Licensed under the Apache License, Version 2.0

from typing import Iterable
from typing import Mapping
from typing import Optional
from typing import Sequence

from git import Repo
import unidiff


def _rangeify(sequence: Iterable[int]) -> Iterable[range]:
chunk_last = None
chunk_start = None

for item in sequence:
if chunk_last != item - 1:
if chunk_start is not None:
yield range(chunk_start, chunk_last + 1)
chunk_start = item
chunk_last = item

if chunk_start is not None and chunk_last is not None:
yield range(chunk_start, chunk_last + 1)


def get_added_lines(
    path,
    *,
    target_ref: Optional[str] = None,
    head_ref: Optional[str] = None,
    paths=None,
) -> Optional[Mapping[str, Sequence[range]]]:
    """
    Determine what lines were added between two git repository states.

    When ``head_ref`` is given, the diff starts at the first merge base
    found between the target and head commits and ends at the head commit.
    When ``head_ref`` is omitted, the diff starts at the target commit and
    GitPython is asked to diff against ``None`` instead of a commit.

    :param path: The path to the repository root
    :param target_ref: The git ref to base the diff from. If omitted, the
      first parent of ``head_ref`` is used when ``head_ref`` is given,
      otherwise the commit currently referenced by the repository's HEAD
    :param head_ref: The git ref where the changes have been made
    :param paths: Relative paths under the repository to limit results to.
      When given, the result contains one entry per listed path, with an
      empty sequence for paths that had no added lines
    :returns: Mapping of relative file paths to sequences of line number
      ranges, or None if no changes were detected
    :raises RuntimeError: If ``head_ref`` is given and no merge base exists
      between the target and head commits
    """
    # Use the repository as a context manager so that it is closed again
    # once the diff has been collected, rather than left open for the
    # lifetime of the process.
    with Repo(path) as repo:
        head = repo.commit(head_ref) if head_ref is not None else None

        if target_ref is not None:
            target = repo.commit(target_ref)
        elif head is not None:
            target = head.parents[0]
        else:
            target = repo.head.commit

        if head is not None:
            # Diff from the common ancestor so that changes made only on
            # the target side are not attributed to the head
            base = next(
                (
                    candidate
                    for candidate in repo.merge_base(target, head)
                    if candidate is not None
                ),
                None,
            )
            if base is None:
                raise RuntimeError(
                    f"No merge base found between '{target_ref}' "
                    f"and '{head_ref}'")
        else:
            base = target

        diffs = base.diff(head, paths, True)

        lines: dict[str, list[int]] = {}
        for diff in diffs:
            # No b-side path means there is no resulting file to have
            # gained lines
            if not diff.b_path:
                continue
            # GitPython supplies only the hunks, so prepend the file
            # header lines to form a patch that unidiff can parse
            patch = f"""--- {diff.a_path if diff.a_path else '/dev/null'}
+++ {diff.b_path}
{diff.diff.decode()}"""
            patchset = unidiff.PatchSet(patch)
            for file in patchset:
                for hunk in file:
                    for line in hunk:
                        if line.line_type != unidiff.LINE_TYPE_ADDED:
                            continue
                        lines.setdefault(file.path, []).append(
                            line.target_line_no)

    if not lines:
        return None

    return {
        path: list(_rangeify(sorted(lines.get(path, ()))))
        for path in (paths if paths is not None else lines.keys())
    }
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ long_description_content_type = text/markdown
[options]
python_requires = >=3.6
install_requires =
GitPython
unidiff
packages = find:
zip_safe = true

Expand Down
6 changes: 6 additions & 0 deletions test/spell_check.words
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
addfinalizer
apache
diffs
https
iterdir
linter
mktemp
mypy
patchset
pathlib
pycqa
pytest
rangeify
returncode
rosdistro
scspell
setuptools
thomas
unidiff
118 changes: 118 additions & 0 deletions test/test_git_lines.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
# Copyright 2024 Open Source Robotics Foundation, Inc.
# Licensed under the Apache License, Version 2.0

from git import Head
from git import Repo
import pytest
from rosdistro_reviewer.git_lines import get_added_lines


@pytest.fixture(scope='session')
def git_repo(
    request: pytest.FixtureRequest,
    tmp_path_factory: pytest.TempPathFactory,
) -> Repo:
    """
    Create a temporary git repository populated with test branches.

    The repository is left checked out on 'merge_c_d_to_a' with one extra
    line appended to lines.txt that is not committed.

    :param request: The pytest request, used to close the repo on teardown
    :param tmp_path_factory: Factory providing the repository directory
    :returns: The populated repository
    """
    root = tmp_path_factory.mktemp('git_repo')
    repo = Repo.init(root)
    request.addfinalizer(repo.close)

    repo.index.commit('Initial commit')

    # 'base' holds the original lines.txt that other branches start from
    repo.create_head('base').checkout()
    lines_txt = root / 'lines.txt'
    content = ['a', 'b', 'c', 'd', 'e', 'B', 'E', '']
    lines_txt.write_text('\n'.join(content))
    repo.index.add((lines_txt,))
    repo.index.commit('Add lines.txt')

    # Point HEAD at a branch that does not exist yet and commit there
    repo.head.reference = Head(repo, 'refs/heads/orphan')  # type: ignore
    repo.index.commit('Orphaned commit')

    # 'lines2' adds a new file and deletes the original one
    repo.create_head('lines2', 'base').checkout()
    lines2_txt = root / 'lines2.txt'
    lines2_txt.write_text('\n'.join(['1', '2']))
    repo.index.add((lines2_txt,))
    repo.index.remove(str(lines_txt), working_tree=True)
    repo.index.commit('Add lines2.txt, remove lines.txt')

    # 'less_c' drops 'c' and adds 'C'
    repo.create_head('less_c', 'base').checkout()
    content = ['a', 'b', 'd', 'e', 'B', 'C', 'E', '']
    lines_txt.write_text('\n'.join(content))
    repo.index.add((lines_txt,))
    repo.index.commit("Remove 'c' from lines.txt")

    # 'less_c_d' continues from 'less_c', dropping 'd' and adding 'D'
    repo.create_head('less_c_d', 'less_c').checkout()
    content = ['a', 'b', 'e', 'B', 'C', 'D', 'E', '']
    lines_txt.write_text('\n'.join(content))
    repo.index.add((lines_txt,))
    repo.index.commit("Remove 'd' from lines.txt")

    # 'less_a' drops 'a' and adds 'A'
    repo.create_head('less_a', 'base').checkout()
    content = ['b', 'c', 'd', 'e', 'A', 'B', 'E', '']
    lines_txt.write_text('\n'.join(content))
    repo.index.add((lines_txt,))
    repo.index.commit("Remove 'a' from lines.txt")

    # 'merge_c_d_to_a' is a two-parent commit combining both lines of work
    merge_head = repo.create_head('merge_c_d_to_a', 'less_a').checkout()
    merged_in = repo.heads['less_c_d']
    repo.index.merge_tree(merged_in.commit)
    content = ['b', 'e', 'A', 'B', 'C', 'D', 'E', '']
    lines_txt.write_text('\n'.join(content))
    repo.index.add((lines_txt,))
    repo.index.commit(
        "Merge branch 'less_c_d' into merge_c_d_to_a",
        parent_commits=[merge_head.commit, merged_in.commit])  # type: ignore

    # Leave one appended line uncommitted
    with open(lines_txt, 'a') as stream:
        stream.write('X\n')

    return repo


def test_added_lines(git_repo: Repo) -> None:
    """Check get_added_lines against each scenario set up by the fixture."""
    root = git_repo.working_dir

    # No refs at all: only the uncommitted line is reported
    added = get_added_lines(root)
    assert added == {'lines.txt': [range(8, 9)]}

    # Restricting to a path that has changes
    added = get_added_lines(root, paths=['lines.txt'])
    assert added == {'lines.txt': [range(8, 9)]}

    # Restricting to a path that has no changes
    added = get_added_lines(root, paths=['foo.txt'])
    assert added is None

    # Only a head given
    added = get_added_lines(root, head_ref='less_a')
    assert added == {'lines.txt': [range(5, 6)]}

    # Only a target given (result includes the uncommitted line)
    added = get_added_lines(root, target_ref='less_c')
    assert added == {'lines.txt': [range(3, 4), range(6, 7), range(8, 9)]}

    # Both head and target given, one commit apart
    added = get_added_lines(root, target_ref='less_c', head_ref='less_c_d')
    assert added == {'lines.txt': [range(6, 7)]}

    # Both head and target given, several commits apart
    added = get_added_lines(root, target_ref='base', head_ref='less_c_d')
    assert added == {'lines.txt': [range(5, 7)]}

    # Head and target on diverged branches: the merge base is used
    added = get_added_lines(root, target_ref='less_a', head_ref='less_c_d')
    assert added == {'lines.txt': [range(5, 7)]}

    # A file that is newly added on the head
    added = get_added_lines(root, target_ref='base', head_ref='lines2')
    assert added == {'lines2.txt': [range(1, 3)]}

    # Refs with no merge base raise an error
    with pytest.raises(RuntimeError):
        get_added_lines(root, target_ref='orphan', head_ref='less_a')

0 comments on commit e3872a5

Please sign in to comment.