root/trunk/cheesecake/util.py

Revision 173, 5.8 kB (checked in by mk, 6 years ago)

Stop pylint when it exceeds 60 seconds of running time (closes ticket #48).

  • Property svn:executable set to *
Line 
1 """Utility functions for Cheesecake project.
2 """
3
4 import os
5 import shutil
6 import signal
7 import sys
8 import tarfile
9 import time
10 import zipfile
11
12 from subprocess import call, ProcessError, Popen, PIPE, STDOUT
13
14 PAD_TEXT = 40
15 PAD_VALUE = 4
16
17 def run_cmd(cmd, env=None, max_timeout=None):
18     """Run command and return its return code and its output.
19
20     >>> run_cmd('/bin/true')
21     (0, '')
22
23     >>> run_cmd('/bin/cat', max_timeout=0.2)
24     (1, 'Time exceeded')
25     """
26     arglist = cmd.split()
27     try:
28         p = Popen(arglist, stdout=PIPE, stderr=STDOUT, env=env)
29     except Exception, e:
30         return 1, e
31
32     # Wait only max_timeout seconds.
33     if max_timeout:
34         start = time.time()
35         while p.poll() is None:
36             time.sleep(0.1)
37             if time.time() - start > max_timeout:
38                 os.kill(p.pid, signal.SIGINT)
39                 p.wait()
40                 return 1, "Time exceeded"
41
42     output = p.communicate()[0]
43     return p.returncode, output
44
45 def command_successful(cmd):
46     """Returns True if command exited normally, False otherwise.
47
48     >>> command_successful('/bin/true')
49     True
50     >>> command_successful('this-command-doesnt-exist')
51     False
52     """
53     rc, output = run_cmd(cmd)
54     if rc:
55         return False
56     return True
57
58 class StdoutRedirector(object):
59     """Redirect stdout to a temporary file.
60     """
61     def __init__(self, filename=None):
62         if filename:
63             self.fh = open(filename, 'w')
64         else:
65             self.fh = os.tmpfile()
66
67     def write(self, buf):
68         self.fh.write(buf)
69
70     def flush(self):
71         self.fh.flush()
72
73     def read_buffer(self):
74         """Return contents of the temporary file.
75         """
76         self.fh.seek(0)
77         return self.fh.read()
78
79 def pad_with_dots(msg, length=PAD_TEXT):
80     """Pad text with dots up to given length.
81
82     >>> pad_with_dots("Hello world", 20)
83     'Hello world ........'
84     >>> pad_with_dots("Exceeding length", 10)
85     'Exceeding length'
86     """
87     msg_length = len(msg)
88
89     if msg_length >= length:
90         return msg
91
92     msg = msg + " "
93     for i in range(msg_length+1, length):
94         msg += "."
95     return msg
96
97 def pad_left_spaces(value, length=PAD_VALUE):
98     """Pad value with spaces at left up to given length.
99
100     >>> pad_left_spaces(15, 4)
101     '  15'
102     >>> pad_left_spaces(123456, 2)
103     '123456'
104     >>> len(pad_left_spaces("")) == PAD_VALUE
105     True
106     """
107     if not isinstance(value, basestring):
108         value = str(value)
109     diff = length - len(value)
110     return " " * diff + value
111
112 def pad_right_spaces(value, length=PAD_VALUE):
113     """Pad value with spaces at left up to given length.
114
115     >>> pad_right_spaces(123, 5)
116     '123  '
117     >>> pad_right_spaces(12.1, 5)
118     '12.1 '
119     """
120     if not isinstance(value, basestring):
121         value = str(value)
122     diff = length - len(value)
123     return value + " " * diff
124
125 def pad_msg(msg, value, msg_length=PAD_TEXT, value_length=PAD_VALUE):
126     """Pad message with dots and pad value with spaces.
127
128     >>> pad_msg("123456", 77, msg_length=10, value_length=4)
129     '123456 ...  77'
130     >>> pad_msg("123", u"45", msg_length=5, value_length=3)
131     u'123 . 45'
132     """
133     return msg + " " +"." * (msg_length-len(msg)-1) + pad_left_spaces(value, value_length)
134
135 def pad_line(char="=", length=(PAD_TEXT+PAD_VALUE+1)):
136     """Return line consisting of 'char' characters.
137
138     >>> pad_line('*', 3)
139     '***'
140     >>> pad_line(length=10)
141     '=========='
142     """
143     return char * length
144
145 def unzip_package(package, destination):
146     """Unzip given `package` to the `destination` directory.
147
148     Return name of unpacked directory or None on error.
149     """
150     try:
151         z = zipfile.ZipFile(package)
152     except zipfile.error:
153         return None
154
155     # Get directory structure from zip and create it in destination directory.
156     for name in z.namelist():
157         (dir, file) = os.path.split(name)
158         unpack_dir = dir
159         target_dir = os.path.join(destination, dir)
160         if not os.path.exists(target_dir):
161             os.makedirs(target_dir)
162
163     # Extract files to directory structure
164     for i, name in enumerate(z.namelist()):
165         if not name.endswith('/'):
166             outfile = open(os.path.join(destination, name), 'wb')
167             outfile.write(z.read(name))
168             outfile.flush()
169             outfile.close()
170
171     return unpack_dir.split(os.sep)[0]
172
173 def untar_package(package, destination):
174     """Untar given `package` to the `destination` directory.
175
176     Return name of unpacked directory or None on error.
177     """
178     try:
179         t = tarfile.open(package)
180     except tarfile.ReadError, e:
181         return None
182
183     for member in t.getmembers():
184         t.extract(member, destination)
185
186     tarinfo = t.members[0]
187     return tarinfo.name.split(os.sep)[0]
188
189 def unegg_package(package, destination):
190     """Unpack given egg to the `destination` directory.
191
192     Return name of unpacked directory or None on error.
193     """
194     if os.path.isdir(package):
195         package_name = os.path.basename(package)
196         destination = os.path.join(destination, package_name)
197         shutil.copytree(package, destination, symlinks=True)
198         return package_name
199     else:
200         return unzip_package(package, destination)
201
202 def mkdirs(dir):
203     """Make directory with parent directories as needed.
204
205     Don't throw an exception if directory exists.
206     """
207     parts = dir.split(os.path.sep)
208     for length in xrange(1, len(parts)+1):
209         path = os.path.sep.join([''] + parts[:length])
210         if not os.path.exists(path):
211             os.mkdir(path)
212
213 def time_function(function):
214     """Measure function execution time.
215
216     Return (return value, time taken) tuple.
217
218     >>> def fun(x):
219     ...     return x*2
220     >>> ret, time_taken = time_function(lambda: fun(5))
221     >>> ret
222     10
223     """
224     start = time.time()
225     ret = function()
226     end = time.time()
227     return ret, end-start
Note: See TracBrowser for help on using the browser.