- The
Python tutorial
- PEP
- Estil / Style
- Fitxer / File (shebang)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
- ...
- Python
(Raspberry
Pi)
- PyPI: the Python package
index
- Python
Documentation contents
- The
Python standard library
- Built-in Functions
- print
from __future__ import print_function
- format
- Format
Specification Mini-Language
- 4 decimals:
a = 123.45
print("{:.4f}".format(a))
- left-aligned text and right-aligned value:
title = "My title:"
value = 123.45
print("{:<20}{:>20.4f}".format(title,
value))
- ...
- type
if type(myvar) is int
...
- isinstance
import six
if isinstance(myvar, six.string_types)
...
- Dades / Data
- ...
type |
create |
create element |
set element |
check for key |
retrieve element |
get index |
remove element |
join |
all elements |
Object |
class MyModel:
myfield1 = 'myvalue1'
myobject = MyModel() |
|
myobject.myfield1 = 'mynewvalue' |
hasattr(myobject, 'myfield1') |
- myobject.myfield1
- getattr(myobject, 'myfield1',
'mydefaultvalue')
|
|
|
|
|
dict |
- mydict = {}
- mydict = {'mykey1':'myvalue1',
'mykey2':'myvalue2'}
|
mydict['mykey3'] = 'myvalue3' |
mydict['mykey1'] = 'mynewvalue1' |
'mykey1' in mydict |
- mydict['mykey1']
- mydict.get('mykey1','mydefaultvalue')
|
|
|
mydict.update(myotherdict) |
for k, v in list(mydict.items()): |
list |
- mylist = []
- mylist = ['myvalue1','myvalue2']
|
mylist.append('myvalue3') |
|
'myvalue1' in mylist |
|
mylist.index('myvalue1') |
- mylist.pop(0)
- try:
mylist.remove("myvalue1")
|
mylist + myotherlist |
for i in mylist: |
tuple |
- mytuple = ()
- mytuple = ('myvalue1','myvalue2',)
|
mytuple += ('myvalue3',) |
|
'myvalue1' in mytuple |
|
mytuple.index('myvalue1') |
|
mytuple + myothertuple |
|
set |
|
myset.add("myvalue3") |
|
|
|
|
myset.remove("myvalue1") |
|
|
- Loops
- compact
["{}-{}".format(a,b) for a,b in
...]
- Create a dict from a list
- Python
: How to convert a list to dictionary ?
>>> mylist =
[{'first_key':'value_f1',
'second_key':'value_s1'},
{'first_key':'value_f2',
'second_key':'value_s2'}]
>>> a =
{b['first_key']:b['second_key'] for b in
mylist}
>>> a
{'value_f1': 'value_s1', 'value_f2':
'value_s2'}
- Search/filter from a list of dicts:
- Python
select from a list + Examples
a =
[{'name':'myname1','address':'myaddress11'},
{'name':'myname1','address':'myaddress12'},
{'name':'myname2','address':'myaddress21'},]
elements_from_myname1 = [c for c in a if
c['name']=='myname1']
addresses_from_myname1 = [c['address'] for
c in a if c['name']=='myname1']
- Flatten
- Compare lists, dicts ...
- ...
- Built-in Constants
- Built-in Types
- 5.6.
Sequence
Types — str, unicode, list, tuple, bytearray,
buffer, xrange
- Cadena / String
- Distància entre dues cadenes / Distance
between two strings
- Format
- columnes amb amplades fixes
(my_integer_value tindrà una coma com a
separador de milers)
print(
first_string.ljust(50)
+ second_string.center(20)
+ third_string.rjust(30)
+
str("{:,}".format(my_integer_value)).rjust(25)
)
- ...
- Llistes / Lists
- Dades /
Data
- Sort
- Unique (remove repeated elements)
my_list_without_repeated_elements
=
list(set(my_list_with_repeated_elements))
- when elements are dicts:
- Convert string to list
- Search
- Current, next
- Intersection of two lists
intersection_list =
list(set(first_list) &
set(second_list))
- Subtraction of two lists
subtraction_list =
list(set(first_list) -
set(second_list))
non_common_list =
list(set(first_list) -
set(second_list)) + list(set(second_list)
- set(first_list))
- 5.8
Mapping Types - dict
- items() vs iteritems()
- Pretty print (not recursive)
import pprint
pp = pprint.PrettyPrinter(width=1)
pp.pprint(my_dict)
- json
(recursive)
-
|
python
object (list
or dict)
|
json
string
|
json
file
|
python
object ->
|
-
|
obj_json
= json.dumps(obj, indent=4) |
with
open("toto.json", "w") as f:
json.dump(obj,
f, indent=4) |
json
string ->
|
obj
= json.loads(obj_json) |
-
|
-
|
json
file ->
|
with
open("toto.json", "r") as f:
obj =
json.load(f) |
-
|
-
|
import json
print json.dumps(my_dict, indent=2)
- Problemes
- TypeError: Object of type datetime
is not JSON serializable
- Solució
- How
to overcome
"datetime.datetime not
JSON serializable"?
import
datetime
class
DateTimeJSONEncoder(json.JSONEncoder):
#
https://stackoverflow.com/questions/11875770/how-to-overcome-datetime-datetime-not-json-serializable
def
default(self, o):
if isinstance(o,
datetime.datetime):
return o.isoformat()
return
json.JSONEncoder.default(self,
o)
json.dumps(my_object_with_datetimes,
cls=DateTimeJSONEncoder)
- yaml
import yaml
print yaml.dump(my_dict, indent=2)
- Built-in Exceptions
- Text Processing Services
- Expressions
regulars
/ Regular expressions
- 7.2.
re —
Regular expression operations
- URL
parsing
- Exemple / Example
import re
import dateutil.parser
regex = re.compile(r'.*_(.*)')
# get datetime from the following
string:
name = 'toto_2016-10-25T133700Z'
m = regex.search(name)
if m:
extracted_datetime =
dateutil.parser.parse( m.groups()[0] )
- Remove elements from a list according to a
regular expression (files: *.min.js, *.min.css)
import re
list_with_min_js = ['primer.js',
'segon.min.js', 'tercer.js']
regex =
re.compile(r'.*\.min\.js|.*\.min\.css')
list_without_min_js = [i for i in
list_with_min_js if not regex.search(i)]
# returns: ['primer.js', 'tercer.js']
- Binary Data Services
- Data Types
- Data
/ Date
- 8.1 datetime
- 8.1.7
strftime()
and strptime() Behavior
- now in UTC and ISO-8601 format
import datetime
now = datetime.datetime.utcnow()
now.isoformat() #
2017-06-14T09:57:56.145575
now.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
# 2017-06-14T09:57:56.145575Z
- build a datetime from string
import datetime
mydate =
datetime.datetime.strptime('20201201T100102',
'%Y%m%dT%H%M%S')
- now in naive and aware:
>>>
datetime.datetime.utcnow()
datetime.datetime(2023, 5, 29, 17,
55, 5, 890540)
>>>
datetime.datetime.now(datetime.timezone.utc)
datetime.datetime(2023, 5, 29,
17, 55, 6, 615502,
tzinfo=datetime.timezone.utc)
>>>
datetime.datetime.now().astimezone()
datetime.datetime(2023, 5, 30, 20,
45, 25, 701462,
tzinfo=datetime.timezone(datetime.timedelta(seconds=7200),
'CEST'))
>>>
datetime.datetime.now().astimezone(datetime.timezone(datetime.timedelta(seconds=7200),
'CEST'))
datetime.datetime(2023, 5, 30, 20,
50, 5, 125038,
tzinfo=datetime.timezone(datetime.timedelta(seconds=7200),
'CEST'))
>>>
datetime.datetime.now()
datetime.datetime(2023, 5, 29, 19,
55, 7, 561511)
- (using pytz: DEPRECATED; migration
guide, pytz-deprecation-shim)
convert from naive to aware:
import
datetime
import pytz
mydate_winter_naive =
datetime.datetime.strptime('2020-02-01T00:00:00',
'%Y-%m-%dT%H:%M:%S')
mydate_winter_naive.isoformat() #
'2020-02-01T00:00:00'
mydate_summer_naive =
datetime.datetime.strptime('2020-08-01T00:00:00',
'%Y-%m-%dT%H:%M:%S')
mydate_summer_naive.isoformat()
#
# convert datetime from naive to
aware (utc):
utc = pytz.utc
mydate_winter_aware_utc =
utc.localize(mydate_winter_naive)
mydate_winter_aware_utc.isoformat()
# '2020-02-01T00:00:00+00:00'
mydate_summer_aware_utc
=
utc.localize(mydate_summer_naive)
mydate_summer_aware_utc.isoformat()
# '2020-08-01T00:00:00+00:00'
# convert datetime from naive to
aware (Europe/Andorra)
mydate_winter_aware_andorra =
mydate_winter_naive.astimezone(pytz.timezone('Europe/Andorra'))
mydate_winter_aware_andorra.isoformat()
# '2020-02-01T00:00:00+01:00'
mydate_summer_aware_andorra =
mydate_summer_naive.astimezone(pytz.timezone('Europe/Andorra'))
mydate_summer_aware_andorra.isoformat()
# '2020-08-01T00:00:00+02:00'
- convert from ISO-8601 string to
datetime
import dateutil.parser
yourdate =
dateutil.parser.parse(datestring)
my_time = strptime(
my_string, '...')
- convert seconds to HH:MM:SS
- without second fractions
import
time
time_in_hh_mm_ss =
time.strftime('%H:%M:%S',
time.gmtime(time_in_seconds_int))
- with second fractions
#
similar to timedelta.__str__
def _to_hh_mm_ss_ms(seconds):
mm, ss =
divmod(seconds, 60)
hh, mm =
divmod(mm, 60)
fraction_seconds = seconds -
int(seconds)
s =
"{:d}:{:02d}:{:02d}.{:03d}".format(int(hh),
int(mm), int(ss), int(fraction_seconds * 1000))
return s
import
datetime
# no fractional part if it
is 0
time_in_hh_mm_ss
= str(datetime.timedelta(seconds=duration_seconds_float))
import
datetime
import pytz
time_in_hh_mm_ss_ff =
datetime.datetime.fromtimestamp(time_in_seconds_float,
pytz.UTC).strftime('%H:%M:%S.%f')
- convert HH:MM:SS to seconds
- ...
- Fusos horaris / Timezones
- Exemple
import pytz
timezone = 'Europe/Andorra'
my_date.astimezone(pytz.timezone(timezone)).strftime('%H:%M:%S
%Z')
- i18n
- 15.3
time - Time access and conversions
- minimum and maximum datetime (infinite)
unaware_maximum =
datetime.datetime.max
import pytz
aware_maximum =
datetime.datetime.max.replace(tzinfo=pytz.UTC)
- other libraries
- add datetime and time
- event.start +
datetime.timedelta(hours=event.duration.hour,minutes=event.duration.minute,seconds=event.duration.second)
- difference between two times
- Date
in Django
- Periods
- Numeric and Mathematical Modules
- Random
- 9.6
random
- Contrasenya aleatòria / Random password
- Generate
password in python
import random
import string
chars = string.ascii_letters + string.digits
length = 20
generated_password = ''.join(map(lambda
x: random.choice(chars),
range(length)))
- Functional Programming Modules
- File and Directory Access
- Data Persistence
- Data Compression and Archiving
- File Formats
- CSV
- Fulls
de càlcul
- 13.1. csv — CSV File Reading and Writing (2.7)
(3.7)
- csv from file
with
open(csv_path, newline='') as f:
reader = csv.reader(f,
delimiter=';')
for row in reader:
row_length = len(row)
print('{} -- {}'.format(row_length,
row[10:20]))
with
open(csv_path, newline='',
encoding='Windows-1252') as f:
reader = csv.reader(f,
delimiter=';')
for row in reader:
row_length = len(row)
print('{} -- {}'.format(row_length,
row[10:20]))
- csv from string
- csv from InMemoryUploadedFile:
- csv from http response (e.g. from APITestCase)
import six
if six.PY2:
csv_string =
res.content
else:
csv_string =
str(res.content, 'utf-8')
csv_lines = csv_string.splitlines()
reader = csv.reader(csv_lines)
parsed_csv = list(reader)
number_csv_lines = len(parsed_csv)
- Cryptographic
Services
- Generic Operating System Services
- Fitxers / Files
- to open files using url, see urlopen
- io
- better option than old builtin open (?)
- read a file:
import io
with io.open(src, 'r',
encoding='utf-8') as f_src:
for line in f_src:
...
- temporal / temporary
- mkdtemp
- ...
- directory temporal / temporary dir
import
tempfile
tmpdir = tempfile.mkdtemp()
- fitxer temporal / temporary file
- Temporary...
- Concurrent Execution
- Processos / Processes
- 17.1 subprocess (2)
(3)
subprocess.run
(>=3.5)
- Older high-level API
-
old
|
new
|
subprocess.call(...) |
subprocess.run(...) |
subprocess.check_call(...) |
subprocess. run(...,
check=True)
|
subprocess.check_output(...) |
subprocess. run(...,
check=True,
stdout=subprocess.PIPE).stdout
|
- Exemples / Examples
- compatible Python 2 / 3:
import shlex
if six.PY2:
# python 2
#
https://stackoverflow.com/questions/14218992/shlex-split-still-not-supporting-unicode#answer-14219159
args = map(lambda
s: s.decode('utf-8'),
shlex.split(complete_command.encode('utf-8')))
result_stdout =
subprocess.check_output(args,
stderr=subprocess.STDOUT)
result_stderr = ''
else:
# python 3
args =
shlex.split(complete_command)
completed_process =
subprocess.run(args, check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
result_stdout =
completed_process.stdout
result_stderr =
completed_process.stderr
- standard
import shlex,
subprocess
command = '...'
parameters = '...'
command_line = "{0}
{1}".format(command, parameters) args
= shlex.split(command_line)
try:
completed_process =
subprocess.run(args, check=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
print(completed_process.stdout.decode('utf-8'))
print(completed_process.stderr.decode('utf-8'))
except Exception as e:
print("ERROR:
{0}".format(e))
if e.stdout:
print("ERROR stdout:
{0}".format(e.stdout.decode('utf-8')))
if e.stderr:
print("ERROR stderr:
{0}".format(e.stderr.decode('utf-8')))
- stdout, stderr to a file:
- import
shlex, subprocess
command = '/usr/bin/ffmpeg'
parameters = '-y -i sintel.mp4 -vf
"scale=1280:-1" -c:a copy -c:v h264 -f
flv /tmp/toto.flv'
command_line = "{0}
{1}".format(command, parameters)
args =
shlex.split(command_line)
with open("toto.log", "w") as f:
try:
completed_process =
subprocess.run(args, check=True,
stdout=f, stderr=f)
except Exception as
e:
print("ERROR: {0}".format(e))
- shell pipes
- Problemes / Problems
- Unicode was not supported by shlex split
[Errno 12] Cannot allocate memory
- Scheduling
- Info
- sched
(Python)
- schedule
- APScheduler
- Migració
- Exemples
- basic
import
time
from
apscheduler.schedulers.background
import BackgroundScheduler
def do_something(**kwargs):
print(kwargs)
# create scheduler
scheduler = BackgroundScheduler()
# add job
scheduler.add_job(
# action with
kwargs
do_something,
kwargs=do_something_kwargs,
# date trigger
trigger="date",
run_date=now_datetime_utc,
# run every
job, even if it is too late
misfire_grace_time=None,
)
# start scheduler
scheduler.start()
# do not exit (should we use
BlockingScheduler?)
while True:
...
- comunicació
- RPC
- examples / rpc
/ (<=3.9.0)
- problemes
(APScheduler==3.10.1, rpyc==5.3.1)
TypeError: tzinfo
argument must be None or
of a tzinfo subclass, not
type
'backports.zoneinfo.ZoneInfo'
- Meet
an error when
reschedule the job via
RPyC #287
- Solució / Solution
- received kwargs
must be deep copied
before passing it to
add_job:
- server.py
import
copy
import rpyc
from
rpyc.utils.server
import
ThreadedServer
from
apscheduler.schedulers.background
import
BackgroundScheduler
def
print_text(*args,
**kwargs):
print("[print_text]
args: {},
kwargs:
{}".format(args,
kwargs))
class
SchedulerService(rpyc.Service):
def
exposed_add_job(self,
func, *args,
**kwargs):
# a copy of
the datetime
inside dict
must be done,
to avoid
errors when
astimezone is
called from it
(apscheduler/schedulers/base.py)
kwargs_copy
=
copy.deepcopy(kwargs)
return
scheduler.add_job(func,
*args, **kwargs_copy)
if __name__ ==
'__main__':
scheduler =
BackgroundScheduler()
scheduler.start()
#
allow_pickle
is needed by
deepcopy
protocol_config
=
{"allow_public_attrs":
True, "allow_pickle":
True}
server =
ThreadedServer(SchedulerService,
port=12345,
protocol_config=protocol_config)
try:
server.start()
except
(KeyboardInterrupt,
SystemExit):
pass
finally:
scheduler.shutdown()
- client.py
import
rpyc
import
datetime
run_date_aware
=
datetime.datetime.now(datetime.timezone.utc)
config = {
"allow_public_attrs":
True,
"allow_pickle":
True,
}
conn =
rpyc.connect("localhost",
12345,
config=config)
#job =
conn.root.add_job('server:print_text',
'interval',
args=['Hello,
World'],
seconds=2)
job =
conn.root.add_job(
"server:print_text",
trigger="date",
run_date=run_date_aware,
# args, kwargs
for func:
args=["Hello,
World"],
kwargs={"test":
True},
)
sleep(10)
conn.root.remove_job(job.id)
- ...
- threading
- Networking
and Interprocess Communication
- Xarxa / Network
- asyncio
- cooperative
multitasking (as opposed to preemptive
multitasking in threading)
- process or task voluntarily yields
control to the processor:
await
- Info
- PEP
3156
-
CPython >3.5 |
CPython >3.8 |
loop = asyncio.get_event_loop()
loop.run_until_complete(my_task()) |
asyncio.run(my_task()) |
loop = asyncio.get_event_loop()
loop.create_task(my_task()) |
asyncio.create_task(my_task()) |
- micropython-async
tutorial
- 2 ...
- 2.2 Coroutines and tasks
- una corutina
es defineix amb async def i
conté algun await
- 2.2.1 Queuing a task for
scheduling
asyncio.create_task
Arg: the coro to run.
The scheduler converts the
coro to a Task and queues
the task to run ASAP. Return
value: the Task instance. It
returns immediately. The
coro arg is specified with
function call syntax with
any required arguments
passed.
asyncio.run
Arg: the coro to run.
Return value: any value
returned by the passed coro.
The scheduler queues the
passed coro to run ASAP. The
coro arg is specified with
function call syntax with
any required arguments
passed. In the current
version the run call returns
when the task terminates.
However, under CPython, the
run call does not terminate.
await
Arg: the task or coro
to run. If a coro is passed
it must be specified with
function call syntax. Starts
the task ASAP. The awaiting
task blocks until the
awaited one has run to
completion. As described in
section 2.2, it is possible
to await a task which has
already been started. In
this instance, the await is
on the task object (function
call syntax is not used).
- 2.2.4
A typical firmware app
import
uasyncio as asyncio
from my_app import MyClass
def set_global_exception():
def
handle_exception(loop,
context):
import sys
sys.print_exception(context["exception"])
sys.exit()
loop =
asyncio.get_event_loop()
loop.set_exception_handler(handle_exception)
async def main():
set_global_exception() #
Debug aid
my_class =
MyClass() # Constructor
might create tasks
asyncio.create_task(my_class.foo())
# Or you might do this
await
my_class.run_forever()
# Non-terminating method
try:
asyncio.run(main())
finally:
asyncio.new_event_loop()
# Clear retained state
- 3 Synchronization
- 4
Designing classes for asyncio
- 4.1 Awaitable
classes
- 4.2 Asynchronous iterators
- 4.3 Asynchronous context managers
- ...
- 8
Notes for beginners
- What
Is Async, How Does It Work, and When
Should I Use It?|A. Jesse Jiryu
Davis|PyCon APAC 2014 (yt)
- subs (els clients trien l'entrepà i el
treballador el fa al moment), pizza al
tall (els clients trien el tall i el
treballador l'escalfa), omakase (els
clients reben el plat del dia, sense
haver-lo de demanar)
- http://kegel.com/c10k.html
- threads, greenlets, coroutines,
callbacks
- what is async?:
- single-threaded
- I/O concurrency
- Non-blocking sockets
- internally: epoll (Linux) / kqueue
(Mac, BSD)
- event loop
- async frameworks
- Getting
Started with Asyncio in MicroPython
(Raspberry Pi Pico)
- quan són adequats i quan no:
- async no cal:
- quan una cosa necessita tota la cpu
(fent un entrepà)
- async cal:
- quan un procés està esperant que acabi
una crida (escalfant la pizza al forn,
cerca a la base de dades, esperant que
arribi una notificació, websockets);
mentrestant, el processador pot anar
fent altres coses
-
- la definició de les corutines va precedida d'
async ,
i hauria de contenir almenys una crida amb await
(encara que sigui await asyncio.sleep(0) )
perquè el processador pugui fer altres coses:
- si volem que la crida a una funció esperi que
s'acabi abans de passar a la línia següent, cal
posar
await (si no, s'executarà la
següent línia sense esperar el resultat); el
procés fa un yield i així mentrestant el
processador podrà executar les altres tasques
que hi ha a la cua de l'scheduler:
result = await
my_function()
- await
asyncio.sleep(2)
- per a iniciar l'scheduler (normalment es fa
servir per a cridar el main, però pot cridar
qualsevol altra corutina) i posar-hi main com a
tasca:
async def main():
...
if __name__ == "__main__":
asyncio.run(
main() )
- per a executar diverses crides a funcions
alhora:
await asyncio.gather(my_function(...),
my_function(...))
list_of_functions
= []
for i in range(10):
list_of_functions.append( my_function(i) )
await asyncio.gather(*list_of_functions)
- afegeix una corutina a l'scheduler; encapsular
crides com a tasques, per a poder-les
cancel·lar; no s'executen fins que no hi ha
algun await:
my_task = asyncio.create_task(
my_function(...) )
result = await my_task
task.cancel()
- task.cancelled()
- task.done()
- task.result()
- ...
- Internet Data Handling
JSON
- mime types
- Codificació
de text / Text coding
- 18.7
mimetypes
- used by aws
s3 to determine content-type of aws s3
object when uploading it
- Dependencies
- CentOS 8
sudo dnf install mailcap
- will install /etc/mime.types
- Ús / Usage
import mimetypes guessed_mime_type
= mimetypes.guess_type('myfile') mimetypes.knownfiles
['/etc/mime.types',
'/etc/httpd/mime.types',
'/etc/httpd/conf/mime.types', ...]
- magicfile
- Dependències / dependencies
- Mageia
urpmi libpython-devel
libmagic-devel
- CentOS
sudo yum install file-devel
- Instal·lació / Installation
- Ús / Usage
import magicfile as magic
mime_type =
magic.from_file("testdata/test.pdf",
mime=True)
import magicfile as magic
f =
magic.Magic(magic_file='/usr/share/misc/magic',
mime=True) mime_type =
f.from_file("testdata/test.pdf")
- base64
- Structured Markup Processing Tools
- Internet
Protocols and Support
-
|
server |
client |
|
sync |
asyncio |
sync |
asyncio |
HTTP |
|
|
- requests
response =
requests.get('http://python.org')
print(response.text)
- with
requests.Session() as session:
response =
session.get('http://python.org')
print(response.text)
|
- aiohttp
async with
aiohttp.ClientSession() as
session:
async with
session.get('http://python.org')
as response:
print(await response.text())
- import
aiohttp
import asyncio
async def main():
async with
aiohttp.ClientSession() as
session:
async with
session.get('http://python.org')
as response:
html = await response.text()
print(html)
asyncio.run(main())
- import
aiohttp
import asyncio
async def fetch(session, url):
async with
session.get(url) as response:
return await response.text()
async def main():
async with
aiohttp.ClientSession() as
session:
html = await fetch(session,
'http://python.org')
print(html)
asyncio.run(main())
|
WebSocket |
|
- websockets
- import
asyncio
import websockets
async def hello(websocket):
name = await
websocket.recv()
print(f"<<< {name}")
greeting =
f"Hello {name}!"
await
websocket.send(greeting)
print(f">>> {greeting}")
async def main():
async with
websockets.serve(hello,
"localhost", 8765):
await asyncio.Future() # run
forever
if __name__ == "__main__":
asyncio.run(main())
|
- websockets
from
websockets.sync.client import
connect
def hello():
with
connect("ws://localhost:8765") as
websocket:
websocket.send("Hello world!")
message = websocket.recv()
print(f"Received: {message}")
hello()
|
- websockets
import
asyncio
import websockets
async def hello():
uri =
"ws://localhost:8765"
async with
websockets.connect(uri) as
websocket:
name = input("What's your name? ")
await websocket.send(name)
print(f">>> {name}")
greeting = await websocket.recv()
print(f"<<< {greeting}")
if __name__ == "__main__":
asyncio.run(hello())
- #
How
to run forever async websocket
client?
import asyncio
import websockets
async def hello():
uri =
"ws://localhost:5678"
async with
websockets.connect(uri) as
websocket:
while True:
greeting = await websocket.recv()
print(f"<<< {greeting}")
if __name__ == "__main__":
asyncio.get_event_loop().run_until_complete(hello())
- ...
|
- HTTP
- URL
- Parsing
URLs with regular expressions
- 20.5
urllib
-
python2 |
python3 |
urllib
- urlopen()
- urlretrieve()
- ...
|
urllib2
- urlopen()
- install_opener()
- build_opener()
- ...
|
urllib.request
|
|
|
urllib.error
- exception URLError
- exception HTTPError
- exception ContentTooShortError
|
urlparse
- urlparse()
- parse_qs()
- parse_qsl()
- urlunparse()
- urlsplit()
- urlunsplit()
- urljoin()
- urldefrag()
urllib
- quote()
- quote_plus()
- unquote()
- unquote_plus()
- urlencode()
- pathname2url()
- url2pathname()
- getproxies()
|
|
urllib.parse
- urlparse()
- parse_qs()
- parse_qsl()
- urlunparse()
- urlsplit()
- urlunsplit()
- urljoin()
- urldefrag()
- unwrap()
- ...
- quote()
- quote_plus()
- quote_from_bytes()
- unquote()
- unquote_plus()
- unquote_to_bytes()
- urlencode()
|
|
|
urllib.robotparser |
- compatible python 2/3 import
import six
if six.PY2:
from urlparse
import urlparse, urlunparse
from urllib import
unquote, urlencode
else:
from urllib.parse
import urlparse, urlunparse,
unquote, urlencode
o = urlparse(...)
... unquote(...)
# urlunparse( (scheme, netloc, path,
params, query, fragment) )
... urlunparse( (o.
scheme,
o. netloc,
o. path,
o. params,
o. query,
o. fragment )
)
... urlencode(...)
- urlparse()
o =
urlparse('http://www.cwi.nl:80/%7Eguido/Python.html')
o
o =
urlparse('s3://bucket/path/to/file')
o
- urlopen()
import six
if six.PY2:
from urllib2 import
urlopen
# to avoid
error addinfourl instance has no
attribute '__exit__',
# on Python 2 you
cannot use urlopen inside a with
filehandle_src =
urlopen(url)
for line in
filehandle_src:
print(line.decode('utf-8'))
...
else:
from urllib.request
import urlopen
with urlopen(url)
as filehandle_src:
for line in filehandle_src:
...
- Changing
hostname in a url
import
urlparse
p =
urlparse.urlparse('https://www.google.dk:80/barbaz')
p._replace(netloc=p.netloc.replace(p.hostname,
'www.foo.dk')).geturl()
- URL Quoting (percent encoding)
- quote
import six
if six.PY2:
from urllib
import quote
else:
from
urllib.parse import quote
- Python 2
from
urllib import quote
# input is unicode
path_original =
u'/path/to/toto_à.html'
path_converted =
quote(path_original.encode('utf8'))
# will give:
'/path/to/toto_%C3%A0.html'
from
urllib import quote
# input is not unicode
path_original =
'/path/to/toto_à.html'
path_converted =
quote(path_original)
# will give:
'/path/to/toto_%C3%A0.html'
- Python 3
from
urllib.parse import quote
# input is unicode
path_original =
u'/path/to/toto_à.html'
path_converted =
quote(path_original.encode('utf8'))
# will give:
'/path/to/toto_%C3%A0.html'
from
urllib.parse import quote
# input is not unicode
path_original =
'/path/to/toto_à.html'
path_converted =
quote(path_original)
# will give:
'/path/to/toto_%C3%A0.html'
- unquote
- convert: /path/to/toto_%C3%A0.mp4
-> /path/to/toto_à.mp4
import
six
path_original =
u'/path/to/toto_%C3%A0.mp4'
if six.PY2:
path_converted =
unquote(path_original.encode()).decode('utf-8')
else:
path_converted
= unquote( path_original )
- add query params to a path (which may already
have query params) (Python 3):
from urllib.parse
import urlencode, urlparse, parse_qsl
original_path = '/path/to?a=b&c=d'
o = urlparse(original_path)
existing_query_params = parse_qsl(o.query)
added_query_params = [('e','f'),('g','h')]
total_queryparams = existing_query_params
+ added_query_params
new_path = '{}?{}'.format(o.path,
urlencode(total_queryparams))
# '/path/to?a=b&c=d&e=f&g=h'
- urljoin
- download from url to local file:
import six
if six.PY2:
from urlparse import
urlparse
from urllib2 import
urlopen
else:
from urllib.parse
import urlparse
from urllib.request
import urlopen
with urlopen(src) as filehandle_src,
open(dst, 'wb') as filehandle_dst:
shutil.copyfileobj(filehandle_src,
filehandle_dst)
- ...
- FTP
- libftp
(2.7)
- upload local or remote file to ftp:
# python 3
import os
import ftplib
import urllib.request
from urllib.parse import urlparse
host = '...'
username = '...'
password = '...'
#origin = '/tmp/toto.txt'
origin = 'https://...'
local_basename =
os.path.basename(origin)
remote_dirname = 'toto_dir_2'
remote_path =
os.path.join(remote_dirname,
local_basename)
print("ftp upload: {0} ->
{1}".format(origin, remote_path))
o = urlparse(origin)
print("sheme: {}".format(o.scheme))
if o.scheme:
filehandle =
urllib.request.urlopen(origin)
else:
filehandle =
open(origin, 'rb')
with ftplib.FTP(host, username,
password) as ftp:
# create dir
try:
print("creating remote dir:
{}".format(remote_dirname))
ftp.mkd(remote_dirname)
except Exception as
e:
print("WARNING when creating dir: {} -
{}".format(e, type(e)))
# upload file
try:
print("uploading: {0} ->
{1}".format(origin, remote_path))
ftp.storbinary("STOR
{0}".format(remote_path), filehandle)
except Exception as
e:
print("ERROR when uploading file:
{}".format(e))
print("done")
- upload remote http file directly to ftp (Python
- Transfer a file from HTTP(S) URL to
FTP/Dropbox without disk writing (chunked
upload)):
...
import urllib
...
filehandle =
urllib.request.urlopen(origin_url)
...
- Adreces de xarxa /
Network addresses
- adreça IP pròpia / own IP address
- example:
import socket
socket.gethostbyname_ex(socket.gethostname())[2][0]
- Problemes / Problems
socket.gaierror: [Errno -2] Name
or service not known
- 21.28
ipaddress (>3.3)
- Backport for Python 2: py2-ipaddress
- An
introduction to the ipaddress module
- models
-
|
|
convenience
factory function
|
functions
|
IPv4Address |
IPv6Address |
ip_address('192.168.1.100') |
|
IPv4Network |
IPv6Network |
ip_network('192.168.1.0/24')
|
- subnets()
- subnets(prefixlen_diff=2)
- subnets(new_prefix=26)
- supernet()
- supernet(prefixlen_diff=2)
- supernet(new_prefix=26)
|
IPv4Interface
|
IPv6Interface
|
ip_interface('192.168.1.100/24')
|
|
- ...
- Exemples / Examples
- subnets /20 from net /16
import
ipaddress
net =
ipaddress.ip_network('192.168.0.0/16')
subnets_20 = list(
net.subnets(new_prefix=20) )
print(subnets_20)
- ...
- Multimedia Services
- Internationalization
- Program Frameworks
- Graphical User Interfaces with Tk
- Development Tools
- Test unitari / Unit test
-
|
pytest |
unittest |
Django
tests (based on unittest) |
Django drf |
file |
mytest.py |
mytest.py |
myproject/myapp/tests/mytest.py |
myproject/myapp/tests/mytest.py |
import |
import pytest |
from unittest import TestCase |
from django.test import TestCase |
from rest_framework.test import
APITestCase |
class |
class MyGroupTests: |
class MyGroupTestCase(TestCase): |
class MyGroupTestCase(TestCase):
""" Tests over
database """ |
class MyGroupAPITestCase(APITestCase):
""" Tests over API
REST """ |
funció que s'executa sempre en
començar |
def setup_method(self): |
def setUp(self): |
def setUp(self): |
def setUp(self): |
test |
def test_first(self): |
def test_first(self): |
def test_first(self): |
def test_first(self): |
funció que s'executa sempre en acabar |
def teardown_method(self): |
def ...(self) |
|
|
run |
- pytest -s mytest.py -k test_first
|
- python -m unittest --verbose
mytest
- python -m unittest --verbose
mytest.MyGroupTestCase
- python -m unittest --verbose
mytest.MyGroupTestCase.test_first
|
- ./manage.py test --keepdb
--settings myapp.tests_settings
--verbosity 3 myapp.tests.mytest
- ./manage.py test --keepdb
--settings myapp.tests_settings
--verbosity 3
myapp.tests.mytest.MyGroupTestCase
- ./manage.py test --keepdb
--settings myapp.tests_settings
--verbosity 3
myapp.tests.mytest.MyGroupTestCase.test_first
|
- ./manage.py test --keepdb
--settings myapp.tests_settings
--verbosity 3 my_app.tests.mytest
- ./manage.py test --keepdb
--settings myapp.tests_settings
--verbosity 3
my_app.tests.mytest.MyGroupAPITestCase
- ./manage.py test --keepdb
--settings myapp.tests_settings
--verbosity 3
my_app.tests.mytest.MyGroupAPITestCase.test_first
|
run from Eclipse / PyDev |
|
- select class or method: Debug as
-> Python unit-test
|
|
|
- pytest
- Eclipse / PyDev
- Ús / Usage
- Basic
patterns and examples
- executa un test en concret
pytest toto.py -k
test_primer
- mostra tots els print (si no, només es
mostraran dels tests que fallin)
- ...
- unittest
- run all tests in test dir
- skip some tests
from unittest
import skip
@skip("my reason to skip this one")
class/def
- test with argparse
- How
do you write tests for the argparse
portion of a python module?
- mymodule.py
import sys
import argparse
def main(args):
parser =
argparse.ArgumentParser(description='ADD
YOUR DESCRIPTION HERE')
parser.add_argument('first_parameter',
type=int, help='First parameter')
parser.add_argument('second_parameter',
type=int, help='Second parameter')
parsed_args =
parser.parse_args(args)
print(parsed_args)
# rest of you
code goes here
if __name__ == '__main__':
main(sys.argv[1:])
- test_mymodule.py
from
unittest import TestCase
from mymodule import main
class MyModuleTestCase(TestCase):
def
test_main(self):
main([2,3])
- Eclipse / PyDev
- first time
- selected file: Debug as ->
Python unit-test
- Debug Configurations
- Arguments
- Override PyUnit preferences
for this launch?
- PyDev test runner
- --verbosity 3
- ...
- Django
tests
- Mock
- verify the number of times that a function
has been called, without mocking it (bypass,
pass through):
- ...
- Debugging and Profiling
- Software Packaging and Distribution
- Python Runtime Services
- Custom Python Interpreters
- Importing Modules
- Python Language Services
- MS Windows Specific Services
- Unix Specific Services
- Superseded Modules
- Security Considerations
- Unicode
- Python3 strings
and bytes
- Codificació de
text / Text coding
- Solving
Unicode
Problems in Python 2.7
- UnicodeDecodeError: ‘ascii’ codec can’t decode byte
0xd1 in position 1: ordinal not in range(128) (Why is
this so hard??)
- Django i18n
- Use format instead of '%'
- do not forget trailing 'u'
- Use 'u' before literals
string =
u'{:02d}:{:02d}:{:02d}'.format(hours, minutes,
seconds)
if days:
if days==1:
string = u'{} {} '.format(days,_("day")) + string
#string = u"%d %s %s" % ( days, _("day"), string)
else:
string = u'{} {} '.format(days,_("days")) + string
#string = u"%d %s %s" % ( days, _("days"), string)
- File writing
import codecs
filename = os.path.join(...)
f = codecs.open(filename,mode='w',encoding='utf-8')
f.write(content)
f.close ()
import codecs
filename = os.path.join(...)
f = codecs.open(filename,mode='w',encoding='utf-8')
fitxer = File(f)
fitxer.write(content)
fitxer.close()
- Eines / Tools
- virtualenv
- Install virtualenv
- Python 3
- Mageia
urpmi python3-virtualenv
urpmi lib64python3-devel
- CentOS
sudo yum install python36
python36-pip python36-devel
python34-virtualenv
virtualenv-3 --python python36 env
- Python 2
- Mageia
urpmi python-pkg-resources
python-virtualenv
- Bug on Mageia 3 (makes "pip install pil"
fail)
- Bug 11283
- Python headers included are bungled,
probably due to multi-arch issues
- replace
?/usr/lib/python2.7/site-packages/virtualenv.py
by virtualenv.py
- run again:
- CentOS
yum install python-virtualenv
- option 1: install from SCL
- other options:
virtualenv -p python27 /opt/PYTHON27
source /opt/PYTHON27/bin/activate
- Problems:
- libpython2.7.so.1.0: cannot open
shared object file: no such file or
directory
- Ubuntu
sudo apt-get install
python-virtualenv
- MSWindows
- Utilització / Usage
- Python 3
- Python 2
- virtualenv [--distribute]
/opt/PYTHON27
- force Python 2.6
virtualenv -p python26 /opt/PYTHON26
- MSWindows
virtualenv.exe c:\tmp\p27
c:\tmp\p27\Scripts\activate
...
deactivate
source /opt/PYTHON27/bin/activate
(/opt/PYTHON27)...$ ...
deactivate
- virtualenv from crontab
- Cron
and virtualenv
- example:
SHELL=/bin/bash
0 2 * * * source /path/to/env/bin/activate
&& /path/to/my_program.py arg1 arg2
- virtualenv
in
PyDev
- virtualenv
in
Django
- mod_wsgi
- pip
(*) (package manager) (also installed
when virtualenv
is installed)
- related tools
- Installation of pip itself
- From distribution
- CentOS
sudo yum install python-pip
- From source
- download it:
- install it:
# python setup.py install
Installation of packages
pip install package_name --dry-run
pip install package_name
- Installation of a precise version of a package
pip install djangorestframework==0.4.0
- alpha version
pip install --pre package_name
- upgrade
pip install -U package_name
- Problems
error fatal: Python.h:
El fitxer o directori no existeix
- Solució / Solution
- Python 2.7
- Python 3
- Mageia
- Alma9
dnf install python3-devel
fatal error: pyconfig.h:
El fitxer o directori no existeix
- Solució / Solution
- Alma 9
dnf install python3-devel
Download error on
https://pypi.python.org/simple/: [SSL:
CERTIFICATE_VERIFY_FAILED] certificate verify
failed (_ssl.c:765) -- Some packages may not be
found!
pip install fails with “connection
error: [SSL: CERTIFICATE_VERIFY_FAILED]
certificate verify failed (_ssl.c:598)”
- Solution
openssl s_client -connect
pypi.python.org:443
curl -sO
http://cacerts.digicert.com/DigiCertHighAssuranceEVRootCA.crt
sudo cp
DigiCertHighAssuranceEVRootCA.crt
/etc/pki/ca-trust/source/anchors/
sudo update-ca-trust
- Alternative solution?
sudo yum install ca-certificates
- pip install --upgrade -r pip_requirements.txt
Could not find .egg-info directory in
install record for setuptools from
https://pypi.python.org/packages/25/4e/1b16cfe90856235a13872a6641278c862e4143887d11a12ac4905081197f/setuptools-28.8.0.tar.gz#md5=43d6eb25f60e8a2682a8f826ce9e3f42
in
/home/.../env/lib/python2.7/site-packages
- see also problems with Google API and httplib2
error: Installed distribution setuptools
0.9.8 conflicts with requirement
setuptools>=17.1
- Solució / Solution
pip install --upgrade setuptools
Error: pg_config executable not found.
- when installing psycopg2
- Solució / Solution
- Install postgresql devel
- Mageia
urpmi postgresql9.4-devel
- Alma 9
- Alma 8
- show specific package
- list of installed packages (it gives the version
number):
pip list
- with installation format
(>pip_requirements.txt)
pip freeze
pip freeze >pip_requirements.txt
- easy_install
urpmi python-setuptools
sudo apt-get install python-setuptools
- Invoke
- Fabric
- Instal·lació / Installation
- Fabric 2.x
- See also: Invoke
- Upgrading
from 1.x
-
1.x
|
2.x
|
sudo("yum install -y htop") |
c.run("sudo yum install
-y htop") |
sudo("echo /usr/local/lib
>/etc/ld.so.conf.d/local.conf") |
c.run("sudo sh -c 'echo
\"/usr/local/lib\"
>/etc/ld.so.conf.d/local.conf'")
# sudo sh -c 'echo "l'\''arbre"
>>/tmp/toto.txt'
c.run("sudo sh -c 'echo
\"l'\\''arbre\"
>>/tmp/toto.txt'") |
put(local_file,
remote_dir, use_sudo=True)
|
from
os.path import basename
def sudo_put(c, local_path,
remote_dirname):
"""
Upload a local file
to a remote dir, with sudo privileges
"""
filename =
basename(local_path)
remote_tmp_path =
filename
remote_path =
'{}/{}'.format(remote_dirname,
filename)
print 'sudo_put: {}
-> {}'.format(local_path,
remote_path)
c.put(local_path,
remote=remote_tmp_path)
c.run("sudo sh -c
'mv {} {}'".format(remote_tmp_path,
remote_path))
...
sudo_put(c, local_file, remote_dir)
|
from
fabric.contrib.files import append
text = """
FCGI_EXTRA_OPTIONS="-M 0770"
"""
append('/etc/sysconfig/spawn-fcgi',
text, use_sudo=True)
|
text
= """
FCGI_EXTRA_OPTIONS=\\\"-M 0770\\\"
"""
c.run("sudo sh -c 'echo \"{0}\"
>>/etc/sysconfig/spawn-fcgi'".format(text))
# not
working yet, with sudo=True:
from patchwork import files
files.append(c,
'/etc/sysconfig/spawn-fcgi', text,
sudo=True) |
from fabric.contrib.files import
sed
sed('{}/pg_hba.conf'.format(pgsql_data_dir),
'host
all
all
127.0.0.1/32
ident',
'host
all
all
127.0.0.1/32
md5',
use_sudo=True) |
c.run("sudo sed -i.bak
's#host
all
all
127.0.0.1/32
ident#host
all
all
127.0.0.1/32
md5#g'
{}/pg_hba.conf".format(pgsql_data_dir)) |
with cd('/path/to'):
run('my_command_1')
run('my_command_2') |
my_path = '/path_to'
c.run('cd {} &&
my_command_1'.format(my_path))
c.run('cd {} &&
my_command_1'.format(my_path)) |
with settings(warn_only=True):
run(...) |
c.run(..., warn=True) |
result = run(...) |
r = c.run(...)
result = r.stdout.strip() |
|
from patchwork import transfers
# strict_host_keys is set to
False to avoid interactive question:
# Are you sure you want to continue
connecting (yes/no/[fingerprint])? transfers.rsync(
c,
strict_host_keys=False,
...
)
|
local(...) |
# option -H must be present
c.local(...)
# only capture; do
not display
result = c.local(..., hide=True)
toto = result.stdout |
- Documentation
(2.1)
- restart connection (e.g. to update just modified
groups to the user that is making the connection)
#
add user to mygroup
print(" adding group
{} to user {}".format(service_group,
remote_user))
c.run("sudo usermod -a -G
{} {}".format(service_group, remote_user),
warn=True)
# update group membership
# c.run("groups")
# as newgrp command is not
working remotely, we need to close and open
ssh connection
# (and _sftp must be set to
None to force its reconnection)
c.close()
c.open()
c._sftp = None
# print("c after
close/open: {}".format(c))
# c.run("groups")
- Fabric 1.x documentation
- Utilització / Usage
<task
name>:<arg>,<kwarg>=<value>,...
- debug from Eclipse
- Fabric 2
- fabfile.py
# to allow
debugging from Eclipse
import re
import sys
from fabric.main import program
if __name__ == '__main__':
sys.argv[0] =
re.sub(r'(-script\.pyw|\.exe)?$', '',
sys.argv[0])
sys.exit(program.run())
- Eclipse
- fabfile.py (contextual menu)
- Debug As -> Python Run
- Debug Configurations
- (select your debug configuration)
- Arguments
--list
-H user@server
task_1 ...
- Fabric 1
- fabfile.py
...
from fabric.main
import main
if __name__ == '__main__':
import sys
sys.argv = ['fab',
'-f', __file__, 'my_task']
main()
- Exemples / Examples
put('toto.sh', '/usr/local/bin/',
mode=int('755', 8), use_sudo=True)
- Context
managers (context
manager)
- Problemes / Problems
paramiko.ssh_exception.SSHException:
encountered RSA key, expected OPENSSH key
- check that remote file
~myuser/.ssh/authorized_keys contains the public
key that you are using
- check that you are specifying a remote user
- Error management
- ignore
with
settings(warn_only=True):
- capture failure
result =
local('grunt')
if result.failed:
print "Grunt is not
installed."
abort('Grunt not found')
- Python try
try:
sudo('...')
except Exception as e:
...
abort('...')
- env definitions
- fabfile.py
from fabric.api import
*
# remote user and group
env.remote_user = 'myuser'
env.remote_group = 'mygroup'
# project
env.project_name = 'my_project'
env.project_dir =
'/home/%(remote_user)s/%(project_name)s' % env
- run locally
- running
fabric
script locally
- Optionally
avoid
using ssh if going to localhost #98
- fabfile.py
# by default, actions
are performed remotely.
# to perform them locally, e.g.: "fab localhost
create_virtualenv"
env.run_as = 'remote'
env.run = run
env.sudo = sudo
env.hosts = ['...',]
env.key_filename =
'~/.ssh/keys/key_for_remote.pem'
def localhost():
"""
Set environment for local
execution
"""
env.run_as = 'local'
env.run = local
env.sudo = local
env.hosts = []
with lcd()
...
- capture
- run remotely and get the value
result = sudo(...)
result = run(...)
- run locally and get the value
result = local(..., capture=True)
- crontab
- files
- put
- rsync_project
- How
do I copy a directory to a remote machine
using Fabric?
- Example
from
fabric.contrib.project import
rsync_project
rsync_project(local_dir='.',
remote_dir='/var/www',
exclude=('.git','tmp',) )
- Ignore files indicated by .gitignore
- Ignore untracked files and files indicated by
.gitignore
untracked_files_zero
= local('git -C .. ls-files -z -o
--exclude-standard --directory',
capture=True)
untracked_files =
untracked_files_zero.split('\0')
print "untracked_files:
{}".format(untracked_files)
excluded_files = untracked_files +
['.git','tmp']
rsync_project(local_dir='.',
remote_dir=env.project_dir,
exclude=excluded_files,
extra_opts="--filter=':- .gitignore'" )
- append
- append a line
from
fabric.contrib.files import append
...
append('/etc/sysconfig/toto','this line
has been appended to the end')
- append several lines:
from
fabric.contrib.files import append
...
text = """
first_line = "value1"
second_line = "value2"
"""
append('/etc/sysconfig/toto', text,
use_sudo=True)
- sed
- single quotes in sed
- replace a line that starts with "
host
all
all
127.0.0.1 " by "host
all
all
127.0.0.1/32
md5 "
from
fabric.contrib.files import sed
...
#
/var/lib/pgsql/data/pg_hba.conf
#
host
all
all
127.0.0.1/32
md5
sed('/var/lib/pgsql/data/pg_hba.conf',
'host
all
all
127.0.0.1/32
ident',
'host
all
all
127.0.0.1/32
md5',
use_sudo=True )
env.sudo('sudo sed
-e "/^host
all
all
127.0.0.1/
c\host
all
all
127.0.0.1/32
md5"
-i /var/lib/pgsql/data/pg_hba.conf')
- Certificat de
servidor / Server certificate
- certificate for https connections (curl will not need to
specify --cacert)
# add myserver
self-signed certificate to the list of trust
ca certificates
put('myserver.example.org.crt',
'/etc/pki/ca-trust/source/anchors/',
use_sudo=True)
env.sudo('update-ca-trust')
- dependencies
- fabfile.py
# list of dependencies
to install
env.install_cmd = 'yum install -y'
env.dependencies = ['gcc', 'git',
'python-virtualenv', 'mariadb',]
def install_dependencies():
"""
Install the system
dependencies for the project
"""
env.sudo(env.install_cmd +
" " + "epel-release")
env.sudo(env.install_cmd +
" " + " ".join(env.dependencies))
- virtualenv
- Getting
virtualenv(wrapper)
and Fabric to play nice
- Activate
a
virtualenv via fabric as deploy user
- fabfile.py
from contextlib import
contextmanager as _contextmanager
# remote user and group
env.remote_user = 'my_user'
env.remote_group = 'my_group'
# virtualenv directory
env.virtualenv_directory = '/opt/p27'
env.virtualenv_activate = 'source
%(virtualenv_directory)s/bin/activate' % env
def create_virtualenv():
"""
Create the virtualenv
"""
env.sudo('mkdir -p
%(virtualenv_directory)s' % env)
env.sudo('chown
%(remote_user)s.%(remote_group)s
%(virtualenv_directory)s' % (env) )
env.run('virtualenv
%(virtualenv_directory)s' % env)
@_contextmanager
def virtualenv():
"""
Activate the virtualenv
"""
with
cd(env.virtualenv_directory):
with prefix(env.virtualenv_activate):
yield
def install_pip_dependencies():
"""
Install the pip
dependencies
"""
with virtualenv():
if
env.run_as == 'remote':
put('pip_requirements.txt',
'pip_requirements.txt')
env.run('pip install -r pip_requirements.txt')
- Django
- fabfile.py
def django_setup():
"""
Django: migrate,
createsuperuser, collectstatic
"""
with virtualenv():
with cd(env.project_dir):
env.run('python manage.py migrate')
env.run('python manage.py createsuperuser')
env.run('python manage.py collectstatic')
env.run('python manage.py loaddata
auth_initial')
def django_update():
"""
Django: migrate,
collectstatic
"""
with virtualenv():
with cd(env.project_dir):
env.run('python manage.py migrate')
env.run('python manage.py collectstatic')
- Nginx
- fabfile.py
def nginx_setup():
"""
Configure and start nginx
"""
env.sudo('chmod 755
/home/centos')
env.sudo('mkdir -p
/etc/uwsgi/vassals/')
if env.run_as == 'remote':
#
nginx
put('nginx-uwsgi/%(project_name)s_nginx.conf'
% env, '/etc/nginx/conf.d/', use_sudo=True)
#
remove default site from nginx.conf
put('nginx-uwsgi/nginx.conf', '/etc/nginx/',
use_sudo=True)
put('nginx-uwsgi/nginx.pp', '/etc/nginx/',
use_sudo=True)
#
uwsgi
put('nginx-uwsgi/uwsgi_params', '/etc/uwsgi/',
use_sudo=True)
put('nginx-uwsgi/emperor.ini', '/etc/uwsgi/',
use_sudo=True)
put('nginx-uwsgi/%(project_name)s_uwsgi.ini' %
env, '/etc/uwsgi/vassals/', use_sudo=True)
put('nginx-uwsgi/emperor.uwsgi.service',
'/etc/systemd/system/', use_sudo=True)
#
socket in /run/
(http://uwsgi-docs.readthedocs.org/en/latest/Systemd.html#putting-sockets-in-run)
#put('nginx-uwsgi/emperor.uwsgi.socket',
'/etc/systemd/system/', use_sudo=True)
#put('nginx-uwsgi/emperor.uwsgi.conf',
'/etc/tmpfiles.d/', use_sudo=True)
# custom selinux policy
module
(http://axilleas.me/en/blog/2013/selinux-policy-for-nginx-and-gitlab-unix-socket-in-fedora-19/)
env.sudo('semodule -i
/etc/nginx/nginx.pp')
# activate selinux
env.sudo('setenforce 1')
# enable and start nginx
env.sudo('systemctl enable
nginx.service')
env.sudo('systemctl restart
nginx.service')
# enable and start uwsgi
env.sudo('systemctl enable
emperor.uwsgi.service')
env.sudo('systemctl restart
emperor.uwsgi.service')
# configure firewall
#env.sudo('firewall-cmd
--permanent --zone=public --add-service=http')
#env.sudo('firewall-cmd
--permanent --zone=public
--add-service=https')
#env.sudo('firewall-cmd
--reload')
def nginx_restart():
"""
Restart nginx and wsgi
"""
# restart nginx
env.sudo('systemctl restart
nginx.service')
# restart uwsgi
env.sudo('systemctl restart
emperor.uwsgi.service')
- Database
- fabfile.py
# database
env.mysql_host = 'localhost'
env.mysql_database = 'mydatabase_db'
env.mysql_user = 'my_user'
env.mysql_password = 'my_password'
env.mysql_master_user = 'root'
def database_setup():
"""
Setup database service
"""
env.sudo("systemctl enable
mariadb.service")
env.sudo("systemctl start
mariadb.service")
env.sudo("mysql_secure_installation")
def database_create():
"""
Create the sql database
"""
env.run('echo "CREATE
DATABASE IF NOT EXISTS %(mysql_database)s; \
GRANT ALL ON %(mysql_database)s.* TO
\'%(mysql_user)s\'@\'%%\' IDENTIFIED BY
\'%(mysql_password)s\'; \
FLUSH PRIVILEGES;" | \
mysql -h %(mysql_host)s -u
%(mysql_master_user)s -p' % (env) )
def database_delete():
"""
Delete the sql database
"""
env.run('echo "DROP
DATABASE %(mysql_database)s;" | \
mysql -h %(mysql_host)s -u
%(mysql_master_user)s -p' % (env) )
- Git
- fabfile.py
def ssh_config():
"""
Add
fabuser_bitbucket_support to ~/.ssh/config
"""
text = """
Host fabuser-bitbucket
HostName
bitbucket.org
IdentityFile
~/.ssh/fabuser_bitbucket
"""
append('%s/config' %
env.ssh_dir, text )
env.run('chmod 600
%s/config' % env.ssh_dir)
def git_clone():
"""
Clone from git
"""
#git_user =
'francesc_pinyol_margalef'
with
settings(warn_only=True):
with settings(warn_only=True):
if
env.run("test -d %s" %
env.project_dir).failed:
env.run("git
clone
git@fabuser-bitbucket:%(bitbucket_account)s/%(project_name)s.git
%(project_dir)s" % (env) )
def git_pull():
"""
Pull from git
"""
with cd(env.project_dir):
env.run("git pull")
- Empaquetament
/ Packaging
- Multiple platforms
- How can I find the current OS
in Python? [duplicate]
import platform
platform.platform()
platform.system()
- Popen
- Kill a process
- Restart computer
- restart local computer from
python
- Exemple / Example:
import platform
def restart_computer():
operating_system =
platform.system()
if operating_system == 'Linux':
os.system('reboot now')
print
"rebooting system"
elif operating_system ==
'Windows':
import
win32api
win32api.InitiateSystemShutdown()
- Python path:
- see also bash
path
- /usr/local/lib/python2.7/dist-packages/...
- paths
import os
my_path = './dir1/dir2/toto.mp4'
# ./dir1/dir2/toto.mp4
my_dirname =
os.path.dirname(my_path) #
./dir1/dir2
#my_rel_dirname=${my_dirname#*\./} # dir1/dir2
my_basename = os.path.basename(my_path) #
toto.mp4
my_name = os.path.splitext(my_basename)[0]
# toto
my_extension = os.path.splitext(my_path)[1] #
.mp4
my_rel_path = os.path.relpath(my_path)
# dir1/dir2/toto.mp4
my_abs_path = os.path.abspath(my_path) #
/path/to/dir1/dir2/toto.mp4
- print path:
- set path:
- python
import sys
sys.path.append("/my/path")
- Django:
- /etc/apache2/mods-available/wsgi.conf
- recursively create directory if it does not exist:
import os
# create the directory if it does not exist
father_dir = os.path.dirname(filename)
if not os.path.exists(father_dir):
os.makedirs(father_dir)
print("creating directory:
{}".format(father_dir))
- get home dir:
from os.path import expanduser
home = expanduser("~")
- get absolute dir inside home:
from os.path import expanduser
my_dir = expanduser("~/my_dir")
- Python for MS Windows:
- HTML
parsing
- Logging
- Django
logging
- Logging
Cookbook
-
|
new |
old |
|
a = 'primer'
b = 'segon'
logger.debug('{} - {}', a, b) |
a = 'primer'
b = 'segon'
logger.debug('%s - %s', a, b) |
pylint |
[--logging-format-style=new] |
--logging-format-style=old |
Do not use: ...
- Exemple bàsic
import os
import logging
logging.basicConfig()
#logger = logging.getLogger(__name__)
logger = logging.getLogger(os.path.basename(__file__))
logger.setLevel(logging.DEBUG)
logger.debug('debug message')
logger.info('info message')
logger.warning('warn message')
logger.error('error message')
logger.critical('critical message')
- Exemple / Example:
# logger
import logging
#logger = logging.getLogger('my_program')
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s
%(levelname)-8s %(name)-12s %(message)s',
'%Y-%m-%dT%H:%M:%SZ')
ch.setFormatter(formatter)
# add the handlers to the logger
logger.addHandler(ch)
a = 'primer'
b = 'segon'
logger.debug("my message with %s and %s", a, b)
- import
logging
logger = logging.getLogger('myprogram.py')
logger.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s
[%(name)-12s] %(message)s')
formatter.default_time_format = '%Y-%m-%dT%H:%M:%S'
formatter.default_msec_format = '%s.%03dZ'
ch.setFormatter(formatter)
# add the handlers to the logger
logger.addHandler(ch)
logger.debug("my message")
- Nginx
- Problemes / Problems
'ascii' codec can't encode character ... in
position ...: ordinal not in range(128)
|
- Celery with Django
- Arquitectura / Architecture
- Setting
up a queue service: Django, RabbitMQ, Celery on AWS
- Task
Routing in Celery - How to route a Celery task to a
dedicated queue
- Dynamic
Task Routing in Celery
- Specify
Worker in Celery
- Elements
- broker:
- controls queues
- accepta les crides des d'un apply_async i les
distribueix cap a algun worker subscrit a aquella
cua (si n'hi ha); accepts calls from apply_async and
distributes them to subscribed workers (if any)
- si una cua de RabbitMQ encara conté tasques
per a assignar a algun worker, apareixeran amb:
rabbitmqctl -p my_vhost list_queues
- si ja estan assignades a algun worker, ja
no apareixeran a la cua
- RabbitMQ
sudo systemctl start
rabbitmq-server.service
rabbitmqctl list_users
rabbitmqctl list_vhosts
rabbitmqctl -p my_vhost list_queues
- si la cua té missatges: encara hi ha jobs
pendents de ser assignats a algun worker
- si la cua és buida: tots els jobs han
estat assignats a algun worker (potser
encara s'està executant aquell job en el
worker)
- worker:
- executes an async function
- connects to broker specified as Celery.broker in
mydir/celery.py (-A mydir)
- Problems
- consumer: Cannot connect to amqp://...:
[Errno 13] Permission denied.
- Solution: check SELinux on instance
that is initiating the connection
- ask broker for the list of active nodes and tasks:
.../env/bin/python -m celery -b
amqp://... inspect active
- tasks that this worker is able to execute are
listed in:
- if they are inside a Django project:
- if they are not inside a Django project:
- ...
- client:
- calls an async function
- when called from a Django project
- connects to broker specified in settings.py
BROKER_URL
- tasks that this client is able to call are
listed in:
- ...
-
|
needed files |
|
|
broker |
|
worker |
mydir/celery.py
|
client |
|
- First
steps
with Celery
- install a message broker
(message transport)
- RabbitMQ
(AMQP)
- RabbitMQ
- Configuració / Setup (Django:
same as in settings.py BROKER_URL)
rabbitmqctl add_user myuser
mypassword
rabbitmqctl add_vhost myvhost
rabbitmqctl set_permissions -p myvhost
myuser ".*" ".*" ".*"
- check settings:
rabbitmqctl list_users
rabbitmqctl list_vhosts
rabbitmqctl list_permissions -p myvhost
- Amazon
SQS
- install celery (dependencies automatically installed: pytz
billiard kombu anyjson amqp)
- check that celery is working
- command line
- with a configuration
file
- Problemes / Problems
socket.timeout: timed out
-
|
options
(also from a file toto.conf, referenced a --config
toto.conf)
|
|
command
|
|
examples
|
celery
|
-b
<broker> (broker_url
in a config
file) |
-A
<module> |
worker
(start a single worker) |
-Q,
--queues <queue1>,<queue2>
--hostname=<node_name>@<host>
(default:
celery@<value_returned_by_hostname_command>)
-E,
--task-events (needed if we are
intercepting events)
-c <number_processes> (default:
number of CPUs)
|
|
multi
(start several named workers)
start <node_name_1>
[<node_name_2>, ...]
--pidfile=/var/run/celery/%n.pid
--logfile=/var/log/celery/%n%I.log
restart <node_name>
[<node_name_2>, ...]
stop <node_name>
[<node_name_2>, ...]
stopwait <node_name>
[<node_name_2>, ...]
|
(usually called from
celery.service
with parameters set by variables defined in
/etc/sysconfig/myceleryconfig, parsed with EnvironmentFile=-/etc/sysconfig/myceleryconfig )
/path/to/python -m celery multi start
mynodename -E -Q myqueue -A myapp
--workdir=/path/to/myworking_dir --loglevel=DEBUG
--logfile="/var/log/celery/%N.log"
--pidfile="/var/run/celery/%N.pid"
will start number_process workers (hostname is also
called "worker name" in Flower):
/path/to/python -m celery worker -E -Q
myqueue -A myapp --loglevel=DEBUG
--logfile=/var/log/celery/mynodename.log
--pidfile=/var/run/celery/mynodename.pid
--hostname=mynodename@...
/path/to/python -m celery worker -E -Q
myqueue -A myapp --loglevel=DEBUG
--logfile=/var/log/celery/mynodename.log
--pidfile=/var/run/celery/mynodename.pid
--hostname=mynodename@...
- ...
These workers will connect to the broker specified
as Celery.broker in /path/to/myworkingdir/myapp/celery.py,
on queue myqueue:
- from
__future__ import absolute_import,
unicode_literals
from celery import Celery
app = Celery('myapp',
broker="amqp://myuser:mypassword@ip_address_of_rabbitmq_server:5672/myvhost",
backend='rpc://',
include=['myapp.tasks'])
if __name__ == '__main__':
app.start()
Available tasks are registered in
/path/to/myworkingdir/myapp/tasks.py
(and referenced as myapp.task.task1 in
Flower):
- @shared_task(queue='myqueue')
def task1(...):
...
@shared_task(queue='myqueue',
resultrepr_maxsize=4096, bind=True,
acks_late=True)
def task2(self, ...):
...
|
|
inspect
active
- scheduled
- reserved
- revoked
- registered
- stats
- query_task
<task_uuid>
|
--destination=celery@example.com
|
- connect to broker and get a list of active
tasks, for all workers:
celery -b
amqp://<celery_user>:<celery_password>@<rabbitmq_server/><celery_vhost>
inspect active
|
|
control
enable_events
disable_events
rate_limit tasks.add 10/m
|
|
- terminate a task:
celery -d ... control terminate KILL
<task_id>
- celery
-b ... control revoke <task_id>
|
|
events
|
--dump |
|
|
status
|
|
- connect to broker and list active nodes:
celery -b
amqp://<celery_user>:<celery_password>@<rabbitmq_server/><celery_vhost>
status
- Problems
- Error: No nodes replied within time
constraint.
|
|
... |
|
|
-
|
|
|
worker |
client |
|
celery.py
|
tasks.py
|
command line
|
service |
usage (Calling
Tasks)
|
|
|
|
|
|
res = add.delay(2,2)
res =
add.apply_async((2,2))
res =
add.s(2,2).apply_async()
|
|
|
tasks.py
from celery import Celery
app = Celery('tasks',
broker='pyamqp://guest@localhost//')
@app.task
def add(x, y):
return x + y
|
celery -A tasks worker
--loglevel=info
|
|
from tasks import add
result
= add.delay(4, 4)
result.ready()
result.successful()
result.get()
result.failed()
res.state
res.id
|
|
Using Celery in your Application
proj/rabbitmq.txt
-
amqp://<celery_user>:<celery_password>@<rabbitmq_server/><celery_vhost>
proj/celery.py
from __future__ import
absolute_import, unicode_literals
from celery import Celery
app = Celery(
'proj',
broker=open('rabbitmq.txt','r').read().strip(),
backend='rpc://',
include=['proj.tasks']
)
# Optional configuration, see the application user
guide.
app.conf.update(
result_expires=3600,
)
if __name__ == '__main__':
app.start()
|
proj/__init__.py
proj/tasks.py
from __future__ import
absolute_import, unicode_literals
from .celery import app
@app.task
def add(x, y):
return x + y
@app.task
def mul(x, y):
return x * y
@app.task
def xsum(numbers):
return sum(numbers)
|
(from proj parent dir)
celery -A proj worker -l INFO
- -A
proj: will use proj.celery.app
- will contact broker specified by broker and join
the party (or another one specified with -b option)
- used queue is the default one ("celery"), in vhost
specified in broker. If not using the default, it
must be specified with
-Q myqueue and
must match the queue specified in @app.task(queue="myqueue")
- registered tasks:
- proj.tasks.add
- proj.tasks.mul
- proj.tasks.xsum
|
|
(from proj parent dir)
from proj.tasks import add
res = add.delay(4, 4)
res.get()
res = add.apply_async((4,4))
|
|
mydir/mymodule/rabbitmq.txt
-
amqp://<celery_user>:<celery_password>@<rabbitmq_server/><celery_vhost>
mydir/mymodule/celery.py
from celery import Celery
app = Celery(
'mymodule',
broker= open('rabbitmq.txt','r').read().strip() ,
backend='rpc://',
include=['mymodule.tasks']
)
if __name__ == '__main__':
app.start()
|
mydir/mymodule/__init__.py
mydir/mymodule/tasks.py
from celery import
shared_task
@shared_task(queue='myqueue')
def add(x,y):
...
|
(from mydir):
celery -A mymodule worker -Q myqueue -l info -E
- will use mymodule/celery.py
- will contact broker specified in app =
Celery(broker=...) and join the party
- used queue (created if it does not exist yet) will
be the one specified in @shared_task, inside vhost
specified by broker in Celery(broker=...). It must
match the queue specified in @shared_task
|
python -m celery multi start mycelery_nodes -Q
myqueue -A mymodule
- will use mymodule/celery.py
- will contact broker specified in app =
Celery(broker=...) and join the party
- used queue (created if it does not exist yet) will
be the one specified in @shared_task, inside vhost
specified by broker in Celery(broker=...)
|
from myproject.tasks
import add
result = add.delay(2,2)
result = add.apply_async((2,2))
|
Django
mysite/myproject/settings.py
# celery
BROKER_URL =
'amqp://myuser:mypassword@localhost/myvhost'
|
mysite/myproject/celery.py
from __future__ import
absolute_import
import os
from celery import Celery
from django.conf import settings
# set the default Django settings module for the
'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE',
'myproject.settings')
app = Celery('myproject')
# Using a string here means the worker will not
have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
# access to mysite/*/tasks.py
app.autodiscover_tasks(lambda:
settings.INSTALLED_APPS)
# access to mysite/myproject/tasks.py
app.autodiscover_tasks(lambda: ('myproject',))
@app.task(bind=True)
def debug_task(self):
print('Request:
{0!r}'.format(self.request))
|
mysite/myproject/__init__.py
from __future__ import
absolute_import
# This will make sure the app is always imported
when
# Django starts so that shared_task will use this
app.
from .celery import app as celery_app
mysite/myproject/tasks.py
from __future__ import
absolute_import
import logging
from celery import shared_task
logger = logging.getLogger(__name__)
@shared_task
def add(x, y):
logger.info("adding...")
return x + y
mysite/myapp/tasks.py
from __future__ import
absolute_import
import logging
from celery import shared_task
logger = logging.getLogger(__name__)
@shared_task
def mul(x, y):
logger.info("multiplying...")
return x * y
|
celery -A myproject
worker -l info
|
|
e.g.:
mysite/myapp/views.py
from myapp.tasks import
mul
result = mul.delay(3, 4)
|
-
broker instance |
worker instance |
|
client instance |
monitor instance |
|
|
systemctl start rabbitmq-server.service
- from any instance with installed celery and access
to broker:
source env/bin/activate
- celery
-b $(cat rabbitmq.txt) inspect active
|
celery -A mymodule worker -Q myqueue -l info
-E |
|
cd mydir
python client.py |
|
|
|
|
mydir/myapp/celery.py |
mydir/myapp/tasks.py |
mydir/client.py |
task.state
|
celery_events.py
|
task.info() |
|
|
|
|
|
import sys
from celery import Celery
def my_monitor(app):
with app.connection() as
connection:
recv =
app.events.Receiver(
connection,
handlers={
"task-received":
announce_received_tasks,
"task-started": announce_started_tasks,
"task-failed": announce_failed_tasks,
"task-succeeded": announce_succeeded_tasks,
"task-rejected":
announce_rejected_tasks,
"task-revoked": announce_revoked_tasks,
"task-retried": announce_retried_tasks,
"*": state.event,
}
) |
|
|
|
|
|
RECEIVED
|
def announce_received_tasks(event):
state.event(event)
task =
state.tasks.get(event['uuid'])
print("TASK RECEIVED: %s [%s] [%s]
%s" % (task.name, task.uuid, task.state,
task.info(),)) |
- args
- kwargs
- retries
- root_id
|
|
|
|
|
STARTED
|
def announce_started_tasks(event):
state.event(event)
task =
state.tasks.get(event['uuid'])
print("TASK STARTED: %s [%s] [%s]
%s" % (task.name, task.uuid,
task.state,
task.info(),))
|
- args
- kwargs
- retries
- root_id
|
|
|
@app.task
def will_fail(x, y):
return a+b
@app.task
def will_fail(x, y):
raise Exception("not working")
|
|
FAILURE
|
def announce_failed_tasks(event):
state.event(event)
task =
state.tasks.get(event['uuid'])
print("TASK FAILED: %s [%s] [%s]
%s" % (task.name, task.uuid,
task.state,
task.info(),))
|
- args
- kwargs
- retries
- root_id
- exception (from raised exception)
- 'exception': 'NameError("name \'a\' is not
defined",)'
- 'exception': "Exception('not working',)"
|
|
|
|
|
SUCCEEDED
|
def announce_succeeded_tasks(event):
state.event(event)
task =
state.tasks.get(event['uuid'])
print("TASK SUCCEEDED: %s [%s] [%s]
%s" % (task.name, task.uuid,
task.state,
task.info(),))
|
- args
- kwargs
- retries
- root_id
- result (from returned value)
- runtime
|
|
|
|
|
|
def announce_rejected_tasks(event):
state.event(event)
task =
state.tasks.get(event['uuid'])
print("TASK REJECTED: %s [%s] [%s]
%s" % (task.name, task.uuid,
task.state,
task.info(),))
|
|
|
|
|
|
|
def announce_revoked_tasks(event):
state.event(event)
task =
state.tasks.get(event['uuid'])
print("TASK REVOKED: %s [%s] [%s]
%s" % (task.name, task.uuid,
task.state,
task.info(),))
|
|
|
|
|
|
|
def announce_retried_tasks(event): |
|
|
|
|
|
|
|
|
- Next steps
- Calling tasks
- ETA
and countdown
- Expiration
- les tasques van de l'apply_async cap al broker (es
pot veure com està la cua del broker amb:
rabbitmqctl
-p celery_vhost list_queues ) i el broker
les manté a la cua fins que les pot assignar a un
worker
- el broker no controla aquest temps d'expiració
- temps d'expiració: si un worker rep tard una tasca
(ja és massa tard per a executar-la i la rebutja amb
un REVOKED); no és el temps màxim que una tasca pot
està executant-se
- Retry
Policy
- reintents de connexió cap al broker
- ...
- Canvas:
designing workflows
- Signatures
- add.signature((2,2))
- add.s(2,2)
- Primitives
- group
- chain
- chord
- map
- starmap
- chunks
- Routing
- Remote
control
- Timezone
- Optimization
- User
Guide
- Progrés /
Progress
- Problemes / Problems
- /var/log/messages
... python: File
"...lib/python2.7/site-packages/amqp/transport.py", line
438, in _read
... python: s = recv(n - len(rbuf))
... python: socket.error: [Errno 104] Connection reset by
peer
- ...
|