용바오의 연구실

JGI (Jaypyon General Intelligence)

빅데이터 영역의 데이터 표준화를 위한 여정

일반적으로 업무를 인계받았을 때,

그게 마음에 쏙 드는 경우는 많이 없을 것이라고 생각합니다.

사람은 모두 생각이 다르고, 필연적으로 자신의 업무에 대해 고수하는 방향, 관심도, 애정도가 달라지기 때문입니다.

최초에 이 업무영역을 인계받았을 때, 제대로된 메뉴얼도 없었고, 메타시스템 등 무엇 하나 갖춰진게 없었습니다.

빅데이터 영역은 저에게 애증의 관계이며, 많은 지식을 쌓아준 고마운 영역이기도 합니다.

오늘은 최근에 많은 시간을 쏟은 빅데이터 영역의 표준화에 대해서 서술하고자 합니다.

# 메타시스템이 없는 하둡 영역의 모든 테이블 리스트를 추출

하둡 영역에서 테이블 리스트를 추출하는 방법에는 여러 가지가 있습니다. 그 중 가장 기본적인 방법은 Hadoop 명령어를 사용하는 것입니다.

하둡 명령어는 'hdfs dfs'와 'hadoop fs' 두 가지가 있는데, 'hdfs dfs' 명령어는 HDFS(Hadoop Distributed File System) 파일 시스템 조작 명령어이며, 'hadoop fs'는 HDFS 외의 파일 시스템을 조작할 때 사용하는 명령어입니다. 여기서는 'hdfs dfs' 명령어를 기준으로 하둡 영역의 모든 테이블 리스트를 추출하는 방법을 소개하겠습니다.

  1. 하둡 영역의 모든 테이블 리스트 확인하기
hdfs dfs -ls /data/kb002s/

이 명령어는 하둡 영역의 /data/kb002s/ 경로에 있는 모든 테이블 리스트를 출력합니다. 이때, 하둡 영역의 경로는 클러스터마다 다를 수 있으니 확인해보시기 바랍니다.

  1. 출력 결과에서 테이블 이름 추출하기

위 명령어를 실행하면 /data/kb002s/ 경로에 있는 모든 테이블 리스트를 출력합니다. 이 명령어의 출력 결과에서는 각 테이블의 정보가 나타나므로, 그 중에서 테이블 이름만 추출할 수 있습니다.

추출 방법은 다음과 같습니다.

hdfs dfs -ls /data/kb002s/ | awk '{print $NF}' | grep -v 'tmp'

이 명령어는 '/data/kb002s/' 경로에서 테이블 리스트를 출력한 후, 출력 결과에서 마지막 열인 테이블 이름만 추출합니다. 이때, 'tmp' 라는 문자열로 시작하는 테이블은 제외하도록 grep 명령어를 사용합니다.

테이블 이름을 추출하면 하둡 영역에서 사용되는 테이블 이름을 모두 확인할 수 있습니다.

# 추출된 110개 테이블 리스트로부터 컬럼명과 타입을 추출

하둡 영역에서 테이블의 컬럼명과 타입을 추출하려면, Hive(하이브)나 Spark(스파크)를 사용해야 합니다. Hive와 Spark는 모두 Hadoop 클러스터에서 데이터를 처리하기 위한 프레임워크입니다. 두 프레임워크 모두 SQL과 비슷한 구문을 사용하여 테이블의 스키마 정보를 쉽게 확인할 수 있습니다. 이번에는 Hive와 Spark를 활용하여 하둡 영역 테이블 110개의 컬럼명과 타입을 추출하는 방법을 알아보겠습니다.

  1. Hive를 사용한 방법

Hive에서 테이블의 컬럼명과 타입을 확인하는 방법은 DESCRIBE 구문을 사용하는 것입니다. 다음 명령어를 통해 원하는 테이블의 스키마 정보를 확인할 수 있습니다.

sql

