root/trunk/cheesecake/util.py

Revision 193, 7.1 kB (checked in by mk, 6 years ago)

Remember to always close the temporary file in run_cmd.

  • Property svn:executable set to *
Line 
1 """Utility functions for Cheesecake project.
2 """
3
4 import os
5 import shutil
6 import signal
7 import stat
8 import sys
9 import tarfile
10 import tempfile
11 import time
12 import zipfile
13
14 from subprocess import call, ProcessError, Popen, PIPE, STDOUT
15
# Default width of the dot-padded message column (default `length` of
# pad_with_dots and default `msg_length` of pad_msg).
PAD_TEXT = 40
# Default width of the space-padded value column (default `length` of
# pad_left_spaces / pad_right_spaces and default `value_length` of pad_msg).
PAD_VALUE = 4
18
def make_temp_file():
    """Create a new temporary file and open it for reading and writing.

    Return a ``(file object, path)`` tuple. The file object wraps the
    descriptor returned by `tempfile.mkstemp` and is opened in 'w+' mode.
    Nothing here removes the file: the caller closes the object and
    unlinks the path when done.
    """
    descriptor, path = tempfile.mkstemp()
    handle = os.fdopen(descriptor, 'w+')
    return handle, path
22
def run_cmd(cmd, env=None, max_timeout=None):
    """Run command and return its return code and its output.

    `cmd` is a command line string; it is split on whitespace to build the
    argument list (no shell is involved, so quoting is not interpreted).
    `env`, when given, is passed to the child as its environment.
    `max_timeout`, when given and non-zero, is the number of seconds to
    wait before the child is interrupted with SIGINT.

    Return a ``(return code, output)`` tuple, where output is everything
    the child wrote to stdout and stderr (stderr is merged into stdout).
    On timeout return ``(1, "Time exceeded")``. If the command cannot be
    started or any other error occurs, return ``(1, error message)``.

    >>> run_cmd('/bin/true')
    (0, '')

    >>> run_cmd('/bin/cat', max_timeout=0.2)
    (1, 'Time exceeded')
    """
    arglist = cmd.split()

    # The child writes straight to this file's descriptor. TemporaryFile
    # removes the file on close, so the with-block guarantees cleanup on
    # every exit path (normal return, timeout, error).
    with tempfile.TemporaryFile('w+') as output:
        try:
            p = Popen(arglist, stdout=output, stderr=STDOUT, env=env)

            # Wait only max_timeout seconds.
            if max_timeout:
                start = time.time()
                while p.poll() is None:
                    time.sleep(0.1)
                    if time.time() - start > max_timeout:
                        os.kill(p.pid, signal.SIGINT)
                        # Reap the child so it does not linger as a zombie.
                        p.wait()
                        return 1, "Time exceeded"

            p.wait()

            # Rewind before reading: the child's writes advanced the
            # shared file offset.
            output.seek(0)
            return p.returncode, output.read()
        except Exception as e:
            # Report the failure as text so that callers always receive a
            # string as the second element.
            return 1, str(e)
60
def command_successful(cmd):
    """Tell whether running `cmd` ended with a zero return code.

    The command is executed through run_cmd; its output is discarded.

    >>> command_successful('/bin/true')
    True
    >>> command_successful('this-command-doesnt-exist')
    False
    """
    return_code = run_cmd(cmd)[0]
    return return_code == 0
71
class StdoutRedirector(object):
    """File-like object that stores everything written to it in a file.

    It offers `write` and `flush`, so it can stand in for a stream such as
    stdout. With no `filename` the data goes to a fresh temporary file,
    which is deleted by `read_buffer`; with a `filename` the data goes to
    that file, which is left in place.
    """
    def __init__(self, filename=None):
        if filename:
            # 'w+' rather than 'w': read_buffer() has to read the data back,
            # which fails on a file opened write-only.
            self.fh = open(filename, 'w+')
            # No temporary file to clean up in this mode.
            self.fname = None
        else:
            self.fh, self.fname = make_temp_file()

    def write(self, buf):
        """Append `buf` to the underlying file."""
        self.fh.write(buf)

    def flush(self):
        """Flush the underlying file."""
        self.fh.flush()

    def read_buffer(self):
        """Return everything written so far and release the file.

        The file is closed afterwards (and removed, if it was a temporary
        file), so the redirector cannot be written to or read again.
        """
        self.fh.seek(0)

        output = self.fh.read()
        self.fh.close()
        if self.fname:
            os.unlink(self.fname)

        return output
