import java.io.FileNotFoundException; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; import java.util.List; import java.util.Properties; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; /** * Kafkaにデータを書き込むサンプルプログラム */ public class BasicKafkaProducer { private static final CountCallback SEND_CALLBACK = new CountCallback(); private static class CountCallback implements Callback { private int count = 0; @Override public void onCompletion(RecordMetadata recordMetadata, Exception paramException) { count++; if (paramException != null) { System.err.println(paramException.getMessage()); System.exit(1); } } public int getCount() { return count; } } public static void main(String[] args) throws FileNotFoundException, IOException { // 設定ファイルを読み込み Properties producerProperties = new Properties(); producerProperties.load(Files.newInputStream(Paths.get("producer.properties"))); // データファイルを読み込み List dataList = Files.readAllLines(Paths.get("X_test.txt")); // メッセージをKafka Brokerに送信する try (KafkaProducer producer = new KafkaProducer<>(producerProperties);) { while (true) { for (String data : dataList) { // JSONメッセージを作成 String jsonString = "{" + "\"user\": \"Test User\"," + "\"time\": " + System.currentTimeMillis() + "," + "\"data\": \"" + data + "\"" + "}"; // メッセージを送信 producer.send(new ProducerRecord<>("human-activity", jsonString), SEND_CALLBACK); } } } } }