DESCRIBE table_name;

하지만 이렇게 하나씩 확인하기에는 비효율적이기 때문에, Hive 스크립트로 자동화를 시도해 볼 수 있습니다. 아래 예시를 참고하시면 되겠습니다.

bash

#!/bin/bash

HADOOP_TABLES=`hdfs dfs -ls /data/kb002s/ | awk '{print $NF}' | grep -v '_tmp_' | head -110`

for TABLE_NAME in $HADOOP_TABLES
do
  echo "Table: $TABLE_NAME"
  hive -S -e "USE YOUR_DATABASE_NAME; DESCRIBE $TABLE_NAME;"
  echo "------------------"
done

위 스크립트는 테이블 개수를 110개로 제한하고, 각 테이블의 컬럼명과 타입을 출력합니다. YOUR_DATABASE_NAME 부분을 원하는 데이터베이스 명으로 변경하시면 됩니다.

  1. Spark를 사용한 방법

개인적으로 SHELL SCRIPT를 선호하지만, Spark에서도 Hive와 유사하게 DESCRIBE 구문을 사용하여 테이블의 컬럼명과 타입을 확인할 수 있습니다.

python

from pyspark.sql import SparkSession

spark = SparkSession.builder \\\\
  .master("yarn") \\\\
  .appName("Hadoop Table Column Names and Types") \\\\
  .getOrCreate()

hadoop_tables = ["table0", "table2", ..., "table109"]

for table_name in hadoop_tables:
  print(f"Table: {table_name}")
  spark.sql(f"DESCRIBE YOUR_DATABASE_NAME.{table_name}").show()
  print("------------------")

spark.stop()

위 코드는 PySpark를 사용하여 하둡 영역 테이블 100개의 컬럼명과 타입을 출력하는 예제입니다. 이 코드를 통해 간단하게 원하는 정보를 추출할 수 있습니다. YOUR_DATABASE_NAME 부분을 원하는 데이터베이스 명으로 변경하시면 됩니다.

이렇게 Hive나 Spark를 사용하면 하둡 영역 테이블의 컬럼명과 타입을 쉽게 확인하고 추출할 수 있습니다.

# 추출된 테이블 110개의 컬럼 중 string 타입인 자원의 max length들을 추출

테이블 100개의 string 타입 컬럼에 대한 최대 길이를 찾으려면, 각 테이블의 각 string 컬럼에 대해 최대 길이를 계산하는 집계 함수를 사용해야 합니다. Hive와 Spark에서 모두 간단한 SQL 쿼리를 통해 계산이 가능합니다.

아래 예시는 Hive와 Spark를 사용하여 110개 테이블의 각 string 타입 컬럼의 최대 길이를 추출하는 방법을 설명합니다.

  1. Hive를 사용한 방법

sql

USE YOUR_DATABASE_NAME;

CREATE TEMPORARY FUNCTION string_length AS 'com.your.package.StringLengthFunction';

bash

#!/bin/bash

HADOOP_TABLES=`hdfs dfs -ls /data/kb002s/ | awk '{print $NF}' | grep -v '_tmp_' | head -110`

