Added third example.

master
Vladimir Protsenko 4 years ago
parent 85af3e7611
commit 19295bf552

@ -126,7 +126,7 @@
<SDLCheck>true</SDLCheck>
<PreprocessorDefinitions>_DEBUG;APLUSBLIB_EXPORTS;_WINDOWS;_USRDLL;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<ConformanceMode>true</ConformanceMode>
<PrecompiledHeader>Use</PrecompiledHeader>
<PrecompiledHeader>NotUsing</PrecompiledHeader>
<PrecompiledHeaderFile>pch.h</PrecompiledHeaderFile>
</ClCompile>
<Link>
@ -143,7 +143,7 @@
<SDLCheck>true</SDLCheck>
<PreprocessorDefinitions>NDEBUG;APLUSBLIB_EXPORTS;_WINDOWS;_USRDLL;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<ConformanceMode>true</ConformanceMode>
<PrecompiledHeader>Use</PrecompiledHeader>
<PrecompiledHeader>NotUsing</PrecompiledHeader>
<PrecompiledHeaderFile>pch.h</PrecompiledHeaderFile>
</ClCompile>
<Link>

@ -0,0 +1,11 @@
## Запуск сервера
```
python server.py 8080
```
## Запуск клиента
```
python main.py
```

@ -0,0 +1,7 @@
*.iml
*.ipr
*.iws
.idea
out
target
*.class

@ -0,0 +1,41 @@
# Задание
Написать систему мгновенного обмена сообщениями между несколькими пользователями. В проекте должна присутствовать
возможность сохранения состояния в формат, поддерживающий валидацию по схеме. Валидация должна производиться либо
в программе при импорте данных, либо в юнит-тестах, проверяющих корректность сохранения состояния.
# Описание реализации
Между клиентом и сервером происходит обмен JSON сообщениями вида (класс `datatypes.Message`):
```
{"senderId":"AD60","receiverId":"C399","body":"hello", timestamp:"2020-09-11T08:54:45.528807100Z"}
```
- `senderId` - идентификатор отправителя,
- `receiverId` - идентификатор получателя,
- `body` - текстовое сообщение,
- `timestamp` - время создания сообщения.
При подключении клиент должен отправить JSON сообщение со своим именем (класс `datatypes.ClientInfo`):
```
{"timestamp":"2020-09-11T08:49:44.644320700Z", "login":"0FB9"}
```
При получении и отправлении сообщения на сервере производится сохранение поля `timestamp` в JSON файл (класс `datatypes.ClientState`)
в заранее созданную папку `states_path`, указанную при старте сервера в качестве аргумента. Пример сохранённого состояния пользователя
0FB9 в файле 0FB9.json:
```
{"lastMessageTimestamp":"2020-09-11T08:49:44.644320700Z"}
```
Для данной структуры разработана JSON схема `resources/schemas/v0.0.1/ClientStateSchema.json`. Проверка соответствия
сохраняемого состояния схеме производится юнит тестом `SchemaCompliance.checkClientStateSchemeCompliance`.
# Инструкция запуска
Тестовый запуск можно произвести из среды разработки или из консоли при наличии в системе утилиты sbt https://www.scala-sbt.org/.
```
sbt "runMain client.Server --host 192.168.0.2 --port 10000 --states_path ./client_states"
sbt "runMain client.Client --host 192.168.0.2 --port 10000"
```

@ -0,0 +1,12 @@
name := "ConsoleChat"
version := "0.1"
scalaVersion := "2.13.3"
// https://mvnrepository.com/artifact/com.github.java-json-tools/json-schema-validator
libraryDependencies += "com.github.java-json-tools" % "json-schema-validator" % "2.2.14"
libraryDependencies += "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.11.0"
libraryDependencies += "com.typesafe.scala-logging" %% "scala-logging" % "3.9.2"
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.3"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.8" % Test

@ -0,0 +1,14 @@
{
"$schema": "http://json-schema.org/draft/2019-09/schema#",
"$id": "http://mychat.germanywestcentral.cloudapp.azure.com/schemas/v0.0.1/ClientStateSchema.json",
"title": "ClientState",
"type": "object",
"properties": {
"lastMessageTimestamp": {
"type": "string"
}
},
"required": [
"lastMessageTimestamp"
]
}

