root/branches/mk/cheesecake/util.py

Revision 114, 5.4 kB (checked in by mk, 7 years ago)

Added missing indices docstrings.

  • Property svn:executable set to *
Line 
1 """Utility functions for Cheesecake project.
2 """
3
4 import os
5 import shutil
6 import sys
7 import tarfile
8 import time
9 import zipfile
10
11 from subprocess import call, ProcessError, Popen, PIPE, STDOUT
12
# Default width, in characters, of the dot-padded message column; used as the
# default `length` of pad_with_dots() and the default `msg_length` of pad_msg().
PAD_TEXT = 40
# Default width, in characters, of the space-padded value column; used as the
# default `length` of pad_left_spaces()/pad_right_spaces() and the default
# `value_length` of pad_msg().
PAD_VALUE = 4
15
def run_cmd(cmd, env=None):
    """Run command and return its return code and its output.

    `cmd` is split on whitespace into an argument list; no shell is involved,
    so quoting inside `cmd` is not interpreted.  `env`, when given, is passed
    to Popen as the environment of the child process.

    Standard error of the command is merged into its standard output.

    If the command cannot be started at all (empty command, missing
    executable, ...) return code 1 is returned together with the text of
    the error in place of the command output.

    >>> run_cmd('/bin/true')
    (0, '')
    """
    arglist = cmd.split()
    if not arglist:
        # Nothing to execute; report it the same way as a failed start.
        return 1, "empty command"
    try:
        p = Popen(arglist, stdout=PIPE, stderr=STDOUT, env=env)
    except OSError as e:
        # Popen reports a missing or non-executable program with OSError.
        return 1, str(e)
    except ProcessError as e:
        # Kept for the exception type this module imports from subprocess.
        return 1, str(e)
    output = p.communicate()[0]
    return p.returncode, output
29
def command_successful(cmd):
    """Tell whether running `cmd` ended with a zero return code.

    The command is executed through run_cmd(); its output is discarded.
    Return True for a zero return code, False for anything else.

    >>> command_successful('/bin/true')
    True
    >>> command_successful('this-command-doesnt-exist')
    False
    """
    return_code, _ = run_cmd(cmd)
    return not return_code
42
class StdoutRedirector(object):
    """Redirect stdout to a file and allow reading back what was written.

    Provides the `write` and `flush` methods of a file object, so an
    instance can be used wherever a writable stream is expected.
    """
    def __init__(self, filename=None):
        """Open the backing file.

        If `filename` is given, that file is created (or truncated) and
        used; otherwise an anonymous temporary file is used.
        """
        if filename:
            # 'w+' instead of 'w': read_buffer() has to read the file back.
            self.fh = open(filename, 'w+')
        else:
            # Local import: tempfile is not among this module's imports.
            import tempfile
            self.fh = tempfile.TemporaryFile(mode='w+')

    def write(self, buf):
        """Append `buf` to the backing file."""
        self.fh.write(buf)

    def flush(self):
        """Flush pending writes to the backing file."""
        self.fh.flush()

    def read_buffer(self):
        """Return everything written to the backing file so far.
        """
        self.fh.seek(0)
        contents = self.fh.read()
        # Reposition at the end so that later writes keep appending.
        self.fh.seek(0, 2)
        return contents

    def close(self):
        """Close the backing file; the object must not be used afterwards."""
        self.fh.close()
63
def pad_with_dots(msg, length=PAD_TEXT):
    """Extend `msg` with a space and a run of dots up to `length` characters.

    A message that is already `length` characters long or longer is
    returned unchanged.

    >>> pad_with_dots("Hello world", 20)
    'Hello world ........'
    >>> pad_with_dots("Exceeding length", 10)
    'Exceeding length'
    """
    # One character is reserved for the space separating text and dots.
    dots_needed = length - len(msg) - 1
    if dots_needed < 0:
        return msg
    return msg + " " + "." * dots_needed
81
def pad_left_spaces(value, length=PAD_VALUE):
    """Right-align `value` in a field of `length` characters.

    Values that are not strings are converted with str() first.  A value
    longer than `length` is returned whole, without truncation.

    >>> pad_left_spaces(15, 4)
    '  15'
    >>> pad_left_spaces(123456, 2)
    '123456'
    >>> len(pad_left_spaces("")) == PAD_VALUE
    True
    """
    if isinstance(value, basestring):
        text = value
    else:
        text = str(value)
    return text.rjust(length)
96
def pad_right_spaces(value, length=PAD_VALUE):
    """Pad value with spaces at right up to given length.

    Values that are not strings are converted with str() first.  A value
    longer than `length` is returned whole, without truncation.

    >>> pad_right_spaces(123, 5)
    '123  '
    >>> pad_right_spaces(12.1, 5)
    '12.1 '
    """
    if not isinstance(value, basestring):
        value = str(value)
    return value.ljust(length)