for TABLE_NAME in $HADOOP_TABLES
do
  STRING_COLUMNS=$(hive -S -e "USE YOUR_DATABASE_NAME;
      DESCRIBE $TABLE_NAME;" | grep 'string' | awk '{print $1}')
  for COLUMN_NAME in $STRING_COLUMNS
  do
    echo "Table: $TABLE_NAME, Column: $COLUMN_NAME"
    MAX_LENGTH=$(hive -S -e "USE YOUR_DATABASE_NAME;
        SELECT MAX(string_length($COLUMN_NAME)) FROM $TABLE_NAME;")
    echo "Max length: $MAX_LENGTH"
    echo "------------------"
  done
done

  1. Spark를 사용한 방법

python

from pyspark.sql import SparkSession

def string_columns(df):
    return {c.name for c, t in df.schema.fields if t.simpleString().startswith("string")}

spark = SparkSession.builder \\\\
    .master("yarn") \\\\
    .appName("Hadoop Table String Column Max Length") \\\\
    .config("spark.jars", "/path/to/your/jar/stringlengthfunction.jar") \\\\
    .getOrCreate()

spark.sql("CREATE TEMPORARY FUNCTION string_length AS 'com.your.package.StringLengthFunction'")

hadoop_tables = ["table0", "table2", ..., "table109"]

for table_name in hadoop_tables:
    df = spark.sql(f"SELECT * FROM YOUR_DATABASE_NAME.{table_name}")
    string_cols = string_columns(df)
    for col_name in string_cols:
        max_length = spark.sql(f"SELECT MAX(string_length({col_name})) FROM YOUR_DATABASE_NAME.{table_name}") \\\\
                               .collect()[0][0]
        print(f"Table: {table_name}, Column: {col_name}, Max Length: {max_length}")
        print("------------------")

spark.stop()

이렇게 Hive와 Spark를 사용하여 110개 테이블의 각 string 타입 컬럼의 최대 길이를 계산하고 추출할 수 있습니다.

# 추출된 테이블 110개의 테이블명 표준화 진행

표준화를 위해 테이블 110개의 특성을 파악하는 과정이 필요합니다.

이는 각 회사의 표준화 정책에 따라 달라지며, 저희 회사의 경우 업무분류코드를 지정하여 각 업무 도메인에 따른 테이블 지정 Rule대로 진행하게 됩니다. 다만 이번 표준화 작업의 경우 업무 분류 코드가 존재하지 않은 경우가 태반이었기에 업무분류코드 제정 역시 꽤나 큰 작업이었습니다.

# 추출된 테이블 110개의 컬럼, 컬럼타입, max length를 이용하여 표준화 진행

표준화를 위한 표준 단어, 표준 용어, 표준 도메인 등을 설정하는 과정은 데이터 표준화 및 데이터 관리의 중요한 단계입니다. 이 과정은 크게 다음 단계로 나눌 수 있습니다.

  1. 표준화 그룹 구성 표준화 작업을 담당할 전문가 그룹을 구성합니다. 이 그룹은 데이터 관리자, 데이터 모델러, 개발자 및 해당 분야의 비즈니스 전문가로 이루어질 수 있습니다. 사실 저 혼자 하긴 합니다. 😂
  2. 기준 및 가이드라인 개발 표준화된 테이블 및 컬럼 이름, 데이터 타입, 날짜 및 시간 형식, 공통 코드의 사용 등의 기준을 개발합니다. 이를 통해 식별되는 데이터의 명확성과 일관성을 유지하고, 이해하기 쉬운 구조와 관계를 지향합니다.
  3. 표준 단어 및 용어 개발 표준 용어는 속성, 상태, 관계를 설명하는 단어를 의미합니다. 각 단어의 정의와 기능을 기록하고 업무와 관련된 문서, 시스템 구성 요소의 이름을 표준화합니다.
  4. 표준 도메인 정의 표준 도메인은 공통으로 사용되는 데이터 집합을 의미합니다. 도메인에 속한 모든 테이블 및 컬럼은 통합 및 표준화 된 데이터 형식 및 값의 집합을 사용합니다. 예를 들어, 고객 주소, 연락처 정보, 이메일 형식 등이 될 수 있습니다.
  5. 표준화 검토 및 업데이트 표준화 프로세스는 지속적인 검토와 업데이트가 필요하며, 기업 환경의 변화에 따라 수시로 조정이 필요합니다.
  6. 표준화된 정보 및 가이드라인 배포 표준화된 가이드라인과 기준을 해당 조직에 전달합니다. 새로운 시스템이 구축되거나 기존 시스템이 변경되는 경우, 표준화된 정보를 적용하고 준수하는 것이 중요합니다.

이러한 표준화 프로세스를 거쳐 통합 및 표준화 작업이 원활하게 진행되고, 최종적으로 정제된 데이터가 관계형 데이터베이스에 적재되면 표준화된 정보를 사용해 데이터를 손쉽게 활용할 수 있게 됩니다.

# 추출된 테이블 110개의 컬럼, 컬럼타입, max length를 이용하여 표준화 진행

추출된 컬럼명, 컬럼타입, 최대 길이를 가지고 관계형 데이터베이스로 데이터베이스 표준화를 진행하려면, 아래와 같은 단계를 따라 진행할 수 있습니다.

  1. 관계형 데이터베이스 및 테이블 생성 먼저 관계형 데이터베이스를 선택해야 합니다. MySQL, PostgreSQL, Oracle, SQL Server 등이 있습니다. 원하는 데이터베이스 관리 시스템(DBMS)를 선택한 후, 데이터베이스 및 필요한 테이블을 생성합니다.
  2. 데이터 변환 계획 수립 하둡 영역에 있는 데이터를 관계형 데이터베이스로 옮겨오기 전에, 데이터 변환 계획을 수립해야 합니다. 스키마 변환, 데이터 타입 매핑, 인덱스 및 제약 조건 설정 등이 포함됩니다.
  3. 데이터 마이그레이션 이제 추출된 컬럼명, 컬럼타입, 최대 길이를 바탕으로 데이터 마이그레이션을 진행합니다. 마이그레이션을 위한 ETL(Extract, Transform, Load) 도구나 스크립트를 작성하면 됩니다.

아래 작업은 추출된 컬럼 정보를 바탕으로 Spark를 사용하여 하둡 데이터를 관계형 데이터베이스로 마이그레이션하는 예제입니다.

python

from pyspark.sql import SparkSession

# SparkSession 생성 및 관계형 데이터베이스 연결 설정
spark = SparkSession.builder \\\\
    .master('yarn') \\\\
    .appName('Hadoop to RDBMS Migration') \\\\
    .config('spark.jars', '/path/to/your/jdbc/driver.jar') \\\\
    .getOrCreate()

# 관계형 데이터베이스 연결 정보
url = 'jdbc:your_database_url'
properties = {
    'user': 'your_username',
    'password': 'your_password',
    'driver': 'your_jdbc_driver_class_name',
}

# 실제 마이그레이션 진행
hadoop_tables = ['table1', 'table2', ..., 'table100']
rdbms_tables = ['rdbms_table1', 'rdbms_table2', ..., 'rdbms_table100']

for hadoop_table, rdbms_table in zip(hadoop_tables, rdbms_tables):
    df = spark.sql(f'SELECT * FROM YOUR_HADOOP_DATABASE_NAME.{hadoop_table}')
    df.write \\\\
        .jdbc(url, rdbms_table, properties=properties, mode='overwrite')

# SparkSession 종료
spark.stop()

사실 저는 위 처럼 Spark를 사용한 게 아닌, 하둡에서 파일을 직접 다른 저장소에 저장하는 방법을 사용했습니다.

  1. HDFS에서 로컬 파일 시스템으로 파일 복사하기 HDFS에서 로컬 파일시스템으로 파일을 복사하려면 hdfs dfs -get 명령어를 사용할 수 있습니다. 다음 명령어를 사용하여 파일을 복사합니다.

bash

hdfs dfs -get /path/to/hdfs/source/file /path/to/local/destination

이 명령어를 실행하면 HDFS의 /path/to/hdfs/source/file 경로에 있는 파일이 로컬 파일시스템의 /path/to/local/destination 경로로 복사됩니다. 그 후 해당 저장소를 다른 전처리 서버에 Mount하여 물리테이블로 만드는 과정을 진행하였습니다. 이 부분은 또 나중에 서술하도록 하겠습니다.