99
def pad_with_dots(msg, length=PAD_TEXT):
    """Extend `msg` with a space and then dots until it is `length` long.

    A message that is already `length` characters or longer is returned
    unchanged.

    >>> pad_with_dots("Hello world", 20)
    'Hello world ........'
    >>> pad_with_dots("Exceeding length", 10)
    'Exceeding length'
    """
    if len(msg) >= length:
        return msg

    # One separating space, then dots fill whatever room is left.
    return (msg + " ").ljust(length, ".")
117
def pad_left_spaces(value, length=PAD_VALUE):
    """Pad value with spaces at left up to given length.

    Values that are not strings are converted with str() first. A value
    already `length` characters or longer is returned without padding.

    >>> pad_left_spaces(15, 4)
    '  15'
    >>> pad_left_spaces(123456, 2)
    '123456'
    >>> len(pad_left_spaces("")) == PAD_VALUE
    True
    """
    # (str, type(u"")) is (str, unicode) on Python 2 -- the same types
    # basestring covered -- and is just str on Python 3, where the name
    # basestring does not exist.
    if not isinstance(value, (str, type(u""))):
        value = str(value)
    return value.rjust(length)
132
def pad_right_spaces(value, length=PAD_VALUE):
    """Pad value with spaces at right up to given length.

    Values that are not strings are converted with str() first. A value
    already `length` characters or longer is returned without padding.

    >>> pad_right_spaces(123, 5)
    '123  '
    >>> pad_right_spaces(12.1, 5)
    '12.1 '
    """
    # (str, type(u"")) is (str, unicode) on Python 2 -- the same types
    # basestring covered -- and is just str on Python 3, where the name
    # basestring does not exist.
    if not isinstance(value, (str, type(u""))):
        value = str(value)
    return value.ljust(length)
145
def pad_msg(msg, value, msg_length=PAD_TEXT, value_length=PAD_VALUE):
    """Build a "message .... value" line.

    The message is followed by a space and dotted out to `msg_length`
    characters; the value is right-aligned in `value_length` characters
    by pad_left_spaces and appended.

    >>> pad_msg("123456", 77, msg_length=10, value_length=4)
    '123456 ...  77'
    >>> pad_msg("123", u"45", msg_length=5, value_length=3)
    u'123 . 45'
    """
    # The separating space is always added, even when the message alone
    # already fills (or overflows) msg_length.
    dotted = (msg + " ").ljust(msg_length, ".")
    aligned_value = pad_left_spaces(value, value_length)
    return dotted + aligned_value
155
def pad_line(char="=", length=(PAD_TEXT+PAD_VALUE+1)):
    """Build a line made of `char` repeated `length` times.

    The default length is PAD_TEXT + PAD_VALUE + 1.

    >>> pad_line('*', 3)
    '***'
    >>> pad_line(length=10)
    '=========='
    """
    line = char * length
    return line
165
def unzip_package(package, destination):
    """Unzip given `package` to the `destination` directory.

    Return name of unpacked directory or None on error. The name is the
    first path component of the last entry in the archive (an empty
    string if that entry sits at the top level of the archive).

    None is returned when `package` is not a valid zip file, when the
    archive has no entries, or when an entry name is absolute or contains
    a '..' component (nothing is extracted in that case).
    """
    try:
        archive = zipfile.ZipFile(package)
    except zipfile.error:
        return None

    try:
        names = archive.namelist()
        if not names:
            # Nothing was unpacked, so there is no directory to report.
            return None

        # Refuse entries that would be written outside `destination`;
        # check everything up front so a bad archive extracts nothing.
        for name in names:
            components = name.replace('\\', '/').split('/')
            if os.path.isabs(name) or '..' in components:
                return None

        unpack_dir = ''
        for name in names:
            # Directory part of the entry; for a directory entry such as
            # 'pkg/sub/' this is the directory itself.
            unpack_dir = os.path.split(name)[0]
            target_dir = os.path.join(destination, unpack_dir)
            if not os.path.exists(target_dir):
                os.makedirs(target_dir)

            # Names ending with '/' are directory entries: nothing to write.
            if not name.endswith('/'):
                with open(os.path.join(destination, name), 'wb') as outfile:
                    outfile.write(archive.read(name))
    finally:
        archive.close()

    # Entry names inside a zip archive use '/' on every platform, so split
    # on '/' rather than os.sep.
    return unpack_dir.split('/')[0]
