root/trunk/cheesecake/util.py

Revision 186, 6.6 kB (checked in by mk, 6 years ago)

Fix shutil.rmtree issue on Windows (closes #49).

  • Property svn:executable set to *
Line 
1 """Utility functions for Cheesecake project.
2 """
3
4 import os
5 import shutil
6 import signal
7 import stat
8 import sys
9 import tarfile
10 import time
11 import zipfile
12
13 from subprocess import call, ProcessError, Popen, PIPE, STDOUT
14
15 PAD_TEXT = 40  # Default width of the dot-padded message column.
16 PAD_VALUE = 4  # Default width of the space-padded value column.
17
def run_cmd(cmd, env=None, max_timeout=None):
    """Run command and return its return code and its output.

    `cmd` is split on whitespace and executed directly (no shell, so
    quoting is not interpreted). `env`, if given, is passed to the child
    process as its environment. Standard output and standard error are
    captured together.

    If `max_timeout` is set, the process is polled every 0.1 second and
    interrupted with SIGINT once it has been running longer than
    `max_timeout` seconds.

    Return a (return code, output) tuple, where output is whatever is read
    back from the capture file. Two special cases:

      * (1, exception) when the process could not be started at all,
      * (1, "Time exceeded") when the timeout was hit.

    >>> run_cmd('/bin/true')
    (0, '')

    >>> run_cmd('/bin/cat', max_timeout=0.2)
    (1, 'Time exceeded')
    """
    # Imported locally so this function does not depend on the module's
    # import block. TemporaryFile replaces os.tmpfile().
    import tempfile

    arglist = cmd.split()

    output = tempfile.TemporaryFile()
    try:
        try:
            p = Popen(arglist, stdout=output, stderr=STDOUT, env=env)
        except Exception as e:
            # Typically the executable does not exist or is not runnable.
            return 1, e

        # Wait only max_timeout seconds.
        if max_timeout:
            start = time.time()
            while p.poll() is None:
                time.sleep(0.1)
                if time.time() - start > max_timeout:
                    os.kill(p.pid, signal.SIGINT)
                    # Reap the child so it does not linger as a zombie.
                    p.wait()
                    return 1, "Time exceeded"

        p.wait()
        output.seek(0)
        return p.returncode, output.read()
    finally:
        # Release the capture file on every exit path.
        output.close()
48
def command_successful(cmd):
    """Tell whether running `cmd` yields a zero return code.

    The command is executed through run_cmd() and its output is discarded.
    Any non-zero return code counts as a failure.

    >>> command_successful('/bin/true')
    True
    >>> command_successful('this-command-doesnt-exist')
    False
    """
    return_code = run_cmd(cmd)[0]
    return return_code == 0
59
class StdoutRedirector(object):
    """Redirect stdout to a file and allow reading it back.

    Provides the `write` and `flush` methods of a file-like object. What
    was written so far can be retrieved with `read_buffer`.
    """
    def __init__(self, filename=None):
        """Open the backing file.

        When `filename` is given, that file is created (or truncated);
        otherwise an anonymous temporary file is used.
        """
        if filename:
            # 'w+' rather than 'w': read_buffer() has to read the file
            # back, which fails on a write-only handle.
            self.fh = open(filename, 'w+')
        else:
            # Imported locally so the class does not depend on the
            # module's import block. TemporaryFile replaces os.tmpfile().
            import tempfile
            self.fh = tempfile.TemporaryFile(mode='w+')

    def write(self, buf):
        """Append `buf` to the backing file."""
        self.fh.write(buf)

    def flush(self):
        """Flush pending writes to the backing file."""
        self.fh.flush()

    def read_buffer(self):
        """Return the whole contents of the backing file.

        The file position is left at the end, so later writes append.
        """
        self.fh.seek(0)
        return self.fh.read()
80
def pad_with_dots(msg, length=PAD_TEXT):
    """Extend `msg` to `length` characters with a space and a run of dots.

    A message that is already `length` characters or longer is returned
    unchanged.

    >>> pad_with_dots("Hello world", 20)
    'Hello world ........'
    >>> pad_with_dots("Exceeding length", 10)
    'Exceeding length'
    """
    missing = length - len(msg)
    if missing <= 0:
        return msg

    # One separating space, the remaining width is filled with dots.
    return msg + " " + "." * (missing - 1)
98
def pad_left_spaces(value, length=PAD_VALUE):
    """Right-align `value` in a field of `length` characters.

    Non-string values are converted with str() first. Spaces are added on
    the left; a value that is already wide enough is returned as is.

    >>> pad_left_spaces(15, 4)
    '  15'
    >>> pad_left_spaces(123456, 2)
    '123456'
    >>> len(pad_left_spaces("")) == PAD_VALUE
    True
    """
    text = value if isinstance(value, basestring) else str(value)
    return text.rjust(length)
113
def pad_right_spaces(value, length=PAD_VALUE):
    """Left-align `value` in a field of `length` characters.

    Non-string values are converted with str() first. Spaces are added on
    the right; a value that is already wide enough is returned as is.

    >>> pad_right_spaces(123, 5)
    '123  '
    >>> pad_right_spaces(12.1, 5)
    '12.1 '
    """
    text = value if isinstance(value, basestring) else str(value)
    return text.ljust(length)
126
def pad_msg(msg, value, msg_length=PAD_TEXT, value_length=PAD_VALUE):
    """Build a "message ..... value" line.

    The message is followed by a space and enough dots to reach
    `msg_length` characters (no dots if it is already that long), then by
    `value` right-aligned in `value_length` characters.

    >>> pad_msg("123456", 77, msg_length=10, value_length=4)
    '123456 ...  77'
    >>> pad_msg("123", u"45", msg_length=5, value_length=3)
    u'123 . 45'
    """
    label = msg + " "
    dots = "." * (msg_length - len(msg) - 1)
    value_column = pad_left_spaces(value, value_length)
    return label + dots + value_column
136
def pad_line(char="=", length=(PAD_TEXT+PAD_VALUE+1)):
    """Build a separator line by repeating `char` `length` times.

    >>> pad_line('*', 3)
    '***'
    >>> pad_line(length=10)
    '=========='
    """
    line = char * length
    return line
146
def unzip_package(package, destination):
    """Unzip given `package` to the `destination` directory.

    Return the top-level path component of the last archive entry (for an
    archive holding a single directory this is the name of the unpacked
    directory). Return None if `package` is not a valid zip file or
    contains no entries.
    """
    try:
        z = zipfile.ZipFile(package)
    except zipfile.error:
        return None

    try:
        names = z.namelist()
        if not names:
            # Nothing was unpacked, so there is no directory to report.
            return None

        # NOTE(review): entry names are joined to `destination` unchecked;
        # an entry containing '..' or an absolute path would be written
        # outside of it. Confirm archives come from a trusted source.

        # Get directory structure from zip and create it in destination
        # directory.
        for name in names:
            target_dir = os.path.join(destination, os.path.dirname(name))
            if not os.path.exists(target_dir):
                os.makedirs(target_dir)

        # Extract files to directory structure (names ending with '/'
        # are directory entries and carry no data).
        for name in names:
            if not name.endswith('/'):
                outfile = open(os.path.join(destination, name), 'wb')
                try:
                    outfile.write(z.read(name))
                finally:
                    outfile.close()
    finally:
        z.close()

    # Names inside a zip archive always use '/', whatever os.sep is on the
    # current platform.
    return os.path.dirname(names[-1]).split('/')[0]
174
def untar_package(package, destination):
    """Untar given `package` to the `destination` directory.

    Return the top-level path component of the first archive member (for
    an archive holding a single directory this is the name of the unpacked
    directory). Return None if `package` cannot be read as a tar archive
    or contains no members.
    """
    try:
        t = tarfile.open(package)
    except tarfile.ReadError:
        return None

    try:
        members = t.getmembers()
        if not members:
            # Nothing was unpacked, so there is no directory to report.
            return None

        for member in members:
            t.extract(member, destination)
    finally:
        t.close()

    # os.sep is '\\' when running on Windows, while inside the tar.gz the
    # separator is '/'. We split on '/' because most tar.gz archives are
    # created on *nix.
    return members[0].name.split('/')[0]
194
def unegg_package(package, destination):
    """Unpack given egg to the `destination` directory.

    An egg can be either a zip file or a plain directory. A zip file is
    handed to unzip_package(); a directory is copied (symlinks kept as
    symlinks) into `destination` under its own base name.

    Return name of unpacked directory or None on error.
    """
    # Zipped egg: regular zip extraction does the job.
    if not os.path.isdir(package):
        return unzip_package(package, destination)

    # Egg directory: copy the whole tree.
    egg_name = os.path.basename(package)
    target = os.path.join(destination, egg_name)
    shutil.copytree(package, target, symlinks=True)
    return egg_name
207
def mkdirs(dir):
    """Make directory with parent directories as needed.

    Don't throw an exception if directory exists. Relative paths are
    created relative to the current working directory. An empty path is
    ignored.

    Raise OSError when the directory cannot be created (for example when
    a component of the path exists and is not a directory).
    """
    if not dir:
        return

    try:
        os.makedirs(dir)
    except OSError:
        # Creation failing because the directory is already there
        # (possibly created concurrently) is fine; anything else is not.
        if not os.path.isdir(dir):
            raise
218
def time_function(function):
    """Call `function` without arguments and measure how long it takes.

    Return (return value, time taken) tuple, the time being wall-clock
    seconds as measured with time.time().

    >>> def fun(x):
    ...     return x*2
    >>> ret, time_taken = time_function(lambda: fun(5))
    >>> ret
    10
    """
    started = time.time()
    result = function()
    elapsed = time.time() - started
    return result, elapsed
234
def rmtree(topdir):
    """Remove the whole directory tree (including subdirectories).

    Works around some Windows-specific behaviour of shutil.rmtree: before
    removal, every directory and file in the tree gets its mode set to
    owner read/write/execute, so read-only entries do not get in the way.

    Failing to change the mode of `topdir` itself raises OSError. Mode
    changes inside the tree and the removal itself are best-effort:
    their errors are ignored.
    """
    owner_rwx = stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC

    def unlock(path):
        # Best-effort chmod of a single entry inside the tree.
        try:
            os.chmod(path, owner_rwx)
        except OSError:
            pass

    # Make all files and directories writable. The walk is top-down, so a
    # directory is unlocked before os.walk descends into it.
    os.chmod(topdir, owner_rwx)
    for parent, subdirs, files in os.walk(topdir):
        for entry in subdirs:
            unlock(os.path.join(parent, entry))
        for entry in files:
            unlock(os.path.join(parent, entry))

    shutil.rmtree(topdir, ignore_errors=True)
252
Note: See TracBrowser for help on using the browser.