tf.data와 tf.record를 이용해 데이터 읽고 저장하기

참고

Using TFRecords and tf.Example

tf.estimator 와 효과적인 Data 사용

tf.estimator 는 Tensorflow의 High-level API로 평가, 학습, 예측과 서빙을 위한 모델 저장등을 편하게 수행할 수 있게 해줍니다. Estimators 를 사용하면서 (Estimator 외에도) Tensorflow 에서 데이터를 효과적으로 사용하기 위해서는 데이터를 여러 개의 직렬화된 데이터에 나눠 저장해 이용하는 것이 좋습니다. Tensorflow 에서는 TFRecord 를 통해 데이터를 직렬화하고 읽을 수 있습니다. 데이터를 직렬화하고 TFRecord 형식으로 저장하고 이를 읽기 위해 tf.Example 을 이용할 수 있습니다.

여기서는 먼저 .txt 형식의 파일을 tf.data.TextLineDataset 을 이용해 읽어 오고, tf.Example 로 데이터를 직렬화한 뒤, TFRecord 형식으로 다시 저장하는 과정을 진행해봅니다.

데이터 읽고 파싱하기

먼저 일반적인 .txt 형식의 파일을 tf.data.TextLineDataset 으로 읽어오겠습니다. 예제 데이터로 NSMC (Naver Sentiment Movie Corpus) 를 이용합니다.

tf.data 로 데이터를 읽는 것은 간단합니다. tf.data (여기서는 tf.data.TextLineDataset) 을 선언하고 한 번에 읽어 올 배치 사이즈를 지정한 다음, iterator 를 생성합니다. 구체적인 코드는 아래와 같습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
FEATURE_NAME = "review"
hparams = tf.contrib.training.HParams(batchsize=1)

def iterate_data(filename):
    """
    Read text file with tf.data.TextLineDataset.
    """
    def _decode_tsv(line):
        parsed_line = tf.decode_csv(line, [[0], ["null"], [0]], field_delim="\t")
        label = parsed_line[-1]
        feature = parsed_line[1]
        d = dict(zip([FEATURE_NAME], [feature])), label
        return d

    dataset = tf.data.TextLineDataset(filename).skip(1).map(_decode_tsv)
    dataset = dataset.batch(hparams.batch_size)

    return dataset.make_one_shot_iterator()

Iterator 를 리턴하는 iterate_data(filename) 함수를 정의합니다. 이 함수는 내부에서 텍스트 파일 한 라인을 파싱할 _decode_tsv(line) 을 매핑합니다. tf.decode_csv(records, record_defaults)record_defaults 는 한 라인 각 요소의 데이터 타입을 예시로 지정해줍니다. NSMC 데이터는 각 라인이 \t 으로 분리되어 있고, 각 원소는 id (int), 리뷰 (string), sentiment (int) 로 이루어져 있습니다. 이를 위해 record_defaults[[0], ["null"], [0]] 으로 지정해 주었습니다. 한편, decode 함수는 (feature Dict, label) 튜플을 리턴합니다.

이후 위의 함수를 이용해서 데이터를 불러오는 코드는 다음과 같습니다.

1
2
3
4
5
iterator = iterate_data(TRAIN_PATH)

next_elem = iterator.get_next()
with tf.Session() as sess:
    batch_features, batch_labels = sess.run(next_elem)

데이터를 불러오면 다음과 같습니다.

1
2
3
batch_features
>>>
{'review': array([b'\xea\xb5\xb3 \xe3\x85\x8b'], dtype=object)}
1
2
3
batch_labels
>>>
array([1], dtype=int32)

tf.data 로 데이터를 불러오면 텍스트는 utf-8로 인코딩 된 상태입니다. 우리가 읽을 수 있고, 여러 가지 전처리를 하기 위해서는 str.decode("utf-8") 로 디코딩 해줍니다. (Tensorflow 1.3 에서는 unicode 인코딩/디코딩 API가 제공됩니다.)

1
2
3
batch_features["review"][0].decode("utf-8")
>>>
'굳 ㅋ'

위에서는 예시로 들기 위해 첫 번째 배치만 얻고 반복문을 중지했지만 실제로는 while 문으로 데이터를 끝까지 불러옵니다. 유의할 점은, 데이터를 불러오는 .get_next() 를 반복문 안이 아니라 밖에서 선언한다는 점입니다. 실제로는 아래와 같은 형태로 데이터를 읽습니다.

1
2
3
4
5
6
with tf.Session() as sess:
    while True:
        try:            
            batch_features, batch_labels = sess.run(next_elem)
        except tf.errors.OutOfRangeError:
            break

tf.Example 로 Example 만들고 직렬화하기

tf.Example 로 Example 만들기

