root/branches/mk/cheesecake/util.py

Revision 98, 4.7 kB (checked in by mk, 7 years ago)

Added missing unit tests for command_successful, run_cmd and Index.repr.

  • Property svn:executable set to *
Line 
1 import os
2 import shutil
3 import sys
4 import tarfile
5 import zipfile
6
7 from subprocess import call, ProcessError, Popen, PIPE, STDOUT
8
9 PAD_TEXT = 40  # default width of the dot-padded message column (pad_with_dots, pad_msg)
10 PAD_VALUE = 4  # default width of the space-padded value column (pad_left_spaces, pad_right_spaces, pad_msg)
11
def run_cmd(cmd, env=None):
    """Run command and return its return code and its output.

    `cmd` is split on whitespace into an argument list and executed
    directly (no shell, so quoting is not interpreted).  `env` is passed
    to the child as its environment; None inherits the current one.
    Standard error is merged into standard output.

    Return a ``(returncode, output)`` tuple.  If the process cannot be
    started at all (for example the executable does not exist), return
    ``(1, exception)`` instead of raising.

    >>> run_cmd('/bin/true')
    (0, '')
    >>> run_cmd('this-command-doesnt-exist')[0]
    1
    """
    arglist = cmd.split()
    try:
        p = Popen(arglist, stdout=PIPE, stderr=STDOUT, env=env)
    except Exception:
        # Popen reports a missing or non-executable program with OSError,
        # which the previous handler did not catch.  Every start-up failure
        # is turned into a non-zero return code.  sys.exc_info() is used
        # so the same code works with old and new ``except`` syntax.
        return 1, sys.exc_info()[1]
    output = p.communicate()[0]
    return p.returncode, output
25
def command_successful(cmd):
    """Tell whether `cmd` finished with a zero exit status.

    The command is executed through run_cmd(); its output is discarded.
    Return True for a zero return code and False for anything else.

    >>> command_successful('/bin/true')
    True
    >>> command_successful('this-command-doesnt-exist')
    False
    """
    exit_status = run_cmd(cmd)[0]
    return not exit_status
38
class StdoutRedirector(object):
    """Collect written text in a file so it can be read back later.

    Provides the write() and flush() methods of a file-like object.
    Data goes to `filename` when one is given, otherwise to an anonymous
    temporary file.
    """
    def __init__(self, filename=None):
        """Open the backing file.

        `filename` is truncated if it already exists.  With no filename
        an anonymous temporary file is used.
        """
        if filename:
            # 'w+' rather than 'w': read_buffer() must be able to read
            # back what was written through the same handle.
            self.fh = open(filename, 'w+')
        else:
            # tempfile is portable, unlike os.tmpfile().
            import tempfile
            self.fh = tempfile.TemporaryFile(mode='w+')

    def write(self, buf):
        """Append `buf` to the backing file."""
        self.fh.write(buf)

    def flush(self):
        """Flush buffered data to the backing file."""
        self.fh.flush()

    def read_buffer(self):
        """Return everything written so far.

        The file position is left at the end of the file, so later
        writes are appended.
        """
        self.fh.seek(0)
        return self.fh.read()

    def close(self):
        """Close the backing file; the redirector is unusable afterwards."""
        self.fh.close()
59
def pad_with_dots(msg, length=PAD_TEXT):
    """Pad text with a space and dots up to given length.

    A single space is appended to `msg`, followed by one dot for every
    character by which `msg` is shorter than `length`.  The result is
    therefore ``length + 1`` characters long whenever
    ``len(msg) <= length``; longer messages only get the trailing space.

    >>> pad_with_dots("abc", 6)
    'abc ...'
    >>> pad_with_dots("abcdef", 3)
    'abcdef '
    """
    # The `length` argument used to be overwritten and ignored.  A negative
    # repeat count yields an empty string, so no clamping is needed.
    return msg + " " + "." * (length - len(msg))
68
def pad_left_spaces(value, length=PAD_VALUE):
    """Right-align `value` in a field of `length` characters.

    Non-string values are converted with str() first.  Values already
    at least `length` characters long are returned unpadded.

    >>> pad_left_spaces(15, 4)
    '  15'
    >>> pad_left_spaces(123456, 2)
    '123456'
    >>> len(pad_left_spaces("")) == PAD_VALUE
    True
    """
    text = value
    if not isinstance(text, basestring):
        text = str(text)
    return text.rjust(length)
83
def pad_right_spaces(value, length=PAD_VALUE):
    """Left-align `value` in a field of `length` characters.

    Spaces are appended on the right.  Non-string values are converted
    with str() first.  Values already at least `length` characters long
    are returned unpadded.

    >>> pad_right_spaces(123, 5)
    '123  '
    >>> pad_right_spaces(12.1, 5)
    '12.1 '
    """
    text = value
    if not isinstance(text, basestring):
        text = str(text)
    return text.ljust(length)
