Hacking the Data Transformation Interview
I am currently (still) looking for a job in the data/software engineering area, and I am preparing for all kinds of technical interviews: coding, algorithms, system design, SQL, and computer science fundamentals. Data engineer is a loosely defined role, and in some companies people with this title function as ETL (extract, transform, load) engineers, so data transformation topics can come up during the interview. In this blog post, I try to hack the interview questions that focus on data transformation.
Prerequisites: boto3
Boto3 is the AWS SDK for Python. It provides an easy-to-use, object-oriented API as well as low-level access to AWS services. We can find the documentation here.
We can install the latest Boto3 release via pip:
$ pip install boto3
Then configure the credentials file at ~/.aws/credentials:
[default]
aws_access_key_id = YOUR_ACCESS_KEY
aws_secret_access_key = YOUR_SECRET_KEY
And set the default region at ~/.aws/config:
[default]
region=us-east-1
To use Boto3, we import it and create a client or resource for the service we want to use. For example, to work with Amazon S3, we can create a resource:
import boto3
s3 = boto3.resource('s3')
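If we need a specific profile or region in code rather than the shared default configuration, a boto3 Session can be used instead (a minimal sketch; the profile name 'dev' is just an example):
# Assumes a profile named "dev" exists in ~/.aws/credentials.
session = boto3.Session(profile_name='dev', region_name='us-east-1')
s3 = session.resource('s3')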
Create an S3 bucket at the client level:
s3_client = boto3.client('s3')
s3_client.create_bucket(Bucket='BUCKET_NAME')
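Note that outside of us-east-1, S3 requires an explicit location constraint when creating a bucket; a quick sketch with an example region:
s3_client = boto3.client('s3', region_name='us-west-2')
s3_client.create_bucket(
    Bucket='BUCKET_NAME',
    CreateBucketConfiguration={'LocationConstraint': 'us-west-2'},
)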
List existing buckets for the AWS account:
response = s3_client.list_buckets()
for bucket in response['Buckets']:
    print(f' {bucket["Name"]}')
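Similarly, we can list the objects inside a bucket; since list_objects_v2 returns at most 1,000 keys per call, a paginator handles larger buckets (a minimal sketch with a placeholder bucket name):
paginator = s3_client.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket='BUCKET_NAME'):
    for obj in page.get('Contents', []):
        print(obj['Key'], obj['Size'])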
Upload a file into an S3 bucket:
response = s3_client.upload_file(file_name, bucket, object_name)
Or upload a file object in binary mode:
with open('test.jpg', 'rb') as f:
    s3_client.upload_fileobj(f, 'BUCKET_NAME', 'OBJECT_NAME')
Or put an object directly through the Bucket resource:
bucket = s3.Bucket('my-bucket')
with open('test.jpg', 'rb') as data:
    bucket.put_object(Key=object_name, Body=data)
To download a file from an S3 bucket, we can use:
s3_client.download_file('BUCKET_NAME', 'OBJECT_NAME', 'FILE_NAME')
Or:
with open('FILE_NAME', 'wb') as f:
    s3_client.download_fileobj('BUCKET_NAME', 'OBJECT_NAME', f)
We can also tune transfer settings when uploading, downloading, or copying a file or S3 object (each line below shows one option in isolation):
from boto3.s3.transfer import TransferConfig
GB = 1024 ** 3
config = TransferConfig(multipart_threshold=5 * GB)  # trigger multipart transfers above 5 GB
config = TransferConfig(max_concurrency=5)           # cap the number of transfer threads
config = TransferConfig(use_threads=False)           # disable threads entirely
s3_client.upload_file('FILE_NAME', 'BUCKET_NAME', 'OBJECT_NAME', Config=config)
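These options can be combined into a single TransferConfig and passed to downloads as well; a quick sketch:
config = TransferConfig(multipart_threshold=5 * GB, max_concurrency=5, use_threads=True)
s3_client.download_file('BUCKET_NAME', 'OBJECT_NAME', 'FILE_NAME', Config=config)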
Prerequisites: psycopg2
Psycopg2 is a popular PostgreSQL database adapter for Python. We can find the documentation here.
We can install psycopg2 via pip:
$ pip install psycopg2-binary
To use psycopg2, we import it, connect to an existing database, and open a cursor to perform database operations:
import psycopg2
conn = psycopg2.connect(dbname="DATABASE_NAME", user="USER", host="HOST", port="PORT", password="PASSWORD")
cur = conn.cursor()
Query the database and fetch rows one at a time, in batches, or all at once:
cur.execute(sql_query)
cur.fetchone()    # next row, or None when exhausted
cur.fetchmany(2)  # next batch of two rows
cur.fetchall()    # all remaining rows
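Since a psycopg2 cursor is itself iterable, we can also stream rows without materializing them all at once:
cur.execute(sql_query)
for row in cur:
    print(row)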
Pass parameters to SQL queries:
import datetime

cur.execute("""
    INSERT INTO some_table (an_int, a_date, another_date, a_string)
    VALUES (%(int)s, %(date)s, %(date)s, %(str)s);
    """,
    {'int': 10, 'str': "O'Reilly", 'date': datetime.date(2005, 11, 18)})
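Parameters can also be passed positionally with %s placeholders (never with Python string formatting, which risks SQL injection):
cur.execute(
    "INSERT INTO some_table (an_int, a_string) VALUES (%s, %s);",
    (10, "O'Reilly"))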
Make the changes to the database persistent:
conn.commit()
Close communication with the database:
cur.close()
conn.close()
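In practice, both the connection and the cursor can be used as context managers: the with block commits the transaction on success and rolls it back on error, but it does not close the connection itself. A minimal sketch:
with psycopg2.connect(dbname="DATABASE_NAME", user="USER", host="HOST",
                      port="PORT", password="PASSWORD") as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT 1;")
        print(cur.fetchone())
conn.close()  # still needed: exiting the with block only ends the transaction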
Upload data from S3 into Redshift
import psycopg2

# dbname, user, host, port, password, schema, table_name, file_name, and
# iam_role are placeholders assumed to be defined elsewhere (for example,
# read from environment variables or a config file).

def main():
    connection_parameters = {
        'dbname': dbname,
        'user': user,
        'host': host,
        'port': port,
        'password': password
    }
    try:
        conn = psycopg2.connect(**connection_parameters)
        print("Connected to Redshift.")
    except psycopg2.OperationalError:
        print("Unable to connect to Redshift.")
        return
    cur = conn.cursor()
    # COPY expects schema.table, a quoted S3 path, and a quoted IAM role ARN.
    upload_statement = """
        COPY {}.{}
        FROM '{}'
        IAM_ROLE '{}'
        CSV;
        COMMIT;
    """.format(schema, table_name, file_name, iam_role)
    try:
        cur.execute(upload_statement)
        print("Upload succeeded.")
    except psycopg2.Error as e:
        print("Failed to upload the file: {}".format(e))
    finally:
        cur.close()
        conn.close()


if __name__ == "__main__":
    main()
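As a quick sanity check after the COPY, we could count the rows in the target table (a hypothetical helper, not part of the script above):
def count_rows(cur, schema, table_name):
    # Hypothetical helper: verify the load by counting rows in the target table.
    cur.execute("SELECT COUNT(*) FROM {}.{};".format(schema, table_name))
    return cur.fetchone()[0]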
Load data from Redshift to pandas
import pandas as pd
import sqlalchemy as sa

# The redshift+psycopg2 dialect is provided by the sqlalchemy-redshift package.
url = 'redshift+psycopg2://{}:{}@{}:{}/{}'.format(user, password, host, port, dbname)
engine = sa.create_engine(url, echo=False)
df = pd.read_sql_query('SELECT * FROM table_name;', engine)
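For tables that do not fit comfortably in memory, pandas can stream the result in chunks instead of loading everything at once; a minimal sketch:
for chunk in pd.read_sql_query('SELECT * FROM table_name;', engine, chunksize=10000):
    print(len(chunk))  # process each DataFrame chunk here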