@ -0,0 +1,173 @@
package client
import java.io.{BufferedReader, IOException, InputStreamReader}
import java.net.{InetSocketAddress, SocketAddress}
import java.nio.channels.{NonReadableChannelException, NotYetConnectedException, SelectionKey, Selector, SocketChannel}
import ch.qos.logback.classic.Level
import com.typesafe.scalalogging.{LazyLogging, Logger}
import datatypes.{ClientInfo, Message}
import org.slf4j.LoggerFactory
import ch.qos.logback.classic.Level
import com.fasterxml.jackson.core.JsonProcessingException
import com.fasterxml.jackson.databind.exc.MismatchedInputException
import com.sun.mail.util.SocketConnectException
import scala.collection.immutable.ArraySeq
import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
class Client(host:String,
port:Int,
recieveCallback: Message=>Unit,
login:String
) extends Runnable with LazyLogging {
@volatile var isRunning = false
private val selector = Selector.open()
private var clientChannel: SocketChannel = _
private val inbox = mutable.ArrayDeque[Message]()
private var isInitialized = false
def start(): Unit = {
isRunning = true
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run(): Unit = {
Thread.currentThread().setName("CLEANER")
if (isRunning) {
logger.info("Closing connection.")
isRunning = false
Thread.sleep(10)
if (clientChannel != null) clientChannel.close()
}
}
})
val clientThread = new Thread(this, "CLIENT")
clientThread.start()
}
def leaveMessage(receiverId:String, body: String): Selector = {
val msg = Message(senderId = this.login, receiverId = receiverId, body = body)
inbox.addOne(msg)
clientChannel.register(selector, SelectionKey.OP_WRITE)
selector.wakeup()
}
def run (): Unit = {
clientChannel = SocketChannel.open()
try {
clientChannel.connect(new InetSocketAddress(host, port))
logger.info(s"Connected to $host:$port")
} catch {
case e: SocketConnectException =>
logger.info("Connection failed.")
System.exit(0)
}
clientChannel.configureBlocking(false)
clientChannel.register(selector, SelectionKey.OP_WRITE)
sendRecieveLoop()
}
private def sendRecieveLoop() = {
while (isRunning) {
selector.select()
for (key <- selector.selectedKeys().asScala) {
if (key.isValid && key.isReadable) {
receiveFromServer()
}
if (key.isValid && key.isWritable) {
sendToServer()
}
}
}
}
private def sendToServer() = {
if (!isInitialized) {
val clientInfo = ClientInfo(login = login)
send[ClientInfo](clientInfo)
isInitialized = true
}
if (isInitialized && inbox.nonEmpty) {
try {
val msg = inbox.removeHead()
send(msg)
} catch {
case e@(_: MismatchedInputException |
_: IOException) =>
logger.info(s"Client disconnected.")
System.exit(0)
case e@(_: NotYetConnectedException |
_: NonReadableChannelException |
_: JsonProcessingException) =>
logger.error(e.getMessage)
}
}
}
private def send[T](msg: T)(implicit ct: ClassTag[T]) = {
util.Util.send[T](msg, clientChannel)
clientChannel.register(selector, SelectionKey.OP_READ)
}
private def receiveFromServer() = {
try {
val msg = util.Util.recieve[Message](clientChannel)
if (msg.nonEmpty) {
recieveCallback(msg.get)
}
} catch {
case e@(_: MismatchedInputException |
_: IOException) =>
logger.info(s"Client disconnected.")
System.exit(0)
case e@(_: NotYetConnectedException |
_: NonReadableChannelException |
_: JsonProcessingException) =>
logger.error(e.getMessage)
}
}
}
object Client extends LazyLogging {
LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME).asInstanceOf[ch.qos.logback.classic.Logger].setLevel(Level.INFO)
val defaultArgs = Map(
"host" -> "0.0.0.0",
"port" -> "10000"
)
def uiLoop(client: Client): Unit = {
Thread.currentThread().setName("UI")
val consoleInput = new BufferedReader(new InputStreamReader(System.in))
while(client.isRunning) {
val line = consoleInput.readLine()
if (line != null) {
client.leaveMessage(util.Util.BROADCAST_RECIEVER_ID, line)
}
}
}
def main(args:Array[String]): Unit = {
val argsMap = args.toSeq.sliding(2,2).map{ case ArraySeq(k,v) => (k.replace("--",""), v) }.toMap
.withDefault(defaultArgs)
val host = argsMap("host")
val port = argsMap("port").toInt
val login = argsMap.getOrElse("login", util.Util.genId(java.time.Instant.now()))
val client = new Client(
host,
port,
message => println(s"${message.senderId}: ${message.body}"),
login = login)
client.start()
uiLoop(client)
}
}

@ -0,0 +1,8 @@
package datatypes
import com.fasterxml.jackson.annotation.JsonProperty
case class ClientInfo (
@JsonProperty("timestamp") timestamp:String = java.time.Instant.now().toString,
@JsonProperty("login") login:String
)