그럼 데이터를 읽어올 수 있으니, TFRecord로 다시 저장하기 위해 Example 을 만들고 직렬화해줍니다. tf.Example 을 이용합니다. tf.Example 메시지는 기본적으로 {"string": value} 매핑이며, Tensorflow에서 유연하게 사용할 수 있도록 설계된 메시지 타입입니다.

또한 여기서는 tf.Example 을 이용할 때 tf.train.Feature 을 사용합니다. tf.train.Feature 도 마찬가지로 프로토콜이며, {value: value} 로 표현됩니다. tf.train.Feature 는 세 가지 타입을 받을 수 있습니다. (한편 non-scalar 타입을 직렬화하는 가장 간단한 방법은 tf.serialize_tensor 를 사용하는 것입니다.)

  • tf.train.BytesList
    • string
    • byte
  • tf.train.FloatList
    • float (float32)
    • double (float64)
  • tf.train.Int64List
    • bool
    • enum
    • int32
    • uint32
    • int64
    • uint64

Tensorflow 데이터를 tf.Example-compatible tf.train.Feature 로 변환하기 위해서는 아래와 같은 함수를 이용합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# The following functions can be used to convert a value to a type compatible
# with tf.Example.

def _bytes_feature(value):
    """Returns a bytes_list from a string / byte."""
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def _float_feature(value):
    """Returns a float_list from a float / double."""
    return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

def _int64_feature(value):
    """Returns an int64_list from a bool / enum / int / uint."""
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

그럼 위의 함수를 이용해서 실제로 데이터를 변환해보겠습니다. 우리가 사용하는 NSMC 에는 stringint 타입만 있기 때문에 _bytes_featuers_int64_feature 만 사용합니다.

위에서 얻은 feature로 간단히 결과를 얻으면 다음과 같습니다.

1
2
3
4
5
6
7
review = batch_feature["review"][0]	# utf-8 encoded

_bytes_feature(review)
>>>
bytes_list {
  value: "526563 430513"
}
1
2
3
4
5
6
7
label = batch_labels[0]

_int64_feature(label)
>>>
int64_list {
  value: 1
}

Serialization

이렇게 데이터를 tf.train.Feature 로 변환했으면, 이를 직렬화해주어야 합니다. 이를 위해 tf.Example 프로토콜을 정의하고 직렬화해줍니다. 이제 직렬화의 마지막 단계입니다. 직렬화에는 tf.Example().SerializeToString 을 사용합니다. 구체적인 코드는 아래와 같습니다.

1
2
3
4
def serialize_example(feature):
    example_proto = tf.train.Example(
        features=tf.train.Features(feature=feature))
    return example_proto.SerializeToString()

위에서 얻은 값을 직렬화하는 과정은 다음과 같습니다.

1
2
3
4
feature = {
    "review": _bytes_feature(batch_feature["review"][0]),
    "label": _int64_feature(batch_label[0]),
}
1
2
3
serialize_example(feature)
>>>
b"\n'\n\x15\n\x06review\x12\x0b\n\t\n\x07\xea\xb5\xb3 \xe3\x85\x8b\n\x0e\n\x05label\x12\x05\x1a\x03\n\x01\x01"

