Python
Index
- Books
- Magazine
- PEP
- PEP 8 – Style Guide for Python Code
|
|
- Options
- Usually installed from the package manager of your distribution
- If you need a specific version of Python (e.g. to be used with tox), you can download, compile and install it:
- Download
cd Python...
./configure
make
sudo make altinstall # do not overwrite the already installed version
- uninstall
sudo rm -f /usr/local/bin/{python3*,pip3*,2to3*,pyvenv*,easy_install-3*,idle3*,pydoc3*}
sudo rm -f /usr/local/lib/libpython3.6m.a
sudo rm -rf /usr/local/lib/python3.6
sudo rm -rf /usr/local/lib/pkgconfig/python-3.6*
sudo rm -rf /usr/local/lib/pkgconfig/{python3.pc,python-3.6m.pc,python-3.6.pc}
- Environment managers
-
- pyenv
- downloads, compiles and installs Python into ~/.pyenv/versions/
- Dependencies
- Suggested
build environment
- Mageia 9
sudo dnf install git gcc make lib64zlib-devel lib64bz2-devel lib64ncurses-devel lib64readline-devel lib64openssl-devel lib64sqlite3-devel lib64ffi-devel lib64tk-devel
- Ubuntu
sudo apt install gcc make zlib1g-dev libbz2-dev libncurses-dev libreadline-dev ... libsqlite3-dev libffi-dev tk-dev ...
- Installation
- pyenv-installer
curl https://pyenv.run | bash
echo '# Load pyenv automatically' >>~/.bashrc
echo 'export PYENV_ROOT="$HOME/.pyenv"' >>~/.bashrc
echo 'command -v pyenv >/dev/null || export PATH="$PYENV_ROOT/bin:$PATH"' >>~/.bashrc
echo 'eval "$(pyenv init -)"' >>~/.bashrc
- ...
- Usage
-
 | get | set | stored in |
available versions | pyenv versions | pyenv install 3.10 | |
used version | pyenv version | | |
global | pyenv global | pyenv global 3.10 | |
local | pyenv local | cd <path_to_my_project>; pyenv local 3.10 | <path_to_my_project>/.python-version |
shell (temporary local) | pyenv shell | pyenv shell 3.10 | PYENV_VERSION |
- list installed versions
pyenv versions
- global
- local
- shell
- virtualenv
pyenv virtualenv 3.8 myproject-3.8
cd myproject
pyenv local myproject-3.8
- Eclipse:
- it does not automatically pick up the version specified in myproject/.python-version
- instead, the interpreter must be specified explicitly: ~/.pyenv/versions/myproject-3.8/bin/python
- ...
- conda
- Installation
- Usage
- Cheat sheet
- create an environment
conda create -n my-environment
- environments are installed in:
- list environments
- activate an environment
conda activate my-environment
- install ... into the active environment:
conda install -c conda-forge manim
- Examples
- PyDev and Eclipse setup:
- ...
|
|
- Porting
Python 2 Code to Python 3
- check good coverage of tests: use coverage.py
- Django: Integration
with coverage
pip install coverage
coverage run --source='.' --omit='env/*,*/migrations/*' manage.py test
coverage report
- coverage html
- firefox htmlcov/index.html
- make your code compatible with python 2 and python 3: use
futurize (based on lib2to3 and use fixers from 2to3,
3to2, and python-modernize) or modernize
- pip install future
- Django
cd myproject
- stage 1
futurize --stage1 myproject/*.py
futurize --stage1 -w myproject/*.py
- for every app:
futurize --stage1 myapp/*.py
futurize --stage1 -w myapp/*.py
futurize --stage1 myapp/*/*.py
futurize --stage1 -w myapp/*/*.py
- run tests
- stage 2
- NOTE: if you already made the effort to
protect your urllib imports with six.PY2, you
may want futurize not to replace imports under
six.PY2:
futurize --stage2 --nofix=libfuturize.fixes.fix_future_standard_library --nofix=libfuturize.fixes.fix_future_standard_library_urllib myproject/*.py
futurize --stage2 myproject/*.py
futurize --stage2 -w myproject/*.py
- for every app:
futurize --stage2 myapp/*.py
futurize --stage2 -w myapp/*.py
futurize --stage2 myapp/*/*.py
futurize --stage2 -w myapp/*/*.py
- run tests
- ...
- clean your code: use pylint
- check whether dependent modules can be ported to python 3:
use caniusepython3
- test your code under several versions of python: use tox
- Supporting Python 3: An
in-depth guide
- Detection of python version
- Porting
Code to Python 3 with 2to3
- Cheat
Sheet: Writing Python 2-3 compatible code
-
Python 2 | Python 3 |
print | print() |
.iteritems() | .items() |
class Meta | from builtins import object; class Meta(object) |
string.find() | str.find() |
....values() | list(....values()) |
....keys() | list(....keys()) |
....items() | list(....items()) |
string.letters | string.ascii_letters |
open(my_binary_file) | open(my_binary_file, 'rb') # to avoid: 'utf-8' codec can't decode byte 0x89 in position 0: invalid start byte |
a.next() | next(a) |
except Exception as e: e.message # deprecated | except Exception as e: str(e) |
super(MyClass, self) | super(MyClass, self) or super() # preferred |
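The Python 3 column of the table above can be exercised with a short sketch. The names used here (`prices`, `numbers`, `Base`, `Child`) are hypothetical and only serve to illustrate the idioms.

```python
import string

# Hypothetical data, used only to illustrate the idioms from the table.
prices = {"apple": 3, "pear": 5}

# .items() / .keys() return views; wrap them in list() when a real list is needed.
items = list(prices.items())
keys = list(prices.keys())

# next(a) replaces a.next().
numbers = iter([10, 20])
first = next(numbers)

# str(e) replaces the removed e.message attribute.
try:
    raise ValueError("bad value")
except ValueError as e:
    message = str(e)

# string.ascii_letters replaces string.letters.
letters = string.ascii_letters


class Base:
    def greet(self):
        return "hello"


class Child(Base):
    def greet(self):
        # super() without arguments is the preferred Python 3 form.
        return super().greet() + " from Child"


greeting = Child().greet()
```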
- Lambda functions
- How to Use
Python Lambda Functions (Real Python)
- small, anonymous functions (although a name can be assigned to them)
- based on lambda calculus (Alonzo Church)
- Types of programming languages
- functional language (lambda calculus; keeps no state)
- imperative language (Turing machine; state-based)
- Python is an imperative language, but it incorporates some functional concepts:
- map()
- filter()
- reduce() (functools.reduce() in Python 3)
- lambda
- Examples:
function | lambda definition | lambda usage |
def <function_name>(<arguments>): <body> | lambda <bound_variable_1>, <bound_variable_2>: <body> | |
def identitat(x): return x | lambda x: x | |
def add_one(x): return x+1 | lambda x: x+1 | add_one = lambda x: x+1 |
 | | # IIFE (Immediately Invoked Function Expression): (lambda x, y: x + y)(2, 3) |
 | high_ord_func = lambda x, func: x + func(x) | high_ord_func(2, lambda x: x * x) |
- Analysing functions and lambdas:
- dis (Disassembler for Python bytecode)
- Differences between functions and lambdas
function | lambda |
the traceback shows the function name | the traceback only shows <lambda> |
 | can only contain expressions, not statements (return, pass, assert, raise) |
 | a single expression (but it can span several lines if parentheses are used) |
 | does not support type annotations (type hinting) |
 | can be invoked immediately (IIFE) from the Python interpreter |
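A minimal sketch of how the naming difference can be observed, and how `dis` can be used to inspect both forms. The helper `bytecode_ops` is a hypothetical name introduced for this illustration.

```python
import dis


def add_one(x):
    return x + 1


add_one_lambda = lambda x: x + 1  # bound to a name, but still reported as <lambda>

function_name = add_one.__name__
lambda_name = add_one_lambda.__name__


def bytecode_ops(func):
    """Return the list of opcode names of a function (hypothetical helper)."""
    return [instruction.opname for instruction in dis.get_instructions(func)]


function_ops = bytecode_ops(add_one)
lambda_ops = bytecode_ops(add_one_lambda)

# An immediately invoked lambda (IIFE).
iife_result = (lambda x, y: x + y)(2, 3)
```

Printing `function_ops` and `lambda_ops` side by side is a quick way to compare what the interpreter generates for each form.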
- Decorators
-
def my_decorator(f):
    def wrap(*args, **kwargs):
        ...
    return wrap |
function | lambda |
@my_decorator
def my_function(x):
    return ... | (my_decorator(lambda x: ...)(...)) |
- ...
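The decorator skeleton above can be filled in as follows. This is only a sketch: the doubling behaviour and the use of `functools.wraps` are assumptions added for illustration, not part of the original notes.

```python
import functools


def my_decorator(f):
    @functools.wraps(f)  # keeps f.__name__ and f.__doc__ on the wrapper
    def wrap(*args, **kwargs):
        # Illustrative behaviour: double whatever the wrapped callable returns.
        return f(*args, **kwargs) * 2
    return wrap


@my_decorator
def my_function(x):
    return x + 1


# The same decorator applied to a lambda: no @ syntax, just a call.
decorated_lambda = my_decorator(lambda x: x + 1)

function_result = my_function(3)
lambda_result = decorated_lambda(3)
```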
- Context manager,
iterator
|
usage |
class |
function |
expression |
context
manager |
|
class-based context manager
class ...
def __enter__()
def __exit__()
|
function-based context manager
@contextmanager
def ...
yield ...
|
|
asynchronous
context manager |
|
class-based asynchronous
context manager
class ...
async def __aenter__()
# task
async def __aexit__()
# task
|
|
|
iterator |
|
iterator object
class ...
def __iter__()
#
if the class defines __next__,
#
then __iter__ can just return self
def __next__()
|
generator function
sub-coroutine
|
generator expression
|
coroutine |
|
|
coroutine
-
def ...
... = (yield
...)
|
|
asynchronous
iterator
|
|
asynchronous
iterator object
class ...
# see #6272
def __aiter__()
async def __anext__()
# is a task
...
await
...
...
raise
StopAsyncIteration
|
|
|
awaitable |
- starts the scheduler and adds the main task to it
- adds a task to the scheduler, but it will not run until some await is called
- runs a task
- runs a task with a timeout
- await
asyncio.wait_for(..., ...)
- from context manager
(__await__, a generator function, must return self)
with await ... as ...
async with ... as ...
|
awaitable
class:
- Python: __await__ must be a generator function
- CPython:
- task≠generator;
- CPython task: __await__ returns a generator
- micropython:
class ...
def __await__(self):
|
awaitable
object: it can be used in an await expression
- coroutine
- task
- defined with async def
- at least one await statement
async
def ...
await ...
- future
|
|
- Context manager
- context
manager: An object which controls the environment seen
in a with statement by defining __enter__() and __exit__()
methods. See PEP
343.
- Info
with statement
with
expression as target_var:
do_something(target_var)
- with
A() as a, B() as
b:
pass
- context manager protocol (Context
Manager Types):
-
bad approach |
try-finally approach |
with approach |
file = open("hello.txt", "w")
file.write("Hello, World!")
file.close() |
file = open("hello.txt", "w")
try:
file.write("Hello, World!")
finally:
file.close() |
open file:
with
open("hello.txt", mode="w") as file:
file.write("Hello, World!")
import logging
try:
    with open("hello.txt", mode="w") as file:
        file.write("Hello, World!")
except Exception as e:
    logging.error(e)
|
|
|
open file with Path:
import pathlib
file_path = pathlib.Path("hello.txt")
with file_path.open(mode="w")
as file:
file.write("Hello, World!")
|
|
|
scan dirs:
import os
with os.scandir(".") as
entries:
for entry in entries:
print(entry.name, "->",
entry.stat().st_size, "bytes")
|
|
|
number of decimals:
from decimal import
Decimal, localcontext
with localcontext() as ctx:
ctx.prec = 42
Decimal("1") /
Decimal("42")
|
|
import threading
my_lock = threading.Lock()
my_lock.acquire()
try:
# do something
finally:
my_lock.release()
|
locks:
import threading
my_lock = threading.Lock()
with my_lock:
# do something
|
|
|
pytest:
import pytest
with pytest.raises(...):
...
|
|
|
unittest (Django) |
|
|
aiohttp:
async def check(url):
async with
aiohttp.ClientSession() as session:
async
with session.get(url) as response:
print(f"{url}: status ->
{response.status}")
html = await response.text()
print(f"{url}: type ->
{html[:17].strip()}")
|
- ...
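The class-based and function-based context managers named in the tables above can be sketched side by side. `Tracker` and `tracker` are hypothetical names; both record when the block is entered and left.

```python
from contextlib import contextmanager


class Tracker:
    """Class-based context manager: implements __enter__ and __exit__."""

    def __init__(self, log):
        self.log = log

    def __enter__(self):
        self.log.append("enter")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.log.append("exit")
        return False  # do not swallow exceptions raised in the with block


@contextmanager
def tracker(log):
    """Function-based context manager: code before yield is the enter part."""
    log.append("enter")
    try:
        yield log
    finally:
        # Runs even if the with block raises, like __exit__.
        log.append("exit")


class_log = []
with Tracker(class_log):
    class_log.append("body")

function_log = []
with tracker(function_log):
    function_log.append("body")
```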
- Iterable
type(my_func) |
definició i creació |
|
type(my_func(4))
type(my_var) |
access as a collections.abc.Iterable (__iter__) |
access as a collections.abc.Iterator (__next__)
(subclass of collections.abc.Iterable) |
|
|
my_var = ()
my_var = []
my_var = {}
my_var = set()
my_var = frozenset()
my_var = ""
my_var = b""
my_var = bytearray()
my_var = range(0)
my_var = memoryview(b"") |
<class 'tuple'>
<class 'list'>
<class 'dict'>
<class 'set'>
<class 'frozenset'>
<class 'str'>
<class 'bytes'>
<class 'bytearray'>
<class 'range'>
<class 'memoryview'> |
- for
n in my_var:
print(n)
|
next(my_var)
TypeError: 'xxx' object is not an iterator
But we can create an iterator from it:
- my_iterator = iter(my_var)
type(my_iterator) # <class 'xxx_iterator'>
next(my_iterator)
...
|
type(my_func):
<class 'function'> |
function returning a list:
def my_func(stop):
result = []
for num in range(stop):
result
+= [num**2]
return result
my_var = my_func(4)
|
list comprehension:
my_var = [num**2 for num in range(4)]
|
<class 'list'> |
- #
it can be accessed infinite times
for n in my_var:
print(n)
|
next(my_var)
TypeError: 'list' object is not an iterator
But we can create an iterator from it:
my_iterator = iter(my_var)
type(my_iterator) # <class 'list_iterator'>
next(my_iterator) # 0
next(my_iterator) # 1
next(my_iterator) # 4
next(my_iterator) # 9
next(my_iterator) # StopIteration
|
type(my_func):
<class 'function'> |
generator function (a simple and powerful tool for creating iterators; contains yield as a statement):
def my_func(stop):
    for num in range(stop):
        yield num**2 # return the value and pause execution until the next call
my_var = my_func(4)
In MicroPython uasyncio, a generator and a task are identical; in CPython they are different. |
generator expression:
my_var = (num**2 for num
in range(4))
|
<class 'generator'>
(because of yield or implicit yield) |
- #
it can only be accessed once
for n in my_var:
print(n)
|
next(my_var) # 0
next(my_var) # 1
next(my_var) # 4
next(my_var) # 9
next(my_var) # StopIteration
|
|
generator as a coroutine (contains yield as an expression, on the right-hand side of an assignment; it can receive values sent to it with .send()):
def my_func(stop):
    num = 0
    while num < stop:
        received = (yield num**2) # yield a value, pause, and receive whatever is sent on resume
        if received:
            # from: ...send(something)
            num = received
        else:
            # from: next(...) or ...send(None)
            num += 1
my_var = my_func(4)
In asyncio, a coroutine is defined with:
import asyncio
async def bar(t):
    await asyncio.sleep(t)
    print('Done')
|
|
|
|
next(my_var) # 0
next(my_var) # 1
my_var.send(3)
# 9
next(my_var) # StopIteration
In asyncio:
# a coro is awaitable
await bar(1)
# a task is awaitable
task = asyncio.create_task(bar(5))
await task
|
|
cooperative delegation to sub-coroutine with yield from:
def my_sub_func(stop):
    num = 0
    while num < stop:
        yield num**2
        num += 1
def my_func(sub_func):
    try:
        yield from sub_func
    except GeneratorExit:
        return sub_func.close()
my_sub_var = my_sub_func(4)
my_var = my_func(my_sub_var)
|
|
|
|
next(my_var) # 0
next(my_var) # 1
next(my_var) # 4
next(my_var) # 9
next(my_var) # StopIteration
|
type(MyClass):
<class 'type'> |
iterator object (implements __iter__ and __next__):
class MyClass:
    def __init__(self, stop):
        self.x = 0
        self.stop = stop
    def __iter__(self):
        # if the class defines __next__, then __iter__ can just return self
        return self
    def __next__(self):
        if self.x < self.stop:
            result = self.x ** 2
            self.x += 1
            return result
        else:
            # Iterators must raise when done, else considered broken
            raise StopIteration
my_object = MyClass(4)
In asyncio, an awaitable class is defined:
|
|
type(my_object):
<class '__main__.MyClass'> |
- #
it can only be accessed once
for n in my_object:
print(n)
|
next(my_object) # 0
next(my_object) # 1
next(my_object) # 4
next(my_object) # 9
next(my_object) # StopIteration
In asyncio:
my_awaitable_object =
MyAwaitableClass()
result = await my_awaitable_object
# a typical firmware app
my_awaitable_object = MyAwaitableClass()
my_task = asyncio.create_task(
my_awaitable_object.foo()
)
await my_task
# a typical firmware app
my_awaitable_object = MyAwaitableClass()
await my_awaitable_object.run_forever()
|
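The table mentions an awaitable class without showing one. The following is one possible sketch for CPython, assuming that `__await__` simply delegates to an internal coroutine; `MyAwaitableClass` takes a hypothetical `value` argument and `_compute` is a hypothetical helper.

```python
import asyncio


class MyAwaitableClass:
    def __init__(self, value):
        self.value = value

    async def _compute(self):
        # Hypothetical work: yield control once, then return a result.
        await asyncio.sleep(0)
        return self.value * 2

    def __await__(self):
        # __await__ must return an iterator; delegating to the coroutine's
        # own __await__ is one common way to provide it.
        return self._compute().__await__()


async def main():
    my_awaitable_object = MyAwaitableClass(21)
    return await my_awaitable_object


result = asyncio.run(main())
```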
- Iterator
- 9.8
Iterators
- What
is the difference between iterators and generators? Some
examples for when you would use each case would be
helpful.
- What
does the "yield" keyword do in Python?
- In Python 3, you can delegate from one generator
to another in both directions with
yield
from .
- generator:
def
func(an_iterable):
for item in
an_iterable:
yield item
- can be written as:
def
func(an_iterable):
yield from
an_iterable
- Coroutines
- an iterator is a class that has the methods:
__iter__: makes it an iterable (like a list, a dictionary, ...)
__next__: implements the iterator protocol
- an iterator gets exhausted
- a generator (a function with yield) is a simple, compact way to create an iterator; a generator is a subtype of iterator
- calling a generator function returns an instance of a generator object
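A short sketch of the points above: an iterator is exhausted after one pass while the underlying list is not, and a generator object is itself an iterator. The function name `squares` is hypothetical.

```python
from collections.abc import Iterable, Iterator

my_list = [0, 1, 4]
my_iterator = iter(my_list)

first_pass = list(my_iterator)   # consumes the iterator
second_pass = list(my_iterator)  # nothing left: the iterator is exhausted
list_again = list(my_list)       # the list itself can be traversed again


def squares(stop):
    for n in range(stop):
        yield n ** 2


gen = squares(3)  # calling the generator function returns a generator object

list_is_iterable = isinstance(my_list, Iterable)
list_is_iterator = isinstance(my_list, Iterator)
gen_is_iterator = isinstance(gen, Iterator)
gen_iter_is_self = iter(gen) is gen
```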
- Generator
- a simple, compact way to create an iterator; a generator is a subtype of iterator
- a function is a generator merely by containing a yield (as a statement or as an expression)
- 9.9
Generators
- “RuntimeError:
generator raised StopIteration” every time I try to
run app
- How
to Use Generators and yield in Python
- generator: uses yield instead of return
- on reaching the yield it pauses, but does not exit the function
- two ways to create a generator:
- generator function
def nums_squared():
for num in range(5):
yield num**2
nums_squared_generator =
nums_squared()
next(nums_squared_generator)
- generator expression
- assumed yield at the end of each inner
iteration
- like list comprehensions (but lists use
memory):
nums_squared_list_comprehension
= [num**2 for num in
range(5)]
nums_squared_generator_comprehension
= (num**2 for num in
range(5))
next( nums_squared_generator_comprehension )
- size comparison
import sys
sys.getsizeof(nums_squared_list_comprehension)
sys.getsizeof(nums_squared_generator_comprehension)
- execution time comparison:
import cProfile
cProfile.run('sum([i * 2 for i in range(10000)])')
cProfile.run('sum((i * 2 for i in range(10000)))')
- advanced generator methods
.send()
- to create a coroutine: a generator function to which data can be passed
- since Python 2.5, yield can be used as an expression as well as a statement (PEP-342); for example, on the right-hand side of an assignment ( line = (yield) )
- Sending
None is the same as calling next
(PEP-0342)
- "before you can communicate with a
coroutine you must first call next() or
send(None) to advance its execution to
the first yield expression."
- A
Curious Course on Coroutines and
Concurrency
- a yield pauses execution and hands control back to the code that called the generator; this can be used to build a multitasking system within Python
.throw()
.close()
- Creating data pipelines with generators
- Examples
def fibonacci():
    # 0, 1, 1, 2, 3, 5, 8, ...
    previous_value = -1
    present_value = 1
    while True:
        new_value = present_value + previous_value
        yield new_value
        previous_value = present_value
        present_value = new_value
# next is called 5 times
f1 = fibonacci()
next(f1)
next(f1)
next(f1)
next(f1)
next(f1)
# next is called 5 times inside a for loop
f2 = fibonacci()
for _ in range(5):
    next(f2)
# infinite loop (Ctrl-C to stop)
for i in fibonacci():
    print(i)
- ...
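The advanced generator methods listed above (.send(), .throw(), .close()) can be tried with a small sketch. The generator `running_total` and the `events` list are hypothetical; treating a thrown `ValueError` as a reset signal is an arbitrary choice made for the illustration.

```python
events = []


def running_total():
    total = 0
    try:
        while True:
            try:
                received = yield total
            except ValueError:
                # Injected from outside with .throw(): reset the total.
                total = 0
                continue
            if received is not None:
                total += received
    finally:
        # .close() raises GeneratorExit at the paused yield; finally still runs.
        events.append("closed")


gen = running_total()
start = next(gen)                           # prime the generator up to the first yield
after_5 = gen.send(5)                       # resume with a value
after_7 = gen.send(7)
after_reset = gen.throw(ValueError("reset"))
gen.close()
```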
- Strings and bytes
- String
- Unicode
- Text
Vs. Data Instead Of Unicode Vs. 8-bit
-
|
python 2 |
python 3 |
|
|
type |
creation
(not recommended) |
creation
(recommended)
(to be used by 2to3) |
|
type |
creation |
unencoded |
unicode strings |
<type 'unicode'> |
u'ànima' |
unicode(...) |
text (unicode) |
- <class 'str'>
(immutable)
|
'ànima' |
encoded, binary |
8-bit strings |
<type 'str'> |
|
str('ànima') |
binary data |
|
b'...' |
- conversion in Python 3
- from (row) -> to (column)
 | str | bytes |
str | | my_str.encode() or bytes(my_str, encoding='utf-8') |
bytes | my_bytes.decode() or str(my_bytes, encoding='utf-8') | |
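A minimal round trip between `str` and `bytes`, using the word from the table above. Note that the keyword argument is `encoding`.

```python
my_str = "ànima"

# str -> bytes
my_bytes = my_str.encode("utf-8")
same_bytes = bytes(my_str, encoding="utf-8")

# bytes -> str
back_to_str = my_bytes.decode("utf-8")
same_str = str(my_bytes, encoding="utf-8")

# The accented character takes two bytes in UTF-8,
# so the byte length differs from the character count.
number_of_characters = len(my_str)
number_of_bytes = len(my_bytes)
```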
- The Python tutorial
- PEP
- Style
- File (shebang)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
- ...
- Python
(Raspberry
Pi)
- PyPI: the Python package
index
- Python
Documentation contents
- The
Python standard library
- Built-in Functions
- print
from __future__ import print_function
- format
- Format
Specification Mini-Language
- 4 decimals:
a = 123.45
print("{:.4f}".format(a))
- left-aligned text and right-aligned value:
title = "My title:"
value = 123.45
print("{:<20}{:>20.4f}".format(title,
value))
- ...
- type
if type(myvar) is int:
    ...
- isinstance
import six
if isinstance(myvar, six.string_types):
    ...
- Data
- ...
type |
create |
create element |
set element |
check for key |
retrieve element |
get index |
remove element |
join |
all elements |
Object |
class MyModel:
myfield1 = 'myvalue1'
myobject = MyModel() |
|
myobject.myfield1 = 'mynewvalue' |
hasattr(myobject, 'myfield1') |
- myobject.myfield1
- getattr(myobject, 'myfield1',
'mydefaultvalue')
|
|
|
|
|
dict |
- mydict = {}
- mydict = {'mykey1':'myvalue1',
'mykey2':'myvalue2'}
|
mydict['mykey3'] = 'myvalue3' |
mydict['mykey1'] = 'mynewvalue1' |
'mykey1' in mydict |
- mydict['mykey1']
- mydict.get('mykey1','mydefaultvalue')
|
|
|
mydict.update(myotherdict) |
for k, v in list(mydict.items()): |
list |
- mylist = []
- mylist = ['myvalue1','myvalue2']
|
mylist.append('myvalue3') |
|
'myvalue1' in mylist |
|
mylist.index('myvalue1') |
- mylist.pop(0)
- try:
    mylist.remove("myvalue1")
  except ValueError:
    pass
|
mylist + myotherlist |
for i in mylist: |
tuple |
- mytuple = ()
- mytuple = ('myvalue1','myvalue2',)
|
mytuple += ('myvalue3',) |
|
'myvalue1' in mytuple |
|
mytuple.index('myvalue1') |
|
mytuple + myothertuple |
|
set |
|
myset.add("myvalue3") |
|
|
|
|
myset.remove("myvalue1") |
|
|
- Loops
- compact
["{}-{}".format(a,b) for a,b in
...]
- Create a dict from a list
- Python
: How to convert a list to dictionary ?
>>> mylist =
[{'first_key':'value_f1',
'second_key':'value_s1'},
{'first_key':'value_f2',
'second_key':'value_s2'}]
>>> a =
{b['first_key']:b['second_key'] for b in
mylist}
>>> a
{'value_f1': 'value_s1', 'value_f2':
'value_s2'}
- Search/filter from a list of dicts:
- Python
select from a list + Examples
a =
[{'name':'myname1','address':'myaddress11'},
{'name':'myname1','address':'myaddress12'},
{'name':'myname2','address':'myaddress21'},]
elements_from_myname1 = [c for c in a if
c['name']=='myname1']
addresses_from_myname1 = [c['address'] for
c in a if c['name']=='myname1']
- Flatten
- Compare lists, dicts ...
- ...
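The "Flatten" and "Compare lists, dicts" items above have no code in these notes. One possible sketch, using only the standard library and hypothetical sample data:

```python
from itertools import chain

# Flatten one level of nesting.
nested = [[1, 2], [3], [], [4, 5]]
flat = list(chain.from_iterable(nested))

# Lists compare element by element, and order matters.
same_order = [1, 2, 3] == [1, 2, 3]
different_order = [1, 2, 3] == [3, 2, 1]

# Dicts compare by key/value pairs; insertion order does not matter.
same_dict = {"a": 1, "b": 2} == {"b": 2, "a": 1}

# Compare lists ignoring order (elements must be sortable).
same_elements = sorted([1, 2, 3]) == sorted([3, 2, 1])
```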
- Built-in Constants
- Built-in Types
- 5.6.
Sequence
Types — str, unicode, list, tuple, bytearray,
buffer, xrange
- String
- Distance between two strings
- Format
- fixed-width columns (my_integer_value is printed with a comma as the thousands separator)
print(
    first_string.ljust(50)
    + second_string.center(20)
    + third_string.rjust(30)
    + "{:,}".format(my_integer_value).rjust(25)
)
- ...
- Lists
- Data
- Sort
- Unique (remove repeated elements)
my_list_without_repeated_elements
=
list(set(my_list_with_repeated_elements))
- when elements are dicts:
- Convert string to list
- Search
- Current, next
- Intersection of two lists
intersection_list =
list(set(first_list) &
set(second_list))
- Subtraction of two lists
subtraction_list =
list(set(first_list) -
set(second_list))
non_common_list =
list(set(first_list) -
set(second_list)) + list(set(second_list)
- set(first_list))
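For the "Unique" item above, `list(set(...))` loses the original order and does not work when the elements are dicts (dicts are not hashable). A sketch of two alternatives, with hypothetical sample data:

```python
# Order-preserving de-duplication of hashable elements.
names = ["b", "a", "b", "c", "a"]
unique_names = list(dict.fromkeys(names))

# De-duplication when the elements are dicts: build a hashable key
# from each dict (assumes the values themselves are hashable).
records = [
    {"name": "x", "n": 1},
    {"name": "y", "n": 2},
    {"name": "x", "n": 1},
]
seen = set()
unique_records = []
for record in records:
    key = tuple(sorted(record.items()))
    if key not in seen:
        seen.add(key)
        unique_records.append(record)
```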
- 5.8
Mapping Types - dict
- items() vs iteritems()
- Pretty print (not recursive)
import pprint
pp = pprint.PrettyPrinter(width=1)
pp.pprint(my_dict)
- json
(recursive)
-
|
python
object (list
or dict)
|
json
string
|
json
file
|
python
object ->
|
-
|
obj_json
= json.dumps(obj, indent=4) |
with
open("toto.json", "w") as f:
json.dump(obj,
f, indent=4) |
json
string ->
|
obj
= json.loads(obj_json) |
-
|
-
|
json
file ->
|
with
open("toto.json", "r") as f:
obj =
json.load(f) |
-
|
-
|
import json
print(json.dumps(my_dict, indent=2))
- Problems
- TypeError: Object of type datetime is not JSON serializable
- Solution
- How to overcome "datetime.datetime not JSON serializable"?
import datetime
import json
class DateTimeJSONEncoder(json.JSONEncoder):
    # https://stackoverflow.com/questions/11875770/how-to-overcome-datetime-datetime-not-json-serializable
    def default(self, o):
        if isinstance(o, datetime.datetime):
            return o.isoformat()
        return json.JSONEncoder.default(self, o)
json.dumps(my_object_with_datetimes, cls=DateTimeJSONEncoder)
- yaml
import yaml
print(yaml.dump(my_dict, indent=2))
- Built-in Exceptions
- Text Processing Services
- Regular expressions
- 7.2.
re —
Regular expression operations
- URL
parsing
- Example
import re
import dateutil.parser
regex = re.compile(r'.*_(.*)')
# get datetime from the following string:
name = 'toto_2016-10-25T133700Z'
m = regex.search(name)
if m:
    extracted_datetime = dateutil.parser.parse(m.groups()[0])
- Remove elements from a list according to a
regular expression (files: *.min.js, *.min.css)
import re
list_with_min_js = ['primer.js', 'segon.min.js', 'tercer.js']
# escape the dots and anchor at the end, so only names ending in .min.js or .min.css match
regex = re.compile(r'\.min\.(js|css)$')
list_without_min_js = [i for i in list_with_min_js if not regex.search(i)]
# returns: ['primer.js', 'tercer.js']
- Substrings (trim)
- ...
- Binary Data Services
- Data Types
- Date
- 8.1 datetime
- 8.1.7
strftime()
and strptime() Behavior
- now in UTC and ISO-8601 format
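import datetime
# utcnow() returns a naive datetime and is deprecated since Python 3.12;
# datetime.datetime.now(datetime.timezone.utc) returns an aware one
now = datetime.datetime.utcnow()
now.isoformat() # 2017-06-14T09:57:56.145575
now.strftime('%Y-%m-%dT%H:%M:%S.%fZ') # 2017-06-14T09:57:56.145575Z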
- build a datetime from string
import datetime
mydate =
datetime.datetime.strptime('20201201T100102',
'%Y%m%dT%H%M%S')
- now in naive and aware:
>>> datetime.datetime.utcnow()
datetime.datetime(2023, 5, 29, 17, 55, 5, 890540)
>>> datetime.datetime.now(datetime.timezone.utc)
datetime.datetime(2023, 5, 29, 17, 55, 6, 615502, tzinfo=datetime.timezone.utc)
>>> datetime.datetime.now().astimezone()
datetime.datetime(2023, 5, 30, 20, 45, 25, 701462, tzinfo=datetime.timezone(datetime.timedelta(seconds=7200), 'CEST'))
>>> datetime.datetime.now().astimezone(datetime.timezone(datetime.timedelta(seconds=7200), 'CEST'))
datetime.datetime(2023, 5, 30, 20, 50, 5, 125038, tzinfo=datetime.timezone(datetime.timedelta(seconds=7200), 'CEST'))
>>> datetime.datetime.now()
datetime.datetime(2023, 5, 29, 19, 55, 7, 561511)
- (using pytz: DEPRECATED; migration guide, pytz-deprecation-shim) convert from naive to aware:
import datetime
import pytz
mydate_winter_naive = datetime.datetime.strptime('2020-02-01T00:00:00', '%Y-%m-%dT%H:%M:%S')
mydate_winter_naive.isoformat() # '2020-02-01T00:00:00'
mydate_summer_naive = datetime.datetime.strptime('2020-08-01T00:00:00', '%Y-%m-%dT%H:%M:%S')
mydate_summer_naive.isoformat() # '2020-08-01T00:00:00'
# convert datetime from naive to aware (utc):
utc = pytz.utc
mydate_winter_aware_utc = utc.localize(mydate_winter_naive)
mydate_winter_aware_utc.isoformat() # '2020-02-01T00:00:00+00:00'
mydate_summer_aware_utc = utc.localize(mydate_summer_naive)
mydate_summer_aware_utc.isoformat() # '2020-08-01T00:00:00+00:00'
# convert datetime from naive to aware (Europe/Andorra)
# with pytz, use localize(): astimezone() on a naive datetime assumes the system's local time zone
andorra = pytz.timezone('Europe/Andorra')
mydate_winter_aware_andorra = andorra.localize(mydate_winter_naive)
mydate_winter_aware_andorra.isoformat() # '2020-02-01T00:00:00+01:00'
mydate_summer_aware_andorra = andorra.localize(mydate_summer_naive)
mydate_summer_aware_andorra.isoformat() # '2020-08-01T00:00:00+02:00'
- convert from ISO-8601 string to datetime
import datetime
import dateutil.parser
yourdate = dateutil.parser.parse(datestring)
my_time = datetime.datetime.strptime(my_string, '...')
- convert seconds to HH:MM:SS
- without second fractions
import time
time_in_hh_mm_ss = time.strftime('%H:%M:%S', time.gmtime(time_in_seconds_int))
- with second fractions
# similar to timedelta.__str__
def _to_hh_mm_ss_ms(seconds):
    # work in whole milliseconds so that the {:d} format codes receive ints
    total_ms = int(round(seconds * 1000))
    total_seconds, ms = divmod(total_ms, 1000)
    mm, ss = divmod(total_seconds, 60)
    hh, mm = divmod(mm, 60)
    return "{:d}:{:02d}:{:02d}.{:03d}".format(hh, mm, ss, ms)
import datetime
# no fractional part if it is 0
time_in_hh_mm_ss = str(datetime.timedelta(seconds=duration_seconds_float))
import datetime
import pytz
time_in_hh_mm_ss_ff = datetime.datetime.fromtimestamp(time_in_seconds_float, pytz.UTC).strftime('%H:%M:%S.%f')
- convert HH:MM:SS to seconds
- ...
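The "convert HH:MM:SS to seconds" item has no code in these notes. A minimal sketch, assuming the input always has exactly three colon-separated fields; the function name `hh_mm_ss_to_seconds` is hypothetical.

```python
def hh_mm_ss_to_seconds(text):
    """Convert 'HH:MM:SS' or 'HH:MM:SS.fff' to a number of seconds (float)."""
    hours, minutes, seconds = text.split(":")
    return int(hours) * 3600 + int(minutes) * 60 + float(seconds)


whole_seconds = hh_mm_ss_to_seconds("01:02:03")
fractional_seconds = hh_mm_ss_to_seconds("0:00:01.5")
```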
- Timezones
- Example
import pytz
timezone = 'Europe/Andorra'
my_date.astimezone(pytz.timezone(timezone)).strftime('%H:%M:%S %Z')
- i18n
- 15.3
time - Time access and conversions
- minimum and maximum datetime (infinite)
unaware_maximum =
datetime.datetime.max
import pytz
aware_maximum =
datetime.datetime.max.replace(tzinfo=pytz.UTC)
- other libraries
- add datetime and time
- event.start +
datetime.timedelta(hours=event.duration.hour,minutes=event.duration.minute,seconds=event.duration.second)
- difference between two times
- Date
in Django
- Periods
- Numeric and Mathematical Modules
- Random
- 9.6
random
- Random password
- Generate password in python
import random
import string
chars = string.ascii_letters + string.digits
length = 20
generated_password = ''.join(random.choice(chars) for _ in range(length))
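The `random` module is not designed for security-sensitive values. As an alternative sketch, the standard-library `secrets` module (Python 3.6+) can be used with the same alphabet and length as above:

```python
import secrets
import string

chars = string.ascii_letters + string.digits
length = 20

# secrets.choice draws from the operating system's secure random source.
generated_password = "".join(secrets.choice(chars) for _ in range(length))
```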
- Functional Programming Modules
- File and Directory Access
- Data Persistence
- Data Compression and Archiving
- File Formats
- CSV
- Spreadsheets
- 13.1. csv — CSV File Reading and Writing (2.7)
(3.7)
- csv from file
import csv
with open(csv_path, newline='') as f:
    reader = csv.reader(f, delimiter=';')
    for row in reader:
        row_length = len(row)
        print(', '.join(row))
        #print('{} -- {}'.format(row_length, row[10:20]))
import csv
with open(csv_path, newline='', encoding='Windows-1252') as f:
    reader = csv.reader(f, delimiter=';')
    for row in reader:
        row_length = len(row)
        print('{} -- {}'.format(row_length, row[10:20]))
- csv from string
- csv from InMemoryUploadedFile:
- csv from http response (e.g. from APITestCase)
import csv
import six
if six.PY2:
    csv_string = res.content
else:
    csv_string = str(res.content, 'utf-8')
csv_lines = csv_string.splitlines()
reader = csv.reader(csv_lines)
parsed_csv = list(reader)
number_csv_lines = len(parsed_csv)
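The "csv from string" item above has no code. A minimal sketch, wrapping a hypothetical sample string in `io.StringIO` so that `csv.reader` can treat it like a file:

```python
import csv
import io

# Hypothetical sample data with ';' as the delimiter.
csv_string = "name;age\nAnna;31\nPau;27\n"

# newline='' lets the csv module handle line endings itself.
reader = csv.reader(io.StringIO(csv_string, newline=""), delimiter=";")
rows = list(reader)

# DictReader uses the first row as field names.
dict_rows = list(csv.DictReader(io.StringIO(csv_string, newline=""), delimiter=";"))
```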
- Cryptographic
Services
- Generic Operating System Services
- Files
- to open files using url, see urlopen
- io
- better option than the old builtin open (?) (in Python 3, io.open is an alias of the builtin open)
- read a file:
import io
with io.open(src, 'r', encoding='utf-8') as f_src:
    for line in f_src:
        ...
- temporary
- mkdtemp
- ...
- temporary dir
import tempfile
tmpdir = tempfile.mkdtemp()
- temporary file
- Temporary...
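`tempfile.mkdtemp()` leaves the cleanup to the caller. As a sketch of the self-cleaning alternatives in the same module, `TemporaryDirectory` and `NamedTemporaryFile` can be used as context managers (the file name `hello.txt` is hypothetical):

```python
import os
import tempfile

# The directory and its contents are removed when the with block ends.
with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, "hello.txt")
    with open(path, "w", encoding="utf-8") as f:
        f.write("Hello, World!")
    existed_inside = os.path.exists(path)
existed_after = os.path.exists(tmpdir)

# The temporary file is deleted when it is closed.
with tempfile.NamedTemporaryFile(mode="w+", suffix=".txt") as tmp:
    tmp.write("data")
    tmp.seek(0)
    content = tmp.read()
```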
- Concurrent Execution
- Processes
- 17.1 subprocess (2) (3)
subprocess.run (>=3.5)
- Older high-level API
-
old | new |
subprocess.call(...) | subprocess.run(...) |
subprocess.check_call(...) | subprocess.run(..., check=True) |
subprocess.check_output(...) | subprocess.run(..., check=True, stdout=subprocess.PIPE).stdout
value_str = subprocess.run(..., capture_output=True).stdout.decode().rstrip() |
- Manage errors:
result = subprocess.run(...) | when it fails, an exception is thrown | error can be accessed from |
capture_output=True | no | result.stdout, result.stderr |
stdout=subprocess.PIPE, stderr=subprocess.PIPE | no | result.stdout, result.stderr |
check=True | yes | thrown exception |
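The two behaviours in the table above can be observed with a small sketch. To stay portable it runs the current Python interpreter as the child process; the failing command and its exit code 3 are hypothetical.

```python
import subprocess
import sys

# A child process that writes to stderr and exits with a non-zero status.
failing_command = [
    sys.executable,
    "-c",
    "import sys; sys.stderr.write('boom'); sys.exit(3)",
]

# Without check=True: no exception, the error is read from the result.
result = subprocess.run(failing_command, capture_output=True, text=True)
returncode_without_check = result.returncode
stderr_without_check = result.stderr

# With check=True: CalledProcessError is raised and carries the output.
try:
    subprocess.run(failing_command, capture_output=True, text=True, check=True)
    caught = None
except subprocess.CalledProcessError as e:
    caught = (e.returncode, e.stderr)
```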
- Examples
- compatible Python 2 / 3:
import shlex
import subprocess
import six
if six.PY2:
    # python 2
    # https://stackoverflow.com/questions/14218992/shlex-split-still-not-supporting-unicode#answer-14219159
    args = map(lambda s: s.decode('utf-8'), shlex.split(complete_command.encode('utf-8')))
    result_stdout = subprocess.check_output(args, stderr=subprocess.STDOUT)
    result_stderr = ''
else:
    # python 3
    args = shlex.split(complete_command)
    completed_process = subprocess.run(args, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    result_stdout = completed_process.stdout
    result_stderr = completed_process.stderr
- standard
import shlex, subprocess
command = '...'
parameters = '...'
command_line = "{0} {1}".format(command, parameters)
args = shlex.split(command_line)
try:
    completed_process = subprocess.run(args, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    print(completed_process.stdout.decode('utf-8'))
    print(completed_process.stderr.decode('utf-8'))
except subprocess.CalledProcessError as e:
    # only CalledProcessError carries the stdout and stderr attributes
    print("ERROR: {0}".format(e))
    if e.stdout:
        print("ERROR stdout: {0}".format(e.stdout.decode('utf-8')))
    if e.stderr:
        print("ERROR stderr: {0}".format(e.stderr.decode('utf-8')))
- stdout, stderr to a file:
import shlex, subprocess
command = '/usr/bin/ffmpeg'
parameters = '-y -i sintel.mp4 -vf "scale=1280:-1" -c:a copy -c:v h264 -f flv /tmp/toto.flv'
command_line = "{0} {1}".format(command, parameters)
args = shlex.split(command_line)
with open("toto.log", "w") as f:
    try:
        completed_process = subprocess.run(args, check=True, stdout=f, stderr=f)
    except Exception as e:
        print("ERROR: {0}".format(e))
- shell pipes
- Problems
- Unicode was not supported by shlex split
[Errno 12] Cannot allocate memory
- Scheduling
- Info
- sched
(Python)
- schedule
- APScheduler
- Migration
- Examples
- basic
import time
from apscheduler.schedulers.background import BackgroundScheduler
def do_something(**kwargs):
    print(kwargs)
# create scheduler
scheduler = BackgroundScheduler()
# add job
scheduler.add_job(
    # action with kwargs
    do_something,
    kwargs=do_something_kwargs,
    # date trigger
    trigger="date",
    run_date=now_datetime_utc,
    # run every job, even if it is too late
    misfire_grace_time=None,
)
# start scheduler
scheduler.start()
# do not exit (should we use BlockingScheduler?)
while True:
    ...
- communication
- RPC
- examples / rpc / (<=3.9.0)
- problems (APScheduler==3.10.1, rpyc==5.3.1)
TypeError: tzinfo argument must be None or of a tzinfo subclass, not type 'backports.zoneinfo.ZoneInfo'
- Meet an error when reschedule the job via RPyC #287
- Solution
- received kwargs must be deep copied before passing them to add_job:
- server.py
import copy
import rpyc
from rpyc.utils.server import ThreadedServer
from apscheduler.schedulers.background import BackgroundScheduler
def print_text(*args, **kwargs):
    print("[print_text] args: {}, kwargs: {}".format(args, kwargs))
class SchedulerService(rpyc.Service):
    def exposed_add_job(self, func, *args, **kwargs):
        # a copy of the datetime inside dict must be done, to avoid errors when astimezone is called from it (apscheduler/schedulers/base.py)
        kwargs_copy = copy.deepcopy(kwargs)
        return scheduler.add_job(func, *args, **kwargs_copy)
if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    scheduler.start()
    # allow_pickle is needed by deepcopy
    protocol_config = {"allow_public_attrs": True, "allow_pickle": True}
    server = ThreadedServer(SchedulerService, port=12345, protocol_config=protocol_config)
    try:
        server.start()
    except (KeyboardInterrupt, SystemExit):
        pass
    finally:
        scheduler.shutdown()
- client.py
import datetime
from time import sleep
import rpyc

run_date_aware = datetime.datetime.now(datetime.timezone.utc)
config = {
    "allow_public_attrs": True,
    "allow_pickle": True,
}
conn = rpyc.connect("localhost", 12345, config=config)
#job = conn.root.add_job('server:print_text', 'interval', args=['Hello, World'], seconds=2)
job = conn.root.add_job(
    "server:print_text",
    trigger="date",
    run_date=run_date_aware,
    # args, kwargs for func:
    args=["Hello, World"],
    kwargs={"test": True},
)
sleep(10)
conn.root.remove_job(job.id)
- ...
- threading
- Networking
and Interprocess Communication
- Xarxa / Network
- Asíncron / Asynchronous
- Context
manager, iterator
- cooperative multitasking (as opposed to preemptive multitasking in threading)
- process or task voluntarily yields control to the processor: await
- when async is appropriate and when it is not:
- async is not needed:
- when a task needs the whole CPU (making a sandwich)
- async is needed:
- when a process is waiting for a call to finish (heating the pizza in the oven, querying a database, waiting for a notification to arrive, websockets); meanwhile, the processor can do other things
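The "heating the pizza" case can be sketched with plain `asyncio`: three waits of 0.2 s overlap, so the total is close to 0.2 s rather than 0.6 s. The function name `heat_pizza` and the durations are assumptions made for illustration; `asyncio.sleep` stands in for any I/O wait.

```python
import asyncio
import time


async def heat_pizza(slice_id, seconds):
    # While this coroutine waits for the "oven", the event loop runs the other tasks.
    await asyncio.sleep(seconds)
    return "slice {} ready".format(slice_id)


async def main():
    started = time.perf_counter()
    # The three waits overlap instead of running one after another.
    slices = await asyncio.gather(*(heat_pizza(i, 0.2) for i in range(3)))
    elapsed = time.perf_counter() - started
    return slices, elapsed


slices, elapsed = asyncio.run(main())
print(slices, "in {:.2f}s".format(elapsed))
```

A CPU-bound job (the sandwich) would gain nothing here, because it never reaches an `await` where it could hand control back.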
- What Is Async, How Does It Work, and When Should I Use It? | A. Jesse Jiryu Davis | PyCon APAC 2014 (yt)
- subs (customers choose the sandwich and the worker makes it on the spot), pizza by the slice (customers choose the slice and the worker heats it), omakase (customers receive the dish of the day, without having to order it)
- http://kegel.com/c10k.html
- threads, greenlets, coroutines, callbacks
- what is async?:
- single-threaded
- I/O concurrency
- Non-blocking sockets
- internally: epoll (Linux) / kqueue
(Mac, BSD)
- event loop
- async frameworks
- asyncio
- Documentació / Documentation
- High-level APIs
- Runners
- Coroutines and Tasks
- ...
- Low-level APIs
- Event Loop
- Futures
- Transports and Protocols
- Transports
- Protocols
- Examples
- ...
- Info
-
- the definition of a coroutine is preceded by async, and it should contain at least one await call (even if it is just await asyncio.sleep(0)) so that the processor can do other things:
- if we want a call to a coroutine to finish before moving on to the next line, we must use await (without it, the call only creates a coroutine object and the next line runs without waiting for any result); the coroutine yields, and meanwhile the processor can run the other tasks queued in the scheduler:
result = await my_function()
- await asyncio.sleep(2)
- to start the scheduler, call asyncio.run() (it is normally used to call main, but it can run any other coroutine), passing main as the task:
import asyncio

async def main():
    ...

if __name__ == "__main__":
    asyncio.run(main())
- to run several coroutine calls concurrently:
await asyncio.gather(my_function(...), my_function(...))

list_of_functions = []
for i in range(10):
    list_of_functions.append(my_function(i))
await asyncio.gather(*list_of_functions)
- create_task adds a coroutine to the scheduler; it wraps calls as tasks so that they can be cancelled; tasks do not start running until some await is reached:
my_task = asyncio.create_task(my_function(...))
result = await my_task
- task.cancel()
- task.cancelled()
- task.done()
- task.result()
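The calls listed above can be combined in one runnable sketch for CPython's `asyncio`. The coroutines `my_function` and `forever` and the sleep durations are assumptions made for illustration.

```python
import asyncio


async def my_function(i):
    await asyncio.sleep(0.01)
    return i * i


async def forever():
    while True:
        await asyncio.sleep(0.01)


async def main():
    # gather() runs the coroutines concurrently and keeps the argument order.
    list_of_coroutines = [my_function(i) for i in range(4)]
    squares = await asyncio.gather(*list_of_coroutines)

    # create_task() queues the coroutine; awaiting the task returns its result.
    my_task = asyncio.create_task(my_function(5))
    result = await my_task

    # A task can be cancelled from outside; awaiting it then raises CancelledError.
    endless_task = asyncio.create_task(forever())
    await asyncio.sleep(0.03)
    endless_task.cancel()
    try:
        await endless_task
    except asyncio.CancelledError:
        pass
    return squares, result, my_task.done(), endless_task.cancelled()


squares, result, finished, cancelled = asyncio.run(main())
print(squares, result, finished, cancelled)
```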
- micropython-async
tutorial
- 2 ...
- 2.2 Coroutines and tasks
- a coroutine is defined with async def and contains some await
- 2.2.1 Queuing a task for scheduling
- asyncio.create_task Arg: the coro to run. The scheduler converts the coro to a Task and queues the task to run ASAP. Return value: the Task instance. It returns immediately. The coro arg is specified with function call syntax with any required arguments passed.
- asyncio.run Arg: the coro to run. Return value: any value returned by the passed coro. The scheduler queues the passed coro to run ASAP. The coro arg is specified with function call syntax with any required arguments passed. In the current version the run call returns when the task terminates. However, under CPython, the run call does not terminate.
- await Arg: the task or coro to run. If a coro is passed it must be specified with function call syntax. Starts the task ASAP. The awaiting task blocks until the awaited one has run to completion. As described in section 2.2, it is possible to await a task which has already been started. In this instance, the await is on the task object (function call syntax is not used).
- 2.2.4 A typical firmware app
import uasyncio as asyncio
from my_app import MyClass

def set_global_exception():
    def handle_exception(loop, context):
        import sys
        sys.print_exception(context["exception"])
        sys.exit()
    loop = asyncio.get_event_loop()
    loop.set_exception_handler(handle_exception)

async def main():
    set_global_exception()  # Debug aid
    my_class = MyClass()  # Constructor might create tasks
    asyncio.create_task(my_class.foo())  # Or you might do this
    await my_class.run_forever()  # Non-terminating method

try:
    asyncio.run(main())
finally:
    asyncio.new_event_loop()  # Clear retained state
- 3 Synchronization
- 4
Designing classes for asyncio
- 4.1 Awaitable
classes
- 4.2 Asynchronous iterators
- 4.3 Asynchronous context managers
- ...
- 5
Exceptions timeouts and cancellation
- 5.1 Exceptions
- tasks created with create_task hold their own exceptions
- if a coroutine is awaited directly, the exception propagates to the caller
- if an exception reaches the main task (asyncio.run(...)), the program crashes
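The difference between a task that holds its exception and a directly awaited coroutine that propagates it can be sketched as follows. This assumes CPython's `asyncio` (the tutorial itself targets MicroPython), and the coroutine name `failing` is hypothetical.

```python
import asyncio


async def failing():
    raise ValueError("boom")


async def main():
    # The exception raised inside the task is stored in the task object.
    task = asyncio.create_task(failing())
    await asyncio.wait([task])
    stored = task.exception()

    # Awaiting the coroutine directly propagates the exception to the caller.
    try:
        await failing()
    except ValueError as error:
        propagated = error
    return stored, propagated


stored, propagated = asyncio.run(main())
print(repr(stored), repr(propagated))
```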
- 5.2 Cancellation and Timeouts
- to cancel a task, call foo_task.cancel()
- inside the task, a cancellation can be caught:
async def foo():
    try:
        ...
    except asyncio.CancelledError:
        ...
    finally:
        ...
- 5.2.2 Tasks with timeouts
async def foo():
    try:
        await asyncio.wait_for(forever(), 3)
    except asyncio.TimeoutError:  # mandatory
        ...
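A runnable version of the timeout pattern, as a sketch for CPython's `asyncio` rather than MicroPython's `uasyncio`. The `forever` coroutine and the short 0.05 s timeout are assumptions chosen so that the example finishes quickly.

```python
import asyncio


async def forever():
    while True:
        await asyncio.sleep(0.01)


async def foo():
    try:
        # wait_for() cancels forever() once the timeout expires.
        await asyncio.wait_for(forever(), timeout=0.05)
    except asyncio.TimeoutError:
        return "timed out"
    return "finished"


outcome = asyncio.run(foo())
print(outcome)
```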
- 6
Interfacing hardware
- ...
- 8
Notes for beginners
- ...
- Internet Data Handling
JSON
- mime types
- Codificació
de text / Text coding
- 18.7
mimetypes
- used by aws
s3 to determine content-type of aws s3
object when uploading it
- Dependencies
- CentOS 8
sudo dnf install mailcap
- will install /etc/mime.types
- Ús / Usage
import mimetypes
guessed_mime_type = mimetypes.guess_type('myfile')
mimetypes.knownfiles
# ['/etc/mime.types', '/etc/httpd/mime.types', '/etc/httpd/conf/mime.types', ...]
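`mimetypes.guess_type` returns a `(type, encoding)` tuple based only on the file name. A short sketch with hypothetical file names:

```python
import mimetypes

# guess_type() looks only at the extension; the files do not need to exist.
pdf_type = mimetypes.guess_type("report.pdf")
html_type = mimetypes.guess_type("index.html")
tarball_type = mimetypes.guess_type("archive.tar.gz")
print(pdf_type, html_type, tarball_type)
```

The second element of the tuple is the content encoding (for example `gzip`), which is `None` for uncompressed files.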
- magicfile
- Dependències / dependencies
- Mageia
urpmi libpython-devel
libmagic-devel
- CentOS
sudo yum install file-devel
- Instal·lació / Installation
- Ús / Usage
import magicfile as magic
mime_type = magic.from_file("testdata/test.pdf", mime=True)

import magicfile as magic
f = magic.Magic(magic_file='/usr/share/misc/magic', mime=True)
mime_type = f.from_file("testdata/test.pdf")
- base64
- Structured Markup Processing Tools
- XML
- 19
Structured Markup Processing Tools
- xml.etree.ElementTree
–
XML Manipulation API
- defusedxml
- xmlsec
- Requirements
pip uninstall pycrypto
pip install --force pycryptodome
- Mageia 9
- version 1.2.37-1.1.mga9 does not work; it fails with the error:
undefined symbol: xmlSecCryptoAppKeysMngrCertLoad
- the previous version, 1.2.37-1.mga9, must be forced:
dnf --showduplicates list lib64xmlsec1_1
dnf downgrade lib64xmlsec1_1-1.2.37-1.mga9
- ...
- lxml
- lxml
API
- Write
xml file using lxml library in Python
- lxml
question -- creating an etree.Element
attribute with ':' in the name
- Python:
adding namespaces in lxml
- Example to create a document:
from lxml import etree
# root
arrel = etree.Element('root')
# subelements
# print string
print(etree.tostring(arrel, encoding='utf-8', xml_declaration=True, pretty_print=True).decode('utf-8'))
from lxml import etree as ET
# root
NS = "http://www.w3.org/2001/XMLSchema-instance"
# the XSD attribute name is spelled noNamespaceSchemaLocation
location_attribute = '{%s}noNamespaceSchemaLocation' % NS
appearancetable = ET.Element('appearancetable',
                             attrib={location_attribute: "http://jmri.org/xml/schema/appearancetable.xsd"})
tree = ET.ElementTree(appearancetable)
# subelements
father = ET.SubElement(appearancetable, 'father')
father.set('myattribute', 'myvalue')
father.text = 'text inside element'
# write to file
xml_appearance_path = 'appearancetable.xml'  # hypothetical output path (not defined in the original note)
tree.write(xml_appearance_path, pretty_print=True, xml_declaration=True, encoding="utf-8")
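The same namespaced attribute can be produced with the standard-library `xml.etree.ElementTree`, which may be enough when lxml is not installed. This is a sketch under that assumption; it has no `pretty_print` option, so the output is a single line.

```python
import xml.etree.ElementTree as ET

NS = "http://www.w3.org/2001/XMLSchema-instance"
# Make the serializer use the conventional "xsi" prefix for this namespace.
ET.register_namespace("xsi", NS)
location_attribute = "{%s}noNamespaceSchemaLocation" % NS

root = ET.Element(
    "appearancetable",
    attrib={location_attribute: "http://jmri.org/xml/schema/appearancetable.xsd"},
)
father = ET.SubElement(root, "father")
father.set("myattribute", "myvalue")
father.text = "text inside element"

xml_text = ET.tostring(root, encoding="unicode")
print(xml_text)
```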
- Internet
Protocols and Support
-
|
server |
client |
|
sync |
asyncio |
sync |
asyncio |
HTTP |
|
|
- requests
import requests
response = requests.get('http://python.org')
print(response.text)
- import requests
with requests.Session() as session:
    response = session.get('http://python.org')
    print(response.text)
|
- aiohttp
async with aiohttp.ClientSession() as session:
    async with session.get('http://python.org') as response:
        print(await response.text())
- import aiohttp
import asyncio

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://python.org') as response:
            html = await response.text()
            print(html)

asyncio.run(main())
- import aiohttp
import asyncio

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, 'http://python.org')
        print(html)

asyncio.run(main())
|
WebSocket |
|
- websockets
- import asyncio
import websockets

async def hello(websocket):
    name = await websocket.recv()
    print(f"<<< {name}")
    greeting = f"Hello {name}!"
    await websocket.send(greeting)
    print(f">>> {greeting}")

async def main():
    async with websockets.serve(hello, "localhost", 8765):
        await asyncio.Future()  # run forever

if __name__ == "__main__":
    asyncio.run(main())
|
- websockets
from websockets.sync.client import connect

def hello():
    with connect("ws://localhost:8765") as websocket:
        websocket.send("Hello world!")
        message = websocket.recv()
        print(f"Received: {message}")

hello()
|
- websockets
import asyncio
import websockets

async def hello():
    uri = "ws://localhost:8765"
    async with websockets.connect(uri) as websocket:
        name = input("What's your name? ")
        await websocket.send(name)
        print(f">>> {name}")
        greeting = await websocket.recv()
        print(f"<<< {greeting}")

if __name__ == "__main__":
    asyncio.run(hello())
- # How to run forever async websocket client?
import asyncio
import websockets

async def hello():
    uri = "ws://localhost:5678"
    async with websockets.connect(uri) as websocket:
        while True:
            greeting = await websocket.recv()
            print(f"<<< {greeting}")

if __name__ == "__main__":
    asyncio.get_event_loop().run_until_complete(hello())
- ...
|
- HTTP
- URL
- Parsing
URLs with regular expressions
- 20.5
urllib
-
python2 |
python3 |
urllib
- urlopen()
- urlretrieve()
- ...
|
urllib2
- urlopen()
- install_opener()
- build_opener()
- ...
|
urllib.request
|
|
|
urllib.error
- exception URLError
- exception HTTPError
- exception ContentTooShortError
|
urlparse
- urlparse()
- parse_qs()
- parse_qsl()
- urlunparse()
- urlsplit()
- urlunsplit()
- urljoin()
- urldefrag()
urllib
- quote()
- quote_plus()
- unquote()
- unquote_plus()
- urlencode()
- pathname2url()
- url2pathname()
- getproxies()
|
|
urllib.parse
- urlparse()
- parse_qs()
- parse_qsl()
- urlunparse()
- urlsplit()
- urlunsplit()
- urljoin()
- urldefrag()
- unwrap()
- ...
- quote()
- quote_plus()
- quote_from_bytes()
- unquote()
- unquote_plus()
- unquote_to_bytes()
- urlencode()
|
|
|
urllib.robotparser |
- compatible python 2/3 import
import six
if six.PY2:
    from urlparse import urlparse, urlunparse
    from urllib import unquote, urlencode
else:
    from urllib.parse import urlparse, urlunparse, unquote, urlencode

o = urlparse(...)
... unquote(...)
# urlunparse( (scheme, netloc, path, params, query, fragment) )
... urlunparse( (o.scheme, o.netloc, o.path, o.params, o.query, o.fragment) )
... urlencode(...)
- urlparse()
o = urlparse('http://www.cwi.nl:80/%7Eguido/Python.html')
o
o = urlparse('s3://bucket/path/to/file')
o
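The fields of the two parse results above can be inspected as attributes. A short Python 3 sketch using the same URLs:

```python
from urllib.parse import urlparse

http_url = urlparse("http://www.cwi.nl:80/%7Eguido/Python.html")
# netloc keeps the port; hostname and port are derived attributes.
http_parts = (http_url.scheme, http_url.netloc, http_url.path, http_url.hostname, http_url.port)

s3_url = urlparse("s3://bucket/path/to/file")
# For s3:// URLs the bucket name ends up in netloc and the key in path.
s3_parts = (s3_url.scheme, s3_url.netloc, s3_url.path)

print(http_parts)
print(s3_parts)
```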
- urlopen() (mock)
import six
if six.PY2:
    from urllib2 import urlopen
    # to avoid error "addinfourl instance has no attribute '__exit__'",
    # on Python 2 you cannot use urlopen inside a with
    filehandle_src = urlopen(url)
    for line in filehandle_src:
        print(line.decode('utf-8'))
    ...
else:
    from urllib.request import urlopen
    with urlopen(url) as filehandle_src:
        for line in filehandle_src:
            ...
- Changing hostname in a url
# Python 2 (on Python 3: from urllib import parse as urlparse)
import urlparse
p = urlparse.urlparse('https://www.google.dk:80/barbaz')
p._replace(netloc=p.netloc.replace(p.hostname, 'www.foo.dk')).geturl()
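A Python 3 version of the hostname replacement, wrapped in a helper so that it can be reused. The function name `replace_hostname` is an assumption; the technique is the same `_replace` on the named tuple returned by `urlparse`.

```python
from urllib.parse import urlparse


def replace_hostname(url, new_hostname):
    """Return url with its hostname swapped, keeping port, path and the rest."""
    parts = urlparse(url)
    # Only the hostname portion of netloc is replaced, so ":80" is preserved.
    new_netloc = parts.netloc.replace(parts.hostname, new_hostname)
    return parts._replace(netloc=new_netloc).geturl()


new_url = replace_hostname("https://www.google.dk:80/barbaz", "www.foo.dk")
print(new_url)
```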
- URL Quoting (percent encoding)
- quote
import six
if six.PY2:
    from urllib import quote
else:
    from urllib.parse import quote
- Python 2
from urllib import quote
# input is unicode
path_original = u'/path/to/toto_à.html'
path_converted = quote(path_original.encode('utf8'))
# will give: '/path/to/toto_%C3%A0.html'

from urllib import quote
# input is not unicode
path_original = '/path/to/toto_à.html'
path_converted = quote(path_original)
# will give: '/path/to/toto_%C3%A0.html'
- Python 3
from urllib.parse import quote
# input is encoded to bytes before quoting
path_original = u'/path/to/toto_à.html'
path_converted = quote(path_original.encode('utf8'))
# will give: '/path/to/toto_%C3%A0.html'

from urllib.parse import quote
# input is a plain str (always unicode on Python 3)
path_original = '/path/to/toto_à.html'
path_converted = quote(path_original)
# will give: '/path/to/toto_%C3%A0.html'
- unquote
- convert: /path/to/toto_%C3%A0.mp4 -> /path/to/toto_à.mp4
import six
if six.PY2:
    from urllib import unquote
else:
    from urllib.parse import unquote
path_original = u'/path/to/toto_%C3%A0.mp4'
if six.PY2:
    path_converted = unquote(path_original.encode()).decode('utf-8')
else:
    path_converted = unquote(path_original)
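On Python 3 alone, quoting and unquoting round-trip without any explicit encoding step. A minimal sketch (the non-ASCII character is written as an escape only to keep the source ASCII-safe):

```python
from urllib.parse import quote, unquote

path_original = "/path/to/toto_\u00e0.html"  # same as '/path/to/toto_à.html'
# quote() encodes to UTF-8 and percent-escapes; "/" is left untouched by default.
path_quoted = quote(path_original)
path_restored = unquote(path_quoted)
print(path_quoted, path_restored)
```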
- add query params to a path (which may already have query params) (Python 3):
from urllib.parse import urlencode, urlparse, parse_qsl
original_path = '/path/to?a=b&c=d'
o = urlparse(original_path)
existing_query_params = parse_qsl(o.query)
added_query_params = [('e','f'),('g','h')]
total_queryparams = existing_query_params + added_query_params
new_path = '{}?{}'.format(o.path, urlencode(total_queryparams))
# '/path/to?a=b&c=d&e=f&g=h'
- urljoin
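`urljoin` resolves a reference against a base URL the way a browser resolves a link. A short Python 3 sketch with a hypothetical base URL:

```python
from urllib.parse import urljoin

base = "http://example.org/a/b/c.html"
# A relative name replaces the last path segment of the base.
sibling = urljoin(base, "d.html")
# A leading slash restarts from the site root.
from_root = urljoin(base, "/x/y.html")
# ".." climbs one directory before resolving.
parent = urljoin(base, "../e.html")
print(sibling, from_root, parent)
```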
- download from url to local file:
import shutil
from contextlib import closing
import six
if six.PY2:
    from urllib2 import urlopen
else:
    from urllib.request import urlopen
# closing() is needed because on Python 2 the urlopen result is not a context manager
with closing(urlopen(src)) as filehandle_src, open(dst, 'wb') as filehandle_dst:
    shutil.copyfileobj(filehandle_src, filehandle_dst)
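The download pattern can be tried without network access by pointing `urlopen` at a `file://` URL. This Python 3 sketch assumes a helper named `download` and temporary file names that are purely illustrative; with an `http(s)://` source the function body is unchanged.

```python
import shutil
import tempfile
from pathlib import Path
from urllib.request import urlopen


def download(src_url, dst_path):
    """Stream the content at src_url into dst_path without loading it all in memory."""
    with urlopen(src_url) as filehandle_src, open(dst_path, "wb") as filehandle_dst:
        shutil.copyfileobj(filehandle_src, filehandle_dst)


with tempfile.TemporaryDirectory() as tmp_dir:
    source = Path(tmp_dir) / "source.txt"
    source.write_bytes(b"hello download")
    destination = Path(tmp_dir) / "copy.txt"
    # as_uri() turns the local path into a file:// URL that urlopen accepts.
    download(source.as_uri(), destination)
    copied = destination.read_bytes()

print(copied)
```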
- ...
- FTP
- ftplib (2.7)
- upload local or remote file to ftp:
# python 3
import os
import ftplib
import urllib.request
from urllib.parse import urlparse

host = '...'
username = '...'
password = '...'
#origin = '/tmp/toto.txt'
origin = 'https://...'
local_basename = os.path.basename(origin)
remote_dirname = 'toto_dir_2'
remote_path = os.path.join(remote_dirname, local_basename)
print("ftp upload: {0} -> {1}".format(origin, remote_path))
o = urlparse(origin)
print("scheme: {}".format(o.scheme))
if o.scheme:
    filehandle = urllib.request.urlopen(origin)
else:
    filehandle = open(origin, 'rb')
with ftplib.FTP(host, username, password) as ftp:
    # create dir
    try:
        print("creating remote dir: {}".format(remote_dirname))
        ftp.mkd(remote_dirname)
    except Exception as e:
        print("WARNING when creating dir: {} - {}".format(e, type(e)))
    # upload file
    try:
        print("uploading: {0} -> {1}".format(origin, remote_path))
        ftp.storbinary("STOR {0}".format(remote_path), filehandle)
    except Exception as e:
        print("ERROR when uploading file: {}".format(e))
print("done")
- upload remote http file directly to ftp (Python - Transfer a file from HTTP(S) URL to FTP/Dropbox without disk writing (chunked upload)):
...
import urllib.request
...
filehandle = urllib.request.urlopen(origin_url)
...
- Adreces de xarxa /
Network addresses
- adreça IP pròpia / own IP address
- example:
import socket
socket.gethostbyname_ex(socket.gethostname())[2][0]
- Problemes / Problems
socket.gaierror: [Errno -2] Name
or service not known
- 21.28 ipaddress (>=3.3)
- Backport for Python 2: py2-ipaddress
- An
introduction to the ipaddress module
- models
-
| | | convenience factory function | functions |
| IPv4Address | IPv6Address | ip_address('192.168.1.100') | |
| IPv4Network | IPv6Network | ip_network('192.168.1.0/24') | subnets(), subnets(prefixlen_diff=2), subnets(new_prefix=26), supernet(), supernet(prefixlen_diff=2), supernet(new_prefix=26) |
| IPv4Interface | IPv6Interface | ip_interface('192.168.1.100/24') | |
- ...
- Exemples / Examples
- subnets /20 from net /16
import ipaddress
net = ipaddress.ip_network('192.168.0.0/16')
subnets_20 = list(net.subnets(new_prefix=20))
print(subnets_20)
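A few more calls from the table above, as a sketch that uses the same example addresses:

```python
import ipaddress

net = ipaddress.ip_network("192.168.0.0/16")
# A /16 splits into 2**(20-16) = 16 subnets of size /20.
subnets_20 = list(net.subnets(new_prefix=20))

small_net = ipaddress.ip_network("192.168.1.0/24")
# supernet() goes the other way: the /16 that contains this /24.
parent_net = small_net.supernet(new_prefix=16)

interface = ipaddress.ip_interface("192.168.1.100/24")
address = ipaddress.ip_address("192.168.1.100")

print(len(subnets_20), subnets_20[0], subnets_20[-1])
print(parent_net, interface.network, address in net)
```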
- ...
- Multimedia Services
- Internationalization
- Program Frameworks
- Graphical User Interfaces with Tk
- Development Tools
- Test unitari / Unit test
-
|
pytest |
unittest |
Django
tests (based on unittest) |
Django drf |
file |
mytest.py |
myproject/tests/__init__.py
myproject/tests/mytest.py |
myproject/myapp/tests/mytest.py |
myproject/myapp/tests/mytest.py |
import |
import pytest |
from unittest import TestCase
# myproject/myunit/toto.py
from myunit.toto import ... |
from django.test import TestCase |
from rest_framework.test import
APITestCase |
class |
class MyGroupTests: |
class MyGroupTestCase(TestCase): |
class MyGroupTestCase(TestCase):
""" Tests over
database """ |
class MyGroupAPITestCase(APITestCase):
""" Tests over API
REST """ |
function that always runs at the start of each test |
def setup_method(self): |
def setUp(self): |
def setUp(self): |
def setUp(self): |
test |
def test_first(self): |
def test_first(self): |
def test_first(self): |
def test_first(self): |
function that always runs at the end of each test |
def teardown_method(self): |
def ...(self) |
|
|
run |
- pytest -s mytest.py -k test_first
|
- from myproject/
- python -m unittest
tests/mytest.py
- python -m unittest
tests.mytest.MyGroupTestCase
- python -m unittest
tests.mytest.MyGroupTestCase.test_first
- python -m unittest discover -s
tests
- python -m unittest discover -v
-s tests -p mytest.py
- from myproject/tests/
- python -m unittest --verbose
mytest
- python -m unittest --verbose
mytest.MyGroupTestCase
- python -m unittest --verbose
mytest.MyGroupTestCase.test_first
|
- ./manage.py test --keepdb
--settings myapp.tests_settings
--verbosity 3 myapp.tests.mytest
- ./manage.py test --keepdb
--settings myapp.tests_settings
--verbosity 3
myapp.tests.mytest.MyGroupTestCase
- ./manage.py test --keepdb
--settings myapp.tests_settings
--verbosity 3
myapp.tests.mytest.MyGroupTestCase.test_first
|
- ./manage.py test --keepdb
--settings myapp.tests_settings
--verbosity 3 my_app.tests.mytest
- ./manage.py test --keepdb
--settings myapp.tests_settings
--verbosity 3
my_app.tests.mytest.MyGroupAPITestCase
- ./manage.py test --keepdb
--settings myapp.tests_settings
--verbosity 3
my_app.tests.mytest.MyGroupAPITestCase.test_first
|
run from Eclipse / PyDev |
|
- select class or method: Debug as
-> Python unit-test
- Debug configurations
- Arguments
- Override PyUnit
preferences for this
launch?
- PyDev
test runner
- Py.test
--verbosity 3
|
|
|
- pytest
- Eclipse / PyDev
- Ús / Usage
- Basic
patterns and examples
- run a specific test
pytest toto.py -k test_primer
- show all print output (otherwise, only the output of failing tests is shown)
- ...
- unittest
- run all tests in dir
tests/
- skip some tests
from unittest import skip
@skip("my reason to skip this one")
class/def
- test with argparse
- How
do you write tests for the argparse
portion of a python module?
- mymodule.py
import sys
import argparse

def main(args):
    parser = argparse.ArgumentParser(description='ADD YOUR DESCRIPTION HERE')
    parser.add_argument('first_parameter', type=int, help='First parameter')
    parser.add_argument('second_parameter', type=int, help='Second parameter')
    parsed_args = parser.parse_args(args)
    print(parsed_args)
    # rest of your code goes here

if __name__ == '__main__':
    main(sys.argv[1:])
- tests/test_mymodule.py
from unittest import TestCase
from mymodule import main

class MyModuleTestCase(TestCase):
    def test_main(self):
        # argparse expects strings, as they would arrive from sys.argv
        main(['2', '3'])
- run tests from dir where mymodule.py is:
python -m unittest discover -s tests
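The same idea in a single self-contained file, as a sketch: `main` returns the parsed namespace (an assumption made here so that the test has something to assert on), and the suite is run programmatically instead of through `python -m unittest`.

```python
import argparse
import unittest


def main(args):
    parser = argparse.ArgumentParser(description="demo parser")
    parser.add_argument("first_parameter", type=int, help="First parameter")
    parser.add_argument("second_parameter", type=int, help="Second parameter")
    # Returning the namespace makes the parsing step testable.
    return parser.parse_args(args)


class MyModuleTestCase(unittest.TestCase):
    def test_main(self):
        parsed = main(["2", "3"])
        self.assertEqual(parsed.first_parameter, 2)
        self.assertEqual(parsed.second_parameter, 3)

    def test_missing_argument(self):
        # argparse reports usage errors by raising SystemExit.
        with self.assertRaises(SystemExit):
            main(["2"])


suite = unittest.defaultTestLoader.loadTestsFromTestCase(MyModuleTestCase)
result = unittest.TextTestRunner(verbosity=0).run(suite)
print(result.testsRun, result.wasSuccessful())
```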
- Eclipse / PyDev
- first time
- selected file: Debug as ->
Python unit-test
- Debug Configurations
- Arguments
- Override PyUnit preferences
for this launch?
- PyDev
test runner
- Py.test
- --verbosity 3
- ...
- Django
tests
- Mock
- Understanding
the Python Mock Object Library
- Django
mock
- The
Mock Class
- ...
assert_called_with |
|
assert_called_once_with |
|
assert_has_calls |
|
call_count |
|
call_args |
|
... |
...
|
- Example:
from unittest.mock import patch
@patch("path.to.my_function")
def test_something(self, mock_my_function):
    ...
    mock_my_function.assert_called_once_with(...)
- from unittest.mock import call, patch
@patch("path.to.my_function")
def test_something(self, mock_my_function):
    ...
    mock_my_function.assert_has_calls(
        [
            call(...),
            call(...),
        ]
    )
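The assertion helpers from the table can be tried on a standalone `Mock`, without patching anything. The argument values below are arbitrary, chosen only for illustration.

```python
from unittest.mock import Mock, call

mock_my_function = Mock(return_value="ok")

first_result = mock_my_function("a", key=1)
# Passes because there has been exactly one call so far, with these arguments.
mock_my_function.assert_called_once_with("a", key=1)

mock_my_function("b")
# assert_has_calls() checks that this sequence of calls occurred, in order.
mock_my_function.assert_has_calls([call("a", key=1), call("b")])

call_count = mock_my_function.call_count
last_call = mock_my_function.call_args
print(first_result, call_count, last_call)
```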
- verify the number of times that a function
has been called, without mocking it (bypass,
pass through):
- ...
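One way to count calls while still executing the real function is the `wraps` argument of `Mock`, passed through `patch.object`. This is a sketch, not necessarily the approach the note had in mind; `math.sqrt` and the helper `hypotenuse` are stand-ins chosen so that the example is self-contained.

```python
import math
from unittest.mock import patch


def hypotenuse(a, b):
    return math.sqrt(a * a + b * b)


# wraps= forwards every call to the real function, so behaviour is unchanged
# while the mock still records how it was called.
with patch.object(math, "sqrt", wraps=math.sqrt) as spy_sqrt:
    result = hypotenuse(3, 4)
    calls_seen = spy_sqrt.call_count
    spy_sqrt.assert_called_once_with(25)

print(result, calls_seen)
```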
- Debugging and Profiling
- Software Packaging and Distribution
- Python Runtime Services
- Custom Python Interpreters
- Importing Modules
- Python Language Services
- MS Windows Specific Services
- Unix Specific Services
- Superseded Modules
- Security Considerations
- Unicode
- Python3 strings
and bytes
- Codificació de
text / Text coding
- Errors
- UnicodeEncodeError
- UnicodeDecodeError
UnicodeDecodeError: 'ascii' codec can't
decode byte 0xc3 in position 44: ordinal not in
range(128)
- Solving
Unicode
Problems in Python 2.7
UnicodeDecodeError: ‘ascii’ codec can’t
decode byte 0xd1 in position 1: ordinal not in
range(128) (Why is this so hard??)
- ...
- Django i18n
- Use format instead of '%'
- do not forget the leading 'u'
- Use 'u' before literals
string = u'{:02d}:{:02d}:{:02d}'.format(hours, minutes, seconds)
if days:
    if days == 1:
        string = u'{} {} '.format(days, _("day")) + string
        #string = u"%d %s %s" % ( days, _("day"), string)
    else:
        string = u'{} {} '.format(days, _("days")) + string
        #string = u"%d %s %s" % ( days, _("days"), string)
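The duration formatting above can be sketched as a standalone Python 3 function. As an assumption, it uses the standard-library `gettext.ngettext` in place of Django's translation helpers, so that the singular/plural choice is made by the translation layer; with no catalog installed it falls back to the English strings.

```python
from gettext import ngettext


def format_duration(days, hours, minutes, seconds):
    """Return 'HH:MM:SS', prefixed with 'N day(s) ' when days is non-zero."""
    text = "{:02d}:{:02d}:{:02d}".format(hours, minutes, seconds)
    if days:
        # ngettext picks the singular or plural form for the given count.
        text = "{} {} ".format(days, ngettext("day", "days", days)) + text
    return text


print(format_duration(0, 1, 2, 3))
print(format_duration(1, 1, 2, 3))
print(format_duration(2, 1, 2, 3))
```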
- File writing
import os
import codecs
filename = os.path.join(...)
f = codecs.open(filename, mode='w', encoding='utf-8')
f.write(content)
f.close()

import os
import codecs
filename = os.path.join(...)
f = codecs.open(filename, mode='w', encoding='utf-8')
# File is not imported in the original note (possibly django.core.files.File)
fitxer = File(f)
fitxer.write(content)
fitxer.close()
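On Python 3 the built-in `open` accepts an `encoding` argument, so `codecs.open` is not required. A minimal sketch that writes to a temporary directory (the file name and content are hypothetical) and reads the raw bytes back to show the UTF-8 encoding:

```python
import os
import tempfile

content = "toto_\u00e0"  # same as 'toto_à'

with tempfile.TemporaryDirectory() as tmp_dir:
    filename = os.path.join(tmp_dir, "toto.txt")
    # The with block closes the file even if write() raises.
    with open(filename, mode="w", encoding="utf-8") as f:
        f.write(content)
    with open(filename, mode="rb") as f:
        raw_bytes = f.read()

print(raw_bytes)
```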
- Eines / Tools
- virtualenv
- Install virtualenv
- Python 3
- Mageia
urpmi python3-virtualenv
urpmi lib64python3-devel
- CentOS
sudo yum install python36
python36-pip python36-devel
python34-virtualenv
virtualenv-3 --python python36 env
- Python 2
- Mageia
urpmi python-pkg-resources
python-virtualenv
- Bug on Mageia 3 (makes "pip install pil"
fail)
- Bug 11283
- Python headers included are bungled,
probably due to multi-arch issues
- replace
?/usr/lib/python2.7/site-packages/virtualenv.py
by virtualenv.py
- run again:
- CentOS
yum install python-virtualenv
- option 1: install from SCL
- other options:
virtualenv -p python27 /opt/PYTHON27
source /opt/PYTHON27/bin/activate
- Problems:
- libpython2.7.so.1.0: cannot open
shared object file: no such file or
directory
- Ubuntu
sudo apt-get install
python-virtualenv
- MSWindows
- Utilització / Usage
- Python 3
- Mageia
virtualenv env
- python -m venv env
- CentOS
- Python 2
- virtualenv [--distribute]
/opt/PYTHON27
- force Python 2.6
virtualenv -p python26 /opt/PYTHON26
- MSWindows
virtualenv.exe c:\tmp\p27
c:\tmp\p27\Scripts\activate
...
deactivate
source /opt/PYTHON27/bin/activate
(/opt/PYTHON27)...$ ...
deactivate
- virtualenv from crontab
- Cron
and virtualenv
- example:
SHELL=/bin/bash
0 2 * * * source /path/to/env/bin/activate
&& /path/to/my_program.py arg1 arg2
- virtualenv
in
PyDev
- virtualenv
in
Django
- mod_wsgi
- pip
(*) (package manager) (also installed
when virtualenv
is installed)
- related tools
- Installation of pip itself
- From distribution
- CentOS
sudo yum install python-pip
- From source
- download it:
- install it:
# python setup.py install
Installation of packages
- using system python, but install packages in home
dir (~/.local/bin/)
pip install --user package_name
python -m pip install --user
package_name
- using a virtualenv:
pip install package_name --dry-run
- Installation of a precise version of a package
pip install djangorestframework==0.4.0
- alpha version
pip install --pre package_name
- upgrade
pip install -U package_name
- Problems
fatal error: Python.h: No such file or directory
- Solució / Solution
- Python 2.7
- Python 3
- Mageia
- Alma9
dnf install python3-devel
fatal error: pyconfig.h: No such file or directory
- Solució / Solution
- Alma 9
dnf install python3-devel
Download error on
https://pypi.python.org/simple/: [SSL:
CERTIFICATE_VERIFY_FAILED] certificate verify
failed (_ssl.c:765) -- Some packages may not be
found!
pip install fails with “connection
error: [SSL: CERTIFICATE_VERIFY_FAILED]
certificate verify failed (_ssl.c:598)”
- Solution
openssl s_client -connect
pypi.python.org:443
curl -sO
http://cacerts.digicert.com/DigiCertHighAssuranceEVRootCA.crt
sudo cp
DigiCertHighAssuranceEVRootCA.crt
/etc/pki/ca-trust/source/anchors/
sudo update-ca-trust
- Alternative solution?
sudo yum install ca-certificates
- pip install --upgrade -r pip_requirements.txt
Could not find .egg-info directory in
install record for setuptools from
https://pypi.python.org/packages/25/4e/1b16cfe90856235a13872a6641278c862e4143887d11a12ac4905081197f/setuptools-28.8.0.tar.gz#md5=43d6eb25f60e8a2682a8f826ce9e3f42
in
/home/.../env/lib/python2.7/site-packages
- see also problems with Google API and httplib2
error: Installed distribution setuptools
0.9.8 conflicts with requirement
setuptools>=17.1
- Solució / Solution
pip install --upgrade setuptools
Error: pg_config executable not found.
- when installing psycopg2
- Solució / Solution
- Install postgresql devel
- Mageia
urpmi postgresql9.4-devel
- Alma 9
- Alma 8
- show specific package
- list of installed packages (it gives the version
number):
pip list
- with installation format
(>pip_requirements.txt)
pip freeze
pip freeze >pip_requirements.txt
- pipx
- pip to install globally
- Pipx:
Safely Install Packages Globally
- Instal·lació / Installation
- Mageia
sudo dnf install python3-pip
python3 -m pip install --user pipx
- Ubuntu
- Ús / Usage
- add the directory to the PATH variable
- list the installed packages
- install a package
- upgrade a package
- uninstall a package:
- run without installing:
pipx run black example.py
pipx run youtube-dl
https://www.youtube.com/watch?v=<video_id>
- easy_install
urpmi python-setuptools
sudo apt-get install python-setuptools
- Invoke
- Fabric
- Instal·lació / Installation
- Fabric 2.x
- See also: Invoke
- Upgrading
from 1.x
-
1.x
|
2.x
|
sudo("yum install -y htop") |
c.run("sudo yum install
-y htop") |
sudo("echo /usr/local/lib
>/etc/ld.so.conf.d/local.conf") |
c.run("sudo sh -c 'echo
\"/usr/local/lib\"
>/etc/ld.so.conf.d/local.conf'")
# sudo sh -c 'echo "l'\''arbre"
>>/tmp/toto.txt'
c.run("sudo sh -c 'echo
\"l'\\''arbre\"
>>/tmp/toto.txt'") |
put(local_file,
remote_dir, use_sudo=True)
|
from os.path import basename
def sudo_put(c, local_path, remote_dirname):
    """
    Upload a local file to a remote dir, with sudo privileges
    """
    filename = basename(local_path)
    remote_tmp_path = filename
    remote_path = '{}/{}'.format(remote_dirname, filename)
    print('sudo_put: {} -> {}'.format(local_path, remote_path))
    c.put(local_path, remote=remote_tmp_path)
    c.run("sudo sh -c 'mv {} {}'".format(remote_tmp_path, remote_path))
...
sudo_put(c, local_file, remote_dir)
|
from
fabric.contrib.files import append text
= """
FCGI_EXTRA_OPTIONS="-M 0770"
"""
append('/etc/sysconfig/spawn-fcgi',
text, use_sudo=True)
|
text
= """
FCGI_EXTRA_OPTIONS=\\\"-M 0770\\\"
"""
c.run("sudo sh -c 'echo \"{0}\"
>>/etc/sysconfig/spawn-fcgi'".format(text))
# not
working yet, with sudo=True:
from patchwork import files
files.append(c,
'/etc/sysconfig/spawn-fcgi', text,
sudo=True) |
from fabric.contrib.files import
sed
sed('{}/pg_hba.conf'.format(pgsql_data_dir),
'host
all
all
127.0.0.1/32
ident',
'host
all
all
127.0.0.1/32
md5',
use_sudo=True) |
c.run("sudo sed -i.bak
's#host
all
all
127.0.0.1/32
ident#host
all
all
127.0.0.1/32
md5#g'
{}/pg_hba.conf".format(pgsql_data_dir)) |
with cd('/path/to'):
    run('my_command_1')
    run('my_command_2') |
my_path = '/path/to'
c.run('cd {} && my_command_1'.format(my_path))
c.run('cd {} && my_command_2'.format(my_path)) |
with settings(warn_only=True):
run(...) |
c.run(..., warn=True) |
result = run(...) |
r = c.run(...)
result = r.stdout.strip() |
|
from patchwork import transfers
# strict_host_keys is set to
False to avoid interactive question:
# Are you sure you want to continue
connecting (yes/no/[fingerprint])? transfers.rsync(
c,
strict_host_keys=False,
...
)
|
local(...) |
# option -H must be present
c.local(...)
# only capture; do
not display
result = c.local(..., hide=True)
toto = result.stdout |
- Documentation
(2.1)
- restart connection (e.g. to update just modified
groups to the user that is making the connection)
# add user to mygroup
print(" adding group {} to user {}".format(service_group, remote_user))
c.run("sudo usermod -a -G {} {}".format(service_group, remote_user), warn=True)
# update group membership
# c.run("groups")
# as newgrp command is not working remotely, we need to close and open ssh connection
# (and _sftp must be set to None to force its reconnection)
c.close()
c.open()
c._sftp = None
# print("c after close/open: {}".format(c))
# c.run("groups")
- Fabric 1.x documentation
- Utilització / Usage
<task
name>:<arg>,<kwarg>=<value>,...
- debug from Eclipse
- Fabric 2
- fabfile.py
# to allow
debugging from Eclipse
import re
import sys
from fabric.main import program
if __name__ == '__main__':
sys.argv[0] =
re.sub(r'(-script\.pyw|\.exe)?$', '',
sys.argv[0])
sys.exit(program.run())
- Eclipse
- fabfile.py (contextual menu)
- Debug As -> Python Run
- Debug Configurations
- (select your debug configuration)
- Arguments
--list
-H user@server
task_1 ...
- Fabric 1
- fabfile.py
...
from fabric.main
import main
if __name__ == '__main__':
import sys
sys.argv = ['fab',
'-f', __file__, 'my_task']
main()
- Exemples / Examples
put('toto.sh', '/usr/local/bin/',
mode=int('755', 8), use_sudo=True)
- Context
managers (context
manager)
- Problemes / Problems
paramiko.ssh_exception.SSHException:
encountered RSA key, expected OPENSSH key
- check that remote file
~myuser/.ssh/authorized_keys contains the public
key that you are using
- check that you are specifying a remote user
- Error management
- ignore
with
settings(warn_only=True):
- capture failure
result =
local('grunt')
if result.failed:
print "Grunt is not
installed."
abort('Grunt not found')
- Python try
try:
sudo('...')
except Exception as e:
...
abort('...')
- env definitions
- fabfile.py
from fabric.api import
*
# remote user and group
env.remote_user = 'myuser'
env.remote_group = 'mygroup'
# project
env.project_name = 'my_project'
env.project_dir =
'/home/%(remote_user)s/%(project_name)s' % env
- run locally
- running
fabric
script locally
- Optionally
avoid
using ssh if going to localhost #98
- fabfile.py
# by default, actions are performed remotely.
# to perform them locally, e.g.: "fab localhost create_virtualenv"
env.run_as = 'remote'
env.run = run
env.sudo = sudo
env.hosts = ['...',]
env.key_filename = '~/.ssh/keys/key_for_remote.pem'
def localhost():
    """
    Set environment for local execution
    """
    env.run_as = 'local'
    env.run = local
    env.sudo = local
    env.hosts = []
with lcd()
...
- capture
- run remotely and get the value
result = sudo(...)
result = run(...)
- run locally and get the value
result = local(..., capture=True)
- crontab
- files
- put
- rsync_project
- How
do I copy a directory to a remote machine
using Fabric?
- Example
from
fabric.contrib.project import
rsync_project
rsync_project(local_dir='.',
remote_dir='/var/www',
exclude=('.git','tmp',) )
- Ignore files indicated by .gitignore
- Ignore untracked files and files indicated by
.gitignore
untracked_files_zero
= local('git -C .. ls-files -z -o
--exclude-standard --directory',
capture=True)
untracked_files =
untracked_files_zero.split('\0')
print "untracked_files:
{}".format(untracked_files)
excluded_files = untracked_files +
['.git','tmp']
rsync_project(local_dir='.',
remote_dir=env.project_dir,
exclude=excluded_files,
extra_opts="--filter=':- .gitignore'" )
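`git ls-files -z` terminates every name with a NUL character, so a plain `split('\0')` usually leaves an empty string at the end of the list. A minimal sketch of the parsing step alone, without Fabric; `parse_nul_separated` and the sample output are hypothetical, and whether `local(..., capture=True)` already strips the trailing NUL is not checked here:

```python
def parse_nul_separated(output):
    """Split NUL-separated output and drop empty entries."""
    return [name for name in output.split('\0') if name]


# hypothetical output of: git ls-files -z -o --exclude-standard --directory
sample_output = 'build/\0notes.txt\0'

untracked_files = parse_nul_separated(sample_output)
excluded_files = untracked_files + ['.git', 'tmp']
print(excluded_files)
```

Dropping the empty entry avoids handing an empty exclude pattern to `rsync_project`.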
- append
- append a line
from
fabric.contrib.files import append
...
append('/etc/sysconfig/toto','this line
has been appended to the end')
- append several lines:
from
fabric.contrib.files import append
...
text = """
first_line = "value1"
second_line = "value2"
"""
append('/etc/sysconfig/toto', text,
use_sudo=True)
- sed
- single quotes in sed
- replace a line that starts with "
host
all
all
127.0.0.1 " by "host
all
all
127.0.0.1/32
md5 "
from
fabric.contrib.files import sed
...
#
/var/lib/pgsql/data/pg_hba.conf
#
host
all
all
127.0.0.1/32
md5
sed('/var/lib/pgsql/data/pg_hba.conf',
'host
all
all
127.0.0.1/32
ident',
'host
all
all
127.0.0.1/32
md5',
use_sudo=True )
env.sudo('sudo sed
-e "/^host
all
all
127.0.0.1/
c\host
all
all
127.0.0.1/32
md5"
-i /var/lib/pgsql/data/pg_hba.conf')
- Certificat de
servidor / Server certificate
- certificate for https connections (curl will not need to
specify --cacert)
# add myserver
self-signed certificate to the list of trust
ca certificates
put('myserver.example.org.crt',
'/etc/pki/ca-trust/source/anchors/',
use_sudo=True)
env.sudo('update-ca-trust')
- dependencies
- fabfile.py
# list of dependencies
to install
env.install_cmd = 'yum install -y'
env.dependencies = ['gcc', 'git',
'python-virtualenv', 'mariadb',]
def install_dependencies():
"""
Install the system
dependencies for the project
"""
env.sudo(env.install_cmd +
" " + "epel-release")
env.sudo(env.install_cmd +
" " + " ".join(env.dependencies))
- virtualenv
- Getting
virtualenv(wrapper)
and Fabric to play nice
- Activate
a
virtualenv via fabric as deploy user
- fabfile.py
from contextlib import
contextmanager as _contextmanager
# remote user and group
env.remote_user = 'my_user'
env.remote_group = 'my_group'
# virtualenv directory
env.virtualenv_directory = '/opt/p27'
env.virtualenv_activate = 'source
%(virtualenv_directory)s/bin/activate' % env
def create_virtualenv():
"""
Create the virtualenv
"""
env.sudo('mkdir -p
%(virtualenv_directory)s' % env)
env.sudo('chown
%(remote_user)s.%(remote_group)s
%(virtualenv_directory)s' % (env) )
env.run('virtualenv
%(virtualenv_directory)s' % env)
@_contextmanager
def virtualenv():
"""
Activate the virtualenv
"""
with
cd(env.virtualenv_directory):
with prefix(env.virtualenv_activate):
yield
def install_pip_dependencies():
"""
Install the pip
dependencies
"""
with virtualenv():
if
env.run_as == 'remote':
put('pip_requirements.txt',
'pip_requirements.txt')
env.run('pip install -r pip_requirements.txt')
- Django
- fabfile.py
def django_setup():
"""
Django: migrate,
createsuperuser, collectstatic
"""
with virtualenv():
with cd(env.project_dir):
env.run('python manage.py migrate')
env.run('python manage.py createsuperuser')
env.run('python manage.py collectstatic')
env.run('python manage.py loaddata
auth_initial')
def django_update():
"""
Django: migrate,
collectstatic
"""
with virtualenv():
with cd(env.project_dir):
env.run('python manage.py migrate')
env.run('python manage.py collectstatic')
- Nginx
- fabfile.py
def nginx_setup():
"""
Configure and start nginx
"""
env.sudo('chmod 755
/home/centos')
env.sudo('mkdir -p
/etc/uwsgi/vassals/')
if env.run_as == 'remote':
#
nginx
put('nginx-uwsgi/%(project_name)s_nginx.conf'
% env, '/etc/nginx/conf.d/', use_sudo=True)
#
remove default site from nginx.conf
put('nginx-uwsgi/nginx.conf', '/etc/nginx/',
use_sudo=True)
put('nginx-uwsgi/nginx.pp', '/etc/nginx/',
use_sudo=True)
#
uwsgi
put('nginx-uwsgi/uwsgi_params', '/etc/uwsgi/',
use_sudo=True)
put('nginx-uwsgi/emperor.ini', '/etc/uwsgi/',
use_sudo=True)
put('nginx-uwsgi/%(project_name)s_uwsgi.ini' %
env, '/etc/uwsgi/vassals/', use_sudo=True)
put('nginx-uwsgi/emperor.uwsgi.service',
'/etc/systemd/system/', use_sudo=True)
#
socket in /run/
(http://uwsgi-docs.readthedocs.org/en/latest/Systemd.html#putting-sockets-in-run)
#put('nginx-uwsgi/emperor.uwsgi.socket',
'/etc/systemd/system/', use_sudo=True)
#put('nginx-uwsgi/emperor.uwsgi.conf',
'/etc/tmpfiles.d/', use_sudo=True)
# custom selinux policy
module
(http://axilleas.me/en/blog/2013/selinux-policy-for-nginx-and-gitlab-unix-socket-in-fedora-19/)
env.sudo('semodule -i
/etc/nginx/nginx.pp')
# activate selinux
env.sudo('setenforce 1')
# enable and start nginx
env.sudo('systemctl enable
nginx.service')
env.sudo('systemctl restart
nginx.service')
# enable and start uwsgi
env.sudo('systemctl enable
emperor.uwsgi.service')
env.sudo('systemctl restart
emperor.uwsgi.service')
# configure firewall
#env.sudo('firewall-cmd
--permanent --zone=public --add-service=http')
#env.sudo('firewall-cmd
--permanent --zone=public
--add-service=https')
#env.sudo('firewall-cmd
--reload')
def nginx_restart():
"""
Restart nginx and wsgi
"""
# restart nginx
env.sudo('systemctl restart
nginx.service')
# restart uwsgi
env.sudo('systemctl restart
emperor.uwsgi.service')
- Database
- fabfile.py
# database
env.mysql_host = 'localhost'
env.mysql_database = 'mydatabase_db'
env.mysql_user = 'my_user'
env.mysql_password = 'my_password'
env.mysql_master_user = 'root'
def database_setup():
"""
Setup database service
"""
env.sudo("systemctl enable
mariadb.service")
env.sudo("systemctl start
mariadb.service")
env.sudo("mysql_secure_installation")
def database_create():
"""
Create the sql database
"""
env.run('echo "CREATE
DATABASE IF NOT EXISTS %(mysql_database)s; \
GRANT ALL ON %(mysql_database)s.* TO
\'%(mysql_user)s\'@\'%%\' IDENTIFIED BY
\'%(mysql_password)s\'; \
FLUSH PRIVILEGES;" | \
mysql -h %(mysql_host)s -u
%(mysql_master_user)s -p' % (env) )
def database_delete():
"""
Delete the sql database
"""
env.run('echo "DROP
DATABASE %(mysql_database)s;" | \
mysql -h %(mysql_host)s -u
%(mysql_master_user)s -p' % (env) )
- Git
- fabfile.py
def ssh_config():
"""
Add
fabuser_bitbucket_support to ~/.ssh/config
"""
text = """
Host fabuser-bitbucket
HostName
bitbucket.org
IdentityFile
~/.ssh/fabuser_bitbucket
"""
append('%s/config' %
env.ssh_dir, text )
env.run('chmod 600
%s/config' % env.ssh_dir)
def git_clone():
"""
Clone from git
"""
#git_user =
'francesc_pinyol_margalef'
with settings(warn_only=True):
    if env.run("test -d %s" % env.project_dir).failed:
        env.run("git clone git@fabuser-bitbucket:%(bitbucket_account)s/%(project_name)s.git %(project_dir)s" % env)
def git_pull():
"""
Pull from git
"""
with cd(env.project_dir):
env.run("git pull")
- Empaquetament
/ Packaging
- Multiple platforms
- How can I find the current OS
in Python? [duplicate]
import platform
platform.platform()
platform.system()
- Popen
- Kill a process
- Restart computer
- restart local computer from
python
- Exemple / Example:
import os
import platform
def restart_computer():
    operating_system = platform.system()
    if operating_system == 'Linux':
        print("rebooting system")
        os.system('reboot now')
    elif operating_system == 'Windows':
        import win32api
        win32api.InitiateSystemShutdown()
- Python path:
- see also bash
path
- /usr/local/lib/python2.7/dist-packages/...
- paths
import os
my_path = './dir1/dir2/toto.mp4'
# ./dir1/dir2/toto.mp4
my_dirname =
os.path.dirname(my_path) #
./dir1/dir2
#my_rel_dirname=${my_dirname#*\./} # dir1/dir2
my_basename = os.path.basename(my_path) #
toto.mp4
my_name = os.path.splitext(my_basename)[0]
# toto
my_extension = os.path.splitext(my_path)[1] #
.mp4
my_rel_path = os.path.relpath(my_path)
# dir1/dir2/toto.mp4
my_abs_path = os.path.abspath(my_path) #
/path/to/dir1/dir2/toto.mp4
- # longest common path
prefix = os.path.commonpath(paths)
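The same path components can also be obtained with `pathlib`, and `commonpath` is easy to confuse with `commonprefix`. A minimal sketch, assuming POSIX-style paths; the `posixpath` module and `PurePosixPath` are used so the result does not depend on the platform, and the sample `paths` list is made up for illustration:

```python
import posixpath
from pathlib import PurePosixPath

my_path = PurePosixPath('./dir1/dir2/toto.mp4')

my_dirname = str(my_path.parent)   # the leading './' is dropped by pathlib
my_basename = my_path.name
my_name = my_path.stem
my_extension = my_path.suffix

# hypothetical list of paths
paths = ['/usr/lib/python3', '/usr/lib64']
# commonpath compares whole path components
common_path = posixpath.commonpath(paths)
# commonprefix compares character by character, so it can cut a name in half
common_prefix = posixpath.commonprefix(paths)

print(my_dirname, my_basename, my_name, my_extension)
print(common_path, common_prefix)
```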
- print path:
- set path:
- python
import sys
sys.path.append("/my/path")
- Django:
- /etc/apache2/mods-available/wsgi.conf
- recursively create directory if it does not exist:
import os
# create the directory if it does not exist
father_dir = os.path.dirname(filename)
if not os.path.exists(father_dir):
os.makedirs(father_dir)
print("creating directory:
{}".format(father_dir))
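On Python 3 the existence check and the creation can be combined with `exist_ok=True`. A minimal sketch; `ensure_parent_dir` is a hypothetical helper name and the temporary directory is only there to make the example self-contained:

```python
import os
import tempfile


def ensure_parent_dir(filename):
    """Create the parent directory of filename, including missing ancestors."""
    father_dir = os.path.dirname(filename)
    # dirname is '' for a bare file name; makedirs('') would raise
    if father_dir:
        # exist_ok=True: no error if the directory is already there
        os.makedirs(father_dir, exist_ok=True)
    return father_dir


base_dir = tempfile.mkdtemp()
filename = os.path.join(base_dir, 'dir1', 'dir2', 'toto.txt')
created = ensure_parent_dir(filename)
# a second call finds the directory in place and does nothing
created_again = ensure_parent_dir(filename)
print(created)
```

This also avoids the gap between `os.path.exists` and `os.makedirs` in which another process could create the directory.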
- get home dir:
from os.path import expanduser
home = expanduser("~")
- get absolute dir inside home:
from os.path import expanduser
my_dir = expanduser("~/my_dir")
- Python for MS Windows:
- HTML
parsing
- Logging
- Django
logging
- Logging
Cookbook
-
|
new |
old |
|
a = 'primer'
b = 'segon'
logger.debug('{} - {}', a, b) |
a = 'primer'
b = 'segon'
logger.debug('%s - %s', a, b) |
pylint |
[--logging-format-style=new] |
--logging-format-style=old |
Do not use: ...
- Basic example
import os
import logging
logging.basicConfig()
#logger = logging.getLogger(__name__)
logger = logging.getLogger(os.path.basename(__file__))
logger.setLevel(logging.DEBUG)
logger.debug('debug message')
logger.info('info message')
logger.warning('warn message')
logger.error('error message')
logger.critical('critical message')
- Exemple / Example:
# logger
import logging
#logger = logging.getLogger('my_program')
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s
%(levelname)-8s %(name)-12s %(message)s',
'%Y-%m-%dT%H:%M:%SZ')
ch.setFormatter(formatter)
# add the handlers to the logger
logger.addHandler(ch)
a = 'primer'
b = 'segon'
logger.debug("my message with %s and %s", a, b)
- import
logging
logger = logging.getLogger('myprogram.py')
logger.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s
[%(name)-12s] %(message)s')
formatter.default_time_format = '%Y-%m-%dT%H:%M:%S'
formatter.default_msec_format = '%s.%03dZ'
ch.setFormatter(formatter)
# add the handlers to the logger
logger.addHandler(ch)
logger.debug("my message")
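The formats above end the timestamp with `Z`, which conventionally denotes UTC, while `logging.Formatter` uses local time unless told otherwise. A minimal sketch that sets the formatter's `converter` to `time.gmtime` so the suffix matches the value; `build_utc_logger` and the logger name `utc_demo` are hypothetical, and the in-memory stream is only there to capture the output:

```python
import io
import logging
import time


def build_utc_logger(name, stream):
    """Return a logger whose handler writes UTC timestamps to stream."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    logger.propagate = False  # keep the demo output out of the root logger
    handler = logging.StreamHandler(stream)
    formatter = logging.Formatter(
        '%(asctime)s %(levelname)-8s %(name)-12s %(message)s',
        '%Y-%m-%dT%H:%M:%SZ')
    # by default the formatter uses time.localtime
    formatter.converter = time.gmtime
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger


buffer = io.StringIO()
log = build_utc_logger('utc_demo', buffer)
log.debug('my message with %s and %s', 'primer', 'segon')
line = buffer.getvalue()
print(line, end='')
```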
- Nginx
- Problemes / Problems
UnicodeEncodeError: 'ascii' codec can't encode
character ... in position ...: ordinal not in
range(128)
|
|
- Pylint
documentation
- Instal·lació / Installation
- pip install pylint (pipx install pylint)
Installing collected packages:
backports.functools-lru-cache, isort, mccabe,
singledispatch, wrapt, lazy-object-proxy, astroid,
pylint
Successfully installed astroid-1.6.6
backports.functools-lru-cache-1.6.4 isort-4.3.21
lazy-object-proxy-1.6.0 mccabe-0.6.1 pylint-1.9.5
singledispatch-3.6.2 wrapt-1.12.1
- to use with Django
- to avoid errors:
- "E: xx,yy: Class '...' has no 'objects' member
(no-member)"
- install pylint-django:
pip install pylint-django
- Python 2.7
$ pip install "pylint-django<2"
Successfully built pylint-django
pylint-plugin-utils
Installing collected packages:
pylint-plugin-utils, pylint-django
Successfully installed
pylint-django-0.11.1
pylint-plugin-utils-0.6
- IDE integration
- Eclipse / PyDev
- PyLint
can be used with PyDev
- Window -> Preferences -> PyDev -> Editor
-> Code Analysis -> PyLint
- to avoid "Undefined variable from import: ..." ("PyDev Problem"),
- deactivate pyDev code analysis:
- PyDev -> Editor -> Code Analysis
-> Do code analysis? (disabled)
- activate PyLint with arguments
- PyDev -> Editor -> Code Analysis
-> PyLint -> Use PyLint? (enabled)
- Arguments to pass to the pylint
command:
--load-plugins pylint_django
- Run
pylint mymodule
- to use with Django code
DJANGO_SETTINGS_MODULE=your.app.settings
pylint --load-plugins=pylint_django [..other
options..] <path_to_your_sources>
DJANGO_SETTINGS_MODULE=your.app.settings
pylint --disable=all --enable=E
--load-plugins=pylint_django
--logging-format-style=old
<path_to_your_sources>
- only some checkers
(Error, Warning, ...)
pylint --disable=all --enable=E,W mymodule
pylint --disable=all --enable=unused-import
mymodule
- help about a message
pylint --help-msg=no-member
- logging
- ...
- Messages
- Dangerous default value %s as argument
- dangerous-default-value
/ W0102
- Problematic code
def whats_on_the_telly(penguin=[]):
# [dangerous-default-value]
- Correct code
def whats_on_the_telly(penguin=None):
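The warning exists because a default value is evaluated once, when the function is defined, and is then shared by all calls. A minimal sketch of the effect; `append_bad` and `append_good` are illustrative names, not from the Pylint documentation:

```python
def append_bad(item, bucket=[]):  # [dangerous-default-value]
    """The same list object is reused on every call without bucket."""
    bucket.append(item)
    return bucket


def append_good(item, bucket=None):
    """A fresh list is created on every call without bucket."""
    if bucket is None:
        bucket = []
    bucket.append(item)
    return bucket


first_bad = append_bad('a')
second_bad = append_bad('b')
first_good = append_good('a')
second_good = append_good('b')
print(second_bad, second_good)
```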
|
|
- Tox
- Instal·lació / Installation
- System-wide
- Mageia
sudo urpmi python2-tox python3-tox
- All the Python versions that you specify must be installed
- Tutorials
- Django
- Testing
a third party Django application with pytest and tox
- Exemples / Examples (tox.ini)
[tox]
envlist =
django111-py{27,36,37}
skipsdist = true
[testenv]
commands =
{envpython} -m manage test
--settings my_project.tests_settings --verbosity 3
--parallel=8 my_app
deps =
tblib
django111-py27: -r
deployment/pip_requirements.py27.txt
django111-py36: -r
deployment/pip_requirements.py36.txt
django111-py37: -r
deployment/pip_requirements.py37.txt
django111-py37:
other_dep_just_for_testing
- Ús / Usage
- list available environments
- run all environments
- run a specific environment
|
Biblioteques / Libraries
|
- Useful
modules
- Dades / Data
- Bases de dades / Databases
- sqlite
- MySQL
pip install MySQL-python
pip install mysql-python
- Problems
- Imatges / Images
- python-imaging
- Pillow
- documentation
- Install
- requisites
- CentOS
yum install
libjpeg-turbo-devel
- Ús / Usage
- open an image from file
- open an image from a url
- Problemes / Problems
DecompressionBombError at ...
Image size (356526846 pixels) exceeds
limit of 178956970 pixels, could be
decompression bomb DOS attack.
- Solució / Solution
from PIL import Image
Image.MAX_IMAGE_PIXELS = 1000000000
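The limit of 178956970 pixels in the message can be reproduced with plain arithmetic. The sketch below does not import Pillow; it assumes that the default `MAX_IMAGE_PIXELS` is computed as `1024 * 1024 * 1024 // 4 // 3` and that the error is raised above twice that value, which matches the number in the message but is not stated in these notes:

```python
# assumed default, not read from an installed Pillow
default_max_image_pixels = 1024 * 1024 * 1024 // 4 // 3
# assumed rule: the error is raised above twice the limit
default_error_threshold = 2 * default_max_image_pixels

image_pixels = 356526846  # size reported in the error message

new_max_image_pixels = 1000000000
fails_with_default = image_pixels > default_error_threshold
fails_with_new_limit = image_pixels > 2 * new_max_image_pixels

print(default_error_threshold, fails_with_default, fails_with_new_limit)
```

Under these assumptions, raising the limit to 1000000000 is comfortably enough for the image in the message.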
OSError: image file is truncated
- ...
- PIL
(use Pillow
instead)
- ImageIO
- Animated GIF
- Criptografia / Cryptography
- Autenticació
/ Authentication
- Xarxa / Network
- Twisted
- stomp.py
- Problemes / Problems
UnicodeEncodeError: 'ascii' codec can't
encode character ...
- Unicode
- Solution
- /usr/local/lib/python2.7/dist-packages/stomp/protocol.py
- netcat
/ nc
- RPC
- SSH
- Paramiko
- Documentation
- Example
import paramiko
client = paramiko.SSHClient()
#client.load_system_host_keys()
k = paramiko.RSAKey.from_private_key_file('/home/my_user/.ssh/keys/remoteserver.pem')
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect('remoteserver.example.org', username='myremoteuser', pkey=k)
ssh_stdin, ssh_stdout, ssh_stderr = client.exec_command('ls -ltr')
for line in ssh_stdout:
    print(line)
- SVG
- Creating
Simple SVG from Python
- pySVG
- svgwrite
- Documentation
- mozman / svgwrite
(github)
- Exemples / Examples
- create an empty document
-
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import svgwrite
from svgwrite import mm
from svgwrite.container import Group
# create svg document
svg_width = 100 # mm
svg_height = 200 # mm
path = "toto.svg"
dwg = svgwrite.Drawing(path, profile='full', size=(svg_width*mm, svg_height*mm))
dwg.viewbox(0, 0, svg_width, svg_height)
# save svg to file
dwg.save(pretty=True)
- ...
- utf-8
# save svg to file
from xml.etree.ElementTree import tostring
dwg.tostring = lambda self=dwg:
tostring(self.get_xml()).decode("utf-8")
dwg.save(pretty=True)
- convert svg to image
- Convert
SVG to PNG in Python
- CairoSVG (Python3)
- install Python3 and virtualenv
pip3 install cairosvg
- as command:
cairosvg image.svg [--output_width
200] -o image.png
- as library:
import cairosvg
cairosvg.svg2pdf(url='image.svg', output_width=200,
write_to='image.pdf')
- pyrsvg (librsvg)
- pyrsvg
How to use librsvg from Python (Cairo)
- Installation
- Mageia
urpmi gnome-python-desktop
/usr/lib64/python2.7/site-packages/gtk-2.0/rsvg.so
/usr/share/doc/gnome-python-desktop/rsvg/rsvg-cairo.py
- Ubuntu
- Ús / Usage
- virtualenv --system-site-packages env
- source env/bin/activate
- python toto.py
- toto.py
import cairo
import rsvg
...
- HTTP / HTTPS
- PSL:
Internet Protocols and Support
- aiohttp
- python-requests
- Instal·lació / Installation
- Ús / Usage
- get
import requests
from pprint import pprint
url = ...
r = requests.get(url)
pprint( r.json() )
import requests
from pprint import pprint
url = ...
queryparams = {"key1": "value1", "key2":
"value2"}
r = requests.get(url,
params=queryparams)
import requests
from requests.exceptions import
ConnectionError, ReadTimeout
url = ...
# timeout of 3 seconds
try:
r = requests.get(url,
timeout=3)
except ConnectionError as e:
# server is not listening
...
except ReadTimeout as e:
# server is listening but we
are not getting a response
...
except Exception as e:
...
- post
import requests
url = ...
payload = {"toto_key": "toto_value",}
r = requests.post(url,
payload)
import requests
url = ...
# keys can be repeated (both are kept)
data =
"toto_key=toto_value1&toto_key=toto_value2"
headers = {"Content-Type":
"application/x-www-form-urlencoded"}
r = requests.post(url,
data, headers=headers)
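The form-encoded body with a repeated key does not have to be written by hand. A minimal sketch with the standard library only (no request is sent), showing two ways to build the same string and how it parses back:

```python
from urllib.parse import parse_qs, urlencode

# a list of pairs keeps repeated keys
pairs = [('toto_key', 'toto_value1'), ('toto_key', 'toto_value2')]
data_from_pairs = urlencode(pairs)

# doseq=True expands a list value into one key=value pair per element
data_from_dict = urlencode(
    {'toto_key': ['toto_value1', 'toto_value2']}, doseq=True)

# parsing the body back gives both values under the same key
parsed = parse_qs(data_from_pairs)

print(data_from_pairs)
```

Either string can then be used as the `data` value of the example above.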
- jwt + get
import requests
from pprint import pprint
backend_url = 'http://127.0.0.1:8000'
# jwt token
# file toto_admin.txt contains the password
password = open('toto_admin.txt',
'r').read().strip()
payload_credentials = {
'username':
'admin',
'password':
password
}
r = requests.post(backend_url+'/api-token-auth/',
data=payload_credentials)
token =
r.json()['token']
r = requests.get('%s/v1/api/totos/'
% (backend_url), headers={'Authorization':'JWT
%s'%token})
pprint( r.json() )
- authentication
- upload (e.g. with Django
Restframework)
- Quickstart
- Advanced
- Problems
OverflowError: string longer than
2147483647 bytes
- Exemple / Example:
- models.py
class
MyModel(models.Model):
field_1 =
models.FileField(...)
field_2 = ...
field_3 = ...
- upload_test.py
import requests
url = 'http...'
payload = {'field_2': '...', 'field_3':
'...'}
headers = {}
headers['...'] = '...'
with open('/tmp/toto.png', 'rb') as
field_1_file:
files = {'field_1':
field_1_file}
# to specify a name
files = {'field_1':
('name of the uploaded file',
field_1_file)}
# specifying also the
mime-type
#files =
{'field_1': ('name of the uploaded
file', field_1_file,
"application/octet-stream")}
r = requests.post(url,
payload, headers=headers, files=files)
# upload from a url
from urllib.request import urlopen
source_url = ...
url = ...
payload = {'field_2': '...', 'field_3': '...'}
headers = {}
files = {'field_1': ('name of the uploaded file', urlopen(source_url))}
r = requests.post(url, payload, headers=headers, files=files)
- HTTPS
- Problemes / Problems
- empty request.DATA when receiving a put/patch:
- Solució / Solution
- check that there is a trailing "/".
Otherwise, requests receives a 30x and data is
lost on the second url
- claudàtors en
el payload / square brackets in payload
- Solució / Solution: explicit conversion to json,
with json header
import requests
import json
payload = {
'toto':['first element','second element'],
}
r = requests.post(address, headers={'Content-Type':'application/json'},
data=json.dumps(payload))
import requests
import json
# get token as usual
...
token = ...
payload = {
'toto':['first element','second element'],
}
r = requests.post(address,
headers={'Authorization':'JWT %s'%token, 'Content-Type':'application/json'},
data=json.dumps(payload))
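One reading of the problem above is that a list inside the payload only keeps its structure when the body is JSON. A minimal sketch comparing the two encodings with the standard library (no request is sent); the form-encoded variant is shown only for contrast and is an assumption about what a form body would look like:

```python
import json
from urllib.parse import urlencode

payload = {'toto': ['first element', 'second element']}

# JSON body: the list survives a round trip
json_body = json.dumps(payload)
decoded = json.loads(json_body)

# form body: the list is flattened into repeated keys
form_body = urlencode(payload, doseq=True)

print(json_body)
print(form_body)
```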
- pycurl (curl)
- Exemple HTTPS / HTTPS Example
import pycurl
curl = pycurl.Curl()
curl.setopt(pycurl.CAINFO, "ca.crt")
curl.setopt(pycurl.SSL_VERIFYPEER, 1)
curl.setopt(pycurl.SSL_VERIFYHOST, 2)
curl.setopt(pycurl.URL, "https://server_name/")
curl.perform()
- ...
- PyGObject
(based on GObject)
-
|
C
|
Python
import gi
gi.require_version("Gtk", "3.0")
from gi.repository import Gtk
|
|
error when packages are not installed |
|
-
|
PyGObject |
- virtualenv:
pip install pygobject
- dependències / dependencies:
urpmi lib64girepository-devel
gobject-introspection libcairo-devel
[?python3-cairo-devel]
yum install gcc
gobject-introspection-devel
cairo-gobject-devel freetype-devel
apt-get ...
urpmi [python-gobject] python3-gobject3
yum install python34-gobject
apt-get ...
|
- ModuleNotFoundError: No module named 'gi'
- Package cairo was not found in the pkg-config
search path.
|
GObject
libraries:
- /usr/lib[64]/girepository-1.0/
- GI_TYPELIB_PATH
|
GLib
|
|
|
Gtk
- Gtk-2.0.typelib
- Gtk-3.0.typelib
|
|
|
GStreamer
- Gst-1.0.typelib
- Gst...1.0.typelib
- GES-1.0.typelib
|
|
- ValueError: Namespace Gst not available
- ValueError: Namespace GstWebRTC not available
|
...
|
|
|
- Instal·lació /
Installation
- Sistema /
System
- CentOS
sudo yum install python36-gobject
- Mageia
- Virtualenv
+ pip
- Dependències / Dependencies
-
- Install virtualenv
- CentOS
sudo yum install
gobject-introspection-devel
cairo-gobject-devel freetype-devel
- Mageia
-
urpmi lib64girepository-devel
gobject-introspection
virtualenv-3.5 env (CentOS: virtualenv-3
--python python36 env )
source env/bin/activate
[pip install --upgrade pip ]
pip install pygobject
- Libraries are retrieved from:
- standard location:
/usr/lib64/girepository-1.0/*.typelib
- non-standard location (e.g. GStreamer
compiled from source and installed into /usr/local)
export
GI_TYPELIB_PATH=/usr/local/lib/girepository-1.0
- PyGObject
API Reference
- Exemple / Example
import gi
gi.require_version("Gtk", "3.0")
from gi.repository import Gtk
window = Gtk.Window(title="Hello World")
window.show()
window.connect("destroy", Gtk.main_quit)
Gtk.main()
- Problems
import gi
ModuleNotFoundError: No module named 'gi'
- Solució / Solution
- Install Python 3.6 bindings for GObject
Introspection
sudo yum install python36-gobject
- ...
- ValueError:
Namespace Gst not available
- Solució / Solution
- check that Gst is installed in standard
location:
ls -l
/usr/lib64/girepository-1.0/Gst*.typelib
- if not, check that Gst is installed in
non-standard location:
ls -l
/usr/local/lib/girepository-1.0/Gst*.typelib
- add non-standard location to search path:
export
GI_TYPELIB_PATH=/usr/local/lib/girepository-1.0
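The `ls` checks above can also be scripted. A minimal sketch that looks for a namespace's `.typelib` files in the directories named by `GI_TYPELIB_PATH` and in the standard locations; `typelib_search_dirs` and `find_typelibs` are hypothetical helpers that only mimic the manual check and do not reproduce the lookup performed by GObject Introspection itself:

```python
import glob
import os
import tempfile

STANDARD_TYPELIB_DIRS = [
    '/usr/lib64/girepository-1.0',
    '/usr/lib/girepository-1.0',
]


def typelib_search_dirs(environ=os.environ):
    """Directories from GI_TYPELIB_PATH first, then the standard ones."""
    extra = environ.get('GI_TYPELIB_PATH', '')
    extra_dirs = [d for d in extra.split(os.pathsep) if d]
    return extra_dirs + STANDARD_TYPELIB_DIRS


def find_typelibs(namespace, search_dirs):
    """Return the <namespace>-<version>.typelib files found in search_dirs."""
    found = []
    for directory in search_dirs:
        pattern = os.path.join(directory, '%s-*.typelib' % namespace)
        found.extend(sorted(glob.glob(pattern)))
    return found


# demonstration with a temporary directory and an empty placeholder file
demo_dir = tempfile.mkdtemp()
open(os.path.join(demo_dir, 'Gst-1.0.typelib'), 'w').close()
demo_dirs = typelib_search_dirs({'GI_TYPELIB_PATH': demo_dir})
demo_found = find_typelibs('Gst', [demo_dir])
print(demo_found)
```

On a real system, `find_typelibs('Gst', typelib_search_dirs())` returning an empty list would be consistent with the "Namespace Gst not available" error.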
- Parsing
- Codificació de text / Text coding
- Fulls de càlcul / Spreadsheet
-
|
CSV |
OpenPyXL |
read each row as an OrderedDict
A_label |
B_label |
C_label |
A2_value |
B2_value |
C2_value |
A3_value |
B3_value |
C3_value |
|
import csv
with open(spreadsheet_path, newline='',
encoding=file_encoding) as f:
reader = csv.DictReader(f,
delimiter=';')
print(reader.fieldnames)
for row in reader:
print(row['C_label'])
print(row['B_label'])
print(row['A_label']) |
import openpyxl
from collections import OrderedDict
wb = openpyxl.load_workbook(spreadsheet_path)
for sheet_name in wb.sheetnames:
    ws = wb[sheet_name]
    # header
    fieldnames = []
    for column in ws.iter_cols(1, ws.max_column):
        # header value corresponds to the cell on the top ([0])
        fieldnames.append(column[0].value)
    print(fieldnames)
    for row_cells in ws.iter_rows(min_row=2):
        row = OrderedDict()
        column_index = 0
        for cell in row_cells:
            row[fieldnames[column_index]] = cell.value
            column_index += 1
        print(row['C_label'])
        print(row['B_label'])
        print(row['A_label'])
|
|
|
|
|
|
|
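The CSV variant of the comparison above can be tried without a file on disk. A minimal sketch that feeds the sample sheet to `csv.DictReader` from an in-memory string; the last lines show the header-to-value mapping that the OpenPyXL loop builds by hand, using `zip` on a plain list of cell values (the list is made up for illustration):

```python
import csv
import io

sample = (
    'A_label;B_label;C_label\n'
    'A2_value;B2_value;C2_value\n'
    'A3_value;B3_value;C3_value\n'
)

# newline='' mirrors the open(..., newline='') call recommended for csv
reader = csv.DictReader(io.StringIO(sample, newline=''), delimiter=';')
fieldnames = reader.fieldnames
rows = list(reader)

# the same mapping for a row given as a plain sequence of cell values
values = ['A2_value', 'B2_value', 'C2_value']
row_from_values = dict(zip(fieldnames, values))

print(fieldnames)
print(rows[0]['C_label'])
```

Depending on the Python version, each row is an `OrderedDict` or a regular `dict`; lookups by column label work the same way in both cases.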
- CSV
- OpenPyXL
- print all rows and columns from a binary file
retrieved using requests (or unit test)
from __future__ import print_function
from io import BytesIO
import openpyxl
file_like_object = BytesIO(res.content)
wb = openpyxl.load_workbook(file_like_object)
for sheet_name in wb.sheetnames:
    print(u"- sheet: {}".format(sheet_name))
    ws = wb[sheet_name]
    for row in ws.iter_rows():
        for cell in row:
            print(cell.value, end=',')
        print()
- ...
- Problemes / Problems
-
IllegalCharacterError
- ...
- xlwt (used
by Django XMLRenderer)
- ...
- Microsoft SharePoint
- Versió semàntica / Semantic version
- ...
|
|
- Celery with Django
- celery.conf
- celery.service
- Problems / Problemes
Usage: python -m celery [OPTIONS] COMMAND
[ARGS]...
Try 'python -m celery --help' for help.
Error: Invalid value for '-A' / '--app':
Unable to load celery application.
The module mymodule was not found.
celery_mymodule.service: Control process exited,
code=exited, status=2/INVALIDARGUMENT
- Solució / Solution:
- since Celery 5, the options -A/--app and --workdir are global (Step 1: Adjust your command line invocation)
and must be placed at the very beginning, before the command (e.g. celery --workdir=/my/dir --app=myapp multi ... )
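When the command line is assembled from a script, the ordering rule can be enforced in one place. A minimal sketch that only builds the argument list and does not run Celery; `build_celery_argv` and its parameters are hypothetical names:

```python
def build_celery_argv(command, app, workdir=None, command_args=()):
    """Put the global options before the command, as Celery 5 expects."""
    argv = ['celery']
    if workdir is not None:
        argv.append('--workdir=%s' % workdir)
    argv.append('--app=%s' % app)
    # the command and its own options come after the global options
    argv.append(command)
    argv.extend(command_args)
    return argv


argv = build_celery_argv(
    'multi',
    app='myapp',
    workdir='/my/dir',
    command_args=['start', 'mynodename', '-Q', 'myqueue'],
)
print(' '.join(argv))
```

The resulting list could be passed to `subprocess.run` or joined into the `ExecStart` line of a service file.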
- Arquitectura / Architecture
- Setting
up a queue service: Django, RabbitMQ, Celery on AWS
- Task
Routing in Celery - How to route a Celery task to a
dedicated queue
- Dynamic
Task Routing in Celery
- Specify
Worker in Celery
- Elements
- broker:
- controls queues
- accepts calls from apply_async and distributes them to subscribed workers (if any)
- if a RabbitMQ queue still contains tasks waiting to be assigned to a worker, they are listed by:
rabbitmqctl -p my_vhost list_queues
- once they have been assigned to a worker, they no longer appear in the queue
- RabbitMQ
sudo systemctl start
rabbitmq-server.service
rabbitmqctl list_users
rabbitmqctl list_vhosts
rabbitmqctl -p my_vhost list_queues
- ? if the queue has messages: there are still jobs waiting to be assigned to a worker
- ? if the queue is empty: all jobs have been assigned to a worker (a job may still be running on its worker)
- worker:
- executes an async function
- connects to broker specified as Celery.broker in
mydir/celery.py (-A mydir)
- Problems
- consumer: Cannot connect to amqp://...:
[Errno 13] Permission denied.
- Solution: check SELinux on instance
that is initiating the connection
- ask broker for the list of active nodes and tasks:
.../env/bin/python -m celery -b
amqp://... inspect active
- tasks that this worker is able to execute are
listed in:
- if they are inside a Django project:
- if they are not inside a Django project:
- ...
- client:
- calls an async function
- when called from a standalone python:
- connects to broker specified by app:
from
mymodule.tasks import mytask
from celery import Celery
app = Celery(
"my...",
broker="amqp://my_user:xxxxxx@localhost/my_vhost",
)
tasca = mytask.apply_async(("first_arg",
"second_arg"), serializer="json")
- when called from a Django project
- connects to broker specified in settings.py
BROKER_URL
- tasks that this client is able to call are
listed in:
- ...
-
|
needed files |
|
|
broker |
|
worker |
mydir/celery.py
|
client |
|
- First
steps
with Celery
- install a message broker
(message transport)
- RabbitMQ
(AMQP)
- RabbitMQ
- Configuració / Setup (Django:
same as in settings.py BROKER_URL)
rabbitmqctl add_user myuser
mypassword
rabbitmqctl add_vhost myvhost
rabbitmqctl set_permissions -p myvhost
myuser ".*" ".*" ".*"
- check settings:
rabbitmqctl list_users
rabbitmqctl list_vhosts
rabbitmqctl list_permissions -p myvhost
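The user, password and vhost created above end up in the broker URL. A minimal sketch that assembles such a URL and percent-encodes characters that would otherwise break it; `build_broker_url` is a hypothetical helper, the default port 5672 is the usual AMQP port, and it is an assumption that the client library decodes the percent-encoding:

```python
from urllib.parse import quote, urlsplit


def build_broker_url(user, password, host, vhost, port=5672):
    """Assemble amqp://user:password@host:port/vhost with escaped parts."""
    return 'amqp://%s:%s@%s:%d/%s' % (
        quote(user, safe=''),
        quote(password, safe=''),
        host,
        port,
        quote(vhost, safe=''),
    )


simple_url = build_broker_url('myuser', 'mypassword', 'localhost', 'myvhost')
# '@' and '/' inside the password would be misread as URL separators
escaped_url = build_broker_url('myuser', 'p@ss/word', 'localhost', 'myvhost')
parts = urlsplit(escaped_url)

print(simple_url)
print(escaped_url)
```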
- Amazon
SQS
- install celery (dependencies automatically installed: pytz
billiard kombu anyjson amqp)
- check that celery is working
- command line
- with a configuration
file
- debug:
- Problemes / Problems
socket.timeout: timed out
-
|
options
(also from a file toto.conf, referenced with --config toto.conf)
|
|
command
|
|
examples
|
celery
|
-b
<broker> (broker_url
in a config
file) |
-A
<module> |
worker
(start a single worker) |
-Q,
--queues <queue1>,<queue2>
--hostname=<node_name>@<host>
(default:
celery@<value_returned_by_hostname_command>)
-E,
--task-events (needed if we are
intercepting events)
-c <number_processes> (default:
number of CPUs)
|
|
multi
(start several named workers)
start <node_name_1>
[<node_name_2>, ...]
--pidfile=/var/run/celery/%n.pid
--logfile=/var/log/celery/%n%I.log
restart <node_name>
[<node_name_2>, ...]
stop <node_name>
[<node_name_2>, ...]
stopwait <node_name>
[<node_name_2>, ...]
|
(usually called from
celery.service
with parameters set by variables defined in
/etc/sysconfig/myceleryconfig, parsed with EnvironmentFile=-/etc/sysconfig/myceleryconfig )
/path/to/python -m celery multi start
mynodename -E -Q myqueue -A myapp
--workdir=/path/to/myworking_dir --loglevel=DEBUG
--logfile="/var/log/celery/%N.log"
--pidfile="/var/run/celery/%N.pid"
will start number_process workers (hostname is also
called "worker name" in Flower):
/path/to/python -m celery worker -E -Q
myqueue -A myapp --loglevel=DEBUG
--logfile=/var/log/celery/mynodename.log
--pidfile=/var/run/celery/mynodename.pid
--hostname=mynodename@...
/path/to/python -m celery worker -E -Q
myqueue -A myapp --loglevel=DEBUG
--logfile=/var/log/celery/mynodename.log
--pidfile=/var/run/celery/mynodename.pid
--hostname=mynodename@...
- ...
These workers will connect to the broker specified
as Celery.broker in /path/to/myworkingdir/myapp/celery.py,
on queue myqueue:
- from
__future__ import absolute_import,
unicode_literals
from celery import Celery
app = Celery('myapp',
broker="amqp://myuser:mypassword@ip_address_of_rabbitmq_server:5672/myvhost",
backend='rpc://',
include=['myapp.tasks'])
if __name__ == '__main__':
app.start()
Available tasks are registered in
/path/to/myworkingdir/myapp/tasks.py
(and referenced as myapp.tasks.task1 in
Flower):
- @shared_task(queue='myqueue')
def task1(...):
...
@shared_task(queue='myqueue',
resultrepr_maxsize=4096, bind=True,
acks_late=True)
def task2(self, ...):
...
|
|
inspect
active
- scheduled
- reserved
- revoked
- registered
- stats
- query_task
<task_uuid>
- active_queues
- registered
|
--destination=celery@example.com
|
- connect to broker and get a list of active
tasks, for all workers:
celery -b
amqp://<celery_user>:<celery_password>@<rabbitmq_server>/<celery_vhost>
inspect active
- list of queues used by each worker:
celery inspect active_queues
- list of registered tasks
celery inspect registered
|
|
control
enable_events
disable_events
rate_limit tasks.add 10/m
|
|
- terminate a task:
celery -d ... control terminate KILL
<task_id>
- celery
-b ... control revoke <task_id>
|
|
events
|
--dump |
|
|
status
|
|
- connect to broker and list active nodes:
celery -b
amqp://<celery_user>:<celery_password>@<rabbitmq_server>/<celery_vhost>
status
- Problems
- Error: No nodes replied within time
constraint.
|
|
... |
|
|
-
|
|
|
worker |
client |
|
celery.py
|
tasks.py
|
command line
|
service |
usage (Calling
Tasks)
|
|
|
|
|
|
res = add.delay(2,2)
res =
add.apply_async((2,2))
res =
add.s(2,2).apply_async()
|
|
|
tasks.py
from celery import Celery
app = Celery('tasks',
broker='pyamqp://guest@localhost//')
@app.task
def add(x, y):
return x + y
|
celery -A tasks worker
--loglevel=info
|
|
from tasks import add
result = add.delay(4, 4)
result.ready()
result.successful()
result.get()
result.failed()
result.state
result.id
|
|
Using Celery in your Application
proj/rabbitmq.txt
-
amqp://<celery_user>:<celery_password>@<rabbitmq_server>/<celery_vhost>
proj/celery.py
from __future__ import
absolute_import, unicode_literals
from celery import Celery
app = Celery(
'proj',
broker=open('rabbitmq.txt','r').read().strip(),
backend='rpc://',
include=['proj.tasks']
)
# Optional configuration, see the application user
guide.
app.conf.update(
result_expires=3600,
)
if __name__ == '__main__':
app.start()
|
proj/__init__.py
proj/tasks.py
from __future__ import
absolute_import, unicode_literals
from .celery import app
@app.task
def add(x, y):
return x + y
@app.task
def mul(x, y):
return x * y
@app.task
def xsum(numbers):
return sum(numbers)
|
(from proj parent dir)
celery -A proj worker -l INFO
- -A
proj: will use proj.celery.app
- will contact broker specified by broker and join
the party (or another one specified with -b option)
- used queue is the default one ("celery"), in vhost
specified in broker. If not using the default, it
must be specified with
-Q myqueue and
must match the queue specified in @app.task(queue="myqueue")
- registered tasks:
- proj.tasks.add
- proj.tasks.mul
- proj.tasks.xsum
|
|
(from proj parent dir)
from proj.tasks import add
res = add.delay(4, 4)
res.get()
res = add.apply_async((4,4))
|
|
mydir/mymodule/rabbitmq.txt
-
amqp://<celery_user>:<celery_password>@<rabbitmq_server>/<celery_vhost>
mydir/mymodule/celery.py
from celery import Celery
app = Celery(
    'mymodule',
    broker=open('rabbitmq.txt','r').read().strip(),
    backend='rpc://',
    include=['mymodule.tasks']
)
if __name__ == '__main__':
    app.start()
|
mydir/mymodule/__init__.py
mydir/mymodule/tasks.py
from celery import shared_task
@shared_task(queue='myqueue')
def add(x,y):
    ...
|
(from mydir):
celery -A mymodule worker -Q myqueue -l info -E
- will use mymodule/celery.py
- will contact broker specified in app = Celery(broker=...) and join the party
- the queue given with -Q (created if it does not exist yet) is used, inside the vhost
specified by broker in Celery(broker=...). It must
match the queue specified in @shared_task
|
python -m celery multi start mycelery_nodes -Q
myqueue -A mymodule
- will use mymodule/celery.py
- will contact broker specified in app =
Celery(broker=...) and join the party
- used queue (created if it does not exist yet) will
be the one specified in @shared_task, inside vhost
specified by broker in Celery(broker=...)
|
from mymodule.tasks import add
result = add.delay(2,2)
result = add.apply_async((2,2))
|
Django
mysite/myproject/settings.py
# celery
CELERY_BROKER_URL =
'amqp://myuser:mypassword@localhost/myvhost'
|
mysite/myproject/celery.py
from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
app = Celery('myproject')
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings', namespace="CELERY")
# access to mysite/*/tasks.py
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
# access to mysite/myproject/tasks.py
app.autodiscover_tasks(lambda: ('myproject',))
@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
|
mysite/myproject/__init__.py
from __future__ import absolute_import
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
mysite/myproject/tasks.py
from __future__ import absolute_import
import logging
from celery import shared_task
logger = logging.getLogger(__name__)
@shared_task
def add(x, y):
    logger.info("adding...")
    return x + y
mysite/myapp/tasks.py
from __future__ import absolute_import
import logging
from celery import shared_task
logger = logging.getLogger(__name__)
@shared_task
def mul(x, y):
    logger.info("multiplying...")
    return x * y
|
celery -A myproject --workdir=/path/to/mysite worker -l info
- will use /path/to/mysite/myproject/celery.py
- as specified by
app.config_from_object('django.conf:settings', namespace="CELERY"),
config values are taken from the settings whose names
start with CELERY_ (the namespace), e.g. CELERY_BROKER_URL
- used queue (created if it does not exist yet) will
be the one specified in @shared_task, inside vhost
specified by broker in settings.CELERY_BROKER_URL
|
|
e.g.:
mysite/myapp/views.py
from myapp.tasks import
mul
result = mul.delay(3, 4)
|
-
broker instance |
worker instance |
|
client instance |
monitor instance |
|
|
systemctl start rabbitmq-server.service
- from any instance with installed celery and access
to broker:
source env/bin/activate
- celery
-b $(cat rabbitmq.txt) inspect active
|
celery -A mymodule worker -Q myqueue -l info
-E |
|
cd mydir
python client.py |
|
|
|
|
mydir/myapp/celery.py |
mydir/myapp/tasks.py |
mydir/client.py |
task.state
|
celery_events.py
|
task.info() |
|
|
|
|
|
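import sys
from celery import Celery
def my_monitor(app):
    # in-memory view of the cluster state, updated by the handlers
    state = app.events.State()
    # the announce_* handlers shown below must be defined here,
    # inside my_monitor, so that they can see state
    with app.connection() as connection:
        recv = app.events.Receiver(
            connection,
            handlers={
                "task-received": announce_received_tasks,
                "task-started": announce_started_tasks,
                "task-failed": announce_failed_tasks,
                "task-succeeded": announce_succeeded_tasks,
                "task-rejected": announce_rejected_tasks,
                "task-revoked": announce_revoked_tasks,
                "task-retried": announce_retried_tasks,
                "*": state.event,
            }
        )
        # block and dispatch incoming events to the handlers
        recv.capture(limit=None, timeout=None, wakeup=True) |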
|
|
|
|
|
RECEIVED
|
def announce_received_tasks(event):
    state.event(event)
    task = state.tasks.get(event['uuid'])
    print("TASK RECEIVED: %s [%s] [%s] %s" % (task.name, task.uuid, task.state, task.info(),)) |
- args
- kwargs
- retries
- root_id
|
|
|
|
|
STARTED
|
def announce_started_tasks(event):
    state.event(event)
    task = state.tasks.get(event['uuid'])
    print("TASK STARTED: %s [%s] [%s] %s" % (task.name, task.uuid, task.state, task.info(),))
|
- args
- kwargs
- retries
- root_id
|
|
|
@app.task
def will_fail(x, y):
    # raises NameError: a and b are not defined
    return a+b
@app.task
def will_fail(x, y):
    # alternative: raises an explicit exception
    raise Exception("not working")
|
|
FAILURE
|
def announce_failed_tasks(event):
    state.event(event)
    task = state.tasks.get(event['uuid'])
    print("TASK FAILED: %s [%s] [%s] %s" % (task.name, task.uuid, task.state, task.info(),))
|
- args
- kwargs
- retries
- root_id
- exception (from raised exception)
- 'exception': 'NameError("name \'a\' is not defined",)'
- 'exception': "Exception('not working',)"
|
|
|
|
|
SUCCEEDED
|
def announce_succeeded_tasks(event):
    state.event(event)
    task = state.tasks.get(event['uuid'])
    print("TASK SUCCEEDED: %s [%s] [%s] %s" % (task.name, task.uuid, task.state, task.info(),))
|
- args
- kwargs
- retries
- root_id
- result (from returned value)
- runtime
|
|
|
|
|
|
def announce_rejected_tasks(event):
    state.event(event)
    task = state.tasks.get(event['uuid'])
    print("TASK REJECTED: %s [%s] [%s] %s" % (task.name, task.uuid, task.state, task.info(),))
|
|
|
|
|
|
|
def announce_revoked_tasks(event):
    state.event(event)
    task = state.tasks.get(event['uuid'])
    print("TASK REVOKED: %s [%s] [%s] %s" % (task.name, task.uuid, task.state, task.info(),))
|
|
|
|
|
|
|
def announce_retried_tasks(event): |
|
|
|
|
|
|
|
|
- Migració / Migration
-
Celery 4 |
Celery 5 |
|
Upgrading
from Celery 4.x |
celery.utils.encoding |
kombu.utils.encoding |
from celery.task import Task |
from celery import Task |
# decorator
from celery import task |
# decorator
from celery import shared_task |
celery multi ... -A my_app
--workdir ... --pidfile=...
|
# daemonizing:
global options must be just after command
celery -A my_app --workdir ...
multi ... --pidfile=... |
- Next steps
- Calling tasks
- ETA
and countdown
- Expiration
- tasks go from apply_async to the broker (the state of
the broker queue can be checked with:
rabbitmqctl -p celery_vhost list_queues name messages_ready messages_unacknowledged )
and the broker keeps them in the queue as Ready until
it can assign them to a worker;
- in the RabbitMQ web interface, the messages of the
corresponding queue and vhost are shown as:
- Ready: the task has not been assigned yet;
it is waiting for a worker to take it
- Unacked: the task is already running
on a worker, but has not finished yet
- if the worker does not respond within the maximum time
set on the RabbitMQ server as
consumer_timeout
(default: 1800000 ms, 30 minutes):
- the celery logs will show a
PreconditionFailed,
PRECONDITION_FAILED error (ETA
and Countdown)
- the task will keep running on the worker
- on the worker, the celery service will still be
reported as ok
- but the worker will no longer be valid and will not
be listed by
celery status
- the message will go back from Unacked to
Ready and, as soon as another worker is
available, it will be assigned and
executed again. This can cause
conflicts if, for example, the tasks use
the same file on a shared NFS system
- the broker does not enforce this expiration time
- expiration time: applies when a worker receives a task late
(it is already too late to run it, so the worker rejects it as
REVOKED); it is not the maximum time a task can
be running
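The distinction in the last point can be sketched with a toy model. This is not Celery code: `worker_decision` is a hypothetical helper that only mimics the check described above (in Celery itself the expiry is passed as the `expires` argument of `apply_async`).

```python
from datetime import datetime, timedelta, timezone


def worker_decision(expires_at, received_at):
    """Toy model: state a worker gives a task at the moment it receives it."""
    if expires_at is not None and received_at > expires_at:
        # Too late to start: the task is discarded and never executed.
        return "REVOKED"
    # Received in time: the task starts, and from here on the expiry
    # no longer matters, however long the task runs.
    return "STARTED"


sent = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
expires = sent + timedelta(seconds=60)

on_time = worker_decision(expires, sent + timedelta(seconds=10))
late = worker_decision(expires, sent + timedelta(seconds=90))
print(on_time)  # STARTED
print(late)     # REVOKED
```

A limit on how long a task may run is a separate setting (see the time limits entry under Problems).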
- Retry
Policy
- connection retries to the broker
- ...
- Canvas:
designing workflows
- Signatures
- add.signature((2,2))
- add.s(2,2)
- Primitives
- group
- chain
- chord
- map
- starmap
- chunks
- Routing
- Remote
control
- Timezone
- Optimization
- User
Guide
- Progrés /
Progress
- Problemes / Problems
Hard time limit (300.0s) exceeded for
myproject.tasks.mytask
- Time
limits
- Solució / Solution
@task(time_limit=600, soft_time_limit=600)
def process():
    ...
- /var/log/messages
... python: File
"...lib/python2.7/site-packages/amqp/transport.py", line
438, in _read
... python: s = recv(n - len(rbuf))
... python: socket.error: [Errno 104] Connection reset by
peer
- ...
|
|
- Info
- Python
Modules and Packages – An Introduction
- search path when importing modules:
- ordre / order:
- current dir
- PYTHONPATH
- Python installation
- get:
- append:
- sys.path.append(r"/my/path/to/")
- module location
- import mod
mod
mod.__file__
- example
- mod.py
s = "..."
a = ["primer", "segon"]
def foo(arg):
...
class Foo:
...
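A minimal sketch of the search-path notes above, assuming a throwaway module named `demo_mod` (a hypothetical name) written to a temporary directory. It appends that directory to `sys.path`, imports the module, and reads its location from `__file__`.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Hypothetical module, created only for this demonstration.
module_dir = Path(tempfile.mkdtemp())
(module_dir / "demo_mod.py").write_text('s = "..."\na = ["primer", "segon"]\n')

# The temporary directory is not on the search path yet.
print(str(module_dir) in sys.path)  # False

# Same idea as sys.path.append(r"/my/path/to/")
sys.path.append(str(module_dir))
importlib.invalidate_caches()  # make sure the new directory is scanned

import demo_mod

print(demo_mod.a)         # ['primer', 'segon']
print(demo_mod.__file__)  # file the module was loaded from
```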
- import (Python
Modules and Packages – An Introduction)
import from module |
module |
import from package |
package |
subpackages |
|
- mod.py
s = "Something."
a = [1, 2, 3]
def foo(arg):
    print(f'arg = {arg}')
class Foo:
    pass
|
|
- pkg/
- __init__.py
# code in this file is executed when
# importing package with: import pkg
print(f'Invoking __init__.py for {__name__}')
A = ['quux', 'corge', 'grault']
# could also be used to force import of modules
# by just importing package with: import pkg
# import pkg.mod1, pkg.mod2
# create a specific list of modules to be
# imported with: from pkg import *
# __all__ = ["mod1", "mod2"]
- mod1.py
def foo():
    print('[mod1] foo()')
class Foo:
    pass
- mod2.py
def bar():
    print('[mod2] bar()')
class Bar:
    pass
|
- pkg/
- sub_pkg1/
- sub_pkg2/
- mod3.py
- from .. import sub_pkg1
print(sub_pkg1)
from ..sub_pkg1.mod1 import foo
foo()
- mod4.py
|
import <module_name>[,
<module_name>
...] |
- toto.py
import mod
mod.s
mod.foo("toto")
x = mod.Foo()
|
|
- toto.py
import pkg.mod1, pkg.mod2
pkg.mod1.foo()
x = pkg.mod2.Bar()
|
- pkg/sub_pkg2/mod3.py
- from .. import sub_pkg1
print(sub_pkg1)
from ..sub_pkg1.mod1 import foo
foo()
- toto.py
|
import <module_name>
as <alt_name> |
- toto.py
import mod as my_module
my_module.a
|
|
|
|
|
|
import <package_name> |
- toto.py
# nothing imported, unless specifically
# coded in pkg/__init__.py
import pkg
|
|
from <module_name>
import <name(s)> |
- toto.py
from mod import s, foo
s
foo("toto")
|
|
- toto.py
from pkg.mod1 import foo
foo()
|
- toto.py
from pkg.sub_pkg2.mod3 import baz
|
from <module_name>
import * |
- toto.py
# DANGEROUS: all objects are imported,
# except those starting with _
from mod import *
# you can limit the objects to be imported
# by defining __all__ list in mod.py
|
from <package_name>
import * |
- toto.py
# import only modules in list __all__
# defined inside __init__.py
from pkg import *
|
|
from <module_name>
import <name>
as <alt_name>
|
- toto.py
from mod import s as string, a as alist
string
alist
|
|
- toto.py
from pkg.mod2 import Bar as Qux
x = Qux()
|
- toto.py
from pkg.sub_pkg2.mod4 import qux as grault
|
|
|
from <package_name>
import <module_name>[,
<module_name>
...] |
- toto.py
from pkg import mod1
mod1.foo()
|
- toto.py
from pkg.sub_pkg1 import mod2
|
|
|
from <package_name>
import <module_name>
as <alt_name> |
- toto.py
from pkg import mod2 as quux
quux.bar()
|
|
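The effect of `__all__` on `from <package_name> import *` can be checked with a small sketch. The package `demo_pkg` and its two modules are hypothetical and are created in a temporary directory; only the module listed in `__all__` is pulled in by the star import.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Hypothetical package layout: demo_pkg/__init__.py, mod1.py, mod2.py
root = Path(tempfile.mkdtemp())
pkg_dir = root / "demo_pkg"
pkg_dir.mkdir()
(pkg_dir / "__init__.py").write_text('__all__ = ["mod1"]\n')
(pkg_dir / "mod1.py").write_text('def foo():\n    return "[mod1] foo()"\n')
(pkg_dir / "mod2.py").write_text('def bar():\n    return "[mod2] bar()"\n')

sys.path.insert(0, str(root))
importlib.invalidate_caches()

# Run the star import in a separate namespace so its effect can be inspected.
namespace = {}
exec("from demo_pkg import *", namespace)
imported = sorted(name for name in namespace if not name.startswith("__"))
print(imported)                 # ['mod1']
print(namespace["mod1"].foo())  # [mod1] foo()

# mod2 is still importable, but only when asked for explicitly.
from demo_pkg import mod2
print(mod2.bar())               # [mod2] bar()
```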
dir()
- alphabetically sorted list of names in the current
local symbol table
dir(mod)
- to keep code from running when a module is imported,
while still allowing the module to be run as a script,
guard the script code with an if:
if __name__=="__main__":
    # code that runs only when
    # called with: python mod.py
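A short sketch of the guard in action, assuming a hypothetical file `guard_demo.py` written to a temporary directory. Importing it leaves the guarded code untouched, while running it as a script (simulated here with `runpy.run_path`) executes it.

```python
import importlib
import runpy
import sys
import tempfile
from pathlib import Path

# Hypothetical script with a guarded section.
script_dir = Path(tempfile.mkdtemp())
script = script_dir / "guard_demo.py"
script.write_text(
    "ran_as_script = False\n"
    "def foo():\n"
    "    return 'foo'\n"
    "if __name__ == '__main__':\n"
    "    ran_as_script = True\n"
)

sys.path.insert(0, str(script_dir))
importlib.invalidate_caches()

# Imported as a module: __name__ is 'guard_demo', the guard is skipped.
import guard_demo
print(guard_demo.ran_as_script)            # False

# Run as a script: __name__ is '__main__', the guard is executed.
globals_after_run = runpy.run_path(str(script), run_name="__main__")
print(globals_after_run["ran_as_script"])  # True
```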
- reloading a module:
- the code in the file is executed only the first
time the module is imported
- to force the module to be reloaded:
import importlib
importlib.reload(mod)
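The reload behaviour can be observed with a small sketch, assuming a hypothetical module `reload_demo` in a temporary directory. A second `import` returns the cached module, while `importlib.reload` re-executes the file.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Skip .pyc files so the edited source is always what gets executed.
sys.dont_write_bytecode = True

module_dir = Path(tempfile.mkdtemp())
source = module_dir / "reload_demo.py"  # hypothetical module
source.write_text("VALUE = 1\n")
sys.path.insert(0, str(module_dir))
importlib.invalidate_caches()

import reload_demo
first = reload_demo.VALUE      # 1

# Change the file on disk and import again: nothing is re-executed,
# the module object comes from the sys.modules cache.
source.write_text("VALUE = 1000\n")
import reload_demo
cached = reload_demo.VALUE     # still 1

# Force the file to be executed again.
importlib.reload(reload_demo)
reloaded = reload_demo.VALUE   # 1000

print(first, cached, reloaded)
```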
- paquets / packages
- ...
- ...
|
Exemples / Examples
|
- Estructura de directoris /
Directory structure
- Info
- ...
|
info |
dirs |
create dirs |
run unit tests |
install with pip |
usage |
django
settings.INSTALLED_APPS |
exemples |
without src/ |
|
|
|
|
pip install my-distribution-package |
- import my_import_package_a
- import my_import_package_a.modulefirst
- from . import modulefirst
(when in the same package)
- from my_import_package_a.modulefirst import *
- from my_import_package_a.modulefirst import func
|
- "my_import_package_a"
- "dj_rest_auth"
- "axes"
|
|
with src/ |
|
- ~/src/
- my-distribution-package/
- README[.md,.adoc,.rst]
- CHANGELOG
- INSTALL
- setup.py
- requirements.txt
- docs/
- conf.py
- index.rst
- Makefile
- ...
- tests/
- src/
- my_import_package_a/
- my_import_package_b/
|
|
|
pip install my-distribution-package
|
|
|
|
Django |
|
|
- cd ~/src/
django-admin startproject samplesite
|
|
|
|
|
|
|
|
|
- cd .....
django-admin startproject samplesite .
|
|
|
|
|
|
- ...
- Python Command Line Arguments Examples
- Basic
python script structure
-
|
|
list of arguments
|
number of arguments
|
script
|
argument
|
|
|
|
|
|
|
sys
|
import sys
|
# including script
sys.argv |
# including script
len(sys.argv) |
sys.argv[0] |
sys.argv[1]
sys.argv[2]
...
|
getopt
|
import sys, getopt
|
|
|
|
try:
    myopts, args = getopt.getopt(sys.argv[1:],"i:o:")
except getopt.GetoptError as e:
    print(str(e))
    print("Usage: %s -i input -o output" % sys.argv[0])
    sys.exit(2)
for o, a in myopts:
    if o == '-i':
        ifile=a
    elif o == '-o':
        ofile=a
|
argparse
|
|
|
|
|
import argparse
import textwrap
#parser = argparse.ArgumentParser(description='ADD YOUR DESCRIPTION HERE')
# RawDescriptionHelpFormatter: only description
# RawTextHelpFormatter: description and arguments
parser = argparse.ArgumentParser(
    formatter_class=argparse.RawDescriptionHelpFormatter,
    description=textwrap.dedent('''\
        First line
        and second line
        '''))
parser.add_argument('-i','--input', help='Input file name', required=True)
parser.add_argument('-n','--number', type=int, help='Number', required=True)
# list: https://stackoverflow.com/questions/15753701/how-can-i-pass-a-list-as-a-command-line-argument-with-argparse
parser.add_argument('-l','--list', action='append', help='<Required> Set flag', required=True)
parser.add_argument('first_parameter', help='...')
parser.add_argument('parameter_1')
parser.add_argument('parameter_2')
parser.add_argument('--disable', dest='is_enabled', action='store_false', required=False)
# an option can be defined only once; metavar only changes the name
# shown in the help text, the value is still stored in args.title
parser.add_argument('-t', '--title', metavar='titol',
                    help='title to be put in field s= in sdp file',
                    required=False)
args = parser.parse_args()
print(args)
print("parameter_1: {0}".format(args.parameter_1))
print("title: {0}".format(args.title))
|
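The argparse options above can be tried without a command line by passing an explicit list to `parse_args`. This is a reduced sketch with made-up argument values. It shows that `metavar` does not rename the attribute, that `action='append'` collects repeated options, and that `dest` with `store_false` produces a flag that defaults to `True`.

```python
import argparse

parser = argparse.ArgumentParser(description="Demo parser")
parser.add_argument("-i", "--input", required=True, help="input file name")
parser.add_argument("-t", "--title", metavar="titol", help="title")
parser.add_argument("-l", "--list", action="append", help="repeatable option")
parser.add_argument("--disable", dest="is_enabled", action="store_false")
parser.add_argument("parameter_1")

# Explicit argument list instead of sys.argv[1:] (values are made up).
args = parser.parse_args(
    ["-i", "in.txt", "-t", "My title", "-l", "a", "-l", "b", "--disable", "value1"]
)

print(args.input)               # in.txt
print(args.title)               # My title
print(args.list)                # ['a', 'b']
print(args.is_enabled)          # False
print(args.parameter_1)         # value1
print(hasattr(args, "titol"))   # False: metavar is only used in the help text
```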
- print
print ("nombre_args: %d" % nombre_args)
- Fusió d'intervals / Merge intervals
|
IDE
|
- Comparison
of
integrated development environments: Python (wp)
- IDLE (wp)
- PyDev (Eclipse)
- Installation
- Help -> Install new software -> Add ...:
- PyDev - https://pydev.org/updates
- PyDev Perspective
- PyDev Package Explorer
- Top Level Elements: Projects
- Django
- New project
- workspace: ~/src/djcode/
- project name: nom_projecte
- will create (according to new directory layout in
Django 1.4):
- ~/src/djcode/
- nom_projecte/
- .project
- <name>nom_projecte</name>
- .pydevproject
- sqlite.db
- nom_projecte/
- nom_projecte can be renamed to nom_projecte_pare (if
you get the error "out of sync with file system", do a
Refresh) (but DATABASES in settings.py is not updated):
- ~/src/djcode/
- nom_projecte_pare/
- .project
- <name>nom_projecte_pare</name>
- .pydevproject
- sqlite.db
- nom_projecte/
- new applications (nom_projecte_pare -> Django
-> create new app) will be created inside nom_projecte
(actually, inside the directory specified in .pydevproject
as DJANGO_MANAGE_LOCATION)
- virtualenv
- Integrar Virtualenv con Eclipse (PyDev)
- Pydev
and
virtualenv
- passos / steps
- general configuration:
- Window -> Preferences -> PyDev ->
Interpreter - Python: New...
- Interpreter Name: python-PYTHON27
- Interpreter Executable:
/opt/PYTHON27/bin/python
- on your project:
- Properties -> PyDev - Interpreter/Grammar
- Interpreter: python-PYTHON27
- Editor
- Format de codi / Code format
- Window -> Preferences
- PyDev
- Editor
- Code Style
- Code Formatters
- Formatter
style: Black
- Black executable: Search in
interpreter
- black must be installed
- option 1: in interpreter:
- source
env/bin/activate
pip install black
- option 2: globally using
pipx
- Save Actions
- Auto-format editor contents before
saving
- Problemes / Problems
- PyDev Package Explorer
Warning: Top level elements set to working
sets but no working sets are defined
Access the menu (Ctrl+F10) to change to show
projects or create a working set
- Solució / Solution
- click on the three vertical dots
(«View Menu») of the «PyDev Package
Explorer» window
- debugging suspends on caught exceptions ("
VariableDoesNotExist:
Failed
lookup for key... ") related to Django
templates:
- Solució / Solution
- PyDev -> Manage exception breakpoints
- Uncheck: "Suspend on django template
render exceptions"
- Unable to read repository at
http://pydev.org/updates/content.xml Transport
initialization error..
- Unresolved import
- Solució / Solution:
- select project -> right click -> PyDev
-> Remove PyDev Project Config
- File -> Restart
- New added library (e.g. by using pip) not detected
- Solution:
- Window -> Preferences -> PyDev ->
Interpreter - Python Interpreter -> Remove
-> AutoConfig
- when selecting a class:
__module_not_in_the_pythonpath__
- Solució / Solution:
- right click on project:
- PyDev -> Set as Source Folder (add to
PYTHONPATH)
- when running: Reference to undefined variable
DJANGO_MANAGE_LOCATION
- Existing code (1.3)
- djcode/
- mysite/
- settings.py
- mysite.db
- polls/
|
|
|
GUI
|
|
Frameworks
|
|
|
|
|
|
- Info
- Documenting
- code
- Docstrings
- PEP
257 – Docstring Conventions
- Docstrings
formats
|
|
sphinx
extensions (conf.py)
|
reStructuredText |
"""Summary line.

Extended description.

:param arg1: Description of arg1
:type arg1: int
:param arg2: Description of arg2
:type arg2: str
:returns: Description of return value
:rtype: bool
"""
|
"sphinx.ext.autodoc" |
Google
docstrings (git)
|
# using PEP484
# available sections
def func(arg1: int, arg2: str) -> bool:
    """Summary line.

    Extended description.

    Args:
        arg1: Description of arg1.
        arg2: Description of arg2.

    Returns:
        Description of return value.

    Raises:
        IOError: An error occurred.
    """
def func(arg1, arg2):
    """Summary line.

    Extended description.

    Args:
        arg1 (int): Description of arg1.
        arg2 (str): Description of arg2.

    Returns:
        bool: Description of return value.

    Raises:
        IOError: An error occurred.
    """
|
"sphinx.ext.napoleon"
|
NumPy/SciPy
docstrings |
-
"""Summary line.

Extended description.

Parameters
----------
arg1 : int
    Description of arg1
arg2 : str
    Description of arg2

Returns
-------
bool
    Description of return value
"""
|
"sphinx.ext.autodoc"
"sphinx.ext.napoleon" |
Epytext |
|
|
- Get info
help(my_function)
dir(my_function)
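As a small illustration of how a docstring can be read back at run time, the sketch below defines a function with a Google-style docstring (the function body is made up for the example) and inspects it with the standard `inspect` module, which is also what `help()` relies on.

```python
import inspect


def func(arg1: int, arg2: str) -> bool:
    """Summary line.

    Extended description.

    Args:
        arg1: Description of arg1.
        arg2: Description of arg2.

    Returns:
        Description of return value.
    """
    # Made-up body, only so that the function does something.
    return len(arg2) == arg1


doc = inspect.getdoc(func)       # docstring with indentation cleaned up
summary = doc.splitlines()[0]
print(summary)                   # Summary line.
print(inspect.signature(func))   # (arg1: int, arg2: str) -> bool
print("__doc__" in dir(func))    # True
```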
- project
- Documentation
Tools and Resources
- Documentation
Tools (python.org)
- Comparison
of Python documentation generators
- How
to write software documentation
- Sphinx
- Info
- Instal·lació / Installation
- system-wide (not working):
pipx
install sphinx sphinx_rtd_theme
- local
pip install sphinx sphinx_rtd_theme
- Django
- Exemples / Examples
- Ús / Usage
- Build
your first project
cd my_project/docs
sphinx-quickstart
- auto documentation
- ...
conf.py |
manually created |
when running... |
...will create |
and finally, to generate html files,
run: |
"sphinx.ext.autosummary" |
docs/index.rst
Modules
=======
.. autosummary::
   :toctree: toto

   foobar.foo
   foobar.bar
   foobar.bar.baz
|
make html |
- docs/
- toto/
- foobar.foo.rst
- foobar.bar.rst
- foobar.bar.baz.rst
(and html files in build/) |
- |
|
docs/index.rst
Modules
=======
.. autosummary::
   :toctree: toto

   foobar.foo
   foobar.bar
   foobar.bar.baz
|
PYTHONPATH=. sphinx-autogen docs/index.rst |
- docs/
- toto/
- foobar.foo.rst
- foobar.bar.rst
- foobar.bar.baz.rst
|
make html |
|
docs/conf.py
(default, generated by sphinx-quickstart)
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here.
import pathlib
import sys
sys.path.insert(0, pathlib.Path(__file__).parents[2].resolve().as_posix())
docs/index.rst
.. toctree::
   :maxdepth: 2
   :caption: Contents:

   modules
|
sphinx-apidoc
-o docs .
|
- docs/
- modules.rst
sphinx-getting-started
======================
.. toctree::
:maxdepth: 4
my_first_module.rst
...
- my_first_module.rst
- ...
|
make html |
"sphinx.ext.autodoc" |
docs/my_file.rst
.. automodule::
axes.backends
:members:
:show-inheritance:
|
|
|
|
- crea documentació a partir dels docstrings
del codi / build documentation from docstrings in
code
- conf.py
import os
import sys
# is this the autogenerated default one?:
sys.path.insert(0, os.path.abspath(".."))
# -- General configuration ---------------------------------------------------
extensions = ["sphinx.ext.todo", "sphinx.ext.viewcode", "sphinx.ext.autodoc"]
todo_include_todos = True
# -- Options for HTML output -------------------------------------------------
html_theme = "sphinx_rtd_theme"
# sphinx-themes
- index.rst
.. toctree::
   :maxdepth: 2
   :caption: Contents:

   modules
cd ..
sphinx-apidoc -o docs .
- will create in the docs/ directory a
modules.rst file and one rst file for each of the
modules found in . :
- modules.rst
- my_first_found_module.rst
- ...
- Builders
- HTML
cd docs
make html
- (sphinx-build
-M html docs/source/ docs/build/)
- open _build/html/index.html
- PDF
cd docs
make latexpdf
- open _build/latex/my_project.pdf
- ...
|
http://www.francescpinyol.cat/python.html
Primera versió: / First version: 24 VIII 2015
Darrera modificació: 26 d'agost de 2024 / Last update: 26th August
2024