initial import

This commit is contained in:
c056091 2020-03-09 18:13:45 -03:00
parent 99ca18774a
commit 396f7fa6e9
6 changed files with 120 additions and 19 deletions

View File

@ -15,7 +15,7 @@ setup(
license="MIT -or- Apache License 2.0", license="MIT -or- Apache License 2.0",
packages=find_packages(), packages=find_packages(),
install_requires=[ install_requires=[
"trio", "trio", 'async_generator'
], ],
keywords=[ keywords=[
# COOKIECUTTER-TRIO-TODO: add some keywords # COOKIECUTTER-TRIO-TODO: add some keywords

View File

@ -1,3 +1,2 @@
"""Top-level package for Trio FTPLIB.""" from ._aftplib import AFTP
from .operations import download_text
from ._version import __version__

72
trio_ftplib/_aftplib.py Normal file
View File

@ -0,0 +1,72 @@
from typing import AsyncIterator
import trio
import ftplib
class AFTP(ftplib.FTP):
def __init__(self, host='', user='', passwd='', acct='',
timeout=ftplib._GLOBAL_DEFAULT_TIMEOUT, source_address=None):
super().__init__(acct=acct, timeout=timeout, source_address=source_address)
self._host = host
self._user = user
self._passwd = passwd
async def connect(self, host='', port=0, timeout=-999, source_address=None):
if host == '':
host = self._host
welcome = await trio.to_thread.run_sync(
super().connect,
host,
port,
timeout,
source_address
)
if self._user:
await self.login()
return welcome
async def __aenter__(self):
if self.sock is None:
await self.connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await trio.to_thread.run_sync(self.__exit__)
async def login(self, user='', passwd='', acct=''):
if not user:
user = self._user
if not passwd:
passwd = self._passwd
await trio.to_thread.run_sync(super().login, user, passwd, acct)
async def cwd(self, dirname):
return await trio.to_thread.run_sync(self.cwd, dirname)
async def iretrlines(self, cmd, transf_type='A') -> AsyncIterator[bytes]:
"""
Retrieve data in line mode.
The argument is a RETR or LIST command.
Yields each line.
"""
await trio.to_thread.run_sync(self.sendcmd, f'TYPE {transf_type}')
conn = await trio.to_thread.run_sync(self.transfercmd, cmd)
fp = conn.makefile('rb')
def _finalize():
fp.close()
conn.close()
self.voidresp()
try:
while True:
line = await trio.to_thread.run_sync(fp.readline)
if not line:
break
yield line
finally:
await trio.to_thread.run_sync(_finalize)

View File

@ -1,15 +0,0 @@
import trio
# We can just use 'async def test_*' to define async tests.
# This also uses a virtual clock fixture, so time passes quickly and
# predictably.
async def test_sleep_with_autojump_clock(autojump_clock):
assert trio.current_time() == 0
for i in range(10):
print("Sleeping {} seconds".format(i))
start_time = trio.current_time()
await trio.sleep(i)
end_time = trio.current_time()
assert end_time - start_time == i

View File

@ -0,0 +1,10 @@
from async_generator import aclosing
from trio_ftplib import download_text
async def test_tso_sinaf():
async with aclosing(download_text(server='ftpplex01.corecaixa', user='ftpopr', password='ftpbsa',
path='MIC.NAF.MZ.BBD2.M130D001.D200306')) as _text:
async for line in _text:
assert line.count('@') == 15

35
trio_ftplib/operations.py Normal file
View File

@ -0,0 +1,35 @@
import ftplib
import posixpath
from typing import AsyncIterator
from async_generator import aclosing
from ._aftplib import AFTP
async def download_text(server: str, user: str, password: str, path: str,
encoding='cp1252', n_attempts: int = 10) -> AsyncIterator[str]:
nlines_sent = 0
for attempt in range(n_attempts):
ftp = AFTP(server, user, password)
await ftp.connect()
filename = posixpath.basename(path)
path = posixpath.dirname(path)
if path:
await ftp.cwd(path)
nlines_captured = 0
try:
async with aclosing(ftp.iretrlines(f"RETR '{filename}'")) as _iterator:
async for line in _iterator:
nlines_captured += 1
if nlines_captured <= nlines_sent:
continue
nlines_sent += 1
yield line.decode(encoding)
except ftplib.error_temp as e:
if '426-Data transfer timeout, aborted' not in e.args[0]:
raise
else:
break
else:
raise Exception(f'Failed {n_attempts} attempts, cancelling')