그럼 전체 데이터를 tf.train.Feature 로 변환하고 직렬화하겠습니다. 여기서는 학습 시의 편의를 위해 텍스트를 전처리한 뒤 직렬화하겠습니다. 이를 위해 리뷰 텍스트는 간단한 토크나이징을 해주고 tf.train.Feature 로 변환하겠습니다. 또한 tf.train.Feature 를 만들기 위해서는 string 을 다시 utf-8 로 인코딩해주어야 합니다. 이를 위해 아래와 같은 함수를 간단히 정의합니다. 한가지 더 주의할 점은, tf.train.BytesList(value=[value] 처럼, value 는 리스트 형태로 지정해주어야 합니다.

1
2
3
4
5
6
7
8
from konlpy.tag import Okt
tagger = Okt()

def preprocess_text(text):
    return " ".join(tagger.morphs(text))

def _encode_texts(texts):
    return np.array([text.encode("utf-8") for text in texts], dtype=bytes)

전체 과정은 아래와 같습니다. 조금 번거롭습니다. tf.data.TextLineDataset 으로 배치사이즈만큼 데이터를 읽어오고, 세션을 열어 값을 얻은 다음, 전처리를 진행하고 다시 인코딩해서 tf.train.Feature 로 변환합니다. 코드는 아래와 같습니다. 아래 함수는 한 번에 하나의 feature를 리턴하는 제너레이터입니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
tf.logging.set_verbosity("INFO")

FEATURE_NAME = "review"
filename = TRAIN_PATH

num_instances = 150000	# number of train instances
batch_size = 1000

def generate_feature(filename):
    iterator = iterate_data(filename)
    batch_features, batch_labels = iterator.get_next()

	num_batches = math.ceil(num_instances / batch_size)

    i = 1
    with tf.Session() as sess:
        while True:
            try:
                tf.logging.info(f"Reading batch: {i}/{num_batches}") # for python >= 3.6
                # tf.logging.info("Reading batch: {}/{}".format(i, num_batches)) # for python < 3.6
                features, labels = sess.run([batch_features[FEATURE_NAME], batch_labels])
                processed_features = preprocess_text(features)
                for feature, label in zip(_encode_texts(processed_features), labels):
                    yield {
                        "review": _bytes_feature(feature),
                        "label": _int64_feature(label),
                    }
                    i += 1
            except tf.errors.OutOfRangeError:
                break

위의 코드에 읽어올 파일 위치와 tf.record 저장 위치 등을 인자로 주고 코드를 실행하면 다음과 같습니다. 실행이 완료되면 하나의 TFRecord 파일이 생성됩니다.

1
2
3
4
5
INFO:tensorflow:Reading batch: 1/150
INFO:tensorflow:Reading batch: 2/150
INFO:tensorflow:Reading batch: 3/150
INFO:tensorflow:Reading batch: 4/150
...

TFRecord 읽고 파싱하기

TFRecord 생성을 완료했으면 다시 데이터를 읽어올 차례입니다. TFRecord는 크게 tf.python_io.tf_record_iteratortf.data.TFRecordDataset 두 가지 방법으로 읽을 수 있습니다. (참고로 데이터 읽기를 소개하는 Tensorflow 공식 문서 에서는 tf.TFRecordReader 를 이용해서 TFRecord를 읽도록 설명하고 있는데, Queue-based input pipelines 은 tf.data 로 교체되었으므로 여기서는 무시합니다.)

tf.python_io.tf_record_iterator 이용하기

먼저 tf_record_iterator 를 이용하는 방법은 다음과 같습니다. iterator를 정의하고, tf.train.Example 을 이용합니다. 우리가 원하는 feature에 접근하기 위해서는 features.feature 속성을 이용합니다.

1
2
3
4
5
6
7
8
9
record_iterator = tf.python_io.tf_record_iterator(
    path=RECORD_PATH)

for record in record_iterator:
    example = tf.train.Example()
    example.ParseFromString(record)
    
    value = example
    break
1
2
3
4
5
6
7
8
features = dict(value.features.feature)
features
>>>
{'review': bytes_list {
   value: "526563 430513"
 }, 'label': int64_list {
   value: 1
 }}

위의 features 에서 각 feature는 데이터 타입으로 접근합니다.

1
2
3
4
5
6
7
bytes_tokens = features["review"].bytes_list.value
decoded_tokens = bytes_tokens[0].decode("utf-8")
print(bytes_tokens)
print(decoded_tokens)
>>>
[b'\xea\xb5\xb3 \xe3\x85\x8b']
'굳 ㅋ'
1
2
3
4
label = features["label"].int64_list.value
print(label)
>>>
[1]

tf.data.TFRecordDataset 이용하기

앞에서 데이터를 읽을 때 사용했던 tf.data 를 사용하려면 tf.data.TFRecordDataset 을 사용하면 됩니다.

1
2
3
4
5
tf_record_data = tf.data.TFRecordDataset(
    filenames=RECORD_PATH)
tf_record_data = tf_record_data.batch(1)
iterator = tf_record_data.make_one_shot_iterator()
next_elem = iterator.get_next()

tf.data.TFRecordDataset 을 이용할 때는 tf.python_io.tf_record_iterator 를 사용할 때와 달리 데이터를 parsing 해주는 작업이 필요합니다. 이 때는 tf.parse_example() 을 이용하는데, features 인자로 각 feature의 parsing 타입을 명시해줍니다. 간단한 함수로 나타내면 아래와 같습니다.

1
2
3
4
5
6
7
8
9
def parse_example(serialized_item):
    parsed_example = tf.parse_example(
        serialized=serialized_item,
        features={
            "review": tf.VarLenFeature(dtype=tf.string),
            "label": tf.FixedLenFeature([], dtype=tf.int64)
        }
    )
    return parsed_example

next_elem 을 parsing 하면 tf.python_io.tf_record_iterator 를 사용했을 때와 동일한 결과를 얻을 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
parsed_example = parse_example(next_elem)
with tf.Session() as sess:
    features = sess.run(parsed_example)
    
bytes_tokens = features["review"].values
decoded_tokens = bytes_tokens[0].decode("utf-8")
print(bytes_tokens)
print(decoded_tokens)
>>>
[b'\xea\xb5\xb3 \xe3\x85\x8b']
'굳 ㅋ'
1
2
3
4
label = features["label"]
print(label)
>>>
[1]