From 75b210ef0922d73e94a3db47885f2b4c33133f8d Mon Sep 17 00:00:00 2001 From: Mirko Bernardoni Date: Wed, 8 May 2019 13:20:37 +0100 Subject: [PATCH 01/16] Create docker container based on Michael experiments --- Dockerfile | 42 +++++++++++++++++++++ docker/README.md | 38 +++++++++++++++++++ docker/default.env | 54 +++++++++++++++++++++++++++ docker/dot_env.sh | 56 ++++++++++++++++++++++++++++ docker/erlang-solutions_1.0_all.deb | Bin 0 -> 4992 bytes docker/oe-entrypoint.sh | 37 ++++++++++++++++++ docker/run_edgar.py | 23 ++++++++++++ tika/download_tika.sh | 2 +- 8 files changed, 251 insertions(+), 1 deletion(-) create mode 100644 Dockerfile create mode 100644 docker/README.md create mode 100755 docker/default.env create mode 100755 docker/dot_env.sh create mode 100644 docker/erlang-solutions_1.0_all.deb create mode 100755 docker/oe-entrypoint.sh create mode 100644 docker/run_edgar.py diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..d4802e3 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,42 @@ +# CC Specific Dockerfile implementing steps at https://github.com/LexPredict/openedgar/blob/master/INSTALL.md +# Allows the use of OpenEDGAR in AKS +FROM ubuntu:18.04 +MAINTAINER Michael Seddon (michael.seddon@cliffordchance.com) + +# Environment variables +ENV DEBIAN_FRONTEND=noninteractive + +# Package installation +RUN apt update +RUN apt upgrade -y +RUN apt install -y software-properties-common build-essential python3-dev python3-pip virtualenv git-all +# to be removed when rabbit is in its own container +RUN apt install -y rabbitmq-server +RUN apt-get install -y openjdk-8-jdk + +# Clone OpenEDGAR repository +WORKDIR /opt +RUN mkdir /opt/openedgar +COPY lexpredict_openedgar/ /opt/openedgar/lexpredict_openedgar/ + +# Set up Python venv +WORKDIR /opt/openedgar/ +RUN virtualenv -p /usr/bin/python3 env +RUN ./env/bin/pip install -r lexpredict_openedgar/requirements/full.txt +RUN ./env/bin/pip install azure-mgmt-resource azure-mgmt-datalake-store 
azure-datalake-store + +COPY tika/tika-server-1.20.jar /opt/openedgar/tika/tika-server-1.20.jar +COPY docker/default.env /opt/openedgar/ +RUN cp lexpredict_openedgar/sample.env lexpredict_openedgar/.env +#COPY docker/erlang-solutions_1.0_all.deb lexpredict_openedgar/erlang-solutions_1.0_all.deb +#COPY tasks.py lexpredict_openedgar/openedgar/tasks.py +#COPY edgar.py lexpredict_openedgar/openedgar/processes/edgar.py +#COPY parsers/edgar.py lexpredict_openedgar/openedgar/parsers/edgar.py +#COPY clients/aks.py lexpredict_openedgar/openedgar/clients/aks.py +#COPY clients/edgar.py lexpredict_openedgar/openedgar/clients/edgar.py +COPY docker/oe-entrypoint.sh /usr/local/bin/ +COPY docker/run_edgar.py /opt/openedgar/lexpredict_openedgar/run_edgar.py +COPY docker/dot_env.sh /opt/openedgar +RUN mkdir /data + +ENTRYPOINT ["oe-entrypoint.sh"] \ No newline at end of file diff --git a/docker/README.md b/docker/README.md new file mode 100644 index 0000000..989ec2e --- /dev/null +++ b/docker/README.md @@ -0,0 +1,38 @@ +# Dockerisation +The image will take as default parameters [default.env](docker/default.env) + +All the variables can be overridden at runtime through environment variables + +## Download tika +Run the script `download_tika.sh` in `/tika`; it will download Tika version 1.20 into the `/tika` +folder + +## Docker +Run the following from the repository root to create the image: + + docker build -t dslcr.azurecr.io/openedgar:1.1 . + +# Run container +It is wise to mount a local folder into the container to be able to access the +downloaded documents.
+Example: + + docker run --env-file vars.txt -v /Users/mirko/Projects/research-openedgar/data:/data dslcr.azurecr.io/openedgar:1.1 + +Contents of vars.txt + + EDGAR_YEAR=2015 + EDGAR_QUARTER=1 + EDGAR_MONTH=1 + CLIENT_TYPE=Local + S3_DOCUMENT_PATH=/data + DOWNLOAD_PATH=/data + +After the download has terminated you have to stop the container: + + $ docker ps + + CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES + 9e0ae247b61f dslcr.azurecr.io/openedgar:1.1 "oe-entrypoint.sh" 2 minutes ago Up 2 minutes priceless_bardeen + + $ docker kill 9e diff --git a/docker/default.env b/docker/default.env new file mode 100755 index 0000000..a8f60ac --- /dev/null +++ b/docker/default.env @@ -0,0 +1,54 @@ +# PostgreSQL +DATABASE_URL=${DATABASE_URL:="postgres://postgres:postgres@host.docker.internal:5432/openedgar"} +CELERY_BROKER_URL=${CELERY_BROKER_URL:="amqp://openedgar:openedgar@localhost:5672/openedgar"} +CELERY_RESULT_BACKEND=${CELERY_RESULT_BACKEND:="rpc"} +CELERY_RESULT_PERSISTENT=${CELERY_RESULT_PERSISTENT:="False"} +DJANGO_SECRET_KEY=${DJANGO_SECRET_KEY:="openedgar"} + +# Domain name, used by caddy +#DOMAIN_NAME=domain.com + +# General settings +# DJANGO_READ_DOT_ENV_FILE=True +# CLIENT_TYPE: AKS, ADLAKE, Local +CLIENT_TYPE=${CLIENT_TYPE:="Local"} + + +DJANGO_ADMIN_URL=${DJANGO_ADMIN_URL=""} +DJANGO_SETTINGS_MODULE=${DJANGO_SETTINGS_MODULE:="config.settings.production"} +DJANGO_SECRET_KEY=${DJANGO_SECRET_KEY:="openedgar"} +DJANGO_ALLOWED_HOSTS=${DJANGO_ALLOWED_HOSTS:="localhost"} + +# AWS Settings +DJANGO_AWS_ACCESS_KEY_ID=${DJANGO_AWS_ACCESS_KEY_ID:=""} +DJANGO_AWS_SECRET_ACCESS_KEY=${DJANGO_AWS_SECRET_ACCESS_KEY:=""} +DJANGO_AWS_STORAGE_BUCKET_NAME=${DJANGO_AWS_STORAGE_BUCKET_NAME:=""} + +# Used with email +DJANGO_MAILGUN_API_KEY=${DJANGO_MAILGUN_API_KEY:=""} +DJANGO_SERVER_EMAIL=${DJANGO_SERVER_EMAIL:=""} +MAILGUN_SENDER_DOMAIN=${MAILGUN_SENDER_DOMAIN:=""} +EMAIL_BACKEND=${EMAIL_BACKEND:="django.core.mail.backends.console.EmailBackend"} + +# Security!
Better to use DNS for this task, but you can use redirect +DJANGO_SECURE_SSL_REDIRECT=${DJANGO_SECURE_SSL_REDIRECT:="False"} + +# django-allauth +DJANGO_ACCOUNT_ALLOW_REGISTRATION=${DJANGO_ACCOUNT_ALLOW_REGISTRATION:="True"} + +# AWS setup +S3_ACCESS_KEY=${S3_ACCESS_KEY:="ABCDEFGHIJKLMNOPQRST"} +S3_SECRET_KEY=${S3_SECRET_KEY:="abcdefghijklmnopqrstuvwxyz12345678901234"} +S3_BUCKET=${S3_BUCKET:=""} + +S3_PREFIX=${S3_PREFIX:="DATA"} +S3_COMPRESSION_LEVEL=${S3_COMPRESSION_LEVEL:="9"} + +# Download path +DOWNLOAD_PATH=${DOWNLOAD_PATH:="/data"} +S3_DOCUMENT_PATH=${S3_DOCUMENT_PATH:="/data"} + +# EDGAR PARAMETERS +EDGAR_YEAR=${EDGAR_YEAR:="2015"} +FORM_TYPES=${FORM_TYPES:="3, 10, 8-K, 10-Q, 10-K"} + diff --git a/docker/dot_env.sh b/docker/dot_env.sh new file mode 100755 index 0000000..6d44328 --- /dev/null +++ b/docker/dot_env.sh @@ -0,0 +1,56 @@ +#!/bin/bash + +cat >/opt/openedgar/lexpredict_openedgar/.env << EOF +DATABASE_URL=$DATABASE_URL +CELERY_BROKER_URL=$CELERY_BROKER_URL +CELERY_RESULT_BACKEND=$CELERY_RESULT_BACKEND +CELERY_RESULT_PERSISTENT=$CELERY_RESULT_PERSISTENT +DJANGO_SECRET_KEY=$DJANGO_SECRET_KEY + +# Domain name, used by caddy +#DOMAIN_NAME=domain.com + +# General settings +# DJANGO_READ_DOT_ENV_FILE=True +# CLIENT_TYPE: AKS, ADLAKE, Local +CLIENT_TYPE=$CLIENT_TYPE + + +DJANGO_ADMIN_URL=$DJANGO_ADMIN_URL +DJANGO_SETTINGS_MODULE=$DJANGO_SETTINGS_MODULE +DJANGO_SECRET_KEY=$DJANGO_SECRET_KEY +DJANGO_ALLOWED_HOSTS=$DJANGO_ALLOWED_HOSTS + +# AWS Settings +DJANGO_AWS_ACCESS_KEY_ID=$DJANGO_AWS_ACCESS_KEY_ID +DJANGO_AWS_SECRET_ACCESS_KEY=$DJANGO_AWS_SECRET_ACCESS_KEY +DJANGO_AWS_STORAGE_BUCKET_NAME=$DJANGO_AWS_STORAGE_BUCKET_NAME + +# Used with email +DJANGO_MAILGUN_API_KEY=$DJANGO_MAILGUN_API_KEY +DJANGO_SERVER_EMAIL=$DJANGO_SERVER_EMAIL +MAILGUN_SENDER_DOMAIN=$MAILGUN_SENDER_DOMAIN +EMAIL_BACKEND=$EMAIL_BACKEND + +# Security! 
Better to use DNS for this task, but you can use redirect +DJANGO_SECURE_SSL_REDIRECT=$DJANGO_SECURE_SSL_REDIRECT + +# django-allauth +DJANGO_ACCOUNT_ALLOW_REGISTRATION=$DJANGO_ACCOUNT_ALLOW_REGISTRATION + +# AWS setup +S3_ACCESS_KEY=$S3_ACCESS_KEY +S3_SECRET_KEY=$S3_SECRET_KEY +S3_BUCKET=$S3_BUCKET + +S3_PREFIX=$S3_PREFIX +S3_COMPRESSION_LEVEL=$S3_COMPRESSION_LEVEL + +# Download path +DOWNLOAD_PATH=$DOWNLOAD_PATH +S3_DOCUMENT_PATH=$S3_DOCUMENT_PATH + +# EDGAR PARAMETERS +EDGAR_YEAR=$EDGAR_YEAR +FORM_TYPES=$FORM_TYPES +EOF \ No newline at end of file diff --git a/docker/erlang-solutions_1.0_all.deb b/docker/erlang-solutions_1.0_all.deb new file mode 100644 index 0000000000000000000000000000000000000000..6cf8fefa5dd7a84d81c97c0f41901bcaab2ddcc9 GIT binary patch literal 4992 zcma)=Wl$Sjv&VzCSfIEjNPz;yA-ERzQXbqLTA;YQQz#l-Dv%<@H3ThI+@WYGZh=5> zyM5ld_w#%IvwLRG%@~~rGN`f zRex9O5VEMCDKt867`}0;3C6A)N+#|Mp*<=jEO?!LtssON8~n+pC8cXCHZI1C7{|o zPX)c@C`W{neM-jf&b-c8J2DNhAJoMDbri-O>;%19Uby;r0QDZ$+5mxteL-RnPcoPv zP{L@p+b!e@ApT?VCHe?t$y+7>d|29|y1ffic#TO&saSO;N|Y>1NuP^ZI~IX7BXB30 zlH>p$uM0_J=92cdMi=1y!cq949TPQ7F2aB}w!-}~LSPj5a(A{9)JResk~8r5>JeC| z{Q$y~;oxA?KIl|fo_{=+Uki;jl#3vjepPY99zHK4^iX$+wG=yA8H8Sb)*l*1I0Hcy`Eq(A9>cu=b-ohYI-=(*msTTKEt1TQ*oD-kmCDNK>XsMitia zh$P?lK^&m&Rug#QaJu9dyUywM%R7T7G2z&r0jHl(VZvP|H8G)4AeR=@3AYi}!`?^t zy|$;{V%?ncImf~vFJ#?5@+`|f+_|sgu_Vo}iB*qB_j&FO^bbJ3`-0X&-ZrcyW;Y}@0m8cj+UTiv%~n4!rG4ph116)PxmAZwG})i ztVuYAZLZ$m+papOrdBuQkrmM)6a6OC_7usW zX_f~C8{p?J3L9^(2V~6oYzf|3*xb#GxbZg!Rnm)Y3GND(PRmQoQxA*?Ksx)EPv0{f zPt8(kTypu(@htrH+Yjj*s2KWVyuCUfkkjg!*t&(aO&sTe2}9v11ot`EVA7fX{9CXK zor%_1So5KKc4^4;FUr2SjO0uTvWfV*GA^vwOH%s`v-1zrvMT9%)xD)Nv4%^$c) zh|Fd}pTgH|VTwUS0hbpq*!^(}?hUOLNe26>B#C#7!*fixnmRWHF@2deFGy_R4U5$i_2RuCCC-v;R=83y9{+qmGN98wlpM$I5uur`LdI`ht)+e^-&V2H$Mz7~nV3^$3CmA0T&{E{46GgJ9dgdbA)fRz+ zvdJV(H*ajsj*{Q4S7_2dqz~)ro?Qnrd{^3`n3?q7rzbYzsm)XRYdF8vh}*(KwTU@C%B2`R5jvP872Ie^TG72- zQQPlLZn=&=@ zTD_1+MaC7swq-&=Ulw9RWTxj?rS+V^*UmOaRzG4G72>|jqf?EZv`=5l-h{dlyn-A} 
z)gw08Z}&!yVaC-pnnh(;aLMe2Z!0@1#oUCBF2&oErg_q(N(^~vDLrCenXAYwSvIpH zXZRyOY2Z4iGr0ss#-Hkv^r$tHI|#|+IbksULa-Zi3R(-tp6M$VYF*c4cv>H5irGqu zE4k{|(Eg+z7=+#DzroMjO2ZPM$D#j%{={gedj;KGaMzni4SK7WpH2jEokJe`j<_#v zi&`m&No~hnmK$-f_g6XyADNd!T@sExCPqr*0741kK?rA;@Jm}raOa1m{Z%raO!o|rv+GL-!cJ*5OzM=as z2=md%4QqIQ_L~U1#7OuFlSrKA$J2{LP7dn&g)zM(y``mAcAwMk-2EzdY-JJ)C5e#y z&F2N;KceFV3&+w`$8y*j(7VQIV()6`o}9wP7@{jiNe1M;7HZa-&IFLz!t_S%_MhCh zMrRCw8Pib#pOpDEl{qk5$@&x8cw}5`+AI~ykNC&+9zItu=^Bn-;?VeF+O^OLDYqB= zlU1-fBqUB?za4+>{37wWkQ#}pRr>Rev$zG(p~MN$XcMF78_H+($6uLgi8C`C$D2>P zSL!kwb>Q_0^etA>V_5-IX}Z?c5akATqgtn^Gw}R%NT96Sp1GV?l5`Q*-nUpEK}DGp zDK@@ok&Z6`_sJh$F<1}F;;;!MOfXbcT%Ur$trf2dWtB~f4m5)qsb;s|DuRWk=odSd zVM%rGoG_DzBd1K_R)wPe*{Bsez%P%^!zeSMbJ(G*NHBieOkHj>me4l@H=HFk-dpY( z=-beJaYwUMeQWlkzvAW0Oty1;E?cAX{AP`;eWP^2rG~AOa>94*5^H0qU{1bDMQ!x) z$U6|jjbn?DxAc6(9%Xo`6%FW?P>rZ5yw(DV^)rr;pDf3tTi4YxLybaX!P}Q^T!L{C z$;y!Q%bIRku5H@6jaClLLD14F$H3()G8~gQ>pFj=Hj+Q!{ha}S<*1>0P7l=LE|z)0 zMpbg870CZ{>pSS}jV@inPb}3yj?kQ2gwn8~qd~3$b6bjbSJfYK%gGFkR-LRms`>sf z1HU(H*YWFB8MCB|adkl|HS#octYqIYQ%m4eZ?cgA1J223=}o^0zpg6vX81Q(1}2As zMJS{RfL}=nFV_K_Y3H_sXc*yu?q;zg)gI^h*LVCST}aYuMPki4S5^PjC{r5!`GUhdHuG)e_m%?)U->g0nP9kc*@lBMJD0> z&e*@$*WZ}XrC6UIcK29(NC*(tMx?!l;MCHJe8(}Lr(k|TTk;mVI0!nq{Q-|<{jzM% zO6a_u)+r^TJmDD6y?2g5b7)8W4k!lGl1yUh7c41{nQ~?_Xv;ONJT({0{YPXyv!wcB zSR_?xs=q4<@Jv{yZpTA_ec#q8$HX%yGw(Ub73&q!wb!Fi=YE747j2!d;xZ*jwBq3x zRB;~p9d1reQehRCMgeK59E+!}Rp3wI;x0f^1`BE(MI3+OelHZGH5Bim6Yn~yY+&NU zg~~{B?Up?0E!A6s?hE2U(}lQ1vH!R_Q7mN8ke;>kT;;9s55?&DnDApdP4~oUCEIwi zZj;A3QOA`=V;o)jg>M_>65o3H5&Ifib#|iWuB8`QG*zj0Go|{@UC9nxO>y@;*2y(~ ze?v^|XnvT=Hr@aD&3caz+G8dCi-qF#2$7v-*r+81!7NnCFD7mlrPhG|{;A!_t1P7m zm&BPlmY?1l(4%NAQg~xi#%Ne$WnDJc2KPxyt@V{BTw%mM!YKR4hN*(MB&*#!u2rqR ziz(1{LNd{W!dhQkYCSPtf%i*dC4~|9*y|zf9RhJWXYy%9QlE~hX7#!p=ZUwVSvehV zXT;_Kj>Hhro$R};+&W?jXf0=dQ(i)A=Vri)R$7CgLk1AHtZiGz6m^ayk(YhotVtL8 z^HJO|30k!o*COSm0!#<9By=b`&`lS8PCp>ug5`dhUN5G zem-vmXG7ZM5#59Pds5aA*D9{HCBncQj=b<~HYDP;64(?BJP$|iwB*U@uRIl?Dk3U# 
zhk6-UqVTZtc2h4@ARK{;#XbF=4fN{#g-eOyq>wKPfz%6(t?V{GzJ2mfGEuhU~zaufijl~UUZ#OuK)D9QR&LK2ght!p$#p{jqx>oE&jjI~rN)4}%X-I{EOs&gA_ zZ7EDI1M>tp72;e+St^@HcnD}aH!Zl>v?%b$t|+nD`;6=cN>9qGpmgTmL4Y(YsPsDH zVMAlTH?M%cgYho#0a(F^SXDs?t0u5Jh3%DHDU%}QKl{l(fhlJFr$|G6S7Q0I)+SU6 zV>$7nOGt7^m~@yVUYjlMUG{A>;-OgwZX8YNw2Z15X4q1Q=$jD4#Dx78Pvlhd4}SNXX& zWcw3RWOZ$aFDjKb%eD0z0B{;b9v@+MeFk7eVubyDR07W~KQNJ# AmjD0& literal 0 HcmV?d00001 diff --git a/docker/oe-entrypoint.sh b/docker/oe-entrypoint.sh new file mode 100755 index 0000000..292286e --- /dev/null +++ b/docker/oe-entrypoint.sh @@ -0,0 +1,37 @@ +#!/bin/bash + +cd /opt/openedgar/lexpredict_openedgar +export PYTHONIOENCODING=utf-8 + +source ../env/bin/activate +source /opt/openedgar/default.env + +source /opt/openedgar/dot_env.sh + +export C_FORCE_ROOT="true" + +service rabbitmq-server start + +rabbitmqctl add_user openedgar openedgar + +rabbitmqctl add_vhost openedgar + +rabbitmqctl set_permissions -p openedgar openedgar ".*" ".*" ".*" + +# perform initial migration +python manage.py migrate + +celery -A lexpredict_openedgar.taskapp worker --loglevel=INFO -f ./celery.log -c16 & + +cd /opt/openedgar/tika + +java -jar tika-server-1.20.jar > tika.log & + +cd /opt/openedgar/lexpredict_openedgar +source ../env/bin/activate +source /opt/openedgar/default.env + + +python manage.py shell < run_edgar.py + +tail -f celery.log \ No newline at end of file diff --git a/docker/run_edgar.py b/docker/run_edgar.py new file mode 100644 index 0000000..f69e37e --- /dev/null +++ b/docker/run_edgar.py @@ -0,0 +1,23 @@ +import os +from openedgar.processes.edgar import download_filing_index_data, process_all_filing_index + +year = os.getenv("EDGAR_YEAR") +quarter = os.getenv("EDGAR_QUARTER") +month = os.getenv("EDGAR_MONTH") +types = [t.upper().strip() for t in os.getenv("FORM_TYPES").split(",")] + + +# process_all_filing_index(year=year, quarter=quarter, month=month, form_type_list=["3.1","3.2","10.1"]) +# process_all_filing_index(year=year, quarter=quarter, 
month=month, form_type_list=["10-k"]) + +# for i in range(1995,2019) +# process_all_filing_index(year=i, form_type_list=types) + +#£ +# fix: openedgar.tasks: ERROR Multiple Filing records found for s3_path=edgar/data/1096325/0001096325-15-000002.txt, skipping... +# fix: openedgar.tasks: ERROR Unable to create filing documents for Filing id=30186, cik=1024725, form_type=8-K, date_filed=2015-02-02: 'ascii' codec can't encode character '\xa0' in position 20: ordinal not in range(128) +# Reduce log level +process_all_filing_index(year=year, form_type_list=types) + +for i in range(1, 10): + print("###############################################") diff --git a/tika/download_tika.sh b/tika/download_tika.sh index 7fcce88..d875ea4 100644 --- a/tika/download_tika.sh +++ b/tika/download_tika.sh @@ -1 +1 @@ -wget http://www-us.apache.org/dist/tika/tika-server-1.18.jar +wget http://www-us.apache.org/dist/tika/tika-server-1.20.jar From 184c1ad12eea92f96399271e007044607f2a16f2 Mon Sep 17 00:00:00 2001 From: Mirko Bernardoni Date: Wed, 8 May 2019 13:48:22 +0100 Subject: [PATCH 02/16] Reduced log level to WARNING --- Dockerfile | 5 +++-- docker/oe-entrypoint.sh | 2 +- lexpredict_openedgar/openedgar/tasks.py | 4 ++-- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/Dockerfile b/Dockerfile index d4802e3..d5b841e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,15 +17,16 @@ RUN apt-get install -y openjdk-8-jdk # Clone OpenEDGAR repository WORKDIR /opt RUN mkdir /opt/openedgar -COPY lexpredict_openedgar/ /opt/openedgar/lexpredict_openedgar/ # Set up Python venv WORKDIR /opt/openedgar/ RUN virtualenv -p /usr/bin/python3 env +COPY lexpredict_openedgar/requirements/full.txt lexpredict_openedgar/requirements/full.txt RUN ./env/bin/pip install -r lexpredict_openedgar/requirements/full.txt RUN ./env/bin/pip install azure-mgmt-resource azure-mgmt-datalake-store azure-datalake-store - COPY tika/tika-server-1.20.jar /opt/openedgar/tika/tika-server-1.20.jar +COPY 
lexpredict_openedgar/ /opt/openedgar/lexpredict_openedgar/ + COPY docker/default.env /opt/openedgar/ RUN cp lexpredict_openedgar/sample.env lexpredict_openedgar/.env #COPY docker/erlang-solutions_1.0_all.deb lexpredict_openedgar/erlang-solutions_1.0_all.deb diff --git a/docker/oe-entrypoint.sh b/docker/oe-entrypoint.sh index 292286e..dd41f9a 100755 --- a/docker/oe-entrypoint.sh +++ b/docker/oe-entrypoint.sh @@ -21,7 +21,7 @@ rabbitmqctl set_permissions -p openedgar openedgar ".*" ".*" ".*" # perform initial migration python manage.py migrate -celery -A lexpredict_openedgar.taskapp worker --loglevel=INFO -f ./celery.log -c16 & +celery -A lexpredict_openedgar.taskapp worker --loglevel=WARNING£# -f ./celery.log -c16 & cd /opt/openedgar/tika diff --git a/lexpredict_openedgar/openedgar/tasks.py b/lexpredict_openedgar/openedgar/tasks.py index e4b2380..8166e75 100755 --- a/lexpredict_openedgar/openedgar/tasks.py +++ b/lexpredict_openedgar/openedgar/tasks.py @@ -50,9 +50,9 @@ # Logging setup logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) +logger.setLevel(logging.WARNING) console = logging.StreamHandler() -console.setLevel(logging.INFO) +console.setLevel(logging.WARNING) formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s') console.setFormatter(formatter) logger.addHandler(console) From 23e10c115820f9b35867f9f59e29e2d13799f3c1 Mon Sep 17 00:00:00 2001 From: Mirko Bernardoni Date: Thu, 9 May 2019 10:25:04 +0100 Subject: [PATCH 03/16] Added rate limit to 10/s Pipe for celery log --- docker/default.env | 1 + docker/dot_env.sh | 1 + docker/oe-entrypoint.sh | 3 ++- docker/run_edgar.py | 5 ++++- 4 files changed, 8 insertions(+), 2 deletions(-) diff --git a/docker/default.env b/docker/default.env index a8f60ac..8179555 100755 --- a/docker/default.env +++ b/docker/default.env @@ -23,6 +23,7 @@ DJANGO_ALLOWED_HOSTS=${DJANGO_ALLOWED_HOSTS:="localhost"} DJANGO_AWS_ACCESS_KEY_ID=${DJANGO_AWS_ACCESS_KEY_ID:=""} 
DJANGO_AWS_SECRET_ACCESS_KEY=${DJANGO_AWS_SECRET_ACCESS_KEY:=""} DJANGO_AWS_STORAGE_BUCKET_NAME=${DJANGO_AWS_STORAGE_BUCKET_NAME:=""} +CELERY_TASK_DEFAULT_RATE_LIMIT=${CELERY_TASK_DEFAULT_RATE_LIMIT:="10/s"} # Used with email DJANGO_MAILGUN_API_KEY=${DJANGO_MAILGUN_API_KEY:=""} diff --git a/docker/dot_env.sh b/docker/dot_env.sh index 6d44328..5067f10 100755 --- a/docker/dot_env.sh +++ b/docker/dot_env.sh @@ -20,6 +20,7 @@ DJANGO_ADMIN_URL=$DJANGO_ADMIN_URL DJANGO_SETTINGS_MODULE=$DJANGO_SETTINGS_MODULE DJANGO_SECRET_KEY=$DJANGO_SECRET_KEY DJANGO_ALLOWED_HOSTS=$DJANGO_ALLOWED_HOSTS +CELERY_TASK_DEFAULT_RATE_LIMIT=$CELERY_TASK_DEFAULT_RATE_LIMIT # AWS Settings DJANGO_AWS_ACCESS_KEY_ID=$DJANGO_AWS_ACCESS_KEY_ID diff --git a/docker/oe-entrypoint.sh b/docker/oe-entrypoint.sh index dd41f9a..d8e4d3f 100755 --- a/docker/oe-entrypoint.sh +++ b/docker/oe-entrypoint.sh @@ -21,7 +21,8 @@ rabbitmqctl set_permissions -p openedgar openedgar ".*" ".*" ".*" # perform initial migration python manage.py migrate -celery -A lexpredict_openedgar.taskapp worker --loglevel=WARNING£# -f ./celery.log -c16 & +mkfifo celery.log +celery -A lexpredict_openedgar.taskapp worker --loglevel=WARNING 2> celery.log & cd /opt/openedgar/tika diff --git a/docker/run_edgar.py b/docker/run_edgar.py index f69e37e..a882df3 100644 --- a/docker/run_edgar.py +++ b/docker/run_edgar.py @@ -1,6 +1,8 @@ +import logging import os from openedgar.processes.edgar import download_filing_index_data, process_all_filing_index + year = os.getenv("EDGAR_YEAR") quarter = os.getenv("EDGAR_QUARTER") month = os.getenv("EDGAR_MONTH") @@ -16,7 +18,8 @@ #£ # fix: openedgar.tasks: ERROR Multiple Filing records found for s3_path=edgar/data/1096325/0001096325-15-000002.txt, skipping...
# fix: openedgar.tasks: ERROR Unable to create filing documents for Filing id=30186, cik=1024725, form_type=8-K, date_filed=2015-02-02: 'ascii' codec can't encode character '\xa0' in position 20: ordinal not in range(128) -# Reduce log level +print("Edgar analysis started") +print("Analysing year {} types {}".format(year, types)) process_all_filing_index(year=year, form_type_list=types) for i in range(1, 10): From 2a8c1b3f0472bd0ea0b185c1a7a5ec4b0d441181 Mon Sep 17 00:00:00 2001 From: Mirko Bernardoni Date: Thu, 9 May 2019 11:34:30 +0100 Subject: [PATCH 04/16] Fixing logs --- docker/oe-entrypoint.sh | 17 +++++++++-------- .../openedgar/processes/edgar.py | 14 +++++++------- lexpredict_openedgar/openedgar/tasks.py | 11 ++++++----- 3 files changed, 22 insertions(+), 20 deletions(-) diff --git a/docker/oe-entrypoint.sh b/docker/oe-entrypoint.sh index d8e4d3f..511e722 100755 --- a/docker/oe-entrypoint.sh +++ b/docker/oe-entrypoint.sh @@ -18,21 +18,22 @@ rabbitmqctl add_vhost openedgar rabbitmqctl set_permissions -p openedgar openedgar ".*" ".*" ".*" -# perform initial migration -python manage.py migrate - -mkfifo celery.log -celery -A lexpredict_openedgar.taskapp worker --loglevel=WARNING 2> celery.log & - cd /opt/openedgar/tika -java -jar tika-server-1.20.jar > tika.log & +java -jar tika-server-1.20.jar > /data/logs/tika.log 2>&1 & cd /opt/openedgar/lexpredict_openedgar + source ../env/bin/activate source /opt/openedgar/default.env +# perform initial migration +python manage.py migrate + +mkdir /data/logs +celery -A lexpredict_openedgar.taskapp worker --loglevel=INFO > /data/logs/celery.log 2>&1 & python manage.py shell < run_edgar.py -tail -f celery.log \ No newline at end of file +tail -f /data/logs/celery.log +#| grep -v "INFO rmeta/text (autodetecting type)" diff --git a/lexpredict_openedgar/openedgar/processes/edgar.py b/lexpredict_openedgar/openedgar/processes/edgar.py index fa4b082..320c972 100755 --- a/lexpredict_openedgar/openedgar/processes/edgar.py +++ 
b/lexpredict_openedgar/openedgar/processes/edgar.py @@ -37,9 +37,9 @@ # Logging setup logger = logging.getLogger(__name__) -logger.setLevel(logging.ERROR) +logger.setLevel(logging.INFO) console = logging.StreamHandler() -console.setLevel(logging.ERROR) +console.setLevel(logging.INFO) formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s') console.setFormatter(formatter) logger.addHandler(console) @@ -81,10 +81,10 @@ def download_filing_index_data(year: int = None): try: filing_index = FilingIndex.objects.get(edgar_url=filing_index_path) is_processed = filing_index.is_processed - logger.info("Index {0} already exists in DB.".format(filing_index_path)) + logger.debug("Index {0} already exists in DB.".format(filing_index_path)) except FilingIndex.DoesNotExist: is_processed = False - logger.info("Index {0} does not exist in DB.".format(filing_index_path)) + logger.debug("Index {0} does not exist in DB.".format(filing_index_path)) # Check if exists; download and upload to S3 if missing if not download_client.path_exists(file_path): @@ -94,10 +94,10 @@ def download_filing_index_data(year: int = None): # Upload download_client.put_buffer(file_path, buffer) - logger.info("Retrieved {0} and uploaded to S3.".format(filing_index_path)) + logger.debug("Retrieved {0} and uploaded to S3.".format(filing_index_path)) path_list.append((file_path, True, is_processed)) else: - logger.info("Index {0} already exists on S3.".format(filing_index_path)) + logger.debug("Index {0} already exists on S3.".format(filing_index_path)) path_list.append((file_path, False, is_processed)) # Return list of updates @@ -179,7 +179,7 @@ def search_filing_documents(term_list: Iterable[str], form_type_list: Iterable[s stem_search=stem_search) n += 1 - logger.info("Searching {0} documents for {1} terms...".format(n, len(term_list))) + logger.debug("Searching {0} documents for {1} terms...".format(n, len(term_list))) def export_filing_document_search(search_query_id: int, output_file_path: 
str): diff --git a/lexpredict_openedgar/openedgar/tasks.py b/lexpredict_openedgar/openedgar/tasks.py index 8166e75..83c4469 100755 --- a/lexpredict_openedgar/openedgar/tasks.py +++ b/lexpredict_openedgar/openedgar/tasks.py @@ -50,9 +50,9 @@ # Logging setup logger = logging.getLogger(__name__) -logger.setLevel(logging.WARNING) +logger.setLevel(logging.INFO) console = logging.StreamHandler() -console.setLevel(logging.WARNING) +console.setLevel(logging.INFO) formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s') console.setFormatter(formatter) logger.addHandler(console) @@ -217,7 +217,7 @@ def process_filing_index(client_type: str, file_path: str, filing_index_buffer: # Get main filing data structure filing_index_data = openedgar.parsers.edgar.parse_index_file(temp_file.name) - logger.info("Parsed {0} records from index".format(filing_index_data.shape[0])) + logger.warning("Parsed {0} records from index".format(filing_index_data.shape[0])) # Iterate through rows bad_record_count = 0 @@ -241,8 +241,9 @@ def process_filing_index(client_type: str, file_path: str, filing_index_buffer: except Filing.MultipleObjectsReturned as e: # Create new filing record logger.error("Multiple Filing records found for s3_path={0}, skipping...".format(filing_path)) - logger.info("Raw exception: {0}".format(e)) - continue + filing=Filing.objects.filter(s3_path=filing_path).first() + logger.warning("Getting the first one: {0}".format(filing)) + logger.warning("Raw exception: {0}".format(e)) except Filing.DoesNotExist as f: # Create new filing record logger.info("No Filing record found for {0}, creating...".format(filing_path)) From b5ce75aebd0df342371ffca0098f43cc3f8c7271 Mon Sep 17 00:00:00 2001 From: Mirko Bernardoni Date: Thu, 9 May 2019 14:08:14 +0100 Subject: [PATCH 05/16] Fixing encoding problems and improved logging --- .../openedgar/clients/local.py | 4 +- .../openedgar/parsers/edgar.py | 41 +++++++++----- lexpredict_openedgar/openedgar/tasks.py | 56 
+++++++++++-------- 3 files changed, 63 insertions(+), 38 deletions(-) diff --git a/lexpredict_openedgar/openedgar/clients/local.py b/lexpredict_openedgar/openedgar/clients/local.py index 2a7fa62..67e7dbb 100644 --- a/lexpredict_openedgar/openedgar/clients/local.py +++ b/lexpredict_openedgar/openedgar/clients/local.py @@ -48,9 +48,11 @@ def put_buffer(self, file_path: str, buffer, write_bytes=True): os.makedirs(dir_name) if write_bytes: mode="wb" + encoding = None else: mode="w" - with open(file_path, mode=mode) as localfile: + encoding="utf-8" + with open(file_path, mode=mode, encoding=encoding) as localfile: localfile.write(buffer) def get_buffer(self, file_path: str): diff --git a/lexpredict_openedgar/openedgar/parsers/edgar.py b/lexpredict_openedgar/openedgar/parsers/edgar.py index 06a31c8..9970e55 100755 --- a/lexpredict_openedgar/openedgar/parsers/edgar.py +++ b/lexpredict_openedgar/openedgar/parsers/edgar.py @@ -125,7 +125,6 @@ def parse_index_file(file_name: str, double_gz: bool = False): with gzip.open(file_name, "rb") as index_file: index_buffer = index_file.read() except IOError as e: - logger.error("IOError parsing {0}: {1}".format(file_name, e)) # Read as plain binary with open(file_name, "rb") as index_file: index_buffer = index_file.read() @@ -135,6 +134,8 @@ def parse_index_file(file_name: str, double_gz: bool = False): index_buffer = zlib.decompress(index_buffer).decode("utf-8") logger.info("gz with valid header: decompressing {0} to {1} bytes.".format(file_name, len(index_buffer))) + else: + logger.info("IOError parsing {0}: {1}".format(file_name, e)) # Check for double-gz if double_gz: @@ -218,6 +219,29 @@ def extract_filing_header_field(buffer: Union[bytes, str], field: str): return buffer[p0:p1].strip() +def decode_filing(to_decode: Union[bytes, str]) -> Union[str, None]: + buffer = to_decode + if isinstance(buffer, bytes): + try: + # Start with UTF-8 + buffer = str(buffer.decode("utf-8")) + except UnicodeDecodeError as _: + try: + # Fallback 
to ISO 8859-1 + logger.warning("Falling back to ISO 8859-1 after failing to decode with UTF-8...") + buffer = str(buffer.decode("iso-8859-1")) + except UnicodeDecodeError as _: + try: + # Fallback to ISO 8859-15 + logger.warning("Falling back to ISO 8859-15 after failing to decode with UTF-8...") + buffer = str(buffer.decode("iso-8859-15")) + except UnicodeDecodeError as _: + # Give up if we can't + logger.error("Unable to decode with either UTF-8 or ISO 8859-1; giving up...") + return None + return buffer + + def parse_filing(buffer: Union[bytes, str], extract: bool = False): """ Parse a filing file by returning each document within @@ -243,18 +267,9 @@ def parse_filing(buffer: Union[bytes, str], extract: bool = False): # Typing if isinstance(buffer, bytes): - try: - # Start with UTF-8 - buffer = str(buffer.decode("utf-8")) - except UnicodeDecodeError as _: - try: - # Fallback to ISO 8859-1 - logger.warning("Falling back to ISO 8859-1 after failing to decode with UTF-8...") - buffer = str(buffer.decode("iso-8859-1")) - except UnicodeDecodeError as _: - # Give up if we can't - logger.error("Unable to decode with either UTF-8 or ISO 8859-1; giving up...") - return filing_data + buffer = decode_filing(buffer) + if buffer is None: + return filing_data # Check for SEC-HEADER block if "" in buffer or "" in buffer: diff --git a/lexpredict_openedgar/openedgar/tasks.py b/lexpredict_openedgar/openedgar/tasks.py index 83c4469..744e699 100755 --- a/lexpredict_openedgar/openedgar/tasks.py +++ b/lexpredict_openedgar/openedgar/tasks.py @@ -26,9 +26,11 @@ import datetime import hashlib import logging +import sys import tempfile import os import pathlib +import traceback from typing import Iterable, Union # Packages @@ -70,24 +72,9 @@ def create_filing_documents(client, documents, filing, store_raw: bool = True, s """ # Get client if we're using S3 - # Iterate through documents document_records = [] for document in documents: - # Create DB object - filing_doc = FilingDocument() -
filing_doc.filing = filing - filing_doc.type = document["type"] - filing_doc.sequence = document["sequence"] - filing_doc.file_name = document["file_name"] - filing_doc.content_type = document["content_type"] - filing_doc.description = document["description"] - filing_doc.sha1 = document["sha1"] - filing_doc.start_pos = document["start_pos"] - filing_doc.end_pos = document["end_pos"] - filing_doc.is_processed = True - filing_doc.is_error = len(document["content"]) > 0 - document_records.append(filing_doc) # Upload raw if requested if store_raw and len(document["content"]) > 0: @@ -110,6 +97,20 @@ def create_filing_documents(client, documents, filing, store_raw: bool = True, s else: logger.info("Text contents for filing={0}, sequence={1}, sha1={2} already exists on S3" .format(filing, document["sequence"], document["sha1"])) + # Create DB object + filing_doc = FilingDocument() + filing_doc.filing = filing + filing_doc.type = document["type"] + filing_doc.sequence = document["sequence"] + filing_doc.file_name = document["file_name"] + filing_doc.content_type = document["content_type"] + filing_doc.description = document["description"] + filing_doc.sha1 = document["sha1"] + filing_doc.start_pos = document["start_pos"] + filing_doc.end_pos = document["end_pos"] + filing_doc.is_processed = True + filing_doc.is_error = len(document["content"]) > 0 + document_records.append(filing_doc) # Create in bulk FilingDocument.objects.bulk_create(document_records) @@ -240,10 +241,10 @@ def process_filing_index(client_type: str, file_path: str, filing_index_buffer: logger.info("Filing record already exists: {0}".format(filing)) except Filing.MultipleObjectsReturned as e: # Create new filing record - logger.error("Multiple Filing records found for s3_path={0}, skipping...".format(filing_path)) - filing=Filing.objects.filter(s3_path=filing_path).first() - logger.warning("Getting the first one: {0}".format(filing)) - logger.warning("Raw exception: {0}".format(e)) + 
logger.warning("Multiple Filing records found for s3_path={0} .. Taking first one!".format(filing_path)) + filing = Filing.objects.filter(s3_path=filing_path).first() + logger.info("Filing record already exists: {0}".format(filing)) + logger.debug("Raw exception: {0}".format(e)) except Filing.DoesNotExist as f: # Create new filing record logger.info("No Filing record found for {0}, creating...".format(filing_path)) @@ -270,7 +271,8 @@ def process_filing_index(client_type: str, file_path: str, filing_index_buffer: filing_buffer = client.get_buffer(filing_path) # Parse - filing_result = process_filing(client, filing_path, filing_buffer, store_raw=store_raw, store_text=store_text) + filing_result = process_filing(client, filing_path, filing_buffer, store_raw=store_raw, + store_text=store_text) if filing_result is None: logger.error("Unable to process filing.") bad_record_count += 1 @@ -316,7 +318,6 @@ def process_filing(client, file_path: str, filing_buffer: Union[str, bytes] = No # Log entry logger.info("Processing filing {0}...".format(file_path)) - # Check for existing record first try: filing = Filing.objects.get(s3_path=file_path) @@ -333,6 +334,10 @@ def process_filing(client, file_path: str, filing_buffer: Union[str, bytes] = No if filing_buffer is None: logger.info("Retrieving filing buffer from S3...") filing_buffer = client.get_buffer(file_path) + filing_buffer = openedgar.parsers.edgar.decode_filing(filing_buffer) + if filing_buffer is None: + logger.error("Unable to read the filing {}".format(file_path)) + return None # Get main filing data structure filing_data = openedgar.parsers.edgar.parse_filing(filing_buffer, extract=store_text) @@ -416,7 +421,12 @@ def process_filing(client, file_path: str, filing_buffer: Union[str, bytes] = No filing.save() return filing except Exception as e: # pylint: disable=broad-except - logger.error("Unable to create filing documents for {0}: {1}".format(filing, e)) + logger.error("423: Unable to create filing documents for 
{0}: {1}qGG".format(filing, e)) + exc_type, exc_value, exc_tb = sys.exc_info() + ex_txt = "" + for line in traceback.TracebackException(type(exc_type), exc_value, exc_tb).format(chain=None): + ex_txt += line + "\n" + logger.error(ex_txt) return None @@ -430,8 +440,6 @@ def extract_filing(client, file_path: str, filing_buffer: Union[str, bytes] = No """ # Get buffer - - if filing_buffer is None: logger.info("Retrieving filing buffer from S3...") filing_buffer = client.get_buffer(file_path) From 5372ba9d65e467bffb4a7f46c4e780e8af5e3e7d Mon Sep 17 00:00:00 2001 From: Mirko Bernardoni Date: Thu, 9 May 2019 15:13:39 +0100 Subject: [PATCH 06/16] Added the processing step for extracting the TEXT --- docker/oe-entrypoint.sh | 2 +- .../openedgar/clients/local.py | 6 +-- .../openedgar/process_text.py | 49 +++++++++++++++++++ .../openedgar/processes/edgar.py | 8 +-- lexpredict_openedgar/openedgar/tasks.py | 30 ++++++++++-- lexpredict_openedgar/requirements/full.txt | 1 + 6 files changed, 83 insertions(+), 13 deletions(-) create mode 100644 lexpredict_openedgar/openedgar/process_text.py diff --git a/docker/oe-entrypoint.sh b/docker/oe-entrypoint.sh index 511e722..0933d55 100755 --- a/docker/oe-entrypoint.sh +++ b/docker/oe-entrypoint.sh @@ -2,6 +2,7 @@ cd /opt/openedgar/lexpredict_openedgar export PYTHONIOENCODING=utf-8 +mkdir -p /data/logs source ../env/bin/activate source /opt/openedgar/default.env @@ -30,7 +31,6 @@ source /opt/openedgar/default.env # perform initial migration python manage.py migrate -mkdir /data/logs celery -A lexpredict_openedgar.taskapp worker --loglevel=INFO > /data/logs/celery.log 2>&1 & python manage.py shell < run_edgar.py diff --git a/lexpredict_openedgar/openedgar/clients/local.py b/lexpredict_openedgar/openedgar/clients/local.py index 67e7dbb..1a3d917 100644 --- a/lexpredict_openedgar/openedgar/clients/local.py +++ b/lexpredict_openedgar/openedgar/clients/local.py @@ -47,11 +47,11 @@ def put_buffer(self, file_path: str, buffer, write_bytes=True): 
if not os.path.exists(dir_name): os.makedirs(dir_name) if write_bytes: - mode="wb" + mode = "wb" encoding = None else: - mode="w" - encoding="utf-8" + mode = "w" + encoding = "utf-8" with open(file_path, mode=mode, encoding=encoding) as localfile: localfile.write(buffer) diff --git a/lexpredict_openedgar/openedgar/process_text.py b/lexpredict_openedgar/openedgar/process_text.py new file mode 100644 index 0000000..9e9244a --- /dev/null +++ b/lexpredict_openedgar/openedgar/process_text.py @@ -0,0 +1,49 @@ +from typing import Union + +from bs4 import BeautifulSoup +import re + + +def html_to_text(html_doc: str) -> str: + """ + Convert html/xml to the pure text + :param html_doc: the document to convert + :return: + """ + soup = BeautifulSoup(html_doc, 'html.parser') + if soup.find('xbrl'): + return xbr(soup) + else: + return not_xbrl(soup) + + +def not_xbrl(soup: BeautifulSoup) -> str: + """ + Handle format not in XBLR + :param soup: + :return: + """ + doc = '' + for string in soup.strings: + string = string.strip() + if len(string) > 0: + doc += string + '\n' + return doc + + +def xbr(soup: BeautifulSoup) -> str: + """ + Handle XBLR format for extracting all the text fields + :param soup: + :return: + """ + doc = "" + for tag in soup.find_all(re.compile("[T,t]ext|[D,d]escription")): + if tag.string is not None: + inner_soup = BeautifulSoup(tag.string, 'html.parser') + string = "" + for string_inner in inner_soup.strings: + if string_inner is not None: + string = string + " " + string_inner + doc += string + '\n' + return doc diff --git a/lexpredict_openedgar/openedgar/processes/edgar.py b/lexpredict_openedgar/openedgar/processes/edgar.py index 320c972..b38a926 100755 --- a/lexpredict_openedgar/openedgar/processes/edgar.py +++ b/lexpredict_openedgar/openedgar/processes/edgar.py @@ -105,8 +105,7 @@ def download_filing_index_data(year: int = None): def process_all_filing_index(year: int = None, form_type_list: Iterable[str] = None, new_only: bool = False, - store_raw: 
bool = True, - store_text: bool = True): + store_raw: bool = True, store_text: bool = True, store_processed: bool = True): """ Process all filing index data. :type year: optional year to process @@ -114,6 +113,7 @@ def process_all_filing_index(year: int = None, form_type_list: Iterable[str] = N :param new_only: :param store_raw: :param store_text: + :param store_processed: :return: """ # Get the list of file paths @@ -127,11 +127,11 @@ def process_all_filing_index(year: int = None, form_type_list: Iterable[str] = N if new_only and not is_processed: logger.info("Processing filing index for {0}...".format(s3_path)) _ = process_filing_index.delay(client_type, s3_path, form_type_list=form_type_list, store_raw=store_raw, - store_text=store_text) + store_text=store_text, store_processed=store_processed) elif not new_only: logger.info("Processing filing index for {0}...".format(s3_path)) _ = process_filing_index.delay(client_type, s3_path, form_type_list=form_type_list, store_raw=store_raw, - store_text=store_text) + store_text=store_text, store_processed=store_processed) else: logger.info("Skipping process_filing_index for {0}...".format(s3_path)) diff --git a/lexpredict_openedgar/openedgar/tasks.py b/lexpredict_openedgar/openedgar/tasks.py index 744e699..89fa5ee 100755 --- a/lexpredict_openedgar/openedgar/tasks.py +++ b/lexpredict_openedgar/openedgar/tasks.py @@ -51,6 +51,8 @@ import lexnlp.nlp.en.tokens # Logging setup +from openedgar.process_text import html_to_text + logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) console = logging.StreamHandler() @@ -60,7 +62,8 @@ logger.addHandler(console) -def create_filing_documents(client, documents, filing, store_raw: bool = True, store_text: bool = True): +def create_filing_documents(client, documents, filing, store_raw: bool = True, store_text: bool = True, + store_processed=True): """ Create filing document records given a list of documents and a filing record. 
@@ -68,6 +71,7 @@ def create_filing_documents(client, documents, filing, store_raw: bool = True, s :param filing: Filing record :param store_raw: whether to store raw contents :param store_text: whether to store text contents + :param store_processed: whether to extract the text from the contents :return: """ # Get client if we're using S3 @@ -97,6 +101,18 @@ def create_filing_documents(client, documents, filing, store_raw: bool = True, s else: logger.info("Text contents for filing={0}, sequence={1}, sha1={2} already exists on S3" .format(filing, document["sequence"], document["sha1"])) + + if store_processed and document["content_text"] is not None: + text_path = pathlib.Path(S3_DOCUMENT_PATH, "processed", document["sha1"]).as_posix() + if not client.path_exists(text_path): + text = html_to_text(document["content_text"]) + client.put_buffer(text_path, text, write_bytes=False) + logger.info("Processed text contents for filing={0}, sequence={1}, sha1={2}" + .format(filing, document["sequence"], document["sha1"])) + else: + logger.info("Processed text contents for filing={0}, sequence={1}, sha1={2} already exists" + .format(filing, document["sequence"], document["sha1"])) + # Create DB object filing_doc = FilingDocument() filing_doc.filing = filing @@ -188,7 +204,8 @@ def create_filing_error(row, filing_path: str): @shared_task def process_filing_index(client_type: str, file_path: str, filing_index_buffer: Union[str, bytes] = None, - form_type_list: Iterable[str] = None, store_raw: bool = False, store_text: bool = False): + form_type_list: Iterable[str] = None, store_raw: bool = False, store_text: bool = False, + store_processed: bool = False): """ Process a filing index from an S3 path or buffer. 
:param file_path: S3 or local path to process; if filing_index_buffer is none, retrieved from here @@ -196,6 +213,7 @@ def process_filing_index(client_type: str, file_path: str, filing_index_buffer: :param form_type_list: optional list of form type to process :param store_raw: :param store_text: + :param store_processed: :return: """ # Log entry @@ -272,7 +290,7 @@ def process_filing_index(client_type: str, file_path: str, filing_index_buffer: # Parse filing_result = process_filing(client, filing_path, filing_buffer, store_raw=store_raw, - store_text=store_text) + store_text=store_text, store_processed=store_processed) if filing_result is None: logger.error("Unable to process filing.") bad_record_count += 1 @@ -306,13 +324,14 @@ def process_filing_index(client_type: str, file_path: str, filing_index_buffer: @shared_task def process_filing(client, file_path: str, filing_buffer: Union[str, bytes] = None, store_raw: bool = False, - store_text: bool = False): + store_text: bool = False, store_processed: bool = False): """ Process a filing from a path or filing buffer. 
:param file_path: path to process; if filing_buffer is none, retrieved from here :param filing_buffer: buffer; if not present, s3_path must be set :param store_raw: :param store_text: + :param store_processed :return: """ # Log entry @@ -415,7 +434,8 @@ def process_filing(client, file_path: str, filing_buffer: Union[str, bytes] = No # Create filing document records try: - create_filing_documents(client, filing_data["documents"], filing, store_raw=store_raw, store_text=store_text) + create_filing_documents(client, filing_data["documents"], filing, store_raw=store_raw, store_text=store_text, + store_processed=store_processed) filing.is_processed = True filing.is_error = False filing.save() diff --git a/lexpredict_openedgar/requirements/full.txt b/lexpredict_openedgar/requirements/full.txt index f34945d..19e16c6 100755 --- a/lexpredict_openedgar/requirements/full.txt +++ b/lexpredict_openedgar/requirements/full.txt @@ -135,4 +135,5 @@ Werkzeug==0.14.1 whitenoise==3.3.1 widgetsnbextension==3.2.1 wrapt==1.10.11 +beautifulsoup4==4.7.1 https://github.com/LexPredict/lexpredict-lexnlp/archive/0.1.8.zip \ No newline at end of file From 4aa6f90e67f11a84e8c041d4eaa93a753c29c2e9 Mon Sep 17 00:00:00 2001 From: Mirko Bernardoni Date: Thu, 9 May 2019 15:36:44 +0100 Subject: [PATCH 07/16] Added Azure Data Lake config from Michael code --- Dockerfile | 2 +- docker/default.env | 12 +- docker/dot_env.sh | 9 ++ lexpredict_openedgar/config/settings/base.py | 8 ++ lexpredict_openedgar/openedgar/clients/adl.py | 125 ++++++++++++++++++ .../openedgar/processes/edgar.py | 3 + lexpredict_openedgar/openedgar/tasks.py | 3 + 7 files changed, 160 insertions(+), 2 deletions(-) create mode 100755 lexpredict_openedgar/openedgar/clients/adl.py diff --git a/Dockerfile b/Dockerfile index d5b841e..84b8b49 100644 --- a/Dockerfile +++ b/Dockerfile @@ -33,7 +33,7 @@ RUN cp lexpredict_openedgar/sample.env lexpredict_openedgar/.env #COPY tasks.py lexpredict_openedgar/openedgar/tasks.py #COPY edgar.py 
lexpredict_openedgar/openedgar/processes/edgar.py #COPY parsers/edgar.py lexpredict_openedgar/openedgar/parsers/edgar.py -#COPY clients/aks.py lexpredict_openedgar/openedgar/clients/aks.py +#COPY clients/adl.py lexpredict_openedgar/openedgar/clients/adl.py #COPY clients/edgar.py lexpredict_openedgar/openedgar/clients/edgar.py COPY docker/oe-entrypoint.sh /usr/local/bin/ COPY docker/run_edgar.py /opt/openedgar/lexpredict_openedgar/run_edgar.py diff --git a/docker/default.env b/docker/default.env index 8179555..9c8a836 100755 --- a/docker/default.env +++ b/docker/default.env @@ -10,7 +10,7 @@ DJANGO_SECRET_KEY=${DJANGO_SECRET_KEY:="openedgar"} # General settings # DJANGO_READ_DOT_ENV_FILE=True -# CLIENT_TYPE: AKS, ADLAKE, Local +# CLIENT_TYPE: S3, ADL, Local CLIENT_TYPE=${CLIENT_TYPE:="Local"} @@ -23,6 +23,16 @@ DJANGO_ALLOWED_HOSTS=${DJANGO_ALLOWED_HOSTS:="localhost"} DJANGO_AWS_ACCESS_KEY_ID=${DJANGO_AWS_ACCESS_KEY_ID:=""} DJANGO_AWS_SECRET_ACCESS_KEY=${DJANGO_AWS_SECRET_ACCESS_KEY:=""} DJANGO_AWS_STORAGE_BUCKET_NAME=${DJANGO_AWS_STORAGE_BUCKET_NAME:=""} + +# AZURE DLAKE Settings +ADL_ACCOUNT = ${ADL_ACCOUNT:=""} +ADL_TENANT = ${ADL_TENANT:=""} +# Client ID +ADL_CID= ${ADL_CID:=""} +# Client secret/password +ADL_SECRET = ${ADL_SECRET:=""} + +# Read rate limit CELERY_TASK_DEFAULT_RATE_LIMIT={$CELERY_TASK_DEFAULT_RATE_LIMIT:="10/s"} # Used with email diff --git a/docker/dot_env.sh b/docker/dot_env.sh index 5067f10..64bf2cd 100755 --- a/docker/dot_env.sh +++ b/docker/dot_env.sh @@ -20,8 +20,17 @@ DJANGO_ADMIN_URL=$DJANGO_ADMIN_URL DJANGO_SETTINGS_MODULE=$DJANGO_SETTINGS_MODULE DJANGO_SECRET_KEY=$DJANGO_SECRET_KEY DJANGO_ALLOWED_HOSTS=$DJANGO_ALLOWED_HOSTS + CELERY_TASK_DEFAULT_RATE_LIMIT=$CELERY_TASK_DEFAULT_RATE_LIMIT +# data Lake Settings +ADL_ACCOUNT = $ADL_ACCOUNT +ADL_TENANT = $ADL_TENANT +# Client ID +ADL_CID= $ADL_CID +# Client secret/password +ADL_SECRET = $ADL_SECRET + # AWS Settings DJANGO_AWS_ACCESS_KEY_ID=$DJANGO_AWS_ACCESS_KEY_ID 
DJANGO_AWS_SECRET_ACCESS_KEY=$DJANGO_AWS_SECRET_ACCESS_KEY diff --git a/lexpredict_openedgar/config/settings/base.py b/lexpredict_openedgar/config/settings/base.py index 02c2ad3..9e19d4d 100755 --- a/lexpredict_openedgar/config/settings/base.py +++ b/lexpredict_openedgar/config/settings/base.py @@ -337,3 +337,11 @@ TIKA_HOST = "localhost" TIKA_PORT = 9998 TIKA_ENDPOINT = "http://{0}:{1}/tika".format(TIKA_HOST, TIKA_PORT) + +# Azure data Lake +ADL_ACCOUNT = env('ADL_ACCOUNT', default="") +ADL_TENANT = env('ADL_TENANT', default="") +# Client ID +ADL_CID= env('ADL_CID', default="") +# Client secret/password +ADL_SECRET = env('ADL_SECRET', default="") diff --git a/lexpredict_openedgar/openedgar/clients/adl.py b/lexpredict_openedgar/openedgar/clients/adl.py new file mode 100755 index 0000000..99284a5 --- /dev/null +++ b/lexpredict_openedgar/openedgar/clients/adl.py @@ -0,0 +1,125 @@ +# Libraries +import logging +from typing import Union +import adal + +from msrestazure.azure_active_directory import AADTokenCredentials +from azure.datalake.store import core, lib, multithread +from config.settings.base import ADL_ACCOUNT, ADL_TENANT, ADL_CID, ADL_SECRET + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) +console = logging.StreamHandler() +console.setLevel(logging.INFO) +formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s') +console.setFormatter(formatter) +logger.addHandler(console) + +authority_host_uri = 'https://login.microsoftonline.com' +authority_uri = authority_host_uri + '/' + ADL_TENANT +RESOURCE = 'https://management.core.windows.net/' + +adlCreds = lib.auth(tenant_id=ADL_TENANT, + client_secret=ADL_SECRET, + client_id=ADL_CID, + resource=RESOURCE) + +context = adal.AuthenticationContext(authority_uri, api_version=None) +mgmt_token = context.acquire_token_with_client_credentials(RESOURCE, ADL_CID, ADL_SECRET) +armCreds = AADTokenCredentials(mgmt_token, ADL_CID, resource=RESOURCE) + +## Create a filesystem client object 
+adlsFileSystemClient = core.AzureDLFileSystem(adlCreds, store_name=ADL_ACCOUNT) + + +class ADLClient: + + def __init__(self): + logger.info("Initialized AKS client") + + def path_exists(self, path: str, client=None): + """ + Check if an AKS path exists + :param path: + :return: true if AKS object exists, else false + """ + + return adlsFileSystemClient.exists(path) + + def get_buffer(self, remote_path: str, client=None, deflate: bool = True): + """ + Get a file from S3 given a path and optional client. + :param remote_path: S3 path under bucket + :param client: optional client to re-use + :param deflate: whether to automatically zlib deflate contents + :return: buffer bytes/str + """ + + with adlsFileSystemClient.open(remote_path, blocksize=2 ** 20) as f: + return f.read() + + def get_file(self, remote_path: str, local_path: str, client=None, deflate: bool = True): + """ + Save a local file from AKS given a path and optional client. + :param remote_path: AKS path under bucket + :param local_path: local path to save to + :param client: optional client to re-use + :param deflate: whether to automatically zlib deflate contents + :return: + """ + # Open and write buffer + with open(local_path, "wb") as out_file: + out_file.write(self.get_buffer(remote_path, client, deflate)) + + def get_buffer_segment(self, remote_path: str, start_pos: int, end_pos: int, client=None, deflate: bool = True): + """ + Get a file from S3 given a path and optional client. + :param remote_path: S3 path under bucket + :param start_pos: + :param end_pos: + :param client: optional client to re-use + :param deflate: whether to automatically zlib deflate contents + :return: + """ + # Retrieve buffer and return subset + buffer = self.get_buffer(remote_path, client, deflate) + return buffer[start_pos:end_pos] + + def put_buffer(self, remote_path: str, buffer: Union[str, bytes], client=None, deflate: bool = True, + write_bytes=True): + """ + Upload a buffer to AKS given a path. 
+ :param remote_path: AKS path + :param buffer: buffer to upload + :param client: optional client to re-use + :param deflate: whether to automatically zlib deflate contents + :return: + """ + + import tempfile + + temp = tempfile.NamedTemporaryFile(mode='w+b') + + try: + # Ensure we have bytes object + if isinstance(buffer, str): + upload_buffer = bytes(buffer, "utf-8") + elif isinstance(buffer, bytes): + upload_buffer = buffer + + temp.write(upload_buffer) + + multithread.ADLUploader(adlsFileSystemClient, lpath=temp.name, rpath=remote_path, nthreads=64, + overwrite=True, buffersize=4194304, blocksize=4194304) + finally: + temp.close() + + def put_file(self, remote_path: str, local_path: str): + """ + Save a local file from AKS. + :param remote_path: AKS remote path + :param local_path: local path to save to + :return: + """ + multithread.ADLUploader(adlsFileSystemClient, lpath=local_path, rpath=remote_path, nthreads=64, overwrite=True, + buffersize=4194304, blocksize=4194304) diff --git a/lexpredict_openedgar/openedgar/processes/edgar.py b/lexpredict_openedgar/openedgar/processes/edgar.py index b38a926..3fc4e46 100755 --- a/lexpredict_openedgar/openedgar/processes/edgar.py +++ b/lexpredict_openedgar/openedgar/processes/edgar.py @@ -28,6 +28,7 @@ import os # Project import openedgar.clients.edgar +from openedgar.clients.adl import ADLClient from openedgar.clients.s3 import S3Client from openedgar.clients.local import LocalClient import openedgar.clients.local @@ -65,6 +66,8 @@ def download_filing_index_data(year: int = None): if configured_client is None or configured_client == "S3": # Create S3 client download_client = S3Client() + elif configured_client == "ADL": + download_client = ADLClient() else: download_client = LocalClient() path_prefix = os.environ["DOWNLOAD_PATH"] diff --git a/lexpredict_openedgar/openedgar/tasks.py b/lexpredict_openedgar/openedgar/tasks.py index 89fa5ee..6f2e473 100755 --- a/lexpredict_openedgar/openedgar/tasks.py +++ 
b/lexpredict_openedgar/openedgar/tasks.py @@ -40,6 +40,7 @@ # Project from config.settings.base import S3_DOCUMENT_PATH +from openedgar.clients.adl import ADLClient from openedgar.clients.s3 import S3Client from openedgar.clients.local import LocalClient import openedgar.clients.edgar @@ -221,6 +222,8 @@ def process_filing_index(client_type: str, file_path: str, filing_index_buffer: if client_type == "S3": client = S3Client() + elif client_type == "ADL": + client = ADLClient() else: client = LocalClient() From 3933e0453d25515cddd87b5000b5f16ccf9fa5c0 Mon Sep 17 00:00:00 2001 From: Mirko Bernardoni Date: Thu, 9 May 2019 15:56:18 +0100 Subject: [PATCH 08/16] Added quarter and month code from Michael and fixed some typos --- docker/default.env | 8 +- docker/dot_env.sh | 8 +- docker/run_edgar.py | 16 ++-- .../openedgar/clients/edgar.py | 85 +++++++++++++++++++ .../openedgar/processes/edgar.py | 14 ++- 5 files changed, 108 insertions(+), 23 deletions(-) diff --git a/docker/default.env b/docker/default.env index 9c8a836..2689971 100755 --- a/docker/default.env +++ b/docker/default.env @@ -25,12 +25,12 @@ DJANGO_AWS_SECRET_ACCESS_KEY=${DJANGO_AWS_SECRET_ACCESS_KEY:=""} DJANGO_AWS_STORAGE_BUCKET_NAME=${DJANGO_AWS_STORAGE_BUCKET_NAME:=""} # AZURE DLAKE Settings -ADL_ACCOUNT = ${ADL_ACCOUNT:=""} -ADL_TENANT = ${ADL_TENANT:=""} +ADL_ACCOUNT=${ADL_ACCOUNT:=""} +ADL_TENANT=${ADL_TENANT:=""} # Client ID -ADL_CID= ${ADL_CID:=""} +ADL_CID=${ADL_CID:=""} # Client secret/password -ADL_SECRET = ${ADL_SECRET:=""} +ADL_SECRET=${ADL_SECRET:=""} # Read rate limit CELERY_TASK_DEFAULT_RATE_LIMIT={$CELERY_TASK_DEFAULT_RATE_LIMIT:="10/s"} diff --git a/docker/dot_env.sh b/docker/dot_env.sh index 64bf2cd..218156c 100755 --- a/docker/dot_env.sh +++ b/docker/dot_env.sh @@ -24,12 +24,12 @@ DJANGO_ALLOWED_HOSTS=$DJANGO_ALLOWED_HOSTS CELERY_TASK_DEFAULT_RATE_LIMIT=$CELERY_TASK_DEFAULT_RATE_LIMIT # data Lake Settings -ADL_ACCOUNT = $ADL_ACCOUNT -ADL_TENANT = $ADL_TENANT +ADL_ACCOUNT=$ADL_ACCOUNT 
+ADL_TENANT=$ADL_TENANT # Client ID -ADL_CID= $ADL_CID +ADL_CID=$ADL_CID # Client secret/password -ADL_SECRET = $ADL_SECRET +ADL_SECRET=$ADL_SECRET # AWS Settings DJANGO_AWS_ACCESS_KEY_ID=$DJANGO_AWS_ACCESS_KEY_ID diff --git a/docker/run_edgar.py b/docker/run_edgar.py index a882df3..d8a9713 100644 --- a/docker/run_edgar.py +++ b/docker/run_edgar.py @@ -2,25 +2,19 @@ import os from openedgar.processes.edgar import download_filing_index_data, process_all_filing_index - year = os.getenv("EDGAR_YEAR") quarter = os.getenv("EDGAR_QUARTER") month = os.getenv("EDGAR_MONTH") types = [t.upper().strip() for t in os.getenv("FORM_TYPES").split(",")] +if len(quarter) == 0: + quarter = None +if len(month) == 0: + month = None -# process_all_filing_index(year=year, quarter=quarter, month=month, form_type_list=["3.1","3.2","10.1"]) -# process_all_filing_index(year=year, quarter=quarter, month=month, form_type_list=["10-k"]) - -# for i in range(1995,2019) -# process_all_filing_index(year=i, form_type_list=types) - -#£ -# fix: openedgar.tasks: ERROR Multiple Filing records found for s3_path=edgar/data/1096325/0001096325-15-000002.txt, skipping... 
-# fix: openedgar.tasks: ERROR Unable to create filing documents for Filing id=30186, cik=1024725, form_type=8-K, date_filed=2015-02-02: 'ascii' codec can't encode character '\xa0' in position 20: ordinal not in range(128) print("Edgar analysis started") print("Analysing year {} types {}".format(year, types)) -process_all_filing_index(year=year, form_type_list=types) +process_all_filing_index(year=year, quarter=quarter, month=month, form_type_list=types) for i in range(1, 10): print("###############################################") diff --git a/lexpredict_openedgar/openedgar/clients/edgar.py b/lexpredict_openedgar/openedgar/clients/edgar.py index 70496d5..ea84cfd 100755 --- a/lexpredict_openedgar/openedgar/clients/edgar.py +++ b/lexpredict_openedgar/openedgar/clients/edgar.py @@ -185,6 +185,91 @@ def list_index_by_year(year: int): return form_index_list +def list_index_by_quarter(year: int, quarter: int): + """ + Get list of index files for a given year and quarter. + :param year: filing year to retrieve + :param quarter: filing quarter to retrieve + :return: + """ + # Log entrance + logger.info("Locating form index list for {0}".format(year)) + + # Form index list + year = str(year) + form_index_list = [] + + # Get year directory list + year_index_uri = urllib.parse.urljoin(HTTP_SEC_INDEX_PATH, str(year) + "/") + year_root_list = list_path(year_index_uri) + print(year_root_list) + + # Get quarters + quarter_list = [f for f in year_root_list if "/QTR" in f] + + quarter_string_match = "QTR" + str(quarter) + + # Iterate over quarters + for qu in quarter_list: + if quarter_string_match in qu: + quarter_root_list = list_path(qu) + form_index_list.extend([q for q in quarter_root_list if "/form." 
in q.lower()]) + + # Cleanup double / + for i in range(len(form_index_list)): + form_index_list[i] = form_index_list[i].replace("//", "/") + + # Log exit + logger.info("Successfully located {0} form index files for {1}".format(len(form_index_list), year)) + + # Return + return form_index_list + + +def list_index_by_month(year: int, month: int): + """ + Get list of index files for a given year and month. + :param year: filing year to retrieve + :param month: filing month to retrieve + :return: + """ + # Log entrance + logger.info("Locating form index list for {0}".format(year)) + + # Form index list + year = str(year) + form_index_list = [] + + # Get year directory list + year_index_uri = urllib.parse.urljoin(HTTP_SEC_INDEX_PATH, str(year) + "/") + year_root_list = list_path(year_index_uri) + print(year_root_list) + + # Get quarters + quarter_list = [f for f in year_root_list if "/QTR" in f] + + # company.20190102.idx + if int(month) < 10: + month = "0" + month + + date_string_match = year + month + + # Iterate over quarters + for qu in quarter_list: + quarter_root_list = list_path(qu) + form_index_list.extend([q for q in quarter_root_list if "/form." + date_string_match in q.lower()]) + + # Cleanup double / + for i in range(len(form_index_list)): + form_index_list[i] = form_index_list[i].replace("//", "/") + + # Log exit + logger.info("Successfully located {0} form index files for {1}".format(len(form_index_list), year)) + + # Return + return form_index_list + + def list_index(min_year: int = 1950, max_year: int = 2050): """ Get the list of form index files on SEC HTTP. 
diff --git a/lexpredict_openedgar/openedgar/processes/edgar.py b/lexpredict_openedgar/openedgar/processes/edgar.py index 3fc4e46..06e494a 100755 --- a/lexpredict_openedgar/openedgar/processes/edgar.py +++ b/lexpredict_openedgar/openedgar/processes/edgar.py @@ -46,7 +46,7 @@ logger.addHandler(console) -def download_filing_index_data(year: int = None): +def download_filing_index_data(year: int = None, quarter: int = None, month: int = None): """ Download all filing index data. :param year: @@ -54,7 +54,12 @@ def download_filing_index_data(year: int = None): """ # Get filing index list if year is not None: - filing_index_list = openedgar.clients.edgar.list_index_by_year(year) + if month is not None: + filing_index_list = openedgar.clients.edgar.list_index_by_month(year, month) + elif quarter is not None: + filing_index_list = openedgar.clients.edgar.list_index_by_quarter(year, quarter) + else: + filing_index_list = openedgar.clients.edgar.list_index_by_year(year) else: filing_index_list = openedgar.clients.edgar.list_index() @@ -107,7 +112,8 @@ def download_filing_index_data(year: int = None): return path_list -def process_all_filing_index(year: int = None, form_type_list: Iterable[str] = None, new_only: bool = False, +def process_all_filing_index(year: int = None, quarter: int = None, month: int = None, + form_type_list: Iterable[str] = None, new_only: bool = False, store_raw: bool = True, store_text: bool = True, store_processed: bool = True): """ Process all filing index data. 
@@ -120,7 +126,7 @@ def process_all_filing_index(year: int = None, form_type_list: Iterable[str] = N :return: """ # Get the list of file paths - file_path_list = download_filing_index_data(year) + file_path_list = download_filing_index_data(year, quarter, month) client_type = os.environ["CLIENT_TYPE"] or "S3" From 3b0011cf271f2da74e7014985381549e773444a7 Mon Sep 17 00:00:00 2001 From: Mirko Bernardoni Date: Thu, 9 May 2019 17:04:41 +0100 Subject: [PATCH 09/16] Fixed nasty bug that didn't allow to save the edgar/data --- lexpredict_openedgar/config/settings/base.py | 3 ++- lexpredict_openedgar/openedgar/clients/edgar.py | 2 +- lexpredict_openedgar/openedgar/processes/edgar.py | 7 +++++-- lexpredict_openedgar/openedgar/tasks.py | 5 +++-- 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/lexpredict_openedgar/config/settings/base.py b/lexpredict_openedgar/config/settings/base.py index 9e19d4d..2342ee4 100755 --- a/lexpredict_openedgar/config/settings/base.py +++ b/lexpredict_openedgar/config/settings/base.py @@ -332,6 +332,7 @@ S3_DOCUMENT_PATH = env('S3_DOCUMENT_PATH', default="openedgar") S3_PREFIX = env('S3_PREFIX', default="documents") S3_COMPRESSION_LEVEL = int(env('S3_COMPRESSION_LEVEL', default=6)) +DOWNLOAD_PATH = env('DOWNLOAD_PATH', default='openedgar') # Tika configuration TIKA_HOST = "localhost" @@ -342,6 +343,6 @@ ADL_ACCOUNT = env('ADL_ACCOUNT', default="") ADL_TENANT = env('ADL_TENANT', default="") # Client ID -ADL_CID= env('ADL_CID', default="") +ADL_CID = env('ADL_CID', default="") # Client secret/password ADL_SECRET = env('ADL_SECRET', default="") diff --git a/lexpredict_openedgar/openedgar/clients/edgar.py b/lexpredict_openedgar/openedgar/clients/edgar.py index ea84cfd..c732f4f 100755 --- a/lexpredict_openedgar/openedgar/clients/edgar.py +++ b/lexpredict_openedgar/openedgar/clients/edgar.py @@ -250,7 +250,7 @@ def list_index_by_month(year: int, month: int): # company.20190102.idx if int(month) < 10: - month = "0" + month + month = 
"0{}".format(month) date_string_match = year + month diff --git a/lexpredict_openedgar/openedgar/processes/edgar.py b/lexpredict_openedgar/openedgar/processes/edgar.py index 06e494a..f3629ad 100755 --- a/lexpredict_openedgar/openedgar/processes/edgar.py +++ b/lexpredict_openedgar/openedgar/processes/edgar.py @@ -28,6 +28,7 @@ import os # Project import openedgar.clients.edgar +from config.settings.base import DOWNLOAD_PATH from openedgar.clients.adl import ADLClient from openedgar.clients.s3 import S3Client from openedgar.clients.local import LocalClient @@ -49,6 +50,8 @@ def download_filing_index_data(year: int = None, quarter: int = None, month: int = None): """ Download all filing index data. + :param month: + :param quarter: :param year: :return: """ @@ -65,7 +68,7 @@ def download_filing_index_data(year: int = None, quarter: int = None, month: int path_list = [] configured_client = os.environ["CLIENT_TYPE"] - logger.info(msg="Configured client is: {}".format(configured_client)) + logger.info("Configured client is: {}".format(configured_client)) path_prefix = str() if configured_client is None or configured_client == "S3": @@ -75,7 +78,7 @@ def download_filing_index_data(year: int = None, quarter: int = None, month: int download_client = ADLClient() else: download_client = LocalClient() - path_prefix = os.environ["DOWNLOAD_PATH"] + path_prefix = DOWNLOAD_PATH # Now iterate through list to check if already on S3 for filing_index_path in filing_index_list: diff --git a/lexpredict_openedgar/openedgar/tasks.py b/lexpredict_openedgar/openedgar/tasks.py index 6f2e473..33eda76 100755 --- a/lexpredict_openedgar/openedgar/tasks.py +++ b/lexpredict_openedgar/openedgar/tasks.py @@ -39,7 +39,7 @@ from celery import shared_task # Project -from config.settings.base import S3_DOCUMENT_PATH +from config.settings.base import S3_DOCUMENT_PATH, DOWNLOAD_PATH from openedgar.clients.adl import ADLClient from openedgar.clients.s3 import S3Client from openedgar.clients.local import 
LocalClient @@ -209,6 +209,7 @@ def process_filing_index(client_type: str, file_path: str, filing_index_buffer: store_processed: bool = False): """ Process a filing index from an S3 path or buffer. + :param client_type: :param file_path: S3 or local path to process; if filing_index_buffer is none, retrieved from here :param filing_index_buffer: buffer; if not present, s3_path must be set :param form_type_list: optional list of form type to process @@ -283,7 +284,7 @@ def process_filing_index(client_type: str, file_path: str, filing_index_buffer: continue # Upload - client.put_buffer(filing_path, filing_buffer) + client.put_buffer("{}/{}".format(DOWNLOAD_PATH,filing_path), filing_buffer) logger.info("Downloaded from EDGAR and uploaded to {}...".format(client_type)) else: From cd9137afb7d3b450a96bce5fc9d7d9100188a4a4 Mon Sep 17 00:00:00 2001 From: Mirko Bernardoni Date: Thu, 23 May 2019 14:12:04 +0100 Subject: [PATCH 10/16] Updated tika to 1.21 --- Dockerfile | 2 +- docker/oe-entrypoint.sh | 2 +- tika/download_tika.sh | 3 ++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/Dockerfile b/Dockerfile index 84b8b49..a150f62 100644 --- a/Dockerfile +++ b/Dockerfile @@ -24,7 +24,7 @@ RUN virtualenv -p /usr/bin/python3 env COPY lexpredict_openedgar/requirements/full.txt lexpredict_openedgar/requirements/full.txt RUN ./env/bin/pip install -r lexpredict_openedgar/requirements/full.txt RUN ./env/bin/pip install azure-mgmt-resource azure-mgmt-datalake-store azure-datalake-store -COPY tika/tika-server-1.20.jar /opt/openedgar/tika/tika-server-1.20.jar +COPY tika/tika-server-1.21.jar /opt/openedgar/tika/tika-server-1.21.jar COPY lexpredict_openedgar/ /opt/openedgar/lexpredict_openedgar/ COPY docker/default.env /opt/openedgar/ diff --git a/docker/oe-entrypoint.sh b/docker/oe-entrypoint.sh index 0933d55..03803ef 100755 --- a/docker/oe-entrypoint.sh +++ b/docker/oe-entrypoint.sh @@ -21,7 +21,7 @@ rabbitmqctl set_permissions -p openedgar openedgar ".*" ".*" ".*" cd 
/opt/openedgar/tika -java -jar tika-server-1.20.jar > /data/logs/tika.log 2>&1 & +java -jar tika-server-1.21.jar > /data/logs/tika.log 2>&1 & cd /opt/openedgar/lexpredict_openedgar diff --git a/tika/download_tika.sh b/tika/download_tika.sh index d875ea4..53910ea 100644 --- a/tika/download_tika.sh +++ b/tika/download_tika.sh @@ -1 +1,2 @@ -wget http://www-us.apache.org/dist/tika/tika-server-1.20.jar +#!/bin/bash +wget http://www-us.apache.org/dist/tika/tika-server-1.21.jar From b183e990ec7177a839ec7629bca28398ac382fdf Mon Sep 17 00:00:00 2001 From: Mirko Bernardoni Date: Thu, 23 May 2019 14:30:37 +0100 Subject: [PATCH 11/16] Ignore data folder --- .gitignore | 219 +++++++++++++++++++++++++++-------------------------- 1 file changed, 110 insertions(+), 109 deletions(-) diff --git a/.gitignore b/.gitignore index e272905..bdca348 100644 --- a/.gitignore +++ b/.gitignore @@ -1,109 +1,110 @@ -*# -*~ -.idea/ - -# Byte-compiled / optimized / DLL files -__pycache__/ -*.py[cod] -*$py.class - -# C extensions -*.so - -# Distribution / packaging -.Python -env/ -ve/ -venv/ -build/ -develop-eggs/ -dist/ -downloads/ -eggs/ -.eggs/ -lib/ -lib64/ -parts/ -sdist/ -var/ -wheels/ -*.egg-info/ -.installed.cfg -*.egg - -# PyInstaller -# Usually these files are written by a python script from a template -# before PyInstaller builds the exe, so as to inject date/other infos into it. 
-*.manifest -*.spec - -# Installer logs -pip-log.txt -pip-delete-this-directory.txt - -# Unit test / coverage reports -htmlcov/ -.tox/ -.coverage -.coverage.* -.cache -nosetests.xml -coverage.xml -*.cover -.hypothesis/ - -# Translations -*.mo -*.pot - -# Django stuff: -*.log -local_settings.py - -# Flask stuff: -instance/ -.webassets-cache - -# Scrapy stuff: -.scrapy - -# Sphinx documentation -docs/_build/ - -# PyBuilder -target/ - -# Jupyter Notebook -.ipynb_checkpoints - -# pyenv -.python-version - -# celery beat schedule file -celerybeat-schedule - -# SageMath parsed files -*.sage.py - -# dotenv -.env - -# virtualenv -.venv -venv/ -ENV/ - -# Spyder project settings -.spyderproject -.spyproject - -# Rope project settings -.ropeproject - -# mkdocs documentation -/site - -# mypy -.mypy_cache/ - -benchmarks +*# +*~ +.idea/ +data/ + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +env/ +ve/ +venv/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. 
+*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +.hypothesis/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# SageMath parsed files +*.sage.py + +# dotenv +.env + +# virtualenv +.venv +venv/ +ENV/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ + +benchmarks From d338484152b1bc3249385c291838a24cb033781a Mon Sep 17 00:00:00 2001 From: Mirko Bernardoni Date: Thu, 23 May 2019 14:31:23 +0100 Subject: [PATCH 12/16] Multiple years handling from variable --- docker/run_edgar.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/docker/run_edgar.py b/docker/run_edgar.py index d8a9713..d0fca4e 100644 --- a/docker/run_edgar.py +++ b/docker/run_edgar.py @@ -2,7 +2,7 @@ import os from openedgar.processes.edgar import download_filing_index_data, process_all_filing_index -year = os.getenv("EDGAR_YEAR") +years = [t.upper().strip() for t in os.getenv("EDGAR_YEAR").split(",")] quarter = os.getenv("EDGAR_QUARTER") month = os.getenv("EDGAR_MONTH") types = [t.upper().strip() for t in os.getenv("FORM_TYPES").split(",")] @@ -13,8 +13,9 @@ month = None print("Edgar analysis started") -print("Analysing year {} types {}".format(year, types)) -process_all_filing_index(year=year, quarter=quarter, month=month, form_type_list=types) +for year in years: + print("Analysing year {} types {}".format(year, types)) + process_all_filing_index(year=year, quarter=quarter, month=month, form_type_list=types) for i in range(1, 10): 
print("###############################################") From 0b4314b7a35429ab69f84e54889492ffd0bede7f Mon Sep 17 00:00:00 2001 From: Mirko Bernardoni Date: Thu, 23 May 2019 14:31:55 +0100 Subject: [PATCH 13/16] Avoid to initialise ADL when not required --- lexpredict_openedgar/openedgar/clients/adl.py | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/lexpredict_openedgar/openedgar/clients/adl.py b/lexpredict_openedgar/openedgar/clients/adl.py index 99284a5..e85088e 100755 --- a/lexpredict_openedgar/openedgar/clients/adl.py +++ b/lexpredict_openedgar/openedgar/clients/adl.py @@ -1,5 +1,6 @@ # Libraries import logging +import os from typing import Union import adal @@ -15,21 +16,22 @@ console.setFormatter(formatter) logger.addHandler(console) -authority_host_uri = 'https://login.microsoftonline.com' -authority_uri = authority_host_uri + '/' + ADL_TENANT -RESOURCE = 'https://management.core.windows.net/' +if os.environ["CLIENT_TYPE"] == "ADL": + authority_host_uri = 'https://login.microsoftonline.com' + authority_uri = authority_host_uri + '/' + ADL_TENANT + RESOURCE = 'https://management.core.windows.net/' -adlCreds = lib.auth(tenant_id=ADL_TENANT, - client_secret=ADL_SECRET, - client_id=ADL_CID, - resource=RESOURCE) + adlCreds = lib.auth(tenant_id=ADL_TENANT, + client_secret=ADL_SECRET, + client_id=ADL_CID, + resource=RESOURCE) -context = adal.AuthenticationContext(authority_uri, api_version=None) -mgmt_token = context.acquire_token_with_client_credentials(RESOURCE, ADL_CID, ADL_SECRET) -armCreds = AADTokenCredentials(mgmt_token, ADL_CID, resource=RESOURCE) + context = adal.AuthenticationContext(authority_uri, api_version=None) + mgmt_token = context.acquire_token_with_client_credentials(RESOURCE, ADL_CID, ADL_SECRET) + armCreds = AADTokenCredentials(mgmt_token, ADL_CID, resource=RESOURCE) -## Create a filesystem client object -adlsFileSystemClient = core.AzureDLFileSystem(adlCreds, store_name=ADL_ACCOUNT) + ## Create a 
filesystem client object + adlsFileSystemClient = core.AzureDLFileSystem(adlCreds, store_name=ADL_ACCOUNT) class ADLClient: From ab32e9e73f9ec977b14740aec28f4b3c4d20348a Mon Sep 17 00:00:00 2001 From: Mirko Bernardoni Date: Fri, 24 May 2019 09:54:13 +0100 Subject: [PATCH 14/16] Create blob storage client --- .dockerignore | 5 + Dockerfile | 7 +- docker/default.env | 7 +- docker/dot_env.sh | 4 + lexpredict_openedgar/config/settings/base.py | 4 + .../openedgar/clients/blob.py | 105 ++++++++++++++++++ .../openedgar/processes/edgar.py | 3 + lexpredict_openedgar/openedgar/tasks.py | 3 + 8 files changed, 131 insertions(+), 7 deletions(-) create mode 100644 .dockerignore create mode 100755 lexpredict_openedgar/openedgar/clients/blob.py diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..f2f205f --- /dev/null +++ b/.dockerignore @@ -0,0 +1,5 @@ +# ignore .git and .cache folders +.git +.cache +.idea +data diff --git a/Dockerfile b/Dockerfile index a150f62..fbca14b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -23,18 +23,13 @@ WORKDIR /opt/openedgar/ RUN virtualenv -p /usr/bin/python3 env COPY lexpredict_openedgar/requirements/full.txt lexpredict_openedgar/requirements/full.txt RUN ./env/bin/pip install -r lexpredict_openedgar/requirements/full.txt -RUN ./env/bin/pip install azure-mgmt-resource azure-mgmt-datalake-store azure-datalake-store +RUN ./env/bin/pip install azure-mgmt-resource azure-mgmt-datalake-store azure-datalake-store azure-storage-blob COPY tika/tika-server-1.21.jar /opt/openedgar/tika/tika-server-1.21.jar COPY lexpredict_openedgar/ /opt/openedgar/lexpredict_openedgar/ COPY docker/default.env /opt/openedgar/ RUN cp lexpredict_openedgar/sample.env lexpredict_openedgar/.env #COPY docker/erlang-solutions_1.0_all.deb lexpredict_openedgar/erlang-solutions_1.0_all.deb -#COPY tasks.py lexpredict_openedgar/openedgar/tasks.py -#COPY edgar.py lexpredict_openedgar/openedgar/processes/edgar.py -#COPY parsers/edgar.py 
lexpredict_openedgar/openedgar/parsers/edgar.py -#COPY clients/adl.py lexpredict_openedgar/openedgar/clients/adl.py -#COPY clients/edgar.py lexpredict_openedgar/openedgar/clients/edgar.py COPY docker/oe-entrypoint.sh /usr/local/bin/ COPY docker/run_edgar.py /opt/openedgar/lexpredict_openedgar/run_edgar.py COPY docker/dot_env.sh /opt/openedgar diff --git a/docker/default.env b/docker/default.env index 2689971..65f3a67 100755 --- a/docker/default.env +++ b/docker/default.env @@ -10,7 +10,7 @@ DJANGO_SECRET_KEY=${DJANGO_SECRET_KEY:="openedgar"} # General settings # DJANGO_READ_DOT_ENV_FILE=True -# CLIENT_TYPE: S3, ADL, Local +# CLIENT_TYPE: S3, ADL, Local, Blob CLIENT_TYPE=${CLIENT_TYPE:="Local"} @@ -32,6 +32,11 @@ ADL_CID=${ADL_CID:=""} # Client secret/password ADL_SECRET=${ADL_SECRET:=""} +# Azure Blob Storage +BLOB_CONNECTION_STRING=${BLOB_CONNECTION_STRING:=""} +BLOB_CONTAINER=${BLOB_CONTAINER:="openedgar"} + + # Read rate limit CELERY_TASK_DEFAULT_RATE_LIMIT={$CELERY_TASK_DEFAULT_RATE_LIMIT:="10/s"} diff --git a/docker/dot_env.sh b/docker/dot_env.sh index 218156c..4555d96 100755 --- a/docker/dot_env.sh +++ b/docker/dot_env.sh @@ -31,6 +31,10 @@ ADL_CID=$ADL_CID # Client secret/password ADL_SECRET=$ADL_SECRET +# Azure Blob Storage +BLOB_CONNECTION_STRING=$BLOB_CONNECTION_STRING +BLOB_CONTAINER=$BLOB_CONTAINER + # AWS Settings DJANGO_AWS_ACCESS_KEY_ID=$DJANGO_AWS_ACCESS_KEY_ID DJANGO_AWS_SECRET_ACCESS_KEY=$DJANGO_AWS_SECRET_ACCESS_KEY diff --git a/lexpredict_openedgar/config/settings/base.py b/lexpredict_openedgar/config/settings/base.py index 2342ee4..3f89365 100755 --- a/lexpredict_openedgar/config/settings/base.py +++ b/lexpredict_openedgar/config/settings/base.py @@ -346,3 +346,7 @@ ADL_CID = env('ADL_CID', default="") # Client secret/password ADL_SECRET = env('ADL_SECRET', default="") + +# Azure Blob Storage +BLOB_CONNECTION_STRING = env('BLOB_CONNECTION_STRING', default="") +BLOB_CONTAINER = env("BLOB_CONTAINER", default="openedgar") diff --git 
a/lexpredict_openedgar/openedgar/clients/blob.py b/lexpredict_openedgar/openedgar/clients/blob.py new file mode 100755 index 0000000..1dbcb85 --- /dev/null +++ b/lexpredict_openedgar/openedgar/clients/blob.py @@ -0,0 +1,105 @@ +# Libraries +import logging +import os +from typing import Union + +from azure.storage.blob import BlockBlobService + +from config.settings.base import BLOB_CONNECTION_STRING, BLOB_CONTAINER + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) +console = logging.StreamHandler() +console.setLevel(logging.INFO) +formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s') +console.setFormatter(formatter) +logger.addHandler(console) + +if os.environ["CLIENT_TYPE"] == "Blob": + blob_service = BlockBlobService(connection_string=BLOB_CONNECTION_STRING) + blob_service.create_container(BLOB_CONTAINER) + + +class BlobClient: + + def __init__(self): + logger.info("Initialized Blob client") + + def path_exists(self, path: str, client=None): + """ + Check if an AKS path exists + :param path: + :return: true if AKS object exists, else false + """ + return blob_service.exists(BLOB_CONTAINER, path) + + def get_buffer(self, remote_path: str, client=None, deflate: bool = True): + """ + Get a file from Blob given a path and optional client. + :param remote_path: S3 path under bucket + :param client: optional client to re-use + :param deflate: whether to automatically zlib deflate contents + :return: buffer bytes/str + """ + if remote_path[0] == '/': + remote_path = remote_path[1:] + if not self.path_exists(remote_path): + return blob_service.get_blob_to_bytes(BLOB_CONTAINER, remote_path).content + + def get_file(self, remote_path: str, local_path: str, client=None, deflate: bool = True): + """ + Save a local file from Blob given a path and optional client. 
+ :param remote_path: Blob path under bucket + :param local_path: local path to save to + :param client: optional client to re-use + :param deflate: whether to automatically zlib deflate contents + :return: + """ + if remote_path[0] == '/': + remote_path = remote_path[1:] + if not self.path_exists(remote_path): + blob_service.get_blob_to_path(BLOB_CONTAINER, remote_path, local_path) + + def get_buffer_segment(self, remote_path: str, start_pos: int, end_pos: int, client=None, deflate: bool = True): + """ + Get a file from S3 given a path and optional client. + :param remote_path: S3 path under bucket + :param start_pos: + :param end_pos: + :param client: optional client to re-use + :param deflate: whether to automatically zlib deflate contents + :return: + """ + # Retrieve buffer and return subset + buffer = self.get_buffer(remote_path, client, deflate) + return buffer[start_pos:end_pos] + + def put_buffer(self, remote_path: str, buffer: Union[str, bytes], client=None, deflate: bool = True, + write_bytes=True): + """ + Upload a buffer to AKS given a path. + :param remote_path: AKS path + :param buffer: buffer to upload + :param client: optional client to re-use + :param deflate: whether to automatically zlib deflate contents + :return: + """ + # Ensure we have bytes object + if isinstance(buffer, str): + upload_buffer = bytes(buffer, "utf-8") + else: + upload_buffer = buffer + if remote_path[0] == '/': + remote_path = remote_path[1:] + blob_service.create_blob_from_bytes(BLOB_CONTAINER, remote_path, upload_buffer) + + def put_file(self, remote_path: str, local_path: str): + """ + Save a local file from AKS. 
+ :param remote_path: AKS remote path + :param local_path: local path to save to + :return: + """ + if remote_path[0] == '/': + remote_path = remote_path[1:] + blob_service.create_blob_from_path(BLOB_CONTAINER, remote_path, remote_path) diff --git a/lexpredict_openedgar/openedgar/processes/edgar.py b/lexpredict_openedgar/openedgar/processes/edgar.py index f3629ad..bd7f247 100755 --- a/lexpredict_openedgar/openedgar/processes/edgar.py +++ b/lexpredict_openedgar/openedgar/processes/edgar.py @@ -30,6 +30,7 @@ import openedgar.clients.edgar from config.settings.base import DOWNLOAD_PATH from openedgar.clients.adl import ADLClient +from openedgar.clients.blob import BlobClient from openedgar.clients.s3 import S3Client from openedgar.clients.local import LocalClient import openedgar.clients.local @@ -76,6 +77,8 @@ def download_filing_index_data(year: int = None, quarter: int = None, month: int download_client = S3Client() elif configured_client == "ADL": download_client = ADLClient() + elif configured_client == "Blob": + download_client = BlobClient() else: download_client = LocalClient() path_prefix = DOWNLOAD_PATH diff --git a/lexpredict_openedgar/openedgar/tasks.py b/lexpredict_openedgar/openedgar/tasks.py index 33eda76..3ea2d4e 100755 --- a/lexpredict_openedgar/openedgar/tasks.py +++ b/lexpredict_openedgar/openedgar/tasks.py @@ -41,6 +41,7 @@ # Project from config.settings.base import S3_DOCUMENT_PATH, DOWNLOAD_PATH from openedgar.clients.adl import ADLClient +from openedgar.clients.blob import BlobClient from openedgar.clients.s3 import S3Client from openedgar.clients.local import LocalClient import openedgar.clients.edgar @@ -225,6 +226,8 @@ def process_filing_index(client_type: str, file_path: str, filing_index_buffer: client = S3Client() elif client_type == "ADL": client = ADLClient() + elif client_type == "Blob": + client = BlobClient() else: client = LocalClient() From 0a07a4b865eca4a282d43496ff47394a3052f052 Mon Sep 17 00:00:00 2001 From: Mirko Bernardoni 
Date: Fri, 24 May 2019 13:16:18 +0100 Subject: [PATCH 15/16] Fixed bug about empty filing --- lexpredict_openedgar/openedgar/clients/blob.py | 3 +-- lexpredict_openedgar/openedgar/tasks.py | 3 +++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/lexpredict_openedgar/openedgar/clients/blob.py b/lexpredict_openedgar/openedgar/clients/blob.py index 1dbcb85..a06c45c 100755 --- a/lexpredict_openedgar/openedgar/clients/blob.py +++ b/lexpredict_openedgar/openedgar/clients/blob.py @@ -43,8 +43,7 @@ def get_buffer(self, remote_path: str, client=None, deflate: bool = True): """ if remote_path[0] == '/': remote_path = remote_path[1:] - if not self.path_exists(remote_path): - return blob_service.get_blob_to_bytes(BLOB_CONTAINER, remote_path).content + return blob_service.get_blob_to_bytes(BLOB_CONTAINER, remote_path).content def get_file(self, remote_path: str, local_path: str, client=None, deflate: bool = True): """ diff --git a/lexpredict_openedgar/openedgar/tasks.py b/lexpredict_openedgar/openedgar/tasks.py index 3ea2d4e..f113f17 100755 --- a/lexpredict_openedgar/openedgar/tasks.py +++ b/lexpredict_openedgar/openedgar/tasks.py @@ -235,6 +235,9 @@ def process_filing_index(client_type: str, file_path: str, filing_index_buffer: if filing_index_buffer is None: logger.info("Retrieving filing index buffer for: {}...".format(file_path)) filing_index_buffer = client.get_buffer(file_path) + if filing_index_buffer is None: + logger.error("SOMETHING WRONG! FILING IS NONE! 
Client {} file {}".format(client_type, file_path)) + raise ValueError("Filing index is none!") # Write to disk to handle headaches temp_file = tempfile.NamedTemporaryFile(delete=False) From b51548c41bc3d745719bde33e99b4b84f2fc3460 Mon Sep 17 00:00:00 2001 From: Mirko Bernardoni Date: Fri, 24 May 2019 13:45:47 +0100 Subject: [PATCH 16/16] Improved logging and added deflate option to Blob --- lexpredict_openedgar/openedgar/clients/adl.py | 10 ++--- .../openedgar/clients/blob.py | 40 ++++++++++--------- .../openedgar/clients/edgar.py | 7 ---- .../openedgar/clients/local.py | 6 --- lexpredict_openedgar/openedgar/clients/s3.py | 6 --- .../openedgar/parsers/edgar.py | 6 --- .../openedgar/processes/edgar.py | 6 --- .../openedgar/processes/s3.py | 6 --- lexpredict_openedgar/openedgar/tasks.py | 6 --- 9 files changed, 24 insertions(+), 69 deletions(-) diff --git a/lexpredict_openedgar/openedgar/clients/adl.py b/lexpredict_openedgar/openedgar/clients/adl.py index e85088e..5b2b535 100755 --- a/lexpredict_openedgar/openedgar/clients/adl.py +++ b/lexpredict_openedgar/openedgar/clients/adl.py @@ -9,12 +9,6 @@ from config.settings.base import ADL_ACCOUNT, ADL_TENANT, ADL_CID, ADL_SECRET logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) -console = logging.StreamHandler() -console.setLevel(logging.INFO) -formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s') -console.setFormatter(formatter) -logger.addHandler(console) if os.environ["CLIENT_TYPE"] == "ADL": authority_host_uri = 'https://login.microsoftonline.com' @@ -35,7 +29,9 @@ class ADLClient: - + """ + TODO: This class does not support the deflate option + """ def __init__(self): logger.info("Initialized AKS client") diff --git a/lexpredict_openedgar/openedgar/clients/blob.py b/lexpredict_openedgar/openedgar/clients/blob.py index a06c45c..d78d51f 100755 --- a/lexpredict_openedgar/openedgar/clients/blob.py +++ b/lexpredict_openedgar/openedgar/clients/blob.py @@ -1,19 +1,16 @@ # Libraries 
import logging import os +import zlib from typing import Union from azure.storage.blob import BlockBlobService -from config.settings.base import BLOB_CONNECTION_STRING, BLOB_CONTAINER +from config.settings.base import BLOB_CONNECTION_STRING, BLOB_CONTAINER, S3_COMPRESSION_LEVEL logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) -console = logging.StreamHandler() -console.setLevel(logging.INFO) -formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s') -console.setFormatter(formatter) -logger.addHandler(console) +# Remove garbage logging from MS +logging.getLogger("azure.storage.common.storageclient").setLevel(logging.ERROR) if os.environ["CLIENT_TYPE"] == "Blob": blob_service = BlockBlobService(connection_string=BLOB_CONNECTION_STRING) @@ -27,23 +24,27 @@ def __init__(self): def path_exists(self, path: str, client=None): """ - Check if an AKS path exists + Check if an Blob path exists :param path: - :return: true if AKS object exists, else false + :return: true if Blob object exists, else false """ return blob_service.exists(BLOB_CONTAINER, path) def get_buffer(self, remote_path: str, client=None, deflate: bool = True): """ Get a file from Blob given a path and optional client. 
- :param remote_path: S3 path under bucket + :param remote_path: Blob path under bucket :param client: optional client to re-use :param deflate: whether to automatically zlib deflate contents :return: buffer bytes/str """ if remote_path[0] == '/': remote_path = remote_path[1:] - return blob_service.get_blob_to_bytes(BLOB_CONTAINER, remote_path).content + buffer = blob_service.get_blob_to_bytes(BLOB_CONTAINER, remote_path).content + if deflate: + return zlib.decompress(buffer) + else: + return buffer def get_file(self, remote_path: str, local_path: str, client=None, deflate: bool = True): """ @@ -54,10 +55,8 @@ def get_file(self, remote_path: str, local_path: str, client=None, deflate: bool :param deflate: whether to automatically zlib deflate contents :return: """ - if remote_path[0] == '/': - remote_path = remote_path[1:] - if not self.path_exists(remote_path): - blob_service.get_blob_to_path(BLOB_CONTAINER, remote_path, local_path) + with open(local_path, "wb") as out_file: + out_file.write(self.get_buffer(remote_path, client, deflate)) def get_buffer_segment(self, remote_path: str, start_pos: int, end_pos: int, client=None, deflate: bool = True): """ @@ -90,15 +89,18 @@ def put_buffer(self, remote_path: str, buffer: Union[str, bytes], client=None, d upload_buffer = buffer if remote_path[0] == '/': remote_path = remote_path[1:] + if deflate: + upload_buffer = zlib.compress(upload_buffer, S3_COMPRESSION_LEVEL) blob_service.create_blob_from_bytes(BLOB_CONTAINER, remote_path, upload_buffer) - def put_file(self, remote_path: str, local_path: str): + def put_file(self, remote_path: str, local_path: str, client=None, deflate: bool = True): """ Save a local file from AKS. 
:param remote_path: AKS remote path :param local_path: local path to save to + :param client: optional client to re-use + :param deflate: whether to automatically zlib deflate contents :return: """ - if remote_path[0] == '/': - remote_path = remote_path[1:] - blob_service.create_blob_from_path(BLOB_CONTAINER, remote_path, remote_path) + with open(local_path, "rb") as in_file: + self.put_buffer(remote_path, in_file.read(), client, deflate) diff --git a/lexpredict_openedgar/openedgar/clients/edgar.py b/lexpredict_openedgar/openedgar/clients/edgar.py index c732f4f..30d6015 100755 --- a/lexpredict_openedgar/openedgar/clients/edgar.py +++ b/lexpredict_openedgar/openedgar/clients/edgar.py @@ -39,13 +39,6 @@ # Setup logger logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) -console = logging.StreamHandler() -console.setLevel(logging.INFO) -formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s') -console.setFormatter(formatter) -logger.addHandler(console) - def get_buffer(remote_path: str, base_path: str = HTTP_SEC_HOST): """ diff --git a/lexpredict_openedgar/openedgar/clients/local.py b/lexpredict_openedgar/openedgar/clients/local.py index 1a3d917..ff2f053 100644 --- a/lexpredict_openedgar/openedgar/clients/local.py +++ b/lexpredict_openedgar/openedgar/clients/local.py @@ -26,12 +26,6 @@ # Setup logger logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) -console = logging.StreamHandler() -console.setLevel(logging.INFO) -formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s') -console.setFormatter(formatter) -logger.addHandler(console) class LocalClient: diff --git a/lexpredict_openedgar/openedgar/clients/s3.py b/lexpredict_openedgar/openedgar/clients/s3.py index 367f3aa..34e762e 100755 --- a/lexpredict_openedgar/openedgar/clients/s3.py +++ b/lexpredict_openedgar/openedgar/clients/s3.py @@ -38,12 +38,6 @@ # Setup logger logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) -console = 
logging.StreamHandler() -console.setLevel(logging.INFO) -formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s') -console.setFormatter(formatter) -logger.addHandler(console) class S3Client: diff --git a/lexpredict_openedgar/openedgar/parsers/edgar.py b/lexpredict_openedgar/openedgar/parsers/edgar.py index 9970e55..2e1dcc5 100755 --- a/lexpredict_openedgar/openedgar/parsers/edgar.py +++ b/lexpredict_openedgar/openedgar/parsers/edgar.py @@ -44,12 +44,6 @@ # Setup logger logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) -console = logging.StreamHandler() -console.setLevel(logging.INFO) -formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s') -console.setFormatter(formatter) -logger.addHandler(console) def uudecode(buffer: Union[bytes, str]): diff --git a/lexpredict_openedgar/openedgar/processes/edgar.py b/lexpredict_openedgar/openedgar/processes/edgar.py index bd7f247..582e423 100755 --- a/lexpredict_openedgar/openedgar/processes/edgar.py +++ b/lexpredict_openedgar/openedgar/processes/edgar.py @@ -40,12 +40,6 @@ # Logging setup logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) -console = logging.StreamHandler() -console.setLevel(logging.INFO) -formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s') -console.setFormatter(formatter) -logger.addHandler(console) def download_filing_index_data(year: int = None, quarter: int = None, month: int = None): diff --git a/lexpredict_openedgar/openedgar/processes/s3.py b/lexpredict_openedgar/openedgar/processes/s3.py index 3e3cee1..d5cdd9a 100755 --- a/lexpredict_openedgar/openedgar/processes/s3.py +++ b/lexpredict_openedgar/openedgar/processes/s3.py @@ -31,12 +31,6 @@ # Setup logger logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) -console = logging.StreamHandler() -console.setLevel(logging.INFO) -formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s') -console.setFormatter(formatter) 
-logger.addHandler(console) def is_access_denied_file(remote_path: str, client=None): diff --git a/lexpredict_openedgar/openedgar/tasks.py b/lexpredict_openedgar/openedgar/tasks.py index f113f17..1da38c2 100755 --- a/lexpredict_openedgar/openedgar/tasks.py +++ b/lexpredict_openedgar/openedgar/tasks.py @@ -56,12 +56,6 @@ from openedgar.process_text import html_to_text logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) -console = logging.StreamHandler() -console.setLevel(logging.INFO) -formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s') -console.setFormatter(formatter) -logger.addHandler(console) def create_filing_documents(client, documents, filing, store_raw: bool = True, store_text: bool = True,