
Getting a list of files in a remote repository

I once needed to view the list of files in a remote repository, and I did not really want to clone it. As expected, an Internet search turned up many answers along the lines of "this is impossible, make a clone". All I needed was to make sure that the repository behind a certain link corresponds to a certain archive of source code. Since that link sits on the page describing the contents of the archive (more precisely, additions to the archive), comparing only the list of files seemed sufficient. So what can be done?
Of course, Mercurial offers virtually no options for working with a remote repository: you can push and pull (and clone, as a special case of the latter). But is it possible to do a pull without affecting the file system? It is: hg incoming helps here. The algorithm is as follows:
  1. Create a new empty repository somewhere. An empty repository can pull from any repository.
  2. Use hg incoming to get the list of changes. Since hg incoming uses the same functions as hg log, its output can be customized just as freely. In particular, you can get the list of all files changed in each revision, or even the changes themselves as a unified diff (with git extensions for binary files). We do not need the diff, but the list of changed files is useful.
  3. Since we receive all the revisions, along the way we can attach to each revision a list of children in addition to its list of parents. Children that are not ancestors of the revision whose file list we want will be missing, but that does not matter to us.
  4. Mercurial has one revision that is present in every repository and is the only one that truly has no parents: -1:0000000000000000000000000000000000000000. This is a good starting point.
    Starting from this revision, we compute the list of files in all other revisions (the list of files in the initial revision is known: it is empty). To do this:
    1. For each revision except the initial one, take the list of files from its first parent. Revisions are processed from parents to children.
    2. Add to it the list of added files (available via hg incoming --style xml --verbose, in the paths tag).
    3. Remove from it the list of deleted files (found in the same place).

  5. Now find the revision that has no descendants. This will be the revision requested with hg incoming --rev revspec. Once it is found, print its list of files.
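
Steps 3 to 5 can be sketched on a small in-memory graph, without Mercurial at all. This is only an illustration under stated assumptions: the function names propagate_files and find_tip, the dictionary layout and the sample revisions are all made up for the example, and a merge is assumed to report its changes relative to its first parent.

```python
from collections import deque


def propagate_files(revisions):
    """Compute the full file set of every revision.

    `revisions` maps a revision number to a dict with the keys 'parents'
    (list of revision numbers, first parent first), 'added' and 'removed'
    (sets of file names). -1 stands for the null revision (no files).
    """
    files = {-1: set()}
    children = {}
    for rev, info in revisions.items():
        # A revision without recorded parents is a child of the null revision
        for parent in info['parents'] or [-1]:
            children.setdefault(parent, []).append(rev)

    queue = deque(children.get(-1, []))
    while queue:
        rev = queue.popleft()
        if rev in files:
            continue  # already reached through another parent
        info = revisions[rev]
        first_parent = (info['parents'] or [-1])[0]
        if first_parent not in files:
            queue.append(rev)  # first parent not processed yet, retry later
            continue
        # Inherit from the first parent, then apply the delta from the log
        files[rev] = (files[first_parent] | info['added']) - info['removed']
        queue.extend(children.get(rev, []))
    return files, children


def find_tip(revisions, children):
    """Return the only revision that has no children."""
    tips = [rev for rev in revisions if not children.get(rev)]
    assert len(tips) == 1, 'Expected exactly one revision without children'
    return tips[0]


# Hypothetical history: 0 is the root, 1 and 2 branch off it, 3 merges them
sample = {
    0: {'parents': [], 'added': {'README', 'setup.py'}, 'removed': set()},
    1: {'parents': [0], 'added': {'src/main.py'}, 'removed': {'setup.py'}},
    2: {'parents': [0], 'added': {'docs/index.txt'}, 'removed': set()},
    3: {'parents': [1, 2], 'added': {'docs/index.txt'}, 'removed': set()},
}
files, children = propagate_files(sample)
tip = find_tip(sample, children)
for name in sorted(files[tip]):
    print(name)
```

Modified files are ignored here because they do not change which files exist; the full script further down only uses them for a sanity check.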


Note that hg incoming output in the default format cannot be used for this. You must either write your own style using {file_adds}, {file_mods} and {file_dels}, or take the ready-made one: --style xml. The --template option will not help you here. Writing your own format would greatly reduce the code compared to using a SAX parser for XML, but I preferred --style xml.
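
To give an idea of what the XML route involves, here is a minimal sketch that pulls the same data out of a log document with xml.etree.ElementTree instead of SAX. It is a swapped-in technique, not what the script below does: ElementTree loads the whole document into memory. The element and attribute names (logentry, parent, paths, path with an action attribute) are the ones the script's handler checks for; the sample document, its hashes and the parse_log name are invented for the example. Real hg incoming output also starts with a "comparing with" line that has to be skipped before parsing.

```python
import xml.etree.ElementTree as ET

# Hypothetical sample shaped like the elements the SAX handler expects
SAMPLE_XML = """<log>
<logentry revision="0" node="1111111111111111111111111111111111111111">
<paths>
<path action="A">README</path>
<path action="A">setup.py</path>
</paths>
</logentry>
<logentry revision="1" node="2222222222222222222222222222222222222222">
<parent revision="0" node="1111111111111111111111111111111111111111" />
<paths>
<path action="M">README</path>
<path action="R">setup.py</path>
<path action="A">src/main.py</path>
</paths>
</logentry>
</log>
"""


def parse_log(xml_text):
    """Return one dict per logentry with its parents and changed files."""
    entries = []
    for entry in ET.fromstring(xml_text).iter('logentry'):
        # action is A (added), M (modified) or R (removed)
        changes = {'A': set(), 'M': set(), 'R': set()}
        for path in entry.iter('path'):
            changes[path.get('action')].add(path.text or '')
        entries.append({
            'rev': int(entry.get('revision')),
            'node': entry.get('node'),
            'parents': [parent.get('node') for parent in entry.findall('parent')],
            'added': changes['A'],
            'modified': changes['M'],
            'removed': changes['R'],
        })
    return entries


entries = parse_log(SAMPLE_XML)
for entry in entries:
    print(entry['rev'], sorted(entry['added']), sorted(entry['removed']))
```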

The code itself:
    #!/usr/bin/env python
    # vim: fileencoding=utf-8

    from __future__ import unicode_literals, division, print_function

    from xml import sax
    from subprocess import check_call, Popen, PIPE
    from shutil import rmtree
    from tempfile import mkdtemp


    class MercurialRevision(object):
        __slots__ = ('rev', 'hex', 'tags', 'bookmarks', 'branch', 'parents', 'children',
                     'added', 'removed', 'modified', 'copies', 'files',)

        def __init__(self, rev, hex):
            self.rev = rev
            self.hex = hex
            self.parents = []
            self.children = []
            self.added = set()
            self.removed = set()
            self.modified = set()
            self.copies = {}
            self.tags = set()
            self.bookmarks = set()
            self.branch = None
            self.files = set()

        def __str__(self):
            return '<revision {rev}:{hex}>'.format(hex=self.hex, rev=self.rev)

        def __repr__(self):
            return '{0}({rev!r}, {hex!r})'.format(self.__class__.__name__,
                                                  hex=self.hex, rev=self.rev)

        def __hash__(self):
            return int(self.hex, 16)


    class MercurialHandler(sax.handler.ContentHandler):
        def startDocument(self):
            self.curpath = []
            self.currev = None
            # The null revision is the only one that has no parents
            nullrev = MercurialRevision(-1, '0' * 40)
            self.revisions_rev = {nullrev.rev : nullrev}
            self.revisions_hex = {nullrev.hex : nullrev}
            self.tags = {}
            self.bookmarks = {}
            self.characters_fun = None
            self.last_data = None

        def add_tag(self, tag):
            self.currev.tags.add(tag)
            self.tags[tag] = self.currev

        def add_bookmark(self, bookmark):
            self.currev.bookmarks.add(bookmark)
            self.bookmarks[bookmark] = self.currev

        def characters(self, data):
            # Text may arrive in several chunks: accumulate it until endElement
            if self.characters_fun:
                if not self.last_data:
                    self.last_data = data
                else:
                    self.last_data += data

        def startElement(self, name, attributes):
            if name == 'log':
                assert not self.curpath
                assert not self.currev
            elif name == 'logentry':
                assert self.curpath == ['log']
                assert not self.currev
                self.currev = MercurialRevision(int(attributes['revision']), attributes['node'])
            else:
                assert self.currev
                if name == 'tag':
                    assert self.curpath[-1] == 'logentry'
                    self.characters_fun = self.add_tag
                elif name == 'bookmark':
                    assert self.curpath[-1] == 'logentry'
                    self.characters_fun = self.add_bookmark
                elif name == 'parent':
                    assert self.curpath[-1] == 'logentry'
                    self.currev.parents.append(self.revisions_hex[attributes['node']])
                elif name == 'branch':
                    assert self.curpath[-1] == 'logentry'
                    self.characters_fun = lambda branch: self.currev.__setattr__('branch', branch)
                elif name == 'path':
                    assert self.curpath[-1] == 'paths'
                    if attributes['action'] == 'M':
                        self.characters_fun = self.currev.modified.add
                    elif attributes['action'] == 'A':
                        self.characters_fun = self.currev.added.add
                    elif attributes['action'] == 'R':
                        self.characters_fun = self.currev.removed.add
                elif name == 'copy':
                    assert self.curpath[-1] == 'copies'
                    self.characters_fun = (lambda destination, source=attributes['source']:
                                           self.currev.copies.__setitem__(source, destination))
            self.curpath.append(name)

        def endElement(self, name):
            assert self.curpath or self.curpath[-1] == ['log']
            assert self.curpath[-1] == name
            if name == 'logentry':
                if not self.currev.parents:
                    # No explicit parent: the parent is the previous revision
                    self.currev.parents.append(self.revisions_rev[self.currev.rev - 1])
                for parent in self.currev.parents:
                    parent.children.append(self.currev)
                self.revisions_hex[self.currev.hex] = self.currev
                self.revisions_rev[self.currev.rev] = self.currev
                self.currev = None
            if self.last_data is None:
                if self.characters_fun:
                    self.characters_fun('')
            else:
                assert self.characters_fun
                self.characters_fun(self.last_data)
            self.characters_fun = None
            self.last_data = None
            self.curpath.pop()

        def export_result(self):
            heads = {revision for revision in self.revisions_hex.values()
                     if not revision.children
                     or all(child.branch != revision.branch for child in revision.children)}
            # heads contains the same revisions as `hg heads --closed`
            tips = {head for head in heads if not head.children}
            return {
                'heads': heads,
                'tips': tips,
                'tags': self.tags,
                'bookmarks': self.bookmarks,
                'revisions_hex': self.revisions_hex,
                'revisions_rev': self.revisions_rev,
                'root': self.revisions_rev[-1],
            }


    class MercurialRemoteParser(object):
        __slots__ = ('parser', 'handler', 'tmpdir')

        def __init__(self, tmpdir=None):
            self.parser = sax.make_parser()
            self.handler = MercurialHandler()
            self.parser.setContentHandler(self.handler)
            self.tmpdir = tmpdir or mkdtemp(suffix='.hg')
            self.init_tmpdir()

        def init_tmpdir(self):
            check_call(['hg', 'init', self.tmpdir])

        def delete_tmpdir(self):
            if self.tmpdir and rmtree:
                rmtree(self.tmpdir)
                # Forget the directory so that __exit__ followed by __del__
                # does not try to remove it a second time
                self.tmpdir = None

        __del__ = delete_tmpdir

        def __enter__(self):
            return self

        def __exit__(self, *args, **kwargs):
            self.delete_tmpdir()

        @staticmethod
        def generate_files(parsing_result):
            toprocess = [parsing_result['root']]
            processed = set()
            while toprocess:
                revision = toprocess.pop(0)
                if revision.parents:
                    # Inherit files from the first parent
                    assert not revision.files
                    if revision.parents[0] not in processed:
                        assert toprocess
                        toprocess.append(revision)
                        continue
                    revision.files.update(revision.parents[0].files)
                    # Then apply delta found in log
                    assert not (revision.files & revision.added)
                    revision.files.update(revision.added)
                    assert revision.files >= revision.removed
                    revision.files -= revision.removed
                    assert revision.files >= revision.modified, (
                        'Expected to find the following files: ' + ','.join(
                            file for file in revision.modified if not file in revision.files))
                processed.add(revision)
                toprocess.extend(child for child in revision.children
                                 if not child in processed and not child in toprocess)
            assert set(parsing_result['revisions_rev'].values()) == processed
            return parsing_result

        def parse_url(self, url, rev_name=None):
            p = Popen(['hg', '--repository', self.tmpdir,
                       'incoming', '--style', 'xml', '--verbose', url,
                       ] + (['--rev', rev_name] if rev_name else []),
                      stdout=PIPE)
            p.stdout.readline()  # Skip “comparing with {url}” header
            self.parser.parse(p.stdout)
            parsing_result = self.handler.export_result()
            self.generate_files(parsing_result)
            return parsing_result


    if __name__ == '__main__':
        import sys

        def print_files(revision):
            for file in revision.files:
                print(file)

        remote_url = sys.argv[1]
        rev_name = sys.argv[2]
        with MercurialRemoteParser() as remote_parser:
            parsing_result = remote_parser.parse_url(remote_url, rev_name=rev_name)
            assert len(parsing_result['tips']) == 1, 'Found more than one head'
            print_files(next(iter(parsing_result['tips'])))

    # vim: tw=100 ft=python ts=4 sts=4 sw=4

Usage: python -O list_hg_files.py bitbucket.org/ZyX_I/aurum tip
Both arguments (the remote repository URL and the revision specifier) are required.

Source: https://habr.com/ru/post/197312/

