8d9c50105f
Change check_ref_format() to take a flags argument that indicates what is acceptable in the reference name (analogous to "git check-ref-format"'s "--allow-onelevel" and "--refspec-pattern"). This is more convenient for callers and also fixes a failure in the test suite (and likely elsewhere in the code) by enabling "onelevel" and "refspec-pattern" to be allowed independently of each other. Also rename check_ref_format() to check_refname_format() to make it obvious that it deals with refnames rather than references themselves. Signed-off-by: Michael Haggerty <mhagger@alum.mit.edu> Signed-off-by: Junio C Hamano <gitster@pobox.com>
679 lines
24 KiB
Python
679 lines
24 KiB
Python
#!/usr/bin/env python
|
|
|
|
"""Functionality for interacting with Git repositories.
|
|
|
|
This module provides classes for interfacing with a Git repository.
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import time
|
|
from binascii import hexlify
|
|
from cStringIO import StringIO
|
|
import unittest
|
|
|
|
from git_remote_helpers.util import debug, error, die, start_command, run_command
|
|
|
|
|
|
def get_git_dir ():
|
|
"""Return the path to the GIT_DIR for this repo."""
|
|
args = ("git", "rev-parse", "--git-dir")
|
|
exit_code, output, errors = run_command(args)
|
|
if exit_code:
|
|
die("Failed to retrieve git dir")
|
|
assert not errors
|
|
return output.strip()
|
|
|
|
|
|
def parse_git_config ():
|
|
"""Return a dict containing the parsed version of 'git config -l'."""
|
|
exit_code, output, errors = run_command(("git", "config", "-z", "-l"))
|
|
if exit_code:
|
|
die("Failed to retrieve git configuration")
|
|
assert not errors
|
|
return dict([e.split('\n', 1) for e in output.split("\0") if e])
|
|
|
|
|
|
def git_config_bool (value):
|
|
"""Convert the given git config string value to True or False.
|
|
|
|
Raise ValueError if the given string was not recognized as a
|
|
boolean value.
|
|
|
|
"""
|
|
norm_value = str(value).strip().lower()
|
|
if norm_value in ("true", "1", "yes", "on", ""):
|
|
return True
|
|
if norm_value in ("false", "0", "no", "off", "none"):
|
|
return False
|
|
raise ValueError("Failed to parse '%s' into a boolean value" % (value))
|
|
|
|
|
|
def valid_git_ref (ref_name):
|
|
"""Return True iff the given ref name is a valid git ref name."""
|
|
# The following is a reimplementation of the git check-ref-format
|
|
# command. The rules were derived from the git check-ref-format(1)
|
|
# manual page. This code should be replaced by a call to
|
|
# check_refname_format() in the git library, when such is available.
|
|
if ref_name.endswith('/') or \
|
|
ref_name.startswith('.') or \
|
|
ref_name.count('/.') or \
|
|
ref_name.count('..') or \
|
|
ref_name.endswith('.lock'):
|
|
return False
|
|
for c in ref_name:
|
|
if ord(c) < 0x20 or ord(c) == 0x7f or c in " ~^:?*[":
|
|
return False
|
|
return True
|
|
|
|
|
|
class GitObjectFetcher(object):
|
|
|
|
"""Provide parsed access to 'git cat-file --batch'.
|
|
|
|
This provides a read-only interface to the Git object database.
|
|
|
|
"""
|
|
|
|
def __init__ (self):
|
|
"""Initiate a 'git cat-file --batch' session."""
|
|
self.queue = [] # List of object names to be submitted
|
|
self.in_transit = None # Object name currently in transit
|
|
|
|
# 'git cat-file --batch' produces binary output which is likely
|
|
# to be corrupted by the default "rU"-mode pipe opened by
|
|
# start_command. (Mode == "rU" does universal new-line
|
|
# conversion, which mangles carriage returns.) Therefore, we
|
|
# open an explicitly binary-safe pipe for transferring the
|
|
# output from 'git cat-file --batch'.
|
|
pipe_r_fd, pipe_w_fd = os.pipe()
|
|
pipe_r = os.fdopen(pipe_r_fd, "rb")
|
|
pipe_w = os.fdopen(pipe_w_fd, "wb")
|
|
self.proc = start_command(("git", "cat-file", "--batch"),
|
|
stdout = pipe_w)
|
|
self.f = pipe_r
|
|
|
|
def __del__ (self):
|
|
"""Verify completed communication with 'git cat-file --batch'."""
|
|
assert not self.queue
|
|
assert self.in_transit is None
|
|
self.proc.stdin.close()
|
|
assert self.proc.wait() == 0 # Zero exit code
|
|
assert self.f.read() == "" # No remaining output
|
|
|
|
def _submit_next_object (self):
|
|
"""Submit queue items to the 'git cat-file --batch' process.
|
|
|
|
If there are items in the queue, and there is currently no item
|
|
currently in 'transit', then pop the first item off the queue,
|
|
and submit it.
|
|
|
|
"""
|
|
if self.queue and self.in_transit is None:
|
|
self.in_transit = self.queue.pop(0)
|
|
print >> self.proc.stdin, self.in_transit[0]
|
|
|
|
def push (self, obj, callback):
|
|
"""Push the given object name onto the queue.
|
|
|
|
The given callback function will at some point in the future
|
|
be called exactly once with the following arguments:
|
|
- self - this GitObjectFetcher instance
|
|
- obj - the object name provided to push()
|
|
- sha1 - the SHA1 of the object, if 'None' obj is missing
|
|
- t - the type of the object (tag/commit/tree/blob)
|
|
- size - the size of the object in bytes
|
|
- data - the object contents
|
|
|
|
"""
|
|
self.queue.append((obj, callback))
|
|
self._submit_next_object() # (Re)start queue processing
|
|
|
|
def process_next_entry (self):
|
|
"""Read the next entry off the queue and invoke callback."""
|
|
obj, cb = self.in_transit
|
|
self.in_transit = None
|
|
header = self.f.readline()
|
|
if header == "%s missing\n" % (obj):
|
|
cb(self, obj, None, None, None, None)
|
|
return
|
|
sha1, t, size = header.split(" ")
|
|
assert len(sha1) == 40
|
|
assert t in ("tag", "commit", "tree", "blob")
|
|
assert size.endswith("\n")
|
|
size = int(size.strip())
|
|
data = self.f.read(size)
|
|
assert self.f.read(1) == "\n"
|
|
cb(self, obj, sha1, t, size, data)
|
|
self._submit_next_object()
|
|
|
|
def process (self):
|
|
"""Process the current queue until empty."""
|
|
while self.in_transit is not None:
|
|
self.process_next_entry()
|
|
|
|
# High-level convenience methods:
|
|
|
|
def get_sha1 (self, objspec):
|
|
"""Return the SHA1 of the object specified by 'objspec'.
|
|
|
|
Return None if 'objspec' does not specify an existing object.
|
|
|
|
"""
|
|
class _ObjHandler(object):
|
|
"""Helper class for getting the returned SHA1."""
|
|
def __init__ (self, parser):
|
|
self.parser = parser
|
|
self.sha1 = None
|
|
|
|
def __call__ (self, parser, obj, sha1, t, size, data):
|
|
# FIXME: Many unused arguments. Could this be cheaper?
|
|
assert parser == self.parser
|
|
self.sha1 = sha1
|
|
|
|
handler = _ObjHandler(self)
|
|
self.push(objspec, handler)
|
|
self.process()
|
|
return handler.sha1
|
|
|
|
def open_obj (self, objspec):
|
|
"""Return a file object wrapping the contents of a named object.
|
|
|
|
The caller is responsible for calling .close() on the returned
|
|
file object.
|
|
|
|
Raise KeyError if 'objspec' does not exist in the repo.
|
|
|
|
"""
|
|
class _ObjHandler(object):
|
|
"""Helper class for parsing the returned git object."""
|
|
def __init__ (self, parser):
|
|
"""Set up helper."""
|
|
self.parser = parser
|
|
self.contents = StringIO()
|
|
self.err = None
|
|
|
|
def __call__ (self, parser, obj, sha1, t, size, data):
|
|
"""Git object callback (see GitObjectFetcher documentation)."""
|
|
assert parser == self.parser
|
|
if not sha1: # Missing object
|
|
self.err = "Missing object '%s'" % obj
|
|
else:
|
|
assert size == len(data)
|
|
self.contents.write(data)
|
|
|
|
handler = _ObjHandler(self)
|
|
self.push(objspec, handler)
|
|
self.process()
|
|
if handler.err:
|
|
raise KeyError(handler.err)
|
|
handler.contents.seek(0)
|
|
return handler.contents
|
|
|
|
def walk_tree (self, tree_objspec, callback, prefix = ""):
|
|
"""Recursively walk the given Git tree object.
|
|
|
|
Recursively walk all subtrees of the given tree object, and
|
|
invoke the given callback passing three arguments:
|
|
(path, mode, data) with the path, permission bits, and contents
|
|
of all the blobs found in the entire tree structure.
|
|
|
|
"""
|
|
class _ObjHandler(object):
|
|
"""Helper class for walking a git tree structure."""
|
|
def __init__ (self, parser, cb, path, mode = None):
|
|
"""Set up helper."""
|
|
self.parser = parser
|
|
self.cb = cb
|
|
self.path = path
|
|
self.mode = mode
|
|
self.err = None
|
|
|
|
def parse_tree (self, treedata):
|
|
"""Parse tree object data, yield tree entries.
|
|
|
|
Each tree entry is a 3-tuple (mode, sha1, path)
|
|
|
|
self.path is prepended to all paths yielded
|
|
from this method.
|
|
|
|
"""
|
|
while treedata:
|
|
mode = int(treedata[:6], 10)
|
|
# Turn 100xxx into xxx
|
|
if mode > 100000:
|
|
mode -= 100000
|
|
assert treedata[6] == " "
|
|
i = treedata.find("\0", 7)
|
|
assert i > 0
|
|
path = treedata[7:i]
|
|
sha1 = hexlify(treedata[i + 1: i + 21])
|
|
yield (mode, sha1, self.path + path)
|
|
treedata = treedata[i + 21:]
|
|
|
|
def __call__ (self, parser, obj, sha1, t, size, data):
|
|
"""Git object callback (see GitObjectFetcher documentation)."""
|
|
assert parser == self.parser
|
|
if not sha1: # Missing object
|
|
self.err = "Missing object '%s'" % (obj)
|
|
return
|
|
assert size == len(data)
|
|
if t == "tree":
|
|
if self.path:
|
|
self.path += "/"
|
|
# Recurse into all blobs and subtrees
|
|
for m, s, p in self.parse_tree(data):
|
|
parser.push(s,
|
|
self.__class__(self.parser, self.cb, p, m))
|
|
elif t == "blob":
|
|
self.cb(self.path, self.mode, data)
|
|
else:
|
|
raise ValueError("Unknown object type '%s'" % (t))
|
|
|
|
self.push(tree_objspec, _ObjHandler(self, callback, prefix))
|
|
self.process()
|
|
|
|
|
|
class GitRefMap(object):
|
|
|
|
"""Map Git ref names to the Git object names they currently point to.
|
|
|
|
Behaves like a dictionary of Git ref names -> Git object names.
|
|
|
|
"""
|
|
|
|
def __init__ (self, obj_fetcher):
|
|
"""Create a new Git ref -> object map."""
|
|
self.obj_fetcher = obj_fetcher
|
|
self._cache = {} # dict: refname -> objname
|
|
|
|
def _load (self, ref):
|
|
"""Retrieve the object currently bound to the given ref.
|
|
|
|
The name of the object pointed to by the given ref is stored
|
|
into this mapping, and also returned.
|
|
|
|
"""
|
|
if ref not in self._cache:
|
|
self._cache[ref] = self.obj_fetcher.get_sha1(ref)
|
|
return self._cache[ref]
|
|
|
|
def __contains__ (self, refname):
|
|
"""Return True if the given refname is present in this cache."""
|
|
return bool(self._load(refname))
|
|
|
|
def __getitem__ (self, refname):
|
|
"""Return the git object name pointed to by the given refname."""
|
|
commit = self._load(refname)
|
|
if commit is None:
|
|
raise KeyError("Unknown ref '%s'" % (refname))
|
|
return commit
|
|
|
|
def get (self, refname, default = None):
|
|
"""Return the git object name pointed to by the given refname."""
|
|
commit = self._load(refname)
|
|
if commit is None:
|
|
return default
|
|
return commit
|
|
|
|
|
|
class GitFICommit(object):
|
|
|
|
"""Encapsulate the data in a Git fast-import commit command."""
|
|
|
|
SHA1RE = re.compile(r'^[0-9a-f]{40}$')
|
|
|
|
@classmethod
|
|
def parse_mode (cls, mode):
|
|
"""Verify the given git file mode, and return it as a string."""
|
|
assert mode in (644, 755, 100644, 100755, 120000)
|
|
return "%i" % (mode)
|
|
|
|
@classmethod
|
|
def parse_objname (cls, objname):
|
|
"""Return the given object name (or mark number) as a string."""
|
|
if isinstance(objname, int): # Object name is a mark number
|
|
assert objname > 0
|
|
return ":%i" % (objname)
|
|
|
|
# No existence check is done, only checks for valid format
|
|
assert cls.SHA1RE.match(objname) # Object name is valid SHA1
|
|
return objname
|
|
|
|
@classmethod
|
|
def quote_path (cls, path):
|
|
"""Return a quoted version of the given path."""
|
|
path = path.replace("\\", "\\\\")
|
|
path = path.replace("\n", "\\n")
|
|
path = path.replace('"', '\\"')
|
|
return '"%s"' % (path)
|
|
|
|
@classmethod
|
|
def parse_path (cls, path):
|
|
"""Verify that the given path is valid, and quote it, if needed."""
|
|
assert not isinstance(path, int) # Cannot be a mark number
|
|
|
|
# These checks verify the rules on the fast-import man page
|
|
assert not path.count("//")
|
|
assert not path.endswith("/")
|
|
assert not path.startswith("/")
|
|
assert not path.count("/./")
|
|
assert not path.count("/../")
|
|
assert not path.endswith("/.")
|
|
assert not path.endswith("/..")
|
|
assert not path.startswith("./")
|
|
assert not path.startswith("../")
|
|
|
|
if path.count('"') + path.count('\n') + path.count('\\'):
|
|
return cls.quote_path(path)
|
|
return path
|
|
|
|
def __init__ (self, name, email, timestamp, timezone, message):
|
|
"""Create a new Git fast-import commit, with the given metadata."""
|
|
self.name = name
|
|
self.email = email
|
|
self.timestamp = timestamp
|
|
self.timezone = timezone
|
|
self.message = message
|
|
self.pathops = [] # List of path operations in this commit
|
|
|
|
def modify (self, mode, blobname, path):
|
|
"""Add a file modification to this Git fast-import commit."""
|
|
self.pathops.append(("M",
|
|
self.parse_mode(mode),
|
|
self.parse_objname(blobname),
|
|
self.parse_path(path)))
|
|
|
|
def delete (self, path):
|
|
"""Add a file deletion to this Git fast-import commit."""
|
|
self.pathops.append(("D", self.parse_path(path)))
|
|
|
|
def copy (self, path, newpath):
|
|
"""Add a file copy to this Git fast-import commit."""
|
|
self.pathops.append(("C",
|
|
self.parse_path(path),
|
|
self.parse_path(newpath)))
|
|
|
|
def rename (self, path, newpath):
|
|
"""Add a file rename to this Git fast-import commit."""
|
|
self.pathops.append(("R",
|
|
self.parse_path(path),
|
|
self.parse_path(newpath)))
|
|
|
|
def note (self, blobname, commit):
|
|
"""Add a note object to this Git fast-import commit."""
|
|
self.pathops.append(("N",
|
|
self.parse_objname(blobname),
|
|
self.parse_objname(commit)))
|
|
|
|
def deleteall (self):
|
|
"""Delete all files in this Git fast-import commit."""
|
|
self.pathops.append("deleteall")
|
|
|
|
|
|
class TestGitFICommit(unittest.TestCase):
|
|
|
|
"""GitFICommit selftests."""
|
|
|
|
def test_basic (self):
|
|
"""GitFICommit basic selftests."""
|
|
|
|
def expect_fail (method, data):
|
|
"""Verify that the method(data) raises an AssertionError."""
|
|
try:
|
|
method(data)
|
|
except AssertionError:
|
|
return
|
|
raise AssertionError("Failed test for invalid data '%s(%s)'" %
|
|
(method.__name__, repr(data)))
|
|
|
|
def test_parse_mode (self):
|
|
"""GitFICommit.parse_mode() selftests."""
|
|
self.assertEqual(GitFICommit.parse_mode(644), "644")
|
|
self.assertEqual(GitFICommit.parse_mode(755), "755")
|
|
self.assertEqual(GitFICommit.parse_mode(100644), "100644")
|
|
self.assertEqual(GitFICommit.parse_mode(100755), "100755")
|
|
self.assertEqual(GitFICommit.parse_mode(120000), "120000")
|
|
self.assertRaises(AssertionError, GitFICommit.parse_mode, 0)
|
|
self.assertRaises(AssertionError, GitFICommit.parse_mode, 123)
|
|
self.assertRaises(AssertionError, GitFICommit.parse_mode, 600)
|
|
self.assertRaises(AssertionError, GitFICommit.parse_mode, "644")
|
|
self.assertRaises(AssertionError, GitFICommit.parse_mode, "abc")
|
|
|
|
def test_parse_objname (self):
|
|
"""GitFICommit.parse_objname() selftests."""
|
|
self.assertEqual(GitFICommit.parse_objname(1), ":1")
|
|
self.assertRaises(AssertionError, GitFICommit.parse_objname, 0)
|
|
self.assertRaises(AssertionError, GitFICommit.parse_objname, -1)
|
|
self.assertEqual(GitFICommit.parse_objname("0123456789" * 4),
|
|
"0123456789" * 4)
|
|
self.assertEqual(GitFICommit.parse_objname("2468abcdef" * 4),
|
|
"2468abcdef" * 4)
|
|
self.assertRaises(AssertionError, GitFICommit.parse_objname,
|
|
"abcdefghij" * 4)
|
|
|
|
def test_parse_path (self):
|
|
"""GitFICommit.parse_path() selftests."""
|
|
self.assertEqual(GitFICommit.parse_path("foo/bar"), "foo/bar")
|
|
self.assertEqual(GitFICommit.parse_path("path/with\n and \" in it"),
|
|
'"path/with\\n and \\" in it"')
|
|
self.assertRaises(AssertionError, GitFICommit.parse_path, 1)
|
|
self.assertRaises(AssertionError, GitFICommit.parse_path, 0)
|
|
self.assertRaises(AssertionError, GitFICommit.parse_path, -1)
|
|
self.assertRaises(AssertionError, GitFICommit.parse_path, "foo//bar")
|
|
self.assertRaises(AssertionError, GitFICommit.parse_path, "foo/bar/")
|
|
self.assertRaises(AssertionError, GitFICommit.parse_path, "/foo/bar")
|
|
self.assertRaises(AssertionError, GitFICommit.parse_path, "foo/./bar")
|
|
self.assertRaises(AssertionError, GitFICommit.parse_path, "foo/../bar")
|
|
self.assertRaises(AssertionError, GitFICommit.parse_path, "foo/bar/.")
|
|
self.assertRaises(AssertionError, GitFICommit.parse_path, "foo/bar/..")
|
|
self.assertRaises(AssertionError, GitFICommit.parse_path, "./foo/bar")
|
|
self.assertRaises(AssertionError, GitFICommit.parse_path, "../foo/bar")
|
|
|
|
|
|
class GitFastImport(object):
|
|
|
|
"""Encapsulate communication with git fast-import."""
|
|
|
|
def __init__ (self, f, obj_fetcher, last_mark = 0):
|
|
"""Set up self to communicate with a fast-import process through f."""
|
|
self.f = f # File object where fast-import stream is written
|
|
self.obj_fetcher = obj_fetcher # GitObjectFetcher instance
|
|
self.next_mark = last_mark + 1 # Next mark number
|
|
self.refs = set() # Keep track of the refnames we've seen
|
|
|
|
def comment (self, s):
|
|
"""Write the given comment in the fast-import stream."""
|
|
assert "\n" not in s, "Malformed comment: '%s'" % (s)
|
|
self.f.write("# %s\n" % (s))
|
|
|
|
def commit (self, ref, commitdata):
|
|
"""Make a commit on the given ref, with the given GitFICommit.
|
|
|
|
Return the mark number identifying this commit.
|
|
|
|
"""
|
|
self.f.write("""\
|
|
commit %(ref)s
|
|
mark :%(mark)i
|
|
committer %(name)s <%(email)s> %(timestamp)i %(timezone)s
|
|
data %(msgLength)i
|
|
%(msg)s
|
|
""" % {
|
|
'ref': ref,
|
|
'mark': self.next_mark,
|
|
'name': commitdata.name,
|
|
'email': commitdata.email,
|
|
'timestamp': commitdata.timestamp,
|
|
'timezone': commitdata.timezone,
|
|
'msgLength': len(commitdata.message),
|
|
'msg': commitdata.message,
|
|
})
|
|
|
|
if ref not in self.refs:
|
|
self.refs.add(ref)
|
|
parent = ref + "^0"
|
|
if self.obj_fetcher.get_sha1(parent):
|
|
self.f.write("from %s\n" % (parent))
|
|
|
|
for op in commitdata.pathops:
|
|
self.f.write(" ".join(op))
|
|
self.f.write("\n")
|
|
self.f.write("\n")
|
|
retval = self.next_mark
|
|
self.next_mark += 1
|
|
return retval
|
|
|
|
def blob (self, data):
|
|
"""Import the given blob.
|
|
|
|
Return the mark number identifying this blob.
|
|
|
|
"""
|
|
self.f.write("blob\nmark :%i\ndata %i\n%s\n" %
|
|
(self.next_mark, len(data), data))
|
|
retval = self.next_mark
|
|
self.next_mark += 1
|
|
return retval
|
|
|
|
def reset (self, ref, objname):
|
|
"""Reset the given ref to point at the given Git object."""
|
|
self.f.write("reset %s\nfrom %s\n\n" %
|
|
(ref, GitFICommit.parse_objname(objname)))
|
|
if ref not in self.refs:
|
|
self.refs.add(ref)
|
|
|
|
|
|
class GitNotes(object):
|
|
|
|
"""Encapsulate access to Git notes.
|
|
|
|
Simulates a dictionary of object name (SHA1) -> Git note mappings.
|
|
|
|
"""
|
|
|
|
def __init__ (self, notes_ref, obj_fetcher):
|
|
"""Create a new Git notes interface, bound to the given notes ref."""
|
|
self.notes_ref = notes_ref
|
|
self.obj_fetcher = obj_fetcher # Used to get objects from repo
|
|
self.imports = [] # list: (objname, note data blob name) tuples
|
|
|
|
def __del__ (self):
|
|
"""Verify that self.commit_notes() was called before destruction."""
|
|
if self.imports:
|
|
error("Missing call to self.commit_notes().")
|
|
error("%i notes are not committed!", len(self.imports))
|
|
|
|
def _load (self, objname):
|
|
"""Return the note data associated with the given git object.
|
|
|
|
The note data is returned in string form. If no note is found
|
|
for the given object, None is returned.
|
|
|
|
"""
|
|
try:
|
|
f = self.obj_fetcher.open_obj("%s:%s" % (self.notes_ref, objname))
|
|
ret = f.read()
|
|
f.close()
|
|
except KeyError:
|
|
ret = None
|
|
return ret
|
|
|
|
def __getitem__ (self, objname):
|
|
"""Return the note contents associated with the given object.
|
|
|
|
Raise KeyError if given object has no associated note.
|
|
|
|
"""
|
|
blobdata = self._load(objname)
|
|
if blobdata is None:
|
|
raise KeyError("Object '%s' has no note" % (objname))
|
|
return blobdata
|
|
|
|
def get (self, objname, default = None):
|
|
"""Return the note contents associated with the given object.
|
|
|
|
Return given default if given object has no associated note.
|
|
|
|
"""
|
|
blobdata = self._load(objname)
|
|
if blobdata is None:
|
|
return default
|
|
return blobdata
|
|
|
|
def import_note (self, objname, data, gfi):
|
|
"""Tell git fast-import to store data as a note for objname.
|
|
|
|
This method uses the given GitFastImport object to create a
|
|
blob containing the given note data. Also an entry mapping the
|
|
given object name to the created blob is stored until
|
|
commit_notes() is called.
|
|
|
|
Note that this method only works if it is later followed by a
|
|
call to self.commit_notes() (which produces the note commit
|
|
that refers to the blob produced here).
|
|
|
|
"""
|
|
if not data.endswith("\n"):
|
|
data += "\n"
|
|
gfi.comment("Importing note for object %s" % (objname))
|
|
mark = gfi.blob(data)
|
|
self.imports.append((objname, mark))
|
|
|
|
def commit_notes (self, gfi, author, message):
|
|
"""Produce a git fast-import note commit for the imported notes.
|
|
|
|
This method uses the given GitFastImport object to create a
|
|
commit on the notes ref, introducing the notes previously
|
|
submitted to import_note().
|
|
|
|
"""
|
|
if not self.imports:
|
|
return
|
|
commitdata = GitFICommit(author[0], author[1],
|
|
time.time(), "0000", message)
|
|
for objname, blobname in self.imports:
|
|
assert isinstance(objname, int) and objname > 0
|
|
assert isinstance(blobname, int) and blobname > 0
|
|
commitdata.note(blobname, objname)
|
|
gfi.commit(self.notes_ref, commitdata)
|
|
self.imports = []
|
|
|
|
|
|
class GitCachedNotes(GitNotes):
|
|
|
|
"""Encapsulate access to Git notes (cached version).
|
|
|
|
Only use this class if no caching is done at a higher level.
|
|
|
|
Simulates a dictionary of object name (SHA1) -> Git note mappings.
|
|
|
|
"""
|
|
|
|
def __init__ (self, notes_ref, obj_fetcher):
|
|
"""Set up a caching wrapper around GitNotes."""
|
|
GitNotes.__init__(self, notes_ref, obj_fetcher)
|
|
self._cache = {} # Cache: object name -> note data
|
|
|
|
def __del__ (self):
|
|
"""Verify that GitNotes' destructor is called."""
|
|
GitNotes.__del__(self)
|
|
|
|
def _load (self, objname):
|
|
"""Extend GitNotes._load() with a local objname -> note cache."""
|
|
if objname not in self._cache:
|
|
self._cache[objname] = GitNotes._load(self, objname)
|
|
return self._cache[objname]
|
|
|
|
def import_note (self, objname, data, gfi):
|
|
"""Extend GitNotes.import_note() with a local objname -> note cache."""
|
|
if not data.endswith("\n"):
|
|
data += "\n"
|
|
assert objname not in self._cache
|
|
self._cache[objname] = data
|
|
GitNotes.import_note(self, objname, data, gfi)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|