@ -0,0 +1,7 @@
package datatypes
import com.fasterxml.jackson.annotation.JsonProperty
case class ClientState (
@JsonProperty("lastMessageTimestamp") lastMessageTimestamp:String
)

@ -0,0 +1,10 @@
package datatypes
import com.fasterxml.jackson.annotation.JsonProperty
case class Message(
@JsonProperty("timestamp") timestamp:String = java.time.Instant.now().toString,
@JsonProperty("senderId") senderId:String,
@JsonProperty("receiverId") receiverId:String,
@JsonProperty("body") body:String
)

@ -0,0 +1,36 @@
package server
import java.nio.channels.SocketChannel
import java.nio.file.Path
import com.typesafe.scalalogging.LazyLogging
import datatypes.{ClientState, Message}
import util.SavableState
import scala.collection.mutable
class Client(
var login:String = "",
var lastMessageTimestamp:String = "",
val inbox:mutable.ArrayDeque[Message] = new mutable.ArrayDeque[Message](),
val socketChannel:SocketChannel,
var isInitialized:Boolean = false) extends SavableState with LazyLogging {
private def getStateFilePath(statesDir:Path): Path = statesDir.resolve(s"${login}.json")
def saveState(statesDir:Path):Unit = {
val filePath = getStateFilePath(statesDir)
val clientState = ClientState(lastMessageTimestamp = lastMessageTimestamp)
util.Util.writeJsonToFile(clientState, filePath)
logger.debug(s"Client $login state saved.")
}
def loadState(statesDir:Path):Unit = {
val filePath = getStateFilePath(statesDir)
val clientState = util.Util.readJsonFromFile[ClientState](filePath)
if (clientState.nonEmpty) {
lastMessageTimestamp = clientState.get.lastMessageTimestamp
logger.debug(s"Client $login state loaded.")
}
}
}

