AWS

[Re18특집] Apache Flink를 Kinesis 와 함께 Serverless로 – Amazon Kinesis Data Analytics를 위한 Java 언어 지원 출시

여기서 다루는 내용

· 관리형 Flink
· Wordcount 예제
· 정리


들어가며


안녕하십니까, GS네오텍 김범환입니다.

Re:invent 2018 특집입니다.

이번에 Kinesis Data Analytics에서 Java를 사용할 수 있게 되었습니다.
다시 말하면 Flink라는 프레임워크를 사용할 수 있게 되었습니다.

Kinesis Analytics와 이번부터 지원하는 프레임워크인 Flink에 대해 간단한 WordCount 예제와 함께 살펴보시죠.


관리형 Flink


Kinesis라는 이름이 붙은 서비스는 무려 4개나 있습니다.
스트리밍, 실시간 처리에 관한 서비스 앞에는 Kinesis라는 이름이 붙어있는데 그 목록은 다음과 같습니다.

  • Kinesis Streams
  • Kinesis Data Firehose
  • Kinesis Analytics
  • Kinesis Video Streams

짧게 말하면 Streams는 저장, Firehose는 전송, Analytics는 분석, 그리고 Video Streams는 비디오 스트리밍 처리를 도와줍니다.

그 중 오늘 살펴볼 Kinesis Analytics는 실시간 데이터 분석 서비스입니다.
Kinesis Streams에 스트리밍으로 들어오는 데이터를 실시간으로 처리할 수 있게 서비스가 구성되어 있습니다.

이전에는 Kinesis Analytics에서 SQL문 분석을 지원해서 실시간으로 들어오는 데이터를 SQL문으로 쿼리해 실시간으로 데이터를 분석할 수 있었습니다. 실시간 처리를 위해서는 Kinesis에 들어오는 데이터를 Spark와 같은 프레임워크를 구축하고 Kinesis와 연결해서 처리해야 하는데 그런 수고를 덜게끔 관리형 서비스의 형태로, 거기다가 SQL문을 이용할 수 있게 해주는 장점이 있었죠.


※ SQL문을 지원하는 Kinesis Data Analytics

거기에 이번 기능업데이트를 통해 SQL로 분석할 수 있는 옵션 외에도
Flink라는 프레임워크를 사용할 수 있는 옵션이 하나 더 생겼습니다.

