root/trunk/cheesecake/util.py

Revision 175, 5.8 kB (checked in by mk, 6 years ago)

Fixed hanging pylint problem (the stdout buffer overflowed and blocked the whole process).

  • Property svn:executable set to *
Line 
1 """Utility functions for Cheesecake project.
2 """
3
import os
import shutil
import signal
import sys
import tarfile
import tempfile
import time
import zipfile

# subprocess has never exported a name called `ProcessError`; importing it
# made the whole module fail to load. CalledProcessError is the real name.
from subprocess import call, CalledProcessError, Popen, PIPE, STDOUT
13
# Default width of the dot-padded message column (pad_with_dots, pad_msg).
PAD_TEXT = 40
# Default width of the space-padded value column (pad_left_spaces,
# pad_right_spaces, pad_msg).
PAD_VALUE = 4
16
def run_cmd(cmd, env=None, max_timeout=None):
    """Run command and return its return code and its output.

    The command line is split on whitespace and executed directly (no shell
    is involved). Both stdout and stderr of the child are collected in a
    temporary file rather than a pipe, so the child cannot stall on a full
    pipe buffer while we are waiting for it.

    Arguments:
        cmd: command line to run; arguments are separated by whitespace.
        env: optional environment mapping for the child process.
        max_timeout: optional time limit in seconds. When it is exceeded,
            the child is sent SIGINT and (1, 'Time exceeded') is returned.

    Return a (return code, output) tuple where output is always text. If the
    command could not be started at all, the return code is 1 and the output
    is the error message.

    >>> run_cmd('/bin/true')
    (0, '')

    >>> run_cmd('/bin/cat', max_timeout=0.2)
    (1, 'Time exceeded')
    """
    arglist = cmd.split()

    output = tempfile.TemporaryFile()
    try:
        try:
            process = Popen(arglist, stdout=output, stderr=STDOUT, env=env)
        except Exception as e:
            # Report the failure as text, like any other command output.
            return 1, str(e)

        # Wait only max_timeout seconds.
        if max_timeout:
            start = time.time()
            while process.poll() is None:
                time.sleep(0.1)
                if time.time() - start > max_timeout:
                    os.kill(process.pid, signal.SIGINT)
                    process.wait()
                    return 1, "Time exceeded"

        process.wait()
        output.seek(0)
        data = output.read()
        if not isinstance(data, str):
            # Python 3 reads bytes here; decode so callers always get text.
            data = data.decode(sys.getdefaultencoding(), 'replace')
        return process.returncode, data
    finally:
        # Release the temporary file on every exit path.
        output.close()
47
def command_successful(cmd):
    """Tell whether running `cmd` finished with a zero return code.

    The output of the command is discarded; only the return code reported
    by run_cmd() matters.

    >>> command_successful('/bin/true')
    True
    >>> command_successful('this-command-doesnt-exist')
    False
    """
    return_code = run_cmd(cmd)[0]
    return return_code == 0
58
class StdoutRedirector(object):
    """File-like object that stores everything written to it in a file.

    Intended as a replacement for stdout: it offers write() and flush(),
    plus read_buffer() to get back everything written so far.

    If `filename` is given the text is kept in that file (which is
    truncated first), otherwise an anonymous temporary file is used.
    """
    def __init__(self, filename=None):
        if filename:
            # 'w+' rather than 'w': read_buffer() has to read the file back.
            self.fh = open(filename, 'w+')
        else:
            self.fh = tempfile.TemporaryFile(mode='w+')

    def write(self, buf):
        """Append `buf` to the underlying file."""
        self.fh.write(buf)

    def flush(self):
        """Flush the underlying file."""
        self.fh.flush()

    def read_buffer(self):
        """Return everything written to the file so far.
        """
        self.fh.seek(0)
        return self.fh.read()

    def close(self):
        """Close the underlying file; the object is unusable afterwards."""
        self.fh.close()
79
def pad_with_dots(msg, length=PAD_TEXT):
    """Extend `msg` to `length` characters with a space and a run of dots.

    A message that is already `length` characters or longer is returned
    unchanged.

    >>> pad_with_dots("Hello world", 20)
    'Hello world ........'
    >>> pad_with_dots("Exceeding length", 10)
    'Exceeding length'
    """
    if len(msg) >= length:
        return msg

    # One position is taken by the separating space, the rest by dots.
    dots = "." * (length - len(msg) - 1)
    return msg + " " + dots
97
def pad_left_spaces(value, length=PAD_VALUE):
    """Pad value with spaces at left up to given length.

    Non-string values are converted with str() first. Values already
    `length` characters or longer are returned without padding.

    >>> pad_left_spaces(15, 4)
    '  15'
    >>> pad_left_spaces(123456, 2)
    '123456'
    >>> len(pad_left_spaces("")) == PAD_VALUE
    True
    """
    try:
        string_types = basestring   # Python 2: covers str and unicode
    except NameError:
        string_types = str          # Python 3 has no basestring
    if not isinstance(value, string_types):
        value = str(value)
    return value.rjust(length)