@ -0,0 +1,220 @@
package server
import java.io.IOException
import java.net.{InetAddress, InetSocketAddress, ServerSocket, Socket, SocketAddress, SocketOption}
import java.nio.ByteBuffer
import java.nio.channels.{NonReadableChannelException, NotYetConnectedException, SelectionKey, Selector, ServerSocketChannel, SocketChannel}
import java.nio.file.{Files, Path}
import java.util.concurrent.{ConcurrentHashMap, Executor, Executors, TimeUnit}
import ch.qos.logback.classic.Level
import com.fasterxml.jackson.core.JsonProcessingException
import com.fasterxml.jackson.databind.exc.MismatchedInputException
import com.typesafe.scalalogging.LazyLogging
import datatypes.{ClientInfo, Message}
import org.slf4j.LoggerFactory
import scala.collection.immutable.ArraySeq
import scala.collection.mutable
import scala.jdk.CollectionConverters._
class Server(host:String, port:Int, val savedStatesDir:Path) extends Runnable with LazyLogging {
private val connectedClients = new mutable.HashMap[Int, Client]()
private val loggedinClients = new mutable.HashMap[String, Client]()
private val selector = Selector.open()
@volatile var isRunning = false
def run(): Unit = {
logger.info(s"Server is running on $host:$port.")
val serverSocketChannel = ServerSocketChannel.open()
serverSocketChannel.bind(new InetSocketAddress(host, port))
serverSocketChannel.configureBlocking(false)
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT)
sendRecieveAcceptLoop(serverSocketChannel)
}
private def sendRecieveAcceptLoop(serverSocketChannel: ServerSocketChannel) = {
while (isRunning) {
selector.select()
for (key <- selector.selectedKeys().asScala) {
if (key.isValid && key.isAcceptable)
acceptNewClient(serverSocketChannel)
if (key.isValid && key.isReadable)
receiveClientData(key)
if (key.isValid && key.isWritable)
sendDataToClient(key)
}
}
}
private def acceptNewClient(serverSocketChannel: ServerSocketChannel) = {
val clientChannel = serverSocketChannel.accept()
if (clientChannel != null) {
clientChannel.configureBlocking(false)
clientChannel.register(selector, SelectionKey.OP_READ)
val clientId = clientChannel.hashCode()
val client = new Client(socketChannel = clientChannel)
connectedClients.put(clientId, client)
logger.debug(s"Connection accepted.")
}
}
private def receiveClientData(key: SelectionKey) = {
val clientChannel = key.channel().asInstanceOf[SocketChannel]
val clientId = clientChannel.hashCode()
val client = connectedClients(clientId)
try {
if (!client.isInitialized) {
val clientInfo = util.Util.recieve[ClientInfo](clientChannel)
if (clientInfo.nonEmpty) {
loginClient(client, clientInfo)
} else {
throw new java.io.IOException("Bad client info. Initialization aborted.")
}
} else {
val msg = util.Util.recieve[Message](clientChannel)
if (msg.nonEmpty) {
client.lastMessageTimestamp = msg.get.timestamp
client.saveState(savedStatesDir)
routeMessage(msg.get, clientId)
}
}
} catch {
case e @ (_: com.fasterxml.jackson.databind.exc.MismatchedInputException |
_: java.io.IOException ) =>
logger.error(e.getMessage, e.getCause)
handleDisconnection(clientChannel, clientId)
case e @ (_ : java.nio.channels.NotYetConnectedException |
_ : java.nio.channels.NonReadableChannelException |
_ : com.fasterxml.jackson.core.JsonProcessingException) =>
logger.error(e.getMessage, e.getCause)
}
}
private def loginClient(client: Client, clientInfo: Option[ClientInfo]) = {
val login = clientInfo.get.login
if (loggedinClients.keySet.contains(login)) {
val msg = Message(body=s"$login is already connected.", receiverId = "", senderId = "SERVER")
leaveMessageForClient(msg, client)
} else {
loggedinClients.put(clientInfo.get.login, client)
client.login = clientInfo.get.login
client.loadState(savedStatesDir)
client.isInitialized = true
val msg = if ("".equals(client.lastMessageTimestamp)) {
Message(body = s"Hello new client with login ${client.login} !", receiverId = "", senderId = "SERVER")
} else {
Message(body = s"Welcome back ${client.login} !", receiverId = "", senderId = "SERVER")
}
leaveMessageForClient(msg, client)
logger.info(s"Client ${client.login} logged in.")
}
}
private def handleDisconnection(clientChannel: SocketChannel, clientId: Int) = {
clientChannel.close()
val disconnectedClient = connectedClients.remove(clientId)
if (disconnectedClient.nonEmpty) {
loggedinClients.remove(disconnectedClient.get.login)
val msg = Message(body = s"Client ${disconnectedClient.get.login} left.", receiverId = "", senderId = "SERVER")
leaveMessageForConnectedClients(msg)
}
logger.info(s"Client ${disconnectedClient.get.login} disconnected.")
}
private def sendDataToClient(key: SelectionKey) = {
val clientChannel = key.channel().asInstanceOf[SocketChannel]
val clientId = clientChannel.hashCode()
val client = connectedClients(clientId)
val clientInbox = client.inbox
if (clientInbox.nonEmpty)
try {
val messageToSend = clientInbox.removeHead()
client.lastMessageTimestamp = messageToSend.timestamp
client.saveState(savedStatesDir)
util.Util.send(messageToSend, clientChannel)
clientChannel.register(selector, SelectionKey.OP_READ)
} catch {
case e@(_: MismatchedInputException |
_: IOException) =>
handleDisconnection(clientChannel, clientId)
case e@(_: NotYetConnectedException |
_: NonReadableChannelException |
_: JsonProcessingException) =>
logger.error(e.getMessage, e.getCause)
}
}
private def routeMessage(msg: Message, senderId:Int) = {
msg.receiverId match {
case _ =>
for ((recieverId, reciever) <- connectedClients.iterator) {
if (recieverId != senderId) {
leaveMessageForClient(msg, reciever)
}
}
}
}
private def leaveMessageForConnectedClients(msg:Message) = {
for ((recieverId, reciever) <- connectedClients.iterator)
leaveMessageForClient(msg, reciever)
}
private def leaveMessageForClient(msg: Message, reciever: Client) = {
reciever.inbox.addOne(msg)
reciever.socketChannel.register(selector, SelectionKey.OP_WRITE)
}
def start(): Unit = {
isRunning = true
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run(): Unit = {
if (isRunning) {
logger.info("Closing connections.")
isRunning = false
Thread.sleep(10)
for (c <- connectedClients.values) {
try c.socketChannel.close()
}
}
}
})
this.run()
}
}
object Server extends LazyLogging {
LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME).asInstanceOf[ch.qos.logback.classic.Logger].setLevel(Level.DEBUG)
val defaultArgs = Map(
"host" -> "0.0.0.0",
"port" -> "10000",
"states_path" -> "./clientStates/"
)
// how socket connection and key.isConnectable are related?
// how socket connection and socket.isConnected are related?
def main(args:Array[String]): Unit = {
val argsMap = args.toSeq.sliding(2,2).map{ case ArraySeq(k,v) => (k.replace("--",""), v) }.toMap
.withDefault(defaultArgs)
val host = argsMap("host")
val port = argsMap("port").toInt
val statesPath = Path.of(argsMap("statesPath")).toAbsolutePath
if (Files.notExists(statesPath)) {
logger.info(s"Path statesPath = ${statesPath} does not exist. Aborting.")
System.exit(0)
}
logger.info(s"Using ${statesPath} for client states.")
val server = new Server(host, port, statesPath)
server.start()
}
}