Flink는 최근에 실시간 처리에 대한 요구가 늘어나면서 각광받게 된 프로젝트입니다.
요새는 버전업이 하도 빨라서 몇달전 1.5버전이 최신이었는데 `18/12에는 1.8버전까지 릴리즈가 되어 있네요.
그만큼 사용하는 유저도 많고 프로젝트에 기여하는 유저도 많다는 뜻이겠죠. Flink는 SQL문과 같은 High-level의 데이터 분석부터 상태, 이벤트시간 등을 관리할 수 있는 Low-Level까지 접근이 가능합니다. 즉 성능에 민감한 프로젝트에서는 로우레벨로 접근해 성능 튜닝도 가능하다는 의미입니다.

하지만, Flink는 분산처리를 기본으로 하기 때문에 이를 구축하려면 클러스터를 구축해야 합니다. EMR 에서도 Flink 1.6 버전을 지원하고 있어서 EMR을 통해 구축할 수도 있습니다. 하지만 Kinesis Data Analytics를 이용하면 Flink를 관리형 서비스의 형태로 더 편하게 이용할 수 있게 만들어 놓은 것이 이번 업데이트의 핵심입니다. Flink를 관리형 서비스로 만들어서 인프라 구축과 관리에 대한 부담은 줄이면서 스트리밍 데이터 처리 시에 event time등에 대한 접근까지 가능해졌습니다.

Flink의 실시간 처리방식, 실시간 처리 프레임워크간의 비교는 여기 를 참고하시기 바랍니다.


Wordcount 예제


Wordcount 예제는 실시간처리 프레임워크에서는 빠지지 않고 등장하는 예제입니다.
간단하게 짧은 순간 들어오는 단어의 개수를 세는 작업을 통해 실시간 처리를 경험해 볼 수 있어서 널리 쓰이는 것 같습니다

이번 예제는 다음과 같은 구성으로 돌아갑니다


※ WordCount 예제 구성도

이번 예제의 처리순서는 다음과 같습니다.

  1. TextInputStream에 지속적으로 문장들이 들어갑니다
  2. Kinesis Data Analytics의 어플리케이션이 5초마다 문장속의 단어의 개수를 셉니다
  3. 단어를 센 결과를 WordCountOutputStreams로 전송합니다

자 그러면 먼저 스트리밍 데이터를 저장할

  • TextInputStream
  • WordCountOutput

이 두가지 Streams가 필요하겠죠.

Kinesis Data Analytics는 아직 5개 리전(프랑크푸르트, 버지니아, 오하이오, 아일랜드, 오레곤)만 지원하기 때문에
이번 예제의 모든 리소스는 버지니아리전에서 생성합니다.

Kinesis Streams에서 간단하게 shard를 한개씩만 갖고 있는 Stream을 두개 생성합니다.


※ Kinesis streams 생성

Kinesis Streams 생성이 완료되었습니다.
그러면 이제 스트림과 스트림사이에 처리를 할 어플리케이션을 작성해야합니다.
간단한 어플리케이션이지만 build하는건 간단하지 않습니다.

우선, maven을 통해 Flink 기본 template을 통해 flink 프로젝트를 생성합니다.

$ mvn archetype:generate                               \
      -DarchetypeGroupId=org.apache.flink              \
      -DarchetypeArtifactId=flink-quickstart-java      \
      -DarchetypeVersion=1.6.2

groupId와 artifactId등을 입력하면 다음과 같은 구조의 프로젝트가 생성됩니다.

$ tree quickstart/
quickstart/
├── pom.xml
└── src
    └── main
        ├── java
        │   └── org
        │       └── myorg
        │           └── quickstart
        │               ├── BatchJob.java
        │               └── StreamingJob.java
        └── resources
            └── log4j.properties

Flink 사용을 위한 기본적인 뼈대입니다.
그 중에서 StreamingJob.java 파일을 다음과 같이 수정합니다.

package myorg;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.util.Collector;

import java.util.Properties;


public class StreamingJob {

	private static final String region = "us-east-1";
	private static final String inputStreamName = "TextInputStream";
	private static final String outputStreamName = "WordCountOutputStream";

	private static DataStream createSourceFromStaticConfig(
			StreamExecutionEnvironment env) {
		Properties inputProperties = new Properties();
		inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
		inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
				"LATEST");

		return env.addSource(new FlinkKinesisConsumer(inputStreamName,
				new SimpleStringSchema(), inputProperties));
	}

	private static FlinkKinesisProducer createSinkFromStaticConfig() {
		Properties outputProperties = new Properties();
		outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

		FlinkKinesisProducer sink = new FlinkKinesisProducer(new
				SimpleStringSchema(), outputProperties);
		sink.setDefaultStream(outputStreamName);
		sink.setDefaultPartition("0");
		return sink;
	}

	public static void main(String[] args) throws Exception {

		final StreamExecutionEnvironment env =
				StreamExecutionEnvironment.getExecutionEnvironment();

		DataStream input = createSourceFromStaticConfig(env);

		input.flatMap(new Tokenizer())
				.keyBy(0)
				.timeWindow(Time.seconds(5))
				.sum(1)
				.map(new MapFunction<Tuple2, String>() {
					@Override
					public String map(Tuple2 value) throws Exception {
						return value.f0 + "," + value.f1.toString();
					}
				})
				.addSink(createSinkFromStaticConfig());

		env.execute("Word Count");
	}

	public static final class Tokenizer
			implements FlatMapFunction<String, Tuple2> {

		@Override
		public void flatMap(String value, Collector<Tuple2> out) {
			String[] tokens = value.toLowerCase().split("\\W+");
			for (String token : tokens) {
				if (token.length() > 0) {
					out.collect(new Tuple2(token, 1));
				}
			}
		}
	}

}

※ StreamingJob.java – AWS 공식블로그 참고

이제 코드는 작성했고, 중요한 작업이 하나 남았습니다.

우리가 작성한 코드엔 flink-connector-kinesis 라는 library가 필요합니다.
이름 그대로 flink와 Kinesis를 연결시켜주는 라이브러리죠.
하지만 이 라이브러리는 라이센스 문제로 maven 중앙 레포지토리에서 받을 수가 없습니다.
그래서 따로 직접 빌드를 해주어야 합니다.

Flink 1.6.2 release의 소스코드를 받아와서 압축을 푼뒤 다음과 같이 빌드합니다.

mvn clean install -Pinclude-kinesis -DskipTests

꽤 시간이 걸립니다. 제 컴퓨터에선 15분 정도 걸리네요.
커넥터 빌드가 완료되면 우리가 작성할 어플리케이션의 pom.xml파일에 다음과같이 dependency를 추가해줘야 합니다.

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kinesis_2.11</artifactId>
  <version>1.6.2</version>
</dependency>

그럼이제 설정이 완료가 되었으니 빌드를 해야겠죠. 프로젝트의 루트에서 다음과 같이 maven으로 빌드합니다.

mvn clean package

프로젝트가 크지 않기 때문에 금방 빌드가 됩니다.
빌드가 완료되면 {프로젝트루트}/target/ 폴더에 {aritfactId}-{version}.jar 파일이 생성이 됩니다.

이 jar파일을 s3에 올려줍니다.


※ S3에 jar파일 업로드

그럼 이제 드디어 Kinesis Data Analytics 어플리케이션을 생성할 차례입니다.
어플리케이션 이름을 적고 flink를 선택한 후 생성버튼을 누릅니다.


※ Kinesis Data Analytics Application 생성

어플리케이션을 생성하면서 IAM Role 하나가 자동으로 생성되었습니다.
이 Role에는 우리가 생성한 Kinesis Streams에 대한 읽기,쓰기 권한이 있어야합니다.
생성한 Role에 정책을 붙여줍니다.


※ Kinesis streams를 읽고 쓸수 있는 정책 연결

다시 Analytics 어플리케이션으로 돌아와서 Configure 버튼을 누르면 드디어 jar파일을 제출할 수 있습니다.



※ 코드의 위치를 입력하고, Property는 따로 설정하지 않고, 모니터링 레벨을 설정

이렇게 jar파일의 위치를 지정하고, 모니터링 레벨을 설정한뒤 Update를 누르면 어플리케이션이 업데이트 됩니다.


※ 어플리케이션 configure가 완료. 이제 시작만 누르면 될것 같은데..

자 그럼 이제 시작버튼을 누르기 전에…! 아직 구성을 안한것이 있습니다.

아직 Kinesis에 데이터를 전송하는 부분을 구성하지 않았습니다.
또한 OutputStreams를 읽을수도 없죠. 즉, 예제 구성의 가장 앞단과 뒷단이 없는거죠.

데이터 전송과 읽기를 담당할 프로그램을 다음과 같이 파이썬으로 간단하게 만들어줍니다.

import boto3
import json
from datetime import datetime
import calendar
import random
import time


my_stream_name = 'TextInputStream'

kinesis_client = boto3.client('kinesis', region_name='us-east-1')

def put_to_stream(thing_id, property_value, property_timestamp):
    
    paragraphs = [ 'Apache Flink is a framework and distributed processing engine \
        for stateful computations over unbounded and bounded data streams. Flink has been\
        designed to run in all common cluster environments, perform computations\
        at in-memory speed and at any scale.',
        'Any kind of data is produced as a stream of events. Credit card transactions, \
        sensor measurements, machine logs, or user interactions on a website or mobile application, \
        all of these data are generated as a stream.',
        'Amazon Kinesis Data Analytics is the easiest way to analyze streaming data, \
        gain actionable insights, and respond to your business and customer needs in real time.\
        Amazon Kinesis Data Analytics reduces the complexity of building, managing, and integrating \
        streaming applications with other AWS services. SQL users can easily query streaming data or \
        build entire streaming applications using templates and an interactive SQL editor. \
        Java developers can quickly build sophisticated streaming applications using open source Java \
        libraries and AWS integrations to transform and analyze data in real-time.',
        'Amazon Kinesis Data Analytics takes care of everything required to run your real-time applications \
        continuously and scales automatically to match the volume and throughput of your incoming data. \
        With Amazon Kinesis Data Analytics, you only pay for the resources your streaming applications consume. \
        There is no minimum fee or setup cost.'
      ]

    message = random.choice(paragraphs)

    print message

    put_response = kinesis_client.put_record(
                        StreamName=my_stream_name,
                        Data=message,
                        PartitionKey=thing_id)

while True:
    property_value = random.randint(40, 120)
    property_timestamp = calendar.timegm(datetime.utcnow().timetuple())
    thing_id = 'aa-bb'

    put_to_stream(thing_id, property_value, property_timestamp)

    # wait for 5 second
    time.sleep(1.5)

※ Kinesis_producer.py – 1.5초에 한번씩 긴 문장을 kinesis로 전송합니다.

import boto3
import json
from datetime import datetime
import time
# import unicodedata
# import base64

my_stream_name = 'WordCountOutputStream'

kinesis_client = boto3.client('kinesis', region_name='us-east-1')

response = kinesis_client.describe_stream(StreamName=my_stream_name)

my_shard_id = response['StreamDescription']['Shards'][0]['ShardId']

shard_iterator = kinesis_client.get_shard_iterator(StreamName=my_stream_name,
                                                      ShardId=my_shard_id,
                                                      ShardIteratorType='LATEST')

my_shard_iterator = shard_iterator['ShardIterator']

record_response = kinesis_client.get_records(ShardIterator=my_shard_iterator,
                                              Limit=2)

while 'NextShardIterator' in record_response:
    record_response = kinesis_client.get_records(ShardIterator=record_response['NextShardIterator'],
                                                  Limit=2)
    if len(record_response['Records']) > 0:
    	result = record_response['Records'][0]['Data']
    	print "----------------------------------------------------------"
    	print result
    	print "----------------------------------------------------------"
    else:
    	print record_response

    # wait for 5 seconds
    time.sleep(5)

※ Kinesis_producer.py – 5초에 한번씩 WordCountOutputStream에서 데이터를 읽어옵니다.

producer는 문장들을 계속해서 input에 쏴주고
consumer는 5초에 한번씩 output에 뭐가있는지 조회합니다.

자 이정도면 구성이 완료되었으니 application을 시작해볼까요?


※ Flink 엔진을 돌려야하니 구동하는데 시간이 조금 걸립니다.

시작이 완료되면 이렇게 application graph가 생성됩니다.


※ Flink에서 제공하는 application graph.

자그러면 WordCount가 되고있는지를 봐야겠죠.
kinesis_producer.py, kinesis_consumer.py를 실행시키면 결과가 다음과 같이 나옵니다.


뭔가 계속 실시간으로 처리가 되고 있는게 보이시나요?
왼쪽 터미널에서는 긴 문장을 전송하고 있고 오른쪽 터미널에서는 단어들이 집계되어 있습니다.


정리


예제를 통해 간단히(?) Kinesis Data Analytics에서 새로 지원하는 Java 프레임워크 Flink를 사용해보았습니다.

사용해보니 좋았던 점은 다음과 같습니다.

  • EMR 등으로 따로 Flink를 구축하지 않아도 간단하게 flink applicaiton을 사용할 수 있는 점
  • 모니터링이 CloudWatch, CloudWatch Logs에서 가능한 점
  • flink의 웹 인터페이스에서 제공하는 어플리케이션 그래프가 같이 나오는 것

기본적으로 필요한 것은 다 있는것 같습니다. 반면 아쉬웠던 점은

  • 서울 리전에서 사용이 불가능한 점
  • flink-connector-kinesis, 즉 kinesis와의 연동을 위한 라이브러리를 따로 빌드해줘야 되는 부분

아직 Kinesis와 Flink를 연결해서 사용하는 사례가 그렇게 많진 않은지 자료도 별로 없더라구요.

하지만 이번 re:invent 2018에 관리형 kafka도 출시되었으니 kafka를 큐로 고려할 수도 있겠네요.
그러면 적어도 Kafka connector는 maven 중앙 레포지토리에 있으니 connector 빌드로 헤매진 않아도 되겠습니다.

이상으로 Kinesis Data Analytics의 신기능에 대해서 알아보았습니다.

끝!

5/5 - (평가 개수 : 2)

필자: GS Neotek

전체 게시물수 : 238

전체 조회수 : 2653

게시물 공유하기