193
def untar_package(package, destination):
    """Untar given `package` to the `destination` directory.

    Return name of unpacked directory or None on error. The name is the
    first path component of the first member of the archive.

    None is returned when `package` cannot be read as a tar archive, when
    the archive has no members, or when a member name is absolute or
    contains a '..' component (nothing is extracted in that case).
    """
    try:
        archive = tarfile.open(package)
    except tarfile.ReadError:
        return None

    try:
        members = archive.getmembers()
        if not members:
            # Nothing was unpacked, so there is no directory to report.
            return None

        # Refuse members that would be written outside `destination`;
        # check everything up front so a bad archive extracts nothing.
        for member in members:
            if os.path.isabs(member.name) or '..' in member.name.split('/'):
                return None

        for member in members:
            archive.extract(member, destination)
    finally:
        archive.close()

    ## GG: os.sep is \\ when running cheesecake on Windows
    ## while inside the tar.gz the separator is /
    ## We use / because most tar.gz archives are created on *nix
    return members[0].name.split('/')[0]
215
def unegg_package(package, destination):
    """Unpack given egg to the `destination` directory.

    An egg that is a regular file is handed to unzip_package. An egg that
    is a directory is copied (symlinks kept as symlinks) into a
    subdirectory of `destination` carrying the egg's own name.

    Return name of unpacked directory or None on error.
    """
    # File eggs are zip archives: delegate.
    if not os.path.isdir(package):
        return unzip_package(package, destination)

    egg_name = os.path.basename(package)
    shutil.copytree(package, os.path.join(destination, egg_name), symlinks=True)
    return egg_name
228
def mkdirs(dir):
    """Make directory with parent directories as needed.

    Don't throw an exception if directory exists. Relative paths are
    created relative to the current working directory. An empty path is
    ignored. OSError is still raised when the directory cannot be created
    (for example when a path component exists but is not a directory).
    """
    if not dir or os.path.isdir(dir):
        return

    try:
        os.makedirs(dir)
    except OSError:
        # Something else may have created the directory between the check
        # above and makedirs(); that is fine. Anything else is a real error.
        if not os.path.isdir(dir):
            raise
239
def time_function(function):
    """Call `function` with no arguments and time the call.

    Return (return value, time taken) tuple; the time is the difference
    of two time.time() readings, in seconds.

    >>> def fun(x):
    ...     return x*2
    >>> ret, time_taken = time_function(lambda: fun(5))
    >>> ret
    10
    """
    started = time.time()
    result = function()
    elapsed = time.time() - started
    return result, elapsed
255
def rmtree(topdir):
    """Delete `topdir` together with everything below it.

    Works around some Windows-specific behaviour of shutil.rmtree: every
    entry is first given read/write/execute permission for its owner, and
    only then is the tree removed. Errors during the removal itself are
    ignored.
    """
    owner_rwx = stat.S_IREAD | stat.S_IWRITE | stat.S_IEXEC

    # Open up the top directory first, then every directory and file below
    # it. A failure on the top directory propagates; failures on individual
    # entries are skipped.
    os.chmod(topdir, owner_rwx)
    for parent, subdirs, files in os.walk(topdir):
        for entry in subdirs + files:
            entry_path = os.path.join(parent, entry)
            try:
                os.chmod(entry_path, owner_rwx)
            except OSError:
                pass

    shutil.rmtree(topdir, ignore_errors=True)
273
Note: See TracBrowser for help on using the browser.