@ -0,0 +1,8 @@
package util
import java.nio.file.Path
trait SavableState {
def loadState(statesDir:Path):Unit
def saveState(statesDir:Path):Unit
}

@ -0,0 +1,81 @@
package util
import java.nio.ByteBuffer
import java.nio.channels.SocketChannel
import java.nio.charset.{Charset, StandardCharsets}
import java.nio.file.{Files, Path, StandardOpenOption}
import java.security.MessageDigest
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
import com.typesafe.scalalogging.LazyLogging
import scala.collection.mutable
import scala.reflect.ClassTag
object Util extends LazyLogging {
val BUFFER_SIZE = 1024
val ID_LIMIT = 2
val BROADCAST_RECIEVER_ID = "-1"
private val objectMapper = new ObjectMapper() with ScalaObjectMapper
objectMapper.registerModule(DefaultScalaModule)
private val charset = Charset.forName( "utf-8" )
private val decoder = charset.newDecoder
def recieveString(socketChannel:SocketChannel) : String = {
val stringBuilder = new mutable.StringBuilder()
val buffer = ByteBuffer.allocateDirect(BUFFER_SIZE)
while (true) {
val readBytes = socketChannel.read(buffer)
if ((readBytes == 0) || (readBytes == -1))
return stringBuilder.mkString
buffer.flip()
val charBuffer = decoder.decode( buffer )
stringBuilder.appendAll(charBuffer.array())
}
""
}
def sendString(str: String, socketChannel:SocketChannel): Unit = {
logger.debug(s"Send: $str")
val buffer = ByteBuffer.wrap(str.getBytes())
socketChannel.write(buffer)
}
def recieve[T](sc:SocketChannel)(implicit ct: ClassTag[T]): Option[T] = {
val raw = recieveString(sc)
if ("".equals(raw)) return None
val v = objectMapper.readerFor(ct.runtimeClass).readValue[T](raw)
logger.debug(s"Recieve: ${v.toString}")
Some(v)
}
def send[T](x:T, sc:SocketChannel)(implicit ct: ClassTag[T]): Unit =
sendString(objectMapper.writerFor(ct.runtimeClass).writeValueAsString(x), sc)
def genId(s: Any): String = {
val md5 = java.security.MessageDigest.getInstance("MD5").digest(s.toString.getBytes)
md5.map("%02X" format _).take(ID_LIMIT).mkString
}
def writeJsonToFile[T](x:T, file:Path)(implicit ct: ClassTag[T]) = {
val json = objectMapper.writerFor(ct.runtimeClass).writeValueAsString(x)
Files.writeString(
file,
json,
StandardCharsets.UTF_8,
StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING)
}
def readJsonFromFile[T](file:Path)(implicit ct: ClassTag[T]):Option[T] = {
if (Files.exists(file)) {
val json = Files.readString(file)
val v = objectMapper.readerFor(ct.runtimeClass).readValue[T](json)
Some(v)
} else {
None
}
}
}

@ -0,0 +1,26 @@
import java.nio.file.Path
import com.github.fge.jackson.JsonLoader
import com.github.fge.jsonschema.main.JsonSchemaFactory
import datatypes.ClientState
import org.scalatest.FunSuite
class SchemaCompliance extends FunSuite {
test("checkClientStateSchemeCompliance") {
val clientState = ClientState(lastMessageTimestamp = "2020-09-11T08:54:45.528807100Z")
val clientStateFilePath = Path.of("./target/checkClientStateSchemeCompliance.json")
val schemaPath = Path.of("./resources/schemas/v0.0.1/ClientStateSchema.json")
util.Util.writeJsonToFile(clientState, clientStateFilePath)
val jsonToValidate = JsonLoader.fromFile(clientStateFilePath.toFile)
val sf = JsonSchemaFactory.byDefault()
val schema = sf.getJsonSchema(JsonLoader.fromFile(schemaPath.toFile))
val report = schema.validate(jsonToValidate)
println(report)
assert(report.isSuccess)
}
}
Loading…
Cancel
Save