96
def pad_msg(msg, value, msg_length=PAD_TEXT, value_length=PAD_VALUE):
    """Build a "message ... value" line.

    `msg` is followed by a space and enough dots to fill `msg_length`
    characters, then by `value` right-aligned in `value_length`
    characters.

    >>> pad_msg("123456", 77, msg_length=10, value_length=4)
    '123456 ...  77'
    >>> pad_msg("123", u"45", msg_length=5, value_length=3)
    u'123 . 45'
    """
    # One column of the message field is taken by the separating space.
    dot_count = msg_length - len(msg) - 1
    dots = "." * dot_count
    padded_value = pad_left_spaces(value, value_length)
    return msg + " " + dots + padded_value
106
def pad_line(char="=", length=(PAD_TEXT+PAD_VALUE+1)):
    """Return a separator line: `char` repeated `length` times.

    The default length is the sum of the default message and value
    widths plus one.
    """
    line = char * length
    return line
111
def unzip_package(package, destination):
    """Unzip given `package` to the `destination` directory.

    Return the first path component of the directory holding the last
    archive entry (an empty string if that entry sits at the top level
    of the archive).  Return None on error, that is when `package` is
    not a valid zip file, when it has no entries, or when an entry name
    would be written outside `destination`.  In the last case nothing
    is extracted.
    """
    try:
        z = zipfile.ZipFile(package)
    except zipfile.error:
        return None

    try:
        names = z.namelist()
        if not names:
            # Nothing to unpack, so there is no directory to report.
            return None

        # Refuse archives whose entries (absolute names, '..' components)
        # resolve outside the destination directory.
        root = os.path.join(os.path.abspath(destination), '')
        for name in names:
            target = os.path.abspath(os.path.join(root, name))
            if not (target + os.sep).startswith(root):
                return None

        # Get directory structure from zip and create it in destination
        # directory.
        unpack_dir = ''
        for name in names:
            unpack_dir = os.path.dirname(name)
            target_dir = os.path.join(destination, unpack_dir)
            if not os.path.exists(target_dir):
                os.makedirs(target_dir)

        # Extract files to directory structure; names ending with '/' are
        # directory entries and were handled above.
        for name in names:
            if name.endswith('/'):
                continue
            outfile = open(os.path.join(destination, name), 'wb')
            try:
                outfile.write(z.read(name))
            finally:
                outfile.close()
    finally:
        z.close()

    # Zip entry names always use '/' as separator, whatever os.sep is.
    return unpack_dir.replace(os.sep, '/').split('/')[0]
139
def untar_package(package, destination):
    """Untar given `package` to the `destination` directory.

    Return the first path component of the first archive member's name.
    Return None on error, that is when `package` cannot be read as a tar
    archive, when it has no members, or when a member name would be
    written outside `destination`.  In the last case nothing is
    extracted.
    """
    try:
        t = tarfile.open(package)
    except tarfile.ReadError:
        return None

    try:
        members = t.getmembers()
        if not members:
            # Nothing to unpack, so there is no directory to report.
            return None

        # Refuse archives whose member names (absolute names, '..'
        # components) resolve outside the destination directory.
        root = os.path.join(os.path.abspath(destination), '')
        for member in members:
            target = os.path.abspath(os.path.join(root, member.name))
            if not (target + os.sep).startswith(root):
                return None

        for member in members:
            t.extract(member, destination)
    finally:
        t.close()

    # Tar member names always use '/' as separator, whatever os.sep is.
    return members[0].name.split('/')[0]
155
def unegg_package(package, destination):
    """Unpack given egg to the `destination` directory.

    An egg that is a single file is handed to unzip_package() and its
    result is returned.  An egg that is a directory is copied (symlinks
    preserved) into `destination` under its own base name, which is
    then returned.
    """
    if not os.path.isdir(package):
        return unzip_package(package, destination)

    egg_name = os.path.basename(package)
    shutil.copytree(package, os.path.join(destination, egg_name), symlinks=True)
    return egg_name
168
def mkdirs(dir):
    """Make directory with parent directories as needed.

    Don't throw an exception if directory exists.  Relative paths are
    created relative to the current working directory.  An empty path
    is a no-op.  OSError is still raised when the directory cannot be
    created (for example a file is in the way or permission is denied).
    """
    if not dir:
        return
    try:
        os.makedirs(dir)
    except OSError:
        # Only "already exists as a directory" is ignored.  Checking after
        # the attempt avoids a race with a concurrent creator.
        if not os.path.isdir(dir):
            raise
Note: See TracBrowser for help on using the browser.