root/trunk/cheesecake/util.py

Revision 187, 6.9 kB (checked in by mk, 6 years ago)

Don't use os.tmpfile().

  • Property svn:executable set to *
Line 
1 """Utility functions for Cheesecake project.
2 """
3
4 import os
5 import shutil
6 import signal
7 import stat
8 import sys
9 import tarfile
10 import tempfile
11 import time
12 import zipfile
13
14 from subprocess import call, ProcessError, Popen, PIPE, STDOUT
15
16 PAD_TEXT = 40  # default width of the dot-padded message column (pad_with_dots, pad_msg)
17 PAD_VALUE = 4  # default width of the space-padded value column (pad_left_spaces, pad_right_spaces, pad_msg)
18
def make_temp_file():
    """Create a new temporary file opened for both writing and reading.

    Return a (file object, file name) tuple.  The file is not removed
    automatically: the caller has to close the file object and unlink
    the file name once done with it.
    """
    tmpfd, tmpname = tempfile.mkstemp()
    # Open with 'w+' instead of the read-only default, so the handle can be
    # written to and later rewound and read back.
    return os.fdopen(tmpfd, 'w+'), tmpname
22
def run_cmd(cmd, env=None, max_timeout=None):
    """Run command and return its return code and its output.

    `cmd` is split on whitespace (no shell quoting rules are applied) and
    executed without a shell.  Standard output and standard error of the
    command are captured together.  `env`, when given, is passed to the
    child process as its environment.  `max_timeout`, when true, is the
    number of seconds after which the process is interrupted with SIGINT.

    Return a two-element tuple:
      * (return code, captured output) when the command finished,
      * (1, 'Time exceeded') when `max_timeout` seconds have passed,
      * (1, exception instance) when the command could not be started.

    >>> run_cmd('/bin/true')
    (0, '')

    >>> run_cmd('/bin/cat', max_timeout=0.2)
    (1, 'Time exceeded')
    """
    arglist = cmd.split()

    output, output_name = make_temp_file()
    try:
        try:
            p = Popen(arglist, stdout=output, stderr=STDOUT, env=env)
        except Exception:
            # sys.exc_info() is used instead of binding the exception in the
            # except clause, as this form is valid in every Python version.
            return 1, sys.exc_info()[1]

        # Wait only max_timeout seconds.
        if max_timeout:
            start = time.time()
            while p.poll() is None:
                time.sleep(0.1)
                if time.time() - start > max_timeout:
                    os.kill(p.pid, signal.SIGINT)
                    p.wait()
                    return 1, "Time exceeded"

        p.wait()
        output.seek(0)
        return p.returncode, output.read()
    finally:
        # Release the temporary file on every exit path, including the
        # early returns above.
        output.close()
        os.unlink(output_name)
58
def command_successful(cmd):
    """Tell whether running `cmd` ended with a zero return code.

    The output of the command is discarded.

    >>> command_successful('/bin/true')
    True
    >>> command_successful('this-command-doesnt-exist')
    False
    """
    return_code = run_cmd(cmd)[0]
    return return_code == 0
69
class StdoutRedirector(object):
    """File-like object that stores everything written to it in a file.

    It provides write() and flush(), so an instance can be used in place of
    a stdout stream; the class itself does not modify sys.stdout.

    When `filename` is given the data goes to that file (which is truncated
    and kept on disk afterwards), otherwise to a temporary file that is
    removed by read_buffer().
    """
    def __init__(self, filename=None):
        if filename:
            # 'w+' rather than 'w': read_buffer() reads the data back
            # through this very handle.
            self.fh = open(filename, 'w+')
            self.fname = None
        else:
            self.fh, self.fname = make_temp_file()

    def write(self, buf):
        """Append `buf` to the underlying file."""
        self.fh.write(buf)

    def flush(self):
        """Flush the underlying file."""
        self.fh.flush()

    def read_buffer(self):
        """Return everything written so far and release the file.

        The file is closed afterwards (and deleted, if it was a temporary
        one), so the object cannot be used once this method has been called.
        """
        try:
            self.fh.seek(0)
            return self.fh.read()
        finally:
            # Clean up even if rewinding or reading failed.
            self.fh.close()
            if self.fname:
                os.unlink(self.fname)