109
def pad_msg(msg, value, msg_length=PAD_TEXT, value_length=PAD_VALUE):
    """Build a report line: message, dots, then the right-aligned value.

    `msg` is followed by a space and enough dots to reach `msg_length`
    characters; `value` is padded on the left with spaces to `value_length`
    characters by pad_left_spaces().

    >>> pad_msg("123456", 77, msg_length=10, value_length=4)
    '123456 ...  77'
    >>> pad_msg("123", u"45", msg_length=5, value_length=3)
    u'123 . 45'
    """
    # One character of the message column is taken by the separating space.
    dots = "." * (msg_length - len(msg) - 1)
    padded_value = pad_left_spaces(value, value_length)
    return msg + " " + dots + padded_value
119
def pad_line(char="=", length=(PAD_TEXT+PAD_VALUE+1)):
    """Return `char` repeated `length` times.

    >>> pad_line('*', 3)
    '***'
    >>> pad_line(length=10)
    '=========='
    """
    line = length * char
    return line
129
def unzip_package(package, destination):
    """Unzip given `package` to the `destination` directory.

    Directories missing below `destination` are created as needed.

    Return name of unpacked directory (the first path component of the last
    archive entry) or None on error.  A file that is not a valid zip
    archive, an archive without entries, and an archive containing an entry
    that would be written outside of `destination` all count as errors; in
    the last case nothing is extracted.
    """
    try:
        z = zipfile.ZipFile(package)
    except zipfile.error:
        return None

    try:
        names = z.namelist()

        # Check every entry before writing anything: names such as '../x'
        # or '/x' must not escape the destination directory.
        root = os.path.abspath(destination)
        if root.endswith(os.sep):
            prefix = root
        else:
            prefix = root + os.sep
        for name in names:
            target = os.path.abspath(os.path.join(destination, name))
            if target != root and not target.startswith(prefix):
                return None

        unpack_dir = None
        for name in names:
            member_dir = os.path.split(name)[0]
            unpack_dir = member_dir
            target_dir = os.path.join(destination, member_dir)
            if not os.path.exists(target_dir):
                os.makedirs(target_dir)
            # Names ending with '/' are directory entries: nothing to write.
            if not name.endswith('/'):
                outfile = open(os.path.join(destination, name), 'wb')
                try:
                    outfile.write(z.read(name))
                finally:
                    outfile.close()
    finally:
        z.close()

    if unpack_dir is None:
        # Archive had no entries at all.
        return None
    # Zip entry names use '/' as separator regardless of the platform.
    return unpack_dir.split('/')[0]
157
def untar_package(package, destination):
    """Untar given `package` to the `destination` directory.

    Return name of unpacked directory (the first path component of the first
    archive member) or None on error.  A file that cannot be read as a tar
    archive, an archive without members, and an archive containing a member
    whose name would place it outside of `destination` all count as errors;
    in the last case nothing is extracted.
    """
    try:
        t = tarfile.open(package)
    except tarfile.ReadError:
        return None

    try:
        members = t.getmembers()
        if not members:
            return None

        # Check every member name before extracting anything: names such as
        # '../x' or '/x' must not escape the destination directory.
        # NOTE(review): only member names are checked, not link targets.
        root = os.path.abspath(destination)
        if root.endswith(os.sep):
            prefix = root
        else:
            prefix = root + os.sep
        for member in members:
            target = os.path.abspath(os.path.join(destination, member.name))
            if target != root and not target.startswith(prefix):
                return None

        for member in members:
            t.extract(member, destination)
    finally:
        t.close()

    # Tar member names use '/' as separator regardless of the platform.
    return members[0].name.split('/')[0]
173
def unegg_package(package, destination):
    """Unpack given egg to the `destination` directory.

    An egg that is a regular file is handed to unzip_package().  An egg that
    is a directory is copied, under its own name, into `destination`, with
    symbolic links copied as links.

    Return name of unpacked directory or None on error.
    """
    if not os.path.isdir(package):
        return unzip_package(package, destination)

    egg_name = os.path.basename(package)
    shutil.copytree(package, os.path.join(destination, egg_name), symlinks=True)
    return egg_name
186
def mkdirs(dir):
    """Make directory with parent directories as needed.

    Don't throw an exception if directory exists.  Relative paths are
    created relative to the current working directory.  An empty path is
    ignored.

    OSError from os.makedirs() is propagated when the path still does not
    exist afterwards (for example because of missing permissions).
    """
    if not dir:
        return
    try:
        os.makedirs(dir)
    except OSError:
        # Only a failure that left the path missing is a real error; an
        # already existing path (possibly created concurrently) is fine.
        if not os.path.exists(dir):
            raise
197
def time_function(function):
    """Call `function` without arguments and time the call.

    Return (return value, time taken) tuple, where the time taken is the
    difference of two time.time() readings taken around the call.

    >>> def fun(x):
    ...     return x*2
    >>> ret, time_taken = time_function(lambda: fun(5))
    >>> ret
    10
    """
    started_at = time.time()
    result = function()
    elapsed = time.time() - started_at
    return result, elapsed
Note: See TracBrowser for help on using the browser.