112
def pad_right_spaces(value, length=PAD_VALUE):
    """Pad value with spaces at right up to given length.

    Non-string values are converted with str() first. Values already
    `length` characters or longer are returned without padding.

    >>> pad_right_spaces(123, 5)
    '123  '
    >>> pad_right_spaces(12.1, 5)
    '12.1 '
    """
    try:
        string_types = basestring   # Python 2: covers str and unicode
    except NameError:
        string_types = str          # Python 3 has no basestring
    if not isinstance(value, string_types):
        value = str(value)
    return value.ljust(length)
125
def pad_msg(msg, value, msg_length=PAD_TEXT, value_length=PAD_VALUE):
    """Build a report line: message, dots, then a right-aligned value.

    The message is followed by a space and enough dots to fill `msg_length`
    characters; the value is left-padded with spaces to `value_length`.

    >>> pad_msg("123456", 77, msg_length=10, value_length=4)
    '123456 ...  77'
    >>> pad_msg("123", u"45", msg_length=5, value_length=3)
    u'123 . 45'
    """
    dots = "." * (msg_length - len(msg) - 1)
    padded_value = pad_left_spaces(value, value_length)
    return msg + " " + dots + padded_value
135
def pad_line(char="=", length=(PAD_TEXT+PAD_VALUE+1)):
    """Return `char` repeated `length` times.

    >>> pad_line('*', 3)
    '***'
    >>> pad_line(length=10)
    '=========='
    """
    line = length * char
    return line
145
def unzip_package(package, destination):
    """Unzip given `package` to the `destination` directory.

    Return name of unpacked directory or None on error. An archive that is
    not a valid zip file, has no entries, or contains an entry that would
    be written outside of `destination` counts as an error; in the latter
    cases nothing is extracted.

    The returned name is the top-level directory of the last archive entry.
    """
    try:
        archive = zipfile.ZipFile(package)
    except zipfile.error:
        return None

    try:
        names = archive.namelist()
        if not names:
            return None

        # Refuse archives with absolute or '..' entries before touching disk.
        root = os.path.abspath(destination)
        inside_prefix = os.path.join(root, '')
        for name in names:
            target = os.path.abspath(os.path.join(destination, name))
            if target != root and not target.startswith(inside_prefix):
                return None

        for name in names:
            # Recreate the directory structure, then write regular files.
            target_dir = os.path.join(destination, os.path.dirname(name))
            if not os.path.exists(target_dir):
                os.makedirs(target_dir)
            if not name.endswith('/'):
                outfile = open(os.path.join(destination, name), 'wb')
                try:
                    outfile.write(archive.read(name))
                finally:
                    outfile.close()
    finally:
        archive.close()

    # Names inside a zip archive always use '/', whatever os.sep is.
    return os.path.dirname(names[-1]).split('/')[0]
173
def untar_package(package, destination):
    """Untar given `package` to the `destination` directory.

    Return name of unpacked directory or None on error. An archive that
    cannot be read as a tar file, has no members, or contains a member whose
    name points outside of `destination` counts as an error; in the latter
    cases nothing is extracted.

    The returned name is the top-level directory of the first member.

    NOTE(review): only member names are checked; targets of symbolic and
    hard links stored in the archive are not validated.
    """
    try:
        archive = tarfile.open(package)
    except tarfile.ReadError:
        return None

    try:
        members = archive.getmembers()
        if not members:
            return None

        # Refuse archives with absolute or '..' members before touching disk.
        root = os.path.abspath(destination)
        inside_prefix = os.path.join(root, '')
        for member in members:
            target = os.path.abspath(os.path.join(destination, member.name))
            if target != root and not target.startswith(inside_prefix):
                return None

        for member in members:
            archive.extract(member, destination)
    finally:
        archive.close()

    # Member names use '/' as separator, whatever os.sep is.
    return members[0].name.split('/')[0]
189
def unegg_package(package, destination):
    """Place the contents of an egg under the `destination` directory.

    An egg that is a directory is copied (symlinks are kept as symlinks)
    into `destination` under its own base name; anything else is handed to
    unzip_package().

    Return name of unpacked directory or None on error.
    """
    if not os.path.isdir(package):
        return unzip_package(package, destination)

    egg_name = os.path.basename(package)
    shutil.copytree(package, os.path.join(destination, egg_name), symlinks=True)
    return egg_name
202
def mkdirs(dir):
    """Make directory with parent directories as needed.

    Don't throw an exception if directory exists. Relative paths are
    created relative to the current working directory. An empty path is
    ignored.

    Raise OSError if the directory cannot be created, including when the
    path exists but is not a directory.
    """
    if not dir:
        return
    try:
        os.makedirs(dir)
    except OSError:
        # Only an already existing directory is acceptable; anything else
        # (permissions, a file in the way, ...) is a genuine failure.
        if not os.path.isdir(dir):
            raise
213
def time_function(function):
    """Call `function` with no arguments and time the call.

    Return (return value, time taken) tuple, the time being the difference
    of two time.time() readings in seconds.

    >>> def fun(x):
    ...     return x*2
    >>> ret, time_taken = time_function(lambda: fun(5))
    >>> ret
    10
    """
    started = time.time()
    result = function()
    elapsed = time.time() - started
    return result, elapsed
Note: See TracBrowser for help on using the browser.