97
def pad_with_dots(msg, length=PAD_TEXT):
    """Extend `msg` with a space and a run of dots to `length` characters.

    A message that is already `length` characters long (or longer) is
    returned unchanged.

    >>> pad_with_dots("Hello world", 20)
    'Hello world ........'
    >>> pad_with_dots("Exceeding length", 10)
    'Exceeding length'
    """
    missing = length - len(msg)
    if missing <= 0:
        return msg

    # One of the missing characters is the separating space.
    return msg + " " + "." * (missing - 1)
115
def pad_left_spaces(value, length=PAD_VALUE):
    """Right-justify `value` in a field of `length` characters.

    Values that are not strings are converted with str() first.  Text that
    is already long enough is returned without padding.

    >>> pad_left_spaces(15, 4)
    '  15'
    >>> pad_left_spaces(123456, 2)
    '123456'
    >>> len(pad_left_spaces("")) == PAD_VALUE
    True
    """
    text = value if isinstance(value, basestring) else str(value)
    return text.rjust(length)
130
def pad_right_spaces(value, length=PAD_VALUE):
    """Left-justify `value` in a field of `length` characters.

    Spaces are appended at the right.  Values that are not strings are
    converted with str() first.  Text that is already long enough is
    returned without padding.

    >>> pad_right_spaces(123, 5)
    '123  '
    >>> pad_right_spaces(12.1, 5)
    '12.1 '
    """
    text = value if isinstance(value, basestring) else str(value)
    return text.ljust(length)
143
def pad_msg(msg, value, msg_length=PAD_TEXT, value_length=PAD_VALUE):
    """Build a "message ..... value" line.

    The message is followed by a space and dots up to `msg_length`
    characters, then by the value right-justified in `value_length`
    characters.

    >>> pad_msg("123456", 77, msg_length=10, value_length=4)
    '123456 ...  77'
    >>> pad_msg("123", u"45", msg_length=5, value_length=3)
    u'123 . 45'
    """
    # The separating space takes one character of the message column.
    dots = "." * (msg_length - len(msg) - 1)
    value_column = pad_left_spaces(value, value_length)
    return msg + " " + dots + value_column
153
def pad_line(char="=", length=(PAD_TEXT+PAD_VALUE+1)):
    """Build a line by repeating `char` `length` times.

    >>> pad_line('*', 3)
    '***'
    >>> pad_line(length=10)
    '=========='
    """
    line = char * length
    return line
163
def unzip_package(package, destination):
    """Unzip given `package` to the `destination` directory.

    Return name of unpacked directory or None on error.  The following are
    treated as errors, and nothing is extracted when they occur:
      * `package` is not a valid zip archive,
      * the archive has no entries,
      * an entry would be written outside of `destination` (for example
        '../name' or an absolute path).

    The returned name is the first path component of the directory holding
    the last entry of the archive; it is an empty string when that entry
    lies directly at the top level of the archive.
    """
    try:
        archive = zipfile.ZipFile(package)
    except zipfile.error:
        return None

    try:
        names = archive.namelist()
        if not names:
            return None

        # Packages may come from untrusted sources: refuse the whole archive
        # if any entry resolves to a location outside of the destination.
        root = os.path.realpath(destination)
        root_prefix = os.path.join(root, '')
        for name in names:
            target = os.path.realpath(os.path.join(destination, name))
            if target != root and not target.startswith(root_prefix):
                return None

        unpack_dir = ''
        for name in names:
            # Recreate the directory structure of the archive as we go.
            unpack_dir = os.path.split(name)[0]
            target_dir = os.path.join(destination, unpack_dir)
            if not os.path.exists(target_dir):
                os.makedirs(target_dir)

            # Names ending with a slash denote directories, created above.
            if name.endswith('/'):
                continue

            outfile = open(os.path.join(destination, name), 'wb')
            try:
                outfile.write(archive.read(name))
            finally:
                outfile.close()
    finally:
        archive.close()

    # Zip archives use '/' as the separator regardless of the platform;
    # os.sep is normalized as well to keep the previous behaviour for
    # archives that contain native separators.
    return unpack_dir.replace(os.sep, '/').split('/')[0]
191
def untar_package(package, destination):
    """Untar given `package` to the `destination` directory.

    Return name of unpacked directory or None on error.  The following are
    treated as errors, and nothing is extracted when they occur:
      * `package` cannot be read as a tar archive,
      * the archive has no members,
      * a member name points outside of `destination` (for example
        '../name' or an absolute path).

    The returned name is the first path component of the first member.
    """
    try:
        archive = tarfile.open(package)
    except tarfile.ReadError:
        return None

    try:
        members = archive.getmembers()
        if not members:
            return None

        # Packages may come from untrusted sources: refuse the whole archive
        # if any member name resolves outside of the destination.
        # NOTE: only member names are checked; targets of symbolic and hard
        # links stored in the archive are not inspected.
        root = os.path.realpath(destination)
        root_prefix = os.path.join(root, '')
        for member in members:
            target = os.path.realpath(os.path.join(destination, member.name))
            if target != root and not target.startswith(root_prefix):
                return None

        for member in members:
            archive.extract(member, destination)
    finally:
        archive.close()

    ## GG: os.sep is \\ when running cheesecake on Windows
    ## while inside the tar.gz the separator is /
    ## We use / because most tar.gz archives are created on *nix
    return members[0].name.split('/')[0]
211
def unegg_package(package, destination):
    """Unpack given egg to the `destination` directory.

    An egg may be either a directory or a zip file.  A directory is copied
    (symbolic links are preserved as links) into `destination` under its own
    name; a file is handed over to unzip_package().

    Return name of unpacked directory or None on error.
    """
    if not os.path.isdir(package):
        return unzip_package(package, destination)

    # normpath() drops a trailing separator, which would otherwise make
    # basename() return '' and the copy land on `destination` itself.
    package_name = os.path.basename(os.path.normpath(package))
    shutil.copytree(package, os.path.join(destination, package_name),
                    symlinks=True)
    return package_name
224
def mkdirs(dir):
    """Make directory with parent directories as needed.

    Don't throw an exception if directory exists.  Relative paths are
    created relative to the current working directory.  An empty path is
    ignored.  OSError is raised when the directory cannot be created.
    """
    if not dir or os.path.exists(dir):
        return

    try:
        os.makedirs(dir)
    except OSError:
        # The path may have been created by somebody else in the meantime;
        # only report the error if it is still missing.
        if not os.path.exists(dir):
            raise
235
def time_function(function):
    """Call `function` without arguments and time the call.

    Return (return value, time taken) tuple, the time being the wall-clock
    difference in seconds as reported by time.time().

    >>> def fun(x):
    ...     return x*2
    >>> ret, time_taken = time_function(lambda: fun(5))
    >>> ret
    10
    """
    started = time.time()
    result = function()
    elapsed = time.time() - started
    return result, elapsed
251
def rmtree(topdir):
    """Delete `topdir` together with everything below it.

    Before the removal every file and directory in the tree is given read,
    write and execute permissions for the owner, which works around some
    Windows-specific behaviour of shutil.rmtree.  Errors of the removal
    itself are ignored.
    """
    # Same value as S_IREAD | S_IWRITE | S_IEXEC.
    owner_rwx = stat.S_IRWXU

    # Make all files and directories writable, parents before children.
    os.chmod(topdir, owner_rwx)
    for parent, subdirs, files in os.walk(topdir):
        for entry in subdirs + files:
            entry_path = os.path.join(parent, entry)
            try:
                os.chmod(entry_path, owner_rwx)
            except OSError:
                pass

    shutil.rmtree(topdir, ignore_errors=True)
269
Note: See TracBrowser for help on using the browser.